当前位置: 首页 > >

第七天 hadoop shuffle过程

发布时间:



shuffle
一、Shuffle(重要,MR的核心)1. 概念 (分组、排序、合并)2. 阶段:map阶段的Shuffle:reduce 阶段的shuffle


二、 reduce阶段的多表合并三、map阶段的多表合并四、数据的清洗五、案例基类map端合并reduce端合并client 端





一、Shuffle(重要,MR的核心)


1. 概念 (分组、排序、合并)

是从map结束到reduce开始之间的过程
包括: partitions 、copy 、 sort 阶段 和一些可选操作(合并/压缩等操作)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9rJIOG1h-1599902562842)(2.png)]


2. 阶段:
map阶段的Shuffle:



Map:
spill溢写
缓冲区---------------------------------------磁盘disk
Partition/Sort/Combiner

reduce 阶段的shuffle


reduce:
copy /sort(merge) /reduce

二、 reduce阶段的多表合并

表操作:
两个文件:t_order 和t_product
select o.id ... .... from t_order o join t_product p and o.pid=p.id

需求:两张表的合并和汇总
订单表:
订单编号 日期 商品编号 数量
1001,20150710,P0001,2
1002,20150710,P0001,2
1002,20150710,P0002,3

商品表:
商品编号 名称 种类 价格
P0001,小米5,1000,1999
P0002,锤子T1,1000,2000

思路:map + reduce
k v p0001 info order
pid InfoBean p0002 info order
p0002 info order
p0001 info product
p0002 info product
实现:
1. 编写bean对象
序列化和反序列化
添加一个额外的字段,唯一表示表的类型

2. 编写mapper和reducer和client


三、map阶段的多表合并

问题:在reduce端实现多表合并的所有操作,容易造成reduce端压力过大,数据倾斜等问题

解决:在map端进行表的合并,分布式缓存技术

思路:同时读取两张表
读取一张表,缓存一张表(DistributedCache)

好处:减轻了reduce端的压力
减少了map的个数

实现:
1. 将一张表的数据缓存起来
job.addCacheFile(new URI("file:///e:/orders2/t_order.txt"));

job.setNumReduceTasks(0),取消reduce

2. 读取缓存中数据
在mapper类中的setup方法中操作缓存数据
将缓存文件中的数据封装到一个集合中


3. 在mapper类中的map方法中实现数据的具体业务操作
k ----- t_product.txt
集合-----缓存订单表的所有数据


四、数据的清洗

目的:Flume采集到的原始数据通常都不规范,格式不符合要求


五、案例

1. 在reduce阶段实现订单表和商品表中数据的合并展示
2. 在map阶段实现订单表和商品表中数据的合并展示

基类

/**
* 封装order和product表中数据的
*
* 将bean对象的字段,设置成最终结果需要显示的字段(将两张表的字段进行融合)
*
*/
public class InfoBean implements Writable {
private int order_id;
private int p_id;
private String name;
private float price;
private int category_id;
private String dateTime;
private int num;
// 表示表的类型:true:封装了产品信息; fasle:封装了订单信息
private boolean flag;

public void setAllFiled(int order_id, int p_id, String name, float price, int category_id, String dateTime, int num,
boolean flag) {
this.order_id = order_id;
this.p_id = p_id;
this.name = name;
this.price = price;
this.category_id = category_id;
this.dateTime = dateTime;
this.num = num;
this.flag = flag;
}
//序列化与反序列化
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(order_id);
out.writeUTF(dateTime);
out.writeUTF(name);
out.writeInt(p_id);
out.writeFloat(price);
out.writeInt(num);
out.writeInt(category_id);
out.writeBoolean(flag);
}

@Override
public void readFields(DataInput in) throws IOException {
this.order_id = in.readInt();
this.dateTime = in.readUTF();
this.name = in.readUTF();
this.p_id = in.readInt();
this.price = in.readFloat();
this.num = in.readInt();
this.category_id = in.readInt();
this.flag = in.readBoolean();
}

+set+get方法+toString方法
}


map端合并

public class MergeMapper extends Mapper {

InfoBean info = new InfoBean();
Text keyOut = new Text();

@Override
protected void map(LongWritable key, Text value, Mapper.Context context)
throws IOException, InterruptedException {
// 1. 读一行数据
String line = value.toString();
System.out.println("line:"+line);
// 2. 获取操作的文件(表)的类型
FileSplit split = (FileSplit) context.getInputSplit();
String path = split.getPath().getName();
System.out.println(path);

String keyPID = "";
// 3. 拆分和对象的封装
String[] fields = line.split(",");
if (path.startsWith("t_order")) {
// 读取到的数据是订单表中的数据
info.setAllFiled(
Integer.parseInt(fields[0]),
Integer.parseInt(fields[2]),
"", 0, 0,
fields[1],
Integer.parseInt(fields[3]), false);
keyPID = fields[2];
} else {
// 读取到的数据是商品表中的数据
info.setAllFiled(0, Integer.parseInt(fields[0]), fields[1], Float.parseFloat(fields[3]),
Integer.parseInt(fields[2]), "", 0, true);
keyPID = fields[0];
}

keyOut.set(keyPID);
// 4. 写出数据
System.out.println(keyOut.toString()+"-->info:"+info);
context.write(keyOut, info);
}

}

reduce端合并



public class MergeReduce extends Reducer {

@Override
protected void reduce(Text key, Iterable values,
Reducer.Context context) throws IOException, InterruptedException {
System.out.println("reduce........start");

InfoBean cacheInfo = new InfoBean();//商品信息,临时存储到此缓存中
ArrayList orderBeans = new ArrayList<>();//订单信息,临时存储到此缓存中

//1. 分类存储结果数据
for (InfoBean infoBean : values) {
System.out.println("opeartion info:"+infoBean);
if(infoBean.isFlag()){
//true,商品信息
try {
BeanUtils.copyProperties(cacheInfo, infoBean);
} catch (Exception e) {
e.printStackTrace();
}
}else{
InfoBean infoOrder = new InfoBean();
try {
BeanUtils.copyProperties(infoOrder, infoBean);
//true,订单信息
orderBeans.add(infoOrder );
} catch (Exception e) {
e.printStackTrace();
}
}

}

//2. 补全商品信息
for (InfoBean infoBean : orderBeans) {
infoBean.setName(cacheInfo.getName());
infoBean.setPrice(cacheInfo.getPrice());
infoBean.setCategory_id(cacheInfo.getCategory_id());
context.write(infoBean, NullWritable.get());
}
System.out.println("reduce........stop");
}

}


client 端

public class MergeClient {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// 1. 获取配置信息,获取job对象
Job job = Job.getInstance(conf);

// 2. 设置作业的属性

// 指定当前程序jar包的位置
job.setJarByClass(MergeClient.class);

File f = new File("E:\mergeresults");
if (f.exists())
FileSystem.get(conf).delete(new Path("E:\mergeresults"));

// 3.设置源数据和目标路径
FileInputFormat.setInputPaths(job, "E:\orders");
FileOutputFormat.setOutputPath(job, new Path("E:\mergeresults"));

// 4. 设置当前job使用的任务
job.setMapperClass(MergeMapper.class);
job.setReducerClass(MergeReduce.class);

// 5. 设置数据输入输出的类型
// 设置Map输出的数据类型
job.setMapOutputKeyClass(Text.class);// pid商品id
job.setMapOutputValueClass(InfoBean.class);
// job.setNumReduceTasks(5);
// 最终输出到持久化层的数据类型
job.setOutputKeyClass(InfoBean.class);
job.setOutputValueClass(NullWritable.class);

// 6. 提交作业
job.waitForCompletion(true);
}
}



友情链接: 传奇百科网 招聘百科网 非凡百科网 游艇百科网 口红百科网 创业百科网 软木百科网