Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/problem6/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# NATS connection
NATS_URL=nats://localhost:4222

# Token auth — leave empty when running against local docker-compose (no auth)
NATS_AUTH_TOKEN=

# Fastify port
PORT=4000
3 changes: 3 additions & 0 deletions src/problem6/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.env
.DS_Store
node_modules
115 changes: 115 additions & 0 deletions src/problem6/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Live Scoreboard (NATS + Fastify)

A real-time top-10 scoreboard. The browser submits a score for a user over HTTP, Fastify forwards it to a worker via NATS request/reply, and the worker publishes a `succeeded` event that every connected browser receives over a NATS WebSocket subscription. Each tab applies an optimistic local update on its own clicks and treats incoming events as authoritative for other users.

## Architecture

```mermaid
flowchart LR
subgraph browsers[Browsers]
b1["Tab A"]
b2["Tab B"]
b3["Tab C"]
end

subgraph app["pnpm dev"]
fastify["Fastify main.ts<br/>:4000<br/>GET / · GET /app.js · POST /api"]
worker["Worker worker.ts<br/>queue group: workers"]
end

subgraph natsbox[NATS]
srv["nats-server<br/>:4222 (TCP) · :8080 (WS)"]
end

%% HTTP: page load + score submit
browsers -.->|"GET / · GET /app.js"| fastify
browsers -->|"POST /api { name, prefix: user }"| fastify

%% Request/reply over NATS
fastify -->|"request: jobs.create"| srv
srv -->|"deliver (queue)"| worker
worker -->|"reply: { executionId, prefix }"| srv
srv -->|"reply"| fastify

%% Single succeeded event broadcast to all subscribers
worker -->|"publish: executions.&lt;user&gt;.&lt;executionId&gt;.event"| srv
srv -.->|"WS deliver: executions.&gt;"| browsers
```

Why optimistic on the clicker: the round-trip is HTTP → NATS req/reply → NATS publish → WS push. The clicker's tab bumps its local copy immediately, then suppresses the eventual NATS event for its own click via a pending-click counter so the score isn't counted twice. Other tabs see the same event and bump normally.

## Run locally

```bash
# 1. Start NATS (docker) — exposes 4222 (TCP) and 8080 (WS)
docker compose up -d

# 2. Install deps
pnpm install

# 3. Start API + worker together
pnpm dev

# 4. Open http://localhost:4000
```

## Integrate with the scoreboard

You don't need this UI to participate — anything that can do HTTP or talk NATS can play.

### Submit a score (HTTP)

```bash
curl -X POST http://localhost:4000/api \
-H 'content-type: application/json' \
-d '{ "name": "score", "prefix": "alice" }'

# → { "executionId": "0193...", "prefix": "alice" }
```

Body fields:

| Field | Type | Description |
| -------- | ------ | ------------------------------------------------------------------------- |
| `prefix` | string | The user identifier whose score should be incremented. Required to score. |
| `name` | string | A free-form job name; logged by the worker. Not interpreted. |

The response is returned synchronously from the worker via NATS request/reply (3-second timeout). A success means the worker accepted the job and will publish a `succeeded` event.

### Listen for score events (NATS)

Events are published once per submission, on the subject:

```
executions.<prefix>.<executionId>.event
```

Subscribe to `executions.>` to receive every score in the system, or `executions.alice.>` to follow a single user.

Event payload (`ExecutionEvent`, see `types.ts`):

```ts
{
executionId: string; // UUIDv7, unique per submission
prefix: string; // user identifier
status: "succeeded"; // only status emitted today
at: string; // ISO timestamp
}
```

### Connect from a server (Node / Go / etc.)

Any [official NATS client](https://docs.nats.io/using-nats/developer) can subscribe to `executions.>` over the TCP port (`:4222`) or publish a `jobs.create` request directly without going through Fastify:

```ts
import { connect, StringCodec } from "nats";
const nc = await connect({ servers: "nats://localhost:4222" });
const sc = StringCodec();

const msg = await nc.request(
"jobs.create",
sc.encode(JSON.stringify({ name: "score", prefix: "alice" })),
{ timeout: 3000 },
);
console.log(JSON.parse(sc.decode(msg.data))); // { executionId, prefix }
```
46 changes: 46 additions & 0 deletions src/problem6/atoms/nats.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { connect, JSONCodec } from "nats";
import type { NatsClient, NatsConfig } from "../types";

export const createNatsClient = async (
config: NatsConfig,
): Promise<NatsClient> => {
const nc = await connect({ servers: config.url, token: config.token });
const jc = JSONCodec<unknown>();

const request: NatsClient["request"] = async (subject, payload, opts) => {
const res = await nc.request(subject, jc.encode(payload), {
timeout: opts?.timeout ?? 3000,
});
return jc.decode(res.data) as never;
};

const publish: NatsClient["publish"] = (subject, payload) => {
nc.publish(subject, jc.encode(payload));
};

const subscribe: NatsClient["subscribe"] = (subject, opts, handler) => {
const sub = nc.subscribe(subject, opts);
(async () => {
for await (const m of sub) {
let data: unknown = {};
try {
data = jc.decode(m.data);
} catch {
data = {};
}
await handler({
data: data as never,
respond: (payload) => m.respond(jc.encode(payload)),
});
}
})();
};

return {
getServer: () => nc.getServer(),
request,
publish,
subscribe,
drain: () => nc.drain(),
};
};
10 changes: 10 additions & 0 deletions src/problem6/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
services:
nats:
image: nats:2.10.24-alpine
command: ["-c", "/etc/nats/nats.conf"]
ports:
- "4222:4222"
- "8222:8222"
- "8080:8080"
volumes:
- ./nats.local.conf:/etc/nats/nats.conf:ro
56 changes: 56 additions & 0 deletions src/problem6/main.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { readFile } from "node:fs/promises";
import { fileURLToPath } from "node:url";
import { dirname, join } from "node:path";
import Fastify from "fastify";
import cors from "@fastify/cors";
import { createNatsClient } from "./atoms/nats";
import type { JobRequest, JobResponse } from "./types";

const natsUrl = process.env.NATS_URL ?? "nats://localhost:4222";
const natsToken = process.env.NATS_AUTH_TOKEN;
const port = Number(process.env.PORT ?? 4000);

const nats = await createNatsClient({ url: natsUrl, token: natsToken });
console.log(`fastify connected to NATS at ${nats.getServer()}`);

const app = Fastify({ logger: true });
await app.register(cors, { origin: true });

const viewsDir = join(dirname(fileURLToPath(import.meta.url)), "views");
const indexHtml = await readFile(join(viewsDir, "index.html"), "utf8");
const appJs = await readFile(join(viewsDir, "app.js"), "utf8");

app.get("/", async (_req, reply) => {
reply.type("text/html");
return indexHtml;
});

app.get("/app.js", async (_req, reply) => {
reply.type("application/javascript");
return appJs;
});

app.post("/api", async (req, reply) => {
const body = (req.body ?? {}) as JobRequest;
try {
return await nats.request<JobResponse, JobRequest>("jobs.create", body, {
timeout: 3000,
});
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
reply.code(502);
return { error: "worker unavailable", detail: message };
}
});

app.get("/healthz", async () => ({ ok: true, nats: nats.getServer() }));

await app.listen({ host: "0.0.0.0", port });

const shutdown = async () => {
await app.close();
await nats.drain();
process.exit(0);
};
process.on("SIGINT", shutdown);
process.on("SIGTERM", shutdown);
10 changes: 10 additions & 0 deletions src/problem6/nats.local.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
http_port: 8222

jetstream {
store_dir: /data
}

websocket {
port: 8080
no_tls: true
}
21 changes: 21 additions & 0 deletions src/problem6/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"name": "dokploy-nats-example-node",
"private": true,
"type": "module",
"scripts": {
"start": "tsx main.ts",
"worker": "tsx worker.ts",
"dev": "tsx main.ts & tsx worker.ts & wait"
},
"dependencies": {
"@fastify/cors": "^11.2.0",
"fastify": "^5.8.5",
"nats": "^2.29.2",
"uuid": "^14.0.0"
},
"devDependencies": {
"@types/node": "^22.10.0",
"tsx": "^4.19.2",
"typescript": "^5.7.2"
}
}
Loading