pyspark 广播变量和累加器

发布时间 2023-10-24 15:45:05作者: whiteY

广播变量broadcast

广播变量允许程序缓存一个只读变量在集群的每台机器上,而不是每个任务保存一个拷贝。借助广播变量,可以用一种更高效的方法来共享一些数据,比如一个全局配置文件。

from pyspark import SparkConf,SparkContext

conf = SparkConf().setAppName("broadcast").setMaster("local[4]")

sc = SparkContext(conf=conf)

conf = {"ip": "192.168.10.108", "key": "spark"}
# 广播变量
broadVar = sc.broadcast(conf)

print(broadVar.value)

print(broadVar.value["key"])
# 更新广播变量
broadVar.unpersist()
conf["key"] = "pyspark"
#  再次广播变量
broadVar = sc.broadcast(conf)

print(broadVar.value)
print(broadVar.value["key"])
# destroy()可将广播变量的数据和元数据一同销毁,销毁后不能使用
broadVar.destroy()
print(broadVar.value)

累加器accumulator

实现在Driver端和Executor端共享变量 写的功能,Driver端定义的变量,在Executor端的每个Task都会得到这个变量的副本;

在每个Task对自己内部的变量副本值更新完成后,传回给Driver端,然后将每个变量副本的值进行累计操作;

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[3]").setAppName("Test")

sc = SparkContext(conf=conf)

rdd = sc.range(1, 101)
# 创建累加器,初始值0
acc = sc.accumulator(0)


def fcounter(x):
    global acc
    if x % 2 == 0:
        acc += 1
# unsupported operand type(s) for -=
# acc -= 1


rdd_counter = rdd.map(fcounter)

print(acc.value)  # 0 fcounter函数的逻辑还未执行
# 保证多次正确获取累加器值
rdd_counter.persist()
print(rdd_counter.count())  # 100
print(acc.value)  # 50