Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,32 @@ This is a library meant for library authors that want to build libraries that wo
Read the [Javadoc](https://javadoc.io/doc/org.funfix/tasks-jvm/0.4.1/org/funfix/tasks/jvm/package-summary.html).
Better documentation is coming.

### Migration Note (v0.5.0)

The `AsyncFun` interface has changed to improve cancellation management and simplify the API. This is a source and binary incompatible change.

**Old shape:**
```java
Task.fromAsync((executor, callback) -> {
// ...
return () -> { /* cleanup */ };
});
```

**New shape:**
```java
Task.fromAsync(continuation -> {
var executor = continuation.getExecutor();
continuation.invokeOnCancellation(() -> { /* cleanup */ });
// ...
});
```

Key differences:
- The `executor` and `callback` are now encapsulated in the `Continuation`.
- Cancellation cleanup is registered via `continuation.invokeOnCancellation(finalizer)` instead of returning a `Cancellable`.
- `continuation.onCancellation()` signals that the task has completed due to cancellation, whereas `invokeOnCancellation(finalizer)` registers a cleanup action to run when cancellation occurs.

---

Maven:
Expand Down
22 changes: 16 additions & 6 deletions tasks-jvm/src/main/java/org/funfix/tasks/jvm/AsyncFun.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,28 @@
import org.jspecify.annotations.Nullable;

import java.io.Serializable;
import java.util.concurrent.Executor;

/**
* A function that is a delayed, asynchronous computation.
* <p>
* This function type is what's needed to describe {@link Task} instances.
* The injected {@link Continuation} provides:
* <ul>
* <li>An {@code Executor} for scheduling work / introduce async boundaries.</li>
* <li>Methods to signal completion ({@link Continuation#onSuccess}, {@link Continuation#onFailure}, {@link Continuation#onCancellation})</li>
* <li>A way to register cancellation finalizers ({@link Continuation#invokeOnCancellation})</li>
* </ul>
* <p>
* Example:
* <pre>{@code
* Task.fromAsync(continuation -> {
* final Executor executor = continuation.getExecutor();
* continuation.invokeOnCancellation(() -> System.out.println("cleanup"));
* executor.execute(() -> continuation.onSuccess("ok"));
* });
* }</pre>
*/
@FunctionalInterface
@NonBlocking
public interface AsyncFun<T extends @Nullable Object> extends Serializable {
Cancellable invoke(
Executor executor,
CompletionCallback<? super T> continuation
);
void invoke(Continuation<? super T> continuation);
}
219 changes: 130 additions & 89 deletions tasks-jvm/src/main/java/org/funfix/tasks/jvm/Cancellable.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
/**
* This is a token that can be used for interrupting a scheduled or
* a running task.
*
* <p>The contract for {@code cancel} is:
* <p>
* The contract for {@code cancel} is:
* <ol>
* <li>Its execution is idempotent, meaning that calling it multiple times
* has the same effect as calling it once.</li>
Expand Down Expand Up @@ -52,19 +52,6 @@ final class CancellableUtils {
static final Cancellable EMPTY = () -> {};
}

/**
* Represents a forward reference to a {@link Cancellable} that was already
* registered and needs to be filled in later.
* <p>
* <strong>INTERNAL API:</strong> Internal apis are subject to change or removal
* without any notice. When code depends on internal APIs, it is subject to
* breakage between minor version updates.
*/
@ApiStatus.Internal
interface CancellableForwardRef extends Cancellable {
void set(Cancellable cancellable);
}

/**
* INTERNAL API.
* <p>
Expand All @@ -74,58 +61,101 @@ interface CancellableForwardRef extends Cancellable {
*/
@ApiStatus.Internal
final class MutableCancellable implements Cancellable {
private final AtomicReference<State> ref;
private final AtomicReference<@Nullable State> ref;

MutableCancellable(final Cancellable initialRef) {
MutableCancellable(final @Nullable Cancellable initialRef) {
ref = new AtomicReference<>(new State.Active(initialRef, 0, null));
}

MutableCancellable() {
this(CancellableUtils.EMPTY);
this(null);
}

/**
* Cancels all registered cancellation tokens.
* <p>
* Tries to execute the finalizers on the same thread, for as long as
* possible. Normally, `Cancellation` tokens should not be expensive
* (non-blocking), but we can't force this via the compiler, and it's
* best to avoid concurrency issues if possible. Although, note that
* this is not a contract that we can provide. E.g., if the task
* was canceled, we're forced to cancel future cancellation tokens
* on the thread calling `register`.
*/
@Override
public void cancel() {
var state = ref.getAndSet(State.Closed.INSTANCE);
while (state instanceof State.Active active) {
try {
active.token.cancel();
} catch (Exception e) {
UncaughtExceptionHandler.logOrRethrow(e);
while (true) {
final var current = ref.get();
if (current instanceof State.Active active) {
final var update = new State.Cancelling(
// Adding a dummy that preserves the `order` for incoming
// subscriptions.
new State.Active(null, active.order, null)
);
if (ref.compareAndSet(current, update)) {
// NOTE: this will eventually set the state to Cancelled
// (after managing to cancel all tokens)
startCancellationOfEverything(active);
return;
}
} else {
return;
}
state = active.rest;
}
}

public CancellableForwardRef newCancellableRef() {
final var current = ref.get();
if (current instanceof State.Closed) {
return new CancellableForwardRef() {
@Override
public void set(Cancellable cancellable) {
cancellable.cancel();
}
@Override
public void cancel() {}
};
} else if (current instanceof State.Active active) {
return new CancellableForwardRef() {
@Override
public void set(Cancellable cancellable) {
registerOrdered(
active.order,
cancellable,
active
);
private void startCancellationOfEverything(State.@Nullable Active active) {
while (active != null) {
if (active.token != null) {
invoke(active.token);
}
active = active.rest;
// Tries fetching more tokens, since the state may have been updated
if (active == null) {
// Kind of hacky, but we need the loop due to the CAS
while (true) {
final var current = ref.get();
if (current instanceof State.Cancelling cancelling) {
// We could have newly registered cancellable references.
// If we have, then active != null and will get processed.
active = cancelling.toCancel;
final var update = active == null
// If not, closing the loop with a final update
? State.Cancelled.INSTANCE
: new State.Cancelling(null);
// If CAS succeeds, we break from the loop
// otherwise a concurrent update happened, so continue
if (ref.compareAndSet(current, update)) {
break;
}
} else {
// Once in `Cancelling`, concurrent updates are only allowed
// to go Cancelling -> Cancelling
final var name = current != null ? current.getClass().getName() : "null";
throw new IllegalStateException("Bug — found: " + name);
}
}
}
}
}

@Override
public void cancel() {
unregister(active.order);
/**
* The purpose of this method is to be called when invoking the methods
* on {@code CompletionCallback} in order to prevent leaks.
* <p>
* NOTE: in case a concurrent `cancel()` has already happened,
* then `complete()` becomes a NO-OP.
*/
void complete() {
while (true) {
final var current = ref.get();
if (current instanceof State.Active) {
if (ref.compareAndSet(current, State.Completed.INSTANCE)) {
return;
}
};
} else {
throw new IllegalStateException("Invalid state: " + current);
} else {
return;
}
}
}

Expand All @@ -134,11 +164,19 @@ public void cancel() {
while (true) {
final var current = ref.get();
if (current instanceof State.Active active) {
final var newOrder = active.order + 1;
final var update = new State.Active(token, newOrder, active);
if (ref.compareAndSet(current, update)) { return () -> unregister(newOrder); }
} else if (current instanceof State.Closed) {
token.cancel();
final var update = active.register(token);
if (ref.compareAndSet(current, update)) {
return () -> unregister(update.order);
}
} else if (current instanceof State.Cancelling cancelling) {
final var update = cancelling.register(token);
if (ref.compareAndSet(current, update)) {
return null;
}
} else if (current instanceof State.Cancelled) {
invoke(token);
return null;
} else if (current instanceof State.Completed) {
return null;
} else {
throw new IllegalStateException("Invalid state: " + current);
Expand All @@ -151,65 +189,68 @@ private void unregister(final long order) {
final var current = ref.get();
if (current instanceof State.Active active) {
State.@Nullable Active cursor = active;
State.@Nullable Active acc = null;
State.@Nullable Active newListReversed = null;
while (cursor != null) {
if (cursor.order != order) {
acc = new State.Active(cursor.token, cursor.order, acc);
newListReversed = new State.Active(cursor.token, cursor.order, newListReversed);
}
cursor = cursor.rest;
}
// Reversing
State.@Nullable Active update = null;
while (acc != null) {
update = new State.Active(acc.token, acc.order, update);
acc = acc.rest;
while (newListReversed != null) {
update = new State.Active(newListReversed.token, newListReversed.order, update);
newListReversed = newListReversed.rest;
}
if (update == null) {
update = new State.Active(Cancellable.getEmpty(), 0, null);
update = new State.Active(null, 0, null);
}
if (ref.compareAndSet(current, update)) {
return;
}
} else if (current instanceof State.Closed) {
return;
} else {
throw new IllegalStateException("Invalid state: " + current);
return;
}
}
}

private void registerOrdered(
final long order,
final Cancellable newToken,
State current
) {
while (true) {
if (current instanceof State.Active active) {
// Double-check ordering
if (active.order != order) { return; }
// Try to update
final var update = new State.Active(newToken, order + 1, null);
if (ref.compareAndSet(current, update)) { return; }
// Retry
current = ref.get();
} else if (current instanceof State.Closed) {
newToken.cancel();
return;
} else {
throw new IllegalStateException("Invalid state: " + current);
}
private static void invoke(Cancellable token) {
try {
token.cancel();
} catch (Throwable e) {
UncaughtExceptionHandler.logOrRethrow(e);
}
}

sealed interface State {
record Active(
Cancellable token,
@Nullable Cancellable token,
long order,
@Nullable Active rest
) implements State {}
) implements State {
Active register(Cancellable token) {
final var newOrder = order + 1;
return new State.Active(token, newOrder, this);
}
}

record Cancelling(
@Nullable Active toCancel
) implements State {
Cancelling register(Cancellable token) {
final var newOrder = toCancel != null ? toCancel.order + 1 : 1;
return new State.Cancelling(
new State.Active(token, newOrder, toCancel)
);
}
}

record Cancelled() implements State {
static final Cancelled INSTANCE = new Cancelled();
}

record Closed() implements State {
static final Closed INSTANCE = new Closed();
record Completed() implements State {
static final Completed INSTANCE = new Completed();
}
}
}
Loading
Loading