I'm trying to deal with some code that runs differently on Spark stand-alone mode and Spark running on a cluster. Basically, for each item in an RDD, I'm trying to add it to a list, and once this is done, I want to send this list to Solr.
我正在尝试处理一些代码,这些代码在Spark独立模式下运行不同,而Spark在集群上运行。基本上,对于RDD中的每个项目,我正在尝试将其添加到列表中,一旦完成,我想将此列表发送给Solr。
This works perfectly fine when I run the following code in stand-alone mode of Spark, but does not work when the same code is run on a cluster. When I run the same code on a cluster, it is like "send to Solr" part of the code is executed before the list to be sent to Solr is filled with items. I try to force the execution by solrInputDocumentJavaRDD.collect();
after foreach
, but it seems like it does not have any effect.
当我在Spark的独立模式下运行以下代码时,这完全正常,但在群集上运行相同的代码时不起作用。当我在集群上运行相同的代码时,它就像是“发送到Solr”部分代码执行之前要发送到Solr的列表中的项目。我尝试通过solrInputDocumentJavaRDD.collect()强制执行;在foreach之后,但它似乎没有任何影响。
// For each RDD
solrInputDocumentJavaDStream.foreachRDD(
new Function<JavaRDD<SolrInputDocument>, Void>() {
@Override
public Void call(JavaRDD<SolrInputDocument> solrInputDocumentJavaRDD) throws Exception {
// For each item in a single RDD
solrInputDocumentJavaRDD.foreach(
new VoidFunction<SolrInputDocument>() {
@Override
public void call(SolrInputDocument solrInputDocument) {
// Add the solrInputDocument to the list of SolrInputDocuments
SolrIndexerDriver.solrInputDocumentList.add(solrInputDocument);
}
});
// Try to force execution
solrInputDocumentJavaRDD.collect();
// After having finished adding every SolrInputDocument to the list
// add it to the solrServer, and commit, waiting for the commit to be flushed
try {
if (SolrIndexerDriver.solrInputDocumentList != null
&& SolrIndexerDriver.solrInputDocumentList.size() > 0) {
SolrIndexerDriver.solrServer.add(SolrIndexerDriver.solrInputDocumentList);
SolrIndexerDriver.solrServer.commit(true, true);
SolrIndexerDriver.solrInputDocumentList.clear();
}
} catch (SolrServerException | IOException e) {
e.printStackTrace();
}
return null;
}
}
);
What should I do, so that sending-to-Solr part executes after the list of SolrDocuments are added to solrInputDocumentList
(and works also in cluster mode)?
我应该怎么做,以便在SolrDocuments列表添加到solrInputDocumentList之后执行发送到Solr部分(并且也在集群模式下工作)?
2 个解决方案
#1
As I mentioned on the Spark Mailing list: I'm not familiar with the Solr API but provided that 'SolrIndexerDriver' is a singleton, I guess that what's going on when running on a cluster is that the call to:
正如我在Spark Mailing列表中提到的:我不熟悉Solr API,但是假设'SolrIndexerDriver'是一个单例,我想在集群上运行时发生的是调用:
SolrIndexerDriver.solrInputDocumentList.add(elem)
is happening on different singleton instances of the SolrIndexerDriver on different JVMs while
发生在不同JVM上的SolrIndexerDriver的不同单例实例上
SolrIndexerDriver.solrServer.commit
is happening on the driver.
发生在司机身上。
In practical terms, the lists on the executors are being filled-in but they are never committed and on the driver the opposite is happening.
实际上,执行人员的名单正在填写,但他们永远不会被承诺,而对于司机则相反的情况正在发生。
The recommended way to handle this is to use foreachPartition
like this:
处理此问题的推荐方法是使用foreachPartition,如下所示:
rdd.foreachPartition{iter =>
// prepare connection
Stuff.connect(...)
// add elements
iter.foreach(elem => Stuff.add(elem))
// submit
Stuff.commit()
}
This way you can add the data of each partition and commit the results in the local context of each executor. Be aware that this add/commit must be thread safe in order to avoid data loss or corruption.
这样,您可以添加每个分区的数据,并将结果提交到每个执行程序的本地上下文中。请注意,此添加/提交必须是线程安全的,以避免数据丢失或损坏。
#2
have you checked under the spark UI to see the execution plan of this job. Check how it is getting split into stages and their dependencies. That should give you an idea hopefully.
你有没有在spark UI下检查过,看看这份工作的执行计划。检查它是如何分成阶段及其依赖关系的。这应该有希望给你一个想法。
#1
As I mentioned on the Spark Mailing list: I'm not familiar with the Solr API but provided that 'SolrIndexerDriver' is a singleton, I guess that what's going on when running on a cluster is that the call to:
正如我在Spark Mailing列表中提到的:我不熟悉Solr API,但是假设'SolrIndexerDriver'是一个单例,我想在集群上运行时发生的是调用:
SolrIndexerDriver.solrInputDocumentList.add(elem)
is happening on different singleton instances of the SolrIndexerDriver on different JVMs while
发生在不同JVM上的SolrIndexerDriver的不同单例实例上
SolrIndexerDriver.solrServer.commit
is happening on the driver.
发生在司机身上。
In practical terms, the lists on the executors are being filled-in but they are never committed and on the driver the opposite is happening.
实际上,执行人员的名单正在填写,但他们永远不会被承诺,而对于司机则相反的情况正在发生。
The recommended way to handle this is to use foreachPartition
like this:
处理此问题的推荐方法是使用foreachPartition,如下所示:
rdd.foreachPartition{iter =>
// prepare connection
Stuff.connect(...)
// add elements
iter.foreach(elem => Stuff.add(elem))
// submit
Stuff.commit()
}
This way you can add the data of each partition and commit the results in the local context of each executor. Be aware that this add/commit must be thread safe in order to avoid data loss or corruption.
这样,您可以添加每个分区的数据,并将结果提交到每个执行程序的本地上下文中。请注意,此添加/提交必须是线程安全的,以避免数据丢失或损坏。
#2
have you checked under the spark UI to see the execution plan of this job. Check how it is getting split into stages and their dependencies. That should give you an idea hopefully.
你有没有在spark UI下检查过,看看这份工作的执行计划。检查它是如何分成阶段及其依赖关系的。这应该有希望给你一个想法。