spark代码示例---explode()炸裂函数使用

发布时间 2023-11-02 12:58:35作者: Yr-Zhang

数据结构,及bean的结构

root
 |-- eventName: string (nullable = true)
 |-- itmeList: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- did: string (nullable = true)
 |    |    |-- dno: long (nullable = true)
 |    |    |-- dtm: long (nullable = true)
 |    |    |-- eventName: string (nullable = true)
 |    |    |-- kind: integer (nullable = true)
 |    |    |-- logdate: string (nullable = true)
 |    |    |-- tid: string (nullable = true)
 |    |    |-- typ: long (nullable = true)
 |    |    |-- val: string (nullable = true)
 |-- logdate: string (nullable = true)
 |-- tag: string (nullable = true)
 |-- tid: string (nullable = true)

对上面数据结构的说明:第三层bean结构

root层的数据结构:重要把数据集放在一起,并且给每个itemlist声明一个tag
public class HiveEtlItmeTableSchemaBean extends BasicBean implements Serializable {
    /**
     * tag
     */
    private String tag ;
    /**
     * 数据集
     */
    private List<HiveEtlTableSchemaBean> itmeList ;
}

对上面数据结构的说明:第二层bean结构

itmeList是一个list集合,要是炸裂函数获取每个元素,元素的数据结构如下
public class HiveEtlTableSchemaBean extends BasicBean implements Serializable {
    /**
     * 编号
     */
    private String did ;
    /**
     * 数据号
     */
    private long dno ;
    /**
     * 数据时间
     */
    private long dtm ;
    /**
     * 大类
     */
    private int kind ;
    /**
     * 小类
     */
    private long typ ;
    /**
     * 值
     */
    private String val ;
}

代码编写查询数据的方式

//遍历每个车的所有数据项,并返回输出的数据项和其对应的标识
Dataset<Row> etlDataSet = resSet.flatMap(new FlatMapFunction<OrgBean, HiveEtlItmeTableSchemaBean>() {
    @Override
    public Iterator<HiveEtlItmeTableSchemaBean> call(OrgBean orgBean) throws Exception {
        List<HiveEtlItmeTableSchemaBean> catItme = new ArrayList<>();
        ...
        return catItme.iterator();
    }
}, Encoders.bean(HiveEtlItmeTableSchemaBean.class)).toDF();
//============================================
//查询数据
Dataset<Row> rowDataset = etlDataSet.where("tag='canlist'")
.select(explode(new Column("itmeList")))
//.where("col.val!=null and col.did!=null and col.dno!=null and col.dtm !=null and col.kind !=null and col.typ!=null")
.select("col.did", "col.dno", "col.dtm", "col.kind", "col.typ", "col.val")
.toDF("did", "dno", "dtm", "kind", "typ", "val");
// .write().option("compression", CompressionKind.ZSTD.name()).mode(SaveMode.Overwrite).orc("/data/tmp");
rowDataset.printSchema();
rowDataset.show(false);