Kafka Message Filtering – An Analysis

by Horatiu Dan

Many companies nowadays use event-driven architectures in their day-to-day business activities, especially when they want their applications to have real-time or near real-time responsiveness.

In such a setting, a lot of messages are exchanged during the interactions among the three main types of actors – producers, the message broker and consumers. Nevertheless, under certain circumstances, some of these messages might not be of interest and thus are discarded and ignored.

This article analyzes in detail how a consumer application should be configured so that it behaves correctly when it needs to filter out “irrelevant” messages. First, a standard record filter strategy is configured at consumer level. Then, a custom deserialization mechanism is added and the analysis is refined. Throughout, the intention is to preserve the correct behavior of the consumer.

Set-up

  • Java 21
  • Maven 3.9.2
  • Spring Boot – version 3.2.2
  • Redpanda message broker running in Docker – image version 23.2.15

As message broker, the great and lightweight Redpanda is chosen. Since it is fully Kafka compatible, neither the development nor the configuration needs to be modified if a different Kafka-compatible broker is used instead. [Resource 1] describes how to accomplish the minimal Redpanda set-up.

Once the Docker container is up and running, a topic called request is created with the following command:

>docker exec -it redpanda-0 rpk topic create request
TOPIC    STATUS
request  OK
>docker exec -it redpanda-0 rpk cluster info
CLUSTER
=======
redpanda.581f9a24-3402-4a17-af28-63353a602421

BROKERS
=======
ID    HOST        PORT
0*    redpanda-0  9092

TOPICS
======
NAME                PARTITIONS  REPLICAS
__consumer_offsets  3           1
_schemas            1           1
request             1           1

As shown, the request topic was created successfully.

Implement a Record Filter Strategy

The use case is the following:

  • the producer sends a request to the configured topic
  • if the request message fulfills the acceptance criteria, the consumer processes it
  • otherwise, the message is discarded

A request message has a simple form:

{
	"id": "34b25c6b-60d6-4e53-8f79-bdcdd17b3a2d",
	"contextId": "hcd"
}

having just two fields, an identifier and a context identifier.

Messages are taken into account only in a certain acceptable context. Put differently, a message is accepted if its contextId is equal to the one configured on the consumer side; otherwise, it is discarded.

A request is modelled by the following record:

public record Request(String id, String contextId) {
}

For configuring a producer and a consumer, at least these properties are needed (application.properties file):

# the path to the message broker
broker.url = localhost:19092

# the name of the topic
topic.request = request

# the unique string that identifies the consumer group of the consumer
context.id = hcd

The requirement is clear – only the messages having hcd as contextId are accepted.

In order to send messages, a producer needs a KafkaTemplate instance, configured as below:

@Configuration
public class KafkaConfig {

    @Value("${broker.url}")
    private String brokerUrl;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props);

        return new KafkaTemplate<>(producerFactory);
    }
}

One may observe that in the producer configuration a StringSerializer was chosen for marshalling the payload value. Usually, a JsonSerializer provides more robustness to the producer-consumer contract. Nevertheless, the choice here is intentional, as it increases the flexibility of the experiments – raw JSON payloads, including non-compliant or incomplete ones, can be sent as they are.
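
For comparison, a JsonSerializer-based template would publish Request objects directly and enforce the contract through typing. Below is a minimal sketch, not used in this PoC – the bean name requestKafkaTemplate is illustrative and JsonSerializer refers to Spring Kafka's org.springframework.kafka.support.serializer.JsonSerializer:

@Bean
public KafkaTemplate<String, Request> requestKafkaTemplate() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // marshal the Request record to JSON instead of sending pre-built strings
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}

With such a template, a test would call kafkaTemplate.send(topic, new Request(id, contextId)) instead of formatting the JSON by hand.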

Once the messages reach the request topic, a consumer is configured to pick them up.

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${broker.url}")
    private String brokerUrl;

    @Value("${context.id}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Request> kafkaListenerContainerFactory(CustomRecordFilterStrategy recordFilterStrategy) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, Request.class.getPackageName());
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Request.class.getName());

        DefaultKafkaConsumerFactory<String, Request> defaultFactory = new DefaultKafkaConsumerFactory<>(props,
                new StringDeserializer(), new JsonDeserializer<>(Request.class));

        ConcurrentKafkaListenerContainerFactory<String, Request> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(defaultFactory);
        factory.setRecordFilterStrategy(recordFilterStrategy);
        factory.setCommonErrorHandler(new DefaultErrorHandler());
        return factory;
    }
}

The factory.setRecordFilterStrategy(recordFilterStrategy) call in the listing above allows injecting a record filtering strategy, which is exactly the purpose here – a means to decide whether a message is filtered out or not.

The RecordFilterStrategy interface has one abstract method

boolean filter(ConsumerRecord<K, V> consumerRecord);

which, according to its JavaDoc, returns true if the ConsumerRecord should be discarded (K represents the message key, while V the message value).

In the case of this proof of concept, all messages that have their contextId equal to hcd are accepted and consumed, while the rest are filtered out. The implementation is below.

@Component
public class CustomRecordFilterStrategy implements RecordFilterStrategy<String, Request> {

    private static final Logger LOG = LoggerFactory.getLogger(CustomRecordFilterStrategy.class);

    @Value("${context.id}")
    private String contextId;

    @Override
    public boolean filter(ConsumerRecord<String, Request> consumerRecord) {
        Request request = consumerRecord.value();

        boolean discard = !contextId.equals(request.contextId());
        LOG.info("{} is{} compliant.", request, discard ? "n't" : "");
        return discard;
    }
}

As part of the configuration, the KafkaListenerContainerFactory interface is responsible for creating the listener container of a particular endpoint. The @EnableKafka annotation on the configuration class enables the detection of @KafkaListener annotations on any Spring-managed beans in the container. Thus, the actual listener (the message consumer) is developed next.

@Component
public class RequestMessageListener {

    private static final Logger LOG = LoggerFactory.getLogger(RequestMessageListener.class);

    private final ResponseService responseService;

    public RequestMessageListener(ResponseService responseService) {
        this.responseService = responseService;
    }

    @KafkaListener(topics = "${topic.request}", groupId = "${context.id}")
    public void onMessage(@Payload Request request) {
        LOG.info("Processing {}.", request);

        responseService.send(Response.success());
    }
}

Its functionality is trivial: it logs the messages read from the request topic and destined to the configured consumer group. Then, it invokes a ResponseService, which acts as the entity that sends a response back (here, it only logs it).

@Service
public class ResponseService {

    private static final Logger LOG = LoggerFactory.getLogger(ResponseService.class);

    public void send(Response response) {
        LOG.info("Sending {}.", response);
    }
}
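
In a real integration, the service would presumably publish the response to a dedicated topic rather than only log it. A minimal sketch under that assumption – the topic.response property and the KafkaTemplate<String, Response> (backed by a JsonSerializer) are hypothetical and not part of this PoC:

@Service
public class PublishingResponseService {

    private final KafkaTemplate<String, Response> kafkaTemplate;

    @Value("${topic.response}")
    private String responseTopic;

    public PublishingResponseService(KafkaTemplate<String, Response> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(Response response) {
        // the response id is used as the record key, keeping related messages on the same partition
        kafkaTemplate.send(responseTopic, response.id(), response);
    }
}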

A Response is modeled simply, as below:

public record Response (String id,
                        Result result) {

    public static Response success() {
        return new Response(UUID.randomUUID().toString(), Result.SUCCESS);
    }
    
    public enum Result {
        SUCCESS, FAILURE
    }
}
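
Since Response.failure() is invoked later by the failed deserialization function, the record is assumed to also expose a factory method symmetric to success(); a minimal sketch:

public static Response failure() {
    // assumed counterpart of success(), matching the FAILURE responses seen in the logs
    return new Response(UUID.randomUUID().toString(), Result.FAILURE);
}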

When the application is started, provided the message broker is up, the listener is ready to receive messages.

INFO 20080 --- [main] c.h.r.RecordFilterStrategyApplication				  : Started RecordFilterStrategyApplication in 1.282 seconds (process running for 1.868)
INFO 20080 --- [main] fkaConsumerFactory$ExtendedKafkaConsumer            : [Consumer clientId=consumer-hcd-1, groupId=hcd] Subscribed to topic(s): request
INFO 20080 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-hcd-1, groupId=hcd] Cluster ID: redpanda.581f9a24-3402-4a17-af28-63353a602421
INFO 20080 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-hcd-1, groupId=hcd] Request joining group due to: need to re-join with the given member-id: consumer-hcd-1-1fa7cd25-2bf2-49bd-82fd-ac3e4c54cafc
INFO 20080 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-hcd-1, groupId=hcd] Successfully joined group with generation Generation{generationId=7, memberId='consumer-hcd-1-1fa7cd25-2bf2-49bd-82fd-ac3e4c54cafc', protocol='range'}

In order to check the integration, the following two tests are used. Since the listener expects a Request, a JSON template is defined for conveniently building compliant and non-compliant payloads.

@SpringBootTest
class RecordFilterStrategyApplicationTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${topic.request}")
    private String topic;

    @Value("${context.id}")
    private String contextId;

    private static final String template = """
        {
            "id": "%s",
            "contextId": "%s"
        }""";

    @Test
    void compliant() {
        kafkaTemplate.send(topic,
                String.format(template, UUID.randomUUID(), contextId));
    }

    @Test
    void notCompliant() {
        kafkaTemplate.send(topic,
                String.format(template, UUID.randomUUID(), "other context"));
    }
}

compliant() sends a message whose contextId matches the one configured on this consumer. As expected, the message is processed and a response is sent back.

INFO 20080 --- [ntainer#0-0-C-1] c.h.r.l.CustomRecordFilterStrategy    : Request[id=44a2644c-0025-4a38-bd36-f163a530725e, contextId=hcd] is compliant.
INFO 20080 --- [ntainer#0-0-C-1] c.h.r.listener.RequestMessageListener : Processing Request[id=44a2644c-0025-4a38-bd36-f163a530725e, contextId=hcd].
INFO 20080 --- [ntainer#0-0-C-1] c.h.r.listener.ResponseService        : Sending Response[id=ebe0f65c-eddf-4866-b71f-e6cd766dd499, result=SUCCESS].

notCompliant() sends a message whose contextId is different from the one configured on this consumer. Thus, the message is neither processed nor responded to, but ignored.

INFO 20080 --- [ntainer#0-0-C-1] c.h.r.l.CustomRecordFilterStrategy : Request[id=ed22f60c-b13d-4315-8132-46aa83ddf33b, contextId=other context] isn't compliant.

So far, the proof of concept has exemplified how to configure the consumer with a record filtering strategy so that only certain messages are accepted.

The code for this part is here – 1-filter-strategy

Implement a Record Filter Strategy with Custom Deserialization

Let’s assume that the messages consumed from the request topic are unmarshalled using a custom deserializer and that the filtering is still required.

The custom deserializer here is trivial and has a didactic purpose. Moreover, in case the id field is missing or empty, a runtime RequestDeserializationException is thrown. Such an action is not strictly needed at this point, but it is put here to outline a specific use case. Read on.

public class CustomRequestDeserializer extends StdDeserializer<Request> {

    private static final Logger LOG = LoggerFactory.getLogger(CustomRequestDeserializer.class);

    public CustomRequestDeserializer() {
        super(Request.class);
    }

    @Override
    public Request deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
        ObjectCodec oc = jsonParser.getCodec();
        JsonNode root = oc.readTree(jsonParser);

        final String contextId = deserializeField(root, "contextId");

        final String id = deserializeField(root, "id");
        if (id == null || id.isEmpty()) {
            throw new RequestDeserializationException("'id' is required");
        }

        Request request = new Request(id, contextId);
        LOG.info("Successfully deserialized {}", request);
        return request;
    }
}
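
The deserializeField() helper is not listed above; a minimal sketch, assuming it simply reads the textual value of a child node and returns null when the field is absent:

private static String deserializeField(JsonNode root, String fieldName) {
    // return the raw text of the field, or null if the node is missing
    JsonNode node = root.get(fieldName);
    return node != null ? node.asText() : null;
}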

To apply it, the Request record is annotated as below:

@JsonDeserialize(using = CustomRequestDeserializer.class)
public record Request(String id, String contextId) {
}

Up until now, the behavior described in the first part is preserved. If the previous compliant() and notCompliant() tests are run again, the outcome is the same.

The next analyzed situation is the one in which a RequestDeserializationException is thrown when deserializing an incoming message. Let’s assume the id is empty, thus the form is as below:

{
	"id": "",
	"contextId": "hcd"
}
The test that sends such a message is below.

@Test
void deserializationError_compliant() {
	kafkaTemplate.send(topic,
			String.format(template, "", contextId));
}

When such a message is received, the outcome is the following:

...
Caused by: com.hcd.recordfilterstrategy.domain.deserialization.RequestDeserializationException: 'id' is required
...

An exception thrown at deserialization time means the message is neither consumed nor responded to – it is simply lost.

See [Resource 3] for a detailed analysis on situations like this.

One solution that allows recovering after deserialization exceptions is to configure the value deserializer of the KafkaListenerContainerFactory with a failed deserialization function – see the ErrorHandlingDeserializer set-up below:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Request> kafkaListenerContainerFactory(CustomRecordFilterStrategy recordFilterStrategy,
																							  FailedRequestDeserializationFunction failedDeserializationFunction) {
	Map<String, Object> props = new HashMap<>();
	props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
	props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
	props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
	props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
	props.put(JsonDeserializer.TRUSTED_PACKAGES, Request.class.getPackageName());
	props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Request.class.getName());

	JsonDeserializer<Request> jsonDeserializer = new JsonDeserializer<>(Request.class);

	ErrorHandlingDeserializer<Request> valueDeserializer = new ErrorHandlingDeserializer<>(jsonDeserializer);
	valueDeserializer.setFailedDeserializationFunction(failedDeserializationFunction);

	DefaultKafkaConsumerFactory<String, Request> defaultFactory = new DefaultKafkaConsumerFactory<>(props,
			new StringDeserializer(), valueDeserializer);

	ConcurrentKafkaListenerContainerFactory<String, Request> factory = new ConcurrentKafkaListenerContainerFactory<>();
	factory.setConsumerFactory(defaultFactory);
	factory.setRecordFilterStrategy(recordFilterStrategy);
	factory.setCommonErrorHandler(new DefaultErrorHandler());
	return factory;
}
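
As a side note, the delegate deserializer could also be declared purely through consumer properties, letting the ErrorHandlingDeserializer instantiate it itself. A sketch of that hypothetical alternative, dropping into the same props map:

// property-based alternative (not used here)
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);

The programmatic wiring above is preferred in this proof of concept because the failed deserialization function is a Spring bean with injected dependencies, so it has to be set on the deserializer instance directly.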

The purpose of this component is to allow recovering from such an exceptional situation and to send a failure response back.

@Component
public class FailedRequestDeserializationFunction implements Function<FailedDeserializationInfo, Request> {

    private static final Logger LOG = LoggerFactory.getLogger(FailedRequestDeserializationFunction.class);

    private final ResponseService responseService;

    public FailedRequestDeserializationFunction(ResponseService responseService) {
        this.responseService = responseService;
    }

    @Override
    public Request apply(FailedDeserializationInfo failedDeserializationInfo) {
        final Exception ex = failedDeserializationInfo.getException();

        if (ex instanceof RequestDeserializationException deserializationEx) {
            LOG.info("Error deserializing request - {}", deserializationEx.getMessage());

            responseService.send(Response.failure());

        } else {
            LOG.error("Unexpected error deserializing request.", ex);
        }

        return null;
    }
}

If the same test is run again and a compliant but incorrect message is sent, the behavior changes.

2024-03-13T10:52:38.893+02:00  INFO 24232 --- [ntainer#0-0-C-1] d.d.FailedRequestDeserializationFunction : Error deserializing request - 'id' is required
2024-03-13T10:52:38.895+02:00  INFO 24232 --- [ntainer#0-0-C-1] c.h.r.listener.ResponseService           : Sending Response[id=5393b4a0-3849-4130-934b-671e43a2358f, result=FAILURE].

The only remaining case is that of a non-compliant and incorrect message, meaning the id is still empty and the contextId is different from the expected one.

{
	"id": "",
	"contextId": "other context"
}

If the following test is run, nothing changes: unfortunately, the failed deserialization function still sends a failure response back, although the record filtering strategy should have filtered the message out, as the contextId is non-compliant.

@Test
void deserializationError_notCompliant() {
	kafkaTemplate.send(topic,
			String.format(template, "", "other context"));
}

The log confirms that the failure response is still sent:
2024-03-13T11:03:56.609+02:00  INFO 24232 --- [ntainer#0-0-C-1] d.d.FailedRequestDeserializationFunction : Error deserializing request - 'id' is required
2024-03-13T11:03:56.610+02:00  INFO 24232 --- [ntainer#0-0-C-1] c.h.r.listener.ResponseService           : Sending Response[id=b345f001-3543-46da-bc0f-17c63c20e32a, result=FAILURE].

The code for this second section is here – 2-filter-strategy-custom-deser.

Implement a Record Filter Strategy with Custom Deserialization – Correctly

This last part of the analysis provides a solution for addressing the remaining use-case.

Before moving on with it, let’s recall what currently happens in all possible use-cases:

Correct, compliant message

  1. since the message is correct, the custom deserializer successfully unmarshalls it
  2. the failed deserialization function is not invoked
  3. since the message is compliant, the record filter strategy does not reject it
  4. the listener is invoked, it processes the request and sends a response back

Correct, non-compliant message

  1. since the message is correct, the custom deserializer successfully unmarshalls it
  2. the failed deserialization function is not invoked
  3. since the message is non-compliant, the record filter strategy rejects it
  4. the listener is not invoked

Incorrect, compliant or non-compliant message

  1. since the message is incorrect, the custom deserializer throws an exception
  2. the failed deserialization function is invoked and it sends a failure response back
  3. the record filter strategy is not invoked
  4. the listener is not invoked

In the case of a correct message, the consumer application behaves correctly, irrespective of the compliance of the message.
In the case of an incorrect message, a failure response is sent back irrespective of the compliance of the message, which means the consumer behaves correctly only for compliant messages.

For incorrect, non-compliant messages, the consumer should act as follows:

  1. since the message is incorrect, the custom deserializer throws an exception
  2. the failed deserialization function is invoked and it sends a failure response back only if the message is compliant (thus, not in this case)
  3. the record filter strategy is not invoked
  4. the listener is not invoked

At first glance, in order to cover the last use-case as well, only the FailedRequestDeserializationFunction needs to be enhanced to also check the message compliance.

Basically, before sending the response, the same check as the one in CustomRecordFilterStrategy shall be added. To avoid repetition, some refactoring is done.

To isolate the compliancy check, a separate component in charge of it is created.

@Component
public class RequestFilterStrategy {

    private static final Logger LOG = LoggerFactory.getLogger(RequestFilterStrategy.class);

    @Value("${context.id}")
    private String contextId;

    public boolean filter(String contextId) {
        boolean discard = !this.contextId.equals(contextId);
        LOG.info("Request is{} compliant.", discard ? "n't" : "");
        return discard;
    }
}

Then, the component is injected both into CustomRecordFilterStrategy and into FailedRequestDeserializationFunction, which are consequently refactored as follows.

@Component
public class CustomRecordFilterStrategy implements RecordFilterStrategy<String, Request> {

    private final RequestFilterStrategy requestFilterStrategy;

    public CustomRecordFilterStrategy(RequestFilterStrategy requestFilterStrategy) {
        this.requestFilterStrategy = requestFilterStrategy;
    }

    @Override
    public boolean filter(ConsumerRecord<String, Request> consumerRecord) {
        return requestFilterStrategy.filter(consumerRecord.value().contextId());
    }
}

@Component
public class FailedRequestDeserializationFunction implements Function<FailedDeserializationInfo, Request> {

    private static final Logger LOG = LoggerFactory.getLogger(FailedRequestDeserializationFunction.class);

    private final RequestFilterStrategy requestFilterStrategy;
    private final ResponseService responseService;

    public FailedRequestDeserializationFunction(RequestFilterStrategy requestFilterStrategy,
                                                ResponseService responseService) {
        this.requestFilterStrategy = requestFilterStrategy;
        this.responseService = responseService;
    }

    @Override
    public Request apply(FailedDeserializationInfo failedDeserializationInfo) {
        final Exception ex = failedDeserializationInfo.getException();

        if (ex instanceof RequestDeserializationException deserializationEx) {
            LOG.info("Error deserializing request - {}", deserializationEx.getMessage());

            if (!requestFilterStrategy.filter(deserializationEx.getContextId())) {
                responseService.send(Response.failure());
            }
        } else {
            LOG.error("Unexpected error deserializing request.", ex);
        }

        return null;
    }
}
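
The call to getContextId() implies that RequestDeserializationException now carries the contextId parsed before the failure, meaning the custom deserializer would throw something like new RequestDeserializationException("'id' is required", contextId). A minimal sketch of such an exception, assumed here rather than copied from the repository:

public class RequestDeserializationException extends RuntimeException {

    private final String contextId;

    public RequestDeserializationException(String message, String contextId) {
        super(message);
        this.contextId = contextId;
    }

    public String getContextId() {
        return contextId;
    }
}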

To check the behavior, the last unit test is run again.

@Test
void deserializationError_notCompliant() {
	kafkaTemplate.send(topic,
			String.format(template, "", "other context"));
}

The output clearly shows that for incorrect, non-compliant messages, no response is sent anymore.

2024-03-13T15:05:56.432+02:00  INFO 17916 --- [ntainer#0-0-C-1] d.d.FailedRequestDeserializationFunction : Error deserializing request - 'id' is required
2024-03-13T15:05:56.432+02:00  INFO 17916 --- [ntainer#0-0-C-1] c.h.r.listener.RequestFilterStrategy     : Request isn't compliant.

The code for the enhanced solution is here – 3-filter-strategy-custom-deser-covered

Resources

  1. Redpanda Quickstart
  2. Spring for Apache Kafka Reference
  3. Acting Soon on Kafka Deserialization Errors
  4. The picture was taken at Legoland, Germany

Stream Summary Statistics

by Horatiu Dan

Context

In order to be able to leverage the various capabilities of Java Streams, one shall first understand two general concepts – the stream and the stream pipeline. A Stream in Java is a sequential flow of data. A stream pipeline, on the other hand, represents a series of steps applied to the data, a series that ultimately produces a result.

My family and I recently visited the Legoland Resort in Germany – a great place by the way – and there, among other attractions, we had the chance to observe in detail a sample of the brick building process. Briefly, everything starts from the granular plastic that is melted, modeled accordingly, assembled, painted, stenciled if needed and packed up in bags and boxes. All the steps are part of an assembly factory pipeline.

What is worth mentioning is the fact that the next step cannot be done until the previous one has completed and also that the number of steps is finite. Moreover, at every step, each Lego element is touched to perform the corresponding operation and then it moves only forward, never backwards, so that the next step is done. The same applies to Java streams.

In functional programming, the steps are called stream operations and they fall into three categories – one that starts the job (the source), one that ends it and produces the result (the terminal operation) and the intermediate ones in between.

As a last consideration, it’s worth mentioning that the intermediate operations transform the stream into another stream, but are never run until the terminal operation runs (they are lazily evaluated). Finally, once the result is produced and the initial goal achieved, the stream is no longer valid.
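
A minimal illustration of such a pipeline, using the Parent record introduced later in this article:

List<Parent> parents = List.of(new Parent("Mother1", 31), new Parent("Father4", 34));

List<String> names = parents.stream()          // source
        .map(Parent::name)                     // intermediate, lazily evaluated
        .filter(name -> name.startsWith("M"))  // intermediate, lazily evaluated
        .toList();                             // terminal – triggers the actual traversal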

Abstract

Starting from the fact that in the case of Java Streams the stream is no longer valid once the terminal operation has run, this article aims to present a way of computing multiple results through only one stream traversal. This is accomplished by leveraging the Java summary statistics objects (in particular IntSummaryStatistics) available since version 1.8.

Proof of Concept

The small project built especially to showcase the statistics computation uses the following:

  • Java 17
  • Maven 3.6.3
  • JUnit Jupiter Engine v.5.9.3

As domain, there is one straightforward entity – a parent.

public record Parent(String name, int age) { }

It is modeled by two attributes – the name and the age. While the name is present only to distinguish the parents, the age is the attribute of interest here.

The purpose is to be able to compute a few age statistics on a set of parents, that is:

  • the total sample count
  • the ages of the youngest and the oldest parent
  • the age range of the group
  • the average age
  • the total number of years the parents accumulate.

The results are encapsulated into a ParentStats structure, represented as a record as well.

public record ParentStats(long count,
                          int youngest,
                          int oldest,
                          int ageRange,
                          double averageAge,
                          long totalYearsOfAge) { }

In order to accomplish this, an interface is defined.

public interface Service {

    ParentStats getStats(List<Parent> parents);
}

For now, it has only one method that receives as input a list of Parents and provides as output the desired statistics.

Initial Implementation

As the problem is trivial, an initial and imperative implementation of the service might be as below:

public class InitialService implements Service {

    @Override
    public ParentStats getStats(List<Parent> parents) {
        int count = parents.size();
        int min = Integer.MAX_VALUE;
        int max = 0;
        int sum = 0;
        for (Parent human : parents) {
            int age = human.age();
            if (age < min) {
                min = age;
            }
            if (age > max) {
                max = age;
            }
            sum += age;
        }

        return new ParentStats(count, min, max, max - min, (double) sum/count, sum);
    }
}

The code looks clear, but it is too focused on the how rather than on the what; the problem gets lost in the implementation and the code is hard to read.

As the functional style and streams are already part of every Java developer’s practices, most probably the next service implementation would be chosen.

public class StreamService implements Service {

    @Override
    public ParentStats getStats(List<Parent> parents) {
        int count = parents.size();

        int min = parents.stream()
                .mapToInt(Parent::age)
                .min()
                .orElseThrow(RuntimeException::new);

        int max = parents.stream()
                .mapToInt(Parent::age)
                .max()
                .orElseThrow(RuntimeException::new);

        int sum = parents.stream()
                .mapToInt(Parent::age)
                .sum();

        return new ParentStats(count, min, max, max - min, (double) sum/count, sum);
    }
}

The code is more readable now; the downside, though, is the redundant stream traversal needed for computing all the desired stats – three traversals in this particular case. As stated at the beginning of the article, once a terminal operation – min, max, sum – is done, the stream is no longer valid. It would be convenient to be able to compute the aimed statistics without having to loop over the list of parents multiple times.
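
A minimal illustration of why the traversal cannot simply be resumed on the same stream instance:

IntStream ages = parents.stream().mapToInt(Parent::age);
ages.min();   // terminal operation – the stream is consumed here
ages.max();   // throws IllegalStateException: stream has already been operated upon or closed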

Summary Statistics Implementation

In Java, there is a series of objects called SummaryStatistics which come as different types – IntSummaryStatistics, LongSummaryStatistics, DoubleSummaryStatistics.

According to the JavaDoc, IntSummaryStatistics is “a state object for collecting statistics such as count, min, max, sum and average. The class is designed to work with (though does not require) streams”. [Resource 1]

It is a good candidate for the initial purpose, thus the following implementation of the Service seems the preferred one.

public class StatsService implements Service {

    @Override
    public ParentStats getStats(List<Parent> parents) {
        IntSummaryStatistics stats = parents.stream()
                .mapToInt(Parent::age)
                .summaryStatistics();

        return new ParentStats(stats.getCount(),
                stats.getMin(),
                stats.getMax(),
                stats.getMax() - stats.getMin(),
                stats.getAverage(),
                stats.getSum());
    }
}

There is only one stream of parents, the statistics get computed in a single traversal and the code is far more readable this time.

In order to check all three implementations, the following abstract base unit test is used.

abstract class ServiceTest {

    private Service service;

    private List<Parent> mothers;
    private List<Parent> fathers;
    private List<Parent> parents;

    protected abstract Service setupService();

    @BeforeEach
    void setup() {
        service = setupService();

        mothers = IntStream.rangeClosed(1, 3)
                .mapToObj(i -> new Parent("Mother" + i, i + 30))
                .collect(Collectors.toList());

        fathers = IntStream.rangeClosed(4, 6)
                .mapToObj(i -> new Parent("Father" + i, i + 30))
                .collect(Collectors.toList());

        parents = new ArrayList<>(mothers);
        parents.addAll(fathers);
    }

    private void assertParentStats(ParentStats stats) {
        Assertions.assertNotNull(stats);
        Assertions.assertEquals(6, stats.count());
        Assertions.assertEquals(31, stats.youngest());
        Assertions.assertEquals(36, stats.oldest());
        Assertions.assertEquals(5, stats.ageRange());

        final int sum = 31 + 32 + 33 + 34 + 35 + 36;

        Assertions.assertEquals((double) sum/6, stats.averageAge());
        Assertions.assertEquals(sum, stats.totalYearsOfAge());
    }

    @Test
    void getStats() {
        final ParentStats stats = service.getStats(parents);
        assertParentStats(stats);
    }
}

As the stats are computed for all the parents, the mothers and fathers are first put together in the same parents list (we will see later why there were two lists in the first place).

The particular unit-test for each implementation is trivial – it sets up the service instance.

class StatsServiceTest extends ServiceTest {

    @Override
    protected Service setupService() {
        return new StatsService();
    }
}

Combining Statistics

In addition to the already used methods – getMin(), getMax(), getCount(), getSum(), getAverage() – IntSummaryStatistics provides a way to combine the state of another similar object into the current one.

void combine(IntSummaryStatistics other)

As we saw in the above unit-test, initially there are two source lists – mothers and fathers. It would be convenient to be able to directly compute the statistics, without first merging them.

In order to accomplish this, the Service is enriched with the following method.

default ParentStats getCombinedStats(List<Parent> mothers, List<Parent> fathers) {
	final List<Parent> parents = new ArrayList<>(mothers);
	parents.addAll(fathers);
	return getStats(parents);
}

The first two implementations – InitialService and StreamService – are not of interest here, thus a default implementation is provided for convenience. It is overridden only by StatsService.

@Override
public ParentStats getCombinedStats(List<Parent> mothers, List<Parent> fathers) {
	Collector<Parent, ?, IntSummaryStatistics> collector = Collectors.summarizingInt(Parent::age);

	IntSummaryStatistics stats = mothers.stream().collect(collector);
	stats.combine(fathers.stream().collect(collector));

	return new ParentStats(stats.getCount(),
			stats.getMin(),
			stats.getMax(),
			stats.getMax() - stats.getMin(),
			stats.getAverage(),
			stats.getSum());
}

By leveraging the combine() method, the statistics can be merged directly whenever different source lists are available.

The corresponding unit test is straightforward.

@Test
void getCombinedStats() {
	final ParentStats stats = service.getCombinedStats(mothers, fathers);
	assertParentStats(stats);
}

Having seen the above Collector, the initial getStats() method may be written even more briefly.

@Override
public ParentStats getStats(List<Parent> parents) {
	IntSummaryStatistics stats = parents.stream()
			.collect(Collectors.summarizingInt(Parent::age));

	return new ParentStats(stats.getCount(),
			stats.getMin(),
			stats.getMax(),
			stats.getMax() - stats.getMin(),
			stats.getAverage(),
			stats.getSum());
}

Conclusion

Depending on the data types used, IntSummaryStatistics, LongSummaryStatistics or DoubleSummaryStatistics are convenient out-of-the-box structures that one can use to quickly compute simple statistics while keeping the code readable and maintainable.
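
As a quick illustration of one of the other flavors, the double-based counterpart works in exactly the same way:

DoubleSummaryStatistics stats = DoubleStream.of(31.5, 32.0, 33.5)
        .summaryStatistics();

// getCount() = 3, getMin() = 31.5, getMax() = 33.5, getAverage() ≈ 32.33, getSum() = 97.0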

Resources

  1. IntSummaryStatistics JavaDoc
  2. Source code for the sample POC
  3. The picture was taken at Legoland Resort, Germany

Idempotent Liquibase Change Sets

by Horatiu Dan

Abstract

“Idempotence is the property of certain operations in mathematics and computer science whereby they can be applied multiple times without changing the result beyond the initial application” [Resource 3].

The purpose of this article is to outline a few ways of creating idempotent changes when the database modifications are managed with Liquibase. Throughout the lifetime of a software product that has such a tier, various database modifications are applied as it evolves. The more robust the modifications are, the more maintainable the solution is. In order to accomplish such a way of working, it is usually good practice to design the executed change sets to have zero side effects, that is, to be able to run them as many times as needed with the same end result.

The simple proof of concept built here aims to showcase how Liquibase change sets may be written so that they are idempotent. Moreover, the article explains in more depth what exactly happens when the application starts.

Set-up

  • Java 17
  • Spring Boot v.3.1.0
  • Liquibase 4.20.0
  • PostgreSQL Driver 42.6.0
  • Maven 3.6.3

Proof of Concept

As PostgreSQL is the database used here, first and foremost one shall create a new schema – liquidempo. This operation is easy to accomplish by issuing the following SQL command, once connected to the database.

create schema liquidempo;

At application level:

  • The Maven Spring Boot project is created and configured to use the PostgreSQL Driver, Spring Data JPA and Liquibase dependencies.
  • A simple entity is created – Human – with only one attribute, a unique identifier which is also the primary key at database level.

@Entity
@Table(name = "human")
@SequenceGenerator(sequenceName = "human_seq", name = "CUSTOM_SEQ_GENERATOR", allocationSize = 1)
public class Human {

    @Id
    @GeneratedValue(strategy = GenerationType.AUTO, generator = "CUSTOM_SEQ_GENERATOR")
    @Column(name = "id")
    private Long id;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }
}

For convenience, when entities are stored, their unique identifiers are generated using a database sequence, called human_seq.

  • The data source is configured as usual in the application.properties file.

spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.url=jdbc:postgresql://localhost:5432/postgres?currentSchema=liquidempo&useUnicode=true&characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=true
spring.datasource.username=postgres
spring.datasource.password=123456

spring.jpa.database-platform=org.hibernate.dialect.PostgreSQLDialect
spring.jpa.hibernate.ddl-auto=none

The previously created schema is referred to in the connection URL. DDL handling is disabled, as the database structure and the data are intended to persist across application restarts.

  • As Liquibase is the database migration manager, the changelog path is configured in the application.properties file as well.

spring.liquibase.change-log=classpath:/db/changelog/db.changelog-root.xml

For now, the db.changelog-root.xml file is empty.

The current state of the project requires a few simple change sets that create the database elements behind the Human entity – the table, the sequence and the primary key constraint.

<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog
        xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
                      https://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.17.xsd">

    <changeSet author="horatiucd" id="100">
        <createSequence sequenceName="human_seq" startValue="1" incrementBy="1"/>
    </changeSet>

    <changeSet author="horatiucd" id="200">
        <createTable tableName="human">
            <column name="id" type="BIGINT">
                <constraints nullable="false"/>
            </column>
        </createTable>
    </changeSet>

    <changeSet author="horatiucd" id="300">
        <addPrimaryKey columnNames="id" constraintName="human_pk" tableName="human"/>
    </changeSet>

</databaseChangeLog>

In order for these to be applied, they need to be recorded as part of the db.changelog-root.xml file, as indicated below.

<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog
        xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
                      https://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.17.xsd">

    <include file="db/changelog/human_init.xml"/>

</databaseChangeLog>

When the application is restarted, the three change sets are executed in the order they are declared.

INFO 9092 --- [main] liquibase.database      : Set default schema name to liquidempo
INFO 9092 --- [main] liquibase.lockservice   : Successfully acquired change log lock
INFO 9092 --- [main] liquibase.changelog     : Creating database history table with name: liquidempo.databasechangelog
INFO 9092 --- [main] liquibase.changelog     : Reading from liquidempo.databasechangelog
Running Changeset: db/changelog/human_init.xml::100::horatiucd
INFO 9092 --- [main] liquibase.changelog     : Sequence human_seq created
INFO 9092 --- [main] liquibase.changelog     : ChangeSet db/changelog/human_init.xml::100::horatiucd ran successfully in 6ms
Running Changeset: db/changelog/human_init.xml::200::horatiucd
INFO 9092 --- [main] liquibase.changelog     : Table human created
INFO 9092 --- [main] liquibase.changelog     : ChangeSet db/changelog/human_init.xml::200::horatiucd ran successfully in 4ms
Running Changeset: db/changelog/human_init.xml::300::horatiucd
INFO 9092 --- [main] liquibase.changelog     : Primary key added to human (id)
INFO 9092 --- [main] liquibase.changelog     : ChangeSet db/changelog/human_init.xml::300::horatiucd ran successfully in 8ms
INFO 9092 --- [main] liquibase               : Update command completed successfully.
INFO 9092 --- [main] liquibase.lockservice   : Successfully released change log lock

Moreover, they are recorded as separate rows in the databasechangelog database table.

+---+---------+---------------------------+--------------------------+-------------+--------+----------------------------------+------------------------------------------------------+
|id |author   |filename                   |dateexecuted              |orderexecuted|exectype|md5sum                            |description                                           |
+---+---------+---------------------------+--------------------------+-------------+--------+----------------------------------+------------------------------------------------------+
|100|horatiucd|db/changelog/human_init.xml|2023-05-26 16:23:17.184239|1            |EXECUTED|8:db8c5fb392dc96efa322da2c326b5eba|createSequence sequenceName=human_seq                 |
|200|horatiucd|db/changelog/human_init.xml|2023-05-26 16:23:17.193031|2            |EXECUTED|8:ed8e5e7df5edb17ed9a0682b9b640d7f|createTable tableName=human                           |
|300|horatiucd|db/changelog/human_init.xml|2023-05-26 16:23:17.204184|3            |EXECUTED|8:a2d6eff5a1e7513e5ab7981763ae532b|addPrimaryKey constraintName=human_pk, tableName=human|
+---+---------+---------------------------+--------------------------+-------------+--------+----------------------------------+------------------------------------------------------+

So far, everything is straightforward, nothing out of the ordinary – a simple Spring Boot application whose database changes are managed with Liquibase.

When examining the above human_init.xml file, one can easily identify the three scripts that result from the three change sets. None of them is idempotent. This means that if they are executed again (although there is no reason to do so here), errors will occur because the human_seq sequence, the human table and the human_pk primary key already exist.

Idempotent Change Sets

If the SQL code that results from the XML change sets had been written directly and aimed to be idempotent, it would have read as follows:

CREATE SEQUENCE IF NOT EXISTS human_seq INCREMENT 1 MINVALUE 1 MAXVALUE 99999999999;

CREATE TABLE IF NOT EXISTS human (
	id SERIAL CONSTRAINT human_pk PRIMARY KEY
);

If the two commands are executed several times, no errors occur and the outcome remains the same. After the first run, the sequence, the table and the constraint are created, then every new execution leaves them in the same usable state.

The aim is to accomplish the same in the written Liquibase change sets (change log).

According to the Liquibase documentation [Resource 1] – “Preconditions are tags you add to your changelog or individual changesets to control the execution of an update based on the state of the database. Preconditions let you specify security and standardization requirements for your changesets. If a precondition on a changeset fails, Liquibase does not deploy that changeset.”

These constructs may be configured in various ways, either at change log or change set level. For simplicity, the three change sets of this proof of concept will be made idempotent.

Basically, whenever a change set fails to execute because the entity (sequence, table or primary key) already exists, it would be convenient to continue with the rest of the change log instead of halting the execution and preventing the application from starting.

In this direction, Liquibase preconditions provide at least two options:

  • either skip over the change set and continue with the change log, or
  • skip over the change set but mark it as executed and continue with the change log.

Either of the two can be configured by adding a preConditions tag in the change set of interest and setting the onFail attribute as CONTINUE (the former case) or MARK_RAN (the latter case).

In pseudo-code, this looks as below:

<changeSet author="horatiucd" id="100">
	<preConditions onFail="CONTINUE or MARK_RAN">
		...
	</preConditions>
	...
</changeSet>

This seems in line with the initial intention – execute the change set only if the preconditions are met. Next, each of the two situations is analyzed.

onFail=”CONTINUE”

The change log file – human_init_idempo_continue.xml – becomes as below:

<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog
        xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
                      https://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.17.xsd">

    <changeSet author="horatiucd" id="101">
        <preConditions onFail="CONTINUE">
            <not>
                <sequenceExists sequenceName="human_seq"/>
            </not>
        </preConditions>
        <createSequence sequenceName="human_seq" startValue="1" incrementBy="1"/>
    </changeSet>

    <changeSet author="horatiucd" id="201">
        <preConditions onFail="CONTINUE">
            <not>
                <tableExists tableName="human"/>
            </not>
        </preConditions>
        <createTable tableName="human">
            <column name="id" type="BIGINT">
                <constraints nullable="false"/>
            </column>
        </createTable>
    </changeSet>

    <changeSet author="horatiucd" id="301">
        <preConditions onFail="CONTINUE">
            <not>
                <primaryKeyExists primaryKeyName="human_pk" tableName="human"/>
            </not>
        </preConditions>
        <addPrimaryKey columnNames="id" constraintName="human_pk" tableName="human"/>
    </changeSet>

</databaseChangeLog>

For each element, the precondition checks that it does not already exist.

When running the application, the log shows what is executed:

INFO 49016 --- [main] liquibase.database     : Set default schema name to liquidempo
INFO 49016 --- [main] liquibase.changelog    : Reading from liquidempo.databasechangelog
INFO 49016 --- [main] liquibase.lockservice  : Successfully acquired change log lock
Running Changeset: db/changelog/human_init_idempo_continue.xml::101::horatiucd
INFO 49016 --- [main] liquibase.changelog    : Continuing past: db/changelog/human_init_idempo_continue.xml::101::horatiucd despite precondition failure due to onFail='CONTINUE': 
          db/changelog/db.changelog-root.xml : Not precondition failed
Running Changeset: db/changelog/human_init_idempo_continue.xml::201::horatiucd
INFO 49016 --- [main] liquibase.changelog    : Continuing past: db/changelog/human_init_idempo_continue.xml::201::horatiucd despite precondition failure due to onFail='CONTINUE': 
          db/changelog/db.changelog-root.xml : Not precondition failed
Running Changeset: db/changelog/human_init_idempo_continue.xml::301::horatiucd
INFO 49016 --- [main] liquibase.changelog    : Continuing past: db/changelog/human_init_idempo_continue.xml::301::horatiucd despite precondition failure due to onFail='CONTINUE': 
          db/changelog/db.changelog-root.xml : Not precondition failed
INFO 49016 --- [main] liquibase              : Update command completed successfully.
INFO 49016 --- [main] liquibase.lockservice  : Successfully released change log lock

As expected, all three preconditions failed and the execution of the change log continued.

The databasechangelog database table does not have any records in addition to the previous three, which means these change sets will be attempted again at the next start-up of the application.

onFail=”MARK_RAN”

The change log file – human_init_idempo_mark_ran.xml – is similar to human_init_idempo_continue.xml, the only difference being the onFail attribute, which is set to onFail="MARK_RAN".
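
For reference, the first change set of this file would thus read as below; the other two follow the same pattern. Note that the ids 101, 201 and 301 are reused, which is fine since Liquibase identifies a change set by the combination of id, author and file name.

    <changeSet author="horatiucd" id="101">
        <preConditions onFail="MARK_RAN">
            <not>
                <sequenceExists sequenceName="human_seq"/>
            </not>
        </preConditions>
        <createSequence sequenceName="human_seq" startValue="1" incrementBy="1"/>
    </changeSet>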

The db.changelog-root.xml root change log now looks as below:

<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog
        xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
                      https://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.17.xsd">

    <include file="db/changelog/human_init.xml"/>
    <include file="db/changelog/human_init_idempo_continue.xml"/>
    <include file="db/changelog/human_init_idempo_mark_ran.xml"/>
    
</databaseChangeLog>

For this proof of concept, all three files were kept on purpose, in order to be able to observe the behavior in detail.

If the application is restarted, no errors are encountered and the log shows the following:

INFO 38788 --- [main] liquibase.database      : Set default schema name to liquidempo
INFO 38788 --- [main] liquibase.changelog     : Reading from liquidempo.databasechangelog
INFO 38788 --- [main] liquibase.lockservice   : Successfully acquired change log lock
INFO 38788 --- [main] liquibase.changelog     : Reading from liquidempo.databasechangelog
Running Changeset: db/changelog/human_init_idempo_continue.xml::101::horatiucd
INFO 38788 --- [main] liquibase.changelog     : Continuing past: db/changelog/human_init_idempo_continue.xml::101::horatiucd despite precondition failure due to onFail='CONTINUE': 
          db/changelog/db.changelog-root.xml : Not precondition failed
Running Changeset: db/changelog/human_init_idempo_continue.xml::201::horatiucd
INFO 38788 --- [main] liquibase.changelog     : Continuing past: db/changelog/human_init_idempo_continue.xml::201::horatiucd despite precondition failure due to onFail='CONTINUE': 
          db/changelog/db.changelog-root.xml : Not precondition failed
Running Changeset: db/changelog/human_init_idempo_continue.xml::301::horatiucd
INFO 38788 --- [main] liquibase.changelog     : Continuing past: db/changelog/human_init_idempo_continue.xml::301::horatiucd despite precondition failure due to onFail='CONTINUE': 
          db/changelog/db.changelog-root.xml : Not precondition failed
Running Changeset: db/changelog/human_init_idempo_mark_ran.xml::101::horatiucd
INFO 38788 --- [main] liquibase.changelog     : Marking ChangeSet: "db/changelog/human_init_idempo_mark_ran.xml::101::horatiucd" as ran despite precondition failure due to onFail='MARK_RAN': 
          db/changelog/db.changelog-root.xml : Not precondition failed
Running Changeset: db/changelog/human_init_idempo_mark_ran.xml::201::horatiucd
INFO 38788 --- [main] liquibase.changelog     : Marking ChangeSet: "db/changelog/human_init_idempo_mark_ran.xml::201::horatiucd" as ran despite precondition failure due to onFail='MARK_RAN': 
          db/changelog/db.changelog-root.xml : Not precondition failed
Running Changeset: db/changelog/human_init_idempo_mark_ran.xml::301::horatiucd
INFO 38788 --- [main] liquibase.changelog     : Marking ChangeSet: "db/changelog/human_init_idempo_mark_ran.xml::301::horatiucd" as ran despite precondition failure due to onFail='MARK_RAN': 
          db/changelog/db.changelog-root.xml : Not precondition failed
INFO 38788 --- [main] liquibase               : Update command completed successfully.
INFO 38788 --- [main] liquibase.lockservice   : Successfully released change log lock

The change sets with onFail="CONTINUE" were attempted again, as every start-up is a new attempt, while the ones with onFail="MARK_RAN" were marked as ran in the databasechangelog table and will be skipped at the next start-up.

+---+---------+-------------------------------------------+--------------------------+-------------+--------+----------------------------------+------------------------------------------------------+
|id |author   |filename                                   |dateexecuted              |orderexecuted|exectype|md5sum                            |description                                           |
+---+---------+-------------------------------------------+--------------------------+-------------+--------+----------------------------------+------------------------------------------------------+
|100|horatiucd|db/changelog/human_init.xml                |2023-05-26 16:23:17.184239|1            |EXECUTED|8:db8c5fb392dc96efa322da2c326b5eba|createSequence sequenceName=human_seq                 |
|200|horatiucd|db/changelog/human_init.xml                |2023-05-26 16:23:17.193031|2            |EXECUTED|8:ed8e5e7df5edb17ed9a0682b9b640d7f|createTable tableName=human                           |
|300|horatiucd|db/changelog/human_init.xml                |2023-05-26 16:23:17.204184|3            |EXECUTED|8:a2d6eff5a1e7513e5ab7981763ae532b|addPrimaryKey constraintName=human_pk, tableName=human|
|101|horatiucd|db/changelog/human_init_idempo_mark_ran.xml|2023-05-29 16:40:26.453305|4            |MARK_RAN|8:db8c5fb392dc96efa322da2c326b5eba|createSequence sequenceName=human_seq                 |
|201|horatiucd|db/changelog/human_init_idempo_mark_ran.xml|2023-05-29 16:40:26.463021|5            |MARK_RAN|8:ed8e5e7df5edb17ed9a0682b9b640d7f|createTable tableName=human                           |
|301|horatiucd|db/changelog/human_init_idempo_mark_ran.xml|2023-05-29 16:40:26.475153|6            |MARK_RAN|8:a2d6eff5a1e7513e5ab7981763ae532b|addPrimaryKey constraintName=human_pk, tableName=human|
+---+---------+-------------------------------------------+--------------------------+-------------+--------+----------------------------------+------------------------------------------------------+

At the next run of the application, the log will be similar to the one where onFail was set to "CONTINUE".

One more observation is worth making at this point: if a change set's preconditions do not fail, it is executed normally and recorded with exectype = EXECUTED in the databasechangelog table.

Conclusions

This article presented two ways of writing idempotent Liquibase change sets, a practice that makes applications more robust and easier to maintain. This was accomplished by leveraging the preConditions tag inside the change log files. While both onFail attribute values – CONTINUE and MARK_RAN – may be used depending on the operation performed, the latter seems more appropriate for this proof of concept, as it does not attempt to re-run the change sets at every start-up of the application.

Resources

  1. Liquibase Documentation
  2. Source code for the sample application
  3. Idempotence