GCP Dataflow: ImportError: No module named IPython.core

Time: 2021-08-04 15:34:19

I'm just getting started with Beam/Dataflow. I read the docs, ran the examples, and now I'm making small edits to the example scripts to see if I can apply what I read. I made a minor edit to the 'minimal word count' script - see below. (I removed my GCP info.)

When I run this with DirectRunner, it runs fine and uploads the results to my GCP cloud storage bucket. However, when I switch to the DataflowRunner, I get the following error: ImportError: No module named IPython.core and the job fails. (Full error msg pasted below code.)

I understand that this means that there's a module missing, but I don't know how/where to import that module. Or, maybe I'm totally misunderstanding PTransforms. Any guidance is much appreciated. FYI, I'm using Python 2.7 (Anaconda).

Input file: https://github.com/ageron/handson-ml/blob/master/datasets/housing/housing.csv

Code

from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def run(argv=None):
    """Main entry point; defines and runs the housing pipeline."""

    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        default='gs://<bucket>/housing.csv',
                        help='Input file to process.')
    parser.add_argument('--output',
                        dest='output',
                        default='gs://<bucket>/housing_file',
                        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_args.extend([
#            '--runner=DirectRunner',         # local
            '--runner=DataflowRunner',        # GCP
            '--project=<project>',
            '--staging_location=gs://<bucket>/staging',
            '--temp_location=gs://<bucket>/temp',
            '--job_name=housing-data',
            ])

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(options=pipeline_options) as p:

        # Read the text file[pattern] into a PCollection.
        lines = p | ReadFromText(known_args.input)

        # Simple transform: gets the X values from the .csv file
        class get_X_values(beam.DoFn):
            def process(self, element):
                value_list = element.split(',')[0:8]
                value_list.append(element.split(',')[9])
                return [",".join(value_list)]            
        X_values = lines | beam.ParDo(get_X_values())


        # Write the output using a "Write" transform that has side effects.
        # pylint: disable=expression-not-assigned
        X_values | WriteToText(known_args.output, file_name_suffix='.txt')


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Error Message

JOB_MESSAGE_ERROR: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 733, in run
    self._load_main_session(self.local_staging_directory)
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 472, in _load_main_session
    pickler.load_session(session_file)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 247, in load_session
    return dill.load_session(file_path)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 363, in load_session
    module = unpickler.load()
  File "/usr/lib/python2.7/pickle.py", line 864, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1139, in load_reduce
    value = func(*args)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 767, in _import_module
    return getattr(__import__(module, None, None, [obj]), obj)
ImportError: No module named IPython.core

1 Solution

#1

I got my code to work by running the file outside my IDE (Spyder), in a command window. I guess this counts as a solution, though I still don't know why there was an error in the first place.

FWIW, I still got the same ImportError even after adding dependencies to the workers, when I ran the code in my IDE. I'm still interested if anyone knows why this is an issue or what I can do to fix it ...
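
By "adding dependencies to the workers" I mean passing a requirements file through the pipeline options. A minimal sketch of that mechanism, assuming a requirements.txt sitting next to the script (this is the standard approach, but as noted it did not make the IPython error go away for me):

from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options = PipelineOptions([
        '--runner=DataflowRunner',
        '--project=<project>',
        '--staging_location=gs://<bucket>/staging',
        '--temp_location=gs://<bucket>/temp',
        '--job_name=housing-data',
        # Every package listed in requirements.txt is staged and then
        # pip-installed on each Dataflow worker before the job starts.
        '--requirements_file=requirements.txt',
        ])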

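My best guess at the root cause, though I have not verified it: with save_main_session = True, Beam pickles the whole __main__ namespace with dill.dump_session(), and when the script runs inside Spyder the session lives in an IPython kernel, so the pickle ends up referencing IPython.core. The Dataflow workers don't have IPython installed, which is why dill.load_session() fails exactly where the traceback shows. Running from a plain command window keeps IPython out of __main__, which would explain why that worked. Below is a sketch of a layout that sidesteps the issue by keeping the DoFn at module level and leaving save_main_session off; it is my own rearrangement of the code in the question, not a tested fix:

from __future__ import absolute_import

import logging

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


class GetXValues(beam.DoFn):
    """Keep columns 0-7 plus column 9 of each CSV line (same slicing as the question)."""
    def process(self, element):
        fields = element.split(',')
        yield ','.join(fields[0:8] + [fields[9]])


def run(argv=None):
    # save_main_session is left at its default (False): nothing from the
    # interactive session needs to be pickled, because the DoFn lives at
    # module level and is serialized on its own.
    # Runner, project, temp_location, etc. are passed on the command line.
    options = PipelineOptions(argv)
    with beam.Pipeline(options=options) as p:
        (p
         | ReadFromText('gs://<bucket>/housing.csv')
         | beam.ParDo(GetXValues())
         | WriteToText('gs://<bucket>/housing_file', file_name_suffix='.txt'))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()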