Mesos源码分析(10): MesosSchedulerDriver的启动及运行一个Task

时间:2021-10-04 21:42:00

 

MesosSchedulerDriver的代码在src/sched/sched.cpp里面实现。

 

Mesos源码分析(10): MesosSchedulerDriver的启动及运行一个Task

 

driver->run()首先调用start()(start()成功后,run()再调用join()等待driver结束)。

 

Mesos源码分析(10): MesosSchedulerDriver的启动及运行一个Task

 

start()首先准备好用于检测Mesos-Master leader的MasterDetector(真正的检测是在SchedulerProcess::initialize()中通过detector->detect()开始的)。

 

Mesos源码分析(10): MesosSchedulerDriver的启动及运行一个Task

 

Mesos源码分析(10): MesosSchedulerDriver的启动及运行一个Task

 

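然后创建SchedulerProcess,并通过spawn()启动它。注意这里创建的是libprocess中的一个Process(actor),由libprocess的工作线程调度执行,并不是单独为它创建一个操作系统线程。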

 

SchedulerProcess的initialize()函数

 

其中主要是通过install<...>()为各类消息注册处理函数,最后调用detector->detect()开始检测Master。

 

  1. virtual
    void initialize()
  2.  {
  3.    install<Event>(&SchedulerProcess::receive);
  4.  
  5.    // TODO(benh): Get access to flags so that we can decide whether
  6.    // or not to make ZooKeeper verbose.
  7.    install<FrameworkRegisteredMessage>(
  8.        &SchedulerProcess::registered,
  9.        &FrameworkRegisteredMessage::framework_id,
  10.        &FrameworkRegisteredMessage::master_info);
  11.  
  12.    install<FrameworkReregisteredMessage>(
  13.        &SchedulerProcess::reregistered,
  14.        &FrameworkReregisteredMessage::framework_id,
  15.        &FrameworkReregisteredMessage::master_info);
  16.  
  17.    install<ResourceOffersMessage>(
  18.        &SchedulerProcess::resourceOffers,
  19.        &ResourceOffersMessage::offers,
  20.        &ResourceOffersMessage::pids);
  21.  
  22.    install<RescindResourceOfferMessage>(
  23.        &SchedulerProcess::rescindOffer,
  24.        &RescindResourceOfferMessage::offer_id);
  25.  
  26.    install<StatusUpdateMessage>(
  27.        &SchedulerProcess::statusUpdate,
  28.        &StatusUpdateMessage::update,
  29.        &StatusUpdateMessage::pid);
  30.  
  31.    install<LostSlaveMessage>(
  32.        &SchedulerProcess::lostSlave,
  33.        &LostSlaveMessage::slave_id);
  34.  
  35.    install<ExitedExecutorMessage>(
  36.        &SchedulerProcess::lostExecutor,
  37.        &ExitedExecutorMessage::executor_id,
  38.        &ExitedExecutorMessage::slave_id,
  39.        &ExitedExecutorMessage::status);
  40.  
  41.    install<ExecutorToFrameworkMessage>(
  42.        &SchedulerProcess::frameworkMessage,
  43.        &ExecutorToFrameworkMessage::slave_id,
  44.        &ExecutorToFrameworkMessage::executor_id,
  45.        &ExecutorToFrameworkMessage::data);
  46.  
  47.    install<FrameworkErrorMessage>(
  48.        &SchedulerProcess::error,
  49.        &FrameworkErrorMessage::message);
  50.  
  51.    // Start detecting masters.
  52.    detector->detect()
  53.      .onAny(defer(self(), &SchedulerProcess::detected, lambda::_1));
  54.  }

 

在前面的文章《Mesos源码分析(6): Mesos Master的初始化》中提到:

在Allocator的initialize函数中,传入的OfferCallback是Master::offer。

每隔allocation_interval,Allocator都会为各个framework计算offer,然后依次调用Master::offer,将资源offer给相应的framework。

Master::offer函数会生成ResourceOffersMessage,并将其发送给相应的Framework。

 

对应到这里,当Driver收到ResourceOffersMessage时,会调用SchedulerProcess::resourceOffers(即initialize()中通过install<ResourceOffersMessage>注册的处理函数):

 

  1.   void resourceOffers(
  2.       const UPID& from,
  3.       const vector<Offer>& offers,
  4.       const vector<string>& pids)
  5.   {
  6. ……
  7.     VLOG(2) << "Received " << offers.size() << " offers";
  8. ……
  9.     scheduler->resourceOffers(driver, offers);
  10.  
  11.     VLOG(1) << "Scheduler::resourceOffers took " << stopwatch.elapsed();
  12.   }

 

它最终通过scheduler->resourceOffers(driver, offers)调用了用户Framework所实现的Scheduler::resourceOffers。

 

Test Framework的resourceOffers函数根据收到的offers创建一系列task,然后对每个offer调用一次driver的launchTasks函数:

  1. virtual
    void resourceOffers(SchedulerDriver* driver,
  2.                             const vector<Offer>& offers)
  3. {
  4.   foreach (const Offer& offer, offers) {
  5.     cout << "Received offer " << offer.id() << " with " << offer.resources()
  6.          << endl;
  7.  
  8.     static
    const Resources TASK_RESOURCES = Resources::parse(
  9.         "cpus:" + stringify(CPUS_PER_TASK) +
  10.         ";mem:" + stringify(MEM_PER_TASK)).get();
  11.  
  12.     Resources remaining = offer.resources();
  13.  
  14.     // Launch tasks.
  15.     vector<TaskInfo> tasks;
  16.     while (tasksLaunched < totalTasks &&
  17.            remaining.flatten().contains(TASK_RESOURCES)) {
  18.       int taskId = tasksLaunched++;
  19.  
  20.       cout << "Launching task " << taskId << " using offer "
  21.            << offer.id() << endl;
  22.  
  23.       TaskInfo task;
  24.       task.set_name("Task " + lexical_cast<string>(taskId));
  25.       task.mutable_task_id()->set_value(lexical_cast<string>(taskId));
  26.       task.mutable_slave_id()->MergeFrom(offer.slave_id());
  27.       task.mutable_executor()->MergeFrom(executor);
  28.  
  29.       Option<Resources> resources =
  30.         remaining.find(TASK_RESOURCES.flatten(role));
  31.  
  32.       CHECK_SOME(resources);
  33.       task.mutable_resources()->MergeFrom(resources.get());
  34.       remaining -= resources.get();
  35.  
  36.       tasks.push_back(task);
  37.     }
  38.  
  39.     driver->launchTasks(offer.id(), tasks);
  40.   }
  41. }

 

driver->launchTasks()会把调用转交(dispatch)给SchedulerProcess::launchTasks。后者把所有task包装成一个LAUNCH类型的Offer::Operation,再交给acceptOffers处理,实现如下:

  1. void launchTasks(const vector<OfferID>& offerIds,
  2.                  const vector<TaskInfo>& tasks,
  3.                  const Filters& filters)
  4. {
  5.   Offer::Operation operation;
  6.   operation.set_type(Offer::Operation::LAUNCH);
  7.  
  8.   Offer::Operation::Launch* launch = operation.mutable_launch();
  9.   foreach (const TaskInfo& task, tasks) {
  10.     launch->add_task_infos()->CopyFrom(task);
  11.   }
  12.  
  13.   acceptOffers(offerIds, {operation}, filters);
  14. }
  15.  
  16. void acceptOffers(
  17.     const vector<OfferID>& offerIds,
  18.     const vector<Offer::Operation>& operations,
  19.     const Filters& filters)
  20. {
  21.   // TODO(jieyu): Move all driver side verification to master since
  22.   // we are moving towards supporting pure launguage scheduler.
  23.  
  24.   if (!connected) {
  25.     VLOG(1) << "Ignoring accept offers message as master is disconnected";
  26.  
  27.     // NOTE: Reply to the framework with TASK_LOST messages for each
  28.     // task launch. See details from notes in launchTasks.
  29.     foreach (const Offer::Operation& operation, operations) {
  30.       if (operation.type() != Offer::Operation::LAUNCH) {
  31.         continue;
  32.       }
  33.  
  34.       foreach (const TaskInfo& task, operation.launch().task_infos()) {
  35.         StatusUpdate update = protobuf::createStatusUpdate(
  36.             framework.id(),
  37.             None(),
  38.             task.task_id(),
  39.             TASK_LOST,
  40.             TaskStatus::SOURCE_MASTER,
  41.             None(),
  42.             "Master disconnected",
  43.             TaskStatus::REASON_MASTER_DISCONNECTED);
  44.  
  45.         statusUpdate(UPID(), update, UPID());
  46.       }
  47.     }
  48.     return;
  49.   }
  50.  
  51.   Call call;
  52.   CHECK(framework.has_id());
  53.   call.mutable_framework_id()->CopyFrom(framework.id());
  54.   call.set_type(Call::ACCEPT);
  55.  
  56.   Call::Accept* accept = call.mutable_accept();
  57.  
  58.   // Setting accept.operations.
  59.   foreach (const Offer::Operation& _operation, operations) {
  60.     Offer::Operation* operation = accept->add_operations();
  61.     operation->CopyFrom(_operation);
  62.   }
  63.  
  64.   // Setting accept.offer_ids.
  65.   foreach (const OfferID& offerId, offerIds) {
  66.     accept->add_offer_ids()->CopyFrom(offerId);
  67.  
  68.     if (!savedOffers.contains(offerId)) {
  69.       // TODO(jieyu): A duplicated offer ID could also cause this
  70.       // warning being printed. Consider refine this message here
  71.       // and in launchTasks as well.
  72.       LOG(WARNING) << "Attempting to accept an unknown offer " << offerId;
  73.     } else {
  74.       // Keep only the slave PIDs where we run tasks so we can send
  75.       // framework messages directly.
  76.       foreach (const Offer::Operation& operation, operations) {
  77.         if (operation.type() != Offer::Operation::LAUNCH) {
  78.           continue;
  79.         }
  80.  
  81.         foreach (const TaskInfo& task, operation.launch().task_infos()) {
  82.           const SlaveID& slaveId = task.slave_id();
  83.  
  84.           if (savedOffers[offerId].contains(slaveId)) {
  85.             savedSlavePids[slaveId] = savedOffers[offerId][slaveId];
  86.           } else {
  87.             LOG(WARNING) << "Attempting to launch task " << task.task_id()
  88.                          << " with the wrong slave id " << slaveId;
  89.           }
  90.         }
  91.       }
  92.     }
  93.  
  94.     // Remove the offer since we saved all the PIDs we might use.
  95.     savedOffers.erase(offerId);
  96.   }
  97.  
  98.   // Setting accept.filters.
  99.   accept->mutable_filters()->CopyFrom(filters);
  100.  
  101.   CHECK_SOME(master);
  102.   send(master.get().pid(), call);
  103. }

 

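最终,acceptOffers构造一个类型为Call::ACCEPT的Call(包含LAUNCH操作、offer ID以及filters),发送给当前的Mesos-Master leader。如果此时driver与master处于断开状态,则不会发送任何消息,而是直接在本地为每个待启动的task生成一条TASK_LOST状态更新。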