diff --git a/capiscio_sdk/connect.py b/capiscio_sdk/connect.py index df5477b..c873fd8 100644 --- a/capiscio_sdk/connect.py +++ b/capiscio_sdk/connect.py @@ -18,6 +18,7 @@ import json import os +import sys import logging import httpx from pathlib import Path @@ -53,10 +54,16 @@ def _public_jwk_from_private(private_jwk: dict) -> dict: return public -def _log_agent_key_capture_hint(agent_id: str, private_jwk: dict) -> None: - """Log a one-time hint telling the user how to persist key material.""" - compact_json = json.dumps(private_jwk, separators=(",", ":")) - logger.warning( +def _print_agent_key_capture_hint(agent_id: str, private_jwk: dict) -> None: + """Print a one-time hint telling the user how to persist key material. + + SECURITY: Never output private key material. Only the key ID (kid) is + shown. Users are directed to the CLI or key files for export. + + Output goes to stderr so it doesn't interfere with stdout-based protocols. + """ + kid = private_jwk.get("kid", "unknown") + print( "\n" " \u2554" + "\u2550" * 62 + "\u2557\n" " \u2551 New agent identity generated \u2014 save key for persistence \u2551\n" @@ -66,11 +73,18 @@ def _log_agent_key_capture_hint(agent_id: str, private_jwk: dict) -> None: " serverless, CI) the identity will be lost on restart unless\n" " you persist the private key.\n" "\n" - " Add to your secrets manager / .env:\n" + f" Key ID: {kid}\n" + "\n" + " To export the private key for your secrets manager:\n" + "\n" + " capiscio identity export --format env\n" + "\n" + " Or copy the key file from:\n" "\n" - " CAPISCIO_AGENT_PRIVATE_KEY_JWK='" + compact_json + "'\n" + f" ~/.capiscio/keys/{agent_id}/private.jwk\n" "\n" - " The DID will be recovered automatically from the JWK on startup.\n" + " The DID will be recovered automatically from the JWK on startup.\n", + file=sys.stderr, ) @@ -697,7 +711,7 @@ def _init_identity(self) -> str: if private_key_path.exists(): try: private_jwk = json.loads(private_key_path.read_text()) - _log_agent_key_capture_hint(self.agent_id, private_jwk) + _print_agent_key_capture_hint(self.agent_id, private_jwk) except Exception: pass # Best-effort hint diff --git a/tests/unit/test_connect.py b/tests/unit/test_connect.py index 0e2463d..c2cf91a 100644 --- a/tests/unit/test_connect.py +++ b/tests/unit/test_connect.py @@ -18,11 +18,43 @@ DEFAULT_KEYS_DIR, ENV_AGENT_PRIVATE_KEY, PROD_REGISTRY, - _log_agent_key_capture_hint, + _print_agent_key_capture_hint, _public_jwk_from_private, ) +class TestPrintAgentKeyCaptureHint: + """Regression tests for _print_agent_key_capture_hint — security-critical.""" + + def test_no_private_key_material_in_output(self, capsys): + """SECURITY: Output must never contain private key 'd' parameter or full JWK.""" + private_jwk = { + "kty": "OKP", + "crv": "Ed25519", + "kid": "did:key:z6MkSafe", + "x": "public_x_value_base64", + "d": "SUPER_SECRET_PRIVATE_KEY_VALUE", + } + _print_agent_key_capture_hint("my-agent", private_jwk) + captured = capsys.readouterr() + stderr_output = captured.err + + # Must NOT contain private key material + assert "SUPER_SECRET_PRIVATE_KEY_VALUE" not in stderr_output + assert '"d"' not in stderr_output + assert "public_x_value_base64" not in stderr_output + + # Must contain only kid for identification + assert "did:key:z6MkSafe" in stderr_output + + def test_correct_key_path_in_output(self, capsys): + """Output must show the correct default key file path.""" + private_jwk = {"kty": "OKP", "crv": "Ed25519", "kid": "did:key:z6Mk1"} + _print_agent_key_capture_hint("agent-42", private_jwk) + captured = capsys.readouterr() + assert "~/.capiscio/keys/agent-42/private.jwk" in captured.err + + class TestAgentIdentity: """Tests for AgentIdentity dataclass.""" @@ -933,7 +965,7 @@ def test_init_identity_logs_capture_hint_on_new_gen(self, tmp_path): "d": "gen", "x": "gen", })) - with patch.object(connect_module, "_log_agent_key_capture_hint") as mock_hint: + with patch.object(connect_module, "_print_agent_key_capture_hint") as mock_hint: connector._init_identity() mock_hint.assert_called_once() @@ -962,7 +994,7 @@ def test_init_identity_no_capture_hint_on_recovery(self, tmp_path): ) connector._ensure_did_registered = MagicMock(return_value=None) - with patch.object(connect_module, "_log_agent_key_capture_hint") as mock_hint: + with patch.object(connect_module, "_print_agent_key_capture_hint") as mock_hint: connector._init_identity() mock_hint.assert_not_called()