{@code
+ * try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
+ * IntStream.range(0, 10_000).forEach(i ->
+ * executor.submit(() -> {
+ * Thread.sleep(Duration.ofMillis(10));
+ * return Thread.currentThread().getName();
+ * }));
+ * } // auto-shutdown: waits for all 10,000 tasks to finish
+ * }
+ *
+ * See {@link br.com.leonardoz.features.threads.UsingVirtualThreads} for a
+ * detailed description of virtual threads.
*
* And 2 types of tasks:
*
diff --git a/src/main/java/br/com/leonardoz/features/futures/UsingCompletableFuture.java b/src/main/java/br/com/leonardoz/features/futures/UsingCompletableFuture.java
index bbffd0f..df372e3 100644
--- a/src/main/java/br/com/leonardoz/features/futures/UsingCompletableFuture.java
+++ b/src/main/java/br/com/leonardoz/features/futures/UsingCompletableFuture.java
@@ -136,6 +136,19 @@
* CompletableFuture.completeExceptionally() // completes throwing a exception
*
* CompletableFuture.completeExceptionally(ex) // completes with a exception
+ * = Java 9+ additions =
+ *
+ * orTimeout(long timeout, TimeUnit unit)
+ * Completes the future exceptionally with a TimeoutException if it
+ * is not completed before the given timeout elapses.
+ *
+ * completeOnTimeout(T value, long timeout, TimeUnit unit)
+ * Completes the future with the provided default value if it is not completed
+ * before the timeout. Unlike orTimeout, this is a normal completion.
+ *
+ * exceptionallyAsync(fn, executor)
+ * Async version of exceptionally(); recovers from an exception on a separate
+ * thread. (Java 12)
*
*/
public class UsingCompletableFuture {
@@ -204,6 +217,33 @@ public static void main(String[] args) throws InterruptedException, ExecutionExc
CompletableFuture
.supplyAsync(getVal)
.acceptEitherAsync(CompletableFuture.supplyAsync(getVal2, executor), (firstToBeReady) -> System.out.println(firstToBeReady), executor);
+
+ // -------------------------------------------------------------------
+ // Java 9+ timeout APIs
+ // -------------------------------------------------------------------
+
+ // orTimeout — completes exceptionally with TimeoutException if the
+ // future is not done within the specified duration.
+ SupplierNOTE: This file requires Java 21+ to compile and run. + * It is provided as a reference for the planned upgrade. The current build + * targets Java 17. + * + *
{@code
+ * record UserSummary(String userName, int orderCount) {}
+ *
+ * UserSummary fetchUserSummary(long userId) throws Exception {
+ * try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
+ *
+ * StructuredTaskScope.Subtask user = scope.fork(() -> fetchUserName(userId));
+ * StructuredTaskScope.Subtask orders = scope.fork(() -> fetchOrderCount(userId));
+ *
+ * scope.join() // waits for both tasks
+ * .throwIfFailed(); // re-throws if either failed
+ *
+ * return new UserSummary(user.get(), orders.get());
+ * }
+ * }
+ * }
+ *
+ * {@code
+ * String callFastest() throws Exception {
+ * try (var scope = new StructuredTaskScope.ShutdownOnSuccess()) {
+ *
+ * scope.fork(() -> callService("primary"));
+ * scope.fork(() -> callService("fallback"));
+ *
+ * scope.join();
+ * return scope.result(); // result of the first successful subtask
+ * }
+ * }
+ * }
+ *
+ * + * Publisher ──publishes items──▶ Subscriber + * │ + * back-pressure (request(n)) ◀─┘ + *+ * + * == Key interfaces == + * + * {@link Flow.Publisher}<T> + * Produces items and delivers them to registered subscribers. + * subscribe(Subscriber) is the only method. + * + * {@link Flow.Subscriber}<T> + * Consumes items. Four callbacks: + * - onSubscribe(Subscription) — called once after subscribing; use it to + * call request(n) and signal how many items you are ready to receive. + * - onNext(T item) — called for each delivered item. + * - onError(Throwable) — terminal; called when an error occurs. + * - onComplete() — terminal; called when the publisher is done. + * + * {@link Flow.Subscription} + * Connects one publisher to one subscriber. + * - request(n) — signals demand for n more items (back-pressure). + * - cancel() — cancels the subscription. + * + * {@link Flow.Processor}<T,R> + * Extends both Publisher and Subscriber; acts as a pipeline stage that + * transforms items from type T to type R. + * + * == Back-pressure == + * + * The subscriber controls flow by calling {@code request(n)}. The publisher + * must not deliver more items than requested. This prevents fast producers + * from overwhelming slow consumers. + * + * == SubmissionPublisher == + * + * {@link SubmissionPublisher} is the only concrete {@code Flow.Publisher} + * provided by the JDK. It is buffered, auto-closes on exception, and uses + * the common ForkJoinPool by default. + */ public class UsingReactiveStreams { + /** + * A simple subscriber that requests one item at a time (maximum + * back-pressure) and prints each value it receives. + */ + static class PrintSubscriber implements Flow.Subscriber
NOTE: This file requires Java 21+ to compile and run. + * It is provided as a reference for the planned upgrade. The current build + * targets Java 17. + * + *
| Feature | ThreadLocal | ScopedValue |
|---|---|---|
| Mutability | Mutable (set/remove) | + *Immutable (bound once per scope) |
| Cleanup | Must call remove(); leaks if forgotten | + *Automatic — cleaned up when scope exits |
| Virtual thread memory | Each thread holds its own copy | + *Shared read-only; no per-thread copy |
| Child task visibility | Manual InheritableThreadLocal | + *Automatically inherited by StructuredTaskScope children |
{@code
+ * // 1. Declare — typically a static final field on a class
+ * static final ScopedValue CURRENT_USER = ScopedValue.newInstance();
+ *
+ * // 2. Bind and run — value visible only within the lambda/block
+ * ScopedValue.where(CURRENT_USER, "alice").run(() -> {
+ * System.out.println("User: " + CURRENT_USER.get()); // "alice"
+ * processRequest(); // value is visible anywhere in this call tree
+ * });
+ * // CURRENT_USER.get() here would throw NoSuchElementException
+ *
+ * // 3. Nested re-binding — child sees "bob"; parent still sees "alice"
+ * ScopedValue.where(CURRENT_USER, "alice").run(() ->
+ * ScopedValue.where(CURRENT_USER, "bob").run(() ->
+ * System.out.println(CURRENT_USER.get()) // "bob"
+ * )
+ * );
+ *
+ * // 4. Returning a value from a scoped computation
+ * String result = ScopedValue.where(CURRENT_USER, "charlie")
+ * .call(() -> "hello, " + CURRENT_USER.get());
+ *
+ * // 5. Integration with StructuredTaskScope
+ * // Child tasks forked inside a scope automatically inherit the binding.
+ * ScopedValue.where(CURRENT_USER, "dave").run(() -> {
+ * try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
+ * scope.fork(() -> auditLog(CURRENT_USER.get())); // sees "dave"
+ * scope.fork(() -> sendEmail(CURRENT_USER.get())); // sees "dave"
+ * scope.join().throwIfFailed();
+ * }
+ * });
+ * }
+ *
+ * NOTE: This file requires Java 21+ to compile and run. + * It is provided as a reference for the planned upgrade. The current build + * targets Java 17. + * + *
{@code
+ * // 1. Create and start a single virtual thread
+ * Thread vt = Thread.ofVirtual()
+ * .name("my-vthread")
+ * .start(() -> System.out.println("Hello from " + Thread.currentThread()));
+ *
+ * // 2. Virtual thread-per-task executor — the primary production use case.
+ * // Creates a new virtual thread for every submitted task.
+ * // try-with-resources shuts down and waits for all tasks automatically.
+ * try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
+ * IntStream.range(0, 10_000).forEach(i ->
+ * executor.submit(() -> {
+ * // Blocking I/O — the virtual thread is unmounted while waiting,
+ * // freeing the carrier thread for other work.
+ * Thread.sleep(Duration.ofMillis(100));
+ * return i;
+ * }));
+ * } // auto-close: awaits termination of all submitted tasks
+ *
+ * // 3. Detect whether the current thread is virtual
+ * boolean isVirtual = Thread.currentThread().isVirtual();
+ *
+ * // 4. ThreadFactory for virtual threads
+ * ThreadFactory factory = Thread.ofVirtual().name("worker-", 0).factory();
+ * }
+ *
+ * Pinning occurs when a virtual thread cannot be unmounted. + * This happens when: + *
NOTE: This file requires Java 21+ to compile and run. + * It is provided as a reference for the planned upgrade. The current build + * targets Java 17. + * + *
{@code
+ * public class VirtualThreadProducerConsumer {
+ *
+ * private static final int CAPACITY = 1_000;
+ * private final BlockingQueue queue = new LinkedBlockingQueue<>(CAPACITY);
+ *
+ * // Each producer runs in its own virtual thread; blocking put() is cheap.
+ * private final Callable producer = () -> {
+ * for (int i = 0; i < 90_000; i++) {
+ * queue.put(UUID.randomUUID().toString());
+ * }
+ * return null;
+ * };
+ *
+ * // Each consumer runs in its own virtual thread; blocking take() is cheap.
+ * private final Callable consumer = () -> {
+ * while (true) {
+ * String item = queue.poll(5, TimeUnit.SECONDS);
+ * if (item == null) break;
+ * System.out.println("Consumed " + item + " on " + Thread.currentThread());
+ * }
+ * return null;
+ * };
+ *
+ * public void run(int numProducers, int numConsumers) throws InterruptedException {
+ * // newVirtualThreadPerTaskExecutor() — one virtual thread per task
+ * try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
+ * IntStream.range(0, numProducers).forEach(i -> executor.submit(producer));
+ * IntStream.range(0, numConsumers).forEach(i -> executor.submit(consumer));
+ * }
+ * }
+ * }
+ * }
+ *
+ * NOTE: This file requires Java 21+ to compile and run. + * It is provided as a reference for the planned upgrade. The current build + * targets Java 17. + * + *
{@code
+ * public class StructuredTaskConvergence {
+ *
+ * record WorkerResult(int workerId, List data) {}
+ *
+ * WorkerResult doWork(int workerId) {
+ * var rng = new Random();
+ * var results = new ArrayList();
+ * for (int i = 0; i < 400_000; i++) {
+ * results.add((long) rng.nextInt(150_000));
+ * }
+ * return new WorkerResult(workerId, results);
+ * }
+ *
+ * void run() throws Exception {
+ * int cores = Runtime.getRuntime().availableProcessors();
+ *
+ * // ShutdownOnFailure: if any worker fails, cancel all others.
+ * try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
+ *
+ * var subtasks = IntStream.range(0, cores)
+ * .mapToObj(id -> scope.fork(() -> doWork(id)))
+ * .toList();
+ *
+ * scope.join() // wait for ALL workers to finish
+ * .throwIfFailed(); // re-throw first failure (if any)
+ *
+ * // All workers succeeded — aggregate results
+ * long total = subtasks.stream()
+ * .map(Subtask::get)
+ * .flatMap(r -> r.data().stream())
+ * .mapToLong(Long::longValue)
+ * .sum();
+ *
+ * System.out.println("Converged. Total sum: " + total);
+ * }
+ * }
+ * }
+ * }
+ *
+ * NOTE: This file requires Java 21+ to compile and run. + * It is provided as a reference for the planned upgrade. The current build + * targets Java 17. + * + *
{@code
+ * public class StructuredBackgroundTaskExecutor {
+ *
+ * // Execute multiple tasks; fail fast if any task fails.
+ * public List executeAll(List> tasks) throws Exception {
+ * try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
+ *
+ * var subtasks = tasks.stream()
+ * .map(scope::fork)
+ * .toList();
+ *
+ * scope.join().throwIfFailed();
+ *
+ * return subtasks.stream()
+ * .map(StructuredTaskScope.Subtask::get)
+ * .toList();
+ * }
+ * }
+ *
+ * // Execute several tasks; return the result of whichever finishes first.
+ * public T executeAny(List> tasks) throws Exception {
+ * try (var scope = new StructuredTaskScope.ShutdownOnSuccess()) {
+ *
+ * tasks.forEach(scope::fork);
+ *
+ * scope.join();
+ * return scope.result();
+ * }
+ * }
+ * }
+ * }
+ *
+ * | BackgroundTaskExecutor | + *StructuredBackgroundTaskExecutor | |
|---|---|---|
| Cancellation on failure | Manual Future.cancel() | + *Automatic (ShutdownOnFailure) |
| Executor shutdown | Manual shutdown()/awaitTermination() | + *Automatic (try-with-resources) |
| Exception handling | + *Checked at every Future.get() call site | + *Single throwIfFailed() propagates the first exception |
| Thread type | Platform threads | + *Virtual threads (default factory) |