1 在线学习
模型随着接收的新消息,不断更新自己;而不是像离线训练一次次重新训练。
2 Spark Streaming
- 离散化流(DStream)
-
输入源:Akka actors、消息队列、Flume、Kafka、……
http://spark.apache.org/docs/latest/streaming-programming-guide.html
类群(lineage):应用到RDD上的转换算子和执行算子的集合
3 MLib+Streaming应用
3.0 build.sbt
依赖Spark MLlib和Spark Streaming
name := "scala-spark-streaming-app"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.5.1"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.5.1"
使用国内镜像仓库
~/.sbt/repositories
[repositories]
local
osc: http://maven.oschina.net/content/groups/public/
typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
sonatype-oss-releases
maven-central
sonatype-oss-snapshots
3.1 生产消息
object StreamingProducer {
def main(args: Array[String]) {
val random = new Random()
// Maximum number of events per second
val MaxEvents = 6
// Read the list of possible names
val namesResource = this.getClass.getResourceAsStream("/names.csv")
val names = scala.io.Source.fromInputStream(namesResource)
.getLines()
.toList
.head
.split(",")
.toSeq
// Generate a sequence of possible products
val products = Seq(
"iPhone Cover" -> 9.99,
"Headphones" -> 5.49,
"Samsung Galaxy Cover" -> 8.95,
"iPad Cover" -> 7.49
)
/** Generate a number of random product events */
def generateProductEvents(n: Int) = {
(1 to n).map { i =>
val (product, price) = products(random.nextInt(products.size))
val user = random.shuffle(names).head
(user, product, price)
}
}
// create a network producer
val listener = new ServerSocket(9999)
println("Listening on port: 9999")
while (true) {
val socket = listener.accept()
new Thread() {
override def run = {
println("Got client connected from: " + socket.getInetAddress)
val out = new PrintWriter(socket.getOutputStream(), true)
while (true) {
Thread.sleep(1000)
val num = random.nextInt(MaxEvents)
val productEvents = generateProductEvents(num)
productEvents.foreach{ event =>
out.write(event.productIterator.mkString(","))
out.write("\n")
}
out.flush()
println(s"Created $num events...")
}
socket.close()
}
}.start()
}
}
}
sbt run
Multiple main classes detected, select one to run:
[1] MonitoringStreamingModel
[2] SimpleStreamingApp
[3] SimpleStreamingModel
[4] StreamingAnalyticsApp
[5] StreamingModelProducer
[6] StreamingProducer
[7] StreamingStateApp
Enter number: 6
3.2 打印消息
阅读全文请点击:http://click.aliyun.com/m/8713/
Spark机器学习· 实时机器学习的更多相关文章
-
Spark机器学习 Day1 机器学习概述
Spark机器学习 Day1 机器学习概述 今天主要讨论个问题:Spark机器学习的本质是什么,其内部构成到底是什么. 简单来说,机器学习是数据+算法. 数据 在Spark中做机器学习,肯定有数据来源 ...
-
Spark MLBase分布式机器学习系统入门:以MLlib实现Kmeans聚类算法
1.什么是MLBaseMLBase是Spark生态圈的一部分,专注于机器学习,包含三个组件:MLlib.MLI.ML Optimizer. ML Optimizer: This layer aims ...
-
Spark 中的机器学习库及示例
MLlib 是 Spark 的机器学习库,旨在简化机器学习的工程实践工作,并方便扩展到更大规模.MLlib 由一些通用的学习算法和工具组成,包括分类.回归.聚类.协同过滤.降维等,同时还包括底层的优化 ...
-
【Streaming】30分钟概览Spark Streaming 实时计算
本文主要介绍四个问题: 什么是Spark Streaming实时计算? Spark实时计算原理流程是什么? Spark 2.X下一代实时计算框架Structured Streaming Spark S ...
-
机器学习五 -- 机器学习的“Hello World”,感知机
机器学习五 -- 机器学习的“Hello World”,感知机 感知机是二类分类的线性分类模型,是神经网络和支持向量机的基础.其输入为实例的特征向量,输出为实例的类别,取+1和-1二值之一,即二类分类 ...
-
Spark Streaming实时计算框架介绍
随着大数据的发展,人们对大数据的处理要求也越来越高,原有的批处理框架MapReduce适合离线计算,却无法满足实时性要求较高的业务,如实时推荐.用户行为分析等. Spark Streaming是建立在 ...
-
【转】Spark Streaming 实时计算在甜橙金融监控系统中的应用及优化
系统架构介绍 整个实时监控系统的架构是先由 Flume 收集服务器产生的日志 Log 和前端埋点数据, 然后实时把这些信息发送到 Kafka 分布式发布订阅消息系统,接着由 Spark Streami ...
-
Spark练习之通过Spark Streaming实时计算wordcount程序
Spark练习之通过Spark Streaming实时计算wordcount程序 Java版本 Scala版本 pom.xml Java版本 import org.apache.spark.Spark ...
-
Spark机器学习9· 实时机器学习(scala with sbt)
1 在线学习 模型随着接收的新消息,不断更新自己:而不是像离线训练一次次重新训练. 2 Spark Streaming 离散化流(DStream) 输入源:Akka actors.消息队列.Flume ...
随机推荐
-
elk平台搭建
很多时候我们需要对日志做一个集中式的处理,但是通常情况下这些日志都分布到n台机器上面,导致一个结果就是效率比较低,而ELK平台可以帮助我们解决这么一件事情: ELK下载:https://www.ela ...
-
HTML与HTML5笔记
doctype作用 1.告诉浏览器使用什么样的html或者xhtml规范来解析html文档 2.影响浏览器的渲染模式. 浏览器解析css的两种渲染模式:标准模式strict mode和怪异模式quir ...
-
JAVA-面向对象-特性
1.封装 1.定义方式 1修饰符class类名 2类名首字母大写 2.类的成员 1属性 成员变量 可以设置默认值 第一个单词首字母小写,后面首字母大写 一般把属性设置成private 提供属性对应的g ...
-
javascript笔记——js的阻塞特性[转载]
JS具有阻塞特性,当浏览器在执行js代码时,不能同时做其它事情,即<script>每次出现都会让页面等待脚本的解析和执行(不论JS是内嵌的还是外链的),JS代码执行完成后,才继续渲染页面. ...
-
Ubuntu 16安装GPU版本tensorflow
pre { direction: ltr; color: rgb(0, 0, 0) } pre.western { font-family: "Liberation Mono", ...
-
火狐兼容window.event.returnValue=false;
火狐中window.event是未定义的,可用e.preventDefault();替代window.event.returnValue=false; 直接上图
-
Python3实战系列之九(获取印度售后数据项目)
项目现状:已经部署在服务器上并正常运行了. 1.服务器上的部署 2.下载到服务器的文件列表 3.转存在到数据库SQL Server中的数据 项目总结:这次项目采用python来实现,刚开始还是有点担忧 ...
-
BZOJ1821 [JSOI2010]Group 部落划分 Group Kruskal
欢迎访问~原文出处——博客园-zhouzhendong 去博客园看该题解 题目传送门 - BZOJ1821 题意概括 平面上有n个点,现在把他们划分成k个部分,求不同部分之间最近距离的最大值. 两个部 ...
-
kettle的输入输出组件和脚本组件
一. 输入组件 1.1表输入 从指定的数据库中,通过sql语句来查询数据加载到内存. 允许简易转换:勾选后可以避免不必要的字段的数据类型转换,从而提高性能. 替换sql语句里的变量:勾选后可以通过${ ...
-
C#: 获取当前路径不要用Environment.CurrentDirectory
网上大把文章写到C#获取当前路径的方法如下: // 获取程序的基目录. System.AppDomain.CurrentDomain.BaseDirectory // 获取模块的完整路径. Syste ...