feat: kronos implementation #1001
Conversation
Important: Review skipped — auto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the ⚙️ Run configuration.
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Walkthrough

This PR integrates Kronos, an asynchronous job queue system, into Superposition's webhook execution pipeline. Webhooks now submit jobs for delivery instead of executing HTTP requests synchronously, with configurable retry limits and workspace-aware schema provisioning for job scheduling.

Changes: Webhook Kronos Integration
Sequence Diagrams

```mermaid
sequenceDiagram
    participant EventSource
    participant ServiceHelpers
    participant KronosDispatch
    participant KronosClient
    participant DispatchHandler
    participant TargetWebhook
    EventSource->>ServiceHelpers: emit event
    ServiceHelpers->>ServiceHelpers: lookup webhook config
    ServiceHelpers->>KronosDispatch: submit_webhook_job()
    KronosDispatch->>KronosDispatch: build payload with webhook_name
    KronosDispatch->>KronosDispatch: generate idempotency key
    KronosDispatch->>KronosClient: create_job(dispatcher_endpoint)
    KronosClient-->>KronosDispatch: execution_id
    KronosDispatch-->>ServiceHelpers: Ok(execution_id)
    ServiceHelpers-->>EventSource: success
    Note over KronosClient: Later, when job triggers
    KronosClient->>DispatchHandler: POST /dispatch/webhook
    DispatchHandler->>DispatchHandler: fetch_decrypted_secrets()
    DispatchHandler->>DispatchHandler: resolve header templates
    DispatchHandler->>TargetWebhook: HTTP request
    TargetWebhook-->>DispatchHandler: response
    DispatchHandler-->>KronosClient: status
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 5 passed
Actionable comments posted: 13
🧹 Nitpick comments (5)
crates/superposition_types/src/api/webhook.rs (1)
28-28: ⚡ Quick win — Consider documenting the `max_retries` field constraints.

The `max_retries` field lacks documentation describing its purpose, valid range, and default behavior. Based on the validation in handlers, the constraint is 0 ≤ max_retries ≤ 3, but API consumers cannot discover this from the type definition alone.

📝 Proposed documentation

```diff
 pub events: Vec<WebhookEvent>,
+/// Maximum number of retry attempts on webhook delivery failure (0-3).
+/// Defaults to 3 if not specified.
 pub max_retries: Option<i32>,
 pub change_reason: ChangeReason,
```

Apply similar documentation to line 43 in `UpdateWebhookRequest`.

Also applies to: 43-43
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/superposition_types/src/api/webhook.rs` at line 28, Add a doc comment to the max_retries field in the CreateWebhookRequest struct describing its purpose, valid range, and default behavior: state that max_retries controls how many times delivery is retried, valid values are 0 through 3 inclusive (0 ≤ max_retries ≤ 3), and document the default when omitted; apply the same doc comment to the max_retries field in UpdateWebhookRequest so API consumers can discover the constraint from the type definition alone.

crates/superposition/src/app_state.rs (1)
86-121: ⚖️ Poor tradeoff — Consider extracting Kronos initialization logic into a separate function.

The Kronos initialization block spans 35 lines and handles two distinct modes (HTTP vs library). Extracting this into a helper function (e.g., `initialize_kronos_client`) would improve readability and testability of the `get` function.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/superposition/src/app_state.rs` around lines 86 - 121, The Kronos initialization block in get is large and should be moved into a new async helper initialize_kronos_client that returns (Arc<dyn KronosClient>, CancellationToken, Option<JoinHandle<anyhow::Result<()>>>); implement the helper to accept the same inputs used inside the block (e.g., &kms_client, &app_env or their concrete types) and reproduce both branches (HTTP: build KronosHttpClient from KRONOS_URL/KRONOS_API_KEY/KRONOS_ORG_ID; Library: create sqlx pool, KronosLibraryClient, SuperpositionSchemaProvider, CancellationToken, and start_worker) keeping existing logging and expect/error behavior, then replace the inline if-let with a single await call to initialize_kronos_client() in get and update imports/signatures as needed.

crates/superposition/src/dispatch/webhook.rs (2)
78-85: ⚡ Quick win — Prefer `resp.status().is_success()` over the explicit 2xx whitelist.

The current list rejects valid 2xx responses outside {200, 201, 202, 204} (e.g., 205, 206, 226, and some app-specific 200-range codes), reporting them as dispatch failures and triggering unnecessary Kronos retries. `is_success()` covers the full 200..=299 range and is the standard idiom in reqwest-based clients.

♻️ Suggested change

```diff
-if [200u16, 201, 202, 204].contains(&resp.status().as_u16()) {
-    Ok(HttpResponse::Ok().finish())
-} else {
-    Err(unexpected_error!(
-        "Target returned unexpected status {}",
-        resp.status()
-    ))
-}
+if resp.status().is_success() {
+    Ok(HttpResponse::Ok().finish())
+} else {
+    Err(unexpected_error!(
+        "Target returned unexpected status {}",
+        resp.status()
+    ))
+}
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/superposition/src/dispatch/webhook.rs` around lines 78 - 85, Replace the explicit 2xx whitelist check with the standard success predicate: in the conditional that currently checks if [200u16, 201, 202, 204].contains(&resp.status().as_u16()) (the branch that returns Ok(HttpResponse::Ok().finish()) or Err(unexpected_error!(... , resp.status()))), use resp.status().is_success() so all 200..=299 responses are treated as success and only non-success statuses produce the unexpected_error! branch.
54-57: 💤 Low value — Silent fallback to POST hides misconfigured webhook methods.

`Method::from_bytes` only fails on invalid characters here, so in practice this nearly always succeeds — but if it ever does fail, the dispatcher will silently call the target with POST instead of the configured verb (e.g., DELETE/PUT) and the operator gets no signal. A `log::warn!` on fallback (or surfacing it as an error, since `webhook.method` is supposed to come from a controlled enum) would make this debuggable.

♻️ Suggested change

```diff
-let method = reqwest::Method::from_bytes(
-    webhook.method.to_string().to_uppercase().as_bytes(),
-)
-.unwrap_or(reqwest::Method::POST);
+let method_str = webhook.method.to_string().to_uppercase();
+let method = reqwest::Method::from_bytes(method_str.as_bytes()).unwrap_or_else(|e| {
+    log::warn!(
+        "Invalid HTTP method '{method_str}' on webhook '{}', defaulting to POST: {e}",
+        webhook_name
+    );
+    reqwest::Method::POST
+});
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/superposition/src/dispatch/webhook.rs` around lines 54 - 57, The current call to Method::from_bytes when building the HTTP verb silently falls back to POST on failure; update the code in webhook.rs where Method::from_bytes is called (using webhook.method.to_string()) to handle its Result explicitly rather than using unwrap_or: if conversion fails, emit a log::warn! that includes the original webhook.method string and the conversion error, then decide to default to reqwest::Method::POST (or return an Err) so misconfigurations are visible; ensure the warning references webhook.method and the error from Method::from_bytes for debuggability.

crates/superposition/src/workspace/handlers.rs (1)
179-225: ⚡ Quick win — Extract a helper for the duplicated dispatcher setup.

Lines 179-225 in `create_handler` and lines 513-553 in `migrate_schema_handler` are near-identical: same dispatcher URL template, same `upsert_secret` + `register_endpoint` sequence with the same hardcoded `dispatcher_retry_policy(3)`, and the same warning log patterns. If the dispatcher contract changes (URL shape, retry count, header names, additional setup steps), both sites must be edited in lockstep — easy to drift.

♻️ Suggested helper extraction

```diff
+async fn ensure_dispatcher_setup(
+    state: &AppState,
+    schema_name: &str,
+    organisation_id: &str,
+    workspace_name: &str,
+) {
+    if let Err(e) = state.kronos_client.provision_workspace(schema_name).await {
+        log::warn!("Kronos workspace provision failed for '{schema_name}': {e}");
+    }
+
+    let dispatcher_url = format!(
+        "{}/{}/{}/dispatch/webhook",
+        state.cac_host, organisation_id, workspace_name
+    );
+    if let Err(e) = state
+        .kronos_client
+        .upsert_secret(schema_name, DISPATCHER_SECRET_NAME, &state.superposition_token)
+        .await
+    {
+        log::warn!("Dispatcher secret setup failed for '{schema_name}': {e}");
+    }
+    if let Err(e) = state
+        .kronos_client
+        .register_endpoint(
+            schema_name,
+            DISPATCHER_ENDPOINT_NAME,
+            "HTTP",
+            dispatcher_endpoint_spec(&dispatcher_url),
+            Some(dispatcher_retry_policy(3)),
+        )
+        .await
+    {
+        log::warn!("Dispatcher endpoint setup failed for '{schema_name}': {e}");
+    }
+}
```

Then call it from both handlers:

```diff
-if let Err(e) = state
-    .kronos_client
-    .provision_workspace(&workspace_schema_name.0)
-    .await
-{ ... }
-let dispatcher_url = format!(...);
-if let Err(e) = state.kronos_client.upsert_secret(...).await { ... }
-if let Err(e) = state.kronos_client.register_endpoint(...).await { ... }
+ensure_dispatcher_setup(
+    &state,
+    &workspace_schema_name.0,
+    &created_workspace.organisation_id,
+    &created_workspace.workspace_name,
+)
+.await;
```

Also applies to: 513-553
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/superposition/src/workspace/handlers.rs` around lines 179 - 225, Duplicate dispatcher setup logic in create_handler and migrate_schema_handler should be extracted into a single helper to avoid drift; add a function (e.g., setup_dispatcher_for_workspace or ensure_dispatcher) that accepts the shared inputs (State reference, workspace_schema_name: &SchemaName or &str, organisation_id, workspace_name) and performs the dispatcher_url formatting, calls kronos_client.upsert_secret with DISPATCHER_SECRET_NAME and state.superposition_token, and kronos_client.register_endpoint with DISPATCHER_ENDPOINT_NAME, "HTTP", dispatcher_endpoint_spec(&dispatcher_url), and dispatcher_retry_policy(3), preserving the existing warning logs on Err; replace the duplicated blocks in create_handler and migrate_schema_handler with calls to this new helper so any future contract changes (URL shape, retry count, secret/endpoint names, logging) are centralized.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@Cargo.toml`:
- Around line 94-95: Cargo.toml references local path dependencies kronos-worker
and kronos-common which point to ../kronos/crates/worker and
../kronos/crates/common that are missing; resolve by adding the kronos repo as a
git submodule at ../kronos (or clone it as a sibling) so those paths exist, or
update the path entries for kronos-worker and kronos-common to the actual
location where you cloned the kronos monorepo; ensure the paths in Cargo.toml
match the repository layout so cargo can find the crate packages.
In `@crates/frontend/src/components/webhook_form.rs`:
- Around line 50-52: Validation currently only enforces the upper bound for
max_retries (if max_retries > 3) but allows negative values; update the check
that validates max_retries in the webhook form validation code to ensure it's
within 0..=3 by adding a lower-bound check (reject when max_retries < 0) and
returning a clear Err (e.g., "Max retries must be between 0 and 3") where the
existing max_retries validation lives so both negative and too-large values are
rejected.
- Around line 319-322: The on_change callback casts value.as_i64() to i32
without bounds checking; update the closure (the code handling value.as_i64()
and calling max_retries_ws.set) to validate the integer is within the allowed
range before casting and setting the signal (e.g., ensure n is between 0 and 3
or within i32::MIN..=i32::MAX as appropriate), and only call
max_retries_ws.set(n as i32) when the check passes (otherwise ignore or clamp
the value).
In `@crates/service_utils/src/helpers.rs`:
- Around line 267-271: The idempotency_key generation uses
Utc::now().timestamp_millis(), which makes every key unique and prevents
deduplication; change the idempotency_key construction (the let idempotency_key
= ... line) to use stable event properties instead of the timestamp — e.g.,
combine webhook.name (or webhook.id), event, the event's resource_id (or other
stable resource identifier) and the webhook/config_version into the formatted
string so identical logical events produce the same key and allow Kronos to
dedupe retries/duplicates.
- Around line 273-280: The call to submit_webhook_job uses
state.kronos_client.as_ref() which yields Option<&T> but submit_webhook_job (in
kronos_dispatch.rs) expects a plain &dyn KronosClient; fix by unwrapping or
handling the Option before calling submit_webhook_job: either assert/expect that
kronos_client is present (e.g., call state.kronos_client.as_ref().expect(...)
and pass the &dyn KronosClient) if it must be guaranteed, or
pattern-match/state-check and return an error/early-return when None so
submit_webhook_job always receives a concrete &dyn KronosClient; update the call
site around submit_webhook_job and related error path accordingly (or
alternatively change submit_webhook_job signature to accept Option<&dyn
KronosClient> and add None handling inside kronos_dispatch.rs).
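The Option-handling fix described in the prompt above can be sketched as follows. This is a minimal stand-in, not Superposition's actual code: `KronosClient`, `StubClient`, `submit_webhook_job`, and `dispatch` are illustrative names, and the real error type would be the project's `AppError` rather than `String`.

```rust
// Hypothetical stand-in for the real KronosClient trait.
trait KronosClient {
    fn create_job(&self, payload: &str) -> Result<String, String>;
}

struct StubClient;
impl KronosClient for StubClient {
    fn create_job(&self, payload: &str) -> Result<String, String> {
        Ok(format!("execution-for-{payload}"))
    }
}

// The callee keeps a plain trait-object parameter, as in kronos_dispatch.rs.
fn submit_webhook_job(client: &dyn KronosClient, payload: &str) -> Result<String, String> {
    client.create_job(payload)
}

// Call site: resolve the Option up front and fail fast when no client is
// configured, so the callee always receives a concrete &dyn KronosClient.
fn dispatch(client: Option<&dyn KronosClient>, payload: &str) -> Result<String, String> {
    let client = client.ok_or_else(|| "kronos client not configured".to_string())?;
    submit_webhook_job(client, payload)
}
```

Resolving the `Option` at the boundary keeps the `None` case in one place instead of threading it through every helper.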
In `@crates/service_utils/src/kronos_dispatch.rs`:
- Around line 25-48: Add an in-memory cached layer for the active schema list:
add a cache field (e.g., cache: Arc<RwLock<Option<CachedSchemas>>>) and a ttl:
Duration to the SuperpositionSchemaProvider struct, where CachedSchemas contains
schemas: Vec<String> and last_updated: Instant; then modify get_active_schemas
to first read the cache under a shared RwLock read guard and return
cached.schemas if last_updated + ttl > Instant::now(), otherwise take an
exclusive write lock, re-check staleness, query the DB using the existing
sqlx::query_as block to refresh CachedSchemas (updating schemas and
last_updated) and return the refreshed list; ensure the cache field is
initialized (None) and ttl is configurable and use tokio::sync::RwLock and
std::time::Instant for timestamps.
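The TTL-cache pattern suggested above can be sketched synchronously. This uses `std::sync::RwLock` in place of `tokio::sync::RwLock` and a fetch closure in place of the sqlx query; `SchemaCache` and its method names are illustrative, not the actual `SuperpositionSchemaProvider` API.

```rust
use std::sync::RwLock;
use std::time::{Duration, Instant};

struct CachedSchemas {
    schemas: Vec<String>,
    last_updated: Instant,
}

struct SchemaCache {
    cache: RwLock<Option<CachedSchemas>>,
    ttl: Duration,
}

impl SchemaCache {
    fn new(ttl: Duration) -> Self {
        Self { cache: RwLock::new(None), ttl }
    }

    // `fetch` stands in for the DB query that lists active workspace schemas.
    fn get_active_schemas<F>(&self, fetch: F) -> Vec<String>
    where
        F: Fn() -> Vec<String>,
    {
        // Fast path: shared read lock, return the cached list if still fresh.
        if let Some(cached) = self.cache.read().unwrap().as_ref() {
            if cached.last_updated.elapsed() < self.ttl {
                return cached.schemas.clone();
            }
        }
        // Slow path: exclusive lock, re-check staleness (another writer may
        // have refreshed while we waited), then query and store.
        let mut guard = self.cache.write().unwrap();
        if let Some(cached) = guard.as_ref() {
            if cached.last_updated.elapsed() < self.ttl {
                return cached.schemas.clone();
            }
        }
        let schemas = fetch();
        *guard = Some(CachedSchemas {
            schemas: schemas.clone(),
            last_updated: Instant::now(),
        });
        schemas
    }
}
```

The re-check under the write lock prevents a thundering herd of redundant DB queries when several callers see a stale cache at once.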
- Around line 109-131: In submit_webhook_job validate the i32 max_attempts
before casting to i64: ensure it is positive and within a sane upper bound
(e.g., <= 100 or project-specific limit) and return an error (use anyhow::bail
or Err) for invalid values, then pass the validated value (as i64) into
kronos_client.create_job; update the validation logic near the call site that
currently casts max_attempts to i64 so negative or excessively large inputs are
rejected before submission.
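The pre-cast validation described above might look like this. The upper bound of 100 and the `String` error type are assumptions for illustration; the prompt itself notes the limit is project-specific, and the real code would use `anyhow::bail!`.

```rust
// Validate an i32 retry budget before widening it to i64 for Kronos.
// Bound of 100 is a placeholder, not the project's actual limit.
fn validate_max_attempts(max_attempts: i32) -> Result<i64, String> {
    if max_attempts <= 0 {
        return Err(format!("max_attempts must be positive, got {max_attempts}"));
    }
    if max_attempts > 100 {
        return Err(format!("max_attempts must not exceed 100, got {max_attempts}"));
    }
    // Lossless widening once the range is known-good.
    Ok(i64::from(max_attempts))
}
```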
- Around line 74-101: The current resolve_secret_templates only replaces
templates for top-level string values and clones non-strings, so nested
objects/arrays aren't handled; change resolve_secret_templates to operate
recursively on a serde_json::Value (accept &serde_json::Value) and match on
Value::String, Value::Object and Value::Array: for String perform the
{{SECRETS.name}} replacements, for Object map each (k,v) to (k.clone(),
resolve_secret_templates(v, secrets)), and for Array map each element through
resolve_secret_templates, returning other variants cloned unchanged so all
nested templates are resolved.
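The recursive walk described above can be sketched over a minimal stand-in for `serde_json::Value` (the `Json` enum below is hypothetical; the real implementation would match on `serde_json::Value` with the same three arms and clone the remaining variants unchanged).

```rust
use std::collections::HashMap;

// Minimal stand-in for serde_json::Value; `Other` represents
// Number/Bool/Null leaves, which are cloned unchanged.
#[allow(dead_code)]
#[derive(Clone, Debug, PartialEq)]
enum Json {
    String(String),
    Object(Vec<(String, Json)>),
    Array(Vec<Json>),
    Other,
}

fn resolve_secret_templates(value: &Json, secrets: &HashMap<String, String>) -> Json {
    match value {
        // Replace every {{SECRETS.name}} occurrence in string leaves.
        Json::String(s) => {
            let mut out = s.clone();
            for (name, secret) in secrets {
                out = out.replace(&format!("{{{{SECRETS.{name}}}}}"), secret);
            }
            Json::String(out)
        }
        // Recurse into every object value, keeping keys as-is.
        Json::Object(map) => Json::Object(
            map.iter()
                .map(|(k, v)| (k.clone(), resolve_secret_templates(v, secrets)))
                .collect(),
        ),
        // Recurse into every array element.
        Json::Array(items) => Json::Array(
            items.iter().map(|v| resolve_secret_templates(v, secrets)).collect(),
        ),
        // All other variants pass through unchanged.
        other => other.clone(),
    }
}
```

Because each arm rebuilds its container from recursively resolved children, templates nested arbitrarily deep in objects and arrays are handled, not just top-level strings.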
In `@crates/superposition/src/dispatch/webhook.rs`:
- Around line 30-86: dispatch_handler currently decrypts workspace secrets
(fetch_decrypted_secrets) and resolves headers (resolve_secret_templates)
without authenticating the caller; ensure an explicit dispatcher token
validation occurs before any secret operations by extracting the dispatcher
token header from the incoming request context (via WorkspaceContext or direct
request headers) and comparing it against the configured
state.superposition_token (or delegating to the existing AuthNHandler if
present) and return early on mismatch, then proceed to call
fetch_decrypted_secrets and resolve_secret_templates only after successful
validation; also replace the hard-coded status check ([200,201,202,204]) with
resp.status().is_success() to determine success.
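The token gate described above reduces to a small pure check once the header has been extracted from the request. Function name, header handling, and error strings below are assumptions; a production version should also prefer a constant-time comparison (e.g., via the `subtle` crate) over plain `==`.

```rust
// Validate the dispatcher token before any secret decryption happens.
// `header_token` is the value extracted from the incoming request headers;
// `expected_token` corresponds to the configured state.superposition_token.
fn validate_dispatcher_token(
    header_token: Option<&str>,
    expected_token: &str,
) -> Result<(), String> {
    match header_token {
        Some(token) if token == expected_token => Ok(()),
        Some(_) => Err("invalid dispatcher token".to_string()),
        None => Err("missing dispatcher token".to_string()),
    }
}
```

Calling this (and returning early on `Err`) before `fetch_decrypted_secrets` ensures unauthenticated callers never trigger secret decryption.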
In `@crates/superposition/src/main.rs`:
- Around line 229-235: The timeout result from awaiting the Kronos worker handle
is ignored; update the shutdown sequence to capture the result of
tokio::time::timeout(Duration::from_secs(35), handle).await and log whether the
worker completed successfully, returned an error, or timed out using your logger
(e.g., info/warn/error). Specifically, modify the block that checks
kronos_worker_handle so that after calling tokio::time::timeout you match on the
Result to distinguish Ok(Ok(_)) (graceful completion), Ok(Err(e)) (worker
returned an error), and Err(_) (timeout) and emit an appropriate log message
mentioning kronos_cancel/kronos_worker_handle and the timeout duration.
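The three-way match described above can be illustrated with a synchronous analog: `std::sync::mpsc::Receiver::recv_timeout` on a channel carrying the worker's `Result` yields the same three outcomes as `tokio::time::timeout` around a `JoinHandle` — `Ok(Ok(_))`, `Ok(Err(_))`, and a timeout. The function and messages below are illustrative, not the actual shutdown code.

```rust
use std::sync::mpsc;
use std::time::Duration;

// Wait for the worker's final Result with a deadline and distinguish
// clean shutdown, worker error, and timeout — the same triage the real
// code should apply to tokio::time::timeout(d, handle).await.
fn await_worker(
    rx: mpsc::Receiver<Result<(), String>>,
    timeout: Duration,
) -> &'static str {
    match rx.recv_timeout(timeout) {
        Ok(Ok(())) => "kronos worker shut down cleanly",
        Ok(Err(_)) => "kronos worker returned an error",
        Err(_) => "timed out waiting for kronos worker",
    }
}
```

In the async original, each arm would emit the corresponding `log::info!`/`log::error!`/`log::warn!` line mentioning the 35-second deadline instead of returning a string.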
In `@crates/superposition/src/webhooks/handlers.rs`:
- Around line 55-60: The current validation only enforces an upper bound and
allows negative values; update the validation around req.max_retries (the local
variable retries) to reject negative numbers as well by returning
Err(superposition::AppError::BadArgument(...)) when retries < 0 with a clear
message like "max_retries must be non-negative", keeping the existing
upper-bound check (retries > 3) and error type AppError::BadArgument for
consistency.
- Around line 119-125: Add a lower-bound check for req.max_retries so negative
values are rejected like the upper bound: when matching req.max_retries (same
place that currently checks "if let Some(retries) = req.max_retries"), add a
condition that returns superposition::AppError::BadArgument with a clear message
if retries < 0 (in addition to the existing retries > 3 check), keeping the
error type/flow consistent with the existing validation for max_retries.
---
Nitpick comments:
In `@crates/superposition_types/src/api/webhook.rs`:
- Line 28: Add a doc comment to the max_retries field in the
CreateWebhookRequest struct describing its purpose, valid range, and default
behavior: state that max_retries controls how many times delivery is retried,
valid values are 0 through 3 inclusive (0 ≤ max_retries ≤ 3), and document the
default when omitted; apply the same doc comment to the max_retries field in
UpdateWebhookRequest so API consumers can discover the constraint from the type
definition.
In `@crates/superposition/src/app_state.rs`:
- Around line 86-121: The Kronos initialization block in get is large and should
be moved into a new async helper initialize_kronos_client that returns (Arc<dyn
KronosClient>, CancellationToken, Option<JoinHandle<anyhow::Result<()>>>);
implement the helper to accept the same inputs used inside the block (e.g.,
&kms_client, &app_env or their concrete types) and reproduce both branches
(HTTP: build KronosHttpClient from KRONOS_URL/KRONOS_API_KEY/KRONOS_ORG_ID;
Library: create sqlx pool, KronosLibraryClient, SuperpositionSchemaProvider,
CancellationToken, and start_worker) keeping existing logging and expect/error
behavior, then replace the inline if-let with a single await call to
initialize_kronos_client() in get and update imports/signatures as needed.
In `@crates/superposition/src/dispatch/webhook.rs`:
- Around line 78-85: Replace the explicit 2xx whitelist check with the standard
success predicate: in the conditional that currently checks if [200u16, 201,
202, 204].contains(&resp.status().as_u16()) (the branch that returns
Ok(HttpResponse::Ok().finish()) or Err(unexpected_error!(... , resp.status()))),
use resp.status().is_success() so all 200..=299 responses are treated as success
and only non-success statuses produce the unexpected_error! branch.
- Around line 54-57: The current call to Method::from_bytes when building the
HTTP verb silently falls back to POST on failure; update the code in webhook.rs
where Method::from_bytes is called (using webhook.method.to_string()) to handle
its Result explicitly rather than using unwrap_or: if conversion fails, emit a
log::warn! that includes the original webhook.method string and the conversion
error, then decide to default to reqwest::Method::POST (or return an Err) so
misconfigurations are visible; ensure the warning references webhook.method and
the error from Method::from_bytes for debuggability.
In `@crates/superposition/src/workspace/handlers.rs`:
- Around line 179-225: Duplicate dispatcher setup logic in create_handler and
migrate_schema_handler should be extracted into a single helper to avoid drift;
add a function (e.g., setup_dispatcher_for_workspace or ensure_dispatcher) that
accepts the shared inputs (State reference, workspace_schema_name: &SchemaName
or &str, organisation_id, workspace_name) and performs the dispatcher_url
formatting, calls kronos_client.upsert_secret with DISPATCHER_SECRET_NAME and
state.superposition_token, and kronos_client.register_endpoint with
DISPATCHER_ENDPOINT_NAME, "HTTP", dispatcher_endpoint_spec(&dispatcher_url), and
dispatcher_retry_policy(3), preserving the existing warning logs on Err; replace
the duplicated blocks in create_handler and migrate_schema_handler with calls to
this new helper so any future contract changes (URL shape, retry count,
secret/endpoint names, logging) are centralized.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: c80d8bc4-8f40-4768-8e74-56ba08ac37f0
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (19)
- Cargo.toml
- crates/context_aware_config/src/api/secrets/handlers.rs
- crates/frontend/src/api.rs
- crates/frontend/src/components/webhook_form.rs
- crates/frontend/src/pages/webhook.rs
- crates/service_utils/Cargo.toml
- crates/service_utils/src/helpers.rs
- crates/service_utils/src/kronos_dispatch.rs
- crates/service_utils/src/lib.rs
- crates/service_utils/src/service/types.rs
- crates/superposition/Cargo.toml
- crates/superposition/src/app_state.rs
- crates/superposition/src/dispatch/mod.rs
- crates/superposition/src/dispatch/webhook.rs
- crates/superposition/src/main.rs
- crates/superposition/src/webhooks/handlers.rs
- crates/superposition/src/workspace/handlers.rs
- crates/superposition_types/src/api/webhook.rs
- workspace_template.sql
```toml
kronos-worker = { path = "../kronos/crates/worker" }
kronos-common = { path = "../kronos/crates/common" }
```
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check if the Kronos submodule or sibling repository exists at the expected location
if [ -d "../kronos" ]; then
echo "✓ ../kronos directory exists"
ls -la ../kronos/crates/
else
echo "✗ ../kronos directory not found"
echo "The Cargo.toml expects kronos to be a sibling directory to superposition"
fi
# Check if it might be a git submodule
git submodule status | grep kronos || echo "No kronos submodule found"
# Check the actual repository structure
fd -t d "kronos" -d 3

Repository: juspay/superposition
Length of output: 195
🏁 Script executed:
# First, check the Cargo.toml file at the specified lines
cat -n Cargo.toml | sed -n '90,100p'
🏁 Script executed:
# Search for all kronos references in the codebase
rg "kronos" --type toml
🏁 Script executed:
# Check git configuration and submodule setup
cat -n .gitmodules 2>/dev/null || echo "No .gitmodules file found"
🏁 Script executed:
# Check if there's any documentation about kronos setup
fd -t f -e md -e MD | xargs grep -l "kronos" 2>/dev/null | head -10
🏁 Script executed:
# Look for README or setup documentation
fd -t f -e md -e MD -e txt | xargs grep -l "kronos\|setup\|build" 2>/dev/null | head -5
🏁 Script executed:
# Check GitHub Actions or CI configuration
find .github -type f -name "*.yml" -o -name "*.yaml" 2>/dev/null | xargs cat 2>/dev/null | head -100
🏁 Script executed:
# Look for any Makefile, script, or documentation about dependencies
fd -t f \( -name "Makefile" -o -name "setup*" -o -name "build*" \) 2>/dev/null
🏁 Script executed:
# Check if there are environment setup scripts or docker files that might clone kronos
fd -t f \( -name "*.sh" -o -name "Dockerfile*" \) | xargs grep -l "kronos" 2>/dev/null
🏁 Script executed:
# Search README for kronos and build instructions
rg "kronos|build|setup|prerequisite" README.md -i -A 3
🏁 Script executed:
# Check if there's a Makefile that might handle dependencies
head -50 Makefile 2>/dev/null || echo "No Makefile found"
🌐 Web query:
kronos-worker kronos-common crates.io rust
💡 Result:
kronos-worker and kronos-common are internal Rust crates from the Juspay Kronos project, a distributed job scheduling and execution system. They are not published on crates.io but are part of the monorepo at https://github.com/juspay/kronos [1]. kronos-common: Shared library providing models, database layer, configuration, tenant management, caching, and metrics [1]. kronos-worker: Execution engine that polls the database for queued, retrying, or pending job executions, resolves templates, dispatches to endpoints (including Kafka and Redis Streams via feature flags), and exposes metrics via HTTP [1]. The project includes other crates like kronos-api (REST API server with Actix-web), kronos-mock-server (test fixture), and kronos-dashboard (Leptos/WASM web UI). Build with features for dispatchers: cargo build --workspace --features kronos-worker/kafka [1]. No crates.io pages found for these exact names; they appear to be private/internal to the Juspay repo [1][all searches]. Juspay develops open-source Rust projects like Hyperswitch, and Kronos fits as a job scheduler [2][3].
Citations:
- 1: https://github.com/juspay/kronos
- 2: refactor(scheduler): move scheduler to new crate to support workflows in multiple crates hyperswitch#1681
- 3: https://github.com/juspay/framework-libs-rs
Add kronos as a git submodule or update the path dependency.
The path dependencies for kronos-worker and kronos-common at lines 94–95 point to ../kronos/crates/worker and ../kronos/crates/common, which do not exist in the repository. The kronos project is a separate monorepo at https://github.com/juspay/kronos and is not published on crates.io, so path dependencies must be resolved.
Options:
- Add kronos as a git submodule at `../kronos` (relative to superposition)
- Clone kronos as a sibling directory before building
- Update the path to match the actual kronos location if cloned elsewhere
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@Cargo.toml` around lines 94 - 95, Cargo.toml references local path
dependencies kronos-worker and kronos-common which point to
../kronos/crates/worker and ../kronos/crates/common that are missing; resolve by
adding the kronos repo as a git submodule at ../kronos (or clone it as a
sibling) so those paths exist, or update the path entries for kronos-worker and
kronos-common to the actual location where you cloned the kronos monorepo;
ensure the paths in Cargo.toml match the repository layout so cargo can find the
crate packages.
```rust
if max_retries > 3 {
    return Err("Max retries must not exceed 3".to_string());
}
```
Validate that max_retries is non-negative.
The validation only checks the upper bound (> 3) but allows negative values. Negative retry counts are semantically invalid and should be rejected.
🛡️ Proposed fix to add lower bound validation
```diff
-if max_retries > 3 {
-    return Err("Max retries must not exceed 3".to_string());
-}
+if max_retries < 0 || max_retries > 3 {
+    return Err("Max retries must be between 0 and 3".to_string());
+}
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
if max_retries < 0 || max_retries > 3 {
    return Err("Max retries must be between 0 and 3".to_string());
}
```
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/frontend/src/components/webhook_form.rs` around lines 50 - 52,
Validation currently only enforces the upper bound for max_retries (if
max_retries > 3) but allows negative values; update the check that validates
max_retries in the webhook form validation code to ensure it's within 0..=3 by
adding a lower-bound check (reject when max_retries < 0) and returning a clear
Err (e.g., "Max retries must be between 0 and 3") where the existing max_retries
validation lives so both negative and too-large values are rejected.
```rust
on_change=Callback::new(move |value: Value| {
    if let Some(n) = value.as_i64() {
        max_retries_ws.set(n as i32);
    }
```
Add bounds checking before casting i64 to i32.
The code casts i64 to i32 without checking if the value fits in the target type's range. Although users are unlikely to enter extreme values, numeric inputs can be manipulated to overflow the i32 range, potentially resulting in unexpected retry counts.
🛡️ Proposed fix to validate range before cast
```diff
 on_change=Callback::new(move |value: Value| {
     if let Some(n) = value.as_i64() {
-        max_retries_ws.set(n as i32);
+        if n >= 0 && n <= 3 {
+            max_retries_ws.set(n as i32);
+        }
     }
 })
```

This validates the range before updating the signal, ensuring only valid values (0–3) are accepted.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
on_change=Callback::new(move |value: Value| {
    if let Some(n) = value.as_i64() {
        if n >= 0 && n <= 3 {
            max_retries_ws.set(n as i32);
        }
    }
```
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/frontend/src/components/webhook_form.rs` around lines 319 - 322, The
on_change callback casts value.as_i64() to i32 without bounds checking; update
the closure (the code handling value.as_i64() and calling max_retries_ws.set) to
validate the integer is within the allowed range before casting and setting the
signal (e.g., ensure n is between 0 and 3 or within i32::MIN..=i32::MAX as
appropriate), and only call max_retries_ws.set(n as i32) when the check passes
(otherwise ignore or clamp the value).
```rust
// Idempotency key: webhook_name + event type + millisecond timestamp.
// Ensures each trigger attempt gets a unique job while allowing dedup on retries.
let idempotency_key = format!(
    "{}_{}_{}", webhook.name, event, Utc::now().timestamp_millis()
);
```
Idempotency key defeats deduplication by including timestamp.
The timestamp_millis() component ensures every invocation generates a unique key, preventing Kronos from deduplicating identical webhook events triggered in rapid succession (e.g., race conditions, retries, duplicate triggers). For true idempotency, the key should be based on stable event properties without a timestamp—such as webhook_name + event + resource_id + config_version—so that the same logical event always produces the same key.
🔑 Proposed fix: Use stable event properties for idempotency key

```diff
-// Idempotency key: webhook_name + event type + millisecond timestamp.
-// Ensures each trigger attempt gets a unique job while allowing dedup on retries.
+// Idempotency key: webhook_name + event type + workspace + config_version.
+// Ensures the same logical event deduplicates across rapid retriggers.
 let idempotency_key = format!(
-    "{}_{}_{}", webhook.name, event, Utc::now().timestamp_millis()
+    "{}_{}_{}_{}",
+    webhook.name,
+    event,
+    workspace_context.workspace_id,
+    config_version_opt.as_deref().unwrap_or("none")
 );
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/service_utils/src/helpers.rs` around lines 267 - 271, The
idempotency_key generation uses Utc::now().timestamp_millis(), which makes every
key unique and prevents deduplication; change the idempotency_key construction
(the let idempotency_key = ... line) to use stable event properties instead of
the timestamp — e.g., combine webhook.name (or webhook.id), event, the event's
resource_id (or other stable resource identifier) and the webhook/config_version
into the formatted string so identical logical events produce the same key and
allow Kronos to dedupe retries/duplicates.
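The dedup failure mode can be illustrated outside the codebase. This is a minimal sketch with hypothetical `key_with_ts` and `stable_key` helpers (names and fields are illustrative, not from the PR):

```rust
// Key containing a timestamp: every trigger produces a different key.
fn key_with_ts(name: &str, event: &str, ts_millis: i64) -> String {
    format!("{name}_{event}_{ts_millis}")
}

// Key built only from stable event properties: the same logical event
// always produces the same key, so a queue can deduplicate it.
fn stable_key(name: &str, event: &str, workspace: &str, version: Option<&str>) -> String {
    format!("{name}_{event}_{workspace}_{}", version.unwrap_or("none"))
}

fn main() {
    // Two "identical" triggers a few ms apart: timestamp keys differ, so no dedup.
    let a = key_with_ts("slack", "config_updated", 1_700_000_000_001);
    let b = key_with_ts("slack", "config_updated", 1_700_000_000_005);
    assert_ne!(a, b);

    // Stable keys collide for the same logical event, enabling dedup.
    let c = stable_key("slack", "config_updated", "ws1", Some("v42"));
    let d = stable_key("slack", "config_updated", "ws1", Some("v42"));
    assert_eq!(c, d);
    println!("{c}");
}
```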
```rust
match submit_webhook_job(
    state.kronos_client.as_ref(),
    &workspace_context.schema_name,
    &webhook.name,
    &webhook_response,
    &idempotency_key,
    webhook.max_retries,
)
```
🧩 Analysis chain
🏁 Script executed:
```bash
#!/bin/bash
# Description: Verify chrono version and submit_webhook_job None handling.

# Check chrono version in service_utils Cargo.toml
echo "=== Checking chrono version in service_utils ==="
rg -A 2 'chrono' crates/service_utils/Cargo.toml

# Check submit_webhook_job signature and None handling
echo "=== Checking submit_webhook_job signature ==="
ast-grep --pattern $'pub async fn submit_webhook_job($$$) {
  $$$
}'

# Look for None/Option handling in submit_webhook_job
echo "=== Checking Option handling in submit_webhook_job ==="
rg -A 10 'pub async fn submit_webhook_job' crates/service_utils/src/kronos_dispatch.rs
```

Repository: juspay/superposition
Length of output: 647
🌐 Web query:
Does chrono DateTime support timestamp_millis method?
💡 Result:
Yes, chrono DateTime supports the timestamp_millis method. The official documentation confirms that DateTime has a pub const fn timestamp_millis(&self) -> i64 method, which returns the number of non-leap milliseconds since January 1, 1970 UTC [1][2]. Examples show its usage, such as with Utc.ymd constructions verifying the output [1][2]. This method has been available since at least chrono 0.4.x and remains in the latest versions [2]. A companion from_timestamp_millis method also exists for creating DateTime from milliseconds [2].
Citations:
- 1: https://docs.rs/chrono-wasi/latest/chrono/struct.DateTime.html
- 2: https://docs.rs/chrono/latest/chrono/struct.DateTime.html
🏁 Script executed:
```bash
# Check state definition and kronos_client type
rg -B 5 -A 10 'kronos_client' crates/service_utils/src/helpers.rs | head -40

# Get full submit_webhook_job implementation
rg -A 30 'pub async fn submit_webhook_job' crates/service_utils/src/kronos_dispatch.rs

# Check if this is a workspace with compile checks
find crates -name "Cargo.toml" -type f | xargs grep -l "submit_webhook_job\|kronos_dispatch"
```

Repository: juspay/superposition
Length of output: 1067
Type mismatch: submit_webhook_job cannot accept Option reference passed from state.kronos_client.as_ref().
The function signature at kronos_dispatch.rs expects kronos_client: &dyn KronosClient (a direct reference), but line 274 passes state.kronos_client.as_ref(), which returns Option<&T>. The function does not contain any Option handling or unwrapping logic — it directly calls methods on kronos_client. Either the function must be updated to accept Option<&dyn KronosClient> and handle None gracefully, or the calling code must ensure kronos_client is guaranteed to be Some before calling (e.g., via .unwrap() or .expect()).
Note: chrono::DateTime::timestamp_millis() is confirmed available in chrono 0.4.x and later.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/service_utils/src/helpers.rs` around lines 273 - 280, The call to
submit_webhook_job uses state.kronos_client.as_ref() which yields Option<&T> but
submit_webhook_job (in kronos_dispatch.rs) expects a plain &dyn KronosClient;
fix by unwrapping or handling the Option before calling submit_webhook_job:
either assert/expect that kronos_client is present (e.g., call
state.kronos_client.as_ref().expect(...) and pass the &dyn KronosClient) if it
must be guaranteed, or pattern-match/state-check and return an
error/early-return when None so submit_webhook_job always receives a concrete
&dyn KronosClient; update the call site around submit_webhook_job and related
error path accordingly (or alternatively change submit_webhook_job signature to
accept Option<&dyn KronosClient> and add None handling inside
kronos_dispatch.rs).
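The early-return option described above can be sketched with a stub trait. The real `KronosClient` interface is async and its signature differs, so this is a simplified, assumption-laden stand-in showing only the `Option` handling at the call site:

```rust
// Stub trait standing in for the real (async) Kronos client interface.
trait KronosClient {
    fn create_job(&self, payload: &str) -> String;
}

struct StubClient;
impl KronosClient for StubClient {
    fn create_job(&self, payload: &str) -> String {
        format!("exec-{payload}")
    }
}

// Early-return when the optional client is absent, so the callee
// always receives a concrete &dyn KronosClient.
fn submit(client: Option<&dyn KronosClient>, payload: &str) -> Result<String, String> {
    let client = client.ok_or_else(|| "kronos client not configured".to_string())?;
    Ok(client.create_job(payload))
}

fn main() {
    let stub = StubClient;
    assert_eq!(submit(Some(&stub), "job1"), Ok("exec-job1".to_string()));
    assert!(submit(None, "job1").is_err());
    println!("ok");
}
```

Whether to error out or silently skip on `None` depends on whether a missing Kronos client is a deployment mistake or a supported configuration.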
```rust
let kronos_encryption_key =
    get_from_env_or_default("KRONOS_ENCRYPTION_KEY", "0".repeat(64));
```
Hardcoded default encryption key is unsafe for production.
The fallback value "0".repeat(64) creates a predictable 64-character string of zeros. While acceptable in DEV/TEST environments, this default should never be used in production as it would make all encrypted Kronos job data trivially decryptable.
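One way to make the fallback environment-dependent can be sketched outside the codebase. The `AppEnv` variants and `kronos_key` helper below are illustrative assumptions, not the crate's real API:

```rust
use std::env;

#[derive(PartialEq)]
enum AppEnv {
    Dev,
    Prod,
}

// Hypothetical sketch: fall back to an all-zero key only outside production;
// in production, a missing key is a hard configuration error.
fn kronos_key(app_env: &AppEnv) -> Result<String, String> {
    match env::var("KRONOS_ENCRYPTION_KEY") {
        Ok(k) => Ok(k),
        Err(_) if *app_env == AppEnv::Dev => Ok("0".repeat(64)),
        Err(_) => Err("KRONOS_ENCRYPTION_KEY must be set in production".to_string()),
    }
}

fn main() {
    env::remove_var("KRONOS_ENCRYPTION_KEY");
    assert_eq!(kronos_key(&AppEnv::Dev).unwrap().len(), 64);
    assert!(kronos_key(&AppEnv::Prod).is_err());
    println!("ok");
}
```

Failing fast at startup is preferable to discovering an all-zero key after jobs have already been encrypted with it.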
🔒 Proposed fix to require the key in production environments

```diff
-let kronos_encryption_key =
-    get_from_env_or_default("KRONOS_ENCRYPTION_KEY", "0".repeat(64));
+let kronos_encryption_key = match app_env {
+    AppEnv::DEV | AppEnv::TEST => {
+        get_from_env_or_default("KRONOS_ENCRYPTION_KEY", "0".repeat(64))
+    }
+    _ => get_from_env_unsafe("KRONOS_ENCRYPTION_KEY")
+        .expect("KRONOS_ENCRYPTION_KEY must be set in production"),
+};
```

```rust
#[post("/webhook")]
async fn dispatch_handler(
    workspace_context: WorkspaceContext,
    state: Data<AppState>,
    db_conn: DbConnection,
    body: Json<DispatchWebhookRequest>,
) -> superposition::Result<HttpResponse> {
    let DispatchWebhookRequest { webhook_name, data } = body.into_inner();

    let DbConnection(mut conn) = db_conn;
    let webhook =
        fetch_webhook(&webhook_name, &workspace_context.schema_name, &mut conn)?;

    if !webhook.enabled {
        return Ok(HttpResponse::Ok().finish());
    }

    let secrets = fetch_decrypted_secrets(&workspace_context, &mut conn, &state)?;

    let resolved_headers = resolve_secret_templates(
        &serde_json::Value::Object((*webhook.custom_headers).clone()),
        &secrets,
    );

    let method = reqwest::Method::from_bytes(
        webhook.method.to_string().to_uppercase().as_bytes(),
    )
    .unwrap_or(reqwest::Method::POST);

    let mut builder = state
        .http_client
        .request(method, webhook.url.as_str())
        .timeout(std::time::Duration::from_secs(10))
        .json(&data);

    if let serde_json::Value::Object(hmap) = resolved_headers {
        for (k, v) in hmap {
            if let Some(s) = v.as_str() {
                builder = builder.header(k, s);
            }
        }
    }

    let resp = builder
        .send()
        .await
        .map_err(|e| unexpected_error!("Dispatcher HTTP send failed: {}", e))?;

    if [200u16, 201, 202, 204].contains(&resp.status().as_u16()) {
        Ok(HttpResponse::Ok().finish())
    } else {
        Err(unexpected_error!(
            "Target returned unexpected status {}",
            resp.status()
        ))
    }
}
```
🧩 Analysis chain
🏁 Script executed:
```bash
#!/bin/bash
# Locate the middleware applied to /dispatch and check what it authenticates.
fd -t f 'middlewares' crates | head
rg -nP --type=rust -C5 '\bOrgWorkspaceMiddlewareFactory\b'
rg -nP --type=rust -C5 'impl\s+FromRequest\s+for\s+WorkspaceContext\b'
# Confirm how the dispatcher route is mounted in main.rs
rg -nP --type=rust -C5 '/dispatch'
```

Repository: juspay/superposition
Length of output: 14930
🏁 Script executed:
```bash
cat -n crates/service_utils/src/middlewares/workspace_context.rs | head -100
```

Repository: juspay/superposition
Length of output: 3833
🏁 Script executed:
```bash
cat -n crates/service_utils/src/service/types.rs | sed -n '196,250p'
```

Repository: juspay/superposition
Length of output: 2323
🏁 Script executed:
```bash
rg -nP --type=rust 'DISPATCHER_SECRET_NAME' crates/
```

Repository: juspay/superposition
Length of output: 618
🏁 Script executed:
```bash
find crates/superposition/src/dispatch -type f -name '*.rs' -exec basename {} \;
```

Repository: juspay/superposition
Length of output: 82
🏁 Script executed:
```bash
cat -n crates/superposition/src/dispatch/webhook.rs
```

Repository: juspay/superposition
Length of output: 4705
🏁 Script executed:
```bash
cat -n crates/superposition/src/dispatch/mod.rs
```

Repository: juspay/superposition
Length of output: 305
🏁 Script executed:
```bash
rg -nP --type=rust 'superposition_token|DISPATCHER_SECRET_NAME' crates/superposition/src/dispatch/
```

Repository: juspay/superposition
Length of output: 46
This endpoint requires explicit dispatcher token validation before any secret operations.
The handler decrypts all workspace secrets (line 47: fetch_decrypted_secrets) and forwards them in HTTP headers to a webhook URL fetched from the database (lines 49-71), but performs no authentication check:
- The `OrgWorkspaceMiddlewareFactory` (wrapping `/dispatch` at main.rs:323) is a context resolver only: it extracts org/workspace from the path but does not validate any dispatcher token. `WorkspaceContext::from_request` (types.rs:201) simply retrieves the context set by the middleware; no authentication occurs.
- The handler directly calls `fetch_decrypted_secrets` and `resolve_secret_templates` without verifying the caller is the trusted Kronos dispatcher.
If an unauthenticated caller can reach this endpoint (or if org_id/workspace_id in the path can be manipulated), they can trigger an SSRF + secret-exfiltration attack: decrypt all workspace secrets and forward them to any URL stored in a webhook record.
Verify that the upstream authentication layer (AuthNHandler or similar) validates the dispatcher token before requests reach this scope. If not, add an explicit token check in the handler (e.g., extract and validate state.superposition_token from request headers) before calling fetch_decrypted_secrets.
Also: Use resp.status().is_success() (lines 78–85) instead of the fixed list [200, 201, 202, 204] for better maintainability.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/superposition/src/dispatch/webhook.rs` around lines 30 - 86,
dispatch_handler currently decrypts workspace secrets (fetch_decrypted_secrets)
and resolves headers (resolve_secret_templates) without authenticating the
caller; ensure an explicit dispatcher token validation occurs before any secret
operations by extracting the dispatcher token header from the incoming request
context (via WorkspaceContext or direct request headers) and comparing it
against the configured state.superposition_token (or delegating to the existing
AuthNHandler if present) and return early on mismatch, then proceed to call
fetch_decrypted_secrets and resolve_secret_templates only after successful
validation; also replace the hard-coded status check ([200,201,202,204]) with
resp.status().is_success() to determine success.
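The token guard suggested above can be sketched in isolation. The header shape and the `authorize_dispatcher` helper are hypothetical (not the crate's real API), and a production version should prefer a constant-time comparison:

```rust
// Hypothetical guard: compare the caller-supplied Authorization header with
// the configured dispatcher token before any secret is decrypted.
fn authorize_dispatcher(header: Option<&str>, expected_token: &str) -> Result<(), String> {
    match header {
        Some(h) if h == format!("Bearer {expected_token}") => Ok(()),
        _ => Err("unauthorized dispatcher call".to_string()),
    }
}

fn main() {
    assert!(authorize_dispatcher(Some("Bearer t0ken"), "t0ken").is_ok());
    assert!(authorize_dispatcher(Some("Bearer wrong"), "t0ken").is_err());
    assert!(authorize_dispatcher(None, "t0ken").is_err());
    println!("ok");
}
```

The key property is ordering: the guard must run before `fetch_decrypted_secrets`, so an unauthorized request never causes a decryption.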
```rust
// HTTP server runs until Ctrl+C / SIGTERM. After it stops, drain the Kronos worker.
server.await?;

kronos_cancel.cancel();
if let Some(handle) = kronos_worker_handle {
    let _ = tokio::time::timeout(Duration::from_secs(35), handle).await;
}
```
Log the Kronos worker shutdown outcome.
The timeout result is currently ignored. Logging whether the worker completed gracefully or timed out would help diagnose shutdown issues in production.
📋 Proposed fix to log shutdown outcome

```diff
 kronos_cancel.cancel();
 if let Some(handle) = kronos_worker_handle {
-    let _ = tokio::time::timeout(Duration::from_secs(35), handle).await;
+    match tokio::time::timeout(Duration::from_secs(35), handle).await {
+        Ok(Ok(())) => log::info!("Kronos worker shut down cleanly"),
+        Ok(Err(e)) => log::error!("Kronos worker returned error: {e}"),
+        Err(_) => log::warn!("Kronos worker shutdown timed out after 35s"),
+    }
 }
```
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/superposition/src/main.rs` around lines 229 - 235, The timeout result
from awaiting the Kronos worker handle is ignored; update the shutdown sequence
to capture the result of tokio::time::timeout(Duration::from_secs(35),
handle).await and log whether the worker completed successfully, returned an
error, or timed out using your logger (e.g., info/warn/error). Specifically,
modify the block that checks kronos_worker_handle so that after calling
tokio::time::timeout you match on the Result to distinguish Ok(Ok(_)) (graceful
completion), Ok(Err(e)) (worker returned an error), and Err(_) (timeout) and
emit an appropriate log message mentioning kronos_cancel/kronos_worker_handle
and the timeout duration.
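The three-way outcome match can be demonstrated without tokio. This std-only sketch uses a channel with `recv_timeout` as a stand-in for `tokio::time::timeout` over a `JoinHandle` (the `drain_worker` helper is illustrative):

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Distinguish clean completion, worker error, and timeout when draining
// a background worker, mirroring the Ok(Ok)/Ok(Err)/Err(timeout) match.
fn drain_worker(
    work: impl FnOnce() -> Result<(), String> + Send + 'static,
    timeout: Duration,
) -> String {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let _ = tx.send(work());
    });
    match rx.recv_timeout(timeout) {
        Ok(Ok(())) => "worker shut down cleanly".to_string(),
        Ok(Err(e)) => format!("worker returned error: {e}"),
        Err(_) => "worker shutdown timed out".to_string(),
    }
}

fn main() {
    assert_eq!(
        drain_worker(|| Ok(()), Duration::from_secs(1)),
        "worker shut down cleanly"
    );
    assert_eq!(
        drain_worker(|| Err("boom".to_string()), Duration::from_secs(1)),
        "worker returned error: boom"
    );
    let slow = drain_worker(
        || {
            thread::sleep(Duration::from_millis(200));
            Ok(())
        },
        Duration::from_millis(10),
    );
    assert_eq!(slow, "worker shutdown timed out");
    println!("ok");
}
```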
```rust
let retries = req.max_retries.unwrap_or(3);
if retries > 3 {
    return Err(superposition::AppError::BadArgument(
        "max_retries must not exceed 3".to_string(),
    ));
}
```
Validate that max_retries is non-negative.
The validation only checks the upper bound but allows negative values. Negative retry counts are semantically invalid and should be rejected before persisting to the database.
🛡️ Proposed fix to add lower bound validation

```diff
 let retries = req.max_retries.unwrap_or(3);
-if retries > 3 {
+if retries < 0 || retries > 3 {
     return Err(superposition::AppError::BadArgument(
-        "max_retries must not exceed 3".to_string(),
+        "max_retries must be between 0 and 3".to_string(),
     ));
 }
```
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/superposition/src/webhooks/handlers.rs` around lines 55 - 60, The
current validation only enforces an upper bound and allows negative values;
update the validation around req.max_retries (the local variable retries) to
reject negative numbers as well by returning
Err(superposition::AppError::BadArgument(...)) when retries < 0 with a clear
message like "max_retries must be non-negative", keeping the existing
upper-bound check (retries > 3) and error type AppError::BadArgument for
consistency.
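The full-range check reads as follows in isolation. This is a hedged sketch: the real handler returns `AppError::BadArgument` rather than a plain `String`, and the helper name is invented here:

```rust
// Validate that max_retries stays within the documented 0..=3 range,
// rejecting both negative and too-large values with one message.
fn validate_max_retries(retries: i32) -> Result<i32, String> {
    if !(0..=3).contains(&retries) {
        return Err("max_retries must be between 0 and 3".to_string());
    }
    Ok(retries)
}

fn main() {
    assert!(validate_max_retries(-1).is_err());
    assert!(validate_max_retries(4).is_err());
    assert_eq!(validate_max_retries(0), Ok(0));
    println!("{:?}", validate_max_retries(3));
}
```

Centralizing the check in one helper would also keep the create and update handlers from drifting apart.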
```rust
if let Some(retries) = req.max_retries {
    if retries > 3 {
        return Err(superposition::AppError::BadArgument(
            "max_retries must not exceed 3".to_string(),
        ));
    }
}
```
Validate that max_retries is non-negative.
The validation only checks the upper bound but allows negative values. This should be consistent with the create handler validation.
🛡️ Proposed fix to add lower bound validation

```diff
 if let Some(retries) = req.max_retries {
-    if retries > 3 {
+    if retries < 0 || retries > 3 {
         return Err(superposition::AppError::BadArgument(
-            "max_retries must not exceed 3".to_string(),
+            "max_retries must be between 0 and 3".to_string(),
         ));
     }
 }
```
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/superposition/src/webhooks/handlers.rs` around lines 119 - 125, Add a
lower-bound check for req.max_retries so negative values are rejected like the
upper bound: when matching req.max_retries (same place that currently checks "if
let Some(retries) = req.max_retries"), add a condition that returns
superposition::AppError::BadArgument with a clear message if retries < 0 (in
addition to the existing retries > 3 check), keeping the error type/flow
consistent with the existing validation for max_retries.
Pull request overview
This PR introduces a Kronos-backed, job-based webhook execution path and adds a configurable max_retries field to webhook create/update flows, aiming to improve reliability/scalability by moving webhook delivery out of the request path.
Changes:
- Added `max_retries` to webhook API types, backend handlers, and frontend form/API plumbing (with a max of 3).
- Refactored webhook execution to submit a Kronos job (`submit_webhook_job`) and added a new per-workspace dispatcher endpoint (`/dispatch/webhook`) to perform the actual outbound HTTP call.
- Added Kronos client/worker wiring to app startup and workspace provisioning/migration to set up dispatcher secrets/endpoints.
Reviewed changes
Copilot reviewed 19 out of 20 changed files in this pull request and generated 12 comments.
Show a summary per file
| File | Description |
|---|---|
| workspace_template.sql | Minor formatting change (trailing newline). |
| crates/superposition/src/workspace/handlers.rs | Provisions Kronos workspace and registers dispatcher secret/endpoint during workspace create/migrate. |
| crates/superposition/src/webhooks/handlers.rs | Adds max_retries handling/validation to create/update webhook endpoints. |
| crates/superposition/src/main.rs | Starts server and drains embedded Kronos worker on shutdown; mounts /dispatch routes. |
| crates/superposition/src/dispatch/webhook.rs | New dispatcher endpoint that resolves templates and performs outbound webhook HTTP request. |
| crates/superposition/src/dispatch/mod.rs | Dispatch module wiring for Actix scopes. |
| crates/superposition/src/app_state.rs | Adds Kronos client initialization (service or embedded worker mode) to AppState. |
| crates/superposition/Cargo.toml | Adds dependencies needed for dispatcher/Kronos integration. |
| crates/superposition_types/src/api/webhook.rs | Extends webhook create/update API requests with max_retries. |
| crates/service_utils/src/service/types.rs | Extends AppState with kronos_client and a cancellation token. |
| crates/service_utils/src/lib.rs | Exposes new kronos_dispatch module. |
| crates/service_utils/src/kronos_dispatch.rs | New Kronos helper module: schema provider, endpoint spec, retry policy, template resolution, job submission. |
| crates/service_utils/src/helpers.rs | Replaces direct webhook HTTP execution with Kronos job submission. |
| crates/service_utils/Cargo.toml | Adds Kronos/async/sqlx dependencies. |
| crates/frontend/src/pages/webhook.rs | Passes max_retries into the webhook edit form. |
| crates/frontend/src/components/webhook_form.rs | Adds UI control + client-side validation for max_retries. |
| crates/frontend/src/api.rs | Sends max_retries in webhook create requests. |
| crates/context_aware_config/src/api/secrets/handlers.rs | Minor fix to secret update filter to use a reference. |
| Cargo.toml | Adds workspace deps for Kronos + supporting crates. |
| Cargo.lock | Lockfile update for new/updated dependencies. |
Comments suppressed due to low confidence (2)
crates/frontend/src/components/webhook_form.rs:322
`on_change` casts `i64` to `i32` with `as`, which can wrap on large inputs and also accepts negatives. Since the field is constrained to 0–3, clamp/validate the parsed value before storing it (and ignore or reset invalid values) to avoid sending unexpected retry counts.
```rust
on_change=Callback::new(move |value: Value| {
    if let Some(n) = value.as_i64() {
        max_retries_ws.set(n as i32);
    }
```
crates/superposition/src/webhooks/handlers.rs:123
- Update validation for `max_retries` only checks `> 3`. Negative values will slip through and be stored. Please enforce the documented range (0–3) here as well.
```rust
if let Some(retries) = req.max_retries {
    if retries > 3 {
        return Err(superposition::AppError::BadArgument(
            "max_retries must not exceed 3".to_string(),
        ));
```
```rust
/// Resolve `{{SECRETS.name}}` templates in JSON header values using a plaintext secrets map.
pub fn resolve_secret_templates(
    headers: &serde_json::Value,
    secrets: &std::collections::HashMap<String, String>,
) -> serde_json::Value {
    match headers {
        serde_json::Value::Object(map) => {
            let resolved = map
                .iter()
                .map(|(k, v)| {
                    let v = match v.as_str() {
                        Some(s) => {
                            let mut result = s.to_string();
                            for (name, value) in secrets {
                                result = result
                                    .replace(&format!("{{{{SECRETS.{name}}}}}"), value);
                            }
                            serde_json::Value::String(result)
                        }
```
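The substitution above can be reproduced on plain strings. This sketch keeps only the `replace` loop and drops the `serde_json` plumbing (the `resolve` helper name is invented here):

```rust
use std::collections::HashMap;

// Minimal re-creation of the {{SECRETS.name}} substitution shown above,
// operating on plain strings instead of serde_json values.
fn resolve(template: &str, secrets: &HashMap<String, String>) -> String {
    let mut result = template.to_string();
    for (name, value) in secrets {
        // format! doubles braces, so this builds the literal "{{SECRETS.name}}".
        result = result.replace(&format!("{{{{SECRETS.{name}}}}}"), value);
    }
    result
}

fn main() {
    let mut secrets = HashMap::new();
    secrets.insert("api_key".to_string(), "s3cr3t".to_string());
    let header = resolve("Bearer {{SECRETS.api_key}}", &secrets);
    assert_eq!(header, "Bearer s3cr3t");
    println!("{header}");
}
```

Unmatched placeholders pass through untouched, which is worth keeping in mind when a secret name is misspelled.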
```rust
if max_retries > 3 {
    return Err("Max retries must not exceed 3".to_string());
```
```rust
&HeadersEnum::ConfigVersion.to_string(),
&config_version_opt.clone().unwrap_or_default(),
// Idempotency key: webhook_name + event type + millisecond timestamp.
// Ensures each trigger attempt gets a unique job while allowing dedup on retries.
```
```rust
.await
.expect("Failed to create sqlx pool for Kronos");
let kronos_encryption_key =
    get_from_env_or_default("KRONOS_ENCRYPTION_KEY", "0".repeat(64));
```
| "url": dispatcher_url, | ||
| "method": "POST", | ||
| "headers": { | ||
| "Authorization": format!("{{{{secret.{DISPATCHER_SECRET_NAME}}}}}") | ||
| }, |
Force-pushed from 5c96d53 to a340ec8.
This pull request introduces support for configurable webhook retry attempts and significantly refactors webhook execution to use a job-based system via Kronos, improving reliability and scalability. The frontend is updated to allow users to set the maximum number of retries for webhooks (with validation), and the backend now enqueues webhook executions as jobs instead of making direct HTTP calls. Additionally, several dependencies are updated or added to support the new job-based architecture, and legacy template substitution logic is removed from the backend.
Webhook Retry & Execution Refactor
- Added a `max_retries` field to webhook creation and update flows, including frontend form controls, input validation (max 3), and propagation through API layers and state management. [1] [2] [3] [4] [5] [6] [7] [8] [9] [10]
- Webhook executions are now enqueued as jobs via `submit_webhook_job`, replacing the previous direct HTTP request logic. This change includes generating an idempotency key for deduplication and logging execution IDs. [1] [2] [3]

Dependency and Codebase Updates
- Added new dependencies (`kronos-worker`, `kronos-common`, `async-trait`, `sqlx`, `tokio-util`) to support the job-based webhook execution and database interactions; updated `Cargo.toml` files accordingly. [1] [2]

Minor Fixes
- Fixed the secret update filter to use a reference to `secret_name`.
New Features
Improvements