pyspark使用

发布时间 2023-08-12 15:26:47作者: steve.z

#
#   py_pyspark.py
#   py_learn
#
#   Created by Z. Steve on 2023/8/10 17:51.
#


# pyspark 编程主要分三步:1. 数据输入。2. 数据处理。3. 数据输出。
# RDD:Resilient Distributed Datasets 弹性分布式数据集

# 1. 安装 pyspark 库
# pip3 install pyspark


# 2. 导入 pyspark
from pyspark import SparkConf, SparkContext

# 3. 创建 SparkConf 对象
conf = SparkConf().setMaster("local[*]").setAppName("testPySpark")

# 4. 基于 SparkConf 对象创建 SparkContext 对象
sc = SparkContext(conf=conf)

# 通过 SparkContext 对象查看 pyspark 版本
print(sc.version)

# 5. 读取数据, 并将数据转化为 rdd 对象。
# rdd1 = sc.parallelize([1, 2, 3, 4, 5])
# rdd2 = sc.parallelize(('a', 'b', 'c', 'd', 'e'))
# rdd3 = sc.parallelize({'name': 'steve', 'age': 18})
# rdd4 = sc.parallelize('hello welcome to China.')
# rdd5 = sc.parallelize({'x', 'y', 1, 100, 90})
#
# # 查看 rdd 对象中的数据
# print(rdd1.collect())
# print(rdd2.collect())
# print(rdd3.collect())
# print(rdd4.collect())
# print(rdd5.collect())
#
# # 通过 rdd 对象读取文件
# rdd6 = sc.textFile("/Users/stevexhz/PycharmProjects/py_learn/content.txt")
# print(rdd6.collect())

# 6. 数据计算
# 6.1 map() 方法. 将 rdd 中的数据逐条处理, 然后返回新的 rdd 对象
# rdd7 = sc.parallelize([1, 2, 3, 4, 5])
# rddResult = rdd7.map(lambda x: x * 10).map(lambda n: n + 5)
# print(rddResult.collect())

# 6.2 flatMap() 方法. 该方法可以解除嵌套
# rdd8 = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
# # flatMap() 对操作完结果还是嵌套情况的进行解嵌套
# result = rdd8.flatMap(lambda x: x)
# print(result.collect())


# 6.3 reduceByKey()
# 针对二元元组,例如:(('a', 1), ('b', 2), ('c', 3), ('a', 10), ('b', 20), ('c', 30))
# 然后按照 key 分组, 根据用户提供的聚合逻辑, 将 value 值进行聚合
rdd9 = sc.parallelize((('a', 1), ('b', 2), ('c', 3), ('a', 10), ('b', 20), ('c', 30)))
rddResult = rdd9.reduceByKey(lambda m, n: m + n)
print(rddResult.collect())

# 停止 SparkContext 停止 pyspark 程序
sc.stop()