IEP/K2 Jun 2022
- Ovaj rok nije rešen. Pomozite SI Wiki tako što ćete ga rešiti.
Drugi kolokvijum u julskom roku 2022. godine održan je 17. septembra. Trajao je 90 minuta i bila je dozvoljena upotreba materijala sa predavanja.
Postavka
Posmatra se registar istraživača sa podacima o radovima i vezama između istraživača. U jednom redu se nalaze identifikator istraživača i lista radova tog istraživača. Svaki rad sadrži informacije o radu i listu istraživača. Za potrebe navedene evidencije podaci se čuvaju u tekstualnoj datoteci na Hadoop sistemu. Podaci su dati u obliku:
<Researcher><TAB><Work>{;<Work>}
Gde polje <Researcher> predstavlja identifikator istraživača, a polje <Work> predstavlja identifikator rada, nakon koga dolazi znak : i lista identifikatora istraživača koji su takođe učestvovali na tom radu a koji su razdvojeni znakom ,. U listi identifikatora se sigurno nalazi identifikator datog istraživača.
- U programskom jeziku Java sastaviti Map/Reduce posao koji vraća statističke podatke o broju različitih istraživača, koautora, sa kojima je neki istraživač sarađivao na radovima: minimalan broj različitih koautora, maksimalan broj različitih koautora, i prosečan broj različitih koautora po istraživaču. Ovo izračunati za istraživale[sic] koji imaju barem N unetih radova (N parametar koji se prosleđuje računarima koji rade obradu). Voditi računa o konkurentnosti.
- U programskom jeziku Java sastaviti lanac od dva ili više Map/Reduce poslova koji za zadatog istraživača R vraća istraživača K sa kojima bi istraživača[sic] R mogao sarađivati. Istraživač K je takav istraživač da je sa njim sarađivao najveći broj različitih istraživača koji su sarađivali sa istraživačem R, a da istraživači R i K nisu sarađivali. Dva istraživača su sarađivala ako su učestvovali na barem jednom zajedničkom radu. Parametar R se prosleđuje računarima koji rade obradu. Ukoliko postoji više istraživača sa istim najvećim brojem vratiti sve takve istraživače. Voditi računa o konkurentnosti.
Odgovor[sic] se predaju u vidu dve java datoteka (Istrazivaci1.java i Istrazivaci2.java).
Istrazivaci1.java
package main;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import main.Istrazivaci2.Work;
import scala.Tuple4;
public class Istrazivaci1 {
static class Researcher implements Serializable {
private String name;
private List<Work> works = new LinkedList<>();
public Researcher(String line) {
String[] data = line.split("\t");
this.name = data[0];
if (data.length <= 1)
return;
for (String workString : data[1].split(";")) {
this.works.add(new Work(workString));
}
}
@Override
public String toString() {
return "Researcher [name=" + name + ", works=" + works + "]";
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public List<Work> getWorks() {
return works;
}
public void setWorks(List<Work> works) {
this.works = works;
}
}
static class Work implements Serializable {
private String title;
private List<String> coauthors = new LinkedList<>();
public Work(String workString) {
String[] data = workString.split(":");
if (data.length <= 1)
return;
this.title = data[0];
coauthors.addAll(Arrays.asList(data[1].split(",")));
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public List<String> getCoauthors() {
return coauthors;
}
public void setCoauthors(List<String> coauthors) {
this.coauthors = coauthors;
}
@Override
public String toString() {
return "Work [title=" + title + ", coauthors=" + coauthors + "]";
}
}
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("istrazivaci1").setMaster("local");
LogManager.getRootLogger().setLevel(Level.WARN); // iskljuci nepotrebne poruke
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
zadatak(sc, "resources/Researchers_V1.txt", 3);
}
}
public static void zadatak(JavaSparkContext sc, String filename, int N) {
var result = sc.textFile(filename).map(line -> new Researcher(line)).filter(researcher -> researcher.works.size() >= N)
.map(researcher -> {
Set<String> uniqueResearchers = new HashSet<>();
for (Work work : researcher.works)
uniqueResearchers.addAll(work.getCoauthors());
int cnt = uniqueResearchers.size() - 1; // ne ukljucuj i samog istrazivaca jer se nalazi u koautorima
return new Tuple4<>(cnt, cnt, cnt, 1);
})
.reduce((s1, s2) -> {
return new Tuple4<Integer, Integer, Integer, Integer>(Math.min(s1._1(), s2._1()),
Math.max(s1._2(), s2._2()), s1._3() + s2._3(), s1._4() + s2._4());
});
int minCoauthors = result._1();
int maxCoauthors = result._2();
int sumCoauthors = result._3();
int countCoauthors = result._4();
System.out.println("Min: " + minCoauthors+", Max: " +maxCoauthors +", Avg: "+sumCoauthors *1.0/countCoauthors);
}
}
Istrazivaci2.java
package main;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import org.apache.log4j.LogManager;
import org.apache.log4j.Level;
public class Istrazivaci1 {
static class Researcher implements Serializable {
private String name;
private List<Work> works = new LinkedList<>();
public Researcher(String line) {
String[] data = line.split("\t");
this.name = data[0];
if (data.length <= 1)
return;
for (String workString : data[1].split(";")) {
this.works.add(new Work(workString));
}
}
@Override
public String toString() {
return "Researcher [name=" + name + ", works=" + works + "]";
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public List<Work> getWorks() {
return works;
}
public void setWorks(List<Work> works) {
this.works = works;
}
}
static class Work implements Serializable {
private String title;
private List<String> coauthors = new LinkedList<>();
public Work(String workString) {
String[] data = workString.split(":");
if (data.length <= 1)
return;
this.title = data[0];
coauthors.addAll(Arrays.asList(data[1].split(",")));
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public List<String> getCoauthors() {
return coauthors;
}
public void setCoauthors(List<String> coauthors) {
this.coauthors = coauthors;
}
@Override
public String toString() {
return "Work [title=" + title + ", coauthors=" + coauthors + "]";
}
}
static class TupleComparator
implements Comparator<Tuple2<Integer, Iterable<Tuple2<String, Integer>>>>, Serializable {
@Override
public int compare(Tuple2<Integer, Iterable<Tuple2<String, Integer>>> o1,
Tuple2<Integer, Iterable<Tuple2<String, Integer>>> o2) {
return o1._1() - o2._1();
}
}
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("istrazivaci2").setMaster("local");
LogManager.getRootLogger().setLevel(Level.WARN); // iskljuci nepotrebne poruke
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
zadatak(sc, "resources/Researchers_V1.txt", "1");
}
}
public static void zadatak(JavaSparkContext sc, String filename, String researcher) {
var initialCoauthors = sc.textFile(filename).map(Researcher::new).filter(r -> r.getName().equals(researcher))
.flatMap(r -> {
Set<String> coauthorsSet = new HashSet<String>();
for (Work work : r.getWorks()) {
coauthorsSet.addAll(work.getCoauthors());
}
coauthorsSet.remove(r.getName());
return coauthorsSet.iterator();
});
Set<String> initialCoauthorsSet = new HashSet<>(initialCoauthors.collect());
var result = sc.textFile(filename).map(Researcher::new)
.filter(r -> !r.getName().equals(researcher) && !initialCoauthorsSet.contains(r.getName())).map(r -> {
Set<String> coauthorsSet = new HashSet<String>();
for (Work work : r.getWorks()) {
coauthorsSet.addAll(work.getCoauthors());
}
coauthorsSet.remove(r.getName());
return new Tuple2<>(r.getName(), coauthorsSet);
}).map(pair -> {
String researcherName = pair._1();
Set<String> coauthorsSet = pair._2();
Set<String> intersection = new HashSet(initialCoauthorsSet);
intersection.retainAll(coauthorsSet);
int numberConnections = intersection.size();
return new Tuple2<>(researcherName, numberConnections);
}).groupBy(pair -> pair._2()).max(new TupleComparator());
System.out.println("Max number of connections: " + result._1());
System.out.println("Authors with max number of connections: ");
result._2().forEach(pair -> System.out.println(pair._1()));
}
}
Provera
Na kolokvijumu su bile dostupne Researchers_V1.txt i Researchers_V2.txt datoteke za testiranje rešenja.