Kafka Message Filtering – An Analysis

by Horatiu Dan

Many companies nowadays use event-driven architectures in their day-to-day business activities, especially when they want their applications to react in real time or near real time.

In such a scenario, during the interactions among the three main types of actors – producers, the message broker, and consumers – a lot of messages are exchanged. Nevertheless, under certain circumstances, some of these messages might not be of interest and thus are discarded and ignored.

This article aims to analyze in detail how a consumer application should be configured so that it behaves correctly when it needs to filter out "irrelevant" messages. First, a standard record filter strategy is configured at the consumer level. Then, a custom deserialization mechanism is added and the analysis is refined. As stated, the intention is to preserve the correct behavior of the consumer throughout.

Set-up

  • Java 21
  • Maven 3.9.2
  • Spring Boot – version 3.2.2
  • Redpanda message broker running in Docker – image version 23.2.15

As the message broker, the lightweight Redpanda is chosen. Since it is fully Kafka compatible, neither the code nor the configuration below needs any modification if it is replaced with a different Kafka-compatible broker. [Resource 1] describes how to accomplish the minimal Redpanda set-up.
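For convenience, a minimal single-broker sketch is shown below; the exact flags are assumptions modelled on the quickstart in [Resource 1], which remains the reference. Note the external listener on port 19092 – the one the application connects to later.

docker run -d --name redpanda-0 \
    -p 19092:19092 \
    docker.redpanda.com/redpandadata/redpanda:v23.2.15 \
    redpanda start --smp 1 --overprovisioned \
    --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092 \
    --advertise-kafka-addr internal://redpanda-0:9092,external://localhost:19092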

Once the Docker container is up and running, a topic called request is created and verified with the following commands:

>docker exec -it redpanda-0 rpk topic create request
TOPIC    STATUS
request  OK
>docker exec -it redpanda-0 rpk cluster info
CLUSTER
=======
redpanda.581f9a24-3402-4a17-af28-63353a602421

BROKERS
=======
ID    HOST        PORT
0*    redpanda-0  9092

TOPICS
======
NAME                PARTITIONS  REPLICAS
__consumer_offsets  3           1
_schemas            1           1
request             1           1

As shown, the request topic was created successfully. Note that the port listed (9092) is the broker's internal one; the application connects from the host through the external listener on 19092.

Implement a Record Filter Strategy

The use case is the following:

  • the producer sends a request to the configured topic
  • if the request message fulfills the acceptance criteria, the consumer processes it
  • otherwise, the message is discarded

A request message has a simple form:

{
	"id": "34b25c6b-60d6-4e53-8f79-bdcdd17b3a2d",
	"contextId": "hcd"
}

having just two fields, an identifier and a context identifier.

Messages are taken into account only in a certain acceptable context. Put differently, a message is accepted only if its contextId equals the one configured on the consumer side; otherwise it is discarded.

A request is modeled by the following record:

public record Request(String id, String contextId) {
}

For configuring a producer and a consumer, at least these properties are needed (application.properties file):

# the address of the message broker
broker.url = localhost:19092

# the name of the topic
topic.request = request

# the value used both as the consumer group id and as the accepted contextId
context.id = hcd

The requirement is clear – only the messages having hcd as contextId are accepted.

In order to send messages, a producer needs a KafkaTemplate instance, configured as below:

@Configuration
public class KafkaConfig {

    @Value("${broker.url}")
    private String brokerUrl;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props);

        return new KafkaTemplate<>(producerFactory);
    }
}

One may observe in the producer configuration that a StringSerializer was chosen for marshalling the payload value. Usually, a JsonSerializer provides more robustness to the producer-consumer contract. Nevertheless, the choice here is intentional – a plain String payload increases the experimental flexibility, as arbitrary (even malformed) messages can be sent.
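For comparison, below is a sketch of what a JsonSerializer-based template could look like – an illustration only, not used further in this article:

@Bean
public KafkaTemplate<String, Request> requestKafkaTemplate() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // org.springframework.kafka.support.serializer.JsonSerializer marshals the Request record to JSON
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}

With it, kafkaTemplate.send(topic, new Request(...)) would enforce the contract at compile time, at the price of losing the ability to send intentionally broken payloads.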

Once the messages reach the request topic, a consumer is configured to pick them up.

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${broker.url}")
    private String brokerUrl;

    @Value("${context.id}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Request> kafkaListenerContainerFactory(CustomRecordFilterStrategy recordFilterStrategy) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, Request.class.getPackageName());
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Request.class.getName());

        // the deserializer instances passed in below take precedence over the
        // *_DESERIALIZER_CLASS_CONFIG entries set in the properties map above
        DefaultKafkaConsumerFactory<String, Request> defaultFactory = new DefaultKafkaConsumerFactory<>(props,
                new StringDeserializer(), new JsonDeserializer<>(Request.class));

        ConcurrentKafkaListenerContainerFactory<String, Request> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(defaultFactory);
        factory.setRecordFilterStrategy(recordFilterStrategy);
        factory.setCommonErrorHandler(new DefaultErrorHandler());
        return factory;
    }
}

The recordFilterStrategy parameter of the factory method above allows injecting a record filtering strategy, which is exactly the purpose here – a means to decide whether a message is filtered out or not.

The RecordFilterStrategy<K, V> interface (from the org.springframework.kafka.listener.adapter package) has one abstract method,

boolean filter(ConsumerRecord<K, V> consumerRecord);

which, according to its JavaDoc, returns true if the ConsumerRecord should be discarded (K represents the message key, while V the message value).

In the case of this proof of concept, all messages that have their contextId equal to hcd are accepted and consumed, while the rest are filtered out. The implementation is below.

@Component
public class CustomRecordFilterStrategy implements RecordFilterStrategy<String, Request> {

    private static final Logger LOG = LoggerFactory.getLogger(CustomRecordFilterStrategy.class);

    @Value("${context.id}")
    private String contextId;

    @Override
    public boolean filter(ConsumerRecord<String, Request> consumerRecord) {
        Request request = consumerRecord.value();

        boolean discard = !contextId.equals(request.contextId());
        LOG.info("{} is{} compliant.", request, discard ? "n't" : "");
        return discard;
    }
}

As part of the configuration, the KafkaListenerContainerFactory interface is responsible for creating the listener container of a particular endpoint. The @EnableKafka annotation on the configuration class enables the detection of @KafkaListener annotations on any Spring-managed beans in the container. Thus, the actual listener (the message consumer) is developed next.

@Component
public class RequestMessageListener {

    private static final Logger LOG = LoggerFactory.getLogger(RequestMessageListener.class);

    private final ResponseService responseService;

    public RequestMessageListener(ResponseService responseService) {
        this.responseService = responseService;
    }

    @KafkaListener(topics = "${topic.request}", groupId = "${context.id}")
    public void onMessage(@Payload Request request) {
        LOG.info("Processing {}.", request);

        responseService.send(Response.success());
    }
}

Its functionality is trivial: it logs the messages read from the request topic and destined for the configured consumer group, then invokes a ResponseService which acts as the entity that sends a message back (here, it only logs it).

@Service
public class ResponseService {

    private static final Logger LOG = LoggerFactory.getLogger(ResponseService.class);

    public void send(Response response) {
        LOG.info("Sending {}.", response);
    }
}

A Response is modeled simply, as below:

public record Response (String id,
                        Result result) {

    public static Response success() {
        return new Response(UUID.randomUUID().toString(), Result.SUCCESS);
    }

    // mirrors success(); used later when deserialization fails (the body is assumed by symmetry)
    public static Response failure() {
        return new Response(UUID.randomUUID().toString(), Result.FAILURE);
    }

    public enum Result {
        SUCCESS, FAILURE
    }
}

When the application is started, provided the message broker is up, the listener is ready to receive messages.

INFO 20080 --- [main] c.h.r.RecordFilterStrategyApplication				  : Started RecordFilterStrategyApplication in 1.282 seconds (process running for 1.868)
INFO 20080 --- [main] fkaConsumerFactory$ExtendedKafkaConsumer            : [Consumer clientId=consumer-hcd-1, groupId=hcd] Subscribed to topic(s): request
INFO 20080 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-hcd-1, groupId=hcd] Cluster ID: redpanda.581f9a24-3402-4a17-af28-63353a602421
INFO 20080 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-hcd-1, groupId=hcd] Request joining group due to: need to re-join with the given member-id: consumer-hcd-1-1fa7cd25-2bf2-49bd-82fd-ac3e4c54cafc
INFO 20080 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-hcd-1, groupId=hcd] Successfully joined group with generation Generation{generationId=7, memberId='consumer-hcd-1-1fa7cd25-2bf2-49bd-82fd-ac3e4c54cafc', protocol='range'}

In order to check the integration, the following two tests are used. Since the listener expects a Request, a message template was created for convenience, to be filled in with compliant or non-compliant values.

@SpringBootTest
class RecordFilterStrategyApplicationTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${topic.request}")
    private String topic;

    @Value("${context.id}")
    private String contextId;

    private static final String template = """
        {
            "id": "%s",
            "contextId": "%s"
        }""";

    @Test
    void compliant() {
        kafkaTemplate.send(topic,
                String.format(template, UUID.randomUUID(), contextId));
    }

    @Test
    void notCompliant() {
        kafkaTemplate.send(topic,
                String.format(template, UUID.randomUUID(), "other context"));
    }
}

compliant() sends a message whose contextId matches the one configured on the consumer side. As expected, it is processed and a response is sent back.

INFO 20080 --- [ntainer#0-0-C-1] c.h.r.l.CustomRecordFilterStrategy    : Request[id=44a2644c-0025-4a38-bd36-f163a530725e, contextId=hcd] is compliant.
INFO 20080 --- [ntainer#0-0-C-1] c.h.r.listener.RequestMessageListener : Processing Request[id=44a2644c-0025-4a38-bd36-f163a530725e, contextId=hcd].
INFO 20080 --- [ntainer#0-0-C-1] c.h.r.listener.ResponseService        : Sending Response[id=ebe0f65c-eddf-4866-b71f-e6cd766dd499, result=SUCCESS].

notCompliant() sends a message whose contextId is different from what was configured on this consumer. Thus, the message is neither processed, nor responded to, but ignored.

INFO 20080 --- [ntainer#0-0-C-1] c.h.r.l.CustomRecordFilterStrategy : Request[id=ed22f60c-b13d-4315-8132-46aa83ddf33b, contextId=other context] isn't compliant.

So far, the proof of concept has exemplified how to configure the consumer with a record filtering strategy so that only certain messages are accepted.

The code for this part is here – 1-filter-strategy

Implement a Record Filter Strategy with Custom Deserialization

Let’s assume that the messages consumed from the request topic are unmarshalled using a custom deserializer and that the filtering is still required.

The custom deserializer here is trivial and has a didactic purpose. Moreover, in case the id field is missing or empty, a RequestDeserializationException is thrown at runtime. Such an action is not strictly necessary at this point, but it was put here to outline a certain use case. Read on.

public class CustomRequestDeserializer extends StdDeserializer<Request> {

    private static final Logger LOG = LoggerFactory.getLogger(CustomRequestDeserializer.class);

    public CustomRequestDeserializer() {
        super(Request.class);
    }

    @Override
    public Request deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
        ObjectCodec oc = jsonParser.getCodec();
        JsonNode root = oc.readTree(jsonParser);

        final String contextId = deserializeField(root, "contextId");

        final String id = deserializeField(root, "id");
        if (id == null || id.isEmpty()) {
            throw new RequestDeserializationException("'id' is required");
        }

        Request request = new Request(id, contextId);
        LOG.info("Successfully deserialized {}", request);
        return request;
    }

    // reads an optional text field – a minimal version of the helper referenced above,
    // which the article does not list
    private static String deserializeField(JsonNode root, String field) {
        JsonNode node = root.get(field);
        return node != null ? node.asText() : null;
    }
}
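The RequestDeserializationException itself is not listed either; a minimal sketch, assuming a plain unchecked exception, suffices for this part:

public class RequestDeserializationException extends RuntimeException {

    public RequestDeserializationException(String message) {
        super(message);
    }
}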

To apply it, the Request record is annotated as below – the Spring JsonDeserializer delegates to Jackson, which picks up the annotation:

@JsonDeserialize(using = CustomRequestDeserializer.class)
public record Request(String id, String contextId) {
}

Up until now, the behavior described in the first part is preserved. If the previous compliant() and notCompliant() tests are run again, the outcome is the same.

The next analyzed situation is the one in which a RequestDeserializationException is thrown when deserializing an incoming message. Let’s assume the id is empty, thus the form is as below:

{
	"id": "",
	"contextId": "hcd"
}

The message is sent with the following test:

@Test
void deserializationError_compliant() {
	kafkaTemplate.send(topic,
			String.format(template, "", contextId));
}

When such a message is received, the outcome is the following:

...
Caused by: com.hcd.recordfilterstrategy.domain.deserialization.RequestDeserializationException: 'id' is required
...

An exception thrown at deserialization time causes the message to be neither consumed nor responded to – the message is simply lost.

See [Resource 3] for a detailed analysis on situations like this.

One solution that allows recovering after deserialization exceptions is to configure the value deserializer of the KafkaListenerContainerFactory with a failed deserialization function – see the ErrorHandlingDeserializer set-up in the listing below:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Request> kafkaListenerContainerFactory(CustomRecordFilterStrategy recordFilterStrategy,
																							  FailedRequestDeserializationFunction failedDeserializationFunction) {
	Map<String, Object> props = new HashMap<>();
	props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
	props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
	props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
	props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
	props.put(JsonDeserializer.TRUSTED_PACKAGES, Request.class.getPackageName());
	props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Request.class.getName());

	JsonDeserializer<Request> jsonDeserializer = new JsonDeserializer<>(Request.class);

	ErrorHandlingDeserializer<Request> valueDeserializer = new ErrorHandlingDeserializer<>(jsonDeserializer);
	valueDeserializer.setFailedDeserializationFunction(failedDeserializationFunction);

	DefaultKafkaConsumerFactory<String, Request> defaultFactory = new DefaultKafkaConsumerFactory<>(props,
			new StringDeserializer(), valueDeserializer);

	ConcurrentKafkaListenerContainerFactory<String, Request> factory = new ConcurrentKafkaListenerContainerFactory<>();
	factory.setConsumerFactory(defaultFactory);
	factory.setRecordFilterStrategy(recordFilterStrategy);
	factory.setCommonErrorHandler(new DefaultErrorHandler());
	return factory;
}

The purpose of this function, implemented as the component below, is to allow recovering after such an exceptional situation and to send a failure response back.

@Component
public class FailedRequestDeserializationFunction implements Function<FailedDeserializationInfo, Request> {

    private static final Logger LOG = LoggerFactory.getLogger(FailedRequestDeserializationFunction.class);

    private final ResponseService responseService;

    public FailedRequestDeserializationFunction(ResponseService responseService) {
        this.responseService = responseService;
    }

    @Override
    public Request apply(FailedDeserializationInfo failedDeserializationInfo) {
        final Exception ex = failedDeserializationInfo.getException();

        if (ex instanceof RequestDeserializationException deserializationEx) {
            LOG.info("Error deserializing request - {}", deserializationEx.getMessage());

            responseService.send(Response.failure());

        } else {
            LOG.error("Unexpected error deserializing request.", ex);
        }

        // the value returned here becomes the record's payload downstream – null, as no Request could be built
        return null;
    }
}

If the same test is run again – sending a compliant, but incorrect message – the behavior changes.

2024-03-13T10:52:38.893+02:00  INFO 24232 --- [ntainer#0-0-C-1] d.d.FailedRequestDeserializationFunction : Error deserializing request - 'id' is required
2024-03-13T10:52:38.895+02:00  INFO 24232 --- [ntainer#0-0-C-1] c.h.r.listener.ResponseService           : Sending Response[id=5393b4a0-3849-4130-934b-671e43a2358f, result=FAILURE].

The only remaining case is that of an incorrect, non-compliant message – the id is still empty, but the contextId also differs from the expected one.

{
	"id": "",
	"contextId": "other context"
}

Unfortunately, if the following test is run, nothing changes – the failed deserialization function still sends a failure response back, although the record filtering strategy should have filtered the message out, as its contextId is non-compliant.

@Test
void deserializationError_notCompliant() {
	kafkaTemplate.send(topic,
			String.format(template, "", "other context"));
}

The output shows the failure response is still sent:

2024-03-13T11:03:56.609+02:00  INFO 24232 --- [ntainer#0-0-C-1] d.d.FailedRequestDeserializationFunction : Error deserializing request - 'id' is required
2024-03-13T11:03:56.610+02:00  INFO 24232 --- [ntainer#0-0-C-1] c.h.r.listener.ResponseService           : Sending Response[id=b345f001-3543-46da-bc0f-17c63c20e32a, result=FAILURE].

The code for this second section is here – 2-filter-strategy-custom-deser.

Implement a Record Filter Strategy with Custom Deserialization – Correctly

The last part of this analysis provides a solution for addressing this last use case.

Before moving on, let’s recall what currently happens in all the possible use cases:

Correct, compliant message

  1. since the message is correct, the custom deserializer successfully unmarshalls it
  2. the failed deserialization function is not invoked
  3. since the message is compliant, the record filter strategy does not reject it
  4. the listener is invoked, it processes the request and sends a response back

Correct, non-compliant message

  1. since the message is correct, the custom deserializer successfully unmarshalls it
  2. the failed deserialization function is not invoked
  3. since the message is non-compliant, the record filter strategy rejects it
  4. the listener is not invoked

Incorrect, compliant or non-compliant message

  1. since the message is incorrect, the custom deserializer throws an exception
  2. the failed deserialization function is invoked and it sends a failure response back
  3. the record filter strategy is not invoked
  4. the listener is not invoked

In the case of a correct message, the consumer application behaves correctly, irrespective of the message compliance.
In the case of an incorrect message, a failure response is sent back irrespective of the compliance, which means the consumer behaves correctly only for compliant messages.

For incorrect messages, the consumer should act as follows:

  1. since the message is incorrect, the custom deserializer throws an exception
  2. the failed deserialization function is invoked and it sends a failure response back only if the message is compliant
  3. the record filter strategy is not invoked
  4. the listener is not invoked

At first glance, in order to cover the last use case as well, only the FailedRequestDeserializationFunction needs to be enhanced to also check the message compliance.

Basically, before sending the response, the same check as the one in CustomRecordFilterStrategy shall be performed. To avoid repetition, some refactoring is done.

To isolate the compliance check, a separate component in charge of it is created.

@Component
public class RequestFilterStrategy {

    private static final Logger LOG = LoggerFactory.getLogger(RequestFilterStrategy.class);

    @Value("${context.id}")
    private String contextId;

    public boolean filter(String contextId) {
        boolean discard = !this.contextId.equals(contextId);
        LOG.info("Request is{} compliant.", discard ? "n't" : "");
        return discard;
    }
}

Then, the component is injected both into the CustomRecordFilterStrategy and into the FailedRequestDeserializationFunction, and consequently they are refactored as follows.

@Component
public class CustomRecordFilterStrategy implements RecordFilterStrategy<String, Request> {

    private final RequestFilterStrategy requestFilterStrategy;

    public CustomRecordFilterStrategy(RequestFilterStrategy requestFilterStrategy) {
        this.requestFilterStrategy = requestFilterStrategy;
    }

    @Override
    public boolean filter(ConsumerRecord<String, Request> consumerRecord) {
        return requestFilterStrategy.filter(consumerRecord.value().contextId());
    }
}

@Component
public class FailedRequestDeserializationFunction implements Function<FailedDeserializationInfo, Request> {

    private static final Logger LOG = LoggerFactory.getLogger(FailedRequestDeserializationFunction.class);

    private final RequestFilterStrategy requestFilterStrategy;
    private final ResponseService responseService;

    public FailedRequestDeserializationFunction(RequestFilterStrategy requestFilterStrategy,
                                                ResponseService responseService) {
        this.requestFilterStrategy = requestFilterStrategy;
        this.responseService = responseService;
    }

    @Override
    public Request apply(FailedDeserializationInfo failedDeserializationInfo) {
        final Exception ex = failedDeserializationInfo.getException();

        if (ex instanceof RequestDeserializationException deserializationEx) {
            LOG.info("Error deserializing request - {}", deserializationEx.getMessage());

            if (!requestFilterStrategy.filter(deserializationEx.getContextId())) {
                responseService.send(Response.failure());
            }
        } else {
            LOG.error("Unexpected error deserializing request.", ex);
        }

        return null;
    }
}
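One detail is implied by the call to deserializationEx.getContextId() above: the RequestDeserializationException must now carry the contextId read from the payload before the failure. A minimal sketch of the enriched exception, under the same assumptions as before:

public class RequestDeserializationException extends RuntimeException {

    // the contextId extracted before the failure, so that compliance can still be checked
    private final String contextId;

    public RequestDeserializationException(String message, String contextId) {
        super(message);
        this.contextId = contextId;
    }

    public String getContextId() {
        return contextId;
    }
}

and, accordingly, the updated throw in CustomRequestDeserializer:

if (id == null || id.isEmpty()) {
    throw new RequestDeserializationException("'id' is required", contextId);
}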

To check the behavior, the last unit test is run again.

@Test
void deserializationError_notCompliant() {
	kafkaTemplate.send(topic,
			String.format(template, "", "other context"));
}

The output clearly shows that for incorrect, non-compliant messages, no response is sent anymore.

2024-03-13T15:05:56.432+02:00  INFO 17916 --- [ntainer#0-0-C-1] d.d.FailedRequestDeserializationFunction : Error deserializing request - 'id' is required
2024-03-13T15:05:56.432+02:00  INFO 17916 --- [ntainer#0-0-C-1] c.h.r.listener.RequestFilterStrategy     : Request isn't compliant.

The code for the enhanced solution is here – 3-filter-strategy-custom-deser-covered

Resources

  1. Redpanda Quickstart
  2. Spring for Apache Kafka Reference
  3. Acting Soon on Kafka Deserialization Errors
