Kafka Message Filtering – An Analysis

by Horatiu Dan

Many companies nowadays use event-driven architectures in their day-to-day business activities, especially when they want their applications to react in real time or near real time.

In such a setting, the three main types of actors – producers, message broker and consumers – exchange a lot of messages. Nevertheless, under certain circumstances, some of these messages might not be of interest to a consumer and are thus discarded and ignored.

This article aims to analyze in detail how a consumer application shall be configured so that it behaves correctly when it needs to filter messages that are “irrelevant”. First, a standard record filter strategy is configured at consumer level. Then, a custom deserialization mechanism is added and the analysis is refined. As stated, the intention is to preserve a correct behavior of the consumer.

Set-up

  • Java 21
  • Maven 3.9.2
  • Spring Boot – version 3.2.2
  • Redpanda message broker running in Docker – image version 23.2.15

As message broker, the lightweight Redpanda is chosen. Since it is completely Kafka compatible, neither the development nor the configuration needs to be modified if it is later replaced with a different broker. [Resource 1] describes how to accomplish the minimal Redpanda set-up.

Once the Docker container is up and running, a topic called request is created and the cluster is inspected with the following commands:

>docker exec -it redpanda-0 rpk topic create request
TOPIC    STATUS
request  OK
>docker exec -it redpanda-0 rpk cluster info
CLUSTER
=======
redpanda.581f9a24-3402-4a17-af28-63353a602421

BROKERS
=======
ID    HOST        PORT
0*    redpanda-0  9092

TOPICS
======
NAME                PARTITIONS  REPLICAS
__consumer_offsets  3           1
_schemas            1           1
request             1           1

As shown, the request topic was created successfully.

Implement a Record Filter Strategy

The use case is the following:

  • the producer sends a request to the configured topic
  • if the request message fulfills the acceptance criteria, the consumer processes it
  • otherwise, the message is discarded

A request message has a simple form:

{
	"id": "34b25c6b-60d6-4e53-8f79-bdcdd17b3a2d",
	"contextId": "hcd"
}

having just two fields, an identifier and a context identifier.

Messages are taken into account only in a certain acceptable context. Put differently, a message is accepted if its contextId is equal to the one configured on the consumer side; otherwise, it is discarded.

A request is modelled by the following record:

public record Request(String id, String contextId) {
}

For configuring a producer and a consumer, at least these properties are needed (application.properties file):

# the path to the message broker
broker.url = localhost:19092

# the name of the topic
topic.request = request

# the identifier of the accepted context, also used as the consumer group id
context.id = hcd

The requirement is clear – only the messages having hcd as contextId are accepted.

In order to send messages, a producer needs a KafkaTemplate instance, configured as below:

@Configuration
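public class KafkaConfig {

    // broker address read from application.properties
    @Value("${broker.url}")
    private String brokerUrl;
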
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props);

        return new KafkaTemplate<>(producerFactory);
    }
}

One may observe in the producer configuration that a StringSerializer was chosen for marshalling the payload value. Usually, a JsonSerializer provides more robustness to the producer-consumer contract. Nevertheless, the choice here was intentional to increase the experimental flexibility.

Once the messages reach the request topic, a consumer is configured to pick them up.

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${broker.url}")
    private String brokerUrl;

    @Value("${context.id}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Request> kafkaListenerContainerFactory(CustomRecordFilterStrategy recordFilterStrategy) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, Request.class.getPackageName());
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Request.class.getName());

        DefaultKafkaConsumerFactory<String, Request> defaultFactory = new DefaultKafkaConsumerFactory<>(props,
                new StringDeserializer(), new JsonDeserializer<>(Request.class));

        ConcurrentKafkaListenerContainerFactory<String, Request> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(defaultFactory);
        factory.setRecordFilterStrategy(recordFilterStrategy);
        factory.setCommonErrorHandler(new DefaultErrorHandler());
        return factory;
    }
}

The factory.setRecordFilterStrategy(recordFilterStrategy) call in the listing above injects a record filtering strategy, which is exactly the purpose here: a means to decide whether a message is filtered out or not.

The RecordFilterStrategy interface has one abstract method

boolean filter(ConsumerRecord<K, V> consumerRecord);

which, according to its JavaDoc, returns true if the ConsumerRecord should be discarded (K represents the message key, while V the message value).
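Since the contract is easy to read backwards (true means "discard", not "keep"), a plain-Java sketch may help. The DiscardFilter interface and the dispatch method below are hypothetical stand-ins written for this illustration only; they are not Spring Kafka's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-in for the filtering contract; NOT Spring Kafka's implementation.
class FilterContractSketch {

    // Hypothetical mirror of RecordFilterStrategy: returning true means "discard".
    interface DiscardFilter<T> {
        boolean filter(T message);
    }

    // Invokes the listener only for the messages the filter does not discard.
    static <T> void dispatch(List<T> messages, DiscardFilter<T> filter, Consumer<T> listener) {
        for (T message : messages) {
            if (!filter.filter(message)) {
                listener.accept(message);
            }
        }
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        // Discard every context id other than "hcd".
        dispatch(List.of("hcd", "other context", "hcd"), ctx -> !"hcd".equals(ctx), delivered::add);
        System.out.println(delivered); // [hcd, hcd]
    }
}
```

Only the two messages with the expected context reach the listener; the third one is silently dropped.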

In the case of this proof of concept, all messages that have their contextId equal to hcd are accepted and consumed, while the rest are filtered out. The implementation is below.

@Component
public class CustomRecordFilterStrategy implements RecordFilterStrategy<String, Request> {

    private static final Logger LOG = LoggerFactory.getLogger(CustomRecordFilterStrategy.class);

    @Value("${context.id}")
    private String contextId;

    @Override
    public boolean filter(ConsumerRecord<String, Request> consumerRecord) {
        Request request = consumerRecord.value();

        boolean discard = !contextId.equals(request.contextId());
        LOG.info("{} is{} compliant.", request, discard ? "n't" : "");
        return discard;
    }
}

As part of the configuration, the KafkaListenerContainerFactory interface is responsible for creating the listener container of a particular endpoint. The @EnableKafka annotation on the configuration class enables the detection of @KafkaListener annotations on any Spring-managed beans in the container. Thus, the actual listener (the message consumer) is developed next.

@Component
public class RequestMessageListener {

    private static final Logger LOG = LoggerFactory.getLogger(RequestMessageListener.class);

    private final ResponseService responseService;

    public RequestMessageListener(ResponseService responseService) {
        this.responseService = responseService;
    }

    @KafkaListener(topics = "${topic.request}", groupId = "${context.id}")
    public void onMessage(@Payload Request request) {
        LOG.info("Processing {}.", request);

        responseService.send(Response.success());
    }
}

Its functionality is trivial: it logs the messages read from the request topic and destined to the configured consumer group. Then, it invokes a ResponseService which acts as the entity that sends a message back (here, it only logs it).

@Service
public class ResponseService {

    private static final Logger LOG = LoggerFactory.getLogger(ResponseService.class);

    public void send(Response response) {
        LOG.info("Sending {}.", response);
    }
}

A Response is modeled simply, as below:

public record Response (String id,
                        Result result) {

    public static Response success() {
        return new Response(UUID.randomUUID().toString(), Result.SUCCESS);
    }
    
    public enum Result {
        SUCCESS, FAILURE
    }
}

When the application is started, provided the message broker is up, the listener is ready to receive messages.

INFO 20080 --- [main] c.h.r.RecordFilterStrategyApplication               : Started RecordFilterStrategyApplication in 1.282 seconds (process running for 1.868)
INFO 20080 --- [main] fkaConsumerFactory$ExtendedKafkaConsumer            : [Consumer clientId=consumer-hcd-1, groupId=hcd] Subscribed to topic(s): request
INFO 20080 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-hcd-1, groupId=hcd] Cluster ID: redpanda.581f9a24-3402-4a17-af28-63353a602421
INFO 20080 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-hcd-1, groupId=hcd] Request joining group due to: need to re-join with the given member-id: consumer-hcd-1-1fa7cd25-2bf2-49bd-82fd-ac3e4c54cafc
INFO 20080 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-hcd-1, groupId=hcd] Successfully joined group with generation Generation{generationId=7, memberId='consumer-hcd-1-1fa7cd25-2bf2-49bd-82fd-ac3e4c54cafc', protocol='range'}

In order to check the integration, the following two tests are used. Since a Request is expected by the listener, a compliance template was created for convenience.

@SpringBootTest
class RecordFilterStrategyApplicationTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${topic.request}")
    private String topic;

    @Value("${context.id}")
    private String contextId;

    private static final String template = """
        {
            "id": "%s",
            "contextId": "%s"
        }""";

    @Test
    void compliant() {
        kafkaTemplate.send(topic,
                String.format(template, UUID.randomUUID(), contextId));
    }

    @Test
    void notCompliant() {
        kafkaTemplate.send(topic,
                String.format(template, UUID.randomUUID(), "other context"));
    }
}

compliant() sends a message whose contextId matches the one configured on this consumer. As expected, it is processed and a response is sent back.

INFO 20080 --- [ntainer#0-0-C-1] c.h.r.l.CustomRecordFilterStrategy    : Request[id=44a2644c-0025-4a38-bd36-f163a530725e, contextId=hcd] is compliant.
INFO 20080 --- [ntainer#0-0-C-1] c.h.r.listener.RequestMessageListener : Processing Request[id=44a2644c-0025-4a38-bd36-f163a530725e, contextId=hcd].
INFO 20080 --- [ntainer#0-0-C-1] c.h.r.listener.ResponseService        : Sending Response[id=ebe0f65c-eddf-4866-b71f-e6cd766dd499, result=SUCCESS].

notCompliant() sends a message whose contextId is different from what was configured on this consumer. Thus, the message is neither processed, nor responded to, but ignored.

INFO 20080 --- [ntainer#0-0-C-1] c.h.r.l.CustomRecordFilterStrategy : Request[id=ed22f60c-b13d-4315-8132-46aa83ddf33b, contextId=other context] isn't compliant.

So far, the proof of concept has exemplified how to configure the consumer with a record filtering strategy so that only certain messages are accepted.

The code for this part is here – 1-filter-strategy

Implement a Record Filter Strategy with Custom Deserialization

Let’s assume that the messages which are consumed from the request topic are unmarshalled using a custom deserializer and the filtering is still required.

The custom deserializer here is trivial and has a didactic purpose. Moreover, in case the id field is missing or empty, a runtime RequestDeserializationException is thrown. Such an action is not necessarily needed at this point, but it was put here to outline a certain use case. Read on.

public class CustomRequestDeserializer extends StdDeserializer<Request> {

    private static final Logger LOG = LoggerFactory.getLogger(CustomRequestDeserializer.class);

    public CustomRequestDeserializer() {
        super(Request.class);
    }

    @Override
    public Request deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
        ObjectCodec oc = jsonParser.getCodec();
        JsonNode root = oc.readTree(jsonParser);

        final String contextId = deserializeField(root, "contextId");

        final String id = deserializeField(root, "id");
        if (id == null || id.isEmpty()) {
            throw new RequestDeserializationException("'id' is required");
        }

        Request request = new Request(id, contextId);
        LOG.info("Successfully deserialized {}", request);
        return request;
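    }

    // Helper not shown in the original listing; assumed here to return the
    // field's text value, or null when the field is absent.
    private String deserializeField(JsonNode root, String fieldName) {
        JsonNode node = root.get(fieldName);
        return node == null || node.isNull() ? null : node.asText();
    }
}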

To apply it, the Request record is annotated as below:

@JsonDeserialize(using = CustomRequestDeserializer.class)
public record Request(String id, String contextId) {
}

Up until now, the behavior described in the first part is preserved. If the previous compliant() and notCompliant() tests are run again, the outcome is the same.

The next analyzed situation is the one in which a RequestDeserializationException is thrown when deserializing an incoming message. Let’s assume the id is empty, thus the form is as below:

{
	"id": "",
	"contextId": "hcd"
}

@Test
void deserializationError_compliant() {
	kafkaTemplate.send(topic,
			String.format(template, "", contextId));
}

When such a message is received, the outcome is the following:

...
Caused by: com.hcd.recordfilterstrategy.domain.deserialization.RequestDeserializationException: 'id' is required
...

An exception thrown at deserialization time causes the message to be neither consumed nor responded to; it is lost.

See [Resource 3] for a detailed analysis on situations like this.

One solution that allows recovering after deserialization exceptions is to configure the value deserializer of the KafkaListenerContainerFactory with a failed deserialization function – see the setFailedDeserializationFunction() call below:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Request> kafkaListenerContainerFactory(CustomRecordFilterStrategy recordFilterStrategy,
																							  FailedRequestDeserializationFunction failedDeserializationFunction) {
	Map<String, Object> props = new HashMap<>();
	props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
	props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
	props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
	props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
	props.put(JsonDeserializer.TRUSTED_PACKAGES, Request.class.getPackageName());
	props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Request.class.getName());

	JsonDeserializer<Request> jsonDeserializer = new JsonDeserializer<>(Request.class);

	ErrorHandlingDeserializer<Request> valueDeserializer = new ErrorHandlingDeserializer<>(jsonDeserializer);
	valueDeserializer.setFailedDeserializationFunction(failedDeserializationFunction);

	DefaultKafkaConsumerFactory<String, Request> defaultFactory = new DefaultKafkaConsumerFactory<>(props,
			new StringDeserializer(), valueDeserializer);

	ConcurrentKafkaListenerContainerFactory<String, Request> factory = new ConcurrentKafkaListenerContainerFactory<>();
	factory.setConsumerFactory(defaultFactory);
	factory.setRecordFilterStrategy(recordFilterStrategy);
	factory.setCommonErrorHandler(new DefaultErrorHandler());
	return factory;
}

The purpose of the FailedRequestDeserializationFunction component is to allow recovering after such an exceptional situation and to be able to send a failure response back.

@Component
public class FailedRequestDeserializationFunction implements Function<FailedDeserializationInfo, Request> {

    private static final Logger LOG = LoggerFactory.getLogger(FailedRequestDeserializationFunction.class);

    private final ResponseService responseService;

    public FailedRequestDeserializationFunction(ResponseService responseService) {
        this.responseService = responseService;
    }

    @Override
    public Request apply(FailedDeserializationInfo failedDeserializationInfo) {
        final Exception ex = failedDeserializationInfo.getException();

        if (ex instanceof RequestDeserializationException deserializationEx) {
            LOG.info("Error deserializing request - {}", deserializationEx.getMessage());

            responseService.send(Response.failure());

        } else {
            LOG.error("Unexpected error deserializing request.", ex);
        }

        return null;
    }
}
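Conceptually, a failed deserialization function is a fallback that receives the failure and decides what value, if any, is handed on. The following plain-Java sketch illustrates only this idea; withFallback is a hypothetical helper and does not reproduce how ErrorHandlingDeserializer is actually implemented.

```java
import java.util.function.Function;

// Conceptual illustration of "delegate, then fall back on failure"; names are hypothetical.
class FallbackDeserializerSketch {

    // Wraps a delegate; on failure, hands the exception to the fallback and returns its result.
    static <T> Function<String, T> withFallback(Function<String, T> delegate,
                                                Function<RuntimeException, T> onFailure) {
        return payload -> {
            try {
                return delegate.apply(payload);
            } catch (RuntimeException ex) {
                return onFailure.apply(ex);
            }
        };
    }

    public static void main(String[] args) {
        Function<String, Integer> parser =
                FallbackDeserializerSketch.<Integer>withFallback(s -> Integer.parseInt(s), ex -> null);
        System.out.println(parser.apply("42"));   // 42
        System.out.println(parser.apply("oops")); // null
    }
}
```

As in the component above, the fallback is the natural place for side effects such as logging or sending a failure response, since it is the only code that sees the failure.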

If the same test is run again and a compliant, but incorrect message is sent, the behavior changes.

2024-03-13T10:52:38.893+02:00  INFO 24232 --- [ntainer#0-0-C-1] d.d.FailedRequestDeserializationFunction : Error deserializing request - 'id' is required
2024-03-13T10:52:38.895+02:00  INFO 24232 --- [ntainer#0-0-C-1] c.h.r.listener.ResponseService           : Sending Response[id=5393b4a0-3849-4130-934b-671e43a2358f, result=FAILURE].

The only remaining case is that of a non-compliant and incorrect message, meaning the id is still empty, but the contextId is different from the expected one.

{
	"id": "",
	"contextId": "other context"
}

If the following test is run, nothing changes. Unfortunately, the failed deserialization function still sends a failure response back, although the record filtering strategy should have filtered the message out, as the contextId is non-compliant.

@Test
void deserializationError_notCompliant() {
	kafkaTemplate.send(topic,
			String.format(template, "", "other context"));
}

2024-03-13T11:03:56.609+02:00  INFO 24232 --- [ntainer#0-0-C-1] d.d.FailedRequestDeserializationFunction : Error deserializing request - 'id' is required
2024-03-13T11:03:56.610+02:00  INFO 24232 --- [ntainer#0-0-C-1] c.h.r.listener.ResponseService           : Sending Response[id=b345f001-3543-46da-bc0f-17c63c20e32a, result=FAILURE].

The code for this second section is here – 2-filter-strategy-custom-deser.

Implement a Record Filter Strategy with Custom Deserialization – Correctly

The last part of this analysis provides a solution on how to address this last use-case.

Before moving on with it, let’s recall what currently happens in all possible use-cases:

Correct, compliant message

  1. since the message is correct, the custom deserializer successfully unmarshalls it
  2. the failed deserialization function is not invoked
  3. since the message is compliant, the record filter strategy does not reject it
  4. the listener is invoked, it processes the request and sends a response back

Correct, non-compliant message

  1. since the message is correct, the custom deserializer successfully unmarshalls it
  2. the failed deserialization function is not invoked
  3. since the message is non-compliant, the record filter strategy rejects it
  4. the listener is not invoked

Incorrect, compliant or non-compliant message

  1. since the message is incorrect, the custom deserializer throws an exception
  2. the failed deserialization function is invoked and it sends a failure response back
  3. the record filter strategy is not invoked
  4. the listener is not invoked

In case of a correct message, the consumer application behaves correctly, irrespective of the compliancy of the message.
In case of an incorrect message, a failure response is sent back, irrespective of the compliancy of the message, which means the consumer behaves correctly only for compliant messages.

For incorrect messages, whether compliant or not, it should act as follows:

  1. since the message is incorrect, the custom deserializer throws an exception
  2. the failed deserialization function is invoked and it sends a failure response back only if the message is compliant
  3. the record filter strategy is not invoked
  4. the listener is not invoked
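The four use-cases recalled above reduce to a small decision table. A minimal sketch of the desired behavior follows; the Outcome enum and the outcome method are illustrative names only, not part of the project.

```java
// Decision table for the desired consumer behavior; enum and method names are illustrative only.
class DesiredBehavior {

    enum Outcome { PROCESS_AND_RESPOND, DISCARD, FAILURE_RESPONSE }

    // correct   -> the payload deserializes successfully
    // compliant -> the contextId matches the one configured on the consumer
    static Outcome outcome(boolean correct, boolean compliant) {
        if (!compliant) {
            return Outcome.DISCARD; // non-compliant messages are ignored, correct or not
        }
        return correct ? Outcome.PROCESS_AND_RESPOND : Outcome.FAILURE_RESPONSE;
    }

    public static void main(String[] args) {
        System.out.println(outcome(true, true));   // PROCESS_AND_RESPOND
        System.out.println(outcome(true, false));  // DISCARD
        System.out.println(outcome(false, true));  // FAILURE_RESPONSE
        System.out.println(outcome(false, false)); // DISCARD
    }
}
```

The last row is the one the current implementation gets wrong: it answers with a failure response instead of discarding the message.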

At first glance, in order to cover the last use-case as well, only the FailedRequestDeserializationFunction needs to be enhanced to also check the message compliancy.

Basically, before sending the response, the same check as the one in CustomRecordFilterStrategy shall be added. To avoid repetition, some refactoring is done.

To isolate the compliancy check, a separate component in charge of it is created.

@Component
public class RequestFilterStrategy {

    private static final Logger LOG = LoggerFactory.getLogger(RequestFilterStrategy.class);

    @Value("${context.id}")
    private String contextId;

    public boolean filter(String contextId) {
        boolean discard = !this.contextId.equals(contextId);
        LOG.info("Request is{} compliant.", discard ? "n't" : "");
        return discard;
    }
}

Then, the component is injected both in the CustomRecordFilterStrategy and in the FailedRequestDeserializationFunction and consequently, they are refactored as follows.

@Component
public class CustomRecordFilterStrategy implements RecordFilterStrategy<String, Request> {

    private final RequestFilterStrategy requestFilterStrategy;

    public CustomRecordFilterStrategy(RequestFilterStrategy requestFilterStrategy) {
        this.requestFilterStrategy = requestFilterStrategy;
    }

    @Override
    public boolean filter(ConsumerRecord<String, Request> consumerRecord) {
        return requestFilterStrategy.filter(consumerRecord.value().contextId());
    }
}

@Component
public class FailedRequestDeserializationFunction implements Function<FailedDeserializationInfo, Request> {

    private static final Logger LOG = LoggerFactory.getLogger(FailedRequestDeserializationFunction.class);

    private final RequestFilterStrategy requestFilterStrategy;
    private final ResponseService responseService;

    public FailedRequestDeserializationFunction(RequestFilterStrategy requestFilterStrategy,
                                                ResponseService responseService) {
        this.requestFilterStrategy = requestFilterStrategy;
        this.responseService = responseService;
    }

    @Override
    public Request apply(FailedDeserializationInfo failedDeserializationInfo) {
        final Exception ex = failedDeserializationInfo.getException();

        if (ex instanceof RequestDeserializationException deserializationEx) {
            LOG.info("Error deserializing request - {}", deserializationEx.getMessage());

            if (!requestFilterStrategy.filter(deserializationEx.getContextId())) {
                responseService.send(Response.failure());
            }
        } else {
            LOG.error("Unexpected error deserializing request.", ex);
        }

        return null;
    }
}
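The getContextId() call in the listing above implies that the exception carries the contextId read from the payload, so the custom deserializer has to pass it along when it throws. The article does not show that change. The sketch below is one possible shape under that assumption; the two-argument constructor and the requireId helper are hypothetical.

```java
// Assumed shape of the exception; only getContextId() is implied by the listing above.
class ContextAwareExceptionSketch {

    static class RequestDeserializationException extends RuntimeException {

        private final String contextId;

        // Hypothetical constructor: keeps the contextId that was read before validation failed.
        RequestDeserializationException(String message, String contextId) {
            super(message);
            this.contextId = contextId;
        }

        String getContextId() {
            return contextId;
        }
    }

    // Mimics the validation step of the custom deserializer, outside of Jackson.
    static String requireId(String id, String contextId) {
        if (id == null || id.isEmpty()) {
            throw new RequestDeserializationException("'id' is required", contextId);
        }
        return id;
    }

    public static void main(String[] args) {
        try {
            requireId("", "other context");
        } catch (RequestDeserializationException ex) {
            System.out.println(ex.getMessage() + " / " + ex.getContextId());
            // 'id' is required / other context
        }
    }
}
```

In the real project the class would need public visibility, since the failed deserialization function lives in a different package.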

To check the behavior, the last unit test is run again.

@Test
void deserializationError_notCompliant() {
	kafkaTemplate.send(topic,
			String.format(template, "", "other context"));
}

The output clearly shows that for incorrect, non-compliant messages, no response is sent anymore.

2024-03-13T15:05:56.432+02:00  INFO 17916 --- [ntainer#0-0-C-1] d.d.FailedRequestDeserializationFunction : Error deserializing request - 'id' is required
2024-03-13T15:05:56.432+02:00  INFO 17916 --- [ntainer#0-0-C-1] c.h.r.listener.RequestFilterStrategy     : Request isn't compliant.

The code for the enhanced solution is here – 3-filter-strategy-custom-deser-covered

Resources

  1. Redpanda Quickstart
  2. Spring for Apache Kafka Reference
  3. Acting Soon on Kafka Deserialization Errors

PostgreSQL Views with Runtime Parameters

by Horatiu Dan

Quite often, applications are required to be agile and versatile enough to run dynamic reports whose input is only known at runtime.

This post aims to present a way of achieving it by leveraging the temporary configuration parameters supported by PostgreSQL databases.

According to the PostgreSQL documentation, starting with version 7.3, it is possible to set a configuration parameter using the set_config(name, value, is_local) function. Later, the value of the previously set parameter may be read using the current_setting(name) function, converted if needed and used. If the third parameter of the former function is true, the changed setting will only apply to the current transaction.

This is exactly what is needed here, a way of providing a runtime parameter value that can be used as part of an atomic operation.
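To make the mechanism concrete before any Spring code is involved, here is a minimal JDBC sketch. It assumes a reachable PostgreSQL database and the PostgreSQL JDBC driver on the classpath; the class and method names are hypothetical, and the caller is expected to supply the Connection.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Sketch only: assumes a reachable PostgreSQL database and its JDBC driver on the classpath.
class TransactionLocalSetting {

    static final String SET_SQL = "SELECT set_config(?, ?, true)";
    static final String GET_SQL = "SELECT current_setting(?)";

    // Sets a custom parameter for the current transaction, reads it back, then rolls back.
    static String roundTrip(Connection connection, String name, String value) throws SQLException {
        boolean previousAutoCommit = connection.getAutoCommit();
        connection.setAutoCommit(false); // is_local = true scopes the value to a transaction
        try {
            try (PreparedStatement set = connection.prepareStatement(SET_SQL)) {
                set.setString(1, name);
                set.setString(2, value);
                set.execute();
            }
            try (PreparedStatement get = connection.prepareStatement(GET_SQL)) {
                get.setString(1, name);
                try (ResultSet rs = get.executeQuery()) {
                    rs.next();
                    return rs.getString(1);
                }
            }
        } finally {
            connection.rollback(); // the setting does not outlive the transaction
            connection.setAutoCommit(previousAutoCommit);
        }
    }
}
```

A call such as roundTrip(connection, "demo.currency_id", "2") would be expected to return "2". Note that PostgreSQL expects custom parameter names to be qualified with a prefix, hence the dot in the example name.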

Set-Up

The sample application is built with:

  • Java 21
  • Spring Boot version 3.1.15
  • PostgreSQL Driver version 42.6.0
  • Liquibase 4.20.0
  • Maven 3.6.3

At application level, the Maven project is configured to use Spring Data JPA and Liquibase dependencies.

The domain is represented by products, whose prices are in various currencies. For converting between currencies, a currency exchange rate exists. The goal is to be able to read all products with their prices represented in a certain currency, at the rate of a certain day.

Proof of Concept

In order to start modelling, one shall first create a new schema, once connected to the database.

create schema pgsetting;

There are three entities – Product, Currency, CurrencyExchange.

@Entity
@Table(name = "product")
public class Product {

    @Id
    @Column(name = "id")
    private Long id;

    @Column(name = "name", nullable = false)
    private String name;

    @Column(name = "price", nullable = false)
    private Double price;

    @ManyToOne
    @JoinColumn(name = "currency_id")
    private Currency currency;
	
	...
}

@Entity
@Table(name = "currency")
public class Currency {

    @Id
    @Column(name = "id", nullable = false)
    private Long id;

    @Column(name = "name", nullable = false)
    private String name;

	...
}


@Entity
@Table(name = "currency_exchange")
public class CurrencyExchange {

    @Id
    @Column(name = "id", nullable = false)
    private Long id;

    @Column(name = "date", nullable = false)
    private LocalDate date;

    @ManyToOne
    @JoinColumn(name = "from_currency_id", nullable = false)
    private Currency from;

    @ManyToOne
    @JoinColumn(name = "to_currency_id", nullable = false)
    private Currency to;

    @Column(name = "value", nullable = false)
    private Double value;

	...
}

Each one has a corresponding CrudRepository.

@Repository
public interface ProductRepository extends CrudRepository<Product, Long> { }

@Repository
public interface CurrencyRepository extends CrudRepository<Currency, Long> { }

@Repository
public interface CurrencyExchangeRepository extends CrudRepository<CurrencyExchange, Long> { }

The data source is configured as usual in the application.properties file, together with the path to the Liquibase changelog file that records a few simple change sets for initializing the schema with the three tables and the relations among them.

For details, the application properties and db/changelog/schema-init.xml files may be explored.

The root changelog file is:

<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                   xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
                      https://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.8.xsd">

    <include file="/db/changelog/schema-init.xml"/>
    
</databaseChangeLog>

When the application is started, the change sets are executed in the order they are declared. So far, everything is straightforward, nothing out of the ordinary – a simple Spring Boot application whose database changes are managed with Liquibase.

Creating the Dynamic Report

Let’s assume that currently the application has two currencies defined – RON and EUR and two products with their prices recorded in different currencies.

Currency

+--+----+
|id|name|
+--+----+
|1 |RON |
|2 |EUR |
+--+----+

Product

+--+-------------------+-----+-----------+
|id|name               |price|currency_id|
+--+-------------------+-----+-----------+
|1 |Swatch Moonlight v1|100  |2          |
|2 |Winter Sky         |1000 |1          |
+--+-------------------+-----+-----------+

CurrencyExchange rates for the 15th of November

+--+----------+----------------+--------------+-----+
|id|date      |from_currency_id|to_currency_id|value|
+--+----------+----------------+--------------+-----+
|1 |2023-11-15|2               |1             |5    |
|2 |2023-11-15|2               |2             |1    |
|3 |2023-11-15|1               |2             |0.2  |
|4 |2023-11-15|1               |1             |1    |
+--+----------+----------------+--------------+-----+

The aimed result is a product report with all prices in EUR, using the exchange rate from the 15th of November 2023. This means the price of the second product needs to be converted.
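As a quick sanity check of the expected figures, the conversion can be worked out by hand from the tables above. The sketch below does so in plain Java. It uses BigDecimal rather than the Double of the entities to keep the arithmetic exact, and the class, the map and its string keys are illustrative only.

```java
import java.math.BigDecimal;
import java.util.Map;

// Worked example of the expected report values; not part of the application code.
class ConversionExample {

    // Rates for 2023-11-15 taken from the CurrencyExchange table above, keyed by "from->to".
    static final Map<String, BigDecimal> RATES = Map.of(
            "EUR->RON", new BigDecimal("5"),
            "EUR->EUR", new BigDecimal("1"),
            "RON->EUR", new BigDecimal("0.2"),
            "RON->RON", new BigDecimal("1"));

    // Converted price = original price multiplied by the rate of the (from, to) pair.
    static BigDecimal convert(BigDecimal price, String from, String to) {
        return price.multiply(RATES.get(from + "->" + to));
    }

    public static void main(String[] args) {
        // Swatch Moonlight v1: 100 EUR stays 100 EUR.
        System.out.println(convert(new BigDecimal("100"), "EUR", "EUR"));  // 100
        // Winter Sky: 1000 RON * 0.2 = 200 EUR.
        System.out.println(convert(new BigDecimal("1000"), "RON", "EUR")); // 200.0
    }
}
```

The report is therefore expected to list the two products at 100 EUR and 200 EUR respectively.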

To ease the design, the previously set goal is divided into smaller parts, then conquered. Conceptually, products shall be fetched and their prices converted (if needed).

  1. Fetch the products
  2. Convert the prices in the requested currency, using the exchange rate of the requested day

The former is trivial: a Spring Data Repository method easily allows getting the products – List<Product> findAll().

The latter is achievable through a query that makes the conversions.

SELECT p.id,
       p.name,
       p.price * e.value price,       
       e.to_currency_id currency_id,
       e.date
FROM product p
LEFT JOIN currency_exchange e on p.currency_id = e.from_currency_id and 
		e.to_currency_id = 2 and
		e.date = '2023-11-15'

In order to unite the two, the following are accomplished:

  • a view is defined, for the above query – product_view

It is defined in the product-view.sql file and added as an idempotent operation in a repeatable Liquibase change set that is run whenever changed.

<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                   xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
                      https://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.8.xsd">

    <include file="/db/changelog/schema-init.xml"/>

    <changeSet id="repeatable" author="horatiucd" runOnChange="true">
        <sqlFile dbms="postgresql" path="db/changelog/product-view.sql"/>
    </changeSet>

</databaseChangeLog>

  • a new entity – ProductView – is defined as part of the domain, together with the corresponding repository

@Entity
@Immutable
public class ProductView {

    @Id
    private Long id;

    private String name;

    private Double price;

    private LocalDate date;

    @ManyToOne
    @JoinColumn(name = "currency_id")
    private Currency currency;
	
	...
}

@Repository
public interface ProductViewRepository extends org.springframework.data.repository.Repository<ProductView, Long> {

    List<ProductView> findAll();
}

The application is now able to construct the desired report, but only for a hardcoded currency and exchange rate date.

In order to pass the two at runtime, the following are performed in the same transaction:

  • the two parameter values are set as configuration parameters – SELECT set_config(:name, :value, true)
  • the ProductView entities are fetched using the repository method

Also, the product_view is modified to read the configuration parameters set as part of the current transaction and select the data accordingly.

SELECT p.id,
       p.name,
       p.price * e.value price,
       e.date,
       e.to_currency_id currency_id
FROM product p
LEFT JOIN currency_exchange e on p.currency_id = e.from_currency_id and
		e.to_currency_id = current_setting('pgsetting.CurrencyId')::int and
		e.date = current_setting('pgsetting.CurrencyDate')::date;

current_setting('pgsetting.CurrencyId') and current_setting('pgsetting.CurrencyDate') calls read the previously set parameters, which are further converted and used.

The implementation needs some additional adjustments.

ProductViewRepository is enhanced with a method that allows setting the configuration parameters.

@Repository
public interface ProductViewRepository extends org.springframework.data.repository.Repository<ProductView, Long> {

    List<ProductView> findAll();

    @Query(value = "SELECT set_config(:name, :value, true)")
    void setConfigParam(String name, String value);
}

The last parameter is always set to true, thus the value is kept only during the current transaction.

Also, a ProductService is defined to clearly mark all operations involved in the transaction.

@Service
public class ProductService {

    private final ProductViewRepository productViewRepository;

    public ProductService(ProductViewRepository productViewRepository) {
        this.productViewRepository = productViewRepository;
    }

    @Transactional
    public List<ProductView> getProducts(Currency currency, LocalDate date) {
        productViewRepository.setConfigParam("pgsetting.CurrencyId",
                String.valueOf(currency.getId()));

        productViewRepository.setConfigParam("pgsetting.CurrencyDate",
                DateTimeFormatter.ofPattern("yyyy-MM-dd").format(date));

        return productViewRepository.findAll();
    }
}

The names of the parameters are the ones used in the product_view definition.
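Since set_config() accepts and stores text values, both parameters travel as strings and the view casts them back with ::int and ::date. A minimal sketch of that conversion, using only the JDK, might look as follows. The ConfigParamValues class and its method names are hypothetical and not part of the sample project; the date pattern is the one used by the ProductService above.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of the sample project.
final class ConfigParamValues {

    // Same pattern the ProductService uses for the date parameter.
    private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    // Text form of the currency identifier, later cast back with ::int in the view.
    static String currencyId(long id) {
        return String.valueOf(id);
    }

    // Text form of the exchange date, later cast back with ::date in the view.
    static String currencyDate(LocalDate date) {
        return DATE_FORMAT.format(date);
    }

    public static void main(String[] args) {
        System.out.println(currencyId(2L));
        System.out.println(currencyDate(LocalDate.of(2023, 11, 15)));
    }
}
```

Keeping the conversion in one place makes it easier to check that the text sent to the database matches what the casts in the view expect.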

To verify the implementation, two tests are set up.

@SpringBootTest
class Product1Test {

    @Autowired
    private CurrencyRepository currencyRepository;

    @Autowired
    private ProductRepository productRepository;

    @Autowired
    private CurrencyExchangeRepository rateRepository;

    @Autowired
    private ProductService productService;

    private Currency ron, eur;
    private Product watch, painting;
    private CurrencyExchange eurToRon, ronToEur;
    private LocalDate date;

    @BeforeEach
    public void setup() {
        ron = new Currency(1L, "RON");
        eur = new Currency(2L, "EUR");
        currencyRepository.saveAll(List.of(ron, eur));

        watch = new Product(1L, "Swatch Moonlight v1", 100.0d, eur);
        painting = new Product(2L, "Winter Sky", 1000.0d, ron);
        productRepository.saveAll(List.of(watch, painting));

        date = LocalDate.now();
        eurToRon = new CurrencyExchange(1L, date, eur, ron, 5.0d);
        CurrencyExchange eurToEur = new CurrencyExchange(2L, date, eur, eur, 1.0d);
        ronToEur = new CurrencyExchange(3L, date, ron, eur, .2d);
        CurrencyExchange ronToRon = new CurrencyExchange(4L, date, ron, ron, 1.0d);
        rateRepository.saveAll(List.of(eurToRon, eurToEur, ronToEur, ronToRon));
    }
}

The former fetches the products with prices in EUR, using the recorded exchange rates.

@Test
void prices_in_eur() {
	List<ProductView> products = productService.getProducts(eur, date);
	Assertions.assertEquals(2, products.size());

	Assertions.assertTrue(products.stream()
			.allMatch(product -> product.getCurrency().getId().equals(eur.getId())));

	Assertions.assertTrue(products.stream()
			.allMatch(product -> product.getDate().equals(date)));

	Assertions.assertEquals(watch.getPrice(),
			products.get(0).getPrice());
	Assertions.assertEquals(painting.getPrice() * ronToEur.getValue(),
			products.get(1).getPrice());
}

When called, product_view is:

+--+-------------------+-----+-----------+----------+
|id|name               |price|currency_id|date      |
+--+-------------------+-----+-----------+----------+
|1 |Swatch Moonlight v1|100  |2          |2023-11-15|
|2 |Winter Sky         |200  |2          |2023-11-15|
+--+-------------------+-----+-----------+----------+

The latter fetches the products with prices in RON, using the same exchange rates.

@Test
void prices_in_ron() {
	List<ProductView> products = productService.getProducts(ron, date);
	Assertions.assertEquals(2, products.size());

	Assertions.assertTrue(products.stream()
			.allMatch(product -> product.getCurrency().getId().equals(ron.getId())));

	Assertions.assertTrue(products.stream()
			.allMatch(product -> product.getDate().equals(date)));

	Assertions.assertEquals(watch.getPrice() * eurToRon.getValue(),
			products.get(0).getPrice());
	Assertions.assertEquals(painting.getPrice(),
			products.get(1).getPrice());
}

When called, product_view is:

+--+-------------------+-----+-----------+----------+
|id|name               |price|currency_id|date      |
+--+-------------------+-----+-----------+----------+
|1 |Swatch Moonlight v1|500  |1          |2023-11-15|
|2 |Winter Sky         |1000 |1          |2023-11-15|
+--+-------------------+-----+-----------+----------+

Sample Code

Available here.

Resources

  1. PostgreSQL System Admin Functions
  2. The picture was taken at Legoland, Germany

Acting Soon on Kafka Deserialization Errors

by Horatiu Dan

Context

Event-driven architectures have been used successfully for quite some time by many organizations in various business cases. They excel at performance, scalability, evolvability and fault tolerance, while providing a good level of abstraction and elasticity. These strengths make them a good choice when applications need real-time or near real-time reactiveness.

In terms of implementations, ActiveMQ and RabbitMQ are good candidates for standard messaging, while platforms such as Apache Kafka and Redpanda are more suitable for data streaming. Usually, when developers and architects need to opt for one of these two directions, they analyze and weigh it from several angles – message payload, flow and usage of data, throughput, solution topology. As the discussion around these aspects can get too big and complex, it is not going to be refined as part of this article.

Conceptually, event-driven architectures involve at least three main actors – message producers, message brokers and message consumers. Briefly, the purpose is to allow the producers and the consumers to communicate in a decoupled and asynchronous way, which is accomplished with the help of the previously mentioned message brokers. In the optimistic scenario, a producer creates a message and publishes it to a topic owned by the broker, from which the consumer reads it, deals with it and, out of courtesy, provides a response back. Messages are serialized (marshalled) by the producers when sent to topics and de-serialized (unmarshalled) by consumers when received from topics.

This article focuses on the situation in which a consumer experiences issues when de-serializing a received message and presents a way to act further. A few examples of such actions include constructing a default message or sending feedback back to the message broker. Developers are creative enough to decide on this behavior, depending on the particular implemented use cases.

Set-up

  • Java 21
  • Maven 3.9.2
  • Spring Boot – version 3.1.5
  • Redpanda message broker running in Docker – image version 23.2.15

Redpanda is a lightweight message broker and it was chosen for this proof of concept to give the readers the opportunity to experiment with a different option than the widely used Kafka one. As it is Kafka compatible, the development and the configuration of the producers and consumers will not need to change at all if moving from one service provider to another.

According to the Redpanda documentation, the Docker support applies only to development and testing. For the purpose of this project, this is more than enough, thus a single Redpanda message broker is set up to run in Docker.

See [Resource 1] for details on how to accomplish the minimal set-up.

Once up and running, a topic called minifig is created with the following command:

>docker exec -it redpanda-0 rpk topic create minifig
TOPIC    STATUS
minifig  OK

If the cluster is inspected, one may observe that a topic with one partition and one replica was created.

>docker exec -it redpanda-0 rpk cluster info
CLUSTER
=======
redpanda.581f9a24-3402-4a17-af28-63353a602421

BROKERS
=======
ID    HOST        PORT
0*    redpanda-0  9092

TOPICS
======
NAME                PARTITIONS  REPLICAS
__consumer_offsets  3           1
_schemas            1           1
minifig             1           1

Implementation

The flow is straightforward: the producer sends a request to the configured topic, from which it is read by the consumer, as long as the consumer is able to de-serialize it.

A request represents a mini-figure which is simplistically modelled by the following record:

public record Minifig(String id,
                      Size size,
                      String name) {

    public Minifig(Size size, String name) {
        this(UUID.randomUUID().toString(), size, name);
    }

    public enum Size {
        SMALL, MEDIUM, BIG;
    }
}

id is the unique identifier of the Minifig which has a certain name and is of a certain size – small, medium or big.

For configuring a producer and a consumer, at least these properties are needed (application.properties file):

# the path to the message broker
broker.url=localhost:19092

# the name of the broker topic
topic.minifig=minifig

# the unique string that identifies the consumer group of the consumer
topic.minifig.group.id=group-0

For sending messages, the producer needs a KafkaTemplate instance.

@Configuration
public class KafkaConfig {

    @Value("${broker.url}")
    private String brokerUrl;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props);

        return new KafkaTemplate<>(producerFactory);
    }
}

One may observe in the producer configuration that a StringSerializer was chosen for marshalling the payload value. Usually, a JsonSerializer provides more robustness to the producer-consumer contract. Nevertheless, the choice here was intentional, to increase the experimental flexibility on the consumer side (as shown later). Just as a reminder, the interest in this proof of concept is to act on the encountered deserialization errors.

Once the messages reach the minifig topic, a consumer is configured to pick them up.

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${broker.url}")
    private String brokerUrl;

    @Value("${topic.minifig.group.id}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Minifig> kafkaListenerContainerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, Minifig.class.getPackageName());
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Minifig.class.getName());

        DefaultKafkaConsumerFactory<String, Minifig> defaultFactory = new DefaultKafkaConsumerFactory<>(props);

        ConcurrentKafkaListenerContainerFactory<String, Minifig> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(defaultFactory);
        factory.setCommonErrorHandler(new DefaultErrorHandler());
        return factory;
    }
}

The KafkaListenerContainerFactory interface is responsible for creating the listener container for a particular endpoint. The @EnableKafka annotation on the configuration class enables the detection of @KafkaListener annotations on any Spring-managed beans in the container. Thus, the actual listener (the message consumer) is developed next.

@Component
public class MinifigListener {

    private static final Logger LOG = LoggerFactory.getLogger(MinifigListener.class);

    @KafkaListener(topics = "${topic.minifig}", groupId = "${topic.minifig.group.id}")
    public void onReceive(@Payload Minifig minifig) {
        LOG.info("New minifig received - {}.", minifig);
    }
}

Its functionality is trivial: it only logs the messages read from the minifig topic, destined for the configured consumer group.

If the application is started, provided the message broker is up, the listener is ready to receive messages.

INFO 10008 --- [main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-group-0-1, groupId=group-0] Subscribed to topic(s): minifig
INFO 10008 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-group-0-1, groupId=group-0] Cluster ID: redpanda.581f9a24-3402-4a17-af28-63353a602421
INFO 10008 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group-0-1, groupId=group-0] Discovered group coordinator localhost:19092 (id: 2147483647 rack: null)
INFO 10008 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group-0-1, groupId=group-0] Found no committed offset for partition minifig-0
INFO 10008 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-0: partitions assigned: [minifig-0]

In order to check the integration, the following simple test is used. Since a Minifig is expected by the listener, a compliance template was created for convenience.

@SpringBootTest
class AppTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${topic.minifig}")
    private String topic;

    final String template = "{" +
            "\"id\":\"%s\"," +
            "\"size\":\"%s\"," +
            "\"name\":\"%s\"" +
            "}";

    @Test
    void send_compliant() {
        final String minifig = String.format(template,
                UUID.randomUUID(), Minifig.Size.SMALL, "Spider-Man");

        kafkaTemplate.send(topic, minifig);
    }
}

When running the test, a ‘compliant’ message is sent to the broker and as expected, it is successfully picked up by the local consumer.

INFO 10008 --- [ntainer#0-0-C-1] c.h.e.listener.MinifigListener           : New minifig received - Minifig[id=0c75b9e4-511a-48b3-a984-404d2fc1d47b, size=SMALL, name=Spider-Man].

Redpanda Console can be helpful in observing what is happening at the broker level, particularly what is flowing through the minifig topic.

In scenarios such as the one above, messages are sent from the producer to the consumer via the message broker, as planned.

Recover on Deserialization Failures

In the particular case of this proof of concept, it is assumed the size of a mini-figure can be SMALL, MEDIUM or BIG, in line with the defined Size enum. In case the producer sends a mini-figure of an unknown size, one that deviates a bit from the agreed contract, the message is basically rejected by the listener, as the payload cannot be de-serialized.

To simulate this, the following test is run.

@SpringBootTest
class AppTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${topic.minifig}")
    private String topic;

    final String template = "{" +
            "\"id\":\"%s\"," +
            "\"size\":\"%s\"," +
            "\"name\":\"%s\"" +
            "}";
    
    @Test
    void send_non_compliant() {
        final String minifig = String.format(template,
                UUID.randomUUID(), "Unknown", "Spider-Man");

        kafkaTemplate.send(topic, minifig);
    }
}

The message reaches the topic, but not the MinifigListener#onReceive() method. As expected, the error appears when the payload is being unmarshalled. The causes can be identified by looking deep down the stack trace.

Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data  from topic [minifig]
Caused by: com.fasterxml.jackson.databind.exc.InvalidFormatException: Cannot deserialize value of type `com.hcd.errhandlerdeserializer.domain.Minifig$Size` from String "Unknown": not one of the values accepted for Enum class: [BIG, MEDIUM, SMALL]
 at [Source: (byte[])"{"id":"fbc86874-55ac-4313-bbbb-0ed99341825a","size":"Unknown","name":"Spider-Man"}"; line: 1, column: 53] (through reference chain: com.hcd.errhandlerdeserializer.domain.Minifig["size"])
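The same kind of rejection can be reproduced without a broker or Jackson. The sketch below is a plain-Java analogy, not what JsonDeserializer does internally: it uses Enum.valueOf(), which also refuses names that are not among the declared constants. The SizeParsing class and its methods are hypothetical.

```java
import java.util.Optional;

// Plain-Java sketch, independent of Jackson and Spring Kafka.
final class SizeParsing {

    enum Size { SMALL, MEDIUM, BIG }

    // Strict lookup: throws IllegalArgumentException for names outside the enum.
    static Size strict(String raw) {
        return Size.valueOf(raw);
    }

    // Lenient lookup (hypothetical helper): empty result instead of an exception.
    static Optional<Size> lenient(String raw) {
        if (raw == null) {
            return Optional.empty();
        }
        try {
            return Optional.of(Size.valueOf(raw));
        } catch (IllegalArgumentException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(lenient("SMALL"));
        System.out.println(lenient("Unknown"));
    }
}
```

The strict variant mirrors the failure seen in the stack trace, while the lenient one hints at the direction taken next: deciding explicitly what happens when the value is not part of the contract.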

Another aspect is that the consumer continually retries reading the same message. This is unfortunate, at least from the consumer point of view, as the logs keep accumulating.

In order to get past such situations, the JsonDeserializer used for unmarshalling the payload value is wrapped in an ErrorHandlingDeserializer, which uses it as its actual delegate. Moreover, the ErrorHandlingDeserializer has a failedDeserializationFunction member that, according to its JavaDoc, provides an alternative mechanism when the deserialization fails.

The new consumer configuration looks as below:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Minifig> kafkaListenerContainerFactory() {
	Map<String, Object> props = new HashMap<>();
	props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
	props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
	props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
	props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
	props.put(JsonDeserializer.TRUSTED_PACKAGES, Minifig.class.getPackageName());
	props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Minifig.class.getName());

	JsonDeserializer<Minifig> jsonDeserializer = new JsonDeserializer<>(Minifig.class);

	ErrorHandlingDeserializer<Minifig> valueDeserializer = new ErrorHandlingDeserializer<>(jsonDeserializer);
	valueDeserializer.setFailedDeserializationFunction(new MinifigFailedDeserializationFunction());

	DefaultKafkaConsumerFactory<String, Minifig> defaultFactory = new DefaultKafkaConsumerFactory<>(props,
			new StringDeserializer(), valueDeserializer);

	ConcurrentKafkaListenerContainerFactory<String, Minifig> factory = new ConcurrentKafkaListenerContainerFactory<>();
	factory.setConsumerFactory(defaultFactory);
	factory.setCommonErrorHandler(new DefaultErrorHandler());
	return factory;
}

The failedDeserializationFunction used here is simplistic, but the reason is to prove its utility.

public class MinifigFailedDeserializationFunction implements Function<FailedDeserializationInfo, Minifig> {

    private static final Logger LOG = LoggerFactory.getLogger(MinifigFailedDeserializationFunction.class);

    @Override
    public Minifig apply(FailedDeserializationInfo failedDeserializationInfo) {
        final Exception exception = failedDeserializationInfo.getException();
        LOG.info("Error deserializing minifig - {}", exception.getCause().getMessage());
        return new Minifig(null, Minifig.Size.SMALL, "Undefined");
    }
}

The FailedDeserializationInfo entity (the Function#apply() input) is constructed during the recovery from the de-serialization exception and it encapsulates various pieces of information (here, the exception is the one leveraged).
Since the output of the apply() method is the actual deserialization result, one may return either null or whatever is suitable, depending on the desired behavior.
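The delegate-plus-fallback idea can be sketched in plain Java, without any Spring Kafka classes. This is only an illustration of the decorator pattern under simplified assumptions, not the actual ErrorHandlingDeserializer implementation; PayloadDeserializer, Fallback and sizeDeserializer() are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Plain-Java sketch of the decorator idea; all names here are hypothetical.
final class FallbackDeserializerSketch {

    // Minimal stand-in for a deserializer contract.
    interface PayloadDeserializer<T> {
        T deserialize(byte[] data);
    }

    // Decorator: tries the delegate first and falls back to a function on failure.
    static final class Fallback<T> implements PayloadDeserializer<T> {

        private final PayloadDeserializer<T> delegate;
        private final Function<RuntimeException, T> onFailure;

        Fallback(PayloadDeserializer<T> delegate, Function<RuntimeException, T> onFailure) {
            this.delegate = delegate;
            this.onFailure = onFailure;
        }

        @Override
        public T deserialize(byte[] data) {
            try {
                return delegate.deserialize(data);
            } catch (RuntimeException e) {
                // The result of the function becomes the deserialization result.
                return onFailure.apply(e);
            }
        }
    }

    enum Size { SMALL, MEDIUM, BIG }

    // Strict delegate wrapped so that unknown sizes are mapped to SMALL.
    static PayloadDeserializer<Size> sizeDeserializer() {
        PayloadDeserializer<Size> strict =
                data -> Size.valueOf(new String(data, StandardCharsets.UTF_8));
        return new Fallback<>(strict, e -> Size.SMALL);
    }

    public static void main(String[] args) {
        PayloadDeserializer<Size> deserializer = sizeDeserializer();
        System.out.println(deserializer.deserialize("BIG".getBytes(StandardCharsets.UTF_8)));
        System.out.println(deserializer.deserialize("Unknown".getBytes(StandardCharsets.UTF_8)));
    }
}
```

The caller never sees the exception: it receives either the delegate's result or the fallback value, which is the behavior the listener benefits from in the configuration above.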

If running the send_non_compliant() test again, the deserialization exception is handled and a default value is returned. Further, the MinifigListener is invoked and has the opportunity to deal with it.

INFO 30160 --- [ntainer#0-0-C-1] e.l.MinifigFailedDeserializationFunction : Error deserializing minifig - Cannot deserialize value of type `com.hcd.errhandlerdeserializer.domain.Minifig$Size` from String "Unknown": not one of the values accepted for Enum class: [BIG, MEDIUM, SMALL]
 at [Source: (byte[])"{"id":"f35a77bf-29e5-4f5c-b5de-cc674f22029f","size":"Unknown","name":"Spider-Man"}"; line: 1, column: 53] (through reference chain: com.hcd.errhandlerdeserializer.domain.Minifig["size"])
INFO 30160 --- [ntainer#0-0-C-1] c.h.e.listener.MinifigListener           : New minifig received - Minifig[id=null, size=SMALL, name=Undefined].

Conclusion

Configuring Kafka producers and consumers and fine-tuning them in order to achieve the desired performance in accordance with the used message brokers is not always straightforward. Controlling each step of the communication is by all means desirable and, moreover, reacting quickly to unexpected situations helps deliver robust and easy to maintain solutions. This post focused on the deserialization issues that might appear at Kafka consumer level and provided a way of having a fallback plan when dealing with non-compliant payloads.

Sample Code

Available here – https://github.com/horatiucd/err-handler-deserializer

Resources

  1. Redpanda Quickstart
  2. Spring for Apache Kafka Reference
  3. The picture was taken at Zoo Brasov, Romania

Stream Summary Statistics

by Horatiu Dan

Context

In order to be able to leverage various capabilities of the Java Streams, one shall first understand two general concepts – the stream and the stream pipeline. A Stream in Java is a sequential flow of data. A stream pipeline on the other hand, represents a series of steps applied to data, series that ultimately produce a result.

My family and I recently visited the Legoland Resort in Germany – a great place by the way – and there, among other attractions, we had the chance to observe in detail a sample of the brick building process. Briefly, everything starts from the granular plastic that is melted, modeled accordingly, assembled, painted, stenciled if needed and packed up in bags and boxes. All the steps are part of an assembly factory pipeline.

What is worth mentioning is the fact that the next step cannot be done until the previous one has completed and also that the number of steps is finite. Moreover, at every step, each Lego element is touched to perform the corresponding operation and then it moves only forward, never backwards, so that the next step is done. The same applies to Java streams.

In functional programming, the steps are called stream operations and they are of three categories – one that starts the job (source), one that ends it and produces the result (terminal) and zero or more intermediate ones in between.

As a last consideration, it’s worth mentioning that the intermediate operations have the ability to transform the stream into another one, but are never run until the terminal operation runs (they are lazily evaluated). Finally, once the result is produced and the initial scope achieved, the stream is no longer valid.
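Both properties, lazy evaluation and single use, can be observed with a few lines of code. The sketch below uses only the JDK; the LazyStreamDemo class and its method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

// JDK-only sketch; class and method names are hypothetical.
final class LazyStreamDemo {

    // Returns {elements touched before the terminal operation, sum, elements touched after it}.
    static int[] run(List<Integer> source) {
        AtomicInteger touched = new AtomicInteger();

        // Intermediate operation: declared here, but not executed yet.
        Stream<Integer> doubled = source.stream()
                .map(i -> {
                    touched.incrementAndGet();
                    return i * 2;
                });
        int before = touched.get();

        // Terminal operation: only now the elements flow through map().
        int sum = doubled.mapToInt(Integer::intValue).sum();
        int after = touched.get();

        return new int[] {before, sum, after};
    }

    // Tells whether another terminal operation can still be run on the stream.
    static boolean isStillUsable(Stream<?> stream) {
        try {
            stream.count();
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run(List.of(1, 2, 3))));

        Stream<Integer> consumed = Stream.of(1, 2);
        consumed.forEach(i -> { });
        System.out.println(isStillUsable(consumed));
    }
}
```

The counter stays at zero until the terminal operation starts, and a second terminal operation on an already consumed stream ends with an IllegalStateException.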

Abstract

Having as starting point the fact that, in the case of Java Streams, once the terminal stream operation is done the stream is no longer valid, this article aims to present a way of computing multiple operations at once through only one stream traversal. It is accomplished by leveraging the Java summary statistics objects (in particular IntSummaryStatistics) that have been available since version 1.8.

Proof of Concept

The small project built especially to showcase the statistics computation uses the following:

  • Java 17
  • Maven 3.6.3
  • JUnit Jupiter Engine v.5.9.3

As domain, there is one straightforward entity – a parent.

public record Parent(String name, int age) { }

It is modeled by two attributes – the name and the age. While the name is present only for being able to distinguish the parents, the age is the one of interest here.

The purpose is to be able to compute a few age statistics on a set of parents, that is:

  • the total sample count
  • the ages of the youngest and the oldest parent
  • the age range of the group
  • the average age
  • the total number of years the parents accumulate.

The results are encapsulated into a ParentStats structure, represented as a record as well.

public record ParentStats(long count,
                          int youngest,
                          int oldest,
                          int ageRange,
                          double averageAge,
                          long totalYearsOfAge) { }

In order to accomplish this, an interface is defined.

public interface Service {

    ParentStats getStats(List<Parent> parents);
}

For now, it has only one method that receives as input a list of Parents and provides as output the desired statistics.

Initial Implementation

As the problem is trivial, an initial and imperative implementation of the service might be as below:

public class InitialService implements Service {

    @Override
    public ParentStats getStats(List<Parent> parents) {
        int count = parents.size();
        int min = Integer.MAX_VALUE;
        int max = 0;
        int sum = 0;
        for (Parent human : parents) {
            int age = human.age();
            if (age < min) {
                min = age;
            }
            if (age > max) {
                max = age;
            }
            sum += age;
        }

        return new ParentStats(count, min, max, max - min, (double) sum/count, sum);
    }
}

The code is straightforward, but it is focused on the how rather than on the what, thus the problem gets lost in the implementation details and the intent is harder to read.

As the functional style and streams are already part of every Java developer’s practices, most probably the next service implementation would be chosen.

public class StreamService implements Service {

    @Override
    public ParentStats getStats(List<Parent> parents) {
        int count = parents.size();

        int min = parents.stream()
                .mapToInt(Parent::age)
                .min()
                .orElseThrow(RuntimeException::new);

        int max = parents.stream()
                .mapToInt(Parent::age)
                .max()
                .orElseThrow(RuntimeException::new);

        int sum = parents.stream()
                .mapToInt(Parent::age)
                .sum();

        return new ParentStats(count, min, max, max - min, (double) sum/count, sum);
    }
}

The code is more readable now. The downside, though, is the redundant stream traversal needed for computing all the desired stats – three times in this particular case. As stated at the beginning of the article, once a terminal operation is done – min, max, sum – the stream is no longer valid. It would be convenient to be able to compute the desired statistics without having to loop over the list of parents multiple times.

Summary Statistics Implementation

In Java, there is a series of objects called SummaryStatistics which come as different types – IntSummaryStatistics, LongSummaryStatistics, DoubleSummaryStatistics.

According to the JavaDoc, IntSummaryStatistics is “a state object for collecting statistics such as count, min, max, sum and average. The class is designed to work with (though does not require) streams”. [Resource 1]

It is a good candidate for the initial purpose, thus the following implementation of the Service seems the preferred one.

public class StatsService implements Service {

    @Override
    public ParentStats getStats(List<Parent> parents) {
        IntSummaryStatistics stats = parents.stream()
                .mapToInt(Parent::age)
                .summaryStatistics();

        return new ParentStats(stats.getCount(),
                stats.getMin(),
                stats.getMax(),
                stats.getMax() - stats.getMin(),
                stats.getAverage(),
                stats.getSum());
    }
}

There is only one stream of parents, the statistics get computed and the code is much more readable this time.
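One detail worth keeping in mind is the behavior on an empty input. For an IntSummaryStatistics that has recorded no values, getMin() returns Integer.MAX_VALUE, getMax() returns Integer.MIN_VALUE and getAverage() returns 0.0, so a derived value such as the age range needs a guard. The sketch below illustrates this; EmptyStatsDemo and its ageRange() helper are hypothetical and not part of the sample project.

```java
import java.util.IntSummaryStatistics;
import java.util.stream.IntStream;

// JDK-only sketch; class and method names are hypothetical.
final class EmptyStatsDemo {

    // Collects the statistics of the provided ages in a single traversal.
    static IntSummaryStatistics statsOf(int... ages) {
        return IntStream.of(ages).summaryStatistics();
    }

    // The range is only meaningful when at least one value was recorded.
    static int ageRange(IntSummaryStatistics stats) {
        return stats.getCount() == 0 ? 0 : stats.getMax() - stats.getMin();
    }

    public static void main(String[] args) {
        IntSummaryStatistics empty = statsOf();
        System.out.println(empty.getMin() == Integer.MAX_VALUE);
        System.out.println(empty.getMax() == Integer.MIN_VALUE);
        System.out.println(ageRange(empty));
        System.out.println(ageRange(statsOf(31, 36)));
    }
}
```

Returning 0 for an empty group is just one possible convention, chosen here for brevity; throwing an exception, as StreamService does, is an equally valid choice.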

In order to check all three implementations, the following abstract base unit test is used.

abstract class ServiceTest {

    private Service service;

    private List<Parent> mothers;
    private List<Parent> fathers;
    private List<Parent> parents;

    protected abstract Service setupService();

    @BeforeEach
    void setup() {
        service = setupService();

        mothers = IntStream.rangeClosed(1, 3)
                .mapToObj(i -> new Parent("Mother" + i, i + 30))
                .collect(Collectors.toList());

        fathers = IntStream.rangeClosed(4, 6)
                .mapToObj(i -> new Parent("Father" + i, i + 30))
                .collect(Collectors.toList());

        parents = new ArrayList<>(mothers);
        parents.addAll(fathers);
    }

    private void assertParentStats(ParentStats stats) {
        Assertions.assertNotNull(stats);
        Assertions.assertEquals(6, stats.count());
        Assertions.assertEquals(31, stats.youngest());
        Assertions.assertEquals(36, stats.oldest());
        Assertions.assertEquals(5, stats.ageRange());

        final int sum = 31 + 32 + 33 + 34 + 35 + 36;

        Assertions.assertEquals((double) sum/6, stats.averageAge());
        Assertions.assertEquals(sum, stats.totalYearsOfAge());
    }

    @Test
    void getStats() {
        final ParentStats stats = service.getStats(parents);
        assertParentStats(stats);
    }
}

As the stats are computed for all the parents, the mothers and fathers are first put together in the same parents list (we will see later why there were two lists in the first place).

The particular unit-test for each implementation is trivial – it sets up the service instance.

class StatsServiceTest extends ServiceTest {

    @Override
    protected Service setupService() {
        return new StatsService();
    }
}

Combining Statistics

In addition to the already used methods – getMin(), getMax(), getCount(), getSum(), getAverage() – IntSummaryStatistics provides a way to combine the state of another similar object into the current one.

void combine(IntSummaryStatistics other)

As we saw in the above unit-test, initially there are two source lists – mothers and fathers. It would be convenient to be able to directly compute the statistics, without first merging them.

In order to accomplish this, the Service is enriched with the following method.

default ParentStats getCombinedStats(List<Parent> mothers, List<Parent> fathers) {
	final List<Parent> parents = new ArrayList<>(mothers);
	parents.addAll(fathers);
	return getStats(parents);
}

The first two implementations – InitialService and StreamService – are not of interest here, thus a default implementation was provided for convenience. It is overridden only by the StatsService.

@Override
public ParentStats getCombinedStats(List<Parent> mothers, List<Parent> fathers) {
	Collector<Parent, ?, IntSummaryStatistics> collector = Collectors.summarizingInt(Parent::age);

	IntSummaryStatistics stats = mothers.stream().collect(collector);
	stats.combine(fathers.stream().collect(collector));

	return new ParentStats(stats.getCount(),
			stats.getMin(),
			stats.getMax(),
			stats.getMax() - stats.getMin(),
			stats.getAverage(),
			stats.getSum());
}

By leveraging the combine() method, the statistics can be merged directly when different source lists are available.

The corresponding unit test is straight-forward.

@Test
void getCombinedStats() {
	final ParentStats stats = service.getCombinedStats(mothers, fathers);
	assertParentStats(stats);
}
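To see why the combined result matches the single-list one, the two approaches can be compared on plain int values. The sketch below uses the same ages as the unit test; the CombineDemo class and its method names are hypothetical.

```java
import java.util.IntSummaryStatistics;
import java.util.stream.IntStream;

// JDK-only sketch; class and method names are hypothetical.
final class CombineDemo {

    // Statistics computed separately for each source, then merged with combine().
    static IntSummaryStatistics combined(int[] first, int[] second) {
        IntSummaryStatistics stats = IntStream.of(first).summaryStatistics();
        stats.combine(IntStream.of(second).summaryStatistics());
        return stats;
    }

    // Statistics computed in one traversal over the concatenated sources.
    static IntSummaryStatistics singlePass(int[] first, int[] second) {
        return IntStream.concat(IntStream.of(first), IntStream.of(second))
                .summaryStatistics();
    }

    public static void main(String[] args) {
        int[] mothers = {31, 32, 33};
        int[] fathers = {34, 35, 36};
        System.out.println(combined(mothers, fathers));
        System.out.println(singlePass(mothers, fathers));
    }
}
```

Both methods report the same count, minimum, maximum, sum and average, as combine() only merges the already accumulated state of the two objects.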

Having seen the above Collector, the initial getStats() method may be written even more briefly.

@Override
public ParentStats getStats(List<Parent> parents) {
	IntSummaryStatistics stats = parents.stream()
			.collect(Collectors.summarizingInt(Parent::age));

	return new ParentStats(stats.getCount(),
			stats.getMin(),
			stats.getMax(),
			stats.getMax() - stats.getMin(),
			stats.getAverage(),
			stats.getSum());
}

Conclusion

Depending on the used data types, IntSummaryStatistics, LongSummaryStatistics or DoubleSummaryStatistics are convenient out-of-the-box structures that one can use to quickly compute simple statistics and focus on writing more readable and maintainable code.

Resources

  1. IntSummaryStatistics JavaDoc
  2. Source code for the sample POC
  3. The picture was taken at Legoland Resort, Germany

Idempotent Liquibase Change Sets

by Horatiu Dan

Abstract

“Idempotence is the property of certain operations in mathematics and computer science whereby they can be applied multiple times without changing the result beyond the initial application” [Resource 3].

The purpose of this article is to outline a few ways of creating idempotent changes when the database modifications are managed with Liquibase. Throughout the lifetime of a software product that has such a tier, various database modifications are applied as it evolves. The more robust the modifications are, the more maintainable the solution is. In order to accomplish such a way of working, it is usually a good practice to design the executed change sets to have zero side effects, that is, to be able to be run as many times as needed with the same end result.

The simple proof of concept built here aims to showcase how Liquibase change sets may be written to be idempotent. Moreover, the article explains in more depth what exactly happens when the application starts.

Set-up

  • Java 17
  • Spring Boot v.3.1.0
  • Liquibase 4.20.0
  • PostgreSQL Driver 42.6.0
  • Maven 3.6.3

Proof of Concept

As PostgreSQL is the database used here, first and foremost one shall create a new schema – liquidempo. This operation is easy to accomplish by issuing the following SQL command, once connected to the database.

create schema liquidempo;

At application level:

  • The Maven Spring Boot project is created and configured to use the PostgreSQL Driver, Spring Data JPA and Liquibase dependencies.
  • A simple entity is created – Human – with only one attribute, a unique identifier which is also the primary key at database level.
@Entity
@Table(name = "human")
@SequenceGenerator(sequenceName = "human_seq", name = "CUSTOM_SEQ_GENERATOR", allocationSize = 1)
public class Human {

    @Id
    @GeneratedValue(strategy = GenerationType.AUTO, generator = "CUSTOM_SEQ_GENERATOR")
    @Column(name = "id")
    private Long id;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }
}

For convenience, when entities are stored, their unique identifiers are generated using a database sequence, called human_seq.

  • The data source is configured as usual in the application.properties file
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.url=jdbc:postgresql://localhost:5432/postgres?currentSchema=liquidempo&useUnicode=true&characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=true
spring.datasource.username=postgres
spring.datasource.password=123456

spring.jpa.database-platform=org.hibernate.dialect.PostgreSQLDialect
spring.jpa.hibernate.ddl-auto=none

The previously created schema is referenced in the connection URL. DDL handling is disabled, as the infrastructure and the data are intended to be persistent when the application is restarted.

  • As Liquibase is the database migration manager, the changelog path is configured in the application.properties file as well.
spring.liquibase.change-log=classpath:/db/changelog/db.changelog-root.xml

For now, the db.changelog-root.xml file is empty.

The current state of the project requires a few simple change sets, placed in the human_init.xml file, in order to create the database elements that back the Human entity – the sequence, the table and the primary key constraint.

<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog
        xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
                      https://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.17.xsd">

    <changeSet author="horatiucd" id="100">
        <createSequence sequenceName="human_seq" startValue="1" incrementBy="1"/>
    </changeSet>

    <changeSet author="horatiucd" id="200">
        <createTable tableName="human">
            <column name="id" type="BIGINT">
                <constraints nullable="false"/>
            </column>
        </createTable>
    </changeSet>

    <changeSet author="horatiucd" id="300">
        <addPrimaryKey columnNames="id" constraintName="human_pk" tableName="human"/>
    </changeSet>

</databaseChangeLog>

In order for these to be applied, they need to be referenced in the db.changelog-root.xml file, as indicated below.

<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog
        xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
                      https://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.17.xsd">

    <include file="db/changelog/human_init.xml"/>

</databaseChangeLog>

When the application is restarted, the three change sets are executed in the order they are declared.

INFO 9092 --- [main] liquibase.database      : Set default schema name to liquidempo
INFO 9092 --- [main] liquibase.lockservice   : Successfully acquired change log lock
INFO 9092 --- [main] liquibase.changelog     : Creating database history table with name: liquidempo.databasechangelog
INFO 9092 --- [main] liquibase.changelog     : Reading from liquidempo.databasechangelog
Running Changeset: db/changelog/human_init.xml::100::horatiucd
INFO 9092 --- [main] liquibase.changelog     : Sequence human_seq created
INFO 9092 --- [main] liquibase.changelog     : ChangeSet db/changelog/human_init.xml::100::horatiucd ran successfully in 6ms
Running Changeset: db/changelog/human_init.xml::200::horatiucd
INFO 9092 --- [main] liquibase.changelog     : Table human created
INFO 9092 --- [main] liquibase.changelog     : ChangeSet db/changelog/human_init.xml::200::horatiucd ran successfully in 4ms
Running Changeset: db/changelog/human_init.xml::300::horatiucd
INFO 9092 --- [main] liquibase.changelog     : Primary key added to human (id)
INFO 9092 --- [main] liquibase.changelog     : ChangeSet db/changelog/human_init.xml::300::horatiucd ran successfully in 8ms
INFO 9092 --- [main] liquibase               : Update command completed successfully.
INFO 9092 --- [main] liquibase.lockservice   : Successfully released change log lock

Moreover, they are recorded as separate rows in the databasechangelog database table.

+---+---------+---------------------------+--------------------------+-------------+--------+----------------------------------+------------------------------------------------------+
|id |author   |filename                   |dateexecuted              |orderexecuted|exectype|md5sum                            |description                                           |
+---+---------+---------------------------+--------------------------+-------------+--------+----------------------------------+------------------------------------------------------+
|100|horatiucd|db/changelog/human_init.xml|2023-05-26 16:23:17.184239|1            |EXECUTED|8:db8c5fb392dc96efa322da2c326b5eba|createSequence sequenceName=human_seq                 |
|200|horatiucd|db/changelog/human_init.xml|2023-05-26 16:23:17.193031|2            |EXECUTED|8:ed8e5e7df5edb17ed9a0682b9b640d7f|createTable tableName=human                           |
|300|horatiucd|db/changelog/human_init.xml|2023-05-26 16:23:17.204184|3            |EXECUTED|8:a2d6eff5a1e7513e5ab7981763ae532b|addPrimaryKey constraintName=human_pk, tableName=human|
+---+---------+---------------------------+--------------------------+-------------+--------+----------------------------------+------------------------------------------------------+

So far, everything is straightforward, nothing out of the ordinary – a simple Spring Boot application whose database changes are managed with Liquibase.

When examining the above human_init.xml file, one can easily identify the three scripts that result from the three change sets. None is idempotent. This means that if they are executed again (although there is no reason for doing it here), errors will occur because the human_seq sequence, the human table and the human_pk primary key already exist.

Idempotent Change Sets

If the SQL code that results from the XML change sets had been written directly and aimed to be idempotent, it would have read as follows:

CREATE SEQUENCE IF NOT EXISTS human_seq INCREMENT 1 MINVALUE 1 MAXVALUE 99999999999;

CREATE TABLE IF NOT EXISTS human (
	id BIGINT NOT NULL CONSTRAINT human_pk PRIMARY KEY
);

If the two commands are executed several times, no errors occur and the outcome remains the same. After the first run, the sequence, the table and the constraint are created, then every new execution leaves them in the same usable state.
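The difference between a plain CREATE and its IF NOT EXISTS variant can be pictured with a small sketch. It is a toy in-memory model, not JDBC or Liquibase code, and the IdempotentDdlSketch class, its methods and the catalog set are all hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.Set;

final class IdempotentDdlSketch {

    // Hypothetical stand-in for the database catalog: just a set of object names.
    private final Set<String> objects = new LinkedHashSet<>();

    // Mirrors a plain CREATE: fails when the object is already there.
    void create(String name) {
        if (!objects.add(name)) {
            throw new IllegalStateException(name + " already exists");
        }
    }

    // Mirrors CREATE ... IF NOT EXISTS: a repeated call is a no-op.
    void createIfNotExists(String name) {
        objects.add(name);
    }

    Set<String> objects() {
        return Set.copyOf(objects);
    }
}
```

Calling createIfNotExists("human_seq") any number of times leaves the catalog in the same state as after the first call, which is exactly the idempotence property quoted in the abstract.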

The aim is to accomplish the same in the written Liquibase change sets (change log).

According to the Liquibase documentation [Resource 1] – “Preconditions are tags you add to your changelog or individual changesets to control the execution of an update based on the state of the database. Preconditions let you specify security and standardization requirements for your changesets. If a precondition on a changeset fails, Liquibase does not deploy that changeset.”

These constructs may be configured in various ways, either at change log or change set level. For simplicity, the three change sets of this proof of concept will be made idempotent.

Basically, whenever an item (sequence, table or primary key) already exists, it would be convenient to skip the change set that creates it and carry on, instead of halting the execution of the entire change log and preventing the application from starting.

In this direction, Liquibase preconditions provide at least two options:

  • either skip over the changeset and continue with the change log, or
  • skip over the change set but mark it as executed and continue with the change log.

Either of the two can be configured by adding a preConditions tag in the change set of interest and setting the onFail attribute to CONTINUE (the former case) or MARK_RAN (the latter case).

In pseudo-code, this looks as below:

<changeSet author="horatiucd" id="100">
	<preConditions onFail="CONTINUE or MARK_RAN">
		...
	</preConditions>
	...
</changeSet>

This seems in line with the initial desire – execute the change set only if the preconditions are met. Next, each of the two situations is analyzed.

onFail="CONTINUE"

The change log file – human_init_idempo_continue.xml – becomes as below:

<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog
        xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
                      https://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.17.xsd">

    <changeSet author="horatiucd" id="101">
        <preConditions onFail="CONTINUE">
            <not>
                <sequenceExists sequenceName="human_seq"/>
            </not>
        </preConditions>
        <createSequence sequenceName="human_seq" startValue="1" incrementBy="1"/>
    </changeSet>

    <changeSet author="horatiucd" id="201">
        <preConditions onFail="CONTINUE">
            <not>
                <tableExists tableName="human"/>
            </not>
        </preConditions>
        <createTable tableName="human">
            <column name="id" type="BIGINT">
                <constraints nullable="false"/>
            </column>
        </createTable>
    </changeSet>

    <changeSet author="horatiucd" id="301">
        <preConditions onFail="CONTINUE">
            <not>
                <primaryKeyExists primaryKeyName="human_pk" tableName="human"/>
            </not>
        </preConditions>
        <addPrimaryKey columnNames="id" constraintName="human_pk" tableName="human"/>
    </changeSet>

</databaseChangeLog>

For each item, the precondition is satisfied only if the item does not exist yet.

When running the application, the log shows what is executed:

INFO 49016 --- [main] liquibase.database     : Set default schema name to liquidempo
INFO 49016 --- [main] liquibase.changelog    : Reading from liquidempo.databasechangelog
INFO 49016 --- [main] liquibase.lockservice  : Successfully acquired change log lock
Running Changeset: db/changelog/human_init_idempo_continue.xml::101::horatiucd
INFO 49016 --- [main] liquibase.changelog    : Continuing past: db/changelog/human_init_idempo_continue.xml::101::horatiucd despite precondition failure due to onFail='CONTINUE': 
          db/changelog/db.changelog-root.xml : Not precondition failed
Running Changeset: db/changelog/human_init_idempo_continue.xml::201::horatiucd
INFO 49016 --- [main] liquibase.changelog    : Continuing past: db/changelog/human_init_idempo_continue.xml::201::horatiucd despite precondition failure due to onFail='CONTINUE': 
          db/changelog/db.changelog-root.xml : Not precondition failed
Running Changeset: db/changelog/human_init_idempo_continue.xml::301::horatiucd
INFO 49016 --- [main] liquibase.changelog    : Continuing past: db/changelog/human_init_idempo_continue.xml::301::horatiucd despite precondition failure due to onFail='CONTINUE': 
          db/changelog/db.changelog-root.xml : Not precondition failed
INFO 49016 --- [main] liquibase              : Update command completed successfully.
INFO 49016 --- [main] liquibase.lockservice  : Successfully released change log lock

As expected, all three preconditions failed and the execution of the change log continued.

The databasechangelog database table does not have any records in addition to the previous three, which means the change sets will be evaluated again at the next start-up of the application.

onFail="MARK_RAN"

The change log file – human_init_idempo_mark_ran.xml – is similar to human_init_idempo_continue.xml, the only difference being the onFail attribute, which is set as onFail="MARK_RAN".

The db.changelog-root.xml root change log now looks as below:

<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog
        xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
                      https://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.17.xsd">

    <include file="db/changelog/human_init.xml"/>
    <include file="db/changelog/human_init_idempo_continue.xml"/>
    <include file="db/changelog/human_init_idempo_mark_ran.xml"/>
    
</databaseChangeLog>

For this proof of concept, all three files were kept on purpose, in order to be able to observe the behavior in detail.

If the application is restarted, no errors are encountered and the log shows the following:

INFO 38788 --- [main] liquibase.database      : Set default schema name to liquidempo
INFO 38788 --- [main] liquibase.changelog     : Reading from liquidempo.databasechangelog
INFO 38788 --- [main] liquibase.lockservice   : Successfully acquired change log lock
INFO 38788 --- [main] liquibase.changelog     : Reading from liquidempo.databasechangelog
Running Changeset: db/changelog/human_init_idempo_continue.xml::101::horatiucd
INFO 38788 --- [main] liquibase.changelog     : Continuing past: db/changelog/human_init_idempo_continue.xml::101::horatiucd despite precondition failure due to onFail='CONTINUE': 
          db/changelog/db.changelog-root.xml : Not precondition failed
Running Changeset: db/changelog/human_init_idempo_continue.xml::201::horatiucd
INFO 38788 --- [main] liquibase.changelog     : Continuing past: db/changelog/human_init_idempo_continue.xml::201::horatiucd despite precondition failure due to onFail='CONTINUE': 
          db/changelog/db.changelog-root.xml : Not precondition failed
Running Changeset: db/changelog/human_init_idempo_continue.xml::301::horatiucd
INFO 38788 --- [main] liquibase.changelog     : Continuing past: db/changelog/human_init_idempo_continue.xml::301::horatiucd despite precondition failure due to onFail='CONTINUE': 
          db/changelog/db.changelog-root.xml : Not precondition failed
Running Changeset: db/changelog/human_init_idempo_mark_ran.xml::101::horatiucd
INFO 38788 --- [main] liquibase.changelog     : Marking ChangeSet: "db/changelog/human_init_idempo_mark_ran.xml::101::horatiucd" as ran despite precondition failure due to onFail='MARK_RAN': 
          db/changelog/db.changelog-root.xml : Not precondition failed
Running Changeset: db/changelog/human_init_idempo_mark_ran.xml::201::horatiucd
INFO 38788 --- [main] liquibase.changelog     : Marking ChangeSet: "db/changelog/human_init_idempo_mark_ran.xml::201::horatiucd" as ran despite precondition failure due to onFail='MARK_RAN': 
          db/changelog/db.changelog-root.xml : Not precondition failed
Running Changeset: db/changelog/human_init_idempo_mark_ran.xml::301::horatiucd
INFO 38788 --- [main] liquibase.changelog     : Marking ChangeSet: "db/changelog/human_init_idempo_mark_ran.xml::301::horatiucd" as ran despite precondition failure due to onFail='MARK_RAN': 
          db/changelog/db.changelog-root.xml : Not precondition failed
INFO 38788 --- [main] liquibase               : Update command completed successfully.
INFO 38788 --- [main] liquibase.lockservice   : Successfully released change log lock

The change sets with onFail="CONTINUE" were evaluated again, as this is a new attempt, while the ones with onFail="MARK_RAN" were recorded in the databasechangelog table and will be passed over at the next start-up.

+---+---------+-------------------------------------------+--------------------------+-------------+--------+----------------------------------+------------------------------------------------------+
|id |author   |filename                                   |dateexecuted              |orderexecuted|exectype|md5sum                            |description                                           |
+---+---------+-------------------------------------------+--------------------------+-------------+--------+----------------------------------+------------------------------------------------------+
|100|horatiucd|db/changelog/human_init.xml                |2023-05-26 16:23:17.184239|1            |EXECUTED|8:db8c5fb392dc96efa322da2c326b5eba|createSequence sequenceName=human_seq                 |
|200|horatiucd|db/changelog/human_init.xml                |2023-05-26 16:23:17.193031|2            |EXECUTED|8:ed8e5e7df5edb17ed9a0682b9b640d7f|createTable tableName=human                           |
|300|horatiucd|db/changelog/human_init.xml                |2023-05-26 16:23:17.204184|3            |EXECUTED|8:a2d6eff5a1e7513e5ab7981763ae532b|addPrimaryKey constraintName=human_pk, tableName=human|
|101|horatiucd|db/changelog/human_init_idempo_mark_ran.xml|2023-05-29 16:40:26.453305|4            |MARK_RAN|8:db8c5fb392dc96efa322da2c326b5eba|createSequence sequenceName=human_seq                 |
|201|horatiucd|db/changelog/human_init_idempo_mark_ran.xml|2023-05-29 16:40:26.463021|5            |MARK_RAN|8:ed8e5e7df5edb17ed9a0682b9b640d7f|createTable tableName=human                           |
|301|horatiucd|db/changelog/human_init_idempo_mark_ran.xml|2023-05-29 16:40:26.475153|6            |MARK_RAN|8:a2d6eff5a1e7513e5ab7981763ae532b|addPrimaryKey constraintName=human_pk, tableName=human|
+---+---------+-------------------------------------------+--------------------------+-------------+--------+----------------------------------+------------------------------------------------------+

At the next run of the application, the log will be similar to the one where onFail was set to "CONTINUE".

One more observation is worth making at this point. When the preconditions of a change set do not fail, the change set is executed normally and recorded with exectype = EXECUTED in the databasechangelog table.
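The three outcomes observed so far can be summarized in a short sketch. It is a simplified toy model of the behavior seen in the logs, not the Liquibase implementation: the OnFailSketch class, the ChangeSet record, the update() method and the in-memory history map (standing in for the databasechangelog table) are all hypothetical, and aspects such as checksum validation or locking are deliberately left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;

final class OnFailSketch {

    enum OnFail { CONTINUE, MARK_RAN }

    // Hypothetical change set: a key such as "file::id::author", a precondition and a change.
    record ChangeSet(String key, OnFail onFail, BooleanSupplier precondition, Runnable change) {}

    // Stand-in for the databasechangelog table: key -> exectype.
    private final Map<String, String> history = new LinkedHashMap<>();

    // One start-up: returns the keys whose precondition was evaluated during this run.
    List<String> update(List<ChangeSet> changeLog) {
        List<String> evaluated = new ArrayList<>();
        for (ChangeSet cs : changeLog) {
            if (history.containsKey(cs.key())) {
                continue; // already recorded, passed over
            }
            evaluated.add(cs.key());
            if (cs.precondition().getAsBoolean()) {
                cs.change().run();
                history.put(cs.key(), "EXECUTED");
            } else if (cs.onFail() == OnFail.MARK_RAN) {
                history.put(cs.key(), "MARK_RAN"); // recorded without running the change
            }
            // OnFail.CONTINUE: nothing is recorded, so the change set is evaluated again next time
        }
        return evaluated;
    }

    Map<String, String> history() {
        return Map.copyOf(history);
    }
}
```

In this model, a second call to update() with the same change log evaluates only the change sets configured with CONTINUE, which matches what the logs above show. The key also includes the file name, which is why change sets with the same id and author in two different files are treated as distinct.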

Conclusions

This article presented two ways of writing idempotent Liquibase change sets, a practice that makes applications more robust and easier to maintain. This was accomplished by leveraging the change set preConditions tag inside the change log files. While both onFail attribute values – CONTINUE and MARK_RAN – may be used depending on the actual performed operation, the latter seems more appropriate for this proof of concept as it does not attempt to re-run the change sets at every start-up of the application.

Resources

  1. Liquibase Documentation
  2. Source code for the sample application
  3. Idempotence

Repeatable Database Updates via Liquibase

by Horatiu Dan

Abstract

The main purpose of this tutorial is to present a way of detecting modifications to a stored Liquibase change set that was previously applied and executing it again automatically. In order to illustrate this, a small proof of concept is constructed gradually. In the first step, the application configures Liquibase as its migration manager and creates the initial database schema. Then, modifications are applied to the running version and lastly, the repeatable script is introduced and enhanced.

Set-up

  • Java 17
  • Spring Boot v.3.0.2
  • Liquibase 4.17.2
  • PostgreSQL 12.11
  • Maven

Proof of Concept

As PostgreSQL was chosen for the database layer of this service, first a new schema is created (liquirepeat). This can be easily accomplished by issuing the following SQL command, after previously connecting to the database.

create schema liquirepeat;

At application level, the steps are presented below.

  • The Maven Spring Boot project is created and instructed to use the PostgreSQL Driver, Liquibase and Spring Data JPA dependencies. This is enough for the current purpose.
  • A Minifig entity is created, having two attributes – id and name. It represents a mini-figure with a unique identifier and its name.
@Entity
@Table(name = "minifig")
@SequenceGenerator(sequenceName="minifig_seq", name="CUSTOM_SEQ_GENERATOR", initialValue=1, allocationSize=1)
@Data
public class Minifig {

    @Id
    @GeneratedValue(strategy = GenerationType.AUTO, generator = "CUSTOM_SEQ_GENERATOR")
    @Column(name = "id")
    private Long id;

    @Column(name = "name", nullable = false)
    private String name;

    public Minifig() {

    }

    public Minifig(String name) {
        this.name = name;
    }
}

For convenience, when entities are stored, their unique identifiers are generated using a database sequence, called minifig_seq.

  • A corresponding JPA repository is declared by extending the existing CrudRepository.
public interface MinifigRepository extends CrudRepository<Minifig, Long> {}
  • The data source is configured in the usual way in the application.properties file.
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.url=jdbc:postgresql://localhost:5432/postgres?currentSchema=liquirepeat&useUnicode=true&characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=true
spring.datasource.username=postgres
spring.datasource.password=123456

spring.jpa.hibernate.ddl-auto=none

The previously created schema is referenced in the connection URL. DDL handling is disabled, as the infrastructure and the data are intended to be persistent when the application is restarted.

  • As the database migration manager is Liquibase, the changelog path is configured in the application.properties file as well.
spring.liquibase.change-log=classpath:/db/changelog/db.changelog-root.xml

For now, the db.changelog-root.xml file is empty.

At application start-up, the two Liquibase specific tables are created – databasechangelog and databasechangeloglock. The former, which records the deployed changes, is empty, as nothing is to be executed yet – db.changelog-root.xml is currently empty.

The logs clearly depict the expected behavior.

INFO 28464 --- [main] liquibase.database      : Set default schema name to liquirepeat
INFO 28464 --- [main] liquibase.lockservice   : Successfully acquired change log lock
INFO 28464 --- [main] liquibase.changelog     : Creating database history table with name: liquirepeat.databasechangelog
INFO 28464 --- [main] liquibase.changelog     : Reading from liquirepeat.databasechangelog
INFO 28464 --- [main] liquibase.lockservice   : Successfully released change log lock

In the first version of the application – 1.0.0 – at least the database schema initialization should be performed.

According to Liquibase best practices, a directory for each version is recommended, located under db/changelog, next to the db.changelog-root.xml file. Thus, the version-1.0.0 folder is created, containing the change sets of this version – for now, the schema-init.xml file.

<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                   xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
                      https://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.8.xsd">

    <changeSet author="horatiucd" id="100">
        <createSequence sequenceName="minifig_seq" startValue="1" incrementBy="1"/>
    </changeSet>

    <changeSet author="horatiucd" id="200">
        <createTable tableName="minifig">
            <column name="id" type="BIGINT">
                <constraints nullable="false"/>
            </column>
            <column name="name" type="VARCHAR(255)">
                <constraints nullable="false"/>
            </column>
        </createTable>
    </changeSet>

    <changeSet author="horatiucd" id="300">
        <addPrimaryKey columnNames="id" constraintName="minifig_pk" tableName="minifig"/>
    </changeSet>

</databaseChangeLog>

The minifig table and the corresponding minifig_seq sequence are created, in line with the simple entity class. In order for the change sets to be applied, they need to be referenced in the db.changelog-root.xml file, as indicated below.

<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog
        xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
                      https://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.8.xsd">

    <!-- Version 1.0.0 -->
    <include file="db/changelog/version-1.0.0/schema-init.xml"/>
</databaseChangeLog>

When the application is restarted, the three change sets are executed in the order they are declared.

INFO 44740 --- [main] liquibase.database      : Set default schema name to liquirepeat
INFO 44740 --- [main] liquibase.lockservice   : Successfully acquired change log lock
INFO 44740 --- [main] liquibase.changelog     : Reading from liquirepeat.databasechangelog
Running Changeset: db/changelog/version-1.0.0/schema-init.xml::100::horatiucd
INFO 44740 --- [main] liquibase.changelog     : Sequence minifig_seq created
INFO 44740 --- [main] liquibase.changelog     : ChangeSet db/changelog/version-1.0.0/schema-init.xml::100::horatiucd ran successfully in 15ms
Running Changeset: db/changelog/version-1.0.0/schema-init.xml::200::horatiucd
INFO 44740 --- [main] liquibase.changelog     : Table minifig created
INFO 44740 --- [main] liquibase.changelog     : ChangeSet db/changelog/version-1.0.0/schema-init.xml::200::horatiucd ran successfully in 4ms
Running Changeset: db/changelog/version-1.0.0/schema-init.xml::300::horatiucd
INFO 44740 --- [main] liquibase.changelog     : Primary key added to minifig (id)
INFO 44740 --- [main] liquibase.changelog     : ChangeSet db/changelog/version-1.0.0/schema-init.xml::300::horatiucd ran successfully in 5ms
INFO 44740 --- [main] liquibase.lockservice   : Successfully released change log lock

Moreover, they are recorded as separate rows in the databasechangelog database table.

+---+---------+------------------------------------------+--------------------------+-------------+--------+----------------------------------+----------------------------------------------------------+
|id |author   |filename                                  |dateexecuted              |orderexecuted|exectype|md5sum                            |description                                               |
+---+---------+------------------------------------------+--------------------------+-------------+--------+----------------------------------+----------------------------------------------------------+
|100|horatiucd|db/changelog/version-1.0.0/schema-init.xml|2023-02-06 23:15:29.458517|1            |EXECUTED|8:be2c93cdecec258121a0e522cde14dbf|createSequence sequenceName=minifig_seq                   |
|200|horatiucd|db/changelog/version-1.0.0/schema-init.xml|2023-02-06 23:15:29.466702|2            |EXECUTED|8:7083de78675d6af112bec737838e8cbb|createTable tableName=minifig                             |
|300|horatiucd|db/changelog/version-1.0.0/schema-init.xml|2023-02-06 23:15:29.472865|3            |EXECUTED|8:db76242ba57fe4b4883e51313955cae9|addPrimaryKey constraintName=minifig_pk, tableName=minifig|
+---+---------+------------------------------------------+--------------------------+-------------+--------+----------------------------------+----------------------------------------------------------+

In version 2.0.0, a new attribute is added to the Minifig entity, its description. In order to reflect it at the database level, a change set is added in a version specific directory and plugged into the db.changelog-root.xml file.

<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog
        xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
                      https://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.8.xsd">

    <!-- Version 1.0.0 -->
    <include file="db/changelog/version-1.0.0/schema-init.xml"/>

    <!-- Version 2.0.0 -->
    <include file="db/changelog/version-2.0.0/minifig_update.xml"/>
    
</databaseChangeLog>

The minifig_update.xml contains the change set that updates the table.

<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                   xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
                      https://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.8.xsd">

    <changeSet author="horatiucd" id="400">
        <addColumn tableName="minifig">
            <column name="description" type="VARCHAR(500)"/>
        </addColumn>
    </changeSet>
</databaseChangeLog>

The entity is enriched with the new attribute as well.

@Column(name = "description")
private String description;

At application start-up, a new record is added into the databasechangelog database table, a record that reflects the mentioned change.

+---+---------+---------------------------------------------+--------------------------+-------------+--------+----------------------------------+----------------------------------------------------------+
|id |author   |filename                                     |dateexecuted              |orderexecuted|exectype|md5sum                            |description                                               |
+---+---------+---------------------------------------------+--------------------------+-------------+--------+----------------------------------+----------------------------------------------------------+
|100|horatiucd|db/changelog/version-1.0.0/schema-init.xml   |2023-02-06 23:15:29.458517|1            |EXECUTED|8:be2c93cdecec258121a0e522cde14dbf|createSequence sequenceName=minifig_seq                   |
|200|horatiucd|db/changelog/version-1.0.0/schema-init.xml   |2023-02-06 23:15:29.466702|2            |EXECUTED|8:7083de78675d6af112bec737838e8cbb|createTable tableName=minifig                             |
|300|horatiucd|db/changelog/version-1.0.0/schema-init.xml   |2023-02-06 23:15:29.472865|3            |EXECUTED|8:db76242ba57fe4b4883e51313955cae9|addPrimaryKey constraintName=minifig_pk, tableName=minifig|
|400|horatiucd|db/changelog/version-2.0.0/minifig_update.xml|2023-02-06 23:31:21.146004|4            |EXECUTED|8:0fc33fb9a00f989ed96e3e3af48355c9|addColumn tableName=minifig                               |
+---+---------+---------------------------------------------+--------------------------+-------------+--------+----------------------------------+----------------------------------------------------------+

In order to have some data as well, not just the database structure, a few mini-figures may be easily added in the designated table. One straight-forward way is by wiring a CommandLineRunner and providing it with the MinifigRepository.

@Bean
public CommandLineRunner init(MinifigRepository repository) {
	return args -> {
		Minifig harry = new Minifig("Harry Potter");
		Minifig ron = new Minifig("Ron Weasley");
		Minifig hermione = new Minifig("Hermione Granger");

		List.of(harry, ron, hermione)
				.forEach(minifig -> log.info("Persisted {}.", repository.save(minifig)));
	};
}

The application logs reflect what happens when the application is restarted.

Hibernate: select nextval('minifig_seq')
Hibernate: insert into minifig (description, name, id) values (?, ?, ?)
INFO 10516 --- [main] com.hcd.liquirepeat.config.DataLoader : Persisted Minifig(id=4, name=Harry Potter, description=null).
Hibernate: select nextval('minifig_seq')
Hibernate: insert into minifig (description, name, id) values (?, ?, ?)
INFO 10516 --- [main] com.hcd.liquirepeat.config.DataLoader : Persisted Minifig(id=5, name=Ron Weasley, description=null).
Hibernate: select nextval('minifig_seq')
Hibernate: insert into minifig (description, name, id) values (?, ?, ?)
INFO 10516 --- [main] com.hcd.liquirepeat.config.DataLoader : Persisted Minifig(id=6, name=Hermione Granger, description=null).

Handling Repeatable Database Updates

As a scenario, let’s assume now that as part of application version 2.0.0, a simple Minifig Report is requested, designed as a view – it contains the Id and Name of all mini-figures.

The code for creating it is straight-forward.

DROP VIEW IF EXISTS liquirepeat."Minifig Report" CASCADE;
CREATE OR REPLACE VIEW liquirepeat."Minifig Report"
    AS
SELECT m.id AS "Minifig ID",
       m.name AS "Minifig Name"
FROM liquirepeat.minifig m;

One option to implement it is to create a new change set file in folder version-2.0.0 and deploy the change. Analyzing a bit more, one may envision that at some point in the future the report is likely to be modified and thus, another change set would be needed in the particular version folder so that the update is deployed as well.

A better solution is to just update the script and have the application execute it automatically at the next restart.

According to Liquibase documentation, change sets have an attribute called runOnChange. When this is true, Liquibase detects a modification to a previously applied update and re-runs it.
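To make the idea of re-running on change more tangible, below is a minimal sketch of a checksum-based decision, written with the JDK only. It is a simplified illustration and not Liquibase code: the class RunOnChangeSketch, its Decision enum and the plain MD5 digest over the script text are all assumptions made for this example, and Liquibase computes and stores its checksums in its own way.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.HexFormat;
import java.util.Map;

// Hypothetical, simplified model of a "run on change" decision.
// It does not reproduce the actual Liquibase checksum algorithm.
class RunOnChangeSketch {

    enum Decision { RUN, RERUN, SKIP }

    // change set identifier -> checksum recorded at the last execution
    private final Map<String, String> recorded = new HashMap<>();

    Decision apply(String changeSetId, String script) {
        String checksum = md5(script);
        String previous = recorded.get(changeSetId);
        if (checksum.equals(previous)) {
            // the script is unchanged since the last execution
            return Decision.SKIP;
        }
        recorded.put(changeSetId, checksum);
        // first execution versus re-execution of a modified script
        return previous == null ? Decision.RUN : Decision.RERUN;
    }

    private static String md5(String content) {
        try {
            MessageDigest digest = MessageDigest.getInstance("MD5");
            byte[] hash = digest.digest(content.getBytes(StandardCharsets.UTF_8));
            return HexFormat.of().formatHex(hash);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }
}
```

Calling apply() three times for the same identifier, first twice with the same script and then with a modified one, yields RUN, SKIP and RERUN respectively, which mirrors the behavior a change set with runOnChange set to true is expected to have.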

With this detail acknowledged, let’s add a new change set in the db.changelog-root.xml, having runOnChange=true and placed in a position where it is always executed last. The change set runs a plain SQL file – minifig-report.sql – that contains the code for (re)creating the database view.

<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog
        xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
                      https://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.8.xsd">

    <!-- Version 1.0.0 -->
    <include file="db/changelog/version-1.0.0/schema-init.xml"/>

    <!-- Version 2.0.0 -->
    <include file="db/changelog/version-2.0.0/minifig_update.xml"/>
    
    <changeSet id="repeatable" author="dev-team" runOnChange="true">
        <sqlFile dbms="postgresql" path="db/changelog/run-on-change/minifig-report.sql"/>
    </changeSet>
</databaseChangeLog>

At start-up, Liquibase executes the change set, as usual.

INFO 6128 --- [main] liquibase.lockservice   : Successfully acquired change log lock
INFO 6128 --- [main] liquibase.changelog     : Reading from liquirepeat.databasechangelog
Running Changeset: db/changelog/db.changelog-root.xml::repeatable::dev-team
INFO 6128 --- [main] liquibase.changelog     : SQL in file db/changelog/run-on-change/minifig-report.sql executed
INFO 6128 --- [main] liquibase.changelog     : ChangeSet db/changelog/db.changelog-root.xml::repeatable::dev-team ran successfully in 15ms
INFO 6128 --- [main] liquibase.lockservice   : Successfully released change log lock

The databasechangelog table records this as well.

+----------+---------+---------------------------------------------+--------------------------+-------------+--------+----------------------------------+----------------------------------------------------------+
|id        |author   |filename                                     |dateexecuted              |orderexecuted|exectype|md5sum                            |description                                               |
+----------+---------+---------------------------------------------+--------------------------+-------------+--------+----------------------------------+----------------------------------------------------------+
|100       |horatiucd|db/changelog/version-1.0.0/schema-init.xml   |2023-02-06 23:15:29.458517|1            |EXECUTED|8:be2c93cdecec258121a0e522cde14dbf|createSequence sequenceName=minifig_seq                   |
|200       |horatiucd|db/changelog/version-1.0.0/schema-init.xml   |2023-02-06 23:15:29.466702|2            |EXECUTED|8:7083de78675d6af112bec737838e8cbb|createTable tableName=minifig                             |
|300       |horatiucd|db/changelog/version-1.0.0/schema-init.xml   |2023-02-06 23:15:29.472865|3            |EXECUTED|8:db76242ba57fe4b4883e51313955cae9|addPrimaryKey constraintName=minifig_pk, tableName=minifig|
|400       |horatiucd|db/changelog/version-2.0.0/minifig_update.xml|2023-02-06 23:31:21.146004|4            |EXECUTED|8:0fc33fb9a00f989ed96e3e3af48355c9|addColumn tableName=minifig                               |
|repeatable|dev-team |db/changelog/db.changelog-root.xml           |2023-02-06 23:51:37.876140|5            |EXECUTED|8:93b422e6004aecce9b67018d6b10bc82|sqlFile                                                   |
+----------+---------+---------------------------------------------+--------------------------+-------------+--------+----------------------------------+----------------------------------------------------------+

A few observations are useful at this moment:

  • since this change set is re-executed if changed, its identifier was chosen to be something that illustrates this aspect – repeatable. Also, it shall designate idempotent operations.
  • since the minifig-report.sql file might be updated by a certain developer, the author was set to be a generic one – dev-team.

As a last action, let’s imagine the Minifig Report is requested to be enhanced to also contain the description of a mini-figure.

In order to implement this requirement, a developer edits the minifig-report.sql file and modifies the script accordingly.

DROP VIEW IF EXISTS liquirepeat."Minifig Report" CASCADE;
CREATE OR REPLACE VIEW liquirepeat."Minifig Report"
    AS
SELECT m.id AS "Minifig ID",
       m.name AS "Minifig Name",
       m.description AS "Minifig Description"
FROM liquirepeat.minifig m;

At start-up, the change set is re-run and the report structure is updated, that is, the database view is recreated.

INFO 18796 --- [main] liquibase.lockservice  : Successfully acquired change log lock
INFO 18796 --- [main] liquibase.changelog    : Reading from liquirepeat.databasechangelog
Running Changeset: db/changelog/db.changelog-root.xml::repeatable::dev-team
INFO 18796 --- [main] liquibase.changelog    : SQL in file db/changelog/run-on-change/minifig-report.sql executed
INFO 18796 --- [main] liquibase.changelog    : ChangeSet db/changelog/db.changelog-root.xml::repeatable::dev-team ran successfully in 12ms
INFO 18796 --- [main] liquibase.lockservice  : Successfully released change log lock

It is important to note that the value in the exectype column of the databasechangelog table has changed from EXECUTED to RERAN. Also, the md5sum has a different value, as the contents of the file were altered, which triggered the re-execution in the first place.

+----------+---------+---------------------------------------------+--------------------------+-------------+--------+----------------------------------+----------------------------------------------------------+
|id        |author   |filename                                     |dateexecuted              |orderexecuted|exectype|md5sum                            |description                                               |
+----------+---------+---------------------------------------------+--------------------------+-------------+--------+----------------------------------+----------------------------------------------------------+
|100       |horatiucd|db/changelog/version-1.0.0/schema-init.xml   |2023-02-06 23:15:29.458517|1            |EXECUTED|8:be2c93cdecec258121a0e522cde14dbf|createSequence sequenceName=minifig_seq                   |
|200       |horatiucd|db/changelog/version-1.0.0/schema-init.xml   |2023-02-06 23:15:29.466702|2            |EXECUTED|8:7083de78675d6af112bec737838e8cbb|createTable tableName=minifig                             |
|300       |horatiucd|db/changelog/version-1.0.0/schema-init.xml   |2023-02-06 23:15:29.472865|3            |EXECUTED|8:db76242ba57fe4b4883e51313955cae9|addPrimaryKey constraintName=minifig_pk, tableName=minifig|
|400       |horatiucd|db/changelog/version-2.0.0/minifig_update.xml|2023-02-06 23:31:21.146004|4            |EXECUTED|8:0fc33fb9a00f989ed96e3e3af48355c9|addColumn tableName=minifig                               |
|repeatable|dev-team |db/changelog/db.changelog-root.xml           |2023-02-06 23:56:36.516859|6            |RERAN   |8:59be58683050b5ac350494d8bfbad7ac|sqlFile                                                   |
+----------+---------+---------------------------------------------+--------------------------+-------------+--------+----------------------------------+----------------------------------------------------------+

Conclusion

The tutorial presented a simple, yet useful and convenient way of automatically re-running database scripts that are periodically updated, without having each time to record this aspect in the root migration file and reflect the modification.

Resources

  1. Liquibase Documentation – https://docs.liquibase.com/home.html
  2. Source code for the sample application – https://github.com/horatiucd/liquirepeat
  3. The picture represents Lego figures, designed and built by my son.

IMAP OAuth 2.0 Authorization in Exchange Online

by Horatiu Dan

Context

Microsoft announced that, starting October 2022, Basic authentication for specific protocols in Exchange Online would be deprecated and turned off gradually and randomly for certain tenants. Among these protocols are Exchange ActiveSync (EAS), POP, IMAP, Remote PowerShell, Exchange Web Services (EWS) and Offline Address Book (OAB). Insightful details concerning this topic may be found in Resources items 1 and 2.

Consequently, customer applications leveraging Basic authentication towards Exchange Online as part of their business use-cases need to replace it with Modern authentication – OAuth 2.0 token-based authorization – which no doubt has many benefits and improvements that help mitigate the former’s risks.

The purpose of this article is to document and showcase how a Java-based client application can connect to an e-mail server via the IMAP (or IMAPS) protocol using the JavaMail library, after having previously obtained an OAuth 2.0 access token. The token retrieval is implemented in two different manners – by leveraging the OAuth 2.0 Resource Owner Password Credentials (ROPC) grant and using Microsoft Authentication Library (MSAL) for Java – thus letting developers choose the preferred way.

Assumptions

Prior to successfully running the sample code, the Exchange server shall be set up to require an OAuth 2.0 authentication mechanism. Details on how a system administrator may configure it can be found in Resources item 7. Moreover, the significant gotchas described in Resources item 8 proved to be very helpful. Briefly, when creating the service principal using PowerShell in Exchange Online, one shall use the ObjectId of the enterprise application as the ServiceId, as the application ObjectId is different from the enterprise application ObjectId.

Set-up

The proof of concept uses the following:

  • Java 17
  • Maven 3.6.3
  • Spring Boot version 2.7.5
  • JavaMail version 1.6.2
  • MSAL4J version 1.13.2

Connecting via JavaMail

According to Oracle (Resources item 3), starting with version 1.5.2 JavaMail supports OAuth 2 authentication via SASL XOAUTH2 mechanism. While IMAP and SMTP protocols are covered, POP3 is not. Nevertheless, the proof of concept in this article uses IMAP and demonstrates a simple use-case – a session is created, then used to connect to a store to get a folder with a specified name.

Since the important aspect here is connecting via IMAP and authenticating using OAuth 2, a great deal of other use-cases may be easily implemented as needed.

In order to accomplish the aimed scenario, a simple MailReader component is created.

public class MailReader {

    private final MailProperties mailProperties;
    private final TokenProvider tokenProvider;

    public MailReader(MailProperties mailProperties, TokenProvider tokenProvider) {
        this.mailProperties = mailProperties;
        this.tokenProvider = tokenProvider;
    }

    public Folder getFolder(String name) {
        final Properties props = new Properties();
        props.put("mail.debug", "true");
        props.put("mail.store.protocol", "imaps");
        props.put("mail.imaps.port", "993");
        props.put("mail.imaps.ssl.enable", "true");
        props.put("mail.imaps.starttls.enable", "true");
        props.put("mail.imaps.auth.mechanisms", "XOAUTH2");

        Session session = Session.getInstance(props);
        try {
            Store store = session.getStore();            
            store.connect(mailProperties.getHost(), mailProperties.getUser(), tokenProvider.getAccessToken());
            return store.getFolder(name);
        } catch (MessagingException e) {
            throw new RuntimeException("Unable to connect to the default folder.", e);
        }
    }
}

The getFolder() method sets up the connection properties, connects and returns the folder.

A brief comparison between Basic and OAuth 2.0 authentication methods is worth doing at this point. In order to connect, a client application uses an account set-up accordingly. Concerning the implementation, the differences between the two methods are minimal. For OAuth 2.0:

  • mail.imaps.auth.mechanisms property shall be set to XOAUTH2
  • the password parameter of the Store#connect() method contains an access token instead of the configured password
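For readers curious about what happens with the access token once XOAUTH2 is selected, the sketch below shows how the SASL XOAUTH2 initial client response is commonly encoded: the user name and the bearer token are joined using the Ctrl-A (0x01) character as separator and the result is Base64 encoded. JavaMail performs this step internally, so the application code above never needs it; the class and method names here are hypothetical and serve only as an illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper that illustrates the XOAUTH2 initial client response.
// JavaMail builds this value itself when the XOAUTH2 mechanism is used.
class XOAuth2Sketch {

    static String initialClientResponse(String user, String accessToken) {
        // format: user=<user>^Aauth=Bearer <token>^A^A, where ^A is the 0x01 character
        String raw = "user=" + user + "\u0001auth=Bearer " + accessToken + "\u0001\u0001";
        return Base64.getEncoder().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
    }
}
```

This also explains why the only visible differences at the client side are the mechanism name and the value passed as password: the token simply travels inside the authentication command instead of the account password.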

Retrieving an Access Token

For connecting to the store via IMAP(S) using OAuth 2.0 authentication, one shall first obtain an access token. In either of the two ways – ROPC grant or MSAL – the client application needs the following pieces of information:

  • the authentication URL
  • the tenant identifier – the tenant directory the user is logged into
  • the client identifier – the Application (client) ID the portal page assigned to the application
  • the client secret, required if the application is a confidential client

The two ways of retrieving the token are described in the next sections. In this direction, this proof of concept defines the following interface

public interface TokenProvider {

    String getAccessToken();
}

and then for each of them an implementation is coded.

Using Resource Owner Password Credentials (ROPC) Grant

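The ROPC flow is pretty straight-forward: it requires a single HTTP POST call towards the token endpoint that corresponds to the particular tenant. The request shall contain the client identifier, the client secret, the scope (usually https://outlook.office365.com/.default) and the grant type, either ‘client_credentials’ or ‘password’. Strictly speaking, ‘password’ is the ROPC grant type and additionally requires the username and password of the resource owner, while ‘client_credentials’, used by the sample below, designates the client credentials grant. The response contains the access token necessary to connect afterwards.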
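To make the shape of this request concrete, here is a minimal sketch that uses only the JDK and merely assembles the target URI and the form-encoded body, without sending anything. The class TokenRequestSketch and its method names are assumptions made for this illustration; the endpoint path, the field names and the scope are the ones used throughout this article.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper that only builds the pieces of the token request.
class TokenRequestSketch {

    // e.g. https://login.microsoftonline.com/<tenantId>/oauth2/v2.0/token
    static String tokenUri(String authUrl, String tenantId) {
        return authUrl + "/" + tenantId + "/oauth2/v2.0/token";
    }

    // application/x-www-form-urlencoded body, fields kept in insertion order
    static String formBody(String clientId, String clientSecret) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("client_id", clientId);
        fields.put("client_secret", clientSecret);
        fields.put("scope", "https://outlook.office365.com/.default");
        fields.put("grant_type", "client_credentials");

        return fields.entrySet().stream()
                .map(e -> encode(e.getKey()) + "=" + encode(e.getValue()))
                .collect(Collectors.joining("&"));
    }

    private static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8);
    }
}
```

Any HTTP client may then POST this body to the computed URI with the Content-Type header set to application/x-www-form-urlencoded.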

The implementation uses a WebClient instance to perform the POST call.

public class RopcTokenProvider implements TokenProvider {

    private final MailProperties mailProperties;
    private final WebClient client;

    public RopcTokenProvider(MailProperties mailProperties) {
        this.mailProperties = mailProperties;

        client = WebClient.builder()
                .baseUrl(mailProperties.getAuthUrl())
                .build();
    }

    @Override
    public String getAccessToken() {
        MultiValueMap<String, String> bodyValues = new LinkedMultiValueMap<>();
        bodyValues.add("client_id", mailProperties.getClientId());
        bodyValues.add("client_secret", mailProperties.getClientSecret());
        bodyValues.add("scope", "https://outlook.office365.com/.default");
        bodyValues.add("grant_type", "client_credentials");

        TokenDTO token = client.post()
                .uri("/{tenantId}/oauth2/v2.0/token", mailProperties.getTenantId())
                .contentType(MediaType.APPLICATION_FORM_URLENCODED)
                .accept(MediaType.APPLICATION_JSON)
                .body(BodyInserters.fromFormData(bodyValues))
                .retrieve()
                .bodyToMono(TokenDTO.class)
                .block();

        if (token == null) {
            throw new RuntimeException("Unable to retrieve OAuth2 access token.");
        }
        return token.getAccessToken();
    }
}

Two useful aspects are worth observing here:

A successful HTTP POST result has the following form

{
    "token_type": "Bearer",
    "expires_in": 3599,
    "ext_expires_in": 3599,
    "access_token": "the access token"
}

and may be unmarshalled into an object as the TokenDTO below.

@Data
public class TokenDTO {

    @JsonProperty("token_type")
    private String tokenType;

    @JsonProperty("access_token")
    private String accessToken;

    @JsonProperty("expires_in")
    private int expiresIn;

    @JsonProperty("ext_expires_in")
    private int extExpiresIn;
}

The important piece here is obviously the accessToken.

Using Microsoft Authentication Library (MSAL)

In order to retrieve an access token using MSAL for Java, the client application classpath shall be enhanced with the following library:

<dependency>
	<groupId>com.microsoft.azure</groupId>
	<artifactId>msal4j</artifactId>
	<version>1.13.2</version>
</dependency>

In this case, the getAccessToken() method of the TokenProvider implementation directly returns the token used to connect to the store.

public class MsalTokenProvider implements TokenProvider {

    private final ConfidentialClientApplication confidentialClientApp;
    private final Set<String> scopes;

    public MsalTokenProvider(MailProperties mailProperties) throws MalformedURLException {
        IClientCredential credential = ClientCredentialFactory.createFromSecret(mailProperties.getClientSecret());

        final String authority = String.format("%s/%s",
                mailProperties.getAuthUrl(), mailProperties.getTenantId());
        confidentialClientApp = ConfidentialClientApplication.builder(mailProperties.getClientId(), credential)
                .authority(authority)
                .build();

        scopes = Set.of("https://outlook.office365.com/.default");
    }

    @Override
    public String getAccessToken() {
        try {
            ClientCredentialParameters parameters = ClientCredentialParameters
                    .builder(scopes)
                    .build();

            return confidentialClientApp.acquireToken(parameters)
                    .join()
                    .accessToken();
        } catch (Exception e) {
            throw new RuntimeException("Unable to retrieve OAuth2 access token.", e);
        }
    }
}

Testing the Solutions

The pieces of information needed to authenticate that have been previously mentioned are set as application properties. Except for mail.host and mail.authUrl ones, the values shall be filled in with the actual values configured on the Exchange server.

mail.host = outlook.office365.com
mail.user = technically@correct.com

mail.authUrl = https://login.microsoftonline.com
mail.tenantId = tenantid
mail.clientId = clientid
mail.clientSecret = secret

If either of the following integration tests is run, details may be observed while connecting to the store. As the mail.debug session property was set to true, the logs will depict these.

@SpringBootTest
class MailReaderTest {

    @Autowired
    @Qualifier("msalMailReader")
    private MailReader msalMailReader;

    @Autowired
    @Qualifier("ropcMailReader")
    private MailReader ropcMailReader;

    private void getFolder(MailReader mailReader) {
        final String name = "INBOX";
        Folder folder = mailReader.getFolder(name);
        Assertions.assertNotNull(folder);
        Assertions.assertEquals(name, folder.getFullName());
    }

    @Test
    void getFolderMsal() {
        getFolder(msalMailReader);
    }

    @Test
    void getFolderRopc() {
        getFolder(ropcMailReader);
    }
}

What happens when connecting?

DEBUG: JavaMail version 1.6.2
DEBUG: successfully loaded resource: /META-INF/javamail.default.address.map
DEBUG: getProvider() returning javax.mail.Provider[STORE,imaps,com.sun.mail.imap.IMAPSSLStore,Oracle]
DEBUG IMAPS: mail.imap.fetchsize: 16384
DEBUG IMAPS: mail.imap.ignorebodystructuresize: false
DEBUG IMAPS: mail.imap.statuscachetimeout: 1000
DEBUG IMAPS: mail.imap.appendbuffersize: -1
DEBUG IMAPS: mail.imap.minidletime: 10
DEBUG IMAPS: enable STARTTLS
DEBUG IMAPS: closeFoldersOnStoreFailure
DEBUG IMAPS: trying to connect to host "outlook.office365.com", port 993, isSSL true
* OK The Microsoft Exchange IMAP4 service is ready. [TQBOADIAUABSADEANgBDAEEAMAAwADUANgAuAG4AYQBtAHAAcgBkADEANgAuAHAAcgBvAGQALgBvAHUAdABsAG8AbwBrAC4AYwBvAG0A]
A0 CAPABILITY
* CAPABILITY IMAP4 IMAP4rev1 AUTH=PLAIN AUTH=XOAUTH2 SASL-IR UIDPLUS ID UNSELECT CHILDREN IDLE NAMESPACE LITERAL+
A0 OK CAPABILITY completed.
DEBUG IMAPS: AUTH: PLAIN
DEBUG IMAPS: AUTH: XOAUTH2
DEBUG IMAPS: protocolConnect login, host=outlook.office365.com, user=technically@correct.com, password=<non-null>
DEBUG IMAPS: AUTHENTICATE XOAUTH2 command trace suppressed
DEBUG IMAPS: AUTHENTICATE XOAUTH2 command result: A1 OK AUTHENTICATE completed.
A2 CAPABILITY
* CAPABILITY IMAP4 IMAP4rev1 AUTH=PLAIN AUTH=XOAUTH2 SASL-IR UIDPLUS MOVE ID UNSELECT CLIENTACCESSRULES CLIENTNETWORKPRESENCELOCATION BACKENDAUTHENTICATE CHILDREN IDLE NAMESPACE LITERAL+
A2 OK CAPABILITY completed.
DEBUG IMAPS: AUTH: PLAIN
DEBUG IMAPS: AUTH: XOAUTH2

The OAuth 2.0 authentication is successful.

Conclusions

This article documents possible implementations that client applications may adopt in order to embrace the Modern authentication (OAuth 2.0) promoted by Microsoft and thus remain functional after Basic authentication is turned off. While the connection via the JavaMail library does not imply major changes, the access token retrieval needs to be coded. Two different mechanisms were implemented, either by leveraging the Resource Owner Password Credentials grant or using Microsoft Authentication Library for Java.

The former is apparently more lightweight, as it is done via an HTTP POST call to the Authorization server. According to Microsoft, more secure alternatives are available and recommended (Resources item 5).

The latter requires including the msal4j library in the client application classpath. Nevertheless, taking into account the recommendations, it seems the preferred manner of retrieving an access token.

Resources

  1. Deprecation of Basic authentication in Exchange Online
  2. Basic Authentication Deprecation in Exchange Online
  3. JavaMail OAuth2 Support
  4. Microsoft Authentication Library (MSAL) for Java
  5. Microsoft identity platform and OAuth 2.0 Resource Owner Password Credentials
  6. RFC 6749
  7. Authenticate an IMAP, POP or SMTP connection using OAuth
  8. Troubleshooting OAuth 2 Authentication to IMAP on Office 365 using JavaMail
  9. Sample project here
  10. The picture represents the door of Carta Monastery, in Romania, built around year 1200.

Bypassing Spring Interceptors via Decoration

by Horatiu Dan

Context

Applications built using either the plain Spring Framework or Spring Boot are widely developed and deployed these days. In trying to address simple or complex business challenges, products strongly rely on the features of the framework in use in their attempt to offer elegant solutions. Elegant here means correct, clean and easy to understand and maintain.

In case of a web application, some requests are handled in one way, while others may need extra pre- or post-processing, or even a change in the initial request. Generally, Servlet Filters are configured and put in force, in order to accommodate such scenarios.

Spring MVC applications, on the other hand, define HandlerInterceptors which are quite similar to Servlet Filters. As per the API reference, a HandlerInterceptor “allows custom pre-processing with the option of prohibiting the execution of the handler itself and custom post-processing”. Usually, a chain of such interceptors is defined based on the HandlerMapping itself, HandlerMapping being the contract that objects defining the mapping between requests and the executors of the requests shall obey.
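The contract quoted above may be pictured with a small, framework-free sketch. The types below (SimpleInterceptor, InterceptorChainSketch) are hypothetical stand-ins and not the Spring MVC ones; they only illustrate how a chain of interceptors is consulted in order and how a false result from the pre-processing step prohibits the execution of the handler.

```java
import java.util.List;

// Simplified stand-in for a handler interceptor, not the Spring MVC type.
interface SimpleInterceptor {

    // returns false in order to prohibit the execution of the handler
    boolean preHandle(String requestUri);
}

// Hypothetical chain that consults the interceptors before invoking the handler.
class InterceptorChainSketch {

    private final List<SimpleInterceptor> interceptors;
    private final Runnable handler;

    InterceptorChainSketch(List<SimpleInterceptor> interceptors, Runnable handler) {
        this.interceptors = interceptors;
        this.handler = handler;
    }

    // returns true if the handler was executed, false if an interceptor vetoed it
    boolean dispatch(String requestUri) {
        for (SimpleInterceptor interceptor : interceptors) {
            if (!interceptor.preHandle(requestUri)) {
                return false;
            }
        }
        handler.run();
        return true;
    }
}
```

In this simplified model, the interceptors run in the order in which they were registered and the handler is reached only if all of them agree.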

This article documents a simple, yet very useful way of bypassing some of the configured HandlerInterceptors depending on the requests’ mapping. An out-of-the-box Spring Framework HandlerInterceptor decorator is used – MappedInterceptor.

Set-up

  • Java 17
  • Maven 3.6.3
  • Spring Boot v. 2.7.3
  • A small web application that exposes a single HTTP Get operation in two different manners:
    • As a REST call (JSON representation)
    • As a web page (HTML representation)

Developing the POC

The application developed to showcase the solution is straight-forward. In order to accomplish the two previously mentioned scenarios, two controllers are defined.

The former is a @RestController annotated one and addresses the REST call, while the latter is a @Controller annotated one and handles the HTML part.

@RestController
@Slf4j
@RequiredArgsConstructor
class JokesRestController {

    private final JokesService jokesService;

    @GetMapping("/api/jokes")
    ResponseEntity<List<Joke>> getJokes() {
        log.info("getJokes - Retrieve all jokes representation.");
        return ResponseEntity.ok(jokesService.getJokes());
    }
}

@Controller
@Slf4j
@RequiredArgsConstructor
class JokesController {

    private final JokesService jokesService;

    @GetMapping("/jokes")
    String getJokes(Model model) {
        log.info("getJokes - Render all jokes.");
        model.addAttribute("jokes", jokesService.getJokes());
        return "jokes";
    }
}

Both of them delegate to the same @Service component that provides the content – a few jokes – to be represented in the two aimed manners. For simplicity, the JokesService has the entities declared inside, as the scope of this article is focused on the web tier. In a real application, the service obviously would further delegate to a real source of data (a database repository etc.).

A Joke entity is simply described by an identifier and the joke text and represented by the record below:

public record Joke(String id, String text) {

    public Joke(String text) {
        this(UUID.randomUUID().toString(), text);
    }
}

The detail worth observing and essential for the purpose of this article is the slight difference between the controllers’ handler mappings – /api/jokes and /jokes respectively. Since it was decided that both ‘sections’ of this application (REST and non-REST) run in the same JVM and are deployed together, they were separated at URL level. Basically, all REST related URLs are prefixed by /api. Pros and cons around this decision may arise, but in order to sustain the facts presented in this article, this is quite handy.

If we run it, the outcome is as desired:

  • http://localhost:8080/jokes – responds with the jokes rendered in HTML
HTTP/1.1 200 
Content-Type: text/html;charset=UTF-8
Content-Language: en-US
Transfer-Encoding: chunked
Date: Fri, 16 Sep 2022 08:14:31 GMT
Keep-Alive: timeout=60
Connection: keep-alive

<!DOCTYPE HTML>
<html lang="en">
<head>
    <title>Jokes</title>
    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
</head>
<body>
<div>
    <table>
        <thead>
        <tr>
            <th align="left">Jokes</th>
        </tr>
        </thead>
        <tbody>
        <tr>
            <td>If Chuck Norris coughs on you, you owe him 50 bucks.</td>
        </tr>
        <tr>
            <td>Chuck Norris can make a slinky go up the stairs.</td>
        </tr>
        <tr>
            <td>Ice has Chuck Norris running through its veins.</td>
        </tr>
        </tbody>
    </table>
</div>
</body>
</html>
  • http://localhost:8080/api/jokes – responds with the jokes represented as JSON
HTTP/1.1 200 
Content-Type: application/json
Transfer-Encoding: chunked
Date: Fri, 16 Sep 2022 08:13:40 GMT
Keep-Alive: timeout=60
Connection: keep-alive

[
  {
    "id": "99798159-673b-470a-9355-09246935a42c",
    "text": "If Chuck Norris coughs on you, you owe him 50 bucks."
  },
  {
    "id": "6c073428-02c8-4a6f-a438-b4fc445adcd3",
    "text": "Chuck Norris can make a slinky go up the stairs."
  },
  {
    "id": "c9b6fcca-d12b-482f-9197-56991e799a15",
    "text": "Ice has Chuck Norris running through its veins."
  }
]

Adding Interceptors

Let’s consider the following requirement – log all requests fulfilled by the application and moreover the session id, where applicable.

Since these are two different concerns, two different HandlerInterceptors are wired in.

@Component
@Slf4j
public class AppInterceptor implements HandlerInterceptor {

    @Override
    public boolean preHandle(HttpServletRequest request, 
			HttpServletResponse response, Object handler) {
			
        log.info("preHandle - {} {} recorded", request.getMethod(), request.getRequestURI());
        return true;
    }
}

@Component
@Slf4j
public class SessionInterceptor implements HandlerInterceptor {

    @Override
    public boolean preHandle(HttpServletRequest request, 
			HttpServletResponse response, Object handler) {
			
        log.info("preHandle - {} {}, session id is {}",
                request.getMethod(), request.getRequestURI(), request.getSession().getId());
        return true;
    }
}

@EnableWebMvc
@Configuration
@RequiredArgsConstructor
public class WebConfig implements WebMvcConfigurer {

    private final AppInterceptor appInterceptor;
    private final SessionInterceptor sessionInterceptor;

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(appInterceptor);
        registry.addInterceptor(sessionInterceptor);
    }
}

AppInterceptor logs each request, while SessionInterceptor logs the corresponding session identifier of each request, both in advance of the actual fulfillment – method preHandle() is overridden.

If we run it again, the output is identical to the previous one and the following are logged in the console:

  • http://localhost:8080/jokes
c.h.m.interceptor.AppInterceptor         : preHandle - GET /jokes recorded
c.h.m.interceptor.SessionInterceptor     : preHandle - GET /jokes, session id is 61A873D6D9697A1D1041B17A47B02285
c.h.m.controller.JokesController         : getJokes - Render all jokes.
  • http://localhost:8080/api/jokes
c.h.m.interceptor.AppInterceptor         : preHandle - GET /api/jokes recorded
c.h.m.interceptor.SessionInterceptor     : preHandle - GET /api/jokes, session id is 61A873D6D9697A1D1041B17A47B02285
c.h.m.controller.JokesRestController     : getJokes - Retrieve all jokes representation.

For each of the requests, both HandlerInterceptors were invoked and they logged the desired information, in the order they were configured. Then, the corresponding handler method was invoked and fulfilled the request.

Analysis and Improvements

If we analyze this simple implementation a bit, we recall that one of the characteristics of REST is its statelessness – the session state is kept entirely on the client. Thus, for the REST call in this implementation, the session identifier logged by the second HandlerInterceptor is irrelevant.

Furthermore, it means that short-circuiting this interceptor for all REST calls would be an improvement. Here, SessionInterceptor calls HttpServletRequest#getSession(), which delegates to HttpServletRequest#getSession(true). This well-known call returns the session associated with the request or, if there is none, creates one. For REST calls, such a call is useless and fairly expensive, and it may affect the overall functioning of the service if a client performs a great many REST calls.

Spring Framework defines the org.springframework.web.servlet.handler.MappedInterceptor, which according to its documentation “wraps a HandlerInterceptor and uses URL patterns to determine whether it applies to a given request”. This looks like exactly what is needed here: a way to bypass the SessionInterceptor for REST calls, that is, for URLs prefixed with '/api'.

The MappedInterceptor class defines a few constructors that allow a HandlerInterceptor to be included in or excluded from invocation based on URL patterns. The configuration becomes as follows:

@EnableWebMvc
@Configuration
@RequiredArgsConstructor
public class WebConfig implements WebMvcConfigurer {

    private final AppInterceptor appInterceptor;
    private final SessionInterceptor sessionInterceptor;

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(appInterceptor);
        registry.addInterceptor(new MappedInterceptor(null, new String[] {"/api/**"}, sessionInterceptor));
    }
}

Instead of directly registering the sessionInterceptor as above, it is decorated in a MappedInterceptor instance. The include patterns are passed as null, so all URLs are included, while the exclude patterns rule out all URLs prefixed by '/api'.

Now in the console the following are logged:

  • http://localhost:8080/jokes
c.h.m.interceptor.AppInterceptor         : preHandle - GET /jokes recorded
c.h.m.interceptor.SessionInterceptor     : preHandle - GET /jokes, session id is 8BD55C96F2396494FF8F72CAC5F4EE67
c.h.m.controller.JokesController         : getJokes - Render all jokes.

Both interceptors are executed, before the handler method in JokesController.

  • http://localhost:8080/api/jokes
c.h.m.interceptor.AppInterceptor         : preHandle - GET /api/jokes recorded
c.h.m.controller.JokesRestController     : getJokes - Retrieve all jokes representation.

Only the relevant interceptor is invoked, before the handler method in JokesRestController.
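The bypass observed in the logs above can be illustrated without the framework. The following is only a minimal sketch of the decorating idea, not Spring's actual implementation: PathInterceptor, ExcludingInterceptor and InterceptorChainDemo are hypothetical names, and a plain prefix check stands in for Spring's URL pattern matching.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, framework-free stand-in for Spring's HandlerInterceptor.
interface PathInterceptor {
    boolean preHandle(String path);
}

// Simplified illustration of the decorator idea: the delegate is skipped
// when the request path starts with one of the excluded prefixes.
class ExcludingInterceptor implements PathInterceptor {

    private final List<String> excludedPrefixes;
    private final PathInterceptor delegate;

    ExcludingInterceptor(List<String> excludedPrefixes, PathInterceptor delegate) {
        this.excludedPrefixes = excludedPrefixes;
        this.delegate = delegate;
    }

    @Override
    public boolean preHandle(String path) {
        boolean excluded = excludedPrefixes.stream().anyMatch(path::startsWith);
        // An excluded path continues down the chain without calling the delegate.
        return excluded || delegate.preHandle(path);
    }
}

class InterceptorChainDemo {

    // Runs the chain in registration order and returns the interceptors that were invoked.
    static List<String> invoked(String path) {
        List<String> log = new ArrayList<>();
        PathInterceptor app = p -> { log.add("AppInterceptor"); return true; };
        PathInterceptor session = p -> { log.add("SessionInterceptor"); return true; };

        List<PathInterceptor> chain = List.of(
                app,
                new ExcludingInterceptor(List.of("/api/"), session));

        for (PathInterceptor interceptor : chain) {
            if (!interceptor.preHandle(path)) {
                break; // an interceptor returning false stops the processing
            }
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(invoked("/jokes"));     // [AppInterceptor, SessionInterceptor]
        System.out.println(invoked("/api/jokes")); // [AppInterceptor]
    }
}
```

The wrapped interceptor stays unaware of the URL rules, which is what allows SessionInterceptor to remain unchanged in the configuration above.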

Conclusion

This article documented and demonstrated via a simple use case how a HandlerInterceptor invocation may be bypassed when needed, by leveraging Spring Framework’s out-of-the-box MappedInterceptor.

In real applications, such configurations might be helpful and proactively protect product development teams from hard-to-detect problems that arise suddenly and apparently out of nowhere.

Resources

  • Sample project is available here.
  • The picture was taken at Barsana Monastery, Romania

Delegating JWT Validation for Greater Flexibility

by Horatiu Dan

Context

In my opinion, the primary purpose of all software applications, whether already created, currently in development or yet to be built, should be to make humans’ day-to-day activities easier to fulfill. Humans are the most valuable creations, and software applications are great tools that can, at the very least, be used by them.

Nowadays, almost every software product exchanges data with at least one other peer software product, which results in huge amounts of data flowing among them. Usually, a request from one product to another needs to pass a set of preconditions before it is considered acceptable and trustworthy.

The purpose of this article is to showcase a simple and flexible, yet efficient and decoupled solution for validating such prerequisites.

Setting the Stage

Let’s consider the next simple and general use case:

  • Service Provider and Client are two applications exchanging data.
  • The Client calls the Service Provider.
  • The operation invoked is executed only after the Client is identified by the Service Provider.
  • The Client identification is done via a token included in the request and validated by the Service Provider.

As part of this article, a small Java project is built and while doing this the token validation strategy is explained.

As JSON Web Tokens (JWT) are widely used nowadays, especially when products need to identify themselves to one another, JWT validation was chosen as the concrete implementation. According to RFC 7519, a JWT is a compact, encoded, URL-safe string representation of a JSON message.

Very briefly, a JWT has three sections – header, payload and signature.

Encoded, it is a string with three sections, separated by dots:

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.
eyJzdWIiOiJoY2QiLCJpc3MiOiJpc3N1ZXIiLCJhdWQiOiJhdWRpZW5jZSIsImV4cCI6MTY1MDU0OTg1OH0.
rbs6NqNw9KZ4IGuCOjdPpdJqMswTXHn7oNADCzlQHL8

Decoded, it is in JSON format and thus, more readable:

Header – algorithm and type

{
  "alg": "HS256",
  "typ": "JWT"
}

Payload – data (claims)

{
  "sub": "hcd",
  "iss": "issuer",
  "aud": "audience",
  "exp": 1650549858
}

Signature

HMACSHA256(
  base64UrlEncode(header) + "." +
  base64UrlEncode(payload),
  the-256-bit-secret
)
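The three sections described above can be inspected with the JDK alone. The sketch below is a minimal illustration, not the article's implementation: the JwtAnatomy class and the demo secret are hypothetical, and the header and payload are the ones from the encoded example. It Base64URL-decodes a section and recomputes an HS256 signature as in the formula above.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical helper, for illustration only.
class JwtAnatomy {

    // Base64URL-decodes a header or payload section into its JSON text.
    static String decodeSection(String section) {
        return new String(Base64.getUrlDecoder().decode(section), StandardCharsets.UTF_8);
    }

    // HMACSHA256(base64UrlEncode(header) + "." + base64UrlEncode(payload), secret)
    static String sign(String headerAndPayload, byte[] secret) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(secret, "HmacSHA256"));
            byte[] signature = mac.doFinal(headerAndPayload.getBytes(StandardCharsets.US_ASCII));
            return Base64.getUrlEncoder().withoutPadding().encodeToString(signature);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HmacSHA256 is not available", e);
        }
    }

    // Recomputes the signature over the first two sections and compares it with the third.
    static boolean hasValidSignature(String jwt, byte[] secret) {
        String[] sections = jwt.split("\\.");
        if (sections.length != 3) {
            return false;
        }
        String expected = sign(sections[0] + "." + sections[1], secret);
        // Constant-time comparison of the two encoded signatures.
        return MessageDigest.isEqual(
                expected.getBytes(StandardCharsets.US_ASCII),
                sections[2].getBytes(StandardCharsets.US_ASCII));
    }

    public static void main(String[] args) {
        String header = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9";
        String payload = "eyJzdWIiOiJoY2QiLCJpc3MiOiJpc3N1ZXIiLCJhdWQiOiJhdWRpZW5jZSIsImV4cCI6MTY1MDU0OTg1OH0";

        System.out.println(decodeSection(header));
        System.out.println(decodeSection(payload));

        // Assumption: a made-up secret, unrelated to the one that signed the example token.
        byte[] secret = "hypothetical-demo-secret".getBytes(StandardCharsets.UTF_8);
        String jwt = header + "." + payload + "." + sign(header + "." + payload, secret);
        System.out.println(hasValidSignature(jwt, secret));
    }
}
```

Note that the sketch checks the signature only. A JWT library additionally validates the standard claims, such as the expiration.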

These pieces of information are enough to get an idea about JWTs, so let’s start developing.

Initial Implementation

The sample project is built with Java 17 and Maven. The dependencies are very few:

  • io.jsonwebtoken / jjwt – for JWT signing and verification
<dependency>
      <groupId>io.jsonwebtoken</groupId>
      <artifactId>jjwt</artifactId>
      <version>0.9.1</version>
</dependency>

For exploring other available libraries, check https://jwt.io/libraries?language=Java.

  • JUnit 5 and Mockito – for unit testing the implementation
<dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-engine</artifactId>
      <version>5.8.2</version>
      <scope>test</scope>
</dependency>
<dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-junit-jupiter</artifactId>
      <version>4.5.1</version>
      <scope>test</scope>
</dependency>

JWT generation and verification is implemented using the following interface:

public interface JwtManager {

    String generate(String sub, String iss, String aud);

    boolean isValid(String jwt, String iss, String aud);
}

The former method uses the provided parameters (subject, issuer and audience) to create and sign a valid JWT. The latter checks whether the jwt is valid or not, using the provided issuer and audience.

The goal is to create an implementation and make the following test pass.

class JwtManagerTest {

    private String iss;
    private String aud;
    private String jwt;
    private JwtManager jwtManager;

    @BeforeEach
    void setUp() {
        jwtManager = new JwtManagerImpl();

        iss = "issuer";
        aud = "audience";
        jwt = jwtManager.generate("hcd", iss, aud);
        Assertions.assertNotNull(jwt);
    }

    @Test
    void isValid_coupled() {
        final boolean valid = jwtManager.isValid(jwt, iss, aud);
        Assertions.assertTrue(valid);
    }
}

By leveraging the Jwts builder, the sub, iss, aud are set, the token is configured to expire after 1 minute and moreover, it is signed using the Service Provider secret key.

public String generate(String sub, String iss, String aud) {
	final Date exp = new Date(System.currentTimeMillis() + 60_000);

	return Jwts.builder()
		.setSubject(sub)
		.setIssuer(iss)
		.setAudience(aud)
		.setExpiration(exp)
		.signWith(SignatureAlgorithm.HS256, "s1e2c3r4e5t6k7e8y9")
		.compact();
}

In the other direction, the token is parsed using the same secret key and if it hasn’t expired yet, the payload claims are extracted.

public boolean isValid(String jwt, String iss, String aud) {
	Claims body;
	try {
		body = Jwts.parser()
				.setSigningKey("s1e2c3r4e5t6k7e8y9")
				.parseClaimsJws(jwt)
				.getBody();
	} catch (JwtException e) {
		return false;
	}

	return iss.equals(body.getIssuer()) &&
			aud.equals(body.getAudience());
}

This is straightforward. Nevertheless, a custom assumption is made in addition to the standard (mandatory) token validations.

“A valid token is acceptable if the issuer and audience conform to specific values.”

Basically this is the plot of the article – how to implement the custom verification for a valid token, as flexible as possible.

If we run the test, it passes. The implementation is correct but, unfortunately, not flexible enough.

Suppose that at some point the Service Provider that validates the Client request changes the assumption previously made. This obviously impacts the isValid() method, whose implementation has to be changed.

Final Implementation

It would be good if whenever the Service Provider makes a change to these preconditions, the standard part of the token validation remains in place. Then the code shall be flexible enough to allow deciding on the custom validation assumptions as late as possible. In order to accommodate this, the code needs to be refactored.

The custom part of the validation is captured in the next interface (even better, a @FunctionalInterface).

@FunctionalInterface
public interface ValidationStrategy {

    boolean isValid(Claims body);
}

The strategy is then implemented by moving the last two lines of the isValid() method into the new class. Moreover, we may assume that this is the default validation strategy of the Service Provider.

public class DefaultValidationStrategy implements ValidationStrategy {

    private final String iss;
    private final String aud;

    public DefaultValidationStrategy(String iss, String aud) {
        this.iss = iss;
        this.aud = aud;
    }

    @Override
    public boolean isValid(Claims body) {
        return iss.equals(body.getIssuer()) &&
                aud.equals(body.getAudience());
    }
}

The original isValid() method of JwtManager is first deprecated, to be replaced soon by the new one declared below.

public interface JwtManager {

    String generate(String sub, String iss, String aud);

    /**
     * @deprecated in favor of {@link #isValid(String, ValidationStrategy)}
     */
    @Deprecated(forRemoval = true)
    boolean isValid(String jwt, String iss, String aud);

    boolean isValid(String jwt, ValidationStrategy strategy);
}

Basically, the new method delegates to the ValidationStrategy callback. Delegation (in programming) means exactly this: one entity hands part of its work over to another entity.

public boolean isValid(String jwt, ValidationStrategy strategy) {
	Claims body;
	try {
		body = Jwts.parser()
				.setSigningKey(SECRET)
				.parseClaimsJws(jwt)
				.getBody();
	} catch (JwtException e) {
		return false;
	}

	return strategy.isValid(body);
}

In use, the validation is performed as in the following unit test.

@Test
void isValid_looselyCoupled_defaultStrategy() {
	final boolean valid = jwtManager.isValid(jwt,
			new DefaultValidationStrategy(iss, aud));
	Assertions.assertTrue(valid);
} 

With these modifications, the code is flexible enough to accommodate potential changes in the validation strategy. For instance, if the Service Provider decides to check only the issuer, this can be achieved without needing to modify the code that handles the JWT standard part.

@Test
void isValid_looselyCoupled_customStrategy() {
	final boolean valid = jwtManager.isValid(jwt,
			body -> iss.equals(body.getIssuer()));
	Assertions.assertTrue(valid);
}

If we have a look at the previous unit test, we see how handy it is to pass the ValidationStrategy as a lambda. Also, I suppose the reason for making the ValidationStrategy a @FunctionalInterface from the beginning is now clear.
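A functional interface also makes it easy to combine small strategies into larger ones. The following is a minimal, self-contained sketch of this idea, under a few assumptions: ClaimsStrategy is a hypothetical stand-in for ValidationStrategy, a plain Map replaces the jjwt Claims type so that the example runs without the library, and the and() method is an addition that is not part of the sample project.

```java
import java.util.Map;

// Hypothetical stand-in for ValidationStrategy; a Map replaces jjwt's Claims.
@FunctionalInterface
interface ClaimsStrategy {

    boolean isValid(Map<String, Object> claims);

    // Assumption: a combinator added for illustration, in the style of Predicate#and.
    default ClaimsStrategy and(ClaimsStrategy other) {
        return claims -> isValid(claims) && other.isValid(claims);
    }
}

class StrategyCompositionDemo {

    static ClaimsStrategy issuerIs(String iss) {
        return claims -> iss.equals(claims.get("iss"));
    }

    static ClaimsStrategy audienceIs(String aud) {
        return claims -> aud.equals(claims.get("aud"));
    }

    public static void main(String[] args) {
        Map<String, Object> claims = Map.of("sub", "hcd", "iss", "issuer", "aud", "audience");

        // Equivalent to the default strategy: both issuer and audience must match.
        ClaimsStrategy both = issuerIs("issuer").and(audienceIs("audience"));
        System.out.println(both.isValid(claims)); // true

        // A mismatching audience makes the combined strategy fail.
        System.out.println(issuerIs("issuer").and(audienceIs("other")).isValid(claims)); // false
    }
}
```

With such a combinator, a change in the Service Provider preconditions becomes a matter of composing different small checks at the call site, while the standard part of the token validation remains untouched.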

Conclusion

In this article, a decoupled solution for validating JSON Web Tokens was implemented. This solution uses callbacks and thus promotes decoupling and flexibility.

Resources