/**
* Created by root on 9/8/15.
*/
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.sql.SQLContext object SparkStreamingTest {
def main(args: Array[String]) {
//create a local StreamingContext with two working thread and batch interval of 1 second.
val conf = new SparkConf().setAppName("Spark streaming test").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))
//val sc = ssc.sparkContext
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" ")) //word count
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print() //convert spark streaming to sparksql
words.foreachRDD((rdd: RDD[String], time: Time) => {
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._
val wordsDataFrame = rdd.map(w => Record(w)).toDF()
wordsDataFrame.registerTempTable("words")
val wordsCountDataFrame = sqlContext.sql("select word, count(*) as total from words group by word")
println(s"======================= $time =======================")
wordsCountDataFrame.show()
}) ssc.start()
ssc.awaitTermination()
}
} case class Record(word: String) object SQLContextSingleton {
@transient private var instance: SQLContext = _
def getInstance(sparkContext: SparkContext): SQLContext = {
if (instance == null) {
instance = new SQLContext(sparkContext)
}
instance
}
}
SparkStreamingTest.scala的更多相关文章
-
jdb调试scala代码的简单介绍
在linux调试C/C++的代码需要通过gdb,调试java代码呢?那就需要用到jdb工具了.关于jdb的用法在网上大家都可以找到相应的文章,但是对scala进行调试的就比较少了.其实调试的大致流程都 ...
-
scala练习题1 基础知识
1, 在scala REPL中输入3. 然后按下tab键,有哪些方法可以被调用? 24个方法可以被调用, 8个基本类型: 基本的操作符, 等: 2,在scala REPL中,计算3的平方根,然 ...
-
牛顿法求平方根 scala
你任说1个整数x,我任猜它的平方根为y,如果不对或精度不够准确,那我令y = (y+x/y)/2.如此循环反复下去,y就会无限逼近x的平方根.scala代码牛顿智商太高了println( sqr(10 ...
-
Scala集合和Java集合对应转换关系
作者:Syn良子 出处:http://www.cnblogs.com/cssdongl 转载请注明出处 用Scala编码的时候,经常会遇到scala集合和Java集合互相转换的case,特意mark一 ...
-
Scala化规则引擎
1. 引言 什么是规则引擎 一个业务规则包含一组条件和在此条件下执行的操作,它们表示业务规则应用程序的一段业务逻辑.业务规则通常应该由业务分析人员和策略管理者开发和修改,但有些复杂的业务规则也可以由技 ...
-
Scala快速概览
IDEA工具安装及scala基本操作 目录 一. 1. 2. 3. 4. 二. 1. 2. 3. 三. 1. 2. 3. 4. 5. 6. 7. 四. 1. (1) (2) (3) (4) (5) ( ...
-
Scala Macros - scalamela 1.x,inline-meta annotations
在上期讨论中我们介绍了Scala Macros,它可以说是工具库编程人员不可或缺的编程手段,可以实现编译器在编译源代码时对源代码进行的修改.扩展和替换,如此可以对用户屏蔽工具库复杂的内部细节,使他们可 ...
-
Scala Macros - 元编程 Metaprogramming with Def Macros
Scala Macros对scala函数库编程人员来说是一项不可或缺的编程工具,可以通过它来解决一些用普通编程或者类层次编程(type level programming)都无法解决的问题,这是因为S ...
-
Scala Reflection - Mirrors,ClassTag,TypeTag and WeakTypeTag
反射reflection是程序对自身的检查.验证甚至代码修改功能.反射可以通过它的Reify功能来实时自动构建生成静态的Scala实例如:类(class).方法(method).表达式(express ...
随机推荐
-
eclipse新建maven项目(2)
本篇博文是继续之前的博文eclipse新建maven项目(1),那篇博文不在随笔在文章中.首先按照之前那篇博文进行创建maven项目操作,一系列操作下来之后发现刷新项目后会报错: 别急哈,可以解决. ...
-
springmvc(3)--数据类型转换
springmvc 配置 中conversionService可以配置类型转换,springmvc 参数绑定 中各种绑定方式和注解就是使用的这些转换器 一.先看下spring提供的内建类型转换器 第一 ...
-
base64转码
Base64是一种编码方法,可以将任意字符转成可打印字符.使用这种编码方法,主要不是为了加密,而是为了不出现特殊字符,简化程序的处理. JavaScript原生提供两个Base64相关方法. btoa ...
-
SqlServer 2015修改表时出现“save changes is not permitted…”的解决方法
使用SqlServer 2015的过程中,会出现如下情况: 在修改完表字段名或是类型后点击保存时会弹出一个对话框,且无法保存已做的修改.对话框内容大致如下: Saving changes is not ...
-
Makfile文件编写
一.make是什么 GNU make是一个工程管理器,专门负责管理.维护较多文件的处理,实现自动化编译.如果一个工程项目中,有成百上千个代码源文件,若其中一个或多个文件进过修改,make就需要能够自动 ...
-
Team Foundation Server 基本功能
Team Foundation Server(以下简称TFS)作为Microsoft发布的一个主要用于团队源代码管理工具,以敏捷开发作为其最大的特点而占领部分市场.该文主要介绍 TFS 在 Visua ...
-
ASP.NET没有魔法——ASP.NET 身份验证与Identity
前面的文章中为My Blog加入了文章的管理功能(ASP.NET没有魔法——ASP.NET MVC使用Area开发一个管理模块),但是管理功能应该只能由“作者”来访问,那么要如何控制用户的访问权限?也 ...
-
JavaScript笔记1———js的数据类型
JS的数据类型有: 1.数值类型(Number):js中所有数字均用浮点数字表示. 可以表示32位(即4字节)的整数,也可以表示64位(即8字节)的浮点数(小数). 也可以用二进制.八进制.十进制.十 ...
-
CentOS 7 yum install cobbler2.8.3
安装前注意事项: 1.cobbler主机要为静态ip,否则和dhcpd服务冲突. 2.如果用虚拟机安装,client的内存请设置为2g以上,否则会报错. 3.kickstart文件中不要出现中文,大坑 ...
- 比马卡龙好看N倍的中式甜点