Kafka Message Filtering – An Analysis

by Horatiu Dan

A lot of companies nowadays use event-driven architectures in their day-to-day business activities, especially when they want their applications to exhibit real-time or near real-time reactiveness.

In such a scenario, during the interactions among the three main types of actors – producers, the message broker and consumers – a lot of messages are exchanged. Nevertheless, under certain circumstances, some of these messages might not be of interest, and thus they are discarded and ignored.

This article aims to analyze in detail how a consumer application should be configured so that it behaves correctly when it needs to filter out messages that are “irrelevant”. First, a standard record filter strategy is configured at the consumer level. Then, a custom deserialization mechanism is added and the analysis is refined. As stated, the intention is to preserve the correct behavior of the consumer.

Set-up

  • Java 21
  • Maven 3.9.2
  • Spring Boot – version 3.2.2
  • Redpanda message broker running in Docker – image version 23.2.15

As the message broker, the lightweight Redpanda is chosen. Since it is fully Kafka compatible, neither the development nor the configuration needs to be modified if one later decides to replace it with a different broker. [Resource 1] describes how to accomplish the minimal Redpanda set-up.

Once the Docker container is up and running, a topic called request is created with the following command:

>docker exec -it redpanda-0 rpk topic create request
TOPIC    STATUS
request  OK
>docker exec -it redpanda-0 rpk cluster info
CLUSTER
=======
redpanda.581f9a24-3402-4a17-af28-63353a602421

BROKERS
=======
ID    HOST        PORT
0*    redpanda-0  9092

TOPICS
======
NAME                PARTITIONS  REPLICAS
__consumer_offsets  3           1
_schemas            1           1
request             1           1

As shown, the request topic was created successfully.

Implement a Record Filter Strategy

The use case is the following:

  • the producer sends a request to the configured topic
  • if the request message fulfills the acceptance criteria, the consumer processes it
  • otherwise, the message is discarded

A request message has a simple form:

{
	"id": "34b25c6b-60d6-4e53-8f79-bdcdd17b3a2d",
	"contextId": "hcd"
}

having just two fields, an identifier and a context identifier.

Messages are taken into account only within a certain accepted context. Put differently, a message is accepted if its contextId equals the one configured on the consumer side; otherwise, it is discarded.

A request is modelled by the following record:

public record Request(String id, String contextId) {
}

For configuring a producer and a consumer, at least these properties are needed (application.properties file):

# the path to the message broker
broker.url = localhost:19092

# the name of the topic
topic.request = request

# the identifier of the context, also used as the consumer group id of the consumer
context.id = hcd

The requirement is clear – only the messages having hcd as contextId are accepted.

In order to send messages, a producer needs a KafkaTemplate instance, configured as below:

@Configuration
public class KafkaConfig {

    @Value("${broker.url}")
    private String brokerUrl;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props);

        return new KafkaTemplate<>(producerFactory);
    }
}

One may observe in the producer configuration that a StringSerializer was chosen for marshalling the payload value. Usually, a JsonSerializer provides more robustness to the producer-consumer contract. Nevertheless, the choice here was intentional to increase the experimental flexibility.
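
For comparison, a JSON-based producer configuration would look roughly as below. This is only a sketch using Spring Kafka's JsonSerializer and is not part of this proof of concept, where the String-based template above is kept on purpose.

// Sketch only - a KafkaTemplate whose values are marshalled as JSON, not used in this proof of concept
@Bean
public KafkaTemplate<String, Request> jsonKafkaTemplate() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}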

Once the messages reach the request topic, a consumer is configured to pick them up.

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${broker.url}")
    private String brokerUrl;

    @Value("${context.id}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Request> kafkaListenerContainerFactory(CustomRecordFilterStrategy recordFilterStrategy) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, Request.class.getPackageName());
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Request.class.getName());

        DefaultKafkaConsumerFactory<String, Request> defaultFactory = new DefaultKafkaConsumerFactory<>(props,
                new StringDeserializer(), new JsonDeserializer<>(Request.class));

        ConcurrentKafkaListenerContainerFactory<String, Request> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(defaultFactory);
        factory.setRecordFilterStrategy(recordFilterStrategy);
        factory.setCommonErrorHandler(new DefaultErrorHandler());
        return factory;
    }
}

The factory.setRecordFilterStrategy(recordFilterStrategy) call in the listing above allows injecting a record filtering strategy, which is exactly the purpose here – a means to decide whether a message is filtered out or not.

The RecordFilterStrategy<K, V> interface has one abstract method

boolean filter(ConsumerRecord<K, V> consumerRecord);

which, according to its JavaDoc, returns true if the ConsumerRecord should be discarded (K represents the type of the message key, while V the type of the message value).

In the case of this proof of concept, all messages that have their contextId equal to hcd are accepted and consumed, while the rest are filtered out. The implementation is below.

@Component
public class CustomRecordFilterStrategy implements RecordFilterStrategy<String, Request> {

    private static final Logger LOG = LoggerFactory.getLogger(CustomRecordFilterStrategy.class);

    @Value("${context.id}")
    private String contextId;

    @Override
    public boolean filter(ConsumerRecord<String, Request> consumerRecord) {
        Request request = consumerRecord.value();

        boolean discard = !contextId.equals(request.contextId());
        LOG.info("{} is{} compliant.", request, discard ? "n't" : "");
        return discard;
    }
}

As part of the configuration, the KafkaListenerContainerFactory interface is responsible for creating the listener container of a particular endpoint. The @EnableKafka annotation on the configuration class enables the detection of @KafkaListener annotations on any Spring-managed beans in the container. Thus, the actual listener (the message consumer) is developed next.

@Component
public class RequestMessageListener {

    private static final Logger LOG = LoggerFactory.getLogger(RequestMessageListener.class);

    private final ResponseService responseService;

    public RequestMessageListener(ResponseService responseService) {
        this.responseService = responseService;
    }

    @KafkaListener(topics = "${topic.request}", groupId = "${context.id}")
    public void onMessage(@Payload Request request) {
        LOG.info("Processing {}.", request);

        responseService.send(Response.success());
    }
}

Its functionality is trivial: it logs the messages read from the request topic and destined for the configured consumer group. Then, it invokes a ResponseService, which acts as the entity that sends a message back (here, it only logs it).

@Service
public class ResponseService {

    private static final Logger LOG = LoggerFactory.getLogger(ResponseService.class);

    public void send(Response response) {
        LOG.info("Sending {}.", response);
    }
}
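
In a complete flow, the ResponseService would most likely publish the Response to a dedicated topic instead of just logging it. A possible sketch is shown below; the topic.response property and the use of Jackson's ObjectMapper for marshalling are assumptions, not part of this set-up.

// Sketch only - a ResponseService that publishes the response to a hypothetical response topic
@Service
public class ResponseService {

    private static final Logger LOG = LoggerFactory.getLogger(ResponseService.class);

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Value("${topic.response}")
    private String responseTopic;

    public ResponseService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(Response response) {
        try {
            kafkaTemplate.send(responseTopic, objectMapper.writeValueAsString(response));
            LOG.info("Sending {}.", response);
        } catch (JsonProcessingException e) {
            LOG.error("Unable to marshal {}.", response, e);
        }
    }
}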

A Response is modeled simply, as below; both the success and the failure outcomes are needed further on:

public record Response (String id,
                        Result result) {

    public static Response success() {
        return new Response(UUID.randomUUID().toString(), Result.SUCCESS);
    }

    public static Response failure() {
        return new Response(UUID.randomUUID().toString(), Result.FAILURE);
    }

    public enum Result {
        SUCCESS, FAILURE
    }
}

When the application is started, provided the message broker is up, the listener is ready to receive messages.

INFO 20080 --- [main] c.h.r.RecordFilterStrategyApplication				  : Started RecordFilterStrategyApplication in 1.282 seconds (process running for 1.868)
INFO 20080 --- [main] fkaConsumerFactory$ExtendedKafkaConsumer            : [Consumer clientId=consumer-hcd-1, groupId=hcd] Subscribed to topic(s): request
INFO 20080 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-hcd-1, groupId=hcd] Cluster ID: redpanda.581f9a24-3402-4a17-af28-63353a602421
INFO 20080 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-hcd-1, groupId=hcd] Request joining group due to: need to re-join with the given member-id: consumer-hcd-1-1fa7cd25-2bf2-49bd-82fd-ac3e4c54cafc
INFO 20080 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-hcd-1, groupId=hcd] Successfully joined group with generation Generation{generationId=7, memberId='consumer-hcd-1-1fa7cd25-2bf2-49bd-82fd-ac3e4c54cafc', protocol='range'}

In order to check the integration, the following two tests are used. Since a Request is expected by the listener, a compliance template was created for convenience.

@SpringBootTest
class RecordFilterStrategyApplicationTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${topic.request}")
    private String topic;

    @Value("${context.id}")
    private String contextId;

    private static final String template = """
        {
            "id": "%s",
            "contextId": "%s"
        }""";

    @Test
    void compliant() {
        kafkaTemplate.send(topic,
                String.format(template, UUID.randomUUID(), contextId));
    }

    @Test
    void notCompliant() {
        kafkaTemplate.send(topic,
                String.format(template, UUID.randomUUID(), "other context"));
    }
}

compliant() sends a message whose contextId matches the one configured on the consumer side. As expected, the message is processed and a response is sent back.

INFO 20080 --- [ntainer#0-0-C-1] c.h.r.l.CustomRecordFilterStrategy    : Request[id=44a2644c-0025-4a38-bd36-f163a530725e, contextId=hcd] is compliant.
INFO 20080 --- [ntainer#0-0-C-1] c.h.r.listener.RequestMessageListener : Processing Request[id=44a2644c-0025-4a38-bd36-f163a530725e, contextId=hcd].
INFO 20080 --- [ntainer#0-0-C-1] c.h.r.listener.ResponseService        : Sending Response[id=ebe0f65c-eddf-4866-b71f-e6cd766dd499, result=SUCCESS].

notCompliant() sends a message whose contextId is different from what was configured on this consumer. Thus, the message is neither processed, nor responded to, but ignored.

INFO 20080 --- [ntainer#0-0-C-1] c.h.r.l.CustomRecordFilterStrategy : Request[id=ed22f60c-b13d-4315-8132-46aa83ddf33b, contextId=other context] isn't compliant.

So far, the proof of concept has exemplified how to configure the consumer with a record filtering strategy so that only certain messages are accepted.

The code for this part is here – 1-filter-strategy

Implement a Record Filter Strategy with Custom Deserialization

Let’s assume that the messages consumed from the request topic are unmarshalled using a custom deserializer and that the filtering is still required.

The custom deserializer here is trivial and has a didactic purpose. Moreover, in case the id field is missing or empty, a runtime RequestDeserializationException is thrown. Such an action is not necessarily needed at this point, but it was put here to outline a certain use case. Read on.

public class CustomRequestDeserializer extends StdDeserializer<Request> {

    private static final Logger LOG = LoggerFactory.getLogger(CustomRequestDeserializer.class);

    public CustomRequestDeserializer() {
        super(Request.class);
    }

    @Override
    public Request deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
        ObjectCodec oc = jsonParser.getCodec();
        JsonNode root = oc.readTree(jsonParser);

        final String contextId = deserializeField(root, "contextId");

        final String id = deserializeField(root, "id");
        if (id == null || id.isEmpty()) {
            throw new RequestDeserializationException("'id' is required");
        }

        Request request = new Request(id, contextId);
        LOG.info("Successfully deserialized {}", request);
        return request;
    }

    // reads the designated field as text, or returns null if the field is absent
    private static String deserializeField(JsonNode root, String field) {
        JsonNode node = root.get(field);
        return node != null ? node.asText() : null;
    }
}
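
The RequestDeserializationException itself is not listed above; a minimal sketch of how it could look, as a plain runtime exception carrying only a message, is the following (an assumption, kept simple on purpose):

public class RequestDeserializationException extends RuntimeException {

    public RequestDeserializationException(String message) {
        super(message);
    }
}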

To apply it, the Request record is annotated as below:

@JsonDeserialize(using = CustomRequestDeserializer.class)
public record Request(String id, String contextId) {
}

Up until now, the behavior described in the first part is preserved. If the previous compliant() and notCompliant() tests are run again, the outcome is the same.

The next analyzed situation is the one in which a RequestDeserializationException is thrown when deserializing an incoming message. Let’s assume the id is empty, thus the form is as below:

{
	"id": "",
	"contextId": "hcd"
}

The deserializationError_compliant() test below sends such a message:

@Test
void deserializationError_compliant() {
	kafkaTemplate.send(topic,
			String.format(template, "", contextId));
}

When such a message is received, the outcome is the following:

...
Caused by: com.hcd.recordfilterstrategy.domain.deserialization.RequestDeserializationException: 'id' is required
...

An exception thrown at deserialization time causes the message to be neither consumed nor responded to; it is simply lost.

See [Resource 3] for a detailed analysis on situations like this.

One solution that allows recovering from deserialization exceptions is to configure the value deserializer of the KafkaListenerContainerFactory with a failed deserialization function – see the valueDeserializer.setFailedDeserializationFunction(...) call in the listing below:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Request> kafkaListenerContainerFactory(CustomRecordFilterStrategy recordFilterStrategy,
																							  FailedRequestDeserializationFunction failedDeserializationFunction) {
	Map<String, Object> props = new HashMap<>();
	props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
	props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
	props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
	props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
	props.put(JsonDeserializer.TRUSTED_PACKAGES, Request.class.getPackageName());
	props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Request.class.getName());

	JsonDeserializer<Request> jsonDeserializer = new JsonDeserializer<>(Request.class);

	ErrorHandlingDeserializer<Request> valueDeserializer = new ErrorHandlingDeserializer<>(jsonDeserializer);
	valueDeserializer.setFailedDeserializationFunction(failedDeserializationFunction);

	DefaultKafkaConsumerFactory<String, Request> defaultFactory = new DefaultKafkaConsumerFactory<>(props,
			new StringDeserializer(), valueDeserializer);

	ConcurrentKafkaListenerContainerFactory<String, Request> factory = new ConcurrentKafkaListenerContainerFactory<>();
	factory.setConsumerFactory(defaultFactory);
	factory.setRecordFilterStrategy(recordFilterStrategy);
	factory.setCommonErrorHandler(new DefaultErrorHandler());
	return factory;
}

The purpose of the component is to allow recovering after such an exceptional situation and to be able to send a failure response back.

@Component
public class FailedRequestDeserializationFunction implements Function<FailedDeserializationInfo, Request> {

    private static final Logger LOG = LoggerFactory.getLogger(FailedRequestDeserializationFunction.class);

    private final ResponseService responseService;

    public FailedRequestDeserializationFunction(ResponseService responseService) {
        this.responseService = responseService;
    }

    @Override
    public Request apply(FailedDeserializationInfo failedDeserializationInfo) {
        final Exception ex = failedDeserializationInfo.getException();

        if (ex instanceof RequestDeserializationException deserializationEx) {
            LOG.info("Error deserializing request - {}", deserializationEx.getMessage());

            responseService.send(Response.failure());

        } else {
            LOG.error("Unexpected error deserializing request.", ex);
        }

        return null;
    }
}

If the same test is run again and a compliant, but incorrect message is sent, the behavior changes.

2024-03-13T10:52:38.893+02:00  INFO 24232 --- [ntainer#0-0-C-1] d.d.FailedRequestDeserializationFunction : Error deserializing request - 'id' is required
2024-03-13T10:52:38.895+02:00  INFO 24232 --- [ntainer#0-0-C-1] c.h.r.listener.ResponseService           : Sending Response[id=5393b4a0-3849-4130-934b-671e43a2358f, result=FAILURE].

The only case left is that of a non-compliant and incorrect message, meaning the id is still empty, but the contextId is different from the expected one.

{
	"id": "",
	"contextId": "other context"
}

If the following test is run, nothing changes: unfortunately, the failed deserialization function still sends a failure response back, although the record filter strategy should have filtered the message out, as its contextId is not compliant.

@Test
void deserializationError_notCompliant() {
	kafkaTemplate.send(topic,
			String.format(template, "", "other context"));
}

The output confirms that a failure response is still sent:

2024-03-13T11:03:56.609+02:00  INFO 24232 --- [ntainer#0-0-C-1] d.d.FailedRequestDeserializationFunction : Error deserializing request - 'id' is required
2024-03-13T11:03:56.610+02:00  INFO 24232 --- [ntainer#0-0-C-1] c.h.r.listener.ResponseService           : Sending Response[id=b345f001-3543-46da-bc0f-17c63c20e32a, result=FAILURE].

The code for this second section is here – 2-filter-strategy-custom-deser.

Implement a Record Filter Strategy with Custom Deserialization – Correctly

The last part of this analysis provides a solution on how to address this last use-case.

Before moving on with it, let’s recall what currently happens in all possible use-cases:

Correct, compliant message

  1. since the message is correct, the custom deserializer successfully unmarshalls it
  2. the failed deserialization function is not invoked
  3. since the message is compliant, the record filter strategy does not reject it
  4. the listener is invoked, it processes the request and sends a response back

Correct, non-compliant message

  1. since the message is correct, the custom deserializer successfully unmarshalls it
  2. the failed deserialization function is not invoked
  3. since the message is non-compliant, the record filter strategy rejects it
  4. the listener is not invoked

Incorrect, compliant or non-compliant message

  1. since the message is incorrect, the custom deserializer throws an exception
  2. the failed deserialization function is invoked and it sends a failure response back
  3. the record filter strategy is not invoked
  4. the listener is not invoked

In case of a correct message, the consumer application behaves correctly, irrespective of the compliance of the message.
In case of an incorrect message, a failure response is sent back irrespective of the compliance of the message, which means the consumer behaves correctly only for compliant messages.

For incorrect messages, irrespective of their compliance, the consumer should act as follows:

  1. since the message is incorrect, the custom deserializer throws an exception
  2. the failed deserialization function is invoked and it sends a failure response back only if the message is compliant
  3. the record filter strategy is not invoked
  4. the listener is not invoked

At first glance, in order to cover the last use-case as well, only the FailedRequestDeserializationFunction needs to be enhanced to also check the message compliance.

Basically, before sending the response, the same check as the one in CustomRecordFilterStrategy shall be added. To avoid repetition, some refactoring is done.

To isolate the compliance check, a separate component in charge of it is created.

@Component
public class RequestFilterStrategy {

    private static final Logger LOG = LoggerFactory.getLogger(RequestFilterStrategy.class);

    @Value("${context.id}")
    private String contextId;

    public boolean filter(String contextId) {
        boolean discard = !this.contextId.equals(contextId);
        LOG.info("Request is{} compliant.", discard ? "n't" : "");
        return discard;
    }
}

Then, the component is injected both into the CustomRecordFilterStrategy and into the FailedRequestDeserializationFunction, and consequently both are refactored as follows.

@Component
public class CustomRecordFilterStrategy implements RecordFilterStrategy<String, Request> {

    private final RequestFilterStrategy requestFilterStrategy;

    public CustomRecordFilterStrategy(RequestFilterStrategy requestFilterStrategy) {
        this.requestFilterStrategy = requestFilterStrategy;
    }

    @Override
    public boolean filter(ConsumerRecord<String, Request> consumerRecord) {
        return requestFilterStrategy.filter(consumerRecord.value().contextId());
    }
}

@Component
public class FailedRequestDeserializationFunction implements Function<FailedDeserializationInfo, Request> {

    private static final Logger LOG = LoggerFactory.getLogger(FailedRequestDeserializationFunction.class);

    private final RequestFilterStrategy requestFilterStrategy;
    private final ResponseService responseService;

    public FailedRequestDeserializationFunction(RequestFilterStrategy requestFilterStrategy,
                                                ResponseService responseService) {
        this.requestFilterStrategy = requestFilterStrategy;
        this.responseService = responseService;
    }

    @Override
    public Request apply(FailedDeserializationInfo failedDeserializationInfo) {
        final Exception ex = failedDeserializationInfo.getException();

        if (ex instanceof RequestDeserializationException deserializationEx) {
            LOG.info("Error deserializing request - {}", deserializationEx.getMessage());

            if (!requestFilterStrategy.filter(deserializationEx.getContextId())) {
                responseService.send(Response.failure());
            }
        } else {
            LOG.error("Unexpected error deserializing request.", ex);
        }

        return null;
    }
}
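
One detail worth noting: for deserializationEx.getContextId() to return something meaningful, the RequestDeserializationException has to carry the contextId that the custom deserializer already extracted before failing. A possible adjustment is sketched below (an assumption, not part of the original listings); the throw in CustomRequestDeserializer would then pass the contextId along with the message.

public class RequestDeserializationException extends RuntimeException {

    private final String contextId;

    public RequestDeserializationException(String message, String contextId) {
        super(message);
        this.contextId = contextId;
    }

    public String getContextId() {
        return contextId;
    }
}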

To check the behavior, the last unit test is run again.

@Test
void deserializationError_notCompliant() {
	kafkaTemplate.send(topic,
			String.format(template, "", "other context"));
}

The output clearly shows that for incorrect, non-compliant messages, no response is sent anymore.

2024-03-13T15:05:56.432+02:00  INFO 17916 --- [ntainer#0-0-C-1] d.d.FailedRequestDeserializationFunction : Error deserializing request - 'id' is required
2024-03-13T15:05:56.432+02:00  INFO 17916 --- [ntainer#0-0-C-1] c.h.r.listener.RequestFilterStrategy     : Request isn't compliant.

The code for the enhanced solution is here – 3-filter-strategy-custom-deser-covered

Resources

  1. Redpanda Quickstart
  2. Spring for Apache Kafka Reference
  3. Acting Soon on Kafka Deserialization Errors
  4. The picture was taken at Legoland, Germany

Acting Soon on Kafka Deserialization Errors

by Horatiu Dan

Context

Event-driven architectures have been successfully used for quite some time by a lot of organizations in various business cases. They excel at performance, scalability, evolvability and fault tolerance, providing a good level of abstraction and elasticity. These strengths made them good choices when applications needed real-time or near real-time reactiveness.

In terms of implementations, for standard messaging, ActiveMQ and RabbitMQ are good candidates, while for data streaming, platforms such as Apache Kafka and Redpanda are more suitable. Usually, when developers and architects need to opt for one of these two directions, they analyze and weigh it from several angles – message payload, flow and usage of data, throughput, solution topology. As the discussion around these aspects can get quite big and complex, it is not refined as part of this article.

Conceptually, event-driven architectures involve at least three main actors – message producers, message brokers and message consumers. Briefly, the purpose is to allow the producers and the consumers to communicate in a decoupled and asynchronous way, something that is accomplished with the help of the previously mentioned message brokers. In the optimistic scenario, a producer creates a message, publishes it to a topic owned by the broker, from which the consumer reads it, deals with it and, out of courtesy, provides a response back. Messages are serialized (marshalled) by the producers when sent to topics and deserialized (unmarshalled) by consumers when received from topics.

This article focuses on the situation in which a consumer experiences issues when deserializing a received message and provides a way to act on such failures. A few examples of such actions may include constructing a default message or sending feedback back to the message broker. Developers are creative enough to decide on this behavior, depending on the particular use cases implemented.

Set-up

  • Java 21
  • Maven 3.9.2
  • Spring Boot – version 3.1.5
  • Redpanda message broker running in Docker – image version 23.2.15

Redpanda is a lightweight message broker and it was chosen for this proof of concept to give readers the opportunity to experiment with an option other than the widely used Kafka. As it is Kafka compatible, the development and the configuration of the producers and consumers do not need to change at all when moving from one provider to another.

According to the Redpanda documentation, the Docker support applies only to development and testing. For the purpose of this project, this is more than enough, thus a single Redpanda message broker is set up to run in Docker.

See [Resource 1] for details on how to accomplish the minimal set-up.

Once up and running, a topic called minifig is created with the following command:

>docker exec -it redpanda-0 rpk topic create minifig
TOPIC    STATUS
minifig  OK

If the cluster is inspected, one may observe that a topic with one partition and one replica was created.

>docker exec -it redpanda-0 rpk cluster info
CLUSTER
=======
redpanda.581f9a24-3402-4a17-af28-63353a602421

BROKERS
=======
ID    HOST        PORT
0*    redpanda-0  9092

TOPICS
======
NAME                PARTITIONS  REPLICAS
__consumer_offsets  3           1
_schemas            1           1
minifig             1           1

Implementation

The flow is straightforward: the producer sends a request to the configured topic, from which the consumer reads it, as long as it is able to.

A request represents a mini-figure which is simplistically modelled by the following record:

public record Minifig(String id,
                      Size size,
                      String name) {

    public Minifig(Size size, String name) {
        this(UUID.randomUUID().toString(), size, name);
    }

    public enum Size {
        SMALL, MEDIUM, BIG;
    }
}

id is the unique identifier of the Minifig which has a certain name and is of a certain size – small, medium or big.

For configuring a producer and a consumer, at least these properties are needed (application.properties file):

# the path to the message broker
broker.url=localhost:19092

# the name of the broker topic
topic.minifig=minifig

# the unique string that identifies the consumer group of the consumer
topic.minifig.group.id=group-0

For sending messages, the producer needs a KafkaTemplate instance.

@Configuration
public class KafkaConfig {

    @Value("${broker.url}")
    private String brokerUrl;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props);

        return new KafkaTemplate<>(producerFactory);
    }
}

One may observe in the producer configuration that a StringSerializer was chosen for marshalling the payload value. Usually, a JsonSerializer provides more robustness to the producer-consumer contract. Nevertheless, the choice here was intentional, to increase the experimental flexibility on the consumer side (as will be seen later). Just as a reminder, the interest in this proof of concept is to act on the encountered deserialization errors.

Once the messages reach the minifig topic, a consumer is configured to pick them up.

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${broker.url}")
    private String brokerUrl;

    @Value("${topic.minifig.group.id}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Minifig> kafkaListenerContainerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, Minifig.class.getPackageName());
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Minifig.class.getName());

        DefaultKafkaConsumerFactory<String, Minifig> defaultFactory = new DefaultKafkaConsumerFactory<>(props);

        ConcurrentKafkaListenerContainerFactory<String, Minifig> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(defaultFactory);
        factory.setCommonErrorHandler(new DefaultErrorHandler());
        return factory;
    }
}

The KafkaListenerContainerFactory interface is responsible for creating the listener container of a particular endpoint. The @EnableKafka annotation on the configuration class enables the detection of @KafkaListener annotations on any Spring-managed beans in the container. Thus, the actual listener (the message consumer) is developed next.

@Component
public class MinifigListener {

    private static final Logger LOG = LoggerFactory.getLogger(MinifigListener.class);

    @KafkaListener(topics = "${topic.minifig}", groupId = "${topic.minifig.group.id}")
    public void onReceive(@Payload Minifig minifig) {
        LOG.info("New minifig received - {}.", minifig);
    }
}

Its functionality is trivial: it only logs the messages read from the minifig topic and destined for the configured consumer group.

If the application is started, provided the message broker is up, the listener is ready to receive messages.

INFO 10008 --- [main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-group-0-1, groupId=group-0] Subscribed to topic(s): minifig
INFO 10008 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-group-0-1, groupId=group-0] Cluster ID: redpanda.581f9a24-3402-4a17-af28-63353a602421
INFO 10008 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group-0-1, groupId=group-0] Discovered group coordinator localhost:19092 (id: 2147483647 rack: null)
INFO 10008 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group-0-1, groupId=group-0] Found no committed offset for partition minifig-0
INFO 10008 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-0: partitions assigned: [minifig-0]

In order to check the integration, the following simple test is used. Since a Minifig is expected by the listener, a compliance template was created for convenience.

@SpringBootTest
class AppTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${topic.minifig}")
    private String topic;

    final String template = "{" +
            "\"id\":\"%s\"," +
            "\"size\":\"%s\"," +
            "\"name\":\"%s\"" +
            "}";

    @Test
    void send_compliant() {
        final String minifig = String.format(template,
                UUID.randomUUID(), Minifig.Size.SMALL, "Spider-Man");

        kafkaTemplate.send(topic, minifig);
    }
}

When running the test, a ‘compliant’ message is sent to the broker and as expected, it is successfully picked up by the local consumer.

INFO 10008 --- [ntainer#0-0-C-1] c.h.e.listener.MinifigListener           : New minifig received - Minifig[id=0c75b9e4-511a-48b3-a984-404d2fc1d47b, size=SMALL, name=Spider-Man].

Redpanda Console can be helpful in observing what is happening at the broker level, particularly what is flowing through the minifig topic.

In scenarios like the one above, messages are sent from the producer to the consumer via the message broker, as planned.

Recover on Deserialization Failures

In the particular case of this proof of concept, it is assumed the size of a mini-figure can be SMALL, MEDIUM or BIG, in line with the defined Size enum. In case the producer sends a mini-figure of an unknown size, one that deviates from the agreed contract, the message is basically rejected by the listener, as the payload cannot be deserialized.

To simulate this, the following test is run.

@SpringBootTest
class AppTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${topic.minifig}")
    private String topic;

    final String template = "{" +
            "\"id\":\"%s\"," +
            "\"size\":\"%s\"," +
            "\"name\":\"%s\"" +
            "}";
    
    @Test
    void send_non_compliant() {
        final String minifig = String.format(template,
                UUID.randomUUID(), "Unknown", "Spider-Man");

        kafkaTemplate.send(topic, minifig);
    }
}

The message reaches the topic, but not the MinifigListener#onReceive() method. As expected, the error appears when the payload is being unmarshalled. The cause can be identified by looking deep down the stack trace.

Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data  from topic [minifig]
Caused by: com.fasterxml.jackson.databind.exc.InvalidFormatException: Cannot deserialize value of type `com.hcd.errhandlerdeserializer.domain.Minifig$Size` from String "Unknown": not one of the values accepted for Enum class: [BIG, MEDIUM, SMALL]
 at [Source: (byte[])"{"id":"fbc86874-55ac-4313-bbbb-0ed99341825a","size":"Unknown","name":"Spider-Man"}"; line: 1, column: 53] (through reference chain: com.hcd.errhandlerdeserializer.domain.Minifig["size"])

Another aspect is that the consumer keeps trying to read the same failing message over and over again. This is unfortunate, at least from the consumer's point of view, as the error logs keep accumulating.

In order to get past such situations, the JsonDeserializer used for unmarshalling the payload value is wrapped in an ErrorHandlingDeserializer, becoming its actual delegate. Moreover, the ErrorHandlingDeserializer has a failedDeserializationFunction member that, according to its JavaDoc, provides an alternative mechanism when the deserialization fails.

The new consumer configuration looks as below:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Minifig> kafkaListenerContainerFactory() {
	Map<String, Object> props = new HashMap<>();
	props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
	props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
	props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
	props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
	props.put(JsonDeserializer.TRUSTED_PACKAGES, Minifig.class.getPackageName());
	props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Minifig.class.getName());

	JsonDeserializer<Minifig> jsonDeserializer = new JsonDeserializer<>(Minifig.class);

	ErrorHandlingDeserializer<Minifig> valueDeserializer = new ErrorHandlingDeserializer<>(jsonDeserializer);
	valueDeserializer.setFailedDeserializationFunction(new MinifigFailedDeserializationFunction());

	DefaultKafkaConsumerFactory<String, Minifig> defaultFactory = new DefaultKafkaConsumerFactory<>(props,
			new StringDeserializer(), valueDeserializer);

	ConcurrentKafkaListenerContainerFactory<String, Minifig> factory = new ConcurrentKafkaListenerContainerFactory<>();
	factory.setConsumerFactory(defaultFactory);
	factory.setCommonErrorHandler(new DefaultErrorHandler());
	return factory;
}

The failedDeserializationFunction used here is simplistic, but its purpose is to prove the utility of the mechanism.

public class MinifigFailedDeserializationFunction implements Function<FailedDeserializationInfo, Minifig> {

    private static final Logger LOG = LoggerFactory.getLogger(MinifigFailedDeserializationFunction.class);

    @Override
    public Minifig apply(FailedDeserializationInfo failedDeserializationInfo) {
        final Exception exception = failedDeserializationInfo.getException();
        LOG.info("Error deserializing minifig - {}", exception.getCause().getMessage());
        // fall back to a default, explicitly marked mini-figure
        return new Minifig(null, Minifig.Size.SMALL, "Undefined");
    }
}

The FailedDeserializationInfo entity (the Function#apply() input) is constructed while recovering from the deserialization exception and it encapsulates various pieces of information (here, the exception is the one leveraged).
Since the output of the apply() method is the actual deserialization result, one may return either null or whatever is suitable, depending on the aimed behavior.

If running the send_non_compliant() test again, the deserialization exception is handled and a default value is returned. Further, the MinifigListener is invoked and has the opportunity to deal with it.

INFO 30160 --- [ntainer#0-0-C-1] e.l.MinifigFailedDeserializationFunction : Error deserializing minifig - Cannot deserialize value of type `com.hcd.errhandlerdeserializer.domain.Minifig$Size` from String "Unknown": not one of the values accepted for Enum class: [BIG, MEDIUM, SMALL]
 at [Source: (byte[])"{"id":"f35a77bf-29e5-4f5c-b5de-cc674f22029f","size":"Unknown","name":"Spider-Man"}"; line: 1, column: 53] (through reference chain: com.hcd.errhandlerdeserializer.domain.Minifig["size"])
INFO 30160 --- [ntainer#0-0-C-1] c.h.e.listener.MinifigListener           : New minifig received - Minifig[id=null, size=SMALL, name=Undefined].
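
Since the listener is now invoked with the fallback value, it has the chance to deal with it explicitly. A possible sketch, assuming the fallback is recognized by the name returned from the failed deserialization function above:

// Sketch only - the listener branching on the fallback mini-figure produced after a failed deserialization
@KafkaListener(topics = "${topic.minifig}", groupId = "${topic.minifig.group.id}")
public void onReceive(@Payload Minifig minifig) {
    if ("Undefined".equals(minifig.name())) {
        LOG.warn("Received a minifig that could not be properly deserialized - {}.", minifig);
        return;
    }

    LOG.info("New minifig received - {}.", minifig);
}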

Conclusion

Configuring Kafka producers and consumers and fine-tuning them in order to achieve the desired performance in accordance with the used message broker is not always straightforward. Controlling each step of the communication is by all means desirable and, moreover, reacting fast to unknown situations helps deliver robust and easy to maintain solutions. This post focused on the deserialization issues that might appear at the Kafka consumer level and provided a way of having a fallback plan when dealing with non-compliant payloads.

Sample Code

Available here – https://github.com/horatiucd/err-handler-deserializer

Resources

  1. Redpanda Quickstart
  2. Spring for Apache Kafka Reference
  3. The picture was taken at Zoo Brasov, Romania