From cfd76b7addf0a5024782f83a74c38fb275a43931 Mon Sep 17 00:00:00 2001 From: fjtirado Date: Thu, 5 Feb 2026 16:38:37 +0100 Subject: [PATCH] [Fix #1136] Run db writes asynchronously Signed-off-by: fjtirado --- .../DefaultPersistenceInstanceHandlers.java | 49 +++++++-- .../DefaultPersistenceInstanceReader.java | 3 + .../DefaultPersistenceInstanceWriter.java | 100 +++++++++++++----- .../PersistenceInstanceHandlers.java | 7 +- .../PersistenceInstanceReader.java | 2 +- .../PersistenceInstanceWriter.java | 24 +++-- impl/persistence/mvstore/README.md | 6 +- .../test/AbstractPersistenceTest.java | 17 ++- impl/test/db-samples/running.db | Bin 16384 -> 24576 bytes impl/test/db-samples/running_v1.db | Bin 16384 -> 24576 bytes impl/test/db-samples/suspended.db | Bin 12288 -> 16384 bytes impl/test/db-samples/suspended_v1.db | Bin 12288 -> 16384 bytes .../impl/test/MvStorePersistenceTest.java | 10 +- 13 files changed, 164 insertions(+), 54 deletions(-) diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java index 8231a0078..dc354b63b 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java @@ -17,17 +17,53 @@ import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.ExecutorService; + public class DefaultPersistenceInstanceHandlers extends PersistenceInstanceHandlers { - private final PersistenceInstanceStore store; + public static class Builder { + + private final PersistenceInstanceStore store; + private ExecutorService executorService; + private Duration closeTimeout; + + private Builder(PersistenceInstanceStore store) { + this.store = store; + } + + public Builder withExecutorService(ExecutorService executorService) { + this.executorService = executorService; + return this; + } + + public Builder withCloseTimeout(Duration closeTimeout) { + this.closeTimeout = closeTimeout; + return this; + } - public static DefaultPersistenceInstanceHandlers from(PersistenceInstanceStore store) { - return new DefaultPersistenceInstanceHandlers( - new DefaultPersistenceInstanceWriter(store), - new DefaultPersistenceInstanceReader(store), - store); + public PersistenceInstanceHandlers build() { + return new DefaultPersistenceInstanceHandlers( + new DefaultPersistenceInstanceWriter( + store, + Optional.ofNullable(executorService), + closeTimeout == null ? Duration.ofSeconds(1) : closeTimeout), + new DefaultPersistenceInstanceReader(store), + store); + } } + public static Builder builder(PersistenceInstanceStore store) { + return new Builder(store); + } + + public static PersistenceInstanceHandlers from(PersistenceInstanceStore store) { + return new Builder(store).build(); + } + + private final PersistenceInstanceStore store; + private DefaultPersistenceInstanceHandlers( PersistenceInstanceWriter writer, PersistenceInstanceReader reader, @@ -38,6 +74,7 @@ private DefaultPersistenceInstanceHandlers( @Override public void close() { + super.close(); safeClose(store); } } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java index 19efa14af..2c1473d13 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java @@ -55,4 +55,7 @@ public Stream scanAll(WorkflowDefinition definition, String ap .onClose(() -> transaction.commit(definition)) .map(v -> new WorkflowPersistenceInstance(definition, v)); } + + @Override + public void close() {} } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java index e67a1cb4d..f599a743b 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java @@ -17,75 +17,123 @@ import io.serverlessworkflow.impl.TaskContextData; import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowDefinitionData; import io.serverlessworkflow.impl.WorkflowStatus; +import java.time.Duration; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; public class DefaultPersistenceInstanceWriter implements PersistenceInstanceWriter { private final PersistenceInstanceStore store; + private final Map> futuresMap = new ConcurrentHashMap<>(); + private final Optional executorService; + private final Duration closeTimeout; - protected DefaultPersistenceInstanceWriter(PersistenceInstanceStore store) { + protected DefaultPersistenceInstanceWriter( + PersistenceInstanceStore store, + Optional executorService, + Duration closeTimeout) { this.store = store; + this.executorService = executorService; + this.closeTimeout = closeTimeout; } @Override - public void started(WorkflowContextData workflowContext) { - doTransaction(t -> t.writeInstanceData(workflowContext), workflowContext); + public CompletableFuture started(WorkflowContextData workflowContext) { + return doTransaction(t -> t.writeInstanceData(workflowContext), workflowContext); } @Override - public void completed(WorkflowContextData workflowContext) { - removeProcessInstance(workflowContext); + public CompletableFuture completed(WorkflowContextData workflowContext) { + return removeProcessInstance(workflowContext); } @Override - public void failed(WorkflowContextData workflowContext, Throwable ex) { - removeProcessInstance(workflowContext); + public CompletableFuture failed(WorkflowContextData workflowContext, Throwable ex) { + return removeProcessInstance(workflowContext); } @Override - public void aborted(WorkflowContextData workflowContext) { - removeProcessInstance(workflowContext); + public CompletableFuture aborted(WorkflowContextData workflowContext) { + return removeProcessInstance(workflowContext); } - protected void removeProcessInstance(WorkflowContextData workflowContext) { - doTransaction(t -> t.removeProcessInstance(workflowContext), workflowContext); + protected CompletableFuture removeProcessInstance(WorkflowContextData workflowContext) { + return doTransaction(t -> t.removeProcessInstance(workflowContext), workflowContext) + .thenRun(() -> futuresMap.remove(workflowContext.instanceData().id())); } @Override - public void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext) { - // not recording + public CompletableFuture taskStarted( + WorkflowContextData workflowContext, TaskContextData taskContext) { + return CompletableFuture.completedFuture(null); } @Override - public void taskRetried(WorkflowContextData workflowContext, TaskContextData taskContext) { - doTransaction(t -> t.writeRetryTask(workflowContext, taskContext), workflowContext); + public CompletableFuture taskRetried( + WorkflowContextData workflowContext, TaskContextData taskContext) { + return doTransaction(t -> t.writeRetryTask(workflowContext, taskContext), workflowContext); } @Override - public void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext) { - doTransaction(t -> t.writeCompletedTask(workflowContext, taskContext), workflowContext); + public CompletableFuture taskCompleted( + WorkflowContextData workflowContext, TaskContextData taskContext) { + return doTransaction(t -> t.writeCompletedTask(workflowContext, taskContext), workflowContext); } @Override - public void suspended(WorkflowContextData workflowContext) { - doTransaction(t -> t.writeStatus(workflowContext, WorkflowStatus.SUSPENDED), workflowContext); + public CompletableFuture suspended(WorkflowContextData workflowContext) { + return doTransaction( + t -> t.writeStatus(workflowContext, WorkflowStatus.SUSPENDED), workflowContext); } @Override - public void resumed(WorkflowContextData workflowContext) { - doTransaction(t -> t.clearStatus(workflowContext), workflowContext); + public CompletableFuture resumed(WorkflowContextData workflowContext) { + return doTransaction(t -> t.clearStatus(workflowContext), workflowContext); } - private void doTransaction( - Consumer operations, WorkflowContextData context) { + private CompletableFuture doTransaction( + Consumer operation, WorkflowContextData context) { + final ExecutorService service = + this.executorService.orElse(context.definition().application().executorService()); + final Runnable runnable = () -> executeTransaction(operation, context.definition()); + return futuresMap.compute( + context.instanceData().id(), + (k, v) -> + v == null + ? CompletableFuture.runAsync(runnable, service) + : v.thenRunAsync(runnable, service)); + } + + private void executeTransaction( + Consumer operation, WorkflowDefinitionData definition) { PersistenceInstanceTransaction transaction = store.begin(); try { - operations.accept(transaction); - transaction.commit(context.definition()); + operation.accept(transaction); + transaction.commit(definition); } catch (Exception ex) { - transaction.rollback(context.definition()); + transaction.rollback(definition); throw ex; } } + + @Override + public void close() { + futuresMap.clear(); + executorService.ifPresent( + e -> { + try { + e.awaitTermination(closeTimeout.toMillis(), TimeUnit.MILLISECONDS); + e.shutdown(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + }); + } } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java index 84dd96c48..beec1c672 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java @@ -15,6 +15,8 @@ */ package io.serverlessworkflow.impl.persistence; +import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; + public class PersistenceInstanceHandlers implements AutoCloseable { private final PersistenceInstanceWriter writer; @@ -35,5 +37,8 @@ public PersistenceInstanceReader reader() { } @Override - public void close() {} + public void close() { + safeClose(writer); + safeClose(reader); + } } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java index 73e78879b..d7736c8f5 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java @@ -20,7 +20,7 @@ import java.util.Optional; import java.util.stream.Stream; -public interface PersistenceInstanceReader { +public interface PersistenceInstanceReader extends AutoCloseable { default Stream scanAll(WorkflowDefinition definition) { return scanAll(definition, definition.application().id()); diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java index 55f79faff..31ce235a5 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java @@ -17,24 +17,28 @@ import io.serverlessworkflow.impl.TaskContextData; import io.serverlessworkflow.impl.WorkflowContextData; +import java.util.concurrent.CompletableFuture; -public interface PersistenceInstanceWriter { +public interface PersistenceInstanceWriter extends AutoCloseable { - void started(WorkflowContextData workflowContext); + CompletableFuture started(WorkflowContextData workflowContext); - void completed(WorkflowContextData workflowContext); + CompletableFuture completed(WorkflowContextData workflowContext); - void failed(WorkflowContextData workflowContext, Throwable ex); + CompletableFuture failed(WorkflowContextData workflowContext, Throwable ex); - void aborted(WorkflowContextData workflowContext); + CompletableFuture aborted(WorkflowContextData workflowContext); - void suspended(WorkflowContextData workflowContext); + CompletableFuture suspended(WorkflowContextData workflowContext); - void resumed(WorkflowContextData workflowContext); + CompletableFuture resumed(WorkflowContextData workflowContext); - void taskRetried(WorkflowContextData workflowContext, TaskContextData taskContext); + CompletableFuture taskRetried( + WorkflowContextData workflowContext, TaskContextData taskContext); - void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext); + CompletableFuture taskStarted( + WorkflowContextData workflowContext, TaskContextData taskContext); - void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext); + CompletableFuture taskCompleted( + WorkflowContextData workflowContext, TaskContextData taskContext); } diff --git a/impl/persistence/mvstore/README.md b/impl/persistence/mvstore/README.md index ff67da685..c20058ca6 100644 --- a/impl/persistence/mvstore/README.md +++ b/impl/persistence/mvstore/README.md @@ -8,14 +8,14 @@ This document explains how to enable persistence using MVStore as underlying per To enable MVStore persistence, users should at least do the following things: - Initialize a MVStorePersistenceStore instance, passing the path of the file containing the persisted information -- Pass this MVStorePersitenceStore as argument of BytesMapPersistenceInstanceHandlers.builder. This will create PersistenceInstanceWriter and PersistenceInstanceReader. +- Pass this MVStorePersitenceStore as argument of DefaultPersistenceInstanceHandlers.from. This will create PersistenceInstanceWriter and PersistenceInstanceReader. - Use the PersistenceInstanceWriter created in the previous step to decorate the existing WorkflowApplication builder. The code will look like this ---- try (PersistenceInstanceHandlers handlers = - BytesMapPersistenceInstanceHandlers.builder(new MVStorePersistenceStore("test.db")) + DefaultPersistenceInstanceHandlers.from(new MVStorePersistenceStore("test.db")) .build(); WorkflowApplication application = PersistenceApplicationBuilder.builder( @@ -33,7 +33,7 @@ If user wants to resume execution of all previously existing instances (typicall Once retrieved, calling `start` method will resume the execution after the latest completed task before the running JVM was stopped. ---- - handlers.reader().readAll(definition).values().forEach(WorkflowInstance::start); + handlers.reader().scanAll(definition).forEach(WorkflowInstance::start); ---- --- diff --git a/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractPersistenceTest.java b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractPersistenceTest.java index 0051dead4..b0994ebac 100644 --- a/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractPersistenceTest.java +++ b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractPersistenceTest.java @@ -41,6 +41,7 @@ import java.time.Instant; import java.util.Map; import java.util.Optional; +import java.util.concurrent.Executors; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -67,7 +68,10 @@ static void init() throws IOException { @BeforeEach void setup() { - handlers = DefaultPersistenceInstanceHandlers.from(persistenceStore()); + handlers = + DefaultPersistenceInstanceHandlers.builder(persistenceStore()) + .withExecutorService(Executors.newSingleThreadExecutor()) + .build(); context = app.modelFactory().fromNull(); workflowContext = mock(WorkflowContext.class); workflowInstance = mock(WorkflowInstance.class); @@ -117,8 +121,8 @@ void testWorkflowInstance() throws InterruptedException { final Map completedMap = Map.of("name", "fulanito"); - handlers.writer().started(workflowContext); - handlers.writer().taskRetried(workflowContext, retriedTaskContext(position, numRetries)); + handlers.writer().started(workflowContext).join(); + handlers.writer().taskRetried(workflowContext, retriedTaskContext(position, numRetries)).join(); Optional optional = handlers.reader().find(definition, workflowInstance.id()); assertThat(optional).isPresent(); WorkflowPersistenceInstance instance = (WorkflowPersistenceInstance) optional.orElseThrow(); @@ -135,7 +139,10 @@ void testWorkflowInstance() throws InterruptedException { assertThat(retryAttempt.getValue()).isEqualTo(numRetries); // task completed - handlers.writer().taskCompleted(workflowContext, completedTaskContext(position, completedMap)); + handlers + .writer() + .taskCompleted(workflowContext, completedTaskContext(position, completedMap)) + .join(); instance = (WorkflowPersistenceInstance) handlers.reader().find(definition, workflowInstance.id()).orElseThrow(); @@ -157,7 +164,7 @@ void testWorkflowInstance() throws InterruptedException { assertThat(transition.getValue().isEndNode()).isTrue(); // workflow completed - handlers.writer().completed(workflowContext); + handlers.writer().completed(workflowContext).join(); assertThat(handlers.reader().find(definition, workflowInstance.id())).isEmpty(); } } diff --git a/impl/test/db-samples/running.db b/impl/test/db-samples/running.db index 461b7996b3fac60d9835b840c9ebc2a600053526..531b9ae9c90ff6740c2e3a88b0b3f6885172ec43 100644 GIT binary patch delta 1174 zcmc(e&2JJx7>8$QEEPY}c&RaJK*~WgrMoi&%uZqiy-3nhnkYw;Wjj~|76MCyUf|3@ z4dcng#6%CK2NU8+PmTYDcmIz%>=%%7cP=}5CxQ2Ue$P7^!OA4>(a2X0?5F_ju&IF{y`zxmZkWSsoT*h#KnK z{hl~lK;#t)PK5?-ht#l(i?9r8TUNhg+SY5X5va#J;Mg><{FB_imIvU$1i0tt=JfDy zY!Y8j2Hnpis5OOS9g8kE=@H3g-4sf)6W@^MyU_|=nwGB0S$7B049!rT1wpeM%^G3L z)FY7ff;Kf3^nNs^T6+f)@-+Sy?nxujfmwN)_z;i005{jb#)UNeRESDt8-9_$@%PKban$xsnEB29i=VmsBW8*<4Z>`v-20WBmXC delta 8 PcmZoTz}V2hxWEAb4^aan diff --git a/impl/test/db-samples/running_v1.db b/impl/test/db-samples/running_v1.db index fa4c2bebeb8825fe11610f648fc7c28337226283..cc9a861055d1f49d842d03fcb759e43df0812e6f 100644 GIT binary patch delta 1185 zcmc(ey>AmS7{={XG$nkfuyjF?6a=EO(mUJ9-Fa%nLZvoI5oAPi9EXM^cTsYQ(kYo3 zI)D^!NC+etP`3y%z=Zf)_$Qc(b07IoyW`<(y(ii4^Lw6mJci?6@YCMJSbsrKOS;TN z9sWx87)EVAWI4modxzPm>v=vC#%xs`(?EWsEg2?x!^#mV&_H&34bNo+xpL^U9P&NE zs4diVw{#VN1WPCYZ>sY4z3W%>g;^{3+REF0cx%(;ZJ8~U%Ln^U3eWdT#czL&H&C!y znJtx(=t}O(O7s*)O?kqES*3L>EHvOPW>ULu)wu)n(1sWin1dFi0Kl(nB?yurDUSPY zGiydWRv+-s(H9t0wc={a6HP5)l|!pBkmZ2MjKz=<&16@rSk#L5ST6jdj^q0V=c6}lpoUS_!&s4*Op(`_w0x2=We zG?bqYJ?B;J`C&4pTYCpG>MZ>h9w;Z%fj87y=3_eX5^Sx3jSE&pmJn^p1Nd3}#*c4D zxa9)3I>*^N5yV*sbBJ;JluhoA>i^f#(ITL+50^XoJ_;U*g171h4nh_N&zw6G2X7X_ za4@5JTg>O^(gD4IYH4JnSzV4BUWwbaChb4#ix4g gbm8@>{-Kw8Ha+?tgfx~J!=Xzdl{%#MBBU_&59sD)$p8QV delta 8 PcmZoTz}V2hxWEAb4^aan diff --git a/impl/test/db-samples/suspended.db b/impl/test/db-samples/suspended.db index 0450b6af36164390f1eebb029355f45b85877464..7f47c2833bda68b3a7ec055c4a35f451b1bd74b1 100644 GIT binary patch delta 458 zcmZ9JPfh|c6o*@kpb&{3fQW!Duz~4J8A?|YS0oCU7&mU5DMg6$FT)JMmBfu}>Dt6A z5aS6vhj($9Q8d0y(_hl}-uHEWGoQm(<(iK|=Z>k!5)Q-2Gj7Esy4l8z5voL|$0+h$ zH(*p*mDx}P3dXizDlsY5g|4Hi@F!!}iJm!P7BGSWmosfJl*@W=8-P?b^$1{I&Ps>r z&8T6G%9qm7-mYWVqSo&B&pM~=%TBNR_EDO`1Gb9UpBQmp7=gI@9)jh5rBI+yg&J(Y zCL{pB+{Pya8IXxtYFPFv#}5)FWEh#2%l{MV$Qn;YKc*!KEmz{~JR(;~Lf%Y(v_t?| z@neh$O$1B>Du>7x3tn z7+=FT@Le36qVc;l{U`mt@AvEcW~8m8le(Rr8gNlcKFPeGoLf$JA!Ndfm&8O+W;iWiAMml>Zx#$ z+zcC54E0(#+}pLyv8*(^-P6`d^P<&hzkL*@aG$SY{wGGllcp~(zlRW2F6D9tq)>x3 zs6z$-%xru@kOHZQr9#wI>F7bhn2eyF<%s`;b!3gEvK!H|m{#B@I}6Dvi^=sENQ(uq z6+a;i8?gXFQD6rX*>C#m<<>5WB(lJI5;;Nyu4#*Vu3<>hu}5k)s9Kb&JI4T&7a*$l l?DG6Mv&bilKi~0dc3??g45h~#W?$2pAws!r<>t3w`U??Vh=%|G delta 7 OcmZo@U~EWOU;qFM90IZc diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java index c1011a3be..7dcd71211 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java @@ -29,18 +29,23 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Duration; import java.util.Collection; import java.util.Map; +import java.util.concurrent.Executors; import java.util.stream.Stream; import org.junit.jupiter.api.Test; public class MvStorePersistenceTest { @Test - void testSimpleRun() throws IOException { + void testSimpleRun() throws Exception { final String dbName = "db-samples/simple.db"; try (PersistenceInstanceHandlers handlers = - DefaultPersistenceInstanceHandlers.from(new MVStorePersistenceStore(dbName)); + DefaultPersistenceInstanceHandlers.builder(new MVStorePersistenceStore(dbName)) + .withExecutorService(Executors.newSingleThreadExecutor()) + .withCloseTimeout(Duration.ofMillis(100)) + .build(); WorkflowApplication application = PersistenceApplicationBuilder.builder(WorkflowApplication.builder(), handlers.writer()) .build(); ) { @@ -49,6 +54,7 @@ void testSimpleRun() throws IOException { readWorkflowFromClasspath("workflows-samples/simple-expression.yaml")); assertNoInstance(handlers, definition); definition.instance(Map.of()).start().join(); + handlers.writer().close(); assertNoInstance(handlers, definition); } finally { Files.delete(Path.of(dbName));