diff --git a/README.md b/README.md index e7d1400..68a0fbd 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ # Java Concurrency Patterns and Features -Concurrency Patterns and features found in Java, through multithreaded programming. +Concurrency Patterns and features found in Java, through multithreaded programming. +This repository covers **Java 8 through Java 17** (with documentation stubs for Java 21 Project Loom APIs). ## Features: * [Threads and Runnables](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/threads/UsingThreads.java) @@ -15,31 +16,36 @@ Concurrency Patterns and features found in Java, through multithreaded programmi * [Barriers](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/synchronizers/UsingBarriers.java) * [Synchronized Collections](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/collections/UsingSynchronizedCollections.java) * [Concurrent Collections](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/collections/UsingConcurrentCollections.java) - * [CopyOnWriteArrayList](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/collections/UsingConcurrentCollections.java#L89) - * [ConcurrentHashMap](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/collections/UsingConcurrentCollections.java#L40) - * [Blocking Queue](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/collections/UsingConcurrentCollections.java#L141) + * [CopyOnWriteArrayList](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/collections/UsingConcurrentCollections.java) + * [ConcurrentHashMap](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/collections/UsingConcurrentCollections.java) — now includes Java 8 bulk ops: `merge`, `compute`, `forEach`, `reduceValues`, `search` + * [Blocking Queue](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/collections/UsingConcurrentCollections.java) * [Executors](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java) - * [Fixed Thread Pool](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java#L94) - * [Cached Thread Pool](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java#L65) - * [Single Thread Pool](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java#L51) - * [Scheduled Thread Pool](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java#L122) - * [Single Thread Scheduled Pool](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java#L139) - * [Work-Stealing Pool](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java#L156) -* [Atomics](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/atomics/UsingAtomics.java) + * [Fixed Thread Pool](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java) + * [Cached Thread Pool](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java) + * [Single Thread Pool](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java) + * [Scheduled Thread Pool](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java) + * [Single Thread Scheduled Pool](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java) + * [Work-Stealing Pool](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java) + * Virtual Thread-Per-Task Executor *(Java 21 — see [`UsingVirtualThreads`](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/threads/UsingVirtualThreads.java))* +* [Atomics](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/atomics/UsingAtomics.java) — now includes **VarHandle** (Java 9) * [Futures](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/features/futures) * [FutureTask](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/futures/UsingFutureTasks.java) - * [CompletableFuture](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/futures/UsingCompletableFuture.java) + * [CompletableFuture](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/futures/UsingCompletableFuture.java) — now includes `orTimeout` / `completeOnTimeout` (Java 9) + * [Structured Concurrency](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/futures/UsingStructuredConcurrency.java) *(Java 21 — doc stub)* * [Fork/Join Framework](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/forkjoin/UsingForkJoinFramework.java) -* [Parallel Streams](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/parallel_stream/UsingParallelStreams.java) +* [Parallel Streams](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/parallel_stream/UsingParallelStreams.java) — updated to use `Stream.toList()` (Java 16) * [Java Memory Model](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/java_memory_model/WhatIsJavaMemoryModel.java) +* [Reactive Streams — `java.util.concurrent.Flow`](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/java_memory_model/UsingReactiveStreams.java) *(Java 9 — Publisher, Subscriber, Processor, back-pressure)* +* [Virtual Threads](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/threads/UsingVirtualThreads.java) *(Java 21 — doc stub)* +* [Scoped Values](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/threads/UsingScopedValues.java) *(Java 21 — doc stub)* ## Patterns * [Protect Shared State](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/thread_safe/shared_state) * [Atomic Compound Actions](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/thread_safe/compound_actions) * [Lock Split](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/thread_safe/lock_split) * [Fixed Lock Ordering](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/fixed_lock_ordering) -* [Thread Local Confinement](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/thread_safe/thread_confinement) -* [Immutable Object](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/thread_safe/immutable_object) +* [Thread Local Confinement](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/thread_safe/thread_confinement) — `ThreadLocal.withInitial()` (Java 8); `DateTimeFormatter` replaces `SimpleDateFormat` (Java 8) +* [Immutable Object](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/thread_safe/immutable_object) — inner value classes converted to **records** (Java 16); switch expression (Java 14) * [Safe Lazy Initialization](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/thread_safe/initialization/SafeInitializationHolder.java) * [Safe Publishing](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/thread_safe/publishing/SafePublishing.java) * [Resource Pool](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/resource_pool) @@ -47,14 +53,32 @@ Concurrency Patterns and features found in Java, through multithreaded programmi * [wait-notify](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/condition_queues/WaitNotifyQueue.java) * [await-signal](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/condition_queues/ExplicitConditionQueue.java) * [Background Task Executor](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/task_execution/BackgroundTaskExecutor.java) +* [Structured Background Task Executor](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/task_execution/StructuredBackgroundTaskExecutor.java) *(Java 21 — doc stub)* * [Task Cancel](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/task_cancel) * [Producer-Consumer](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/producer_consumer/ProducerConsumer.java) +* [Virtual Thread Producer-Consumer](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/producer_consumer/VirtualThreadProducerConsumer.java) *(Java 21 — doc stub)* * [Task Convergence](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/task_convergence/TaskConvergence.java) +* [Structured Task Convergence](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/task_convergence/StructuredTaskConvergence.java) *(Java 21 — doc stub)* * [Non-Blocking with Atomics](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/non_blocking) * [Controlled Concurrent Initialization](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/controlled_initialization/ControlledInitialization.java) * [Parallel Divide and Conquer](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/divideconquer) +## Java Version Coverage + +| Java Version | Feature Added / Updated | +|---|---| +| Java 8 | `ThreadLocal.withInitial()`, `ConcurrentHashMap` bulk ops (`merge`, `compute`, `forEach`, `reduceValues`, `search`) | +| Java 9 | `java.util.concurrent.Flow` (Reactive Streams), `CompletableFuture.orTimeout()` / `completeOnTimeout()`, `VarHandle` | +| Java 14 | Switch expressions (`EventKeeper`) | +| Java 16 | Records (`MyImmutableObject`, `Event`), `Stream.toList()` | +| Java 17 | Build target (compiler source/target) | +| Java 21 *(doc stubs — requires JDK upgrade)* | Virtual Threads, `Executors.newVirtualThreadPerTaskExecutor()`, Structured Concurrency (`StructuredTaskScope`), Scoped Values (`ScopedValue`) | + +> **Java 21 stubs** are documentation-only files that describe the planned API +> with runnable code examples in Javadoc comments. They compile on Java 17 +> (no executable code) and serve as a migration guide for when the project +> upgrades to JDK 21. + ## About Patterns and Algorithms inspired by the Java Concurrency in Practice book. - diff --git a/pom.xml b/pom.xml index 8f27a3d..e941d53 100644 --- a/pom.xml +++ b/pom.xml @@ -11,11 +11,11 @@ org.apache.maven.plugins maven-compiler-plugin - 3.7.0 + 3.12.1 - 11 - 11 + 17 + 17 diff --git a/src/main/java/br/com/leonardoz/features/atomics/UsingAtomics.java b/src/main/java/br/com/leonardoz/features/atomics/UsingAtomics.java index 97ff747..a40525b 100644 --- a/src/main/java/br/com/leonardoz/features/atomics/UsingAtomics.java +++ b/src/main/java/br/com/leonardoz/features/atomics/UsingAtomics.java @@ -1,27 +1,56 @@ package br.com.leonardoz.features.atomics; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * Atomics can be used from the java.util.concurrent.atomic.* package. - * + * * An atomic operation is a compound action that totally completes out totally * fails, not supporting inconsistent values or results during it's execution. - * + * * The classes in this package supports atomic operations on single variables, * having get and set (read and write) methods that behave like a volatile * variable. - * + * * The compareAndSet are commonly used in non-blocking algorithms. They * basically tries to set a new value to a specified field, and it returns a * boolean indicating success or not. All atomic, only blocking briefly. - * + * * Interesting classes in this package are: AtomicBoolean, AtomicLong, - * AtomicReference, AtomicMarkableReference and - * AtomicReferenceFieldUpdater. - * + * AtomicReference<T>, AtomicMarkableReference<T> and + * AtomicReferenceFieldUpdater<T, V>. + * + * == Java 9+: VarHandle == + * + * {@link VarHandle} is a typed reference to a variable — an instance field, a + * static field, or an array element. It provides atomic access operations + * without the object-wrapping overhead of AtomicXxx classes, and offers finer + * control over memory ordering semantics. + * + * Key VarHandle access modes (ordered from weakest to strongest): + * + * Plain — ordinary read/write, no ordering guarantee. + * Opaque — bit-atomicity; no cross-thread ordering. + * Acquire/Release — one-sided memory ordering (like C++ acquire/release). + * Volatile — full sequential consistency (same as Java's volatile). + * + * Commonly used methods: + * get/set — plain or volatile access (depends on call site) + * getVolatile/setVolatile — volatile semantics + * getAndAdd — atomic fetch-and-add + * getAndSet — atomic fetch-and-set + * compareAndSet — CAS returning boolean + * compareAndExchange — CAS returning the witness value + * fullFence/acquireFence/releaseFence — explicit memory barriers + * + * When to prefer VarHandle over AtomicXxx: + * - You want atomic access on a plain {@code volatile} field without boxing. + * - You need fine-grained memory-order control. + * - You are building high-performance, low-allocation data structures. * */ public class UsingAtomics { @@ -45,14 +74,86 @@ public int get() { } } + // ------------------------------------------------------------------------- + // Java 9+: same counter implemented with VarHandle + // ------------------------------------------------------------------------- + + /** + * A counter backed by a plain {@code volatile int} field, accessed atomically + * through a {@link VarHandle}. This avoids the extra object allocation that + * {@link AtomicInteger} requires while providing identical thread-safety + * guarantees. + */ + static class VarHandleCounter { + // The field must be accessible to the MethodHandles.lookup() at the + // call site, so it is package-private rather than private. + volatile int count = 0; + + private static final VarHandle COUNT; + + static { + try { + COUNT = MethodHandles.lookup() + .findVarHandle(VarHandleCounter.class, "count", int.class); + } catch (ReflectiveOperationException e) { + throw new ExceptionInInitializerError(e); + } + } + + /** Atomically increment and return the new value. */ + public int increment() { + // getAndAdd returns the *old* value, so we add 1 to get the new value. + return (int) COUNT.getAndAdd(this, 1) + 1; + } + + /** Atomically decrement and return the new value. */ + public int decrement() { + return (int) COUNT.getAndAdd(this, -1) - 1; + } + + /** Volatile read of the current counter value. */ + public int get() { + return (int) COUNT.getVolatile(this); + } + + /** + * Compare-and-exchange: atomically sets the field to {@code newValue} + * if it currently holds {@code expected}. + * + * Unlike {@code compareAndSet}, this returns the witness value + * (what was actually in the field), making it useful for retry loops. + */ + public int compareAndExchange(int expected, int newValue) { + return (int) COUNT.compareAndExchange(this, expected, newValue); + } + } + public static void main(String[] args) throws InterruptedException { - var counter = new AtomicCounter(); + // --- AtomicInteger counter --- + var atomicCounter = new AtomicCounter(); var cachedThreadPool = Executors.newCachedThreadPool(); for (int i = 0; i < 10_000; i++) { - cachedThreadPool.execute(() -> counter.increment()); + cachedThreadPool.execute(() -> atomicCounter.increment()); } cachedThreadPool.shutdown(); cachedThreadPool.awaitTermination(4000, TimeUnit.SECONDS); - System.out.println("Result shound be 10000: Actual result is: " + counter.get()); + System.out.println("AtomicInteger — expected 10000, got: " + atomicCounter.get()); + + // --- VarHandle counter --- + var varHandleCounter = new VarHandleCounter(); + var pool2 = Executors.newCachedThreadPool(); + for (int i = 0; i < 10_000; i++) { + pool2.execute(() -> varHandleCounter.increment()); + } + pool2.shutdown(); + pool2.awaitTermination(4000, TimeUnit.SECONDS); + System.out.println("VarHandle — expected 10000, got: " + varHandleCounter.get()); + + // compare-and-exchange example + var vh = new VarHandleCounter(); + int witness = vh.compareAndExchange(0, 42); // should succeed: witness == 0 + System.out.println("CAS witness (expected 0): " + witness + ", value is now: " + vh.get()); + witness = vh.compareAndExchange(0, 99); // should fail: count is 42, not 0 + System.out.println("CAS witness (expected 42): " + witness + ", value is still: " + vh.get()); } } diff --git a/src/main/java/br/com/leonardoz/features/collections/UsingConcurrentCollections.java b/src/main/java/br/com/leonardoz/features/collections/UsingConcurrentCollections.java index 5073cac..b269b84 100644 --- a/src/main/java/br/com/leonardoz/features/collections/UsingConcurrentCollections.java +++ b/src/main/java/br/com/leonardoz/features/collections/UsingConcurrentCollections.java @@ -1,5 +1,7 @@ package br.com.leonardoz.features.collections; +import java.util.Arrays; +import java.util.List; import java.util.Random; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -10,7 +12,7 @@ /** * Concurrent collections are an alternative to the Synchronized Collections. - * + * * Supports concurrent access from n threads and performs better than * Synchronized collections. * @@ -22,18 +24,35 @@ public class UsingConcurrentCollections { * achieve better performance. * * Locks it's values with Lock Striping. - * + * * Lock Striping divides the protected region through several locks. - * + * * - Don't throw ConcurrentModificationException - * + * * - size() and isEmpty() can be incorrect. Don't rely on them. - * + * * - Supports atomic operations, don't need client side locking. - * + * * - Readers can access concurrently, and iterator have weak consistency. - * + * * - Just a few Writers can modify it. + * + * Java 8+ additions demonstrated below: + * + * - {@code merge(key, value, remappingFn)} — atomically combines a new value + * with an existing one using a merge function; ideal for frequency counting. + * + * - {@code compute(key, remappingFn)} — atomically computes a new value for a + * key based on its current value (or null if absent). + * + * - {@code forEach(parallelismThreshold, action)} — applies an action to each + * entry, processing in parallel when the map size exceeds the threshold. + * + * - {@code reduceValues(parallelismThreshold, reducer)} — parallel reduction + * over values; returns null if the map is empty. + * + * - {@code search(parallelismThreshold, searchFn)} — returns the first + * non-null result of applying the function, stopping as soon as found. */ public static void usingConcurrentHashMap() { System.out.println("=== ConcurrentHashMap ==="); @@ -69,6 +88,39 @@ public static void usingConcurrentHashMap() { } catch (InterruptedException e) { e.printStackTrace(); } + + // ----------------------------------------------------------------------- + // Java 8+ bulk operations on ConcurrentHashMap + // ----------------------------------------------------------------------- + System.out.println("=== ConcurrentHashMap — Java 8+ bulk operations ==="); + + var words = Arrays.asList("apple", "banana", "apple", "cherry", "banana", "apple"); + + // merge: atomically increment a word counter; no external locking needed. + var wordCounts = new ConcurrentHashMap(); + for (var word : words) { + wordCounts.merge(word, 1, Integer::sum); + } + System.out.println("Word counts (merge): " + wordCounts); + + // compute: square the count of "apple" in-place atomically. + wordCounts.compute("apple", (k, v) -> v == null ? 1 : v * v); + System.out.println("After compute square of 'apple': " + wordCounts.get("apple")); + + // forEach with parallelism threshold (process in parallel for large maps) + System.out.println("forEach (parallel threshold = 2):"); + wordCounts.forEach(2, (k, v) -> + System.out.println(" " + k + " -> " + v)); + + // reduceValues: sum all counts in parallel + Integer total = wordCounts.reduceValues(2, Integer::sum); + System.out.println("Total word occurrences (reduceValues): " + total); + + // search: find the first word whose count is > 1 + String popular = wordCounts.search(2, (k, v) -> v > 1 ? k : null); + System.out.println("First word with count > 1 (search): " + popular); + + System.out.println("\n"); } /** diff --git a/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java b/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java index 93c4bff..b23eb41 100644 --- a/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java +++ b/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java @@ -31,6 +31,26 @@ * - Work-Stealing Thread Pool: Based on Fork/Join Framework, applies the * work-stealing algorithm for balancing tasks, with available processors as a * paralellism level. + * + * == Java 21+ (requires JDK upgrade) == + * + * - Virtual Thread-Per-Task Executor: Creates a new virtual thread for each + * submitted task. Virtual threads are lightweight (managed by the JVM, not the + * OS) and excel at I/O-bound workloads. The executor is AutoCloseable, so + * try-with-resources automatically awaits all submitted tasks: + * + *
{@code
+ * try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
+ *     IntStream.range(0, 10_000).forEach(i ->
+ *         executor.submit(() -> {
+ *             Thread.sleep(Duration.ofMillis(10));
+ *             return Thread.currentThread().getName();
+ *         }));
+ * } // auto-shutdown: waits for all 10,000 tasks to finish
+ * }
+ * + * See {@link br.com.leonardoz.features.threads.UsingVirtualThreads} for a + * detailed description of virtual threads. * * And 2 types of tasks: * diff --git a/src/main/java/br/com/leonardoz/features/futures/UsingCompletableFuture.java b/src/main/java/br/com/leonardoz/features/futures/UsingCompletableFuture.java index bbffd0f..df372e3 100644 --- a/src/main/java/br/com/leonardoz/features/futures/UsingCompletableFuture.java +++ b/src/main/java/br/com/leonardoz/features/futures/UsingCompletableFuture.java @@ -136,6 +136,19 @@ * CompletableFuture.completeExceptionally() // completes throwing a exception * * CompletableFuture.completeExceptionally(ex) // completes with a exception + * = Java 9+ additions = + * + * orTimeout(long timeout, TimeUnit unit) + * Completes the future exceptionally with a TimeoutException if it + * is not completed before the given timeout elapses. + * + * completeOnTimeout(T value, long timeout, TimeUnit unit) + * Completes the future with the provided default value if it is not completed + * before the timeout. Unlike orTimeout, this is a normal completion. + * + * exceptionallyAsync(fn, executor) + * Async version of exceptionally(); recovers from an exception on a separate + * thread. (Java 12) * */ public class UsingCompletableFuture { @@ -204,6 +217,33 @@ public static void main(String[] args) throws InterruptedException, ExecutionExc CompletableFuture .supplyAsync(getVal) .acceptEitherAsync(CompletableFuture.supplyAsync(getVal2, executor), (firstToBeReady) -> System.out.println(firstToBeReady), executor); + + // ------------------------------------------------------------------- + // Java 9+ timeout APIs + // ------------------------------------------------------------------- + + // orTimeout — completes exceptionally with TimeoutException if the + // future is not done within the specified duration. + Supplier slowSupplier = () -> { + try { Thread.sleep(2000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } + return "slow result"; + }; + + String orTimeoutResult = CompletableFuture + .supplyAsync(slowSupplier, executor) + .orTimeout(500, TimeUnit.MILLISECONDS) + .exceptionally(ex -> "Timed out: " + ex.getClass().getSimpleName()) + .get(); + System.out.println("orTimeout result: " + orTimeoutResult); + + // completeOnTimeout — completes normally with a fallback value if the + // future is not done within the specified duration. + String completeOnTimeoutResult = CompletableFuture + .supplyAsync(slowSupplier, executor) + .completeOnTimeout("default-value", 500, TimeUnit.MILLISECONDS) + .get(); + System.out.println("completeOnTimeout result: " + completeOnTimeoutResult); + executor.shutdown(); executor.awaitTermination(3000, TimeUnit.SECONDS); } diff --git a/src/main/java/br/com/leonardoz/features/futures/UsingStructuredConcurrency.java b/src/main/java/br/com/leonardoz/features/futures/UsingStructuredConcurrency.java new file mode 100644 index 0000000..3743c59 --- /dev/null +++ b/src/main/java/br/com/leonardoz/features/futures/UsingStructuredConcurrency.java @@ -0,0 +1,89 @@ +package br.com.leonardoz.features.futures; + +/** + * Structured Concurrency — {@code java.util.concurrent.StructuredTaskScope} + * (Java 21 preview → graduated in later releases) + * + *

NOTE: This file requires Java 21+ to compile and run. + * It is provided as a reference for the planned upgrade. The current build + * targets Java 17. + * + *

What is Structured Concurrency?

+ * + * Structured concurrency treats a group of related concurrent tasks as a + * single unit of work with a well-defined lifetime. A parent scope cannot + * exit until all its child tasks have completed (or been cancelled). This + * eliminates the "fire-and-forget" leaks and the complex error-handling + * boilerplate that arise with unstructured use of {@code CompletableFuture} or + * {@code ExecutorService}. + * + *

Key principle

+ * + * Tasks that are forked together, complete together. + * + *

API Overview (Java 21)

+ * + * {@code StructuredTaskScope} is an {@code AutoCloseable}; use it inside a + * try-with-resources block. + * + *

ShutdownOnFailure — all-or-nothing

+ * + * If any subtask fails, all remaining subtasks are cancelled immediately. + * The parent re-throws the first failure after joining. + * + *
{@code
+ * record UserSummary(String userName, int orderCount) {}
+ *
+ * UserSummary fetchUserSummary(long userId) throws Exception {
+ *     try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
+ *
+ *         StructuredTaskScope.Subtask  user   = scope.fork(() -> fetchUserName(userId));
+ *         StructuredTaskScope.Subtask orders = scope.fork(() -> fetchOrderCount(userId));
+ *
+ *         scope.join()           // waits for both tasks
+ *              .throwIfFailed(); // re-throws if either failed
+ *
+ *         return new UserSummary(user.get(), orders.get());
+ *     }
+ * }
+ * }
+ * + *

ShutdownOnSuccess — return the fastest result

+ * + * Returns as soon as the first subtask succeeds; cancels all others. + * Useful for fan-out/fan-in or primary/fallback service calls. + * + *
{@code
+ * String callFastest() throws Exception {
+ *     try (var scope = new StructuredTaskScope.ShutdownOnSuccess()) {
+ *
+ *         scope.fork(() -> callService("primary"));
+ *         scope.fork(() -> callService("fallback"));
+ *
+ *         scope.join();
+ *         return scope.result(); // result of the first successful subtask
+ *     }
+ * }
+ * }
+ * + *

Advantages over CompletableFuture / ExecutorService

+ * + *
    + *
  • Automatic cancellation — no manual {@code Future.cancel()} chains.
  • + *
  • Guaranteed cleanup — the scope closes only when all children are done.
  • + *
  • Cleaner error propagation — exceptions bubble up naturally.
  • + *
  • Integrates with {@code ScopedValue} for transparent context + * propagation to child tasks.
  • + *
+ * + *

Relationship to Virtual Threads

+ * + * Structured concurrency is designed to be used with virtual threads. + * Each {@code scope.fork()} call typically spawns a new virtual thread + * (the default factory), keeping the concurrency overhead negligible. + * + * @see UsingVirtualThreads + */ +public class UsingStructuredConcurrency { + // Placeholder — add executable examples once the project is upgraded to Java 21. +} diff --git a/src/main/java/br/com/leonardoz/features/java_memory_model/UsingReactiveStreams.java b/src/main/java/br/com/leonardoz/features/java_memory_model/UsingReactiveStreams.java index 2f8bef2..1ee45c8 100644 --- a/src/main/java/br/com/leonardoz/features/java_memory_model/UsingReactiveStreams.java +++ b/src/main/java/br/com/leonardoz/features/java_memory_model/UsingReactiveStreams.java @@ -1,5 +1,181 @@ package br.com.leonardoz.features.java_memory_model; +import java.util.concurrent.Flow; +import java.util.concurrent.SubmissionPublisher; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +/** + * Reactive Streams — {@code java.util.concurrent.Flow} (Java 9) + * + * Java 9 introduced the {@link java.util.concurrent.Flow} API, which is an + * adaptation of the Reactive + * Streams specification. It defines four interfaces that together describe + * a push-based, back-pressure-capable asynchronous data pipeline: + * + *
+ *  Publisher  ──publishes items──▶  Subscriber
+ *                                       │
+ *                         back-pressure (request(n)) ◀─┘
+ * 
+ * + * == Key interfaces == + * + * {@link Flow.Publisher}<T> + * Produces items and delivers them to registered subscribers. + * subscribe(Subscriber) is the only method. + * + * {@link Flow.Subscriber}<T> + * Consumes items. Four callbacks: + * - onSubscribe(Subscription) — called once after subscribing; use it to + * call request(n) and signal how many items you are ready to receive. + * - onNext(T item) — called for each delivered item. + * - onError(Throwable) — terminal; called when an error occurs. + * - onComplete() — terminal; called when the publisher is done. + * + * {@link Flow.Subscription} + * Connects one publisher to one subscriber. + * - request(n) — signals demand for n more items (back-pressure). + * - cancel() — cancels the subscription. + * + * {@link Flow.Processor}<T,R> + * Extends both Publisher and Subscriber; acts as a pipeline stage that + * transforms items from type T to type R. + * + * == Back-pressure == + * + * The subscriber controls flow by calling {@code request(n)}. The publisher + * must not deliver more items than requested. This prevents fast producers + * from overwhelming slow consumers. + * + * == SubmissionPublisher == + * + * {@link SubmissionPublisher} is the only concrete {@code Flow.Publisher} + * provided by the JDK. It is buffered, auto-closes on exception, and uses + * the common ForkJoinPool by default. + */ public class UsingReactiveStreams { + /** + * A simple subscriber that requests one item at a time (maximum + * back-pressure) and prints each value it receives. + */ + static class PrintSubscriber implements Flow.Subscriber { + + private final String name; + private Flow.Subscription subscription; + + PrintSubscriber(String name) { + this.name = name; + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + this.subscription = subscription; + // Request the first item — without this call no items are ever delivered. + subscription.request(1); + } + + @Override + public void onNext(Integer item) { + System.out.println("[" + name + "] received: " + item); + // Request the next item only after processing the current one. + // This is back-pressure: the publisher cannot outrun the subscriber. + subscription.request(1); + } + + @Override + public void onError(Throwable throwable) { + System.err.println("[" + name + "] error: " + throwable.getMessage()); + } + + @Override + public void onComplete() { + System.out.println("[" + name + "] completed."); + } + } + + /** + * A {@link Flow.Processor} that doubles every integer it receives before + * forwarding it downstream. It extends {@link SubmissionPublisher} (which + * implements {@link Flow.Publisher}) and implements {@link Flow.Subscriber}. + */ + static class DoublingProcessor + extends SubmissionPublisher + implements Flow.Processor { + + private Flow.Subscription subscription; + + @Override + public void onSubscribe(Flow.Subscription subscription) { + this.subscription = subscription; + subscription.request(1); + } + + @Override + public void onNext(Integer item) { + // Transform and forward to downstream subscribers. + submit(item * 2); + subscription.request(1); + } + + @Override + public void onError(Throwable throwable) { + closeExceptionally(throwable); + } + + @Override + public void onComplete() { + close(); + } + } + + public static void main(String[] args) throws InterruptedException { + System.out.println("=== Flow API — Basic Publisher/Subscriber ==="); + + // SubmissionPublisher is AutoCloseable; close() signals onComplete(). + try (var publisher = new SubmissionPublisher()) { + + var subscriber = new PrintSubscriber("sub-1"); + publisher.subscribe(subscriber); + + // Publish integers 1..5 + IntStream.rangeClosed(1, 5).forEach(publisher::submit); + + } // publisher.close() is called here, triggering onComplete() + + // Give the async pipeline time to drain before the next demo. + TimeUnit.MILLISECONDS.sleep(200); + System.out.println(); + + // ----------------------------------------------------------------------- + System.out.println("=== Flow API — Multiple Subscribers ==="); + + try (var publisher = new SubmissionPublisher()) { + publisher.subscribe(new PrintSubscriber("sub-A")); + publisher.subscribe(new PrintSubscriber("sub-B")); + + IntStream.rangeClosed(1, 3).forEach(publisher::submit); + } + + TimeUnit.MILLISECONDS.sleep(200); + System.out.println(); + + // ----------------------------------------------------------------------- + System.out.println("=== Flow API — Processor (doubling pipeline) ==="); + + // Source publisher ──▶ DoublingProcessor ──▶ PrintSubscriber + try (var source = new SubmissionPublisher(); + var processor = new DoublingProcessor()) { + + // Wire the pipeline: source → processor → subscriber + source.subscribe(processor); + processor.subscribe(new PrintSubscriber("doubled")); + + IntStream.rangeClosed(1, 5).forEach(source::submit); + + } // both publishers close, propagating onComplete() through the pipeline + + TimeUnit.MILLISECONDS.sleep(200); + } } diff --git a/src/main/java/br/com/leonardoz/features/parallel_stream/UsingParallelStreams.java b/src/main/java/br/com/leonardoz/features/parallel_stream/UsingParallelStreams.java index eaf2b46..4c9801f 100644 --- a/src/main/java/br/com/leonardoz/features/parallel_stream/UsingParallelStreams.java +++ b/src/main/java/br/com/leonardoz/features/parallel_stream/UsingParallelStreams.java @@ -2,33 +2,36 @@ import java.math.BigInteger; import java.util.ArrayList; -import java.util.stream.Collectors; +import java.util.List; import java.util.stream.IntStream; /** * * Warning - * + * * Parallel Streams uses the ForkJoin Pool, so be aware of using it in Java * EE/Jakarta EE environments! * * The Stream API allows, with methods like parallel() or parallelStream(), the * execution of streams's operations in parallel. - * + * * It enables each element to be processed in parallel, having a thread for each * one of them, depending on the number of cores available. Like the Fork/Join * Framework, it has an overhead, so the speed of execution can get much better * in some cases, getting worse in some others. - * + * * Streams are composed of a Source, several intermediate operations and a * terminal operation. Streams are executed only when a terminal operation is * executed, so they're lazy too. The intermediate operations respect the order * that you used, they're sequential. The work of each intermediate operation is * parallel. - * + * * CPU intensive tasks benefits from this feature. - * - * + * + * Java 16+ note: {@code Stream.toList()} is the idiomatic replacement for + * {@code .collect(Collectors.toList())}. It returns an unmodifiable + * list. Use {@code Collectors.toList()} when you need a mutable result list. + * */ public class UsingParallelStreams { @@ -36,10 +39,11 @@ public static void main(String[] args) { // Creating Parallel Streams from existing collection new ArrayList<>().parallelStream(); - // Making Stream Parallel - IntStream.rangeClosed(0, 30_000) // source + // Making Stream Parallel — Java 16+: Stream.toList() instead of Collectors.toList() + List primes = IntStream.rangeClosed(0, 30_000) // source .parallel().mapToObj(BigInteger::valueOf).map(UsingParallelStreams::isPrime) // Intermediate operations - .collect(Collectors.toList()); // Terminal Operations + .toList(); // Terminal operation (Java 16+, returns unmodifiable list) + System.out.println("Primes checked: " + primes.size()); // Each operation run in parallel, out of order IntStream.rangeClosed(0, 20) // source @@ -52,7 +56,7 @@ public static void main(String[] args) { IntStream.rangeClosed(0, 20) .mapToObj(Integer::toString) .forEach(System.out::print); - + System.out.println("\n"); dummyPerformanceCheck(); @@ -60,23 +64,23 @@ public static void main(String[] args) { private static void dummyPerformanceCheck() { - // Sequential Stream + // Sequential Stream — Java 16+: Stream.toList() var start1 = System.currentTimeMillis(); IntStream.rangeClosed(0, 50_000) .mapToObj(BigInteger::valueOf) .map(UsingParallelStreams::isPrime) - .collect(Collectors.toList()); + .toList(); var end1 = System.currentTimeMillis(); var time1 = (end1 - start1) / 1000; System.out.println("Sequential: " + time1); - // Parallel Stream + // Parallel Stream — Java 16+: Stream.toList() var start2 = System.currentTimeMillis(); IntStream.rangeClosed(0, 50_000) .parallel() .mapToObj(BigInteger::valueOf) .map(UsingParallelStreams::isPrime) - .collect(Collectors.toList()); + .toList(); var end2 = System.currentTimeMillis(); var time2 = (end2 - start2) / 1000; System.out.println("Parallel: " + time2); diff --git a/src/main/java/br/com/leonardoz/features/threads/UsingScopedValues.java b/src/main/java/br/com/leonardoz/features/threads/UsingScopedValues.java new file mode 100644 index 0000000..923f1a6 --- /dev/null +++ b/src/main/java/br/com/leonardoz/features/threads/UsingScopedValues.java @@ -0,0 +1,81 @@ +package br.com.leonardoz.features.threads; + +/** + * Scoped Values — {@code java.lang.ScopedValue} (Java 21 preview → graduated + * in later releases) + * + *

NOTE: This file requires Java 21+ to compile and run. + * It is provided as a reference for the planned upgrade. The current build + * targets Java 17. + * + *

What are Scoped Values?

+ * + * A {@code ScopedValue} is an immutable, per-thread variable that is bound for + * a bounded scope (a {@code run()} or {@code call()} block). It is + * the recommended replacement for {@link ThreadLocal} when working with virtual + * threads or structured concurrency. + * + *

Comparison with ThreadLocal

+ * + * + * + * + * + * + * + * + * + * + * + *
FeatureThreadLocalScopedValue
MutabilityMutable (set/remove)Immutable (bound once per scope)
CleanupMust call remove(); leaks if forgottenAutomatic — cleaned up when scope exits
Virtual thread memoryEach thread holds its own copyShared read-only; no per-thread copy
Child task visibilityManual InheritableThreadLocalAutomatically inherited by StructuredTaskScope children
+ * + *

API Overview (Java 21)

+ * + *
{@code
+ * // 1. Declare — typically a static final field on a class
+ * static final ScopedValue CURRENT_USER = ScopedValue.newInstance();
+ *
+ * // 2. Bind and run — value visible only within the lambda/block
+ * ScopedValue.where(CURRENT_USER, "alice").run(() -> {
+ *     System.out.println("User: " + CURRENT_USER.get()); // "alice"
+ *     processRequest();   // value is visible anywhere in this call tree
+ * });
+ * // CURRENT_USER.get() here would throw NoSuchElementException
+ *
+ * // 3. Nested re-binding — child sees "bob"; parent still sees "alice"
+ * ScopedValue.where(CURRENT_USER, "alice").run(() ->
+ *     ScopedValue.where(CURRENT_USER, "bob").run(() ->
+ *         System.out.println(CURRENT_USER.get())  // "bob"
+ *     )
+ * );
+ *
+ * // 4. Returning a value from a scoped computation
+ * String result = ScopedValue.where(CURRENT_USER, "charlie")
+ *                            .call(() -> "hello, " + CURRENT_USER.get());
+ *
+ * // 5. Integration with StructuredTaskScope
+ * //    Child tasks forked inside a scope automatically inherit the binding.
+ * ScopedValue.where(CURRENT_USER, "dave").run(() -> {
+ *     try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
+ *         scope.fork(() -> auditLog(CURRENT_USER.get()));  // sees "dave"
+ *         scope.fork(() -> sendEmail(CURRENT_USER.get())); // sees "dave"
+ *         scope.join().throwIfFailed();
+ *     }
+ * });
+ * }
+ * + *

When to Use

+ * + *
    + *
  • Propagating per-request context (user identity, trace ID, locale) to + * all callees without passing it as a parameter.
  • + *
  • Any use of {@link ThreadLocal} that involves virtual threads or + * structured concurrency.
  • + *
+ * + * @see UsingVirtualThreads + * @see UsingStructuredConcurrency + */ +public class UsingScopedValues { + // Placeholder — add executable examples once the project is upgraded to Java 21. +} diff --git a/src/main/java/br/com/leonardoz/features/threads/UsingVirtualThreads.java b/src/main/java/br/com/leonardoz/features/threads/UsingVirtualThreads.java new file mode 100644 index 0000000..d0694b4 --- /dev/null +++ b/src/main/java/br/com/leonardoz/features/threads/UsingVirtualThreads.java @@ -0,0 +1,86 @@ +package br.com.leonardoz.features.threads; + +/** + * Virtual Threads — {@code java.lang.Thread.ofVirtual()} (Java 21) + * + *

NOTE: This file requires Java 21+ to compile and run. + * It is provided as a reference for the planned upgrade. The current build + * targets Java 17. + * + *

What are Virtual Threads?

+ * + * Virtual threads are lightweight threads managed entirely by the JVM (not + * the OS). They are cheap to create — millions can coexist in a single JVM + * process — because they are not pinned to OS threads. Instead they are + * multiplexed onto a small pool of OS ("carrier") threads by the scheduler. + * + *

When to Use Virtual Threads

+ * + *
    + *
  • I/O-bound tasks: HTTP calls, JDBC queries, file I/O — anything that + * blocks waiting for external resources.
  • + *
  • High-concurrency servers: handle tens of thousands of simultaneous + * requests without a matching thread-pool size.
  • + *
+ * + * Virtual threads are NOT a replacement for parallel streams or the + * Fork/Join framework for CPU-bound computation. + * + *

Key API (Java 21)

+ * + *
{@code
+ * // 1. Create and start a single virtual thread
+ * Thread vt = Thread.ofVirtual()
+ *                   .name("my-vthread")
+ *                   .start(() -> System.out.println("Hello from " + Thread.currentThread()));
+ *
+ * // 2. Virtual thread-per-task executor — the primary production use case.
+ * //    Creates a new virtual thread for every submitted task.
+ * //    try-with-resources shuts down and waits for all tasks automatically.
+ * try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
+ *     IntStream.range(0, 10_000).forEach(i ->
+ *         executor.submit(() -> {
+ *             // Blocking I/O — the virtual thread is unmounted while waiting,
+ *             // freeing the carrier thread for other work.
+ *             Thread.sleep(Duration.ofMillis(100));
+ *             return i;
+ *         }));
+ * } // auto-close: awaits termination of all submitted tasks
+ *
+ * // 3. Detect whether the current thread is virtual
+ * boolean isVirtual = Thread.currentThread().isVirtual();
+ *
+ * // 4. ThreadFactory for virtual threads
+ * ThreadFactory factory = Thread.ofVirtual().name("worker-", 0).factory();
+ * }
+ * + *

Carrier Threads and Pinning

+ * + * A virtual thread runs on a carrier (platform) thread. When the virtual + * thread performs a blocking operation (sleep, I/O, {@code Lock.lock()}) it is + * unmounted from the carrier — the carrier is then free to run + * another virtual thread. The virtual thread is re-mounted when it is ready + * to continue. + * + *

Pinning occurs when a virtual thread cannot be unmounted. + * This happens when: + *

    + *
  • The virtual thread holds a {@code synchronized} monitor while + * blocking.
  • + *
  • It calls a native method or foreign function.
  • + *
+ * Pinning does not cause correctness problems but degrades throughput. Prefer + * {@code ReentrantLock} over {@code synchronized} in hot paths used with + * virtual threads. + * + *

ThreadLocal Caution

+ * + * {@link ThreadLocal} works with virtual threads but can use significant memory + * if each of millions of virtual threads holds a large value. Prefer + * {@code ScopedValue} (Java 21) for read-only per-request context propagation. + * + * @see UsingThreads + */ +public class UsingVirtualThreads { + // Placeholder — add executable examples once the project is upgraded to Java 21. +} diff --git a/src/main/java/br/com/leonardoz/patterns/producer_consumer/VirtualThreadProducerConsumer.java b/src/main/java/br/com/leonardoz/patterns/producer_consumer/VirtualThreadProducerConsumer.java new file mode 100644 index 0000000..adf83f8 --- /dev/null +++ b/src/main/java/br/com/leonardoz/patterns/producer_consumer/VirtualThreadProducerConsumer.java @@ -0,0 +1,73 @@ +package br.com.leonardoz.patterns.producer_consumer; + +/** + * Pattern: Virtual Thread Producer-Consumer (Java 21) + * + *

NOTE: This file requires Java 21+ to compile and run. + * It is provided as a reference for the planned upgrade. The current build + * targets Java 17. + * + *

Motivation

+ * + * The classic {@link ProducerConsumer} pattern uses a bounded + * {@code BlockingQueue} shared between platform threads. The number of + * concurrent producers and consumers is constrained by the thread-pool size, + * because each blocking {@code put()}/{@code take()} call ties up an OS thread. + * + * Virtual threads remove this constraint: a blocked virtual thread is + * unmounted from its carrier thread while waiting, freeing the + * carrier for other work. You can therefore spawn one virtual thread per + * producer and one per consumer without exhausting OS resources. + * + *

Implementation Sketch (Java 21)

+ * + *
{@code
+ * public class VirtualThreadProducerConsumer {
+ *
+ *     private static final int CAPACITY = 1_000;
+ *     private final BlockingQueue queue = new LinkedBlockingQueue<>(CAPACITY);
+ *
+ *     // Each producer runs in its own virtual thread; blocking put() is cheap.
+ *     private final Callable producer = () -> {
+ *         for (int i = 0; i < 90_000; i++) {
+ *             queue.put(UUID.randomUUID().toString());
+ *         }
+ *         return null;
+ *     };
+ *
+ *     // Each consumer runs in its own virtual thread; blocking take() is cheap.
+ *     private final Callable consumer = () -> {
+ *         while (true) {
+ *             String item = queue.poll(5, TimeUnit.SECONDS);
+ *             if (item == null) break;
+ *             System.out.println("Consumed " + item + " on " + Thread.currentThread());
+ *         }
+ *         return null;
+ *     };
+ *
+ *     public void run(int numProducers, int numConsumers) throws InterruptedException {
+ *         // newVirtualThreadPerTaskExecutor() — one virtual thread per task
+ *         try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
+ *             IntStream.range(0, numProducers).forEach(i -> executor.submit(producer));
+ *             IntStream.range(0, numConsumers).forEach(i -> executor.submit(consumer));
+ *         }
+ *     }
+ * }
+ * }
+ * + *

Advantages over the Platform-Thread Version

+ * + *
    + *
  • No artificial ceiling on producer/consumer count — scale to thousands + * of concurrent producers and consumers.
  • + *
  • Simpler code — no need to tune thread-pool sizes; just submit one + * virtual thread per logical task.
  • + *
  • Lower memory footprint per thread compared to platform threads.
  • + *
+ * + * @see ProducerConsumer + * @see br.com.leonardoz.features.threads.UsingVirtualThreads + */ +public class VirtualThreadProducerConsumer { + // Placeholder — add executable implementation once the project is upgraded to Java 21. +} diff --git a/src/main/java/br/com/leonardoz/patterns/task_convergence/StructuredTaskConvergence.java b/src/main/java/br/com/leonardoz/patterns/task_convergence/StructuredTaskConvergence.java new file mode 100644 index 0000000..f88a786 --- /dev/null +++ b/src/main/java/br/com/leonardoz/patterns/task_convergence/StructuredTaskConvergence.java @@ -0,0 +1,85 @@ +package br.com.leonardoz.patterns.task_convergence; + +/** + * Pattern: Structured Task Convergence (Java 21) + * + *

NOTE: This file requires Java 21+ to compile and run. + * It is provided as a reference for the planned upgrade. The current build + * targets Java 17. + * + *

Motivation

+ * + * {@link TaskConvergence} demonstrates convergence using a {@code CyclicBarrier}: + * each worker calls {@code barrier.await()} and all threads block until every + * participant arrives. This approach has several drawbacks: + *
    + *
  • Manual error handling — a single failure can leave other threads + * blocked at the barrier indefinitely (broken barrier).
  • + *
  • No automatic cancellation — if one worker throws, the others keep + * running.
  • + *
  • Shutdown boilerplate — separate {@code ExecutorService} shutdown + * and {@code awaitTermination} calls.
  • + *
+ * + * Structured concurrency ({@code StructuredTaskScope}) solves all of these + * problems. + * + *

Implementation Sketch (Java 21)

+ * + *
{@code
+ * public class StructuredTaskConvergence {
+ *
+ *     record WorkerResult(int workerId, List data) {}
+ *
+ *     WorkerResult doWork(int workerId) {
+ *         var rng = new Random();
+ *         var results = new ArrayList();
+ *         for (int i = 0; i < 400_000; i++) {
+ *             results.add((long) rng.nextInt(150_000));
+ *         }
+ *         return new WorkerResult(workerId, results);
+ *     }
+ *
+ *     void run() throws Exception {
+ *         int cores = Runtime.getRuntime().availableProcessors();
+ *
+ *         // ShutdownOnFailure: if any worker fails, cancel all others.
+ *         try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
+ *
+ *             var subtasks = IntStream.range(0, cores)
+ *                 .mapToObj(id -> scope.fork(() -> doWork(id)))
+ *                 .toList();
+ *
+ *             scope.join()           // wait for ALL workers to finish
+ *                  .throwIfFailed(); // re-throw first failure (if any)
+ *
+ *             // All workers succeeded — aggregate results
+ *             long total = subtasks.stream()
+ *                 .map(Subtask::get)
+ *                 .flatMap(r -> r.data().stream())
+ *                 .mapToLong(Long::longValue)
+ *                 .sum();
+ *
+ *             System.out.println("Converged. Total sum: " + total);
+ *         }
+ *     }
+ * }
+ * }
+ * + *

Key Improvements over CyclicBarrier

+ * + *
    + *
  • Automatic cancellation — one failure stops all other workers.
  • + *
  • Exception propagates cleanly to the caller via + * {@code throwIfFailed()}.
  • + *
  • No barrier counter to configure — just fork and join.
  • + *
  • Integrates naturally with virtual threads (each fork spawns a virtual + * thread by default).
  • + *
+ * + * @see TaskConvergence + * @see br.com.leonardoz.features.futures.UsingStructuredConcurrency + */ +public class StructuredTaskConvergence { + // Placeholder — add executable implementation once the project is upgraded to Java 21. +} diff --git a/src/main/java/br/com/leonardoz/patterns/task_execution/StructuredBackgroundTaskExecutor.java b/src/main/java/br/com/leonardoz/patterns/task_execution/StructuredBackgroundTaskExecutor.java new file mode 100644 index 0000000..7e4088b --- /dev/null +++ b/src/main/java/br/com/leonardoz/patterns/task_execution/StructuredBackgroundTaskExecutor.java @@ -0,0 +1,77 @@ +package br.com.leonardoz.patterns.task_execution; + +/** + * Pattern: Structured Background Task Executor (Java 21) + * + *

NOTE: This file requires Java 21+ to compile and run. + * It is provided as a reference for the planned upgrade. The current build + * targets Java 17. + * + *

Motivation

+ * + * {@link BackgroundTaskExecutor} wraps an {@code ExecutorService} and returns + * {@code Future} handles for submitted tasks. Callers must remember to shut + * down the executor, handle {@code InterruptedException} at every + * {@code Future.get()} call site, and manually cancel remaining futures if one + * fails. This unstructured concurrency is error-prone. + * + * Structured concurrency ({@code StructuredTaskScope}) provides the same + * capability with automatic cancellation, clean exception propagation, and + * guaranteed lifetime. + * + *

Implementation Sketch (Java 21)

+ * + *
{@code
+ * public class StructuredBackgroundTaskExecutor {
+ *
+ *     // Execute multiple tasks; fail fast if any task fails.
+ *     public  List executeAll(List> tasks) throws Exception {
+ *         try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
+ *
+ *             var subtasks = tasks.stream()
+ *                     .map(scope::fork)
+ *                     .toList();
+ *
+ *             scope.join().throwIfFailed();
+ *
+ *             return subtasks.stream()
+ *                     .map(StructuredTaskScope.Subtask::get)
+ *                     .toList();
+ *         }
+ *     }
+ *
+ *     // Execute several tasks; return the result of whichever finishes first.
+ *     public  T executeAny(List> tasks) throws Exception {
+ *         try (var scope = new StructuredTaskScope.ShutdownOnSuccess()) {
+ *
+ *             tasks.forEach(scope::fork);
+ *
+ *             scope.join();
+ *             return scope.result();
+ *         }
+ *     }
+ * }
+ * }
+ * + *

Comparison with BackgroundTaskExecutor

+ * + * + * + * + * + * + * + * + * + * + * + * + * + *
BackgroundTaskExecutorStructuredBackgroundTaskExecutor
Cancellation on failureManual Future.cancel()Automatic (ShutdownOnFailure)
Executor shutdownManual shutdown()/awaitTermination()Automatic (try-with-resources)
Exception handlingChecked at every Future.get() call siteSingle throwIfFailed() propagates the first exception
Thread typePlatform threadsVirtual threads (default factory)
+ * + * @see BackgroundTaskExecutor + * @see br.com.leonardoz.features.futures.UsingStructuredConcurrency + */ +public class StructuredBackgroundTaskExecutor { + // Placeholder — add executable implementation once the project is upgraded to Java 21. +} diff --git a/src/main/java/br/com/leonardoz/patterns/thread_safe/immutable_object/EventKeeper.java b/src/main/java/br/com/leonardoz/patterns/thread_safe/immutable_object/EventKeeper.java index d909de1..4c2d117 100644 --- a/src/main/java/br/com/leonardoz/patterns/thread_safe/immutable_object/EventKeeper.java +++ b/src/main/java/br/com/leonardoz/patterns/thread_safe/immutable_object/EventKeeper.java @@ -1,66 +1,44 @@ package br.com.leonardoz.patterns.thread_safe.immutable_object; +import java.util.Arrays; import java.util.UUID; /** * Pattern: Immutable Object with Volatile Reference - * + * * Example: Event keeper + * + * Java 16+ update: {@code Event} is now a {@code record}. The compact + * constructor defensively copies the {@code payload} byte array so that the + * record remains truly immutable — arrays are mutable in Java even inside + * records. */ public class EventKeeper { - private volatile Event lastEvent = new Event(null, null, null, null, null); + private volatile Event lastEvent = new Event(null, null, null, null, new byte[0]); public void acceptEvent(String name, String type, String username, byte[] payload) { - switch (type) { - case "STORAGE": - lastEvent = new Event(UUID.randomUUID().toString(), name, type, username, payload); - break; - default: - break; - } + // Java 14+ switch expression: expression form, no fall-through, no break. + lastEvent = switch (type) { + case "STORAGE" -> new Event(UUID.randomUUID().toString(), name, type, username, payload); + default -> lastEvent; + }; } public Event getLastEvent() { return lastEvent; } - static final class Event { - private final String id; - private final String name; - private final String type; - private final String username; - private final byte[] payload; - - public Event(String id, String name, String type, String username, byte[] payload) { - super(); - this.id = id; - this.name = name; - this.type = type; - this.username = username; - this.payload = payload; - } - - public String getId() { - return id; + /** + * Java 16+ Record. + * + * The compact constructor copies {@code payload} so external callers cannot + * mutate the stored array after construction, preserving true immutability. + */ + record Event(String id, String name, String type, String username, byte[] payload) { + Event { + payload = Arrays.copyOf(payload, payload.length); } - - public String getName() { - return name; - } - - public String getType() { - return type; - } - - public String getUsername() { - return username; - } - - public byte[] getPayload() { - return payload; - } - } } diff --git a/src/main/java/br/com/leonardoz/patterns/thread_safe/immutable_object/ImmutableObject.java b/src/main/java/br/com/leonardoz/patterns/thread_safe/immutable_object/ImmutableObject.java index e7cc772..5b569b6 100644 --- a/src/main/java/br/com/leonardoz/patterns/thread_safe/immutable_object/ImmutableObject.java +++ b/src/main/java/br/com/leonardoz/patterns/thread_safe/immutable_object/ImmutableObject.java @@ -2,13 +2,13 @@ /** * Pattern: Immutable Object with Volatile Reference - * + * * Motivations: Mutable state is troublesome in concurrent environments for * being vulnerable to visibility and atomic related hazards, like inconsistent * state objects, lost updates, stale values and others. An Immutable objects is * one whose status does not mutate after constructed, and so is thread-safe. * IMPORTANT: references to immutable objects are not thread-safe. - * + * * Intent: Make all fields final and don't use any method that can mutate the * internal state of the object; don't expose the object until it's full * created; make the class final, prohibiting extension. It's important to refer @@ -16,10 +16,14 @@ * guarantees (every change to the immutable object variable will be made * visible to all threads), and any synchronizing mechanism to ensure * thread-safety for the reference. - * + * * Applicability: When sharing a object, use Immutable Objects being referenced * by a volatile fields whatever it's possible. * + * Java 16+ note: Records are the idiomatic replacement for immutable value + * classes. A {@code record} automatically provides final fields, a canonical + * constructor, and accessor methods, eliminating the boilerplate shown in + * older versions of this class. */ public class ImmutableObject { @@ -33,35 +37,14 @@ public void printMyImmutableObject() { System.out.println(safeReference); } - public final static class MyImmutableObject { - private final int id; - private final String aValue; - private final boolean anotherValue; - - public MyImmutableObject(int id, String aValue, boolean anotherValue) { - super(); - this.id = id; - this.aValue = aValue; - this.anotherValue = anotherValue; - } - - public int getId() { - return id; - } - - public String getaValue() { - return aValue; - } - - public boolean getAnotherValue() { - return anotherValue; - } - - @Override - public String toString() { - return "MyImmutableObject [id=" + id + ", aValue=" + aValue + ", anotherValue=" + anotherValue + "]"; - } - - } + /** + * Java 16+ Record: all fields are implicitly {@code private final}; the + * compiler generates the canonical constructor, accessors (id(), aValue(), + * anotherValue()), equals(), hashCode(), and toString() automatically. + * + * This replaces the previous hand-written {@code final class} with + * {@code private final} fields and explicit getters. + */ + public record MyImmutableObject(int id, String aValue, boolean anotherValue) {} } diff --git a/src/main/java/br/com/leonardoz/patterns/thread_safe/thread_confinement/ThreadLocalConfinement.java b/src/main/java/br/com/leonardoz/patterns/thread_safe/thread_confinement/ThreadLocalConfinement.java index d32341a..84a84d7 100644 --- a/src/main/java/br/com/leonardoz/patterns/thread_safe/thread_confinement/ThreadLocalConfinement.java +++ b/src/main/java/br/com/leonardoz/patterns/thread_safe/thread_confinement/ThreadLocalConfinement.java @@ -15,12 +15,9 @@ */ public class ThreadLocalConfinement { - private static final ThreadLocal threadLocalOject = new ThreadLocal() { - @Override - protected Object initialValue() { - return new Object(); - } - }; + // Java 8+: ThreadLocal.withInitial() is the preferred concise form. + private static final ThreadLocal threadLocalOject = + ThreadLocal.withInitial(Object::new); public Object getNowThreadSafeObjectInstance() { return threadLocalOject.get(); diff --git a/src/main/java/br/com/leonardoz/patterns/thread_safe/thread_confinement/ThreadSafeDateFormat.java b/src/main/java/br/com/leonardoz/patterns/thread_safe/thread_confinement/ThreadSafeDateFormat.java index 97a4184..e19e2f3 100644 --- a/src/main/java/br/com/leonardoz/patterns/thread_safe/thread_confinement/ThreadSafeDateFormat.java +++ b/src/main/java/br/com/leonardoz/patterns/thread_safe/thread_confinement/ThreadSafeDateFormat.java @@ -1,25 +1,35 @@ package br.com.leonardoz.patterns.thread_safe.thread_confinement; -import java.text.SimpleDateFormat; -import java.util.Date; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; /** * Pattern: Thread Local Confinement - * - * Example: Using a thread safe SimpleDateFormat object - * + * + * Example: Thread-safe date/time formatting using {@link DateTimeFormatter}. + * + * Java 8+ note: {@link DateTimeFormatter} is immutable and thread-safe, so it + * can be stored in a plain {@code static final} field without any + * {@link ThreadLocal} wrapper. This example shows the preferred modern approach + * and intentionally omits {@code ThreadLocal} to highlight that the pattern is + * unnecessary when the underlying object is already thread-safe. + * + * If you are still using the legacy {@link java.text.SimpleDateFormat} (which + * is NOT thread-safe), use {@code ThreadLocal.withInitial()} instead: + * + *
{@code
+ * private static final ThreadLocal FMT =
+ *     ThreadLocal.withInitial(() -> new SimpleDateFormat("dd/MM/yyyy HH:mm:ss"));
+ * }
*/ public class ThreadSafeDateFormat { - private static final ThreadLocal threadLocalDateFormat = new ThreadLocal() { - @Override - protected SimpleDateFormat initialValue() { - return new SimpleDateFormat("DD/MM/YYYY HH:mm:ss"); - } - }; + // DateTimeFormatter is immutable and thread-safe — no ThreadLocal needed. + private static final DateTimeFormatter FORMATTER = + DateTimeFormatter.ofPattern("dd/MM/yyyy HH:mm:ss"); - public String format(Date date) { - return threadLocalDateFormat.get().format(date); + public String format(LocalDateTime dateTime) { + return FORMATTER.format(dateTime); } }