Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ private DefaultPersistenceInstanceHandlers(

@Override
public void close() {
super.close();
safeClose(store);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,7 @@ public Stream<WorkflowInstance> scanAll(WorkflowDefinition definition, String ap
.onClose(() -> transaction.commit(definition))
.map(v -> new WorkflowPersistenceInstance(definition, v));
}

@Override
public void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,74 +18,99 @@
import io.serverlessworkflow.impl.TaskContextData;
import io.serverlessworkflow.impl.WorkflowContextData;
import io.serverlessworkflow.impl.WorkflowStatus;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class DefaultPersistenceInstanceWriter implements PersistenceInstanceWriter {

private static final AtomicInteger threadNameCounter = new AtomicInteger();

private final PersistenceInstanceStore store;
private final ExecutorService executorService;

protected DefaultPersistenceInstanceWriter(PersistenceInstanceStore store) {
this.store = store;
this.executorService =
Executors.newSingleThreadExecutor(
r -> new Thread(r, "SWFPersistenceWriter-" + threadNameCounter.getAndIncrement()));
}

@Override
public void started(WorkflowContextData workflowContext) {
doTransaction(t -> t.writeInstanceData(workflowContext), workflowContext);
public CompletableFuture<Void> started(WorkflowContextData workflowContext) {
return doTransaction(t -> t.writeInstanceData(workflowContext), workflowContext);
}

@Override
public void completed(WorkflowContextData workflowContext) {
removeProcessInstance(workflowContext);
public CompletableFuture<Void> completed(WorkflowContextData workflowContext) {
return removeProcessInstance(workflowContext);
}

@Override
public void failed(WorkflowContextData workflowContext, Throwable ex) {
removeProcessInstance(workflowContext);
public CompletableFuture<Void> failed(WorkflowContextData workflowContext, Throwable ex) {
return removeProcessInstance(workflowContext);
}

@Override
public void aborted(WorkflowContextData workflowContext) {
removeProcessInstance(workflowContext);
public CompletableFuture<Void> aborted(WorkflowContextData workflowContext) {
return removeProcessInstance(workflowContext);
}

protected void removeProcessInstance(WorkflowContextData workflowContext) {
doTransaction(t -> t.removeProcessInstance(workflowContext), workflowContext);
protected CompletableFuture<Void> removeProcessInstance(WorkflowContextData workflowContext) {
return doTransaction(t -> t.removeProcessInstance(workflowContext), workflowContext);
}

@Override
public void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext) {
// not recording
public CompletableFuture<Void> taskStarted(
WorkflowContextData workflowContext, TaskContextData taskContext) {
return CompletableFuture.completedFuture(null);
}

@Override
public void taskRetried(WorkflowContextData workflowContext, TaskContextData taskContext) {
doTransaction(t -> t.writeRetryTask(workflowContext, taskContext), workflowContext);
public CompletableFuture<Void> taskRetried(
WorkflowContextData workflowContext, TaskContextData taskContext) {
return doTransaction(t -> t.writeRetryTask(workflowContext, taskContext), workflowContext);
}

@Override
public void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext) {
doTransaction(t -> t.writeCompletedTask(workflowContext, taskContext), workflowContext);
public CompletableFuture<Void> taskCompleted(
WorkflowContextData workflowContext, TaskContextData taskContext) {
return doTransaction(t -> t.writeCompletedTask(workflowContext, taskContext), workflowContext);
}

@Override
public void suspended(WorkflowContextData workflowContext) {
doTransaction(t -> t.writeStatus(workflowContext, WorkflowStatus.SUSPENDED), workflowContext);
public CompletableFuture<Void> suspended(WorkflowContextData workflowContext) {
return doTransaction(
t -> t.writeStatus(workflowContext, WorkflowStatus.SUSPENDED), workflowContext);
}

@Override
public void resumed(WorkflowContextData workflowContext) {
doTransaction(t -> t.clearStatus(workflowContext), workflowContext);
public CompletableFuture<Void> resumed(WorkflowContextData workflowContext) {
return doTransaction(t -> t.clearStatus(workflowContext), workflowContext);
}

private void doTransaction(
private CompletableFuture<Void> doTransaction(
Consumer<PersistenceInstanceTransaction> operations, WorkflowContextData context) {
PersistenceInstanceTransaction transaction = store.begin();
try {
operations.accept(transaction);
transaction.commit(context.definition());
} catch (Exception ex) {
transaction.rollback(context.definition());
throw ex;
}
return CompletableFuture.runAsync(
() -> {
PersistenceInstanceTransaction transaction = store.begin();
try {
operations.accept(transaction);
transaction.commit(context.definition());
} catch (Exception ex) {
transaction.rollback(context.definition());
throw ex;
}
},
executorService);
}

@Override
public void close() throws InterruptedException {
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.serverlessworkflow.impl.persistence;

import static io.serverlessworkflow.impl.WorkflowUtils.safeClose;

public class PersistenceInstanceHandlers implements AutoCloseable {

private final PersistenceInstanceWriter writer;
Expand All @@ -35,5 +37,8 @@ public PersistenceInstanceReader reader() {
}

@Override
public void close() {}
public void close() {
safeClose(writer);
safeClose(reader);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.Optional;
import java.util.stream.Stream;

public interface PersistenceInstanceReader {
public interface PersistenceInstanceReader extends AutoCloseable {

default Stream<WorkflowInstance> scanAll(WorkflowDefinition definition) {
return scanAll(definition, definition.application().id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,28 @@

import io.serverlessworkflow.impl.TaskContextData;
import io.serverlessworkflow.impl.WorkflowContextData;
import java.util.concurrent.CompletableFuture;

public interface PersistenceInstanceWriter {
public interface PersistenceInstanceWriter extends AutoCloseable {

void started(WorkflowContextData workflowContext);
CompletableFuture<Void> started(WorkflowContextData workflowContext);

void completed(WorkflowContextData workflowContext);
CompletableFuture<Void> completed(WorkflowContextData workflowContext);

void failed(WorkflowContextData workflowContext, Throwable ex);
CompletableFuture<Void> failed(WorkflowContextData workflowContext, Throwable ex);

void aborted(WorkflowContextData workflowContext);
CompletableFuture<Void> aborted(WorkflowContextData workflowContext);

void suspended(WorkflowContextData workflowContext);
CompletableFuture<Void> suspended(WorkflowContextData workflowContext);

void resumed(WorkflowContextData workflowContext);
CompletableFuture<Void> resumed(WorkflowContextData workflowContext);

void taskRetried(WorkflowContextData workflowContext, TaskContextData taskContext);
CompletableFuture<Void> taskRetried(
WorkflowContextData workflowContext, TaskContextData taskContext);

void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext);
CompletableFuture<Void> taskStarted(
WorkflowContextData workflowContext, TaskContextData taskContext);

void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext);
CompletableFuture<Void> taskCompleted(
WorkflowContextData workflowContext, TaskContextData taskContext);
}
6 changes: 3 additions & 3 deletions impl/persistence/mvstore/README.md
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes are a leftover from previous refactor, same for generared db files

Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ This document explains how to enable persistence using MVStore as underlying per
To enable MVStore persistence, users should at least do the following things:

- Initialize a MVStorePersistenceStore instance, passing the path of the file containing the persisted information
- Pass this MVStorePersitenceStore as argument of BytesMapPersistenceInstanceHandlers.builder. This will create PersistenceInstanceWriter and PersistenceInstanceReader.
- Pass this MVStorePersitenceStore as argument of DefaultPersistenceInstanceHandlers.from. This will create PersistenceInstanceWriter and PersistenceInstanceReader.
- Use the PersistenceInstanceWriter created in the previous step to decorate the existing WorkflowApplication builder.

The code will look like this

----
try (PersistenceInstanceHandlers handlers =
BytesMapPersistenceInstanceHandlers.builder(new MVStorePersistenceStore("test.db"))
DefaultPersistenceInstanceHandlers.from(new MVStorePersistenceStore("test.db"))
.build();
WorkflowApplication application =
PersistenceApplicationBuilder.builder(
Expand All @@ -33,7 +33,7 @@ If user wants to resume execution of all previously existing instances (typicall
Once retrieved, calling `start` method will resume the execution after the latest completed task before the running JVM was stopped.

----
handlers.reader().readAll(definition).values().forEach(WorkflowInstance::start);
handlers.reader().scanAll(definition).forEach(WorkflowInstance::start);
----

---
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ void testWorkflowInstance() throws InterruptedException {

final Map<String, Object> completedMap = Map.of("name", "fulanito");

handlers.writer().started(workflowContext);
handlers.writer().taskRetried(workflowContext, retriedTaskContext(position, numRetries));
handlers.writer().started(workflowContext).join();
handlers.writer().taskRetried(workflowContext, retriedTaskContext(position, numRetries)).join();
Optional<WorkflowInstance> optional = handlers.reader().find(definition, workflowInstance.id());
assertThat(optional).isPresent();
WorkflowPersistenceInstance instance = (WorkflowPersistenceInstance) optional.orElseThrow();
Expand All @@ -135,7 +135,10 @@ void testWorkflowInstance() throws InterruptedException {
assertThat(retryAttempt.getValue()).isEqualTo(numRetries);

// task completed
handlers.writer().taskCompleted(workflowContext, completedTaskContext(position, completedMap));
handlers
.writer()
.taskCompleted(workflowContext, completedTaskContext(position, completedMap))
.join();
instance =
(WorkflowPersistenceInstance)
handlers.reader().find(definition, workflowInstance.id()).orElseThrow();
Expand All @@ -157,7 +160,7 @@ void testWorkflowInstance() throws InterruptedException {
assertThat(transition.getValue().isEndNode()).isTrue();

// workflow completed
handlers.writer().completed(workflowContext);
handlers.writer().completed(workflowContext).join();
assertThat(handlers.reader().find(definition, workflowInstance.id())).isEmpty();
}
}
Binary file modified impl/test/db-samples/running.db
Binary file not shown.
Binary file modified impl/test/db-samples/running_v1.db
Binary file not shown.
Binary file modified impl/test/db-samples/suspended.db
Binary file not shown.
Binary file modified impl/test/db-samples/suspended_v1.db
Binary file not shown.