Add parallelism to message pooling process #100
Pull request overview
This PR introduces parallelism to the message polling process to improve throughput when the outbox table has accumulated significant lag. The implementation adds configurable parallel polling using multiple concurrent database queries with the new TkmsMessagePooler class, along with supporting DAO methods for offset-based pagination and message ID range queries.
- Adds a new `TkmsMessagePooler` class that dynamically switches between sequential and parallel polling modes based on message availability
- Introduces `pollerParallelism` configuration property (default: 1) to control the number of parallel database queries
- Extends DAO interface with offset-based `getMessages()` method and `getMinMessageId()`/`getMaxMessageId()` queries for both PostgreSQL and MariaDB
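To make the idea concrete, here is a minimal sketch of offset-based parallel polling with `CompletableFuture`. It is not the code from this PR: `PageSource`, `ParallelPollSketch`, and the `getMessages(offset, limit)` signature are hypothetical stand-ins, and the real `ITkmsDao` and `TkmsMessagePooler` may look quite different. It assumes that rows are not removed between the concurrent queries, so that the pages do not overlap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Hypothetical stand-in for the DAO; the real ITkmsDao signature may differ.
interface PageSource {
  // Returns up to `limit` message ids in ascending order, skipping the first `offset` rows.
  List<Long> getMessages(int offset, int limit);
}

final class ParallelPollSketch {
  private ParallelPollSketch() {}

  // Fetches `parallelism` pages of `batchSize` rows each and returns them in id order.
  static List<Long> poll(PageSource source, int parallelism, int batchSize, ExecutorService executor) {
    if (parallelism <= 1) {
      // Sequential mode: a single query, matching the behaviour without parallelism.
      return source.getMessages(0, batchSize);
    }
    List<CompletableFuture<List<Long>>> pages = new ArrayList<>();
    for (int i = 0; i < parallelism; i++) {
      final int offset = i * batchSize; // each query reads its own page
      pages.add(CompletableFuture.supplyAsync(() -> source.getMessages(offset, batchSize), executor));
    }
    // Join in submission order so the merged result keeps the ascending id order.
    List<Long> result = new ArrayList<>();
    for (CompletableFuture<List<Long>> page : pages) {
      result.addAll(page.join());
    }
    return result;
  }

  public static void main(String[] args) {
    // In-memory "table" with ids 1..10, used only for illustration.
    PageSource source = (offset, limit) ->
        LongStream.rangeClosed(1, 10).skip(offset).limit(limit).boxed().collect(Collectors.toList());
    ExecutorService executor = Executors.newFixedThreadPool(3);
    try {
      System.out.println(poll(source, 3, 4, executor));
    } finally {
      executor.shutdown();
    }
  }
}
```

Joining the futures in the order they were submitted is what keeps the merged batch ordered. A real implementation would also need to decide what to do when one of the page queries fails or returns fewer rows than requested.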
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsMessagePooler.java | New class implementing parallel message polling logic with dynamic mode switching and CompletableFuture-based concurrency |
| tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java | Integration point that instantiates TkmsMessagePooler and replaces direct DAO calls with pooler invocation |
| tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java | Adds pollerParallelism property with validation and shard-specific configuration support |
| tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/dao/ITkmsDao.java | Extends interface with offset-based getMessages() and min/max message ID methods |
| tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/dao/TkmsDao.java | Implements new DAO methods with offset support and min/max ID queries, including metrics tracking |
| tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/dao/TkmsPostgresDao.java | PostgreSQL-specific SQL implementations with index hints for new query methods |
| tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/dao/TkmsMariaDao.java | MariaDB-specific SQL implementations with index hints for new query methods |
| tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/dao/FaultInjectedTkmsDao.java | Test wrapper updated to delegate new DAO methods |
| tw-tkms-starter/src/test/resources/application.yml | Test configuration adds pollerParallelism setting |
Context
To speed up message polling when the outbox table has accumulated a large lag, this PR introduces parallelism to the message polling process.
This is related to an incident with capability-hub: https://wise.enterprise.slack.com/archives/C0A0JNC8MTJ
By default, processing does not change if the `poller-parallelism` property is not configured.
Checklist