Flume 配置文件：flume_to_kafka.conf
# Flume agent "a1": spooling-directory source (r1) -> memory channel (c1) -> Kafka sink (k1).
# Each property must be on its own line; the properties format is line-oriented.

# Name the components of this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Source: pick up files dropped into the spool directory
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/logs/
a1.sources.r1.fileHeader = true

# Channel: in-memory buffer between source and sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

# Sink: publish events to the Kafka topic "spark"
# NOTE(review): topic / brokerList / requiredAcks / batchSize are the older Kafka sink
# property names; newer Flume releases prefer kafka.topic, kafka.bootstrap.servers,
# kafka.producer.acks and flumeBatchSize -- confirm against the Flume version in use.
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = spark
a1.sinks.k1.brokerList = m1:9092,m2:9092,m3:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
kafka
1、启动 Kafka broker
./bin/kafka-server-start.sh ./config/server.properties
2、创建spark topic
bin/kafka-topics.sh --create --zookeeper m1:2181 --replication-factor 2 --partitions 2 --topic spark
启动 Flume agent（-n 指定的 agent 名称 a1 需与配置文件中的前缀一致）
flume-ng agent -c conf/ -f conf/flume_to_kafka.conf -n a1
测试是否可以正常消费到数据
bin/kafka-console-consumer.sh --bootstrap-server m1:9092,m2:9092,m3:9092 --from-beginning --topic spark
代码实现
/**
 * Streaming word count over the Kafka topic "spark".
 *
 * Reads messages through the receiver-based Kafka stream (ZooKeeper quorum
 * m1/m2/m3, consumer group "spark"), splits each message on single spaces and
 * keeps a running count per word across 5-second batches, printing the first
 * results of every batch.
 */
object SparkStreamDemo {

  /**
   * Entry point: builds the contexts, wires the DStream pipeline and blocks
   * until the streaming job is terminated.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("spark_streaming")
      .setMaster("local[*]")

    val sc = new SparkContext(conf)
    // updateStateByKey is a stateful operation, so a checkpoint directory must be set.
    sc.setCheckpointDir("D:/checkpoints")
    sc.setLogLevel("ERROR")

    val ssc = new StreamingContext(sc, Seconds(5))

    // topic name -> number of consumer threads used by the receiver
    val topics = Map("spark" -> 2)

    // createStream yields (key, message) pairs; only the message payload is needed.
    val lines = KafkaUtils
      .createStream(ssc, "m1:2181,m2:2181,m3:2181", "spark", topics)
      .map(_._2)

    // Blank lines and consecutive spaces make split(" ") emit empty strings;
    // drop them so "" is not counted as a word.
    val wordOnes = lines
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map((_, 1))

    // Running total per word: this batch's occurrences plus the previous state (0 if none).
    val runningCounts = wordOnes.updateStateByKey[Int](
      (batchOnes: Seq[Int], previous: Option[Int]) =>
        Some(batchOnes.sum + previous.getOrElse(0))
    )

    runningCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}