Getting Started with Spark SQL

Published: 2023-06-02 08:32:54  Author: strongmore

Spark SQL

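Spark SQL is not the same thing as the Hive on Spark we mentioned when covering Hive.
Hive on Spark means replacing Hive's underlying MapReduce execution engine with Spark.
Spark SQL, by contrast, is Spark's own SQL processing engine.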

Spark SQL is a Spark module for processing structured data. Its core programming abstraction is the DataFrame.

DataFrame = RDD + Schema

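A DataFrame is very similar to a table in a relational database. The RDD can be thought of as the table's data, and the Schema as its structure (column names and types). A DataFrame can be built from many sources, including structured data files, Hive tables, external relational databases, and existing RDDs.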
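To make "DataFrame = RDD + Schema" concrete, here is a minimal sketch. It assumes Spark 2.4.x with Scala 2.11, matching the Maven dependency used in this post, and runs in local mode. It builds a small DataFrame from an in-memory Seq instead of a file; the rows are a hypothetical subset of student.json, and the object name is illustrative. It then pulls the two halves apart: `df.rdd` returns the data as an `RDD[Row]`, and `df.schema` returns the `StructType` that describes the columns.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object DataFrameAnatomy {
  // Hypothetical in-memory sample (a subset of student.json), so no input file is needed
  def buildDf(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(("jack", 19), ("tom", 18), ("jessic", 27)).toDF("name", "age")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DataFrameAnatomy").master("local").getOrCreate()
    val df = buildDf(spark)
    // The "RDD" half: the data itself, as generic Row objects
    val rows: RDD[Row] = df.rdd
    // The "Schema" half: column names, types and nullability
    val schema: StructType = df.schema
    println(schema.treeString)
    rows.collect().foreach(println)
    spark.stop()
  }
}
```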

DataFrame appeared in Spark 1.3 and Dataset in Spark 1.6. Spark 2.0 unified the two: in Scala, DataFrame is simply a type alias for Dataset[Row].

SparkSession

To use Spark SQL, you first need to create a SparkSession object.

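A SparkSession wraps a SparkContext and a SQLContext.

So to work with RDDs through a SparkSession, you first get its SparkContext (sparkSession.sparkContext).

SQLContext was the Spark SQL entry point in Spark 1.x, and HiveContext was its Hive-enabled variant. In Spark 2.x, SparkSession replaces both. It still exposes sqlContext for backward compatibility, and you enable Hive support with SparkSession.builder().enableHiveSupport().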
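Here is a minimal sketch of the two entry points, under the same assumptions (Spark 2.4.x, local mode). The object and method names are illustrative. `sparkContext` drives a plain RDD computation, and the legacy `sqlContext` runs SQL against the same session.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}

object SessionEntryPoints {
  // Sum 1..n through the low-level RDD API, reached via the session's SparkContext
  def sumWithRdd(spark: SparkSession, n: Int): Double = {
    val sc: SparkContext = spark.sparkContext
    sc.parallelize(1 to n).sum()
  }

  // Run a query through the legacy SQLContext; it delegates to the same SparkSession
  def queryWithSqlContext(spark: SparkSession): Int = {
    val sqlContext: SQLContext = spark.sqlContext
    sqlContext.sql("select 1 + 1 as two").collect().head.getInt(0)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SessionEntryPoints").master("local").getOrCreate()
    println(sumWithRdd(spark, 5))
    println(queryWithSqlContext(spark))
    spark.stop()
  }
}
```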

Creating a DataFrame

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-sql_2.11</artifactId>
   <version>2.4.3</version>
</dependency>

Contents of student.json (by default, spark.read.json expects one complete JSON object per line):

{"name":"jack","age":19,"sex":"male"}
{"name":"tom","age":18,"sex":"male"}
{"name":"jessic","age":27,"sex":"female"}
{"name":"hehe","age":18,"sex":"female"}
{"name":"haha","age":15,"sex":"male"}

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
  * Create a DataFrame from a JSON file
  */
object SqlDemoScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
    //Create the SparkSession object, which wraps a SparkContext and a SQLContext
    val sparkSession = SparkSession.builder()
      .appName("SqlDemoScala")
      .config(conf)
      .getOrCreate()
    //Read the JSON file into a DataFrame (the schema is inferred from the data)
    val stuDf = sparkSession.read.json("C:\\D-myfiles\\testjar\\spark\\student.json")
    //Display the data in the DataFrame
    stuDf.show()
    sparkSession.stop()
  }

}

Output (the inferred JSON columns appear in alphabetical order):

+---+------+------+
|age|  name|   sex|
+---+------+------+
| 19|  jack|  male|
| 18|   tom|  male|
| 27|jessic|female|
| 18|  hehe|female|
| 15|  haha|  male|
+---+------+------+

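Because DataFrame is just Dataset[Row], the two are interchangeable, so it makes no difference which one you create.
In the Scala API, read.json returns a DataFrame. In the Java API, the same call returns Dataset<Row>, because Java has no type aliases.
In Scala you can use a DataFrame wherever a Dataset[Row] is expected. No actual conversion takes place, and nothing downstream changes: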

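import org.apache.spark.sql.{Dataset, Row}

//DataFrame is a type alias for Dataset[Row], so this assignment needs no conversion
val stuDs: Dataset[Row] = sparkSession.read.json("D:\\student.json")
//as("stu") does not change the type; it only gives the Dataset the alias "stu" (same as alias("stu"))
val stuAliased = stuDs.as("stu")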
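If you want a typed Dataset rather than Dataset[Row], calling `as[T]` with a case class performs the conversion. The sketch below assumes Spark 2.2 or later (needed for `read.json` on a `Dataset[String]`) and uses a hypothetical `StudentRecord` case class. The JSON lines are inlined, so no file is needed. Note one detail: the JSON reader infers whole numbers as bigint, so `age` has to be declared as `Long`. Declaring it as `Int` makes `as[...]` fail with an up-cast error.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Hypothetical case class mirroring one line of student.json.
// JSON integers are inferred as bigint, so age is Long (Int would fail with an up-cast error).
case class StudentRecord(name: String, age: Long, sex: String)

object TypedDatasetDemo {
  def load(spark: SparkSession): Dataset[StudentRecord] = {
    import spark.implicits._
    // In-memory JSON lines instead of a file, so the sketch runs anywhere
    val lines: Dataset[String] = Seq(
      """{"name":"jack","age":19,"sex":"male"}""",
      """{"name":"tom","age":18,"sex":"male"}"""
    ).toDS()
    val df: DataFrame = spark.read.json(lines) // a DataFrame, i.e. Dataset[Row]
    df.as[StudentRecord]                       // typed view; columns are matched by name
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("TypedDatasetDemo").master("local").getOrCreate()
    val ds = load(spark)
    ds.filter(_.age > 18).show() // a typed lambda instead of a Column expression
    val backToDf: DataFrame = ds.toDF()
    backToDf.printSchema()
    spark.stop()
  }
}
```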

Common DataFrame operators

  • printSchema()
  • show()
  • select()
  • filter()、where()
  • groupBy()
  • count()

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
  * Common DataFrame operations
  */
object DataFrameOpScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
    //Create the SparkSession object, which wraps a SparkContext and a SQLContext
    val sparkSession = SparkSession.builder()
      .appName("DataFrameOpScala")
      .config(conf)
      .getOrCreate()
    val stuDf = sparkSession.read.json("C:\\D-myfiles\\testjar\\spark\\student.json")
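    //Print the schema
    stuDf.printSchema()
    //show() displays the first 20 rows by default; pass a number to control how many rows are shown
    stuDf.show(2)
    //Select specific columns
    stuDf.select("name", "age").show()
    //To compute on columns inside select, use the $"column" syntax, which requires the implicit conversions below; without the import the code does not compile
    import sparkSession.implicits._
    stuDf.select($"name", $"age" + 1).show()
    //Filter rows; the $"age" > 18 expression also relies on the implicits import
    stuDf.filter($"age" > 18).show()
    //where() simply calls filter() under the hood
    stuDf.where($"age" > 18).show()
    //Group by age and count the rows in each group
    stuDf.groupBy("age").count().show()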
    sparkSession.stop()
  }

}

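These are some of the most common DataFrame operations.
Working this way is still not very convenient, though. The operators give you table-like operations that are fine for simple queries, but complex logic quickly becomes cumbersome to write with operators. What we really want is to run SQL directly, and Spark SQL supports that as well.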

Querying a DataFrame with SQL

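Querying the data in a DataFrame directly with SQL statements takes two steps:

  1. Register the DataFrame as a temporary view
  2. Run the SQL statement with the sql function of the SparkSession

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
  * Query a DataFrame with SQL
  */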
object DataFrameSqlScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
    //Create the SparkSession object, which wraps a SparkContext and a SQLContext
    val sparkSession = SparkSession.builder()
      .appName("DataFrameSqlScala")
      .config(conf)
      .getOrCreate()
    val stuDf = sparkSession.read.json("C:\\D-myfiles\\testjar\\spark\\student.json")
    //Register the DataFrame as a temporary view named student (visible only within this SparkSession)
    stuDf.createOrReplaceTempView("student")
    //Query the temporary view with SQL
    sparkSession.sql("select age,count(*) as num from student group by age")
      .show()
    sparkSession.stop()
  }

}
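The same aggregation can be written either with DataFrame operators or with SQL over a temporary view, and both run on the same Spark SQL engine. The sketch below compares the two on in-memory data, with ages as Long to match what the JSON reader infers. The object and method names are hypothetical, and it runs in local mode.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object SqlVsDslDemo {
  // Hypothetical in-memory copy of student.json's name/age columns
  def students(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(("jack", 19L), ("tom", 18L), ("jessic", 27L), ("hehe", 18L), ("haha", 15L)).toDF("name", "age")
  }

  // SQL: register a session-scoped temp view, then query it
  def countByAgeSql(spark: SparkSession, df: DataFrame): Map[Long, Long] = {
    df.createOrReplaceTempView("student")
    spark.sql("select age, count(*) as num from student group by age")
      .collect().map(r => r.getLong(0) -> r.getLong(1)).toMap
  }

  // DSL: the same aggregation expressed with DataFrame operators
  def countByAgeDsl(df: DataFrame): Map[Long, Long] =
    df.groupBy("age").count()
      .collect().map(r => r.getLong(0) -> r.getLong(1)).toMap

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SqlVsDslDemo").master("local").getOrCreate()
    val df = students(spark)
    println(countByAgeSql(spark, df))
    println(countByAgeDsl(df))
    spark.stop()
  }
}
```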

Converting an RDD to a DataFrame

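Why convert an RDD to a DataFrame?

In practice, we often load log data from HDFS, process it, and end up with structured data that we want to analyze statistically. We could do that with Spark's transformation operators, but that means writing more code. Expressing the same analysis in SQL is much more convenient.

So we can take an RDD like the ones we created earlier, convert it to a DataFrame, and then work with the data through DataFrame operators or plain SQL.

Spark SQL supports two ways to convert an RDD to a DataFrame:

  1. Reflection
  2. Programmatically specifying the schema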

Reflection

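This approach uses reflection to infer the RDD's metadata (its schema).
The reflection-based code is concise. When you already know the structure of the RDD's data while writing the code, reflection is a very good choice.
Because Scala supports implicit conversions, the Spark SQL Scala API can automatically convert an RDD of case class objects into a DataFrame. The case class's field names and types become the column names and types.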

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
  * Convert an RDD to a DataFrame using reflection
  */
object RddToDataFrameByReflectScala {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
    //Create the SparkSession object, which wraps a SparkContext and a SQLContext
    val sparkSession = SparkSession.builder()
      .appName("RddToDataFrameByReflectScala")
      .config(conf)
      .getOrCreate()
    //Get the SparkContext
    val sc = sparkSession.sparkContext
    val dataRDD = sc.parallelize(Array(("jack", 18), ("tom", 20), ("jessic", 30)))
    //Map each tuple to a Student object, then let reflection on the case class turn the RDD into a DataFrame
    //This needs the implicit conversions, specifically org.apache.spark.sql.SQLImplicits.rddToDatasetHolder
    import sparkSession.implicits._
    val stuDf = dataRDD.map(tup => Student(tup._1, tup._2)).toDF()
    //From here on, the data in dataRDD can be handled as a DataFrame
    stuDf.createOrReplaceTempView("student")
    //Run a SQL query
    val resDf = sparkSession.sql("select name,age from student where age > 18")
    //Convert the DataFrame back to an RDD[Row]
    val resRDD = resDf.rdd
    //Read the fields of each Row by position, wrap them in a Student and print it to the console
    resRDD.map(row => Student(row(0).toString, row(1).toString.toInt))
      .collect()
      .foreach(println(_))
    //Alternatively, use Row.getAs() to read a value by column name
    resRDD.map(row => Student(row.getAs[String]("name"), row.getAs[Int]("age")))
      .collect()
      .foreach(println(_))
    sparkSession.stop()
  }
}

//Define the Student case class (at the top level, outside the object)
case class Student(name: String, age: Int)
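The sketch below prints the schema that the reflection step infers from a case class. It uses a hypothetical `Person` case class and in-memory data. Nullability follows the Scala types: `String` fields become nullable string columns, and primitive `Int` fields become non-nullable integer columns. The case class is declared at the top level. Declaring it inside `main` typically fails to compile, because no TypeTag is available for it.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

// Top-level case class (not nested inside main), so Spark can derive its schema by reflection
case class Person(name: String, age: Int)

object ReflectSchemaDemo {
  def inferredSchema(spark: SparkSession): StructType = {
    import spark.implicits._
    val rdd = spark.sparkContext.parallelize(Seq(Person("jack", 18), Person("tom", 20)))
    // toDF() reads the field names and types of Person to build the schema
    rdd.toDF().schema
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ReflectSchemaDemo").master("local").getOrCreate()
    // name: nullable string; age: non-nullable integer (a Scala Int cannot be null)
    inferredSchema(spark).printTreeString()
    spark.stop()
  }
}
```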

Programmatic approach

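This approach builds the DataFrame through a programming interface. You construct the metadata (the schema) dynamically while the program runs, then apply it to an existing RDD. The code is more verbose. However, if the RDD's structure is unknown when you write the program and can only be determined at runtime, building the schema dynamically is the only option.

In other words, when the fields cannot be defined in advance as a case class, you have to specify the metadata programmatically.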

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
  * Convert an RDD to a DataFrame programmatically
  */
object RddToDataFrameByProgramScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
    //Create the SparkSession object, which wraps a SparkContext and a SQLContext
    val sparkSession = SparkSession.builder()
      .appName("RddToDataFrameByProgramScala")
      .config(conf)
      .getOrCreate()
    //Get the SparkContext
    val sc = sparkSession.sparkContext
    val dataRDD = sc.parallelize(Array(("jack", 18), ("tom", 20), ("jessic", 30)))
    //Build an RDD of Row objects
    val rowRDD = dataRDD.map(tup => Row(tup._1, tup._2))
    //Specify the schema [this metadata could just as well be loaded from an external source at runtime, which makes the approach flexible]
    val schema = StructType(Array(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)
    ))
    //Combine the Row RDD and the schema into a DataFrame
    val stuDf = sparkSession.createDataFrame(rowRDD, schema)
    //From here on, the data in dataRDD can be handled as a DataFrame
    stuDf.createOrReplaceTempView("student")
    //Run a SQL query
    val resDf = sparkSession.sql("select name,age from student where age > 18")
    //Convert the DataFrame back to an RDD[Row]
    val resRDD = resDf.rdd
    resRDD.map(row => (row(0).toString, row(1).toString.toInt))
      .collect()
      .foreach(println(_))
    sparkSession.stop()
  }
}
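The comment in the code above says the schema could be loaded from an external source at runtime. Here is a minimal sketch of that idea. It assumes a hypothetical "name:type" spec string (read, for example, from a config file) and comma-separated text lines. Only string and int columns are handled, and all names are illustrative.

```scala
import org.apache.spark.sql.types.{DataType, IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object RuntimeSchemaDemo {
  // Hypothetical spec format "col:type,col:type"; only string and int are handled in this sketch
  def parseSchema(spec: String): StructType = StructType(
    spec.split(",").map { field =>
      val Array(name, tpe) = field.trim.split(":")
      val dataType: DataType = tpe.trim.toLowerCase match {
        case "string" => StringType
        case "int"    => IntegerType
        case other    => throw new IllegalArgumentException(s"unsupported type: $other")
      }
      StructField(name.trim, dataType, nullable = true)
    }
  )

  // Turn comma-separated text lines into Rows whose values match the schema's types
  def toDf(spark: SparkSession, lines: Seq[String], schema: StructType): DataFrame = {
    val rowRDD = spark.sparkContext.parallelize(lines).map { line =>
      val values: Seq[Any] = line.split(",").toSeq.zip(schema.fields.toSeq).map {
        case (v, StructField(_, IntegerType, _, _)) => v.trim.toInt
        case (v, _)                                 => v.trim
      }
      Row.fromSeq(values)
    }
    spark.createDataFrame(rowRDD, schema)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("RuntimeSchemaDemo").master("local").getOrCreate()
    val schema = parseSchema("name:string,age:int")
    val df = toDf(spark, Seq("jack,18", "tom,20", "jessic,30"), schema)
    df.createOrReplaceTempView("student")
    spark.sql("select name, age from student where age > 18").show()
    spark.stop()
  }
}
```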

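load and save operations

Every Spark SQL DataFrame supports the same load and save operations, whatever data source it was created from:

  • load reads data and creates a DataFrame;
  • save writes the data in a DataFrame out to files.

When we worked with JSON data earlier, we did not call load; we called the json method directly. Is that a special usage?
The source of the json method shows that it simply calls format and load under the hood: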

def json(paths: String*): DataFrame = format("json").load(paths : _*)

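If you call format and load yourself but omit format, the data source format defaults to parquet (this default comes from the spark.sql.sources.default setting). You can also specify the format explicitly. Spark SQL has built-in support for common data sources such as json, parquet, jdbc, orc, csv, and text.

This makes it easy to convert data between different source formats.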

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
  * Using load and save
  */
object LoadAndSaveOpScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
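    //Create the SparkSession object, which wraps a SparkContext and a SQLContext
    val sparkSession = SparkSession.builder()
      .appName("LoadAndSaveOpScala")
      .config(conf)
      .getOrCreate()

    //Make the HDFS client connect to DataNodes by hostname instead of by their (possibly unreachable) internal IP addresses
    sparkSession.sparkContext.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")
    //Read the data
    val stuDf = sparkSession.read
      .format("json")
      .load("C:\\D-myfiles\\testjar\\spark\\student.json")
    //Save the name and age columns to HDFS in CSV format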
    stuDf.select("name", "age")
      .write
      .format("csv")
      .save("hdfs://bigdata01:9000/sparksql-out-save001")
    sparkSession.stop()
  }
}
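Because load() and save() fall back to spark.sql.sources.default when format(...) is omitted, a round trip without any format call writes and reads parquet. This sketch assumes local mode and uses a local temporary directory in place of the HDFS path above. The object name is hypothetical.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object DefaultFormatDemo {
  // Write without format(...), then read back without format(...); both use spark.sql.sources.default
  def roundTrip(spark: SparkSession): DataFrame = {
    import spark.implicits._
    // A fresh local temp directory stands in for the HDFS path used above
    val path = Files.createTempDirectory("sparksql-default-format").resolve("students").toString
    val df = Seq(("jack", 19L), ("tom", 18L)).toDF("name", "age")
    df.write.mode(SaveMode.Overwrite).save(path) // written as parquet by default
    spark.read.load(path)                        // read back as parquet by default
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DefaultFormatDemo").master("local").getOrCreate()
    println(spark.conf.get("spark.sql.sources.default"))
    roundTrip(spark).show()
    spark.stop()
  }
}
```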


SaveMode

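For save operations, Spark SQL offers several save modes.
They determine what happens when data already exists at the target location. Save operations do not use any locking and are not atomic, so there is some risk of ending up with dirty data.

SaveMode.ErrorIfExists  ("error" / "errorifexists", default)  If data already exists at the target, throw an exception
SaveMode.Append         ("append")                            If data already exists at the target, append the new data to it
SaveMode.Overwrite      ("overwrite")                         If data already exists at the target, delete it and replace it with the new data
SaveMode.Ignore         ("ignore")                            If data already exists at the target, skip the save and do nothing

The strings in parentheses are the equivalent arguments for the mode(String) overload of DataFrameWriter.

Add a SaveMode setting to LoadAndSaveOpScala, run it again, and check the result.

With SaveMode set to Append, the new data is appended if the target already exists: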

import org.apache.spark.sql.SaveMode

stuDf.select("name", "age")
  .write
  .format("csv")
  .mode(SaveMode.Append)
  .save("hdfs://bigdata01:9000/sparksql-out-save001")

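After running it, the new data has indeed been appended to the existing output directory, as additional part files alongside the earlier ones.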
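The sketch below shows all four save modes side by side. It writes the same two-row DataFrame to one local temporary directory several times and counts the rows at the target after each write. The object and path names are hypothetical; it runs in local mode and writes JSON output. With the modes applied in this order, the counts should go 2, 4, 4, 2, and a final ErrorIfExists write against the existing path should fail with an AnalysisException.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}

object SaveModeDemo {
  // Returns (row counts at the target after each write, whether a final ErrorIfExists write threw)
  def run(spark: SparkSession): (Seq[Long], Boolean) = {
    import spark.implicits._
    val path = Files.createTempDirectory("sparksql-savemode").resolve("out").toString
    val df = Seq(("jack", 19L), ("tom", 18L)).toDF("name", "age")
    def rowsAtTarget(): Long = spark.read.json(path).count()

    df.write.mode(SaveMode.ErrorIfExists).json(path) // target absent: writes 2 rows
    val afterFirst = rowsAtTarget()
    df.write.mode(SaveMode.Append).json(path)        // adds 2 more rows as new part files
    val afterAppend = rowsAtTarget()
    df.write.mode(SaveMode.Ignore).json(path)        // target exists: nothing is written
    val afterIgnore = rowsAtTarget()
    df.write.mode("overwrite").json(path)            // string form of SaveMode.Overwrite
    val afterOverwrite = rowsAtTarget()
    val errorRaised =
      try { df.write.mode(SaveMode.ErrorIfExists).json(path); false }
      catch { case _: AnalysisException => true }
    (Seq(afterFirst, afterAppend, afterIgnore, afterOverwrite), errorRaised)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SaveModeDemo").master("local").getOrCreate()
    println(run(spark))
    spark.stop()
  }
}
```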


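Built-in functions

Spark provides many built-in functions, for example:

Aggregate functions      avg, count, countDistinct, first, last, max, mean, min, sum
Collection functions     array_contains, explode, size
Date/time functions      datediff, date_add, date_sub, add_months, last_day, next_day
Math functions           abs, ceil, floor, round
Miscellaneous functions  if, isnull, md5, not, rand, when
String functions         concat, get_json_object, length, reverse, split, upper
Window functions         dense_rank, rank, row_number

Some entries are DataFrame API names: countDistinct corresponds to count(DISTINCT ...) in SQL, and when corresponds to CASE WHEN.

These functions are similar to the ones in Hive.

Note: Spark SQL's documentation for its SQL functions is fairly sparse. Because these functions are used in essentially the same way as in Hive, you can consult the Hive SQL documentation when using them.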
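Here is a sketch that applies a few of these functions twice: once through the DataFrame API (imported from `org.apache.spark.sql.functions`) and once through SQL over a temporary view. The sample rows and object name are hypothetical, and it runs in local mode.

```scala
import org.apache.spark.sql.functions.{col, length, upper, when}
import org.apache.spark.sql.{DataFrame, SparkSession}

object BuiltinFunctionsDemo {
  // Hypothetical two-row sample based on student.json
  def students(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(("jack", 19L), ("haha", 15L)).toDF("name", "age")
  }

  // DSL form: the functions come from org.apache.spark.sql.functions
  def withDsl(df: DataFrame): DataFrame = df.select(
    upper(col("name")).as("name_upper"),
    length(col("name")).as("name_len"),
    when(col("age") >= 18, "adult").otherwise("minor").as("stage"))

  // SQL form: the same logic with Hive-style SQL functions (if instead of when)
  def withSql(spark: SparkSession, df: DataFrame): DataFrame = {
    df.createOrReplaceTempView("student")
    spark.sql("select upper(name) as name_upper, length(name) as name_len, " +
      "if(age >= 18, 'adult', 'minor') as stage from student")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("BuiltinFunctionsDemo").master("local").getOrCreate()
    val df = students(spark)
    withDsl(df).show()
    withSql(spark, df).show()
    spark.stop()
  }
}
```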

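Hands-on: TopN anchor statistics

Requirements analysis

When covering Spark Core we worked through a TopN anchor case: for each region, find the TopN anchors (streamers) by gold-coin income for the day. We implemented it with Spark transformation operators, which was fairly tedious and took quite a lot of code. Now let's implement it with the Spark SQL we have just learned. You will see that SQL makes it much simpler.

Let's review the two raw data files, both in JSON format:
video_info.log   anchors' live-stream records, including the anchor id (uid), live-stream id (vid), region (area), and fields such as status, start_time, end_time, watch_num, and share_num
gift_record.log  users' gift records, including the sender id (uid), live-stream id (vid), gift id (good_id), number of gold coins (gold), timestamp, and type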

The final result should look like this (the region, followed by uid:total gold for its top 3 anchors):

US 8407173251015:180,8407173251012:70,8407173251001:60

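The steps are:

  1. Load the JSON data directly with SparkSession's read API (read.json is shorthand for format("json").load)
  2. Register both datasets as temporary views
  3. Run SQL to compute the TopN anchors
  4. Print the results to the console with foreach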

Raw data

Anchor live-stream records, video_info.log:

{"uid":"8407173251001","vid":"14943445328940001","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":101,"share_num":"21","type":"video_info"}
{"uid":"8407173251002","vid":"14943445328940002","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":201,"share_num":"331","type":"video_info"}
{"uid":"8407173251003","vid":"14943445328940003","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":221,"share_num":"321","type":"video_info"}
{"uid":"8407173251004","vid":"14943445328940004","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":401,"share_num":"311","type":"video_info"}
{"uid":"8407173251005","vid":"14943445328940005","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":31,"share_num":"131","type":"video_info"}
{"uid":"8407173251006","vid":"14943445328940006","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":22,"share_num":"3431","type":"video_info"}
{"uid":"8407173251007","vid":"14943445328940007","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":44,"share_num":"131","type":"video_info"}
{"uid":"8407173251008","vid":"14943445328940008","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":66,"share_num":"131","type":"video_info"}
{"uid":"8407173251009","vid":"14943445328940009","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":32,"share_num":"231","type":"video_info"}
{"uid":"8407173251010","vid":"14943445328940010","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":342,"share_num":"431","type":"video_info"}
{"uid":"8407173251011","vid":"14943445328940011","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":223,"share_num":"331","type":"video_info"}
{"uid":"8407173251012","vid":"14943445328940012","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":554,"share_num":"312","type":"video_info"}
{"uid":"8407173251013","vid":"14943445328940013","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":334,"share_num":"321","type":"video_info"}
{"uid":"8407173251014","vid":"14943445328940014","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":653,"share_num":"311","type":"video_info"}
{"uid":"8407173251015","vid":"14943445328940015","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":322,"share_num":"231","type":"video_info"}
{"uid":"8407173251001","vid":"14943445328940016","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":432,"share_num":"531","type":"video_info"}
{"uid":"8407173251005","vid":"14943445328940017","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":322,"share_num":"231","type":"video_info"}
{"uid":"8407173251008","vid":"14943445328940018","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":564,"share_num":"131","type":"video_info"}
{"uid":"8407173251010","vid":"14943445328940019","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":324,"share_num":"231","type":"video_info"}
{"uid":"8407173251015","vid":"14943445328940020","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":532,"share_num":"331","type":"video_info"}

User gift records, gift_record.log:

{"uid":"7201232141001","vid":"14943445328940001","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141002","vid":"14943445328940001","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141003","vid":"14943445328940002","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141004","vid":"14943445328940002","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141005","vid":"14943445328940003","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141006","vid":"14943445328940003","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141007","vid":"14943445328940004","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141008","vid":"14943445328940004","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141009","vid":"14943445328940005","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141010","vid":"14943445328940005","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141011","vid":"14943445328940006","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141012","vid":"14943445328940006","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141013","vid":"14943445328940007","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141014","vid":"14943445328940007","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141015","vid":"14943445328940008","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141016","vid":"14943445328940008","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141017","vid":"14943445328940009","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141018","vid":"14943445328940009","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141019","vid":"14943445328940010","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141020","vid":"14943445328940010","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141021","vid":"14943445328940011","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141022","vid":"14943445328940011","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141023","vid":"14943445328940012","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141024","vid":"14943445328940012","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141025","vid":"14943445328940013","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141026","vid":"14943445328940013","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141027","vid":"14943445328940014","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141028","vid":"14943445328940014","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141029","vid":"14943445328940015","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141030","vid":"14943445328940015","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141031","vid":"14943445328940016","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141032","vid":"14943445328940016","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141033","vid":"14943445328940017","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141034","vid":"14943445328940017","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141035","vid":"14943445328940018","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141036","vid":"14943445328940018","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141037","vid":"14943445328940019","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141038","vid":"14943445328940019","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141039","vid":"14943445328940020","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141040","vid":"14943445328940020","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}

Code

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import scala.io.Source

/**
  * Compute the TopN anchors
  */
object TopNAnchorScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
    //Create the SparkSession object, which wraps a SparkContext and a SQLContext
    val sparkSession = SparkSession.builder()
      .appName("TopNAnchorScala")
      .config(conf)
      .getOrCreate()
    //1: Load the JSON data directly through sparkSession.read
    val videoInfoDf = sparkSession.read.json("C:\\D-myfiles\\testjar\\spark\\video_info.log")
    val giftRecordDf = sparkSession.read.json("C:\\D-myfiles\\testjar\\spark\\gift_record.log")
    //2: Register both datasets as temporary views
    videoInfoDf.createOrReplaceTempView("video_info")
    giftRecordDf.createOrReplaceTempView("gift_record")
    //3: Run SQL to compute the TopN anchors; the SQL text is read from the classpath resource /sql/topn_anchor.sql
    val sql = Source.fromInputStream(getClass.getResourceAsStream("/sql/topn_anchor.sql")).mkString
    val resDf = sparkSession.sql(sql)
    //4: Collect the results to the driver and print them with foreach (resDf.rdd.foreach would print on the executors in cluster mode)
    resDf.collect().foreach(row => println(row.getAs[String]("area") + "\t" + row.getAs[String]("topn_list")))
    sparkSession.stop()
  }

}

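The SQL lives in a file on the classpath (in a Maven project, for example src/main/resources/sql/topn_anchor.sql). Its logic is:

  1. Aggregate the user gift records (gift_record) by vid and sum the gold, because a user may send an anchor several gifts during one live stream. Gold is a string in the JSON, so sum() implicitly casts it to double, which is why the total is cast back to int later
  2. Join the aggregated data with the anchor live-stream records (video_info), using vid as the join key
  3. Aggregate again by uid and sum the gold per anchor, since one anchor may have several live streams
  4. Partition by area, number the rows within each area by total gold in descending order with row_number(), and keep the top 3
  5. Collapse rows into a single column: concatenate the entries of each area into one string with collect_list and concat_ws
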
SELECT t4.area, concat_ws(',', collect_list(t4.topn)) AS topn_list
FROM (
  SELECT t3.area, concat(t3.uid, ':', cast(t3.gold_sum_all AS int)) AS topn
  FROM (
    SELECT t2.uid, t2.area, t2.gold_sum_all,
           row_number() OVER (PARTITION BY area ORDER BY gold_sum_all DESC) AS num
    FROM (
      SELECT t1.uid, max(t1.area) AS area, sum(t1.gold_sum) AS gold_sum_all
      FROM (
        SELECT vi.uid, vi.vid, vi.area, gr.gold_sum
        FROM video_info AS vi
        JOIN (
          SELECT vid, sum(gold) AS gold_sum
          FROM gift_record
          GROUP BY vid
        ) AS gr ON vi.vid = gr.vid
      ) AS t1
      GROUP BY t1.uid
    ) AS t2
  ) AS t3
  WHERE t3.num <= 3
) AS t4
GROUP BY t4.area

Output:

CN	8407173251008:120,8407173251003:60,8407173251014:50
ID	8407173251005:160,8407173251010:140,8407173251002:70
US	8407173251015:180,8407173251012:70,8407173251001:60
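To check this output by hand, the five SQL steps can be mirrored with plain Scala collections over the same 20 live-stream records and 40 gift records. In the data literals, ids are shortened to their suffixes and re-expanded in code. This is a cross-check sketch with a hypothetical object name, not part of the Spark job.

The cross-check also shows that in region ID, anchors 8407173251002 and 8407173251007 both total 70 gold, so third place is a tie. row_number() breaks such ties arbitrarily, so the SQL may report either anchor. The sketch breaks ties by uid to get a deterministic result, and that result happens to match the output above.

```scala
object TopNCrossCheck {
  private val uidPrefix = "8407173251"
  private val vidPrefix = "149434453289400"

  // (uid, vid, area) from video_info.log, written as id suffixes and expanded to full ids
  val videos: Seq[(String, String, String)] = Seq(
    ("001", "01", "US"), ("002", "02", "ID"), ("003", "03", "CN"), ("004", "04", "US"),
    ("005", "05", "ID"), ("006", "06", "CN"), ("007", "07", "ID"), ("008", "08", "CN"),
    ("009", "09", "US"), ("010", "10", "ID"), ("011", "11", "CN"), ("012", "12", "US"),
    ("013", "13", "ID"), ("014", "14", "CN"), ("015", "15", "US"), ("001", "16", "US"),
    ("005", "17", "ID"), ("008", "18", "CN"), ("010", "19", "ID"), ("015", "20", "US")
  ).map { case (u, v, a) => (uidPrefix + u, vidPrefix + v, a) }

  // (vid, gold) from gift_record.log: each live stream received two gifts
  val gifts: Seq[(String, Int)] = Seq(
    "01" -> Seq(10, 20), "02" -> Seq(30, 40), "03" -> Seq(50, 10), "04" -> Seq(20, 30),
    "05" -> Seq(40, 50), "06" -> Seq(10, 20), "07" -> Seq(30, 40), "08" -> Seq(50, 10),
    "09" -> Seq(20, 30), "10" -> Seq(40, 50), "11" -> Seq(10, 20), "12" -> Seq(30, 40),
    "13" -> Seq(50, 10), "14" -> Seq(20, 30), "15" -> Seq(40, 50), "16" -> Seq(10, 20),
    "17" -> Seq(30, 40), "18" -> Seq(50, 10), "19" -> Seq(20, 30), "20" -> Seq(40, 50)
  ).flatMap { case (v, golds) => golds.map(g => (vidPrefix + v, g)) }

  // area -> top n (uid, total gold); ties are broken by uid here, unlike row_number() in the SQL
  def topN(n: Int): Map[String, Seq[(String, Int)]] = {
    // Step 1: total gold per vid (GROUP BY vid)
    val goldPerVid: Map[String, Int] =
      gifts.groupBy(_._1).map { case (vid, gs) => vid -> gs.map(_._2).sum }
    // Step 2: inner join video_info with the per-vid totals on vid
    val joined: Seq[(String, String, Int)] =
      videos.flatMap { case (uid, vid, area) => goldPerVid.get(vid).map(g => (uid, area, g)) }
    // Step 3: total gold per uid, taking max(area) as the SQL does
    val perUid: Seq[(String, String, Int)] = joined.groupBy(_._1).toSeq.map { case (uid, rs) =>
      (uid, rs.map(_._2).max, rs.map(_._3).sum)
    }
    // Steps 4-5: rank within each area by total gold (descending) and keep the first n
    perUid.groupBy(_._2).map { case (area, rs) =>
      area -> rs.sortBy { case (uid, _, gold) => (-gold, uid) }.take(n).map { case (uid, _, gold) => (uid, gold) }
    }
  }

  // Format like the SQL result: area<TAB>uid:gold,uid:gold,...
  def render(result: Map[String, Seq[(String, Int)]]): Seq[String] =
    result.toSeq.sortBy(_._1).map { case (area, top) =>
      area + "\t" + top.map { case (uid, gold) => s"$uid:$gold" }.mkString(",")
    }

  def main(args: Array[String]): Unit = render(topN(3)).foreach(println)
}
```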