Spark RDD Notes

Published 2023-06-02 07:27:16 · Author: strongmore

Creating RDDs

RDDs are the core of Spark programming. The first task in any Spark program is to create an initial RDD, which effectively defines the input data source of the application.

Only after this initial RDD has been created can you use the higher-order functions Spark provides to operate on it and derive other RDDs from it.

Spark provides three ways to create an RDD: from a collection, from a local file, and from an HDFS file.

  • Creating an RDD from a collection in your program is mainly used for testing: before actually deploying to a cluster, you can build some test data from a collection to exercise the rest of the Spark application's flow.
  • Creating an RDD from a local file is mainly used for ad-hoc processing of files that hold large amounts of data.
  • Creating an RDD from an HDFS file is the most common approach in production, mainly for offline batch processing of data stored on HDFS.

Creating an RDD from a collection

Call SparkContext's parallelize() method. Spark copies the data in the collection to the cluster and forms a distributed dataset, i.e. an RDD. In effect, part of the collection ends up on one node and other parts end up on other nodes, and the distributed dataset can then be processed in parallel.

When calling parallelize() there is one important parameter you can specify: the number of partitions the collection is split into. Spark runs one task per partition.

By default Spark chooses the number of partitions based on the cluster configuration. You can also pass a second argument to parallelize() to set the RDD's partition count explicitly, e.g. parallelize(arr, 5).

import org.apache.spark.{SparkConf, SparkContext}

object CreateRddByArrayScala {
  def main(args: Array[String]): Unit = {
    //create the SparkContext
    val conf = new SparkConf()
    conf.setAppName("CreateRddByArrayScala") //set the application name
      .setMaster("local") //"local" means run locally
    val sc = new SparkContext(conf)
    //create a collection
    val arr = Array(1, 2, 3, 4, 5)
    //create an RDD from the collection
    val rdd = sc.parallelize(arr)
    val sum = rdd.reduce((a, b) => a + b)
    println(sum)
    //stop the SparkContext
    sc.stop()
  }

}

Note:
val arr = Array(1,2,3,4,5) and println(sum) are executed in the driver process, so this code does not run in parallel; the distributed computation over the parallelized data, such as the function passed to reduce, runs in tasks on the worker nodes.
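As a small addition of my own (not from the original post), the sketch below passes the partition count explicitly and verifies it with getNumPartitions, assuming the same local sc as above:

val arr = Array(1, 2, 3, 4, 5)
//split the collection into 5 partitions; Spark runs one task per partition
val rdd = sc.parallelize(arr, 5)
//getNumPartitions reports how many partitions the RDD actually has
println(rdd.getNumPartitions) //expected: 5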

Creating an RDD from a local file or an HDFS file

SparkContext's textFile() method creates an RDD from a local file or an HDFS file; each element of the RDD is one line of text from the file.
textFile() also supports creating RDDs from directories, compressed files, and wildcards.

By default Spark creates one partition per HDFS block. You can also set the number of partitions through textFile()'s second argument, but only upwards: you can ask for more partitions than there are blocks, not fewer; a value smaller than the block count simply does not take effect.
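A minimal sketch of my own (the HDFS path is the same one used in the example below) showing how to request more partitions:

//ask for at least 5 partitions; the actual number never drops below the HDFS block count
val rdd = sc.textFile("hdfs://bigdata01:9000/hello.txt", 5)
println(rdd.getNumPartitions)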

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Create an RDD from a local file or an HDFS file
  */
object CreateRddByFileScala {
  def main(args: Array[String]): Unit = {
    //create the SparkContext
    val conf = new SparkConf()
    conf.setAppName("CreateRddByFileScala") //set the application name
      .setMaster("local") //"local" means run locally
    val sc = new SparkContext(conf)
    var path = "C:\\D-myfiles\\testjar\\spark\\hello.txt"
    path = "hdfs://bigdata01:9000/hello.txt"
    //read the file; the number of partitions for the resulting RDD can be set via textFile()'s second argument
    val rdd = sc.textFile(path)
    //get the length of each line and sum them to compute the total length of the data in the file
    val length = rdd.map(x => x.length).reduce((a, b) => a + b)
    println(length)
    sc.stop()
  }
}

When creating the RDD from the HDFS file, the following problem occurred:

23/04/29 19:35:24 WARN DFSClient: Failed to connect to /192.168.80.2:9866 for block, add to deadNodes and continue. java.net.ConnectException: Connection timed out: no further information
java.net.ConnectException: Connection timed out: no further information
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:715)
	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:530)
	at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3090)
	at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:778)
	at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:693)
	at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:354)
	at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:617)
	at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:841)
	at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:889)
	at java.io.DataInputStream.read(DataInputStream.java:149)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
	at org.apache.hadoop.mapred.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:208)
	at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:246)
	at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:48)
	at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:293)
	at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:224)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1020)
	at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1019)
	at org.apache.spark.SparkContext$$anonfun$36.apply(SparkContext.scala:2157)
	at org.apache.spark.SparkContext$$anonfun$36.apply(SparkContext.scala:2157)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
23/04/29 19:35:24 INFO DFSClient: Could not obtain BP-1163939135-192.168.80.2-1681619890557:blk_1073741828_1004 from any node: java.io.IOException: No live nodes contain block BP-1163939135-192.168.80.2-1681619890557:blk_1073741828_1004 after checking nodes = [192.168.80.2:9866], ignoredNodes = null No live nodes contain current block Block locations: 192.168.80.2:9866 Dead nodes:  192.168.80.2:9866. Will get new block locations from namenode and retry...
23/04/29 19:35:24 WARN DFSClient: DFS chooseDataNode: got # 1 IOException, will wait for 2844.3655872008176 msec.

Solution

val sc = new SparkContext(conf)
sc.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")

This makes the HDFS client access the DataNodes by hostname instead of IP, by adjusting the Hadoop configuration used by the Spark job.

Transformation and Action

Spark's operations on RDDs fall into two broad categories: transformations and actions.

A transformation is an operation that transforms the data of an RDD, producing a new RDD from an existing one; common examples are map, flatMap, and filter.

An action is an operation that triggers job execution: it performs a final operation on the RDD, such as iterating over it, reducing it, or saving it to a file, and it can return a result to the driver program.

Whether they belong to the transformation or the action category, these operations are usually called operators, e.g. the map operator or the reduce operator.

Transformation operators have one important property: they are lazy.

Lazy means that if a Spark job only defines transformation operators, those operators are not executed even when the job runs. Transformations do not trigger job execution; they merely record the operations to be applied to the RDD.

All of the transformations are executed only when an action follows them. Spark uses this laziness to optimize the underlying job execution and to avoid producing unnecessary intermediate results.
The corresponding property of actions: executing an action triggers a Spark job, which in turn executes all of the transformations that precede it.

In this respect the API is similar to the Java Stream API, both in usage and in principle.
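A small sketch of my own (not from the original; it assumes the same local sc as in the earlier examples) illustrating the laziness:

val dataRDD = sc.parallelize(Array(1, 2, 3, 4, 5))
//map is a transformation: at this point nothing has been computed,
//Spark has only recorded that every element should be doubled
val mapRDD = dataRDD.map(_ * 2)
//reduce is an action: only now is a job submitted and the map actually executed
val sum = mapRDD.reduce(_ + _)
println(sum) //30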

Common transformations

  • map: processes each element of the RDD, one element in, one element out
  • filter: evaluates a predicate for each element and keeps those for which it returns true
  • flatMap: like map, but each element can produce zero or more new elements
  • groupByKey: groups the elements by key; each key maps to an Iterable of its values
  • reduceByKey: reduces all values that share the same key
  • sortByKey: sorts the RDD by key (a global sort)
  • join: joins two RDDs of <key,value> pairs
  • distinct: removes duplicate elements globally
import org.apache.spark.{SparkConf, SparkContext}

object TransformationOpScala {

  def main(args: Array[String]): Unit = {
    val sc = getSparkContext()
    joinOp(sc)
    sc.stop()
  }
  def joinOp(sc: SparkContext): Unit = {
    val dataRDD1 = sc.parallelize(Array((150001, "US"), (150002, "CN"), (150003, "IN"), (150003, "CN")))
    val dataRDD2 = sc.parallelize(Array((150001, 400), (150002, 200), (150003, 300), (150003, 400)))
    val joinRDD = dataRDD1.join(dataRDD2)
    //joinRDD.foreach(println(_))
    joinRDD.foreach(tup => {
      //user id
      val uid = tup._1
      val area_gold = tup._2
      //region
      val area = area_gold._1
      //gift income in gold coins
      val gold = area_gold._2
      println(uid + "\t" + area + "\t" + gold)
    })
  }
  private def getSparkContext(): SparkContext = {
    val conf = new SparkConf()
    conf.setAppName("TransformationOpScala")
      .setMaster("local")
    new SparkContext(conf)
  }
}

The join result is:

150001	US	400
150002	CN	200
150003	IN	300
150003	IN	400
150003	CN	300
150003	CN	400

For keys that appear more than once, join produces the Cartesian product of the matching values, just like an inner join in MySQL.
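The example above only demonstrates join. As an addition of my own (a sketch reusing the getSparkContext() helper above, not code from the original post), the method below exercises several of the other operators from the list:

  def otherTransformationsOp(sc: SparkContext): Unit = {
    val lines = sc.parallelize(Array("hello you", "hello me", "hello world"))
    //flatMap: one line in, several words out
    val wordRDD = lines.flatMap(_.split(" "))
    //map: turn every word into a (word, 1) pair
    val pairRDD = wordRDD.map(word => (word, 1))
    //filter: keep only the pairs whose key is not "hello"
    val filteredRDD = pairRDD.filter(_._1 != "hello")
    //reduceByKey: sum the counts for each remaining word
    val countRDD = filteredRDD.reduceByKey(_ + _)
    //sortByKey: global sort by key (here, by word)
    countRDD.sortByKey().foreach(println(_))
    //distinct: global deduplication of the words
    wordRDD.distinct().foreach(println(_))
    //groupByKey: each word mapped to an Iterable of its counts
    pairRDD.groupByKey().foreach(tup => println(tup._1 + "\t" + tup._2.toList.mkString(",")))
  }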

Common action operators

  • reduce: aggregates all elements of the RDD
  • collect: fetches all elements of the RDD to the local client (the driver)
  • count: returns the number of elements in the RDD
  • take(n): returns the first n elements of the RDD
  • saveAsTextFile: saves the RDD's elements to a file, calling toString on each element
  • countByKey: counts the number of elements for each key
  • foreach: iterates over every element of the RDD
import org.apache.spark.{SparkConf, SparkContext}

object ActionOpScala {

  def main(args: Array[String]): Unit = {
    val sc = getSparkContext()
    saveAsTextFileOp(sc)
    sc.stop()
  }

  def saveAsTextFileOp(sc: SparkContext): Unit = {
    val dataRDD = sc.parallelize(Array(1,2,3,4,5))
    //just provide an HDFS path; it must be a directory that does not exist yet
    dataRDD.saveAsTextFile("hdfs://bigdata01:9000/spark-rdd-out")
  }

  private def getSparkContext(): SparkContext = {
    val conf = new SparkConf()
    conf.setAppName("TransformationOpScala")
      .setMaster("local")
    val sc = new SparkContext(conf)
    sc.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")
    sc
  }
}

This writes the result to a file on HDFS.
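As another addition of my own (a sketch reusing getSparkContext(), not from the original post), a few of the other actions from the list:

  def otherActionsOp(sc: SparkContext): Unit = {
    val dataRDD = sc.parallelize(Array(1, 2, 3, 4, 5))
    //reduce: aggregate all elements, the result is returned to the driver
    println(dataRDD.reduce(_ + _)) //15
    //collect: pull every element back to the driver (be careful with large RDDs)
    println(dataRDD.collect().mkString(","))
    //count and take(n)
    println(dataRDD.count()) //5
    println(dataRDD.take(2).mkString(",")) //1,2
    //countByKey: works on pair RDDs and returns a Map of key -> count to the driver
    val pairRDD = sc.parallelize(Array(("A", 1), ("B", 2), ("A", 3)))
    println(pairRDD.countByKey()) //e.g. Map(A -> 2, B -> 1)
  }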

How RDD persistence works

A very important feature of Spark is the ability to persist an RDD. When an RDD is persisted, each node keeps the partitions of that RDD it has computed in memory, and later reuses those cached partitions directly whenever the RDD is used again.

For workloads that run several operations against the same RDD, the RDD therefore only has to be computed once and can be reused afterwards instead of being recomputed for every operation. Normally the RDD's data is not kept in memory after it has been used.

val dataRDD = sc.parallelize(Array(1,2,3,4,5))
val mapRDD = dataRDD.map(...)
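//each of the three actions below triggers its own job; without mapRDD.cache() or persist(), the map is recomputed every time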
mapRDD.foreach(...)
mapRDD.saveAsTextFile(...)
mapRDD.collect()

Used well, RDD persistence can greatly improve the performance of a Spark application in some scenarios; it is particularly important for iterative algorithms and fast interactive applications. To persist an RDD, simply call its cache() or persist() method.

The first time the RDD is computed, it is cached directly on each node. Spark's persistence mechanism is also fault-tolerant: if any partition of a persisted RDD is lost, Spark automatically recomputes it from the source RDD using the original transformation operators.

The difference between cache() and persist():

cache() is a shorthand for persist(): under the hood it calls the no-argument persist(), i.e. persist(MEMORY_ONLY), which keeps the data in memory. To clear the cached data from memory, call unpersist().

RDD persistence levels

The persistence levels currently supported by Spark are:

  • MEMORY_ONLY: persists the data deserialized in JVM memory; if it does not all fit, some of the data simply is not persisted. This is the default level.
  • MEMORY_AND_DISK: same as above, but partitions that cannot be kept in memory are persisted to disk.
  • MEMORY_ONLY_SER: like MEMORY_ONLY, but the data is stored serialized.
  • MEMORY_AND_DISK_SER: like MEMORY_AND_DISK, but serialized.
  • DISK_ONLY: stores the data entirely on disk.
  • MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.: levels with a trailing 2 keep a replica of the persisted data on another node, so that when data is lost nothing has to be recomputed; the replica is used instead.

Spark offers multiple persistence levels mainly to let you trade off CPU against memory. Some general recommendations for choosing a level:

  1. Prefer MEMORY_ONLY: pure in-memory access is fastest and, since nothing is serialized, no CPU is spent on deserialization; the drawback is the memory footprint.
  2. Otherwise use MEMORY_ONLY_SER: the data is stored serialized but still entirely in memory, which is still very fast; the cost is the CPU spent deserializing it on access.

Note:
If you need fast failure recovery, choose a level with the _2 suffix so the data is replicated and nothing has to be recomputed after a failure.
Avoid the DISK-based levels whenever possible: reading the data back from disk is sometimes slower than simply recomputing it.

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Test RDD persistence
  */
object PersistRddScala {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("PersistRddScala")
      .setMaster("local")
    val sc = new SparkContext(conf)
    //the file has one million lines
    val dataRDD = sc.textFile("C:\\D-myfiles\\testjar\\spark\\hello_1000000.txt").cache()
    var start_time = System.currentTimeMillis()
    var count = dataRDD.count()
    println(count)
    var end_time = System.currentTimeMillis()
    println("Time for the first count: " + (end_time - start_time)) //7993ms
    start_time = System.currentTimeMillis()
    count = dataRDD.count()
    println(count)
    end_time = System.currentTimeMillis()
    println("Time for the second count: " + (end_time - start_time)) //170ms
    sc.stop()

  }
}

Some posts online claim that cache() or persist() must be chained directly onto the transformation or textFile() call that creates the RDD, and that calling it on a separate line has no effect; in my tests, however, calling it on a separate line works just as well.
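As a further addition of my own (not from the original; it assumes the same sc and input file as the example above), persist() can also be given one of the storage levels explicitly, and the cache can be released again with unpersist():

import org.apache.spark.storage.StorageLevel

val dataRDD = sc.textFile("C:\\D-myfiles\\testjar\\spark\\hello_1000000.txt")
//keep the data serialized in memory and spill to disk when memory is insufficient
dataRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
println(dataRDD.count())
println(dataRDD.count()) //served from the cache
//remove the cached data once it is no longer needed
dataRDD.unpersist()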

Shared variables

By default, if an operator's function uses an external variable, the variable's value is copied into every task, and each task can only operate on its own copy. If multiple tasks need to share a variable, this mechanism cannot do it.

For this, Spark provides two kinds of shared variables:

  • Broadcast variables
  • Accumulators

Broadcast Variable

A broadcast variable is copied only once per node rather than once per task, so its main benefit is reducing both the network traffic of shipping the variable to the nodes and the memory it consumes on each node.

A broadcast variable is created by calling SparkContext's broadcast() method on the variable.

Note: broadcast variables are read-only. When a broadcast variable is used inside an operator's function, each node holds only one copy; the value is read through the broadcast variable's value method.

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastOpScala {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("BroadcastOpScala")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val dataRDD = sc.parallelize(Array(1, 2, 3, 4, 5))
    val varable = 2
    //dataRDD.map(_ * varable)
    //1: define the broadcast variable
    val varableBroadcast = sc.broadcast(varable)
    //2: use the broadcast variable by reading its value
    dataRDD.map(num => num * varableBroadcast.value).foreach(num => println(num))
    sc.stop()
  }
}

Accumulator

In a Spark job an operator normally runs as many parallel tasks, so any aggregation done inside the operator is only local to each task. To perform a global aggregation across multiple tasks you need an Accumulator, Spark's shared accumulating variable.

Note: an Accumulator only supports adding. Tasks can only add to an Accumulator; they cannot read its value. Only the driver process can read the Accumulator's value.

import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorOpScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("AccumulatorOpScala")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val dataRDD = sc.parallelize(Array(1, 2, 3, 4, 5))
    //The commented-out approach below is wrong: the foreach body runs on the worker nodes,
    //while var total = 0 and println("total:"+total) run in the driver process,
    //so the accumulation cannot work.
    //Moreover, foreach may run in multiple tasks, so even the sum computed inside foreach would not be a global one.
    /*var total = 0
    dataRDD.foreach(num=>total += num)
    println("total:"+total)*/
    //To accumulate across tasks, use an accumulator variable instead.
    //1: define the accumulator
    val sumAccumulator = sc.longAccumulator
    //2: use the accumulator
    dataRDD.foreach(num => sumAccumulator.add(num))
    //Note: the accumulator's value can only be read in the driver process
    println(sumAccumulator.value)
  }
}

Hands-on: top-N streamer statistics

Requirement

Requirement: for each region, compute the day's top-N streamers by gold-coin income.

The background: we run a live-streaming app that has been launched and operated in many countries for a while. The product manager wants a top-N streamer leaderboard, updated daily, with several ranking dimensions; one of them ranks streamers by the gold-coin income earned from their streams that day.

Our platform has the concept of a region: a region contains several countries, and because different regions have different operating strategies, countries are grouped into regions for convenience. The top-N leaderboard therefore has to be computed per region. We already have each streamer's daily streaming records as well as the gift records from every live room, which is enough to compute each streamer's gold income for the day.

A streamer may go live several times in one day, so when computing a streamer's daily income we have to include the gold from all of that day's streams.

Analysis

Analysis: we have two data sets, both in JSON format.

  • video_info.log: the streamers' streaming records, containing the streamer id uid, the live-room id vid, the region area, the stream duration length, the number of new followers follow, and so on
  • gift_record.log: the gift records, containing the sender id uid, the live-room id vid, the gift id good_id, the gold amount gold, and so on

Based on these two data sets, computing each region's top-N streamers by gold income for the day boils down to summing each streamer's income across all of that day's live rooms, grouping by region, and taking the top N within each region:

  1. First extract the core fields from both data sets, parsing the JSON with fastjson.
    Streaming records: streamer ID uid, live-room ID vid, region area -> (vid,(uid,area))
    Gift records: live-room ID vid, gold amount gold -> (vid,gold)
    With these two shapes the data sets can be joined on the live-room ID vid to bring region, streamer, and gold together.
  2. Aggregate the gift records by summing the gold for each vid, because a user may send gifts several times during one stream.
    (vid,gold_sum)
  3. Join the two data sets, using vid as the join key.
    (vid,((uid,area),gold_sum))
  4. Iterate over the joined data with map to extract the uid, area, and gold_sum fields. Since a streamer may go live several times a day, we will aggregate again by uid and area, so convert the data to the format below; uid and area are one-to-one because a streamer belongs to exactly one region.
    ((uid,area),gold_sum)
  5. Aggregate the data with reduceByKey.
    ((uid,area),gold_sum_all)
  6. The data then needs to be grouped with groupByKey, so first convert it with map.
    map: (area,(uid,gold_sum_all))
    groupByKey: area,<(uid,gold_sum_all),(uid,gold_sum_all),(uid,gold_sum_all)>
  7. Use map to iterate over each group, sort by gold in descending order, take the first N, and emit area and topN, where topN is simply the leading streamers' ids and gold amounts concatenated into one string.
    (area,topN)
  8. Use foreach to print the result to the console, separating the fields with a tab.
    area topN

Raw data

The streaming records, video_info.log:

{"uid":"8407173251001","vid":"14943445328940001","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":101,"share_num":"21","type":"video_info"}
{"uid":"8407173251002","vid":"14943445328940002","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":201,"share_num":"331","type":"video_info"}
{"uid":"8407173251003","vid":"14943445328940003","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":221,"share_num":"321","type":"video_info"}
{"uid":"8407173251004","vid":"14943445328940004","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":401,"share_num":"311","type":"video_info"}
{"uid":"8407173251005","vid":"14943445328940005","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":31,"share_num":"131","type":"video_info"}
{"uid":"8407173251006","vid":"14943445328940006","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":22,"share_num":"3431","type":"video_info"}
{"uid":"8407173251007","vid":"14943445328940007","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":44,"share_num":"131","type":"video_info"}
{"uid":"8407173251008","vid":"14943445328940008","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":66,"share_num":"131","type":"video_info"}
{"uid":"8407173251009","vid":"14943445328940009","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":32,"share_num":"231","type":"video_info"}
{"uid":"8407173251010","vid":"14943445328940010","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":342,"share_num":"431","type":"video_info"}
{"uid":"8407173251011","vid":"14943445328940011","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":223,"share_num":"331","type":"video_info"}
{"uid":"8407173251012","vid":"14943445328940012","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":554,"share_num":"312","type":"video_info"}
{"uid":"8407173251013","vid":"14943445328940013","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":334,"share_num":"321","type":"video_info"}
{"uid":"8407173251014","vid":"14943445328940014","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":653,"share_num":"311","type":"video_info"}
{"uid":"8407173251015","vid":"14943445328940015","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":322,"share_num":"231","type":"video_info"}
{"uid":"8407173251001","vid":"14943445328940016","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":432,"share_num":"531","type":"video_info"}
{"uid":"8407173251005","vid":"14943445328940017","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":322,"share_num":"231","type":"video_info"}
{"uid":"8407173251008","vid":"14943445328940018","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":564,"share_num":"131","type":"video_info"}
{"uid":"8407173251010","vid":"14943445328940019","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":324,"share_num":"231","type":"video_info"}
{"uid":"8407173251015","vid":"14943445328940020","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":532,"share_num":"331","type":"video_info"}

The gift records, gift_record.log:

{"uid":"7201232141001","vid":"14943445328940001","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141002","vid":"14943445328940001","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141003","vid":"14943445328940002","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141004","vid":"14943445328940002","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141005","vid":"14943445328940003","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141006","vid":"14943445328940003","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141007","vid":"14943445328940004","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141008","vid":"14943445328940004","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141009","vid":"14943445328940005","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141010","vid":"14943445328940005","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141011","vid":"14943445328940006","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141012","vid":"14943445328940006","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141013","vid":"14943445328940007","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141014","vid":"14943445328940007","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141015","vid":"14943445328940008","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141016","vid":"14943445328940008","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141017","vid":"14943445328940009","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141018","vid":"14943445328940009","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141019","vid":"14943445328940010","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141020","vid":"14943445328940010","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141021","vid":"14943445328940011","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141022","vid":"14943445328940011","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141023","vid":"14943445328940012","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141024","vid":"14943445328940012","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141025","vid":"14943445328940013","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141026","vid":"14943445328940013","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141027","vid":"14943445328940014","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141028","vid":"14943445328940014","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141029","vid":"14943445328940015","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141030","vid":"14943445328940015","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141031","vid":"14943445328940016","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141032","vid":"14943445328940016","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141033","vid":"14943445328940017","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141034","vid":"14943445328940017","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141035","vid":"14943445328940018","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141036","vid":"14943445328940018","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141037","vid":"14943445328940019","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141038","vid":"14943445328940019","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141039","vid":"14943445328940020","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141040","vid":"14943445328940020","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}

Implementation

Add the Maven dependency:

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.68</version>
</dependency>
import com.alibaba.fastjson.JSON
import org.apache.spark.{SparkConf, SparkContext}

object TopNScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("TopNScala")
      .setMaster("local")
    val sc = new SparkContext(conf)
    //1: extract the core fields from both data sets, parsing the JSON with fastjson
    val videoInfoRDD = sc.textFile("C:\\D-myfiles\\testjar\\spark\\video_info.log")
    val giftRecordRDD = sc.textFile("C:\\D-myfiles\\testjar\\spark\\gift_record.log")
    //(vid,(uid,area))
    val videoInfoFieldRDD = videoInfoRDD.map(line => {
      val jsonObj = JSON.parseObject(line)
      val vid = jsonObj.getString("vid")
      val uid = jsonObj.getString("uid")
      val area = jsonObj.getString("area")
      (vid, (uid, area))
    })
    //(vid,gold)
    val giftRecordFieldRDD = giftRecordRDD.map(line => {
      val jsonObj = JSON.parseObject(line)
      val vid = jsonObj.getString("vid")
      val gold = Integer.parseInt(jsonObj.getString("gold"))
      (vid, gold)
    })
    //2: aggregate the gift records, summing the gold for each vid
    //(vid,gold_sum)
    val giftRecordFieldAggRDD = giftRecordFieldRDD.reduceByKey((a, b) => a + b)
    //3: join the two data sets, using vid as the join key
    //(vid,((uid,area),gold_sum))
    val joinRDD = videoInfoFieldRDD.join(giftRecordFieldAggRDD)
    //4: iterate over the joined data with map and extract the uid, area, and gold_sum fields
    val joinMapRDD = joinRDD.map(tup => {
      //joinRDD: (vid,((uid,area),gold_sum))
      //get uid
      val uid = tup._2._1._1
      //get area
      val area = tup._2._1._2
      //get gold_sum
      val gold_sum = tup._2._2
      ((uid, area), gold_sum)
    })
    //5: aggregate with reduceByKey, the key is (uid, area)
    //((uid,area),gold_sum_all)
    val reduceRDD = joinMapRDD.reduceByKey((a, b) => a + b)
    //6: the data then has to be grouped with groupByKey, so first convert it with map
    //map:(area,(uid,gold_sum_all))
    //groupByKey: area,<(uid,gold_sum_all),(uid,gold_sum_all),(uid,gold_sum_all)>
    val groupRDD = reduceRDD.map(tup => (tup._1._2, (tup._1._1, tup._2))).groupByKey()
    //7: use map to iterate over each group, sort by gold in descending order, take the first N, and emit area and topN
    //(area,topN)
    val top3RDD = groupRDD.map(tup => {
      val area = tup._1
      //toList: turn the Iterable into a List
      //sortBy: sort, ascending by default
      //reverse: reverse the list to get descending order
      //take(3): take the first 3 elements
      //mkString: join the collection into a string with the given separator
      //uid:gold_sum_all,uid:gold_sum_all,uid:gold_sum_all
      val top3 = tup._2.toList.sortBy(_._2).reverse.take(3).map(t => t._1 + ":" + t._2).mkString(",")
      (area, top3)
    })
    //8: print the result to the console with foreach, separating fields with a tab
    top3RDD.foreach(tup => println(tup._1 + "\t" + tup._2))
    sc.stop()
  }
}

The result is:

CN	8407173251008:120,8407173251003:60,8407173251014:50
ID	8407173251005:160,8407173251010:140,8407173251002:70
US	8407173251015:180,8407173251012:70,8407173251001:60

Note: there are two options for handling the fastjson dependency:

  1. Bundle the dependency into the application jar.
  2. Specify the dependency jar dynamically when submitting the job.

References

Spark: setting Hadoop configuration files