TWR - The Waiting Room

The Waiting Room

Copyright © 2023 Dr Ant Kutschera - see bottom of page for details

A software product designed to enhance the consumption of Apache Kafka topics, providing guaranteed ordering at the key level rather than just the partition level. This allows consumers to retry in case of failure without either losing the order (e.g. by sending records to a dead letter topic) or blocking other records in the partition that have different keys. Whether or not to guarantee ordering can be set per record. It additionally supports delayed delivery, backoff during retries and time-to-live, all per record. This allows you to build scalable, concurrent, self-healing systems, while still fulfilling ordering requirements where necessary.



Concept

Distributed software architectures allow improvements in availability and scalability, but they suffer from network partitions, which lead to a whole list of associated problems, e.g. Eleven Patterns, Problems & Solutions related to Microservices and in particular Distributed Architectures. A pattern often applied to solve global data consistency requirements is the transactional outbox pattern: instead of sending events outside of the transaction used to commit data in a source component, an outbox table is written to within the same ACID transaction. The contents of that outbox table are processed asynchronously, resulting in eventual consistency. That pattern has been implemented in many libraries and software products, as well as in the cloud. Most frequently, the outbox is used to send (domain) events to downstream services. However, it is also possible to make the source component respond to the event itself and execute some logic, like making a call to a downstream REST based service, sending an email, or integrating anything else that does not have an event based API.
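
The essence of the pattern is that the business row and the outbox row are committed atomically. As a minimal sketch, assuming plain JDBC and hypothetical ORDERS and OUTBOX tables (this is not TWR code, it just illustrates the atomicity the pattern relies on):

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.UUID;
import javax.sql.DataSource;

public class OrderService {

    private final DataSource dataSource;

    public OrderService(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /** Writes the business row and the outbox row in one ACID transaction. */
    public void placeOrder(String orderId, String payloadJson) throws Exception {
        try (Connection con = dataSource.getConnection()) {
            con.setAutoCommit(false);
            try (PreparedStatement business = con.prepareStatement(
                     "INSERT INTO ORDERS (ID, PAYLOAD) VALUES (?, ?)");
                 PreparedStatement outbox = con.prepareStatement(
                     "INSERT INTO OUTBOX (ID, AGGREGATE_ID, TYPE, PAYLOAD) VALUES (?, ?, ?, ?)")) {
                business.setString(1, orderId);
                business.setString(2, payloadJson);
                business.executeUpdate();

                outbox.setString(1, UUID.randomUUID().toString());
                outbox.setString(2, orderId); // the aggregate ID, later used as the ordering key
                outbox.setString(3, "ORDER_PLACED");
                outbox.setString(4, payloadJson);
                outbox.executeUpdate();

                con.commit(); // both rows are committed, or neither is
            } catch (Exception e) {
                con.rollback();
                throw e;
            }
        }
    }
}
```

A message relay then processes the committed outbox rows asynchronously and publishes them, which is where the ordering and failure-handling questions discussed below arise.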

Regardless of how it is used, little is written about consuming from the outbox and what happens when failures occur during the processing of the events. Strongly related to those failures is the ordering of the events. For example, an error while processing the first event in the outbox can lead to a second event overtaking the first one. Sometimes that is unacceptable, and it complicates understanding what actually happened.

When working with a monolithic architecture, understanding problems in the software is often greatly simplified in comparison to the problems described above for distributed architectures. This is because the process steps are often serialised, especially when considering just a single instance of an "aggregate", like an insurance case, a shopping cart, etc. While not always advantageous or even necessary, such serialisation can greatly simplify the problem at hand. In certain cases, not processing events in the correct order can lead to globally inconsistent data, a.k.a. corrupt data.

This software library acts as middleware sitting between your application and Kafka, providing a transactional outbox, but also an inbox (a so-called Oibox™). It takes care of serialising the processing of events for any given business key, allowing the processing to be attempted multiple times in case of failure (using automated retries and a configurable backoff strategy), effectively making other events for the same key wait, so that they are processed later, in the correct order. Note that not all events must be blocked (it is configurable per event).

While this library depends on Debezium to help it with ordering data, it does not require the deployment of other products like Apache Kafka Connect, since it provides all the necessary infrastructure to run in clustered mode.

This software library is not solely dependent upon a transactional outbox and can also be used in cases where your software simply needs to consume Kafka records, but with the same advantages described above or in the feature list.


Resources

Open Source Code, licensed under Apache 2.0, is available at https://github.com/twr-rocks

Maven artifacts, Docker images, etc. coming soon.

Documentation coming soon...


Features

  • Guaranteed Ordering, where required, configured at the event level (see the sketch after this list)
  • Partitioning by individual business keys
  • Self-healing mechanisms
  • Automatic retry, with various backoff strategies
  • Delayed Execution
  • Mapping Abilities - for compatibility with other outbox implementations
  • At-least-once delivery, with consumer-side deduplication
  • Locking / Reserving mechanisms, where required
  • Time-to-live (TTL) where required
  • Support for Quarkus, Spring, plain Java and other languages
  • Scalability and load-balancing, without overloading service providers
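
To make the per-record features above concrete, here is a purely illustrative sketch of the kind of options that can be set per record; none of these names are taken from the real TWR API:

```java
import java.time.Duration;

// Illustrative data holder only - these names are NOT the real TWR API, they just
// make the per-record knobs from the feature list concrete.
record RecordOptions(boolean ordered, Duration delay, Duration timeToLive,
                     Duration initialBackoff, Duration maxBackoff) {}

public class PerRecordOptionsExample {
    public static void main(String[] args) {
        // Guarantee ordering for this record's key, deliver it five minutes from now,
        // drop it if unprocessed after 24 hours, and on failure retry with a backoff
        // that grows from one second up to ten minutes.
        RecordOptions options = new RecordOptions(
                true,
                Duration.ofMinutes(5),
                Duration.ofHours(24),
                Duration.ofSeconds(1),
                Duration.ofMinutes(10));
        System.out.println(options);
    }
}
```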

How It Works

Outbox

Outbox functionality is used to send messages to Kafka, if and only if the transaction used to atomically write business (functional) data together with the messages is committed. It is based on an embedded Debezium Engine, so you don't need to run a Kafka Connect cluster in addition to your application, which probably already has a number of nodes running concurrently to ensure high availability and scalability.
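
TWR manages the embedded engine for you, but for context, here is a condensed sketch of what Debezium's embedded engine API looks like, tailing a hypothetical PostgreSQL outbox table (property names vary by connector and Debezium version):

```java
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EmbeddedOutboxRelay {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("name", "outbox-relay");
        props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", "/tmp/offsets.dat");
        props.setProperty("database.hostname", "localhost");
        props.setProperty("database.port", "5432");
        props.setProperty("database.user", "app");
        props.setProperty("database.password", "secret");
        props.setProperty("database.dbname", "appdb");
        props.setProperty("topic.prefix", "app");
        props.setProperty("table.include.list", "public.outbox"); // only stream the outbox table

        DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine
                .create(Json.class)
                .using(props)
                .notifying(record -> {
                    // a relay like TWR maps the change event and produces a Kafka record here
                    System.out.println(record.value());
                })
                .build();

        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine); // runs until engine.close() is called
    }
}
```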

  • (1) Your application uses the TWR API to publish messages in the same database transaction as it uses an ORM or similar library to write business data to the database.
  • (2) When that transaction is committed, Debezium is triggered to process the data from the outbox table.
  • (3) You can supply optional Java mapping logic to TWR which converts the raw data from the outbox table into a first class Kafka record, or you can let TWR use its standard mapping logic to create JSON or Avro based records. The published record can be designed to be consumed by other applications, or by the same application. A sketch of steps 1 and 3 follows this list.
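
To make steps 1 and 3 concrete, here is a sketch with invented names; every TWR-related type and method below is hypothetical, so consult the documentation for the real API:

```java
import jakarta.transaction.Transactional;

// Hypothetical TWR entry point, invented for illustration only.
interface TwrOutbox {
    /** Writes an outbox row in the caller's current transaction. */
    void publish(String key, String eventType, Object payload);
}

// Stand-ins for your own persistence layer.
interface CustomerRepository { void persist(Customer customer); }
record Customer(String id, String name) {}

public class CreateCustomerService {

    private final TwrOutbox outbox;
    private final CustomerRepository customers;

    public CreateCustomerService(TwrOutbox outbox, CustomerRepository customers) {
        this.outbox = outbox;
        this.customers = customers;
    }

    // Step 1: the business row and the outbox row share one database transaction,
    // so the event is published if and only if the business data is committed.
    // Step 3's mapping (raw outbox row -> Kafka record) then happens inside TWR,
    // either via your optional mapping logic or TWR's standard JSON/Avro mapping.
    @Transactional
    public void createCustomer(Customer customer) {
        customers.persist(customer);
        outbox.publish(customer.id(), "CUSTOMER_CREATED", customer);
    }
}
```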

Inbox

Inbox functionality is used to receive messages from Kafka, but provides you with some standard features:

  • Deduplication - because default consumption from Kafka provides at-least-once semantics
  • Retry & Backoff - letting you throw an exception if you were unable to process the record successfully, so that it is delivered again at a later time, e.g. using an exponential backoff strategy.
  • Guaranteed ordering at the level of the Kafka key, rather than the default of just the Kafka partition

  • (1) Any topic of interest, perhaps one written to using the outbox, is subscribed to and polled.
  • (2) Records are not initially processed by your application; rather, TWR takes them and uses a table in your application's database to check for duplicates. The table is also used for sending signals back to TWR in the correct order, any time that your application attempts to process such a record.
  • (3) An embedded Debezium is used to check the signals which TWR writes to the database.
  • (4 & 5) This results in control records being written to some internal topics, which are used to a) ensure that records are queued by their key, b) block them in case of failure to ensure that they are processed in the correct order, and c) delay them according to the backoff strategy you select, until they should be retried.
  • (6) TWR sends the record to your application code, which can tell TWR if it was able to successfully process the record, or if it failed, in which case TWR will first wait, and then eventually call your application again.
  • (7) When successfully processing a record, you will typically write business data in the same transaction as TWR uses to update its internal control data. This is similar to the outbox functionality shown previously. A sketch of a handler follows this list.
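
Here is a sketch of such a handler, with invented names; the callback contract is hypothetical, but it follows steps 6 and 7: return normally on success, throw to make TWR back off, retry later, and queue further records with the same key:

```java
import jakarta.transaction.Transactional;

// Hypothetical record abstraction, invented for illustration only.
interface TwrRecord {
    String key();
    String value();
}

// Stand-ins for your own persistence layer.
interface PaymentRepository { void persist(Payment payment); }
record Payment(String id, String rawJson) {
    static Payment fromJson(String json) {
        // placeholder parsing, purely illustrative
        return new Payment(java.util.UUID.randomUUID().toString(), json);
    }
}

public class PaymentRecordHandler {

    private final PaymentRepository payments;

    public PaymentRecordHandler(PaymentRepository payments) {
        this.payments = payments;
    }

    // Step 6: TWR calls this with the next record for a given key. Step 7: the
    // business write and TWR's internal control-data update share one transaction.
    @Transactional
    public void onRecord(TwrRecord record) {
        Payment payment = Payment.fromJson(record.value());
        payments.persist(payment);
        // returning normally signals success; throwing any exception makes TWR
        // wait according to the backoff strategy and deliver the record again,
        // while other records with the same key wait behind it
    }
}
```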

Hybrid

In hybrid mode, an Oibox™ table is used (an outbox, but also an inbox), so that the same component which writes to the outbox is the consumer of the outbox. This is useful if you are orchestrating a number of resources that cannot be bound into a single transaction, e.g. if you are writing to a number of REST services, sending an email, or similar.

  • (1) Your application writes to the Oibox™ to say that something still needs to be done, and you only want it to be done if the transaction is committed. You might also write business data at the same time.
  • (2) An embedded Debezium is used to receive the Oibox™ contents
  • (3) TWR sends the contents to an internal topic, in which the order of the contents is guaranteed to be the insert order
  • (4 & 5) TWR processes the contents just as it does when only an inbox is used
  • (6 & 7) TWR triggers your application to process the contents and, for example, call a REST based service. Your application can respond positively or negatively. In the negative case, TWR will retry in the future and block any further records having the same key, so that the order is maintained.
  • (8) In the positive case, where your application succeeds in processing the record, TWR will send on any further records that have been waiting on that preceding record. A sketch of the flow follows this list.
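
Here is a sketch of the hybrid flow, again with invented TWR names (only java.net.http is real; the endpoint is hypothetical):

```java
import jakarta.transaction.Transactional;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical TWR entry point, invented for illustration only.
interface TwrOibox {
    /** Writes a task row in the caller's current transaction. */
    void publish(String key, String taskType, String payload);
}

public class ShipmentOrchestrator {

    private final TwrOibox oibox;
    private final HttpClient http = HttpClient.newHttpClient();

    public ShipmentOrchestrator(TwrOibox oibox) {
        this.oibox = oibox;
    }

    // (1) Record the intention to call the shipping service; it only becomes
    // visible to TWR if this transaction commits.
    @Transactional
    public void requestShipment(String orderId, String addressJson) {
        oibox.publish(orderId, "SHIP_ORDER", addressJson);
    }

    // (6 & 7) TWR calls back with the task. Throwing makes TWR retry later and
    // block further tasks with the same key, preserving the order; returning
    // normally (8) releases any waiting tasks for that key.
    public void onTask(String key, String payload) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("https://shipping.example.com/shipments")) // hypothetical endpoint
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(payload))
                .build();
        HttpResponse<Void> response = http.send(request, HttpResponse.BodyHandlers.discarding());
        if (response.statusCode() >= 300) {
            throw new IllegalStateException("shipping service returned " + response.statusCode());
        }
    }
}
```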


Requirements

  1. Backoff During Failure - If a standard Kafka consumer faces a problem while processing a record, and ordering is paramount, it cannot simply discard the record or push it to a dead letter topic without at the very least sending any further records with the same key, or perhaps all records of that partition, to the dead letter topic too. It gets tricky and expensive to ensure that no data is lost and ordering is maintained. Another solution is to shut the consumer down and restart it later, but this forces unnecessary rebalancing of the partitions amongst the surviving consumers, causes entire partitions to be blocked, and requires you to program such a backoff strategy yourself.
    A standard solution should exist, but doesn't. Hence the requirement for TWR to handle failure with diverse backoff strategies (see the sketch after this list), seamlessly and with only the need for simple configuration.
  2. Blocking At the Key Level - If a Kafka consumer is stopped or paused as part of a backoff strategy, then all the records in the partition containing the failed record are blocked with it. TWR should be able to partition based on the record key, so that records with other keys can be let through without being blocked.
  3. Delayed Processing - TBD
  4. Deduplication - TBD
  5. Guaranteed Ordering - TBD
  6. High Resilience - TBD Scalable, performant, etc., thanks to Kafka and Kafka Streams
  7. Monitoring & Alerting - TBD
  8. Administration - TBD
  9. Security - TBD
  10. TBD - TBD
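
As an illustration of requirement 1, here is a generic exponential-backoff-with-jitter calculation of the kind TWR could apply between retries; the formula is a common one, not TWR's actual implementation:

```java
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

// Generic exponential backoff with "full jitter": the delay doubles with each
// attempt, is capped at a maximum, and a random value up to that delay is used
// so that many failed records are not all retried at exactly the same moment.
public class ExponentialBackoff {

    static Duration delayForAttempt(int attempt, Duration base, Duration max) {
        long exponential = base.toMillis() * (1L << Math.min(attempt - 1, 30)); // base * 2^(attempt-1)
        long capped = Math.min(exponential, max.toMillis());
        return Duration.ofMillis(ThreadLocalRandom.current().nextLong(capped + 1));
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= 6; attempt++) {
            System.out.printf("attempt %d -> sleep %s%n",
                    attempt, delayForAttempt(attempt, Duration.ofSeconds(1), Duration.ofMinutes(10)));
        }
    }
}
```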

Design Choices

In 2018 we developed a basic outbox pattern which used the database as its source. The proof of concept can be found here: https://github.com/maxant/commands-demo. We soon learned two things. First, guaranteeing order in the face of failures is very hard. Second, the design doesn't work well under load, when the outbox contains a lot of tasks. In the meantime, the go-to reference for the transactional outbox, https://microservices.io/patterns/data/transactional-outbox.html, says:

So ordering is very important. And we learned that the hard way, with incidents reported in production where small amounts of data were globally inconsistent even though all pending outbox tasks had been processed. We only found those inconsistencies because the data was replicated twice: once via an audit trail and once via the source data. Our assumption is that there are other inconsistencies in the landscape which we know nothing about! The easiest, cheapest and most reliable way to get ordering is to use a "message relay" based on CDC (Change Data Capture), just as described on microservices.io, namely using a product like Debezium.

One of the main requirements that this product fulfils is exponential backoff while processing failed messages. We could store failed messages in a database, but then we have the problem of how to get them out of the database and back into the component, in the correct order. When multiple instances of a component are running concurrently, we also have contention issues, and the experiences we had back in 2018, trying to solve those problems while maintaining order, led us to make the next design choice, namely to base TWR on Kafka.

The next design choice was to separate writing into the outbox from processing the outbox. In our original implementation from 2018, we used the outbox primarily to record the intention to call REST services during orchestration, in order to write data after committing our own local creates/updates/deletes. This led to a design whereby our components were the actual subscribers of the outbox, because they had the knowledge of how to make these REST calls. Nowadays, it might be more customary to decouple the architecture further and simply send out events. As a result, we decided that it makes more sense to split the two concerns, so that writing to the outbox and later processing the outbox (regardless of whether the source component or a downstream component is consuming) are separate single responsibilities. This was partly influenced by the description of the outbox pattern on microservices.io, which only talks about sending messages/events, and partly based on the ability of Debezium to publish domain events directly using SMTs, or more simply using their outbox pattern. Then came the following article: https://softwaremill.com/microservices-101/ which talks about an inbox and using it for deduplication. Here too, the production and consumption of outbox tasks are separated.

Since it is the responsibility of the message relay to process data from the transaction log and push it into Kafka, it must also handle failure and retries. Debezium does this very well. Hence it is only on the consumer side, while processing the events out of the outbox, that components may require the functionality provided by TWR, e.g. retries with (exponential) backoff, administration of "poison pills", blocking of only single aggregate IDs, etc.