离线数据处理1

发布时间 2023-12-21 11:06:23作者: Reira

离线数据处理-数据抽取&数据清洗&指标计算.1 2023/12/20学习笔记

1.基础SQL-1

1.1 基础命令

1.1.1 SQL基本操作-数据库数据表基本操作

#创建数据库
create database DatebaseName;
#查询所有的数据库
show database DatabaseName;
#删除数据库
drop database DatabaseName;
#切换数据库
use databaseName;
#查看当前使用的数据库
select database();
#创建数据表
create table tablename(字段名 value,字段名 value......);
#查看数据表
show tables;
#查看表结构
desc tablename;
#删除数据表
drop table tablename;

1.1.2 SQL基本操作-修改数据表

#修改表名称
alter table tablename rename to tablename1;
#修改字段名
alter table tablename change name sname 字段名;
#修改字段数据类型
alter table tablename modify sname 字段名;
#增加字段
alter table tablename add 字段名 字段类型;
#删除字段
drop table tablename drop 字段名;

1.1.3 SQL基本操作-主键约束

主键约束即primary key用于唯一的标识表中的每一行。被标识为主键的数据在表中是唯一的且其值不能为空。这点类似于我们每个人都有一个身份证号,并且这个身份证号是唯一的。
基本语法:
字段名 数据类型 primary key;

#Part.1
create table tablename(字段名 数据类型 primary key...)
#Part.2
create table tablename(字段名 数据类型,字段名 数据类型...primary key)

1.1.4 SQL基本操作-非空约束

非空约束即 NOT NULL指的是字段的值不能为空
基本语法:
字段名 数据类型 NOT NULL;

create table tablename(字段名 数据类型 NOT NULL)

1.1.5 SQL基本操作-默认值约束

默认值约束即DEFAULT用于给数据表中的字段指定默认值,即当在表中插入一条新记录时若未给该字段赋值,那么,数据库系统会自动为这个字段插入默认值
基本语法:
字段名 数据类型 DEFAULT 默认值;

create table tablename(字段名 数据类型,字段名 数据类型 default '...')

1.1.6 SQL基本操作-唯一性约束

唯一性约束即UNIQUE用于保证数据表中字段的唯一性,即表中字段的值不能重复出现
基本语法:
字段名 数据类型 UNIQUE;

create table tablename(字段名 数据类型,字段名 数据类型 unique)

1.1.7 SQL基本操作-外键约束

基本语法:
CONSTRAINT 外键名 FOREIGN KEY (从表外键字段) REFERENCES 主表 (主键字段)
ALTER TABLE 从表名 ADD CONSTRAINT 外键名 FOREIGN KEY (从表外键字段) REFERENCES 主表 (主键字段);

create table tablename1(字段名1 数据类型,字段名2 数据类型)
create table tablename2(字段名3 数据类型,字段名4 数据类型)
alter table tablename2 add constraint fk_字段名3_字段名4 foreign key(字段名3) references tablename3(字段名1);

1.1.8 SQL基础操作-删除外键

alter table 从表名 drop foreign key 外键名

1.1.9 SQL基础操作-插入数据

#为表中字段插入数据
insert into tablename (字段1,字段2...) values (值1,值2...);
#同时插入多条记录仪
insert into tablename (字段1,字段2...) values (值1,值2...),(值1,值2...),...;

2.离线数据处理-数据抽取

2.1 数据抽取-GZ033

2.1.1 数据抽取-题目

编写Scala代码,使用Spark将MySQL的shtd_store库中表user_info、sku_info、base_province、base_region、order_info、order_detail的数据增量抽取到Hive的ods库中对应表user_info、sku_info、base_province、base_region、order_info、order_detail中

1.抽取shtd_store库中user_info的增量数据进入Hive的ods库中表user_info。根据ods.user_info表中operate_time或create_time作为增量字段(即MySQL中每条数据取这两个时间中较大的那个时间作为增量字段去和ods里的这两个字段中较大的时间进行比较),只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.user_info命令

2.1.2 数据抽取-代码

package ods
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import java.util.Properties

object task6 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder()
      .appName("hive example")
      .master("local")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .config("hive.metastore.uris", "thrift://address1:9083")
      .config("spark.sql.warehouse.dir", "hdfs://address1:8020/user/hive/warehouse")
      .config("dfs.client.use.datanode.hostname", "true")
      .enableHiveSupport()
      .getOrCreate()

    val URL = "jdbc:mysql://address2:3306/ds_pub?useSSL=false"
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "yourpassword")
    properties.setProperty("driver", "com.mysql.jdbc.Driver")

        spark.read.jdbc(url,"user_info",properties).createOrReplaceTempView("user_info")

    val max = spark.sql(
      """
        |select greatest(create_time,operate_time) as max_time
        |from user_info
        |""".stripMargin).collect()(0)(0).toString

    println(max)

    val df1 = spark.sql(
      """
        |select * , '20231220' as etl_date
        |from user_info
        |where greatest(create_time,operate_time) > "2020-04-26 18:57:55"
        |""".stripMargin)

    println("df1--------------"+df1.count())

    df1.show()

    spark.stop()
  }
}

3.离线数据处理-数据清洗

3.1 数据清洗-GZ033

3.1.1 数据清洗-题目

编写Scala代码,使用Spark将ods库中相应表数据全量抽取到Hive的dwd库中对应表中。表中有涉及到timestamp类型的,均要求按照yyyy-MM-dd HH:mm:ss,不记录毫秒数,若原数据中只有年月日,则在时分秒的位置添加00:00:00,添加之后使其符合yyyy-MM-dd HH:mm:ss。(若dwd库中部分表没有数据,正常抽取即可)

1.抽取ods库中user_info表中昨天的分区(子任务一生成的分区)数据,并结合dim_user_info最新分区现有的数据,根据id合并数据到dwd库中dim_user_info的分区表(合并是指对dwd层数据进行插入或修改,需修改的数据以id为合并字段,根据operate_time排序取最新的一条),分区字段为etl_date且值与ods库的相对应表该值相等,同时若operate_time为空,则用create_time填充,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”。若该条记录第一次进入数仓dwd层则dwd_insert_time、dwd_modify_time均存当前操作时间,并进行数据类型转换。若该数据在进入dwd层时发生了合并修改,则dwd_insert_time时间不变,dwd_modify_time存当前操作时间,其余列存最新的值。使用hive cli执行show partitions dwd.dim_user_info命令

3.1.2 数据清洗-代码

package DataCleaning

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit
import java.text.SimpleDateFormat
import java.util.{Date, Properties}

object task1_1 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_HOME","root")
      Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder()
      .master("local")
      .config("hive.metastore.uris", "thrift://ADDRESS1:9083")
      .config("spark.sql.warehouse.dir", "hdfs://ADDRESS1:8020/user/hive/warehouse")
      .config("dfs.client.use.datanode.hostname", "true")
      .config("hive.metastore.uris", "thrift://ADDRESS1:9083")
      .config("spark.sql.warehouse.dir", "hdfs://ADDRESS1:8020/user/hive/warehouse")
      .config("dfs.client.use.datanode.hostname", "true")
      .enableHiveSupport()
      .getOrCreate()

//    val URL = "jdbc:mysql://ADDRESS2:3306/ds_pub?useSSL=false"
    val URL = "jdbc:mysql://localhost/dwd?useSSL=false"
    val properties = new Properties()
      properties.setProperty("user","root")
      properties.setProperty("password","yourpassword")
      properties.setProperty("driver","com.mysql.jdbc.Driver")

    spark.read.jdbc(URL,"user_info",properties).createOrReplaceTempView("user_info")
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val df1 = spark.sql(
      """
        |select *,
        |   case
        |   When operate_time is null then
        |       create_time
        |   else
        |       operate_time
        |   end as op
        |from user_info
        |""".stripMargin
    )
      .drop("operate_time")
      .withColumnRenamed("op","operate_time")
      .withColumn("etl_date",lit("20201111"))
      .withColumn("insert_user",lit("user1"))
      .withColumn("insert_time",lit(sdf.format(new Date(new Date().getTime-60*1000))))
      .withColumn("modify_user",lit("user1"))
      .withColumn("modify_time",lit(sdf.format(new Date(new Date().getTime-60*1000))))
    println("df1---------dim-customer_inf最新分区现有的数据的数量"+df1.count())
    df1.show()

    spark.read.jdbc(URL,"user_info_new",properties).createOrReplaceTempView("user_info_new")
    val df2 = spark.sql(
      """
        |select *,
        |   case
        |   when operate_time is null
        |       then
        |       create_time
        |   else
        |     operate_time
        |   end as op
        |from user_info_new
        |""".stripMargin)
//      .show()
      .drop("operate_time")
      .withColumnRenamed("op","operate_time")
      .withColumn("etl_date",lit("20201111"))
      .withColumn("insert_user",lit("user1"))
      .withColumn("insert_time",lit(sdf.format(new Date(new Date().getTime-60*1000))))
      .withColumn("modify_user",lit("user1"))
      .withColumn("modify_time",lit(sdf.format(new Date(new Date().getTime-60*1000))))
    println("df2---------dim-customer-inf最新分区现有的数据的数量"+df2.count())
    df2.show()

    df1.createOrReplaceTempView("dim_customer_info")

    df2.createOrReplaceTempView("customer_info")

    //合并df1和df2
    val df3 = spark.sql(
      """
        |select * from customer_info
        |union all
        |select * from dim_customer_info
        |""".stripMargin)
    println("df3--------union_user_inf0合并后的分区现有的数据的数量"+df3.count())
    df3.show()

    df3.createOrReplaceTempView("union_user_info")

    //从union的视图中以id来查询operate_time的最大值
    val df4 = spark.sql(
      """
        |select id,
        |   Max(operate_time) operate_time
        |from
        |   union_user_info
        |group by id
        |""".stripMargin)
//      .show()
    println("df4---------dict_user_info根据df3分组id得到的操作时间表"+df4.count())
    df4.show()

    df4.createOrReplaceTempView("dict_user_info")

    //根据operate_time取最新的一条
    //从union_user_info和dict_user_info中进行筛选
    //and union_user_info.operate_time=dict_user_info.operate_time指的是operate_time也必须匹配
    //df5将包含union_user_info中那些在dict_user_info中也匹配id和operate_time的记录的所有类
    val df5 = spark.sql(
      """
        |select union_user_info.*
        |from union_user_info,dict_user_info
        |where union_user_info.id==dict_user_info.id
        |and union_user_info.operate_time==dict_user_info.operate_time
        |""".stripMargin)
//      .show()
    println("df5----------"+df5.count())
    df5.show()

    df5.createOrReplaceTempView("distinct_user_info")

//    val df6 = spark.sql(
//      """
//        |select id,
//        | min(insert_time) dit
//        |from
//        | union_user_info
//        |group by id
//        |""".stripMargin)
////      .show()
//    println("df6---------"+df6.count())
//    df6.show()
//
//    df6.createOrReplaceTempView("dict2_user_info")
  }

}

3.1.3 数据抽取-个人总结

在做数据清洗的时候根据题意得知题意需要我们做的是抽取(子任务1)中做出的user_info表与我们新抽取出的表根据id字段去合并数据到一张新的表(再根据operate_time排序取出最新的那一条),题目中给出条件当operate_time为null值就用create_time填充,同时添加4个新的字段...

4.离线数据抽取-指标计算

4.1 指标计算-GZ033

4.1.1 指标计算-题目

编写Scala代码,使用Spark计算相关指标。
注:在指标计算中,不考虑订单信息表中order_status字段的值,将所有订单视为有效订单。计算订单金额或订单总金额时只使用final_total_amount字段。需注意dwd所有的维表取最新的分区。

1.根据dwd层表统计每个省份、每个地区、每个月下单的数量和下单的总金额,存入MySQL数据库shtd_result的provinceeverymonth表中(表结构如下),然后在Linux的MySQL命令行中根据订单总数、订单总金额、省份表主键均为降序排序,查询出前5条,

字段 类型 中文含义
provinceid int 省份表主键
provincename text 省份名称
regionid int 地区表主键
regionname text 地区名称
totalconsumption double 订单总金额
totalorder int 订单总数
year int
month int

4.1.2 指标计算-代码

package DataCleaning
import org.apache.hadoop.hdfs.server.namenode.SafeMode
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}
import java.util.Properties

object task {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME","root")
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession.builder()
      .master("local")
      .config("hive.metastore.uris", "thrift://youraddress:9083")
      .config("spark.sql.warehouse.dir", "hdfs://192.168.45.13:8020/user/hive/warehouse")
      .config("dfs.client.use.datanode.hostname", "true")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .enableHiveSupport()
      .getOrCreate()

    val url = "jdbc:mysql://mysqladeddress:3306/ds_pub?useSSL=false"
    val url_save = "jdbc:mysql://youraddress:3306/shtd_result?useSSL=false"
    val url1_save = "jdbc:mysql://localhost:3306/dwd?useSSL=false"

    val properties = new Properties()
    properties.setProperty("user","root")
    properties.setProperty("password","yourpassword")
    properties.setProperty("driver","com.mysql.jdbc.Driver")

    val properties1 = new Properties()
    properties1.setProperty("user", "root")
    properties1.setProperty("password", "123456")
    properties1.setProperty("driver", "com.mysql.jdbc.Driver")

    val properties2 = new Properties()
    properties2.setProperty("user", "root")
    properties2.setProperty("password", "yourpassword")
    properties2.setProperty("driver", "com.mysql.jdbc.Driver")

    spark.read.jdbc(url,"user_info",properties).createOrReplaceTempView("user_info")
    spark.read.jdbc(url,"base_province",properties).createOrReplaceTempView("base_province")
    spark.read.jdbc(url,"base_region",properties).createOrReplaceTempView("base_region")
    spark.read.jdbc(url,"order_info",properties).createOrReplaceTempView("order_info")
    spark.read.jdbc(url,"order_detail",properties).createOrReplaceTempView("order_detail")
    spark.read.jdbc(url,"sku_info",properties).createOrReplaceTempView("sku_info")

    val df1 = spark.sql(
      """
        |SELECT
        |	base_province.id AS provinceid,
        |	base_province.name AS provincename,
        |	base_region.id AS regionid,
        |	base_region.region_name AS regionname,
        |	sum( final_total_amount ) AS totalconsumption,
        |	count(*) AS totalorder,
        |	YEAR ( create_time ) year,
        |	MONTH ( create_time ) month
        |FROM
        |	order_info,base_province,base_region
        |where base_province.id = order_info.province_id and base_region.id = base_province.region_id
        |GROUP BY
        |	provinceid,
        |	provincename,
        |	regionid,
        |	regionname,
        |	YEAR,
        |	MONTH
        |""".stripMargin)

    df1.show()
    df1.write.jdbc(url_save,"provinceeverymonth",properties1)
    spark.stop()
  }
}

4.1.3 指标计算做题思路-个人总结

根据题意,题目要求统计每个省份每个地区以及每个月下单的数量以及下单的总金额,我们可以根据它题目中给出的字段名去每个表中查找中间哪个是互相匹配的,由此做出连接,base_province中的id字段与order_info里的id字段匹配的,base_region中的id与base_province中的region_id字段匹配的,然后再根据题目需求,group by每个省份,每个地区,每个月以及下单的总金额
在做sql语句的时候一定要经常调试!