The MapReduce programming model follows a fairly boilerplate ("eight-legged essay") code structure. Let's use computing the user churn rate in user behavior analysis as an example.
1. On the map side: you must extend the Mapper class defined by Hadoop. For reading HBase data there is a ready-made subclass, TableMapper, so you only need to specify the types of the output key and value.
public class LoseUserMapper extends TableMapper<StatsUserDimensionKey, Text> {
// ... code omitted: field declarations are assumed here, e.g. byte[] family (the event-log column family),
// DateDimensionKey dateKey, KpiDimensionKey channelKpiKey / versionKpiKey / areaKpiKey,
// Text uuidText, and Map<String, Integer> activeUserCache ...
Before any map call runs, the setup method is executed. For the churn computation, say the 7-day churn rate, the flow is as follows (a minimal standalone sketch of this cache-and-match pattern follows the list):
1. In setup, load every "composite dimension key + uuid" of the day seven days ago into a Map (the active-user cache).
2. For each of today's records, build the same "key + uuid" string from today's data.
3. If map.get(key + uuid) != null, the user was active seven days ago and is still active today.
4. context.write(userDimensionKey, uuidText); sends that user on to the reducer.
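To make the pattern concrete before wading through the Hadoop classes, here is a minimal self-contained sketch of the same cache-and-match idea using plain Java collections. Every name in it (RetentionSketch, countRetained, the "|" separator, the sample data) is hypothetical and only for illustration; the real job does this with the HBase scan in setup and the map method shown below.

import java.util.*;

// Hypothetical illustration of the cache-and-match pattern; not part of the original project.
class RetentionSketch {
    // Each event is a {compositeDimensionKey, uuid} pair.
    static int countRetained(List<String[]> beginDayEvents, List<String[]> endDayEvents) {
        // Step 1 (what setup() does): cache "key + separator + uuid" for everyone active on the begin date.
        Set<String> activeUserCache = new HashSet<>();
        for (String[] e : beginDayEvents) {
            activeUserCache.add(e[0] + "|" + e[1]);
        }
        // Steps 2-4 (what map() and the reducer do): match today's records against the cache
        // and deduplicate the uuids that hit.
        Set<String> retainedUuids = new HashSet<>();
        for (String[] e : endDayEvents) {
            if (activeUserCache.contains(e[0] + "|" + e[1])) {
                retainedUuids.add(e[1]);
            }
        }
        return retainedUuids.size();
    }

    public static void main(String[] args) {
        List<String[]> begin = Arrays.asList(new String[]{"app1|android", "u1"}, new String[]{"app1|android", "u2"});
        List<String[]> end = Arrays.asList(new String[]{"app1|android", "u1"}, new String[]{"app1|android", "u3"});
        System.out.println(countRetained(begin, end)); // prints 1: only u1 was active on both days
    }
}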
@Override
protected void setup(Context context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
long endDate = TimeUtil.parseString2Long(conf.get(GlobalConstants.END_DATE), TimeUtil.DATE_FORMAT);
long beginDate = TimeUtil.parseString2Long(conf.get(GlobalConstants.BEGIN_DATE), TimeUtil.DATE_FORMAT);
dateKey = DateDimensionKey.buildDate(endDate, DateEnum.DAY);
if ((endDate - beginDate) == 7 * GlobalConstants.DAY_OF_MILLISECONDS) {
channelKpiKey = new KpiDimensionKey(KpiEnum.CHANNEL_SEVEN_DAY_LOSE.name);
versionKpiKey = new KpiDimensionKey(KpiEnum.VERSION_SEVEN_DAY_LOSE.name);
areaKpiKey = new KpiDimensionKey(KpiEnum.AREA_SEVEN_DAY_LOSE.name);
}
if ((endDate - beginDate) == 14 * GlobalConstants.DAY_OF_MILLISECONDS) {
channelKpiKey = new KpiDimensionKey(KpiEnum.CHANNEL_FOURTEEN_DAY_LOSE.name);
versionKpiKey = new KpiDimensionKey(KpiEnum.VERSION_FOURTEEN_DAY_LOSE.name);
areaKpiKey = new KpiDimensionKey(KpiEnum.AREA_FOURTEEN_DAY_LOSE.name);
}
if ((endDate - beginDate) == 30 * GlobalConstants.DAY_OF_MILLISECONDS) {
channelKpiKey = new KpiDimensionKey(KpiEnum.CHANNEL_THIRTY_DAY_LOSE.name);
versionKpiKey = new KpiDimensionKey(KpiEnum.VERSION_THIRTY_DAY_LOSE.name);
areaKpiKey = new KpiDimensionKey(KpiEnum.AREA_THIRTY_DAY_LOSE.name);
}
setActiveUserCache(conf);
}
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
String uuid = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_UUID)));
String appId = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_APP)));
String platformId = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_PLATFORM)));
String channel = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_CHANNEL)));
String version = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_VERSION)));
String country = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_COUNTRY)));
String province = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_PROVINCE)));
String city = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_CITY)));
String isp = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_ISP)));
List<StatsUserDimensionKey> userDimensionKeys = getDimensionKeys(appId, platformId, channel, version, country, province, city, isp);
for (StatsUserDimensionKey userDimensionKey : userDimensionKeys) {
if (activeUserCache.get(userDimensionKey.toString() + GlobalConstants.KEY_SEPARATOR + uuid) != null) {
this.uuidText.set(uuid);
context.write(userDimensionKey, uuidText);
}
}
}
public void setActiveUserCache(Configuration conf){
String date = conf.get(GlobalConstants.BEGIN_DATE).replaceAll("-", "");
FilterList filterList = new FilterList();
filterList.addFilter(
new SingleColumnValueFilter(EventLogConstants.EVENT_LOGS_FAMILY_NAME_BYTES,
Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME),
CompareFilter.CompareOp.EQUAL, Bytes.toBytes(EventLogConstants.EventEnum.START.alias)));
String[] columns = new String[] {
EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME,
EventLogConstants.LOG_COLUMN_NAME_APP,
EventLogConstants.LOG_COLUMN_NAME_PLATFORM,
EventLogConstants.LOG_COLUMN_NAME_CHANNEL,
EventLogConstants.LOG_COLUMN_NAME_VERSION,
EventLogConstants.LOG_COLUMN_NAME_COUNTRY, // country and city are read below, so they must pass the prefix filter too
EventLogConstants.LOG_COLUMN_NAME_PROVINCE,
EventLogConstants.LOG_COLUMN_NAME_CITY,
EventLogConstants.LOG_COLUMN_NAME_ISP,
EventLogConstants.LOG_COLUMN_NAME_UUID,
};
filterList.addFilter(this.getColumnFilter(columns));
Connection conn = null;
Admin admin = null;
Scan scan = new Scan();
Table table = null;
try {
conn = ConnectionFactory.createConnection(conf);
admin = conn.getAdmin();
String tableName = EventLogConstants.HBASE_NAME_EVENT_LOGS +"_"+ date;
table = conn.getTable(TableName.valueOf(tableName));
scan.setFilter(filterList);
ResultScanner rs = table.getScanner(scan);
for (Result r : rs) {
String appId = Bytes.toString(r.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_APP)));
String platformId = Bytes.toString(r.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_PLATFORM)));
String channel = Bytes.toString(r.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_CHANNEL)));
String version = Bytes.toString(r.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_VERSION)));
String country = Bytes.toString(r.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_COUNTRY)));
String province = Bytes.toString(r.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_PROVINCE)));
String city = Bytes.toString(r.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_CITY)));
String isp = Bytes.toString(r.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_ISP)));
String uuid = Bytes.toString(r.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_UUID)));
List<StatsUserDimensionKey> userDimensionKeys = getDimensionKeys(appId, platformId, channel, version, country, province, city, isp);
for (StatsUserDimensionKey userDimensionKey : userDimensionKeys) {
activeUserCache.put(userDimensionKey.toString() + GlobalConstants.KEY_SEPARATOR + uuid, 1);
}
}
rs.close(); // release the scanner once the cache has been built
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("failed to scan HBase while building the active-user cache", e);
} finally {
// close all HBase resources (not just the Admin), otherwise the Table and Connection leak
try { if (admin != null) admin.close(); } catch (IOException e) { e.printStackTrace(); }
try { if (table != null) table.close(); } catch (IOException e) { e.printStackTrace(); }
try { if (conn != null) conn.close(); } catch (IOException e) { e.printStackTrace(); }
}
}
private Filter getColumnFilter(String[] columns) {
int length = columns.length;
byte[][] filter = new byte[length][];
for (int i = 0; i < length; i++) {
filter[i] = Bytes.toBytes(columns[i]);
}
return new MultipleColumnPrefixFilter(filter);
}
private List<StatsUserDimensionKey> getDimensionKeys(String appid, String platformId, String channel, String version, String country, String province, String city, String isp) {
List<StatsUserDimensionKey> keys = new ArrayList<>();
AppDimensionKey appKey = new AppDimensionKey(appid, AppEnum.valueOfAlias(appid).name);
List<PlatformDimensionKey> platformKeys = PlatformDimensionKey.buildList(platformId, PlatformEnum.valueOfAlias(platformId).name);
List<ChannelDimensionKey> channelKeys = ChannelDimensionKey.buildList(channel);
List<VersionDimensionKey> versionKeys = VersionDimensionKey.buildList(version);
List<AreaDimensionKey> areaKeys = AreaDimensionKey.buildList(country, province, city);
List<IspDimensionKey> ispKeys = IspDimensionKey.buildList(isp);
for (PlatformDimensionKey platformKey : platformKeys) {
// app + platform + channel dimension
for (ChannelDimensionKey channelKey : channelKeys) {
StatsUserDimensionKey userDimensionKey = new StatsUserDimensionKey();
StatsCommonDimensionKey commonKey = userDimensionKey.getCommonDimensionKey();
commonKey.setDateDimensionKey(dateKey);
commonKey.setAppDimensionKey(appKey);
commonKey.setPlatformDimensionKey(platformKey);
commonKey.setKpiDimensionKey(channelKpiKey);
userDimensionKey.setCommonDimensionKey(commonKey);
userDimensionKey.setChannelDimensionKey(channelKey);
keys.add(userDimensionKey);
}
// app + platform + version dimension
for (VersionDimensionKey versionKey : versionKeys) {
StatsUserDimensionKey userDimensionKey = new StatsUserDimensionKey();
StatsCommonDimensionKey commonKey = userDimensionKey.getCommonDimensionKey();
commonKey.setDateDimensionKey(dateKey);
commonKey.setAppDimensionKey(appKey);
commonKey.setPlatformDimensionKey(platformKey);
commonKey.setKpiDimensionKey(versionKpiKey);
userDimensionKey.setVersionDimensionKey(versionKey);
keys.add(userDimensionKey);
}
// app + platform + area + ISP dimension
for (AreaDimensionKey areaKey : areaKeys) {
for (IspDimensionKey ispKey : ispKeys) {
StatsUserDimensionKey userDimensionKey = new StatsUserDimensionKey();
StatsCommonDimensionKey commonKey = userDimensionKey.getCommonDimensionKey();
commonKey.setDateDimensionKey(dateKey);
commonKey.setAppDimensionKey(appKey);
commonKey.setPlatformDimensionKey(platformKey);
commonKey.setKpiDimensionKey(areaKpiKey);
userDimensionKey.setAreaDimensionKey(areaKey);
userDimensionKey.setIspDimensionKey(ispKey);
keys.add(userDimensionKey);
}
}
}
return keys;
}
}
Reducer: the framework pulls the map output and feeds all values with the same key into one reduce call. Different composite keys can carry the same uuid, and a single key can also receive the same uuid more than once (a user may fire several events in a day), so within each reduce call we collect the uuids into a Set; the size of that Set is the number of users who were active on the begin date (e.g., seven days ago) and are still active today, i.e., the retained-user count for that dimension combination.
public class LoseUserReducer extends Reducer<StatsUserDimensionKey, Text, StatsUserDimensionKey, MapWritableValue> {
private MapWritableValue outputValue = new MapWritableValue();
private Set<String> unique = new HashSet<String>();
@Override
protected void reduce(StatsUserDimensionKey key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
this.unique.clear();
for (Text value : values) {
this.unique.add(value.toString());
}
// set the value: slot -1 carries the deduplicated user count
MapWritable map = new MapWritable();
map.put(new IntWritable(-1), new IntWritable(this.unique.size()));
this.outputValue.setValue(map);
// set the KPI so the OutputFormat knows which SQL/collector to use
this.outputValue.setKpi(KpiEnum.valueOfName(key.getCommonDimensionKey().getKpiDimensionKey().getKpiName()));
// emit the result
context.write(key, outputValue);
}
}
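The job stops at the retained-user count per dimension combination; the article does not show how the churn rate itself is obtained, so presumably that happens downstream (e.g., in the reporting layer). Under that assumption, and given the number of distinct active users on the begin date for the same dimensions, the final figure would simply be:

// Hypothetical post-processing helper, not part of the article's code.
static double loseRate(long activeAtBegin, long retainedAtEnd) {
    // users active on the begin date who did not come back = activeAtBegin - retainedAtEnd
    return activeAtBegin == 0 ? 0.0 : (activeAtBegin - retainedAtEnd) / (double) activeAtBegin;
}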
Output to MySQL:
/**
 * Custom OutputFormat that writes the results to MySQL.
 * When the reducer calls context.write, it is ultimately the RecordWriter returned here
 * that formats each key/value pair and persists it.
 */
public class TransformerOutputFormat extends OutputFormat<KeyBaseDimension, BaseStatsValueWritable> {
/**
 * Returns the object that defines how the data is actually written; the RecordWriter is the output device.
 * getRecordWriter returns the RecordWriter instance that reduce tasks use to emit their key/value pairs
 * (if the job has no reduce phase, the map tasks use this instance directly).
 */
@Override
public RecordWriter<KeyBaseDimension, BaseStatsValueWritable> getRecordWriter(TaskAttemptContext context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
/**
 * Create the converter via RPC; this matters because dimension ids are resolved through it, using the configuration.
 */
IDimensionConverter converter = DimensionConverterClient.createDimensionConverter(conf);
Connection conn = null;
try {
conn = JdbcManager.getConnection(conf, GlobalConstants.WAREHOUSE_OF_REPORT);
// turn off auto-commit so writes can be batched
conn.setAutoCommit(false);
} catch (Exception e) {
throw new RuntimeException("failed to obtain the database connection", e);
}
return new TransformerRecordWriter(conn, conf, converter);
}
/**
 * checkOutputSpecs is called by the JobClient before the job is submitted (before the InputFormat
 * splits the input) to validate the job's output. For example, FileOutputFormat uses it to verify
 * that the output path does not exist yet and then creates it, which guarantees that when the job
 * finishes, the output path contains exactly and only this job's output.
 * @param context
 * @throws IOException
 * @throws InterruptedException
 */
@Override
public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
// Nothing needs checking in this custom implementation; at most you could verify that the target tables exist.
}
/**
 * getOutputCommitter returns the OutputCommitter instance for this job.
 * @param context
 * @return
 * @throws IOException
 * @throws InterruptedException
 */
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
return new FileOutputCommitter(FileOutputFormat.getOutputPath(context), context);
}
/**
 * The custom record writer that performs the actual output.
 */
public class TransformerRecordWriter extends RecordWriter<KeyBaseDimension, BaseStatsValueWritable> {
private Connection conn = null;
private Configuration conf = null;
private IDimensionConverter converter = null;
private Map<KpiEnum, PreparedStatement> kpiTypeSQLMap = new HashMap<>();
private Map<KpiEnum, Integer> batchMap = new HashMap<>();
public TransformerRecordWriter(Connection conn, Configuration conf, IDimensionConverter converter) {
super();
this.conn = conn;
this.conf = conf;
this.converter = converter;
}
/**
 * Writes the data. When the reducer calls context.write, this is the method that ultimately runs;
 * it turns the reduce output key/value into the target format (a batched SQL statement).
 * @param key
 * @param value
 * @throws IOException
 * @throws InterruptedException
 */
@Override
public void write(KeyBaseDimension key, BaseStatsValueWritable value)
throws IOException, InterruptedException {
KpiEnum kpiEnum = value.getKpi();
String sql = this.conf.get(kpiEnum.name);
PreparedStatement pstmt;
int count = 1;
try {
if (kpiTypeSQLMap.get(kpiEnum) == null) {
// first record for this KPI: create its PreparedStatement
pstmt = this.conn.prepareStatement(sql);
kpiTypeSQLMap.put(kpiEnum, pstmt);
} else {
// the statement already exists: advance this KPI's batch counter
pstmt = kpiTypeSQLMap.get(kpiEnum);
if (!batchMap.containsKey(kpiEnum)) {
batchMap.put(kpiEnum, count);
}
count = batchMap.get(kpiEnum);
count++;
}
batchMap.put(kpiEnum, count);
// each kind of MR output has its own collector: IOutputCollector
String collectorClassName = conf.get(GlobalConstants.OUTPUT_COLLECTOR_KEY_PREFIX + kpiEnum.name);
Class<?> clazz = Class.forName(collectorClassName);
// instantiate it via reflection; implementations must therefore provide a no-arg constructor
IOutputCollector collector = (IOutputCollector) clazz.newInstance();
collector.collect(conf, key, value, pstmt, converter);
// commit once the batch reaches the configured size
if (count % conf.getInt(GlobalConstants.JDBC_BATCH_NUMBER, GlobalConstants.DEFAULT_JDBC_BATCH_NUMBER) == 0) {
pstmt.executeBatch(); // flush the batch
conn.commit();
batchMap.remove(kpiEnum); // reset the pending counter for this KPI
}
} catch (Exception e) {
throw new IOException("error while writing output data", e);
}
}
/**
 * Always called at the end; flushes the remaining batches, commits, and closes every resource.
 * @param context
 * @throws IOException
 * @throws InterruptedException
 */
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
try {
try {
for (Map.Entry<KpiEnum, PreparedStatement> entry : this.kpiTypeSQLMap.entrySet()) {
entry.getValue().executeBatch();
}
} catch (Exception e) {
throw new IOException("error while flushing the remaining batches", e);
} finally {
try {
if (conn != null) {
conn.commit();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (conn != null) {
for (Map.Entry<KpiEnum, PreparedStatement> entry : this.kpiTypeSQLMap.entrySet()) {
try {
entry.getValue().close();
} catch (SQLException e) {
e.printStackTrace();
}
}
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
} finally {
// shut down the RPC proxy to the dimension converter
DimensionConverterClient.stopDimensionConverterProxy(converter);
}
}
}
}
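The write method above loads an IOutputCollector implementation by reflection, but the article never shows one. The sketch below is purely hypothetical: the class name, the getter and getDimensionIdByValue method names, the casts, and the parameter binding order are assumptions inferred from the collect(conf, key, value, pstmt, converter) call and from the reducer's output, not the project's real code.

import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
// project-specific imports (IOutputCollector, StatsUserDimensionKey, MapWritableValue, ...) omitted

// Hypothetical collector for the churn KPI; the real SQL, column order and converter API depend on the project.
public class LoseUserCollector implements IOutputCollector {
    public LoseUserCollector() {
        // a no-arg constructor is required because the record writer instantiates collectors via reflection
    }

    @Override
    public void collect(Configuration conf, KeyBaseDimension key, BaseStatsValueWritable value,
                        PreparedStatement pstmt, IDimensionConverter converter)
            throws SQLException, IOException {
        StatsUserDimensionKey k = (StatsUserDimensionKey) key;
        MapWritableValue v = (MapWritableValue) value;
        // resolve surrogate ids of the dimensions over RPC (method name assumed)
        int dateId = converter.getDimensionIdByValue(k.getCommonDimensionKey().getDateDimensionKey());
        int appId = converter.getDimensionIdByValue(k.getCommonDimensionKey().getAppDimensionKey());
        // ... one id per dimension this KPI groups by ...
        int retained = ((IntWritable) v.getValue().get(new IntWritable(-1))).get();
        // bind the parameters in the order expected by the KPI's INSERT statement taken from the configuration
        pstmt.setInt(1, dateId);
        pstmt.setInt(2, appId);
        pstmt.setInt(3, retained);
        pstmt.addBatch(); // only add to the batch; the record writer decides when to execute and commit
    }
}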
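Finally, none of the listings above show how the pieces get wired together. The driver below is a hypothetical sketch, assuming the class and constant names used in the article and example date values; the table-naming convention mirrors what setActiveUserCache does and is not taken verbatim from the original post.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
// project-specific imports (GlobalConstants, EventLogConstants, LoseUserMapper, ...) omitted

// Hypothetical driver, not from the original article.
public class LoseUserRunner {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // setup() reads these two dates to pick the 7/14/30-day KPI (example values)
        conf.set(GlobalConstants.BEGIN_DATE, "2024-01-01");
        conf.set(GlobalConstants.END_DATE, "2024-01-08");
        // the per-KPI SQL statements and collector class names are assumed to be loaded into conf elsewhere
        // (e.g., from an output-collector/query XML resource)

        Job job = Job.getInstance(conf, "lose-user");
        job.setJarByClass(LoseUserRunner.class);

        // scan today's (END_DATE) event-log table; the begin-date table is read inside setup()
        String table = EventLogConstants.HBASE_NAME_EVENT_LOGS + "_"
                + conf.get(GlobalConstants.END_DATE).replaceAll("-", "");
        TableMapReduceUtil.initTableMapperJob(table, new Scan(), LoseUserMapper.class,
                StatsUserDimensionKey.class, Text.class, job);

        job.setReducerClass(LoseUserReducer.class);
        job.setOutputKeyClass(StatsUserDimensionKey.class);
        job.setOutputValueClass(MapWritableValue.class);
        job.setOutputFormatClass(TransformerOutputFormat.class);
        // needed because getOutputCommitter above falls back to a FileOutputCommitter
        FileOutputFormat.setOutputPath(job, new Path("/tmp/lose-user-output"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}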