博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【MapReduce】基础案例 ---- Map Join 实现数据合并(缓存表)
阅读量:317 次
发布时间:2019-03-04

本文共 4287 字,大约阅读时间需要 14 分钟。


文章目录


Map Join

① Map Join工作原理

  • 1.使用场景

    Map Join适用于一张表十分小、一张表很大的场景

  • 2.优点

    • 思考:在Reduce端处理过多的表,非常容易产生数据倾斜。怎么办?
      • 在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜
  • 3.具体办法:采用DistributedCache

    (1)在Mapper的setup阶段,将文件读取到缓存集合中。

    (2)在驱动函数中加载缓存。

// 缓存普通文件到Task运行节点。job.addCacheFile(new URI("file://e:/cache/pd.txt"));


② Map Join 案例

☠ 需求分析

  • Map Join适用于关联表中有小表的情形
    在这里插入图片描述


☠ 代码实现

Mapper阶段

package 第三章_MR框架原理.多种join应用;import org.apache.commons.lang.StringUtils;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.BufferedReader;import java.io.FileInputStream;import java.io.IOException;import java.io.InputStreamReader;import java.net.URI;import java.util.HashMap;public class DistributedCacheMapper extends Mapper
{
Text k = new Text(); // 创建集合存储拆分的数据信息 HashMap pdMap = new HashMap
(); /** * 重写setUp方法 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void setup(Context context) throws IOException, InterruptedException {
// 1.缓存小表 // 1.1获取要缓存的文件路径 // 1.1.1 首先获取缓存文件 URI[] cacheFiles = context.getCacheFiles(); // 1.1.2 通过缓存文件获取其路径 String path = cacheFiles[0].getPath().toString(); // 1.2 读取文件信息 BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path),"UTF-8")); String line; while (StringUtils.isNotEmpty(line = reader.readLine())){
//pid pname //01 小米 // 1.3 拆分 String[] fields = line.split("\t"); // 1.4 添加到集合中,将pid作为key,通过pid获取pname,后续写到order表中 pdMap.put(fields[0],fields[1]); } // 1.5 关闭资源 IOUtils.closeStream(reader); } /** * 在map阶段进行join操作 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// id pid amount // 1001 01 1 // pid pname // 01 小米 // 1.获取一行数据 String line = value.toString(); // 2.拆分 String[] fields = line.split("\t"); // 3.获取pid,到对应的pdMap中找出pname写入 String pid = fields[1]; String pname = (String) pdMap.get(pid); // 4.拼接 line = line + "\t" + pname; k.set(line); // 5.写出 context.write(k,NullWritable.get()); }}


Driver阶段

package 第三章_MR框架原理.多种join应用;import java.net.URI;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class DistributedCacheDriver {
public static void main(String[] args) {
Configuration configuration = new Configuration(); Job job = null; try {
// 1 获取job信息 job = Job.getInstance(configuration); // 2 设置加载jar包路径 job.setJarByClass(DistributedCacheDriver.class); // 3 关联map job.setMapperClass(DistributedCacheMapper.class); // 4 设置最终输出数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 5 设置输入输出路径 FileInputFormat.setInputPaths(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\多种join应用\\data\\order.txt")); FileOutputFormat.setOutputPath(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\多种join应用\\cacheoutput")); // 6 加载缓存数据表 job.addCacheFile(new URI("file:///G:/Projects/IdeaProject-C/MapReduce/src/main/java/第三章_MR框架原理/多种join应用/data/pd.txt")); // 7 Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0 job.setNumReduceTasks(0); // 8 提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } catch (Exception e) {
e.printStackTrace(); } }}

在这里插入图片描述


☠ 总结

  • 可以明显看出,在Map阶段进行join操作,不需要使用Reduce,合并的操作完全在Mapper阶段完成,充分利用了map阶段的资源,并且有效避免了Reduce阶段的数据倾斜情况。


转载地址:http://hueq.baihongyu.com/

你可能感兴趣的文章