// Create an offer for each slave and add it to the message.
ResourceOffersMessage message;
Framework* framework = CHECK_NOTNULL(frameworks.registered[frameworkId]);
foreachpair (const SlaveID& slaveId, const Resources& offered, resources) {
……
Slave* slave = slaves.registered.get(slaveId);
CHECK_NOTNULL(slave);
……
// TODO(bmahler): Set "https" if only "https" is supported.
mesos::URL url;
url.set_scheme("http");
url.mutable_address()->set_hostname(slave->info.hostname());
url.mutable_address()->set_ip(stringify(slave->pid.address.ip));
url.mutable_address()->set_port(slave->pid.address.port);
url.set_path("/" + slave->pid.id);
Offer* offer = new Offer();
offer->mutable_id()->MergeFrom(newOfferId());
offer->mutable_framework_id()->MergeFrom(framework->id());
offer->mutable_slave_id()->MergeFrom(slave->id);
offer->set_hostname(slave->info.hostname());
offer->mutable_url()->MergeFrom(url);
offer->mutable_resources()->MergeFrom(offered);
offer->mutable_attributes()->MergeFrom(slave->info.attributes());
// Add all framework's executors running on this slave.
if (slave->executors.contains(framework->id())) {
const hashmap<ExecutorID, ExecutorInfo>& executors =
slave->executors[framework->id()];
foreachkey (const ExecutorID& executorId, executors) {
offer->add_executor_ids()->MergeFrom(executorId);
}
}
……
offers[offer->id()] = offer;
framework->addOffer(offer);
slave->addOffer(offer);
if (flags.offer_timeout.isSome()) {
// Rescind the offer after the timeout elapses.
offerTimers[offer->id()] =
delay(flags.offer_timeout.get(),
self(),
&Self::offerTimeout,
offer->id());
}
// TODO(jieyu): For now, we strip 'ephemeral_ports' resource from
// offers so that frameworks do not see this resource. This is a
// short term workaround. Revisit this once we resolve MESOS-1654.
Offer offer_ = *offer;
offer_.clear_resources();
foreach (const Resource& resource, offered) {
if (resource.name() != "ephemeral_ports") {
offer_.add_resources()->CopyFrom(resource);
}
}
// Add the offer *AND* the corresponding slave's PID.
message.add_offers()->MergeFrom(offer_);
message.add_pids(slave->pid);
}
if (message.offers().size() == 0) {
return;
}
LOG(INFO) << "Sending " << message.offers().size()
<< " offers to framework " << *framework;
framework->send(message);
}