Spark Study Notes - Integrating Streaming with Flume

Date: 2022-10-10 20:47:57

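Spark Streaming can be integrated with Flume in two modes. In push mode, a Flume avro sink sends events to a receiver started by Spark Streaming; in pull mode, Spark Streaming polls a SparkSink that runs inside the Flume agent.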


1. Push-based mode


The Flume configuration file is as follows:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = centos.host1
a1.sources.r1.port = 22222
a1.sources.r1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
# Must be the host on which the Spark receiver listens (the host passed to the Spark job)
a1.sinks.k1.hostname = centos.host1
a1.sinks.k1.port = 11111

Other sources can be configured as well, such as exec, thrift, or avro. For example, an avro source:

a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 22222
a1.sources.r1.channels = c1
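
An exec source follows the same pattern. The fragment below is a minimal sketch: the tail command and the log path /var/log/app.log are hypothetical placeholders, not part of the original setup.

```properties
# Hypothetical exec source: runs a command and turns each output line into an event
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/app.log
a1.sources.r1.channels = c1
```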

The Spark code is as follows:

import org.apache.spark.streaming.flume._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevel

object SparkStreamingFlume1 {

  def main(args: Array[String]): Unit = {

    if (args.length < 2) {
      println("please enter host and port")
      System.exit(1)
    }

    val sc = new SparkContext("spark://centos.host1:7077", "Spark Streaming Flume Integration")

    // Create a StreamingContext with a 20-second batch interval
    val ssc = new StreamingContext(sc, Seconds(20))

    // Host and port on which the Spark receiver listens for events pushed by the Flume avro sink
    val hostname = args(0)
    val port = args(1).toInt
    val storageLevel = StorageLevel.MEMORY_ONLY
    val flumeStream = FlumeUtils.createStream(ssc, hostname, port, storageLevel)

    // Print the number of events received in each batch
    flumeStream.count().map(cnt => "Received " + cnt + " flume events.").print()

    // Start the streaming computation
    ssc.start()
    // Block until the computation is stopped or fails
    ssc.awaitTermination()

    sc.stop()
  }

}
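
The program above only checks the number of arguments, so a non-numeric port makes args(1).toInt throw a NumberFormatException. One possible way to validate both arguments up front is sketched below; the object name HostPortArgs and its parse method are hypothetical helpers introduced for illustration, not part of Spark or of the original program.

```scala
import scala.util.Try

// Hypothetical helper: validates the <host> <port> arguments before any Spark code runs.
object HostPortArgs {

  // Returns Right((host, port)) for valid input, or Left(message) describing the problem.
  def parse(args: Array[String]): Either[String, (String, Int)] =
    if (args.length < 2) Left("usage: <host> <port>")
    else Try(args(1).toInt).toOption match {
      case Some(port) if port > 0 && port <= 65535 => Right((args(0), port))
      case _ => Left("invalid port: " + args(1))
    }

  def main(args: Array[String]): Unit =
    parse(args) match {
      case Right((host, port)) => println("host=" + host + ", port=" + port)
      case Left(message) =>
        System.err.println(message)
        sys.exit(1)
    }
}
```

In the streaming job, the Right branch would be the place to build the StreamingContext and call FlumeUtils.createStream with the parsed host and port.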

Submit the Spark job. Make sure the required jars are available: either list them with --jars at submit time, or add them by calling addJar() on sc.

[hadoop@centos spark-1.1.0-bin-hadoop2.4]$ bin/spark-submit --class org.project.modules.streaming.SparkStreamingFlume1 --jars lib/spark-streaming-flume-sink_2.10-1.1.0.jar,lib/spark-examples-1.1.0-hadoop2.4.0.jar --master spark://centos.host1:7077 /home/hadoop/temp/flume.jar centos.host1 11111


Start Flume:

[hadoop@centos flume-1.5.0.1]$ bin/flume-ng agent --conf conf --conf-file conf/example5.properties --name a1 -Dflume.root.logger=INFO,console


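If the source type is netcat, you can test it with telnet centos.host1 22222. If the source type is avro, you can test it with the avro client that ships with Flume, pointing it at the host and port the avro source is bound to: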

[hadoop@centos flume-1.5.0.1]$ bin/flume-ng avro-client --conf conf -H localhost -p 22222 -F a.xml -Dflume.root.logger=DEBUG,console


2. Pull-based mode


The Flume configuration file is as follows:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = centos.host1
a1.sources.r1.port = 22222
a1.sources.r1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = centos.host1
a1.sinks.k1.port = 11111
a1.sinks.k1.channel = c1

The Spark code is as follows:

import org.apache.spark.streaming.flume._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevel

object SparkStreamingFlume2 {

  def main(args: Array[String]): Unit = {

    if (args.length < 2) {
      println("please enter host and port")
      System.exit(1)
    }

    val sc = new SparkContext("spark://centos.host1:7077", "Spark Streaming Flume Integration")

    // Create a StreamingContext with a 20-second batch interval
    val ssc = new StreamingContext(sc, Seconds(20))

    // Host and port of the Flume SparkSink that Spark Streaming polls for events
    val hostname = args(0)
    val port = args(1).toInt
    val storageLevel = StorageLevel.MEMORY_ONLY
    val flumeStream = FlumeUtils.createPollingStream(ssc, hostname, port, storageLevel)

    // Print the number of events received in each batch
    flumeStream.count().map(cnt => "Received " + cnt + " flume events.").print()

    // Start the streaming computation
    ssc.start()
    // Block until the computation is stopped or fails
    ssc.awaitTermination()

    sc.stop()
  }

}
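
The program above only counts events. To inspect what was actually received, the event body, which arrives as a java.nio.ByteBuffer, has to be decoded. The sketch below assumes the bodies are UTF-8 text, which holds for lines typed into telnet but is an assumption for other sources; FlumeBodyDecoder and decodeBody are hypothetical names introduced for illustration.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical helper: turns a Flume event body into a String, assuming UTF-8 text.
object FlumeBodyDecoder {

  // Copy the readable bytes out of the buffer and decode them as UTF-8.
  // duplicate() leaves the position of the original buffer untouched.
  def decodeBody(body: ByteBuffer): String = {
    val view = body.duplicate()
    val bytes = new Array[Byte](view.remaining())
    view.get(bytes)
    new String(bytes, StandardCharsets.UTF_8)
  }

  def main(args: Array[String]): Unit = {
    val body = ByteBuffer.wrap("hello flume".getBytes(StandardCharsets.UTF_8))
    println(decodeBody(body))
  }
}

// Assumed usage inside the streaming job, where each element is a SparkFlumeEvent:
//   flumeStream.map(e => FlumeBodyDecoder.decodeBody(e.event.getBody)).print()
```

Keeping the decoding in a small standalone function makes it possible to check it without a running cluster.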


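Start Flume. The main requirement here is to add scala-library.jar and spark-streaming-flume-sink_2.10-1.1.0.jar to the $FLUME_HOME/lib directory, because the SparkSink runs inside the Flume agent.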

[hadoop@centos flume-1.5.0.1]$ bin/flume-ng agent --conf conf --conf-file conf/example6.properties --name a1 -Dflume.root.logger=INFO,console


You can test it with telnet centos.host1 22222.


Submit the Spark job. As in push mode, make sure the required jars are added:

[hadoop@centos spark-1.1.0-bin-hadoop2.4]$ bin/spark-submit --class org.project.modules.streaming.SparkStreamingFlume2 --jars lib/spark-streaming-flume-sink_2.10-1.1.0.jar,lib/spark-examples-1.1.0-hadoop2.4.0.jar --master spark://centos.host1:7077 /home/hadoop/temp/flume.jar centos.host1 11111