I could not figure out the right way to add a side input to a ParDo using apache_beam[gcp] version 2.4.0.
My pipeline is:

    (pipeline
     | "Load" >> ReadFromText("query.txt")
     | "Count Words" >> CountWordsTransform())

    class CountWordsTransform(beam.PTransform):
        def expand(self, p_collection):
            anotherPipleline = beam.Pipeline(runner="DataflowRunner", argv=[
                "--staging_location", ("%s/staging" % gcs_path),
                "--temp_location", ("%s/temp" % gcs_path),
                "--output", ("%s/output" % gcs_path),
                "--setup_file", "./setup.py"
            ])
            value2 = anotherPipleline | 'create2' >> Create([("a", 1), ("b", 2), ("c", 3)])
            return (p_collection
                    | "Split" >> (beam.ParDo(FindWords(), beam.pvalue.AsDict(value2))))
The FindWords class is defined as:

    class FindWords(beam.DoFn):
        def process(self, element, values):
            import re as regex
            return regex.findall(r"[A-Za-z\']+", element)
I receive the following error:
'NoneType' object has no attribute 'parts'
Answer
You are creating a separate pipeline inside your composite transform just to build the side input. That causes problems, because a PCollection belongs to the pipeline that created it and should not be shared across different pipelines.
Instead, try creating the side input in the same pipeline and passing it to your transform as a parameter.
For example:

    import apache_beam as beam

    class CountWordsTransform(beam.PTransform):
        def __init__(self, values):
            super(CountWordsTransform, self).__init__()
            self.values = values  # side-input PCollection from the SAME pipeline

        def expand(self, p_collection):
            return p_collection | "Split" >> beam.ParDo(
                FindWords(), beam.pvalue.AsDict(self.values))

    # FindWords is the DoFn from the question; `pipeline` is your beam.Pipeline.
    values = pipeline | "Get pcol for side input" >> beam.Create([("a", 1), ("b", 2), ("c", 3)])
    (pipeline
     | "Load" >> beam.io.ReadFromText('gs://bucket/words.txt')
     | "Count Words" >> CountWordsTransform(values))
I tested the above with 2.4.0.