pyspark数据写入文件及数据库hive

发布时间 2023-10-31 16:27:04作者: whiteY

原始数据如下

POD9_6ec8794bd3297048d6ef7b6dff7b8be1|#$2023-10-24|#$0833|#$#|#$#|#$99999999999|#$#|#$12345678912
POD9_352858578708f144bb166a77bad743f4|#$2023-10-24|#$0391|#$#|#$#|#$99999999999|#$#|#$12345678912
POD9_c8c996cff241529b2427a29e9d6c68d7|#$2023-10-24|#$0834|#$#|#$#|#$99999999999|#$P00000016970|#$12345678912

数据分区查看设置

print(df.rdd.getNumPartitions())
print(df.rdd.glom().collect())

结果为2个分区,未指定的情况下,默认使用了本地机器的CPU核数。

为了让后续数据文件集中,方便查看,我们将数据进行重分区,分区数设定为1个,如下所示:

df = df.coalesce(1)

写入文件

1.写入csv文件

df.write.csv("data_csv")
# 或者
df.write.format("csv").save("data_csv")

生成的结果如下,一个csv文件,以及标志成功的文件和crc校验文件。

从结果可以看出,数据是没有表头的。可以通过指定option来指定表头:

df.write.format("csv").option("header", True).mode("overwrite").save("data_csv")

2.写入txt文件

需要注意官网有这么一句话:The DataFrame must have only one column that is of string type. Each row becomes a new line in the output file. 意思是写txt文件时dataframe只能有一列,而且必须是string类型。


value = [("alice",), ("bob",)]
df = spark.createDataFrame(value, schema="name: string")
df.show()
df = df.coalesce(1)
df.write.text("data_txt")

3.写入json文件

df.write.json("data_json")
#  或者
df.write.format("json").mode("overwrite").save("data_json")

结果如下:

4.写入parquet文件(二进制)

df.write.parquet("data_parquet")
#  或者
df.write.format("parquet").mode("overwrite").save("data_parquet")```

结果如下:

5.写入orc文件(二进制)

df.write.orc("data_orc")
#  或者
df.write.format("orc").mode("overwrite").save("data_orc")

结果如下:

orc文件中内容如下,与parquet的内容类似,也是采用二进制编码存储的。相同内容的数据,用orc文件明显比parquet文件占用的大小更小。在实际工作中,我们一般选用orc格式保存数据。

写入数据表

写入hive

写入hive需要在spark环境放hive和hdfs对应的配置文件

详情见 https://www.cnblogs.com/whiteY/p/17774218.html

提前创建hive分区表:

CREATE EXTERNAL TABLE `bus_recommend_staff`(       
  `serialnum_bg` string,                           
  `bus_date` string,                               
  `city_code` string,                              
  `staff_num` string,                              
  `staff_name` string,                             
  `serial_number` string,                          
  `channel_code` string,                           
  `bind_user_num` string)                         
PARTITIONED BY (                                   
  `dt` string)                                     
ROW FORMAT SERDE                                   
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'      
STORED AS INPUTFORMAT                              
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT                                       
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION                                           
  'hdfs://NNHA/cmbh_log/bus_recommend_staff'

pyspark 程序将日志数据写入hive分区表

from pyspark.sql import SparkSession,HiveContext

spark = SparkSession.builder.enableHiveSupport().master("local[*]")\
    .appName("log ETL")\
    .config("hive.metastore.uris", "thrift://hadoopm111:9083")\
    .getOrCreate()
sc = spark.sparkContext
hc = HiveContext(sc)



columns = ["SERIALNUM_BG", "BUS_DATE", "CITY_CODE", "STAFF_NUM",
           "STAFF_NAME", "SERIAL_NUMBER", "CHANNEL_CODE", "BIND_USER_NUM"]

rdd = sc.textFile("BUS_RECOMMEND_STAFF.log")  # 读取文件
# print(rdd.collect())
# ['POD9_6ec8794bd3297048d6ef7b6dff7b8be1|#$2023-10-24|#$0833|#$#|#$#|#$99999999999|#$#|#$18881353974', 'POD9_352858578708f144bb166a77bad743f4|#$2023-10-24|#$0391|#$#|#$#|#$99999999999|#$#|#$15978720838', 'POD9_c8c996cff241529b2427a29e9d6c68d7|#$2023-10-24|#$0834|#$#|#$#|#$99999999999|#$P00000016970|#$18280654614']
rdd1 = rdd.map(lambda x: x.split("|#$"))  # 按指定分隔符进行分割
# print(rdd1.collect())
# [['POD9_6ec8794bd3297048d6ef7b6dff7b8be1', '2023-10-24', '0833', '#', '#', '99999999999', '#', '18881353974'], ['POD9_352858578708f144bb166a77bad743f4', '2023-10-24', '0391', '#', '#', '99999999999', '#', '15978720838'], ['POD9_c8c996cff241529b2427a29e9d6c68d7', '2023-10-24', '0834', '#', '#', '99999999999', 'P00000016970', '18280654614']]
df = rdd1.toDF(columns)  # rdd转DF指定schema

df = df.coalesce(1)

# df.show()

# 创建临时视图
df.createTempView('STAFF')

# hc.sql("select * from STAFF").show()

# 写入hive分区表
hc.sql("insert into cmbh_log.bus_recommend_staff partition(dt='20231031') select * from STAFF ")

查看hive分区表数据

select * from cmbh_log.bus_recommend_staff where dt='20231031';

参考文档

https://blog.csdn.net/liuyingying0418/article/details/124346855