From 0e45e2ba7e3897611c7b7872178b3445125ed685 Mon Sep 17 00:00:00 2001 From: damon Date: Fri, 15 May 2026 11:23:16 +0800 Subject: [PATCH] Implement generic message handling and utility classes for mediator pattern --- .../java/com/nerosoft/mediator/Command.java | 2 +- .../java/com/nerosoft/mediator/Event.java | 2 +- .../java/com/nerosoft/mediator/Handler.java | 31 ++++- .../java/com/nerosoft/mediator/Query.java | 2 +- .../nerosoft/mediator/internal/Generic.java | 128 ++++++++++++++++++ .../nerosoft/mediator/internal/Message.java | 2 +- .../mediator/internal/StreamAggregator.java | 38 ++++++ .../mediator/internal/StreamSupplier.java | 16 +++ .../mediator/PipelinedMediatorTest.java | 4 + 9 files changed, 220 insertions(+), 5 deletions(-) create mode 100644 src/main/java/com/nerosoft/mediator/internal/Generic.java create mode 100644 src/main/java/com/nerosoft/mediator/internal/StreamAggregator.java create mode 100644 src/main/java/com/nerosoft/mediator/internal/StreamSupplier.java create mode 100644 src/test/java/com/nerosoft/mediator/PipelinedMediatorTest.java diff --git a/src/main/java/com/nerosoft/mediator/Command.java b/src/main/java/com/nerosoft/mediator/Command.java index 5d36f1f..8856c09 100644 --- a/src/main/java/com/nerosoft/mediator/Command.java +++ b/src/main/java/com/nerosoft/mediator/Command.java @@ -8,5 +8,5 @@ * A command is an instruction to perform a specific action, and it typically does not expect a response. * Commands are used to change the state of the system or to trigger some behavior. */ -public interface Command extends Message, Validatable { +public interface Command extends Message, Validatable { } diff --git a/src/main/java/com/nerosoft/mediator/Event.java b/src/main/java/com/nerosoft/mediator/Event.java index 8f7fbfe..88fc49f 100644 --- a/src/main/java/com/nerosoft/mediator/Event.java +++ b/src/main/java/com/nerosoft/mediator/Event.java @@ -6,5 +6,5 @@ * Represents an event that can be published to the mediator. * An event is a notification of something that has happened, and it typically does not expect a response. */ -public interface Event extends Message { +public interface Event extends Message { } diff --git a/src/main/java/com/nerosoft/mediator/Handler.java b/src/main/java/com/nerosoft/mediator/Handler.java index 0b30b29..509cf53 100644 --- a/src/main/java/com/nerosoft/mediator/Handler.java +++ b/src/main/java/com/nerosoft/mediator/Handler.java @@ -1,4 +1,33 @@ package com.nerosoft.mediator; -public interface Handler { +import com.nerosoft.mediator.internal.Generic; +import com.nerosoft.mediator.internal.Message; + +/** + * Defines a handler interface for processing messages of type T and producing a response of type R. + * This interface is a key component of the mediator pattern, + * allowing for the decoupling of message senders and receivers by providing a common contract for handling messages. + * Implementations of this interface will contain the logic to process specific types of messages and generate appropriate responses, + * enabling a flexible and extensible architecture for handling various operations within the application. + * @param the type of message to be handled + * @param the type of response produced by the handler + */ +public interface Handler, R> { + /** + * Handles the given message and produces a response. + * @param message the message to be processed by this handler + * @return the response produced by handling the message + */ + R handle(T message); + + /** + * Determines if this handler can process the given message based on its type. + * @param message the message to check + * @return true if this handler can process the message, otherwise false + */ + default boolean matches(T message) { + Generic generic = new Generic<>(getClass()) { + }; + return generic.resolve().isAssignableFrom(message.getClass()); + } } diff --git a/src/main/java/com/nerosoft/mediator/Query.java b/src/main/java/com/nerosoft/mediator/Query.java index bff3642..1a7e73c 100644 --- a/src/main/java/com/nerosoft/mediator/Query.java +++ b/src/main/java/com/nerosoft/mediator/Query.java @@ -9,5 +9,5 @@ * * @param the type of the response expected from the query. */ -public interface Query extends Message, Validatable { +public interface Query extends Message, Validatable { } diff --git a/src/main/java/com/nerosoft/mediator/internal/Generic.java b/src/main/java/com/nerosoft/mediator/internal/Generic.java new file mode 100644 index 0000000..f746ada --- /dev/null +++ b/src/main/java/com/nerosoft/mediator/internal/Generic.java @@ -0,0 +1,128 @@ +package com.nerosoft.mediator.internal; + +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.lang.reflect.TypeVariable; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public abstract class Generic { + + private static Map, Type> RESOLVED_GENERICS = new ConcurrentHashMap<>(); + + private final Class context; + private final Type diamond; + + protected Generic(Class context) { + this.context = context; + this.diamond = capture(); + } + + private Type capture() { + Type superclass = getClass().getGenericSuperclass(); + if (!(superclass instanceof ParameterizedType)) + throw new IllegalArgumentException(superclass + " isn't parameterized"); + + return ((ParameterizedType) superclass).getActualTypeArguments()[0]; + } + + @SuppressWarnings("unchecked") + public Class resolve() { + return (Class) + RESOLVED_GENERICS.computeIfAbsent( + this, + it -> { + Mappings mappings = new Scanner().scan(context); + return mappings.get(diamond); + }); + } + + @Override + public boolean equals(Object that) { + if (this == that) return true; + if (!(that instanceof Generic)) return false; + Generic other = (Generic) that; + return context.equals(other.context) && diamond.equals(other.diamond); + } + + @Override + public int hashCode() { + return 31 * context.hashCode() + diamond.hashCode(); + } + + // for testing + static void setCache(ConcurrentHashMap, Type> cache) { + Generic.RESOLVED_GENERICS = cache; + } + + /** + * Walks the class hierarchy, collecting mappings between type variables and actual types. + */ + private static class Scanner { + + private final Mappings mappings = new Mappings(); + + public Mappings scan(Class clazz) { + scanSuperclass(clazz); + scanInterfaces(clazz); + return mappings; + } + + private void scanSuperclass(Class clazz) { + Type superclass = clazz.getGenericSuperclass(); + if (superclass instanceof ParameterizedType) { + mappings.add((ParameterizedType) superclass); + scan((Class) ((ParameterizedType) superclass).getRawType()); + } else if (superclass instanceof Class) { + scan((Class) superclass); + } + } + + private void scanInterfaces(Class clazz) { + for (Type iface : clazz.getGenericInterfaces()) { + if (iface instanceof ParameterizedType) { + mappings.add((ParameterizedType) iface); + scan((Class) ((ParameterizedType) iface).getRawType()); + } + } + } + } + + private static class Mappings { + + // Map of "type variable → actual type". + // For example: if class MyHandler implements Handler, + // and the handler is Handler, R> + // then we'll map C → UserCommand, R → Result. + private final Map, Type> mappings = new HashMap<>(); + + /** + * Adds mappings from type variables (like ) to actual arguments (like UserCommand, + * Result). + */ + public void add(ParameterizedType type) { + TypeVariable[] generics = ((Class) type.getRawType()).getTypeParameters(); + Type[] concretes = type.getActualTypeArguments(); + for (int i = 0; i < generics.length; i++) { + mappings.put(generics[i], concretes[i]); + } + } + + public Type get(Type type) { + if (type instanceof TypeVariable) { + // If it's a type variable like "C" or "R", look up what it was bound to in the current + // context. + Type replacement = mappings.get(type); + // Recursively resolve in case replacement itself is another type variable. + if (replacement != null) return get(replacement); + } else if (type instanceof ParameterizedType) { + // Example: List → List.class + return ((ParameterizedType) type).getRawType(); + } + + // If it's already a raw class (e.g., String.class), just return it. + return type; + } + } +} diff --git a/src/main/java/com/nerosoft/mediator/internal/Message.java b/src/main/java/com/nerosoft/mediator/internal/Message.java index 3f1d00f..8fe6221 100644 --- a/src/main/java/com/nerosoft/mediator/internal/Message.java +++ b/src/main/java/com/nerosoft/mediator/internal/Message.java @@ -1,4 +1,4 @@ package com.nerosoft.mediator.internal; -public interface Message { +public interface Message { } diff --git a/src/main/java/com/nerosoft/mediator/internal/StreamAggregator.java b/src/main/java/com/nerosoft/mediator/internal/StreamAggregator.java new file mode 100644 index 0000000..d018cbe --- /dev/null +++ b/src/main/java/com/nerosoft/mediator/internal/StreamAggregator.java @@ -0,0 +1,38 @@ +package com.nerosoft.mediator.internal; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.function.BiFunction; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toCollection; + +/** + * A utility class that provides a method to merge the elements of a stream in reverse order using a provided accumulator function. + * @param the type of elements in the stream + */ +class StreamAggregator { + private final Stream stream; + + StreamAggregator(Stream stream) { + this.stream = stream; + } + + /** + * Merges the elements of the stream in reverse order using the provided accumulator function. + * @param seed the initial value for the accumulation + * @param accumulator a function that takes an element of the stream and the current accumulated value, and returns a new accumulated value + * @return the result of merging the elements of the stream + * @param the type of the accumulated value + */ + U merge(U seed, BiFunction accumulator) { + Iterator iterator = stream.collect(toCollection(LinkedList::new)).descendingIterator(); + U result = seed; + while (iterator.hasNext()) { + T element = iterator.next(); + result = accumulator.apply(element, result); + } + return result; + } +} + diff --git a/src/main/java/com/nerosoft/mediator/internal/StreamSupplier.java b/src/main/java/com/nerosoft/mediator/internal/StreamSupplier.java new file mode 100644 index 0000000..d933f3e --- /dev/null +++ b/src/main/java/com/nerosoft/mediator/internal/StreamSupplier.java @@ -0,0 +1,16 @@ +package com.nerosoft.mediator.internal; + +import java.util.stream.Stream; + +@FunctionalInterface +public interface StreamSupplier { + Stream supply(); + + /** + * Creates a StreamAggregator that can be used to aggregate the elements of the stream supplied by this StreamSupplier. + * @return a StreamAggregator for the elements of the stream + */ + default StreamAggregator aggregate() { + return new StreamAggregator<>(supply()); + } +} diff --git a/src/test/java/com/nerosoft/mediator/PipelinedMediatorTest.java b/src/test/java/com/nerosoft/mediator/PipelinedMediatorTest.java new file mode 100644 index 0000000..abf376e --- /dev/null +++ b/src/test/java/com/nerosoft/mediator/PipelinedMediatorTest.java @@ -0,0 +1,4 @@ +package com.nerosoft.mediator; + +public class PipelinedMediatorTest { +}