在做這個Join查詢的時候,必然涉及數據,我這裏設計了2張表,分別較data.txt和info.txt,字段之間以/t劃分。

data.txt內容如下:

201001    1003    abc 
201002    1005    def 
201003    1006    ghi 
201004    1003    jkl 
201005    1004    mno 
201006    1005    pqr

 

info.txt內容如下:

 

1003    kaka

1004    da

1005    jue

1006    zhao

 

期望輸出結果:

1003    201001    abc    kaka

1003    201004    jkl    kaka

1004    201005    mno    da

1005    201002    def    jue

1005    201006    pqr    jue

1006    201003    ghi    zhao

 

四、Map代碼

首先是map的代碼,我貼上,然後簡要説説

 

public static class Example_Join_01_Mapper extends Mapper<LongWritable, Text, TextPair, Text> { 
        @Override 
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 
 
            // 獲取輸入文件的全路徑和名稱 
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString(); 
 
            if (pathName.contains("data.txt")) { 
                String values[] = value.toString().split("/t"); 
                if (values.length < 3) { 
                    // data數據格式不規範,字段小於3,拋棄數據 
                    return; 
                } else { 
                    // 數據格式規範,區分標識為1 
                    TextPair tp = new TextPair(new Text(values[1]), new Text("1")); 
                    context.write(tp, new Text(values[0] + "/t" + values[2])); 
                } 
            } 
            if (pathName.contains("info.txt")) { 
                String values[] = value.toString().split("/t"); 
                if (values.length < 2) { 
                    // data數據格式不規範,字段小於2,拋棄數據 
                    return; 
                } else { 
                    // 數據格式規範,區分標識為0 
                    TextPair tp = new TextPair(new Text(values[0]), new Text("0")); 
                    context.write(tp, new Text(values[1])); 
                } 
            } 
        } 
    }

 

這裏需要注意以下部分:

A、pathName是文件在HDFS中的全路徑(例如:hdfs://M1:9000/MengYan/join/data/info.txt),可以以endsWith()的方法來判斷。

B、資料表,也就是這裏的info.txt需要放在前面,也就是標識號是0.否則無法輸出理想結果。

C、Map執行完成之後,輸出的中間結果如下:

1003,0    kaka 
1004,0    da 
1005,0    jue 
1006,0    zhao 
1003,1    201001    abc 
1003,1    201004    jkl 
1004,1    201005    mon 
1005,1    201002    def 
1005,1    201006    pqr 
1006,1    201003    ghi

 

五、分區和分組

1、map之後的輸出會進行一些分區的操作,代碼貼出來:

public static class Example_Join_01_Partitioner extends Partitioner<TextPair, Text> { 
        @Override 
        public int getPartition(TextPair key, Text value, int numParititon) { 
            return Math.abs(key.getFirst().hashCode() * 127) % numParititon; 
        } 
    }

分區我在以前的文檔中寫過,這裏不做描述了,就説是按照map輸出的符合key的第一個字段做分區關鍵字。分區之後,相同key會劃分到一個reduce中去處理(如果reduce設置是1,那麼就是分區有多個,但是還是在一個reduce中處理。但是結果會按照分區的原則排序)。分區後結果大致如下:

 

同一區:

1003,0    kaka

1003,1    201001    abc

1003,1    201004    jkl

 

 

同一區:

1004,0    da

1004,1    201005    mon

 

 

同一區:

1005,0    jue

1005,1    201002    def

1005,1    201006    pqr

 

 

同一區:

1006,0    zhao

1006,1    201003    ghi

 

2、分組操作,代碼如下

 

public static class Example_Join_01_Comparator extends WritableComparator { 
 
        public Example_Join_01_Comparator() { 
            super(TextPair.class, true); 
        } 
 
        @SuppressWarnings("unchecked") 
        public int compare(WritableComparable a, WritableComparable b) { 
            TextPair t1 = (TextPair) a; 
            TextPair t2 = (TextPair) b; 
            return t1.getFirst().compareTo(t2.getFirst()); 
        } 
    }

分組操作就是把在相同分區的數據按照指定的規則進行分組的操作,就以上來看,是按照複合key的第一個字段做分組原則,達到忽略複合key的第二個字段值的目的,從而讓數據能夠迭代在一個reduce中。輸出後結果如下:

 

同一組:

1003,0    kaka

1003,0    201001    abc

1003,0    201004    jkl

 

同一組:

1004,0    da

1004,0    201005    mon

 

同一組:

1005,0    jue

1005,0    201002    def

1005,0    201006    pqr

 

同一組:

1006,0    zhao

1006,0    201003    ghi

 

六、reduce操作

貼上代碼如下:

public static class Example_Join_01_Reduce extends Reducer<TextPair, Text, Text, Text> { 
        protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException, 
                InterruptedException { 
            Text pid = key.getFirst(); 
            String desc = values.iterator().next().toString(); 
            while (values.iterator().hasNext()) { 
                context.write(pid, new Text(values.iterator().next().toString() + "/t" + desc)); 
            } 
        } 
    }

1、代碼比較簡單,首先獲取關鍵的ID值,就是key的第一個字段。

2、獲取公用的字段,通過排組織後可以看到,一些共有字段是在第一位,取出來即可。

3、遍歷餘下的結果,輸出。

七、其他的支撐代碼

1、首先是TextPair代碼,沒有什麼可以細説的,貼出來:

public class TextPair implements WritableComparable<TextPair> { 
    private Text first; 
    private Text second; 
 
    public TextPair() { 
        set(new Text(), new Text()); 
    } 
 
    public TextPair(String first, String second) { 
        set(new Text(first), new Text(second)); 
    } 
 
    public TextPair(Text first, Text second) { 
        set(first, second); 
    } 
 
    public void set(Text first, Text second) { 
        this.first = first; 
        this.second = second; 
    } 
 
    public Text getFirst() { 
        return first; 
    } 
 
    public Text getSecond() { 
        return second; 
    } 
 
    public void write(DataOutput out) throws IOException { 
        first.write(out); 
        second.write(out); 
    } 
 
    public void readFields(DataInput in) throws IOException { 
        first.readFields(in); 
        second.readFields(in); 
    } 
 
    public int compareTo(TextPair tp) { 
        int cmp = first.compareTo(tp.first); 
        if (cmp != 0) { 
            return cmp; 
        } 
        return second.compareTo(tp.second); 
    } 
}

2、Job的入口函數

public static void main(String agrs[]) throws IOException, InterruptedException, ClassNotFoundException { 
        Configuration conf = new Configuration(); 
        GenericOptionsParser parser = new GenericOptionsParser(conf, agrs); 
        String[] otherArgs = parser.getRemainingArgs(); 
        if (agrs.length < 3) { 
            System.err.println("Usage: Example_Join_01 <in_path_one> <in_path_two> <output>"); 
            System.exit(2); 
        } 
 
        //conf.set("hadoop.job.ugi", "root,hadoop"); 
 
        Job job = new Job(conf, "Example_Join_01"); 
        // 設置運行的job 
        job.setJarByClass(Example_Join_01.class); 
        // 設置Map相關內容 
        job.setMapperClass(Example_Join_01_Mapper.class); 
        // 設置Map的輸出 
        job.setMapOutputKeyClass(TextPair.class); 
        job.setMapOutputValueClass(Text.class); 
        // 設置partition 
        job.setPartitionerClass(Example_Join_01_Partitioner.class); 
        // 在分區之後按照指定的條件分組 
        job.setGroupingComparatorClass(Example_Join_01_Comparator.class); 
        // 設置reduce 
        job.setReducerClass(Example_Join_01_Reduce.class); 
        // 設置reduce的輸出 
        job.setOutputKeyClass(Text.class); 
        job.setOutputValueClass(Text.class); 
        // 設置輸入和輸出的目錄 
        FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 
        FileInputFormat.addInputPath(job, new Path(otherArgs[1])); 
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2])); 
        // 執行,直到結束就退出 
        System.exit(job.waitForCompletion(true) ? 0 : 1); 
 
    }

 

八、總結

1、這是個簡單的join查詢,可以看到,我在處理輸入源的時候是在map端做來源判斷。其實在0.19可以用MultipleInputs.addInputPath()的方法,但是它用了JobConf做參數。這個方法原理是多個數據源就採用多個map來處理。方法各有優劣。

2、對於資源表,如果我們採用0和1這樣的模式來區分,資源表是需要放在前的。例如本例中info.txt就是資源表,所以標識位就是0.如果寫為1的話,可以試下,在分組之後,資源表對應的值放在了迭代器最後一位,無法追加在最後所有的結果集合中。

3、關於分區,並不是所有的map都結束才開始的,一部分數據完成就會開始執行。同樣,分組操作在一個分區內執行,如果分區完成,分組將會開始執行,也不是等所有分區完成才開始做分組的操作。