# How to Avoid the `ReceiverDisconnectedException`
In version 2.3.2 and above, the connector by default uses epoch receivers from the Event Hubs Java client.
The epoch receiver allows only one receiver to be open per consumer group-partition combo for an Event Hubs instance.
Let's say we have `receiverA` with an epoch of `0` which is open within the consumer group `foo` on partition `0`.
Now, if we open a new receiver, `receiverB`, for the same consumer group and partition with an epoch of
`0` (or higher), then `receiverA` will be disconnected and get the `ReceiverDisconnectedException`.

There are several scenarios in which the connector tries to open more than one epoch receiver for a particular
<Event Hubs instance, consumer group, partition> combination. In this document, we elaborate on some of those scenarios
and discuss how you can avoid them.

One scenario which causes this exception is when your code generates multiple concurrent tasks that read events from
a particular partition in an Event Hubs instance using the same consumer group. This situation happens if you define
multiple streams reading from an Event Hubs instance using the same consumer group. Since each stream generates a set of
tasks to read events from each partition, using the same consumer group in more than one stream results in
multiple tasks that use the same <Event Hubs instance, consumer group, partition> combination. You can avoid this situation
by using a unique consumer group for each stream you create on an Event Hubs instance, as the sketch below shows.
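For instance, here is a minimal sketch of that pattern (the endpoint and names are placeholders, consistent with the examples later in this document): two streams read the same Event Hubs instance, each through its own consumer group, so their epoch receivers never compete.

```scala
import org.apache.spark.eventhubs._

// Placeholder connection values.
val connectionString = ConnectionStringBuilder("Endpoint=sb://SAMPLE;SharedAccessKeyName=KEY_NAME;SharedAccessKey=KEY;")
  .setEventHubName("EVENTHUBS_NAME")
  .build

// One dedicated consumer group per stream; both must already exist on the instance.
val ehConfA = EventHubsConf(connectionString).setConsumerGroup("consumer-group-a")
val ehConfB = EventHubsConf(connectionString).setConsumerGroup("consumer-group-b")

val streamA = spark.readStream.format("eventhubs").options(ehConfA.toMap).load
val streamB = spark.readStream.format("eventhubs").options(ehConfB.toMap).load
```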
Your code may also generate multiple concurrent tasks using the same <Event Hubs instance, consumer group, partition>
combination for a single stream. This situation happens if the code execution results in re-computing the input stream
from the Event Hubs instance. The [Stream Recomputation](#stream-recomputation) section below explains what may cause
stream re-computation in more detail and proposes ways to avoid such cases.

Another situation that could result in seeing the `ReceiverDisconnectedException` is when batches dispatch
receiving tasks for the same <Event Hubs instance, consumer group, partition> combination to different executor nodes
in a cluster. The [Receivers Move Between Executor Nodes](#receivers-move-between-executor-nodes) section below
describes this situation and suggests how you can reduce the chance of running into it.
## Table of Contents
* [Stream Recomputation](#stream-recomputation)
  * [RDD Actions](#rdd-actions)
  * [Write to Multiple Data Sinks](#write-to-multiple-data-sinks)
  * [Spark Speculation](#spark-speculation)
* [Quick Check for Multiple Readers](#quick-check-for-multiple-readers)
* [Examples of Having Multiple Readers Unintentionally](#examples-of-having-multiple-readers-unintentionally)
  * [Multiple Actions](#multiple-actions)
  * [Multiple Sinks](#multiple-sinks)
  * [Persist Data to Prevent Recomputation](#persist-data-to-prevent-recomputation)
* [Receivers Move Between Executor Nodes](#receivers-move-between-executor-nodes)
  * [Example of Receiver Recreation](#example-of-receiver-recreation)
  * [Reducing Receiver Movements](#reducing-receiver-movements)
## Stream Recomputation

Sometimes a Spark application recomputes a single input stream multiple times. If the stream source is an Event Hubs
instance, this recomputation can eventually open multiple receivers for the same consumer group-partition combo and
result in the `ReceiverDisconnectedException`. Therefore, it is important to make sure that an application
that reads events from an Event Hubs instance does not recompute the input stream multiple times.
### RDD Actions

In Spark, RDD operations are either `Transformations` or `Actions`. In short, transformations create a new dataset
and actions return a value. Spark has a `lazy` execution model for transformations, which means those operations are
executed only when there is an action on the result dataset. Therefore, a transformed RDD may be **recomputed**
each time an action runs on it. However, you can avoid this recomputation by persisting an RDD in memory using
the `persist` or `cache` method. Please refer
to [RDD Operations](https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-operations) for more details.
Note that persisting an RDD in memory only helps if all actions are executed on the same executor node.
The sketch below illustrates the pattern.
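A minimal sketch of persisting an RDD across two actions (`lines` is a hypothetical `RDD[String]` built elsewhere):

```scala
import org.apache.spark.storage.StorageLevel

// `lines` is a hypothetical RDD[String], e.g. from sc.textFile(...).
val lengths = lines.map(_.length).persist(StorageLevel.MEMORY_ONLY)

val total = lengths.reduce(_ + _) // first action: computes the RDD and caches it
val count = lengths.count()       // second action: served from the cache, no recomputation

lengths.unpersist()               // release the cached blocks when done
```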
### Write to Multiple Data Sinks

You can write the output of a streaming query to multiple sinks by simply using the DataFrame/Dataset multiple times.
However, each write may cause the recomputation of the DataFrame/Dataset. In order to avoid this recomputation, similar
to the RDD case you can `persist` or `cache` the DataFrame/Dataset before writing it to multiple locations. Please refer to
[Using Foreach and ForeachBatch](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch)
for more information.
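For example, here is a minimal `foreachBatch` sketch, assuming a streaming DataFrame `streamDF` and two placeholder sink paths:

```scala
import org.apache.spark.sql.DataFrame

streamDF.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.persist()                                  // compute each micro-batch once
    batchDF.write.mode("append").parquet("/sinks/one") // first sink
    batchDF.write.mode("append").parquet("/sinks/two") // second sink
    batchDF.unpersist()                                // release the cache
  }
  .start()
```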
### Spark Speculation

One of Spark's scheduling configuration options is [`spark.speculation`](https://spark.apache.org/docs/latest/configuration.html#scheduling).
Briefly, if speculation is enabled, Spark relaunches tasks that are running slower than other tasks in a stage. Therefore,
if you define a streaming query with an Event Hubs source while the `spark.speculation` config is enabled, you may
end up in a situation where multiple tasks try to read events from the same <Event Hubs instance, consumer group, partition>
combination concurrently.
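Speculation is disabled by default; if you read from Event Hubs with epoch receivers, it is safest to keep it that way. A minimal sketch of making that explicit when building the session (the app name is a placeholder):

```scala
import org.apache.spark.sql.SparkSession

// Keep speculative execution off so no duplicate task competes for the same
// epoch receiver. "false" is already the default; this just makes it explicit.
val spark = SparkSession.builder
  .appName("eventhubs-reader") // placeholder
  .config("spark.speculation", "false")
  .getOrCreate()
```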
## Quick Check for Multiple Readers

A quick way to check if your application uses multiple readers is to compare the rate of `Incoming` and `Outgoing`
messages to/from the underlying Event Hubs instance. You have access to both `Messages` and `Throughput` metrics in
the Overview page of the Event Hubs instance in the Azure Portal.

Assume you have only one Spark application (with a single stream reader) that reads events from an Event Hubs instance.
In this case, you should see the number (or total bytes) of Outgoing messages matching the number (or total bytes) of
Incoming messages. If you find that the rate of Outgoing messages is `N` times the rate of Incoming messages,
it indicates that your application is recomputing the input stream `N` times. This is a strong signal to
update the application code to eliminate input stream recomputations (usually by using the `persist` or `cache` method).
## Examples of Having Multiple Readers Unintentionally

### Multiple Actions

The code below is an example where a single read stream is being used by multiple RDD actions without caching:

```scala
import org.apache.spark.eventhubs._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Event Hubs connection string
val endpoint = "Endpoint=sb://SAMPLE;SharedAccessKeyName=KEY_NAME;SharedAccessKey=KEY;"
val eventHub = "EVENTHUBS_NAME"
val consumerGroup = "CONSUMER_GROUP"
val connectionString = ConnectionStringBuilder(endpoint)
  .setEventHubName(eventHub)
  .build

// Event Hubs configuration
val ehConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)
  .setConsumerGroup(consumerGroup)
  .setMaxEventsPerTrigger(500)

// read stream
val ehStream = spark.readStream
  .format("eventhubs")
  .options(ehConf.toMap)
  .load

ehStream.writeStream
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .foreachBatch { (ehStreamDF: DataFrame, _: Long) =>
    handleEhDataFrame(ehStreamDF)
  }
  .start
  .awaitTermination

// Runs two actions (`reduce` and `count`) on the same batch DataFrame without
// caching it, so each action recomputes the batch from the Event Hubs source.
def handleEhDataFrame(ehStreamDF: DataFrame): Unit = {
  // `body` holds the event payload as bytes in the Event Hubs schema.
  val totalSize = ehStreamDF.map(r => r.getAs[Array[Byte]]("body").length).reduce(_ + _)
  val eventCount = ehStreamDF.count
  println(s"Batch contained $eventCount events with total size of $totalSize")
}
```

As you can see in the graph below, which shows the rate of Incoming vs Outgoing messages in the
Event Hubs entity, the number of Outgoing messages is almost twice the number of Incoming messages.
This pattern indicates that the above code reads events from the Event Hubs entity twice: once when
it computes the `reduce` action, and once when it computes the `count` action.

<img src="files/two-actions-without-persist.png" alt="Incoming vs Outgoing Messages without persist"/>
### Multiple Sinks

The code below shows how writing the output of a streaming query to multiple sinks creates multiple readers:

```scala
import org.apache.spark.eventhubs._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Event Hubs connection string
val endpoint = "Endpoint=sb://SAMPLE;SharedAccessKeyName=KEY_NAME;SharedAccessKey=KEY;"
val src_eventHub = "SRC_EVENTHUBS_NAME"
val dst_eventHub = "DST_EVENTHUBS_NAME"
val consumerGroup = "CONSUMER_GROUP"
val src_connectionString = ConnectionStringBuilder(endpoint)
  .setEventHubName(src_eventHub)
  .build
val dst_connectionString = ConnectionStringBuilder(endpoint)
  .setEventHubName(dst_eventHub)
  .build

// Event Hubs configuration
val src_ehConf = EventHubsConf(src_connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)
  .setConsumerGroup(consumerGroup)
  .setMaxEventsPerTrigger(500)
val dst_ehConf = EventHubsConf(dst_connectionString)

// read stream
val ehStream = spark.readStream
  .format("eventhubs")
  .options(src_ehConf.toMap)
  .load
  .select($"body" cast "string")

// Event Hubs write stream (first reader of ehStream)
val wst1 = ehStream.writeStream
  .format("org.apache.spark.sql.eventhubs.EventHubsSourceProvider")
  .options(dst_ehConf.toMap)
  .option("checkpointLocation", "/checkpointDir")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

// console write stream (second reader of ehStream)
val wst2 = ehStream.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .option("numRows", 10)
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

wst1.awaitTermination()
wst2.awaitTermination()
```

You can see in the graph below from the source Event Hubs entity that the number of Outgoing messages is almost
twice the number of Incoming messages, which indicates the existence of two separate readers in our application.

<img src="files/two-writes-using-same-input-stream.png" alt="Incoming vs Outgoing Messages for two write streams using a single input stream"/>
### Persist Data to Prevent Recomputation

As we have mentioned before, one way to avoid recomputations is to persist (or cache) the generated
DataFrame/Dataset from the input stream before performing your desired tasks and unpersist it afterward.
Please remember this avoids recomputation only if all actions are scheduled on the same executor node.
The code below shows how you can run multiple actions on a DataFrame without recomputing the input stream:

```scala
import org.apache.spark.eventhubs._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Event Hubs connection string
val endpoint = "Endpoint=sb://SAMPLE;SharedAccessKeyName=KEY_NAME;SharedAccessKey=KEY;"
val eventHub = "EVENTHUBS_NAME"
val consumerGroup = "CONSUMER_GROUP"
val connectionString = ConnectionStringBuilder(endpoint)
  .setEventHubName(eventHub)
  .build

// Event Hubs configuration
val ehConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)
  .setConsumerGroup(consumerGroup)
  .setMaxEventsPerTrigger(500)

// read stream
val ehStream = spark.readStream
  .format("eventhubs")
  .options(ehConf.toMap)
  .load

ehStream.writeStream
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .foreachBatch { (ehStreamDF: DataFrame, _: Long) =>
    handleEhDataFrame(ehStreamDF)
  }
  .start
  .awaitTermination

// Persists the batch DataFrame so both actions read the events only once,
// then releases the cache.
def handleEhDataFrame(ehStreamDF: DataFrame): Unit = {
  ehStreamDF.persist
  val totalSize = ehStreamDF.map(r => r.getAs[Array[Byte]]("body").length).reduce(_ + _)
  val eventCount = ehStreamDF.count
  println(s"Batch contained $eventCount events with total size of $totalSize")
  ehStreamDF.unpersist
}
```

The graph below from the Event Hubs entity shows the number of Incoming and Outgoing messages are
almost the same, which means the application reads events only once despite executing two actions
on the generated DataFrame.

<img src="files/two-actions-using-persist.png" alt="Incoming vs Outgoing Messages using persist"/>
## Receivers Move Between Executor Nodes

The Spark Event Hubs connector executes an input stream by dividing it into batches. Each batch generates a set of tasks
where each task receives events from one partition. These tasks are scheduled on the available executor nodes in
the cluster.

The Spark Event Hubs connector creates and caches a receiver for each <Event Hubs instance, consumer group, partition>
combination on the executor node that runs the task to read events from that specific combination. These receivers are
epoch receivers by default. Whenever a task that is supposed to read events from an <Event Hubs instance, consumer group, partition>
combination is scheduled on an executor node, it first retrieves the corresponding receiver for the combination from the cache.
Then, it checks whether the receiver's cursor is at the right location (i.e. the latest event offset read by this receiver
is exactly before the next offset requested to be read). If the receiver's cursor falls behind, it means that another
node has created and used a receiver for the same <Event Hubs instance, consumer group, partition>, so the cached
receiver on this node has been disconnected (since it's an epoch receiver) and a new receiver must be created. Recreating
the receiver and updating the cache on this node consequently disconnects the currently open receiver on the other node.
### Example of Receiver Recreation

Let's consider the following example to elaborate on this scenario. Assume we have a stream that tries to read from an
Event Hubs instance with only 2 partitions (partitions 0 and 1) using the consumer group `CG`. This stream creates a
sequence of batches to read events from the Event Hubs instance, and each batch creates 2 tasks, one per partition. For
simplicity, we name the task responsible for reading events from partition `k` in batch `i` `task i.k`, and assume
each task reads 10 events from its corresponding partition. We run this stream on a cluster with two executor nodes,
`executor A` and `executor B`. This cluster, for some unknown reason, schedules the tasks to read from partition 0 on
`executor A` for batches with odd number ids (1, 3, 5, ...) and schedules the tasks to read from partition 0 on
`executor B` for batches with even number ids (2, 4, 6, ...).

Now, we focus on tasks that read events from partition 0 (`tasks i.0`):

- Batch 1 sends `task 1.0` to `executor A` to read events from offset 0 to 9. `Executor A` creates a receiver for
the (`CG`, 0) combo and caches it locally. The receiver cursor is at offset 9.

- Batch 2 sends `task 2.0` to `executor B` to read events from offset 10 to 19. `Executor B` creates a receiver for
the (`CG`, 0) combo and caches it locally. The receiver cursor is at offset 19. Note that this new receiver creation
results in disconnecting the existing receiver on `executor A`.

- Batch 3 sends `task 3.0` to `executor A` to read events from offset 20 to 29. `Executor A` retrieves the receiver
for the (`CG`, 0) combo from its cache. However, when it checks the receiver's cursor, it finds that the cursor
(at offset 9) falls behind the offset for the current request (offset 20), which means the cached receiver has been
disconnected by another receiver. Therefore, it recreates the receiver and updates its cache. Again, the receiver
recreation on `executor A` disconnects the other open receiver for the (`CG`, 0) combo on `executor B`.

- Batch 4 sends `task 4.0` to `executor B` to read events from offset 30 to 39. `Executor B` retrieves the receiver
for the (`CG`, 0) combo from its cache. However, when it checks the receiver's cursor, it finds that the cursor
(at offset 19) falls behind the offset for the current request (offset 30), which means the cached receiver has been
disconnected by another receiver. Therefore, it recreates the receiver and updates its cache.

- The same pattern repeats for all subsequent batches.
### Reducing Receiver Movements

This situation can be avoided if all tasks that read events from the same <Event Hubs instance, consumer group, partition>
combination are scheduled on the same executor node. In such a case, the executor node reuses the cached receiver,
and the connector doesn't have to recreate the receiver. Spark's scheduler decides where to schedule a task based on its
locality level. Generally speaking, Spark tries to schedule tasks locally as much as possible, but if the wait gets
too long it schedules the tasks on nodes with a lower locality level. You can set "how long to wait to launch a data-local
task before giving up and launching it on a less-local node" by using the `spark.locality.wait` configuration option.
In this case, increasing the locality wait time can help to reduce or avoid recreating receivers. You can do so
by assigning a higher value to the `spark.locality.wait` property (for instance, increasing the value to 15s
instead of the default 3s). Please refer to
[Spark Scheduling Configuration](http://spark.apache.org/docs/latest/configuration.html#scheduling) for more information.
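A minimal sketch of raising the wait when building the session (the app name is a placeholder, and 15s is an illustrative value to tune for your workload; the same property can also be passed to `spark-submit` via `--conf spark.locality.wait=15s`):

```scala
import org.apache.spark.sql.SparkSession

// Give the scheduler more time to place each task on the executor node that
// already caches the receiver for its partition.
val spark = SparkSession.builder
  .appName("eventhubs-reader") // placeholder
  .config("spark.locality.wait", "15s")
  .getOrCreate()
```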
Please note that increasing the locality wait time doesn't guarantee that all tasks will be executed at the PROCESS_LOCAL
level. Depending on the cluster workload, some executor nodes might be busy, and the scheduler may not be able to
send a task to those nodes even after an extended period of waiting. Therefore, another way to reduce the chance of
receiver movement between different executor nodes is to increase the number of executor nodes in the cluster.
Finally, another option is to use non-epoch receivers instead of epoch receivers by setting the `UseExclusiveReceiver`
option to `false` (you can do so by using the `setUseExclusiveReceiver(b: Boolean)` API in `EventHubsConf`). If you decide to
use non-epoch receivers, please be aware of their limitation of allowing up to 5 concurrent receivers (please refer to
[Event Hubs quotas and limitations](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quotas) for more information).
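A minimal sketch of that switch (`connectionString` and the consumer group are placeholders, as in the earlier examples):

```scala
// Allow up to 5 concurrent (non-epoch) receivers per
// <Event Hubs instance, consumer group, partition> combination.
val ehConf = EventHubsConf(connectionString)
  .setConsumerGroup("CONSUMER_GROUP")
  .setUseExclusiveReceiver(false) // epoch receivers are the default (true)
```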