ИЕП/К2 Септембар 2021 — разлика између измена

Извор: SI Wiki
Пређи на навигацију Пређи на претрагу
 
Ред 150: Ред 150:
== <code>Ocene2.java</code> ==
== <code>Ocene2.java</code> ==
<syntaxhighlight lang="java">
<syntaxhighlight lang="java">
package rs.etf.iep.mapreduce;
package k2_sept_2021;


import java.io.File;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
 
import k2_sept_2021.Ocene1.Exam;
import scala.Tuple2;
 
import java.io.Serializable;
 
import org.apache.log4j.LogManager;
import org.apache.log4j.Level;
 
import java.util.List;
import java.util.LinkedList;
import java.util.LinkedList;
import java.util.List;


import org.apache.commons.io.FileUtils;
public class Ocene2 {
import org.apache.hadoop.conf.Configuration;
 
import org.apache.hadoop.fs.Path;
static class Exam implements Serializable {
import org.apache.hadoop.io.LongWritable;
String subject;
import org.apache.hadoop.io.Text;
String examTerm;
import org.apache.hadoop.mapreduce.Job;
int grade;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class Ocene2 {
public Exam(String examString) {
public static final int DEFAULT_N = 1;
String[] data = examString.split(",");
this.subject = data[0];
public static class Map1 extends Mapper<LongWritable, Text, Text, Text> {
this.examTerm = data[1];
@Override
this.grade = Integer.parseInt(data[2]);
public void map(LongWritable key, Text value, Context context) {
try {
String[] split = value.toString().split("\t");
if (split.length <= 1) {
// Студент нема ниједан испит.
return;
}
for (String exam : split[1].split(";")) {
String[] examSplit = exam.split(",");
context.write(new Text(examSplit[0]), new Text(examSplit[2] + "\t1"));
}
} catch (Exception e) {
// Хватају се све грешке јер Hadoop понекад може да их не испише.
e.printStackTrace();
}
}
}
}


public static class Reduce1 extends Reducer<Text, Text, Text, Text> {
public String getSubject() {
@Override
return subject;
public void reduce(Text key, Iterable<Text> values, Context context) {
try {
int sum = 0;
int count = 0;
for (Text value : values) {
String[] valueSplit = value.toString().split("\t");
sum += Integer.parseInt(valueSplit[0]);
count += Integer.parseInt(valueSplit[1]);
}
context.write(key, new Text(sum + "\t" + count));
} catch (Exception e) {
// Хватају се све грешке јер Hadoop понекад може да их не испише.
e.printStackTrace();
}
}
}
}


public static void job1(String[] args) throws Exception {
public void setSubject(String subject) {
Job job = Job.getInstance();
this.subject = subject;
job.setJarByClass(Ocene2.class);
}
job.setJobName("ocene2-1");


job.setInputFormatClass(TextInputFormat.class);
public String getExamTerm() {
job.setOutputFormatClass(TextOutputFormat.class);
return examTerm;
}


job.setOutputKeyClass(Text.class);
public void setExamTerm(String examTerm) {
job.setOutputValueClass(Text.class);
this.examTerm = examTerm;
}


job.setMapperClass(Map1.class);
public int getGrade() {
job.setReducerClass(Reduce1.class);
return grade;
job.setCombinerClass(Reduce1.class);
}


FileInputFormat.setInputPaths(job, new Path(args[0]));
public void setGrade(int grade) {
FileOutputFormat.setOutputPath(job, new Path("ocene2-temp"));
this.grade = grade;
 
}
job.waitForCompletion(true);
}
public static class Map2 extends Mapper<LongWritable, Text, Text, Text> {
private static final Text text = new Text("ocene2-text");
private int N;


@Override
@Override
public void setup(Context context) {
public String toString() {
N = context.getConfiguration().getInt("N", DEFAULT_N);
return "Exam [subject=" + subject + ", examTerm=" + examTerm + ", grade=" + grade + "]";
}
}


@Override
public void map(LongWritable key, Text value, Context context) {
try {
String[] splitValue = value.toString().split("\t");
double sum = Double.parseDouble(splitValue[1]);
double count = Double.parseDouble(splitValue[2]);
if (count >= N) {
context.write(text, new Text(splitValue[0] + "\t" + (sum / count)));
}
} catch (Exception e) {
// Хватају се све грешке јер Hadoop понекад може да их не испише.
e.printStackTrace();
}
}
}
}


public static class Reduce2 extends Reducer<Text, Text, Text, Text> {
static class Student implements Serializable {
@Override
private String name;
public void reduce(Text key, Iterable<Text> values, Context context) {
private List<Exam> exams = new LinkedList<>();
try {
 
double maxAvg = 0;
public Student(String line) {
List<String> list = new LinkedList<>();
String[] data = line.split("\t");
for (Text value : values) {
this.name = data[0];
String[] valueSplit = value.toString().split("\t");
if (data.length <= 1)
double avg = Double.parseDouble(valueSplit[1]);
return;
if (avg > maxAvg) {
 
list.clear();
for (String examString : data[1].split(";")) {
maxAvg = avg;
exams.add(new Exam(examString));
}
if (avg == maxAvg) {
list.add(value.toString());
}
}
for (String value : list) {
context.write(key, new Text(value));
}
} catch (Exception e) {
// Хватају се све грешке јер Hadoop понекад може да их не испише.
e.printStackTrace();
}
}
}
}
}


public static void job2(String[] args) throws Exception {
public String getName() {
Configuration conf = new Configuration();
return name;
conf.setInt("N", Integer.parseInt(args[2]));
}


Job job = Job.getInstance(conf, "ocene2-2");
public void setName(String name) {
job.setJarByClass(Ocene2.class);
this.name = name;
}


job.setInputFormatClass(TextInputFormat.class);
public List<Exam> getExams() {
job.setOutputFormatClass(TextOutputFormat.class);
return exams;
}


job.setOutputKeyClass(Text.class);
public void setExams(List<Exam> exams) {
job.setOutputValueClass(Text.class);
this.exams = exams;
}


job.setMapperClass(Map2.class);
}
job.setReducerClass(Reduce2.class);


FileInputFormat.setInputPaths(job, new Path("ocene2-temp"));
public static void main(String[] args) {
FileOutputFormat.setOutputPath(job, new Path(args[1]));
SparkConf conf = new SparkConf().setAppName("ocene1").setMaster("local");
LogManager.getRootLogger().setLevel(Level.WARN); // iskljuci nepotrebne poruke
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
zadatak(sc, "resources/Students_V0.txt", 2);
}
}


job.waitForCompletion(true);
public static void zadatak(JavaSparkContext sc, String filename, int N) {
}
var results =
public static void main(String[] args) throws Exception {
sc.textFile(filename)
FileUtils.deleteDirectory(new File(args[1]));
.map(Student::new)
FileUtils.deleteDirectory(new File("ocene2-temp"));
.flatMapToPair(student->{
job1(args);
List<Tuple2<String, int[]>> gradeInfo = new LinkedList<Tuple2<String, int[]>>();
job2(args);
for (Exam exam : student.getExams()) {
int[] info = new int[2]; // count, grade sum
info[0] = 1;
info[1] = exam.getGrade();
gradeInfo.add(new Tuple2<>(exam.getSubject(), info));
}
return gradeInfo.iterator();
})
.reduceByKey((info1, info2)->{
int[] resultInfo = new int[2];
resultInfo[0] = info1[0] + info2[0];
resultInfo[1] = info1[1]+ info2[1];
return resultInfo;
})
.filter(subjectPair->subjectPair._2()[0] >= N)
.map(subjectPair->{
double averageGrade = subjectPair._2()[1] *1.0 / subjectPair._2()[0];
return new Tuple2<>(subjectPair._1(), averageGrade);
})
.sortBy(avgGradePair->avgGradePair._2(), false, N)
.collect();
double maxAvg = results.get(0)._2();
double tolerance = 10e-6;
for (Tuple2<String, Double> result:results) {
double avg = result._2();
if (maxAvg <= avg+tolerance) {
System.out.println("Predmet: "+result._1() +", prosek: "+avg);
}
}
}
}
}
}
</syntaxhighlight>
</syntaxhighlight>



Тренутна верзија на датум 20. април 2024. у 18:13

Други колоквијум у септембарском року 2021. године одржан је 17. септембра.

Поставка

Посматра се евиденција о положеним испитима. У једном реду се налазе идентификатор студента и листа испитима[sic] које је положио дати студент. Сваки рад[sic] садржи информације о положеном испиту као што су шифра предмета, шифра рока, и оцена коју је студент добио. За потребе наведене евиденције подаци се чувају у текстуалној датотеци на Hadoop систему. Подаци су дати у облику:

<Student><TAB>{<Exam>{;<Exam>}}

Где поље <Student> представља идентификатор студента, а поље <Exam> садржи шифру предмета, након кога долази знак ,, па шифра рока, након кога долази знак , и на крају оцена.

  1. У програмском језику Јава саставити Map/Reduce посао који враћа статистичке податке о испитима у испитним роковима: шифру предмета, шифру рока, број студената који су полагали дати испит, број студената који су добили оцену 6, број студената који су добили оцену 7, број студената који су добили оцену 8, број студената који су добили оцену 9, број студената који су добили оцену 10. Водити рачуна о конкурентности.
  2. У програмском језику Јава саставити ланац од два Map/Reduce посла који враћа списак предмета са највишим просеком (MAX) (просек није по року него од свих који су га икада полагали), при чему је сваки од предмета положило барем студената (, параметар који се прослеђује рачунарима који раде обраду). Водити рачуна о конкурентности.

Одговор[sic] се предају у виду два[sic] јава датотека (Ocene1.java и Ocene2.java).

Ocene1.java

package k2_sept_2021;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

import java.io.Serializable;

import org.apache.log4j.LogManager;
import org.apache.log4j.Level;

import java.util.List;
import java.util.LinkedList;

public class Ocene1 {

	static class Exam implements Serializable {
		String subject;
		String examTerm;
		int grade;

		public Exam(String examString) {
			String[] data = examString.split(",");
			this.subject = data[0];
			this.examTerm = data[1];
			this.grade = Integer.parseInt(data[2]);
		}

		public String getSubject() {
			return subject;
		}

		public void setSubject(String subject) {
			this.subject = subject;
		}

		public String getExamTerm() {
			return examTerm;
		}

		public void setExamTerm(String examTerm) {
			this.examTerm = examTerm;
		}

		public int getGrade() {
			return grade;
		}

		public void setGrade(int grade) {
			this.grade = grade;
		}

		@Override
		public String toString() {
			return "Exam [subject=" + subject + ", examTerm=" + examTerm + ", grade=" + grade + "]";
		}

	}

	static class Student implements Serializable {
		private String name;
		private List<Exam> exams = new LinkedList<>();

		public Student(String line) {
			String[] data = line.split("\t");
			this.name = data[0];
			if (data.length <= 1)
				return;

			for (String examString : data[1].split(";")) {
				exams.add(new Exam(examString));
			}
		}

		public String getName() {
			return name;
		}

		public void setName(String name) {
			this.name = name;
		}

		public List<Exam> getExams() {
			return exams;
		}

		public void setExams(List<Exam> exams) {
			this.exams = exams;
		}

	}

	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("ocene1").setMaster("local");
		LogManager.getRootLogger().setLevel(Level.WARN); // iskljuci nepotrebne poruke
		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
			zadatak(sc, "resources/students_test.tsv");
		}
	}

	public static void zadatak(JavaSparkContext sc, String filename) {
		var result = sc.textFile(filename).map(Student::new).flatMapToPair(student -> {
			System.out.println(student);
			List<Tuple2<String, int[]>> gradeInfo = new LinkedList<Tuple2<String, int[]>>();
			for (Exam exam : student.getExams()) {
				int[] gradeArray = new int[6]; // {0,0,0,0,0,0} - Broj polaganja, broj 6, broj 7, broj 8 , broj 9, broj 10
				String examName = exam.getSubject() + " - " + exam.getExamTerm();
				if (exam.getGrade() <= 5)
					continue;
				gradeArray[0] = 1;
				gradeArray[exam.getGrade() - 5]++;
				gradeInfo.add(new Tuple2<>(examName, gradeArray));
			}
			return gradeInfo.iterator();
		}).reduceByKey((gradeInfo1, gradeInfo2) -> {
			int[] resultGrades = new int[6];
			for (int i = 0; i < gradeInfo1.length; i++) {
				resultGrades[i] = gradeInfo1[i] + gradeInfo2[i];
			}
			return resultGrades;
		}).collectAsMap();

		for (String examName : result.keySet()) {
			int[] gi = result.get(examName);
			System.out.println("Ispit: " + examName + " Br polaganja: "+ gi[0] + " Br 6:" + gi[1] + " Br 7:" + gi[2] + " Br 8:" + gi[3] + " Br 9:"
					+ gi[4] + " Br 10:" + gi[5]);

		}
	}
}

Ocene2.java

package k2_sept_2021;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

import k2_sept_2021.Ocene1.Exam;
import scala.Tuple2;

import java.io.Serializable;

import org.apache.log4j.LogManager;
import org.apache.log4j.Level;

import java.util.List;
import java.util.LinkedList;

public class Ocene2 {

	static class Exam implements Serializable {
		String subject;
		String examTerm;
		int grade;

		public Exam(String examString) {
			String[] data = examString.split(",");
			this.subject = data[0];
			this.examTerm = data[1];
			this.grade = Integer.parseInt(data[2]);
		}

		public String getSubject() {
			return subject;
		}

		public void setSubject(String subject) {
			this.subject = subject;
		}

		public String getExamTerm() {
			return examTerm;
		}

		public void setExamTerm(String examTerm) {
			this.examTerm = examTerm;
		}

		public int getGrade() {
			return grade;
		}

		public void setGrade(int grade) {
			this.grade = grade;
		}

		@Override
		public String toString() {
			return "Exam [subject=" + subject + ", examTerm=" + examTerm + ", grade=" + grade + "]";
		}

	}

	static class Student implements Serializable {
		private String name;
		private List<Exam> exams = new LinkedList<>();

		public Student(String line) {
			String[] data = line.split("\t");
			this.name = data[0];
			if (data.length <= 1)
				return;

			for (String examString : data[1].split(";")) {
				exams.add(new Exam(examString));
			}
		}

		public String getName() {
			return name;
		}

		public void setName(String name) {
			this.name = name;
		}

		public List<Exam> getExams() {
			return exams;
		}

		public void setExams(List<Exam> exams) {
			this.exams = exams;
		}

	}

	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("ocene1").setMaster("local");
		LogManager.getRootLogger().setLevel(Level.WARN); // iskljuci nepotrebne poruke
		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
			zadatak(sc, "resources/Students_V0.txt", 2);
		}
	}

	public static void zadatak(JavaSparkContext sc, String filename, int N) {
		
		var results = 
				sc.textFile(filename)
				.map(Student::new)
				.flatMapToPair(student->{
					List<Tuple2<String, int[]>> gradeInfo = new LinkedList<Tuple2<String, int[]>>();
					for (Exam exam : student.getExams()) {
						int[] info = new int[2]; // count, grade sum
						info[0] = 1;
						info[1] = exam.getGrade();
						
						gradeInfo.add(new Tuple2<>(exam.getSubject(), info));
						
					}
					return gradeInfo.iterator();
				})
				.reduceByKey((info1, info2)->{
					int[] resultInfo = new int[2];
					resultInfo[0] = info1[0] + info2[0];
					resultInfo[1] = info1[1]+ info2[1];
					return resultInfo;
				})
				.filter(subjectPair->subjectPair._2()[0] >= N)
				.map(subjectPair->{
					double averageGrade = subjectPair._2()[1] *1.0 / subjectPair._2()[0];
					return new Tuple2<>(subjectPair._1(), averageGrade);
				})
				.sortBy(avgGradePair->avgGradePair._2(), false, N)
				.collect();
		
		double maxAvg = results.get(0)._2();
		double tolerance = 10e-6;
		for (Tuple2<String, Double> result:results) {
			double avg = result._2();
			if (maxAvg <= avg+tolerance) {
				System.out.println("Predmet: "+result._1() +", prosek: "+avg);
			}
		}
		
		
	}
}

Провера

Следећи садржај датотеке која се прослеђује као први аргумент оба програма може се користити за тестирање:

Pera Peric	predmet1,rok1,6;predmet2,rok1,10;predmet1,rok2,9
Marko Markovic	predmet1,rok1,8;predmet2,rok1,6;predmet3,rok3,9

На колоквијуму су биле доступне Students_V0.txt и Students_V1.txt датотеке за тестирање решења.