Spark Streaming Source Code Walkthrough: Executor Fault Tolerance and Safety

Posted: 2021-10-04 20:49:05

This section focuses on the safety of the Executor.

What matters here is fault tolerance for the data; fault tolerance for the computation is delegated to Spark Core and is not covered this time.

The natural way to make data fault tolerant is replication: keep another copy and read it when the current one has a problem. The other way is to read the data again from the source when something goes wrong, that is, to support data replay.

Replication naturally goes through BlockManager, just as in Spark Core, and the different StorageLevel values define the replication strategy:

class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {

When the receiver gets data it stores it with MEMORY_AND_DISK_SER_2, which asks for two replicas: the data is kept in memory and spills to disk only when memory cannot hold it. With two replicas, one copy lives on executor A and the other on executor B.
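For reference, the two-replica serialized level is defined in the StorageLevel companion object roughly as follows (the constructor flags are the ones shown above: useDisk, useMemory, useOffHeap, deserialized, replication; quoted from memory of Spark 1.x, so treat it as a sketch):

val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)

The trailing _2 in the name is exactly the replication = 2 in the last argument.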

ReceiverSupervisorImpl:
private val host = SparkEnv.get.blockManager.blockManagerId.host
private val executorId = SparkEnv.get.blockManager.blockManagerId.executorId
private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    if (checkpointDirOption.isEmpty) {
      throw new SparkException(
        "Cannot enable receiver write-ahead log without checkpoint directory set. " +
          "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
          "See documentation for more details.")
    }
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}
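For the WAL branch above to be taken, the application has to both enable the receiver write-ahead log and set a checkpoint directory. A minimal sketch (the HDFS path is a placeholder):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("WALDemo")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true") // selects WriteAheadLogBasedBlockHandler
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("hdfs://namenode:8020/spark/checkpoint")         // placeholder directory; required once the WAL is on

The configuration keys themselves are collected in WriteAheadLogUtils: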
/** A helper class with utility functions related to the WriteAheadLog interface */
private[streaming] object WriteAheadLogUtils extends Logging {
  val RECEIVER_WAL_ENABLE_CONF_KEY = "spark.streaming.receiver.writeAheadLog.enable"
  val RECEIVER_WAL_CLASS_CONF_KEY = "spark.streaming.receiver.writeAheadLog.class"
  val RECEIVER_WAL_ROLLING_INTERVAL_CONF_KEY =
    "spark.streaming.receiver.writeAheadLog.rollingIntervalSecs"
  val RECEIVER_WAL_MAX_FAILURES_CONF_KEY = "spark.streaming.receiver.writeAheadLog.maxFailures"
  val RECEIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY =
    "spark.streaming.receiver.writeAheadLog.closeFileAfterWrite"

  val DRIVER_WAL_CLASS_CONF_KEY = "spark.streaming.driver.writeAheadLog.class"
  val DRIVER_WAL_ROLLING_INTERVAL_CONF_KEY =
    "spark.streaming.driver.writeAheadLog.rollingIntervalSecs"
  val DRIVER_WAL_MAX_FAILURES_CONF_KEY = "spark.streaming.driver.writeAheadLog.maxFailures"
  val DRIVER_WAL_BATCHING_CONF_KEY = "spark.streaming.driver.writeAheadLog.allowBatching"
  val DRIVER_WAL_BATCHING_TIMEOUT_CONF_KEY = "spark.streaming.driver.writeAheadLog.batchingTimeout"
  val DRIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY =
    "spark.streaming.driver.writeAheadLog.closeFileAfterWrite"

  val DEFAULT_ROLLING_INTERVAL_SECS = 60
  val DEFAULT_MAX_FAILURES = 3
  // ... (remaining helpers omitted)
}

So a checkpoint directory is mandatory once the receiver WAL is enabled. What StorageLevel is passed in when the receiver is constructed? Starting from the application code's socketTextStream, we find storageLevel = MEMORY_AND_DISK_SER_2:
/**
 * Create an input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the received bytes are interpreted as objects using the given
 * converter.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          Port to connect to for receiving data
 * @param converter     Function to convert the byte stream to objects
 * @param storageLevel  Storage level to use for storing the received objects
 * @tparam T            Type of the objects received (after converting bytes to objects)
 */
def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
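For reference, the text-specific overload that applications usually call defaults to exactly this storage level (signature sketched from the Spark 1.x StreamingContext, so treat it as an approximation):

def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}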
Back on the receiver side, ReceivedBlockHandler is the abstraction that actually stores a received block:

/** Trait that represents a class that handles the storage of blocks received by receiver */
private[streaming] trait ReceivedBlockHandler {
  def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult
  // ...
}

BlockManagerBasedBlockHandler, the implementation used when the WAL is disabled, stores the block straight into BlockManager:

def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {

  var numRecords = None: Option[Long]

  val putResult: Seq[(BlockId, BlockStatus)] = block match {
    case ArrayBufferBlock(arrayBuffer) =>
      numRecords = Some(arrayBuffer.size.toLong)
      blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,
        tellMaster = true)
    case IteratorBlock(iterator) =>
      val countIterator = new CountingIterator(iterator)
      val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
        tellMaster = true)
      numRecords = countIterator.count
      putResult
    case ByteBufferBlock(byteBuffer) =>
      blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
    case o =>
      throw new SparkException(
        s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
  }
  if (!putResult.map { _._1 }.contains(blockId)) {
    throw new SparkException(
      s"Could not store $blockId to block manager with storage level $storageLevel")
  }
  BlockManagerBasedStoreResult(blockId, numRecords)
}
 
Both putIterator and putBytes eventually go through BlockManager.doPut, which is also where replication is kicked off:

/**
 * Put the given block according to the given level in one of the block stores, replicating
 * the values if necessary.
 *
 * The effective storage level refers to the level according to which the block will actually be
 * handled. This allows the caller to specify an alternate behavior of doPut while preserving
 * the original level specified by the user.
 */
private def doPut(
    blockId: BlockId,
    data: BlockValues,
    level: StorageLevel,
    tellMaster: Boolean = true,
    effectiveStorageLevel: Option[StorageLevel] = None)
  : Seq[(BlockId, BlockStatus)] = {

  require(blockId != null, "BlockId is null")
  require(level != null && level.isValid, "StorageLevel is null or invalid")
  effectiveStorageLevel.foreach { level =>
    require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
  }

  // Return value
  val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

  /* Remember the block's storage level so that we can correctly drop it to disk if it needs
   * to be dropped right after it got put into memory. Note, however, that other threads will
   * not be able to get() this block until we call markReady on its BlockInfo. */
  val putBlockInfo = {
    val tinfo = new BlockInfo(level, tellMaster)
    // Do atomically !
    val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
    if (oldBlockOpt.isDefined) {
      if (oldBlockOpt.get.waitForReady()) {
        logWarning(s"Block $blockId already exists on this machine; not re-adding it")
        return updatedBlocks
      }
      // TODO: So the block info exists - but previous attempt to load it (?) failed.
      // What do we do now ? Retry on it ?
      oldBlockOpt.get
    } else {
      tinfo
    }
  }

  val startTimeMs = System.currentTimeMillis

  /* If we're storing values and we need to replicate the data, we'll want access to the values,
   * but because our put will read the whole iterator, there will be no values left. For the
   * case where the put serializes data, we'll remember the bytes, above; but for the case where
   * it doesn't, such as deserialized storage, let's rely on the put returning an Iterator. */
  var valuesAfterPut: Iterator[Any] = null

  // Ditto for the bytes after the put
  var bytesAfterPut: ByteBuffer = null

  // Size of the block in bytes
  var size = 0L

  // The level we actually use to put the block
  val putLevel = effectiveStorageLevel.getOrElse(level)

  // If we're storing bytes, then initiate the replication before storing them locally.
  // This is faster as data is already serialized and ready to send.
  val replicationFuture = data match {
    case b: ByteBufferValues if putLevel.replication > 1 =>
      // Duplicate doesn't copy the bytes, but just creates a wrapper
      val bufferView = b.buffer.duplicate()
      Future {
        // This is a blocking action and should run in futureExecutionContext which is a cached
        // thread pool
        replicate(blockId, bufferView, putLevel)
      }(futureExecutionContext)
    case _ => null
  }
  // ... (rest of doPut omitted: the block is then stored locally and replication is completed)
Machine A receives and stores the data while machine C keeps a replica; the moment A dies, processing switches over to C.

HBase does the same thing with its WAL: by default it writes one copy of the log and replays that log to recover when something goes wrong. In Spark Streaming the receiver WAL has to be written under the checkpoint directory; see lines 53-65 of ReceiverSupervisorImpl quoted above. The checkpoint directory normally sits on HDFS, which keeps three replicas by default, so the log costs disk space but is safe.
With the log-based approach there can be many log files, which is why the streamId is needed; in the default (non-WAL) case that id is not required. Look at line 121 of ReceivedBlockHandler: the data is written to the log as well as handed to BlockManager. Then look at line 134, effectiveStorageLevel: once the WAL is in place, is there any point in keeping the StorageLevel's replication at 2? No, it would only waste disk, because the checkpoint already sits on HDFS with three replicas. At line 153 a thread pool is wrapped so that the stores can run concurrently; then comes storeBlock at line 166, lines 186 and 196 serialize the block and write it, line 202 pushes the write onto the thread pool, and line 217 writes into the directory that holds the received data. Line 40 shows that write is an interface method; the WriteAheadLog class description lists write, read, readAll and clean. A WAL is written strictly sequentially and existing records are never modified, so a read locates its record through a cursor or handle that says exactly where the data sits. That is why it is so efficient: sequential writes, random reads, and no in-place append, update or delete on old records.
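The concurrent-store idea can be shown with a small stand-alone sketch (this is not the Spark code; storeInBlockStore and appendToWal are hypothetical stand-ins): the block only counts as stored once both the BlockManager put and the WAL append have finished.

import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParallelStoreSketch {
  // A dedicated pool, mirroring the idea of a separate execution context just for block storage.
  implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))

  def storeInBlockStore(block: Array[Byte]): Unit = ()         // hypothetical BlockManager put
  def appendToWal(block: Array[Byte]): Long = 0L               // hypothetical WAL append, returns a handle

  def storeBlock(block: Array[Byte]): Long = {
    val blockStoreFuture = Future { storeInBlockStore(block) } // store locally (and replicate if asked to)
    val walFuture        = Future { appendToWal(block) }       // append to the write-ahead log
    // Only when both writes succeed is the block acknowledged; the WAL handle is what gets reported.
    Await.result(blockStoreFuture.zip(walFuture).map(_._2), 30.seconds)
  }
}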
When writing a batch, a file is built; the write returns a handle, and reading the data back goes through that handle. Look at the public WriteAheadLogRecordHandle in WriteAheadLog: its concrete implementation is a case class, FileBasedWriteAheadLogSegment, which carries the path, offset and length used to read the record back out of the WAL.
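For reference, the file-based handle in Spark 1.x looks roughly like this (sketched from memory rather than quoted verbatim):

/** Which log file the record is in, where it starts, and how many bytes it occupies. */
private[streaming] case class FileBasedWriteAheadLogSegment(path: String, offset: Long, length: Int)
  extends WriteAheadLogRecordHandle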
readAll reads the whole log back, and clean removes data older than a given time. Then look at the WriteAheadLog subclass FileBasedWriteAheadLog and its class comment: it manages the WAL files, periodically rolling over to a new file, and reads the data back when recovering from a failure; it uses a writer for writing and a reader for reading (note that by default this is plain file I/O rather than a Hadoop-specific format). In def write, the record goes to an HDFS-compatible path and the returned file segment is exactly the path/offset/length handle mentioned above; getLogWriter picks (or creates) the current log file, which is why many small files can pile up, and the writer itself is plain Java file manipulation. On the read side, FileBasedWriteAheadLogRandomReader.read does a random read through the handle, while FileBasedWriteAheadLogReader.hasNext iterates sequentially when no handle is available. Understand the idea first; the concrete code then follows naturally.
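The access pattern itself (append sequentially, remember offset and length, then read any record back with a single seek) can be shown with a tiny self-contained sketch using plain Java I/O; it is not Spark code, just the idea:

import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer

object WalAccessPatternSketch {
  final case class Handle(offset: Long, length: Int)   // plays the role of a record handle

  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("wal-sketch", ".log")
    val raf = new RandomAccessFile(file, "rw")

    // Sequential write: always append at the end, return where the record landed.
    def write(record: Array[Byte]): Handle = {
      val offset = raf.length()
      raf.seek(offset)
      raf.writeInt(record.length)                      // length prefix
      raf.write(record)
      Handle(offset, record.length)
    }

    // Random read: one seek straight to the handle's offset, no scanning.
    def read(h: Handle): ByteBuffer = {
      raf.seek(h.offset)
      val len = raf.readInt()
      val buf = new Array[Byte](len)
      raf.readFully(buf)
      ByteBuffer.wrap(buf)
    }

    val h1 = write("record-1".getBytes("UTF-8"))
    val h2 = write("record-2".getBytes("UTF-8"))
    println(new String(read(h2).array(), "UTF-8"))     // prints record-2
    println(new String(read(h1).array(), "UTF-8"))     // prints record-1
    raf.close()
  }
}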
So there are two ways to protect the received data:
1. BlockManager replication.
2. The WAL. It costs performance; it is worth considering if you can accept roughly one minute of extra latency, but otherwise there is still a chance of losing data.
Data replay:
Take Kafka as the example. The receiver-based approach uses ZooKeeper to manage the metadata, which brings the duplicate-consumption problem: data can already be consumed while the offset has not yet been reported back to ZooKeeper. In production the direct approach is used instead, and it can guarantee that data is read exactly once. The DirectKafkaInputDStream class takes care of the offsets itself and manages the metadata: every time a batch is generated it calls latestLeaderOffsets to find the offset range, and that range pins down the RDD's data source. Each batch simply looks at the latest offset minus the previous offset, takes that range, and reads exactly what the range covers. Then look at KafkaRDD: its core is reading data by offset and computing on top of what it reads. getPartitions builds the partitions from the offset ranges, and the actual read goes to the Kafka cluster through a SimpleConsumer, a consumer class that lives in the Kafka package and fetches the messages. So Spark reads Kafka directly, determines the offset range, assigns the data to the batch, and checkpoints the offsets after processing, in effect treating Kafka as the underlying file system. The code is only a few hundred lines and is very simple. The drawback of the WAL, by contrast, is the time it costs, and not every scenario forbids losing data; a loss in the region of 5% is often tolerable.
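A minimal sketch of wiring up the direct approach with the Spark 1.x Kafka integration (the broker address and topic name are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectKafkaSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectKafkaSketch")
    val ssc = new StreamingContext(conf, Seconds(10))

    // No receiver and no ZooKeeper-managed offsets: the stream itself tracks the offset ranges.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // placeholder broker list
    val topics = Set("mytopic")                                     // placeholder topic
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics).map(_._2)

    lines.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}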
From the angle of job scheduling and fault tolerance: if one block out of 1000 is lost, the whole set counts as lost and the fetch of the data counts as failed, and recovering through the WAL then brings back the other 999 blocks as well. That recovery granularity is too coarse; the source could be modified so that only the single missing block is fetched back.

To sum up, there are two replication approaches, two copies of the data in memory (BlockManager) and the WAL, plus data replay on top of them.

Tomorrow we will look at Driver-side safety, which is also the key to programming against Spark Streaming.

Spark Distribution Notes 12

Sina Weibo: http://weibo.com/ilovepains

WeChat official account: DT_Spark

Blog: http://blog.sina.com.cn/ilovepains

Mobile: 18610086859

QQ: 1740415547

Email: 18610086859@vip.126.com