目前有两个集合 R 与 S
R的数据格式是:id,name,age,weight
S的数据格式是:id,gender,height
首先将这种字符串封装为具体的类PO
package RelationAlgebra;
import java.io.DataInput;
/**
 * One tuple of relation R with schema (id, name, age, weight).
 *
 * Used as a MapReduce key, so {@link #compareTo} must be a consistent total
 * order over ALL fields (the shuffle relies on it for sorting and grouping),
 * and {@code equals}/{@code hashCode} are kept consistent with it.
 */
public class RelationA implements WritableComparable<RelationA> {
    private int id;
    private String name;
    private int age;
    private double weight;

    /** No-arg constructor required by Hadoop's Writable deserialization. */
    public RelationA() {
    }

    public RelationA(int id, String name, int age, double weight) {
        this.setId(id);
        this.setName(name);
        this.setAge(age);
        this.setWeight(weight);
    }

    /**
     * Parses one CSV line of the form {@code id,name,age,weight}.
     *
     * @throws NumberFormatException          if a numeric field is malformed
     * @throws ArrayIndexOutOfBoundsException if the line has fewer than 4 fields
     */
    public RelationA(String line) {
        // NOTE: the leftover debug println that echoed every input line was removed.
        String[] value = line.split(",");
        this.setId(Integer.parseInt(value[0]));
        this.setName(value[1]);
        this.setAge(Integer.parseInt(value[2]));
        this.setWeight(Double.parseDouble(value[3]));
    }

    /**
     * Returns true when the column at index {@code col} equals {@code value}
     * (0=id, 1=name, 2=age, 3=weight); false otherwise, including for an
     * unknown column index.
     */
    public boolean isCondition(int col, String value) {
        switch (col) {
        case 0:
            return Integer.parseInt(value) == this.id;
        case 1:
            return name.equals(value);
        case 2:
            return Integer.parseInt(value) == this.age;
        case 3:
            return Double.parseDouble(value) == this.weight;
        default:
            return false;
        }
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public double getWeight() {
        return weight;
    }

    public void setWeight(double weight) {
        this.weight = weight;
    }

    /** Returns the string form of column {@code col}, or null for an unknown index. */
    public String getCol(int col) {
        switch (col) {
        case 0:
            return String.valueOf(id);
        case 1:
            return name;
        case 2:
            return String.valueOf(age);
        case 3:
            return String.valueOf(weight);
        default:
            return null;
        }
    }

    /**
     * Returns the CSV of every column except {@code col}, keeping the
     * original column order; null for an unknown index.
     */
    public String getValueExcept(int col) {
        switch (col) {
        case 0:
            return name + "," + String.valueOf(age) + ","
                    + String.valueOf(weight);
        case 1:
            return String.valueOf(id) + "," + String.valueOf(age) + ","
                    + String.valueOf(weight);
        case 2:
            return String.valueOf(id) + "," + name + ","
                    + String.valueOf(weight);
        case 3:
            return String.valueOf(id) + "," + name + "," + String.valueOf(age);
        default:
            return null;
        }
    }

    @Override
    public String toString() {
        return id + "," + name + "," + age + "," + weight;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeUTF(name);
        out.writeInt(age);
        out.writeDouble(weight);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Must read fields in exactly the order write() emitted them.
        id = in.readInt();
        name = in.readUTF();
        age = in.readInt();
        weight = in.readDouble();
    }

    /**
     * Total order over (id, name, age, weight).
     *
     * The previous version returned 1 for BOTH a.compareTo(b) and
     * b.compareTo(a) when the ids were equal but other fields differed,
     * violating the Comparable contract and corrupting MapReduce grouping.
     */
    @Override
    public int compareTo(RelationA o) {
        int cmp = Integer.compare(id, o.id);
        if (cmp != 0)
            return cmp;
        cmp = name.compareTo(o.name);
        if (cmp != 0)
            return cmp;
        cmp = Integer.compare(age, o.age);
        if (cmp != 0)
            return cmp;
        return Double.compare(weight, o.weight);
    }

    /** Consistent with compareTo(): equal iff every field matches. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (!(obj instanceof RelationA))
            return false;
        RelationA o = (RelationA) obj;
        return id == o.id && name.equals(o.name) && age == o.age
                && Double.compare(weight, o.weight) == 0;
    }

    @Override
    public int hashCode() {
        int result = id;
        result = 31 * result + name.hashCode();
        result = 31 * result + age;
        long bits = Double.doubleToLongBits(weight);
        result = 31 * result + (int) (bits ^ (bits >>> 32));
        return result;
    }
}
package RelationAlgebra;
import java.io.DataInput;
/**
 * One tuple of relation S with schema (id, gender, height).
 *
 * Mirrors {@link RelationA}: a consistent total order in {@link #compareTo},
 * matching {@code equals}/{@code hashCode}, and a CSV {@code toString} so
 * TextOutputFormat prints the record instead of an Object hash.
 */
public class RelationB implements WritableComparable<RelationB> {
    private int id;
    private boolean gender;
    private double height;

    /** No-arg constructor required by Hadoop's Writable deserialization. */
    public RelationB() {
    }

    public RelationB(int id, boolean gender, double height) {
        this.setId(id);
        this.setGender(gender);
        this.setHeight(height);
    }

    /**
     * Parses one CSV line of the form {@code id,gender,height}.
     *
     * @throws NumberFormatException          if a numeric field is malformed
     * @throws ArrayIndexOutOfBoundsException if the line has fewer than 3 fields
     */
    public RelationB(String line) {
        String[] value = line.split(",");
        this.setId(Integer.parseInt(value[0]));
        this.setGender(Boolean.parseBoolean(value[1]));
        this.setHeight(Double.parseDouble(value[2]));
    }

    /**
     * Returns the CSV of every column except {@code col}, keeping the
     * original column order; null for an unknown index.
     */
    public String getValueExcept(int col) {
        switch (col) {
        case 0:
            return String.valueOf(gender) + "," + String.valueOf(height);
        case 1:
            return String.valueOf(id) + "," + String.valueOf(height);
        case 2:
            return String.valueOf(id) + "," + String.valueOf(gender);
        default:
            return null;
        }
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public boolean isGender() {
        return gender;
    }

    public void setGender(boolean gender) {
        this.gender = gender;
    }

    public double getHeight() {
        return height;
    }

    public void setHeight(double height) {
        this.height = height;
    }

    /** CSV form matching the input format: id,gender,height. */
    @Override
    public String toString() {
        return id + "," + gender + "," + height;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeBoolean(gender);
        out.writeDouble(height);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Must read fields in exactly the order write() emitted them.
        id = in.readInt();
        gender = in.readBoolean();
        height = in.readDouble();
    }

    /**
     * Total order over (id, gender, height).
     *
     * The previous version returned 1 for BOTH a.compareTo(b) and
     * b.compareTo(a) when ids matched but other fields differed, breaking
     * the Comparable contract that MapReduce sorting relies on.
     */
    @Override
    public int compareTo(RelationB o) {
        int cmp = Integer.compare(id, o.id);
        if (cmp != 0)
            return cmp;
        cmp = Boolean.compare(gender, o.gender);
        if (cmp != 0)
            return cmp;
        return Double.compare(height, o.height);
    }

    /** Consistent with compareTo(): equal iff every field matches. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (!(obj instanceof RelationB))
            return false;
        RelationB o = (RelationB) obj;
        return id == o.id && gender == o.gender
                && Double.compare(height, o.height) == 0;
    }

    @Override
    public int hashCode() {
        int result = id;
        result = 31 * result + (gender ? 1 : 0);
        long bits = Double.doubleToLongBits(height);
        result = 31 * result + (int) (bits ^ (bits >>> 32));
        return result;
    }
}
1、选择:找出 R 中指定列上等于给定值的元组
package RelationAlgebra;
import java.io.IOException;
/**
 * Selection σ: emits the tuples of R whose column {@code col} equals
 * {@code value}.  Map-only job — matching records go straight to output.
 *
 * Args: inputPath outputPath colIndex value
 */
public class Selection {
    public static class SelectionMap extends
            Mapper<LongWritable, Text, RelationA, NullWritable> {
        private int col;      // column index to test (0=id, 1=name, 2=age, 3=weight)
        private String value; // literal the column must equal

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            col = context.getConfiguration().getInt("col", 0);
            value = context.getConfiguration().get("value");
        }

        @Override
        public void map(LongWritable offSet, Text line, Context context)
                throws IOException, InterruptedException {
            RelationA record = new RelationA(line.toString());
            if (record.isCondition(col, value))
                context.write(record, NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Job selectionJob = new Job();
        selectionJob.setJobName("selectionJob");
        selectionJob.setJarByClass(Selection.class);
        selectionJob.getConfiguration()
                .setInt("col", Integer.parseInt(args[2]));
        selectionJob.getConfiguration().set("value", args[3]);
        selectionJob.setMapperClass(SelectionMap.class);
        selectionJob.setMapOutputKeyClass(RelationA.class);
        selectionJob.setMapOutputValueClass(NullWritable.class);
        selectionJob.setNumReduceTasks(0); // map-only: no dedup/sort needed
        selectionJob.setInputFormatClass(TextInputFormat.class);
        selectionJob.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(selectionJob, new Path(args[0]));
        FileOutputFormat.setOutputPath(selectionJob, new Path(args[1]));
        boolean succeeded = selectionJob.waitForCompletion(true);
        System.out.println("finished");
        // Report job failure through the exit code instead of always exiting 0.
        System.exit(succeeded ? 0 : 1);
    }
}
2、投影
投影运算:输出列 col 的值,这里对输出的值进行了去重
package RelationAlgebra;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
 * Projection π: emits column {@code col} of every tuple, deduplicated.
 * The shuffle groups identical values; the reducer writes each key once,
 * which is what removes duplicates.
 *
 * Args: inputPath outputPath colIndex
 */
public class Projection {
    public static class ProjectionMap extends
            Mapper<LongWritable, Text, Text, NullWritable> {
        private int col; // column index to project (0=id, 1=name, 2=age, 3=weight)

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            col = context.getConfiguration().getInt("col", 0);
        }

        @Override
        public void map(LongWritable offSet, Text line, Context context)
                throws IOException, InterruptedException {
            RelationA record = new RelationA(line.toString());
            context.write(new Text(record.getCol(col)), NullWritable.get());
        }
    }

    public static class ProjectionReduce extends
            Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        public void reduce(Text key, Iterable<NullWritable> value,
                Context context) throws IOException, InterruptedException {
            // One write per distinct key — this performs the deduplication.
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Job projectionJob = new Job();
        projectionJob.setJobName("projectionJob");
        projectionJob.setJarByClass(Projection.class);
        projectionJob.getConfiguration().setInt("col",
                Integer.parseInt(args[2]));
        projectionJob.setMapperClass(ProjectionMap.class);
        projectionJob.setMapOutputKeyClass(Text.class);
        projectionJob.setMapOutputValueClass(NullWritable.class);
        projectionJob.setReducerClass(ProjectionReduce.class);
        projectionJob.setOutputKeyClass(Text.class);
        projectionJob.setOutputValueClass(NullWritable.class);
        projectionJob.setInputFormatClass(TextInputFormat.class);
        projectionJob.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(projectionJob, new Path(args[0]));
        FileOutputFormat.setOutputPath(projectionJob, new Path(args[1]));
        boolean succeeded = projectionJob.waitForCompletion(true);
        System.out.println("finished!");
        // Report job failure through the exit code instead of always exiting 0.
        System.exit(succeeded ? 0 : 1);
    }
}
/*
交集:同一条记录在两个集合中都出现时,其计数之和大于等于 2
*/
package RelationAlgebra;
import java.io.IOException;
/**
 * Intersection R ∩ S: every record is emitted as (record, 1); the reducer
 * keeps a record whose counts sum to at least 2.
 *
 * NOTE(review): this assumes each relation holds distinct records — a record
 * duplicated within ONE input file also reaches sum &gt;= 2 and would be
 * emitted incorrectly; tag records with their source file (as Difference
 * does) to lift that assumption.
 *
 * Args: inputPath outputPath
 */
public class Intersection {
    public static class IntersectionMap extends
            Mapper<LongWritable, Text, RelationA, IntWritable> {
        private IntWritable one = new IntWritable(1); // reused constant count

        @Override
        public void map(LongWritable offSet, Text line, Context context)
                throws IOException, InterruptedException {
            RelationA record = new RelationA(line.toString());
            context.write(record, one);
        }
    }

    public static class IntersectionReduce extends
            Reducer<RelationA, IntWritable, RelationA, NullWritable> {
        @Override
        public void reduce(RelationA key, Iterable<IntWritable> value,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : value) {
                sum += val.get();
            }
            // Seen in both inputs (given distinct records per relation).
            if (sum >= 2)
                context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Job intersectionJob = new Job();
        intersectionJob.setJobName("intersectionJob");
        intersectionJob.setJarByClass(Intersection.class);
        intersectionJob.setMapperClass(IntersectionMap.class);
        intersectionJob.setMapOutputKeyClass(RelationA.class);
        intersectionJob.setMapOutputValueClass(IntWritable.class);
        intersectionJob.setReducerClass(IntersectionReduce.class);
        intersectionJob.setOutputKeyClass(RelationA.class);
        intersectionJob.setOutputValueClass(NullWritable.class);
        intersectionJob.setInputFormatClass(TextInputFormat.class);
        intersectionJob.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(intersectionJob, new Path(args[0]));
        FileOutputFormat.setOutputPath(intersectionJob, new Path(args[1]));
        boolean succeeded = intersectionJob.waitForCompletion(true);
        System.out.println("finished!");
        // Report job failure through the exit code instead of always exiting 0.
        System.exit(succeeded ? 0 : 1);
    }
}
/*
差 R-S:属于 R 但是不属于 S 的记录
map 阶段给每条记录打上来源文件的标记:
(record, R)
(record, S)
reduce 阶段同一 record 的标记被聚合,只需判断其来源中是否只有 R 而没有 S
*/
package RelationAlgebra;
import java.io.IOException;
/**
 * Set difference R − S: emits the records that appear in relation R's file
 * but in no other input file.  The map tags every record with the name of
 * the file it came from; the reduce keeps a record only when ALL of its
 * tags equal R's file name.
 *
 * Args: inputPath outputPath nameOfRelationRFile
 */
public class Difference {
    public static class DifferenceMap extends
            Mapper<Object, Text, RelationA, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Tag the record with the name of the input file it came from.
            FileSplit split = (FileSplit) context.getInputSplit();
            String filename = split.getPath().getName();
            RelationA relation = new RelationA(value.toString());
            context.write(relation, new Text(filename));
        }
    }

    public static class DifferenceReduce extends
            Reducer<RelationA, Text, RelationA, NullWritable> {
        private String setR; // file name of relation R

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            setR = context.getConfiguration().get("setR");
        }

        @Override
        public void reduce(RelationA key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
            // R − S keeps a record only when every occurrence came from R.
            // (The previous check emitted records whose sources did NOT
            // include R, which computed S − R instead of R − S.)
            boolean onlyInR = true;
            for (Text val : value) {
                if (!setR.equals(val.toString())) {
                    onlyInR = false;
                }
            }
            if (onlyInR) {
                context.write(key, NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Job differenceJob = new Job();
        differenceJob.setJobName("differenceJob");
        differenceJob.setJarByClass(Difference.class);
        differenceJob.getConfiguration().set("setR", args[2]);
        differenceJob.setMapperClass(DifferenceMap.class);
        differenceJob.setMapOutputKeyClass(RelationA.class);
        differenceJob.setMapOutputValueClass(Text.class);
        differenceJob.setReducerClass(DifferenceReduce.class);
        differenceJob.setOutputKeyClass(RelationA.class);
        differenceJob.setOutputValueClass(NullWritable.class);
        // The mapper parses ONE CSV record per call, so the input must be
        // line-oriented; WholeFileInputFormat hands a whole file to one map
        // call and breaks RelationA's 4-field parsing.
        differenceJob.setInputFormatClass(TextInputFormat.class);
        differenceJob.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(differenceJob, new Path(args[0]));
        FileOutputFormat.setOutputPath(differenceJob, new Path(args[1]));
        boolean succeeded = differenceJob.waitForCompletion(true);
        System.out.println("finished!");
        // Report job failure through the exit code instead of always exiting 0.
        System.exit(succeeded ? 0 : 1);
    }
}
连接:
按指定列做自然连接:同一连接键下,来自两个关系的记录做笛卡尔积
package RelationAlgebra;
import java.io.IOException;
/**
 * Natural join on column {@code col}: records from the two relation files
 * are keyed by the join-column value; each reduce group combines the two
 * sides as a Cartesian product.
 *
 * Input arrives via WholeFileInputFormat: the key is the file (relation)
 * name, the value is the whole file content as bytes.
 *
 * Args: inputPath outputPath colIndex nameOfRelationA
 */
public class NaturalJoin {
    public static class NaturalJoinMap extends
            Mapper<Text, BytesWritable, Text, Text> {
        private int col; // join column index

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            col = context.getConfiguration().getInt("col", 0);
        }

        @Override
        public void map(Text relationName, BytesWritable content,
                Context context) throws IOException, InterruptedException {
            // BytesWritable's backing array is padded: decode only the first
            // getLength() bytes (getBytes() alone appends garbage bytes,
            // which is why the old code had to skip the "last" record).
            String data = new String(content.getBytes(), 0,
                    content.getLength(), "UTF-8");
            // Process EVERY line; the previous loop dropped the last record.
            for (String line : data.split("\\n")) {
                line = line.trim();
                if (line.isEmpty())
                    continue;
                // Split generically instead of parsing as RelationA, so both
                // schemas join cleanly (RelationA's constructor throws on
                // relation S's 3-field rows).  Field text is kept verbatim.
                String[] fields = line.split(",");
                StringBuilder rest = new StringBuilder();
                for (int i = 0; i < fields.length; i++) {
                    if (i == col)
                        continue;
                    if (rest.length() > 0)
                        rest.append(',');
                    rest.append(fields[i]);
                }
                context.write(new Text(fields[col]), new Text(
                        relationName.toString() + " " + rest));
            }
        }
    }

    public static class NaturalJoinReduce extends
            Reducer<Text, Text, Text, NullWritable> {
        private String relationNameA; // file name identifying relation R

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            relationNameA = context.getConfiguration().get("relationNameA");
        }

        @Override
        public void reduce(Text key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
            // Partition tagged records by source relation, then emit the
            // Cartesian product of the two groups for this join key.
            ArrayList<String> setR = new ArrayList<String>();
            ArrayList<String> setS = new ArrayList<String>();
            for (Text val : value) {
                String[] recordInfo = val.toString().split(" ");
                if (recordInfo[0].equalsIgnoreCase(relationNameA))
                    setR.add(recordInfo[1]);
                else
                    setS.add(recordInfo[1]);
            }
            for (String left : setR) {
                for (String right : setS) {
                    // Output layout: R-fields, join key, S-fields.
                    context.write(new Text(left + "," + key.toString() + ","
                            + right), NullWritable.get());
                }
            }
        }
    }

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Job naturalJoinJob = new Job();
        naturalJoinJob.setJobName("naturalJoinJob");
        naturalJoinJob.setJarByClass(NaturalJoin.class);
        naturalJoinJob.getConfiguration().setInt("col",
                Integer.parseInt(args[2]));
        naturalJoinJob.getConfiguration().set("relationNameA", args[3]);
        naturalJoinJob.setMapperClass(NaturalJoinMap.class);
        naturalJoinJob.setMapOutputKeyClass(Text.class);
        naturalJoinJob.setMapOutputValueClass(Text.class);
        naturalJoinJob.setReducerClass(NaturalJoinReduce.class);
        naturalJoinJob.setOutputKeyClass(Text.class);
        naturalJoinJob.setOutputValueClass(NullWritable.class);
        naturalJoinJob.setInputFormatClass(WholeFileInputFormat.class);
        naturalJoinJob.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(naturalJoinJob, new Path(args[0]));
        FileOutputFormat.setOutputPath(naturalJoinJob, new Path(args[1]));
        boolean succeeded = naturalJoinJob.waitForCompletion(true);
        System.out.println("finished!");
        // Report job failure through the exit code instead of always exiting 0.
        System.exit(succeeded ? 0 : 1);
    }
}