pyspark学习

发布时间 2023-09-03 20:51:43作者: cojames
from pyspark import *
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
import json
import os

from pyspark.sql.types import StructType, IntegerType, StringType


# os.environ['HADOOP_CONF_DIR'] = '/export/server/hadoop-3.3.0/etc/hadoop'

if __name__ == '__main__':
    #pyspark编程入口,创建buIlder,创建SparkSession实例对象
    spark = SparkSession.builder. \
        appName("test"). \
        master("local[*]"). \
        getOrCreate()
    #可以使用rdd转化为dataframe格式(表格式)
    sc = spark.sparkContext

    #schema为表的格式,即表的属性设置,第一个参数为属性名称,第二个参数为属性的类型,第三个参数表示是否为空,TRUE默认可以为为空,使用add()函数添加属性
    schema = StructType().add("user_id", StringType(), nullable=True). \
        add("movie_id", IntegerType(), nullable=True). \
        add("rank", IntegerType(), nullable=True). \
        add("ts", StringType(), nullable=True)

    #也可以使用read的api创建dataframe,其中option为参数设置(按照什么分割,是否有标题,编码等等),load为路径(可以是本地的路径,也可以是hdfs)
    df = spark.read.format("csv"). \
        option("sep", "\t"). \
        option("header", False). \
        option("encoding", "utf-8"). \
        schema(schema=schema). \
        load("../data/input/sql/u.data")




    #TODO 求每个人的平均分降序平排列
    #api风格
    #withColumnRenamed对一列改名
    #withColumn对一列进行操作,例如设置精度等等
    df.groupBy("user_id"). \
        avg("rank"). \
        withColumnRenamed("avg(rank)", "avg_rank"). \
        withColumn("avg_rank", f.round("avg_rank", 2)). \
        orderBy("avg_rank", ascending=False). \
        show()

    # TODO 求每个人的平均分,最高分,最低分
    #当有多个聚合函数的时候使用agg()里面,可以使用function的api来对dataframe进行操作
    df.groupBy("user_id"). \
        agg(
        f.round(f.avg("rank"), 2).alias("avg_rank")
    ).show()

    user_id = df.where("rank>3"). \
        groupBy("user_id"). \
        count(). \
        withColumnRenamed("count", "cnt"). \
        orderBy("cnt", asending=False). \
        limit(1).first()["user_id"]
    print(user_id)