КДП/Лаб 1 2020
Прва лабораторијска вежба 2020. године носила је 10 бодова. Исти текст поновио се и на наредних неколико првих лабораторијских вежби. За разлику од лабораторијске вежбе за самосталну вежбу, није био дат скелет кода.
Поставка
Потребно је написати Јава апликацију која проналази филмове са највећим бројем редитеља. Решење мора бити максимално конкурентно и отпорно на прекиде.
У зависности од додељене групе, неке дељене објекте је потребно имплементирати преко одређене синхронизационе примитиве:
- Семафори (
java.util.concurrent.Semaphore) - Монитори (
synchronizedнад методом) - Региони (
synchronizedнад објектом)
Делови апликације:
- Класа
Producerобрађује улазну датотеку са филмовима и информацију о сваком појединачном филму убацује у дељени објекат за даљу обраду. За имплементацију дељеног објекта користити синхронизациону примитиву одређену групом. Ова класа обраду обавља у свом току контроле. Подаци о филмовима се читају ред по ред из архиве са филмовима (title.crew.tsv.gz). - Класа
Consumerобрађује информације о добијеном филму и резултате обраде на крају прослеђује на даљу обраду. Ова класа обраду обавља у свом току контроле. Класа чита информације о неком филму из дељеног објекта. Чувају се само филмови са максималним бројем редитеља (од дотад обрађених филмова). На сваких K (задаје се) обрађених филмова које једанConsumerобради, ажурира информацију о свом броју обрађених филмова користећи посебан посебан дељени објекат. Када не буде било више филмова за обраду,Consumerпрослеђује свој локални максимум броја редитеља даље, у зависности од тога да ли има филмове са глобалним максимумом редитеља прослеђује филмове даље на обраду и такође ажурира информацију о броју обрађених филмова. Синхронизација измеђуConsumerнити се обавља коришћењем дељеног објекта имплементираног преко синхронизационе примитиве одређене групом. - Класа
Combinerна основу прикупљених локалних максимума броја редитеља одређује глобални максимум броја редитеља и јављаConsumerнитима које су пријавили тај локални максимум да пошаљу обрађене филмове. Ова класа обраду обавља у свом току контроле. Класа преузима податке из дељеног објекта и након завршене обраде резултат убацује у следећи дељени објекат. - Класа
Printerна сваких M (задаје се) секунди штампа информације о броју обрађених филмова, све док се не прикупе коначни подаци и на крају штампа њих на основу података које је доставила класаCombiner. Ова класа обраду обавља у свом току контроле. Излаз апликације је листа која садржи идентификатор филма и број редитеља тог филма. - Класа
Testтестира овај систем. Класа у својој почетнојmainметоди креира све потребне дељене објекте, покреће једну инстанцу класеProducer, већи (подесив) број инстанци класеConsumer, једну инстанцу класеCombinerи једну инстанцу класеPrinter. Након комплетно завршене обраде, штампа колико је износило укупно време обраде за дати број нити које раде обраду. - Ниједна класа нити (
Producer,Consumer,Combiner,Printer) не сме да зна за број инстанци класе Consumer!
Решење
Као крајње решење очекује се да програм испише Teatr Telewizji (tt0441074) уколико се користи dataset из 2022. године.
Структуре података
Класе које немају везе са конкурентношћу већ чисто чувањем и обрадом података.
Film.java
Класа која садржи податке о једном филму и у конструктору обрађује једну линију улазног фајла.
package kdpl12020;
public class Film {
public final String tconst;
public final String[] directors;
public final String[] writers;
public Film(String line) {
String[] splitLine = line.split("\t");
tconst = splitLine[0];
String directorsCell = splitLine[1];
String writersCell = splitLine[2];
directors = directorsCell.equals("\\N") ? new String[0] : directorsCell.split(",");
writers = writersCell.equals("\\N") ? new String[0] : writersCell.split(",");
}
}
Result.java
Садржи податке о крајњем резултату обраде једне Consumer нити.
package kdpl12020;
public class Result {
public final int id;
public final int maxDirectors;
public Result(int id, int maxDirectors) {
this.id = id;
this.maxDirectors = maxDirectors;
}
}
Синхронизација
Класе које служе са синхронизацију између нити. Могу бити наведене у више варијанти реализације, тако да све деле заједнички интерфејс.
BoundedBuffer.java
Генерички ограничени бафер, сличан MessageBox<T> са вежби.
package kdpl12020;
public interface BoundedBuffer<T> {
public void put(T data);
public T get();
}
MonitorBoundedBuffer.java
Генерички ограничени бафер имплементиран преко монитора.
package kdpl12020;
import java.util.ArrayList;
import java.util.List;
public class MonitorBoundedBuffer<T> implements BoundedBuffer<T> {
private final int capacity;
private final List<T> buffer;
public MonitorBoundedBuffer(int capacity) {
this.capacity = capacity;
buffer = new ArrayList<T>(capacity);
}
@Override
public synchronized void put(T data) {
while (buffer.size() == capacity) {
try {
wait();
} catch (InterruptedException ex) {
// Ignoring all interrupts.
}
}
buffer.add(data);
notifyAll();
}
@Override
public synchronized T get() {
while (buffer.size() == 0) {
try {
wait();
} catch (InterruptedException ex) {
// Ignoring all interrupts.
}
}
T data = buffer.remove(0);
notifyAll();
return data;
}
}
SemaphoreBoundedBuffer.java
Генерички ограничени бафер имплементиран преко семафора.
package kdpl12020;
import java.util.concurrent.Semaphore;
public class SemaphoreBoundedBuffer<T> implements BoundedBuffer<T> {
private final T buffer[];
private int readIndex;
private int writeIndex;
private int capacity;
private final Semaphore mutexProducer;
private final Semaphore mutexConsumer;
private final Semaphore empty;
private final Semaphore full;
@SuppressWarnings("unchecked")
public SemaphoreBoundedBuffer(int capacity) {
readIndex = writeIndex = 0;
buffer = (T[]) new Object[capacity];
this.capacity = capacity;
mutexProducer = new Semaphore(1);
mutexConsumer = new Semaphore(1);
empty = new Semaphore(capacity);
full = new Semaphore(0);
}
@Override
public void put(T data) {
empty.acquireUninterruptibly();
mutexProducer.acquireUninterruptibly();
buffer[writeIndex] = data;
writeIndex = (writeIndex + 1) % capacity;
mutexProducer.release();
full.release();
}
@Override
public T get() {
full.acquireUninterruptibly();
mutexConsumer.acquireUninterruptibly();
T data = buffer[readIndex];
readIndex = (readIndex + 1) % capacity;
mutexConsumer.release();
empty.release();
return data;
}
}
RegionBoundedBuffer.java
Генерички ограничени бафер имплементиран преко региона.
package kdpl12020;
import java.util.ArrayList;
import java.util.List;
public class RegionBoundedBuffer<T> implements BoundedBuffer<T> {
private final int capacity;
private final List<T> buffer;
public RegionBoundedBuffer(int capacity) {
this.capacity = capacity;
buffer = new ArrayList<T>(capacity);
}
@Override
public void put(T data) {
synchronized (buffer) {
while (buffer.size() == capacity) {
try {
buffer.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
buffer.add(data);
buffer.notifyAll();
}
}
@Override
public T get() {
synchronized (buffer) {
while (buffer.size() == 0) {
try {
buffer.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T data = buffer.remove(0);
buffer.notifyAll();
return data;
}
}
}
Barrier.java
Синхронизациона примитива баријере, на којој више нити може да чека док до исте тачке не дођу и све остале нити. У овој реализацији се користи као једнократна, али могуће је ресетовати баријеру у неком тренутку како би могла изнова да се користи.
package kdpl12020;
public interface Barrier {
public void arrived();
public boolean await(long timeout);
public void reset();
}
MonitorBarrier.java
Баријера имплементирана преко монитора.
package kdpl12020;
public class MonitorBarrier implements Barrier {
private final int count;
private int currentCount = 0;
boolean passed = false;
int round = 0;
public MonitorBarrier(int count) {
this.count = count;
}
@Override
public synchronized void arrived() {
int myRound = round;
if (passed) {
return;
}
if (++currentCount == count) {
passed = true;
notifyAll();
} else {
while (!passed && round == myRound) {
try {
wait();
} catch (InterruptedException e) {}
}
}
}
@Override
public synchronized boolean await(long timeout) {
int myRound = round;
if (passed) {
return true;
}
while (!passed && myRound == round) {
try {
long timeBefore = System.currentTimeMillis();
wait(timeout);
long timeAfter = System.currentTimeMillis();
if (timeout != 0 && timeAfter - timeBefore >= timeout) {
break;
}
} catch (InterruptedException e) {}
}
return passed;
}
@Override
public synchronized void reset() {
notifyAll();
currentCount = 0;
passed = false;
round++;
}
}
SemaphoreBarrier.java
Баријера имплементирана преко семафора.
package kdpl12020;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreBarrier implements Barrier {
private final Semaphore sem1 = new Semaphore(1);
private final Semaphore sem2 = new Semaphore(0);
private final Semaphore awaiter = new Semaphore(0);
private final Semaphore mutex = new Semaphore(1);
private final int count;
// Protected by sem1 and sem2.
private int currentCount = 0;
// Protected by mutex.
private int awaiting = 0;
private boolean passed = false;
public SemaphoreBarrier(int count) {
this.count = count;
}
@Override
public void arrived() {
sem1.acquireUninterruptibly();
++currentCount;
if (currentCount == count) {
sem2.release();
mutex.acquireUninterruptibly();
awaiter.release(awaiting);
passed = true;
mutex.release();
} else {
sem1.release();
}
sem2.acquireUninterruptibly();
--currentCount;
if (currentCount == 0) {
sem1.release();
} else {
sem2.release();
}
}
@Override
public boolean await(long timeout) {
boolean acquired;
mutex.acquireUninterruptibly();
if (passed) {
mutex.release();
return true;
}
++awaiting;
mutex.release();
try {
if (timeout == 0) {
awaiter.acquireUninterruptibly();
acquired = true;
} else {
acquired = awaiter.tryAcquire(timeout, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
return false;
}
mutex.acquireUninterruptibly();
--awaiting;
mutex.release();
return acquired;
}
@Override
public void reset() {
passed = false;
}
}
RegionBarrier.java
Баријера имплементирана преко региона.
package kdpl12020;
// Jednokratna barijera.
public class RegionBarrier implements Barrier {
private static class Barrier {
public int numOfWaitingThreads = 0;
public boolean barrierPassed = false;
}
private final int totalNumOfThreads;
private final Barrier barrier;
public RegionBarrier(int totalNumOfThreads) {
this.totalNumOfThreads = totalNumOfThreads;
barrier = new Barrier();
}
@Override
public void arrived() {
synchronized (barrier) {
if (++barrier.numOfWaitingThreads == totalNumOfThreads) {
barrier.barrierPassed = true;
barrier.notifyAll();
} else {
while (!barrier.barrierPassed) {
try {
barrier.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
@Override
public boolean await(long timeout) {
synchronized (barrier) {
while (!barrier.barrierPassed) {
long timeBefore = System.currentTimeMillis();
try {
barrier.wait(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
long timeAfter = System.currentTimeMillis();
if (timeout != 0 && (timeAfter - timeBefore) >= timeout) {
break;
}
}
return barrier.barrierPassed;
}
}
@Override
public void reset() {}
}
Нити
Класе нити поменуте у задатку.
Producer.java
Чита задату датотеку и у дељени објекат убацује прочитане линије. Сигнализира да је завршила убацивањем null у дељени објекат.
package kdpl12020;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
public class Producer extends Thread {
private final String fileName;
private final BoundedBuffer<String> lines;
public Producer(String fileName, BoundedBuffer<String> lines) {
super("Producer");
this.fileName = fileName;
this.lines = lines;
}
@Override
public void run() {
try (BufferedReader reader = new BufferedReader(new FileReader(fileName))) {
// Skip first line (header).
String line = reader.readLine();
while ((line = reader.readLine()) != null) {
lines.put(line);
}
// Tell consumers the queue is empty.
lines.put(null);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
Consumer.java
Чита линије из дељеног објекта и обрађује их, током чега ажурира свој број обрађених линија и на крају чека на баријери да све остале Consumer нити заврше. Након тога чека да Combiner нит јави да ли је она изабрана за слање филмова, и затим шаље филмове у нови дељени објекат.
package kdpl12020;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
public class Consumer extends Thread {
private final int id;
private final int k;
private final BoundedBuffer<String> lines;
private final ConcurrentMap<Integer, Integer> progress;
private final BoundedBuffer<Result> results;
private final ConcurrentMap<Integer, Boolean> selected;
private final Barrier selectedBarrier;
private final Barrier barrier;
private final BoundedBuffer<Film> sending;
public Consumer(int id, int k, BoundedBuffer<String> lines, ConcurrentMap<Integer, Integer> progress,
BoundedBuffer<Result> results, Barrier barrier, ConcurrentMap<Integer, Boolean> selected,
Barrier selectedBarrier, BoundedBuffer<Film> sending) {
super("Consumer" + id);
this.id = id;
this.k = k;
this.lines = lines;
this.progress = progress;
this.results = results;
this.barrier = barrier;
this.selected = selected;
this.selectedBarrier = selectedBarrier;
this.sending = sending;
}
@Override
public void run() {
String line;
List<Film> films = new ArrayList<>();
int maxDirectors = 0;
int processed = 0;
while ((line = lines.get()) != null) {
Film data = new Film(line);
int directorCount = data.directors.length;
if (directorCount > maxDirectors) {
maxDirectors = directorCount;
films.clear();
}
if (directorCount == maxDirectors) {
films.add(data);
}
if (++processed % k == 0) {
// Update our progress every K processed lines.
progress.put(id, processed);
}
}
progress.put(id, processed);
// Signal other consumers we're done.
lines.put(null);
// Signal the combiner we're done.
results.put(new Result(id, maxDirectors));
barrier.arrived();
selectedBarrier.await(0);
if (selected.getOrDefault(id, false)) {
// Send data to the combiner.
for (Film film : films) {
sending.put(film);
}
// Signal the combiner sending data is over.
sending.put(null);
}
}
}
Combiner.java
Чека да Consumer нити заврше, затим дохвата њихове резултате, бира нити које треба да пошаљу своје филмове, чека на послате филмове и на крају јавља Printer да испише резултате.
package kdpl12020;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
public class Combiner extends Thread {
private final Barrier consumerBarrier;
private final BoundedBuffer<Result> results;
private final ConcurrentMap<Integer, Boolean> selected;
private final Barrier selectedBarrier;
private final BoundedBuffer<Film> sending;
private final List<Film> finalData;
private final Barrier printerBarrier;
public Combiner(Barrier consumerBarrier, BoundedBuffer<Result> results, ConcurrentMap<Integer, Boolean> selected,
Barrier selectedBarrier, BoundedBuffer<Film> sending, List<Film> finalData, Barrier printerBarrier) {
super("Combiner");
this.consumerBarrier = consumerBarrier;
this.results = results;
this.selected = selected;
this.selectedBarrier = selectedBarrier;
this.sending = sending;
this.finalData = finalData;
this.printerBarrier = printerBarrier;
}
@Override
public void run() {
int maxDirectors = 0;
List<Integer> maxId = new ArrayList<>();
Result result;
consumerBarrier.await(0);
results.put(null);
while ((result = results.get()) != null) {
if (result.maxDirectors > maxDirectors) {
maxDirectors = result.maxDirectors;
maxId.clear();
}
if (result.maxDirectors == maxDirectors) {
maxId.add(result.id);
}
}
int senders = maxId.size();
for (int id : maxId) {
selected.put(id, true);
}
selectedBarrier.arrived();
while (true) {
Film data = sending.get();
if (data == null) {
if (--senders == 0) {
break;
}
} else {
finalData.add(data);
}
}
printerBarrier.arrived();
}
}
Printer.java
Чека на Combiner нит да убаци резултате за исписивање, а у међувремену чита статус Consumer нити и исписује на сваких M секунди.
package kdpl12020;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentMap;
public class Printer extends Thread {
private final static int NUMBER_OF_MILISECONDS_IN_ONE_SECOND = 1000;
private final long interval;
private final ConcurrentMap<Integer, Integer> progress;
private final Barrier printerBarrier;
private final List<Film> finalData;
public Printer(long interval, ConcurrentMap<Integer, Integer> progress, Barrier printerBarrier, List<Film> finalData) {
super("Printer");
this.interval = interval;
this.progress = progress;
this.printerBarrier = printerBarrier;
this.finalData = finalData;
}
@Override
public void run() {
while (true) {
if (printerBarrier.await(interval * NUMBER_OF_MILISECONDS_IN_ONE_SECOND)) {
break;
}
for (Integer id : new TreeSet<Integer>(progress.keySet())) {
System.out.println("Consumer " + id + ": " + progress.get(id));
}
}
System.out.println("Final result:");
for (Film film : finalData) {
System.out.println(film.tconst + " (" + film.directors.length + ")");
}
}
}
Test.java
Повезује све остале нити.
package kdpl12020;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class Test {
private static final int BUFFER_LENGTH = 10000;
public static void main(String[] args) {
final int consumersNumber = args.length == 0 ? 5 : Integer.parseInt(args[0]);
final int K = args.length < 2 ? 1000 : Integer.parseInt(args[1]);
final String fileName = args.length < 3 ? "title.crew.tsv" : args[3];
final long M = args.length < 4 ? 5 : Long.parseLong(args[4]);
long startTime = System.currentTimeMillis();
// Buffer for lines in the TSV file.
BoundedBuffer<String> lines = new MonitorBoundedBuffer<>(BUFFER_LENGTH);
// Barrier for consumers to wait on.
Barrier consumerBarrier = new MonitorBarrier(consumersNumber);
// Map of how are consumers progressing.
ConcurrentMap<Integer, Integer> progress = new ConcurrentHashMap<>();
// List of consumer result counts.
BoundedBuffer<Result> results = new MonitorBoundedBuffer<>(consumersNumber + 1);
// Map of whether consumers were selected for sending.
ConcurrentMap<Integer, Boolean> selected = new ConcurrentHashMap<>();
// Barrier for consumers to wait on for the combiner to select them.
Barrier selectedBarrier = new MonitorBarrier(1);
// Barrier for the printer to wait on for the combiner to finish.
Barrier printerBarrier = new MonitorBarrier(1);
// Receiving list of films at the combiner.
BoundedBuffer<Film> sending = new MonitorBoundedBuffer<>(BUFFER_LENGTH);
// Final received list of films for printing.
List<Film> finalData = new ArrayList<>();
Producer producer = new Producer(fileName, lines);
producer.start();
for (int i = 0; i < consumersNumber; i++) {
Consumer consumer = new Consumer(i, K, lines, progress, results, consumerBarrier, selected, selectedBarrier, sending);
consumer.start();
}
Combiner combiner = new Combiner(consumerBarrier, results, selected, selectedBarrier, sending, finalData, printerBarrier);
combiner.start();
Printer printer = new Printer(M, progress, printerBarrier, finalData);
printer.start();
try {
printer.join();
} catch (Exception e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
System.out.println("Total time: " + (endTime - startTime) + "ms");
}
}