目錄

  • 一、環境準備
  • (一)新建maven項目
  • (二)添加框架支持
  • (三)修改maven倉庫地址
  • (四)pom文件
  • (五)新建scala目錄
  • 二、編寫具體代碼
  • (一)全量抽取
  • (二)增量抽取
  • 題目一:兩個時間數據類型增量抽取字段
  • 題目2 :一個時間數據類型增量抽取字段
  • 題目3:一個數值型id作為增量抽取字段
  • 三、將應用程序打包後提交到yarn平台運行
  • (一)maven項目打jar包
  • (二)平台環境的啓動
  • (三)執行命令

一、環境準備

(一)新建maven項目

文件—》新建—》項目,選maven

(二)添加框架支持

選中項目,單機鼠標右鍵,選“添加框架支持”,將scala前的複選框勾上

2011-2022年高職大數據競賽-賽題任務剖析_字段

(三)修改maven倉庫地址

文件——設置——搜索maven,按照題目要求修改本地倉庫

2011-2022年高職大數據競賽-賽題任務剖析_字段_02

(四)pom文件

複製pom.xml的內容,點擊maven菜單中的刷新按鈕,重新加載項目

2011-2022年高職大數據競賽-賽題任務剖析_大數據_03

(五)新建scala目錄

在src下新建scala的目錄,並將scala目錄標記為源根。標記成功後文件夾為藍色。

2011-2022年高職大數據競賽-賽題任務剖析_大數據_04

二、編寫具體代碼

(一)全量抽取

全量抽取比較簡單,只需要讀取mysql數據庫中的數據,然後不經過任何篩選操作直接將數據全部寫入hive數據庫中。

(二)增量抽取

題目一:兩個時間數據類型增量抽取字段

抽取shtd_store庫中user_info的增量數據進入Hive的ods庫中表user_info。根據ods.user_info表中operate_time或create_time作為增量字段(即MySQL中每條數據取這兩個時間中較大的那個時間作為增量字段去和ods裏的這兩個字段中較大的時間進行比較),只將新增的數據抽入,字段名稱、類型不變,同時添加靜態分區,分區字段為etl_date,類型為String,且值為當前比賽日的前一天日期(分區字段格式為yyyyMMdd)。使用hive cli執行show partitions ods.user_info命令,將結果截圖粘貼至客户端桌面【Release\任務B提交結果.docx】中對應的任務序號下

```scala
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions._

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.Properties

object Chouqu {
  def main(args: Array[String]): Unit = {
  	//創建SparkSession對象
	val spark = SparkSession
      .builder()
      .appName("chouqu3")
      //設置hive元服務的地址
      .config("hive.metastore.uris", "thrift://master:9083")
      //設置hive數據存儲目錄
      .config("hive.metastore.warehouse.dir", "hdfs://master:8020/hive/warehouse")
      //設置hive分區模式
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .enableHiveSupport()
      .getOrCreate()
     
	//導入隱式包
	import spark.implicits._

	//設置mysql數據庫相關連接屬性
	val url = "jdbc:mysql://master:3306/shtd_store?useSSL=false"
    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "1234")
	
	//讀取mysql中的數據
	val data = spark.read.jdbc(url, "user_info", prop)
	
	//讀取hive中的數據
	val hivedata = spark.read.table("ods.user_info")
	
	//找到抽取數據的增量值,即hive中create_time,operate_time最大值
	val maxtime =
	//找到
      spark.sql("select max(greatest(create_time,operate_time)) as maxtime from ods.user_info")
        .withColumn("maxtime",$"maxtime".cast(StringType))
        .head()
        .getAs[String]("maxtime")
     //獲取比賽前一天的時間,並格式化為yyyyMMdd格式
     val curDate = LocalDateTime.now()
     val preDate = curDate.minusDays(2).format(DateTimeFormatter.ofPattern("yyyyMMdd"))
     data
     /*
		藉助greatest函數找到"operate_time","create_time"的較大值
		藉助gt函數完成比較:gr——大於;lt——小於
		藉助where函數完成數據篩選
	*/
      .where(greatest("operate_time","create_time").gt(maxtime))
      //增加分區字段
      .withColumn("etl_date",lit(predate))
      .write
      //添加分區
      .partitionBy("etl_date")
      //增量抽取,需保留原有數據,因此用Append模式
      .mode(SaveMode.Append)
      .format("hive")
      //寫入hive中
      .saveAsTable("ods.user_info")

    spark.stop()
  }
}

題目2 :一個時間數據類型增量抽取字段

2、抽取shtd_store庫中sku_info的增量數據進入Hive的ods庫中表sku_info。根據ods.sku_info表中create_time作為增量字段,只將新增的數據抽入,字段名稱、類型不變,同時添加靜態分區,分區字段為etl_date,類型為String,且值為當前比賽日的前一天日期(分區字段格式為yyyyMMdd)。使用hive cli執行show partitions ods.sku_info命令,將結果截圖粘貼至客户端桌面【Release\任務B提交結果.docx】中對應的任務序號下;

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}

import java.util.Properties
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType

import java.time.LocalDate
import java.time.format.DateTimeFormatter

object Chouqu2 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.OFF)
    val spark = SparkSession
      .builder()
      .config("hive.metastore.uris","thrift://master:9083")
      .config("hive.metastore.warehouse.dir","hdfs://master:8020/user/hive/warehouse")
      .config("hive.exec.dynamic.partition.mode","nonstrict")
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    val url = "jdbc:mysql://master:3306/shtd_store?useSSL=false"
    val prop = new Properties()
    prop.setProperty("user","root")
    prop.setProperty("password","1234")

    val mdf = spark.read.jdbc(url,"sku_info",prop)
    val hdf = spark.read.table("ods.sku_info")

    val maxtime = hdf
      .agg(max("create_time") as "maxtime")
      .withColumn("maxtime", $"maxtime".cast(StringType))
      .collect()(0)
      .getAs[String]("maxtime")

    val curdate = LocalDate.now()
    val predate = curdate.minusDays(1).format(DateTimeFormatter.ofPattern("yyyyMMdd"))
    mdf
      .where($"create_time".gt(maxtime))
      .withColumn("etl_date",lit(predate))
      .write
      .partitionBy("etl_date")
      .mode(SaveMode.Append)
      .format("hive")
      .saveAsTable("ods.sku_info")

    spark.stop()
  }
}

題目3:一個數值型id作為增量抽取字段

3、抽取shtd_store庫中base_province的增量數據進入Hive的ods庫中表base_province。根據ods.base_province表中id作為增量字段,只將新增的數據抽入,字段名稱、類型不變並添加字段create_time取當前時間,同時添加靜態分區,分區字段為etl_date,類型為String,且值為當前比賽日的前一天日期(分區字段格式為yyyyMMdd)。使用hive cli執行show partitions ods.base_province命令,將結果截圖粘貼至客户端桌面【Release\任務B提交結果.docx】中對應的任務序號下;

package com.fuxi

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType

import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.Properties
object Chouqu3 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      //.master("local")
     // .config("fs.defaultFS", "hdfs://master:8020")
      //jar運行去掉改行
      //.config("spark.sql.warehouse.dir", "hdfs://master:8020/hive/warehouse")
      .config("hive.metastore.uris","thrift://master:9083")
      .config("hive.metastore.warehouse.dir","hdfs://master:8020/hive/warehouse")
      .config("hive.exec.dynamic.partition.mode","nonstrict")
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    val url = "jdbc:mysql://master:3306/shtd_store?useSSL=false"
    val prop = new Properties()
    prop.setProperty("user","root")
    prop.setProperty("password","1234")

    val mdf = spark.read.jdbc(url,"base_province",prop)
    val hdf = spark.read.table("ods.base_province")

    val maxId = hdf
      .agg(max("id") as "maxId")
      .withColumn("maxId",$"maxId".cast(LongType) )
      .collect()(0)
      .getAs[Long]("maxId")

    val predate = LocalDate.now()
      .minusDays(1)
      .format(DateTimeFormatter.ofPattern("yyyyMMdd"))

    mdf
      .withColumn("etl_date",lit(predate))
      .where($"id".gt(maxId))
      .write
      .partitionBy("etl_date")
      .mode(SaveMode.Append)
      .format("hive")
      .saveAsTable("ods.base_province")

    spark.stop()
  }
}

三、將應用程序打包後提交到yarn平台運行

(一)maven項目打jar包

先點擊maven菜單項中clean,結束後再點擊package

2011-2022年高職大數據競賽-賽題任務剖析_字段_05

(二)平台環境的啓動

啓動hadoop集羣:start-all.sh
啓動hive元服務:hive --service metastore&

2011-2022年高職大數據競賽-賽題任務剖析_spark_06

(三)執行命令

spark-submit --master yarn --deploy-mode client --class com.yunxing.Chouqu /opt/jars/test.jar