KDP/Lab 1 RTI 2021

Izvor: SI Wiki
< КДП
Datum izmene: 12. maj 2022. u 18:18; autor: KockaAdmiralac (razgovor | doprinosi) (Prvi lab drugi dan za RTI ove godine)
(razl) ← Starija izmena | Trenutna verzija (razl) | Novija izmena → (razl)
Pređi na navigaciju Pređi na pretragu

Prva laboratorijska vežba 2021. godine za RTI nosila je 10 bodova. Održana je u dva dana, gde je prvog dana postavka zadatka bila ista kao 2020. godine, a drugog dana zadatak opisan ispod.

Postavka

Potrebno je napisati Java aplikaciju koja pronalazi filmove sa najvišim ocenama u svom žanru, po decenijama. Rešenje mora biti maksimalno konkurentno i otporno na prekide.

U zavisnosti od dodeljene grupe, neke deljene objekte je potrebno implementirati preko određene sinhronizacione primitive:

  1. Semafori (java.util.concurrent.Semaphore)
  2. Monitori (synchronized nad metodom)
  3. Regioni (synchronized nad objektom)

Delovi aplikacije:

  • Klasa Producer obrađuje ulaznu datoteku sa filmovima i informacije o njima ubacuje u deljeni objekat za dalju obradu. Za implementaciju ovog deljenog objekta koristiti sinhronizacionu primitivu određenu grupom. Nakon što su svi filmovi izvađeni iz deljenog objekta, ova klasa zatim obrađuje ulaznu datoteku sa ocenama filmova i informacije o njima ubacuje u drugi deljeni objekat za dalju obradu. Za implementaciju ovog drugog deljenog objekta neophodno je koristiti semafore. Ova klasa obradu obavlja u svom toku kontrole. Podaci o filmovima se čitaju red po red iz arhive sa filmovima (title.basics.tsv.gz) a podaci o ocenama filmova se čitaju red po red iz arhive sa ocenama filmova (title.ratings.tsv.gz). Napomena: Datoteke sadrže preko osam miliona redova, pa je preporučeno skratiti ih na oko 300.000 pre pokretanja programa (u rešenju ispod koriste se nazivi title.basics_300000.tsv i title.ratings_300000.tsv).
  • Klasa Consumer obrađuje informacije o dobijenim filmovima i ocenama i rezultate obrade na kraju prosleđuje na dalju obradu. Ova klasa obradu obavlja u svom toku kontrole. Klasa čita informacije o nekom filmu iz jednog, a ocenama iz drugog deljenog objekta. Tokom obrade, ažurira informaciju o broju ukupnom obrađenih filmova koristeći odvojeni deljeni objekat. Kad više nema ocena za obradu, klasa prosleđuje filmove sa maksimalnim ocenama na dalju obradu koristeći još jedan deljeni objekat i takođe ažurira informaciju o broju obrađenih filmova. Sinhronizacija između Consumer niti se obavlja korišćenjem deljenog objekta implementiranog preko sinhronizacione primitive određene grupom.
  • Klasa Combiner na osnovu prikupljenih lokalnih maksimuma ocena po žanrovima po decenijama određuje globalne maksimume. Ova klasa obradu obavlja u svom toku kontrole. Klasa preuzima podatke iz deljenog objekta i nakon završene obrade rezultat ubacuje u sledeći deljeni objekat.
  • Klasa Printer na svakih M (zadaje se) sekundi štampa informacije o broju obrađenih filmova, sve dok se ne prikupe konačni podaci i na kraju štampa njih na osnovu podataka koje je dostavila klasa Combiner. Ova klasa obradu obavlja u svom toku kontrole. Izlaz aplikacije je spisak decenija, pod svakom decenijom ispisani žanrovi filmova koji su snimani u toj deceniji, i za svaki žanr film sa najvišom ocenom.
  • Klasa Test testira ovaj sistem. Klasa u svojoj početnoj main metodi kreira sve potrebne deljene objekte, pokreće jednu instancu klase Producer, veći (podesiv) broj instanci klase Consumer, jednu instancu klase Combiner i jednu instancu klase Printer. Nakon kompletno završene obrade, štampa koliko je iznosilo ukupno vreme obrade za dati broj niti koje rade obradu.
  • Nijedna klasa niti (Producer, Consumer, Combiner, Printer) ne sme da zna za broj instanci klase Consumer!

Rešenje

Strukture podataka

Klase koje nemaju veze sa konkurentnošću već čisto čuvanjem i obradom podataka.

Film.java

Klasa koja sadrži podatke o jednom filmu i u konstruktoru obrađuje jednu liniju ulaznog fajla.

package kdpl12022;

public class Film {
	public final String tconst;
	public final String titleType;
	public final String primaryTitle;
	public final String originalTitle;
	public final boolean isAdult;
	public final int startYear;
	public final int endYear;
	public final int runtimeMinutes;
	public final String[] genres;
	public Rating rating;
	public Film(String row) {
		String[] splitRow = row.split("\t");
		tconst = splitRow[0];
		titleType = splitRow[1];
		primaryTitle = splitRow[2];
		originalTitle = splitRow[3];
		isAdult = splitRow[4].equals("1");
		startYear = splitRow[5].equals("\\N") ? -1 : Integer.parseInt(splitRow[5]);
		endYear = splitRow[6].equals("\\N") ? -1 : Integer.parseInt(splitRow[6]);
		runtimeMinutes = splitRow[7].equals("\\N") ? -1 : Integer.parseInt(splitRow[7]);
		genres = splitRow[8].equals("\\N") ? new String[0] : splitRow[8].split(",");
	}
}

Rating.java

Klasa koja sadrži podatke o ocenama jednog filma i u konstruktoru obrađuje jednu liniju ulaznog fajla.

package kdpl12022;

public class Rating {
	public final String tconst;
	public final double averageRating;
	public final int numVotes;
	public Rating(String row) {
		String[] splitRow = row.split("\t");
		tconst = splitRow[0];
		averageRating = Double.parseDouble(splitRow[1]);
		numVotes = Integer.parseInt(splitRow[2]);
	}
}

Sinhronizacija

Klase koje služe sa sinhronizaciju između niti. Mogu biti navedene u više varijanti realizacije, tako da sve dele zajednički interfejs.

BoundedBuffer.java

Videti interfejs i različite implementacije na prvom labu 2020. godine.

Barrier.java

Videti interfejs i različite implementacije na prvom labu 2020. godine.

AtomicBroadcastBuffer.java

Sinhronizaciona primitiva atomic broadcast bafera na koji jedan proizvođač može da šalje podatke, a više potrošača može da čita te podatke tako da svaki potrošač pročita svaki poslati podatak.

package kdpl12022;

public interface AtomicBroadcastBuffer<T> {
	public void put(T data);
	public T get(int id);
}

SemaphoreAtomicBroadcastBuffer.java

Atomic broadcast bafer implementiran preko semafora.

package kdpl12022;

import java.util.concurrent.Semaphore;

public class SemaphoreAtomicBroadcastBuffer<T> implements AtomicBroadcastBuffer<T> {
	private final int capacity;
	private final int consumers;
	private final Semaphore[] itemMutex;
	private final Semaphore[] consumerSems;
	private final Semaphore producerSem;
	// Protected by itemMutex.
	private final T[] buffer;
	private int consumerCount[];
	// Used by only one thread.
	private int producerProgress = 0;
	private int consumerProgress[];
	
	@SuppressWarnings("unchecked")
	public SemaphoreAtomicBroadcastBuffer(int capacity, int consumers) {
		this.capacity = capacity;
		this.consumers = consumers;
		producerSem = new Semaphore(capacity);
		itemMutex = new Semaphore[capacity];
		for (int i = 0; i < capacity; ++i) {
			itemMutex[i] = new Semaphore(1);
		}
		consumerSems = new Semaphore[consumers];
		for (int i = 0; i < consumers; ++i) {
			consumerSems[i] = new Semaphore(0);
		}
		consumerProgress = new int[consumers];
		consumerCount = new int[capacity];
		buffer = (T[]) new Object[capacity];
	}
	
	@Override
	public void put(T data) {
		producerSem.acquireUninterruptibly();
		buffer[producerProgress] = data;
		consumerCount[producerProgress] = consumers;
		producerProgress = (producerProgress + 1) % capacity;
		for (int i = 0; i < consumers; ++i) {
			consumerSems[i].release();
		}
	}

	@Override
	public T get(int id) {
		consumerSems[id].acquireUninterruptibly();
		int currentElement = consumerProgress[id];
		consumerProgress[id] = (consumerProgress[id] + 1) % capacity;
		itemMutex[currentElement].acquireUninterruptibly();
		T item = buffer[currentElement];
		int currentConsumerCount = --consumerCount[currentElement];
		itemMutex[currentElement].release();
		if (currentConsumerCount == 0) {
			producerSem.release();
		}
		return item;
	}

}

Niti

Klase niti pomenute u zadatku.

Producer.java

Čita datoteku title.basics_300000.tsv i u deljeni objekat ubacuje pročitane linije, a nakon što su sve Consumer niti pročitale podatke o filmovima čita datoteku title.ratings_300000.tsv i ubacuje podatke o ocenama u drugi deljeni objekat. Signalizira da je završila ubacivanjem null u deljene objekte.

package kdpl12022;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class Producer extends Thread {
	private final BoundedBuffer<String> lines;
	private final AtomicBroadcastBuffer<Rating> ratings;
	private final Barrier consumersBarrier;

	public Producer(BoundedBuffer<String> lines, AtomicBroadcastBuffer<Rating> ratings, Barrier consumersBarrier) {
		super("Producer");
		this.lines = lines;
		this.ratings = ratings;
		this.consumersBarrier = consumersBarrier;
	}

	@Override
	public void run() {
		try (BufferedReader reader = new BufferedReader(new FileReader("title.basics_300000.tsv"))) {
			// Skip first line (header).
			String line = reader.readLine();
			while ((line = reader.readLine()) != null) {
				lines.put(line);
			}
			// Tell consumers the queue is empty.
			lines.put(null);
		} catch (IOException ex) {
			ex.printStackTrace();
		}
		consumersBarrier.await(0);
		try (BufferedReader reader = new BufferedReader(new FileReader("title.ratings_300000.tsv"))) {
			// Skip first line (header).
			String line = reader.readLine();
			while ((line = reader.readLine()) != null) {
				ratings.put(new Rating(line));
			}
			// Tell consumers the queue is empty.
			ratings.put(null);
		} catch (IOException ex) {
			ex.printStackTrace();
		}
	}
}

Consumer.java

Čita linije iz prvog deljenog objekta i obrađuje ih, tokom čega ažurira svoj broj obrađenih linija i na kraju čeka na barijeri da sve ostale Consumer niti završe. Nakon toga, čita podatke o ocenama filmova iz deljenog objekta i ukoliko ima film za koji je to ocena čuva tu ocenu u tom filmu. Na kraju šalje sve filmove koje predstavljaju lokalne maksimume u deljeni objekat i opet čeka da sve ostale Consumer niti završe. Ukupno obrađeni filmovi se računaju kroz dve promenljive: jedna za ukupan broj filmova pročitanih od Producer i druga za ukupan broj ocena spojenih sa filmovima. Ove dve promenljive na kraju ne moraju da budu iste (neki filmovi nemaju ocene, a neke ocene su odsečene kad su fajlovi skraćeni na 300.000 linija) niti ukupan broj primljenih filmova mora da bude jednak broju linija (neke linije ne sadrže godinu početka filma uopšte i stoga se ne računaju).

package kdpl12022;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class Consumer extends Thread {
	private final int id;
	private final BoundedBuffer<String> lines;
	private final AtomicBroadcastBuffer<Rating> ratings;
	private final Barrier consumersBarrier;
	private final AtomicInteger totalFilms;
	private final AtomicInteger matchedFilms;
	private final BoundedBuffer<Map<Integer, Map<String, Film>>> localFilms;
	private final Barrier combinerBarrier;

	public Consumer(int id, BoundedBuffer<String> lines, AtomicBroadcastBuffer<Rating> ratings,
			Barrier consumersBarrier, AtomicInteger totalFilms, AtomicInteger matchedFilms,
			BoundedBuffer<Map<Integer, Map<String, Film>>> localFilms, Barrier combinerBarrier) {
		super("Consumer");
		this.id = id;
		this.lines = lines;
		this.ratings = ratings;
		this.consumersBarrier = consumersBarrier;
		this.totalFilms = totalFilms;
		this.matchedFilms = matchedFilms;
		this.localFilms = localFilms;
		this.combinerBarrier = combinerBarrier;
	}

	@Override
	public void run() {
		String line;
		Map<String, Film> films = new HashMap<>();
		Map<Integer, Map<String, Film>> maximumRatings = new HashMap<>();
		while ((line = lines.get()) != null) {
			Film film = new Film(line);
			if (film.startYear == -1) {
				continue;
			}
			films.put(film.tconst, film);
			totalFilms.incrementAndGet();
		}
		// Tell other consumers there is nothing to consume.
		lines.put(null);
		consumersBarrier.arrived();
		Rating rating;
		while ((rating = ratings.get(id)) != null) {
			Film film = films.get(rating.tconst);
			if (film != null) {
				film.rating = rating;
				int decade = film.startYear / 10;
				Map<String, Film> genreMap = maximumRatings.get(decade);
				if (genreMap == null) {
					genreMap = new HashMap<>();
					maximumRatings.put(decade, genreMap);
				}
				for (String genre : film.genres) {
					Film currentGenreMaximum = genreMap.get(genre);
					if (currentGenreMaximum == null) {
						genreMap.put(genre, film);
					} else {
						if (film.rating.averageRating > currentGenreMaximum.rating.averageRating) {
							genreMap.put(genre, film);
						}
					}
				}
			}
			matchedFilms.incrementAndGet();
		}
		localFilms.put(maximumRatings);
		combinerBarrier.arrived();
	}
}

Combiner.java

Čeka da Consumer niti završe, zatim dohvata njihove rezultate i formira mapu globalnih maksimuma, i na kraju javlja Printer niti da je završila.

package kdpl12022;

import java.util.Map;
import java.util.TreeMap;

public class Combiner extends Thread {
	private final BoundedBuffer<Map<Integer, Map<String, Film>>> localFilms;
	private final Barrier combinerBarrier;
	private final Barrier printerBarrier;
	private final Map<Integer, Map<String, Film>> maximumRatings;

	public Combiner(BoundedBuffer<Map<Integer, Map<String, Film>>> localFilms, Barrier combinerBarrier,
			Map<Integer, Map<String, Film>> maximumRatings, Barrier printerBarrier) {
		super("Combiner");
		this.localFilms = localFilms;
		this.combinerBarrier = combinerBarrier;
		this.maximumRatings = maximumRatings;
		this.printerBarrier = printerBarrier;
	}

	@Override
	public void run() {
		combinerBarrier.await(0);
		localFilms.put(null);
		Map<Integer, Map<String, Film>> films;
		while ((films = localFilms.get()) != null) {
			for (int decade : films.keySet()) {
				Map<String, Film> genreMap = maximumRatings.get(decade);
				if (genreMap == null) {
					genreMap = new TreeMap<>();
					maximumRatings.put(decade, genreMap);
				}
				for (String genre : films.get(decade).keySet()) {
					Film film = films.get(decade).get(genre);
					Film currentGenreMaximum = genreMap.get(genre);
					if (currentGenreMaximum == null) {
						genreMap.put(genre, film);
					} else {
						if (film.rating.averageRating > currentGenreMaximum.rating.averageRating) {
							genreMap.put(genre, film);
						}
					}
				}
			}
		}
		printerBarrier.arrived();
	}
}

Printer.java

Čeka na Combiner nit da ubaci rezultate za ispisivanje, a u međuvremenu čita broj ukupno obrađenih filmova i ispisuje na svakih M sekundi.

package kdpl12022;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class Printer extends Thread {
	private final long printTime;
	private final AtomicInteger totalFilms;
	private final AtomicInteger matchedFilms;
	private final Barrier printerBarrier;
	private final Map<Integer, Map<String, Film>> maximumRatings;

	public Printer(long printTime, AtomicInteger totalFilms, AtomicInteger matchedFilms, Barrier printerBarrier,
			Map<Integer, Map<String, Film>> maximumRatings) {
		super("Printer");
		this.printTime = printTime;
		this.totalFilms = totalFilms;
		this.matchedFilms = matchedFilms;
		this.printerBarrier = printerBarrier;
		this.maximumRatings = maximumRatings;
	}

	@Override
	public void run() {
		while (!printerBarrier.await(printTime * 1000)) {
			System.out.println(matchedFilms.get() + "/" + totalFilms.get());
		}
		for (Integer decade : maximumRatings.keySet()) {
			System.out.println((decade * 10) + "-" + (decade * 10 + 9) + ":");
			Map<String, Film> genres = maximumRatings.get(decade);
			for (String genre : genres.keySet()) {
				System.out.println("\t" + genre + ": " + genres.get(genre).primaryTitle);
			}
		}
	}
}

Test.java

Povezuje sve ostale niti.

package kdpl12022;

import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;

public class Test {
	private static final int BUFFER_LENGTH = 10000;

	public static void main(String[] args) {
		final int consumersNumber = args.length == 0 ? 5 : Integer.parseInt(args[0]);
		final long M = args.length < 4 ? 5 : Long.parseLong(args[4]);
		long startTime = System.currentTimeMillis();

		// Buffer for film file lines.
		BoundedBuffer<String> lines = new MonitorBoundedBuffer<>(BUFFER_LENGTH);
		// Buffer for ratings file lines.
		AtomicBroadcastBuffer<Rating> ratings = new SemaphoreAtomicBroadcastBuffer<>(BUFFER_LENGTH, consumersNumber);
		// Barrier used to tell the producers that consumers have finished reading films.
		Barrier consumersBarrier = new MonitorBarrier(consumersNumber);
		// Barrier used to tell the combiner that consumers have finished sending films.
		Barrier combinerBarrier = new MonitorBarrier(consumersNumber);
		// Variables for updating the amount of processed films.
		AtomicInteger totalFilms = new AtomicInteger();
		AtomicInteger matchedFilms = new AtomicInteger();
		// Buffer for local maximum maps that consumers are sending to the combiner.
		BoundedBuffer<Map<Integer, Map<String, Film>>> localFilms = new MonitorBoundedBuffer<>(consumersNumber + 1);
		// Barrier for the printer to wait for the combiner to finish.
		Barrier printerBarrier = new MonitorBarrier(1);
		// Combiner maximum rated films.
		Map<Integer, Map<String, Film>> maximumRatings = new TreeMap<>();

		Producer producer = new Producer(lines, ratings, consumersBarrier);
		producer.start();

		for (int i = 0; i < consumersNumber; ++i) {
			Consumer consumer = new Consumer(i, lines, ratings, consumersBarrier, totalFilms, matchedFilms, localFilms, combinerBarrier);
			consumer.start();
		}

		Combiner combiner = new Combiner(localFilms, combinerBarrier, maximumRatings, printerBarrier);
		combiner.start();

		Printer printer = new Printer(M, totalFilms, matchedFilms, printerBarrier, maximumRatings);
		printer.start();

		try {
			printer.join();
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}

		long endTime = System.currentTimeMillis();
		System.out.println("Total time: " + (endTime - startTime) + "ms");
	}
}