Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions architecture/gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ The storage schema is intentionally narrow:
| `version` | Optional monotonically increasing version for scoped records. |
| `status` | Optional workflow state for records such as policy revisions or draft policy chunks. |
| `dedup_key` and `hit_count` | Optional policy-advisor fields for coalescing repeated observations. |
| `resource_version` | Monotonically increasing counter for optimistic concurrency control. Incremented atomically on each update. |
| `payload` | Prost-encoded protobuf payload for the full domain object. |
| `created_at_ms` and `updated_at_ms` | Gateway timestamps used for ordering and list output. |
| `labels` | JSON object carrying Kubernetes-style object labels for filtering and organization. |
Expand Down Expand Up @@ -113,6 +114,92 @@ default WAL journal mode), which mirror the same sensitive contents.
Persisted state includes sandboxes, providers, SSH sessions, policy revisions,
settings, inference configuration, and deployment records.

### Optimistic Concurrency (CAS)

Every object row carries a `resource_version` that the database increments
atomically on each write. Concurrent mutations use compare-and-swap (CAS): the
writer reads the current version, applies changes, and writes back with a
`WHERE resource_version = <expected>` guard. If another writer updated the row
in between, the guard fails and the caller receives a `Conflict` error.

This matters for HA deployments where multiple gateway replicas share the same
Postgres database, and for single-node deployments where concurrent gRPC
handlers or the reconciler mutate the same sandbox.

**Compile-time enforcement.** The unconditional write methods `put` and
`put_message` are gated behind `#[cfg(test)]`. Production code must use
`put_if` with an explicit `WriteCondition` or `update_message_cas`. The
compiler rejects any other write path, making non-CAS writes structurally
impossible outside of tests.

Every write goes through one of three conditions:

- `MustCreate` -- insert-only. The database rejects the write with a
`UniqueViolation` error if a row with that ID already exists. Handlers match
on the structured `PersistenceError::UniqueViolation { .. }` variant to
distinguish creation conflicts from other failures.
- `MatchResourceVersion(v)` -- update-only. The database rejects the write
with a `Conflict` error if the current version differs from `v`.
- `Unconditional` -- test-only; not reachable in production builds.

**Creates.** All create paths use `MustCreate` and hydrate the response
directly from the `WriteResult` returned by `put_if`, which carries the
assigned `resource_version`, `created_at_ms`, and `updated_at_ms`. This
eliminates a read-after-write round trip and the race window that would come
with it.

**Updates.** The `update_message_cas` helper makes a single CAS attempt: it
fetches the current object, applies a mutation closure, and writes with a
`MatchResourceVersion` condition. On conflict the persistence layer returns a
`Conflict` error, which gRPC handlers map to `ABORTED` status so the client
(or the next watch/reconcile event) can retry with fresh state. There is no
automatic retry loop.

The helper accepts an `expected_version` parameter that selects between two
modes:

- **Server-driven** (`expected_version = 0`): the helper uses the version it
just read from the database. Internal operations (reconciler, policy status
reports, compute phase transitions) use this mode because the caller does
not track versions.
- **Client-driven** (`expected_version != 0`): the helper validates that the
caller's version matches the current database version before applying the
mutation. If they diverge it returns `Conflict` without attempting the
write. Client-facing operations that carry an `expected_resource_version`
field use this mode: `AttachSandboxProvider`, `DetachSandboxProvider`,
`UpdateProvider`, and `UpdateConfig` (policy backfill path).

**Lists.** The `list_messages` and `list_messages_with_selector` helpers decode
protobuf payloads from list results and hydrate `resource_version` from the
authoritative database column into each decoded message, mirroring the
`get_message` pattern. This ensures list responses carry correct versions
without requiring callers to manually hydrate each record.

**Deletes.** Delete operations are not yet CAS-protected -- the delete request
protos do not carry `expected_resource_version`. A `delete_if` primitive exists
in the persistence layer but is not wired into gRPC handlers.

**Coverage.** All `ObjectMeta`-bearing message types have write-condition
coverage:

| Type | Create | Update | List |
|---|---|---|---|
| Sandbox | `MustCreate` | `update_message_cas` | `list_messages` |
| Provider | `MustCreate` | `update_message_cas` | `list_messages` |
| ProviderProfile | `MustCreate` | (immutable) | `list_messages` |
| InferenceRoute | `MustCreate` | `update_message_cas` | `list_messages` |
| SandboxPolicy | scoped versioning | scoped versioning | scoped query |
| Settings | `Mutex`-guarded | `Mutex`-guarded | single-row |

Global settings updates use a Tokio `Mutex` to serialize multi-step
validation within a single gateway process, with CAS on the underlying
persistence write as defense in depth. In an HA deployment with multiple
gateways, the Mutex alone would be insufficient. Sandbox-scoped settings
rely entirely on CAS without a Mutex.

The `resource_version` is surfaced to clients through `ObjectMeta` in proto
responses. Database migrations backfill existing rows with version 1.

Policy and runtime settings are delivered together through the effective sandbox
config path. A gateway-global policy can override sandbox-scoped policy. The
sandbox supervisor polls for config revisions and hot-reloads dynamic policy
Expand Down
81 changes: 75 additions & 6 deletions crates/openshell-cli/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2389,6 +2389,11 @@ pub async fn sandbox_get(
println!(" {} {}", "Id:".dimmed(), id);
println!(" {} {}", "Name:".dimmed(), name);
println!(" {} {}", "Phase:".dimmed(), phase_name(sandbox.phase));
println!(
" {} {}",
"Resource version:".dimmed(),
sandbox.metadata.as_ref().map_or(0, |m| m.resource_version)
);

// Display labels if present
if let Some(metadata) = &sandbox.metadata
Expand Down Expand Up @@ -3152,14 +3157,38 @@ pub async fn sandbox_provider_attach(
tls: &TlsOptions,
) -> Result<()> {
let mut client = grpc_client(server, tls).await?;
let response = client

// Fetch current sandbox to get resource_version for CAS
let sandbox = client
.get_sandbox(GetSandboxRequest {
name: name.to_string(),
})
.await
.into_diagnostic()?
.into_inner()
.sandbox
.ok_or_else(|| miette::miette!("sandbox not found"))?;

let resource_version = sandbox.metadata.as_ref().map_or(0, |m| m.resource_version);

let response = match client
.attach_sandbox_provider(AttachSandboxProviderRequest {
sandbox_name: name.to_string(),
provider_name: provider.to_string(),
expected_resource_version: resource_version,
})
.await
.into_diagnostic()?
.into_inner();
{
Ok(response) => response.into_inner(),
Err(status) if status.code() == Code::Aborted => {
return Err(miette::miette!(
"Failed to attach provider: sandbox was modified by another operation.\n\
Please retry the command."
)
.with_source_code(status.message().to_string()));
}
Err(e) => return Err(e).into_diagnostic(),
};

if response.attached {
println!(
Expand All @@ -3181,14 +3210,38 @@ pub async fn sandbox_provider_detach(
tls: &TlsOptions,
) -> Result<()> {
let mut client = grpc_client(server, tls).await?;
let response = client

// Fetch current sandbox to get resource_version for CAS
let sandbox = client
.get_sandbox(GetSandboxRequest {
name: name.to_string(),
})
.await
.into_diagnostic()?
.into_inner()
.sandbox
.ok_or_else(|| miette::miette!("sandbox not found"))?;

let resource_version = sandbox.metadata.as_ref().map_or(0, |m| m.resource_version);

let response = match client
.detach_sandbox_provider(DetachSandboxProviderRequest {
sandbox_name: name.to_string(),
provider_name: provider.to_string(),
expected_resource_version: resource_version,
})
.await
.into_diagnostic()?
.into_inner();
{
Ok(response) => response.into_inner(),
Err(status) if status.code() == Code::Aborted => {
return Err(miette::miette!(
"Failed to detach provider: sandbox was modified by another operation.\n\
Please retry the command."
)
.with_source_code(status.message().to_string()));
}
Err(e) => return Err(e).into_diagnostic(),
};

if response.detached {
println!(
Expand Down Expand Up @@ -3523,6 +3576,7 @@ async fn auto_create_provider(
name: exact_name.to_string(),
created_at_ms: 0,
labels: HashMap::new(),
resource_version: 0,
}),
r#type: provider_type.to_string(),
credentials: discovered.credentials.clone(),
Expand Down Expand Up @@ -3563,6 +3617,7 @@ async fn auto_create_provider(
name: name.clone(),
created_at_ms: 0,
labels: HashMap::new(),
resource_version: 0,
}),
r#type: provider_type.to_string(),
credentials: discovered.credentials.clone(),
Expand Down Expand Up @@ -3975,6 +4030,7 @@ pub async fn provider_create(
name: name.to_string(),
created_at_ms: 0,
labels: HashMap::new(),
resource_version: 0,
}),
r#type: provider_type.clone(),
credentials: credential_map,
Expand Down Expand Up @@ -4019,6 +4075,11 @@ pub async fn provider_get(server: &str, name: &str, tls: &TlsOptions) -> Result<
println!(" {} {}", "Id:".dimmed(), provider.object_id());
println!(" {} {}", "Name:".dimmed(), provider.object_name());
println!(" {} {}", "Type:".dimmed(), provider.r#type);
println!(
" {} {}",
"Resource version:".dimmed(),
provider.metadata.as_ref().map_or(0, |m| m.resource_version)
);
println!(
" {} {}",
"Credential keys:".dimmed(),
Expand Down Expand Up @@ -4475,6 +4536,7 @@ pub async fn provider_update(
name: name.to_string(),
created_at_ms: 0,
labels: HashMap::new(),
resource_version: 0,
}),
r#type: String::new(),
credentials: credential_map,
Expand Down Expand Up @@ -5029,6 +5091,7 @@ pub async fn sandbox_policy_set_global(
delete_setting: false,
global: true,
merge_operations: vec![],
expected_resource_version: 0,
})
.await
.into_diagnostic()?
Expand Down Expand Up @@ -5227,6 +5290,7 @@ pub async fn gateway_setting_set(
delete_setting: false,
global: true,
merge_operations: vec![],
expected_resource_version: 0,
})
.await
.into_diagnostic()?
Expand Down Expand Up @@ -5261,6 +5325,7 @@ pub async fn sandbox_setting_set(
delete_setting: false,
global: false,
merge_operations: vec![],
expected_resource_version: 0,
})
.await
.into_diagnostic()?
Expand Down Expand Up @@ -5295,6 +5360,7 @@ pub async fn gateway_setting_delete(
delete_setting: true,
global: true,
merge_operations: vec![],
expected_resource_version: 0,
})
.await
.into_diagnostic()?
Expand Down Expand Up @@ -5329,6 +5395,7 @@ pub async fn sandbox_setting_delete(
delete_setting: true,
global: false,
merge_operations: vec![],
expected_resource_version: 0,
})
.await
.into_diagnostic()?
Expand Down Expand Up @@ -5387,6 +5454,7 @@ pub async fn sandbox_policy_set(
delete_setting: false,
global: false,
merge_operations: vec![],
expected_resource_version: 0,
})
.await
.into_diagnostic()?;
Expand Down Expand Up @@ -5561,6 +5629,7 @@ pub async fn sandbox_policy_update(
delete_setting: false,
global: false,
merge_operations: plan.merge_operations,
expected_resource_version: 0,
})
.await
.into_diagnostic()?
Expand Down
2 changes: 2 additions & 0 deletions crates/openshell-cli/tests/ensure_providers_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ impl TestOpenShell {
name: name.to_string(),
created_at_ms: 0,
labels: HashMap::new(),
resource_version: 0,
}),
r#type: provider_type.to_string(),
credentials: HashMap::new(),
Expand Down Expand Up @@ -326,6 +327,7 @@ impl OpenShell for TestOpenShell {
name: provider_metadata.name,
created_at_ms: existing_metadata.created_at_ms,
labels: existing_metadata.labels,
resource_version: 0,
}),
r#type: existing.r#type,
credentials: merge(existing.credentials, provider.credentials),
Expand Down
27 changes: 22 additions & 5 deletions crates/openshell-cli/tests/provider_commands_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use openshell_core::proto::{
GetSandboxProviderEnvironmentResponse, GetSandboxRequest, HealthRequest, HealthResponse,
ListProvidersRequest, ListProvidersResponse, ListSandboxProvidersRequest,
ListSandboxProvidersResponse, ListSandboxesRequest, ListSandboxesResponse, Provider,
ProviderProfile, ProviderResponse, RevokeSshSessionRequest, RevokeSshSessionResponse,
ProviderProfile, ProviderResponse, RevokeSshSessionRequest, RevokeSshSessionResponse, Sandbox,
SandboxResponse, SandboxStreamEvent, ServiceStatus, SupervisorMessage, UpdateProviderRequest,
WatchSandboxRequest,
};
Expand Down Expand Up @@ -83,9 +83,25 @@ impl OpenShell for TestOpenShell {

async fn get_sandbox(
&self,
_request: tonic::Request<GetSandboxRequest>,
request: tonic::Request<GetSandboxRequest>,
) -> Result<Response<SandboxResponse>, Status> {
Ok(Response::new(SandboxResponse::default()))
let name = request.into_inner().name;
// Return a minimal sandbox with metadata for CAS operations
Ok(Response::new(SandboxResponse {
sandbox: Some(Sandbox {
metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta {
id: format!("sb-{name}"),
name,
created_at_ms: 0,
labels: HashMap::new(),
resource_version: 1,
}),
spec: None,
status: None,
phase: 0,
current_policy_version: 0,
}),
}))
}

async fn list_sandboxes(
Expand Down Expand Up @@ -155,7 +171,7 @@ impl OpenShell for TestOpenShell {
providers.push(request.provider_name.clone());
true
};
let sandbox = openshell_core::proto::Sandbox {
let sandbox = Sandbox {
metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta {
name: request.sandbox_name,
..Default::default()
Expand Down Expand Up @@ -192,7 +208,7 @@ impl OpenShell for TestOpenShell {
let before_len = providers.len();
providers.retain(|name| name != &request.provider_name);
let detached = providers.len() != before_len;
let sandbox = openshell_core::proto::Sandbox {
let sandbox = Sandbox {
metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta {
name: request.sandbox_name,
..Default::default()
Expand Down Expand Up @@ -447,6 +463,7 @@ impl OpenShell for TestOpenShell {
name: provider_metadata.name,
created_at_ms: existing_metadata.created_at_ms,
labels: existing_metadata.labels,
resource_version: 0,
}),
r#type: existing.r#type,
credentials: merge(existing.credentials, provider.credentials),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ impl OpenShell for TestOpenShell {
name: sandbox_name,
created_at_ms: 0,
labels: HashMap::new(),
resource_version: 0,
}),
phase: SandboxPhase::Provisioning as i32,
..Sandbox::default()
Expand All @@ -104,6 +105,7 @@ impl OpenShell for TestOpenShell {
name,
created_at_ms: 0,
labels: HashMap::new(),
resource_version: 0,
}),
phase: SandboxPhase::Ready as i32,
..Sandbox::default()
Expand Down Expand Up @@ -324,6 +326,7 @@ impl OpenShell for TestOpenShell {
name: sandbox_id.trim_start_matches("id-").to_string(),
created_at_ms: 0,
labels: HashMap::new(),
resource_version: 0,
}),
phase: SandboxPhase::Provisioning as i32,
..Sandbox::default()
Expand Down
Loading
Loading