КДП/Лаб 1 РТИ 2021
Прва лабораторијска вежба 2021. године за РТИ носила је 10 бодова. Одржана је у два дана, где је првог дана поставка задатка била иста као 2020. године, а другог дана задатак описан испод.
Поставка
Потребно је написати Јава апликацију која проналази филмове са највишим оценама у свом жанру, по деценијама. Решење мора бити максимално конкурентно и отпорно на прекиде.
У зависности од додељене групе, неке дељене објекте је потребно имплементирати преко одређене синхронизационе примитиве:
- Семафори (
java.util.concurrent.Semaphore
) - Монитори (
synchronized
над методом) - Региони (
synchronized
над објектом) java.util.concurrent.locks.Lock
Делови апликације:
- Класа
Producer
обрађује улазну датотеку са филмовима и информације о њима убацује у дељени објекат за даљу обраду. За имплементацију овог дељеног објекта користити синхронизациону примитиву одређену групом. Након што су сви филмови извађени из дељеног објекта, ова класа затим обрађује улазну датотеку са оценама филмова и информације о њима убацује у други дељени објекат за даљу обраду. За имплементацију овог другог дељеног објекта неопходно је користити семафоре. Ова класа обраду обавља у свом току контроле. Подаци о филмовима се читају ред по ред из архиве са филмовима (title.basics.tsv.gz) а подаци о оценама филмова се читају ред по ред из архиве са оценама филмова (title.ratings.tsv.gz). Напомена: Датотеке садрже преко осам милиона редова, па је препоручено скратити их на око 300.000 пре покретања програма (у решењу испод користе се називи title.basics_300000.tsv и title.ratings_300000.tsv). - Класа
Consumer
обрађује информације о добијеним филмовима и оценама и резултате обраде на крају прослеђује на даљу обраду. Ова класа обраду обавља у свом току контроле. Класа чита информације о неком филму из једног, а оценама из другог дељеног објекта. Током обраде, ажурира информацију о броју укупном обрађених филмова користећи одвојени дељени објекат. Кад више нема оцена за обраду, класа прослеђује филмове са максималним оценама на даљу обраду користећи још један дељени објекат и такође ажурира информацију о броју обрађених филмова. Синхронизација измеђуConsumer
нити се обавља коришћењем дељеног објекта имплементираног преко синхронизационе примитиве одређене групом. - Класа
Combiner
на основу прикупљених локалних максимума оцена по жанровима по деценијама одређује глобалне максимуме. Ова класа обраду обавља у свом току контроле. Класа преузима податке из дељеног објекта и након завршене обраде резултат убацује у следећи дељени објекат. - Класа
Printer
на сваких M (задаје се) секунди штампа информације о броју обрађених филмова, све док се не прикупе коначни подаци и на крају штампа њих на основу података које је доставила класаCombiner
. Ова класа обраду обавља у свом току контроле. Излаз апликације је списак деценија, под сваком деценијом исписани жанрови филмова који су снимани у тој деценији, и за сваки жанр филм са највишом оценом. - Класа
Test
тестира овај систем. Класа у својој почетнојmain
методи креира све потребне дељене објекте, покреће једну инстанцу класеProducer
, већи (подесив) број инстанци класеConsumer
, једну инстанцу класеCombiner
и једну инстанцу класеPrinter
. Након комплетно завршене обраде, штампа колико је износило укупно време обраде за дати број нити које раде обраду. - Ниједна класа нити (
Producer
,Consumer
,Combiner
,Printer
) не сме да зна за број инстанци класе Consumer!
Решење
Структуре података
Класе које немају везе са конкурентношћу већ чисто чувањем и обрадом података.
Film.java
Класа која садржи податке о једном филму и у конструктору обрађује једну линију улазног фајла.
package kdpl12022;
public class Film {
public final String tconst;
public final String titleType;
public final String primaryTitle;
public final String originalTitle;
public final boolean isAdult;
public final int startYear;
public final int endYear;
public final int runtimeMinutes;
public final String[] genres;
public Rating rating;
public Film(String row) {
String[] splitRow = row.split("\t");
tconst = splitRow[0];
titleType = splitRow[1];
primaryTitle = splitRow[2];
originalTitle = splitRow[3];
isAdult = splitRow[4].equals("1");
startYear = splitRow[5].equals("\\N") ? -1 : Integer.parseInt(splitRow[5]);
endYear = splitRow[6].equals("\\N") ? -1 : Integer.parseInt(splitRow[6]);
runtimeMinutes = splitRow[7].equals("\\N") ? -1 : Integer.parseInt(splitRow[7]);
genres = splitRow[8].equals("\\N") ? new String[0] : splitRow[8].split(",");
}
}
Rating.java
Класа која садржи податке о оценама једног филма и у конструктору обрађује једну линију улазног фајла.
package kdpl12022;
public class Rating {
public final String tconst;
public final double averageRating;
public final int numVotes;
public Rating(String row) {
String[] splitRow = row.split("\t");
tconst = splitRow[0];
averageRating = Double.parseDouble(splitRow[1]);
numVotes = Integer.parseInt(splitRow[2]);
}
}
Синхронизација
Класе које служе са синхронизацију између нити. Могу бити наведене у више варијанти реализације, тако да све деле заједнички интерфејс.
BoundedBuffer.java
Barrier.java
AtomicBroadcastBuffer.java
Синхронизациона примитива atomic broadcast бафера на који један произвођач може да шаље податке, а више потрошача може да чита те податке тако да сваки потрошач прочита сваки послати податак.
package kdpl12022;
public interface AtomicBroadcastBuffer<T> {
public void put(T data);
public T get(int id);
}
SemaphoreAtomicBroadcastBuffer.java
Atomic broadcast бафер имплементиран преко семафора.
package kdpl12022;
import java.util.concurrent.Semaphore;
public class SemaphoreAtomicBroadcastBuffer<T> implements AtomicBroadcastBuffer<T> {
private final int capacity;
private final int consumers;
private final Semaphore[] itemMutex;
private final Semaphore[] consumerSems;
private final Semaphore producerSem;
// Protected by itemMutex.
private final T[] buffer;
private int consumerCount[];
// Used by only one thread.
private int producerProgress = 0;
private int consumerProgress[];
@SuppressWarnings("unchecked")
public SemaphoreAtomicBroadcastBuffer(int capacity, int consumers) {
this.capacity = capacity;
this.consumers = consumers;
producerSem = new Semaphore(capacity);
itemMutex = new Semaphore[capacity];
for (int i = 0; i < capacity; ++i) {
itemMutex[i] = new Semaphore(1);
}
consumerSems = new Semaphore[consumers];
for (int i = 0; i < consumers; ++i) {
consumerSems[i] = new Semaphore(0);
}
consumerProgress = new int[consumers];
consumerCount = new int[capacity];
buffer = (T[]) new Object[capacity];
}
@Override
public void put(T data) {
producerSem.acquireUninterruptibly();
buffer[producerProgress] = data;
consumerCount[producerProgress] = consumers;
producerProgress = (producerProgress + 1) % capacity;
for (int i = 0; i < consumers; ++i) {
consumerSems[i].release();
}
}
@Override
public T get(int id) {
consumerSems[id].acquireUninterruptibly();
int currentElement = consumerProgress[id];
consumerProgress[id] = (consumerProgress[id] + 1) % capacity;
itemMutex[currentElement].acquireUninterruptibly();
T item = buffer[currentElement];
int currentConsumerCount = --consumerCount[currentElement];
itemMutex[currentElement].release();
if (currentConsumerCount == 0) {
producerSem.release();
}
return item;
}
}
Нити
Класе нити поменуте у задатку.
Producer.java
Чита датотеку title.basics_300000.tsv
и у дељени објекат убацује прочитане линије, а након што су све Consumer
нити прочитале податке о филмовима чита датотеку title.ratings_300000.tsv
и убацује податке о оценама у други дељени објекат. Сигнализира да је завршила убацивањем null
у дељене објекте.
package kdpl12022;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
public class Producer extends Thread {
private final BoundedBuffer<String> lines;
private final AtomicBroadcastBuffer<Rating> ratings;
private final Barrier consumersBarrier;
public Producer(BoundedBuffer<String> lines, AtomicBroadcastBuffer<Rating> ratings, Barrier consumersBarrier) {
super("Producer");
this.lines = lines;
this.ratings = ratings;
this.consumersBarrier = consumersBarrier;
}
@Override
public void run() {
try (BufferedReader reader = new BufferedReader(new FileReader("title.basics_300000.tsv"))) {
// Skip first line (header).
String line = reader.readLine();
while ((line = reader.readLine()) != null) {
lines.put(line);
}
// Tell consumers the queue is empty.
lines.put(null);
} catch (IOException ex) {
ex.printStackTrace();
}
consumersBarrier.await(0);
try (BufferedReader reader = new BufferedReader(new FileReader("title.ratings_300000.tsv"))) {
// Skip first line (header).
String line = reader.readLine();
while ((line = reader.readLine()) != null) {
ratings.put(new Rating(line));
}
// Tell consumers the queue is empty.
ratings.put(null);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
Consumer.java
Чита линије из првог дељеног објекта и обрађује их, током чега ажурира свој број обрађених линија и на крају чека на баријери да све остале Consumer
нити заврше. Након тога, чита податке о оценама филмова из дељеног објекта и уколико има филм за који је то оцена чува ту оцену у том филму. На крају шаље све филмове које представљају локалне максимуме у дељени објекат и опет чека да све остале Consumer
нити заврше. Укупно обрађени филмови се рачунају кроз две променљиве: једна за укупан број филмова прочитаних од Producer
и друга за укупан број оцена спојених са филмовима. Ове две променљиве на крају не морају да буду исте (неки филмови немају оцене, а неке оцене су одсечене кад су фајлови скраћени на 300.000 линија) нити укупан број примљених филмова мора да буде једнак броју линија (неке линије не садрже годину почетка филма уопште и стога се не рачунају).
package kdpl12022;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
public class Consumer extends Thread {
private final int id;
private final BoundedBuffer<String> lines;
private final AtomicBroadcastBuffer<Rating> ratings;
private final Barrier consumersBarrier;
private final AtomicInteger totalFilms;
private final AtomicInteger matchedFilms;
private final BoundedBuffer<Map<Integer, Map<String, Film>>> localFilms;
private final Barrier combinerBarrier;
public Consumer(int id, BoundedBuffer<String> lines, AtomicBroadcastBuffer<Rating> ratings,
Barrier consumersBarrier, AtomicInteger totalFilms, AtomicInteger matchedFilms,
BoundedBuffer<Map<Integer, Map<String, Film>>> localFilms, Barrier combinerBarrier) {
super("Consumer");
this.id = id;
this.lines = lines;
this.ratings = ratings;
this.consumersBarrier = consumersBarrier;
this.totalFilms = totalFilms;
this.matchedFilms = matchedFilms;
this.localFilms = localFilms;
this.combinerBarrier = combinerBarrier;
}
@Override
public void run() {
String line;
Map<String, Film> films = new HashMap<>();
Map<Integer, Map<String, Film>> maximumRatings = new HashMap<>();
while ((line = lines.get()) != null) {
Film film = new Film(line);
if (film.startYear == -1) {
continue;
}
films.put(film.tconst, film);
totalFilms.incrementAndGet();
}
// Tell other consumers there is nothing to consume.
lines.put(null);
consumersBarrier.arrived();
Rating rating;
while ((rating = ratings.get(id)) != null) {
Film film = films.get(rating.tconst);
if (film != null) {
film.rating = rating;
int decade = film.startYear / 10;
Map<String, Film> genreMap = maximumRatings.get(decade);
if (genreMap == null) {
genreMap = new HashMap<>();
maximumRatings.put(decade, genreMap);
}
for (String genre : film.genres) {
Film currentGenreMaximum = genreMap.get(genre);
if (currentGenreMaximum == null) {
genreMap.put(genre, film);
} else {
if (film.rating.averageRating > currentGenreMaximum.rating.averageRating) {
genreMap.put(genre, film);
}
}
}
}
matchedFilms.incrementAndGet();
}
localFilms.put(maximumRatings);
combinerBarrier.arrived();
}
}
Combiner.java
Чека да Consumer
нити заврше, затим дохвата њихове резултате и формира мапу глобалних максимума, и на крају јавља Printer
нити да је завршила.
package kdpl12022;
import java.util.Map;
import java.util.TreeMap;
public class Combiner extends Thread {
private final BoundedBuffer<Map<Integer, Map<String, Film>>> localFilms;
private final Barrier combinerBarrier;
private final Barrier printerBarrier;
private final Map<Integer, Map<String, Film>> maximumRatings;
public Combiner(BoundedBuffer<Map<Integer, Map<String, Film>>> localFilms, Barrier combinerBarrier,
Map<Integer, Map<String, Film>> maximumRatings, Barrier printerBarrier) {
super("Combiner");
this.localFilms = localFilms;
this.combinerBarrier = combinerBarrier;
this.maximumRatings = maximumRatings;
this.printerBarrier = printerBarrier;
}
@Override
public void run() {
combinerBarrier.await(0);
localFilms.put(null);
Map<Integer, Map<String, Film>> films;
while ((films = localFilms.get()) != null) {
for (int decade : films.keySet()) {
Map<String, Film> genreMap = maximumRatings.get(decade);
if (genreMap == null) {
genreMap = new TreeMap<>();
maximumRatings.put(decade, genreMap);
}
for (String genre : films.get(decade).keySet()) {
Film film = films.get(decade).get(genre);
Film currentGenreMaximum = genreMap.get(genre);
if (currentGenreMaximum == null) {
genreMap.put(genre, film);
} else {
if (film.rating.averageRating > currentGenreMaximum.rating.averageRating) {
genreMap.put(genre, film);
}
}
}
}
}
printerBarrier.arrived();
}
}
Printer.java
Чека на Combiner
нит да убаци резултате за исписивање, а у међувремену чита број укупно обрађених филмова и исписује на сваких M секунди.
package kdpl12022;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
public class Printer extends Thread {
private final long printTime;
private final AtomicInteger totalFilms;
private final AtomicInteger matchedFilms;
private final Barrier printerBarrier;
private final Map<Integer, Map<String, Film>> maximumRatings;
public Printer(long printTime, AtomicInteger totalFilms, AtomicInteger matchedFilms, Barrier printerBarrier,
Map<Integer, Map<String, Film>> maximumRatings) {
super("Printer");
this.printTime = printTime;
this.totalFilms = totalFilms;
this.matchedFilms = matchedFilms;
this.printerBarrier = printerBarrier;
this.maximumRatings = maximumRatings;
}
@Override
public void run() {
while (!printerBarrier.await(printTime * 1000)) {
System.out.println(matchedFilms.get() + "/" + totalFilms.get());
}
for (Integer decade : maximumRatings.keySet()) {
System.out.println((decade * 10) + "-" + (decade * 10 + 9) + ":");
Map<String, Film> genres = maximumRatings.get(decade);
for (String genre : genres.keySet()) {
System.out.println("\t" + genre + ": " + genres.get(genre).primaryTitle);
}
}
}
}
Test.java
Повезује све остале нити.
package kdpl12022;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
public class Test {
private static final int BUFFER_LENGTH = 10000;
public static void main(String[] args) {
final int consumersNumber = args.length == 0 ? 5 : Integer.parseInt(args[0]);
final long M = args.length < 4 ? 5 : Long.parseLong(args[4]);
long startTime = System.currentTimeMillis();
// Buffer for film file lines.
BoundedBuffer<String> lines = new MonitorBoundedBuffer<>(BUFFER_LENGTH);
// Buffer for ratings file lines.
AtomicBroadcastBuffer<Rating> ratings = new SemaphoreAtomicBroadcastBuffer<>(BUFFER_LENGTH, consumersNumber);
// Barrier used to tell the producers that consumers have finished reading films.
Barrier consumersBarrier = new MonitorBarrier(consumersNumber);
// Barrier used to tell the combiner that consumers have finished sending films.
Barrier combinerBarrier = new MonitorBarrier(consumersNumber);
// Variables for updating the amount of processed films.
AtomicInteger totalFilms = new AtomicInteger();
AtomicInteger matchedFilms = new AtomicInteger();
// Buffer for local maximum maps that consumers are sending to the combiner.
BoundedBuffer<Map<Integer, Map<String, Film>>> localFilms = new MonitorBoundedBuffer<>(consumersNumber + 1);
// Barrier for the printer to wait for the combiner to finish.
Barrier printerBarrier = new MonitorBarrier(1);
// Combiner maximum rated films.
Map<Integer, Map<String, Film>> maximumRatings = new TreeMap<>();
Producer producer = new Producer(lines, ratings, consumersBarrier);
producer.start();
for (int i = 0; i < consumersNumber; ++i) {
Consumer consumer = new Consumer(i, lines, ratings, consumersBarrier, totalFilms, matchedFilms, localFilms, combinerBarrier);
consumer.start();
}
Combiner combiner = new Combiner(localFilms, combinerBarrier, maximumRatings, printerBarrier);
combiner.start();
Printer printer = new Printer(M, totalFilms, matchedFilms, printerBarrier, maximumRatings);
printer.start();
try {
printer.join();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
long endTime = System.currentTimeMillis();
System.out.println("Total time: " + (endTime - startTime) + "ms");
}
}