Mesos源码分析(16): mesos-docker-executor的运行

时间:2023-03-09 14:25:28
Mesos源码分析(16): mesos-docker-executor的运行

mesos-docker-executor的运行代码在src/docker/executor.cpp中

 

  // Entry point of mesos-docker-executor. Loads the flags, checks that the
  // required options are present, creates the Docker abstraction and the
  // DockerExecutor, then runs a MesosExecutorDriver on that executor.
  // Returns EXIT_SUCCESS when --help was requested or when driver.run()
  // yields DRIVER_STOPPED; every failure path returns EXIT_FAILURE.
  1. int main(int argc, char** argv)
  2. {
  3.   GOOGLE_PROTOBUF_VERIFY_VERSION;
  4.   mesos::internal::docker::Flags flags;
  5.   // Load flags from environment and command line.
  6.   Try<Nothing> load = flags.load(None(), &argc, &argv);
  7.   if (load.isError()) {
  8.     cerr << flags.usage(load.error()) << endl;
  9.     return EXIT_FAILURE;
  10.   }
  // Echo the parsed flags to stdout.
  11.   std::cout << stringify(flags) << std::endl;
  12.   mesos::internal::logging::initialize(argv[0], flags, true); // Catch signals.
  13.   if (flags.help) {
  14.     cout << flags.usage() << endl;
  15.     return EXIT_SUCCESS;
  16.   }
  // NOTE(review): the flags are printed a second time here; this looks
  // redundant with the print above -- confirm whether it is intentional.
  17.   std::cout << stringify(flags) << std::endl;
  // Each option checked below is mandatory: when one is missing, the usage
  // text is printed to stderr with an explanatory message and we exit.
  18.   if (flags.docker.isNone()) {
  19.     cerr << flags.usage("Missing required option --docker") << endl;
  20.     return EXIT_FAILURE;
  21.   }
  22.   if (flags.container.isNone()) {
  23.     cerr << flags.usage("Missing required option --container") << endl;
  24.     return EXIT_FAILURE;
  25.   }
  26.   if (flags.sandbox_directory.isNone()) {
  27.     cerr << flags.usage("Missing required option --sandbox_directory") << endl;
  28.     return EXIT_FAILURE;
  29.   }
  30.   if (flags.mapped_directory.isNone()) {
  31.     cerr << flags.usage("Missing required option --mapped_directory") << endl;
  32.     return EXIT_FAILURE;
  33.   }
  34.   if (flags.stop_timeout.isNone()) {
  35.     cerr << flags.usage("Missing required option --stop_timeout") << endl;
  36.     return EXIT_FAILURE;
  37.   }
  38.   if (flags.launcher_dir.isNone()) {
  39.     cerr << flags.usage("Missing required option --launcher_dir") << endl;
  40.     return EXIT_FAILURE;
  41.   }
  42.   // The 2nd argument for docker create is set to false so we skip
  43.   // validation when creating a docker abstraction, as the slave
  44.   // should have already validated docker.
  // NOTE(review): docker_socket is dereferenced below without an isNone()
  // check, unlike the options above -- presumably it has a default value;
  // confirm against the Flags definition.
  45.   Try<Owned<Docker>> docker = Docker::create(
  46.       flags.docker.get(),
  47.       flags.docker_socket.get(),
  48.       false);
  49.   if (docker.isError()) {
  50.     cerr << "Unable to create docker abstraction: " << docker.error() << endl;
  51.     return EXIT_FAILURE;
  52.   }
  // Build the executor from the validated options and hand it to the driver.
  53.   mesos::internal::docker::DockerExecutor executor(
  54.       docker.get(),
  55.       flags.container.get(),
  56.       flags.sandbox_directory.get(),
  57.       flags.mapped_directory.get(),
  58.       flags.stop_timeout.get(),
  59.       flags.launcher_dir.get());
  60.   mesos::MesosExecutorDriver driver(&executor);
  // Only a driver that ends in DRIVER_STOPPED counts as a successful exit.
  61.   return driver.run() == mesos::DRIVER_STOPPED ? EXIT_SUCCESS : EXIT_FAILURE;
  62. }

 

如上一篇文章对MesosExecutorDriver的分析所述,Mesos-slave给Executor发送运行Task的message时,会调用DockerExecutor的launchTask函数。

  // Executor callback for launching a task. Does no work itself: it
  // forwards the driver and the task to DockerExecutorProcess::launchTask
  // by dispatching on `process`.
  1. virtual
    void launchTask(ExecutorDriver* driver, const TaskInfo& task)
  2. {
  3.   dispatch(process.get(), &DockerExecutorProcess::launchTask, driver, task);
  4. }

 

该函数通过dispatch,最终调用DockerExecutorProcess的launchTask函数。

  // Launches the given task as a docker container.
  //
  // If a run has already been started (`run` is set), the new task is
  // rejected with a TASK_FAILED status update. Otherwise the task id is
  // recorded, docker->run is invoked for the container, and callbacks are
  // registered to (1) handle the end of the run via `reaped`, (2) send
  // TASK_RUNNING once docker inspect returns, and (3) start the health
  // check once the inspect future is ready.
  //
  // driver: used to send status updates for the task.
  // task:   must carry both a container of type DOCKER and a command
  //         (enforced by the CHECKs below).
  1. void launchTask(ExecutorDriver* driver, const TaskInfo& task)
  2. {
  // This executor handles a single run only; reject any further task.
  3.   if (run.isSome()) {
  4.     TaskStatus status;
  5.     status.mutable_task_id()->CopyFrom(task.task_id());
  6.     status.set_state(TASK_FAILED);
  7.     status.set_message(
  8.         "Attempted to run multiple tasks using a \"docker\" executor");
  9.     driver->sendStatusUpdate(status);
  10.     return;
  11.   }
  12.   // Capture the TaskID.
  13.   taskId = task.task_id();
  14.   cout << "Starting task " << taskId.get() << endl;
  15.   CHECK(task.has_container());
  16.   CHECK(task.has_command());
  17.   CHECK(task.container().type() == ContainerInfo::DOCKER);
  18.   // We're adding task and executor resources to launch docker since
  19.   // the DockerContainerizer updates the container cgroup limits
  20.   // directly and it expects it to be the sum of both task and
  21.   // executor resources. This does leave to a bit of unaccounted
  22.   // resources for running this executor, but we are assuming
  23.   // this is just a very small amount of overcommit.
  // The run is given this process's STDOUT_FILENO and STDERR_FILENO as its
  // stdout/stderr arguments.
  24.   run = docker->run(
  25.       task.container(),
  26.       task.command(),
  27.       containerName,
  28.       sandboxDirectory,
  29.       mappedDirectory,
  30.       task.resources() + task.executor().resources(),
  31.       None(),
  32.       Subprocess::FD(STDOUT_FILENO),
  33.       Subprocess::FD(STDERR_FILENO));
  // Whatever the outcome of the run future, hand it to `reaped`.
  34.   run->onAny(defer(self(), &Self::reaped, driver, lambda::_1));
  35.   // Delay sending TASK_RUNNING status update until we receive
  36.   // inspect output.
  37.   inspect = docker->inspect(containerName, DOCKER_INSPECT_DELAY)
  38.     .then(defer(self(), [=](const Docker::Container& container) {
  // No TASK_RUNNING update is sent when `killed` is already set.
  39.       if (!killed) {
  40.         TaskStatus status;
  41.         status.mutable_task_id()->CopyFrom(taskId.get());
  42.         status.set_state(TASK_RUNNING);
  43.         status.set_data(container.output);
  // When inspect reports an IP address, it is published three ways: as a
  // label, as the NetworkInfo ip_address field, and as an entry in
  // NetworkInfo ip_addresses (the first two are marked deprecated below).
  44.         if (container.ipAddress.isSome()) {
  45.           // TODO(karya): Deprecated -- Remove after 0.25.0 has shipped.
  46.           Label* label = status.mutable_labels()->add_labels();
  47.           label->set_key("Docker.NetworkSettings.IPAddress");
  48.           label->set_value(container.ipAddress.get());
  49.           NetworkInfo* networkInfo =
  50.             status.mutable_container_status()->add_network_infos();
  51.           // TODO(CD): Deprecated -- Remove after 0.27.0.
  52.           networkInfo->set_ip_address(container.ipAddress.get());
  53.           NetworkInfo::IPAddress* ipAddress =
  54.             networkInfo->add_ip_addresses();
  55.           ipAddress->set_ip_address(container.ipAddress.get());
  56.         }
  57.         driver->sendStatusUpdate(status);
  58.       }
  59.       return Nothing();
  60.     }));
  // Start the health check only once the inspect future is ready.
  61.   inspect.onReady(
  62.       defer(self(), &Self::launchHealthCheck, containerName, task));
  63. }

 

调用Docker的run函数启动容器。