From cc57b88c5f0a4620502409cd84e210d35ce2a65d Mon Sep 17 00:00:00 2001 From: "James N." Date: Mon, 8 Jun 2026 16:17:48 -0700 Subject: [PATCH] fix(durable-demo): make fraud-detection workflow reliably runnable on Windows Fixes several issues found while running the durable fraud-detection demo on Windows, and adds presenter-friendly controls and launch scripts. Bug fixes: - observability/setup.py: emoji in print() crashed services on Windows cp1252 (UnicodeEncodeError), and the except handler double-faulted on its own emoji print, turning 'non-fatal' observability into a hard startup crash. Now reconfigures stdout/stderr to UTF-8 at import and routes status messages through a _safe_print() that can never raise. - worker.py / backend.py: add an early UTF-8 stdout/stderr guard so emoji and arrows in logs render correctly instead of mojibake/escapes. - backend.py /health: was a shallow {status: healthy}. Now a deep readiness probe that checks DTS connectivity, MCP reachability (TCP), and the Azure OpenAI Entra credential, returning 503 with a per-dependency breakdown. Demo ergonomics: - Event producer now defaults OFF so a presenter controls when the ambient feed starts. New endpoints POST /api/producer/start, /stop and GET /api/producer/status, plus a power-button toggle in the UI Live Feed panel. - start.ps1 / stop.ps1: Windows launchers that force UTF-8, start MCP -> worker -> backend -> UI in order with health gating, and stop the stack cleanly (match demo processes by command line). Docs: - .env.sample: document DTS Mode A (local emulator) vs Mode B (Azure DTS), clarify DefaultAzureCredential vs API key, and the event-producer defaults. - README.md: split Running the Demo into the two DTS modes, add the Windows quick-start via start.ps1, and document the /health readiness check. 
Validated end-to-end against a live Azure DTS taskhub: orchestration scheduling, multi-specialist fraud analysis, durable HITL external-event wait, approve decision, exactly-once account-lock activity, and terminal COMPLETED status. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- agentic_ai/observability/setup.py | 44 +++- .../fraud_detection_durable/.env.sample | 61 ++++- .../fraud_detection_durable/README.md | 65 ++++- .../fraud_detection_durable/backend.py | 227 ++++++++++++++---- .../fraud_detection_durable/start.ps1 | 141 +++++++++++ .../workflow/fraud_detection_durable/stop.ps1 | 82 +++++++ .../ui/src/components/EventFeed.jsx | 49 +++- .../ui/src/constants/config.js | 3 + .../fraud_detection_durable/worker.py | 7 + 9 files changed, 607 insertions(+), 72 deletions(-) create mode 100644 agentic_ai/workflow/fraud_detection_durable/start.ps1 create mode 100644 agentic_ai/workflow/fraud_detection_durable/stop.ps1 diff --git a/agentic_ai/observability/setup.py b/agentic_ai/observability/setup.py index 6efe21bc4..8b8d69fb2 100644 --- a/agentic_ai/observability/setup.py +++ b/agentic_ai/observability/setup.py @@ -17,6 +17,7 @@ """ import os +import sys import logging from typing import Optional @@ -26,6 +27,43 @@ _initialized = False +def _ensure_utf8_console() -> None: + """Force stdout/stderr to UTF-8 so emoji/unicode never crash a print(). + + On Windows the default console encoding is cp1252, which raises + UnicodeEncodeError when printing emoji (βœ…, ⚠️, πŸš€, etc.). Because this + module is imported at process startup (before any agent output), calling + this here makes every downstream print()/log safe. Uses errors="replace" + as a final guard so even an un-encodable glyph degrades gracefully instead + of crashing the service. 
+ """ + for stream_name in ("stdout", "stderr"): + stream = getattr(sys, stream_name, None) + reconfigure = getattr(stream, "reconfigure", None) + if reconfigure is None: + continue + try: + reconfigure(encoding="utf-8", errors="replace") + except Exception: + # Never let console hardening break startup. + pass + + +# Harden the console as soon as this module is imported. +_ensure_utf8_console() + + +def _safe_print(message: str) -> None: + """print() that can never raise on un-encodable characters.""" + try: + print(message) + except Exception: + try: + print(message.encode("ascii", "replace").decode("ascii")) + except Exception: + pass + + def setup_observability( connection_string: Optional[str] = None, service_name: str = "contoso-agent", @@ -89,19 +127,19 @@ def _safe_model_dump_json(self, **kwargs): pydantic.BaseModel.model_dump_json = _safe_model_dump_json # type: ignore[assignment] _initialized = True - print(f"βœ… Application Insights observability enabled (service: {service_name})") + _safe_print(f"βœ… Application Insights observability enabled (service: {service_name})") logger.info(f"βœ… Application Insights observability enabled (service: {service_name})") return True except ImportError as e: - print(f"❌ Observability dependencies not installed: {e}") + _safe_print(f"❌ Observability dependencies not installed: {e}") logger.warning(f"Observability dependencies not installed: {e}") return False except BaseException as e: # Catch ALL errors including KeyboardInterrupt from import deadlocks # in azure-ai-projects telemetry instrumentor (openai SDK version conflicts). # Observability is never worth crashing the service. 
- print(f"⚠️ Observability setup failed (non-fatal): {type(e).__name__}: {e}") + _safe_print(f"⚠️ Observability setup failed (non-fatal): {type(e).__name__}: {e}") logger.warning(f"Observability setup failed (non-fatal): {type(e).__name__}: {e}") return False diff --git a/agentic_ai/workflow/fraud_detection_durable/.env.sample b/agentic_ai/workflow/fraud_detection_durable/.env.sample index 1a694f06e..f56d82793 100644 --- a/agentic_ai/workflow/fraud_detection_durable/.env.sample +++ b/agentic_ai/workflow/fraud_detection_durable/.env.sample @@ -1,25 +1,66 @@ -# Azure OpenAI Configuration +# ============================================================================ +# Azure OpenAI +# ============================================================================ AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/ AZURE_OPENAI_CHAT_DEPLOYMENT=gpt-4o -# Optional: Use API key instead of Azure CLI auth + +# Authentication: by default the worker and backend use Entra ID via +# DefaultAzureCredential (run `az login`). This is the recommended path and +# needs NO API key. Only uncomment AZURE_OPENAI_API_KEY if you specifically +# want key-based auth β€” note a stale/invalid key here is a common source of +# 401 errors, so leave it commented unless you need it. # AZURE_OPENAI_API_KEY=your-api-key -# MCP Server +# ============================================================================ +# MCP Server (Contoso tools) +# ============================================================================ MCP_SERVER_URI=http://localhost:8000/mcp -# Durable Task Scheduler -# Local emulator (default) +# ============================================================================ +# Durable Task Scheduler (DTS) β€” choose ONE mode +# ============================================================================ +# The code selects the mode purely from the DTS_ENDPOINT prefix: +# - http://localhost... 
-> local emulator, insecure channel, NO credential +# - anything else -> Azure DTS, secure channel, DefaultAzureCredential +# +# --- Mode A: Local emulator (Docker) ----------------------------------------- +# Start it first: +# docker run -d --name dts-emulator -p 8080:8080 -p 8082:8082 \ +# mcr.microsoft.com/dts/dts-emulator:latest +# Or use: .\start.ps1 -StartEmulator DTS_ENDPOINT=http://localhost:8080 DTS_TASKHUB=fraud-detection +# +# --- Mode B: Azure DTS (managed) --------------------------------------------- +# Requires `az login` (DefaultAzureCredential) with the Durable Task Data +# Contributor role on the scheduler. Provision with provision_dts.ps1. +# DTS_ENDPOINT=https://your-scheduler.your-region.durabletask.io +# DTS_TASKHUB=fraud-detection -# Azure-hosted (production) -# DTS_ENDPOINT=https://your-dts-endpoint.azure.com -# DTS_TASKHUB=fraud-detection-prod - -# Human-in-the-loop Configuration +# ============================================================================ +# Human-in-the-loop +# ============================================================================ ANALYST_APPROVAL_TIMEOUT_HOURS=72 MAX_REVIEW_ATTEMPTS=3 +# ============================================================================ +# Layer 1 ambient event producer +# ============================================================================ +# Default OFF so a presenter controls when the ambient feed begins (start it +# from the UI power button or POST /api/producer/start). Set to true to +# auto-start the feed on backend boot. +EVENT_PRODUCER_ENABLED=false +# Seconds between synthetic telemetry events. +EVENT_INTERVAL_SECONDS=3.0 +# Probability (0-1) that a post-warmup event is anomalous.
+ANOMALY_PROBABILITY=0.08 + +# ============================================================================ # Application Insights (optional - enables telemetry) +# ============================================================================ +# Telemetry is fully optional and never required for the demo. Observability +# setup is hardened to never crash startup if this is unset or misconfigured. # APPLICATIONINSIGHTS_CONNECTION_STRING=InstrumentationKey=... # ENABLE_SENSITIVE_DATA=true +# Backend telemetry is gated separately (off by default): +# BACKEND_OBSERVABILITY=false diff --git a/agentic_ai/workflow/fraud_detection_durable/README.md b/agentic_ai/workflow/fraud_detection_durable/README.md index c64a376fd..fde6fa3b1 100644 --- a/agentic_ai/workflow/fraud_detection_durable/README.md +++ b/agentic_ai/workflow/fraud_detection_durable/README.md @@ -549,18 +549,59 @@ This section covers how to run the reference implementation locally to see the p ### Prerequisites -- **Docker** — for DTS emulator - **Python 3.12+** with **uv** - **Node.js 18+** — for React UI - **Azure OpenAI** — with a deployed chat model - **MCP Server** — Contoso tools on port 8000 +- **A Durable Task Scheduler** — either the local Docker emulator **or** an Azure DTS endpoint (see modes below) +- **Docker** — only if you use the local DTS emulator (Mode A) -### Service Startup Order +### Choose your DTS mode + +The code picks the mode automatically from the `DTS_ENDPOINT` prefix in `.env`: + +| | **Mode A — Local emulator** | **Mode B — Azure DTS** | +|---|---|---| +| `DTS_ENDPOINT` | `http://localhost:8080` | `https://<scheduler>.<region>.durabletask.io` | +| Channel | insecure | secure (TLS) | +| Auth | none | `DefaultAzureCredential` (`az login`) | +| Needs Docker | ✅ yes | ❌ no | +| Setup | `docker run … dts-emulator` | [provision_dts.ps1](provision_dts.ps1) | + +> ⚠️ Make sure `DTS_ENDPOINT` in your `.env` matches the mode you actually start.
+> A common mistake is starting the Docker emulator while `.env` points at Azure DTS +> (or vice-versa). + +### Quick start (Windows) + +The fastest path on Windows — handles UTF-8, startup order, and health gating: + +```powershell +cd agentic_ai/workflow/fraud_detection_durable +uv sync +# Use DTS_ENDPOINT from .env as-is (Mode B / Azure DTS, or an emulator that is already running): +.\start.ps1 +# Mode A (also start the local Docker emulator): +.\start.ps1 -StartEmulator + +# When done: +.\stop.ps1 # add -StopEmulator if you used Mode A +``` + +`start.ps1` launches MCP → worker → backend → UI in order, each in its own +window with logs under `.\logs\`, then prints the `/health` readiness summary. + +> The ambient event feed starts **OFF** by default so you control when Scenario 1 +> begins — press the ⏻ power button in the **Live Feed** panel, or +> `POST http://localhost:8001/api/producer/start`. Set `EVENT_PRODUCER_ENABLED=true` +> in `.env` to auto-start it. + +### Manual startup (cross-platform) ```mermaid %%{init: {'theme': 'base'}}%% flowchart LR - S1["1️⃣ DTS Emulator
Port 8080"] + S1["1️⃣ DTS
emulator :8080 or Azure"] S2["2️⃣ MCP Server
Port 8000"] S3["3️⃣ Worker
Pulls from DTS"] S4["4️⃣ Backend
Port 8001"] @@ -575,7 +616,7 @@ flowchart LR style S5 fill:#e8f4fd,stroke:#4A90D9 ``` -#### 1. Start DTS Emulator +#### 1. Start the DTS (Mode A only) ```bash docker run -d --name dts-emulator \ @@ -585,7 +626,8 @@ docker run -d --name dts-emulator \ Dashboard: http://localhost:8082 -> **Production:** Replace with Azure DTS β€” same SDK, just change `DTS_ENDPOINT`. See [provision_dts.ps1](provision_dts.ps1). +> **Mode B (Azure DTS):** skip this step β€” just `az login` and set `DTS_ENDPOINT` +> to your scheduler. See [provision_dts.ps1](provision_dts.ps1). #### 2. Start MCP Server @@ -613,6 +655,19 @@ cd ui && npm install && npm run dev # Open http://localhost:3000 ``` +> **Windows note:** the services force UTF-8 stdout/stderr so emoji in logs +> render correctly. If you launch them yourself with a custom wrapper, set +> `PYTHONUTF8=1` to be safe. + +#### Verify readiness + +```bash +curl http://localhost:8001/health +``` + +Returns `status: healthy` only when DTS, MCP, and the Azure OpenAI credential +all pass; otherwise `503` with a per-dependency breakdown. + --- ## Demo Scenarios diff --git a/agentic_ai/workflow/fraud_detection_durable/backend.py b/agentic_ai/workflow/fraud_detection_durable/backend.py index 729d77cd6..5ca3b30d4 100644 --- a/agentic_ai/workflow/fraud_detection_durable/backend.py +++ b/agentic_ai/workflow/fraud_detection_durable/backend.py @@ -22,6 +22,13 @@ from dotenv import load_dotenv +# Force UTF-8 console so emoji/unicode in logs never crash on Windows cp1252. 
+for _stream in (sys.stdout, sys.stderr): + try: + _stream.reconfigure(encoding="utf-8", errors="replace") + except Exception: + pass + # Load environment first so observability can read connection string load_dotenv() @@ -50,7 +57,7 @@ from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles -from fastapi.responses import FileResponse, StreamingResponse +from fastapi.responses import FileResponse, StreamingResponse, JSONResponse from pydantic import BaseModel from event_producer import EventProducer, CUSTOMER_PROFILES, TelemetryEvent @@ -267,6 +274,75 @@ async def broadcast(self, instance_id: str, message: dict): _event_producer_task: asyncio.Task | None = None +async def _auto_submit_alert_callback(alert_id, customer_id, alert_type, description, severity): + """Auto-submit a detected anomaly (Layer 1) to the durable workflow (Layer 2).""" + try: + client = get_dts_client() + alert = { + "alert_id": alert_id, + "customer_id": customer_id, + "alert_type": alert_type, + "description": description, + "timestamp": datetime.now().isoformat(), + "severity": severity, + "approval_timeout_hours": 0.05, + "auto_detected": True, + } + instance_id = client.schedule_new_orchestration( + ORCHESTRATION_NAME, + input=alert, + instance_id=f"fraud-{alert_id}-{int(time.time())}", + ) + logger.info(f"πŸ€– Auto-submitted alert {alert_id} β†’ orchestration {instance_id}") + + started_event = TelemetryEvent( + id=f"WF-{alert_id}", + timestamp=alert["timestamp"], + customer_id=customer_id, + customer_name=CUSTOMER_PROFILES.get(customer_id, {}).get("name", f"Customer {customer_id}"), + event_type="workflow_auto_started", + details={ + "instance_id": instance_id, + "alert_id": alert_id, + "alert_type": alert_type, + "description": description, + "severity": severity, + }, + is_anomaly=True, + anomaly_rule=alert_type, + alert_triggered=True, + ) + await 
event_producer._broadcast(started_event) + except Exception as e: + logger.error(f"Failed to auto-submit alert: {e}") + + +def _start_event_producer() -> bool: + """Start the ambient event producer if not already running. Returns running state.""" + global _event_producer_task + if _event_producer_task and not _event_producer_task.done(): + return True + event_producer.set_alert_callback(_auto_submit_alert_callback) + _event_producer_task = asyncio.create_task(event_producer.run()) + logger.info("β–Ά Event producer started (Layer 1 ambient detection)") + return True + + +def _stop_event_producer() -> bool: + """Stop the ambient event producer if running. Returns running state.""" + global _event_producer_task + event_producer.stop() + if _event_producer_task: + _event_producer_task.cancel() + _event_producer_task = None + logger.info("⏸ Event producer stopped") + return False + + +def _event_producer_running() -> bool: + return bool(_event_producer_task and not _event_producer_task.done()) + + # ============================================================================ # Background Task: Poll DTS for Status Updates # ============================================================================ @@ -383,8 +459,96 @@ async def read_root(): @app.get("/health") async def health(): - """Health check endpoint.""" - return {"status": "healthy", "timestamp": datetime.now().isoformat()} + """Deep readiness check. + + Verifies the three external dependencies the demo needs so a presenter can + confirm everything is wired *before* starting a scenario: + - DTS : can we reach the Durable Task Scheduler taskhub? + - MCP : is the Contoso MCP server responding? + - OpenAI : can DefaultAzureCredential acquire a token (Entra auth)? + + Returns 200 when all checks pass, 503 otherwise. Each check is best-effort + and time-boxed so the endpoint stays fast and never hangs the UI. 
+ """ + checks: dict[str, dict] = {} + + # 1) DTS connectivity β€” a metadata lookup on a non-existent instance returns + # None when the gRPC channel + auth are healthy. + try: + client = get_dts_client() + await asyncio.wait_for( + asyncio.to_thread(client.get_orchestration_state, "health-check-probe"), + timeout=5.0, + ) + checks["dts"] = {"ok": True, "endpoint": os.getenv("DTS_ENDPOINT", "http://localhost:8080")} + except Exception as e: + checks["dts"] = {"ok": False, "error": f"{type(e).__name__}: {e}"} + + # 2) MCP server reachability. FastMCP serves only the /mcp endpoint (no + # /health route), and that endpoint expects specific POST semantics, so + # we verify reachability with a lightweight TCP connect to host:port. + mcp_uri = os.getenv("MCP_SERVER_URI", "http://localhost:8000/mcp") + try: + from urllib.parse import urlparse + parsed = urlparse(mcp_uri) + host = parsed.hostname or "localhost" + port = parsed.port or (443 if parsed.scheme == "https" else 80) + fut = asyncio.open_connection(host, port) + reader, writer = await asyncio.wait_for(fut, timeout=5.0) + writer.close() + try: + await writer.wait_closed() + except Exception: + pass + checks["mcp"] = {"ok": True, "host": host, "port": port} + except Exception as e: + checks["mcp"] = {"ok": False, "error": f"{type(e).__name__}: {e}", "uri": mcp_uri} + + # 3) Azure OpenAI auth β€” can we get an Entra token for the cognitive + # services scope? (The worker/backend authenticate via DefaultAzureCredential.) 
+ try: + cred = DefaultAzureCredential() + token = await asyncio.wait_for( + asyncio.to_thread(cred.get_token, "https://cognitiveservices.azure.com/.default"), + timeout=5.0, + ) + checks["openai_auth"] = {"ok": bool(token and token.token)} + except Exception as e: + checks["openai_auth"] = {"ok": False, "error": f"{type(e).__name__}: {e}"} + + healthy = all(c.get("ok") for c in checks.values()) + body = { + "status": "healthy" if healthy else "degraded", + "timestamp": datetime.now().isoformat(), + "checks": checks, + } + if not healthy: + return JSONResponse(status_code=503, content=body) + return body + + +@app.get("/api/producer/status") +async def producer_status(): + """Return whether the ambient event producer is running.""" + return { + "running": _event_producer_running(), + "interval_seconds": event_producer.interval, + "anomaly_probability": event_producer.anomaly_probability, + } + + +@app.post("/api/producer/start") +async def producer_start(): + """Start the ambient event producer (Layer 1 detection feed).""" + running = _start_event_producer() + return {"running": running} + + +@app.post("/api/producer/stop") +async def producer_stop(): + """Stop the ambient event producer.""" + running = _stop_event_producer() + return {"running": running} @app.get("/api/alerts", response_model=list[AlertInfo]) @@ -592,56 +756,15 @@ async def startup(): # with asyncio's ProactorEventLoop signal handling. 
logger.info("βœ“ DTS client will initialize on first request (lazy)") - # Start Layer 1 event producer (ambient detection) - if os.getenv("EVENT_PRODUCER_ENABLED", "true").lower() in ("1", "true", "yes"): - async def alert_callback(alert_id, customer_id, alert_type, description, severity): - """Auto-submit detected anomaly to the durable workflow.""" - try: - client = get_dts_client() - alert = { - "alert_id": alert_id, - "customer_id": customer_id, - "alert_type": alert_type, - "description": description, - "timestamp": datetime.now().isoformat(), - "severity": severity, - "approval_timeout_hours": 0.05, - "auto_detected": True, - } - instance_id = client.schedule_new_orchestration( - ORCHESTRATION_NAME, - input=alert, - instance_id=f"fraud-{alert_id}-{int(time.time())}", - ) - logger.info(f"πŸ€– Auto-submitted alert {alert_id} β†’ orchestration {instance_id}") - - # Broadcast workflow_auto_started to SSE so the UI can connect - started_event = TelemetryEvent( - id=f"WF-{alert_id}", - timestamp=alert["timestamp"], - customer_id=customer_id, - customer_name=CUSTOMER_PROFILES.get(customer_id, {}).get("name", f"Customer {customer_id}"), - event_type="workflow_auto_started", - details={ - "instance_id": instance_id, - "alert_id": alert_id, - "alert_type": alert_type, - "description": description, - "severity": severity, - }, - is_anomaly=True, - anomaly_rule=alert_type, - alert_triggered=True, - ) - await event_producer._broadcast(started_event) - except Exception as e: - logger.error(f"Failed to auto-submit alert: {e}") - - event_producer.set_alert_callback(alert_callback) - _event_producer_task = asyncio.create_task(event_producer.run()) - logger.info("βœ“ Event producer started (Layer 1 ambient detection)") + # Start Layer 1 event producer (ambient detection). + # Default OFF so a presenter controls when the demo's ambient feed begins + # (use the UI toggle or POST /api/producer/start). Set EVENT_PRODUCER_ENABLED=true + # to auto-start on boot. 
+ if os.getenv("EVENT_PRODUCER_ENABLED", "false").lower() in ("1", "true", "yes"): + _start_event_producer() + logger.info("βœ“ Event producer auto-started (EVENT_PRODUCER_ENABLED=true)") else: - logger.info("⏸ Event producer disabled (set EVENT_PRODUCER_ENABLED=true to enable)") + logger.info("⏸ Event producer idle β€” start it from the UI or POST /api/producer/start") logger.info("Backend ready! πŸš€") diff --git a/agentic_ai/workflow/fraud_detection_durable/start.ps1 b/agentic_ai/workflow/fraud_detection_durable/start.ps1 new file mode 100644 index 000000000..1cd4e63ed --- /dev/null +++ b/agentic_ai/workflow/fraud_detection_durable/start.ps1 @@ -0,0 +1,141 @@ +<# +.SYNOPSIS + Start the Fraud Detection durable-workflow demo on Windows. + +.DESCRIPTION + Starts the four local services in the correct order with health gating: + 1. MCP server (port 8000) + 2. DTS worker (no port - pulls from DTS) + 3. Backend (BFF) (port 8001) + 4. React UI (port 3000) + + The Azure Durable Task Scheduler is expected to be reachable already + (either the local Docker emulator on :8080 or an Azure DTS endpoint set + via DTS_ENDPOINT in .env). This script does NOT start the emulator. + + Each service runs in its own window and logs to .\logs\.log. + Forces UTF-8 so emoji/unicode in logs never crash on Windows cp1252. + +.PARAMETER StartEmulator + Also start the local DTS emulator in Docker (port 8080/8082) before the + other services. Requires Docker Desktop running. + +.EXAMPLE + .\start.ps1 +.EXAMPLE + .\start.ps1 -StartEmulator +#> +[CmdletBinding()] +param( + [switch]$StartEmulator +) + +$ErrorActionPreference = 'Stop' +$here = Split-Path -Parent $MyInvocation.MyCommand.Path +$repoRoot = (Resolve-Path (Join-Path $here '..\..\..')).Path +$mcpDir = Join-Path $repoRoot 'mcp' +$uiDir = Join-Path $here 'ui' +$logDir = Join-Path $here 'logs' + +# Force UTF-8 for any child process we spawn from here. 
+$env:PYTHONUTF8 = '1' +$env:PYTHONIOENCODING = 'utf-8' + +New-Item -ItemType Directory -Force -Path $logDir | Out-Null + +function Write-Step($msg) { Write-Host "==> $msg" -ForegroundColor Cyan } +function Write-Ok($msg) { Write-Host " [ok] $msg" -ForegroundColor Green } +function Write-Warn2($msg){ Write-Host " [!!] $msg" -ForegroundColor Yellow } + +function Wait-Port { + param([int]$Port, [int]$TimeoutSec = 60, [string]$Name = "service") + $deadline = (Get-Date).AddSeconds($TimeoutSec) + while ((Get-Date) -lt $deadline) { + $ok = Test-NetConnection -ComputerName localhost -Port $Port -InformationLevel Quiet -WarningAction SilentlyContinue + if ($ok) { Write-Ok "$Name listening on port $Port"; return $true } + Start-Sleep -Seconds 2 + } + Write-Warn2 "$Name did not open port $Port within $TimeoutSec s (check $logDir)" + return $false +} + +function Start-Service2 { + param([string]$Title, [string]$WorkDir, [string]$Command, [string]$LogFile) + # Launch in a new window so the presenter can see each service; tee to a log. 
+ $full = "`$env:PYTHONUTF8='1'; `$env:PYTHONIOENCODING='utf-8'; Set-Location '$WorkDir'; $Command *>&1 | Tee-Object -FilePath '$LogFile'" + Start-Process -FilePath 'powershell.exe' ` + -ArgumentList @('-NoExit', '-NoProfile', '-Command', $full) ` + -WindowStyle Normal | Out-Null + Write-Ok "$Title started (logs: $LogFile)" +} + +Write-Host "" +Write-Host "Fraud Detection Durable Workflow - Demo Launcher" -ForegroundColor White +Write-Host "------------------------------------------------" -ForegroundColor DarkGray + +# 0) Optional: local DTS emulator +if ($StartEmulator) { + Write-Step "Starting local DTS emulator (Docker)" + try { + docker rm -f dts-emulator 2>$null | Out-Null + docker run -d --name dts-emulator -p 8080:8080 -p 8082:8082 ` + mcr.microsoft.com/dts/dts-emulator:latest | Out-Null + Wait-Port -Port 8080 -TimeoutSec 60 -Name "DTS emulator" | Out-Null + Write-Ok "DTS emulator dashboard: http://localhost:8082" + } catch { + Write-Warn2 "Could not start DTS emulator: $_" + Write-Warn2 "Is Docker Desktop running? Or use an Azure DTS endpoint in .env." 
+ } +} else { + Write-Step "Using DTS endpoint from .env (no local emulator)" + Write-Warn2 "If you intended the local emulator, re-run with -StartEmulator" +} + +# 1) MCP server +Write-Step "Starting MCP server (port 8000)" +Start-Service2 -Title 'MCP' -WorkDir $mcpDir ` + -Command 'uv run python mcp_service.py' ` + -LogFile (Join-Path $logDir 'mcp.log') +Wait-Port -Port 8000 -TimeoutSec 60 -Name 'MCP' | Out-Null + +# 2) Worker +Write-Step "Starting DTS worker" +Start-Service2 -Title 'Worker' -WorkDir $here ` + -Command 'uv run python worker.py' ` + -LogFile (Join-Path $logDir 'worker.log') +Write-Ok "Worker launched (connects to DTS; no local port)" +Start-Sleep -Seconds 8 + +# 3) Backend +Write-Step "Starting backend (port 8001)" +Start-Service2 -Title 'Backend' -WorkDir $here ` + -Command 'uv run python backend.py' ` + -LogFile (Join-Path $logDir 'backend.log') +Wait-Port -Port 8001 -TimeoutSec 60 -Name 'Backend' | Out-Null + +# 4) React UI +Write-Step "Starting React UI (port 3000)" +Start-Service2 -Title 'UI' -WorkDir $uiDir ` + -Command 'npm run dev' ` + -LogFile (Join-Path $logDir 'ui.log') +Wait-Port -Port 3000 -TimeoutSec 90 -Name 'UI' | Out-Null + +Write-Host "" +Write-Step "Verifying backend readiness (/health)" +try { + $health = Invoke-RestMethod -Uri 'http://localhost:8001/health' -TimeoutSec 15 + Write-Ok ("Backend health: {0}" -f $health.status) + foreach ($k in $health.checks.PSObject.Properties.Name) { + $c = $health.checks.$k + if ($c.ok) { Write-Ok " $k OK" } else { Write-Warn2 " $k FAILED: $($c.error)" } + } +} catch { + Write-Warn2 "Could not reach /health yet: $_" +} + +Write-Host "" +Write-Host "Demo is up. Open: http://localhost:3000" -ForegroundColor Green +Write-Host "Ambient feed starts OFF - press the power button in the Live Feed panel," -ForegroundColor Gray +Write-Host "or POST http://localhost:8001/api/producer/start to begin Scenario 1." 
-ForegroundColor Gray +Write-Host "Stop everything with: .\stop.ps1" -ForegroundColor Gray +Write-Host "" diff --git a/agentic_ai/workflow/fraud_detection_durable/stop.ps1 b/agentic_ai/workflow/fraud_detection_durable/stop.ps1 new file mode 100644 index 000000000..f3c74bb7e --- /dev/null +++ b/agentic_ai/workflow/fraud_detection_durable/stop.ps1 @@ -0,0 +1,82 @@ +<# +.SYNOPSIS + Stop all Fraud Detection durable-workflow demo services on Windows. + +.DESCRIPTION + Finds and stops the demo processes started by start.ps1: + - MCP server (mcp_service.py) + - DTS worker (worker.py) + - Backend (backend.py) + - React UI (vite / npm run dev on port 3000) + + Matches by command line so it won't touch unrelated python/node processes. + +.PARAMETER StopEmulator + Also stop+remove the local DTS emulator Docker container. + +.EXAMPLE + .\stop.ps1 +.EXAMPLE + .\stop.ps1 -StopEmulator +#> +[CmdletBinding()] +param( + [switch]$StopEmulator +) + +function Write-Step($msg) { Write-Host "==> $msg" -ForegroundColor Cyan } +function Write-Ok($msg) { Write-Host " [ok] $msg" -ForegroundColor Green } + +# Match the demo processes by their command line (avoids killing the editor, +# Copilot, or other python/node processes). +$patterns = @( + 'mcp_service\.py', + 'worker\.py', + 'backend\.py', + 'fraud_detection_durable\\ui', # vite / npm dev server for the UI + 'vite' +) + +Write-Step "Stopping demo services" +$procs = Get-CimInstance Win32_Process -Filter "Name='python.exe' OR Name='node.exe'" | + Where-Object { + $cl = $_.CommandLine + $cl -and ($patterns | Where-Object { $cl -match $_ }) -and ($cl -match 'fraud_detection_durable' -or $cl -match 'mcp_service\.py') + } + +if (-not $procs) { + Write-Ok "No running demo services found." 
+} else { + foreach ($p in $procs) { + try { + Stop-Process -Id $p.ProcessId -Force -ErrorAction Stop + Write-Ok ("Stopped PID {0} ({1})" -f $p.ProcessId, ($p.CommandLine -replace '.*\\', '').Substring(0, [Math]::Min(40, ($p.CommandLine -replace '.*\\', '').Length))) + } catch { + Write-Host " [!!] Could not stop PID $($p.ProcessId): $_" -ForegroundColor Yellow + } + } +} + +# Free any lingering listeners on the demo ports. +foreach ($port in @(8000, 8001, 3000)) { + $conns = Get-NetTCPConnection -LocalPort $port -State Listen -ErrorAction SilentlyContinue + foreach ($c in $conns) { + try { + Stop-Process -Id $c.OwningProcess -Force -ErrorAction Stop + Write-Ok "Freed port $port (PID $($c.OwningProcess))" + } catch { } + } +} + +if ($StopEmulator) { + Write-Step "Stopping local DTS emulator" + try { + docker rm -f dts-emulator 2>$null | Out-Null + Write-Ok "DTS emulator removed" + } catch { + Write-Host " [!!] Could not remove emulator: $_" -ForegroundColor Yellow + } +} + +Write-Host "" +Write-Host "All demo services stopped." 
-ForegroundColor Green diff --git a/agentic_ai/workflow/fraud_detection_durable/ui/src/components/EventFeed.jsx b/agentic_ai/workflow/fraud_detection_durable/ui/src/components/EventFeed.jsx index ba633c545..1d92d158e 100644 --- a/agentic_ai/workflow/fraud_detection_durable/ui/src/components/EventFeed.jsx +++ b/agentic_ai/workflow/fraud_detection_durable/ui/src/components/EventFeed.jsx @@ -15,6 +15,7 @@ import FiberManualRecordIcon from '@mui/icons-material/FiberManualRecord'; import WarningAmberIcon from '@mui/icons-material/WarningAmber'; import DeleteSweepIcon from '@mui/icons-material/DeleteSweep'; import OpenInNewIcon from '@mui/icons-material/OpenInNew'; +import PowerSettingsNewIcon from '@mui/icons-material/PowerSettingsNew'; import { API_CONFIG } from '../constants/config'; const MAX_EVENTS = 80; @@ -51,6 +52,7 @@ export default function EventFeed({ onAnomalyDetected, onWorkflowAutoStarted }) const [connected, setConnected] = useState(false); const [anomalyCount, setAnomalyCount] = useState(0); const [autoStartedAlerts, setAutoStartedAlerts] = useState([]); + const [producerRunning, setProducerRunning] = useState(false); const scrollRef = useRef(null); const pausedRef = useRef(false); const eventsRef = useRef([]); @@ -59,6 +61,40 @@ export default function EventFeed({ onAnomalyDetected, onWorkflowAutoStarted }) // Keep ref in sync with state useEffect(() => { pausedRef.current = paused; }, [paused]); + // Poll server-side ambient producer status so the toggle reflects reality. 
+ useEffect(() => { + let cancelled = false; + const fetchStatus = async () => { + try { + const res = await fetch(`${API_CONFIG.BASE_URL}${API_CONFIG.ENDPOINTS.PRODUCER_STATUS}`); + if (res.ok) { + const data = await res.json(); + if (!cancelled) setProducerRunning(!!data.running); + } + } catch { + /* backend not ready yet β€” ignore */ + } + }; + fetchStatus(); + const t = setInterval(fetchStatus, 5000); + return () => { cancelled = true; clearInterval(t); }; + }, []); + + const toggleProducer = useCallback(async () => { + const endpoint = producerRunning + ? API_CONFIG.ENDPOINTS.PRODUCER_STOP + : API_CONFIG.ENDPOINTS.PRODUCER_START; + try { + const res = await fetch(`${API_CONFIG.BASE_URL}${endpoint}`, { method: 'POST' }); + if (res.ok) { + const data = await res.json(); + setProducerRunning(!!data.running); + } + } catch (err) { + console.error('Failed to toggle ambient feed:', err); + } + }, [producerRunning]); + // Auto-scroll to bottom when new events arrive useEffect(() => { if (!paused && scrollRef.current) { @@ -185,7 +221,12 @@ export default function EventFeed({ onAnomalyDetected, onWorkflowAutoStarted }) )} - + + + + + + setPaused(p => !p)}> {paused ? : } @@ -250,7 +291,11 @@ export default function EventFeed({ onAnomalyDetected, onWorkflowAutoStarted }) > {events.length === 0 && ( - {connected ? 'Waiting for events…' : 'Connecting to event stream…'} + {!connected + ? 'Connecting to event stream…' + : producerRunning + ? 
'Waiting for events…' + : 'Ambient feed is idle β€” press the ⏻ power button above to start it.'} )} {events.map((evt, i) => ( diff --git a/agentic_ai/workflow/fraud_detection_durable/ui/src/constants/config.js b/agentic_ai/workflow/fraud_detection_durable/ui/src/constants/config.js index d6295ab58..dac64ac86 100644 --- a/agentic_ai/workflow/fraud_detection_durable/ui/src/constants/config.js +++ b/agentic_ai/workflow/fraud_detection_durable/ui/src/constants/config.js @@ -29,6 +29,9 @@ export const API_CONFIG = { WORKFLOW_START: '/api/workflow/start', WORKFLOW_DECISION: '/api/workflow/decision', EVENTS_STREAM: '/api/events/stream', + PRODUCER_STATUS: '/api/producer/status', + PRODUCER_START: '/api/producer/start', + PRODUCER_STOP: '/api/producer/stop', }, }; diff --git a/agentic_ai/workflow/fraud_detection_durable/worker.py b/agentic_ai/workflow/fraud_detection_durable/worker.py index 8cf1ff10a..f61608426 100644 --- a/agentic_ai/workflow/fraud_detection_durable/worker.py +++ b/agentic_ai/workflow/fraud_detection_durable/worker.py @@ -29,6 +29,13 @@ from dotenv import load_dotenv +# Force UTF-8 console so emoji/unicode in logs never crash on Windows cp1252. +for _stream in (sys.stdout, sys.stderr): + try: + _stream.reconfigure(encoding="utf-8", errors="replace") + except Exception: + pass + # Load environment first so observability can read connection string load_dotenv()