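1. Introduction
In addition to the main input PCollection, you can provide extra inputs to a ParDo transform in the form of side inputs. A side input is an additional input that the DoFn can access each time it processes an element of the input PCollection. When you specify a side input, you create a view (PCollectionView) of some other data, and the DoFn of the ParDo transform can read that data while processing each element.
2. Background
When an operator in Beam has to work on several data sources at once, side inputs are required. The official documentation gives the sample code below, but that code has a problem: new Max.MaxIntFn() can no longer be called because it has been made private. Reading the source code shows that replacing it with Max.ofIntegers() is enough. In addition, the official documentation mainly shows .asSingletonView(), which turns the PCollection produced by Combine into a PCollectionView, and that approach does not suit the conversion of an ordinary PCollection.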
// Pass side inputs to your ParDo transform by invoking .withSideInputs.
// Inside your DoFn, access the side input by using the method DoFn.ProcessContext.sideInput.

// The input PCollection to ParDo.
// This is the main input.
PCollection<String> words = ...;

// A PCollection of word lengths that we'll combine into a single value.
PCollection<Integer> wordLengths = ...; // Singleton PCollection

// Create a singleton PCollectionView from wordLengths using Combine.globally and View.asSingleton.
// A side input must be supplied as a PCollectionView.
// The official documentation uses new Max.MaxIntFn() here, which has to be corrected:
// final PCollectionView<Integer> maxWordLengthCutOffView =
//     wordLengths.apply(Combine.globally(new Max.MaxIntFn()).asSingletonView());
final PCollectionView<Integer> maxWordLengthCutOffView =
    wordLengths.apply(Combine.globally(Max.ofIntegers()).asSingletonView());
// Apply a ParDo that takes maxWordLengthCutOffView as a side input.
PCollection<String> wordsBelowCutOff =
    words.apply(ParDo
        .of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(@Element String word, OutputReceiver<String> out, ProcessContext c) {
            // In our DoFn, access the side input.
            int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
            if (word.length() <= lengthCutOff) {
              out.output(word);
            }
          }
        }).withSideInputs(maxWordLengthCutOffView)
    );
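To make the data flow of the snippet above concrete, here is a minimal plain-Java sketch of what it computes, written without Beam. The names SideInputModel, maxLength and wordsBelowCutOff are hypothetical and are not part of the Beam API: maxLength stands in for Combine.globally(Max.ofIntegers()).asSingletonView(), and the lengthCutOff parameter plays the role of the value returned by c.sideInput(...).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java model of the side-input example; none of these names are Beam APIs.
class SideInputModel {

    // Models the singleton view: all word lengths are reduced to a single value.
    static int maxLength(List<Integer> wordLengths) {
        return Collections.max(wordLengths);
    }

    // Models the DoFn: every main-input element is compared with the same side value.
    static List<String> wordsBelowCutOff(List<String> words, int lengthCutOff) {
        List<String> out = new ArrayList<>();
        for (String word : words) {
            if (word.length() <= lengthCutOff) {
                out.add(word);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("beam", "pipeline", "view");
        int cutOff = maxLength(Arrays.asList(3, 4)); // the "side input" value is 4
        System.out.println(wordsBelowCutOff(words, cutOff)); // prints [beam, view]
    }
}
```

The point of the model is that the side value is computed once from a second data set and then read, unchanged, for every element of the main input.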
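3. Analysis
The logic of a side input is that a PCollectionView is passed into a transform through the .withSideInputs method. Jumping to the definition of withSideInputs, it is easy to see that it accepts only two parameter types, both built on PCollectionView (a varargs list of views or an Iterable of views), as shown in the figure below.
In other words, the PCollection used as a side input has to be converted into a PCollectionView.
In the SDK source I found a convenient way to convert an ordinary PCollection into a PCollectionView: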
final PCollectionView<Geometry> test01=test02.apply(View.asSingleton());
With this, side inputs become easy to use. A code example follows:
Sideinputs.WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
    .as(Sideinputs.WordCountOptions.class);
Pipeline p = Pipeline.create(options);

String a01 = "LINESTRING(0 0, 2 0, 5 0)";
String a02 = "LINESTRING(0 0, 0 2)";

// Main input. GeoLine and GeoMLine are custom transforms (not shown here) that produce a Geometry from the string.
PCollection<Geometry> Sline01 = p.apply("Source01", Create.of(a01))
    .apply(new GeoLine());
// Data for the side input.
PCollection<Geometry> SMline02 = p.apply("Source02", Create.of(a02))
    .apply(new GeoMLine());

// Convert the ordinary PCollection into a PCollectionView.
final PCollectionView<Geometry> GeoLine01 = SMline02.apply(View.asSingleton());

PCollection<String> res01 = Sline01.apply(ParDo
    .of(new DoFn<Geometry, String>() {
      @ProcessElement
      public void processElement(@Element Geometry line, OutputReceiver<String> out, ProcessContext c) {
        // Read the side input and intersect it with the current main-input element.
        Geometry geometry1 = c.sideInput(GeoLine01);
        String interPoint = geometry1.intersection(line).toString(); // intersection point
        out.output(interPoint);
      }
    }).withSideInputs(GeoLine01)
);

res01.apply(TextIO.write().to(options.getOutput()));
p.run().waitUntilFinish();
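One caveat about the View.asSingleton() call used above: as far as I understand its documented contract, the PCollection behind the view must contain exactly one element, and the consuming DoFn fails if it is empty or holds more than one value. The plain-Java sketch below only models that check. SingletonViewModel and asSingleton are hypothetical names, not Beam APIs, and the exception types are an assumption chosen to mirror what the Beam Javadoc describes.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;

// Plain-Java model of the "exactly one element" rule; not a Beam class.
class SingletonViewModel {

    // Returns the only element, or throws if there are zero or several elements.
    static <T> T asSingleton(List<T> elements) {
        if (elements.isEmpty()) {
            throw new NoSuchElementException("side input is empty");
        }
        if (elements.size() > 1) {
            throw new IllegalArgumentException("side input has more than one element");
        }
        return elements.get(0);
    }

    public static void main(String[] args) {
        // One geometry string, as in the example above: accepted.
        System.out.println(asSingleton(Collections.singletonList("LINESTRING(0 0, 0 2)")));
        try {
            // Two elements: rejected.
            asSingleton(Arrays.asList("LINESTRING(0 0, 0 2)", "LINESTRING(1 1, 2 2)"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

When the side data holds several elements, Beam also offers View.asList(), View.asIterable() and View.asMap(), which produce views over the whole collection instead of a single value.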
Original:
https://blog.csdn.net/leehom__/article/details/96576668
https://max.book118.com/html/2019/0526/7053034046002030.shtm