I'm just getting started with Beam/Dataflow. I read the docs, ran the examples, and now I'm making small edits to the example scripts to see if I can apply what I read. I made a minor edit to the 'minimal word count' script - see below. (I removed my GCP info.)
When I run this with DirectRunner, it runs fine and uploads the results to my GCP Cloud Storage bucket. However, when I switch to the DataflowRunner, I get the following error: ImportError: No module named IPython.core and the job fails. (Full error message pasted below the code.)
I understand that this means there's a module missing, but I don't know how/where to import that module. Or maybe I'm totally misunderstanding PTransforms. Any guidance is much appreciated. FYI, I'm using Python 2.7 (Anaconda).
Input file: https://github.com/ageron/handson-ml/blob/master/datasets/housing/housing.csv
Code
from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def run(argv=None):
    """Main entry point; defines and runs the housing pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        default='gs://<bucket>/housing.csv',
                        help='Input file to process.')
    parser.add_argument('--output',
                        dest='output',
                        default='gs://<bucket>/housing_file',
                        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_args.extend([
        # '--runner=DirectRunner',  # local
        '--runner=DataflowRunner',  # GCP
        '--project=<project>',
        '--staging_location=gs://<bucket>/staging',
        '--temp_location=gs://<bucket>/temp',
        '--job_name=housing-data',
    ])
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as p:
        # Read the text file[pattern] into a PCollection.
        lines = p | ReadFromText(known_args.input)

        # Simple transform: gets the X values from the .csv file
        class get_X_values(beam.DoFn):
            def process(self, element):
                value_list = element.split(',')[0:8]
                value_list.append(element.split(',')[9])
                return [",".join(value_list)]

        X_values = lines | beam.ParDo(get_X_values())

        # Write the output using a "Write" transform that has side effects.
        # pylint: disable=expression-not-assigned
        X_values | WriteToText(known_args.output, file_name_suffix='.txt')


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
Error Message
JOB_MESSAGE_ERROR: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 733, in run
    self._load_main_session(self.local_staging_directory)
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 472, in _load_main_session
    pickler.load_session(session_file)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 247, in load_session
    return dill.load_session(file_path)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 363, in load_session
    module = unpickler.load()
  File "/usr/lib/python2.7/pickle.py", line 864, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1139, in load_reduce
    value = func(*args)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 767, in _import_module
    return getattr(__import__(module, None, None, [obj]), obj)
ImportError: No module named IPython.core
1 Answer
#1
I got my code to work by running the file outside my IDE (spyder), in a command window. I guess this counts as a solution, though I still don't know why there was an error in the first place.
FWIW, I still got the same ImportError even after adding dependencies to the workers, when I ran the code in my IDE. I'm still interested if anyone knows why this is an issue or what I can do to fix it ...