diff --git a/src/opencode_a2a/server/database.py b/src/opencode_a2a/server/database.py index 546df93..ad060cd 100644 --- a/src/opencode_a2a/server/database.py +++ b/src/opencode_a2a/server/database.py @@ -2,6 +2,7 @@ from pathlib import Path from typing import Any, cast +from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit from sqlalchemy import event from sqlalchemy.engine import make_url @@ -12,6 +13,9 @@ _SQLITE_JOURNAL_MODE = "WAL" _SQLITE_BUSY_TIMEOUT_MS = 30_000 _SQLITE_SYNCHRONOUS_MODE = "NORMAL" +_SENSITIVE_DATABASE_QUERY_KEYS = frozenset( + {"password", "passwd", "pwd", "token", "secret", "api_key", "apikey", "access_token"} +) def _configure_sqlite_connection(dbapi_connection: Any, _connection_record: Any) -> None: @@ -24,6 +28,20 @@ def _configure_sqlite_connection(dbapi_connection: Any, _connection_record: Any) cursor.close() +def redact_database_url_for_logs(database_url: str) -> str: + parts = urlsplit(database_url) + if not parts.query: + return database_url + + redacted_query = [] + for key, value in parse_qsl(parts.query, keep_blank_values=True): + if key.lower() in _SENSITIVE_DATABASE_QUERY_KEYS: + redacted_query.append((key, "***")) + continue + redacted_query.append((key, value)) + return urlunsplit(parts._replace(query=urlencode(redacted_query))) + + def build_database_engine(settings: Settings) -> AsyncEngine: database_url = cast(str, settings.a2a_task_store_database_url) url = make_url(database_url) diff --git a/src/opencode_a2a/server/task_store.py b/src/opencode_a2a/server/task_store.py index 034ae94..79516b2 100644 --- a/src/opencode_a2a/server/task_store.py +++ b/src/opencode_a2a/server/task_store.py @@ -18,7 +18,7 @@ from ..config import Settings from ..task_states import TERMINAL_TASK_STATES from .context_helpers import normalize_server_call_context -from .database import build_database_engine +from .database import build_database_engine, redact_database_url_for_logs from .task_store_sdk_compat import DatabaseTaskStoreCompat if TYPE_CHECKING: @@ -341,7 +341,7 @@ def describe_lightweight_persistence_backend(settings: Settings) -> dict[str, st if settings.a2a_task_store_backend != "database": return summary url = make_url(cast(str, settings.a2a_task_store_database_url)) - summary["database_url"] = url.render_as_string(hide_password=True) + summary["database_url"] = redact_database_url_for_logs(url.render_as_string(hide_password=True)) summary["sqlite_tuning"] = ( "local_durability_defaults" if url.drivername.startswith("sqlite") else "not_applicable" ) diff --git a/src/opencode_a2a/server/task_store_sdk_compat.py b/src/opencode_a2a/server/task_store_sdk_compat.py index 04936e3..3824d27 100644 --- a/src/opencode_a2a/server/task_store_sdk_compat.py +++ b/src/opencode_a2a/server/task_store_sdk_compat.py @@ -14,6 +14,7 @@ from sqlalchemy.dialects.sqlite import insert as sqlite_insert from ..task_states import TERMINAL_TASK_STATES +from .database import redact_database_url_for_logs _ATOMIC_TERMINAL_GUARD_DIALECTS = frozenset({"postgresql", "sqlite"}) _TERMINAL_TASK_STATE_VALUES = tuple(TaskState.Name(int(state)) for state in TERMINAL_TASK_STATES) @@ -66,7 +67,9 @@ async def initialize(self) -> None: await self._task_store.initialize() async def validate_schema(self) -> None: - database_url = self._task_store.engine.url.render_as_string(hide_password=True) + database_url = redact_database_url_for_logs( + self._task_store.engine.url.render_as_string(hide_password=True) + ) table_name = self._shape.task_model.__table__.name required_indexes = frozenset({f"idx_{table_name}_owner_last_updated"}) async with self._task_store.engine.begin() as conn: diff --git a/tests/server/test_task_store_factory.py b/tests/server/test_task_store_factory.py index df7bce4..c27f90a 100644 --- a/tests/server/test_task_store_factory.py +++ b/tests/server/test_task_store_factory.py @@ -4,6 +4,7 @@ import warnings from pathlib import Path from unittest.mock import AsyncMock, MagicMock +from urllib.parse import parse_qsl, urlsplit import pytest from a2a.server.context import ServerCallContext @@ -13,7 +14,7 @@ from google.protobuf.timestamp_pb2 import Timestamp from sqlalchemy import text -from opencode_a2a.server.database import build_database_engine +from opencode_a2a.server.database import build_database_engine, redact_database_url_for_logs from opencode_a2a.server.task_store import ( FirstTerminalStateWinsPolicy, GuardedTaskStore, @@ -160,6 +161,41 @@ def test_describe_lightweight_persistence_backend_marks_sqlite_first_scope() -> } +def test_redact_database_url_for_logs_masks_sensitive_query_values() -> None: + redacted = redact_database_url_for_logs( + "postgresql+asyncpg://user:***@db.example.com/app" + "?sslmode=require&token=super-secret&API_KEY=top-secret&pool_size=5" + ) + + assert redacted == ( + "postgresql+asyncpg://user:***@db.example.com/app" + "?sslmode=require&token=%2A%2A%2A&API_KEY=%2A%2A%2A&pool_size=5" + ) + + +def test_describe_lightweight_persistence_backend_redacts_sensitive_query_values() -> None: + settings = make_settings( + test_bearer_token="test-token", + a2a_task_store_database_url=( + "postgresql+asyncpg://db.example.com/app" + "?sslmode=require&token=super-secret&api_key=top-secret&pool_size=5" + ), + ) + + summary = describe_lightweight_persistence_backend(settings) + + assert summary["backend"] == "database" + assert summary["scope"] == "sdk_tasks_and_adapter_state" + assert summary["sqlite_tuning"] == "not_applicable" + assert summary["database_url"].startswith("postgresql+asyncpg://db.example.com/app?") + assert dict(parse_qsl(urlsplit(summary["database_url"]).query, keep_blank_values=True)) == { + "sslmode": "require", + "token": "***", + "api_key": "***", + "pool_size": "5", + } + + def test_describe_lightweight_persistence_backend_supports_memory_backend() -> None: settings = make_settings( test_bearer_token="test-token",