上一篇介绍了服务端消费者的创建,本章介绍消息拉取请求实现
一、消息拉取请求入口
public class ServerCnx {
    /**
     * Entry point for a client CommandFlow request: looks up the target consumer on this
     * connection and grants it the requested number of message permits.
     */
    protected void handleFlow(CommandFlow flow) {
        checkArgument(state == State.Connected);
        CompletableFuture<Consumer> consumerFuture = consumers.get(flow.getConsumerId());
        // Only act on a consumer whose creation future completed successfully.
        boolean consumerReady = consumerFuture != null
                && consumerFuture.isDone()
                && !consumerFuture.isCompletedExceptionally();
        if (!consumerReady) {
            return;
        }
        Consumer consumer = consumerFuture.getNow(null);
        if (consumer != null) {
            // Forward the permits granted by the client (client receiver queue defaults to 1000).
            consumer.flowPermits(flow.getMessagePermits());
        }
    }
}
继续看consumer.flowPermits(...)的实现
二、Consumer层请求拉取
public class Consumer {
    /**
     * Credits this consumer with additional message permits. If the consumer is (or becomes)
     * blocked on too many unacked messages, the permits are parked instead of triggering a read.
     */
    public void flowPermits(int additionalNumberOfMessages) {
        checkArgument(additionalNumberOfMessages > 0);
        // Block dispatching once the unacked-message count reaches the configured ceiling.
        if (shouldBlockConsumerOnUnackMsgs() && unackedMessages >= maxUnackedMessages) {
            blockedConsumerOnUnackedMsgs = true;
        }
        int oldPermits;
        if (blockedConsumerOnUnackedMsgs) {
            // Park the permits; they are credited back when the consumer unblocks.
            oldPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndAdd(this, additionalNumberOfMessages);
        } else {
            oldPermits = MESSAGE_PERMITS_UPDATER.getAndAdd(this, additionalNumberOfMessages);
            // Propagate the flow request through the subscription to its dispatcher.
            subscription.consumerFlow(this, additionalNumberOfMessages);
        }
    }
}
上一章介绍过subscription是最外层,subscription关联的每个consumer持有subscription的引用,具体看上一章consumer创建时的构造传参。多个消费者使用相同订阅名称时,调用的都是同一个subscription实例的consumerFlow方法;
继续看subscription.consumerFlow(...)的实现;
三、Consumer所属的Subscription层消息拉取请求
public class PersistentSubscription {
    /**
     * Relays a consumer flow request to the dispatcher created for this subscription's type,
     * recording when the consumer last asked for messages.
     */
    public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
        // Remember the time of the most recent flow request.
        long now = System.currentTimeMillis();
        this.lastConsumedFlowTimestamp = now;
        // The dispatcher (chosen per subscription type) performs the actual read.
        dispatcher.consumerFlow(consumer, additionalNumberOfMessages);
    }
}
继续看dispatcher.consumerFlow(...)的实现
dispatcher实现有两个:
1、PersistentDispatcherSingleActiveConsumer
订阅模式:Exclusive、Failover
作用:组装从bk读出entry,组装数据,发送到请求的consumer
2、PersistentDispatcherMultipleConsumers
订阅模式:Shared、Key_Shared(Key_Shared由PersistentStickyKeyDispatcherMultipleConsumers实现,它继承自PersistentDispatcherMultipleConsumers)
Shared作用:组装从bk读出entry,组装数据,轮询发送到已存在的consumer,如果消费者客户端配置了优先级相当于加权轮询
Key_Shared作用:组装从bk读出entry,组装数据,根据客户端配置的key的hash % consumer数量得出consumer(大概这意思,实际还是有些细节优化的)
核心流程都差不多,区别就是上面描述的目标consumer选择方式。所以我们只看PersistentDispatcherMultipleConsumers的实现
四、Subscription对应的分发策略-PersistentDispatcherMultipleConsumers第1层回调
public class PersistentDispatcherMultipleConsumers {
    /**
     * Adds the consumer's newly granted permits to the dispatcher-wide total and
     * attempts to read more entries. Requests from unregistered consumers are ignored.
     */
    public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
        // Only honor flow requests from consumers still registered with this dispatcher.
        if (consumerSet.contains(consumer)) {
            totalAvailablePermits += additionalNumberOfMessages;
            readMoreEntries();
        }
    }
}
继续readMoreEntries();
public class PersistentDispatcherMultipleConsumers {
// Attempts to read more entries whenever permits are available: first replays
// redelivery/delayed messages, otherwise issues a normal cursor read. Called under the
// dispatcher lock after a flow request or a completed read (this is a recursive loop).
public synchronized void readMoreEntries() {
// Permits of the first consumer that still has capacity.
int firstAvailableConsumerPermits = getFirstAvailableConsumerPermits();
// Use the larger of the dispatcher-wide total and that single consumer's permits.
int currentTotalAvailablePermits = Math.max(totalAvailablePermits, firstAvailableConsumerPermits);
if (currentTotalAvailablePermits > 0 && firstAvailableConsumerPermits > 0) {
// Apply dispatch-rate limits to compute 1) how many entries and 2) how many bytes to read.
Pair<Integer, Long> calculateResult = calculateToRead(currentTotalAvailablePermits);
// number of entries to read
int messagesToRead = calculateResult.getLeft();
// number of bytes to read
long bytesToRead = calculateResult.getRight();
// -1 means the rate limiter disallows reading right now.
if (messagesToRead == -1 || bytesToRead == -1) {
return;
}
// Positions queued for redelivery, or delayed messages whose delivery time has arrived.
Set<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
if (!messagesToReplayNow.isEmpty()) {
havePendingReplayRead = true;
// If delayed delivery is enabled on the topic, replay in order. Either call:
// 1) filters positions already mark-deleted, 2) reads the rest from BookKeeper
// and sends them to the client; the returned set holds the already-deleted positions.
Set<? extends Position> deletedMessages = topic.isDelayedDeliveryEnabled()
? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
// Drop the already-deleted positions from the redelivery queue.
deletedMessages.forEach(position -> redeliveryMessages.remove(((PositionImpl) position).getLedgerId(),
((PositionImpl) position).getEntryId()));
// If every replay candidate turned out to be deleted, nothing was actually read:
if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
havePendingReplayRead = false;
// schedule another read attempt on the broker executor.
topic.getBrokerService().executor().execute(() -> readMoreEntries());
}
}
// Dispatching is blocked: too many unacked messages — do nothing until acks arrive.
else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
}
// No read currently in flight: issue a normal cursor read.
else if (!havePendingRead) {
havePendingRead = true;
// Read through the cursor; `this` is the first-level callback (readEntriesComplete).
cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this,
ReadType.Normal, topic.getMaxReadPosition());
} else {
// A read is already pending; its completion will drive the next read.
}
} else {
// No permits available; wait for the next flow request.
}
}
}
开始读,继续看cursor.asyncReadEntriesOrWait(...)的实现
四、ManagedCursorImpl读取-第2层回调OpReadEntry
public class ManagedCursorImpl {
// Reads entries if any are available; otherwise registers a waiting read op that is
// re-checked (optionally after a delay) and completed once new entries are published.
public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback, Object ctx,
PositionImpl maxPosition) {
if (isClosed()) {
// NOTE(review): this excerpt returns silently on a closed cursor; presumably the
// full source fails the callback here — confirm against upstream.
return;
}
// Cap the batch size using the ledger's average read throughput and the byte limit.
int numberOfEntriesToRead = applyMaxSizeCap(maxEntries, maxSizeBytes);
// The read position is behind the last written position, so entries are available.
if (hasMoreEntries()) {
// Issue the read immediately.
asyncReadEntries(numberOfEntriesToRead, callback, ctx, maxPosition);
} else {
OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback,
ctx, maxPosition);
// Only one waiting read op may be registered at a time (CAS from null).
if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
return;
}
// Optionally delay the availability check to batch up newly published entries.
if (config.getNewEntriesCheckDelayInMillis() > 0) {
ledger.getScheduledExecutor()
.schedule(() -> checkForNewEntries(op, callback, ctx),
config.getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS);
} else {
// Check right away. If still nothing is readable, the op stays parked and is
// woken later from the publish (add-entry) callback path, as covered in the
// earlier article on the send flow.
checkForNewEntries(op, callback, ctx);
}
}
}
}
可以读,继续asyncReadEntries(numberOfEntriesToRead, callback, ctx, maxPosition)
public class ManagedCursorImpl {
public void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition) {
checkArgument(numberOfEntriesToRead > 0);
if (isClosed()) {
callback.readEntriesFailed(new ManagedLedgerException
.CursorAlreadyClosedException("Cursor was already closed"), ctx);
return;
}
// 上面调用过了,所以这里是直接return就是numberOfEntriesToRead
int numOfEntriesToRead = applyMaxSizeCap(numberOfEntriesToRead, maxSizeBytes);
PENDING_READ_OPS_UPDATER.incrementAndGet(this);
// 读取 第二层回调
OpReadEntry op = OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx, maxPosition);
ledger.asyncReadEntries(op);
}
}
继续看ledger中的读取(op)
public class ManagedLedgerImpl {
// Dispatches a read op either to the ledger currently being written, or — when the
// cursor lags behind a ledger rollover — to an older ledger resolved from metadata.
void asyncReadEntries(OpReadEntry opReadEntry) {
final State state = STATE_UPDATER.get(this);
// A fenced or closed managed ledger can no longer serve reads.
if (state == State.Fenced || state == State.Closed) {
opReadEntry.readEntriesFailed(new ManagedLedgerFencedException(), opReadEntry.ctx);
return;
}
// Fast path: the read position belongs to the ledger currently being written.
long ledgerId = opReadEntry.readPosition.getLedgerId();
LedgerHandle currentLedger = this.currentLedger;
if (currentLedger != null && ledgerId == currentLedger.getId()) {
// Read directly from the current ledger.
internalReadFromLedger(currentLedger, opReadEntry);
} else {
// Slow path: writes outpaced reads and the ledger rolled over, so the read
// position still points at an older ledger.
// `ledgers` is the in-memory cache of all ledger metadata for this topic,
// loaded from ZooKeeper.
LedgerInfo ledgerInfo = ledgers.get(ledgerId);
// Unknown or empty ledger: advance to the next ledger and re-evaluate completion.
if (ledgerInfo == null || ledgerInfo.getEntries() == 0) {
opReadEntry.updateReadPosition(new PositionImpl(opReadEntry.readPosition.getLedgerId() + 1, 0));
opReadEntry.checkReadCompletion();
return;
}
// Obtain a LedgerHandle (BookKeeper client handle) for the older ledger, then read.
getLedgerHandle(ledgerId).thenAccept(ledger -> internalReadFromLedger(ledger, opReadEntry)).exceptionally(ex -> {
opReadEntry.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()),
opReadEntry.ctx);
return null;
});
}
}
}
继续internalReadFromLedger(currentLedger, opReadEntry)
五、ManagedLedgerImpl读取-第3层回调ReadEntryCallbackWrapper
public class ManagedLedgerImpl {
// Computes the [firstEntry, lastEntry] entry-id range readable from the given ledger
// and issues the read; advances or completes the op when nothing can be read.
private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) {
// On this call path maxPosition defaults to Long.MAX_VALUE; other callers may set it.
if (opReadEntry.readPosition.compareTo(opReadEntry.maxPosition) > 0) {
opReadEntry.checkReadCompletion();
return;
}
// First entry to read: the current read position within this ledger.
long firstEntry = opReadEntry.readPosition.getEntryId();
long lastEntryInLedger;
PositionImpl lastPosition = lastConfirmedEntry;
// No rollover: the last confirmed (most recently added) entry bounds the read.
if (ledger.getId() == lastPosition.getLedgerId()) {
lastEntryInLedger = lastPosition.getEntryId();
} else {
// Reading an older ledger: its last-add-confirmed entry is the bound.
lastEntryInLedger = ledger.getLastAddConfirmed();
}
// Further clamp by maxPosition when it falls inside this ledger.
if (ledger.getId() == opReadEntry.maxPosition.getLedgerId()) {
lastEntryInLedger = min(opReadEntry.maxPosition.getEntryId(), lastEntryInLedger);
}
// Start id past the last entry: nothing readable from this ledger.
if (firstEntry > lastEntryInLedger) {
if (currentLedger == null || ledger.getId() != currentLedger.getId()) {
// Exhausted an older ledger: jump the read position to the next known ledger.
Long nextLedgerId = ledgers.ceilingKey(ledger.getId() + 1);
if (nextLedgerId != null) {
opReadEntry.updateReadPosition(new PositionImpl(nextLedgerId, 0));
} else {
opReadEntry.updateReadPosition(new PositionImpl(ledger.getId() + 1, 0));
}
} else {
// Caught up with the writer on the current ledger: keep the position as-is.
opReadEntry.updateReadPosition(opReadEntry.readPosition);
}
opReadEntry.checkReadCompletion();
return;
}
// Entry ids are sequential, so end id = start id + count - 1, clamped to the ledger end.
long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger);
// Issue the actual read.
asyncReadEntry(ledger, firstEntry, lastEntry, false, opReadEntry, opReadEntry.ctx);
}
}
继续asyncReadEntry
public class ManagedLedgerImpl {
    /**
     * Reads entries [firstEntry, lastEntry] through the broker entry cache. When a read
     * timeout is configured, the op is wrapped in a ReadEntryCallbackWrapper (third-level
     * callback) that records the start time for timeout tracking.
     */
    protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, boolean isSlowestReader,
            OpReadEntry opReadEntry, Object ctx) {
        if (config.getReadEntryTimeoutSeconds() <= 0) {
            // No timeout tracking: pass the OpReadEntry straight through to the cache.
            entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, opReadEntry, ctx);
            return;
        }
        long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
        long createdTime = System.nanoTime();
        // Third-level callback: remembers when the read started so timeouts can be detected.
        ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,
                opReadEntry, readOpCount, createdTime, ctx);
        lastReadCallback = readCallback;
        // Cache read.
        entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, readCallback, readOpCount);
    }
}
继续看缓存读实现
六、Broker端缓存读取
public class EntryCacheImpl {
// Serves a read either fully from the in-memory entry cache, or — on any miss —
// entirely from BookKeeper; results are delivered through the supplied callback.
private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
final ReadEntriesCallback callback, Object ctx) {
final long ledgerId = lh.getId();
final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
final PositionImpl firstPosition = PositionImpl.get(lh.getId(), firstEntry);
final PositionImpl lastPosition = PositionImpl.get(lh.getId(), lastEntry);
// The cache is backed by a ConcurrentSkipListMap whose values live in direct
// (off-heap) memory; fetch the requested position range.
Collection<EntryImpl> cachedEntries = entries.getRange(firstPosition, lastPosition);
// Full cache hit: every requested entry is present.
if (cachedEntries.size() == entriesToRead) {
long totalCachedSize = 0;
final List<EntryImpl> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead);
for (EntryImpl entry : cachedEntries) {
// Copy so the caller owns an independent reference, then release the cached one.
entriesToReturn.add(EntryImpl.create(entry));
totalCachedSize += entry.getLength();
entry.release();
}
// Record the cache hit in the metrics.
manager.mlFactoryMBean.recordCacheHits(entriesToReturn.size(), totalCachedSize);
// Complete the callback chain, unwinding layer by layer back to the dispatcher.
callback.readEntriesComplete((List) entriesToReturn, ctx);
} else {
// Partial (or zero) hit: release whatever was found and read the whole range from
// BookKeeper — simpler than stitching ranges, and full hits are the common case.
if (!cachedEntries.isEmpty()) {
cachedEntries.forEach(entry -> entry.release());
}
// Read from BookKeeper.
lh.readAsync(firstEntry, lastEntry).whenCompleteAsync(
(ledgerEntries, exception) -> {
if (exception != null) {
// NOTE(review): the excerpt returns without failing the callback here;
// error propagation is presumably in the elided full source — confirm.
return;
}
try {
long totalSize = 0;
final List<EntryImpl> entriesToReturn
= Lists.newArrayListWithExpectedSize(entriesToRead);
for (LedgerEntry e : ledgerEntries) {
EntryImpl entry = EntryImpl.create(e);
entriesToReturn.add(entry);
totalSize += entry.getLength();
}
// Record the cache miss in the metrics.
manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
ml.getMBean().addReadEntriesSample(entriesToReturn.size(), totalSize);
// Complete the callback chain, unwinding layer by layer back to the dispatcher.
callback.readEntriesComplete((List) entriesToReturn, ctx);
} finally {
// Release the BookKeeper-side entry buffers.
ledgerEntries.close();
}
}, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception->{
return null;
});
}
}
}
可以看到,如果缓存没命中会直接调用bk,收到bk响应组装list放入回调结束。并没有放入缓存,这个需要知道,也没必要放入,因为消息大多数场景都是消费完结束了。放入命中率低,还占用空间。缓存主要发送时往里放,调大可以提升性能。对应配置项:managedLedgerCacheSizeMB
接下来回调就不一一解析了,挑一个最重要的解析一下就是第一层回调。
七、第一层回调PersistentDispatcherMultipleConsumers客户端解析
public class PersistentDispatcherMultipleConsumers {
// First-level callback: invoked once the cursor read finishes with a batch of entries.
public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
// (bookkeeping elided in this excerpt)
...
sendMessagesToConsumers(readType, entries);
}
}
继续sendMessagesToConsumers
public class PersistentDispatcherMultipleConsumers {
// Distributes freshly read entries across consumers round-robin, honoring each
// consumer's permits, then recursively triggers the next read.
protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
// If the mark-delete position advanced past the read position, some of these
// entries may already be acked/deleted.
if (needTrimAckedMessages()) {
// Drop the already-deleted entries.
cursor.trimDeletedEntries(entries);
}
int entriesToDispatch = entries.size();
// Trigger read more messages
if (entriesToDispatch == 0) {
readMoreEntries();
return;
}
EntryWrapper[] entryWrappers = new EntryWrapper[entries.size()];
// An entry may be a batch holding several messages; this counts the individual
// messages across all entries.
int remainingMessages = updateEntryWrapperWithMetadata(entryWrappers, entries);
int start = 0;
// total messages handed to consumers in this round
long totalMessagesSent = 0;
// total bytes handed to consumers in this round
long totalBytesSent = 0;
// average messages per entry (>= 1); converts message budgets into entry counts
int avgBatchSizePerMsg = remainingMessages > 0 ? Math.max(remainingMessages / entries.size(), 1) : 1;
int firstAvailableConsumerPermits, currentTotalAvailablePermits;
boolean dispatchMessage;
while (entriesToDispatch > 0) {
// permits of the first consumer that still has capacity
firstAvailableConsumerPermits = getFirstAvailableConsumerPermits();
currentTotalAvailablePermits = Math.max(totalAvailablePermits, firstAvailableConsumerPermits);
dispatchMessage = currentTotalAvailablePermits > 0 && firstAvailableConsumerPermits > 0;
if (!dispatchMessage) {
break;
}
// round-robin pick of the next consumer
Consumer c = getNextConsumer();
if (c == null) {
return;
}
// this consumer's remaining permits (1 when its channel is not writable)
int availablePermits = c.isWritable() ? c.getAvailablePermits() : 1;
// dispatcherMaxRoundRobinBatchSize defaults to 20;
// availablePermits defaults to 1000 on the client;
// with batching remainingMessages may exceed 1000, so the effective cap is often 20
int messagesForC = Math.min(Math.min(remainingMessages, availablePermits),
serviceConfig.getDispatcherMaxRoundRobinBatchSize());
// convert the message budget into an entry count (at least 1 entry)
messagesForC = Math.max(messagesForC / avgBatchSizePerMsg, 1);
if (messagesForC > 0) {
// Send this consumer the sub-range [start, end) of the entry list.
int end = Math.min(start + messagesForC, entries.size());
if (readType == ReadType.Replay) {
// Replayed entries leave the redelivery queue once dispatched.
entries.subList(start, end).forEach(entry -> {
redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId());
});
}
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
List<Entry> entriesForThisConsumer = entries.subList(start, end);
EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
// Filter entries, e.g. delayed messages whose delivery time has not arrived yet.
filterEntriesForConsumer(Optional.ofNullable(entryWrappers), start, entriesForThisConsumer,
batchSizes, sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay);
// Hand the entries to the consumer for the actual network send.
c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
int msgSent = sendMessageInfo.getTotalMessages();
remainingMessages -= msgSent;
start += messagesForC;
entriesToDispatch -= messagesForC;
// Key to terminating the recursion:
// TOTAL_AVAILABLE_PERMITS_UPDATER tracks the dispatcher-wide permit total; once it
// drops below zero, everything the clients requested has been pushed.
TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this,
-(msgSent - batchIndexesAcks.getTotalAckedIndexCount()));
totalMessagesSent += sendMessageInfo.getTotalMessages();
totalBytesSent += sendMessageInfo.getTotalBytes();
}
}
for (EntryWrapper entry : entryWrappers) {
if (entry != null) {
entry.recycle();
}
}
// Dispatch-rate limiting (topic-level and subscription-level).
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
if (topic.getDispatchRateLimiter().isPresent()) {
topic.getDispatchRateLimiter().get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
}
if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
}
}
// Entries that could not be dispatched (no permits left) go back to the replay queue.
if (entriesToDispatch > 0) {
entries.subList(start, entries.size()).forEach(entry -> {
long stickyKeyHash = getStickyKeyHash(entry);
addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
entry.release();
});
}
// Covered earlier: recursive call that keeps the read/dispatch loop going.
readMoreEntries();
}
}
再往下直接看最后
public class PulsarCommandSenderImpl {
// Final hop to the wire: serializes each entry into a MESSAGE command and writes it to
// the consumer's channel on that channel's event loop, with a single flush per batch.
public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName, Subscription subscription,
int partitionIdx, List<Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,
RedeliveryTracker redeliveryTracker) {
final ChannelHandlerContext ctx = cnx.ctx();
final ChannelPromise writePromise = ctx.newPromise();
ctx.channel().eventLoop().execute(() -> {
for (int i = 0; i < entries.size(); i++) {
Entry entry = entries.get(i);
if (entry == null) {
// Entry was filtered out
continue;
}
int batchSize = batchSizes.getBatchSize(i);
// Skip batched entries for clients too old to understand batch messages.
if (batchSize > 1 && !cnx.isBatchMessageCompatibleVersion()) {
continue;
}
ByteBuf metadataAndPayload = entry.getDataBuffer();
// Retain an extra reference: the write below releases the buffer, and the entry
// still owns its own reference (released at the end of this iteration).
metadataAndPayload.retain();
Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
// Pre-v11 clients do not expect a checksum in the payload.
if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getValue()) {
Commands.skipChecksumIfPresent(metadataAndPayload);
}
int redeliveryCount = 0;
PositionImpl position = PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
if (redeliveryTracker.contains(position)) {
redeliveryCount = redeliveryTracker.incrementAndGetRedeliveryCount(position);
}
// Queue the frame into the channel's outbound buffer (no flush yet).
ctx.write(
cnx.newMessageAndIntercept(consumerId, entry.getLedgerId(), entry.getEntryId(), partitionIdx,
redeliveryCount, metadataAndPayload,
batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i), topicName),
ctx.voidPromise());
entry.release();
}
// Single flush for the batch; completing writePromise signals the caller.
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER, writePromise);
batchSizes.recyle();
if (batchIndexesAcks != null) {
batchIndexesAcks.recycle();
}
});
return writePromise;
}
}
到协议层了。
整个核心流程介绍完了,它的细节还有很多很多,过程中很多方法没有仔细点进去分析,因为会偏离文章主题,本文还是围绕着消费请求拉取的流程介绍。