void launchTask(ExecutorDriver* driver, const TaskInfo& task)
{
if (run.isSome()) {
TaskStatus status;
status.mutable_task_id()->CopyFrom(task.task_id());
status.set_state(TASK_FAILED);
status.set_message(
"Attempted to run multiple tasks using a \"docker\" executor");
-
driver->sendStatusUpdate(status);
return;
}
-
// Capture the TaskID.
taskId = task.task_id();
-
cout << "Starting task " << taskId.get() << endl;
-
CHECK(task.has_container());
CHECK(task.has_command());
-
CHECK(task.container().type() == ContainerInfo::DOCKER);
-
// We're adding task and executor resources to launch docker since
// the DockerContainerizer updates the container cgroup limits
// directly and it expects it to be the sum of both task and
// executor resources. This does leave to a bit of unaccounted
// resources for running this executor, but we are assuming
// this is just a very small amount of overcommit.
run = docker->run(
task.container(),
task.command(),
containerName,
sandboxDirectory,
mappedDirectory,
task.resources() + task.executor().resources(),
None(),
Subprocess::FD(STDOUT_FILENO),
Subprocess::FD(STDERR_FILENO));
-
run->onAny(defer(self(), &Self::reaped, driver, lambda::_1));
-
// Delay sending TASK_RUNNING status update until we receive
// inspect output.
inspect = docker->inspect(containerName, DOCKER_INSPECT_DELAY)
.then(defer(self(), [=](const Docker::Container& container) {
if (!killed) {
TaskStatus status;
status.mutable_task_id()->CopyFrom(taskId.get());
status.set_state(TASK_RUNNING);
status.set_data(container.output);
if (container.ipAddress.isSome()) {
// TODO(karya): Deprecated -- Remove after 0.25.0 has shipped.
Label* label = status.mutable_labels()->add_labels();
label->set_key("Docker.NetworkSettings.IPAddress");
label->set_value(container.ipAddress.get());
-
NetworkInfo* networkInfo =
status.mutable_container_status()->add_network_infos();
-
// TODO(CD): Deprecated -- Remove after 0.27.0.
networkInfo->set_ip_address(container.ipAddress.get());
-
NetworkInfo::IPAddress* ipAddress =
networkInfo->add_ip_addresses();
ipAddress->set_ip_address(container.ipAddress.get());
}
driver->sendStatusUpdate(status);
}
-
return Nothing();
}));
-
inspect.onReady(
defer(self(), &Self::launchHealthCheck, containerName, task));
}