ИЕП/К2 2022 — разлика између измена

Извор: SI Wiki
Пређи на навигацију Пређи на претрагу
м (Test datoteke)
м (Objašnjenje)
 
(Није приказано 11 међуизмена 3 корисника)
Ред 11: Ред 11:
</div>
</div>
Одговор<sup>[sic]</sup> се предају у виду два<sup>[sic]</sup> јава датотека (<code>Ocene1.java</code> и <code>Ocene2.java</code>).
Одговор<sup>[sic]</sup> се предају у виду два<sup>[sic]</sup> јава датотека (<code>Ocene1.java</code> и <code>Ocene2.java</code>).
== ''MapReduce'' ==
На предмету се од школске 2023/2024. године ради ''Apache Spark'' уместо ''Hadoop'' са ''MapReduce''. Решење овог колоквијума са ''MapReduce'' може се видети на [[Special:Permalink/4291|верзији странице из маја 2022]], док су испод дата решења у ''Apache Spark''.


== <code>Ocene1.java</code> ==
== <code>Ocene1.java</code> ==
<syntaxhighlight lang="java">
<syntaxhighlight lang="java">
package rs.etf.iep.mapreduce;
package spark_iep;


import java.io.File;
import java.util.LinkedList;
import java.util.List;


import org.apache.commons.io.FileUtils;
import org.apache.spark.*;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.*;
import org.apache.hadoop.io.LongWritable;
import scala.Tuple2;
import org.apache.hadoop.io.Text;
import scala.Tuple5;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class Ocene1 {
public class Ocene1 {
private static final int MIN_INVALID_GRADE = 3;
private static final int MAX_INVALID_GRADE = 11;
private static Text formatMapRow(int studentCount, int minGrade, int maxGrade, int gradeSum, double avgGrade) {
return new Text(studentCount + "\t" + minGrade + "\t" + maxGrade + "\t" + gradeSum + "\t" + avgGrade);
}
public static class Map extends Mapper<LongWritable, Text, Text, Text> {
@Override
public void map(LongWritable key, Text value, Context context) {
try {
String[] split = value.toString().split("\t");
if (split.length <= 1) {
// Студент нема ниједан испит.
return;
}
for (String exam : split[1].split(";")) {
String[] examSplit = exam.split(",");
String examCode = examSplit[0];
String examName = examSplit[1];
int grade = Integer.parseInt(examSplit[2]);
Text emitKey = new Text(examCode + "\t" + examName);
Text emitValue = formatMapRow(1, grade, grade, grade, grade);
context.write(emitKey, emitValue);
}
} catch (Exception e) {
// Хватају се све грешке јер Hadoop понекад може да их не испише.
e.printStackTrace();
}
}
}


public static class Reduce extends Reducer<Text, Text, Text, Text> {
public static void main(String[] args) {
@Override
SparkConf conf = new SparkConf()
public void reduce(Text key, Iterable<Text> values, Context context) {
.setAppName("Ocene1")
try {
.setMaster("local");
int studentCount = 0;
try(JavaSparkContext sc = new JavaSparkContext(conf);){
int gradeSum = 0;
JavaRDD<String> ulazniPodaci = sc.textFile("studenti_test.txt");
int minGrade = MAX_INVALID_GRADE;
//obrada ulaznih podataka
int maxGrade = MIN_INVALID_GRADE;
List<Tuple2<String,Integer[]>> rezultat = ulazniPodaci.flatMapToPair(
for (Text value : values) {
s->{
String[] valueSplit = value.toString().split("\t");
List<Tuple2<String, Integer[]>> lista = new LinkedList<>();
studentCount += Integer.parseInt(valueSplit[0]);
String[] podaciSvi = s.split("\t");
gradeSum += Integer.parseInt(valueSplit[3]);
//za slucaj da student nema polozene ispite
int currMinGrade = Integer.parseInt(valueSplit[1]);
if(podaciSvi.length==1) return lista.iterator();
if (currMinGrade < minGrade) {
//niz podataka o ispitima za studenta
minGrade = currMinGrade;
String[] podaciIspiti = podaciSvi[1].split(";");
}
for(String p:podaciIspiti) {
int currMaxGrade = Integer.parseInt(valueSplit[2]);
//pod[0] = predmet1,rok1,6
if (currMaxGrade > maxGrade) {
String[] pod = p.split(",");
maxGrade = currMaxGrade;
//konvertujemo ocenu u string radi dalje obrade
Integer ocena = Integer.parseInt(pod[2]);
//torka spremna za obradu i dodavanje u listu
Tuple2<String, Integer[]> podatakZaListu = new Tuple2<>(pod[0]+"&"+pod[1], new Integer[] {ocena, ocena, ocena, 1});
lista.add(podatakZaListu);
}
return lista.iterator();
}
}
}
)
context.write(key, formatMapRow(studentCount, minGrade, maxGrade, gradeSum,
//prvi clan se koristi za max, drugi za min, treci je suma vrednosti, cetvrti brojac vrednosti
((double) gradeSum) / ((double) studentCount)));
.reduceByKey((a,b)->new Integer[] {Math.max(a[0], b[0]), Math.min(a[1], b[1]), a[2]+b[2], a[3]+b[3]}).collect();
} catch (Exception e) {
// Хватају се све грешке јер Hadoop понекад може да их не испише.
for(Tuple2<String, Integer[]> r:rezultat) {
e.printStackTrace();
System.out.println("Predmet&Rok: "+r._1()+", max:"+r._2()[0]+", min:"+r._2()[1]+", avg:"+(r._2()[2]*1.0/r._2()[3]));
}
}
}
}
}
}
}


public static void main(String[] args) throws Exception {
FileUtils.deleteDirectory(new File(args[1]));
Job job = Job.getInstance();
job.setJarByClass(Ocene1.class);
job.setJobName("ocene1");
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setCombinerClass(Reduce.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
</syntaxhighlight>
</syntaxhighlight>


== <code>Ocene2.java</code> ==
== <code>Ocene2.java</code> ==
<syntaxhighlight lang="java">
<syntaxhighlight lang="java">
package rs.etf.iep.mapreduce;
package spark_iep;


import java.io.File;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;


import org.apache.commons.io.FileUtils;
import org.apache.spark.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.*;
import org.apache.hadoop.fs.Path;
import scala.Tuple2;
import org.apache.hadoop.io.LongWritable;
import scala.Tuple5;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class Ocene2 {
public class Ocene2 {
private static final int MIN_INVALID_GRADE = 3;
//predmet zadat rok R, polagalo najvise studenata, a da nema ocene N


public static class Map1 extends Mapper<LongWritable, Text, Text, Text> {
public static void main(String[] args) {
private String R;
SparkConf conf = new SparkConf()
 
.setAppName("Ocene1")
@Override
.setMaster("local");
public void setup(Context context) {
try(JavaSparkContext sc = new JavaSparkContext(conf);){
R = context.getConfiguration().get("R");
String rok = "jun2020";
}
String zadataOcena = "10";
 
JavaRDD<String> ulazniPodaci = sc.textFile("studenti_test.txt");
@Override
//obrada ulaznih podataka
public void map(LongWritable key, Text value, Context context) {
//Tuple2<Integer,String[]>
try {
List<Tuple2<Integer,String[]>>  rezultat = ulazniPodaci.flatMapToPair(
String[] split = value.toString().split("\t");
s->{
if (split.length <= 1) {
List<Tuple2<String[], String>> lista = new LinkedList<>();
// Студент нема ниједан испит.
String[] podaciSvi = s.split("\t");
return;
//za slucaj da student nema polozene ispite
}
if(podaciSvi.length==1) return lista.iterator();
for (String exam : split[1].split(";")) {
//niz podataka o ispitima za studenta
String[] examSplit = exam.split(",");
String[] podaciIspiti = podaciSvi[1].split(";");
if (examSplit[1].equals(R)) {
for(String p:podaciIspiti) {
context.write(new Text(examSplit[0]), new Text(examSplit[2] + "\t1"));
//pod[0] = predmet1,rok1,6
String[] pod = p.split(",");
//konvertujemo ocenu u string radi dalje obrade
//torka spremna za obradu i dodavanje u listu
Integer ocena = Integer.parseInt(pod[2]);
//([predmet,rok], ocena)
Tuple2<String[], String> podatakZaListu = new Tuple2<>(new String[] {pod[0],pod[1]}, pod[2]);
lista.add(podatakZaListu);
}
return lista.iterator();
}
}
}
)
} catch (Exception e) {
//isfiltriraj rok
// Хватају се све грешке јер Hadoop понекад може да их не испише.
.filter(s->s._1[1].equals(rok))
e.printStackTrace();
//reformatiraj kljuc predmet, vrednost ocena
.mapToPair(s->new Tuple2<String, String>(s._1[0], s._2))
                    //ni jedna ocena u roku nije zadata ocena
.filter(s->!(s._2.equals(zadataOcena)))
//pravimoStringOcena ocena1,ocena2...
.reduceByKey((a,b)->(a+";"+b))
//reformatiraj da bude ([predmet1,ocena1;ocena2...],brojOcena)
.mapToPair(s->new Tuple2<Integer, String[]>(s._2.split(";").length, new String[] {s._1, s._2}))
//sortiraj po broju ocena
.sortByKey(false)
//vrati sve, ovako zbog lakse obrade ako nema
.collect()
;
if(rezultat.size()==0) {
System.out.println("Nema jbg");
}else {
System.out.println("Trazeni predmet je "+rezultat.get(0)._2[0]);
}
}
}
}
}
}
}


public static class Reduce1 extends Reducer<Text, Text, Text, Text> {
private int N;
@Override
public void setup(Context context) {
N = context.getConfiguration().getInt("N", MIN_INVALID_GRADE);
}
@Override
public void reduce(Text key, Iterable<Text> values, Context context) {
try {
int studentCount = 0;
int grade = MIN_INVALID_GRADE;
for (Text value : values) {
String[] valueSplit = value.toString().split("\t");
grade = Integer.parseInt(valueSplit[0]);
if (grade == N) {
return;
}
studentCount += Integer.parseInt(valueSplit[1]);
}
context.write(key, new Text(grade + "\t" + studentCount));
} catch (Exception e) {
// Хватају се све грешке јер Hadoop понекад може да их не испише.
e.printStackTrace();
}
}
}
public static void job1(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("R", args[2]);
conf.setInt("N", Integer.parseInt(args[3]));
Job job = Job.getInstance(conf, "ocene2-1");
job.setJarByClass(Ocene2.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Map1.class);
job.setReducerClass(Reduce1.class);
job.setCombinerClass(Reduce1.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path("ocene2-temp"));
job.waitForCompletion(true);
}
public static class Map2 extends Mapper<LongWritable, Text, Text, Text> {
private static final Text text = new Text("ocene2-text");
@Override
public void map(LongWritable key, Text value, Context context) {
try {
context.write(text, value);
} catch (Exception e) {
// Хватају се све грешке јер Hadoop понекад може да их не испише.
e.printStackTrace();
}
}
}
public static class Reduce2 extends Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterable<Text> values, Context context) {
try {
int maxStudentCount = 0;
String subject = null;
for (Text value : values) {
String[] valueSplit = value.toString().split("\t");
int studentCount = Integer.parseInt(valueSplit[2]);
if (studentCount > maxStudentCount) {
maxStudentCount = studentCount;
subject = valueSplit[0];
}
}
context.write(new Text(subject), new Text(String.valueOf(maxStudentCount)));
} catch (Exception e) {
// Хватају се све грешке јер Hadoop понекад може да их не испише.
e.printStackTrace();
}
}
}
public static void job2(String[] args) throws Exception {
Job job = Job.getInstance();
job.setJobName("ocene2-2");
job.setJarByClass(Ocene2.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Map2.class);
job.setCombinerClass(Reduce2.class);
FileInputFormat.setInputPaths(job, new Path("ocene2-temp"));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
public static void main(String[] args) throws Exception {
FileUtils.deleteDirectory(new File(args[1]));
FileUtils.deleteDirectory(new File("ocene2-temp"));
job1(args);
job2(args);
}
}
</syntaxhighlight>
</syntaxhighlight>



Тренутна верзија на датум 21. април 2024. у 00:22

Други колоквијум 2022. године одржан је 5. маја. На колоквијуму су били доступни Hadoop документација, презентација са предавања, виртуелна машина коришћена на предавању и два текстуална фајла као примери уноса (без очекиваног исписа или примера R и N параметара).

Поставка

Посматра се евиденција о положеним испитима. У једном реду се налазе идентификатор студента и листа испитима[sic] које је положио дати студент. Сваки рад[sic] садржи информације о положеном испиту као што су шифра предмета, шифра рока, и оцена коју је студент добио. За потребе наведене евиденције подаци се чувају у текстуалној датотеци на Hadoop систему. Подаци су дати у облику:

<Student><TAB>{<Exam>{;<Exam>}}

Где поље <Student> представља идентификатор студента, а поље <Exam> садржи шифру предмета, након кога долази знак ,, па шифра рока, након кога долази знак , и на крају оцена.

  1. У програмском језику Јава саставити Map/Reduce посао који враћа статистичке податке о испитима у испитним роковима: шифру предмета, шифру рока, број студената који су полагали дати испит, минималну оцену, максималну оцену и просечну оцену. Водити рачуна о конкурентности.
  2. У програмском језику Јава саставити ланац од два Map/Reduce посла који враћа предмет[1] који је у задатом испитном R полагало највише студената, а да ни један од тих студената у том року није добио задату оцену N. Параметри R и N се прослеђује[sic] рачунарима који раде обраду. Водити рачуна о конкурентности.

Одговор[sic] се предају у виду два[sic] јава датотека (Ocene1.java и Ocene2.java).

MapReduce

На предмету се од школске 2023/2024. године ради Apache Spark уместо Hadoop са MapReduce. Решење овог колоквијума са MapReduce може се видети на верзији странице из маја 2022, док су испод дата решења у Apache Spark.

Ocene1.java

package spark_iep;

import java.util.LinkedList;
import java.util.List;

import org.apache.spark.*;
import org.apache.spark.api.java.*;
import scala.Tuple2;
import scala.Tuple5;

public class Ocene1 {

	public static void main(String[] args) {
		SparkConf conf = new SparkConf()
				.setAppName("Ocene1")
				.setMaster("local");
		try(JavaSparkContext sc = new JavaSparkContext(conf);){
			JavaRDD<String> ulazniPodaci = sc.textFile("studenti_test.txt");
			//obrada ulaznih podataka
			List<Tuple2<String,Integer[]>> rezultat = ulazniPodaci.flatMapToPair(
					s->{
						List<Tuple2<String, Integer[]>> lista = new LinkedList<>();
						String[] podaciSvi = s.split("\t");
						//za slucaj da student nema polozene ispite
						if(podaciSvi.length==1) return lista.iterator();
						//niz podataka o ispitima za studenta
						String[] podaciIspiti = podaciSvi[1].split(";");
						for(String p:podaciIspiti) {
							//pod[0] = predmet1,rok1,6
							String[] pod = p.split(",");
							//konvertujemo ocenu u string radi dalje obrade
							Integer ocena = Integer.parseInt(pod[2]);
							//torka spremna za obradu i dodavanje u listu
							Tuple2<String, Integer[]> podatakZaListu = new Tuple2<>(pod[0]+"&"+pod[1], new Integer[] {ocena, ocena, ocena, 1});
							lista.add(podatakZaListu);
						}
						return lista.iterator();
					}
					)
					//prvi clan se koristi za max, drugi za min, treci je suma vrednosti, cetvrti brojac vrednosti
					.reduceByKey((a,b)->new Integer[] {Math.max(a[0], b[0]), Math.min(a[1], b[1]), a[2]+b[2], a[3]+b[3]}).collect();
			
			for(Tuple2<String, Integer[]> r:rezultat) {
				System.out.println("Predmet&Rok: "+r._1()+", max:"+r._2()[0]+", min:"+r._2()[1]+", avg:"+(r._2()[2]*1.0/r._2()[3]));
				
			}
			
		}
	}
	
}

Ocene2.java

package spark_iep;

import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

import org.apache.spark.*;
import org.apache.spark.api.java.*;
import scala.Tuple2;
import scala.Tuple5;

public class Ocene2 {
	
	//predmet zadat rok R, polagalo najvise studenata, a da nema ocene N

	public static void main(String[] args) {
		SparkConf conf = new SparkConf()
				.setAppName("Ocene1")
				.setMaster("local");
		try(JavaSparkContext sc = new JavaSparkContext(conf);){
			String rok = "jun2020";
			String zadataOcena = "10";
			JavaRDD<String> ulazniPodaci = sc.textFile("studenti_test.txt");
			//obrada ulaznih podataka
			//Tuple2<Integer,String[]>
			List<Tuple2<Integer,String[]>>  rezultat = ulazniPodaci.flatMapToPair(
					s->{
						List<Tuple2<String[], String>> lista = new LinkedList<>();
						String[] podaciSvi = s.split("\t");
						//za slucaj da student nema polozene ispite
						if(podaciSvi.length==1) return lista.iterator();
						//niz podataka o ispitima za studenta
						String[] podaciIspiti = podaciSvi[1].split(";");
						for(String p:podaciIspiti) {
							//pod[0] = predmet1,rok1,6
							String[] pod = p.split(",");
							//konvertujemo ocenu u string radi dalje obrade
							//torka spremna za obradu i dodavanje u listu
							Integer ocena = Integer.parseInt(pod[2]);
							//([predmet,rok], ocena)
							Tuple2<String[], String> podatakZaListu = new Tuple2<>(new String[] {pod[0],pod[1]}, pod[2]);
							lista.add(podatakZaListu);
						}
						return lista.iterator();
					}
					)
					//isfiltriraj rok
			 		.filter(s->s._1[1].equals(rok))
			 		//reformatiraj kljuc predmet, vrednost ocena
			 		.mapToPair(s->new Tuple2<String, String>(s._1[0], s._2))
                    //ni jedna ocena u roku nije zadata ocena
			 		.filter(s->!(s._2.equals(zadataOcena)))
			 		//pravimoStringOcena ocena1,ocena2...
			 		.reduceByKey((a,b)->(a+";"+b))
			 		//reformatiraj da bude ([predmet1,ocena1;ocena2...],brojOcena)
			 		.mapToPair(s->new Tuple2<Integer, String[]>(s._2.split(";").length, new String[] {s._1, s._2}))
			 		//sortiraj po broju ocena
			 		.sortByKey(false)
			 		//vrati sve, ovako zbog lakse obrade ako nema
			 		.collect()
					;
			if(rezultat.size()==0) {
				System.out.println("Nema jbg");
			}else {
				System.out.println("Trazeni predmet je "+rezultat.get(0)._2[0]);
			}
			
			
		}
	}
	
}

Провера

Следећи садржај датотеке која се прослеђује као први аргумент оба програма може се користити за тестирање:

pera	predmet1,jun2020,9;predmet2,jun2020,10;predmet3,jun2020,9;predmet1,jul2020,10;predmet3,jul2020,10
mika	predmet1,jun2020,6;predmet2,jun2020,6;predmet3,jun2020,7;predmet1,jul2020,6
zika	predmet1,jun2020,8
jovan	

(додати табулатор на крај последњег реда ручно уколико се не ископира).

На колоквијуму су биле доступне Students_V0.txt и Students_V1.txt датотеке за тестирање решења.

Напомене

  1. Уколико их има више, вратити било који. Није гарантовано да овај предмет постоји.