diff --git a/CHANGELOG.md b/CHANGELOG.md index 74c802b..700df3c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,76 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- Declarative policy engine (`DeclarativePolicyEngine`) that loads rules from YAML or TOML files. + Rules are evaluated top-down with first-match-wins semantics; supports `safety_class`, `sensitivity`, + `roles`, `attributes`, and `min_justification` match conditions. (#42) +- Policy denial explanation: `ExplainingPolicyEngine` protocol plus `DefaultPolicyEngine.explain()` and + `DeclarativePolicyEngine.explain()` implementations return a structured `DenialExplanation` with a + `FailedCondition` list for every failing check (no short-circuit), a `remediation` list, and a + human-readable `narrative`. (#48) +- Dry-run invocation mode: `kernel.invoke(..., dry_run=True)` verifies the token and resolves the + execution plan without calling the driver. Returns `DryRunResult` with the resolved `driver_id`, + `operation`, `response_mode`, and an `estimated_cost` tier (`low`/`medium`/`high`). (#43) +- `Kernel.explain_denial()` convenience method that calls the policy engine's `explain()` for a given + `CapabilityRequest` and `Principal` without requiring a token. Raises `AgentKernelError` when the + configured engine does not implement `explain()`. +- New public types exported from `agent_kernel`: `DeclarativePolicyEngine`, `ExplainingPolicyEngine`, + `PolicyEngine`, `PolicyMatch`, `PolicyRule`, `DenialExplanation`, `FailedCondition`, `DryRunResult`, + `PolicyConfigError`. +- `policy` optional extra (`pip install weaver-kernel[policy]`) pulls in `pyyaml` and `tomli` (Python 3.10). +- Example policy files in `examples/policies/` (YAML and TOML formats). + +### Changed +- `PolicyEngine` protocol no longer requires `explain()`. Engines that need to support + `Kernel.explain_denial()` should implement the new `ExplainingPolicyEngine` protocol. Built-in + engines satisfy both. This avoids a breaking typing change for downstream implementers. +- `DeclarativePolicyEngine` now defers `yaml` and `tomllib`/`tomli` imports into the corresponding + loaders, so `import agent_kernel` works without the `policy` extra installed. Calling + `from_yaml`/`from_toml` without the parser surfaces a `PolicyConfigError` with an install hint. +- `Kernel.invoke(dry_run=True)` resolves `operation` the same way drivers do + (`args.get("operation", capability_id)`) so `DryRunResult.operation` matches what a driver would + actually receive — instead of `capability.impl.operation`, which can diverge. +- `Kernel.invoke(dry_run=True)` mirrors the Firewall's admin-only gate for `raw` mode: non-admin + principals see their requested `raw` mode downgraded to `summary` in `DryRunResult`, matching + what they would actually get at real-invoke time. Prevents probing for raw availability. + +### Documentation +- `docs/architecture.md` now describes `PolicyEngine` / `ExplainingPolicyEngine` protocols, + `DefaultPolicyEngine` and `DeclarativePolicyEngine` (with policy-DSL semantics), and dry-run + mode (admin gate, operation resolution rule). Closes the canonical "Components & API + reference" gap flagged in audit. +- `docs/capabilities.md` adds a "Dry-run mode" section (semantics, the three parity rules, + no-side-effects guarantee), a "Declarative policies" section (loaders, match conditions, + optional-extra behaviour), and a "Denial explanations" section. Closes the affected-files + gap from issue #43. + +### Fixed +- `DeclarativePolicyEngine._parse_rule()` now validates the types of `roles`, `attributes`, + `min_justification`, and `constraints` in policy files and raises `PolicyConfigError` with a + precise message instead of silently producing misbehaving rules or raising at evaluation time. +- `DeclarativePolicyEngine.explain()` now correctly reports explicit deny rules that fully match + (previously fell through to the misleading `no_matching_rule` fallback and dropped the rule's + reason). Partial-match deny rules are now skipped so the explanation focuses on actionable allow + rules instead of suggesting changes that would only trigger the deny. +- Example policy files (`examples/policies/default.{yaml,toml}`) now use the correct `default` key + (was `default_action`, which the parser silently ignored), express PII-with-tenant as an allow + rule paired with default-deny (the previous deny rule was inverted under first-match-wins), and + order the `allow-secrets-service` rule before the deny rule (the deny was previously unreachable). +- `Kernel.explain_denial()` docstring no longer contradicts itself ("never raises" vs. + `CapabilityNotFound`). +- `DryRunResult.budget_remaining` docstring no longer references the unimplemented `BudgetManager`; + the field is documented as reserved for a future cross-invocation budget mechanism. +- `drivers/mcp.py` adds an explicit `_McpError: type[BaseException] | None` annotation so mypy + `--strict` remains happy across the try/except import branches. + +### Tests +- `tests/test_policy.py` adds `test_declarative_replicates_default_policy_decisions` — a + comparative test asserting that `DeclarativePolicyEngine` and `DefaultPolicyEngine` produce + the same allow/deny outcomes across a curated scenario matrix (READ × non-sensitive / PII / + PCI / SECRETS, WRITE/DESTRUCTIVE with and without required roles and justification). Closes + issue #42's "comparative test" acceptance criterion. + ## [0.5.0] - 2026-04-12 ### Added diff --git a/docs/agent-context/invariants.md b/docs/agent-context/invariants.md index ba1f7f8..01f015f 100644 --- a/docs/agent-context/invariants.md +++ b/docs/agent-context/invariants.md @@ -64,6 +64,19 @@ tag is **silently ignored** — capabilities tagged with it pass policy without **Rule:** When adding a `SensitivityTag`, always add a matching policy rule and test. +### Dry-run response-mode parity +`Kernel.invoke(dry_run=True)` reports the response mode the caller would actually +get at real-invoke time. The Firewall downgrades `raw` to `summary` for non-admin +principals (`firewall/transform.py:108`), so dry-run must mirror that downgrade — +otherwise a non-admin caller can probe/assume raw-mode availability they will never +actually receive. The same applies to `operation`: dry-run resolves it the same way +drivers do (`args.get("operation", capability_id)`), so what the caller sees in +`DryRunResult` matches what a driver would receive. + +**Rule:** Any code path that reports a response mode or driver operation back to the +caller must apply the same admin gate / resolution rule the real-invoke path uses, +including dry-run, mock, and test paths. + ## Safe vs. unsafe changes | Safe | Unsafe | diff --git a/docs/architecture.md b/docs/architecture.md index b2c7fe6..5a7a6af 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -27,23 +27,42 @@ graph TD ## Components ### Kernel -The central orchestrator. Wires all components together and exposes five methods: +The central orchestrator. Wires all components together and exposes: - `request_capabilities(goal)` — discover relevant capabilities - `grant_capability(request, principal, justification)` — policy check + token issuance -- `invoke(token, principal, args, response_mode)` — execute + firewall + trace +- `invoke(token, principal, args, response_mode, dry_run=False)` — execute + firewall + trace, or short-circuit before driver dispatch when `dry_run=True` - `expand(handle, query)` — paginate/filter stored results - `explain(action_id)` — retrieve audit trace +- `explain_denial(request, principal, justification)` — return a structured `DenialExplanation` instead of raising `PolicyDenied` ### CapabilityRegistry A flat dict of `Capability` objects indexed by `capability_id`. Provides keyword-based search (no LLM, no vector DB — purely token overlap scoring). ### PolicyEngine -The `DefaultPolicyEngine` implements role-based rules: -1. **READ** — always allowed -2. **WRITE** — requires `justification ≥ 15 chars` + role `writer|admin` -3. **DESTRUCTIVE** — requires role `admin` -4. **PII/PCI** — requires `tenant` attribute; enforces `allowed_fields` unless `pii_reader` -5. **max_rows** — 50 (user), 500 (service) +Two protocols and two built-in engines: + +- **`PolicyEngine`** (protocol) — single required method: `evaluate(request, capability, principal, justification) -> PolicyDecision`. +- **`ExplainingPolicyEngine`** (protocol, extends `PolicyEngine`) — adds `explain(...) -> DenialExplanation`. Only engines that implement this protocol can be used with `Kernel.explain_denial`; otherwise that call raises `AgentKernelError` with a clear message. Splitting the contract keeps existing downstream `PolicyEngine` implementers backward-compatible. + +Both built-in engines satisfy `ExplainingPolicyEngine`: + +- **`DefaultPolicyEngine`** — hardcoded role-based rules: + 1. **READ** — always allowed + 2. **WRITE** — requires `justification ≥ 15 chars` + role `writer|admin` + 3. **DESTRUCTIVE** — requires role `admin` + `justification ≥ 15 chars` + 4. **PII/PCI** — requires `tenant` attribute; enforces `allowed_fields` unless `pii_reader` + 5. **SECRETS** — requires role `admin|secrets_reader` + `justification ≥ 15 chars` + 6. **max_rows** — 50 (user), 500 (service) + 7. **Rate limiting** — sliding-window per `(principal_id, capability_id)` (60 READ / 10 WRITE / 2 DESTRUCTIVE per 60s; service role gets 10×) +- **`DeclarativePolicyEngine`** — loads rules from a YAML or TOML file (or a plain dict). Supports `safety_class`, `sensitivity`, `roles`, `attributes`, and `min_justification` match conditions; `allow`/`deny` actions; per-rule `constraints` merged into the resulting `PolicyDecision`; configurable `default` action. Rules are evaluated top-down with first-match-wins. `pyyaml` and `tomli` are optional dependencies — `import agent_kernel` works without them; calling `from_yaml`/`from_toml` without the parser raises `PolicyConfigError` with an install hint. + +#### Denial explanations + +`PolicyEngine.explain()` (when available) returns a structured `DenialExplanation` with `denied`, `rule_name`, a `failed_conditions: list[FailedCondition]` describing each missing condition with `required`/`actual`/`suggestion`, a `remediation` list, and a human-readable `narrative`. Engines collect all failing conditions (no short-circuit) so callers get the full picture. For `DeclarativePolicyEngine`, an explicit deny rule that fully matches is reported as the cause; partial-match deny rules are skipped during explanation so the surfaced advice is actionable rather than self-defeating. + +#### Dry-run mode + +`Kernel.invoke(dry_run=True)` verifies the token and resolves the route plan but **never calls the driver**. It returns a `DryRunResult` with the resolved `driver_id`, the same `operation` a driver would receive (`args.get("operation", capability_id)`), the request constraints, the effective `response_mode` (Firewall's admin-only gate is mirrored: non-admin `raw` is downgraded to `summary`), and a coarse `estimated_cost` tier based on `SafetyClass`. Token verification still raises `TokenExpired` / `TokenInvalid` / `TokenScopeError` in dry-run, so the mode is safe as a policy/route sanity check. See [`docs/capabilities.md`](capabilities.md#dry-run-mode) for usage and [`docs/agent-context/invariants.md`](agent-context/invariants.md) for the parity rule with the real-invoke path. ### TokenProvider (HMAC) Issues HMAC-SHA256 signed tokens. Each token is bound to `principal_id + capability_id + constraints`. Verification checks: expiry → signature → principal → capability. diff --git a/docs/capabilities.md b/docs/capabilities.md index 2863d88..db448db 100644 --- a/docs/capabilities.md +++ b/docs/capabilities.md @@ -47,3 +47,123 @@ Capability( ... ) ``` + +## Dry-run mode + +`Kernel.invoke(..., dry_run=True)` verifies the token and resolves the route +plan but **never calls the driver**. Use it to validate that a principal can +invoke a capability, inspect what a driver *would* receive, or run policy +checks in CI without live tool backends. + +```python +result = await kernel.invoke( + token, + principal=principal, + args={"operation": "billing.list_invoices", "max_rows": 5}, + response_mode="summary", + dry_run=True, +) +# result: DryRunResult( +# capability_id="billing.list_invoices", +# principal_id="user-001", +# policy_decision=PolicyDecision(allowed=True, ...), +# driver_id="billing", +# operation="billing.list_invoices", +# resolved_args={"operation": "billing.list_invoices", "max_rows": 5}, +# response_mode="summary", +# budget_remaining=None, +# estimated_cost="low", +# ) +``` + +Three rules govern dry-run behaviour — keep them in sync with the real-invoke +path if you change either: + +1. **Token verification still runs.** Expired, revoked, or scope-mismatched + tokens raise `TokenExpired` / `TokenRevoked` / `TokenInvalid` / + `TokenScopeError` exactly as they would at real-invoke. Policy is *not* + re-evaluated at invoke time — the granting policy decision is encoded in + the token at `grant_capability`. +2. **Operation resolution mirrors drivers.** `DryRunResult.operation` is + computed the same way every driver computes it: + `str(args.get("operation", capability_id))`. Always use `args["operation"]` + when you need a fixed operation; otherwise the dry-run operation is the + capability ID, matching what the driver would see. +3. **Raw-mode admin gate mirrors the Firewall.** Non-admin principals never + get `response_mode="raw"` at real-invoke (the Firewall downgrades it to + `"summary"` — see `firewall/transform.py`). Dry-run downgrades the same + way, so non-admin callers cannot probe for raw-mode availability via + `DryRunResult`. + +The driver's `execute()` is never called in dry-run, so the mode is free of +side effects regardless of driver type (`InMemoryDriver`, `HTTPDriver`, +`MCPDriver`). `DryRunResult.budget_remaining` is currently always `None`; the +field is reserved for a future cross-invocation budget mechanism. + +## Declarative policies + +`DeclarativePolicyEngine` is an alternative to `DefaultPolicyEngine` that +loads rules from a YAML or TOML file (or a plain dict). Rules are evaluated +top-down, first-match-wins; if no rule matches, the policy's `default` action +applies (`"deny"` unless overridden). + +```python +from pathlib import Path +from agent_kernel import DeclarativePolicyEngine, Kernel + +# YAML or TOML — both formats are interchangeable. +policy = DeclarativePolicyEngine.from_yaml(Path("examples/policies/default.yaml")) + +# Or build entirely in-memory: +policy = DeclarativePolicyEngine.from_dict({ + "default": "deny", + "rules": [ + {"name": "allow-read", "action": "allow", + "match": {"safety_class": ["READ"], "sensitivity": ["NONE"]}}, + # ... + ], +}) + +kernel = Kernel(registry=registry, policy=policy) +``` + +A rule's `match` block supports `safety_class`, `sensitivity`, `roles` +(ANY-of), `attributes` (ALL-of, with `"*"` meaning "attribute must be +present"), and `min_justification` (minimum stripped length). On `allow`, the +rule's `constraints` are merged into the resulting `PolicyDecision`. On +`deny`, `reason` is embedded in the raised `PolicyDenied`. + +The DSL has no negation/missing-attribute operator today, so a policy that +should deny "when an attribute is missing" should be expressed as an allow +rule requiring the attribute paired with `default: deny`. See +[`examples/policies/default.yaml`](../examples/policies/default.yaml) for a +worked example. + +`pyyaml` and `tomli` are **optional** — they live behind the `[policy]` +extra. `import agent_kernel` always works; calling `from_yaml` / `from_toml` +without the parser installed raises `PolicyConfigError` with an install hint. + +## Denial explanations + +When a capability call is denied, `Kernel.explain_denial(request, principal, +justification="")` returns a structured `DenialExplanation` describing +**every** unmet condition (not just the first one), so the caller can see the +full remediation path: + +```python +explanation = kernel.explain_denial( + CapabilityRequest(capability_id="billing.update_invoice", goal="..."), + principal, + justification="too short", +) +# explanation.denied == True +# explanation.rule_name == "write-min_justification" +# explanation.failed_conditions == [FailedCondition(condition="roles", required=[...]), ...] +# explanation.remediation == ["Add 'writer' or 'admin' role to ...", "Provide ..."] +# explanation.narrative == "Request for 'billing.update_invoice' by '...' would be denied: ..." +``` + +Both built-in engines support `explain()`. If you bring a custom policy +engine that implements only `PolicyEngine.evaluate`, `explain_denial` raises +`AgentKernelError` with guidance — implement the `ExplainingPolicyEngine` +protocol to enable structured explanations. diff --git a/examples/policies/default.toml b/examples/policies/default.toml new file mode 100644 index 0000000..1d62354 --- /dev/null +++ b/examples/policies/default.toml @@ -0,0 +1,68 @@ +# Example declarative policy for agent-kernel (TOML format) +# Load with: DeclarativePolicyEngine.from_toml(Path("examples/policies/default.toml")) +# +# Equivalent to default.yaml — TOML and YAML formats are interchangeable. +# +# Note: the current DSL has no negation / missing-attribute operator. Policies +# that should deny "when an attribute is missing" must be expressed as allow +# rules requiring the attribute, paired with `default = "deny"`. + +default = "deny" + +# Allow read-only operations on non-sensitive data. +[[rules]] +name = "allow-read-nonsensitive" +action = "allow" +[rules.match] +safety_class = ["READ"] +sensitivity = ["NONE"] + +# Allow PII access only when the principal carries a tenant attribute. +# PII requests without a tenant fall through to default-deny. +[[rules]] +name = "allow-pii-with-tenant" +action = "allow" +[rules.match] +sensitivity = ["PII"] +[rules.match.attributes] +tenant = "*" # "*" means: attribute must be present with any value + +# Allow write operations for editors and admins with a justification. +[[rules]] +name = "allow-write-editors" +action = "allow" +[rules.match] +safety_class = ["WRITE"] +sensitivity = ["NONE"] +roles = ["editor", "admin"] +min_justification = 20 + +# Allow destructive operations for admins only with a strong justification. +[[rules]] +name = "allow-destructive-admin" +action = "allow" +[rules.match] +safety_class = ["DESTRUCTIVE"] +roles = ["admin"] +min_justification = 50 + +# Allow service principals to read secrets with a justification. +# IMPORTANT: this allow rule must precede any deny-secrets rule — +# first-match-wins means an earlier unconditional deny would shadow it. +[[rules]] +name = "allow-secrets-service" +action = "allow" +[rules.match] +sensitivity = ["SECRETS"] +roles = ["service"] +min_justification = 10 + +# Any SECRETS request that wasn't already allowed is explicitly denied so +# the caller gets a descriptive PolicyDenied reason. Functionally equivalent +# to default-deny but provides clearer feedback. +[[rules]] +name = "deny-secrets-non-service" +action = "deny" +reason = "SECRETS access is restricted to service-role principals" +[rules.match] +sensitivity = ["SECRETS"] diff --git a/examples/policies/default.yaml b/examples/policies/default.yaml new file mode 100644 index 0000000..00145d0 --- /dev/null +++ b/examples/policies/default.yaml @@ -0,0 +1,64 @@ +# Example declarative policy for agent-kernel +# Load with: DeclarativePolicyEngine.from_yaml(Path("examples/policies/default.yaml")) +# +# Rules are evaluated top-down with first-match-wins semantics. +# If no rule matches, the `default` action applies. +# +# Note: the current DSL has no negation / missing-attribute operator. Policies +# that should deny "when an attribute is missing" must be expressed as allow +# rules requiring the attribute, paired with `default: deny`. + +default: deny + +rules: + # Allow read-only operations on non-sensitive data. + - name: allow-read-nonsensitive + action: allow + match: + safety_class: [READ] + sensitivity: [NONE] + + # Allow PII access only when the principal carries a tenant attribute. + # PII requests without a tenant fall through to default-deny. + - name: allow-pii-with-tenant + action: allow + match: + sensitivity: [PII] + attributes: + tenant: "*" # "*" means: attribute must be present with any value + + # Allow write operations for editors and admins with a justification. + - name: allow-write-editors + action: allow + match: + safety_class: [WRITE] + sensitivity: [NONE] + roles: [editor, admin] + min_justification: 20 + + # Allow destructive operations for admins only with a strong justification. + - name: allow-destructive-admin + action: allow + match: + safety_class: [DESTRUCTIVE] + roles: [admin] + min_justification: 50 + + # Allow service principals to read secrets with a justification. + # IMPORTANT: this allow rule must precede any deny-secrets rule — + # first-match-wins means an earlier unconditional deny would shadow it. + - name: allow-secrets-service + action: allow + match: + sensitivity: [SECRETS] + roles: [service] + min_justification: 10 + + # Any SECRETS request that wasn't already allowed is explicitly denied so + # the caller gets a descriptive PolicyDenied reason. Functionally equivalent + # to default-deny but provides clearer feedback. + - name: deny-secrets-non-service + action: deny + match: + sensitivity: [SECRETS] + reason: "SECRETS access is restricted to service-role principals" diff --git a/pyproject.toml b/pyproject.toml index cceada3..33a244d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,9 +39,16 @@ dev = [ "mypy>=1.10", "httpx>=0.27", "mcp>=1.6", + "pyyaml>=6.0", + "tomli>=2.0; python_version<'3.11'", + "types-PyYAML>=6.0", ] mcp = ["mcp>=1.6"] otel = ["opentelemetry-api>=1.20"] +policy = [ + "pyyaml>=6.0", + "tomli>=2.0; python_version<'3.11'", +] [tool.hatch.build.targets.wheel] packages = ["src/agent_kernel"] @@ -62,3 +69,19 @@ ignore = ["E501"] python_version = "3.10" strict = true files = ["src/"] + +[[tool.mypy.overrides]] +module = "tomli" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "yaml" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "httpx" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "mcp.*" +ignore_missing_imports = true diff --git a/src/agent_kernel/__init__.py b/src/agent_kernel/__init__.py index 35cfb78..73fda47 100644 --- a/src/agent_kernel/__init__.py +++ b/src/agent_kernel/__init__.py @@ -15,7 +15,7 @@ Policy:: - from agent_kernel import DefaultPolicyEngine + from agent_kernel import DefaultPolicyEngine, DeclarativePolicyEngine Firewall:: @@ -30,7 +30,7 @@ from agent_kernel import ( AgentKernelError, TokenExpired, TokenInvalid, TokenScopeError, - PolicyDenied, DriverError, FirewallError, + PolicyDenied, PolicyConfigError, DriverError, FirewallError, CapabilityNotFound, HandleNotFound, HandleExpired, ) """ @@ -48,6 +48,7 @@ FirewallError, HandleExpired, HandleNotFound, + PolicyConfigError, PolicyDenied, TokenExpired, TokenInvalid, @@ -63,6 +64,9 @@ Capability, CapabilityGrant, CapabilityRequest, + DenialExplanation, + DryRunResult, + FailedCondition, Frame, Handle, ImplementationRef, @@ -73,13 +77,14 @@ ResponseMode, RoutePlan, ) -from .policy import DefaultPolicyEngine +from .policy import DefaultPolicyEngine, ExplainingPolicyEngine, PolicyEngine +from .policy_dsl import DeclarativePolicyEngine, PolicyMatch, PolicyRule from .registry import CapabilityRegistry from .router import StaticRouter from .tokens import CapabilityToken, HMACTokenProvider from .trace import TraceStore -__version__ = "0.4.0" +__version__ = "0.5.0" __all__ = [ # version @@ -93,6 +98,9 @@ "CapabilityGrant", "CapabilityRequest", "CapabilityToken", + "DenialExplanation", + "DryRunResult", + "FailedCondition", "Frame", "Handle", "ImplementationRef", @@ -114,6 +122,7 @@ "FirewallError", "HandleExpired", "HandleNotFound", + "PolicyConfigError", "PolicyDenied", "TokenExpired", "TokenInvalid", @@ -121,6 +130,11 @@ "TokenScopeError", # policy "DefaultPolicyEngine", + "DeclarativePolicyEngine", + "ExplainingPolicyEngine", + "PolicyEngine", + "PolicyMatch", + "PolicyRule", # tokens "HMACTokenProvider", # router diff --git a/src/agent_kernel/drivers/mcp.py b/src/agent_kernel/drivers/mcp.py index caba34e..fe4f2c7 100644 --- a/src/agent_kernel/drivers/mcp.py +++ b/src/agent_kernel/drivers/mcp.py @@ -21,11 +21,13 @@ # Lazy import of McpError — only available when the mcp optional dep is installed. # If mcp is absent, factory methods raise ImportError before any session is created, -# so _McpError will never be None on a live driver instance. +# so _McpError will never be None on a live driver instance. The explicit annotation +# keeps mypy --strict happy across the try/except branches. +_McpError: type[BaseException] | None try: from mcp.shared.exceptions import McpError as _McpError except ImportError: # pragma: no cover - _McpError = None # type: ignore[assignment,misc] + _McpError = None def _infer_safety_class(spec: ToolSpec) -> SafetyClass: diff --git a/src/agent_kernel/errors.py b/src/agent_kernel/errors.py index 7cce33f..0b4d15f 100644 --- a/src/agent_kernel/errors.py +++ b/src/agent_kernel/errors.py @@ -31,6 +31,10 @@ class PolicyDenied(AgentKernelError): """Raised when the policy engine rejects a capability request.""" +class PolicyConfigError(AgentKernelError): + """Raised when a declarative policy file is malformed or unreadable.""" + + # ── Driver errors ───────────────────────────────────────────────────────────── diff --git a/src/agent_kernel/kernel.py b/src/agent_kernel/kernel.py index de8e310..2611471 100644 --- a/src/agent_kernel/kernel.py +++ b/src/agent_kernel/kernel.py @@ -5,18 +5,22 @@ import datetime import logging import uuid -from typing import Any +from typing import Any, Literal, overload from .drivers.base import Driver, ExecutionContext -from .errors import DriverError +from .enums import SafetyClass +from .errors import AgentKernelError, DriverError from .firewall.transform import Firewall from .handles import HandleStore from .models import ( ActionTrace, CapabilityGrant, CapabilityRequest, + DenialExplanation, + DryRunResult, Frame, Handle, + PolicyDecision, Principal, ResponseMode, RoutePlan, @@ -170,6 +174,28 @@ def get_token( """ return self.grant_capability(request, principal, justification=justification).token + @overload + async def invoke( + self, + token: CapabilityToken, + *, + principal: Principal, + args: dict[str, Any], + response_mode: ResponseMode = ..., + dry_run: Literal[True], + ) -> DryRunResult: ... + + @overload + async def invoke( + self, + token: CapabilityToken, + *, + principal: Principal, + args: dict[str, Any], + response_mode: ResponseMode = ..., + dry_run: Literal[False] = ..., + ) -> Frame: ... + async def invoke( self, token: CapabilityToken, @@ -177,18 +203,26 @@ async def invoke( principal: Principal, args: dict[str, Any], response_mode: ResponseMode = "summary", - ) -> Frame: + dry_run: bool = False, + ) -> Frame | DryRunResult: """Execute a capability using a signed token and return a Frame. + When ``dry_run=True`` the full pipeline runs (token verification, + capability lookup, route resolution) but the driver is never called. + A :class:`DryRunResult` is returned instead of a :class:`Frame`. + Args: token: A signed :class:`CapabilityToken` authorising the invocation. principal: The principal invoking the capability (must match token). args: Arguments passed to the driver. response_mode: How to present the result (``summary``, ``table``, ``handle_only``, or ``raw``). + dry_run: When ``True``, skip driver execution and return a + :class:`DryRunResult` describing what would happen. Returns: - A bounded :class:`Frame` (never raw driver output). + A bounded :class:`Frame`, or :class:`DryRunResult` when + ``dry_run=True``. Raises: TokenRevoked: If the token has been revoked. @@ -196,7 +230,7 @@ async def invoke( TokenInvalid: If the token signature does not verify. TokenScopeError: If the token belongs to a different principal or capability. CapabilityNotFound: If the capability is not registered. - DriverError: If all drivers fail. + DriverError: If all drivers fail (not raised in dry-run mode). """ # ── Verify token ────────────────────────────────────────────────────── self._token_provider.verify( @@ -205,10 +239,46 @@ async def invoke( expected_capability_id=token.capability_id, ) - action_id = str(uuid.uuid4()) - self._registry.get(token.capability_id) # validate capability exists + capability = self._registry.get(token.capability_id) plan: RoutePlan = self._router.route(token.capability_id) + # ── Dry-run short-circuit ───────────────────────────────────────────── + if dry_run: + driver_id = plan.driver_ids[0] if plan.driver_ids else "" + # Mirror driver operation resolution exactly (see InMemoryDriver, + # HTTPDriver, MCPDriver — all read ``args.get("operation", capability_id)``). + # Using ``capability.impl.operation`` here would diverge from what the + # driver actually executes at real-invoke time. + operation = str(args.get("operation", token.capability_id)) + # Mirror Firewall's admin-only gate for ``raw`` mode + # (see firewall/transform.py:108 and docs/agent-context/invariants.md). + # Dry-run must not let non-admin principals probe raw-mode availability. + effective_response_mode: ResponseMode = response_mode + if response_mode == "raw" and "admin" not in principal.roles: + effective_response_mode = "summary" + _cost_map: dict[SafetyClass, Literal["low", "medium", "high"]] = { + SafetyClass.READ: "low", + SafetyClass.WRITE: "medium", + SafetyClass.DESTRUCTIVE: "high", + } + return DryRunResult( + capability_id=token.capability_id, + principal_id=principal.principal_id, + policy_decision=PolicyDecision( + allowed=True, + reason="Token verified. Policy was evaluated at grant time.", + constraints=dict(token.constraints), + ), + driver_id=driver_id, + operation=operation, + resolved_args=args, + response_mode=effective_response_mode, + budget_remaining=None, + estimated_cost=_cost_map[capability.safety_class], + ) + + action_id = str(uuid.uuid4()) + _log_ctx = { "action_id": action_id, "principal_id": principal.principal_id, @@ -348,3 +418,49 @@ def explain(self, action_id: str) -> ActionTrace: }, ) return self._trace_store.get(action_id) + + def explain_denial( + self, + request: CapabilityRequest, + principal: Principal, + *, + justification: str = "", + ) -> DenialExplanation: + """Explain why *principal*'s *request* would be denied (or allowed). + + Delegates to the configured policy engine's ``explain()`` method. + Unlike :meth:`grant_capability`, this does not raise + :class:`PolicyDenied` when the policy fails — it returns a + :class:`DenialExplanation` instead. + + Note: Rate-limit state is not reflected here. A request denied due to + rate limits shows as ``denied=False`` in the explanation. + + Args: + request: The capability request to explain. + principal: The principal to evaluate the request for. + justification: Free-text justification (used in policy checks). + + Returns: + :class:`DenialExplanation` with ``denied=False`` if the request + would succeed. + + Raises: + CapabilityNotFound: If the capability is not registered. + AgentKernelError: If the configured policy engine does not + implement ``explain()``. Use :class:`DefaultPolicyEngine` or + :class:`DeclarativePolicyEngine` for structured explanations, + or add an ``explain()`` method to your engine. + """ + capability = self._registry.get(request.capability_id) + explain_fn = getattr(self._policy, "explain", None) + if explain_fn is None: + raise AgentKernelError( + f"Policy engine {type(self._policy).__name__!r} does not implement " + f"explain(); structured denial explanations are unavailable. " + f"Use DefaultPolicyEngine or DeclarativePolicyEngine, or add an " + f"explain() method to your engine." + ) + result = explain_fn(request, capability, principal, justification=justification) + assert isinstance(result, DenialExplanation) + return result diff --git a/src/agent_kernel/models.py b/src/agent_kernel/models.py index d2f0878..5b7c262 100644 --- a/src/agent_kernel/models.py +++ b/src/agent_kernel/models.py @@ -228,3 +228,76 @@ class ActionTrace: driver_id: str handle_id: str | None = None error: str | None = None + + +# ── Policy explanation ──────────────────────────────────────────────────────── + + +@dataclass(slots=True) +class FailedCondition: + """A single policy condition that was not met.""" + + condition: str + """Name of the condition (e.g. ``"roles"``, ``"min_justification"``).""" + + required: Any + """What the policy requires.""" + + actual: Any + """What the principal or request actually has.""" + + suggestion: str + """Actionable remediation hint.""" + + +@dataclass(slots=True) +class DenialExplanation: + """Structured explanation of a policy evaluation result.""" + + denied: bool + """``True`` if the request would be denied.""" + + rule_name: str + """Name of the rule (or rule category) that caused the denial.""" + + failed_conditions: list[FailedCondition] + """All conditions that were not satisfied.""" + + remediation: list[str] + """Ordered list of actionable steps to satisfy the policy.""" + + narrative: str + """Human-readable single-sentence summary.""" + + +# ── Dry-run ─────────────────────────────────────────────────────────────────── + + +@dataclass(slots=True) +class DryRunResult: + """Result of a dry-run invocation — driver is never called. + + Returned by :meth:`~agent_kernel.Kernel.invoke` when ``dry_run=True``. + """ + + capability_id: str + principal_id: str + policy_decision: PolicyDecision + """The policy decision encoded in the verified token.""" + + driver_id: str + """Driver that would handle the invocation (first in route plan).""" + + operation: str + """Operation name that would be passed to the driver.""" + + resolved_args: dict[str, Any] + """Arguments that would be forwarded to the driver.""" + + response_mode: ResponseMode + budget_remaining: int | None + """Reserved for a future cross-invocation budget mechanism; always + ``None`` in v0.5 (no such budget is tracked today).""" + + estimated_cost: Literal["low", "medium", "high"] + """Rough cost estimate based on the capability's safety class.""" diff --git a/src/agent_kernel/policy.py b/src/agent_kernel/policy.py index c85e229..fba3a13 100644 --- a/src/agent_kernel/policy.py +++ b/src/agent_kernel/policy.py @@ -11,7 +11,14 @@ from .enums import SafetyClass, SensitivityTag from .errors import AgentKernelError, PolicyDenied -from .models import Capability, CapabilityRequest, PolicyDecision, Principal +from .models import ( + Capability, + CapabilityRequest, + DenialExplanation, + FailedCondition, + PolicyDecision, + Principal, +) logger = logging.getLogger(__name__) @@ -84,7 +91,14 @@ def record(self, key: str) -> None: class PolicyEngine(Protocol): - """Interface for a policy engine.""" + """Interface for a policy engine. + + Implementations need only provide :meth:`evaluate`. To enable structured + denial explanations via :meth:`Kernel.explain_denial`, additionally + implement :meth:`ExplainingPolicyEngine.explain` — engines that satisfy + only this base protocol cause :meth:`Kernel.explain_denial` to raise + :class:`AgentKernelError`. + """ def evaluate( self, @@ -108,6 +122,44 @@ def evaluate( ... +class ExplainingPolicyEngine(PolicyEngine, Protocol): + """Policy engine that can produce structured denial explanations. + + :meth:`Kernel.explain_denial` requires this richer contract; downstream + engines that only implement :class:`PolicyEngine` keep working for + :meth:`Kernel.grant_capability` and :meth:`Kernel.invoke` but cannot + answer :meth:`Kernel.explain_denial`. + + Both built-in engines (:class:`DefaultPolicyEngine` and + :class:`agent_kernel.DeclarativePolicyEngine`) satisfy this protocol. + """ + + def explain( + self, + request: CapabilityRequest, + capability: Capability, + principal: Principal, + *, + justification: str, + ) -> DenialExplanation: + """Explain why *principal*'s *request* would be denied (or allowed). + + Unlike :meth:`evaluate`, this method never raises — it collects all + failing conditions and returns a structured :class:`DenialExplanation`. + + Args: + request: The capability request to explain. + capability: The target capability. + principal: The requesting principal. + justification: Free-text justification from the caller. + + Returns: + A :class:`DenialExplanation` with ``denied=False`` if the request + would succeed. + """ + ... + + class DefaultPolicyEngine: """Rule-based policy engine implementing the default access control policy. @@ -323,3 +375,146 @@ def evaluate( reason=reason, constraints=constraints, ) + + def explain( + self, + request: CapabilityRequest, + capability: Capability, + principal: Principal, + *, + justification: str, + ) -> DenialExplanation: + """Explain which policy conditions would deny *principal*'s *request*. + + Traverses the same rule chain as :meth:`evaluate` but collects ALL + failing conditions instead of short-circuiting on the first failure. + Rate-limit state is excluded — it is transient and not remediable + by changing the request. + + Args: + request: The capability request to explain. + capability: The target capability. + principal: The requesting principal. + justification: Free-text justification from the caller. + + Returns: + :class:`DenialExplanation` with ``denied=False`` if allowed. + """ + roles = set(principal.roles) + pid = principal.principal_id + cid = capability.capability_id + failed: list[FailedCondition] = [] + + # ── Safety class checks ─────────────────────────────────────────────── + + if capability.safety_class == SafetyClass.WRITE: + if not (roles & {"writer", "admin"}): + failed.append( + FailedCondition( + condition="roles", + required=["writer", "admin"], + actual=sorted(roles), + suggestion=f"Add 'writer' or 'admin' role to principal '{pid}'", + ) + ) + stripped = len(justification.strip()) + if stripped < _MIN_JUSTIFICATION: + failed.append( + FailedCondition( + condition="min_justification", + required=_MIN_JUSTIFICATION, + actual=stripped, + suggestion=( + f"Provide justification with at least {_MIN_JUSTIFICATION} " + f"characters (currently {stripped})" + ), + ) + ) + + elif capability.safety_class == SafetyClass.DESTRUCTIVE: + if "admin" not in roles: + failed.append( + FailedCondition( + condition="roles", + required=["admin"], + actual=sorted(roles), + suggestion=f"Add 'admin' role to principal '{pid}'", + ) + ) + stripped = len(justification.strip()) + if stripped < _MIN_JUSTIFICATION: + failed.append( + FailedCondition( + condition="min_justification", + required=_MIN_JUSTIFICATION, + actual=stripped, + suggestion=( + f"Provide justification with at least {_MIN_JUSTIFICATION} " + f"characters (currently {stripped})" + ), + ) + ) + + # ── Sensitivity checks ──────────────────────────────────────────────── + + if ( + capability.sensitivity in (SensitivityTag.PII, SensitivityTag.PCI) + and "tenant" not in principal.attributes + ): + failed.append( + FailedCondition( + condition="tenant_attribute", + required="present", + actual="absent", + suggestion=f"Add 'tenant' attribute to principal '{pid}'", + ) + ) + + if capability.sensitivity == SensitivityTag.SECRETS: + if not (roles & {"admin", "secrets_reader"}): + failed.append( + FailedCondition( + condition="roles", + required=["admin", "secrets_reader"], + actual=sorted(roles), + suggestion=f"Add 'admin' or 'secrets_reader' role to principal '{pid}'", + ) + ) + stripped = len(justification.strip()) + if stripped < _MIN_JUSTIFICATION: + failed.append( + FailedCondition( + condition="min_justification", + required=_MIN_JUSTIFICATION, + actual=stripped, + suggestion=( + f"Provide justification with at least {_MIN_JUSTIFICATION} " + f"characters (currently {stripped})" + ), + ) + ) + + denied = bool(failed) + remediation = [fc.suggestion for fc in failed] + + if denied: + first = failed[0] + rule_name = ( + f"{capability.safety_class.value.lower()}-{first.condition.replace('_', '-')}" + ) + narrative = ( + f"Request for '{cid}' by '{pid}' would be denied: " + + "; ".join(fc.suggestion for fc in failed) + + "." + ) + else: + rule_name = "allowed" + narrative = f"Request for '{cid}' by '{pid}' would be allowed by DefaultPolicyEngine." + + return DenialExplanation( + denied=denied, + rule_name=rule_name, + failed_conditions=failed, + remediation=remediation, + narrative=narrative, + ) diff --git a/src/agent_kernel/policy_dsl.py b/src/agent_kernel/policy_dsl.py new file mode 100644 index 0000000..3a4e366 --- /dev/null +++ b/src/agent_kernel/policy_dsl.py @@ -0,0 +1,503 @@ +"""Declarative policy engine: load access-control rules from YAML or TOML. + +The YAML/TOML loaders import their parsers lazily, so ``import agent_kernel`` +works without the optional ``policy`` extra installed. Calling +:meth:`DeclarativePolicyEngine.from_yaml` or :meth:`from_toml` without the +required parser surfaces a :class:`PolicyConfigError` with an install hint. +""" + +from __future__ import annotations + +import sys +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Literal + +from .enums import SafetyClass, SensitivityTag +from .errors import PolicyConfigError, PolicyDenied +from .models import ( + Capability, + CapabilityRequest, + DenialExplanation, + FailedCondition, + PolicyDecision, + Principal, +) + +# Hint surfaced when the optional ``policy`` extra is missing. +_POLICY_EXTRA_HINT = ( + "Install the policy extra to enable file loaders: pip install 'weaver-kernel[policy]'" +) + + +@dataclass(slots=True) +class PolicyMatch: + """Conditions that must ALL be satisfied for a rule to match a request. + + ``None`` fields are wildcards — they match any value. + List fields use ANY-of semantics (e.g. ``roles = ["a", "b"]`` matches + if the principal has *at least one* of those roles). + """ + + safety_class: list[SafetyClass] | None = None + """Match if ``capability.safety_class`` is in this list.""" + + sensitivity: list[SensitivityTag] | None = None + """Match if ``capability.sensitivity`` is in this list.""" + + roles: list[str] | None = None + """Match if the principal has ANY of these roles.""" + + attributes: dict[str, str] | None = None + """Match if the principal has ALL these attributes. + Use ``"*"`` as the value to require the attribute with any value.""" + + min_justification: int | None = None + """Match if ``len(justification.strip()) >= min_justification``.""" + + +@dataclass(slots=True) +class PolicyRule: + """A single declarative policy rule.""" + + name: str + match: PolicyMatch + action: Literal["allow", "deny"] + constraints: dict[str, Any] = field(default_factory=dict) + """Extra constraints merged into the :class:`PolicyDecision` on allow.""" + + reason: str = "" + """Human-readable reason embedded in :class:`PolicyDenied` on deny.""" + + +class DeclarativePolicyEngine: + """Policy engine that evaluates rules loaded from YAML or TOML. + + Rules are evaluated top-to-bottom; the first matching rule wins. + If no rule matches, the *default* action applies (``"deny"`` unless + overridden). + + Example:: + + engine = DeclarativePolicyEngine.from_yaml(Path("policy.yaml")) + decision = engine.evaluate(request, capability, principal, justification="...") + """ + + def __init__( + self, + rules: list[PolicyRule], + *, + default: Literal["allow", "deny"] = "deny", + ) -> None: + """Initialise with a validated rule list. + + Args: + rules: Ordered list of policy rules (first match wins). + default: Action when no rule matches. Defaults to ``"deny"``. + """ + self._rules = rules + self._default = default + + # ── Loaders ─────────────────────────────────────────────────────────────── + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> DeclarativePolicyEngine: + """Build from a plain dict (no file I/O). + + Args: + data: Dict with a ``rules`` list and an optional ``default`` key. + + Raises: + PolicyConfigError: If the data is malformed. + """ + return cls._parse(data) + + @classmethod + def from_yaml(cls, path: Path) -> DeclarativePolicyEngine: + """Build from a YAML file. + + Requires ``pyyaml``: ``pip install 'weaver-kernel[policy]'``. + The import is deferred so that ``import agent_kernel`` works without + the policy extra installed. + + Args: + path: Path to the YAML policy file. + + Raises: + PolicyConfigError: If the file is unreadable or malformed, or + if ``pyyaml`` is not installed. + """ + try: + import yaml + except ImportError as exc: + raise PolicyConfigError(_POLICY_EXTRA_HINT) from exc + + try: + text = path.read_text(encoding="utf-8") + data: Any = yaml.safe_load(text) + except OSError as exc: + raise PolicyConfigError(f"Cannot read policy file '{path}': {exc}") from exc + except yaml.YAMLError as exc: + raise PolicyConfigError(f"YAML parse error in '{path}': {exc}") from exc + if not isinstance(data, dict): + raise PolicyConfigError(f"Policy file '{path}' must be a YAML mapping.") + return cls._parse(data) + + @classmethod + def from_toml(cls, path: Path) -> DeclarativePolicyEngine: + """Build from a TOML file. + + Requires Python 3.11+ (stdlib ``tomllib``) or ``tomli`` on 3.10 + (included in ``pip install 'weaver-kernel[policy]'``). The import is + deferred so that ``import agent_kernel`` works without the policy + extra installed. + + Args: + path: Path to the TOML policy file. + + Raises: + PolicyConfigError: If the file is unreadable or malformed, or + if neither ``tomllib`` nor ``tomli`` is available. + """ + try: + if sys.version_info >= (3, 11): + import tomllib as _toml + else: + import tomli as _toml + except ImportError as exc: + raise PolicyConfigError(_POLICY_EXTRA_HINT) from exc + + try: + with path.open("rb") as fh: + data = _toml.load(fh) + except OSError as exc: + raise PolicyConfigError(f"Cannot read policy file '{path}': {exc}") from exc + except Exception as exc: # TOMLDecodeError is not a stable import target + raise PolicyConfigError(f"TOML parse error in '{path}': {exc}") from exc + return cls._parse(data) + + # ── Parsing ─────────────────────────────────────────────────────────────── + + @classmethod + def _parse(cls, data: dict[str, Any]) -> DeclarativePolicyEngine: + raw_default = data.get("default", "deny") + if raw_default not in ("allow", "deny"): + raise PolicyConfigError(f"'default' must be 'allow' or 'deny', got {raw_default!r}.") + default: Literal["allow", "deny"] = raw_default + + raw_rules = data.get("rules", []) + if not isinstance(raw_rules, list): + raise PolicyConfigError("'rules' must be a list.") + + return cls( + [cls._parse_rule(r, index=i) for i, r in enumerate(raw_rules)], + default=default, + ) + + @classmethod + def _parse_rule(cls, raw: Any, *, index: int) -> PolicyRule: + if not isinstance(raw, dict): + raise PolicyConfigError(f"Rule[{index}] must be a mapping, got {type(raw).__name__}.") + name: str = raw.get("name", f"rule-{index}") + action = raw.get("action") + if action not in ("allow", "deny"): + raise PolicyConfigError( + f"Rule '{name}': 'action' must be 'allow' or 'deny', got {action!r}." + ) + raw_match = raw.get("match", {}) + if not isinstance(raw_match, dict): + raise PolicyConfigError(f"Rule '{name}': 'match' must be a mapping.") + + safety_class: list[SafetyClass] | None = None + if "safety_class" in raw_match: + try: + safety_class = [SafetyClass(v) for v in raw_match["safety_class"]] + except ValueError as exc: + raise PolicyConfigError( + f"Rule '{name}': invalid safety_class value: {exc}" + ) from exc + + sensitivity: list[SensitivityTag] | None = None + if "sensitivity" in raw_match: + try: + sensitivity = [SensitivityTag(v) for v in raw_match["sensitivity"]] + except ValueError as exc: + raise PolicyConfigError( + f"Rule '{name}': invalid sensitivity value: {exc}" + ) from exc + + roles: list[str] | None = None + if "roles" in raw_match: + roles_raw = raw_match["roles"] + if not isinstance(roles_raw, list) or not all(isinstance(r, str) for r in roles_raw): + raise PolicyConfigError( + f"Rule '{name}': 'roles' must be a list of strings, " + f"got {type(roles_raw).__name__}." + ) + roles = list(roles_raw) + + attributes: dict[str, str] | None = None + if "attributes" in raw_match: + attrs_raw = raw_match["attributes"] + if not isinstance(attrs_raw, dict) or not all( + isinstance(k, str) and isinstance(v, str) for k, v in attrs_raw.items() + ): + raise PolicyConfigError( + f"Rule '{name}': 'attributes' must be a mapping of " + f"string keys to string values." + ) + attributes = dict(attrs_raw) + + min_justification: int | None = None + if "min_justification" in raw_match: + mj_raw = raw_match["min_justification"] + # ``bool`` is a subclass of ``int`` in Python; reject it explicitly + # so ``min_justification: true`` does not silently pass. + if not isinstance(mj_raw, int) or isinstance(mj_raw, bool): + raise PolicyConfigError( + f"Rule '{name}': 'min_justification' must be an integer, " + f"got {type(mj_raw).__name__}." + ) + min_justification = mj_raw + + constraints_raw = raw.get("constraints", {}) + if not isinstance(constraints_raw, dict): + raise PolicyConfigError( + f"Rule '{name}': 'constraints' must be a mapping, " + f"got {type(constraints_raw).__name__}." + ) + + return PolicyRule( + name=name, + match=PolicyMatch( + safety_class=safety_class, + sensitivity=sensitivity, + roles=roles, + attributes=attributes, + min_justification=min_justification, + ), + action=action, + constraints=dict(constraints_raw), + reason=raw.get("reason", ""), + ) + + # ── Matching ────────────────────────────────────────────────────────────── + + def _matches( + self, + rule: PolicyRule, + capability: Capability, + principal: Principal, + justification: str, + ) -> bool: + m = rule.match + roles = set(principal.roles) + if m.safety_class is not None and capability.safety_class not in m.safety_class: + return False + if m.sensitivity is not None and capability.sensitivity not in m.sensitivity: + return False + if m.roles is not None and not (roles & set(m.roles)): + return False + if m.attributes is not None: + for k, v in m.attributes.items(): + attr_val = principal.attributes.get(k) + if attr_val is None or (v != "*" and attr_val != v): + return False + return m.min_justification is None or len(justification.strip()) >= m.min_justification + + # ── Evaluation ──────────────────────────────────────────────────────────── + + def evaluate( + self, + request: CapabilityRequest, + capability: Capability, + principal: Principal, + *, + justification: str, + ) -> PolicyDecision: + """Evaluate rules top-to-bottom; first match wins. + + Args: + request: The capability request. + capability: The target capability. + principal: The requesting principal. + justification: Free-text justification. + + Returns: + :class:`PolicyDecision` with ``allowed=True`` and merged constraints. + + Raises: + PolicyDenied: If a deny rule matches or the default action is deny. + """ + constraints = dict(request.constraints) + pid = principal.principal_id + cid = capability.capability_id + + for rule in self._rules: + if not self._matches(rule, capability, principal, justification): + continue + if rule.action == "deny": + raise PolicyDenied(rule.reason or f"Denied by rule '{rule.name}'.") + constraints.update(rule.constraints) + return PolicyDecision( + allowed=True, + reason=f"Allowed by rule '{rule.name}'.", + constraints=constraints, + ) + + if self._default == "deny": + raise PolicyDenied( + f"No policy rule matched request for capability '{cid}' " + f"by principal '{pid}'. Default action is deny." + ) + return PolicyDecision( + allowed=True, + reason="No rule matched; default action is allow.", + constraints=constraints, + ) + + def explain( + self, + request: CapabilityRequest, + capability: Capability, + principal: Principal, + *, + justification: str, + ) -> DenialExplanation: + """Explain which rule conditions prevented a match. + + Takes the fast path first: if the request would be allowed, returns + ``denied=False`` immediately. Otherwise, finds the rule that explains + the denial — either an explicit deny rule that fully matches, or the + first structurally-matching allow rule whose unmet conditions are + reported. Partial-match deny rules are skipped (they did not cause + the denial, and suggesting how to satisfy them would be misleading — + satisfying them would only trigger the deny). + + Args: + request: The capability request. + capability: The target capability. + principal: The requesting principal. + justification: Free-text justification. + + Returns: + :class:`DenialExplanation` with ``denied=False`` if allowed. + """ + try: + self.evaluate(request, capability, principal, justification=justification) + return DenialExplanation( + denied=False, + rule_name="", + failed_conditions=[], + remediation=[], + narrative=( + f"Request for '{capability.capability_id}' by " + f"'{principal.principal_id}' would be allowed." + ), + ) + except PolicyDenied: + pass + + roles = set(principal.roles) + pid = principal.principal_id + explanation_failures: list[FailedCondition] = [] + rule_name = "default-deny" + + for rule in self._rules: + m = rule.match + if m.safety_class is not None and capability.safety_class not in m.safety_class: + continue + if m.sensitivity is not None and capability.sensitivity not in m.sensitivity: + continue + + # Collect unmet conditions for this rule. + rule_failures: list[FailedCondition] = [] + if m.roles is not None and not (roles & set(m.roles)): + rule_failures.append( + FailedCondition( + condition="roles", + required=list(m.roles), + actual=sorted(roles), + suggestion=f"Add one of {m.roles!r} to roles for principal '{pid}'", + ) + ) + if m.attributes is not None: + for k, v in m.attributes.items(): + attr_val = principal.attributes.get(k) + if attr_val is None or (v != "*" and attr_val != v): + rule_failures.append( + FailedCondition( + condition=f"attribute:{k}", + required=v, + actual=attr_val if attr_val is not None else "", + suggestion=f"Set attribute '{k}'={v!r} on principal '{pid}'", + ) + ) + if m.min_justification is not None: + stripped = len(justification.strip()) + if stripped < m.min_justification: + rule_failures.append( + FailedCondition( + condition="min_justification", + required=m.min_justification, + actual=stripped, + suggestion=( + f"Provide justification with at least " + f"{m.min_justification} characters (currently {stripped})" + ), + ) + ) + + if rule.action == "deny": + if not rule_failures: + # Explicit deny rule fully matched — this is the cause. + rule_name = rule.name + explanation_failures = [ + FailedCondition( + condition="denied_by_rule", + required=f"request must NOT match deny rule '{rule.name}'", + actual=f"matched deny rule '{rule.name}'", + suggestion=( + rule.reason + or f"Remove or narrow deny rule '{rule.name}' so this " + f"request does not match it" + ), + ) + ] + break + # Partial-match deny rule: it did NOT cause the denial. Skip + # so we don't suggest changes that would actually trigger it. + continue + + # Allow rule (structurally matched, conditions unmet) — report it. + rule_name = rule.name + explanation_failures = rule_failures + break + + if not explanation_failures: + explanation_failures = [ + FailedCondition( + condition="no_matching_rule", + required="an allow rule matching this capability", + actual="no rule matched", + suggestion=( + f"Add an allow rule for safety_class=" + f"{capability.safety_class.value!r} in your policy file" + ), + ) + ] + + remediation = [fc.suggestion for fc in explanation_failures] + narrative = ( + f"Request for '{capability.capability_id}' by '{pid}' would be denied " + f"(rule: '{rule_name}'): " + + "; ".join(fc.suggestion for fc in explanation_failures) + + "." + ) + return DenialExplanation( + denied=True, + rule_name=rule_name, + failed_conditions=explanation_failures, + remediation=remediation, + narrative=narrative, + ) diff --git a/tests/test_kernel.py b/tests/test_kernel.py index 46a8490..4b974b9 100644 --- a/tests/test_kernel.py +++ b/tests/test_kernel.py @@ -215,3 +215,228 @@ async def test_confused_deputy_prevention(kernel: Kernel, reader_principal: Prin principal=other_principal, args={"operation": "billing.list_invoices"}, ) + + +# ── Dry-run mode ─────────────────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_dry_run_returns_dry_run_result(kernel: Kernel, reader_principal: Principal) -> None: + """dry_run=True returns DryRunResult, not Frame.""" + from agent_kernel.models import DryRunResult + + req = CapabilityRequest(capability_id="billing.list_invoices", goal="test") + token = kernel.get_token(req, reader_principal, justification="") + result = await kernel.invoke(token, principal=reader_principal, args={}, dry_run=True) + assert isinstance(result, DryRunResult) + assert result.capability_id == "billing.list_invoices" + assert result.principal_id == reader_principal.principal_id + assert result.policy_decision.allowed is True + assert result.budget_remaining is None + + +@pytest.mark.asyncio +async def test_dry_run_driver_not_called( + kernel: Kernel, reader_principal: Principal, memory_driver: InMemoryDriver +) -> None: + """Driver execute() must never be called in dry-run mode.""" + from unittest.mock import AsyncMock, patch + + req = CapabilityRequest(capability_id="billing.list_invoices", goal="test") + token = kernel.get_token(req, reader_principal, justification="") + with patch.object(memory_driver, "execute", new_callable=AsyncMock) as mock_exec: + await kernel.invoke(token, principal=reader_principal, args={}, dry_run=True) + mock_exec.assert_not_called() + + +@pytest.mark.asyncio +async def test_dry_run_estimated_cost(kernel: Kernel, admin_principal: Principal) -> None: + """estimated_cost maps to safety class.""" + from agent_kernel.models import DryRunResult + + read_req = CapabilityRequest(capability_id="billing.list_invoices", goal="r") + read_token = kernel.get_token(read_req, admin_principal, justification="") + read_result = await kernel.invoke(read_token, principal=admin_principal, args={}, dry_run=True) + assert isinstance(read_result, DryRunResult) + assert read_result.estimated_cost == "low" + + del_req = CapabilityRequest(capability_id="billing.delete_invoice", goal="d") + del_token = kernel.get_token( + del_req, admin_principal, justification="long enough justification here" + ) + del_result = await kernel.invoke(del_token, principal=admin_principal, args={}, dry_run=True) + assert isinstance(del_result, DryRunResult) + assert del_result.estimated_cost == "high" + + +@pytest.mark.asyncio +async def test_dry_run_operation_uses_args_then_capability_id( + kernel: Kernel, reader_principal: Principal +) -> None: + """Dry-run resolves ``operation`` the same way drivers do at real-invoke. + + Drivers read ``ctx.args.get("operation", ctx.capability_id)``. Dry-run + must mirror that exactly so ``DryRunResult.operation`` matches what a + driver would actually receive. + """ + from agent_kernel.models import DryRunResult + + req = CapabilityRequest(capability_id="billing.list_invoices", goal="t") + token = kernel.get_token(req, reader_principal, justification="") + + # Explicit operation in args wins. + explicit = await kernel.invoke( + token, + principal=reader_principal, + args={"operation": "billing.custom_op"}, + dry_run=True, + ) + assert isinstance(explicit, DryRunResult) + assert explicit.operation == "billing.custom_op" + + # No operation in args → falls back to the capability_id (NOT impl.operation). + fallback = await kernel.invoke( + token, + principal=reader_principal, + args={}, + dry_run=True, + ) + assert isinstance(fallback, DryRunResult) + assert fallback.operation == "billing.list_invoices" + + +@pytest.mark.asyncio +async def test_dry_run_downgrades_raw_for_non_admin( + kernel: Kernel, reader_principal: Principal +) -> None: + """Dry-run mirrors the Firewall's raw-mode admin gate. + + Non-admin principals never get raw mode at real-invoke + (see firewall/transform.py); dry-run must downgrade too so callers + cannot probe/assume raw availability they will never receive. + """ + from agent_kernel.models import DryRunResult + + req = CapabilityRequest(capability_id="billing.list_invoices", goal="t") + token = kernel.get_token(req, reader_principal, justification="") + result = await kernel.invoke( + token, + principal=reader_principal, + args={}, + response_mode="raw", + dry_run=True, + ) + assert isinstance(result, DryRunResult) + assert result.response_mode == "summary" + + +@pytest.mark.asyncio +async def test_dry_run_preserves_raw_for_admin(kernel: Kernel, admin_principal: Principal) -> None: + """Admin principals keep raw mode in dry-run (no downgrade).""" + from agent_kernel.models import DryRunResult + + req = CapabilityRequest(capability_id="billing.list_invoices", goal="t") + token = kernel.get_token(req, admin_principal, justification="") + result = await kernel.invoke( + token, + principal=admin_principal, + args={}, + response_mode="raw", + dry_run=True, + ) + assert isinstance(result, DryRunResult) + assert result.response_mode == "raw" + + +@pytest.mark.asyncio +async def test_dry_run_expired_token_still_raises( + kernel: Kernel, reader_principal: Principal +) -> None: + """Token expiry is enforced even in dry-run mode.""" + from agent_kernel import HMACTokenProvider, TokenExpired + + provider = HMACTokenProvider(secret="test-secret-do-not-use-in-prod") + token = provider.issue( + "billing.list_invoices", + reader_principal.principal_id, + constraints={}, + audit_id="audit-expired", + ttl_seconds=-1, + ) + with pytest.raises(TokenExpired): + await kernel.invoke(token, principal=reader_principal, args={}, dry_run=True) + + +# ── explain_denial ───────────────────────────────────────────────────────────── + + +def test_explain_denial_allowed(kernel: Kernel, reader_principal: Principal) -> None: + """explain_denial returns denied=False for an allowed request.""" + req = CapabilityRequest(capability_id="billing.list_invoices", goal="test") + result = kernel.explain_denial(req, reader_principal, justification="") + assert result.denied is False + assert result.failed_conditions == [] + + +def test_explain_denial_write_no_role(kernel: Kernel, reader_principal: Principal) -> None: + """explain_denial reports role failure for WRITE without writer role.""" + req = CapabilityRequest(capability_id="billing.update_invoice", goal="update") + result = kernel.explain_denial( + req, reader_principal, justification="long enough justification" + ) + assert result.denied is True + assert any(fc.condition == "roles" for fc in result.failed_conditions) + + +def test_explain_denial_write_short_justification( + kernel: Kernel, writer_principal: Principal +) -> None: + """explain_denial reports justification failure for WRITE with short justification.""" + req = CapabilityRequest(capability_id="billing.update_invoice", goal="update") + result = kernel.explain_denial(req, writer_principal, justification="short") + assert result.denied is True + assert any(fc.condition == "min_justification" for fc in result.failed_conditions) + + +def test_explain_denial_capability_not_found(kernel: Kernel, reader_principal: Principal) -> None: + """explain_denial raises CapabilityNotFound for unknown capability.""" + from agent_kernel import CapabilityNotFound + + req = CapabilityRequest(capability_id="nonexistent.capability", goal="test") + with pytest.raises(CapabilityNotFound): + kernel.explain_denial(req, reader_principal) + + +def test_explain_denial_engine_without_explain_raises( + registry: CapabilityRegistry, reader_principal: Principal +) -> None: + """A policy engine that only implements evaluate() surfaces a clear error. + + The ``PolicyEngine`` protocol requires only ``evaluate()``; engines that + don't implement ``explain()`` raise ``AgentKernelError`` from + ``Kernel.explain_denial`` rather than producing a misleading explanation. + """ + from agent_kernel import AgentKernelError, PolicyDenied + from agent_kernel.models import PolicyDecision + + class EvaluateOnlyEngine: + """Minimal engine satisfying PolicyEngine but not ExplainingPolicyEngine.""" + + def evaluate( + self, + request: CapabilityRequest, + capability: Capability, + principal: Principal, + *, + justification: str, + ) -> PolicyDecision: + raise PolicyDenied("denied for test") + + k = Kernel( + registry=registry, + policy=EvaluateOnlyEngine(), # type: ignore[arg-type] + token_provider=HMACTokenProvider(secret="test-secret"), + ) + req = CapabilityRequest(capability_id="billing.list_invoices", goal="t") + with pytest.raises(AgentKernelError, match="does not implement explain"): + k.explain_denial(req, reader_principal) diff --git a/tests/test_policy.py b/tests/test_policy.py index 10d23c8..a0d9805 100644 --- a/tests/test_policy.py +++ b/tests/test_policy.py @@ -2,14 +2,18 @@ from __future__ import annotations +import tempfile from collections.abc import Callable +from pathlib import Path import pytest from agent_kernel import ( AgentKernelError, Capability, + DeclarativePolicyEngine, DefaultPolicyEngine, + PolicyConfigError, PolicyDenied, Principal, SafetyClass, @@ -468,3 +472,792 @@ def test_rate_limit_window_slides() -> None: # Advance past first entry's window t[0] = 11.0 eng.evaluate(_req("cap.r"), cap, p, justification="") # should succeed + + +# ═══════════════════════════════════════════════════════════════════════════════ +# DefaultPolicyEngine.explain() +# ═══════════════════════════════════════════════════════════════════════════════ + + +def test_explain_read_allowed(engine: DefaultPolicyEngine) -> None: + """READ with no special sensitivity returns denied=False.""" + p = Principal(principal_id="u1") + result = engine.explain(_req("cap.r"), _cap("cap.r", SafetyClass.READ), p, justification="") + assert result.denied is False + assert result.failed_conditions == [] + assert result.remediation == [] + assert "allowed" in result.narrative + + +def test_explain_write_missing_role(engine: DefaultPolicyEngine) -> None: + p = Principal(principal_id="u1", roles=["reader"]) + result = engine.explain( + _req("cap.w"), + _cap("cap.w", SafetyClass.WRITE), + p, + justification="long enough justification here", + ) + assert result.denied is True + assert len(result.failed_conditions) == 1 + fc = result.failed_conditions[0] + assert fc.condition == "roles" + assert "writer" in str(fc.required) + assert sorted(["reader"]) == fc.actual + assert result.remediation == [fc.suggestion] + assert "denied" in result.narrative + + +def test_explain_write_short_justification(engine: DefaultPolicyEngine) -> None: + p = Principal(principal_id="u1", roles=["writer"]) + result = engine.explain( + _req("cap.w"), _cap("cap.w", SafetyClass.WRITE), p, justification="short" + ) + assert result.denied is True + assert any(fc.condition == "min_justification" for fc in result.failed_conditions) + fc = next(f for f in result.failed_conditions if f.condition == "min_justification") + assert fc.required == 15 + assert fc.actual == len("short") + + +def test_explain_write_both_failures(engine: DefaultPolicyEngine) -> None: + """Missing role AND short justification both appear in failed_conditions.""" + p = Principal(principal_id="u1", roles=["reader"]) + result = engine.explain( + _req("cap.w"), _cap("cap.w", SafetyClass.WRITE), p, justification="too short" + ) + assert result.denied is True + conditions = {fc.condition for fc in result.failed_conditions} + assert "roles" in conditions + assert "min_justification" in conditions + assert len(result.remediation) == 2 + + +def test_explain_destructive_missing_admin(engine: DefaultPolicyEngine) -> None: + p = Principal(principal_id="u1", roles=["writer"]) + result = engine.explain( + _req("cap.d"), + _cap("cap.d", SafetyClass.DESTRUCTIVE), + p, + justification="long enough justification here", + ) + assert result.denied is True + assert result.failed_conditions[0].condition == "roles" + assert result.failed_conditions[0].required == ["admin"] + + +def test_explain_pii_missing_tenant(engine: DefaultPolicyEngine) -> None: + p = Principal(principal_id="u1", roles=["reader"]) + cap = _cap("cap.pii", SafetyClass.READ, SensitivityTag.PII) + result = engine.explain(_req("cap.pii"), cap, p, justification="") + assert result.denied is True + assert result.failed_conditions[0].condition == "tenant_attribute" + assert "tenant" in result.failed_conditions[0].suggestion + + +def test_explain_secrets_missing_role(engine: DefaultPolicyEngine) -> None: + p = Principal(principal_id="u1", roles=["reader"]) + cap = _cap("cap.sec", SafetyClass.READ, SensitivityTag.SECRETS) + result = engine.explain(_req("cap.sec"), cap, p, justification="long enough justification") + assert result.denied is True + fc = result.failed_conditions[0] + assert fc.condition == "roles" + assert "secrets_reader" in str(fc.required) + + +# ═══════════════════════════════════════════════════════════════════════════════ +# DeclarativePolicyEngine +# ═══════════════════════════════════════════════════════════════════════════════ + + +def _dce(rules: list[dict], *, default: str = "deny") -> DeclarativePolicyEngine: + return DeclarativePolicyEngine.from_dict({"rules": rules, "default": default}) + + +# ── from_dict: basic evaluation ─────────────────────────────────────────────── + + +def test_declarative_allow_rule_matches() -> None: + engine = _dce([{"name": "r1", "match": {"safety_class": ["READ"]}, "action": "allow"}]) + p = Principal(principal_id="u1") + dec = engine.evaluate(_req("c"), _cap("c", SafetyClass.READ), p, justification="") + assert dec.allowed is True + assert "r1" in dec.reason + + +def test_declarative_deny_rule_raises() -> None: + engine = _dce( + [ + {"name": "block-all", "match": {}, "action": "deny", "reason": "blocked for test"}, + ] + ) + p = Principal(principal_id="u1") + with pytest.raises(PolicyDenied, match="blocked for test"): + engine.evaluate(_req("c"), _cap("c", SafetyClass.READ), p, justification="") + + +def test_declarative_default_deny_no_match() -> None: + engine = _dce([{"name": "r1", "match": {"safety_class": ["WRITE"]}, "action": "allow"}]) + p = Principal(principal_id="u1") + with pytest.raises(PolicyDenied, match="Default action is deny"): + engine.evaluate(_req("c"), _cap("c", SafetyClass.READ), p, justification="") + + +def test_declarative_default_allow_no_match() -> None: + engine = _dce( + [{"name": "r1", "match": {"safety_class": ["WRITE"]}, "action": "allow"}], + default="allow", + ) + p = Principal(principal_id="u1") + dec = engine.evaluate(_req("c"), _cap("c", SafetyClass.READ), p, justification="") + assert dec.allowed is True + + +def test_declarative_first_match_wins() -> None: + engine = _dce( + [ + {"name": "allow-read", "match": {"safety_class": ["READ"]}, "action": "allow"}, + { + "name": "deny-read", + "match": {"safety_class": ["READ"]}, + "action": "deny", + "reason": "x", + }, + ] + ) + p = Principal(principal_id="u1") + dec = engine.evaluate(_req("c"), _cap("c", SafetyClass.READ), p, justification="") + assert dec.allowed is True + assert "allow-read" in dec.reason + + +def test_declarative_role_condition() -> None: + engine = _dce( + [ + { + "name": "w", + "match": {"safety_class": ["WRITE"], "roles": ["writer"]}, + "action": "allow", + }, + ] + ) + p_writer = Principal(principal_id="u1", roles=["writer"]) + p_reader = Principal(principal_id="u2", roles=["reader"]) + dec = engine.evaluate(_req("c"), _cap("c", SafetyClass.WRITE), p_writer, justification="") + assert dec.allowed is True + with pytest.raises(PolicyDenied): + engine.evaluate(_req("c"), _cap("c", SafetyClass.WRITE), p_reader, justification="") + + +def test_declarative_min_justification_condition() -> None: + engine = _dce( + [ + { + "name": "w", + "match": {"safety_class": ["WRITE"], "min_justification": 10}, + "action": "allow", + }, + ] + ) + p = Principal(principal_id="u1") + dec = engine.evaluate(_req("c"), _cap("c", SafetyClass.WRITE), p, justification="1234567890") + assert dec.allowed is True + with pytest.raises(PolicyDenied): + engine.evaluate(_req("c"), _cap("c", SafetyClass.WRITE), p, justification="short") + + +def test_declarative_attribute_condition() -> None: + engine = _dce( + [ + { + "name": "pii", + "match": {"sensitivity": ["PII"], "attributes": {"tenant": "*"}}, + "action": "allow", + } + ] + ) + p_tenant = Principal(principal_id="u1", attributes={"tenant": "acme"}) + p_no_tenant = Principal(principal_id="u2") + dec = engine.evaluate( + _req("c"), _cap("c", SafetyClass.READ, SensitivityTag.PII), p_tenant, justification="" + ) + assert dec.allowed is True + with pytest.raises(PolicyDenied): + engine.evaluate( + _req("c"), + _cap("c", SafetyClass.READ, SensitivityTag.PII), + p_no_tenant, + justification="", + ) + + +def test_declarative_constraints_merged() -> None: + engine = _dce( + [ + { + "name": "r", + "match": {"safety_class": ["READ"]}, + "action": "allow", + "constraints": {"max_rows": 10}, + } + ] + ) + p = Principal(principal_id="u1") + dec = engine.evaluate(_req("c"), _cap("c", SafetyClass.READ), p, justification="") + assert dec.constraints["max_rows"] == 10 + + +# ── config validation errors ────────────────────────────────────────────────── + + +def test_declarative_invalid_default() -> None: + with pytest.raises(PolicyConfigError, match="default"): + DeclarativePolicyEngine.from_dict({"default": "maybe", "rules": []}) + + +def test_declarative_invalid_action() -> None: + with pytest.raises(PolicyConfigError, match="action"): + _dce([{"name": "r", "match": {}, "action": "perhaps"}]) + + +def test_declarative_invalid_safety_class() -> None: + with pytest.raises(PolicyConfigError, match="safety_class"): + _dce([{"name": "r", "match": {"safety_class": ["SUPER_DANGEROUS"]}, "action": "allow"}]) + + +# ── YAML round-trip ─────────────────────────────────────────────────────────── + + +def test_declarative_from_yaml_round_trip() -> None: + yaml_text = """\ +rules: + - name: read-allowed + match: + safety_class: [READ] + action: allow + - name: write-role + match: + safety_class: [WRITE] + roles: [writer, admin] + min_justification: 10 + action: allow +default: deny +""" + with tempfile.NamedTemporaryFile(suffix=".yaml", mode="w", delete=False) as f: + f.write(yaml_text) + path = Path(f.name) + try: + engine = DeclarativePolicyEngine.from_yaml(path) + p_reader = Principal(principal_id="u1") + dec = engine.evaluate(_req("c"), _cap("c", SafetyClass.READ), p_reader, justification="") + assert dec.allowed is True + p_writer = Principal(principal_id="u2", roles=["writer"]) + dec2 = engine.evaluate( + _req("c"), _cap("c", SafetyClass.WRITE), p_writer, justification="1234567890" + ) + assert dec2.allowed is True + with pytest.raises(PolicyDenied): + engine.evaluate(_req("c"), _cap("c", SafetyClass.WRITE), p_reader, justification="") + finally: + path.unlink(missing_ok=True) + + +# ── TOML round-trip ─────────────────────────────────────────────────────────── + + +def test_declarative_from_toml_round_trip() -> None: + toml_text = """\ +default = "deny" + +[[rules]] +name = "read-allowed" +action = "allow" +[rules.match] +safety_class = ["READ"] + +[[rules]] +name = "write-role" +action = "allow" +[rules.match] +safety_class = ["WRITE"] +roles = ["writer", "admin"] +min_justification = 10 +""" + with tempfile.NamedTemporaryFile(suffix=".toml", mode="w", delete=False) as f: + f.write(toml_text) + path = Path(f.name) + try: + engine = DeclarativePolicyEngine.from_toml(path) + p = Principal(principal_id="u1") + dec = engine.evaluate(_req("c"), _cap("c", SafetyClass.READ), p, justification="") + assert dec.allowed is True + with pytest.raises(PolicyDenied): + engine.evaluate(_req("c"), _cap("c", SafetyClass.WRITE), p, justification="") + finally: + path.unlink(missing_ok=True) + + +# ── explain() on DeclarativePolicyEngine ───────────────────────────────────── + + +def test_declarative_explain_allowed() -> None: + engine = _dce([{"name": "r", "match": {"safety_class": ["READ"]}, "action": "allow"}]) + p = Principal(principal_id="u1") + result = engine.explain(_req("c"), _cap("c", SafetyClass.READ), p, justification="") + assert result.denied is False + assert result.failed_conditions == [] + + +def test_declarative_explain_missing_role() -> None: + engine = _dce( + [ + { + "name": "write-needs-writer", + "match": {"safety_class": ["WRITE"], "roles": ["writer"]}, + "action": "allow", + }, + ] + ) + p = Principal(principal_id="u1", roles=["reader"]) + result = engine.explain(_req("c"), _cap("c", SafetyClass.WRITE), p, justification="") + assert result.denied is True + assert result.rule_name == "write-needs-writer" + fc = result.failed_conditions[0] + assert fc.condition == "roles" + + +def test_declarative_explain_no_structural_match() -> None: + """When no rule targets the capability type, report no_matching_rule.""" + engine = _dce([{"name": "r", "match": {"safety_class": ["WRITE"]}, "action": "allow"}]) + p = Principal(principal_id="u1") + result = engine.explain(_req("c"), _cap("c", SafetyClass.DESTRUCTIVE), p, justification="") + assert result.denied is True + assert result.failed_conditions[0].condition == "no_matching_rule" + + +def test_declarative_explain_explicit_deny_full_match() -> None: + """An explicit deny rule that fully matches is reported as the cause. + + Regression test for the bug where ``explain()`` fell through to the + ``no_matching_rule`` fallback when a deny rule with no remaining + conditions matched — the deny rule's ``reason`` was silently dropped. + """ + engine = _dce( + [ + { + "name": "deny-all-write", + "match": {"safety_class": ["WRITE"]}, + "action": "deny", + "reason": "WRITE is currently blocked for maintenance", + }, + ] + ) + p = Principal(principal_id="u1", roles=["writer"]) + result = engine.explain( + _req("c"), + _cap("c", SafetyClass.WRITE), + p, + justification="long enough justification", + ) + assert result.denied is True + assert result.rule_name == "deny-all-write" + assert len(result.failed_conditions) == 1 + fc = result.failed_conditions[0] + assert fc.condition == "denied_by_rule" + assert "deny-all-write" in str(fc.actual) + # The rule's reason propagates into the suggestion (not the no_matching_rule fallback). + assert "maintenance" in fc.suggestion.lower() + + +def test_declarative_explain_skips_partial_match_deny() -> None: + """Partial-match deny rules don't pollute the explanation. + + A deny rule whose conditions are *not* fully satisfied did not cause the + denial; suggesting how to satisfy it would invite the caller to trigger + the deny. Explain should look past it to the first allow rule that + structurally matches. + """ + engine = _dce( + [ + # Deny rule that doesn't apply here (roles don't match). + { + "name": "deny-secrets-admin-only", + "match": {"sensitivity": ["SECRETS"], "roles": ["admin"]}, + "action": "deny", + "reason": "internal: admins routed via different path", + }, + # Allow rule that explains the real path forward. + { + "name": "allow-secrets-service", + "match": {"sensitivity": ["SECRETS"], "roles": ["service"]}, + "action": "allow", + }, + ] + ) + p = Principal(principal_id="u1", roles=["reader"]) # neither admin nor service + cap = _cap("c", SafetyClass.READ, SensitivityTag.SECRETS) + result = engine.explain(_req("c"), cap, p, justification="") + assert result.denied is True + # Explanation should come via the allow rule (missing 'service' role), + # NOT via the deny rule (which would suggest adding 'admin' — triggers deny). + assert result.rule_name == "allow-secrets-service" + assert len(result.failed_conditions) == 1 + fc = result.failed_conditions[0] + assert fc.condition == "roles" + assert "service" in str(fc.required) + + +# ── _parse_rule type validation ──────────────────────────────────────────────── + + +def test_declarative_invalid_roles_not_list() -> None: + """'roles' must be a list, not a bare string.""" + with pytest.raises(PolicyConfigError, match="roles.*list of strings"): + _dce( + [ + { + "name": "r", + "match": {"safety_class": ["READ"], "roles": "admin"}, + "action": "allow", + } + ] + ) + + +def test_declarative_invalid_roles_element_type() -> None: + """'roles' list elements must be strings.""" + with pytest.raises(PolicyConfigError, match="roles.*list of strings"): + _dce( + [ + { + "name": "r", + "match": {"safety_class": ["READ"], "roles": [1, 2]}, + "action": "allow", + } + ] + ) + + +def test_declarative_invalid_attributes_type() -> None: + """'attributes' must be a mapping of string keys to string values.""" + with pytest.raises(PolicyConfigError, match="attributes.*mapping"): + _dce( + [ + { + "name": "r", + "match": {"safety_class": ["READ"], "attributes": ["tenant"]}, + "action": "allow", + } + ] + ) + + +def test_declarative_invalid_attributes_value_type() -> None: + """'attributes' values must be strings (not ints, lists, etc.).""" + with pytest.raises(PolicyConfigError, match="attributes.*mapping"): + _dce( + [ + { + "name": "r", + "match": {"safety_class": ["READ"], "attributes": {"tenant": 42}}, + "action": "allow", + } + ] + ) + + +def test_declarative_invalid_min_justification_type() -> None: + """'min_justification' must be an integer.""" + with pytest.raises(PolicyConfigError, match="min_justification.*integer"): + _dce( + [ + { + "name": "r", + "match": {"safety_class": ["READ"], "min_justification": "ten"}, + "action": "allow", + } + ] + ) + + +def test_declarative_invalid_min_justification_bool_rejected() -> None: + """'min_justification' must not be a bool (which is an int subclass).""" + with pytest.raises(PolicyConfigError, match="min_justification.*integer"): + _dce( + [ + { + "name": "r", + "match": {"safety_class": ["READ"], "min_justification": True}, + "action": "allow", + } + ] + ) + + +def test_declarative_invalid_constraints_type() -> None: + """'constraints' must be a mapping.""" + with pytest.raises(PolicyConfigError, match="constraints.*mapping"): + _dce( + [ + { + "name": "r", + "match": {"safety_class": ["READ"]}, + "action": "allow", + "constraints": ["max_rows"], + } + ] + ) + + +# ── Comparative test: DeclarativePolicyEngine ≡ DefaultPolicyEngine ────────── + + +def test_declarative_replicates_default_policy_decisions() -> None: + """DeclarativePolicyEngine can express DefaultPolicyEngine's decisions. + + Validates #42's acceptance criterion: a declarative rule set can replicate + DefaultPolicyEngine's allow/deny behaviour for every condition the DSL is + able to express. Walks a curated scenario matrix through both engines and + asserts the same outcome. + + Out of scope (DSL has no equivalent operator today, by design): + - Rate limiting (sliding-window per principal/capability) + - max_rows ceiling (hardcoded 50/500 in DefaultPolicyEngine) + - allowed_fields enforcement (paired with sensitivity in DefaultPolicyEngine) + + The illustrative ``examples/policies/default.{yaml,toml}`` file uses the + same rule shape but a slightly different policy (``editor`` role with + longer justification thresholds) to show DSL flexibility — equivalence to + DefaultPolicyEngine is asserted via an inline rule set built here. + """ + declarative_rules = { + "default": "deny", + "rules": [ + # READ on non-sensitive data → allowed for anyone + { + "name": "allow-read-nonsensitive", + "action": "allow", + "match": {"safety_class": ["READ"], "sensitivity": ["NONE"]}, + }, + # READ on PII/PCI → require tenant attribute + { + "name": "allow-read-pii-with-tenant", + "action": "allow", + "match": { + "safety_class": ["READ"], + "sensitivity": ["PII"], + "attributes": {"tenant": "*"}, + }, + }, + { + "name": "allow-read-pci-with-tenant", + "action": "allow", + "match": { + "safety_class": ["READ"], + "sensitivity": ["PCI"], + "attributes": {"tenant": "*"}, + }, + }, + # SECRETS → admin or secrets_reader with justification + { + "name": "allow-secrets-admin", + "action": "allow", + "match": { + "sensitivity": ["SECRETS"], + "roles": ["admin"], + "min_justification": 15, + }, + }, + { + "name": "allow-secrets-reader", + "action": "allow", + "match": { + "sensitivity": ["SECRETS"], + "roles": ["secrets_reader"], + "min_justification": 15, + }, + }, + # WRITE → writer or admin with justification (≥ 15 chars) + { + "name": "allow-write-writers", + "action": "allow", + "match": { + "safety_class": ["WRITE"], + "sensitivity": ["NONE"], + "roles": ["writer", "admin"], + "min_justification": 15, + }, + }, + # DESTRUCTIVE → admin with justification (≥ 15 chars) + { + "name": "allow-destructive-admin", + "action": "allow", + "match": { + "safety_class": ["DESTRUCTIVE"], + "roles": ["admin"], + "min_justification": 15, + }, + }, + ], + } + declarative = DeclarativePolicyEngine.from_dict(declarative_rules) + + long_justification = "this is a long enough justification string" + + scenarios: list[tuple[str, Capability, Principal, str, bool]] = [ + # READ on non-sensitive → allowed for anyone + ( + "read-nonsensitive", + _cap("c", SafetyClass.READ), + Principal(principal_id="u1"), + "", + True, + ), + # READ on PII without tenant → denied by both + ( + "read-pii-no-tenant", + _cap("c", SafetyClass.READ, SensitivityTag.PII), + Principal(principal_id="u1", roles=["reader"]), + "", + False, + ), + # READ on PII with tenant → allowed by both + ( + "read-pii-with-tenant", + _cap("c", SafetyClass.READ, SensitivityTag.PII), + Principal(principal_id="u1", roles=["reader"], attributes={"tenant": "acme"}), + "", + True, + ), + # READ on PCI with tenant → allowed by both + ( + "read-pci-with-tenant", + _cap("c", SafetyClass.READ, SensitivityTag.PCI), + Principal(principal_id="u1", roles=["reader"], attributes={"tenant": "acme"}), + "", + True, + ), + # WRITE without writer role → denied + ( + "write-no-role", + _cap("c", SafetyClass.WRITE), + Principal(principal_id="u1", roles=["reader"]), + long_justification, + False, + ), + # WRITE with writer role + long justification → allowed + ( + "write-writer-allowed", + _cap("c", SafetyClass.WRITE), + Principal(principal_id="u1", roles=["writer"]), + long_justification, + True, + ), + # WRITE with writer role + short justification → denied + ( + "write-writer-short-justification", + _cap("c", SafetyClass.WRITE), + Principal(principal_id="u1", roles=["writer"]), + "too short", + False, + ), + # DESTRUCTIVE without admin → denied + ( + "destructive-no-admin", + _cap("c", SafetyClass.DESTRUCTIVE), + Principal(principal_id="u1", roles=["writer"]), + long_justification, + False, + ), + # DESTRUCTIVE with admin + long justification → allowed + ( + "destructive-admin-allowed", + _cap("c", SafetyClass.DESTRUCTIVE), + Principal(principal_id="u1", roles=["admin"]), + long_justification, + True, + ), + # SECRETS without role → denied + ( + "secrets-no-role", + _cap("c", SafetyClass.READ, SensitivityTag.SECRETS), + Principal(principal_id="u1", roles=["reader"]), + long_justification, + False, + ), + # SECRETS with secrets_reader + justification → allowed + ( + "secrets-reader-allowed", + _cap("c", SafetyClass.READ, SensitivityTag.SECRETS), + Principal(principal_id="u1", roles=["secrets_reader"]), + long_justification, + True, + ), + # SECRETS with admin + justification → allowed + ( + "secrets-admin-allowed", + _cap("c", SafetyClass.READ, SensitivityTag.SECRETS), + Principal(principal_id="u1", roles=["admin"]), + long_justification, + True, + ), + ] + + for name, capability, principal, justification, expected_allow in scenarios: + # Fresh DefaultPolicyEngine per scenario to avoid rate-limit state. + default = DefaultPolicyEngine() + if expected_allow: + d_decision = default.evaluate( + _req("c"), capability, principal, justification=justification + ) + r_decision = declarative.evaluate( + _req("c"), capability, principal, justification=justification + ) + assert d_decision.allowed is True, f"{name}: DefaultPolicyEngine expected allow" + assert r_decision.allowed is True, f"{name}: DeclarativePolicyEngine expected allow" + else: + with pytest.raises(PolicyDenied): + default.evaluate(_req("c"), capability, principal, justification=justification) + with pytest.raises(PolicyDenied): + declarative.evaluate(_req("c"), capability, principal, justification=justification) + + +# ── Optional-deps install hint ──────────────────────────────────────────────── + + +def test_declarative_from_yaml_install_hint(monkeypatch: pytest.MonkeyPatch) -> None: + """When pyyaml is unavailable, ``from_yaml`` raises with the install hint.""" + import builtins + + real_import = builtins.__import__ + + def fake_import(name: str, *args: object, **kwargs: object) -> object: + if name == "yaml": + raise ImportError("simulated: pyyaml missing") + return real_import(name, *args, **kwargs) # type: ignore[arg-type] + + monkeypatch.setattr(builtins, "__import__", fake_import) + with pytest.raises(PolicyConfigError, match="weaver-kernel\\[policy\\]"): + DeclarativePolicyEngine.from_yaml(Path("does-not-matter.yaml")) + + +def test_declarative_from_toml_install_hint(monkeypatch: pytest.MonkeyPatch) -> None: + """On 3.10 without tomli, ``from_toml`` raises with the install hint. + + On 3.11+ ``tomllib`` is in stdlib and always importable, so this test + only exercises the install-hint path on 3.10. We assert by simulating + an ``ImportError`` for whichever module the loader uses on this version. + """ + import builtins + import sys + + target = "tomllib" if sys.version_info >= (3, 11) else "tomli" + real_import = builtins.__import__ + + def fake_import(name: str, *args: object, **kwargs: object) -> object: + if name == target: + raise ImportError(f"simulated: {target} missing") + return real_import(name, *args, **kwargs) # type: ignore[arg-type] + + monkeypatch.setattr(builtins, "__import__", fake_import) + with pytest.raises(PolicyConfigError, match="weaver-kernel\\[policy\\]"): + DeclarativePolicyEngine.from_toml(Path("does-not-matter.toml"))