diff --git a/README.md b/README.md index 24655a2..a0225ee 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,300 @@ # Mediator -A mediator component for CQRS pattern application. + +一个轻量级的 Java Mediator 组件,用于在 CQRS 场景下统一处理 `Command`、`Query`、`Event`,并支持中间件管道、消息验证和事件并行策略。 + +## 项目简介 + +`Mediator` 通过“消息 + 处理器”的模式解耦业务调用方与实现方: + +- `Command`:执行动作(通常无返回值) +- `Query`:查询数据(有返回值) +- `Event`:发布通知(可被多个处理器订阅) + +默认实现为 `PipelinedMediator`,核心特性: + +- 自动匹配消息处理器(`Handler`) +- 支持中间件链(`Middleware`) +- 支持验证器(`Validator`),失败时抛出 `ValidationException` +- 支持事件并行分发策略(`HandlerParallelStrategy`) +- 支持事件异常处理策略(`HandlerExceptionStrategy`) + +## 依赖与环境 + +项目为 Maven 工程(见 `pom.xml`): + +- `groupId`: `com.nerosoft` +- `artifactId`: `Mediator` +- `version`: `1.0.0` +- 测试依赖:`org.junit.jupiter:junit-jupiter:6.0.3` +- 编译版本:`maven.compiler.source/target = 25` + +## 使用方法 + +### 1. 定义 Command 与 Handler + +```java +public record UserCreateCommand(String name, String email) implements Command {} + +public class UserCreateCommandHandler implements Handler { + @Override + public Void handle(UserCreateCommand message) { + System.out.println("create user: " + message.email()); + return null; + } +} +``` + +### 2. (可选)定义 Validator + +```java +public class UserCreateCommandValidator implements Validator { + @Override + public ValidationResult validate(UserCreateCommand message) { + if (message.name() == null || message.name().isBlank()) { + return ValidationResult.failure("Name is required"); + } + if (message.email() == null || !message.email().contains("@")) { + return ValidationResult.failure("Email is invalid"); + } + return ValidationResult.success(); + } +} +``` + +### 3. 组装 Mediator + +```java +Mediator mediator = new PipelinedMediator() + .use(() -> java.util.stream.Stream.of(new UserCreateCommandHandler())) + .use(() -> java.util.stream.Stream.of(new UserCreateCommandValidator())) + .use(() -> java.util.stream.Stream.of( + (message, next) -> { + long start = System.nanoTime(); + try { + return next.invoke(); + } finally { + long cost = System.nanoTime() - start; + System.out.println("handled " + message.getClass().getSimpleName() + " in " + cost + " ns"); + } + } + )); +``` + +### 4. 发送消息 + +```java +mediator.send(new UserCreateCommand("Alice", "alice@example.com")); +``` + +如校验失败,会抛出 `ValidationException`,可通过 `getErrors()` 读取错误列表。 + +## Spring Boot 集成方法 + +本项目本身不依赖 Spring;推荐在你的 Spring Boot 业务工程中引入该库后,通过 `@Configuration` 装配 `Mediator`。 + +### 1. 在业务工程引入依赖 + +如果你已将该库发布到私有仓库或本地仓库,可在业务工程 `pom.xml` 中添加: + +```xml + + com.nerosoft + Mediator + 1.0.0 + +``` + +### 2. 将 Handler / Validator / Middleware 交给 Spring 管理 + +```java +@Component +public class UserCreateCommandHandler implements Handler { + @Override + public Void handle(UserCreateCommand message) { + return null; + } +} + +@Component +public class UserCreateCommandValidator implements Validator { + @Override + public ValidationResult validate(UserCreateCommand message) { + return ValidationResult.success(); + } +} +``` + +### 3. 在配置类中组装 `PipelinedMediator` + +```java +import com.nerosoft.mediator.*; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.Executors; + +@Configuration +public class MediatorConfiguration { + + @Bean + public Mediator mediator(ApplicationContext applicationContext) { + return new PipelinedMediator() + .use(() -> applicationContext.getBeansOfType(Handler.class).values().stream()) + .use(() -> applicationContext.getBeansOfType(Validator.class).values().stream()) + .use(() -> applicationContext.getBeansOfType(Middleware.class).values().stream()) + .use(() -> Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); + } +} +``` + +### 4. 在业务服务中使用 + +```java +import com.nerosoft.mediator.Mediator; +import org.springframework.stereotype.Service; + +@Service +public class UserApplicationService { + private final Mediator mediator; + + public UserApplicationService(Mediator mediator) { + this.mediator = mediator; + } + + public void createUser(String name, String email) { + mediator.send(new UserCreateCommand(name, email)); + } +} +``` + +说明: + +- `Handler` 默认按“消息泛型类型”匹配 +- 多个 `Middleware` 会按流顺序组成责任链 +- `Validator` 返回失败时会抛出 `ValidationException`,可统一在全局异常处理中转换为 HTTP 响应 + +## 中间件使用方法 + +`Middleware` 用于在消息进入 `Handler` 之前或之后插入通用逻辑,适合做日志、耗时统计、权限检查、审计、链路追踪等横切处理。 + +### 1. 中间件接口 + +```java +@FunctionalInterface +public interface Middleware { + Object handle(com.nerosoft.mediator.internal.Message message, com.nerosoft.mediator.internal.MiddlewareDelegate next); +} +``` + +其中: + +- `message`:当前正在处理的消息 +- `next`:责任链中的下一个中间件或最终 `Handler` + +### 2. 注册中间件 + +在创建 `PipelinedMediator` 时,通过 `.use(() -> Stream.of(...))` 传入中间件流: + +```java +Mediator mediator = new PipelinedMediator() + .use(() -> java.util.stream.Stream.of(new UserCreateCommandHandler())) + .use(() -> java.util.stream.Stream.of(new UserCreateCommandValidator())) + .use(() -> java.util.stream.Stream.of( + (message, next) -> { + System.out.println("before: " + message.getClass().getSimpleName()); + try { + return next.invoke(); + } finally { + System.out.println("after: " + message.getClass().getSimpleName()); + } + } + )); +``` + +### 3. 执行顺序 + +中间件会按照注册顺序形成链式调用: + +1. 第一个中间件先执行 +2. 调用 `next.invoke()` 进入下一个中间件 +3. 最后到达对应的 `Handler` +4. 返回结果后,中间件可以继续做收尾处理 + +如果你注册了多个中间件,它们的执行顺序与传入流的顺序一致。 + +### 4. 常见使用场景 + +#### 日志与耗时统计 + +```java +(message, next) -> { + long start = System.nanoTime(); + try { + return next.invoke(); + } finally { + long cost = System.nanoTime() - start; + System.out.println("cost(ns): " + cost); + } +} +``` + +#### 权限或参数预检查 + +```java +(message, next) -> { + if (message == null) { + throw new IllegalArgumentException("message can not be null"); + } + return next.invoke(); +} +``` + +### 5. 与 Spring Boot 结合 + +如果项目已集成 Spring Boot,可以把中间件声明成 `@Component`,然后在配置类中统一注入到 `PipelinedMediator`: + +```java +@Bean +public Mediator mediator(ApplicationContext applicationContext) { + return new PipelinedMediator() + .use(() -> applicationContext.getBeansOfType(Middleware.class).values().stream()); +} +``` + +## Event 并行策略(可选) + +给事件类型添加注解控制并发行为: + +```java +@HandlerParallelStrategy(HandlerParallelStrategy.WHEN_ALL) +@HandlerExceptionStrategy(HandlerExceptionStrategy.CONTINUE) +public class UserCreatedEvent implements Event {} +``` + +- `NO_WAIT`:派发后不等待 +- `WHEN_ALL`:等待全部处理器完成 +- `WHEN_ANY`:任一处理器完成即继续 +- `STOP`:任一处理器异常立即终止 +- `CONTINUE`:收集异常,最后统一抛出 + +## 包内容 + +- `com.nerosoft.mediator` + - 核心抽象:`Mediator`、`Command`、`Query`、`Event` + - 扩展点:`Handler`、`Middleware`、`Validator` + - 默认实现:`PipelinedMediator` +- `com.nerosoft.mediator.strategy` + - 事件并行与异常策略注解 +- `com.nerosoft.mediator.validation` + - `ValidationResult`、`ValidationException` +- `com.nerosoft.mediator.internal` + - 内部支持类型(消息基类、流供应器、异常聚合等) + +## 快速构建 + +```bash +mvn clean test +``` + +如果本地 JDK 版本与 `pom.xml` 不一致,请先调整 JDK 或修改 `maven.compiler.source/target`。 diff --git a/pom.xml b/pom.xml index bbaa5b3..8c986e6 100644 --- a/pom.xml +++ b/pom.xml @@ -14,4 +14,18 @@ UTF-8 + + + org.junit.jupiter + junit-jupiter + 6.0.3 + test + + + org.junit.jupiter + junit-jupiter-api + 6.0.3 + test + + \ No newline at end of file diff --git a/src/main/java/com/nerosoft/mediator/Executor.java b/src/main/java/com/nerosoft/mediator/Executor.java new file mode 100644 index 0000000..0ebe7fd --- /dev/null +++ b/src/main/java/com/nerosoft/mediator/Executor.java @@ -0,0 +1,43 @@ +package com.nerosoft.mediator; + +import com.nerosoft.mediator.internal.ExceptionHandle; + +import static java.util.concurrent.CompletableFuture.runAsync; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; + +class Executor { + static void run(List tasks, ExecutorService concurrentPolicy, ExceptionHandle onException) { + try { + tasks.forEach(task -> runAsync(task, concurrentPolicy)); + } catch (Throwable e) { + onException.handleException(e); + } + } + + static void whenAll(List tasks, ExecutorService concurrentPolicy, ExceptionHandle onException) { + CompletableFuture.allOf(tasks.stream() + .map(task -> { + return CompletableFuture.runAsync(task, concurrentPolicy) + .exceptionally(ex -> { + onException.handleException(ex); + return null; + }); + }) + .toArray(CompletableFuture[]::new)) + .join(); + } + + static void whenAny(List tasks, ExecutorService concurrentPolicy, ExceptionHandle onException) { + List> futures = tasks.stream() + .map(task -> CompletableFuture.runAsync(task, concurrentPolicy) + .exceptionally(ex -> { + onException.handleException(ex); + return null; + })) + .toList(); + CompletableFuture.anyOf(futures.toArray(new CompletableFuture[]{})).join(); + } +} diff --git a/src/main/java/com/nerosoft/mediator/PipelinedMediator.java b/src/main/java/com/nerosoft/mediator/PipelinedMediator.java index 0c094fa..1dc0634 100644 --- a/src/main/java/com/nerosoft/mediator/PipelinedMediator.java +++ b/src/main/java/com/nerosoft/mediator/PipelinedMediator.java @@ -1,23 +1,198 @@ package com.nerosoft.mediator; +import com.nerosoft.mediator.internal.*; +import com.nerosoft.mediator.strategy.HandlerExceptionStrategy; +import com.nerosoft.mediator.strategy.HandlerParallelStrategy; +import com.nerosoft.mediator.validation.ValidationException; +import com.nerosoft.mediator.validation.ValidationResult; + +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Supplier; +import java.util.stream.Stream; + +/** + * This class is an implementation of the Mediator interface that provides a pipelined approach to handling commands, queries, and events. + * It allows for the processing of commands, queries, and events in a sequential manner, where each command, query, + * or event is processed one at a time, and the next one is not processed until the current one is completed. + * This can be useful in scenarios where the order of processing is important, + * or when there are dependencies between commands, queries, and events that need to be respected. + */ +@SuppressWarnings({"rawtypes", "unchecked"}) public class PipelinedMediator implements Mediator { + private StreamSupplier handlers = Stream::empty; + private StreamSupplier middlewares = Stream::empty; + private StreamSupplier validators = Stream::empty; + private Supplier concurrentPolicy = Executors::newCachedThreadPool; + + /** + * Configures the mediator to use the provided stream of handlers for processing commands, queries, and events. + * + * @param handlers the stream of handlers to be used by the mediator + * @return the current instance of PipelinedMediator for method chaining + */ + public PipelinedMediator use(HandlerStream handlers) { + this.handlers = handlers::supply; + return this; + } + + public PipelinedMediator use(MiddlewareStream middlewares) { + this.middlewares = middlewares::supply; + return this; + } + + public PipelinedMediator use(ValidatorStream validators) { + this.validators = validators::supply; + return this; + } + + public PipelinedMediator use(Supplier concurrentPolicy) { + this.concurrentPolicy = concurrentPolicy; + return this; + } + @Override public void send(T command) { - + checkArguments(command, "Command can not be null."); + validate(command); + var handler = resolveHandler(command); + var pipeline = buildMiddlewarePipeline(command, () -> handler.handle(command)); + pipeline.invoke(); } @Override public , R> R execute(T query) { - return null; + checkArguments(query, "Query can not be null."); + validate(query); + var handler = resolveHandler(query); + var pipeline = buildMiddlewarePipeline(query, () -> handler.handle(query)); + return (R) pipeline.invoke(); } @Override public , R> void execute(T query, R response) { - + checkArguments(query, "Query can not be null."); } @Override public void publish(T event) { + checkArguments(event, "Event can not be null."); + + List tasks = handlers.supply() + .filter(handler -> handler.matches(event)) + .map(handler -> (Handler) handler) + .map(handler -> (Runnable) () -> { + var pipeline = buildMiddlewarePipeline(event, () -> handler.handle(event)); + pipeline.invoke(); + }) + .toList(); + + if (tasks.isEmpty()) { + return; + } + + HandlerParallelStrategy parallelStrategy = event.getClass().getAnnotation(HandlerParallelStrategy.class); + HandlerExceptionStrategy exceptionStrategy = event.getClass().getAnnotation(HandlerExceptionStrategy.class); + + var parallelStrategyValue = parallelStrategy != null ? parallelStrategy.value() : HandlerParallelStrategy.No_WAIT; + var exceptionStrategyValue = exceptionStrategy != null ? exceptionStrategy.value() : HandlerExceptionStrategy.CONTINUE; + + List exceptions = new java.util.ArrayList<>(); + + ExceptionHandle exceptionHandle = exception -> { + if (Objects.equals(exceptionStrategyValue, HandlerExceptionStrategy.STOP)) { + throw new RuntimeException(exception); + } else { + exceptions.add(exception); + } + }; + + switch (parallelStrategyValue) { + case HandlerParallelStrategy.No_WAIT -> Executor.run(tasks, concurrentPolicy.get(), exceptionHandle); + case HandlerParallelStrategy.WHEN_ALL -> Executor.whenAll(tasks, concurrentPolicy.get(), exceptionHandle); + case HandlerParallelStrategy.WHEN_ANY -> Executor.whenAny(tasks, concurrentPolicy.get(), exceptionHandle); + } + + if (!exceptions.isEmpty()) { + throw new AggregateException(exceptions); + } + } + + /** + * Resolves the appropriate handler for the given message by filtering through the stream of handlers and finding the first one that matches the message type. + * If no matching handler is found, it throws a RuntimeException indicating that no handler was found for the message. + * This method is crucial for ensuring that messages are processed by the correct handlers based on their types, + * allowing for a flexible and extensible architecture in the mediator pattern. + * + * @param message the message for which a handler is to be resolved + * @param the type of the message + * @param the type of the response produced by the message handler + * @return the resolved handler for the given message + */ + private , R> Handler resolveHandler(T message) { + // resolve handler from handlers stream + return handlers.supply() + .filter(handler -> handler.matches(message)) + .map(handler -> (Handler) handler) + .findFirst() + .orElseThrow(() -> new RuntimeException("No handler found for message: " + message.getClass().getName())); + } + + /** + * Validates the given message using the available validators. It iterates through the stream of validators, + * applies each validator to the message, and collects any validation errors. If there are any validation errors, it throws a ValidationException containing the list of errors. This method ensures that messages are validated before they are processed by the handlers, allowing for better error handling and improved code readability when dealing with validation logic in the mediator pattern. + * + * @param message the message to be validated + * @param the type of the message + * @param the type of the response produced by the message handler + */ + private , R> void validate(T message) { + var errors = validators.supply() + .map(validator -> validator.validate(message)) + .filter(ValidationResult::isFailure) + .flatMap(result -> result.getErrors().stream()) + .toList(); + if (!errors.isEmpty()) { + throw new ValidationException(errors); + } + } + + /** + * Checks if the provided argument is not null. + * If the argument is null, it throws an IllegalArgumentException with the provided message. + * + * @param argument the argument to be checked for nullity + * @param message the exception message to be used if the argument is null + */ + private void checkArguments(Object argument, String message) { + if (argument != null) { + return; + } + + throw new IllegalArgumentException(message != null ? message : "Argument can not be null."); + } + /** + * Builds a middleware pipeline for the given message and final action. + * The pipeline is constructed by wrapping the final action with each applicable middleware in reverse order, + * allowing each middleware to process the message before and/or after the final action is invoked. + * + * @param message the message to be processed by the middleware pipeline + * @param finalAction the final action to be executed after all middlewares have been applied + * @param the type of the message + * @param the type of the response produced by the message handler + * @return a delegate representing the complete middleware pipeline + */ + private , R> MiddlewareDelegate buildMiddlewarePipeline(T message, MiddlewareDelegate finalAction) { + var applicableMiddlewares = middlewares.supply().toList(); + MiddlewareDelegate delegate = finalAction; + for (int i = applicableMiddlewares.size() - 1; i >= 0; i--) { + Middleware middleware = applicableMiddlewares.get(i); + MiddlewareDelegate next = delegate; + delegate = () -> middleware.handle(message, next); + } + return delegate; } } diff --git a/src/main/java/com/nerosoft/mediator/internal/AggregateException.java b/src/main/java/com/nerosoft/mediator/internal/AggregateException.java new file mode 100644 index 0000000..0eba050 --- /dev/null +++ b/src/main/java/com/nerosoft/mediator/internal/AggregateException.java @@ -0,0 +1,17 @@ +package com.nerosoft.mediator.internal; + +import java.util.List; + +public class AggregateException extends RuntimeException { + + private final List exceptions; + + public AggregateException(List exceptions) { + super("Multiple exceptions occurred during handler execution."); + this.exceptions = exceptions; + } + + public List getExceptions() { + return exceptions; + } +} diff --git a/src/main/java/com/nerosoft/mediator/internal/ExceptionHandle.java b/src/main/java/com/nerosoft/mediator/internal/ExceptionHandle.java new file mode 100644 index 0000000..8319f9c --- /dev/null +++ b/src/main/java/com/nerosoft/mediator/internal/ExceptionHandle.java @@ -0,0 +1,6 @@ +package com.nerosoft.mediator.internal; + +@FunctionalInterface +public interface ExceptionHandle { + void handleException(Throwable e); +} diff --git a/src/main/java/com/nerosoft/mediator/internal/HandlerStream.java b/src/main/java/com/nerosoft/mediator/internal/HandlerStream.java new file mode 100644 index 0000000..c0dc455 --- /dev/null +++ b/src/main/java/com/nerosoft/mediator/internal/HandlerStream.java @@ -0,0 +1,21 @@ +package com.nerosoft.mediator.internal; + +import com.nerosoft.mediator.Handler; + +import java.util.stream.Stream; + +/** + * A functional interface that represents a supplier of a stream of handlers. + * This interface is used to provide a stream of handlers to the mediator for processing commands. + */ +@FunctionalInterface +public interface HandlerStream { + + /** + * Supplies a stream of handlers to the mediator for processing commands. + * + * @return a stream of handlers + */ + @SuppressWarnings("rawtypes") + Stream supply(); +} diff --git a/src/main/java/com/nerosoft/mediator/internal/MiddlewareStream.java b/src/main/java/com/nerosoft/mediator/internal/MiddlewareStream.java new file mode 100644 index 0000000..20a6280 --- /dev/null +++ b/src/main/java/com/nerosoft/mediator/internal/MiddlewareStream.java @@ -0,0 +1,19 @@ +package com.nerosoft.mediator.internal; + +import com.nerosoft.mediator.Middleware; + +import java.util.stream.Stream; + +/** + * A functional interface that represents a supplier of a stream of middleware components. + * This interface is used to provide a stream of middleware components to the mediator for processing commands. + */ +@FunctionalInterface +public interface MiddlewareStream { + /** + * Supplies a stream of middleware components to the mediator for processing commands. + * + * @return a stream of middleware components + */ + Stream supply(); +} diff --git a/src/main/java/com/nerosoft/mediator/internal/ValidatorStream.java b/src/main/java/com/nerosoft/mediator/internal/ValidatorStream.java new file mode 100644 index 0000000..92e0eae --- /dev/null +++ b/src/main/java/com/nerosoft/mediator/internal/ValidatorStream.java @@ -0,0 +1,20 @@ +package com.nerosoft.mediator.internal; + +import com.nerosoft.mediator.Validator; + +import java.util.stream.Stream; + +/** + * A functional interface that represents a supplier of a stream of validators. + * This interface is used to provide a stream of validators to the mediator for processing commands. + */ +@FunctionalInterface +public interface ValidatorStream { + /** + * Supplies a stream of validators to the mediator for processing commands. + * + * @return a stream of validators + */ + @SuppressWarnings("rawtypes") + Stream supply(); +} diff --git a/src/main/java/com/nerosoft/mediator/strategy/HandlerExceptionStrategy.java b/src/main/java/com/nerosoft/mediator/strategy/HandlerExceptionStrategy.java new file mode 100644 index 0000000..33bf4d4 --- /dev/null +++ b/src/main/java/com/nerosoft/mediator/strategy/HandlerExceptionStrategy.java @@ -0,0 +1,30 @@ +package com.nerosoft.mediator.strategy; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Defines the strategy for handling exceptions that occur in handlers within the mediator pattern. + * This enum provides two strategies for handling exceptions: + * 1. STOP: If an exception occurs in any handler, the processing of the message will be stopped immediately, and no further handlers will be invoked. This is useful when you want to ensure that if any handler fails, the entire processing of the message is halted to prevent inconsistent states or unintended side effects. + * 2. CONTINUE: If an exception occurs in a handler, the processing will continue to the next handler in the chain. This allows other handlers to attempt to process the message, even if one handler fails. This can be useful in scenarios where you want to allow for partial processing of a message, or when you want to ensure that all handlers have a chance to process the message, regardless of any exceptions that may occur in individual handlers. + */ +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +public @interface HandlerExceptionStrategy { + + /** + * Stops the processing of the message if an exception occurs in any handler. + */ + public final String STOP = "STOP"; + + /** + * Continues to the next handler if an exception occurs in the current handler. + * This allows other handlers to attempt to process the message, even if one handler fails. + */ + public final String CONTINUE = "CONTINUE"; + + String value() default CONTINUE; +} diff --git a/src/main/java/com/nerosoft/mediator/strategy/HandlerParallelStrategy.java b/src/main/java/com/nerosoft/mediator/strategy/HandlerParallelStrategy.java new file mode 100644 index 0000000..4bd95b7 --- /dev/null +++ b/src/main/java/com/nerosoft/mediator/strategy/HandlerParallelStrategy.java @@ -0,0 +1,34 @@ +package com.nerosoft.mediator.strategy; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +public @interface HandlerParallelStrategy { + /** + * Runs handlers in parallel without waiting for any of them to complete. + * This strategy is useful when you want to fire-and-forget handlers, + * allowing them to execute independently without blocking the main thread or waiting for their results. + * Handlers will be executed concurrently, and the main thread will continue immediately after dispatching the handlers. + */ + public final String No_WAIT = "NO_WAIT"; + + /** + * Waits for all handlers to complete before proceeding. + * This strategy is useful when you need to ensure that all handlers have finished processing before moving on to the next step in the workflow. + * The main thread will block until all handlers have completed their execution. + */ + public final String WHEN_ALL = "WHEN_ALL"; + + /** + * Waits for any handler to complete before proceeding. + * This strategy is useful when you need to continue processing as soon as the first handler finishes, without waiting for all handlers to complete. + * The main thread will block until at least one handler has completed its execution. + */ + public final String WHEN_ANY = "WHEN_ANY"; + + String value() default No_WAIT; +} diff --git a/src/main/java/com/nerosoft/mediator/validation/ValidationException.java b/src/main/java/com/nerosoft/mediator/validation/ValidationException.java index 05ac7a0..31bd4eb 100644 --- a/src/main/java/com/nerosoft/mediator/validation/ValidationException.java +++ b/src/main/java/com/nerosoft/mediator/validation/ValidationException.java @@ -22,7 +22,7 @@ public class ValidationException extends RuntimeException { * Creates a new ValidationException with the specified list of error messages. * @param errors the list of error messages describing the validation failure */ - private ValidationException(List errors) { + public ValidationException(List errors) { super("Validation failed: " + String.join(", ", errors)); this.result = ValidationResult.failure(errors); } diff --git a/src/test/java/com/nerosoft/mediator/CommandTest.java b/src/test/java/com/nerosoft/mediator/CommandTest.java new file mode 100644 index 0000000..83f25fd --- /dev/null +++ b/src/test/java/com/nerosoft/mediator/CommandTest.java @@ -0,0 +1,4 @@ +package com.nerosoft.mediator; + +class CommandTest { +} diff --git a/src/test/java/com/nerosoft/mediator/LoggingMiddleware.java b/src/test/java/com/nerosoft/mediator/LoggingMiddleware.java new file mode 100644 index 0000000..cab5494 --- /dev/null +++ b/src/test/java/com/nerosoft/mediator/LoggingMiddleware.java @@ -0,0 +1,15 @@ +package com.nerosoft.mediator; + +import com.nerosoft.mediator.internal.Message; +import com.nerosoft.mediator.internal.MiddlewareDelegate; + +public class LoggingMiddleware implements Middleware { + + @Override + public Object handle(Message message, MiddlewareDelegate next) { + System.out.println("LoggingMiddleware: Handling message of type " + message.getClass().getSimpleName()); + Object result = next.invoke(); + System.out.println("LoggingMiddleware: Finished handling message of type " + message.getClass().getSimpleName()); + return result; + } +} diff --git a/src/test/java/com/nerosoft/mediator/PipelinedMediatorTest.java b/src/test/java/com/nerosoft/mediator/PipelinedMediatorTest.java index abf376e..69d8477 100644 --- a/src/test/java/com/nerosoft/mediator/PipelinedMediatorTest.java +++ b/src/test/java/com/nerosoft/mediator/PipelinedMediatorTest.java @@ -1,4 +1,27 @@ package com.nerosoft.mediator; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.Executors; +import java.util.stream.Stream; + public class PipelinedMediatorTest { + private final Mediator mediator; + + public PipelinedMediatorTest() { + mediator = new PipelinedMediator() + .use(() -> Stream.of(new UserCreateCommandHandler(), new UserCreatedEventHandler())) + .use(() -> Stream.of(new UserCreateCommandValidator())) + .use(() -> Stream.of(new LoggingMiddleware())) + .use(() -> Executors.newFixedThreadPool(4)); + } + + @Test + public void testMediator() { + mediator.send(new UserCreateCommand("John Doe", "johndoe@sample.com")); + + var users = UserStore.getInstance().getUsers(); + assert users.size() == 1; + assert users.get(0).name().equals("John Doe"); + } } diff --git a/src/test/java/com/nerosoft/mediator/User.java b/src/test/java/com/nerosoft/mediator/User.java new file mode 100644 index 0000000..af052f7 --- /dev/null +++ b/src/test/java/com/nerosoft/mediator/User.java @@ -0,0 +1,4 @@ +package com.nerosoft.mediator; + +public record User(Long id, String name, String email) { +} diff --git a/src/test/java/com/nerosoft/mediator/UserCreateCommand.java b/src/test/java/com/nerosoft/mediator/UserCreateCommand.java new file mode 100644 index 0000000..0b1dee9 --- /dev/null +++ b/src/test/java/com/nerosoft/mediator/UserCreateCommand.java @@ -0,0 +1,4 @@ +package com.nerosoft.mediator; + +public record UserCreateCommand(String name, String email) implements Command { +} diff --git a/src/test/java/com/nerosoft/mediator/UserCreateCommandHandler.java b/src/test/java/com/nerosoft/mediator/UserCreateCommandHandler.java new file mode 100644 index 0000000..f6c5e05 --- /dev/null +++ b/src/test/java/com/nerosoft/mediator/UserCreateCommandHandler.java @@ -0,0 +1,12 @@ +package com.nerosoft.mediator; + +public class UserCreateCommandHandler implements Handler { + @Override + public Void handle(UserCreateCommand message) { + System.out.printf("UserCreateCommandHandler received command: %s\n", message); + User user = new User(System.currentTimeMillis(), message.name(), message.email()); + UserStore.getInstance().addUser(user); + System.out.printf("User created: %s\n", user); + return null; + } +} diff --git a/src/test/java/com/nerosoft/mediator/UserCreateCommandValidator.java b/src/test/java/com/nerosoft/mediator/UserCreateCommandValidator.java new file mode 100644 index 0000000..a7f17ae --- /dev/null +++ b/src/test/java/com/nerosoft/mediator/UserCreateCommandValidator.java @@ -0,0 +1,22 @@ +package com.nerosoft.mediator; + +import com.nerosoft.mediator.validation.ValidationResult; + +public class UserCreateCommandValidator implements Validator { + @Override + public ValidationResult validate(UserCreateCommand message) { + if (message.name() == null || message.name().isBlank()) { + return ValidationResult.failure("Name is required"); + } + + if (message.email() == null || message.email().isBlank()) { + return ValidationResult.failure("Email is required"); + } + + if (!message.email().contains("@")) { + return ValidationResult.failure("Email address is invalid"); + } + + return ValidationResult.success(); + } +} diff --git a/src/test/java/com/nerosoft/mediator/UserCreatedEvent.java b/src/test/java/com/nerosoft/mediator/UserCreatedEvent.java new file mode 100644 index 0000000..83e6bbf --- /dev/null +++ b/src/test/java/com/nerosoft/mediator/UserCreatedEvent.java @@ -0,0 +1,4 @@ +package com.nerosoft.mediator; + +public record UserCreatedEvent(Long id, String name) implements Event { +} diff --git a/src/test/java/com/nerosoft/mediator/UserCreatedEventHandler.java b/src/test/java/com/nerosoft/mediator/UserCreatedEventHandler.java new file mode 100644 index 0000000..117d06a --- /dev/null +++ b/src/test/java/com/nerosoft/mediator/UserCreatedEventHandler.java @@ -0,0 +1,9 @@ +package com.nerosoft.mediator; + +public class UserCreatedEventHandler implements Handler { + + @Override + public Void handle(UserCreatedEvent message) { + return null; + } +} diff --git a/src/test/java/com/nerosoft/mediator/UserStore.java b/src/test/java/com/nerosoft/mediator/UserStore.java new file mode 100644 index 0000000..3db2a9d --- /dev/null +++ b/src/test/java/com/nerosoft/mediator/UserStore.java @@ -0,0 +1,33 @@ +package com.nerosoft.mediator; + +import java.util.ArrayList; +import java.util.List; + +public class UserStore { + private static UserStore instance; + + public static UserStore getInstance() { + if (instance == null) { + instance = new UserStore(); + } + return instance; + } + + private final List users = new ArrayList<>(); + + public void addUser(User user) { + users.add(user); + } + + public List getUsers() { + return new ArrayList<>(users); + } + + public User getUser(Long id) { + return users.stream().filter(user -> user.id().equals(id)).findFirst().orElse(null); + } + + public void clear() { + users.clear(); + } +}