Error when importing WriteToDatastore (Apache Beam / Google Dataflow)

Time: 2022-01-05 15:34:51

I'm trying to use an Apache Beam pipeline to write entities to Google Cloud Datastore. For testing, I'm doing this in a local Python 2.7 virtual environment that was set up following the Apache Beam instructions. Coding is done in a local Jupyter notebook. Here's the pseudo code I'm trying:

import apache_beam as beam
#from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud import datastore
from google.cloud.datastore.entity import Entity

# projectId will be taken from the environment
storage = datastore.Client()

# The kind for the new entity
gds_entity_kind = 'test_entity'


class PrintFn(beam.DoFn):
    def process(self, element):
        print (element)
        return None


def create_entity(entity_id, name):
    key = storage.key(gds_entity_kind, int(entity_id))

    entity = Entity(key=key)
    entity.update({
        'name': name
    })

    return entity


lines = [
    "'0815';'entity A'",
    "'4711';'entity B'"
]

with beam.Pipeline() as p:
    (p
     | 'read lines' >> beam.Create(lines)
     | 'rows to columns' >> beam.Map(lambda v: v.split(';'))
     | 'remove quotes' >> beam.Map(lambda words: [word.strip('\'') for word in words])
     | 'create entities' >> beam.Map(lambda fields: create_entity(*fields))
#     | 'write to datastore' >> WriteToDatastore()
     | 'debug print' >> beam.ParDo(PrintFn())
    )

I found this posting with a similar problem, but the answer seems unrelated to my situation. I figured out that the problem seems to be related to the import statement

from google.cloud import datastore

in relation to

from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore

If I restart the kernel and import only WriteToDatastore, I don't get any error. If I try to import both, I get this error. Any help is appreciated!

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-10-45d84b2c60ba> in <module>()
      1 import apache_beam as beam
----> 2 from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
      3 from google.cloud import datastore
      4 from google.cloud.datastore.entity import Entity
      5 

/usr/local/lib/python2.7/site-packages/apache_beam/io/gcp/datastore/v1/datastoreio.py in <module>()
     21 import time
     22 
---> 23 from apache_beam.io.gcp.datastore.v1 import helper
     24 from apache_beam.io.gcp.datastore.v1 import query_splitter
     25 from apache_beam.io.gcp.datastore.v1 import util

/usr/local/lib/python2.7/site-packages/apache_beam/io/gcp/datastore/v1/helper.py in <module>()
     34 # pylint: disable=wrong-import-order, wrong-import-position
     35 try:
---> 36   from google.cloud.proto.datastore.v1 import datastore_pb2
     37   from google.cloud.proto.datastore.v1 import entity_pb2
     38   from google.cloud.proto.datastore.v1 import query_pb2

/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/google/cloud/proto/datastore/v1/datastore_pb2.py in <module>()
     15 
     16 from google.api import annotations_pb2 as google_dot_api_dot_annotations__pb2
---> 17 from google.cloud.proto.datastore.v1 import entity_pb2 as google_dot_cloud_dot_proto_dot_datastore_dot_v1_dot_entity__pb2
     18 from google.cloud.proto.datastore.v1 import query_pb2 as google_dot_cloud_dot_proto_dot_datastore_dot_v1_dot_query__pb2
     19 

/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/google/cloud/proto/datastore/v1/entity_pb2.py in <module>()
     26   serialized_pb=_b('\n,google/cloud/proto/datastore/v1/entity.proto\x12\x13google.datastore.v1\x1a\x1cgoogle/api/annotations.proto\x1a\x1cgoogle/protobuf/struct.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x18google/type/latlng.proto\"7\n\x0bPartitionId\x12\x12\n\nproject_id\x18\x02 \x01(\t\x12\x14\n\x0cnamespace_id\x18\x04 \x01(\t\"\xb7\x01\n\x03Key\x12\x36\n\x0cpartition_id\x18\x01 \x01(\x0b\x32 .google.datastore.v1.PartitionId\x12\x32\n\x04path\x18\x02 \x03(\x0b\x32$.google.datastore.v1.Key.PathElement\x1a\x44\n\x0bPathElement\x12\x0c\n\x04kind\x18\x01 \x01(\t\x12\x0c\n\x02id\x18\x02 \x01(\x03H\x00\x12\x0e\n\x04name\x18\x03 \x01(\tH\x00\x42\t\n\x07id_type\"8\n\nArrayValue\x12*\n\x06values\x18\x01 \x03(\x0b\x32\x1a.google.datastore.v1.Value\"\xf1\x03\n\x05Value\x12\x30\n\nnull_value\x18\x0b \x01(\x0e\x32\x1a.google.protobuf.NullValueH\x00\x12\x17\n\rboolean_value\x18\x01 \x01(\x08H\x00\x12\x17\n\rinteger_value\x18\x02 \x01(\x03H\x00\x12\x16\n\x0c\x64ouble_value\x18\x03 \x01(\x01H\x00\x12\x35\n\x0ftimestamp_value\x18\n \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x00\x12-\n\tkey_value\x18\x05 \x01(\x0b\x32\x18.google.datastore.v1.KeyH\x00\x12\x16\n\x0cstring_value\x18\x11 \x01(\tH\x00\x12\x14\n\nblob_value\x18\x12 \x01(\x0cH\x00\x12.\n\x0fgeo_point_value\x18\x08 \x01(\x0b\x32\x13.google.type.LatLngH\x00\x12\x33\n\x0c\x65ntity_value\x18\x06 \x01(\x0b\x32\x1b.google.datastore.v1.EntityH\x00\x12\x36\n\x0b\x61rray_value\x18\t \x01(\x0b\x32\x1f.google.datastore.v1.ArrayValueH\x00\x12\x0f\n\x07meaning\x18\x0e \x01(\x05\x12\x1c\n\x14\x65xclude_from_indexes\x18\x13 \x01(\x08\x42\x0c\n\nvalue_type\"\xbf\x01\n\x06\x45ntity\x12%\n\x03key\x18\x01 \x01(\x0b\x32\x18.google.datastore.v1.Key\x12?\n\nproperties\x18\x03 \x03(\x0b\x32+.google.datastore.v1.Entity.PropertiesEntry\x1aM\n\x0fPropertiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12)\n\x05value\x18\x02 
\x01(\x0b\x32\x1a.google.datastore.v1.Value:\x02\x38\x01\x42\x82\x01\n\x17\x63om.google.datastore.v1B\x0b\x45ntityProtoP\x01Z<google.golang.org/genproto/googleapis/datastore/v1;datastore\xaa\x02\x19Google.Cloud.Datastore.V1b\x06proto3')
     27   ,
---> 28   dependencies=[google_dot_api_dot_annotations__pb2.DESCRIPTOR,google_dot_protobuf_dot_struct__pb2.DESCRIPTOR,google_dot_protobuf_dot_timestamp__pb2.DESCRIPTOR,google_dot_type_dot_latlng__pb2.DESCRIPTOR,])
     29 _sym_db.RegisterFileDescriptor(DESCRIPTOR)
     30 

/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/google/protobuf/descriptor.pyc in __new__(cls, name, package, options, serialized_pb, dependencies, public_dependencies, syntax, pool)
    827         # TODO(amauryfa): use the pool passed as argument. This will work only
    828         # for C++-implemented DescriptorPools.
--> 829         return _message.default_pool.AddSerializedFile(serialized_pb)
    830       else:
    831         return super(FileDescriptor, cls).__new__(cls)

TypeError: Couldn't build proto file into descriptor pool!
Invalid proto descriptor for file "google/cloud/proto/datastore/v1/entity.proto":
  google.datastore.v1.PartitionId.project_id: "google.datastore.v1.PartitionId.project_id" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.PartitionId.namespace_id: "google.datastore.v1.PartitionId.namespace_id" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.PartitionId: "google.datastore.v1.PartitionId" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Key.partition_id: "google.datastore.v1.Key.partition_id" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Key.path: "google.datastore.v1.Key.path" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Key.PathElement.id_type: "google.datastore.v1.Key.PathElement.id_type" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Key.PathElement.kind: "google.datastore.v1.Key.PathElement.kind" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Key.PathElement.id: "google.datastore.v1.Key.PathElement.id" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Key.PathElement.name: "google.datastore.v1.Key.PathElement.name" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Key.PathElement: "google.datastore.v1.Key.PathElement" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Key: "google.datastore.v1.Key" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.ArrayValue.values: "google.datastore.v1.ArrayValue.values" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.ArrayValue: "google.datastore.v1.ArrayValue" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.value_type: "google.datastore.v1.Value.value_type" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.null_value: "google.datastore.v1.Value.null_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.boolean_value: "google.datastore.v1.Value.boolean_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.integer_value: "google.datastore.v1.Value.integer_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.double_value: "google.datastore.v1.Value.double_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.timestamp_value: "google.datastore.v1.Value.timestamp_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.key_value: "google.datastore.v1.Value.key_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.string_value: "google.datastore.v1.Value.string_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.blob_value: "google.datastore.v1.Value.blob_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.geo_point_value: "google.datastore.v1.Value.geo_point_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.entity_value: "google.datastore.v1.Value.entity_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.array_value: "google.datastore.v1.Value.array_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.meaning: "google.datastore.v1.Value.meaning" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.exclude_from_indexes: "google.datastore.v1.Value.exclude_from_indexes" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value: "google.datastore.v1.Value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Entity.key: "google.datastore.v1.Entity.key" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Entity.properties: "google.datastore.v1.Entity.properties" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Entity.PropertiesEntry.key: "google.datastore.v1.Entity.PropertiesEntry.key" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Entity.PropertiesEntry.value: "google.datastore.v1.Entity.PropertiesEntry.value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Entity.PropertiesEntry: "google.datastore.v1.Entity.PropertiesEntry" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Entity: "google.datastore.v1.Entity" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Key.partition_id: "google.datastore.v1.PartitionId" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Key.path: "google.datastore.v1.Key.PathElement" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.ArrayValue.values: "google.datastore.v1.Value" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Value.key_value: "google.datastore.v1.Key" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Value.entity_value: "google.datastore.v1.Entity" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Value.array_value: "google.datastore.v1.ArrayValue" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Entity.PropertiesEntry.value: "google.datastore.v1.Value" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Entity.key: "google.datastore.v1.Key" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Entity.properties: "google.datastore.v1.Entity.PropertiesEntry" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto".  To use it here, please add the necessary import.

1 Solution

#1


I figured out that the programming concept for using Google Cloud Datastore with Apache Beam (Google Cloud Dataflow) differs from the default Datastore API.

You need to use the Datastore helper as given in this example. With this I was able to change my code so that it now runs successfully. Notice the different import for the entity and the different entity creation process.

In summary, it creates the entity in the fully qualified JSON notation that you also see when exploring the Datastore in the Google Cloud Console. My original code created the entity in a much simpler JSON that is usually also understood when writing to the Datastore. Overall, I avoided the conflicting dependencies of my original two imports, which caused the error.
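
To illustrate why the two imports clash, here is a minimal sketch of a registry that refuses to define the same fully qualified name twice. It is only a toy model under my own assumptions: `ToyDescriptorPool` and `add_file` are hypothetical names, not protobuf's real descriptor pool API. The file paths and symbol names are taken from the traceback above.

```python
class ToyDescriptorPool(object):
    """Toy stand-in for a process-wide symbol registry (not protobuf's real pool)."""

    def __init__(self):
        # Maps a fully qualified symbol name to the file that defined it first.
        self._defined_in = {}

    def add_file(self, file_name, symbols):
        # Reject the whole file if any of its symbols is already registered.
        clashes = [s for s in symbols if s in self._defined_in]
        if clashes:
            details = "; ".join(
                '"%s" is already defined in file "%s"' % (s, self._defined_in[s])
                for s in clashes)
            raise TypeError("Couldn't build %s: %s" % (file_name, details))
        for s in symbols:
            self._defined_in[s] = file_name


pool = ToyDescriptorPool()
symbols = ["google.datastore.v1.PartitionId", "google.datastore.v1.Key"]

# Whichever generated module is imported first registers the names.
pool.add_file("google/cloud/datastore_v1/proto/entity.proto", symbols)

# The second generated module defines the same names under a different file path.
try:
    pool.add_file("google/cloud/proto/datastore/v1/entity.proto", symbols)
    conflict = None
except TypeError as exc:
    conflict = str(exc)

print(conflict)
```

If this model is roughly right, it would explain why importing only one of the two packages after a kernel restart works: the registry lives for the lifetime of the Python process, so only the second registration fails.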

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud.proto.datastore.v1 import entity_pb2
from googledatastore import helper as datastore_helper

#from google.cloud import datastore
#from google.cloud.datastore.entity import Entity


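def create_entity(_id, name):
    # Build the entity as a protobuf message, the form WriteToDatastore works with
    entity = entity_pb2.Entity()
    kind = "test_entity"

    # _id arrives as a string here, so the helper should store it as a key name;
    # pass int(_id) instead to get a numeric id as in the question's code
    datastore_helper.add_key_path(entity.key, kind, _id)

    # unicode() is Python 2 only, matching the Python 2.7 environment used here
    datastore_helper.add_properties(entity, {
        "name": unicode(name)
    })

    return entity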

class PrintFn(beam.DoFn):
    def process(self, element):
        print (element)
        return None

project_id = 'your-gcp-project-id'

lines = [
    "'0815';'entity A'",
    "'4711';'entity B'"
]

with beam.Pipeline() as p:
    (p
     | 'read lines' >> beam.Create(lines)
     | 'rows to columns' >> beam.Map(lambda v: v.split(';'))
     | 'remove quotes' >> beam.Map(lambda words: [word.strip('\'') for word in words])
     | 'create entities' >> beam.Map(lambda fields: create_entity(*fields))
     | 'write to datastore' >> WriteToDatastore(project_id)
#     | 'debug print' >> beam.ParDo(PrintFn())
    )
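
The two parsing steps of the pipeline ('rows to columns' and 'remove quotes') can be checked without Beam or Datastore. The sketch below folds them into one plain function; `parse_line` is a hypothetical helper name that does not appear in the code above.

```python
def parse_line(line):
    """Split "'0815';'entity A'" into ['0815', 'entity A']."""
    # Same logic as the two beam.Map lambdas: split on ';', then strip quotes.
    return [word.strip("'") for word in line.split(";")]


fields = parse_line("'0815';'entity A'")
entity_id, name = fields

print(fields)          # ['0815', 'entity A']
print(int(entity_id))  # 815 (the leading zero is lost in the numeric form)
```

This matters for the key: the code in the question called `int(entity_id)`, while the version above passes the string through unchanged. As far as I can tell, `add_key_path` stores a string as the key name and an integer as the key id, so the two versions may produce different keys. This is worth verifying against your installed `googledatastore` version.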
