I have a DStream[String, Int
] with pairs of word counts, e.g. ("hello" -> 10)
. I want to write these counts to cassandra with a step index. The index is initialized as var step = 1
and is incremented with each microbatch processed.
我有一个DStream[String, Int],有两个单词计数,例如("hello" -> 10)。我想用一个步骤索引把这些数写在卡桑德拉身上。将索引初始化为var step = 1,并对每一个处理过的微批进行递增。
The cassandra table created as:
cassandra表创建为:
CREATE TABLE wordcounts (
step int,
word text,
count int,
primary key (step, word)
);
When trying to write the stream to the table...
当试图将流写到表中时…
stream.saveToCassandra("keyspace", "wordcounts", SomeColumns("word", "count"))
... I get java.lang.IllegalArgumentException: Some primary key columns are missing in RDD or have not been selected: step
.
…. lang。IllegalArgumentException: RDD中缺少一些主键列,或者没有选择:step。
How can I prepend the step
index to the stream in order to write the three columns together?
我如何将这个步骤索引预加到流中,以便将三列写在一起?
I'm using spark 2.0.0, scala 2.11.8, cassandra 3.4.0 and spark-cassandra-connector 2.0.0-M3.
我使用的是spark 2.0.0、scala 2.11.8、cassandra 3.4.0和spark-cassandra-connector 2.0.0- m3。
3 个解决方案
#1
1
As noted, while the Cassandra table expects something of the form (Int, String, Int)
, the wordCount DStream is of type DStream[(String, Int)]
, so for the call to saveToCassandra(...)
to work, we need a DStream
of type DStream[(Int, String, Int)]
.
正如所指出的,虽然Cassandra表期望表单(Int、String、Int), wordCount DStream是类型DStream[(String, Int)],因此对于调用saveToCassandra(…)来工作,我们需要DStream类型的DStream[(Int, String, Int)]。
The tricky part in this question is how to bring a local counter, that is by definition only known in the driver, up to the level of the DStream.
在这个问题中,棘手的部分是如何引入本地计数器,这是由驱动程序定义的,直到DStream的级别。
To do that, we need to do two things: "lift" the counter to a distributed level (in Spark, we mean "RDD" or "DataFrame") and join that value with the existing DStream
data.
要做到这一点,我们需要做两件事:将计数器“提升”到一个分布式的级别(在Spark中,我们指的是“RDD”或“DataFrame”),并将该值与现有的DStream数据连接起来。
Departing from the classic Streaming word count example:
离开经典的流字计数示例:
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
We add a local var to hold the count of the microbatches:
我们添加一个局部var来保存微批次的计数:
@transient var batchCount = 0
It's declared transient, so that Spark doesn't try to close over its value when we declare transformations that use it.
它被声明为瞬态,因此当我们声明使用它的转换时,Spark不会试图关闭它的值。
Now the tricky bit: Within the context of a DStream transform
ation, we make an RDD out of that single var
iable and join it with underlying RDD of the DStream using cartesian product:
现在,棘手的一点是:在DStream转换的上下文中,我们将一个RDD从该单一变量中提取出来,并使用cartesian产品将它与DStream的底层RDD连接起来:
val batchWordCounts = wordCounts.transform{ rdd =>
batchCount = batchCount + 1
val localCount = sparkContext.parallelize(Seq(batchCount))
rdd.cartesian(localCount).map{case ((word, count), batch) => (batch, word, count)}
}
(Note that a simple map
function would not work, as only the initial value of the var
iable would be captured and serialized. Therefore, it would look like the counter never increased when looking at the DStream data.
(注意,简单的map函数不会起作用,因为只有变量的初始值才能被捕获和序列化。因此,当查看DStream数据时,计数器不会增加。
Finally, now that the data is in the right shape, save it to Cassandra:
最后,现在数据的形状是正确的,保存到Cassandra:
batchWordCounts.saveToCassandra("keyspace", "wordcounts")
#2
0
updateStateByKey
function is provided by spark for global state handling. For this case it could look something like following
updateStateByKey函数由spark提供,用于全局状态处理。对于这种情况,它可以是如下所示。
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount: Int = runningCount.getOrElse(0) + 1
Some(newCount)
}
val step = stream.updateStateByKey(updateFunction _)
stream.join(step).map{case (key,(count, step)) => (step,key,count)})
.saveToCassandra("keyspace", "wordcounts")
#3
-1
Since you are trying to save the RDD to existing Cassandra table, you need to include all the primary key column values in the RDD.
既然您正在尝试将RDD保存到现有的Cassandra表,那么您需要在RDD中包含所有主键列值。
What you can do is, you can use the below methods to save the RDD to new table.
您可以使用以下方法将RDD保存到新表中。
saveAsCassandraTable or saveAsCassandraTableEx
For more info look into this.
要了解更多信息,请查看。
#1
1
As noted, while the Cassandra table expects something of the form (Int, String, Int)
, the wordCount DStream is of type DStream[(String, Int)]
, so for the call to saveToCassandra(...)
to work, we need a DStream
of type DStream[(Int, String, Int)]
.
正如所指出的,虽然Cassandra表期望表单(Int、String、Int), wordCount DStream是类型DStream[(String, Int)],因此对于调用saveToCassandra(…)来工作,我们需要DStream类型的DStream[(Int, String, Int)]。
The tricky part in this question is how to bring a local counter, that is by definition only known in the driver, up to the level of the DStream.
在这个问题中,棘手的部分是如何引入本地计数器,这是由驱动程序定义的,直到DStream的级别。
To do that, we need to do two things: "lift" the counter to a distributed level (in Spark, we mean "RDD" or "DataFrame") and join that value with the existing DStream
data.
要做到这一点,我们需要做两件事:将计数器“提升”到一个分布式的级别(在Spark中,我们指的是“RDD”或“DataFrame”),并将该值与现有的DStream数据连接起来。
Departing from the classic Streaming word count example:
离开经典的流字计数示例:
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
We add a local var to hold the count of the microbatches:
我们添加一个局部var来保存微批次的计数:
@transient var batchCount = 0
It's declared transient, so that Spark doesn't try to close over its value when we declare transformations that use it.
它被声明为瞬态,因此当我们声明使用它的转换时,Spark不会试图关闭它的值。
Now the tricky bit: Within the context of a DStream transform
ation, we make an RDD out of that single var
iable and join it with underlying RDD of the DStream using cartesian product:
现在,棘手的一点是:在DStream转换的上下文中,我们将一个RDD从该单一变量中提取出来,并使用cartesian产品将它与DStream的底层RDD连接起来:
val batchWordCounts = wordCounts.transform{ rdd =>
batchCount = batchCount + 1
val localCount = sparkContext.parallelize(Seq(batchCount))
rdd.cartesian(localCount).map{case ((word, count), batch) => (batch, word, count)}
}
(Note that a simple map
function would not work, as only the initial value of the var
iable would be captured and serialized. Therefore, it would look like the counter never increased when looking at the DStream data.
(注意,简单的map函数不会起作用,因为只有变量的初始值才能被捕获和序列化。因此,当查看DStream数据时,计数器不会增加。
Finally, now that the data is in the right shape, save it to Cassandra:
最后,现在数据的形状是正确的,保存到Cassandra:
batchWordCounts.saveToCassandra("keyspace", "wordcounts")
#2
0
updateStateByKey
function is provided by spark for global state handling. For this case it could look something like following
updateStateByKey函数由spark提供,用于全局状态处理。对于这种情况,它可以是如下所示。
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount: Int = runningCount.getOrElse(0) + 1
Some(newCount)
}
val step = stream.updateStateByKey(updateFunction _)
stream.join(step).map{case (key,(count, step)) => (step,key,count)})
.saveToCassandra("keyspace", "wordcounts")
#3
-1
Since you are trying to save the RDD to existing Cassandra table, you need to include all the primary key column values in the RDD.
既然您正在尝试将RDD保存到现有的Cassandra表,那么您需要在RDD中包含所有主键列值。
What you can do is, you can use the below methods to save the RDD to new table.
您可以使用以下方法将RDD保存到新表中。
saveAsCassandraTable or saveAsCassandraTableEx
For more info look into this.
要了解更多信息,请查看。