flume+kafka+spark streaming整合

时间:2022-11-01 20:46:25

1.安装好flume
2.安装好kafka
3.安装好spark
4.流程说明:
  日志文件->flume->kafka->spark streaming
  flume输入:文件
  flume输出:kafka的输入
  kafka输出:spark 输入
5.整合步骤:
  (1).将插件jar拷贝到flume的lib目录下
    a. flumeng-kafka-plugin.jar
    b. metrics-annotation-2.2.0.jar

  (2).将配置文件producer.properties拷贝到flume的conf目录下
    配置文件内容如下:
      #agentsection
      producer.sources=s
      producer.channels=c
      producer.sinks=r

      #sourcesection
      producer.sources.s.type=exec
      producer.sources.s.command=tail -f -n+1 /opt/apache-flume-1.6.0/data/testFlumeKafka.txt
      producer.sources.s.channels=c

      # Eachsink's type must be defined
      producer.sinks.r.type=org.apache.flume.plugins.KafkaSink
      producer.sinks.r.metadata.broker.list=namenode:19092,datanode1:19092,datanode2:19092
      producer.sinks.r.partition.key=0
      producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
      producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
      producer.sinks.r.request.required.acks=0
      producer.sinks.r.max.message.size=1000000
      producer.sinks.r.producer.type=sync
      producer.sinks.r.custom.encoding=UTF-8
      producer.sinks.r.custom.topic.name=test //需建好对应topic

      #Specifythe channel the sink should use
      producer.sinks.r.channel=c

      # Eachchannel's type is defined.
      producer.channels.c.type=memory
      producer.channels.c.capacity=1000
      producer.channels.c.transactionCapacity=100

    (3).启动flume-ng
      命令如下:flume-ng agent -c . -f /opt/apache-flume-1.6.0/conf/producer.conf -n producer

    (4).启动kafka-server
      命令如下:bin/kafka-server-start.sh config/server.properties

    (5).启动kafka-consumer(默认已经创建了test topic)
      命令如下:bin/kafka-console-consumer.sh --zookeeper namenode:12181,datanode1:12181,datanode2:12181 --topic test --from-beginning

    (6).启动spark
      命令如下:sbin/start-all.sh

    (7).运行spark streaming Demo
      命令如下:run-example org.apache.spark.examples.streaming.JavaKafkaWordCount namenode:12181 test-consumer-group test 3 >> test.log

    (8).在对应的日志文件中输入内容,则可以在test.log文件看到单词计数的结果