kafka内存溢出分析:OutOfMemoryError
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* /licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.wenjie.demo.kafkaconnector.component;
import org.apache.kafka.clients.*;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.errors.*;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.metrics.*;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import static org.apache.kafka.common.utils.Utils.closeQuietly;
/**
* The default implementation of {@link AdminClient}. An instance of this class is created by invoking one of the
* {@code create()} methods in {@code AdminClient}. Users should not refer to this class directly.
* <p>
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class KafkaAdminClient extends AdminClient {
/**
* The next integer to use to name a KafkaAdminClient which the user hasn't specified an explicit name for.
*/
private static final AtomicInteger ADMIN_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
/**
* The prefix to use for the JMX metrics for this class
*/
private static final String JMX_PREFIX = "";
/**
* An invalid shutdown time which indicates that a shutdown has not yet been performed.
*/
private static final long INVALID_SHUTDOWN_TIME = -1;
/**
* Thread name prefix for admin client network thread
*/
static final String NETWORK_THREAD_PREFIX = "kafka-admin-client-thread";
private final Logger log;
/**
* The default timeout to use for an operation.
*/
private final int defaultTimeoutMs;
/**
* The name of this AdminClient instance.
*/
private final String clientId;
/**
* Provides the time.
*/
private final Time time;
/**
* The cluster metadata manager used by the KafkaClient.
*/
private final AdminMetadataManager metadataManager;
/**
* The metrics for this KafkaAdminClient.
*/
private final Metrics metrics;
/**
* The network client to use.
*/
private final KafkaClient client;
/**
* The runnable used in the service thread for this admin client.
*/
private final AdminClientRunnable runnable;
/**
* The network service thread for this admin client.
*/
private final Thread thread;
/**
* During a close operation, this is the time at which we will time out all pending operations
* and force the RPC thread to exit. If the admin client is not closing, this will be 0.
*/
private final AtomicLong hardShutdownTimeMs = new AtomicLong(INVALID_SHUTDOWN_TIME);
/**
* A factory which creates TimeoutProcessors for the RPC thread.
*/
private final TimeoutProcessorFactory timeoutProcessorFactory;
private final int maxRetries;
private final long retryBackoffMs;
/**
* Get or create a list value from a map.
*
* @param map The map to get or create the element from.
* @param key The key.
* @param <K> The key type.
* @param <V> The value type.
* @return The list value.
*/
static <K, V> List<V> getOrCreateListValue(Map<K, List<V>> map, K key) {
List<V> list = map.get(key);
if (list != null)
return list;
list = new LinkedList<>();
map.put(key, list);
return list;
}
/**
* Get the current time remaining before a deadline as an integer.
*
* @param now The current time in milliseconds.
* @param deadlineMs The deadline time in milliseconds.
* @return The time delta in milliseconds.
*/
static int calcTimeoutMsRemainingAsInt(long now, long deadlineMs) {
long deltaMs = deadlineMs - now;
if (deltaMs > Integer.MAX_VALUE)
deltaMs = Integer.MAX_VALUE;
else if (deltaMs < Integer.MIN_VALUE)
deltaMs = Integer.MIN_VALUE;
return (int) deltaMs;
}
/**
* Generate the client id based on the configuration.
*
* @param config The configuration
* @return The client id
*/
static String generateClientId(AdminClientConfig config) {
String clientId = config.getString(AdminClientConfig.CLIENT_ID_CONFIG);
if (!clientId.isEmpty())
return clientId;
return "adminclient-" + ADMIN_CLIENT_ID_SEQUENCE.getAndIncrement();
}
/**
* Get the deadline for a particular call.
*
* @param now The current time in milliseconds.
* @param optionTimeoutMs The timeout option given by the user.
* @return The deadline in milliseconds.
*/
private long calcDeadlineMs(long now, Integer optionTimeoutMs) {
if (optionTimeoutMs != null)
return now + Math.max(0, optionTimeoutMs);
return now + defaultTimeoutMs;
}
/**
* Pretty-print an exception.
*
* @param throwable The exception.
* @return A compact human-readable string.
*/
static String prettyPrintException(Throwable throwable) {
if (throwable == null)
return "Null exception.";
if (throwable.getMessage() != null) {
return throwable.getClass().getSimpleName() + ": " + throwable.getMessage();
}
return throwable.getClass().getSimpleName();
}
static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcessorFactory timeoutProcessorFactory) {
Metrics metrics = null;
NetworkClient networkClient = null;
Time time = Time.SYSTEM;
String clientId = generateClientId(config);
ChannelBuilder channelBuilder = null;
Selector selector = null;
ApiVersions apiVersions = new ApiVersions();
LogContext logContext = createLogContext(clientId);
try {
// Since we only request node information, it's safe to pass true for allowAutoTopicCreation (and it
// simplifies communication with older brokers)
AdminMetadataManager metadataManager = new AdminMetadataManager(logContext,
config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
config.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG));
metadataManager.update(Cluster.bootstrap(addresses), time.milliseconds());
List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class,
Collections.singletonMap(AdminClientConfig.CLIENT_ID_CONFIG, clientId));
Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(AdminClientConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(AdminClientConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.recordLevel(Sensor.RecordingLevel.forName(config.getString(AdminClientConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricTags);
reporters.add(new JmxReporter(JMX_PREFIX));
metrics = new Metrics(metricConfig, reporters, time);
String metricGrpPrefix = "admin-client";
channelBuilder = ClientUtils.createChannelBuilder(config, time);
//注意这里!!!!!!!!!!!
selector = new Selector(config.getInt(AdminClientConfig.MAX_BUFFER_BYTES), config.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
metrics, time, metricGrpPrefix, Collections.emptyMap(), true, channelBuilder, logContext);
networkClient = new NetworkClient(
selector,
metadataManager.updater(),
clientId,
1,
config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
config.getInt(AdminClientConfig.SEND_BUFFER_CONFIG),
config.getInt(AdminClientConfig.RECEIVE_BUFFER_CONFIG),
(int) TimeUnit.HOURS.toMillis(1),
ClientDnsLookup.forConfig(config.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG)),
time,
true,
apiVersions,
logContext);
return new KafkaAdminClient(config, clientId, time, metadataManager, metrics, networkClient,
timeoutProcessorFactory, logContext);
} catch (Throwable exc) {
closeQuietly(metrics, "Metrics");
closeQuietly(networkClient, "NetworkClient");
closeQuietly(selector, "Selector");
closeQuietly(channelBuilder, "ChannelBuilder");
throw new KafkaException("Failed to create new KafkaAdminClient", exc);
}
}
static LogContext createLogContext(String clientId) {
return new LogContext("[AdminClient clientId=" + clientId + "] ");
}
private KafkaAdminClient(AdminClientConfig config,
String clientId,
Time time,
AdminMetadataManager metadataManager,
Metrics metrics,
KafkaClient client,
TimeoutProcessorFactory timeoutProcessorFactory,
LogContext logContext) {
this.defaultTimeoutMs = config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.clientId = clientId;
this.log = logContext.logger(KafkaAdminClient.class);
this.time = time;
this.metadataManager = metadataManager;
this.metrics = metrics;
this.client = client;
this.runnable = new AdminClientRunnable();
String threadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.thread = new KafkaThread(threadName, runnable, true);
this.timeoutProcessorFactory = (timeoutProcessorFactory == null) ?
new TimeoutProcessorFactory() : timeoutProcessorFactory;
this.maxRetries = config.getInt(AdminClientConfig.RETRIES_CONFIG);
this.retryBackoffMs = config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG);
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
log.debug("Kafka admin client initialized");
thread.start();
}
@Override
public void close(Duration timeout) {
long waitTimeMs = timeout.toMillis();
if (waitTimeMs < 0)
throw new IllegalArgumentException("The timeout cannot be negative.");
waitTimeMs = Math.min(TimeUnit.DAYS.toMillis(365), waitTimeMs); // Limit the timeout to a year.
long now = time.milliseconds();
long newHardShutdownTimeMs = now + waitTimeMs;
long prev = INVALID_SHUTDOWN_TIME;
while (true) {
if (hardShutdownTimeMs.compareAndSet(prev, newHardShutdownTimeMs)) {
if (prev == INVALID_SHUTDOWN_TIME) {
log.debug("Initiating close operation.");
} else {
log.debug("Moving hard shutdown time forward.");
}
client.wakeup(); // Wake the thread, if it is blocked inside poll().
break;
}
prev = hardShutdownTimeMs.get();
if (prev < newHardShutdownTimeMs) {
log.debug("Hard shutdown time is already earlier than requested.");
newHardShutdownTimeMs = prev;
break;
}
}
if (log.isDebugEnabled()) {
long deltaMs = Math.max(0, newHardShutdownTimeMs - time.milliseconds());
log.debug("Waiting for the I/O thread to exit. Hard shutdown in {} ms.", deltaMs);
}
try {
// Wait for the thread to be joined.
thread.join();
AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics);
log.debug("Kafka admin client closed.");
} catch (InterruptedException e) {
log.debug("Interrupted while joining I/O thread", e);
Thread.currentThread().interrupt();
}
}
/**
* An interface for providing a node for a call.
*/
private interface NodeProvider {
Node provide();
}
private class MetadataUpdateNodeIdProvider implements NodeProvider {
@Override
public Node provide() {
return client.leastLoadedNode(time.milliseconds());
}
}
/**
* Provides the least loaded node.
*/
private class LeastLoadedNodeProvider implements NodeProvider {
@Override
public Node provide() {
if (metadataManager.isReady()) {
// This may return null if all nodes are busy.
// In that case, we will postpone node assignment.
return client.leastLoadedNode(time.milliseconds());
}
metadataManager.requestUpdate();
return null;
}
}
abstract class Call {
private final boolean internal;
private final String callName;
private final long deadlineMs;
private final NodeProvider nodeProvider;
private int tries = 0;
private boolean aborted = false;
private Node curNode = null;
private long nextAllowedTryMs = 0;
Call(boolean internal, String callName, long deadlineMs, NodeProvider nodeProvider) {
this.internal = internal;
this.callName = callName;
this.deadlineMs = deadlineMs;
this.nodeProvider = nodeProvider;
}
Call(String callName, long deadlineMs, NodeProvider nodeProvider) {
this(false, callName, deadlineMs, nodeProvider);
}
protected Node curNode() {
return curNode;
}
/**
* Handle a failure.
* <p>
* Depending on what the exception is and how many times we have already tried, we may choose to
* fail the Call, or retry it. It is important to print the stack traces here in some cases,
* since they are not necessarily preserved in ApiVersionException objects.
*
* @param now The current time in milliseconds.
* @param throwable The failure exception.
*/
final void fail(long now, Throwable throwable) {
if (aborted) {
// If the call was aborted while in flight due to a timeout, deliver a
// TimeoutException. In this case, we do not get any more retries - the call has
// failed. We increment tries anyway in order to display an accurate log message.
tries++;
if (log.isDebugEnabled()) {
log.debug("{} aborted at {} after {} attempt(s)", this, now, tries,
new Exception(prettyPrintException(throwable)));
}
handleFailure(new TimeoutException("Aborted due to timeout."));
return;
}
// If this is an UnsupportedVersionException that we can retry, do so. Note that a
// protocol downgrade will not count against the total number of retries we get for
// this RPC. That is why 'tries' is not incremented.
if ((throwable instanceof UnsupportedVersionException) &&
handleUnsupportedVersionException((UnsupportedVersionException) throwable)) {
log.debug("{} attempting protocol downgrade and then retry.", this);
runnable.enqueue(this, now);
return;
}
tries++;
nextAllowedTryMs = now + retryBackoffMs;
// If the call has timed out, fail.
if (calcTimeoutMsRemainingAsInt(now, deadlineMs) < 0) {
if (log.isDebugEnabled()) {
log.debug("{} timed out at {} after {} attempt(s)", this, now, tries,
new Exception(prettyPrintException(throwable)));
}
handleFailure(throwable);
return;
}
// If the exception is not retryable, fail.
if (!(throwable instanceof RetriableException)) {
if (log.isDebugEnabled()) {
log.debug("{} failed with non-retriable exception after {} attempt(s)", this, tries,
new Exception(prettyPrintException(throwable)));
}
handleFailure(throwable);
return;
}
// If we are out of retries, fail.
if (tries > maxRetries) {
if (log.isDebugEnabled()) {
log.debug("{} failed after {} attempt(s)", this, tries,
new Exception(prettyPrintException(throwable)));
}
handleFailure(throwable);
return;
}
if (log.isDebugEnabled()) {
log.debug("{} failed: {}. Beginning retry #{}",
this, prettyPrintException(throwable), tries);
}
runnable.enqueue(this, now);
}
abstract AbstractRequest.Builder createRequest(int timeoutMs);
abstract void handleResponse(AbstractResponse abstractResponse);
abstract void handleFailure(Throwable throwable);
boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
return false;
}
@Override
public String toString() {
return "Call(callName=" + callName + ", deadlineMs=" + deadlineMs + ")";
}
public boolean isInternal() {
return internal;
}
}
static class TimeoutProcessorFactory {
TimeoutProcessor create(long now) {
return new TimeoutProcessor(now);
}
}
static class TimeoutProcessor {
private final long now;
private int nextTimeoutMs;
TimeoutProcessor(long now) {
this.now = now;
this.nextTimeoutMs = Integer.MAX_VALUE;
}
int handleTimeouts(Collection<Call> calls, String msg) {
int numTimedOut = 0;
for (Iterator<Call> iter = calls.iterator(); iter.hasNext(); ) {
Call call = iter.next();
int remainingMs = calcTimeoutMsRemainingAsInt(now, call.deadlineMs);
if (remainingMs < 0) {
call.fail(now, new TimeoutException(msg));
iter.remove();
numTimedOut++;
} else {
nextTimeoutMs = Math.min(nextTimeoutMs, remainingMs);
}
}
return numTimedOut;
}
boolean callHasExpired(Call call) {
int remainingMs = calcTimeoutMsRemainingAsInt(now, call.deadlineMs);
if (remainingMs < 0)
return true;
nextTimeoutMs = Math.min(nextTimeoutMs, remainingMs);
return false;
}
int nextTimeoutMs() {
return nextTimeoutMs;
}
}
private final class AdminClientRunnable implements Runnable {
private final ArrayList<Call> pendingCalls = new ArrayList<>();
private final Map<Node, List<Call>> callsToSend = new HashMap<>();
private final Map<String, List<Call>> callsInFlight = new HashMap<>();
private final Map<Integer, Call> correlationIdToCalls = new HashMap<>();
private List<Call> newCalls = new LinkedList<>();
private void timeoutPendingCalls(TimeoutProcessor processor) {
int numTimedOut = processor.handleTimeouts(pendingCalls, "Timed out waiting for a node assignment.");
if (numTimedOut > 0)
log.debug("Timed out {} pending calls.", numTimedOut);
}
private int timeoutCallsToSend(TimeoutProcessor processor) {
int numTimedOut = 0;
for (List<Call> callList : callsToSend.values()) {
numTimedOut += processor.handleTimeouts(callList,
"Timed out waiting to send the call.");
}
if (numTimedOut > 0)
log.debug("Timed out {} call(s) with assigned nodes.", numTimedOut);
return numTimedOut;
}
private synchronized void drainNewCalls() {
if (!newCalls.isEmpty()) {
pendingCalls.addAll(newCalls);
newCalls.clear();
}
}
private long maybeDrainPendingCalls(long now) {
long pollTimeout = Long.MAX_VALUE;
log.trace("Trying to choose nodes for {} at {}", pendingCalls, now);
Iterator<Call> pendingIter = pendingCalls.iterator();
while (pendingIter.hasNext()) {
Call call = pendingIter.next();
// If the call is being retried, await the proper backoff before finding the node
if (now < call.nextAllowedTryMs) {
pollTimeout = Math.min(pollTimeout, call.nextAllowedTryMs - now);
} else if (maybeDrainPendingCall(call, now)) {
pendingIter.remove();
}
}
return pollTimeout;
}
private boolean maybeDrainPendingCall(Call call, long now) {
try {
Node node = call.nodeProvider.provide();
if (node != null) {
log.trace("Assigned {} to node {}", call, node);
call.curNode = node;
getOrCreateListValue(callsToSend, node).add(call);
return true;
} else {
log.trace("Unable to assign {} to a node.", call);
return false;
}
} catch (Throwable t) {
// Handle authentication errors while choosing nodes.
log.debug("Unable to choose node for {}", call, t);
call.fail(now, t);
return true;
}
}
private long sendEligibleCalls(long now) {
long pollTimeout = Long.MAX_VALUE;
for (Iterator<Map.Entry<Node, List<Call>>> iter = callsToSend.entrySet().iterator(); iter.hasNext(); ) {
Map.Entry<Node, List<Call>> entry = iter.next();
List<Call> calls = entry.getValue();
if (calls.isEmpty()) {
iter.remove();
continue;
}
Node node = entry.getKey();
if (!client.ready(node, now)) {
long nodeTimeout = client.pollDelayMs(node, now);
pollTimeout = Math.min(pollTimeout, nodeTimeout);
log.trace("Client is not ready to send to {}. Must delay {} ms", node, nodeTimeout);
continue;
}
Call call = calls.remove(0);
int timeoutMs = calcTimeoutMsRemainingAsInt(now, call.deadlineMs);
AbstractRequest.Builder<?> requestBuilder;
try {
requestBuilder = call.createRequest(timeoutMs);
} catch (Throwable throwable) {
call.fail(now, new KafkaException(String.format(
"Internal error sending %s to %s.", call.callName, node)));
continue;
}
ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true);
log.trace("Sending {} to {}. correlationId={}", requestBuilder, node, clientRequest.correlationId());
client.send(clientRequest, now);
getOrCreateListValue(callsInFlight, node.idString()).add(call);
correlationIdToCalls.put(clientRequest.correlationId(), call);
}
return pollTimeout;
}
private void timeoutCallsInFlight(TimeoutProcessor processor) {
int numTimedOut = 0;
for (Map.Entry<String, List<Call>> entry : callsInFlight.entrySet()) {
List<Call> contexts = entry.getValue();
if (contexts.isEmpty())
continue;
String nodeId = entry.getKey();
// We assume that the first element in the list is the earliest. So it should be the
// only one we need to check the timeout for.
Call call = contexts.get(0);
if (processor.callHasExpired(call)) {
if (call.aborted) {
log.warn("Aborted call {} is still in callsInFlight.", call);
} else {
log.debug("Closing connection to {} to time out {}", nodeId, call);
call.aborted = true;
client.disconnect(nodeId);
numTimedOut++;
// We don't remove anything from the callsInFlight data structure. Because the connection
// has been closed, the calls should be returned by the next client#poll(),
// and handled at that point.
}
}
}
if (numTimedOut > 0)
log.debug("Timed out {} call(s) in flight.", numTimedOut);
}
/**
* Handle responses from the server.
*
* @param now The current time in milliseconds.
* @param responses The latest responses from KafkaClient.
**/
private void handleResponses(long now, List<ClientResponse> responses) {
for (ClientResponse response : responses) {
int correlationId = response.requestHeader().correlationId();
Call call = correlationIdToCalls.get(correlationId);
if (call == null) {
// If the server returns information about a correlation ID we didn't use yet,
// an internal server error has occurred. Close the connection and log an error message.
log.error("Internal server error on {}: server returned information about unknown " +
"correlation ID {}, requestHeader = {}", response.destination(), correlationId,
response.requestHeader());
client.disconnect(response.destination());
continue;
}
// Stop tracking this call.
correlationIdToCalls.remove(correlationId);
List<Call> calls = callsInFlight.get(response.destination());
if ((calls == null) || (!calls.remove(call))) {
log.error("Internal server error on {}: ignoring call {} in correlationIdToCall " +
"that did not exist in callsInFlight", response.destination(), call);
continue;
}
// Handle the result of the call. This may involve retrying the call, if we got a
// retryible exception.
if (response.versionMismatch() != null) {
call.fail(now, response.versionMismatch());
} else if (response.wasDisconnected()) {
AuthenticationException authException = client.authenticationException(call.curNode());
if (authException != null) {
call.fail(now, authException);
} else {
call.fail(now, new DisconnectException(String.format(
"Cancelled %s request with correlation id %s due to node %s being disconnected",
call.callName, correlationId, response.destination())));
}
} else {
try {
call.handleResponse(response.responseBody());
if (log.isTraceEnabled())
log.trace("{} got response {}", call,
response.responseBody().toString(response.requestHeader().apiVersion()));
} catch (Throwable t) {
if (log.isTraceEnabled())
log.trace("{} handleResponse failed with {}", call, prettyPrintException(t));
call.fail(now, t);
}
}
}
}
private void unassignUnsentCalls(Predicate<Node> shouldUnassign) {
for (Iterator<Map.Entry<Node, List<Call>>> iter = callsToSend.entrySet().iterator(); iter.hasNext(); ) {
Map.Entry<Node, List<Call>> entry = iter.next();
Node node = entry.getKey();
List<Call> awaitingCalls = entry.getValue();
if (awaitingCalls.isEmpty()) {
iter.remove();
} else if (shouldUnassign.test(node)) {
pendingCalls.addAll(awaitingCalls);
iter.remove();
}
}
}
private boolean hasActiveExternalCalls(Collection<Call> calls) {
for (Call call : calls) {
if (!call.isInternal()) {
return true;
}
}
return false;
}
/**
* Return true if there are currently active external calls.
*/
private boolean hasActiveExternalCalls() {
if (hasActiveExternalCalls(pendingCalls)) {
return true;
}
for (List<Call> callList : callsToSend.values()) {
if (hasActiveExternalCalls(callList)) {
return true;
}
}
return hasActiveExternalCalls(correlationIdToCalls.values());
}
private boolean threadShouldExit(long now, long curHardShutdownTimeMs) {
if (!hasActiveExternalCalls()) {
log.trace("All work has been completed, and the I/O thread is now exiting.");
return true;
}
if (now >= curHardShutdownTimeMs) {
log.info("Forcing a hard I/O thread shutdown. Requests in progress will be aborted.");
return true;
}
log.debug("Hard shutdown in {} ms.", curHardShutdownTimeMs - now);
return false;
}
@Override
public void run() {
long now = time.milliseconds();
log.trace("Thread starting");
while (true) {
// Copy newCalls into pendingCalls.
drainNewCalls();
// Check if the AdminClient thread should shut down.
long curHardShutdownTimeMs = hardShutdownTimeMs.get();
if ((curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) && threadShouldExit(now, curHardShutdownTimeMs))
break;
// Handle timeouts.
TimeoutProcessor timeoutProcessor = timeoutProcessorFactory.create(now);
timeoutPendingCalls(timeoutProcessor);
timeoutCallsToSend(timeoutProcessor);
timeoutCallsInFlight(timeoutProcessor);
long pollTimeout = Math.min(1200000, timeoutProcessor.nextTimeoutMs());
if (curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) {
pollTimeout = Math.min(pollTimeout, curHardShutdownTimeMs - now);
}
// Choose nodes for our pending calls.
pollTimeout = Math.min(pollTimeout, maybeDrainPendingCalls(now));
long metadataFetchDelayMs = metadataManager.metadataFetchDelayMs(now);
if (metadataFetchDelayMs == 0) {
metadataManager.transitionToUpdatePending(now);
Call metadataCall = makeMetadataCall(now);
// Create a new metadata fetch call and add it to the end of pendingCalls.
// Assign a node for just the new call (we handled the other pending nodes above).
if (!maybeDrainPendingCall(metadataCall, now))
pendingCalls.add(metadataCall);
}
pollTimeout = Math.min(pollTimeout, sendEligibleCalls(now));
if (metadataFetchDelayMs > 0) {
pollTimeout = Math.min(pollTimeout, metadataFetchDelayMs);
}
// Ensure that we use a small poll timeout if there are pending calls which need to be sent
if (!pendingCalls.isEmpty())
pollTimeout = Math.min(pollTimeout, retryBackoffMs);
// Wait for network responses.
log.trace("Entering KafkaClient#poll(timeout={})", pollTimeout);
List<ClientResponse> responses = client.poll(pollTimeout, now);
log.trace("KafkaClient#poll retrieved {} response(s)", responses.size());
// unassign calls to disconnected nodes
unassignUnsentCalls(client::connectionFailed);
// Update the current time and handle the latest responses.
now = time.milliseconds();
handleResponses(now, responses);
}
int numTimedOut = 0;
TimeoutProcessor timeoutProcessor = new TimeoutProcessor(Long.MAX_VALUE);
synchronized (this) {
numTimedOut += timeoutProcessor.handleTimeouts(newCalls, "The AdminClient thread has exited.");
newCalls = null;
}
numTimedOut += timeoutProcessor.handleTimeouts(pendingCalls, "The AdminClient thread has exited.");
numTimedOut += timeoutCallsToSend(timeoutProcessor);
numTimedOut += timeoutProcessor.handleTimeouts(correlationIdToCalls.values(),
"The AdminClient thread has exited.");
if (numTimedOut > 0) {
log.debug("Timed out {} remaining operation(s).", numTimedOut);
}
closeQuietly(client, "KafkaClient");
closeQuietly(metrics, "Metrics");
log.debug("Exiting AdminClientRunnable thread.");
}
void enqueue(Call call, long now) {
if (log.isDebugEnabled()) {
log.debug("Queueing {} with a timeout {} ms from now.", call, call.deadlineMs - now);
}
boolean accepted = false;
synchronized (this) {
if (newCalls != null) {
newCalls.add(call);
accepted = true;
}
}
if (accepted) {
client.wakeup(); // wake the thread if it is in poll()
} else {
log.debug("The AdminClient thread has exited. Timing out {}.", call);
call.fail(Long.MAX_VALUE, new TimeoutException("The AdminClient thread has exited."));
}
}
void call(Call call, long now) {
if (hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) {
log.debug("The AdminClient is not accepting new calls. Timing out {}.", call);
call.fail(Long.MAX_VALUE, new TimeoutException("The AdminClient thread is not accepting new calls."));
} else {
enqueue(call, now);
}
}
private Call makeMetadataCall(long now) {
return new Call(true, "fetchMetadata", calcDeadlineMs(now, defaultTimeoutMs),
new MetadataUpdateNodeIdProvider()) {
@Override
public AbstractRequest.Builder createRequest(int timeoutMs) {
// Since this only requests node information, it's safe to pass true
// for allowAutoTopicCreation (and it simplifies communication with
// older brokers)
return new MetadataRequest.Builder(new MetadataRequestData()
.setTopics(Collections.emptyList())
.setAllowAutoTopicCreation(true));
}
@Override
public void handleResponse(AbstractResponse abstractResponse) {
MetadataResponse response = (MetadataResponse) abstractResponse;
long now = time.milliseconds();
metadataManager.update(response.cluster(), now);
// Unassign all unsent requests after a metadata refresh to allow for a new
// destination to be selected from the new metadata
unassignUnsentCalls(node -> true);
}
@Override
public void handleFailure(Throwable e) {
metadataManager.updateFailed(e);
}
};
}
}
@Override
public ListTopicsResult listTopics(final ListTopicsOptions options) {
final KafkaFutureImpl<Map<String, TopicListing>> topicListingFuture = new KafkaFutureImpl<>();
final long now = time.milliseconds();
runnable.call(new Call("listTopics", calcDeadlineMs(now, options.timeoutMs()),
new LeastLoadedNodeProvider()) {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return MetadataRequest.Builder.allTopics();
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
MetadataResponse response = (MetadataResponse) abstractResponse;
Map<String, TopicListing> topicListing = new HashMap<>();
for (MetadataResponse.TopicMetadata topicMetadata : response.topicMetadata()) {
String topicName = topicMetadata.topic();
boolean isInternal = topicMetadata.isInternal();
if (!topicMetadata.isInternal() || options.shouldListInternal())
topicListing.put(topicName, new TopicListing(topicName, isInternal));
}
topicListingFuture.complete(topicListing);
}
@Override
void handleFailure(Throwable throwable) {
topicListingFuture.completeExceptionally(throwable);
}
}, now);
return new ListTopicsResult(topicListingFuture);
}
}