大数据学习day34---spark14------1 redis的事务(pipeline)测试 ,2. 利用redis的pipeline实现数据统计的exactlyonce ,3 SparkStreaming中数据写入Hbase实现ExactlyOnce, 4.Spark StandAlone的执行模式,5 spark on yarn

时间:2022-09-20 20:02:04

1 redis的事务(pipeline)测试

  Redis本身对数据进行操作,单条命令是原子性的,但事务不保证原子性,且没有回滚。事务中任何命令执行失败,其余的命令仍会被执行,将Redis的多个操作放到一起执行,要成功多成功,如果失败了,可以把整个操作放弃,可以实现类似事物的功能。redis事务包含三个阶段:开始事务,命令入队,执行事务。redis的分片副本集集群不支持pipeline,redis只支持单机版的事务(pipeline),Redis的主从复制也支持pipeline(目前一些公司就是这样干的)。若是想用集群,可以使用MongoDB,MongoDB集群支持事物,是一个NoSQL文档数据库,支持存储海量数据、安全、可扩容。

RedisPipelineTest

大数据学习day34---spark14------1 redis的事务(pipeline)测试 ,2. 利用redis的pipeline实现数据统计的exactlyonce ,3 SparkStreaming中数据写入Hbase实现ExactlyOnce, 4.Spark StandAlone的执行模式,5 spark on yarn大数据学习day34---spark14------1 redis的事务(pipeline)测试 ,2. 利用redis的pipeline实现数据统计的exactlyonce ,3 SparkStreaming中数据写入Hbase实现ExactlyOnce, 4.Spark StandAlone的执行模式,5 spark on yarn
package com._51doit.spark14

import com._51doit.utils.JedisConnectionPool
import redis.clients.jedis.{Jedis, Pipeline} object RedisPipeLineTest {
def main(args: Array[String]): Unit = {
val jedis: Jedis = JedisConnectionPool.getConnection
jedis.select(1)
// 获取jedis的pipeline
val pipeline: Pipeline = jedis.pipelined()
// 开启多个操作在一个批次执行
pipeline.multi() try {
pipeline.hincrBy("AAA", "a", 200) var i = 1 / 0 pipeline.hincrBy("BBB", "b", 20) //提交事物
pipeline.exec()
pipeline.sync()
} catch {
case e: Exception => {
//将脏数据废弃
pipeline.discard()
e.printStackTrace()
}
} finally {
pipeline.close()
jedis.close()
} }
}

2. 利用redis的pipeline实现数据统计的exactlyonce  

ExactlyOnceWordCountOffsetStoreInRedis 

大数据学习day34---spark14------1 redis的事务(pipeline)测试 ,2. 利用redis的pipeline实现数据统计的exactlyonce ,3 SparkStreaming中数据写入Hbase实现ExactlyOnce, 4.Spark StandAlone的执行模式,5 spark on yarn大数据学习day34---spark14------1 redis的事务(pipeline)测试 ,2. 利用redis的pipeline实现数据统计的exactlyonce ,3 SparkStreaming中数据写入Hbase实现ExactlyOnce, 4.Spark StandAlone的执行模式,5 spark on yarn
package cn._51doit.spark.day14

import cn._51doit.spark.utils.{JedisConnectionPool, OffsetUtils}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import redis.clients.jedis.{Jedis, Pipeline} /**
* 从Kafka读取数据,实现ExactlyOnce,偏移量保存到Redis中
* 1.将聚合好的数据,收集到Driver端,
* 2.然后将计算好的数据和偏移量在一个pipeline中同时保存到Redis中
* 3.成功了提交事物
* 4.失败了废弃原来的数据并让这个任务重启
*/
object ExactlyOnceWordCountOffsetStoreInRedis { def main(args: Array[String]): Unit = { //true a1 g1 ta,tb
val Array(isLocal, appName, groupId, allTopics) = args val conf = new SparkConf()
.setAppName(appName) if (isLocal.toBoolean) {
conf.setMaster("local[*]")
} //创建StreamingContext,并指定批次生成的时间
val ssc = new StreamingContext(conf, Milliseconds(5000))
//设置日志级别
ssc.sparkContext.setLogLevel("WARN") //SparkStreaming 跟kafka进行整合
//1.导入跟Kafka整合的依赖
//2.跟kafka整合,创建直连的DStream【使用底层的消费API,效率更高】 val topics = allTopics.split(",") //SparkSteaming跟kafka整合的参数
//kafka的消费者默认的参数就是每5秒钟自动提交偏移量到Kafka特殊的topic中: __consumer_offsets
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"group.id" -> groupId,
"auto.offset.reset" -> "earliest" //如果没有记录偏移量,第一次从最开始读,有偏移量,接着偏移量读
, "enable.auto.commit" -> (false: java.lang.Boolean) //消费者不自动提交偏移量
) //在创建KafkaDStream之前要先读取Redis数据库,查询历史偏移量,没有就从头读,有就接着读
//offsets: collection.Map[TopicPartition, Long]
val offsets: Map[TopicPartition, Long] = OffsetUtils.queryHistoryOffsetFromRedis(appName, groupId) //跟Kafka进行整合,需要引入跟Kafka整合的依赖
//createDirectStream更加高效,使用的是Kafka底层的消费API,消费者直接连接到Kafka的Leader分区进行消费
//直连方式,RDD的分区数量和Kafka的分区数量是一一对应的【数目一样】
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent, //调度task到Kafka所在的节点
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets) //指定订阅Topic的规则
) kafkaDStream.foreachRDD(rdd => { //判断当前批次的RDD是否有数据
if (!rdd.isEmpty()) { //获取RDD所有分区的偏移量
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //实现WordCount业务逻辑
val words: RDD[String] = rdd.flatMap(_.value().split(" "))
val wordsAndOne: RDD[(String, Int)] = words.map((_, 1))
val reduced: RDD[(String, Int)] = wordsAndOne.reduceByKey(_ + _)
//将计算好的结果收集到Driver端再写入到Redis中【保证数据和偏移量写入在一个事物中】
//触发Action,将数据收集到Driver段
val res: Array[(String, Int)] = reduced.collect() var jedis: Jedis = null
var pipeline: Pipeline = null
//创建一个Redis的连接【在Driver端创建】
try {
jedis = JedisConnectionPool.getConnection()
//使用pipeline
pipeline = jedis.pipelined()
pipeline.select(1)
//开启多个操作在一起执行
pipeline.multi() //写入计算好的结果
for (tp <- res) {
pipeline.hincrBy("WORD_COUNT", tp._1, tp._2)
} //写入偏移量
for (offsetRange <- offsetRanges) {
val topic = offsetRange.topic
val partition = offsetRange.partition
val untilOffset = offsetRange.untilOffset
//将原来的偏移量覆盖
pipeline.hset(appName +"_" + groupId, topic + "_" + partition, untilOffset.toString)
}
//类似提交事物
pipeline.exec()
pipeline.sync()
} catch {
case e: Exception => {
pipeline.discard()
e.printStackTrace()
ssc.stop()
} } finally {
pipeline.close()
jedis.close()
}
}
}) ssc.start() ssc.awaitTermination() }
}

查询redis的历史偏移量:OffsetUtils(queryHistoryOffsetFromRedis)

大数据学习day34---spark14------1 redis的事务(pipeline)测试 ,2. 利用redis的pipeline实现数据统计的exactlyonce ,3 SparkStreaming中数据写入Hbase实现ExactlyOnce, 4.Spark StandAlone的执行模式,5 spark on yarn大数据学习day34---spark14------1 redis的事务(pipeline)测试 ,2. 利用redis的pipeline实现数据统计的exactlyonce ,3 SparkStreaming中数据写入Hbase实现ExactlyOnce, 4.Spark StandAlone的执行模式,5 spark on yarn
package cn._51doit.spark.utils

import java.sql.{Connection, DriverManager, ResultSet}
import java.util import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange import scala.collection.mutable object OffsetUtils { def queryHistoryOffsetFromMySQL(appName: String, groupId: String): Map[TopicPartition, Long] = { val offsets = new mutable.HashMap[TopicPartition, Long]() val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "123456") val ps = connection.prepareStatement("SELECT topic_partition, offset FROM t_kafka_offset WHERE" +
" app_gid = ?") ps.setString(1, appName + "_" +groupId) val rs = ps.executeQuery() while (rs.next()) {
val topicAndPartition = rs.getString(1)
val offset = rs.getLong(2)
val fields = topicAndPartition.split("_")
val topic = fields(0)
val partition = fields(1).toInt
val topicPartition = new TopicPartition(topic, partition)
//将构建好的TopicPartition放入map中
offsets(topicPartition) = offset
}
offsets.toMap
} /**
* 将偏移量更新到MySQL中
* @param offsetRanges
* @param connection
*/
def updateOffsetToMySQL(appNameAndGroupId: String, offsetRanges: Array[OffsetRange], connection: Connection) = { val ps = connection.prepareStatement("INSERT INTO t_kafka_offset (app_gid, topic_partition, offset) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE offset = ?") for (offsetRange <- offsetRanges) {
//topic名称
val topic = offsetRange.topic
//topic分区编号
val partition = offsetRange.partition
//获取结束偏移量
val untilOffset = offsetRange.untilOffset
//将结果写入MySQL
ps.setString(1, appNameAndGroupId)
ps.setString(2, topic + "_" + partition)
ps.setLong(3, untilOffset)
ps.setLong(4, untilOffset)
ps.executeUpdate()
}
ps.close()
} /**
* 从Redis中查询历史偏移量
* @param appName
* @param groupId
* @return
*/
def queryHistoryOffsetFromRedis(appName: String, groupId: String): Map[TopicPartition, Long] = { val offsets = new mutable.HashMap[TopicPartition, Long]() val jedis = JedisConnectionPool.getConnection() jedis.select(1) val topicPartitionAndOffsets: util.Map[String, String] = jedis.hgetAll(appName + "_" + groupId) //导入隐式转换
import scala.collection.JavaConversions._ for((topicAndPartition, offset) <- topicPartitionAndOffsets) {
val fields = topicAndPartition.split("_")
val topic = fields(0)
val partition = fields(1).toInt
val topicPartition = new TopicPartition(topic, partition)
offsets(topicPartition) = offset.toLong
}
offsets.toMap
} //每一次启动该程序,都要从Hbase查询历史偏移量
def queryHistoryOffsetFromHbase(view: String, groupid: String): Map[TopicPartition, Long] = { val offsets = new mutable.HashMap[TopicPartition, Long]() val connection = DriverManager.getConnection("jdbc:phoenix:node-1.51doit.cn,node-2.51doit.cn,node-3.51doit.cn:2181") val ps = connection.prepareStatement("select \"topic_partition\", max(\"offset\") from \"myorder\" where \"groupid\" = ? group by \"topic_partition\"") ps.setString(1, groupid) //查询返回结果
val rs: ResultSet = ps.executeQuery() while(rs.next()) { val topicAndPartition = rs.getString(1) val fields = topicAndPartition.split("_")
val topic = fields(0)
val partition = fields(1).toInt val offset = rs.getLong(2) offsets.put(new TopicPartition(topic, partition), offset) } offsets.toMap
} }

以上的查询偏移量,以及将偏移量都可以写到一个工具类中,封装成方法,上诉OffsetUtils中对将偏移量存mysql这样走了

注意:以上的统计结果都能收集到driver端的原因是数据统计是聚合类的操作(数据量必定小),若不是聚合类的操作,则不能收集到driver端,进而达不到将数据和偏移量同时写入数据库的需求,解决办法如3

3 SparkStreaming中数据写入Hbase实现ExactlyOnce

  hbase不支持事务(无法保证多条数据同时写入成功),但其支持行级事务(即每行的每个列族的值要么成功写入hbase,要么失败),其能保证统计的数据和偏移量同时写入成功

  数据是在executor端写入的,但偏移量是在driver端获取到的。为了保证数据和偏移量同时写入,偏移量也要在executor端写入

(1)思路:

  利用hbase支持行级事务的特点,将偏移量随着task发送到executor中,每个task都会有与自己对应的ID(这个id与kafka中的leader分区一一对应),每个task获取自己的偏移量只需要利用自身的id作为角标从offsetrange数组中获取。

思路图:

大数据学习day34---spark14------1 redis的事务(pipeline)测试 ,2. 利用redis的pipeline实现数据统计的exactlyonce ,3 SparkStreaming中数据写入Hbase实现ExactlyOnce, 4.Spark StandAlone的执行模式,5 spark on yarn

难点1解决:使用闭包的形式将偏移量和task一起发送到Executor端

难点2解决:使用协处理器,phionex

注意:数据在写入kafka前必须要有一个唯一的标识(即rowkey),若没有的话,可以在写入kafka前,让数据生成自己的rowkey

(2)hbase创建表以及用phionex做视图映射

  • 创建表:
create 'myorder','data','offset'

结果:

大数据学习day34---spark14------1 redis的事务(pipeline)测试 ,2. 利用redis的pipeline实现数据统计的exactlyonce ,3 SparkStreaming中数据写入Hbase实现ExactlyOnce, 4.Spark StandAlone的执行模式,5 spark on yarn

  • 用phionex做视图映射(对myorder表)
create view "myorder" (pk VARCHAR PRIMARY KEY, "offset"."groupid" VARCHAR, "offset"."topic_partition" VARCHAR, "offset"."offset" UNSIGNED_LONG);

大数据学习day34---spark14------1 redis的事务(pipeline)测试 ,2. 利用redis的pipeline实现数据统计的exactlyonce ,3 SparkStreaming中数据写入Hbase实现ExactlyOnce, 4.Spark StandAlone的执行模式,5 spark on yarn

(3)业务代码(KafkaToHbase)

KafkaToHbase

大数据学习day34---spark14------1 redis的事务(pipeline)测试 ,2. 利用redis的pipeline实现数据统计的exactlyonce ,3 SparkStreaming中数据写入Hbase实现ExactlyOnce, 4.Spark StandAlone的执行模式,5 spark on yarn大数据学习day34---spark14------1 redis的事务(pipeline)测试 ,2. 利用redis的pipeline实现数据统计的exactlyonce ,3 SparkStreaming中数据写入Hbase实现ExactlyOnce, 4.Spark StandAlone的执行模式,5 spark on yarn
package cn._51doit.spark.day14

import java.util

import cn._51doit.spark.utils.OffsetUtils
import com.alibaba.fastjson.{JSON, JSONException}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Connection, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext, TaskContext} /**
* https://www.jianshu.com/p/f1340eaa3e06
*
* spark.task.maxFailures
* yarn.resourcemanager.am.max-attempts
* spark.speculation
*
* create view "orders" (pk VARCHAR PRIMARY KEY, "offsets"."groupid" VARCHAR, "offsets"."topic_partition" VARCHAR, "offsets"."offset" UNSIGNED_LONG);
* select max("offset") from "orders" where "groupid" = 'g104' group by "topic_partition";
*
*/
object KafkaToHbase { def main(args: Array[String]): Unit = { //true a1 g1 ta,tb
val Array(isLocal, appName, groupId, allTopics) = args val conf = new SparkConf()
.setAppName(appName) if (isLocal.toBoolean) {
conf.setMaster("local[*]")
} val sc = new SparkContext(conf)
sc.setLogLevel("WARN") val ssc: StreamingContext = new StreamingContext(sc, Milliseconds(5000)) val topics = allTopics.split(",") //SparkSteaming跟kafka整合的参数
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092",
"key.deserializer" -> classOf[StringDeserializer].getName,
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"group.id" -> groupId,
"auto.offset.reset" -> "earliest", //如果没有记录偏移量,第一次从最开始读,有偏移量,接着偏移量读
"enable.auto.commit" -> (false: java.lang.Boolean) //消费者不自动提交偏移量
) //查询历史偏移量【上一次成功写入到数据库的偏移量】
val historyOffsets: Map[TopicPartition, Long] = OffsetUtils.queryHistoryOffsetFromHbase("myorder", groupId) //跟Kafka进行整合,需要引入跟Kafka整合的依赖
//createDirectStream更加高效,使用的是Kafka底层的消费API,消费者直接连接到Kafka的Leader分区进行消费
//直连方式,RDD的分区数量和Kafka的分区数量是一一对应的【数目一样】
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent, //调度task到Kafka所在的节点
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, historyOffsets) //指定订阅Topic的规则, 从历史偏移量接着读取数据
) kafkaDStream.foreachRDD(rdd => { if (!rdd.isEmpty()) { //获取KakfaRDD的偏移量
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //获取KafkaRDD中的数据
val lines: RDD[String] = rdd.map(_.value()) val orderRDD: RDD[Order] = lines.map(line => {
var order: Order = null
try {
order = JSON.parseObject(line, classOf[Order])
} catch {
case e: JSONException => {
//TODO
}
}
order
})
//过滤问题数据
val filtered: RDD[Order] = orderRDD.filter(_ != null) filtered.foreachPartition(iter => {
if (iter.nonEmpty) {
//先获取当前Task的分区编号,然后根据Task分区编号再获取当前分区的偏移量
val offsetRange = offsetRanges(TaskContext.get.partitionId)
//获取一个Hbase的Connection【在Executor端获取的】
val connection: Connection = HBaseUtil.getConnection("node-1.51doit.cn,node-2.51doit.cn,node-3.51doit.cn", 2181)
val t_orders: Table = connection.getTable(TableName.valueOf("myorder")) //定义一个集合,将数据先缓存到集合中
val puts = new util.ArrayList[Put]()
//迭代分区中的每一条数据
iter.foreach(o => {
// new 了一个put,就是hbase一行数据
val put = new Put(Bytes.toBytes(o.oid)) //put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("order_id"), Bytes.toBytes(o.oid))
put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("total_money"), Bytes.toBytes(o.totalMoney)) //如果是一个批次中的最后一条数据,将偏移量和数据同时写入Hbase的同一行中
if (!iter.hasNext) {
val topic = offsetRange.topic
val partition = offsetRange.partition
val untilOffset = offsetRange.untilOffset
put.addColumn(Bytes.toBytes("offset"), Bytes.toBytes("groupid"), Bytes.toBytes(groupId))
put.addColumn(Bytes.toBytes("offset"), Bytes.toBytes("topic_partition"), Bytes.toBytes(topic + "_" + partition))
put.addColumn(Bytes.toBytes("offset"), Bytes.toBytes("offset"), Bytes.toBytes(untilOffset))
} puts.add(put)
// if (puts.size() % 5 == 0) {
// t_orders.put(puts)
// puts.clear()
// } })
//批量写入
t_orders.put(puts)
//关闭Hbase的table
t_orders.close()
//关闭Hbase连接
connection.close() }
}) } }) ssc.start() ssc.awaitTermination() }
}

 HBaseUtil:建立连接Hbase连接

大数据学习day34---spark14------1 redis的事务(pipeline)测试 ,2. 利用redis的pipeline实现数据统计的exactlyonce ,3 SparkStreaming中数据写入Hbase实现ExactlyOnce, 4.Spark StandAlone的执行模式,5 spark on yarn大数据学习day34---spark14------1 redis的事务(pipeline)测试 ,2. 利用redis的pipeline实现数据统计的exactlyonce ,3 SparkStreaming中数据写入Hbase实现ExactlyOnce, 4.Spark StandAlone的执行模式,5 spark on yarn
package com._51doit.utils

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory} /**
* Hbase的工具类,用来创建Hbase的Connection
*/
object HBaseUtil extends Serializable {
/**
* @param zkQuorum zookeeper地址,多个要用逗号分隔
* @param port zookeeper端口号
* @return
*/
def getConnection(zkQuorum: String, port: Int): Connection = synchronized {
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", zkQuorum)
conf.set("hbase.zookeeper.property.clientPort", port.toString)
ConnectionFactory.createConnection(conf)
}
}

OffsetUtils(查询hbase偏移量)

大数据学习day34---spark14------1 redis的事务(pipeline)测试 ,2. 利用redis的pipeline实现数据统计的exactlyonce ,3 SparkStreaming中数据写入Hbase实现ExactlyOnce, 4.Spark StandAlone的执行模式,5 spark on yarn大数据学习day34---spark14------1 redis的事务(pipeline)测试 ,2. 利用redis的pipeline实现数据统计的exactlyonce ,3 SparkStreaming中数据写入Hbase实现ExactlyOnce, 4.Spark StandAlone的执行模式,5 spark on yarn
  //每一次启动该程序,都要从Hbase查询历史偏移量
def queryHistoryOffsetFromHbase(view: String, groupid: String): Map[TopicPartition, Long] = { val offsets = new mutable.HashMap[TopicPartition, Long]() val connection = DriverManager.getConnection("jdbc:phoenix:node-1.51doit.cn,node-2.51doit.cn,node-3.51doit.cn:2181") val ps = connection.prepareStatement("select \"topic_partition\", max(\"offset\") from \"myorder\" where \"groupid\" = ? group by \"topic_partition\"") ps.setString(1, groupid) //查询返回结果
val rs: ResultSet = ps.executeQuery() while(rs.next()) { val topicAndPartition = rs.getString(1) val fields = topicAndPartition.split("_")
val topic = fields(0)
val partition = fields(1).toInt val offset = rs.getLong(2) offsets.put(new TopicPartition(topic, partition), offset) } offsets.toMap
}

注意:写入hbase与前面写入mysql,redis不同的是:此处是在executor进行写数据和偏移量(数据费聚合类,不能收集到driver端),所以在计算逻辑中需要根据任务id去获取指定的分区

//先获取当前Task的分区编号,然后根据Task分区编号再获取当前分区的偏移量
val offsetRange = offsetRanges(TaskContext.get.partitionId())

4.Spark StandAlone的执行模式

  具体见文档

5 spark on yarn

  具体见文档

大数据学习day34---spark14------1 redis的事务(pipeline)测试 ,2. 利用redis的pipeline实现数据统计的exactlyonce ,3 SparkStreaming中数据写入Hbase实现ExactlyOnce, 4.Spark StandAlone的执行模式,5 spark on yarn的更多相关文章

  1. 大数据学习day33----spark13-----1&period;两种方式管理偏移量并将偏移量写入redis 2&period; MySQL事务的测试 3&period;利用MySQL事务实现数据统计的ExactlyOnce(sql语句中出现相同key时如何进行累加(此处时出现相同的单词))4 将数据写入kafka

    1.两种方式管理偏移量并将偏移量写入redis (1)第一种:rdd的形式 一般是使用这种直连的方式,但其缺点是没法调用一些更加高级的api,如窗口操作.如果想更加精确的控制偏移量,就使用这种方式 代 ...

  2. 大数据学习——java操作hdfs环境搭建以及环境测试

    1 新建一个maven项目 打印根目录下的文件的名字 添加pom依赖 pom.xml <?xml version="1.0" encoding="UTF-8&quo ...

  3. redis未授权弱口令检测脚本(redis未授权访问漏洞,利用redis写webshell)

    以下如有雷同,不胜荣幸 * --- 示例代码!!!!!----*/ #! /usr/bin/env python # _*_  coding:utf-8 _*_ import socket impor ...

  4. Apache Spark源码走读之8 -- Spark on Yarn

    欢迎转载,转载请注明出处,徽沪一郎. 概要 Hadoop2中的Yarn是一个分布式计算资源的管理平台,由于其有极好的模型抽象,非常有可能成为分布式计算资源管理的事实标准.其主要职责将是分布式计算集群的 ...

  5. spark第四篇:Running Spark on YARN

    确保HADOOP_CONF_DIR或者YARN_CONF_DIR指向hadoop集群配置文件目录.这些配置用来写数据到hdfs以及连接yarn ResourceManager.(在$SPARK_HOM ...

  6. 04、Spark Standalone集群搭建

    04.Spark Standalone集群搭建 4.1 集群概述 独立模式是Spark集群模式之一,需要在多台节点上安装spark软件包,并分别启动master节点和worker节点.master节点 ...

  7. Spark记录(二):Spark程序的生命周期

    本文以Spark执行模式中最常见的集群模式为例,详细的描述一下Spark程序的生命周期(YARN作为集群管理器). 1.集群节点初始化 集群刚初始化的时候,或者之前的Spark任务完成之后,此时集群中 ...

  8. 论SparkStreaming的数据可靠性和一致性

    转自: http://www.csdn.net/article/2015-06-21/2825011 摘要:眼下大数据领域最热门的词汇之一便是流计算了,而其中最耀眼的无疑是来自Spark社区的Spar ...

  9. MapReduce和Spark写入Hbase多表总结

    作者:Syn良子 出处:http://www.cnblogs.com/cssdongl 转载请注明出处 大家都知道用mapreduce或者spark写入已知的hbase中的表时,直接在mapreduc ...

随机推荐

  1. 中国175个 AAAAA级风景区,去过20个 以上,你就是旅游达人

    省份 数量 景区名称 我 北京 7 故宫博物院 1 天坛公园 颐和园 1 八达岭-慕田峪长城旅游区 1 明十三陵景区(神路-定陵-长陵-昭陵) 恭王府景区 北京奥林匹克公园(鸟巢-水立方-中国科技馆- ...

  2. 转 A Week with Mozilla&&num;39&semi;s Rust

    转自http://relistan.com/a-week-with-mozilla-rust/ A Week with Mozilla's Rust I want another systems la ...

  3. makefile 分析 -- 内置变量及自动变量

    makefile 分析1  -p 选项,可以打印出make过程中的数据库, 下面研究一下内置的变量和规则. -n 选项, 只运行,不执行, -d 选项,相当于--debug=a,  b(basic), ...

  4. Java基础知识强化之网络编程笔记09:TCP之客户端键盘录入服务器写到文本文件中

    1. TCP之客户端键盘录入服务器写到文本文件中 (1)客户端: package cn.itcast_09; import java.io.BufferedReader; import java.io ...

  5. 类和对象:拾遗 - 零基础入门学习Python039

    类和对象:拾遗 让编程改变世界 Change the world by program 这节课谈的内容主要有: 组合 ...... 此处省略N多内容,具体请看视频讲解 ...... 类.类对象和实例对 ...

  6. win7系统无线 VirtualBox rehat 32位linux 下 host-only模式相互通信及上网 配置

    虚拟机环境:virtualBox虚拟环境 redhat 32位 主机环境 : win7 无线路由 模式: host-only win7下nat模式原先的设置基于 有网线连接的情形下,在使用了无线路由之 ...

  7. webviewactivity

    WebView注意点,注释里有说明 package com.example.suneyaenews; import com.example.http.HttpThread; import androi ...

  8. mysql数据库--explain(查询表是否走索引)各个字段含义

    1.1 id列 数字越大越先执行,如果说数字一样大,那么就从上往下依次执行,id列为null的就表是这是一个结果集,不需要使用它来进行查询. 1.2 select_type列 常见的有: ◆ simp ...

  9. 高通平台手机开发之LCD

    4.1. LCD 参考文档: 1) 80-NA157-174_E_DSI_Programing_Guide_B-Family_Android_Devices.pdf 2) 80-NN766-1_A_L ...

  10. 使用fiddler修改支付金额,支付必测

    使用fiddler4先把网页拦截,修改服务器返回的参数,并把修改后的数据包发送给服务器,若该页面价格修改成功则是一个重大bug. 以下是操作流程: 一.拦截订单请求 方法一: 1.在下方命令行输入命令 ...