Kafka is an open-source, real-time streaming messaging system built around the publish-subscribe model. In a service-oriented architecture, instead of subsystems establishing direct connections with each other, the producer subsystem publishes messages to a distributed broker, which moves enormous numbers of messages with low latency and fault tolerance and allows one or more consumers to consume those messages concurrently.
Kafka does an excellent job of fault tolerance: by partitioning, replicating, and distributing data across multiple brokers, it ensures that delivered messages are not lost.
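Partitioning and replication are configured per topic. As a minimal sketch with the Kafka AdminClient (the broker address, partition count, and replication factor below are illustrative, not recommendations):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicSetup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 6 partitions spread the load across brokers; a replication factor of 3
            // keeps copies on 3 brokers so a single broker failure does not lose data.
            NewTopic topic = new NewTopic("org.orders.created", 6, (short) 3);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}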
In distributed systems, failures are inevitable, whether a DB connection failure, a failed network call, or an outage in a downstream dependency, especially in a microservices ecosystem. This blog post discusses some of the error handling mechanisms that we implemented as part of the MetricStream Platform to improve the robustness and resiliency of the Platform.
Multiple issues can occur on the consumer side. When implementing the Kafka consumer, the following scenarios need to be considered and given special handling:
Downstream Service or Data Store Failure
The consumer is not able to process a message because a downstream microservice API is unavailable or returns an error, or because the DB it is trying to connect to is down or unresponsive.
Data Format Changes or Event Version Incompatibility
The consumer expects the message payload to be in a certain format, but the producer has changed that format, e.g., a required field has been removed. As a result, the consumer is unable to deserialize the message sent by the producer.
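One way to guard against this (a sketch only, not necessarily how any given platform implements it) is to consume the raw bytes, attempt deserialization explicitly, and park payloads that cannot be parsed on a dead-letter topic instead of failing the batch. The OrderEvent schema and the orders.deadletter topic below are hypothetical:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SafeDeserializer {
    // Hypothetical expected event schema.
    record OrderEvent(String orderId, String status) { }

    private final ObjectMapper mapper = new ObjectMapper();
    private final KafkaProducer<String, byte[]> deadLetterProducer;

    public SafeDeserializer(KafkaProducer<String, byte[]> deadLetterProducer) {
        this.deadLetterProducer = deadLetterProducer;
    }

    public OrderEvent deserialize(ConsumerRecord<String, byte[]> raw) {
        try {
            // Attempt to map the raw payload to the expected schema.
            return mapper.readValue(raw.value(), OrderEvent.class);
        } catch (Exception e) {
            // Incompatible or malformed payload: park it for later inspection
            // instead of blocking the partition.
            deadLetterProducer.send(new ProducerRecord<>("orders.deadletter", raw.key(), raw.value()));
            return null; // caller skips null results and commits the offset
        }
    }
}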
Unable to reach Kafka cluster
The producer may fail to push a message to a topic due to a network partition or unavailability of the Kafka cluster. In such cases there is a high chance of messages being lost, so we need a retry mechanism to avoid data loss. The approach we take here is to store the message in a temporary secondary store (DB/cache), then retry from the secondary store and attempt to write the message to the main topic.
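A minimal sketch of that producer-side fallback, assuming a hypothetical RetryStore abstraction over the secondary DB/cache:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical abstraction over the secondary store (DB/cache) described above.
interface RetryStore {
    void save(String topic, String key, String payload);
}

public class ResilientSender {
    private final KafkaProducer<String, String> producer;
    private final RetryStore retryStore;

    public ResilientSender(KafkaProducer<String, String> producer, RetryStore retryStore) {
        this.producer = producer;
        this.retryStore = retryStore;
    }

    public void send(String topic, String key, String payload) {
        producer.send(new ProducerRecord<>(topic, key, payload), (metadata, exception) -> {
            if (exception != null) {
                // Broker unreachable, timeout, etc.: persist the message so a
                // background retry job can re-publish it to the main topic later.
                retryStore.save(topic, key, payload);
            }
        });
    }
}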
Clogged processing
When we are required to process many messages in real time, repeatedly failing messages can clog processing. The worst offenders consistently exceed the retry limit, which also means they take the longest and consume the most resources. Without a success response, the Kafka consumer will not commit a new offset, and the batches containing these bad messages are blocked as they are re-consumed again and again.
Difficulty retrieving retry metadata
It can be cumbersome to obtain metadata about the retries, such as timestamps and the retry count. If requests continue to fail even after retrying, we want to collect these failures in a dead-letter queue (DLQ) for visibility and diagnosis. A DLQ should allow listing (to view the contents of the queue), purging (to clear those contents), and merging (to reprocess the dead-lettered messages), allowing comprehensive resolution of all failures affected by a shared issue.
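For the merge operation in particular, a minimal sketch is to consume the dead-lettered records and re-publish them to the main topic once the shared issue has been fixed; the orders.deadletter and org.orders.created topic names here are illustrative:

import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DeadLetterMerger {
    public void merge(KafkaConsumer<String, String> dlqConsumer,
                      KafkaProducer<String, String> producer) {
        dlqConsumer.subscribe(Collections.singleton("orders.deadletter"));
        ConsumerRecords<String, String> records = dlqConsumer.poll(Duration.ofSeconds(5));
        for (ConsumerRecord<String, String> rec : records) {
            // Re-publish each dead-lettered message to the main topic for reprocessing.
            producer.send(new ProducerRecord<>("org.orders.created", rec.key(), rec.value()));
        }
        producer.flush();
        dlqConsumer.commitSync(); // mark the dead-lettered messages as merged
    }
}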
To address the problem of blocked batches, we set up a distinct retry queue using a separately defined Kafka topic. Under this paradigm, when a consumer handler returns a failed response for a given message after a certain number of retries, the consumer publishes that message to its corresponding retry topic. The handler then returns true to the original consumer, which commits its offset.
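A sketch of that hand-off (the ".retry" topic suffix and the handler shape are illustrative, not the platform's exact API):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RetryTopicForwarder {
    private final KafkaProducer<String, String> producer;

    public RetryTopicForwarder(KafkaProducer<String, String> producer) {
        this.producer = producer;
    }

    // Returns true so the caller can commit the offset and unblock the batch.
    public boolean handleFailure(ConsumerRecord<String, String> failedRecord) {
        // Publish the failed message to the topic's dedicated retry topic;
        // a separate consumer drains the retry topic independently.
        producer.send(new ProducerRecord<>(failedRecord.topic() + ".retry",
                failedRecord.key(), failedRecord.value()));
        return true;
    }
}

Because the handler reports success after the hand-off, the original consumer commits its offset and the partition keeps moving.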
On the producer side, there are several possible scenarios in which the producer API is unable to send a message, such as a network partition or an unavailable Kafka cluster (as described under "Unable to reach Kafka cluster" above).
The approach to recovering from these errors involves building a retry mechanism within the producer, to ensure there is an auto-retry process that attempts to re-deliver messages, and a dead-letter store to save messages that remain undeliverable even after the auto-retry process. A code sketch of this flow follows the list of steps below.
The steps involved are (see diagram above):
1. Client invokes the Kafka client's producer API to push a message to the main topic (configured in the producer API).
2. If Kafka throws an exception while pushing the message to the topic, then we need a way of handling the error and managing the message so that we don't lose it (prevent data loss).
3. When an exception is returned by Kafka, the message will be written to a secondary store.
4. The retry policy defines three key things: the maximum retry count, the backoff interval between retries, and the exceptions for which a retry should be attempted.
5. Based on the retry policy, the message will be kept in the secondary store (DB) and retried until the max retry limit is reached; once the max retry count is reached, the message will be pushed to the dead-letter store.
6. The retry consumer, implemented internally as part of the framework, will read the messages from the retry store and invoke the producer API to push the message to the main topic.
7. The retry will be done by a separate set of threads from a dedicated retry thread pool, which will not interfere with the main threads pushing data to, or consuming data from, Kafka topics.
8. The error handling is controlled through a flag, which the producer can set at the API level, because certain messages are less important than others and can be allowed to be lost, e.g., log messages.
9. There is a flag, "enableRetry", which is enabled by default; it can be set at the producer API level to enable or disable error handling.
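Putting the steps above together, here is a rough sketch of the flow. FailedMessage, RetryStore, and DeadLetterStore are hypothetical stand-ins for the framework's internal components, and the policy values are illustrative:

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical internal components: a parked message, the retry store, and the dead-letter store.
record FailedMessage(String topic, String key, String payload, int attempts) { }
interface RetryStore {
    void save(String topic, String key, String payload);
    List<FailedMessage> fetchPending();
    void incrementAttempts(FailedMessage message);
    void remove(FailedMessage message);
}
interface DeadLetterStore {
    void save(FailedMessage message);
}

public class RetryingProducer {
    private static final int MAX_RETRIES = 3;          // illustrative retry policy values
    private static final long RETRY_BACKOFF_MS = 5000;

    private final KafkaProducer<String, String> producer;
    private final RetryStore retryStore;
    private final DeadLetterStore deadLetterStore;
    // Step 7: a dedicated pool so retries never block the main produce/consume threads.
    private final ScheduledExecutorService retryPool = Executors.newScheduledThreadPool(2);

    public RetryingProducer(KafkaProducer<String, String> producer,
                            RetryStore retryStore, DeadLetterStore deadLetterStore) {
        this.producer = producer;
        this.retryStore = retryStore;
        this.deadLetterStore = deadLetterStore;
        retryPool.scheduleWithFixedDelay(this::drainRetryStore,
                RETRY_BACKOFF_MS, RETRY_BACKOFF_MS, TimeUnit.MILLISECONDS);
    }

    // Steps 1-3 and 8-9: push to the main topic; on failure, park the message if retry is enabled.
    public void send(String topic, String key, String payload, boolean enableRetry) {
        producer.send(new ProducerRecord<>(topic, key, payload), (metadata, exception) -> {
            if (exception != null && enableRetry) {
                retryStore.save(topic, key, payload);
            }
        });
    }

    // Steps 5-6: the internal retry consumer re-publishes parked messages until the limit is hit.
    private void drainRetryStore() {
        for (FailedMessage msg : retryStore.fetchPending()) {
            if (msg.attempts() >= MAX_RETRIES) {
                deadLetterStore.save(msg);   // retries exhausted: move to the dead-letter store
                retryStore.remove(msg);
                continue;
            }
            producer.send(new ProducerRecord<>(msg.topic(), msg.key(), msg.payload()),
                    (metadata, exception) -> {
                        if (exception == null) {
                            retryStore.remove(msg);            // delivered on retry
                        } else {
                            retryStore.incrementAttempts(msg); // try again on the next pass
                        }
                    });
        }
    }
}

Keeping the retry traffic on its own scheduled pool means a slow secondary store never blocks the main send path.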
Some of the scenarios where the consumer process could run into errors were described above (a downstream service or data store failure, an incompatible message format, and so on).
To handle these errors and improve resiliency, the following steps can be followed to retry in case of a failure on the consumer end, as shown in the figure above (a code sketch follows the steps):
1. Kafka consumer listener in Service A tries to consume an event/message from the main topic.
2. The consumer in Service A depends on another service (e.g., Service B) or a data store to complete the processing; for example, it may invoke an API on another microservice to fetch or update some data.
3. An exception is thrown while making the call to the microservice (Service B), either due to a network failure or because the service fails internally (e.g., Internal Server Error or Service Unavailable).
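A rough sketch of these steps, using illustrative retry-policy values that mirror the SimpleRetryPolicy configuration shown later; the Service B endpoint and the handler shape are hypothetical:

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ServiceBCallingHandler {
    private static final int RETRY_COUNT = 2;           // mirrors retryCount in the policy
    private static final long RETRY_BACKOFF_MS = 5000;  // mirrors retryBackoffMs

    private final HttpClient httpClient = HttpClient.newHttpClient();

    // Returns true when processing succeeded, so the offset can be committed.
    public boolean handle(ConsumerRecord<String, String> rec) throws InterruptedException {
        for (int attempt = 0; attempt <= RETRY_COUNT; attempt++) {
            try {
                // Step 2: call the downstream dependency (Service B) to complete processing.
                HttpRequest request = HttpRequest.newBuilder()
                        .uri(URI.create("http://service-b.internal/api/orders")) // hypothetical endpoint
                        .POST(HttpRequest.BodyPublishers.ofString(rec.value()))
                        .build();
                HttpResponse<String> response =
                        httpClient.send(request, HttpResponse.BodyHandlers.ofString());
                if (response.statusCode() / 100 == 2) {
                    return true; // processed successfully; commit the offset
                }
            } catch (IOException e) {
                // Step 3: network failure or internal service error; fall through and retry.
            }
            if (attempt < RETRY_COUNT) {
                Thread.sleep(RETRY_BACKOFF_MS); // back off before the next attempt
            }
        }
        return false; // caller can route the message to the retry topic or DLQ
    }
}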
Valid Characters for Kafka topics: alphanumeric characters (a-z, A-Z, 0-9), '.' (dot), '_' (underscore), and '-' (hyphen).
Max Allowed Topic Name Length: 249 characters.
Main topics convention: <organization>.<domain object>.<event type>
Resulting Topic Name: org.orders.created
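These constraints can be checked up front before a topic is created; a small sketch (the TopicNames helper is ours, not part of Kafka):

import java.util.regex.Pattern;

public final class TopicNames {
    // Kafka allows ASCII letters, digits, '.', '_' and '-', up to 249 characters.
    private static final Pattern VALID = Pattern.compile("[a-zA-Z0-9._-]{1,249}");

    public static boolean isValid(String topic) {
        return VALID.matcher(topic).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("org.orders.created")); // true
        System.out.println(isValid("org/orders#created")); // false: '/' and '#' are not allowed
    }
}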
Example of retry policy configuration in KafkaListener annotation:
@KafkaListener(name = "workflow", topics = "forms", group = "workflow",
    retryPolicy = @SimpleRetryPolicy(
        retryBackoffMs = 5000,
        retryCount = 2,
        exceptions = {KafkaConsumerException.class, IOException.class}))
The following mechanisms can optionally be added to the producer/consumer retry policy.