I am using Spark 1.5.0 and Java 7.
The input comes from Kafka as different JSON objects, each with a type
field, e.g.:
{'type': 'alpha', ...}
{'type': 'beta', ...}
...
From this input data I am creating a JavaPairDStream<String, Integer>
holding the counts of each event type.
I want to store this data in Redis. How can I go about doing this?
2 Answers
#1
2
I used the foreachRDD and foreach functions to achieve this as follows:
wordCounts.foreachRDD(
    new Function<JavaPairRDD<String, Integer>, Void>() {
        public Void call(JavaPairRDD<String, Integer> rdd) {
            rdd.foreach(
                new VoidFunction<Tuple2<String, Integer>>() {
                    public void call(Tuple2<String, Integer> wordCount) {
                        System.out.println(wordCount._1() + ":" + wordCount._2());
                        JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost");
                        Jedis jedis = pool.getResource();
                        jedis.select(0);
                        jedis.set(wordCount._1(), wordCount._2().toString());
                    }
                }
            );
            return null;
        }
    }
);
#2
0
Creating a new connection pool for every single record is very inefficient. I suggest creating one connection per partition:
wordCount.mapPartitions(p -> {
    Jedis jd = new Jedis(getJedisConfig());
    List<Tuple2<String, Integer>> out = new ArrayList<>();
    while (p.hasNext()) {                  // p.hasNext(), not p->hasNext()
        Tuple2<String, Integer> data = p.next();
        String word = data._1();
        Integer cnt = data._2();
        jd.set(word, cnt.toString());      // or any other format of save to Redis
        out.add(data);
    }
    jd.close();
    return out;                            // mapPartitions must return the partition's elements
});
// Note: mapPartitions is lazy, so an action must still be triggered on the
// result; foreachPartition inside foreachRDD avoids that pitfall.