【大数据】RDD

发布时间 2023-10-20 16:28:59作者: PythonNew_Mr.Wang

RDD介绍


Resilient: RDD中的数据可以存储在内存中或者磁盘中。

Dataset:一个数据集合,用于存放数据的。 

Distributed: RDD中的数据是分布式存储的,可用于分布式计算

RDD五大特性

# coding:utf8
from pyspark import SparkConf, SparkContext


if __name__ == '__main__':
    conf = SparkConf().setAppName("RDD")
    # 通过SparkConf对象构建SparkContext对象
    sc = SparkContext(conf=conf)

    # 特性1:分区
    rdd1 = sc.parallelize([1,2,3,4,5,6],3)
    print(rdd1.glom().collect())
    """  [[1, 2], [3, 4], [5, 6]]    """


    # 特性2:RDD的方法会作用在其所有的分区上
    rdd2 = sc.parallelize([1,2,3,4,5,6],3).map(lambda x:x * 10)
    print(rdd2.glom().collect())
    """  [[10, 20], [30, 40], [50, 60]]    """


    # 特性3:RDD之间是有依赖关系(RDD有血缘关系)
    rdd1 = sc.textFile("./data/words.txt")
    rdd2 = rdd1.flatMap(lambda x:x.split(' '))
    rdd3 = rdd2.map(lambda x: (x, 1))
    print(rdd3.collect())
    """[('make', 1), ('make', 1), ('make', 1), ('make', 1), ('love', 1), ('love', 1), ('love', 1), ('love', 1)]"""


    # 特性4: Key-Value型的RDD可以有分区器
    """ ("a",1) ("a",2) ("b",3) ...  根据K的不同进行分区  """

    
    # 特性5: RDD的分区规划,会尽量靠近数据所在的服务器

RDD 创建方式

# coding:utf8
from pyspark import SparkConf, SparkContext


if __name__ == '__main__':

    conf = SparkConf().setAppName("RDD").setMaster("local[*]")
    # 通过SparkConf对象构建SparkContext对象
    sc = SparkContext(conf=conf)

    List = [1,2,3,4,5,6,7,8,9]

    """  创建RDD对象:   parallelize """
    # sc.parallelize(可迭代对象,分区数量)
    api1 = sc.parallelize(List,numSlices=3)
    print("集合分布式RDD得出的结果:",api1.collect())


    """  读取本地/HDFS文件数据:   textFile"""
    # sc.textFile(路径,最小分区,编码)
    # sc.textFile("hdfs://192.168.88.201:8020/input/words.txt")
    api2 = sc.textFile('./data/words.txt',)
    print("结果:", api2.collect())


    """  读取本地/HDFS一堆文件:   wholeTextFiles"""
    # sc.textFile(路径,最小分区,编码)
    api3 = sc.wholeTextFiles('./data/tiny_files',)
    print("结果:",api3.collect())

Transformation算子

"""  
     map:       遍历每行数据进行操作 
     res:       [2,3,4,5,6,7,8,9,10]
"""
	def check(x):
    	return x + 1
	res = sc.parallelize([1,2,3,4,5,6,7,8,9], 3).map(check).collect()


    

"""  
    flatMap:   对rdd执行map操作,然后解除嵌套
    res:
          map : ["D,W,Q","M,A,D"] -> [['D', 'W', 'Q'], ['M', 'A', 'D']]
      flatMap : ["D,W,Q","M,A,D"] -> ['D', 'W', 'Q', 'M', 'A', 'D']
"""
	def check(data):
    	return str(data).split(",")
	res = sc.parallelize(["D,W,Q","M,A,D"]).flatMap(check).collect()




"""
    reduceByKey:  自动按照key分组,对v进行你想要的逻辑方式处理.
    理解:         check函数传入的x1与x2,实际是相同的K的数量 按照你需要的方式进行处理

    res :         
                x1 * x2 :    [('a',1),('a',5),('a',2)] ->  [('a', 10)]
                结果叠加:第一次的结果 + 第二次 R1 + 公式  ->  (1):1 * 5 = 5(R1) (2):5(R1) * 2 = 10

                 x1 * x2 + 5:  [('a',1),('a',5),('a',2)] -> [('a', 25)]
                结果叠加:第一次的结果 + 第二次 R1 + 公式     ->  (1):1 * 5 + 5 = 10(R1) (2):10(R1) * 2 + 5 = 25
"""
	def check(x1,x2):
		 return x1 * x2 + 5
	res = sc.parallelize([('a',1),('a',5),('a',2)]).reduceByKey(check).collect()



"""
    mapValues : 针对二元元组RDD ,对其内部的二元元组的 Value执行map操作
    res :       [('a',1),('a',5),('a',2)] ->  [('a', 2), ('a', 6), ('a', 3)]
"""
	def check(x):
   		return x + 1
	res = sc.parallelize([('a',1),('a',5),('a',2)]).mapValues(check).collect()


"""
   groupBy : 对数据进行分组
   res :     [('Alice', [('Alice', 25), ('Alice', 35)]), ('Bob', [('Bob', 30), ('Bob', 40)]), ('Chris', [('Chris', 20)])]
"""
	def check(x):
    	return x[0],list(x[1])
	data = [("Alice", 25), ("Bob", 30), ("Alice", 35), ("Bob", 40), ("Chris", 20)]
	rdd = sc.parallelize(data)
	grouped_rdd = rdd.groupBy(lambda x: x[0])  # 使用groupBy()方法按 x[0] 姓名进行分组
	res = grouped_rdd.map(check).collect()     # 格式化输出 : x[0],list(x[1])


"""
    filter: 筛选
    res   :   [1, 3, 5, 7, 9]
"""
	def check(x):
		if x % 2 == 0:
      		pass
 		else:
       		return x
	data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
	rdd = sc.parallelize(data)
	filtered_rdd = rdd.filter(check).collect()
	print(filtered_rdd)


"""
   distinct : 去重
   res      : [1, 2, 3, 4, 5]
"""
	data = [1, 2, 3, 1, 2, 3, 4, 5]
	rdd = sc.parallelize(data)
	distinct_rdd = rdd.distinct().collect()


    
"""
    union : 合并
      res : [1, 1, 3, 3, "a", "b", "c"]
"""
	rdd1 = sc.parallelize([1, 1, 3,3])
	rdd2 = sc.parallelize(["a", "b", "c"])
	union_rdd = rdd1.union(rdd2).collect()



"""
    按照K来关联:
    join(内连接):    rdd1与rdd2都有的才做计算
                      [(2, ('Banana', 'Yellow')), (1, ('Apple', 'Red')), (3, ('Orange', 'Orange'))]

    leftOuterJoin(左外): rdd1与rdd2  以左边的为准计算,没有的为None(4在rdd2里面没有,所有为None)
                         [(2, ('Banana', 'Yellow')), (4, ('Grapes', None)), (1, ('Apple', 'Red')), (3, ('Orange', 'Orange'))]

    rightOuterJoin(右外):  rdd1与rdd2  以右边边的为准计算,没有的为None(5在rdd1里面没有,所有为None)
                            [(2, ('Banana', 'Yellow')), (1, ('Apple', 'Red')), (3, ('Orange', 'Orange')), (5, (None, 'Green'))]
"""
	rdd1 = sc.parallelize([(1, "Apple"), (2, "Banana"), (3, "Orange"), (4, "Grapes")])
	rdd2 = sc.parallelize([(1, "Red"), (2, "Yellow"), (3, "Orange"), (5, "Green")])
	joined_rdd = rdd1.join(rdd2).collect()



"""
    intersection :  俩交集
    res          :  [4, 5]
"""
	rdd1 = sc.parallelize([1, 2, 3, 4, 5])
	rdd2 = sc.parallelize([4, 5, 6, 7, 8])
	intersect_rdd = rdd1.intersection(rdd2).collect()


    
    
"""
    glom :  返回分区列表
    res  :  [[1, 2], [3, 4, 5]]
"""
	rdd = sc.parallelize([1, 2, 3, 4, 5], 2)
	glom_rdd = rdd.glom().collect()         # 使用glom()方法将每个分区的元素转换为列表


    
"""
   groupByKey: 根据键对RDD中的KEY进行分组
   res: 
       Key: 1, Values: ['apple', 'orange']
       Key: 2, Values: ['banana', 'grape']
       Key: 3, Values: ['kiwi']
"""
	rdd = sc.parallelize([(1, "apple"), (2, "banana"), (1, "orange"), (2, "grape"), (3, "kiwi")])
	# 使用groupByKey()方法根据键对元素进行分组
	grouped_rdd = rdd.groupByKey()
	# 打印每个键对应的值列表
	for key, values in grouped_rdd.collect():
    	print(f"Key: {key}, Values: {list(values)}")


"""
   sortBy: 排序
   res:  
       Ture: [1, 2, 3, 5, 8]
       False:[8, 5, 3, 2, 1]
"""
	rdd = sc.parallelize([5, 2, 8, 1, 3])
	sorted_rdd = rdd.sortBy(lambda x: x,True).collect()


    
"""
   sortByKey: 排序
   res:  
        Ture: [(1, 'Banana'), (2, 'Orange'), (3, 'Apple')]
        False:~
"""
	rdd = sc.parallelize([(3, "Apple"), (1, "Banana"), (2, "Orange")])
	sorted_rdd = rdd.sortByKey(ascending=True).collect()

Action算子

# countByKey: 用于 对键值 对RDD中的 键 进行计数
data = [("apple", 1), ("banana", 2), ("orange", 3), ("apple", 4), ("kiwi", 5), ("banana", 6)]
rdd = sc.parallelize(data)
print(rdd.countByKey())
# defaultdict(<class 'int'>, {'apple': 2, 'banana': 2, 'orange': 1, 'kiwi': 1})



# collect: 将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象
# 数据量不能太大,考虑到内存问题
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.collect())
# [1, 2, 3, 4, 5]




# reduce:  数据集 按照你传入的逻辑进行 聚合
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.reduce(lambda x, y: x + y))
# 15



# fold: 与reduce不同的是,fold操作还可以指定一个初始值,用于处理空RDD的情况
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.fold(5, lambda x, y: x + y)) # 设置初始值
# 25



# first: 第一个元素
rdd = sc.parallelize([1, 2, 3, 4, 5]).first
# 1



# takeSample : 随机抽样
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
result = rdd.takeSample(False, 3) # False:不取相同数据,Ture:可以取相同数据
# [3, 8, 5]




# takeOrdered : 排序取前N个
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
result = rdd.takeOrdered(3)   # rdd.takeOrdered(3, lambda x:-x) 降序排序
# [1, 2, 3]



# foreach: 跟map一样,但是没有返回值(由分区Executor直接执行!)
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.foreach(lambda x:x + 1)
# None


# saveAsTextFile: 将RDD保存为文本文件(由分区Executor直接执行!)
rdd = sc.parallelize(["Hello", "World", "PySpark", "Example"])
rdd.saveAsTextFile("output_directory")

分区算子

# mapPartitions: 对每个分区进行操作,提升效率
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
def check(iterator):
    List = []
    for x in iterator: # 将每个分区里面的元素都放到空列表
        List.append(x + 1)
        return List
    result = rdd.mapPartitions(check).collect()
# [2, 3, 4, 5, 6]
    


# foreachPartition: 对每个分区进行操作,提升效率
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
def check(iterator):
    List = []
    for x in iterator: # 将每个分区里面的元素都放到空列表
        List.append(x + 1)
        print(List)
result = rdd.foreachPartition(check)
print(result)
# [2, 3, 4, 5, 6]
# None
 
    
# partitionBy : 自定义分区(分成几个列表)
data = [("A", 1), ("B", 2), ("C", 3), ("D", 4), ("E", 5)]
rdd = sc.parallelize(data)
result = rdd.partitionBy(3).glom().collect()
print(result)
# [[('B', 2), ('C', 3), ('D', 4)], [('A', 1)], [('E', 5)]]




# repartition:  决定新的分区数
rdd = sc.parallelize(range(100))
print("初始分区数:", rdd.getNumPartitions())
# 对 RDD 进行重分区,将分区数设置为 4
repartitioned_rdd = rdd.repartition(4)
print("重分区后的分区数:", repartitioned_rdd.getNumPartitions())
# 初始分区数: 1
# 重分区后的分区数: 4