PySpark environment setup and writing data to Redis and Elasticsearch

Published 2023-08-03 23:39:15 · Author: 侠客云

I. Environment setup

1. Create a virtual environment, specifying the Python version you need

2. Activate the virtual environment and install the Python modules you need

3. Package the entire virtual environment into a .zip archive

4. Upload the zip to HDFS

5. Point spark-submit at the packaged Python environment (see the sketch below)

For reference: https://dandelioncloud.cn/article/details/1589470996832964609
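The following is a minimal sketch of steps 3–5, assuming the environment directory is ./pyspark_env, a YARN cluster, an HDFS target path /user/spark/envs/, and an entry script main.py (all of these names are placeholders):

'''
# 3. Package the whole virtual environment into a zip archive
cd pyspark_env && zip -r ../pyspark_env.zip . && cd ..

# 4. Upload the archive to HDFS
hdfs dfs -put -f pyspark_env.zip /user/spark/envs/

# 5. Point spark-submit at the Python interpreter inside the unpacked archive
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --archives hdfs:///user/spark/envs/pyspark_env.zip#pyenv \
  --conf spark.pyspark.python=./pyenv/bin/python \
  main.py
'''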

II. Writing data to Redis with PySpark

1. First, install the Redis-related Python packages in the virtual environment created above:

'''
pip3 install pyspark
pip3 install redis-py-cluster==2.1.3 -i http://pypi.douban.com/simple/ --trusted-host pypi.douban.com
'''
from pyspark.sql import SparkSession
from rediscluster import RedisCluster

# Redis Cluster startup nodes
redis_hosts = [
    {"host": "192.168.2.150", "port": 6379},
    {"host": "192.168.2.150", "port": 6380},
    {"host": "192.168.2.150", "port": 6381},
    {"host": "192.168.2.150", "port": 6382}
]
def write_hive2redis(df):
    # Convert each Row to a plain dict so it can be shipped to the executors
    json_rdd = df.rdd.map(lambda row: row.asDict())

    def write2redis(partition_data_list):
        # One Redis connection per partition, created on the executor side
        redis_conn = RedisCluster(startup_nodes=redis_hosts, password="", decode_responses=True)
        for dic in partition_data_list:
            key = dic.get("id")
            name = dic.get("name")
            # Write the record into Redis
            redis_conn.set(key, name)

    json_rdd.foreachPartition(write2redis)

            
        
if __name__ == '__main__':
    # Create a SparkSession with Hive support so spark.sql can read Hive tables
    spark = SparkSession.builder.appName("HiveExample").enableHiveSupport().getOrCreate()
    sql = "select * from table1"
    df = spark.sql(sql)
    write_hive2redis(df)
    spark.stop()
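For large partitions, the per-record round trips can be reduced by batching the writes with a Redis pipeline (non-transactional in cluster mode). A minimal sketch under the same assumptions as the function above; the batch size of 500 is arbitrary, and the function can be passed to json_rdd.foreachPartition in place of write2redis:

def write2redis_batched(partition_data_list):
    # One connection per partition; commands are buffered and flushed in batches
    redis_conn = RedisCluster(startup_nodes=redis_hosts, password="", decode_responses=True)
    pipe = redis_conn.pipeline()
    for i, dic in enumerate(partition_data_list, 1):
        pipe.set(dic.get("id"), dic.get("name"))
        if i % 500 == 0:
            pipe.execute()
    # Flush whatever is left in the buffer
    pipe.execute()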


III. Writing data to Elasticsearch with PySpark

First, download the connector jar: elasticsearch-spark-20_2.11-7.6.2.jar


When submitting the job, pass it to spark-submit via --jars elasticsearch-spark-20_2.11-7.6.2.jar

Or, if the cluster has network access, use Maven coordinates instead of the jar file name: --packages org.elasticsearch:elasticsearch-spark-20_2.11:7.6.2
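With the connector jar on the classpath, a DataFrame can be written through the org.elasticsearch.spark.sql data source. Below is a minimal sketch; the node address, port, index name my_index, and the option values are assumptions to adapt to your cluster:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Hive2EsExample").enableHiveSupport().getOrCreate()
df = spark.sql("select * from table1")

# Write the DataFrame to Elasticsearch via the es-hadoop connector
df.write \
    .format("org.elasticsearch.spark.sql") \
    .option("es.nodes", "192.168.2.150") \
    .option("es.port", "9200") \
    .option("es.index.auto.create", "true") \
    .mode("append") \
    .save("my_index")

spark.stop()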