КДП/Лаб 1 2020

Извор: SI Wiki
Пређи на навигацију Пређи на претрагу

Прва лабораторијска вежба 2020. године носила је 10 бодова. Исти текст поновио се и на наредних неколико првих лабораторијских вежби. За разлику од лабораторијске вежбе за самосталну вежбу, није био дат скелет кода.

Поставка

Потребно је написати Јава апликацију која проналази филмове са највећим бројем редитеља. Решење мора бити максимално конкурентно и отпорно на прекиде.

У зависности од додељене групе, неке дељене објекте је потребно имплементирати преко одређене синхронизационе примитиве:

  1. Семафори (java.util.concurrent.Semaphore)
  2. Монитори (synchronized над методом)
  3. Региони (synchronized над објектом)

Делови апликације:

  • Класа Producer обрађује улазну датотеку са филмовима и информацију о сваком појединачном филму убацује у дељени објекат за даљу обраду. За имплементацију дељеног објекта користити синхронизациону примитиву одређену групом. Ова класа обраду обавља у свом току контроле. Подаци о филмовима се читају ред по ред из архиве са филмовима (title.crew.tsv.gz).
  • Класа Consumer обрађује информације о добијеном филму и резултате обраде на крају прослеђује на даљу обраду. Ова класа обраду обавља у свом току контроле. Класа чита информације о неком филму из дељеног објекта. Чувају се само филмови са максималним бројем редитеља (од дотад обрађених филмова). На сваких K (задаје се) обрађених филмова које један Consumer обради, ажурира информацију о свом броју обрађених филмова користећи посебан посебан дељени објекат. Када не буде било више филмова за обраду, Consumer прослеђује свој локални максимум броја редитеља даље, у зависности од тога да ли има филмове са глобалним максимумом редитеља прослеђује филмове даље на обраду и такође ажурира информацију о броју обрађених филмова. Синхронизација између Consumer нити се обавља коришћењем дељеног објекта имплементираног преко синхронизационе примитиве одређене групом.
  • Класа Combiner на основу прикупљених локалних максимума броја редитеља одређује глобални максимум броја редитеља и јавља Consumer нитима које су пријавили тај локални максимум да пошаљу обрађене филмове. Ова класа обраду обавља у свом току контроле. Класа преузима податке из дељеног објекта и након завршене обраде резултат убацује у следећи дељени објекат.
  • Класа Printer на сваких M (задаје се) секунди штампа информације о броју обрађених филмова, све док се не прикупе коначни подаци и на крају штампа њих на основу података које је доставила класа Combiner. Ова класа обраду обавља у свом току контроле. Излаз апликације је листа која садржи идентификатор филма и број редитеља тог филма.
  • Класа Test тестира овај систем. Класа у својој почетној main методи креира све потребне дељене објекте, покреће једну инстанцу класе Producer, већи (подесив) број инстанци класе Consumer, једну инстанцу класе Combiner и једну инстанцу класе Printer. Након комплетно завршене обраде, штампа колико је износило укупно време обраде за дати број нити које раде обраду.
  • Ниједна класа нити (Producer, Consumer, Combiner, Printer) не сме да зна за број инстанци класе Consumer!

Решење

Као крајње решење очекује се да програм испише Teatr Telewizji (tt0441074) уколико се користи dataset из 2022. године.

Структуре података

Класе које немају везе са конкурентношћу већ чисто чувањем и обрадом података.

Film.java

Класа која садржи податке о једном филму и у конструктору обрађује једну линију улазног фајла.

package kdpl12020;

public class Film {
	public final String tconst;
	public final String[] directors;
	public final String[] writers;

	public Film(String line) {
		String[] splitLine = line.split("\t");
		tconst = splitLine[0];
		String directorsCell = splitLine[1];
		String writersCell = splitLine[2];
		directors = directorsCell.equals("\\N") ? new String[0] : directorsCell.split(",");
		writers = writersCell.equals("\\N") ? new String[0] : writersCell.split(",");
	}
}

Result.java

Садржи податке о крајњем резултату обраде једне Consumer нити.

package kdpl12020;

public class Result {
	public final int id;
	public final int maxDirectors;
	
	public Result(int id, int maxDirectors) {
		this.id = id;
		this.maxDirectors = maxDirectors;
	}
}

Синхронизација

Класе које служе са синхронизацију између нити. Могу бити наведене у више варијанти реализације, тако да све деле заједнички интерфејс.

BoundedBuffer.java

Генерички ограничени бафер, сличан MessageBox<T> са вежби.

package kdpl12020;

public interface BoundedBuffer<T> {
	public void put(T data);
	public T get();
}

MonitorBoundedBuffer.java

Генерички ограничени бафер имплементиран преко монитора.

package kdpl12020;

import java.util.ArrayList;
import java.util.List;

public class MonitorBoundedBuffer<T> implements BoundedBuffer<T> {
	private final int capacity;
	private final List<T> buffer;

	public MonitorBoundedBuffer(int capacity) {
		this.capacity = capacity;
		buffer = new ArrayList<T>(capacity);
	}

	@Override
	public synchronized void put(T data) {
		while (buffer.size() == capacity) {
			try {
				wait();
			} catch (InterruptedException ex) {
				// Ignoring all interrupts.
			}
		}
		buffer.add(data);
		notifyAll();
	}

	@Override
	public synchronized T get() {
		while (buffer.size() == 0) {
			try {
				wait();
			} catch (InterruptedException ex) {
				// Ignoring all interrupts.
			}
		}
		T data = buffer.remove(0);
		notifyAll();
		return data;
	}
}

SemaphoreBoundedBuffer.java

Генерички ограничени бафер имплементиран преко семафора.

package kdpl12020;

import java.util.concurrent.Semaphore;

public class SemaphoreBoundedBuffer<T> implements BoundedBuffer<T> {
	
	private final T buffer[];
	private int readIndex;
	private int writeIndex;
	private int capacity;
	
	private final Semaphore mutexProducer;
	private final Semaphore mutexConsumer;
	private final Semaphore empty;
	private final Semaphore full;
	
	@SuppressWarnings("unchecked")
	public SemaphoreBoundedBuffer(int capacity) {
		readIndex = writeIndex = 0;
		buffer = (T[]) new Object[capacity];
		this.capacity = capacity;
		
		mutexProducer = new Semaphore(1);
		mutexConsumer = new Semaphore(1);
		empty = new Semaphore(capacity);
		full = new Semaphore(0);
	}

	@Override
	public void put(T data) {
		empty.acquireUninterruptibly();
		
		mutexProducer.acquireUninterruptibly();
		buffer[writeIndex] = data;
		writeIndex = (writeIndex + 1) % capacity;
		mutexProducer.release();
		
		full.release();
	}

	@Override
	public T get() {
		full.acquireUninterruptibly();
		
		mutexConsumer.acquireUninterruptibly();
		T data = buffer[readIndex];
		readIndex = (readIndex + 1) % capacity;
		mutexConsumer.release();
		
		empty.release();
		return data;
	}

}

RegionBoundedBuffer.java

Генерички ограничени бафер имплементиран преко региона.

package kdpl12020;

import java.util.ArrayList;
import java.util.List;

public class RegionBoundedBuffer<T> implements BoundedBuffer<T> {
	
	private final int capacity;
	private final List<T> buffer;
	
	public RegionBoundedBuffer(int capacity) {
		this.capacity = capacity;
		buffer = new ArrayList<T>(capacity);
	}

	@Override
	public void put(T data) {
		synchronized (buffer) {
			while (buffer.size() == capacity) {
				try {
					buffer.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			buffer.add(data);
			buffer.notifyAll();
		}
	}

	@Override
	public T get() {
		synchronized (buffer) {
			while (buffer.size() == 0) {
				try {
					buffer.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			T data = buffer.remove(0);
			buffer.notifyAll();
			return data;
		}
	}

}

Barrier.java

Синхронизациона примитива баријере, на којој више нити може да чека док до исте тачке не дођу и све остале нити. У овој реализацији се користи као једнократна, али могуће је ресетовати баријеру у неком тренутку како би могла изнова да се користи.

package kdpl12020;

public interface Barrier {
	public void arrived();
	public boolean await(long timeout);
	public void reset();
}

MonitorBarrier.java

Баријера имплементирана преко монитора.

package kdpl12020;

public class MonitorBarrier implements Barrier {
	private final int count;
	private int currentCount = 0;
	boolean passed = false;
	int round = 0;
	public MonitorBarrier(int count) {
		this.count = count;
	}
	@Override
	public synchronized void arrived() {
		int myRound = round;
		if (passed) {
			return;
		}
		if (++currentCount == count) {
			passed = true;
			notifyAll();
		} else {
			while (!passed && round == myRound) {
				try {
					wait();
				} catch (InterruptedException e) {}
			}
		}
	}
	@Override
	public synchronized boolean await(long timeout) {
		int myRound = round;
		if (passed) {
			return true;
		}
		while (!passed && myRound == round) {
			try {
				long timeBefore = System.currentTimeMillis();
				wait(timeout);
				long timeAfter = System.currentTimeMillis();
				if (timeout != 0 && timeAfter - timeBefore >= timeout) {
					break;
				}
			} catch (InterruptedException e) {}
		}
		return passed;
	}
	@Override
	public synchronized void reset() {
        notifyAll();
		currentCount = 0;
		passed = false;
		round++;
	}
}

SemaphoreBarrier.java

Баријера имплементирана преко семафора.

package kdpl12020;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreBarrier implements Barrier {
	private final Semaphore sem1 = new Semaphore(1);
	private final Semaphore sem2 = new Semaphore(0);
	private final Semaphore awaiter = new Semaphore(0);
	private final Semaphore mutex = new Semaphore(1);
	private final int count;
	// Protected by sem1 and sem2.
	private int currentCount = 0;
	// Protected by mutex.
	private int awaiting = 0;
	private boolean passed = false;

	public SemaphoreBarrier(int count) {
		this.count = count;
	}
	
	@Override
	public void arrived() {
		sem1.acquireUninterruptibly();
		++currentCount;
		if (currentCount == count) {
			sem2.release();
			mutex.acquireUninterruptibly();
			awaiter.release(awaiting);
			passed = true;
			mutex.release();
		} else {
			sem1.release();
		}
		sem2.acquireUninterruptibly();
		--currentCount;
		if (currentCount == 0) {
			sem1.release();
		} else {
			sem2.release();
		}
	}

	@Override
	public boolean await(long timeout) {
		boolean acquired;
		mutex.acquireUninterruptibly();
		if (passed) {
			mutex.release();
			return true;
		}
		++awaiting;
		mutex.release();
		try {
			if (timeout == 0) {
				awaiter.acquireUninterruptibly();
				acquired = true;
			} else {
				acquired = awaiter.tryAcquire(timeout, TimeUnit.MILLISECONDS);
			}
		} catch (InterruptedException e) {
			return false;
		}
		mutex.acquireUninterruptibly();
		--awaiting;
		mutex.release();
		return acquired;
	}

	@Override
	public void reset() {
		passed = false;
	}

}

RegionBarrier.java

Баријера имплементирана преко региона.

package kdpl12020;

// Jednokratna barijera.
public class RegionBarrier implements Barrier {

	private static class Barrier {
		public int numOfWaitingThreads = 0;
		public boolean barrierPassed = false;
	}
	
	private final int totalNumOfThreads;
	private final Barrier barrier;
	
	public RegionBarrier(int totalNumOfThreads) {
		this.totalNumOfThreads = totalNumOfThreads;
		barrier = new Barrier();
	}

	@Override
	public void arrived() {
		synchronized (barrier) {
			if (++barrier.numOfWaitingThreads == totalNumOfThreads) {
				barrier.barrierPassed = true;
				barrier.notifyAll();
			} else {
				while (!barrier.barrierPassed) {
					try {
						barrier.wait();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		}
	}

	@Override
	public boolean await(long timeout) {
		synchronized (barrier) {
			while (!barrier.barrierPassed) {
				long timeBefore = System.currentTimeMillis();
				try {
					barrier.wait(timeout);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				long timeAfter = System.currentTimeMillis();
				if (timeout != 0 && (timeAfter - timeBefore) >= timeout) {
					break;
				}
			}
			return barrier.barrierPassed;
		}
	}
    
    @Override
    public void reset() {}
	
}

Нити

Класе нити поменуте у задатку.

Producer.java

Чита задату датотеку и у дељени објекат убацује прочитане линије. Сигнализира да је завршила убацивањем null у дељени објекат.

package kdpl12020;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class Producer extends Thread {
	private final String fileName;
	private final BoundedBuffer<String> lines;

	public Producer(String fileName, BoundedBuffer<String> lines) {
		super("Producer");
		this.fileName = fileName;
		this.lines = lines;
	}

	@Override
	public void run() {
		try (BufferedReader reader = new BufferedReader(new FileReader(fileName))) {
			// Skip first line (header).
			String line = reader.readLine();
			while ((line = reader.readLine()) != null) {
				lines.put(line);
			}
			// Tell consumers the queue is empty.
			lines.put(null);
		} catch (IOException ex) {
			ex.printStackTrace();
		}
	}

}

Consumer.java

Чита линије из дељеног објекта и обрађује их, током чега ажурира свој број обрађених линија и на крају чека на баријери да све остале Consumer нити заврше. Након тога чека да Combiner нит јави да ли је она изабрана за слање филмова, и затим шаље филмове у нови дељени објекат.

package kdpl12020;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;

public class Consumer extends Thread {
	private final int id;
	private final int k;
	private final BoundedBuffer<String> lines;
	private final ConcurrentMap<Integer, Integer> progress;
	private final BoundedBuffer<Result> results;
	private final ConcurrentMap<Integer, Boolean> selected;
	private final Barrier selectedBarrier;
	private final Barrier barrier;
	private final BoundedBuffer<Film> sending;

	public Consumer(int id, int k, BoundedBuffer<String> lines, ConcurrentMap<Integer, Integer> progress,
			BoundedBuffer<Result> results, Barrier barrier, ConcurrentMap<Integer, Boolean> selected,
			Barrier selectedBarrier, BoundedBuffer<Film> sending) {
		super("Consumer" + id);
		this.id = id;
		this.k = k;
		this.lines = lines;
		this.progress = progress;
		this.results = results;
		this.barrier = barrier;
		this.selected = selected;
		this.selectedBarrier = selectedBarrier;
		this.sending = sending;
	}

	@Override
	public void run() {
		String line;
		List<Film> films = new ArrayList<>();
		int maxDirectors = 0;
		int processed = 0;
		while ((line = lines.get()) != null) {
			Film data = new Film(line);
			int directorCount = data.directors.length;
			if (directorCount > maxDirectors) {
				maxDirectors = directorCount;
				films.clear();
			}
			if (directorCount == maxDirectors) {
				films.add(data);
			}
			if (++processed % k == 0) {
				// Update our progress every K processed lines.
				progress.put(id, processed);
			}
		}
		progress.put(id, processed);
		// Signal other consumers we're done.
		lines.put(null);
		// Signal the combiner we're done.
		results.put(new Result(id, maxDirectors));
		barrier.arrived();
		selectedBarrier.await(0);
		if (selected.getOrDefault(id, false)) {
			// Send data to the combiner.
			for (Film film : films) {
				sending.put(film);
			}
			// Signal the combiner sending data is over.
			sending.put(null);
		}
	}

}

Combiner.java

Чека да Consumer нити заврше, затим дохвата њихове резултате, бира нити које треба да пошаљу своје филмове, чека на послате филмове и на крају јавља Printer да испише резултате.

package kdpl12020;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;

public class Combiner extends Thread {
	private final Barrier consumerBarrier;
	private final BoundedBuffer<Result> results;
	private final ConcurrentMap<Integer, Boolean> selected;
	private final Barrier selectedBarrier;
	private final BoundedBuffer<Film> sending;
	private final List<Film> finalData;
	private final Barrier printerBarrier;

	public Combiner(Barrier consumerBarrier, BoundedBuffer<Result> results, ConcurrentMap<Integer, Boolean> selected,
			Barrier selectedBarrier, BoundedBuffer<Film> sending, List<Film> finalData, Barrier printerBarrier) {
		super("Combiner");
		this.consumerBarrier = consumerBarrier;
		this.results = results;
		this.selected = selected;
		this.selectedBarrier = selectedBarrier;
		this.sending = sending;
		this.finalData = finalData;
		this.printerBarrier = printerBarrier;
	}

	@Override
	public void run() {
		int maxDirectors = 0;
		List<Integer> maxId = new ArrayList<>();
		Result result;
		consumerBarrier.await(0);
		results.put(null);
		while ((result = results.get()) != null) {
			if (result.maxDirectors > maxDirectors) {
				maxDirectors = result.maxDirectors;
				maxId.clear();
			}
			if (result.maxDirectors == maxDirectors) {
				maxId.add(result.id);
			}
		}
		int senders = maxId.size();
		for (int id : maxId) {
			selected.put(id, true);
		}
		selectedBarrier.arrived();
		while (true) {
			Film data = sending.get();
			if (data == null) {
				if (--senders == 0) {
					break;
				}
			} else {
				finalData.add(data);
			}
		}
		printerBarrier.arrived();
	}

}

Printer.java

Чека на Combiner нит да убаци резултате за исписивање, а у међувремену чита статус Consumer нити и исписује на сваких M секунди.

package kdpl12020;

import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentMap;

public class Printer extends Thread {
    private final static int NUMBER_OF_MILISECONDS_IN_ONE_SECOND = 1000;
	private final long interval;
	private final ConcurrentMap<Integer, Integer> progress;
	private final Barrier printerBarrier;
	private final List<Film> finalData;

	public Printer(long interval, ConcurrentMap<Integer, Integer> progress, Barrier printerBarrier, List<Film> finalData) {
		super("Printer");
		this.interval = interval;
		this.progress = progress;
		this.printerBarrier = printerBarrier;
		this.finalData = finalData;
	}

	@Override
	public void run() {
		while (true) {
			if (printerBarrier.await(interval * NUMBER_OF_MILISECONDS_IN_ONE_SECOND)) {
				break;
			}
			for (Integer id : new TreeSet<Integer>(progress.keySet())) {
				System.out.println("Consumer " + id + ": " + progress.get(id));
			}
		}
		System.out.println("Final result:");
		for (Film film : finalData) {
			System.out.println(film.tconst + " (" + film.directors.length + ")");
		}
	}

}

Test.java

Повезује све остале нити.

package kdpl12020;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class Test {
	private static final int BUFFER_LENGTH = 10000;
	
	public static void main(String[] args) {
		final int consumersNumber = args.length == 0 ? 5 : Integer.parseInt(args[0]);
		final int K = args.length < 2 ? 1000 : Integer.parseInt(args[1]);
		final String fileName = args.length < 3 ? "title.crew.tsv" : args[3];
		final long M = args.length < 4 ? 5 : Long.parseLong(args[4]);
		long startTime = System.currentTimeMillis();

		// Buffer for lines in the TSV file.
		BoundedBuffer<String> lines = new MonitorBoundedBuffer<>(BUFFER_LENGTH);
		// Barrier for consumers to wait on.
		Barrier consumerBarrier = new MonitorBarrier(consumersNumber);
		// Map of how are consumers progressing.
		ConcurrentMap<Integer, Integer> progress = new ConcurrentHashMap<>();
		// List of consumer result counts.
		BoundedBuffer<Result> results = new MonitorBoundedBuffer<>(consumersNumber + 1);
		// Map of whether consumers were selected for sending.
		ConcurrentMap<Integer, Boolean> selected = new ConcurrentHashMap<>();
		// Barrier for consumers to wait on for the combiner to select them.
		Barrier selectedBarrier = new MonitorBarrier(1);
		// Barrier for the printer to wait on for the combiner to finish.
		Barrier printerBarrier = new MonitorBarrier(1);
		// Receiving list of films at the combiner.
		BoundedBuffer<Film> sending = new MonitorBoundedBuffer<>(BUFFER_LENGTH);
		// Final received list of films for printing.
		List<Film> finalData = new ArrayList<>();

		Producer producer = new Producer(fileName, lines);
		producer.start();

		for (int i = 0; i < consumersNumber; i++) {
			Consumer consumer = new Consumer(i, K, lines, progress, results, consumerBarrier, selected, selectedBarrier, sending);
			consumer.start();
		}

		Combiner combiner = new Combiner(consumerBarrier, results, selected, selectedBarrier, sending, finalData, printerBarrier);
		combiner.start();

		Printer printer = new Printer(M, progress, printerBarrier, finalData);
		printer.start();

		try {
			printer.join();
		} catch (Exception e) {
			e.printStackTrace();
		}
		long endTime = System.currentTimeMillis();
		System.out.println("Total time: " + (endTime - startTime) + "ms");
	}
}