IEP/K2 2022
Drugi kolokvijum 2022. godine održan je 5. maja. Na kolokvijumu su bili dostupni Hadoop dokumentacija, prezentacija sa predavanja, virtuelna mašina korišćena na predavanju i dva tekstualna fajla kao primeri unosa (bez očekivanog ispisa ili primera R i N parametara).
Postavka
Posmatra se evidencija o položenim ispitima. U jednom redu se nalaze identifikator studenta i lista ispitima[sic] koje je položio dati student. Svaki rad[sic] sadrži informacije o položenom ispitu kao što su šifra predmeta, šifra roka, i ocena koju je student dobio. Za potrebe navedene evidencije podaci se čuvaju u tekstualnoj datoteci na Hadoop sistemu. Podaci su dati u obliku:
<Student><TAB>{<Exam>{;<Exam>}}
Gde polje <Student>
predstavlja identifikator studenta, a polje <Exam>
sadrži šifru predmeta, nakon koga dolazi znak ,
, pa šifra roka, nakon koga dolazi znak ,
i na kraju ocena.
- U programskom jeziku Java sastaviti Map/Reduce posao koji vraća statističke podatke o ispitima u ispitnim rokovima: šifru predmeta, šifru roka, broj studenata koji su polagali dati ispit, minimalnu ocenu, maksimalnu ocenu i prosečnu ocenu. Voditi računa o konkurentnosti.
- U programskom jeziku Java sastaviti lanac od dva Map/Reduce posla koji vraća predmet[1] koji je u zadatom ispitnom R polagalo najviše studenata, a da ni jedan od tih studenata u tom roku nije dobio zadatu ocenu N. Parametri R i N se prosleđuje[sic] računarima koji rade obradu. Voditi računa o konkurentnosti.
Odgovor[sic] se predaju u vidu dva[sic] java datoteka (Ocene1.java
i Ocene2.java
).
Ocene1.java
package rs.etf.iep.mapreduce;
import java.io.File;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class Ocene1 {
private static final int MIN_INVALID_GRADE = 3;
private static final int MAX_INVALID_GRADE = 11;
private static Text formatMapRow(int studentCount, int minGrade, int maxGrade, int gradeSum, double avgGrade) {
return new Text(studentCount + "\t" + minGrade + "\t" + maxGrade + "\t" + gradeSum + "\t" + avgGrade);
}
public static class Map extends Mapper<LongWritable, Text, Text, Text> {
@Override
public void map(LongWritable key, Text value, Context context) {
try {
String[] split = value.toString().split("\t");
if (split.length <= 1) {
// Студент нема ниједан испит.
return;
}
for (String exam : split[1].split(";")) {
String[] examSplit = exam.split(",");
String examCode = examSplit[0];
String examName = examSplit[1];
int grade = Integer.parseInt(examSplit[2]);
Text emitKey = new Text(examCode + "\t" + examName);
Text emitValue = formatMapRow(1, grade, grade, grade, grade);
context.write(emitKey, emitValue);
}
} catch (Exception e) {
// Хватају се све грешке јер Hadoop понекад може да их не испише.
e.printStackTrace();
}
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterable<Text> values, Context context) {
try {
int studentCount = 0;
int gradeSum = 0;
int minGrade = MAX_INVALID_GRADE;
int maxGrade = MIN_INVALID_GRADE;
for (Text value : values) {
String[] valueSplit = value.toString().split("\t");
studentCount += Integer.parseInt(valueSplit[0]);
gradeSum += Integer.parseInt(valueSplit[3]);
int currMinGrade = Integer.parseInt(valueSplit[1]);
if (currMinGrade < minGrade) {
minGrade = currMinGrade;
}
int currMaxGrade = Integer.parseInt(valueSplit[2]);
if (currMaxGrade > maxGrade) {
maxGrade = currMaxGrade;
}
}
context.write(key, formatMapRow(studentCount, minGrade, maxGrade, gradeSum,
((double) gradeSum) / ((double) studentCount)));
} catch (Exception e) {
// Хватају се све грешке јер Hadoop понекад може да их не испише.
e.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception {
FileUtils.deleteDirectory(new File(args[1]));
Job job = Job.getInstance();
job.setJarByClass(Ocene1.class);
job.setJobName("ocene1");
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setCombinerClass(Reduce.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
Ocene2.java
package rs.etf.iep.mapreduce;
import java.io.File;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class Ocene2 {
private static final int MIN_INVALID_GRADE = 3;
public static class Map1 extends Mapper<LongWritable, Text, Text, Text> {
private String R;
@Override
public void setup(Context context) {
R = context.getConfiguration().get("R");
}
@Override
public void map(LongWritable key, Text value, Context context) {
try {
String[] split = value.toString().split("\t");
if (split.length <= 1) {
// Студент нема ниједан испит.
return;
}
for (String exam : split[1].split(";")) {
String[] examSplit = exam.split(",");
if (examSplit[1].equals(R)) {
context.write(new Text(examSplit[0]), new Text(examSplit[2] + "\t1"));
}
}
} catch (Exception e) {
// Хватају се све грешке јер Hadoop понекад може да их не испише.
e.printStackTrace();
}
}
}
public static class Reduce1 extends Reducer<Text, Text, Text, Text> {
private int N;
@Override
public void setup(Context context) {
N = context.getConfiguration().getInt("N", MIN_INVALID_GRADE);
}
@Override
public void reduce(Text key, Iterable<Text> values, Context context) {
try {
int studentCount = 0;
int grade = MIN_INVALID_GRADE;
for (Text value : values) {
String[] valueSplit = value.toString().split("\t");
grade = Integer.parseInt(valueSplit[0]);
if (grade == N) {
return;
}
studentCount += Integer.parseInt(valueSplit[1]);
}
context.write(key, new Text(grade + "\t" + studentCount));
} catch (Exception e) {
// Хватају се све грешке јер Hadoop понекад може да их не испише.
e.printStackTrace();
}
}
}
public static void job1(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("R", args[2]);
conf.setInt("N", Integer.parseInt(args[3]));
Job job = Job.getInstance(conf, "ocene2-1");
job.setJarByClass(Ocene2.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Map1.class);
job.setReducerClass(Reduce1.class);
job.setCombinerClass(Reduce1.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path("ocene2-temp"));
job.waitForCompletion(true);
}
public static class Map2 extends Mapper<LongWritable, Text, Text, Text> {
private static final Text text = new Text("ocene2-text");
@Override
public void map(LongWritable key, Text value, Context context) {
try {
context.write(text, value);
} catch (Exception e) {
// Хватају се све грешке јер Hadoop понекад може да их не испише.
e.printStackTrace();
}
}
}
public static class Reduce2 extends Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterable<Text> values, Context context) {
try {
int maxStudentCount = 0;
String subject = null;
for (Text value : values) {
String[] valueSplit = value.toString().split("\t");
int studentCount = Integer.parseInt(valueSplit[2]);
if (studentCount > maxStudentCount) {
maxStudentCount = studentCount;
subject = valueSplit[0];
}
}
context.write(new Text(subject), new Text(String.valueOf(maxStudentCount)));
} catch (Exception e) {
// Хватају се све грешке јер Hadoop понекад може да их не испише.
e.printStackTrace();
}
}
}
public static void job2(String[] args) throws Exception {
Job job = Job.getInstance();
job.setJobName("ocene2-2");
job.setJarByClass(Ocene2.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Map2.class);
job.setCombinerClass(Reduce2.class);
FileInputFormat.setInputPaths(job, new Path("ocene2-temp"));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
public static void main(String[] args) throws Exception {
FileUtils.deleteDirectory(new File(args[1]));
FileUtils.deleteDirectory(new File("ocene2-temp"));
job1(args);
job2(args);
}
}
Provera
Sledeći sadržaj datoteke koja se prosleđuje kao prvi argument oba programa može se koristiti za testiranje:
pera predmet1,jun2020,9;predmet2,jun2020,10;predmet3,jun2020,9;predmet1,jul2020,10;predmet3,jul2020,10 mika predmet1,jun2020,6;predmet2,jun2020,6;predmet3,jun2020,7;predmet1,jul2020,6 zika predmet1,jun2020,8 jovan
(dodati tabulator na kraj poslednjeg reda ručno ukoliko se ne iskopira). Primer sa kolokvijuma je posedovao redove bez ijednog ispita.
Napomene
- ↑ Ukoliko ih ima više, vratiti bilo koji. Nije garantovano da ovaj predmet postoji.