I am new to Spark. I am writing the following script that receives a stream from Kafka, which is then transformed to an RDD.
My goal is to accumulate the data from each stream batch into a single in-memory RDD, much like appending an element to a list on every loop iteration.
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

conf = SparkConf().setAppName("Application")
sc = SparkContext(conf=conf)

def joinRDDs(rdd):
    elements = rdd.collect()
    rdds = sc.parallelize(elements)
    transformed = rdds.map(lambda x: ('key', {u'name': x[1]}))

if __name__ == '__main__':
    # topic and host are defined elsewhere in my script
    ssc = StreamingContext(sc, 2)
    stream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": host})
    stream.foreachRDD(joinRDDs)
    ssc.start()
    ssc.awaitTermination()
How can I accomplish this?
Thank you for your attention
1 Answer
#1
Use updateStateByKey() and pass in a function as needed. The function takes two arguments: the new data that comes in with every batch, and the historic data (state) that you are holding in memory.
def countPurchasers(newValues, lastSum):
    if lastSum is None:
        lastSum = 0
    return sum(newValues, lastSum)

updateStateByKey(countPurchasers)
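For completeness, here is a minimal sketch of how this could be wired into the Kafka stream from the question. The topic name, broker address, and checkpoint path are placeholders, and the map to (key, 1) pairs assumes you simply want a running count per key; note that updateStateByKey requires a checkpoint directory.

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

def countPurchasers(newValues, lastSum):
    # lastSum is None the first time a key is seen
    if lastSum is None:
        lastSum = 0
    return sum(newValues, lastSum)

if __name__ == '__main__':
    conf = SparkConf().setAppName("Application")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 2)
    # updateStateByKey needs a checkpoint directory to persist state between batches
    ssc.checkpoint("/tmp/spark-checkpoint")  # placeholder path

    topic = "my-topic"        # placeholder topic name
    host = "localhost:9092"   # placeholder broker list
    stream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": host})

    # turn each Kafka (key, value) message into (key, 1) and keep a running count per key
    counts = stream.map(lambda kv: (kv[0], 1)).updateStateByKey(countPurchasers)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

With updateStateByKey, Spark carries the per-key state across batches for you, so there is no need to collect() and re-parallelize inside foreachRDD.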