KDP/Lab 1 2020
Prva laboratorijska vežba 2020. godine nosila je 10 bodova. Isti tekst ponovio se i na narednih nekoliko prvih laboratorijskih vežbi. Za razliku od laboratorijske vežbe za samostalnu vežbu, nije bio dat skelet koda.
Postavka
Potrebno je napisati Java aplikaciju koja pronalazi filmove sa najvećim brojem reditelja. Rešenje mora biti maksimalno konkurentno i otporno na prekide.
U zavisnosti od dodeljene grupe, neke deljene objekte je potrebno implementirati preko određene sinhronizacione primitive:
- Semafori (
java.util.concurrent.Semaphore) - Monitori (
synchronizednad metodom) - Regioni (
synchronizednad objektom)
Delovi aplikacije:
- Klasa
Producerobrađuje ulaznu datoteku sa filmovima i informaciju o svakom pojedinačnom filmu ubacuje u deljeni objekat za dalju obradu. Za implementaciju deljenog objekta koristiti sinhronizacionu primitivu određenu grupom. Ova klasa obradu obavlja u svom toku kontrole. Podaci o filmovima se čitaju red po red iz arhive sa filmovima (title.crew.tsv.gz). - Klasa
Consumerobrađuje informacije o dobijenom filmu i rezultate obrade na kraju prosleđuje na dalju obradu. Ova klasa obradu obavlja u svom toku kontrole. Klasa čita informacije o nekom filmu iz deljenog objekta. Čuvaju se samo filmovi sa maksimalnim brojem reditelja (od dotad obrađenih filmova). Na svakih K (zadaje se) obrađenih filmova koje jedanConsumerobradi, ažurira informaciju o svom broju obrađenih filmova koristeći poseban poseban deljeni objekat. Kada ne bude bilo više filmova za obradu,Consumerprosleđuje svoj lokalni maksimum broja reditelja dalje, u zavisnosti od toga da li ima filmove sa globalnim maksimumom reditelja prosleđuje filmove dalje na obradu i takođe ažurira informaciju o broju obrađenih filmova. Sinhronizacija izmeđuConsumerniti se obavlja korišćenjem deljenog objekta implementiranog preko sinhronizacione primitive određene grupom. - Klasa
Combinerna osnovu prikupljenih lokalnih maksimuma broja reditelja određuje globalni maksimum broja reditelja i javljaConsumernitima koje su prijavili taj lokalni maksimum da pošalju obrađene filmove. Ova klasa obradu obavlja u svom toku kontrole. Klasa preuzima podatke iz deljenog objekta i nakon završene obrade rezultat ubacuje u sledeći deljeni objekat. - Klasa
Printerna svakih M (zadaje se) sekundi štampa informacije o broju obrađenih filmova, sve dok se ne prikupe konačni podaci i na kraju štampa njih na osnovu podataka koje je dostavila klasaCombiner. Ova klasa obradu obavlja u svom toku kontrole. Izlaz aplikacije je lista koja sadrži identifikator filma i broj reditelja tog filma. - Klasa
Testtestira ovaj sistem. Klasa u svojoj početnojmainmetodi kreira sve potrebne deljene objekte, pokreće jednu instancu klaseProducer, veći (podesiv) broj instanci klaseConsumer, jednu instancu klaseCombineri jednu instancu klasePrinter. Nakon kompletno završene obrade, štampa koliko je iznosilo ukupno vreme obrade za dati broj niti koje rade obradu. - Nijedna klasa niti (
Producer,Consumer,Combiner,Printer) ne sme da zna za broj instanci klase Consumer!
Rešenje
Strukture podataka
Klase koje nemaju veze sa konkurentnošću već čisto čuvanjem i obradom podataka.
Film.java
Klasa koja sadrži podatke o jednom filmu i u konstruktoru obrađuje jednu liniju ulaznog fajla.
package kdpl12020;
public class Film {
public final String tconst;
public final String[] directors;
public final String[] writers;
public Film(String line) {
String[] splitLine = line.split("\t");
tconst = splitLine[0];
String directorsCell = splitLine[1];
String writersCell = splitLine[2];
directors = directorsCell.equals("\\N") ? new String[0] : directorsCell.split(",");
writers = writersCell.equals("\\N") ? new String[0] : writersCell.split(",");
}
}
Result.java
Sadrži podatke o krajnjem rezultatu obrade jedne Consumer niti.
package kdpl12020;
public class Result {
public final int id;
public final int maxDirectors;
public Result(int id, int maxDirectors) {
this.id = id;
this.maxDirectors = maxDirectors;
}
}
Sinhronizacija
Klase koje služe sa sinhronizaciju između niti. Mogu biti navedene u više varijanti realizacije, tako da sve dele zajednički interfejs.
BoundedBuffer.java
Generički ograničeni bafer, sličan MessageBox<T> sa vežbi.
package kdpl12020;
public interface BoundedBuffer<T> {
public void put(T data);
public T get();
}
MonitorBoundedBuffer.java
Generički ograničeni bafer implementiran preko monitora.
package kdpl12020;
import java.util.ArrayList;
import java.util.List;
public class MonitorBoundedBuffer<T> implements BoundedBuffer<T> {
private final int capacity;
private final List<T> buffer;
public MonitorBoundedBuffer(int capacity) {
this.capacity = capacity;
buffer = new ArrayList<T>(capacity);
}
@Override
public synchronized void put(T data) {
while (buffer.size() == capacity) {
try {
wait();
} catch (InterruptedException ex) {
// Ignoring all interrupts.
}
}
buffer.add(data);
notifyAll();
}
@Override
public synchronized T get() {
while (buffer.size() == 0) {
try {
wait();
} catch (InterruptedException ex) {
// Ignoring all interrupts.
}
}
T data = buffer.remove(0);
notifyAll();
return data;
}
}
Barrier.java
Sinhronizaciona primitiva barijere, na kojoj više niti može da čeka dok do iste tačke ne dođu i sve ostale niti. U ovoj realizaciji se koristi kao jednokratna, ali moguće je resetovati barijeru u nekom trenutku kako bi mogla iznova da se koristi.
package kdpl12020;
public interface Barrier {
public void arrived();
public boolean await(long timeout);
public void reset();
}
MonitorBarrier.java
Barijera implementirana preko monitora.
package kdpl12020;
public class MonitorBarrier implements Barrier {
private final int count;
private int currentCount = 0;
boolean passed = false;
int round = 0;
public MonitorBarrier(int count) {
this.count = count;
}
@Override
public synchronized void arrived() {
int myRound = round;
if (passed) {
return;
}
if (++currentCount == count) {
passed = true;
notifyAll();
} else {
while (!passed && round == myRound) {
try {
wait();
} catch (InterruptedException e) {}
}
}
}
@Override
public synchronized boolean await(long timeout) {
int myRound = round;
if (passed) {
return true;
}
while (!passed && myRound == round) {
try {
long timeBefore = System.currentTimeMillis();
wait(timeout);
long timeAfter = System.currentTimeMillis();
if (timeout != 0 && timeAfter - timeBefore >= timeout) {
break;
}
} catch (InterruptedException e) {}
}
return currentCount == count;
}
@Override
public synchronized void reset() {
currentCount = 0;
passed = false;
round++;
}
}
Niti
Klase niti pomenute u zadatku.
Producer.java
Čita zadatu datoteku i u deljeni objekat ubacuje pročitane linije. Signalizira da je završila ubacivanjem null u deljeni objekat.
package kdpl12020;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
public class Producer extends Thread {
private final String fileName;
private final BoundedBuffer<String> lines;
public Producer(String fileName, BoundedBuffer<String> lines) {
super("Producer");
this.fileName = fileName;
this.lines = lines;
}
@Override
public void run() {
try (BufferedReader reader = new BufferedReader(new FileReader(fileName))) {
// Skip first line (header).
String line = reader.readLine();
while ((line = reader.readLine()) != null) {
lines.put(line);
}
// Tell consumers the queue is empty.
lines.put(null);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
Consumer.java
Čita linije iz deljenog objekta i obrađuje ih, tokom čega ažurira svoj broj obrađenih linija i na kraju čeka na barijeri da sve ostale Consumer niti završe. Nakon toga čeka da Combiner nit javi da li je ona izabrana za slanje filmova, i zatim šalje filmove u novi deljeni objekat.
package kdpl12020;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
public class Consumer extends Thread {
private final int id;
private final int k;
private final BoundedBuffer<String> lines;
private final ConcurrentMap<Integer, Integer> progress;
private final BoundedBuffer<Result> results;
private final ConcurrentMap<Integer, Boolean> selected;
private final Barrier selectedBarrier;
private final Barrier barrier;
private final BoundedBuffer<Film> sending;
public Consumer(int id, int k, BoundedBuffer<String> lines, ConcurrentMap<Integer, Integer> progress,
BoundedBuffer<Result> results, Barrier barrier, ConcurrentMap<Integer, Boolean> selected,
Barrier selectedBarrier, BoundedBuffer<Film> sending) {
super("Consumer" + id);
this.id = id;
this.k = k;
this.lines = lines;
this.progress = progress;
this.results = results;
this.barrier = barrier;
this.selected = selected;
this.selectedBarrier = selectedBarrier;
this.sending = sending;
}
@Override
public void run() {
String line;
List<Film> films = new ArrayList<>();
int maxDirectors = 0;
int processed = 0;
while ((line = lines.get()) != null) {
Film data = new Film(line);
int directorCount = data.directors.length;
if (directorCount > maxDirectors) {
maxDirectors = directorCount;
films.clear();
}
if (directorCount == maxDirectors) {
films.add(data);
}
if (++processed % k == 0) {
// Update our progress every K processed lines.
progress.put(id, processed);
}
}
progress.put(id, processed);
// Signal other consumers we're done.
lines.put(null);
// Signal the combiner we're done.
results.put(new Result(id, maxDirectors));
barrier.arrived();
selectedBarrier.await(0);
if (selected.getOrDefault(id, false)) {
// Send data to the combiner.
for (Film film : films) {
sending.put(film);
}
// Signal the combiner sending data is over.
sending.put(null);
}
}
}
Combiner.java
Čeka da Consumer niti završe, zatim dohvata njihove rezultate, bira niti koje treba da pošalju svoje filmove, čeka na poslate filmove i na kraju javlja Printer da ispiše rezultate.
package kdpl12020;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
public class Combiner extends Thread {
private final Barrier consumerBarrier;
private final BoundedBuffer<Result> results;
private final ConcurrentMap<Integer, Boolean> selected;
private final Barrier selectedBarrier;
private final BoundedBuffer<Film> sending;
private final List<Film> finalData;
private final Barrier printerBarrier;
public Combiner(Barrier consumerBarrier, BoundedBuffer<Result> results, ConcurrentMap<Integer, Boolean> selected,
Barrier selectedBarrier, BoundedBuffer<Film> sending, List<Film> finalData, Barrier printerBarrier) {
super("Combiner");
this.consumerBarrier = consumerBarrier;
this.results = results;
this.selected = selected;
this.selectedBarrier = selectedBarrier;
this.sending = sending;
this.finalData = finalData;
this.printerBarrier = printerBarrier;
}
@Override
public void run() {
int maxDirectors = 0;
List<Integer> maxId = new ArrayList<>();
Result result;
consumerBarrier.await(0);
results.put(null);
while ((result = results.get()) != null) {
if (result.maxDirectors > maxDirectors) {
maxDirectors = result.maxDirectors;
maxId.clear();
}
if (result.maxDirectors == maxDirectors) {
maxId.add(result.id);
}
}
int senders = maxId.size();
for (int id : maxId) {
selected.put(id, true);
}
selectedBarrier.arrived();
while (true) {
Film data = sending.get();
if (data == null) {
if (--senders == 0) {
break;
}
} else {
finalData.add(data);
}
}
printerBarrier.arrived();
}
}
Printer.java
Čeka na Combiner nit da ubaci rezultate za ispisivanje, a u međuvremenu čita status Consumer niti i ispisuje na svakih M sekundi.
package kdpl12020;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentMap;
public class Printer extends Thread {
private final long interval;
private final ConcurrentMap<Integer, Integer> progress;
private final Barrier printerBarrier;
private final List<Film> finalData;
public Printer(long interval, ConcurrentMap<Integer, Integer> progress, Barrier printerBarrier, List<Film> finalData) {
super("Printer");
this.interval = interval;
this.progress = progress;
this.printerBarrier = printerBarrier;
this.finalData = finalData;
}
@Override
public void run() {
while (true) {
if (printerBarrier.await(interval * 1000)) {
break;
}
for (Integer id : new TreeSet<Integer>(progress.keySet())) {
System.out.println("Consumer " + id + ": " + progress.get(id));
}
}
System.out.println("Final result:");
for (Film film : finalData) {
System.out.println(film.tconst + " (" + film.directors.length + ")");
}
}
}
Test.java
Povezuje sve ostale niti.
package kdpl12020;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class Test {
private static final int BUFFER_LENGTH = 10000;
public static void main(String[] args) {
final int consumersNumber = args.length == 0 ? 5 : Integer.parseInt(args[0]);
final int K = args.length < 2 ? 1000 : Integer.parseInt(args[1]);
final String fileName = args.length < 3 ? "title.crew.tsv" : args[3];
final long M = args.length < 4 ? 5 : Long.parseLong(args[4]);
// Buffer for lines in the TSV file.
BoundedBuffer<String> lines = new MonitorBoundedBuffer<>(BUFFER_LENGTH);
// Barrier for consumers to wait on.
Barrier consumerBarrier = new MonitorBarrier(consumersNumber);
// Map of how are consumers progressing.
ConcurrentMap<Integer, Integer> progress = new ConcurrentHashMap<>();
// List of consumer result counts.
BoundedBuffer<Result> results = new MonitorBoundedBuffer<>(consumersNumber + 1);
// Map of whether consumers were selected for sending.
ConcurrentMap<Integer, Boolean> selected = new ConcurrentHashMap<>();
// Barrier for consumers to wait on for the combiner to select them.
Barrier selectedBarrier = new MonitorBarrier(1);
// Barrier for the printer to wait on for the combiner to finish.
Barrier printerBarrier = new MonitorBarrier(1);
// Receiving list of films at the combiner.
BoundedBuffer<Film> sending = new MonitorBoundedBuffer<>(BUFFER_LENGTH);
// Final received list of films for printing.
List<Film> finalData = new ArrayList<>();
Producer producer = new Producer(fileName, lines);
producer.start();
for (int i = 0; i < consumersNumber; i++) {
Consumer consumer = new Consumer(i, K, lines, progress, results, consumerBarrier, selected, selectedBarrier, sending);
consumer.start();
}
Combiner combiner = new Combiner(consumerBarrier, results, selected, selectedBarrier, sending, finalData, printerBarrier);
combiner.start();
Printer printer = new Printer(M, progress, printerBarrier, finalData);
printer.start();
try {
printer.join();
} catch (Exception e) {
e.printStackTrace();
}
}
}