概述
適用於關聯表中有小表的情形.
使用分佈式緩存,可以將小表分發到所有的map節點,這樣,map節點就可以在本地對自己所讀到的大表數據進行join並輸出最終結果,可以大大提高join操作的併發度,加快處理速度
#### 實現步驟
先在mapper類中預先定義好小表,進行join
引入實際場景中的解決方案:一次加載數據庫或者用
#####Step 1:定義Mapper
~~~java
public class MapJoinMapper extends Mapper<LongWritable,Text,Text,Text>{
private HashMap<String, String> map = new HashMap<>();//第一件事情:將分佈式緩存的小表數據讀取到本地Map集合(只需要做一次)
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//1:獲取分佈式緩存文件列表
URI[] cacheFiles = context.getCacheFiles();//2:獲取指定的分佈式緩存文件的文件系統(FileSystem)
FileSystem fileSystem = FileSystem.get(cacheFiles[0], context.getConfiguration());//3:獲取文件的輸入流
FSDataInputStream inputStream = fileSystem.open(new Path(cacheFiles[0]));//4:讀取文件內容, 並將數據存入Map集合
//4.1 將字節輸入流轉為字符緩衝流FSDataInputStream --->BufferedReader
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
//4.2 讀取小表文件內容,以行位單位,並將讀取的數據存入map集合String line = null;
while((line = bufferedReader.readLine()) != null){
String[] split = line.split(",");map.put(split[0], line);
}
//5:關閉流
bufferedReader.close();
fileSystem.close();}
//第二件事情:對大表的處理業務邏輯,而且要實現大表和小表的join操作
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1:從行文本數據中獲取商品的id: p0001 , p0002 得到了K2
String[] split = value.toString().split(",");
String productId = split[2]; //K2//2:在Map集合中,將商品的id作為鍵,獲取值(商品的行文本數據) ,將value和值拼接,得到V2
String productLine = map.get(productId);
String valueLine = productLine+"\t"+value.toString(); //V2
//3:將K2和V2寫入上下文中
context.write(new Text(productId), new Text(valueLine));
}
}~~~
#####Step 2:定義主類
~~~java
public class JobMain extends Configured implements Tool{
@Override
public int run(String[] args) throws Exception {
//1:獲取job對象
Job job = Job.getInstance(super.getConf(), "map_join_job");//2:設置job對象(將小表放在分佈式緩存中)
//將小表放在分佈式緩存中
// DistributedCache.addCacheFile(new URI("hdfs://node01:8020/cache_file/product.txt"), super.getConf());
job.addCacheFile(new URI("hdfs://node01:8020/cache_file/product.txt"));//第一步:設置輸入類和輸入的路徑
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path("file:///D:\\input\\map_join_input"));
//第二步:設置Mapper類和數據類型
job.setMapperClass(MapJoinMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);//第八步:設置輸出類和輸出路徑
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path("file:///D:\\out\\map_join_out"));//3:等待任務結束
boolean bl = job.waitForCompletion(true);
return bl ? 0 :1;
}public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
//啓動job任務
int run = ToolRunner.run(configuration, new JobMain(), args);
System.exit(run);
}
}
本文章為轉載內容,我們尊重原作者對文章享有的著作權。如有內容錯誤或侵權問題,歡迎原作者聯繫我們進行內容更正或刪除文章。