[Azure Event Hub] What is causing Azure Event Hubs ReceiverDisconnectedException/LeaseLostException?

Time: 2022-11-14 17:01:27

Question:

I'm receiving events from an EventHub using EventProcessorHost and an IEventProcessor class (call it: MyEventProcessor). I scale this out to two servers by running my EPH on both servers and having them connect to the Hub using the same ConsumerGroup but unique hostNames (using the machine name).

The problem is: at random hours of the day/night, the app logs this:

Exception information: 
Exception type: ReceiverDisconnectedException
Exception message: New receiver with higher epoch of '186' is created hence current receiver with epoch '186' is getting disconnected. If you are recreating the receiver, make sure a higher epoch is used.
at Microsoft.ServiceBus.Common.ExceptionDispatcher.Throw(Exception exception)
at Microsoft.ServiceBus.Common.Parallel.TaskHelpers.EndAsyncResult(IAsyncResult asyncResult)
at Microsoft.ServiceBus.Messaging.IteratorAsyncResult`1.StepCallback(IAsyncResult result)

This Exception occurs at the same time as a LeaseLostException, thrown from MyEventProcessor's CloseAsync method when it tries to checkpoint. (Presumably Close is being called because of the ReceiverDisconnectedException?)

I think this is occurring due to Event Hubs' automatic lease management when scaling out to multiple machines. But I'm wondering whether I need to do something different to make it work more cleanly and avoid these exceptions, e.g. something with epochs?

 

Answer

TLDR: This behavior is absolutely normal.

Why can't lease management be smooth and exception-free? To give the developer more control over the situation.

 

The really long story, all the way from the basics:

EventProcessorHost (hereafter EPH; it is very similar to what the __consumer_offsets topic does for Kafka consumers: partition ownership and checkpoint store) is written by the Microsoft Azure Event Hubs team themselves, to translate all of the EventHubs partition receiver Gu into a simple onReceive(Events) callback.

EPH addresses two general, major, well-known problems that arise when reading from a high-throughput partitioned stream such as Event Hubs:

  1. Fault-tolerant receive pipeline. A simple version of the problem: if the host running a PartitionReceiver dies and comes back, it needs to resume processing from where it left off. To remember the last successfully processed EventData, EPH stores checkpoints in the blob storage supplied to the EPH constructor, whenever the user invokes context.CheckpointAsync(). Then, when the host process dies (for example, it reboots abruptly, or hits a hardware fault and never comes back), any EPH instance can pick up this task and resume from that checkpoint.
  2. Balancing (distributing) partitions across EPH instances. Say there are 10 partitions and 2 EPH instances processing events from them: we need a way to divide the partitions across the instances (the PartitionManager component of the EPH library does this). We use the Azure Storage blob lease management feature to implement this. As of version 2.2.10, to simplify the problem, EPH assumes that all partitions are loaded equally.
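
The two problems above can be illustrated with a small, self-contained Python model. This is only a sketch under stated assumptions: CheckpointStore, process and fair_share are hypothetical names invented for illustration, an in-memory dictionary stands in for the checkpoint blobs, and every event is checkpointed purely to keep the example short. It is not the EPH library's API.

```python
import math


class CheckpointStore:
    """Toy stand-in for the blob storage holding one checkpoint per partition."""

    def __init__(self):
        self._offsets = {}  # partition id -> offset of the last processed event

    def checkpoint(self, partition_id, offset):
        self._offsets[partition_id] = offset

    def resume_offset(self, partition_id):
        # -1 means "no checkpoint yet, start from the first event"
        return self._offsets.get(partition_id, -1)


def process(store, partition_id, events, crash_after=None):
    """Process events after the last checkpoint; optionally 'crash' part-way."""
    start = store.resume_offset(partition_id) + 1
    handled = []
    for offset in range(start, len(events)):
        if crash_after is not None and len(handled) == crash_after:
            break  # simulated host failure: nothing further is checkpointed
        handled.append(events[offset])
        store.checkpoint(partition_id, offset)
    return handled


def fair_share(partition_count, host_count):
    """Partitions per host if all partitions are assumed to be equally loaded."""
    return math.ceil(partition_count / host_count)


store = CheckpointStore()
events = ["e0", "e1", "e2", "e3", "e4"]
first_run = process(store, "0", events, crash_after=2)  # host dies after two events
second_run = process(store, "0", events)                # another host resumes
print(first_run, second_run, fair_share(10, 2))
```

The second run picks up at the event after the last checkpoint, regardless of which host performs it, and fair_share(10, 2) gives the 5 partitions per instance used in the example that follows.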

With this background, let's look at what is going on. To start with, take the example above of 10 Event Hub partitions and 2 EPH instances processing events from them:

  1. Let's say the first EPH instance, EPH1, starts alone. As part of start-up, it creates receivers for all 10 partitions and begins processing events. During start-up, EPH1 announces that it owns all 10 partitions by acquiring leases on 10 storage blobs representing the 10 Event Hub partitions (with a standard nomenclature, which EPH creates internally in the storage account from the StorageConnectionString passed to the constructor). Leases are acquired for a set time, after which the EPH instance loses ownership of the partition.
  2. EPH1 periodically announces that it still owns those partitions by renewing the leases on the blobs. The renewal frequency, along with other useful tuning, can be configured using PartitionManagerOptions.
  3. Now let's say EPH2 starts up, and you supplied the same Azure Storage account to its constructor as you did for EPH1. At this point it has 0 partitions to process. So, to balance partitions across EPH instances, it downloads the list of all lease blobs, which hold the mapping of owner to partitionId. From this, it STEALS leases for its fair share of partitions (5 in our example) and announces that on each lease blob. As part of this, for each partition whose lease it wants to steal, EPH2 reads the latest checkpoint written for that partition and creates the corresponding PartitionReceiver with the same EPOCH as the one in the checkpoint.
  4. As a result, EPH1 loses ownership of these 5 partitions and runs into different errors depending on the exact state it is in:
  • If EPH1 is in the middle of a PartitionReceiver.Receive() call while EPH2 is creating a PartitionReceiver on the same partition, EPH1 experiences a ReceiverDisconnectedException. This eventually invokes IEventProcessor.Close(CloseReason=LeaseLost). Note that the probability of hitting this specific exception is higher if the messages being received are larger or the PrefetchCount is smaller, since in both cases the receiver performs more aggressive I/O.
  • If EPH1 is checkpointing or renewing the lease when EPH2 steals it, the EventProcessorOptions.ExceptionReceived event handler is signaled with a LeaseLostException (a 409 conflict error on the lease blob), which also eventually invokes IEventProcessor.Close(LeaseLost).
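
The four steps above can be replayed with a toy in-memory model. Everything here is an assumption made for illustration: the Partition class, its methods and the two error types are hypothetical stand-ins for the lease blob, the epoch receiver and the real exceptions. The epoch rule (a new receiver with an equal or higher epoch replaces the current one) is modelled on the exception message quoted in the question.

```python
class LeaseLostError(Exception):
    """Toy equivalent of LeaseLostException (a conflict on the lease blob)."""


class ReceiverDisconnectedError(Exception):
    """Toy equivalent of ReceiverDisconnectedException."""


class Partition:
    def __init__(self, pid):
        self.pid = pid
        self.lease_owner = None     # host name recorded on the lease blob
        self.receiver_owner = None  # host holding the live epoch receiver
        self.epoch = 0

    def take_lease(self, host):
        # Acquiring a free lease and stealing an owned one look the same here.
        self.lease_owner = host

    def open_receiver(self, host, epoch):
        # Assumption: an equal or higher epoch displaces the current receiver.
        if epoch < self.epoch:
            raise ValueError(f"epoch {epoch} is lower than current {self.epoch}")
        self.epoch = epoch
        self.receiver_owner = host

    def receive(self, host):
        if self.receiver_owner != host:
            raise ReceiverDisconnectedError(f"{host} replaced on partition {self.pid}")
        return f"event from partition {self.pid}"

    def checkpoint(self, host):
        if self.lease_owner != host:
            raise LeaseLostError(f"{host} lost the lease on partition {self.pid}")


partitions = [Partition(str(i)) for i in range(10)]
for p in partitions:                 # step 1: EPH1 starts alone and owns everything
    p.take_lease("EPH1")
    p.open_receiver("EPH1", epoch=1)

for p in partitions[:5]:             # step 3: EPH2 steals its fair share of 5
    p.take_lease("EPH2")
    p.open_receiver("EPH2", epoch=p.epoch)  # same epoch as the current one

receive_errors, checkpoint_errors = [], []
for p in partitions:                 # step 4: EPH1 carries on and notices the loss
    try:
        p.receive("EPH1")
    except ReceiverDisconnectedError:
        receive_errors.append(p.pid)
    try:
        p.checkpoint("EPH1")
    except LeaseLostError:
        checkpoint_errors.append(p.pid)

print(receive_errors, checkpoint_errors)
```

In this model EPH1 hits both error paths on exactly the five stolen partitions and keeps working normally on the other five, which mirrors the two bullet points under step 4.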

Why can't Lease Management be smooth & exception-free:

To keep the consumer simple and error-free, lease-management exceptions could have been swallowed by EPH and never surfaced to user code at all. However, we realized that throwing LeaseLostException could empower customers to find interesting bugs in the IEventProcessor.ProcessEvents() callback, for which the symptom would be frequent partition moves. For example:

  • A minor network outage on a specific machine, due to which EPH1 fails to renew its leases and then comes back up. Imagine that the network on this machine stays flaky for a day: the EPH instances are going to play ping-pong with partitions! This machine will continuously try to steal leases from the other machine, which is legitimate from EPH's point of view but a total disaster for the user of EPH, as it completely interferes with the processing pipeline. EPH would see exactly a ReceiverDisconnectedException when the network comes back up on the flaky machine. We think the best, and in fact the only, way is to enable the developer to smell this!
  • A simple scenario such as a bug in the ProcessEvents logic that throws fatal unhandled exceptions and brings down the whole process, for example a poison event. This partition is going to move around a lot.
  • Customers performing write/delete operations, by mistake, on the same storage account that EPH is using (for example, an automated clean-up script).
  • Last but not least, something we hope never happens: say, a 5-minute outage (a network incident, for instance) in the Azure data center where a specific EventHub.Partition is located. Partitions are going to move around across EPH instances.
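
One way a developer might "smell" the situations listed above is to count how often each partition is lost within a time window. The sketch below is an assumption-laden illustration: PartitionMoveMonitor, its window and its threshold are hypothetical and not part of EPH. It treats an occasional lease loss as normal balancing and flags only repeated losses on the same partition.

```python
from collections import defaultdict, deque


class PartitionMoveMonitor:
    """Counts lease-loss events per partition inside a sliding time window."""

    def __init__(self, window_seconds=600, threshold=3):
        self.window_seconds = window_seconds
        self.threshold = threshold
        self._events = defaultdict(deque)  # partition id -> recent timestamps

    def record_lease_lost(self, partition_id, timestamp):
        """Record one lease loss; return True if the partition looks like ping-pong."""
        events = self._events[partition_id]
        events.append(timestamp)
        # Drop events that have fallen out of the window.
        while events and timestamp - events[0] > self.window_seconds:
            events.popleft()
        return len(events) >= self.threshold


monitor = PartitionMoveMonitor(window_seconds=600, threshold=3)
# One isolated loss at t=0 (normal balancing), then three losses in quick succession.
alerts = [monitor.record_lease_lost("3", t) for t in (0, 3600, 3700, 3800)]
print(alerts)
```

In practice such a counter would be fed from wherever the application already observes lease losses, for example its exception handler or its close callback.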

 

Basically, in the majority of situations, it would be tricky for us to detect the difference between these situations and a legitimate lease loss due to balancing, so we want to delegate control of these situations to the developer.
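
As one possible use of that control, a processor can decide what to do when it is closed, depending on the reason. The sketch below is a design suggestion rather than something stated in the answer: it assumes a processor that skips its final checkpoint when the close reason is a lost lease, since, as described in the question, checkpointing at that point is what raises the LeaseLostException. SketchProcessor and its members are hypothetical names; the two close reasons mirror the LeaseLost and Shutdown values of the library's CloseReason.

```python
from enum import Enum


class CloseReason(Enum):
    LEASE_LOST = "LeaseLost"
    SHUTDOWN = "Shutdown"


class SketchProcessor:
    """Hypothetical processor that records what it does when it is closed."""

    def __init__(self):
        self.checkpoints = []  # partitions for which a final checkpoint was written
        self.log = []

    def close(self, partition_id, reason):
        if reason is CloseReason.LEASE_LOST:
            # Another host owns the partition now, so a checkpoint would conflict.
            self.log.append(f"partition {partition_id}: lease lost, checkpoint skipped")
            return
        # Orderly shutdown: this host still owns the lease, so checkpointing is safe.
        self.checkpoints.append(partition_id)


processor = SketchProcessor()
processor.close("3", CloseReason.LEASE_LOST)
processor.close("7", CloseReason.SHUTDOWN)
print(processor.checkpoints, processor.log)
```

Logging the lost lease instead of silently ignoring it keeps the signal that the answer argues developers should be able to see.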

 

Reference Link: ​https://*.com/questions/41496754/what-is-causing-azure-event-hubs-receiverdisconnectedexception-leaselostexceptio/41867611#41867611​

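When facing problems in a complex environment, the way to investigate them is this: let muddy water stay still and it gradually clears; let what is at rest begin to move and it gradually comes to life. In the cloud, it is exactly so!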