通过来自mongodb的ids聚合现有数据

时间:2021-02-14 23:13:22

I'm wondering if Mongo-spark is able to handle a scenario where Im going to be importing json data from a stream, but for each file I want to first if there is a corresponding entity already within Mongo and if there is I'd like to manually merge the 2 documents.

我想知道Mongo-spark是否能够处理我将从流中导入json数据的情况,但是对于每个文件,如果Mongo中已经存在相应的实体并且是否存在我想要的话喜欢手动合并2个文件。

The way that the imported data looks like is as follows

导入数据的样式如下所示

{orderId: 1290edoiadq, from: <Some_address_string>, to: <Some_address_string>, status: "Shipped"}. 

The MongoDB that I have has the same data but the _id field contains the orderId. What I'm looking for is to get all the orders, and then check if they need to updated or inserted in.

我拥有的MongoDB具有相同的数据,但_id字段包含orderId。我正在寻找的是获取所有订单,然后检查是否需要更新或插入。

EDIT Let me clarify what merging means. If I have an order with the same id, but their status is different then I'd like to update the status of the existing order in the db to be what is in the JSON data.

编辑让我澄清合并意味着什么。如果我的订单具有相同的ID,但它们的状态不同,那么我想将数据库中现有订单的状态更新为JSON数据中的状态。

2 个解决方案

#1


1  

I'd like to manually merge the 2 documents.

我想手动合并2个文件。

Depends on your definition of merge.

取决于您对合并的定义。

If it's a one way direction, from the incoming json data stream to update documents stored in MongoDB, you could utilise upsert.

如果它是单向方向,从传入的json数据流更新存储在MongoDB中的文档,则可以使用upsert。

Since MongoDB Connector for Spark version 1.1.0, if a dataframe contains _id matching the data in MongoDB, save() will use upsert. Which will update if matching _id exist, otherwise insert.

从MongoDB Connector for Spark版本1.1.0开始,如果数据帧包含与MongoDB中的数据匹配的_id,则save()将使用upsert。如果匹配_id存在,将更新哪个,否则插入。

For example, to change to status=delivered:

例如,要更改为status =已发送:

> df.schema
  org.apache.spark.sql.types.StructType = StructType(StructField(_id,StringType,true), StructField(from,StringType,true), StructField(status,StringType,true), StructField(to,StringType,true))

> df.first()
  org.apache.spark.sql.Row = [1290edoiadq,sender,delivered,receiver]

> MongoSpark.save(df.write.option("collection", "order").mode("append"))

You just have to rename the orderId field name to _id before calling save().

您只需在调用save()之前将orderId字段名重命名为_id。

See SPARK-66 and MongoSpark: save() for more information.

有关更多信息,请参阅SPARK-66和MongoSpark:save()。

However if by merge you mean a two direction update (incoming stream and MongoDB), then you would have to consolidate the changes first in Spark. Resolving any conflict as you deemed appropriate in the code.

但是,如果通过合并意味着双向更新(传入流和MongoDB),那么您必须首先在Spark中合并更改。解决代码中认为合适的任何冲突。

#2


-1  

I have not used MongoDB but had a similar use case. My scenario: There is an incoming stream of events read from a kafka topic. These events need to be mapped and grouped by a key, for each key there might be a corresponding entry in a data store(HBase in my case, MongoDB in yours). If there is an entry, merge the keyed events into the existing entity if not create a new entity and save it to HBase. There are other complications in my case such as looking up multiple tables etc. but the gist of the problem appears to be similar to yours.

我没有使用MongoDB,但有一个类似的用例。我的场景:有一个从kafka主题读取的传入事件流。这些事件需要通过键映射和分组,对于每个键,数据存储中可能存在相应的条目(在我的情况下为HBase,在您的情况下为MongoDB)。如果有条目,则将键控事件合并到现有实体中(如果不创建新实体并将其保存到HBase)。在我的案例中还有其他复杂情况,例如查找多个表等,但问题的要点似乎与您的相似。

My approach and the challenges I faced: I used the Kafka Direct Streaming approach, this gives me a batch(for this discussion is interchangeable with RDD) of data for the configured duration of time. The Direct Streaming approach would read from all kafka partitions but you have to set a manual checkpoint in the streaming context to make your program recoverable.

我的方法和我面临的挑战:我使用了Kafka Direct Streaming方法,这给了我一个批处理(对于这个讨论可与RDD互换)的数据,用于配置的持续时间。 Direct Streaming方法将从所有kafka分区读取,但您必须在流上下文中设置手动检查点以使您的程序可恢复。

Now this RDD represents all the messages read within a configured duration.You can additionally configure the maximum size of this batch. When you do any processing on this RDD, the RDD is partitioned into chunks and each chunk is processed by an executor. Spark typically spawns one executor per core per machine in a cluster. I would advise you to configure the maximum number for your spark job. You can amortize the access to your data source (HBase in my case) on a per partition basis. So if your have 10 executors running parallel, be mindful that you can open 10 I/O connections in parallel to your data. Since your read should reflect the latest write on the entity, this is probably the most crucial aspect of your design.Can your data source guarantee consistency?

现在,此RDD表示在配置的持续时间内读取的所有消息。您还可以配置此批次的最大大小。当您对此RDD执行任何处理时,RDD将分区为块,每个块由执行程序处理。 Spark通常在群集中的每台机器上为每个核心生成一个执行程序。我建议你配置你的火花工作的最大数量。您可以基于每个分区来分摊对数据源(在我的情况下为HBase)的访问。因此,如果您有10个执行程序并行运行,请注意您可以与数据并行打开10个I / O连接。由于您的阅读应反映实体的最新写入,这可能是您设计中最关键的方面。您的数据源可以保证一致性吗?

In terms of code your program would look something like this

在代码方面,您的程序看起来像这样

dataStream.foreachRDD(rdd -> {
 // for each incoming batch, do any shuffle operations like groupByKey   first
// This is because during shuffle data is exchanged between partitions
 rdd.groupByKey().mapPartitions(eventsInPartition -> {
  // this part of the code executes at each partition.
  Connection connection = createConnectionToDataSource()

  eventsInPartition.forEachRemaining(eventPair -> {

  Entity entity = connection.getStuffFromDB(eventPair._1)
  entity.addNewEvents(eventPair._2) // your merge step
  connection.writeStuffToDB(eventPair._1, entity)

   })
 })
})

You start with foreachRDD to act on each incoming batch of data. Do any maps or transformation that can apply on each individual events in parallel first. groupByKey will shuffle the data across partition and you will have all events with the same key in the same partition. mapPartitions accepts a function that executes in a single partition. Here you can form connections to your DB. In this example since we do group by key, you have a pairRDD which be an RDD of a Tuple of event key + Iterable sequence of events. You can use the connection to lookup, merge, do other magic , write out the entity to the DB. Try different configs for batch duration, max cores, maxRatePerPartitions to control the flow of data based on how your DB and your cluster is handling the loads.

您从foreachRDD开始,对每批传入的数据进行操作。首先可以并行地对每个单独的事件应用任何地图或转换。 groupByKey将跨分区对数据进行洗牌,您将在同一分区中拥有相同键的所有事件。 mapPartitions接受在单个分区中执行的函数。在这里,您可以形成与数据库的连接。在这个例子中,因为我们按键进行分组,所以你有一个pairRDD,它是事件键元组+可迭代事件序列的RDD。您可以使用连接来查找,合并,做其他魔术,将实体写出到DB。为批处理持续时间,max cores,maxRatePerPartitions尝试不同的配置,以根据数据库和集群处理负载的方式来控制数据流。

#1


1  

I'd like to manually merge the 2 documents.

我想手动合并2个文件。

Depends on your definition of merge.

取决于您对合并的定义。

If it's a one way direction, from the incoming json data stream to update documents stored in MongoDB, you could utilise upsert.

如果它是单向方向,从传入的json数据流更新存储在MongoDB中的文档,则可以使用upsert。

Since MongoDB Connector for Spark version 1.1.0, if a dataframe contains _id matching the data in MongoDB, save() will use upsert. Which will update if matching _id exist, otherwise insert.

从MongoDB Connector for Spark版本1.1.0开始,如果数据帧包含与MongoDB中的数据匹配的_id,则save()将使用upsert。如果匹配_id存在,将更新哪个,否则插入。

For example, to change to status=delivered:

例如,要更改为status =已发送:

> df.schema
  org.apache.spark.sql.types.StructType = StructType(StructField(_id,StringType,true), StructField(from,StringType,true), StructField(status,StringType,true), StructField(to,StringType,true))

> df.first()
  org.apache.spark.sql.Row = [1290edoiadq,sender,delivered,receiver]

> MongoSpark.save(df.write.option("collection", "order").mode("append"))

You just have to rename the orderId field name to _id before calling save().

您只需在调用save()之前将orderId字段名重命名为_id。

See SPARK-66 and MongoSpark: save() for more information.

有关更多信息,请参阅SPARK-66和MongoSpark:save()。

However if by merge you mean a two direction update (incoming stream and MongoDB), then you would have to consolidate the changes first in Spark. Resolving any conflict as you deemed appropriate in the code.

但是,如果通过合并意味着双向更新(传入流和MongoDB),那么您必须首先在Spark中合并更改。解决代码中认为合适的任何冲突。

#2


-1  

I have not used MongoDB but had a similar use case. My scenario: There is an incoming stream of events read from a kafka topic. These events need to be mapped and grouped by a key, for each key there might be a corresponding entry in a data store(HBase in my case, MongoDB in yours). If there is an entry, merge the keyed events into the existing entity if not create a new entity and save it to HBase. There are other complications in my case such as looking up multiple tables etc. but the gist of the problem appears to be similar to yours.

我没有使用MongoDB,但有一个类似的用例。我的场景:有一个从kafka主题读取的传入事件流。这些事件需要通过键映射和分组,对于每个键,数据存储中可能存在相应的条目(在我的情况下为HBase,在您的情况下为MongoDB)。如果有条目,则将键控事件合并到现有实体中(如果不创建新实体并将其保存到HBase)。在我的案例中还有其他复杂情况,例如查找多个表等,但问题的要点似乎与您的相似。

My approach and the challenges I faced: I used the Kafka Direct Streaming approach, this gives me a batch(for this discussion is interchangeable with RDD) of data for the configured duration of time. The Direct Streaming approach would read from all kafka partitions but you have to set a manual checkpoint in the streaming context to make your program recoverable.

我的方法和我面临的挑战:我使用了Kafka Direct Streaming方法,这给了我一个批处理(对于这个讨论可与RDD互换)的数据,用于配置的持续时间。 Direct Streaming方法将从所有kafka分区读取,但您必须在流上下文中设置手动检查点以使您的程序可恢复。

Now this RDD represents all the messages read within a configured duration.You can additionally configure the maximum size of this batch. When you do any processing on this RDD, the RDD is partitioned into chunks and each chunk is processed by an executor. Spark typically spawns one executor per core per machine in a cluster. I would advise you to configure the maximum number for your spark job. You can amortize the access to your data source (HBase in my case) on a per partition basis. So if your have 10 executors running parallel, be mindful that you can open 10 I/O connections in parallel to your data. Since your read should reflect the latest write on the entity, this is probably the most crucial aspect of your design.Can your data source guarantee consistency?

现在,此RDD表示在配置的持续时间内读取的所有消息。您还可以配置此批次的最大大小。当您对此RDD执行任何处理时,RDD将分区为块,每个块由执行程序处理。 Spark通常在群集中的每台机器上为每个核心生成一个执行程序。我建议你配置你的火花工作的最大数量。您可以基于每个分区来分摊对数据源(在我的情况下为HBase)的访问。因此,如果您有10个执行程序并行运行,请注意您可以与数据并行打开10个I / O连接。由于您的阅读应反映实体的最新写入,这可能是您设计中最关键的方面。您的数据源可以保证一致性吗?

In terms of code your program would look something like this

在代码方面,您的程序看起来像这样

dataStream.foreachRDD(rdd -> {
 // for each incoming batch, do any shuffle operations like groupByKey   first
// This is because during shuffle data is exchanged between partitions
 rdd.groupByKey().mapPartitions(eventsInPartition -> {
  // this part of the code executes at each partition.
  Connection connection = createConnectionToDataSource()

  eventsInPartition.forEachRemaining(eventPair -> {

  Entity entity = connection.getStuffFromDB(eventPair._1)
  entity.addNewEvents(eventPair._2) // your merge step
  connection.writeStuffToDB(eventPair._1, entity)

   })
 })
})

You start with foreachRDD to act on each incoming batch of data. Do any maps or transformation that can apply on each individual events in parallel first. groupByKey will shuffle the data across partition and you will have all events with the same key in the same partition. mapPartitions accepts a function that executes in a single partition. Here you can form connections to your DB. In this example since we do group by key, you have a pairRDD which be an RDD of a Tuple of event key + Iterable sequence of events. You can use the connection to lookup, merge, do other magic , write out the entity to the DB. Try different configs for batch duration, max cores, maxRatePerPartitions to control the flow of data based on how your DB and your cluster is handling the loads.

您从foreachRDD开始,对每批传入的数据进行操作。首先可以并行地对每个单独的事件应用任何地图或转换。 groupByKey将跨分区对数据进行洗牌,您将在同一分区中拥有相同键的所有事件。 mapPartitions接受在单个分区中执行的函数。在这里,您可以形成与数据库的连接。在这个例子中,因为我们按键进行分组,所以你有一个pairRDD,它是事件键元组+可迭代事件序列的RDD。您可以使用连接来查找,合并,做其他魔术,将实体写出到DB。为批处理持续时间,max cores,maxRatePerPartitions尝试不同的配置,以根据数据库和集群处理负载的方式来控制数据流。