Apache Beam Side Inputs

Published: 2024-01-08 15:16:29  Author: 粒子先生

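1. Introduction
        Besides the main input PCollection, a ParDo transform can receive additional inputs in the form of side inputs. A side input is an extra input that your DoFn can read every time it processes an element of the input PCollection. When you specify a side input, you create a view of some other data (a PCollectionView), and the ParDo's DoFn can read that view while processing each element.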

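2. Background
        When a transform in Beam needs data from more than one source, you have to use side inputs. The official documentation gives the example code below, but it has a problem: Max.MaxIntFn, used via new Max.MaxIntFn(), has been made private, and the SDK source shows that it can be replaced with Max.ofIntegers() (a static factory method, so it is called without new). In addition, the official documentation mainly shows .asSingletonView(), which turns the PCollection produced by a Combine into a PCollectionView; it does not cover converting an ordinary PCollection.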

// Pass side inputs to your ParDo transform by invoking .withSideInputs.
// Inside your DoFn, access the side input by using the method DoFn.ProcessContext.sideInput.

// The input PCollection to ParDo (this is the main input).
PCollection<String> words = ...;

// A PCollection of word lengths that we'll combine into a single value.
// This is the data that will become the side input.
PCollection<Integer> wordLengths = ...; // Singleton PCollection

// Create a singleton PCollectionView from wordLengths using Combine.globally and asSingletonView.
// A side input must be supplied as a PCollectionView.
// The official docs use new Max.MaxIntFn() here, which the author found to be private in the SDK;
// Max.ofIntegers() is the public static factory method to use instead (no "new").
final PCollectionView<Integer> maxWordLengthCutOffView =
    wordLengths.apply(Combine.globally(Max.ofIntegers()).asSingletonView());

// Apply a ParDo that takes maxWordLengthCutOffView as a side input.
PCollection<String> wordsBelowCutOff =
    words.apply(ParDo
        .of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(@Element String word, OutputReceiver<String> out, ProcessContext c) {
            // In our DoFn, access the side input.
            int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
            if (word.length() <= lengthCutOff) {
              out.output(word);
            }
          }
        }).withSideInputs(maxWordLengthCutOffView)
    );

3. Analysis
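A side input works by passing a PCollectionView into a transform through the ".withSideInputs" method. Jumping to the definition of withSideInputs shows that, in the SDK version the author inspected, it has two overloads and both take PCollectionView: one accepts varargs (PCollectionView<?>... sideInputs) and the other an Iterable<? extends PCollectionView<?>>.

In other words, a PCollection that is to serve as a side input must first be converted into a PCollectionView.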
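To illustrate the varargs overload, here is a minimal sketch that passes two side inputs into a single ParDo. It assumes the Beam Java SDK core and the DirectRunner (beam-runners-direct-java) are on the classpath. The class name MultiSideInputSketch, the sample data and the static RESULTS list are hypothetical; collecting output in a static list is a DirectRunner-only shortcut for the demo, because that runner executes in the same JVM.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class MultiSideInputSketch {

  // Hypothetical demo collector: only valid on the DirectRunner (same JVM).
  static final List<String> RESULTS = Collections.synchronizedList(new ArrayList<>());

  public static List<String> run() {
    RESULTS.clear();
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    // Main input.
    PCollection<String> words = p.apply("Words", Create.of("a", "bb", "ccc", "dddd"));

    // Two independent side inputs, each a single-element PCollection turned into a view.
    PCollectionView<Integer> minLen =
        p.apply("Min", Create.of(2)).apply("MinView", View.asSingleton());
    PCollectionView<Integer> maxLen =
        p.apply("Max", Create.of(3)).apply("MaxView", View.asSingleton());

    words.apply("FilterByLength", ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(@Element String word, ProcessContext c) {
        // Read both side inputs for every element of the main input.
        int lo = c.sideInput(minLen);
        int hi = c.sideInput(maxLen);
        if (word.length() >= lo && word.length() <= hi) {
          RESULTS.add(word);
        }
      }
    }).withSideInputs(minLen, maxLen)); // varargs overload of withSideInputs

    p.run().waitUntilFinish();

    // Sort so the result does not depend on bundle processing order.
    List<String> sorted = new ArrayList<>(RESULTS);
    Collections.sort(sorted);
    return sorted;
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [bb, ccc]
  }
}
```

Passing java.util.Arrays.asList(minLen, maxLen) to withSideInputs would select the Iterable overload instead; the DoFn body stays the same.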

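In the SDK I found a convenient way to turn an ordinary PCollection into a PCollectionView, provided the PCollection contains exactly one element:

final PCollectionView<Geometry> test01 = test02.apply(View.asSingleton());

With this, side inputs are easy to use. A full example follows.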

// Sideinputs.WordCountOptions is the author's own options class. GeoLine and GeoMLine are the
// author's custom PTransforms, presumably parsing the WKT strings into Geometry objects
// (Geometry appears to be the JTS geometry type, which provides intersection()).
Sideinputs.WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
    .as(Sideinputs.WordCountOptions.class);
Pipeline p = Pipeline.create(options);

String a01 = "LINESTRING(0 0, 2 0, 5 0)";
String a02 = "LINESTRING(0 0, 0 2)";

// Main input: the first line.
PCollection<Geometry> Sline01 = p.apply("Source01", Create.of(a01))
    .apply(new GeoLine());
// Data for the side input: the second line.
PCollection<Geometry> SMline02 = p.apply("Source02", Create.of(a02))
    .apply(new GeoMLine());

// Turn the single-element PCollection into a PCollectionView.
final PCollectionView<Geometry> GeoLine01 = SMline02.apply(View.asSingleton());

PCollection<String> res01 =
    Sline01.apply(ParDo
        .of(new DoFn<Geometry, String>() {
          @ProcessElement
          public void processElement(@Element Geometry line, OutputReceiver<String> out, ProcessContext c) {
            // Read the side input and intersect it with the current element.
            Geometry geometry1 = c.sideInput(GeoLine01);
            String interPoint = geometry1.intersection(line).toString(); // intersection point(s)
            out.output(interPoint);
          }
        }).withSideInputs(GeoLine01)
    );
res01.apply(TextIO.write().to(options.getOutput()));
p.run().waitUntilFinish();
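View.asSingleton() only works when the PCollection holds exactly one element (per window); with more than one element, reading the view fails at runtime. For a multi-element PCollection, a minimal sketch using View.asList() could look like the following. It makes the same assumptions as any Beam demo run locally: the Beam Java SDK core plus the DirectRunner on the classpath. The class name ListSideInputSketch, the stop-word data and the static RESULTS collector are hypothetical, and the collector only works on the DirectRunner.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class ListSideInputSketch {

  // Hypothetical demo collector: only valid on the DirectRunner (same JVM).
  static final List<String> RESULTS = Collections.synchronizedList(new ArrayList<>());

  public static List<String> run() {
    RESULTS.clear();
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    // Main input.
    PCollection<String> words = p.apply("Words", Create.of("the", "beam", "a", "pipeline"));

    // A multi-element PCollection: View.asSingleton() would fail here,
    // so it is exposed to the DoFn as a List instead.
    PCollectionView<List<String>> stopWords =
        p.apply("StopWords", Create.of("the", "a")).apply("StopWordsView", View.asList());

    words.apply("DropStopWords", ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(@Element String word, ProcessContext c) {
        // The whole side-input list is available for every main-input element.
        List<String> stop = c.sideInput(stopWords);
        if (!stop.contains(word)) {
          RESULTS.add(word);
        }
      }
    }).withSideInputs(stopWords));

    p.run().waitUntilFinish();

    // Sort so the result does not depend on bundle processing order.
    List<String> sorted = new ArrayList<>(RESULTS);
    Collections.sort(sorted);
    return sorted;
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [beam, pipeline]
  }
}
```

View.asIterable() and View.asMap() (for KV elements) are the other common view types. Side inputs are made available to every worker that reads them, so they suit relatively small datasets.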

Original sources:
https://blog.csdn.net/leehom__/article/details/96576668
https://max.book118.com/html/2019/0526/7053034046002030.shtm
https://blog.csdn.net/ffjl1985/article/details/78055152