一. Environment setup
1. Create a virtual environment with the required Python version
2. Activate the virtual environment and install the Python module packages you need
3. Package the entire virtual environment into a .zip archive
4. Upload the zip to HDFS
5. Point spark-submit at the packaged Python environment
Reference: https://dandelioncloud.cn/article/details/1589470996832964609
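The steps above can be sketched as the following commands (a minimal sketch: the environment name, HDFS path, and script name are illustrative; the actual submit flags depend on your cluster mode):

```shell
# 1. Create the virtual environment
python3 -m venv pyspark_env
# 2. Activate it and install the required packages
source pyspark_env/bin/activate
pip3 install pyspark
# 3. Package the entire environment into a zip
cd pyspark_env && zip -r ../pyspark_env.zip . && cd ..
# 4. Upload the zip to HDFS (path is illustrative)
hdfs dfs -put pyspark_env.zip /user/envs/
# 5. Tell spark-submit to ship the archive and use its Python
spark-submit \
  --archives hdfs:///user/envs/pyspark_env.zip#env \
  --conf spark.pyspark.python=./env/bin/python \
  main.py
```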
二. Writing PySpark data to Redis
1. First install the Redis-related Python packages in the virtual environment created above:
'''
pip3 install pyspark
pip3 install redis-py-cluster==2.1.3 -i http://pypi.douban.com/simple/ --trusted-host pypi.douban.com
'''
2. Read from Hive and write each partition to the Redis cluster:
'''
from pyspark.sql import SparkSession
from rediscluster import RedisCluster

# Startup nodes of the Redis cluster
redis_hosts = [
    {"host": "192.168.2.150", "port": 6379},
    {"host": "192.168.2.150", "port": 6380},
    {"host": "192.168.2.150", "port": 6381},
    {"host": "192.168.2.150", "port": 6382},
]

def write_hive2redis(df):
    json_rdd = df.rdd.map(lambda row: row.asDict())

    def write2redis(partition_data_list):
        # Create one connection per partition, on the executor side
        redis_conn = RedisCluster(startup_nodes=redis_hosts, password="", decode_responses=True)
        for dic in partition_data_list:
            id = dic.get("id")
            name = dic.get("name")
            # Write the key/value pair into Redis
            redis_conn.set(id, name)

    json_rdd.foreachPartition(write2redis)

if __name__ == '__main__':
    # Create the SparkSession with Hive support
    spark = SparkSession.builder.appName("HiveExample").enableHiveSupport().getOrCreate()
    sql = "select * from table1"
    df = spark.sql(sql)
    write_hive2redis(df)
    spark.stop()
'''
三. Writing PySpark data to Elasticsearch
First download the jar: elasticsearch-spark-20_2.11-7.6.2.jar
When running spark-submit, pass --jars elasticsearch-spark-20_2.11-7.6.2.jar
Or, if the cluster has network access, use the Maven coordinates instead: --packages org.elasticsearch:elasticsearch-spark-20_2.11:7.6.2
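With the connector jar on the classpath, writing a DataFrame to Elasticsearch can be sketched as follows (a minimal sketch: the ES host, port, and index name `my_index/_doc` are assumptions, not from the source):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("EsExample").enableHiveSupport().getOrCreate()
df = spark.sql("select * from table1")

# Write via the es-hadoop Spark SQL data source shipped in the connector jar
(df.write
   .format("org.elasticsearch.spark.sql")
   .option("es.nodes", "192.168.2.150")     # assumed ES host
   .option("es.port", "9200")               # assumed ES port
   .option("es.resource", "my_index/_doc")  # assumed index/type
   .mode("append")
   .save())

spark.stop()
```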