原始数据如下
POD9_6ec8794bd3297048d6ef7b6dff7b8be1|#$2023-10-24|#$0833|#$#|#$#|#$99999999999|#$#|#$12345678912
POD9_352858578708f144bb166a77bad743f4|#$2023-10-24|#$0391|#$#|#$#|#$99999999999|#$#|#$12345678912
POD9_c8c996cff241529b2427a29e9d6c68d7|#$2023-10-24|#$0834|#$#|#$#|#$99999999999|#$P00000016970|#$12345678912
数据分区查看设置
print(df.rdd.getNumPartitions())
print(df.rdd.glom().collect())
结果为2个分区,未指定的情况下,默认使用了本地机器的CPU核数。
为了让后续数据文件集中,方便查看,我们将数据进行重分区,分区数设定为1个,如下所示:
df = df.coalesce(1)
写入文件
1.写入csv文件
df.write.csv("data_csv")
# 或者
df.write.format("csv").save("data_csv")
生成的结果如下,一个csv文件,以及标志成功的文件和crc校验文件。
从结果可以看出,数据是没有表头的。可以通过指定option来指定表头:
df.write.format("csv").option("header", True).mode("overwrite").save("data_csv")
2.写入txt文件
需要注意官网有这么一句话:The DataFrame must have only one column that is of string type. Each row becomes a new line in the output file. 意思是写txt文件时dataframe只能有一列,而且必须是string类型。
value = [("alice",), ("bob",)]
df = spark.createDataFrame(value, schema="name: string")
df.show()
df = df.coalesce(1)
df.write.text("data_txt")
3.写入json文件
df.write.json("data_json")
# 或者
df.write.format("json").mode("overwrite").save("data_json")
结果如下:
4.写入parquet文件(二进制)
df.write.parquet("data_parquet")
# 或者
df.write.format("parquet").mode("overwrite").save("data_parquet")```
结果如下:
5.写入orc文件(二进制)
df.write.orc("data_orc")
# 或者
df.write.format("orc").mode("overwrite").save("data_orc")
结果如下:
orc文件中内容如下,与parquet的内容类似,也是采用二进制编码存储的。相同内容的数据,用orc文件明显比parquet文件占用的大小更小。在实际工作中,我们一般选用orc格式保存数据。
写入数据表
写入hive
写入hive需要在spark环境放hive和hdfs对应的配置文件
详情见 https://www.cnblogs.com/whiteY/p/17774218.html
提前创建hive分区表:
CREATE EXTERNAL TABLE `bus_recommend_staff`(
`serialnum_bg` string,
`bus_date` string,
`city_code` string,
`staff_num` string,
`staff_name` string,
`serial_number` string,
`channel_code` string,
`bind_user_num` string)
PARTITIONED BY (
`dt` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
'hdfs://NNHA/cmbh_log/bus_recommend_staff'
pyspark 程序将日志数据写入hive分区表
from pyspark.sql import SparkSession,HiveContext
spark = SparkSession.builder.enableHiveSupport().master("local[*]")\
.appName("log ETL")\
.config("hive.metastore.uris", "thrift://hadoopm111:9083")\
.getOrCreate()
sc = spark.sparkContext
hc = HiveContext(sc)
columns = ["SERIALNUM_BG", "BUS_DATE", "CITY_CODE", "STAFF_NUM",
"STAFF_NAME", "SERIAL_NUMBER", "CHANNEL_CODE", "BIND_USER_NUM"]
rdd = sc.textFile("BUS_RECOMMEND_STAFF.log") # 读取文件
# print(rdd.collect())
# ['POD9_6ec8794bd3297048d6ef7b6dff7b8be1|#$2023-10-24|#$0833|#$#|#$#|#$99999999999|#$#|#$18881353974', 'POD9_352858578708f144bb166a77bad743f4|#$2023-10-24|#$0391|#$#|#$#|#$99999999999|#$#|#$15978720838', 'POD9_c8c996cff241529b2427a29e9d6c68d7|#$2023-10-24|#$0834|#$#|#$#|#$99999999999|#$P00000016970|#$18280654614']
rdd1 = rdd.map(lambda x: x.split("|#$")) # 按指定分隔符进行分割
# print(rdd1.collect())
# [['POD9_6ec8794bd3297048d6ef7b6dff7b8be1', '2023-10-24', '0833', '#', '#', '99999999999', '#', '18881353974'], ['POD9_352858578708f144bb166a77bad743f4', '2023-10-24', '0391', '#', '#', '99999999999', '#', '15978720838'], ['POD9_c8c996cff241529b2427a29e9d6c68d7', '2023-10-24', '0834', '#', '#', '99999999999', 'P00000016970', '18280654614']]
df = rdd1.toDF(columns) # rdd转DF指定schema
df = df.coalesce(1)
# df.show()
# 创建临时视图
df.createTempView('STAFF')
# hc.sql("select * from STAFF").show()
# 写入hive分区表
hc.sql("insert into cmbh_log.bus_recommend_staff partition(dt='20231031') select * from STAFF ")
查看hive分区表数据
select * from cmbh_log.bus_recommend_staff where dt='20231031';
参考文档
https://blog.csdn.net/liuyingying0418/article/details/124346855