IEP/K2 Septembar 2021
Drugi kolokvijum u septembarskom roku 2021. godine održan je 17. septembra.
Postavka
Posmatra se evidencija o položenim ispitima. U jednom redu se nalaze identifikator studenta i lista ispitima[sic] koje je položio dati student. Svaki rad[sic] sadrži informacije o položenom ispitu kao što su šifra predmeta, šifra roka, i ocena koju je student dobio. Za potrebe navedene evidencije podaci se čuvaju u tekstualnoj datoteci na Hadoop sistemu. Podaci su dati u obliku:
<Student><TAB>{<Exam>{;<Exam>}}
Gde polje <Student> predstavlja identifikator studenta, a polje <Exam> sadrži šifru predmeta, nakon koje dolazi znak zareza (`,`), pa šifru roka, nakon koje dolazi znak zareza (`,`), i na kraju ocenu.
- U programskom jeziku Java sastaviti Map/Reduce posao koji vraća statističke podatke o ispitima u ispitnim rokovima: šifru predmeta, šifru roka, broj studenata koji su polagali dati ispit, broj studenata koji su dobili ocenu 6, broj studenata koji su dobili ocenu 7, broj studenata koji su dobili ocenu 8, broj studenata koji su dobili ocenu 9, broj studenata koji su dobili ocenu 10. Voditi računa o konkurentnosti.
- U programskom jeziku Java sastaviti lanac od dva Map/Reduce posla koji vraća spisak predmeta sa najvišim prosekom (MAX) (prosek nije po roku nego od svih koji su ga ikada polagali), pri čemu je svaki od predmeta položilo barem N studenata (N je parametar koji se prosleđuje računarima koji rade obradu). Voditi računa o konkurentnosti.
Odgovor[sic] se predaju u vidu dva[sic] java datoteka (Ocene1.java i Ocene2.java).
Ocene1.java
package rs.etf.iep.mapreduce;
import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
 * Map/Reduce job that computes, for every (subject, exam term) pair, how many students passed
 * the exam and how many of them received each of the grades 6 through 10.
 *
 * <p>Input lines have the form {@code <Student><TAB><Exam>{;<Exam>}}, where every
 * {@code <Exam>} is {@code subjectCode,termCode,grade}. Each output line consists of the
 * tab-separated fields subject code, term code, total count, and the counts for grades
 * 6, 7, 8, 9 and 10.
 */
public class Ocene1 {

    /** Lowest passing grade; it maps to index 0 of the per-grade counter array. */
    private static final int MIN_GRADE = 6;

    /** Number of distinct passing grades (6 through 10). */
    private static final int GRADE_COUNT = 5;

    /**
     * Serializes a total followed by the per-grade counters as one tab-separated string.
     * Mapper, combiner and reducer all exchange values in this single format.
     */
    private static String formatCounts(int total, int[] gradeCounts) {
        StringBuilder sb = new StringBuilder();
        sb.append(total);
        for (int i = 0; i < GRADE_COUNT; ++i) {
            sb.append("\t");
            sb.append(gradeCounts[i]);
        }
        return sb.toString();
    }

    /**
     * Emits one record per passed exam: the key is {@code subjectCode<TAB>termCode}, the value
     * is a count of 1 followed by a one-hot vector marking the grade that was obtained.
     */
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            // String.split drops trailing empty strings, so a student with an empty exam list
            // (or a blank line) yields fewer than two fields. Such lines contribute nothing.
            if (fields.length < 2 || fields[1].isEmpty()) {
                return;
            }
            for (String exam : fields[1].split(";")) {
                String[] examSplit = exam.split(",");
                String subjectCode = examSplit[0];
                String termCode = examSplit[1];
                int grade = Integer.parseInt(examSplit[2]);
                // One-hot encoding of the grade; only passing grades (6-10) are expected here.
                int[] gradeCounts = new int[GRADE_COUNT];
                gradeCounts[grade - MIN_GRADE] = 1;
                context.write(new Text(subjectCode + "\t" + termCode), new Text(formatCounts(1, gradeCounts)));
            }
        }
    }

    /**
     * Sums the totals and per-grade counters of all values that share a key. The output value
     * has the same format as the input values, which is what allows this class to be
     * registered as the combiner as well as the reducer.
     */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            int studentCount = 0;
            int[] grades = new int[GRADE_COUNT];
            for (Text value : values) {
                String[] valueSplit = value.toString().split("\t");
                studentCount += Integer.parseInt(valueSplit[0]);
                for (int i = 0; i < GRADE_COUNT; ++i) {
                    grades[i] += Integer.parseInt(valueSplit[i + 1]);
                }
            }
            context.write(key, new Text(formatCounts(studentCount, grades)));
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     * @throws IllegalStateException if the job does not complete successfully
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        // Removes a previous output directory so the job can be rerun.
        // NOTE(review): this deletes through the local file system API - confirm that the
        // output path is local in the target environment.
        FileUtils.deleteDirectory(new File(args[1]));
        Job job = Job.getInstance();
        job.setJarByClass(Ocene1.class);
        job.setJobName("ocene1");
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setCombinerClass(Reduce.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Surface a failed job instead of returning as if everything had succeeded.
        if (!job.waitForCompletion(true)) {
            throw new IllegalStateException("Job ocene1 failed");
        }
    }
}
Ocene2.java
package rs.etf.iep.mapreduce;
import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class Ocene2 {
public static final int DEFAULT_N = 1;
public static class Map1 extends Mapper<LongWritable, Text, Text, Text> {
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
for (String exam : value.toString().split("\t")[1].split(";")) {
String[] examSplit = exam.split(",");
context.write(new Text(examSplit[0]), new Text(examSplit[2] + "\t1"));
}
}
}
public static class Reduce1 extends Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
int sum = 0;
int count = 0;
for (Text value : values) {
String[] valueSplit = value.toString().split("\t");
sum += Integer.parseInt(valueSplit[0]);
count += Integer.parseInt(valueSplit[1]);
}
context.write(key, new Text(sum + "\t" + count));
}
}
public static void job1(String[] args) throws Exception {
Job job = Job.getInstance();
job.setJarByClass(Ocene2.class);
job.setJobName("ocene2-1");
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Map1.class);
job.setReducerClass(Reduce1.class);
job.setCombinerClass(Reduce1.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path("ocene2-temp"));
job.waitForCompletion(true);
}
public static class Map2 extends Mapper<LongWritable, Text, Text, Text> {
private static Text text = new Text("ocene2-text");
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] splitValue = value.toString().split("\t");
int N = context.getConfiguration().getInt("N", DEFAULT_N);
double sum = Double.parseDouble(splitValue[1]);
double count = Double.parseDouble(splitValue[2]);
if (count >= N) {
context.write(text, new Text(splitValue[0] + "\t" + (sum / count)));
}
}
}
public static class Reduce2 extends Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
double maxAvg = 0;
List<Text> list = new LinkedList<>();
for (Text value : values) {
String[] valueSplit = value.toString().split("\t");
double avg = Double.parseDouble(valueSplit[1]);
if (avg > maxAvg) {
list.clear();
maxAvg = avg;
}
if (avg == maxAvg) {
list.add(value);
}
}
for (Text value : list) {
context.write(key, value);
}
}
}
public static void job2(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setInt("N", Integer.parseInt(args[2]));
Job job = Job.getInstance(conf, "ocene2-2");
job.setJarByClass(Ocene2.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Map2.class);
job.setReducerClass(Reduce2.class);
job.setCombinerClass(Reduce2.class);
FileInputFormat.setInputPaths(job, new Path("ocene2-temp"));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
public static void main(String[] args) throws Exception {
FileUtils.deleteDirectory(new File(args[1]));
FileUtils.deleteDirectory(new File("ocene2-temp"));
job1(args);
job2(args);
}
}
Provera
Sledeći sadržaj datoteke koja se prosleđuje kao prvi argument oba programa može se koristiti za testiranje:
Pera Peric	predmet1,rok1,6;predmet2,rok1,10;predmet1,rok2,9
Marko Markovic	predmet1,rok1,8;predmet2,rok1,6;predmet3,rok3,9