ИЕП/К2 Јун 2022

Извор: SI Wiki
Пређи на навигацију Пређи на претрагу
Овај рок није решен. Помозите SI Wiki тако што ћете га решити.

Други колоквијум у јулском року 2022. године одржан је 17. септембра. Трајао је 90 минута и била је дозвољена употреба материјала са предавања.

Поставка

Посматра се регистар истраживача са подацима о радовима и везама између истраживача. У једном реду се налазе идентификатор истраживача и листа радова тог истраживача. Сваки рад садржи информације о раду и листу истраживача. За потребе наведене евиденције подаци се чувају у текстуалној датотеци на Hadoop систему. Подаци су дати у облику:

<Researcher><TAB><Work>{;<Work>}

Где поље <Researcher> представља идентификатор истраживача, а поље <Work> представља идентификатор рада, након кога долази знак : и листа идентификатора истраживача који су такође учествовали на том раду а који су раздвојени знаком ,. У листи идентификатора се сигурно налази идентификатор датог истраживача.

  1. У програмском језику Јава саставити Map/Reduce посао који враћа статистичке податке о броју различитих истраживача, коаутора, са којима је неки истраживач сарађивао на радовима: минималан број различитих коаутора, максималан број различитих коаутора, и просечан број различитих коаутора по истраживачу. Ово израчунати за истраживале[sic] који имају барем N унетих радова (N параметар који се прослеђује рачунарима који раде обраду). Водити рачуна о конкурентности.
  2. У програмском језику Јава саставити ланац од два или више Map/Reduce послова који за задатог истраживача R враћа истраживача K са којима би истраживача[sic] R могао сарађивати. Истраживач K је такав истраживач да је са њим сарађивао највећи број различитих истраживача који су сарађивали са истраживачем R, а да истраживачи R и K нису сарађивали. Два истраживача су сарађивала ако су учествовали на барем једном заједничком раду. Параметар R се прослеђује рачунарима који раде обраду. Уколико постоји више истраживача са истим највећим бројем вратити све такве истраживаче. Водити рачуна о конкурентности.

Одговор[sic] се предају у виду две јава датотека (Istrazivaci1.java и Istrazivaci2.java).

Istrazivaci1.java

package spark_iep;

import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

import org.apache.spark.*;
import org.apache.spark.api.java.*;
import scala.Tuple2;
import scala.Tuple4;
import scala.Tuple5;

public class Istrazivaci1 {
	

	public static void main(String[] args) {
		SparkConf conf = new SparkConf()
				.setAppName("Ocene1")
				.setMaster("local");
		try(JavaSparkContext sc = new JavaSparkContext(conf);){
			Integer N = 8;
			JavaRDD<String> ulazniPodaci = sc.textFile("istrazivaci.txt");
			//obrada ulaznih podataka
			//torka (IDAutora, (min, max, sum, cnt))
			List<Tuple2<String,Tuple4<Integer,Integer,Integer,Integer>>> res = ulazniPodaci.flatMapToPair(
					s->{
						List<Tuple2<String, Tuple4<Integer, Integer, Integer, Integer>>> lista = new LinkedList<>();
						String[] podaci = s.split("\t");
						if(podaci.length==1) return lista.iterator();
						String IDAutora = podaci[0];
						String[] radovi = podaci[1].split(";");
						for(String rad:radovi) {
							String[] razbijeni = rad.split(":");
							if(razbijeni.length==1) return lista.iterator();
							String IDRada = razbijeni[0];
							String[] koautori = razbijeni[1].split(",");
							Integer brojKoautora = koautori.length - 1;
							Tuple2<String, Tuple4<Integer, Integer, Integer, Integer>> clan = new Tuple2<>(IDAutora, new Tuple4<>(brojKoautora, brojKoautora, brojKoautora, 1));
							lista.add(clan);
						}
						return lista.iterator();
					}
					)
			.reduceByKey((a,b)->
			
			{
				return new Tuple4<>(Math.min(a._1(), b._1()),
						Math.max(a._2(), b._2()),
						a._3()+b._3(),
						a._4()+b._4()
				);
			}
					
					)
			.filter(s->s._2._4()>=N)
			.collect();
			
			
			for(Tuple2<String,Tuple4<Integer,Integer,Integer,Integer>> r:res) {
				System.out.println("Istrazivac: "+r._1+" a statistika je min: "+r._2._1()+" max: "+r._2._2()+" a avg je: "+(r._2._3()*1.0/r._2._4()));
			}
			
			
			
		}
	}
	
}


Испод се налази алтернативно решење, које поред примене класа, поставку тумачи као да се очекује један излаз, тј. статистика на основу скупа а не појединачних истраживача.


package main;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;

import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

import main.Istrazivaci2.Work;
import scala.Tuple4;

public class Istrazivaci1 {

	static class Researcher implements Serializable {
		private String name;
		private List<Work> works = new LinkedList<>();

		public Researcher(String line) {
			String[] data = line.split("\t");
			this.name = data[0];

			if (data.length <= 1)
				return;

			for (String workString : data[1].split(";")) {
				this.works.add(new Work(workString));
			}

		}

		@Override
		public String toString() {
			return "Researcher [name=" + name + ", works=" + works + "]";
		}

		public String getName() {
			return name;
		}

		public void setName(String name) {
			this.name = name;
		}

		public List<Work> getWorks() {
			return works;
		}

		public void setWorks(List<Work> works) {
			this.works = works;
		}
	}

	static class Work implements Serializable {

		private String title;
		private List<String> coauthors = new LinkedList<>();

		public Work(String workString) {
			String[] data = workString.split(":");
			if (data.length <= 1)
				return;
			this.title = data[0];

			coauthors.addAll(Arrays.asList(data[1].split(",")));

		}

		public String getTitle() {
			return title;
		}

		public void setTitle(String title) {
			this.title = title;
		}

		public List<String> getCoauthors() {
			return coauthors;
		}

		public void setCoauthors(List<String> coauthors) {
			this.coauthors = coauthors;
		}

		@Override
		public String toString() {
			return "Work [title=" + title + ", coauthors=" + coauthors + "]";
		}

	}

	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("istrazivaci1").setMaster("local");
		LogManager.getRootLogger().setLevel(Level.WARN); // iskljuci nepotrebne poruke
		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
			zadatak(sc, "resources/Researchers_V1.txt", 3);
		}
	}

	public static void zadatak(JavaSparkContext sc, String filename, int N) {
		var result = sc.textFile(filename).map(line -> new Researcher(line)).filter(researcher -> researcher.works.size() >= N)
				.map(researcher -> {
					Set<String> uniqueResearchers = new HashSet<>();
					for (Work work : researcher.works)
						uniqueResearchers.addAll(work.getCoauthors());
					
					int cnt = uniqueResearchers.size() - 1; // ne ukljucuj i samog istrazivaca jer se nalazi u koautorima
					return new Tuple4<>(cnt, cnt, cnt, 1);
				})
				.reduce((s1, s2) -> {
					return new Tuple4<Integer, Integer, Integer, Integer>(Math.min(s1._1(), s2._1()),
							Math.max(s1._2(), s2._2()), s1._3() + s2._3(), s1._4() + s2._4());
				});
		
		int minCoauthors = result._1();
		int maxCoauthors = result._2();
		int sumCoauthors = result._3();
		int countCoauthors = result._4();
		System.out.println("Min: " + minCoauthors+", Max: " +maxCoauthors +", Avg: "+sumCoauthors *1.0/countCoauthors);
		
	}

}

Istrazivaci2.java

package spark_iep;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

import org.apache.spark.*;
import org.apache.spark.api.java.*;
import scala.Tuple2;
import scala.Tuple4;
import scala.Tuple5;

public class Istrazivaci2 {

	public static void main(String[] args) {
		SparkConf conf = new SparkConf()
				.setAppName("Ocene1")
				.setMaster("local");
		try(JavaSparkContext sc = new JavaSparkContext(conf);){
			JavaRDD<String> ulazniPodaci = sc.textFile("istrazivaci.txt");
			//obrada ulaznih podataka
			String R = "1";
			//primer clana liste - prvi ja autor, drugo skup svih sa kojima je saradjivao
			//(Zika, (Srbljan, Milicev, Zika, Zaki))
			JavaPairRDD<String, HashSet<String>> formatirani = ulazniPodaci.mapToPair(
					s->{
						String[] podaci = s.split("\t");
						if(podaci.length==1) return new Tuple2<>(podaci[0], new HashSet<String>());
						String[] radovi = podaci[1].split(";");
						HashSet<String> kolaboratori = new HashSet<>();
						for(String rad:radovi) {
							String[] razbijeni = rad.split(":");
							if(razbijeni.length==1) continue;
							String[] koautori = razbijeni[1].split(",");
							HashSet<String> setKoAutora = new HashSet<>(Arrays.asList(koautori));
							kolaboratori.addAll(setKoAutora);
						}
						return new Tuple2<>(podaci[0],kolaboratori);
					}
					);

			Tuple2<String, HashSet<String>>  sviSaKojimaJeIstrazivaoR = formatirani.filter(s->s._1.equals(R)).first();
			HashSet<String> kolaboratoriR = sviSaKojimaJeIstrazivaoR._2();
			
			
			JavaPairRDD<Integer, String> finalni = formatirani
			//da isfiltriramo sve koji su saradjivali sa R
			.filter(s->!(s._2.contains(R)))
			.mapToPair(s->{
				HashSet<String> datiPodaci = s._2;
				//menja datiPodaci tako da sadrzi samo one koje deli sa R
				datiPodaci.retainAll(kolaboratoriR);
				//sada imamo torku ciji je prvi clan broj zajednickih a drugi oznaka tog istrazivaca
				return new Tuple2<Integer, String>(datiPodaci.size(), s._1);
				
			})
			//sortiramo da bismo dobili one sa najvise, false da bi se sortiralno opadajuce
			.sortByKey(false)
			;
			
			try {
				//ispisuje sve sa MAX, moglo potencijalno i samo 1
				Tuple2<Integer, String> res = finalni.first();
				List<Tuple2<Integer, String>> finn = finalni.filter(s->s._1().equals(res._1())).collect();
				System.out.println("Max broj je  "+res._1()+" a saradnici su:");
				for(Tuple2<Integer, String> r:finn) {
					System.out.println("Istrazivac "+r._2());
				}
			}catch(Exception e) {
				System.out.println("Nema takvog za istrazivaca:"+R);
			}
			
			
			
			
			
		}
	}
	
}

Решење испод не би било вредновано са максималним бројем поена јер користи groupBy што је скупа операција због непотребног слања веће количине података преко мреже.

package main;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

import org.apache.log4j.LogManager;
import org.apache.log4j.Level;

public class Istrazivaci2 {

	static class Researcher implements Serializable {
		private String name;
		private List<Work> works = new LinkedList<>();

		public Researcher(String line) {
			String[] data = line.split("\t");
			this.name = data[0];

			if (data.length <= 1)
				return;

			for (String workString : data[1].split(";")) {
				this.works.add(new Work(workString));
			}

		}

		@Override
		public String toString() {
			return "Researcher [name=" + name + ", works=" + works + "]";
		}

		public String getName() {
			return name;
		}

		public void setName(String name) {
			this.name = name;
		}

		public List<Work> getWorks() {
			return works;
		}

		public void setWorks(List<Work> works) {
			this.works = works;
		}
	}

	static class Work implements Serializable {

		private String title;
		private List<String> coauthors = new LinkedList<>();

		public Work(String workString) {
			String[] data = workString.split(":");
			if (data.length <= 1)
				return;
			this.title = data[0];

			coauthors.addAll(Arrays.asList(data[1].split(",")));

		}

		public String getTitle() {
			return title;
		}

		public void setTitle(String title) {
			this.title = title;
		}

		public List<String> getCoauthors() {
			return coauthors;
		}

		public void setCoauthors(List<String> coauthors) {
			this.coauthors = coauthors;
		}

		@Override
		public String toString() {
			return "Work [title=" + title + ", coauthors=" + coauthors + "]";
		}

	}

	static class TupleComparator
			implements Comparator<Tuple2<Integer, Iterable<Tuple2<String, Integer>>>>, Serializable {

		@Override
		public int compare(Tuple2<Integer, Iterable<Tuple2<String, Integer>>> o1,
				Tuple2<Integer, Iterable<Tuple2<String, Integer>>> o2) {

			return o1._1() - o2._1();
		}

	}

	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("istrazivaci2").setMaster("local");
		LogManager.getRootLogger().setLevel(Level.WARN); // iskljuci nepotrebne poruke
		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
			zadatak(sc, "resources/Researchers_V1.txt", "1");
		}
	}

	public static void zadatak(JavaSparkContext sc, String filename, String researcher) {
		var initialCoauthors = sc.textFile(filename).map(Researcher::new).filter(r -> r.getName().equals(researcher))
				.flatMap(r -> {
					Set<String> coauthorsSet = new HashSet<String>();

					for (Work work : r.getWorks()) {
						coauthorsSet.addAll(work.getCoauthors());
					}
					coauthorsSet.remove(r.getName());
					return coauthorsSet.iterator();
				});

		Set<String> initialCoauthorsSet = new HashSet<>(initialCoauthors.collect());

		var result = sc.textFile(filename).map(Researcher::new)
				// izbaci researchera i sve povezane sa njegovim "komsijama"
				.filter(r -> !r.getName().equals(researcher) && !initialCoauthorsSet.contains(r.getName())) 
				.map(r -> {
					Set<String> coauthorsSet = new HashSet<String>();

					for (Work work : r.getWorks()) {
						coauthorsSet.addAll(work.getCoauthors());
					}
					coauthorsSet.remove(r.getName());

					return new Tuple2<>(r.getName(), coauthorsSet);
				})
				// izbaci istrazivaca ako je povezan sa researcherom
				.filter(pair->!pair._2().contains(researcher)) 
				.map(pair -> {
					String researcherName = pair._1();
					Set<String> coauthorsSet = pair._2();

					Set<String> intersection = new HashSet(initialCoauthorsSet);
					intersection.retainAll(coauthorsSet);
					int numberConnections = intersection.size();

					return new Tuple2<>(researcherName, numberConnections);
				}).groupBy(pair -> pair._2()).max(new TupleComparator());

		System.out.println("Max number of connections: " + result._1());
		System.out.println("Authors with max number of connections: ");
		result._2().forEach(pair -> System.out.println(pair._1()));

	}
}

Провера

Решење је инцијално могуће тестирати на мањем скупу података испод.

Zika Rad1:Zika,Srbljan,Milicev,Zaki;Rad4:Srbljan,Zika

Srbljan Rad1:Zika,Srbljan,Milicev,Zaki;Rad4:Srbljan,Zika;Rad2:Srbljan,Milicev,Zaki,Cmiki

Cmiki Rad2:Srbljan,Milicev,Zaki,Cmiki;Rad3:Zaki,Cmiki;Rad5:Zaki,Cmiki

Zaki Rad1:Zika,Srbljan,Milicev,Zaki;Rad2:Srbljan,Milicev,Zaki,Cmiki;Rad3:Zaki,Cmiki;Rad5:Zaki,Cmiki

Milicev Rad1:Zika,Srbljan,Milicev,Zaki;Rad2:Srbljan,Milicev,Zaki,Cmiki

Оквиран испис за први задатак је (N=3):

Istrazivac: Srbljan a statistika je min: 1 max: 3 a avg je: 2.3333333333333335

Istrazivac: Zaki a statistika je min: 1 max: 3 a avg je: 2.0

Istrazivac: Cmiki a statistika je min: 1 max: 3 a avg je: 1.6666666666666667

Окриван испис за други задатак је (R="Cmiki"):

Max broj je 3 a saradnici su:

Istrazivac Zika

На колоквијуму су биле доступне Researchers_V1.txt и Researchers_V2.txt датотеке за тестирање решења.

Очекивани излази:

Задатак 1 - V1 (N=8):

Istrazivac: 52 a statistika je min: 3 max: 9 a avg je: 5.444444444444445

Istrazivac: 81 a statistika je min: 1 max: 10 a avg je: 4.8

Istrazivac: 166 a statistika je min: 2 max: 11 a avg je: 6.2

Istrazivac: 60 a statistika je min: 2 max: 8 a avg je: 5.125

Istrazivac: 20 a statistika je min: 0 max: 6 a avg je: 3.625

Istrazivac: 27 a statistika je min: 2 max: 7 a avg je: 4.5

Задатак 1 - V2 (N=8):

Istrazivac: 76 a statistika je min: 4 max: 10 a avg je: 6.0

Istrazivac: 80 a statistika je min: 2 max: 14 a avg je: 4.75

Istrazivac: 63 a statistika je min: 3 max: 8 a avg je: 5.444444444444445

Istrazivac: 179 a statistika je min: 3 max: 8 a avg je: 5.555555555555555

Задатак 2 - V1 (N=1):

Max broj je 3 a saradnici su:

Istrazivac 19

Istrazivac 20

Istrazivac 35

Istrazivac 48

Istrazivac 51

Istrazivac 60

Istrazivac 66

Istrazivac 69

Istrazivac 76

Istrazivac 81

Istrazivac 89

Istrazivac 120

Istrazivac 138

Задатак 2 - V2 (N=1):

Max broj je 8 a saradnici su:

Istrazivac 134