This post covers two topics:
1. The Executor's WAL (write-ahead log) mechanism in detail
2. Message replay with Kafka
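Data safety considerations:
- Spark Streaming continuously receives data, continuously generates Jobs, and continuously submits those Jobs to the cluster. This raises a very important question: data safety.
- Spark Streaming is built on top of Spark Core. Its Jobs are based on RDDs, so as long as the received data is safe and reliable, Spark Streaming can rely on Spark Core's fault-tolerance mechanism to recover automatically when something goes wrong at run time.
- Fault tolerance for the Executor is therefore mainly about keeping the received data safe.
- Why fault tolerance of the computation is not discussed here: the computation relies on Spark Core's fault tolerance, so it is reliable by design.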
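Executor fault-tolerance approaches:
1. The simplest approach is replication, built on the replication support of the underlying BlockManager. This is also the default.
2. Keep no replicas of the received data and rely on data replay instead. Replay means the source allows the same data to be read again and again.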
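BlockManager replication:
1. By default there are two replicas, normally in memory. When a Spark Streaming Receiver stores received data, it specifies the StorageLevel MEMORY_AND_DISK_SER_2 (serialized, in memory with spill to disk, replicated twice). Storage is delegated to the BlockManager, whose semantics ensure that when two replicas are requested the data ends up on two nodes, so at least two Executors hold it.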
/**
* :: DeveloperApi ::
* Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
* or ExternalBlockStore, whether to drop the RDD to disk if it falls out of memory or
* ExternalBlockStore, whether to keep the data in memory in a serialized format, and whether
* to replicate the RDD partitions on multiple nodes.
*
* The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants
* for commonly useful storage levels. To create your own storage level object, use the
* factory method of the singleton object (`StorageLevel(...)`).
*/
@DeveloperApi
class StorageLevel private(
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1)
extends Externalizable {
2. The ReceivedBlockHandler is chosen as follows:
private val receivedBlockHandler: ReceivedBlockHandler = {
// Enabling the WAL requires a checkpoint directory.
if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
if (checkpointDirOption.isEmpty) {
throw new SparkException(
"Cannot enable receiver write-ahead log without checkpoint directory set. " +
"Please use streamingContext.checkpoint() to set the checkpoint directory. " +
"See documentation for more details.")
}
new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
} else {
new BlockManagerBasedBlockHandler(env.blockManager,
// This storageLevel is the one passed in when the Receiver was constructed.
receiver.storageLevel)
}
}
3. The WAL mechanism is not enabled by default.
/** A helper class with utility functions related to the WriteAheadLog interface */
private[streaming] object WriteAheadLogUtils extends Logging {
val RECEIVER_WAL_ENABLE_CONF_KEY = "spark.streaming.receiver.writeAheadLog.enable"
val RECEIVER_WAL_CLASS_CONF_KEY = "spark.streaming.receiver.writeAheadLog.class"
val RECEIVER_WAL_ROLLING_INTERVAL_CONF_KEY =
"spark.streaming.receiver.writeAheadLog.rollingIntervalSecs"
val RECEIVER_WAL_MAX_FAILURES_CONF_KEY = "spark.streaming.receiver.writeAheadLog.maxFailures"
val RECEIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY =
"spark.streaming.receiver.writeAheadLog.closeFileAfterWrite"
val DRIVER_WAL_CLASS_CONF_KEY = "spark.streaming.driver.writeAheadLog.class"
val DRIVER_WAL_ROLLING_INTERVAL_CONF_KEY =
"spark.streaming.driver.writeAheadLog.rollingIntervalSecs"
val DRIVER_WAL_MAX_FAILURES_CONF_KEY = "spark.streaming.driver.writeAheadLog.maxFailures"
val DRIVER_WAL_BATCHING_CONF_KEY = "spark.streaming.driver.writeAheadLog.allowBatching"
val DRIVER_WAL_BATCHING_TIMEOUT_CONF_KEY = "spark.streaming.driver.writeAheadLog.batchingTimeout"
val DRIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY =
"spark.streaming.driver.writeAheadLog.closeFileAfterWrite"
val DEFAULT_ROLLING_INTERVAL_SECS = 60
val DEFAULT_MAX_FAILURES = 3
def enableReceiverLog(conf: SparkConf): Boolean = {
conf.getBoolean(RECEIVER_WAL_ENABLE_CONF_KEY, false)
}
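The selection logic above can be sketched in plain Java. This is only an illustration under stated assumptions: `HandlerChoiceSketch`, `walEnabled`, and `chooseHandler` are hypothetical names, a `Map` stands in for `SparkConf`, and the handlers are represented by their class names rather than real objects.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the handler selection; it does not use Spark's classes.
final class HandlerChoiceSketch {
    static final String WAL_ENABLE_KEY = "spark.streaming.receiver.writeAheadLog.enable";

    // Mirrors conf.getBoolean(key, false): a missing key means the WAL is off.
    static boolean walEnabled(Map<String, String> conf) {
        return Boolean.parseBoolean(conf.getOrDefault(WAL_ENABLE_KEY, "false"));
    }

    // Returns the name of the handler that would be chosen for this configuration.
    static String chooseHandler(Map<String, String> conf, Optional<String> checkpointDir) {
        if (walEnabled(conf)) {
            if (!checkpointDir.isPresent()) {
                throw new IllegalStateException(
                    "Cannot enable receiver write-ahead log without checkpoint directory set.");
            }
            return "WriteAheadLogBasedBlockHandler";
        }
        return "BlockManagerBasedBlockHandler";
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        // No key set: the WAL is disabled and replication is used.
        System.out.println(chooseHandler(conf, Optional.empty()));
        // WAL switched on, checkpoint directory provided (the path is made up).
        conf.put(WAL_ENABLE_KEY, "true");
        System.out.println(chooseHandler(conf, Optional.of("/tmp/checkpoint")));
    }
}
```

The point of the sketch is the ordering of the checks: the WAL flag is read first with a default of false, and the checkpoint directory only matters once the flag is on.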
4. For example, socketTextStream is defined as follows:
/**
* Create a input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
* lines.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
def socketTextStream(
hostname: String,
port: Int,
// The default value of storageLevel is set here.
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
5. The source of BlockManagerBasedBlockHandler:
/**
* Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
* stores the received blocks into a block manager with the specified storage level.
*/
private[streaming] class BlockManagerBasedBlockHandler(
blockManager: BlockManager, storageLevel: StorageLevel)
extends ReceivedBlockHandler with Logging {
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
var numRecords = None: Option[Long]
val putResult: Seq[(BlockId, BlockStatus)] = block match {
case ArrayBufferBlock(arrayBuffer) =>
numRecords = Some(arrayBuffer.size.toLong)
blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,
tellMaster = true)
case IteratorBlock(iterator) =>
val countIterator = new CountingIterator(iterator)
val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
tellMaster = true)
numRecords = countIterator.count
putResult
case ByteBufferBlock(byteBuffer) =>
blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
case o =>
throw new SparkException(
s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
}
if (!putResult.map { _._1 }.contains(blockId)) {
throw new SparkException(
s"Could not store $blockId to block manager with storage level $storageLevel")
}
BlockManagerBasedStoreResult(blockId, numRecords)
}
def cleanupOldBlocks(threshTime: Long) {
// this is not used as blocks inserted into the BlockManager are cleared by DStream's clearing
// of BlockRDDs.
}
}
6. The actual storing is done through putIterator:
def putIterator(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
require(values != null, "Values is null")
doPut(blockId, IteratorValues(values), level, tellMaster, effectiveStorageLevel)
}
7. An excerpt from doPut, showing where replication is started:
// If we're storing bytes, then initiate the replication before storing them locally.
// This is faster as data is already serialized and ready to send.
val replicationFuture = data match {
case b: ByteBufferValues if putLevel.replication > 1 =>
// Duplicate doesn't copy the bytes, but just creates a wrapper
val bufferView = b.buffer.duplicate()
Future {
// This is a blocking action and should run in futureExecutionContext which is a cached
// thread pool
// replicate() copies the data to other nodes.
replicate(blockId, bufferView, putLevel)
}(futureExecutionContext)
case _ => null
}
8. The beginning of replicate, which copies the block to another node:
/**
* Replicate block to another node. Not that this is a blocking call that returns after
* the block has been replicated.
*/
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
val numPeersToReplicateTo = level.replication - 1
val peersForReplication = new ArrayBuffer[BlockManagerId]
val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
val tLevel = StorageLevel(
level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
val startTime = System.currentTimeMillis
val random = new Random(blockId.hashCode)
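WAL approach:
1. Before anything else is done, the data is written to a log. The log is written under the checkpoint directory. If the job fails, the data can be recovered from this log.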
private val receivedBlockHandler: ReceivedBlockHandler = {
if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
if (checkpointDirOption.isEmpty) {
throw new SparkException(
"Cannot enable receiver write-ahead log without checkpoint directory set. " +
"Please use streamingContext.checkpoint() to set the checkpoint directory. " +
"See documentation for more details.")
}
// There may be several receivers, so the streamId is needed here.
new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
} else {
// The BlockManager-based handler relies on replication and RDD fault tolerance, so none of this is needed.
new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
}
}
2. The source of WriteAheadLogBasedBlockHandler, which implements ReceivedBlockHandler:
/**
* Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
* stores the received blocks in both, a write ahead log and a block manager.
*/
private[streaming] class WriteAheadLogBasedBlockHandler(
blockManager: BlockManager,
streamId: Int,
storageLevel: StorageLevel,
conf: SparkConf,
hadoopConf: Configuration,
checkpointDir: String,
clock: Clock = new SystemClock
) extends ReceivedBlockHandler with Logging {
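3. With the WAL there is no need to keep 2 replicas. The WAL is written to the checkpoint directory, the checkpoint directory is typically kept on HDFS, and HDFS keeps 3 replicas by default.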
private val effectiveStorageLevel = {
if (storageLevel.deserialized) {
logWarning(s"Storage level serialization ${storageLevel.deserialized} is not supported when" +
s" write ahead log is enabled, change to serialization false")
}
if (storageLevel.replication > 1) {
logWarning(s"Storage level replication ${storageLevel.replication} is unnecessary when " +
s"write ahead log is enabled, change to replication 1")
}
4. When a block is stored, it is written to both the WAL and the BlockManager at the same time.
/**
* This implementation stores the block into the block manager as well as a write ahead log.
* It does this in parallel, using Scala Futures, and returns only after the block has
* been stored in both places.
*/
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
var numRecords = None: Option[Long]
// Serialize the block so that it can be inserted into both
val serializedBlock = block match {
case ArrayBufferBlock(arrayBuffer) =>
numRecords = Some(arrayBuffer.size.toLong)
blockManager.dataSerialize(blockId, arrayBuffer.iterator)
case IteratorBlock(iterator) =>
val countIterator = new CountingIterator(iterator)
val serializedBlock = blockManager.dataSerialize(blockId, countIterator)
numRecords = countIterator.count
serializedBlock
case ByteBufferBlock(byteBuffer) =>
byteBuffer
case _ =>
throw new Exception(s"Could not push $blockId to block manager, unexpected block type")
}
5. The data is then stored in the BlockManager.
// Store the block in block manager
val storeInBlockManagerFuture = Future {
val putResult =
blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true)
if (!putResult.map { _._1 }.contains(blockId)) {
throw new SparkException(
s"Could not store $blockId to block manager with storage level $storageLevel")
}
}
6. The write method is used to write it to the log.
// Store the block in write ahead log
val storeInWriteAheadLogFuture = Future {
// The block itself must be serializable.
writeAheadLog.write(serializedBlock, clock.getTimeMillis())
}
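The "store in both places, return only when both are done" pattern can be sketched with Java's `CompletableFuture`. This is an illustration only: `ParallelStoreSketch` and its two in-memory lists are hypothetical stand-ins for the BlockManager and the write-ahead log, not Spark code.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch: two in-memory lists stand in for the BlockManager and the WAL.
final class ParallelStoreSketch {
    final List<String> blockStore = new CopyOnWriteArrayList<>();
    final List<String> walStore = new CopyOnWriteArrayList<>();

    // Starts both writes concurrently and blocks until both have finished.
    // join() rethrows (wrapped) if either write failed, so a block only counts
    // as stored once it is in both places.
    void storeBlock(String serializedBlock) {
        CompletableFuture<Void> inBlockStore =
            CompletableFuture.runAsync(() -> blockStore.add(serializedBlock));
        CompletableFuture<Void> inWal =
            CompletableFuture.runAsync(() -> walStore.add(serializedBlock));
        CompletableFuture.allOf(inBlockStore, inWal).join();
    }

    public static void main(String[] args) {
        ParallelStoreSketch sketch = new ParallelStoreSketch();
        sketch.storeBlock("block-0");
        System.out.println(sketch.blockStore + " " + sketch.walStore);
    }
}
```

Running the two writes in parallel means the latency of a store is roughly that of the slower write rather than the sum of both.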
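7. The WAL is written sequentially and its records are never modified, so reading a record only requires a pointer to it (where the record is and how long it is). Sequential appends and direct reads are what keep the WAL fast.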
/**
* :: DeveloperApi ::
*
* This abstract class represents a write ahead log (aka journal) that is used by Spark Streaming
* to save the received data (by receivers) and associated metadata to a reliable storage, so that
* they can be recovered after driver failures. See the Spark documentation for more information
* on how to plug in your own custom implementation of a write ahead log.
*/
@org.apache.spark.annotation.DeveloperApi
public abstract class WriteAheadLog {
The record handle contains all the information needed to read a record back after it has been written; the time is used as the index.
/**
* Write the record to the log and return a record handle, which contains all the information
* necessary to read back the written record. The time is used to the index the record,
* such that it can be cleaned later. Note that implementations of this abstract class must
* ensure that the written data is durable and readable (using the record handle) by the
* time this function returns.
*/
// The returned WriteAheadLogRecordHandle is the handle used to read the data back.
abstract public WriteAheadLogRecordHandle write(ByteBuffer record, long time);
/**
* Read a written record based on the given record handle.
*/
abstract public ByteBuffer read(WriteAheadLogRecordHandle handle);
/**
* Read and return an iterator of all the records that have been written but not yet cleaned up.
*/
abstract public Iterator<ByteBuffer> readAll();
/**
* Clean all the records that are older than the threshold time. It can wait for
* the completion of the deletion.
*/
// Cleans up outdated log data.
abstract public void clean(long threshTime, boolean waitForCompletion);
/**
* Close this log and release any resources.
*/
abstract public void close();
}
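8. The implementation of WriteAheadLogRecordHandle is FileBasedWriteAheadLogSegment.
9. path: the log file the record lives in; offset: where the record starts; length: how long it is. These three values are enough to locate the data.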
/** Class for representing a segment of data in a write ahead log file */
private[streaming] case class FileBasedWriteAheadLogSegment(path: String, offset: Long, length: Int)
extends WriteAheadLogRecordHandle
10. The implementation of WriteAheadLog is shown below.
11. FileBasedWriteAheadLog manages the WAL files.
/**
* This class manages write ahead log files.
*
* - Writes records (bytebuffers) to periodically rotating log files.
* - Recovers the log files and the reads the recovered records upon failures.
* - Cleans up old log files.
*
* Uses [[org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter]] to write
* and [[org.apache.spark.streaming.util.FileBasedWriteAheadLogReader]] to read.
*
* @param logDirectory Directory when rotating log files will be created.
* @param hadoopConf Hadoop configuration for reading/writing log files.
*/
private[streaming] class FileBasedWriteAheadLog(
12. The data is written directly to the checkpoint directory on HDFS.
/**
* Write a byte buffer to the log file. This method synchronously writes the data in the
* ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed
* to HDFS, and will be available for readers to read.
*/
def write(byteBuffer: ByteBuffer, time: Long): FileBasedWriteAheadLogSegment = synchronized {
var fileSegment: FileBasedWriteAheadLogSegment = null
var failures = 0
var lastException: Exception = null
var succeeded = false
while (!succeeded && failures < maxFailures) {
try {
// getLogWriter returns the writer for the current log file.
fileSegment = getLogWriter(time).write(byteBuffer)
if (closeFileAfterWrite) {
resetWriter()
}
succeeded = true
} catch {
case ex: Exception =>
lastException = ex
logWarning("Failed to write to write ahead log")
resetWriter()
failures += 1
}
}
if (fileSegment == null) {
logError(s"Failed to write to write ahead log after $failures failures")
throw lastException
}
fileSegment
}
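The retry loop in write can be reduced to a small generic helper. The following is a sketch under stated assumptions, not Spark's code: `RetryWriteSketch` and `writeWithRetry` are hypothetical names, and a `Supplier` stands in for the call to the log writer.

```java
import java.util.function.Supplier;

// Hypothetical sketch of "retry until success or maxFailures failed attempts".
final class RetryWriteSketch {
    // Calls attempt until it succeeds, or rethrows the last exception once
    // maxFailures attempts have failed.
    static <T> T writeWithRetry(Supplier<T> attempt, int maxFailures) {
        RuntimeException lastException = null;
        int failures = 0;
        while (failures < maxFailures) {
            try {
                return attempt.get();
            } catch (RuntimeException ex) {
                // The real code also resets the writer here before retrying.
                lastException = ex;
                failures++;
            }
        }
        throw lastException != null
            ? lastException
            : new IllegalStateException("maxFailures must be positive");
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails twice, then succeeds on the third attempt.
        String segment = writeWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("simulated write failure");
            }
            return "segment written on attempt " + calls[0];
        }, 3);
        System.out.println(segment);
    }
}
```

With the default of 3 failures shown earlier (DEFAULT_MAX_FAILURES), a write that keeps failing is attempted three times before the last exception is propagated.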
13. Records written at different times go to different files, depending on the rolling interval, so there will be many small files.
/** Get the current log writer while taking care of rotation */
private def getLogWriter(currentTime: Long): FileBasedWriteAheadLogWriter = synchronized {
if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
resetWriter()
currentLogPath.foreach {
pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, _)
}
currentLogWriterStartTime = currentTime
currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
val newLogPath = new Path(logDirectory,
timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
currentLogPath = Some(newLogPath.toString)
currentLogWriter = new FileBasedWriteAheadLogWriter(currentLogPath.get, hadoopConf)
}
currentLogWriter
}
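The rotation rule in getLogWriter can be illustrated without any file I/O. In this sketch `RollingLogSketch` and `logFileFor` are hypothetical names, and the `log-<start>-<stop>` naming is an assumption about what timeToLogFile produces, used here only to make the windows visible.

```java
// Hypothetical sketch of time-based log rotation; no files are created.
final class RollingLogSketch {
    private final long rollingIntervalMs;
    private long stopTime = -1L;
    private String currentLog = null;

    RollingLogSketch(long rollingIntervalSecs) {
        this.rollingIntervalMs = rollingIntervalSecs * 1000;
    }

    // Returns the log file a record written at currentTime would go to,
    // starting a new file once the current file's time window has passed.
    String logFileFor(long currentTime) {
        if (currentLog == null || currentTime > stopTime) {
            long startTime = currentTime;
            stopTime = currentTime + rollingIntervalMs;
            // Assumed naming scheme: log-<startTime>-<stopTime>
            currentLog = "log-" + startTime + "-" + stopTime;
        }
        return currentLog;
    }

    public static void main(String[] args) {
        RollingLogSketch log = new RollingLogSketch(60);
        System.out.println(log.logFileFor(0L));      // first window
        System.out.println(log.logFileFor(30_000L)); // still inside the first window
        System.out.println(log.logFileFor(60_001L)); // past the window: new file
    }
}
```

A new file per interval is what leads to the many small files mentioned above; the interval itself comes from the rollingIntervalSecs configuration keys listed earlier.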
14. The read side:
/**
* A random access reader for reading write ahead log files written using
* [[org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter]]. Given the file segment info,
* this reads the record (ByteBuffer) from the log file.
*/
private[streaming] class FileBasedWriteAheadLogRandomReader(path: String, conf: Configuration)
extends Closeable {
private val instream = HdfsUtils.getInputStream(path, conf)
private var closed = (instream == null) // the file may be deleted as we're opening the stream
def read(segment: FileBasedWriteAheadLogSegment): ByteBuffer = synchronized {
// Locate the record through its offset first.
assertOpen()
instream.seek(segment.offset)
val nextLength = instream.readInt()
HdfsUtils.checkState(nextLength == segment.length,
s"Expected message length to be ${segment.length}, but was $nextLength")
val buffer = new Array[Byte](nextLength)
instream.readFully(buffer)
ByteBuffer.wrap(buffer)
}
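The write and read sides fit together as a length-prefixed, append-only file. The sketch below shows that idea on a local temporary file using `RandomAccessFile`; `SegmentLogSketch`, `Segment`, `append`, `read`, and `tempLogPath` are hypothetical names, and a local file stands in for HDFS.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of an append-only log addressed by (path, offset, length).
final class SegmentLogSketch {
    // Plays the role of FileBasedWriteAheadLogSegment in this sketch.
    static final class Segment {
        final String path;
        final long offset;
        final int length;

        Segment(String path, long offset, int length) {
            this.path = path;
            this.offset = offset;
            this.length = length;
        }
    }

    // Appends [int length][record bytes] at the end of the file and returns its location.
    static Segment append(String path, byte[] record) {
        try (RandomAccessFile file = new RandomAccessFile(path, "rw")) {
            long offset = file.length();
            file.seek(offset);
            file.writeInt(record.length);
            file.write(record);
            return new Segment(path, offset, record.length);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Seeks to the segment's offset, checks the stored length, and reads the record.
    static byte[] read(Segment segment) {
        try (RandomAccessFile file = new RandomAccessFile(segment.path, "r")) {
            file.seek(segment.offset);
            int nextLength = file.readInt();
            if (nextLength != segment.length) {
                throw new IllegalStateException(
                    "Expected length " + segment.length + ", but was " + nextLength);
            }
            byte[] buffer = new byte[nextLength];
            file.readFully(buffer);
            return buffer;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Creates an empty temporary file to act as the log.
    static String tempLogPath() {
        try {
            File f = File.createTempFile("wal-sketch", ".log");
            f.deleteOnExit();
            return f.getPath();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String path = tempLogPath();
        append(path, "first".getBytes(StandardCharsets.UTF_8));
        Segment second = append(path, "second".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(read(second), StandardCharsets.UTF_8));
    }
}
```

Because every record is only ever appended, the handle returned at write time stays valid for as long as the file exists, and reading needs nothing more than a seek.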
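Data replay: in practice Kafka is used directly. Kafka itself retains the data, so Spark Streaming needs neither its own fault-tolerance handling nor replicas for the received data.
Kafka can be consumed in Receiver mode or in Direct mode.
Receiver mode: the consumption metadata, that is the offset, is managed by ZooKeeper. After a failure the data is read from Kafka again based on that offset: if processing crashes midway, no ACK is sent to ZooKeeper, so ZooKeeper considers the data not yet consumed. In practice, however, Direct mode is used more and more. It works with offsets directly and manages them itself.
- DirectKafkaInputDStream looks up the latest offset and puts the offsets into the Batch.
- Each time a Batch is generated, latestLeaderOffsets is called to get the latest offsets. Subtracting the previous offset from the new one gives the range of this Batch, which tells Spark which data to read.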
protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
// Either.fold would confuse @tailrec, do it manually
if (o.isLeft) {
val err = o.left.get.toString
if (retries <= 0) {
throw new SparkException(err)
} else {
log.error(err)
Thread.sleep(kc.config.refreshLeaderBackoffMs)
latestLeaderOffsets(retries - 1)
}
} else {
o.right.get
}
}
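How a batch's range follows from the two sets of offsets can be sketched as below. This is a simplified illustration: `OffsetRangeSketch`, `batchRanges`, and `count` are hypothetical names, partitions are identified by plain strings, and any rate limiting the real stream may apply is ignored.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: derives per-partition batch ranges from offsets.
final class OffsetRangeSketch {
    // For each partition the batch covers [from, until): from the offset where
    // the previous batch ended up to the latest leader offset.
    static Map<String, long[]> batchRanges(
            Map<String, Long> currentOffsets, Map<String, Long> latestOffsets) {
        Map<String, long[]> ranges = new TreeMap<>();
        for (Map.Entry<String, Long> entry : currentOffsets.entrySet()) {
            long from = entry.getValue();
            // If no latest offset is known for a partition, the range is empty.
            long until = latestOffsets.getOrDefault(entry.getKey(), from);
            ranges.put(entry.getKey(), new long[] {from, until});
        }
        return ranges;
    }

    // Number of messages in a range.
    static long count(long[] range) {
        return range[1] - range[0];
    }

    public static void main(String[] args) {
        Map<String, Long> current = new HashMap<>();
        current.put("topic-0", 100L);
        current.put("topic-1", 40L);
        Map<String, Long> latest = new HashMap<>();
        latest.put("topic-0", 250L);
        latest.put("topic-1", 40L);
        for (Map.Entry<String, long[]> e : batchRanges(current, latest).entrySet()) {
            System.out.println(e.getKey() + ": " + count(e.getValue()) + " messages");
        }
    }
}
```

Since a batch is fully described by its offset ranges, a failed batch can be recomputed by reading the same ranges from Kafka again, which is what makes replay possible without storing the data in Spark first.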