object FlumeEventCount {
def main(args: Array[String]) {
StreamingExamples.setStreamingLogLevels()
//val Array(host, IntParam(port)) = args
val host = "localhost"
val port = 19999
val batchInterval = Milliseconds(2000)
// Create the context and set the batch size
val sparkConf = new SparkConf().setAppName("FlumeEventCount")
val ssc = new StreamingContext(sparkConf, batchInterval)
// Create a flume stream
val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
// Print out the count of events received from this server in each batch
stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
ssc.start()
ssc.awaitTermination()
}
}
flume中配置文件,spark_avro.conf:
a1.channels = c1
a1.sinks = k1
a1.sources = r1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 19999
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
2,运行时报错的话,需要引入如下jar包:
在flume的lib目录下查找.
![Spark-streaming 连接flume Spark-streaming 连接flume](https://image.shishitao.com:8440/aHR0cDovL2Jic21heC5pa2FmYW4uY29tL3N0YXRpYy9MM0J5YjNoNUwyaDBkSEJ6TDJGd2NDNTVhVzU0YVdGdVp5NWpiMjB2YzJoaGNtUXZjekl5TDNKbGN5OHhNbUZsTXpFNVpTMWhOVFk0TFRSaVpUZ3RPV0kyTmkxalpqWTFaamczTmpobE5UVXZVVkVsUlRVbE9VSWxRa1VsUlRjbE9Ea2xPRGN5TURFMU1EZ3lOekUwTlRJek9DNXFjR2MvYzJWaGNtTm9QV1pzZFcxbC5qcGc%3D.jpg?w=700&webp=1)
agent --conf conf --conf-file ./spark_avro.conf --name a1 -Dflume.root.logger=INFO,console
avro-client --conf ../conf/ -Hlocalhost -p 44444 -F /usr/local/spark-1.4.0/conf/spark-env.sh.template -Dflume.root.logger=DEBUG,console
![Spark-streaming 连接flume Spark-streaming 连接flume](https://image.shishitao.com:8440/aHR0cDovL2Jic21heC5pa2FmYW4uY29tL3N0YXRpYy9MM0J5YjNoNUwyaDBkSEJ6TDJGd2NDNTVhVzU0YVdGdVp5NWpiMjB2YzJoaGNtUXZjekl5TDNKbGN5OWhNV1kzTlRZMk15MDVNREF3TFRSbFpHTXRZVFl6T1MwMVpqQmlZbU0yWTJWaU1UVXZNUzVxY0djL2MyVmhjbU5vUFdac2RXMWwuanBn.jpg?w=700&webp=1)
![Spark-streaming 连接flume Spark-streaming 连接flume](https://image.shishitao.com:8440/aHR0cDovL2Jic21heC5pa2FmYW4uY29tL3N0YXRpYy9MM0J5YjNoNUwyaDBkSEJ6TDJGd2NDNTVhVzU0YVdGdVp5NWpiMjB2YzJoaGNtUXZjekl5TDNKbGN5ODNNelUyTUdSa1pDMDBNVE15TFRReFpEUXRPVGN6WlMweFltRXlaVFV4WkRGa016a3ZNaTVxY0djL2MyVmhjbU5vUFdac2RXMWwuanBn.jpg?w=700&webp=1)
Spark-streaming 连接flume的更多相关文章
-
Spark Streaming连接TCP Socket
1.Spark Streaming是什么 Spark Streaming是在Spark上建立的可扩展的高吞吐量实时处理流数据的框架,数据可以是来自多种不同的源,例如kafka,Flume,Twitte ...
-
Spark Streaming从Flume Poll数据案例实战和内幕源码解密
本节课分成二部分讲解: 一.Spark Streaming on Polling from Flume实战 二.Spark Streaming on Polling from Flume源码 第一部分 ...
-
Spark学习之路(十五)—— Spark Streaming 整合 Flume
一.简介 Apache Flume是一个分布式,高可用的数据收集系统,可以从不同的数据源收集数据,经过聚合后发送到分布式计算框架或者存储系统中.Spark Straming提供了以下两种方式用于Flu ...
-
Spark 系列(十五)—— Spark Streaming 整合 Flume
一.简介 Apache Flume 是一个分布式,高可用的数据收集系统,可以从不同的数据源收集数据,经过聚合后发送到分布式计算框架或者存储系统中.Spark Straming 提供了以下两种方式用于 ...
-
Spark Streaming 整合 Flume
Spark Streaming 整合 Flume 一.简介二.推送式方法 2.1 配置日志收集Flume 2.2 项目依赖 2.3 Spark Strea ...
-
spark streaming集成flume
1. 安装flume flume安装,解压后修改flume_env.sh配置文件,指定java_home即可. cp hdfs jar包到flume lib目录下(否则无法抽取数据到hdfs上): $ ...
-
cdh环境下,spark streaming与flume的集成问题总结
文章发自:http://www.cnblogs.com/hark0623/p/4170156.html 转发请注明 如何做集成,其实特别简单,网上其实就是教程. http://blog.csdn.n ...
-
Spark Streaming处理Flume数据练习
把Flume Source(netcat类型),从终端上不断给Flume Source发送消息,Flume把消息汇集到Sink(avro类型),由Sink把消息推送给Spark Streaming并处 ...
-
Spark Streaming连接Kafka的两种方式 direct 跟receiver 方式接收数据的区别
Receiver是使用Kafka的高层次Consumer API来实现的. Receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming ...
-
Spark Streaming整合Flume + Kafka wordCount
flume配置文件 flume_to_kafka.conf a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = sp ...
随机推荐
-
linux命令:exec
1.命令介绍: exec用来配合find命令找到的文件后接着执行相应的命令 2.命令格式: find . -type f exec ls -l {} \;
-
034. asp.netWeb用户控件之三通过用户控件实现用户注册和登录
用户控件login.ascx代码: <%@ Control Language="C#" AutoEventWireup="true" CodeFile=& ...
-
HDU 4941 Magical Forest --STL Map应用
题意: 有n*m个格子(n,m <= 2*10^9),有k(k<=10^5)个格子中有值,现在有三种操作,第一种为交换两行,第二种为交换两列,交换时只有两行或两列都有格子有值或都没有格子有 ...
-
LoadCursor 函数
从可执行文件中载入指定的光标资源,加载到指定的应用实例中 ? 1 2 3 4 5 HCURSOR WINAPI LoadCursor( _In_opt_ HINSTANCE hInstance, ...
-
XNA Game Studio 4.0 Programming 随便读,随便记 &ldquo;Game Class&rdquo;
XNA 中的 Game 类,是所有神奇事情发生的地方.几乎游戏中所有的事情都由它来操办. 它是项目中的王者,让我们深入窥探一番: 虚方法 Game 本身从众多其它地方继续了许多能力才能完成游戏中的事情 ...
-
04747_Java语言程序设计(一)_第5章_图形界面设计(一)
例5.1一个用JFrame类创建窗口的Java应用程序. import javax.swing.*; public class Example5_1 { public static void main ...
-
js特殊字符转义
点的转义:. ==> \\u002E 美元符号的转义:$ ==> \\u0024 乘方符号的转义:^ ==> \\u005E 左大括号的转义:{ ==> \\u007B 左方括 ...
-
awk的用法(转)
awk 用法:awk ' pattern {action} ' 变量名 含义 ARGC 命令行变元个数 ARGV 命令行变元数组 FILENAME 当前输入文件名 FNR 当前文件中的记录号 FS 输 ...
-
Java中的双重检查锁(double checked locking)
最初的代码 在最近的项目中,写出了这样的一段代码 private static SomeClass instance; public SomeClass getInstance() { if (nul ...
-
Centos7 安装mysql-8.0.13(rpm)
yum or rpm? yum安装方式很方便,但是下载mysql的时候从官网下载,速度较慢. rpm安装方式可以从国内镜像下载mysql的rpm包,比较快.rpm也适合离线安装. 环境说明 操作系统: ...