Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/opencode_a2a/server/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from pathlib import Path
from typing import Any, cast
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

from sqlalchemy import event
from sqlalchemy.engine import make_url
Expand All @@ -12,6 +13,9 @@
_SQLITE_JOURNAL_MODE = "WAL"
_SQLITE_BUSY_TIMEOUT_MS = 30_000
_SQLITE_SYNCHRONOUS_MODE = "NORMAL"
_SENSITIVE_DATABASE_QUERY_KEYS = frozenset(
{"password", "passwd", "pwd", "token", "secret", "api_key", "apikey", "access_token"}
)


def _configure_sqlite_connection(dbapi_connection: Any, _connection_record: Any) -> None:
Expand All @@ -24,6 +28,20 @@ def _configure_sqlite_connection(dbapi_connection: Any, _connection_record: Any)
cursor.close()


def redact_database_url_for_logs(database_url: str) -> str:
parts = urlsplit(database_url)
if not parts.query:
return database_url

redacted_query = []
for key, value in parse_qsl(parts.query, keep_blank_values=True):
if key.lower() in _SENSITIVE_DATABASE_QUERY_KEYS:
redacted_query.append((key, "***"))
continue
redacted_query.append((key, value))
return urlunsplit(parts._replace(query=urlencode(redacted_query)))


def build_database_engine(settings: Settings) -> AsyncEngine:
database_url = cast(str, settings.a2a_task_store_database_url)
url = make_url(database_url)
Expand Down
4 changes: 2 additions & 2 deletions src/opencode_a2a/server/task_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from ..config import Settings
from ..task_states import TERMINAL_TASK_STATES
from .context_helpers import normalize_server_call_context
from .database import build_database_engine
from .database import build_database_engine, redact_database_url_for_logs
from .task_store_sdk_compat import DatabaseTaskStoreCompat

if TYPE_CHECKING:
Expand Down Expand Up @@ -341,7 +341,7 @@ def describe_lightweight_persistence_backend(settings: Settings) -> dict[str, st
if settings.a2a_task_store_backend != "database":
return summary
url = make_url(cast(str, settings.a2a_task_store_database_url))
summary["database_url"] = url.render_as_string(hide_password=True)
summary["database_url"] = redact_database_url_for_logs(url.render_as_string(hide_password=True))
summary["sqlite_tuning"] = (
"local_durability_defaults" if url.drivername.startswith("sqlite") else "not_applicable"
)
Expand Down
5 changes: 4 additions & 1 deletion src/opencode_a2a/server/task_store_sdk_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from sqlalchemy.dialects.sqlite import insert as sqlite_insert

from ..task_states import TERMINAL_TASK_STATES
from .database import redact_database_url_for_logs

_ATOMIC_TERMINAL_GUARD_DIALECTS = frozenset({"postgresql", "sqlite"})
_TERMINAL_TASK_STATE_VALUES = tuple(TaskState.Name(int(state)) for state in TERMINAL_TASK_STATES)
Expand Down Expand Up @@ -66,7 +67,9 @@ async def initialize(self) -> None:
await self._task_store.initialize()

async def validate_schema(self) -> None:
database_url = self._task_store.engine.url.render_as_string(hide_password=True)
database_url = redact_database_url_for_logs(
self._task_store.engine.url.render_as_string(hide_password=True)
)
table_name = self._shape.task_model.__table__.name
required_indexes = frozenset({f"idx_{table_name}_owner_last_updated"})
async with self._task_store.engine.begin() as conn:
Expand Down
38 changes: 37 additions & 1 deletion tests/server/test_task_store_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import warnings
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
from urllib.parse import parse_qsl, urlsplit

import pytest
from a2a.server.context import ServerCallContext
Expand All @@ -13,7 +14,7 @@
from google.protobuf.timestamp_pb2 import Timestamp
from sqlalchemy import text

from opencode_a2a.server.database import build_database_engine
from opencode_a2a.server.database import build_database_engine, redact_database_url_for_logs
from opencode_a2a.server.task_store import (
FirstTerminalStateWinsPolicy,
GuardedTaskStore,
Expand Down Expand Up @@ -160,6 +161,41 @@ def test_describe_lightweight_persistence_backend_marks_sqlite_first_scope() ->
}


def test_redact_database_url_for_logs_masks_sensitive_query_values() -> None:
redacted = redact_database_url_for_logs(
"postgresql+asyncpg://user:***@db.example.com/app"
"?sslmode=require&token=super-secret&API_KEY=top-secret&pool_size=5"
)

assert redacted == (
"postgresql+asyncpg://user:***@db.example.com/app"
"?sslmode=require&token=%2A%2A%2A&API_KEY=%2A%2A%2A&pool_size=5"
)


def test_describe_lightweight_persistence_backend_redacts_sensitive_query_values() -> None:
settings = make_settings(
test_bearer_token="test-token",
a2a_task_store_database_url=(
"postgresql+asyncpg://db.example.com/app"
"?sslmode=require&token=super-secret&api_key=top-secret&pool_size=5"
),
)

summary = describe_lightweight_persistence_backend(settings)

assert summary["backend"] == "database"
assert summary["scope"] == "sdk_tasks_and_adapter_state"
assert summary["sqlite_tuning"] == "not_applicable"
assert summary["database_url"].startswith("postgresql+asyncpg://db.example.com/app?")
assert dict(parse_qsl(urlsplit(summary["database_url"]).query, keep_blank_values=True)) == {
"sslmode": "require",
"token": "***",
"api_key": "***",
"pool_size": "5",
}


def test_describe_lightweight_persistence_backend_supports_memory_backend() -> None:
settings = make_settings(
test_bearer_token="test-token",
Expand Down