Writing to Elasticsearch with Spark SQL

Published: 2023-09-22 17:30:03  Author: Eiffelzero

Official documentation

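Key                   Notes
es.write.operation    index (default): adds new data; an existing document with the same id is replaced (reindexed). create: adds new data; throws an exception if the document already exists. update: updates existing data; throws an exception if the document does not exist. upsert: inserts the document if it does not exist, otherwise updates it.
es.mapping.id         Name of the field whose value is used as the Elasticsearch document _id (the doc_id).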
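
To make the difference between the four es.write.operation values concrete, here is a small toy model in Python. It is only a sketch: the "index" is a plain dict keyed by document id, the function and exception names (apply_write, DocumentExistsError, DocumentMissingError) are made up for illustration, and update/upsert are modeled as a shallow merge of fields, which is an assumption about partial-update behavior and not the connector's actual implementation.

```python
class DocumentExistsError(Exception):
    """Raised by `create` when the id is already present (hypothetical name)."""


class DocumentMissingError(Exception):
    """Raised by `update` when the id is not present (hypothetical name)."""


def apply_write(store, operation, doc_id, doc):
    """Apply one write to `store` (dict: id -> document dict), in place.

    Toy model of the semantics listed in the table above, not real ES code.
    """
    exists = doc_id in store
    if operation == "index":
        # Add new data; an existing document is replaced as a whole.
        store[doc_id] = dict(doc)
    elif operation == "create":
        # Add new data; fail if the document already exists.
        if exists:
            raise DocumentExistsError(doc_id)
        store[doc_id] = dict(doc)
    elif operation == "update":
        # Update existing data; fail if the document does not exist.
        if not exists:
            raise DocumentMissingError(doc_id)
        store[doc_id].update(doc)  # assumption: shallow field merge
    elif operation == "upsert":
        # Insert if missing, otherwise update.
        if exists:
            store[doc_id].update(doc)  # assumption: shallow field merge
        else:
            store[doc_id] = dict(doc)
    else:
        raise ValueError(f"unknown write operation: {operation}")
    return store


if __name__ == "__main__":
    store = {}
    apply_write(store, "upsert", "a", {"id": "a", "id2": 1})  # inserted
    apply_write(store, "upsert", "a", {"id2": 2})             # merged
    print(store)
```

In this model, running the same export twice with upsert is harmless, while create would fail on the second run and update would fail on the first.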

Export script example:

CREATE TEMPORARY VIEW table_name
(
    `id` STRING COMMENT 'id',
    `id2` BIGINT COMMENT 'id2'
) USING org.elasticsearch.spark.sql OPTIONS (
    resource 'index_version/_doc',
    nodes 'xxx',
    port '9200',
    scroll_size '50',
    nodes_client_only 'true',
    nodes_data_only 'false',
    -- use the `id` column as the document _id
    es.mapping.id 'id',
    -- insert new documents, update existing ones
    es.write.operation 'upsert',
    es.batch.write.retry.count '6',
    es.batch.write.retry.wait '20s'
);

-- Export only the dimension fields
insert into table table_name
(select id,
        id2
 from table_name_source
 where xxx
)
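
For comparison, roughly the same write can be expressed through the DataFrame API. The PySpark sketch below is an assumption-laden illustration, not part of the original script: the helper names build_es_options and write_to_es are hypothetical, the option keys are the dotted es.* forms of the write-related options used in the SQL above (the read-only scroll_size is left out), and it assumes the elasticsearch-spark connector is on the classpath and that df is a DataFrame with the id and id2 columns.

```python
def build_es_options(nodes, id_field, port="9200", operation="upsert"):
    """Build connector options mirroring the SQL OPTIONS clause (hypothetical helper)."""
    return {
        "es.nodes": nodes,
        "es.port": port,
        "es.nodes.client.only": "true",
        "es.nodes.data.only": "false",
        "es.mapping.id": id_field,          # field used as the document _id
        "es.write.operation": operation,    # index / create / update / upsert
        "es.batch.write.retry.count": "6",
        "es.batch.write.retry.wait": "20s",
    }


def write_to_es(df, resource, options):
    """Write a DataFrame to the given ES resource (hypothetical helper).

    `df` is expected to be a pyspark.sql.DataFrame; nothing is imported here,
    so the function only relies on the DataFrameWriter methods it calls.
    """
    (
        df.write
        .format("org.elasticsearch.spark.sql")
        .options(**options)
        .mode("append")
        .save(resource)
    )


# Usage sketch (needs a running Spark session and a reachable ES cluster;
# 'xxx' is the same placeholder as in the SQL script):
#   df = spark.table("table_name_source").select("id", "id2")
#   write_to_es(df, "index_version/_doc", build_es_options("xxx", "id"))
```

Keeping the options in one dict makes it easy to switch es.write.operation per job without touching the write call itself.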