diff --git a/LICENSE b/LICENSE index 41a26b6..e9fa206 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2026 Einar Ingebrigtsen +Copyright (c) Cratis Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/Source/ChronicleClient.ts b/Source/ChronicleClient.ts index 76a4914..e389ec8 100644 --- a/Source/ChronicleClient.ts +++ b/Source/ChronicleClient.ts @@ -1,9 +1,11 @@ // Copyright (c) Cratis. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -import { ChronicleConnection } from '@cratis/chronicle.contracts'; +import { diag } from '@opentelemetry/api'; import { SpanStatusCode } from '@opentelemetry/api'; import { ChronicleOptions } from './ChronicleOptions'; +import { ChronicleConnection } from './connection'; +import { ConnectionLifecycle } from './connection/ConnectionLifecycle'; import { EventStore } from './EventStore'; import { EventStoreName } from './EventStoreName'; import { EventStoreNamespaceName } from './EventStoreNamespaceName'; @@ -26,8 +28,20 @@ import { ChronicleTracer } from './Tracing'; * ``` */ export class ChronicleClient implements IChronicleClient { + private static readonly _healthCheckIntervalMs = 5000; + private readonly _connection: ChronicleConnection; - private readonly _stores: Map = new Map(); + private readonly _stores: Map = new Map(); + private readonly _lifecycle = new ConnectionLifecycle(); + + private readonly _logger = diag.createComponentLogger({ + namespace: '@cratis/chronicle/ChronicleClient' + }); + + private _watchdogHandle?: ReturnType; + private _connectOperation?: Promise; + private _reconnectOperation?: Promise; + private _isDisposed = false; /** * Creates a new {@link ChronicleClient} using the provided options. @@ -43,6 +57,35 @@ export class ChronicleClient implements IChronicleClient { ? { connectionString: options.connectionString, credentials: options.connectionString.createCredentials() } : { connectionString: options.connectionString }; this._connection = new ChronicleConnection(connectionOptions); + + this._logger.info('Created Chronicle client', { + serverAddress: `${options.connectionString.serverAddress.host}:${options.connectionString.serverAddress.port}`, + disableTls: options.connectionString.disableTls + }); + + this._lifecycle.onConnected(async () => { + if (this._stores.size === 0) { + this._logger.debug('No event stores cached; nothing to register on connected lifecycle callback', { + connectionId: this._lifecycle.connectionId + }); + return; + } + + this._logger.info('Connection lifecycle connected; registering artifacts for cached event stores', { + connectionId: this._lifecycle.connectionId, + eventStoreCount: this._stores.size + }); + + await Promise.all([...this._stores.values()].map(store => this.registerArtifactsForStore(store, 'connected'))); + }); + + this._lifecycle.onDisconnected(async () => { + this._logger.warn('Connection lifecycle disconnected', { + connectionId: this._lifecycle.connectionId + }); + }); + + this.startConnectionWatchdog(); } /** @inheritdoc */ @@ -58,17 +101,30 @@ export class ChronicleClient implements IChronicleClient { span.setAttribute('chronicle.event_store', storeName.value); span.setAttribute('chronicle.namespace', namespaceName.value); try { - const key = `${storeName.value}/${namespaceName.value}`; - const existing = this._stores.get(key); - if (existing) { - span.setStatus({ code: SpanStatusCode.OK }); - return existing; - } + const store = await this.withReconnect('get_event_store', async () => { + await this.ensureConnected(); - await this._connection.eventStores.ensure({ Name: storeName.value }); + const key = `${storeName.value}/${namespaceName.value}`; + const existing = this._stores.get(key); + if (existing) { + this._logger.verbose('Returning cached event store', { + eventStore: storeName.value, + namespace: namespaceName.value + }); + return existing; + } - const store = new EventStore(storeName, namespaceName, this._connection); - this._stores.set(key, store); + this._logger.debug('Ensuring event store exists in kernel', { + eventStore: storeName.value + }); + await this._connection.eventStores.ensure({ Name: storeName.value }); + + const created = new EventStore(storeName, namespaceName, this._connection); + this._stores.set(key, created); + + await this.registerArtifactsForStore(created, 'new-store'); + return created; + }); ChronicleMetrics.eventStoreRetrievals.add(1, { 'chronicle.event_store': storeName.value, @@ -79,6 +135,11 @@ export class ChronicleClient implements IChronicleClient { } catch (error) { span.setStatus({ code: SpanStatusCode.ERROR, message: String(error) }); span.recordException(error as Error); + this._logger.error('Failed getting event store', { + eventStore: storeName.value, + namespace: namespaceName.value, + error: this.toErrorMessage(error) + }); throw error; } finally { span.end(); @@ -90,13 +151,22 @@ export class ChronicleClient implements IChronicleClient { async getEventStores(): Promise { return ChronicleTracer.startActiveSpan('chronicle.client.get_event_stores', async span => { try { - const response = await this._connection.eventStores.getEventStores({}); + const response = await this.withReconnect('get_event_stores', async () => { + await this.ensureConnected(); + return this._connection.eventStores.getEventStores({}); + }); const result = (response.items ?? []).map((name: string) => new EventStoreName(name)); + this._logger.verbose('Retrieved event stores from kernel', { + count: result.length + }); span.setStatus({ code: SpanStatusCode.OK }); return result; } catch (error) { span.setStatus({ code: SpanStatusCode.ERROR, message: String(error) }); span.recordException(error as Error); + this._logger.error('Failed retrieving event stores', { + error: this.toErrorMessage(error) + }); throw error; } finally { span.end(); @@ -106,6 +176,212 @@ export class ChronicleClient implements IChronicleClient { /** @inheritdoc */ dispose(): void { + this._isDisposed = true; + + if (this._watchdogHandle) { + clearInterval(this._watchdogHandle); + this._watchdogHandle = undefined; + } + + if (this._lifecycle.isConnected) { + void this._lifecycle.disconnected(error => { + this._logger.error('Disconnected lifecycle callback failed during dispose', { + error: this.toErrorMessage(error) + }); + }); + } + this._connection.disconnect(); + this._logger.info('Disposed Chronicle client'); + } + + private async connectWithRetry(): Promise { + let attempt = 0; + + while (!this._isDisposed) { + try { + if (attempt > 0) { + // Recreate the gRPC channel so we start from IDLE. A failed + // probe can leave the channel in TRANSIENT_FAILURE, which gRPC + // won't recover without a fresh channel. The contracts connect() + // is also bypassed here — it uses watchConnectivityState and + // rejects as soon as the state changes to CONNECTING (not READY), + // making it unreliable for initial connection establishment. + this._connection.resetChannel(); + } + + this._logger.debug('Connecting to Chronicle kernel', { attempt: attempt + 1 }); + + // Probe with a real RPC call. gRPC connects lazily on the first call, + // so this effectively waits until the channel reaches READY or fails. + await this._connection.server.getVersionInfo({}, { signal: AbortSignal.timeout(10_000) }); + + this._logger.info('Connected to Chronicle kernel'); + await this._lifecycle.connected(error => { + this._logger.error('Connected lifecycle callback failed', { + error: this.toErrorMessage(error) + }); + }); + return; + } catch (error) { + attempt++; + const delayMs = Math.min(1000 * Math.pow(2, attempt - 1), 30_000); + this._logger.warn('Connection attempt failed, retrying', { + attempt, + delayMs, + error: this.toErrorMessage(error) + }); + await new Promise(resolve => setTimeout(resolve, delayMs)); + } + } + + throw new Error('ChronicleClient was disposed during connection attempt.'); + } + + private async ensureConnected(): Promise { + if (this._isDisposed) { + throw new Error('ChronicleClient is disposed. Create a new client instance before making calls.'); + } + + if (this._lifecycle.isConnected) { + return; + } + + if (!this._connectOperation) { + this._connectOperation = this.connectWithRetry().finally(() => { + this._connectOperation = undefined; + }); + } + + await this._connectOperation; + } + + private async reconnect(reason: string, error: unknown): Promise { + if (!this._reconnectOperation) { + this._reconnectOperation = (async () => { + this._logger.warn('Reconnecting to Chronicle kernel', { + reason, + error: this.toErrorMessage(error) + }); + + if (this._lifecycle.isConnected) { + await this._lifecycle.disconnected(disconnectError => { + this._logger.error('Disconnected lifecycle callback failed', { + error: this.toErrorMessage(disconnectError) + }); + }); + } + + let attempt = 0; + while (!this._isDisposed) { + try { + this._connection.resetChannel(); + await this._connection.server.getVersionInfo({}, { signal: AbortSignal.timeout(10_000) }); + this._logger.info('Reconnected to Chronicle kernel', { attempt: attempt + 1 }); + await this._lifecycle.connected(connectedError => { + this._logger.error('Connected lifecycle callback failed after reconnect', { + error: this.toErrorMessage(connectedError) + }); + }); + return; + } catch (reconnectError) { + attempt++; + const delayMs = Math.min(1000 * Math.pow(2, attempt - 1), 30_000); + this._logger.warn('Reconnect attempt failed, retrying', { + attempt, + delayMs, + error: this.toErrorMessage(reconnectError) + }); + await new Promise(resolve => setTimeout(resolve, delayMs)); + } + } + })().finally(() => { + this._reconnectOperation = undefined; + }); + } + + await this._reconnectOperation; + } + + private async withReconnect(operation: string, action: () => Promise): Promise { + try { + return await action(); + } catch (error) { + if (!this.shouldReconnect(error)) { + throw error; + } + + await this.reconnect(operation, error); + return action(); + } + } + + private shouldReconnect(error: unknown): boolean { + const code = Number((error as { code?: number })?.code ?? -1); + const details = String((error as { details?: string })?.details ?? ''); + const message = this.toErrorMessage(error); + + if (code === 4 || code === 13 || code === 14) { + return true; + } + + const connectionIndicators = [ + 'deadline exceeded', + 'unavailable', + 'connection', + 'connectivity', + 'channel', + 'socket', + 'econnrefused', + 'etimedout' + ]; + + const combined = `${details} ${message}`.toLowerCase(); + return connectionIndicators.some(indicator => combined.includes(indicator)); + } + + private toErrorMessage(error: unknown): string { + if (error instanceof Error) { + return error.message; + } + + return String(error); + } + + private startConnectionWatchdog(): void { + this._watchdogHandle = setInterval(() => { + void this.runHealthCheck(); + }, ChronicleClient._healthCheckIntervalMs); + + this._watchdogHandle.unref?.(); + } + + private async runHealthCheck(): Promise { + if (this._isDisposed || !this._lifecycle.isConnected) { + return; + } + + try { + await this._connection.server.getVersionInfo({}); + this._logger.verbose('Connection health check passed'); + } catch (error) { + await this.reconnect('watchdog-health-check', error); + } + } + + private async registerArtifactsForStore(store: EventStore, reason: string): Promise { + this._logger.debug('Registering artifacts for event store', { + eventStore: store.name.value, + namespace: store.namespace.value, + reason + }); + + await store.registerArtifacts(); + + this._logger.info('Registered artifacts for event store', { + eventStore: store.name.value, + namespace: store.namespace.value, + reason + }); } } diff --git a/Source/ChronicleOptions.ts b/Source/ChronicleOptions.ts index 7c90b8e..ea6d11e 100644 --- a/Source/ChronicleOptions.ts +++ b/Source/ChronicleOptions.ts @@ -1,8 +1,8 @@ // Copyright (c) Cratis. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -import { ChronicleConnectionString } from '@cratis/chronicle.contracts'; import { DefaultClientArtifactsProvider, IClientArtifactsProvider } from './artifacts'; +import { ChronicleConnectionString } from './connection'; type ChronicleOptionsConstructorParams = { connectionString: ChronicleConnectionString; diff --git a/Source/EventSequences/EventLog.ts b/Source/EventSequences/EventLog.ts index a72c80c..be5f3a8 100644 --- a/Source/EventSequences/EventLog.ts +++ b/Source/EventSequences/EventLog.ts @@ -1,7 +1,7 @@ // Copyright (c) Cratis. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -import { ChronicleConnection } from '@cratis/chronicle.contracts'; +import { ChronicleConnection } from '../connection'; import { EventSequence } from './EventSequence'; import { EventSequenceId } from './EventSequenceId'; import { IEventLog } from './IEventLog'; diff --git a/Source/EventSequences/EventSequence.ts b/Source/EventSequences/EventSequence.ts index 9fd4177..842c55a 100644 --- a/Source/EventSequences/EventSequence.ts +++ b/Source/EventSequences/EventSequence.ts @@ -4,7 +4,7 @@ import { ChronicleConnection, Guid as ContractsGuid -} from '@cratis/chronicle.contracts'; +} from '../connection'; import { SpanStatusCode } from '@opentelemetry/api'; import { Guid } from '@cratis/fundamentals'; import { getEventTypeFor } from '../Events/eventTypeDecorator'; diff --git a/Source/EventStore.ts b/Source/EventStore.ts index 6828744..a05ef8c 100644 --- a/Source/EventStore.ts +++ b/Source/EventStore.ts @@ -1,8 +1,9 @@ // Copyright (c) Cratis. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -import { ChronicleConnection } from '@cratis/chronicle.contracts'; +import { diag } from '@opentelemetry/api'; import { SpanStatusCode } from '@opentelemetry/api'; +import { ChronicleConnection } from './connection'; import { EventLog } from './EventSequences/EventLog'; import { EventSequence } from './EventSequences/EventSequence'; import { EventSequenceId } from './EventSequences/EventSequenceId'; @@ -29,6 +30,10 @@ import { DefaultClientArtifactsProvider } from './artifacts/DefaultClientArtifac * via gRPC using the provided {@link ChronicleConnection}. */ export class EventStore implements IEventStore { + private readonly _logger = diag.createComponentLogger({ + namespace: '@cratis/chronicle/EventStore' + }); + readonly eventLog: IEventLog; readonly eventTypes: IEventTypes; readonly constraints: IConstraints; @@ -60,13 +65,36 @@ export class EventStore implements IEventStore { * @returns A promise that resolves when all registrations are complete. */ async registerArtifacts(): Promise { + this._logger.debug('Discovering artifacts for registration', { + eventStore: this.name.value, + namespace: this.namespace.value + }); + + await this.eventTypes.discover(); + await Promise.all([ + this.constraints.discover(), + this.projections.discover(), + this.reactors.discover(), + this.reducers.discover() + ]); + + this._logger.debug('Registering discovered artifacts', { + eventStore: this.name.value, + namespace: this.namespace.value + }); + + await this.eventTypes.register(); await Promise.all([ - this.eventTypes.register(), this.constraints.register(), this.projections.register(), this.reactors.register(), this.reducers.register() ]); + + this._logger.info('Artifact registration completed', { + eventStore: this.name.value, + namespace: this.namespace.value + }); } /** @inheritdoc */ diff --git a/Source/Events/Constraints/Constraints.ts b/Source/Events/Constraints/Constraints.ts index ed10482..0d98208 100644 --- a/Source/Events/Constraints/Constraints.ts +++ b/Source/Events/Constraints/Constraints.ts @@ -1,8 +1,9 @@ // Copyright (c) Cratis. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -import { ChronicleConnection, ConstraintType } from '@cratis/chronicle.contracts'; +import { ConstraintType } from '@cratis/chronicle.contracts'; import { IClientArtifactsProvider } from '../../artifacts'; +import { ChronicleConnection } from '../../connection'; import { ConstraintId } from './ConstraintId'; import { IConstraint } from './IConstraint'; import { IConstraints } from './IConstraints'; diff --git a/Source/Events/EventTypes.ts b/Source/Events/EventTypes.ts index ac3bb78..d5a551e 100644 --- a/Source/Events/EventTypes.ts +++ b/Source/Events/EventTypes.ts @@ -2,8 +2,8 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. import { Constructor } from '@cratis/fundamentals'; -import { ChronicleConnection } from '@cratis/chronicle.contracts'; import { IClientArtifactsProvider } from '../artifacts'; +import { ChronicleConnection } from '../connection'; import { EventTypeId } from './EventTypeId'; import { IEventTypes } from './IEventTypes'; import { getEventTypeMetadata, getEventTypeJsonSchemaFor } from './eventTypeDecorator'; diff --git a/Source/Projections/Projections.ts b/Source/Projections/Projections.ts index 41d9697..eeaf343 100644 --- a/Source/Projections/Projections.ts +++ b/Source/Projections/Projections.ts @@ -3,11 +3,11 @@ import { AutoMap, - ChronicleConnection, ProjectionOwner } from '@cratis/chronicle.contracts'; import { Constructor } from '@cratis/fundamentals'; import { IClientArtifactsProvider } from '../artifacts'; +import { ChronicleConnection } from '../connection'; import { EventSequenceId } from '../EventSequences/EventSequenceId'; import { getEventTypeFor } from '../Events/eventTypeDecorator'; import { getReadModelMetadata } from '../ReadModels'; diff --git a/Source/connection/ChronicleConnection.ts b/Source/connection/ChronicleConnection.ts new file mode 100644 index 0000000..3a6008c --- /dev/null +++ b/Source/connection/ChronicleConnection.ts @@ -0,0 +1,204 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +import { + ChronicleConnection as ContractsChronicleConnection, + type ChronicleConnectionOptions, + type ChronicleServices +} from '@cratis/chronicle.contracts'; + +/** + * Wraps the contracts connection and allows recreating the underlying channel + * during reconnect while keeping a stable object reference for consumers. + */ +export class ChronicleConnection implements ChronicleServices { + private _inner: ContractsChronicleConnection; + + /** + * Creates a new {@link ChronicleConnection}. + * @param _options - Connection options used to create and recreate the underlying connection. + */ + constructor(private readonly _options: ChronicleConnectionOptions) { + this._inner = this.createInnerConnection(); + } + + /** + * Gets the connection string currently used by the inner connection. + */ + get connectionString() { + return this._inner.connectionString; + } + + /** + * Gets whether the current inner connection is connected. + */ + get isConnected(): boolean { + return this._inner.isConnected; + } + + /** + * Event stores service. + */ + get eventStores() { + return this._inner.eventStores; + } + + /** + * Namespaces service. + */ + get namespaces() { + return this._inner.namespaces; + } + + /** + * Recommendations service. + */ + get recommendations() { + return this._inner.recommendations; + } + + /** + * Identities service. + */ + get identities() { + return this._inner.identities; + } + + /** + * Event sequences service. + */ + get eventSequences() { + return this._inner.eventSequences; + } + + /** + * Event types service. + */ + get eventTypes() { + return this._inner.eventTypes; + } + + /** + * Constraints service. + */ + get constraints() { + return this._inner.constraints; + } + + /** + * Observers service. + */ + get observers() { + return this._inner.observers; + } + + /** + * Failed partitions service. + */ + get failedPartitions() { + return this._inner.failedPartitions; + } + + /** + * Reactors service. + */ + get reactors() { + return this._inner.reactors; + } + + /** + * Reducers service. + */ + get reducers() { + return this._inner.reducers; + } + + /** + * Projections service. + */ + get projections() { + return this._inner.projections; + } + + /** + * Read models service. + */ + get readModels() { + return this._inner.readModels; + } + + /** + * Jobs service. + */ + get jobs() { + return this._inner.jobs; + } + + /** + * Event seeding service. + */ + get eventSeeding() { + return this._inner.eventSeeding; + } + + /** + * Server service. + */ + get server() { + return this._inner.server; + } + + /** + * Connects the current inner connection. + */ + async connect(): Promise { + await this._inner.connect(); + } + + /** + * Recreates the inner connection (new gRPC channel) without calling connect(). + * Use this before probing with a real RPC call to ensure a fresh IDLE channel. + */ + resetChannel(): void { + try { + this._inner.disconnect(); + } catch { + // Best-effort disconnect before re-creating the channel. + } + this._inner = this.createInnerConnection(); + } + + /** + * Recreates and connects the inner connection. + */ + async reconnect(): Promise { + try { + this._inner.disconnect(); + } catch { + // Best-effort disconnect before re-creating the channel. + } + + this._inner = this.createInnerConnection(); + await this._inner.connect(); + } + + /** + * Disconnects the current inner connection. + */ + disconnect(): void { + this._inner.disconnect(); + } + + /** + * Disposes the current inner connection. + */ + dispose(): void { + this._inner.dispose(); + } + + private createInnerConnection(): ContractsChronicleConnection { + return new ContractsChronicleConnection(this._options); + } +} + +export type { ChronicleConnectionOptions } from '@cratis/chronicle.contracts'; \ No newline at end of file diff --git a/Source/connection/ChronicleConnectionString.ts b/Source/connection/ChronicleConnectionString.ts new file mode 100644 index 0000000..0018a05 --- /dev/null +++ b/Source/connection/ChronicleConnectionString.ts @@ -0,0 +1,4 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +export { ChronicleConnectionString } from '@cratis/chronicle.contracts'; \ No newline at end of file diff --git a/Source/connection/ChronicleServices.ts b/Source/connection/ChronicleServices.ts new file mode 100644 index 0000000..5d7e041 --- /dev/null +++ b/Source/connection/ChronicleServices.ts @@ -0,0 +1,4 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +export { ChronicleServices } from '@cratis/chronicle.contracts'; \ No newline at end of file diff --git a/Source/connection/DateTimeOffset.ts b/Source/connection/DateTimeOffset.ts new file mode 100644 index 0000000..09a9244 --- /dev/null +++ b/Source/connection/DateTimeOffset.ts @@ -0,0 +1,4 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +export { DateTimeOffset } from '@cratis/chronicle.contracts'; \ No newline at end of file diff --git a/Source/connection/Guid.ts b/Source/connection/Guid.ts new file mode 100644 index 0000000..7531adc --- /dev/null +++ b/Source/connection/Guid.ts @@ -0,0 +1,4 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +export { Guid } from '@cratis/chronicle.contracts'; \ No newline at end of file diff --git a/Source/connection/TokenProvider.ts b/Source/connection/TokenProvider.ts new file mode 100644 index 0000000..197ddf8 --- /dev/null +++ b/Source/connection/TokenProvider.ts @@ -0,0 +1,13 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +import type { ITokenProvider } from '@cratis/chronicle.contracts'; + +export { + NoOpTokenProvider, + OAuthTokenProvider +} from '@cratis/chronicle.contracts'; + +export type { ITokenProvider } from '@cratis/chronicle.contracts'; + +export type TokenProvider = ITokenProvider; \ No newline at end of file diff --git a/Source/connection/index.ts b/Source/connection/index.ts new file mode 100644 index 0000000..70bb581 --- /dev/null +++ b/Source/connection/index.ts @@ -0,0 +1,9 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +export { ChronicleConnection } from './ChronicleConnection'; +export { ChronicleConnectionString } from './ChronicleConnectionString'; +export { ChronicleServices } from './ChronicleServices'; +export { DateTimeOffset } from './DateTimeOffset'; +export { Guid } from './Guid'; +export { TokenProvider } from './TokenProvider'; \ No newline at end of file diff --git a/TestApps/NodeJS/constraints.ts b/TestApps/NodeJS/constraints.ts index 0bf646e..119ad58 100644 --- a/TestApps/NodeJS/constraints.ts +++ b/TestApps/NodeJS/constraints.ts @@ -2,7 +2,7 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. import { ConstraintType, RegisterConstraintsRequest } from '@cratis/chronicle.contracts'; -import { getEventTypeFor, IEventStore } from '@cratis/chronicle'; +import { getEventTypeFor } from '@cratis/chronicle'; import { EmployeeHired } from './events'; /** @@ -41,25 +41,3 @@ function buildUniqueHireConstraint(): RegisterConstraintsRequest { } export const uniqueHireConstraint = buildUniqueHireConstraint(); - -/** - * Registers the unique-hire constraint with the Chronicle Kernel. - * - * Once registered, any attempt to append a second {@link EmployeeHired} event for the - * same event source will be rejected by the Kernel, and the {@link AppendResult} will - * contain a populated {@link ConstraintViolation} list. - * - * @param store - The event store to register constraints against. - */ -export async function registerConstraints(store: IEventStore): Promise { - console.log(' Registering unique-hire constraint...'); - // The constraint registration API is not yet available on the TypeScript client. - // When available it will look like: - // await store.constraints.register(uniqueHireConstraint); - const constraint = uniqueHireConstraint.Constraints[0]; - const eventTypeId = constraint.Definition?.Value1?.EventTypeId ?? '(unknown)'; - console.log(` [Constraint] Name : ${constraint.Name}`); - console.log(` [Constraint] Type : ${ConstraintType[constraint.Type]}`); - console.log(` [Constraint] Event type: EmployeeHired (${eventTypeId})`); - void store; -} diff --git a/TestApps/NodeJS/index.ts b/TestApps/NodeJS/index.ts index 3bf0229..e739749 100644 --- a/TestApps/NodeJS/index.ts +++ b/TestApps/NodeJS/index.ts @@ -2,115 +2,84 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. // Telemetry MUST be imported first so the OpenTelemetry SDK is fully -// initialised before any instrumented code runs. +// initialized before any instrumented code runs. + import './telemetry'; import 'reflect-metadata'; -import { ChronicleClient, ChronicleOptions, getEventTypeJsonSchemaFor } from '@cratis/chronicle'; - -// Register all artifacts so their decorators fire before anything else runs. -import './events'; -import './reactors'; -import './reducers'; - -import { registerEmployeeListProjection } from './projections-declarative'; -import { demonstrateModelBoundProjection } from './projections-model-bound'; -import { registerConstraints } from './constraints'; -import { demonstrateReactors } from './reactors'; -import { demonstrateReducers } from './reducers'; +import { diag } from '@opentelemetry/api'; +import { ChronicleClient, ChronicleOptions } from '@cratis/chronicle'; import { EmployeeHired, EmployeePromoted, EmployeeMoved } from './events'; +const logger = diag.createComponentLogger({ namespace: 'chronicle-test-console' }); + async function run(): Promise { const options = process.env.CHRONICLE_CONNECTION ? ChronicleOptions.fromConnectionString(process.env.CHRONICLE_CONNECTION) : ChronicleOptions.development(); - console.log(`Connecting to Chronicle at ${options.connectionString}...`); + logger.info(`Connecting to Chronicle`, { address: options.connectionString.toString() }); const client = new ChronicleClient(options); - const employeeHiredSchema = getEventTypeJsonSchemaFor(EmployeeHired); - console.log(`EmployeeHired schema properties: ${Object.keys(employeeHiredSchema.properties ?? {}).join(', ')}`); - try { const store = await client.getEventStore('TestStore'); - console.log(`Event store: ${store.name.value} / ${store.namespace.value}\n`); - - // --- Constraints --- - console.log('=== Constraints ==='); - await registerConstraints(store); - - // --- Projections --- - console.log('\n=== Declarative Projection Artifacts ==='); - await registerEmployeeListProjection(store); - - console.log('\n=== Model-Bound Projection Artifacts ==='); - await demonstrateModelBoundProjection(store); - - console.log('\n=== Reactor Artifacts ==='); - await demonstrateReactors(store); - - console.log('\n=== Reducer Artifacts ==='); - await demonstrateReducers(store); + logger.info(`Event store ready`, { name: store.name.value, namespace: store.namespace.value }); + logger.info('Client lifecycle auto-discovery/auto-registration is enabled'); // --- Event appending --- - console.log('\n=== Appending Events ==='); - await store.eventTypes.register(); const eventSourceId = `employee-${Date.now()}`; + logger.info('Appending events', { eventSourceId }); let appendSucceeded = false; try { const hireResult = await store.eventLog.append(eventSourceId, new EmployeeHired('Jane', 'Doe', 'Software Engineer')); if (hireResult.isSuccess) { - console.log(` Hired at seq #${hireResult.sequenceNumber.value}`); + logger.info(`Hired`, { sequenceNumber: hireResult.sequenceNumber.value }); } else { - console.error(' Hire failed:', hireResult.constraintViolations, hireResult.errors); + logger.error('Hire failed', { constraintViolations: hireResult.constraintViolations, errors: hireResult.errors }); } const promoteResult = await store.eventLog.append(eventSourceId, new EmployeePromoted('Senior Software Engineer')); if (promoteResult.isSuccess) { - console.log(` Promoted at seq #${promoteResult.sequenceNumber.value}`); + logger.info(`Promoted`, { sequenceNumber: promoteResult.sequenceNumber.value }); } else { - console.error(' Promotion failed:', promoteResult.errors); + logger.error('Promotion failed', { errors: promoteResult.errors }); } const moveResult = await store.eventLog.append(eventSourceId, new EmployeeMoved('San Francisco')); if (moveResult.isSuccess) { - console.log(` Relocated at seq #${moveResult.sequenceNumber.value}`); + logger.info(`Relocated`, { sequenceNumber: moveResult.sequenceNumber.value }); } else { - console.error(' Move failed:', moveResult.errors); + logger.error('Move failed', { errors: moveResult.errors }); } appendSucceeded = true; } catch (error) { - console.warn(' Append operations are currently unavailable in this environment.'); - console.warn(' Continuing with connectivity checks.'); - console.warn(` Append error: ${String(error)}`); + logger.warn('Append operations unavailable, continuing with connectivity checks', { error: String(error) }); } // --- Read back --- - console.log('\n=== Event Log State ==='); if (appendSucceeded) { const tail = await store.eventLog.getTailSequenceNumber(eventSourceId); const hasEvents = await store.eventLog.hasEventsFor(eventSourceId); - console.log(` Tail sequence : ${tail.value}`); - console.log(` Has events : ${hasEvents}`); + logger.info('Event log state', { tailSequence: tail.value, hasEvents }); } else { - console.log(' Skipped tail lookup because append did not complete.'); + logger.info('Skipped tail lookup because append did not complete'); } const namespaces = await store.getNamespaces(); - console.log(` Namespaces : ${namespaces.map(n => n.value).join(', ') || '(none)'}`); + logger.info('Namespaces', { namespaces: namespaces.map(n => n.value).join(', ') || '(none)' }); - console.log('\nAll operations completed successfully.'); + logger.info('All operations completed successfully'); } catch (error) { - console.error('Error:', error); + logger.error('Unhandled error', { error: String(error) }); process.exitCode = 1; } finally { client.dispose(); - console.log('Disconnected.'); + logger.info('Disconnected'); } } run().catch(error => { - console.error('Unhandled error:', error); + logger.error('Unhandled error', { error: String(error) }); process.exit(1); }); diff --git a/TestApps/NodeJS/projections-declarative.ts b/TestApps/NodeJS/projections-declarative.ts index e35e359..ade3465 100644 --- a/TestApps/NodeJS/projections-declarative.ts +++ b/TestApps/NodeJS/projections-declarative.ts @@ -1,7 +1,7 @@ // Copyright (c) Cratis. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -import { IEventStore, IProjectionBuilderFor, IProjectionFor, projection, readModel } from '@cratis/chronicle'; +import { IProjectionBuilderFor, IProjectionFor, projection, readModel } from '@cratis/chronicle'; import { EmployeeHired, EmployeeMoved, EmployeePromoted } from './events'; import { Guid } from '@cratis/fundamentals'; @@ -35,13 +35,4 @@ export class EmployeeListProjection implements IProjectionFor { } } -/** - * Registers the declarative employee list projection with the given event store. - * @param store - The event store to register the projection with. - */ -export async function registerEmployeeListProjection(store: IEventStore): Promise { - await store.projections.register(); - console.log(' Declarative projection "EmployeeListProjection" registered via builder API.'); -} - diff --git a/TestApps/NodeJS/projections-model-bound.ts b/TestApps/NodeJS/projections-model-bound.ts index b84cce2..c919ac2 100644 --- a/TestApps/NodeJS/projections-model-bound.ts +++ b/TestApps/NodeJS/projections-model-bound.ts @@ -1,7 +1,7 @@ // Copyright (c) Cratis. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -import { fromEvent, IEventStore, readModel, setFrom } from '@cratis/chronicle'; +import { fromEvent, readModel, setFrom } from '@cratis/chronicle'; import { EmployeeHired, EmployeePromoted, EmployeeMoved } from './events'; import { Guid } from '@cratis/fundamentals'; @@ -32,17 +32,3 @@ export class EmployeeDetails { city = ''; } -/** - * Reports current model-bound projection support status in the TypeScript client. - * - * @param store - The event store. - */ -export async function demonstrateModelBoundProjection(store: IEventStore): Promise { - console.log(' Discovering and registering model-bound projection artifacts from @readModel + model-bound decorators...'); - await store.projections.discover(); - await store.projections.register(); - console.log(` [Projection] Artifact : ${EmployeeDetails.name}`); - console.log(' [Projection] Discovery : @fromEvent/@setFrom on @readModel'); - console.log(' [Projection] Status : registered in kernel'); - void store; -} diff --git a/TestApps/NodeJS/reactors.ts b/TestApps/NodeJS/reactors.ts index 807e554..45575df 100644 --- a/TestApps/NodeJS/reactors.ts +++ b/TestApps/NodeJS/reactors.ts @@ -1,7 +1,7 @@ // Copyright (c) Cratis. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -import { reactor, EventContext, IEventStore } from '@cratis/chronicle'; +import { reactor, EventContext } from '@cratis/chronicle'; import { EmployeeHired, EmployeePromoted, EmployeeMoved } from './events'; /** @@ -46,14 +46,3 @@ export class HrNotificationReactor { } } -/** - * Demonstrates reactor artifact discovery and registration in the NodeJS test app. - * @param store - The event store. - */ -export async function demonstrateReactors(store: IEventStore): Promise { - console.log(' Discovering reactor artifacts...'); - await store.reactors.discover(); - await store.reactors.register(); - console.log(` [Reactor] Artifact : ${HrNotificationReactor.name}`); - console.log(' [Reactor] Status : discovered and registration flow invoked'); -} diff --git a/TestApps/NodeJS/reducers.ts b/TestApps/NodeJS/reducers.ts index 5ff2ee9..278bb05 100644 --- a/TestApps/NodeJS/reducers.ts +++ b/TestApps/NodeJS/reducers.ts @@ -1,7 +1,7 @@ // Copyright (c) Cratis. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -import { reducer, IEventStore } from '@cratis/chronicle'; +import { reducer } from '@cratis/chronicle'; import { EmployeeHired, EmployeePromoted, EmployeeMoved } from './events'; /** @@ -66,24 +66,3 @@ export class EmployeeStateReducer { } } -/** - * Demonstrates reducer artifact discovery/registration and a local fold over sample events. - * @param store - The event store. - */ -export async function demonstrateReducers(store: IEventStore): Promise { - console.log(' Discovering reducer artifacts...'); - await store.reducers.discover(); - await store.reducers.register(); - - const reducer = new EmployeeStateReducer(); - const hired = new EmployeeHired('Jane', 'Doe', 'Software Engineer'); - const promoted = new EmployeePromoted('Senior Software Engineer'); - const moved = new EmployeeMoved('San Francisco'); - - let state = await reducer.employeeHired(hired); - state = await reducer.employeePromoted(promoted, state); - state = await reducer.employeeMoved(moved, state); - - console.log(` [Reducer] Artifact : ${EmployeeStateReducer.name}`); - console.log(` [Reducer] Sample fold: ${state.firstName} ${state.lastName}, ${state.title}, ${state.city}`); -} diff --git a/TestApps/NodeJS/telemetry.ts b/TestApps/NodeJS/telemetry.ts index 6766793..47e672d 100644 --- a/TestApps/NodeJS/telemetry.ts +++ b/TestApps/NodeJS/telemetry.ts @@ -4,6 +4,7 @@ // This file MUST be imported before any other application code so that the // OpenTelemetry SDK is fully initialised before the first instrumented call. +import { diag, DiagLogLevel, type DiagLogger } from '@opentelemetry/api'; import { NodeSDK } from '@opentelemetry/sdk-node'; import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http'; import { resourceFromAttributes } from '@opentelemetry/resources'; @@ -12,6 +13,84 @@ import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentation import { PeriodicExportingMetricReader } from '@opentelemetry/sdk-metrics'; import { OTLPMetricExporter } from '@opentelemetry/exporter-metrics-otlp-http'; +// --------------------------------------------------------------------------- +// ASP.NET-style console logger +// --------------------------------------------------------------------------- +// Colors via ANSI escape codes (no external dependencies). +// Output format: +// info: @cratis/chronicle/ChronicleClient[0] +// Created Chronicle client {"serverAddress":"localhost:35000"} +// --------------------------------------------------------------------------- + +const RESET = '\x1b[0m'; +const BOLD = '\x1b[1m'; +const DIM = '\x1b[2m'; +const RED = '\x1b[31m'; +const YELLOW = '\x1b[33m'; +const GREEN = '\x1b[32m'; +const CYAN = '\x1b[36m'; +const GRAY = '\x1b[90m'; + + +function writeLog( + level: string, + levelColor: string, + toStderr: boolean, + message: string, + args: unknown[] +): void { + // OTel's DiagComponentLogger calls the underlying DiagLogger as: + // logger.method(namespace, userMessage, ...contextArgs) + // So `message` is the component namespace and args[0] is the actual log body. + // Raw OTel internal messages (no component logger) pass the full text as `message` + // with no extra string arg, so we detect by checking args[0]. + let category: string; + let body: string; + let contextArgs: unknown[]; + + if (args.length > 0 && typeof args[0] === 'string') { + category = message; + body = args[0]; + contextArgs = args.slice(1); + } else { + // Internal OTel message — use a generic category. + category = 'opentelemetry'; + body = message; + contextArgs = args; + } + + const extra = contextArgs + .map(a => (typeof a === 'object' && a !== null ? JSON.stringify(a) : String(a))) + .join(' '); + + const detail = extra ? `${body} ${GRAY}${extra}${RESET}` : body; + const line = `${levelColor}${BOLD}${level}${RESET}: ${GRAY}${category}${RESET}\n ${detail}`; + if (toStderr) { + process.stderr.write(`${line}\n`); + } else { + process.stdout.write(`${line}\n`); + } +} + +const aspNetLogger: DiagLogger = { + error(message: string, ...args: unknown[]) { writeLog('fail', RED, true, message, args); }, + warn (message: string, ...args: unknown[]) { writeLog('warn', YELLOW, false, message, args); }, + info (message: string, ...args: unknown[]) { writeLog('info', GREEN, false, message, args); }, + debug(message: string, ...args: unknown[]) { writeLog('dbug', CYAN, false, message, args); }, + verbose(message: string, ...args: unknown[]) { writeLog('trce', DIM, false, message, args); }, +}; + +const logLevelEnv = (process.env.LOG_LEVEL ?? 'debug').toLowerCase(); +const diagLevel = + logLevelEnv === 'verbose' || logLevelEnv === 'trace' ? DiagLogLevel.VERBOSE : + logLevelEnv === 'info' ? DiagLogLevel.INFO : + logLevelEnv === 'warn' ? DiagLogLevel.WARN : + logLevelEnv === 'error' ? DiagLogLevel.ERROR : + DiagLogLevel.DEBUG; + +// Must be set before any OTel SDK initialisation. +diag.setLogger(aspNetLogger, diagLevel); + // --------------------------------------------------------------------------- // Resource // ---------------------------------------------------------------------------