Conversation
add `protected` field to user
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.
Walkthrough: System-wide multi-tenant support added: an optional tenant identifier is threaded through request handling, RBAC, alerting, storage, and metastore paths.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    rect rgba(200,220,255,0.5)
    participant Client
    participant HTTP as "HTTP Handler"
    participant TEN as "TENANT_METADATA"
    participant RBAC
    participant Parseable
    participant Metastore
    participant Storage
    end
    Client->>HTTP: POST /api/streams/{name} (X-TENANT-ID)
    HTTP->>TEN: get_tenant_id_from_request(req)
    TEN-->>HTTP: tenant_id
    HTTP->>RBAC: authorize(session, action, tenant_id)
    RBAC-->>HTTP: Ok
    HTTP->>Parseable: create_stream_and_schema_from_storage(name, tenant_id)
    Parseable->>Metastore: put_stream_json(..., tenant_id)
    Metastore->>Storage: put_object(path_with_tenant_prefix, bytes, tenant_id)
    Storage-->>Metastore: Ok
    Metastore-->>Parseable: Ok
    Parseable-->>HTTP: StreamRef
    HTTP-->>Client: 201 Created
```
```mermaid
sequenceDiagram
    autonumber
    rect rgba(220,255,220,0.5)
    participant Scheduler
    participant AlertMgr as "AlertManager"
    participant Metastore
    participant Auth as "TENANT_METADATA/Auth"
    participant QueryExec
    end
    Scheduler->>AlertMgr: evaluate_alert(alert_id, tenant_id)
    AlertMgr->>Metastore: get_alert(alert_id, tenant_id)
    Metastore-->>AlertMgr: AlertConfig
    AlertMgr->>Auth: get_auth_for_tenant(tenant_id)
    Auth-->>AlertMgr: auth_headers
    AlertMgr->>QueryExec: execute(query, auth_headers, tenant_id)
    QueryExec-->>AlertMgr: results
    AlertMgr->>Metastore: put_alert_state(..., tenant_id)
    Metastore-->>AlertMgr: Ok
```
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
🚥 Pre-merge checks | ❌ 4 failed checks (3 warnings, 1 inconclusive)
Actionable comments posted: 19
Note: Due to the large number of review comments, Critical and Major severity comments were prioritized as inline comments.
Caution: Some comments are outside the diff and can't be posted inline due to platform limitations.
⚠️ Outside diff range comments (18)
src/migration/mod.rs (2)
480-493: ⚠️ Potential issue | 🟡 Minor

`get_staging_metadata` lacks tenant awareness.

`put_staging_metadata` writes to tenant-specific paths (`staging_dir/{tenant}/.parseable.json`), but `get_staging_metadata` only reads from the default path. This asymmetry means tenant-specific staging metadata won't be read. If this is intentional for one-time migration of legacy data to tenant paths, consider adding a comment. Otherwise, `get_staging_metadata` should also accept `tenant_id`.
134-166: ⚠️ Potential issue | 🟠 Major

Add v6 handler to `migrate_staging` to align with remote metadata migration.

The `v5_v6` migration produces v6 format, not v7. Remote metadata migration has an explicit v6 handler calling `v6_v7`, but `migrate_staging` lacks it. If staging metadata reaches v6 version, it won't be migrated to v7. Additionally, the v5 case should chain `v5_v6` with `v6_v7` like remote metadata does, rather than stopping at v6.

Add a v6 case handler:

```rust
Some("v6") => {
    let metadata = metadata_migration::v6_v7(staging_metadata);
    put_staging_metadata(config, &metadata, tenant_id)?;
}
```

And update the v5 case to call v6_v7 after v5_v6:

```rust
Some("v5") => {
    let mut metadata = metadata_migration::v5_v6(staging_metadata);
    metadata = metadata_migration::v6_v7(metadata);
    put_staging_metadata(config, &metadata, tenant_id)?;
}
```

src/handlers/http/cluster/mod.rs (2)
1692-1779: ⚠️ Potential issue | 🟠 Major

Tenant-agnostic querier state can mix tenants.

`get_available_querier` is tenant-scoped, but `QUERIER_MAP` and `LAST_USED_QUERIER` are global. That allows querier availability and round-robin state to bleed across tenants, potentially selecting a querier that isn't registered for the current tenant. Consider keying the map by tenant and storing a per-tenant `last_used` value.

♻️ Suggested direction (per-tenant state)

```diff
-    static ref QUERIER_MAP: Arc<RwLock<HashMap<String, QuerierStatus>>> =
-        Arc::new(RwLock::new(HashMap::new()));
-    static ref LAST_USED_QUERIER: Arc<RwLock<Option<String>>> = Arc::new(RwLock::new(None));
+    static ref QUERIER_MAP: Arc<RwLock<HashMap<String, HashMap<String, QuerierStatus>>>> =
+        Arc::new(RwLock::new(HashMap::new()));
+    static ref LAST_USED_QUERIER: Arc<RwLock<HashMap<String, Option<String>>>> =
+        Arc::new(RwLock::new(HashMap::new()));
```
1880-1926: ⚠️ Potential issue | 🔴 Critical

Forward tenant context to querier in `send_query_request`.

The `tenant_id` parameter is unused. When `auth_token` is `None`, the request only carries the node token with no tenant identifier. Even when `auth_token` is provided, the "tenant" header may not be included. In multi-tenant mode, the querier relies on the "tenant" header for request scoping; without it, the request falls back to the default tenant, risking cross-tenant data exposure. Add the tenant header to the outgoing request:

```rust
if let Some(tenant) = tenant_id {
    if let Ok(val) = reqwest::header::HeaderValue::from_str(tenant) {
        map.insert(reqwest::header::HeaderName::from_static("tenant"), val);
    }
}
```

src/storage/retention.rs (1)
97-119: ⚠️ Potential issue | 🟡 Minor

Avoid panic in `Task::new` when days=0.

`NonZeroU32::new(days).unwrap()` will panic on 0; this is a public constructor and should return a validation error instead.

💡 Suggested fix

```diff
-    pub fn new(description: String, days: u32) -> Self {
-        let days = NonZeroU32::new(days).unwrap();
-        Self {
-            description,
-            days,
-            action: Action::Delete,
-        }
-    }
+    pub fn new(description: String, days: u32) -> Result<Self, String> {
+        let days = NonZeroU32::new(days).ok_or("days must be > 0")?;
+        Ok(Self {
+            description,
+            days,
+            action: Action::Delete,
+        })
+    }
```

src/prism/home/mod.rs (1)
156-160: ⚠️ Potential issue | 🟠 Major

Bug confirmed: `users().len()` counts tenants, not users in the current tenant.

The `Users` struct is a `HashMap<String, HashMap<String, User>>` where the outer map is keyed by tenant. Calling `.len()` on it returns the number of tenants, making `user_added` incorrect for multi-tenant deployments.

Update to count users for the current tenant:

Suggested fix

```diff
-    let user_count = users().len();
+    let user_count = users()
+        .get(tenant_id.as_deref().unwrap_or(DEFAULT_TENANT))
+        .map(|m| m.len())
+        .unwrap_or(0);
     let user_added = user_count > 1; // more than just the default admin user
```

Note: `tenant_id` is available in scope (line 106), and the import for `DEFAULT_TENANT` is already present.

src/connectors/kafka/processor.rs (1)
49-103: ⚠️ Potential issue | 🔴 Critical

Add missing `dataset_tag` parameter and fix critical tenant-mixing bug in chunk processing.

Build breaker: `create_stream_if_not_exists` requires 7 arguments; the call is missing the final `dataset_tag: Option<DatasetTag>` parameter. Pass `None` explicitly.

Critical data isolation bug: `tenant_id` is extracted only from `records.first()`, but records in a single chunk can have mixed tenant IDs (from Kafka message headers). All records in the chunk are then processed under the first record's tenant, causing cross-tenant data mixing. Chunks must be split by tenant before processing, or tenant consistency must be enforced per-chunk with an assertion.

`TelemetryType::default()` correctly resolves to `Logs`, matching the `into_event` call; no inconsistency here.

Compile fix for missing parameter

```diff
 PARSEABLE
     .create_stream_if_not_exists(
         stream_name,
         StreamType::UserDefined,
         None,
         vec![log_source_entry],
         TelemetryType::default(),
         tenant_id,
+        None,
     )
     .await?;
```

src/handlers/http/mod.rs (1)
104-141: ⚠️ Potential issue | 🟠 Major

Add tenant context header to ingestor query request to maintain multi-tenant isolation.

`send_query_request_to_ingestor` correctly selects ingestors using `get_node_info(NodeType::Ingestor, tenant_id)`, but the outgoing HTTP request omits the tenant context. The ingestor's query handler derives tenant via `get_tenant_id_from_request(&req)`, which looks for a "tenant" header. Without this header, the ingestor defaults to the wrong tenant, breaking multi-tenant data isolation.

Add tenant to the request headers:

```rust
.header("tenant", tenant_id.as_deref().unwrap_or(DEFAULT_TENANT))
```

src/handlers/http/alerts.rs (1)
247-329: ⚠️ Potential issue | 🟠 Major

Tenant context must be bound to the authenticated session, not derived from untrusted header.

This endpoint derives `tenant_id` from the `tenant` header via `get_tenant_id_from_request(&req)`, but the subsequent authorization validation via `alert.validate(&session_key)` uses a different tenant context extracted from the session key. No validation exists to ensure these match.

Attack scenario: A caller with a valid session for tenant A can specify tenant B in the `tenant` header. The validation will succeed (checking access to tables in tenant A's context via `user_auth_for_query`), but the alert is stored and executed in tenant B's isolated storage, allowing unauthorized resource creation.

This pattern affects all alert endpoints and is systemic across the codebase. Remediation options:

- Derive tenant exclusively from session key (preferred)
- Validate `tenant` header == session's tenant and reject if mismatch
- Remove header-based tenant specification and enforce proxy-only tenant injection with header stripping
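The header-must-match-session option could look like the following sketch. All names here (`effective_tenant`, `session_tenant`, `header_tenant`) are illustrative stand-ins for values the handlers would already hold, not the crate's actual API:

```rust
const DEFAULT_TENANT: &str = "default";

// Hypothetical guard: accept the header only when it agrees with the tenant
// already bound to the authenticated session; otherwise reject the request.
fn effective_tenant(
    session_tenant: Option<&str>,
    header_tenant: Option<&str>,
) -> Result<String, String> {
    let session = session_tenant.unwrap_or(DEFAULT_TENANT);
    match header_tenant {
        // No header: fall back to the session's tenant.
        None => Ok(session.to_owned()),
        // Header present: it must match the session's tenant exactly.
        Some(h) if h == session => Ok(session.to_owned()),
        Some(h) => Err(format!(
            "tenant header '{h}' does not match session tenant '{session}'"
        )),
    }
}

fn main() {
    assert_eq!(effective_tenant(Some("acme"), None).unwrap(), "acme");
    assert!(effective_tenant(Some("acme"), Some("globex")).is_err());
}
```

With this guard in place, the header becomes a consistency check rather than an authority, which also keeps the proxy-injection option viable later.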
src/rbac/role.rs (1)
91-172: ⚠️ Potential issue | 🔴 Critical

Add `Action::SuperAdmin` check to `is_admin()` function.

The `is_admin()` function (src/utils/mod.rs:221) only recognizes `Permission::Resource(Action::All, Some(ParseableResourceType::All))` but does not check for `Action::SuperAdmin`, which is recognized elsewhere in permission checks (src/rbac/map.rs:343, 373, 379). This creates an inconsistency where users with SuperAdmin permissions may be denied access in code paths relying on `is_admin()` for authorization (e.g., handlers/http/users/dashboards.rs, filters.rs). SuperAdmin should be included in the `is_admin()` check.

src/handlers/http/modal/query/querier_logstream.rs (1)
93-111: ⚠️ Potential issue | 🔴 Critical

Pass tenant header to ingestors in delete requests.

The delete operation does not propagate the tenant header to ingestors, unlike the PUT operation. When `send_stream_delete_request` sends the DELETE request, it only includes `CONTENT_TYPE` and `AUTHORIZATION` headers. The ingestor's delete handler expects to extract the tenant from the request header via `get_tenant_id_from_request()`, which will fail if the tenant header is missing. This causes the ingestor to operate without proper tenant context.

Update `send_stream_delete_request` to accept and forward the tenant header, matching the pattern used in `sync_streams_with_ingestors` for PUT requests.

src/correlation.rs (1)
204-212: ⚠️ Potential issue | 🟠 Major

Delete should remove from the tenant bucket, not the top-level map.

`self.write().await.remove(&correlation.id)` removes a tenant entry keyed by correlation ID (or no-ops), leaving stale data in memory.

🛠️ Suggested fix

```diff
-    // Delete from memory
-    self.write().await.remove(&correlation.id);
+    // Delete from memory
+    let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
+    if let Some(corrs) = self.write().await.get_mut(tenant) {
+        corrs.remove(&correlation.id);
+    }
```

src/handlers/http/oidc.rs (1)
260-298: ⚠️ Potential issue | 🟠 Major

OIDC new-user creation ignores tenant context.

`put_user(..., None)` forces new OIDC users into the default tenant even when login is tenant-scoped. Pass the request tenant (clone before move).

🛠️ Suggested fix

```diff
-    let existing_user = find_existing_user(&user_info, tenant_id);
+    let existing_user = find_existing_user(&user_info, tenant_id.clone());
@@
-    (None, roles) => put_user(&user_id, roles, user_info, bearer, None).await?,
+    (None, roles) => put_user(&user_id, roles, user_info, bearer, tenant_id.clone()).await?,
```

src/parseable/mod.rs (1)
515-524: ⚠️ Potential issue | 🔴 Critical

Add missing `dataset_tag` argument to `create_stream_if_not_exists` call in kafka/processor.rs.

The call at `src/connectors/kafka/processor.rs:60` is missing the new `dataset_tag` parameter. All other call sites have been updated correctly (passing `None` where appropriate). This build-breaking issue needs to be fixed immediately.

src/storage/s3.rs (1)
919-962: ⚠️ Potential issue | 🟡 Minor

Use `DEFAULT_TENANT` for `list_old_streams` HEAD metrics.

The HEAD calls inside `list_old_streams` are labeled with `""` while other metrics in the same method use `DEFAULT_TENANT`, splitting default-tenant telemetry.

🛠️ Suggested fix

```diff
-    increment_object_store_calls_by_date(
-        "HEAD",
-        &Utc::now().date_naive().to_string(),
-        "",
-    );
+    increment_object_store_calls_by_date(
+        "HEAD",
+        &Utc::now().date_naive().to_string(),
+        tenant_str,
+    );
```

Based on learnings: default-tenant paths are intentionally built with an empty string segment (`""`), so concerns are about metrics labeling rather than path resolution.
src/hottier.rs (1)
186-205: ⚠️ Potential issue | 🟠 Major

Tenant-aware hot-tier paths still use tenant-agnostic directories.

Line 203 uses `get_oldest_date_time_entry(stream)` (tenant-agnostic), line 216 deletes `hot_tier_path.join(stream)` (tenant-agnostic), and lines 454-463 write manifests using `get_stream_path_for_date(stream, …)` (tenant-agnostic). For non-default tenants, parquet files are stored under `hot_tier_path/<tenant>/…` (from `file_path`), while metadata/cleanup operates under `hot_tier_path/<stream>/…`, causing cross-tenant mixing and orphaned data.

🔧 Suggested direction (tenant-aware stream root)

```diff
+    fn tenant_stream_root(&self, stream: &str, tenant_id: &Option<String>) -> PathBuf {
+        if let Some(tenant) = tenant_id.as_ref() {
+            self.hot_tier_path.join(tenant).join(stream)
+        } else {
+            self.hot_tier_path.join(stream)
+        }
+    }
 ...
-    let path = self.hot_tier_path.join(stream);
+    let path = self.tenant_stream_root(stream, tenant_id);
```

Then thread `tenant_id` into `get_stream_path_for_date`, `fetch_hot_tier_dates`, and `get_oldest_date_time_entry`, and use the tenant-aware root in all call sites.

Also applies to: 208-218, 454-465
src/metastore/metastores/object_store_metastore.rs (2)
317-412: ⚠️ Potential issue | 🟠 Major

Alert state paths are not tenant-isolated (and default-tenant path mismatch).

Line 321 uses `DEFAULT_TENANT` in the path for `None`, while `alert_state_json_path` (lines 349/373) writes to `.alerts/...` without tenant. This means the default tenant reads from `default/.alerts` but writes to `.alerts`, and non-default tenants collide on the same file names.

🔧 Suggested fix (tenant-aware alert-state paths)

```diff
-    let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
-    let base_path = RelativePathBuf::from_iter([tenant, ALERTS_ROOT_DIRECTORY]);
+    let tenant = tenant_id.as_deref().unwrap_or("");
+    let base_path = RelativePathBuf::from_iter([tenant, ALERTS_ROOT_DIRECTORY]);
 ...
-    let path = alert_state_json_path(*alert_id);
+    let path = if let Some(t) = tenant_id.as_ref() {
+        RelativePathBuf::from_iter([t, ALERTS_ROOT_DIRECTORY, &format!("alert_state_{alert_id}.json")])
+    } else {
+        alert_state_json_path(*alert_id)
+    };
```

Apply the same tenant-aware path logic in `put_alert_state` and `delete_alert_state`.

Based on learnings: In src/metastore/metastores/object_store_metastore.rs, default-tenant paths are intentionally built with an empty string segment (""), so no extra tenant directory is created. Passing Some("") to storage is acceptable for path creation; concerns, if any, are about metrics labeling rather than path resolution.
913-958: ⚠️ Potential issue | 🟠 Major

Tenant-prefixed root is ignored when listing per-date manifests.

Line 936 builds `date_path` without the tenant prefix, so manifest listing for non-default tenants will read from the wrong location. Reuse the tenant-prefixed `root` when constructing `date_path`.

🔧 Suggested fix

```diff
-    let date_path = object_store::path::Path::from(format!("{}/{}", stream_name, &date));
+    let date_path = object_store::path::Path::from(format!("{root}/{date}"));
```
🤖 Fix all issues with AI agents
In `@src/alerts/mod.rs`:
- Around line 1045-1141: The code inserts alerts under the raw tenant_id string
which is empty for the default tenant, but lookups use DEFAULT_TENANT, so
default-tenant alerts become unreachable; when inserting into the map (sites
where map.entry(tenant_id.clone()).or_default().insert(...)) compute a
normalized map key: if tenant_id.is_empty() use DEFAULT_TENANT (string) else use
tenant_id.clone(), and use that normalized key for map.entry(...) while keeping
alert.tenant_id.clone_from(tenant) unchanged so the AlertConfig still reflects
None for the default tenant.
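The key normalization described above could be a small helper; `alert_map_key` is a hypothetical name, not a function in the PR:

```rust
const DEFAULT_TENANT: &str = "default";

// Sketch: an empty tenant_id is stored under DEFAULT_TENANT so inserts and
// later lookups agree on the same map key, while AlertConfig can keep None.
fn alert_map_key(tenant_id: &str) -> String {
    if tenant_id.is_empty() {
        DEFAULT_TENANT.to_owned()
    } else {
        tenant_id.to_owned()
    }
}

fn main() {
    assert_eq!(alert_map_key(""), "default");
    assert_eq!(alert_map_key("acme"), "acme");
}
```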
In `@src/correlation.rs`:
- Around line 134-144: The create flow writes the correlation to metastore but
only inserts into the in-memory map if a tenant bucket already exists, so the
first correlation for a tenant is never cached; after calling
PARSEABLE.metastore.put_correlation(...) and computing tenant via
get_tenant_id_from_key(...) / DEFAULT_TENANT, ensure you create or get the
tenant bucket before inserting by using the map entry semantics on self (the
write lock) instead of get_mut(tenant) so you always create a new HashMap for a
missing tenant and then insert correlation.id / correlation.clone() into it.
In `@src/event/mod.rs`:
- Around line 110-111: LiveTail registry currently keys only by stream_name
causing cross-tenant leakage; update the LiveTailRegistry HashMap to include
tenant_id in its key (e.g., use a composite key struct or nested HashMap) and
change the LiveTail API to accept tenant_id for scoping: modify
LIVETAIL.process(&self.stream_name, &self.rb) to LIVETAIL.process(&tenant_id,
&self.stream_name, &self.rb) (and any LiveTailRegistry lookup to use tenant_id),
and update all call sites including new_pipe in handlers/livetail.rs
(new_pipe(...) should pass tenant_id into subscription creation) so tenant_id is
used both when creating/looking up registry entries and when broadcasting
events. Ensure method signatures on LiveTail, LiveTailRegistry, and any helper
functions are updated consistently and that keys are constructed or nested
lookups use tenant_id first then stream_name.
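The composite keying suggested above could be sketched like this; `LiveTailRegistry` here is a simplified stand-in (with a dummy `Sender` type) for the real registry, not the crate's actual implementation:

```rust
use std::collections::HashMap;

// Stand-in for whatever channel/sender type LiveTail actually stores per pipe.
type Sender = u32;

// (tenant_id, stream_name) scopes every lookup, so two tenants with the same
// stream name never share subscribers.
#[derive(Default)]
struct LiveTailRegistry {
    pipes: HashMap<(String, String), Vec<Sender>>,
}

impl LiveTailRegistry {
    fn subscribe(&mut self, tenant: &str, stream: &str, tx: Sender) {
        self.pipes
            .entry((tenant.to_owned(), stream.to_owned()))
            .or_default()
            .push(tx);
    }

    fn subscriber_count(&self, tenant: &str, stream: &str) -> usize {
        self.pipes
            .get(&(tenant.to_owned(), stream.to_owned()))
            .map_or(0, Vec::len)
    }
}

fn main() {
    let mut reg = LiveTailRegistry::default();
    reg.subscribe("tenant-a", "logs", 1);
    reg.subscribe("tenant-b", "logs", 2);
    assert_eq!(reg.subscriber_count("tenant-a", "logs"), 1);
    assert_eq!(reg.subscriber_count("tenant-b", "logs"), 1);
}
```

A nested `HashMap<String, HashMap<String, Vec<Sender>>>` would work equally well; the tuple key keeps the single-lookup shape of the current code.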
In `@src/handlers/http/correlation.rs`:
- Around line 45-52: The tenant is being taken directly from the request header
via get_tenant_id_from_request(&req) which enables header spoofing; instead
derive the tenant from the authenticated session obtained by
extract_session_key_from_req(&req) (or the session store/context) and use that
tenant when calling CORRELATIONS.get_correlation(&correlation_id, &tenant_id).
If a tenant header is present, compare it to the session-derived tenant and
return an authorization/error when they mismatch (unless an explicit
override/authorization check permits the header), i.e., replace the direct
header usage in the correlation lookup with the session-derived tenant and add a
mismatch check before calling CORRELATIONS.get_correlation.
In `@src/handlers/http/middleware.rs`:
- Around line 171-243: The tenant header handling allows spoofing — only
BasicAuth ingestion is validated and non-ingest flows may leave caller-supplied
tenant headers; fix by deriving the session tenant once via
get_user_and_tenant_from_request/get_user_from_request and/or
extract_session_key (inspect SessionKey variants), then for all auth types
compare any incoming "tenant" header to the derived tenant and either clear the
header (remove from req.headers_mut()) or set header_error to the same
PostError::Header(ParseHeaderError::InvalidTenantId) when the derived tenant is
missing or mismatches; update the logic around user_and_tenant_id, the Ingest
block that calls Users.validate_basic_user_tenant_id, and any branches that
currently insert the tenant into req.headers_mut() so all auth types enforce the
same validation/clearing behavior.
In `@src/handlers/http/modal/ingest/ingestor_rbac.rs`:
- Around line 49-57: The tenant-check logic in get_tenant_id_from_request/... is
inverted: currently it errors when req_tenant != DEFAULT_TENANT && req_tenant ==
user.tenant; change it to error when a non-default requested tenant does not
match the requesting user's tenant. Concretely, within the block using
DEFAULT_TENANT and user.tenant (the code that returns RBACError::Anyhow), flip
the second comparison so the condition becomes req_tenant.ne(DEFAULT_TENANT) &&
!req_tenant.eq(user.tenant.as_deref().unwrap_or(DEFAULT_TENANT)) (or
equivalent), ensuring cross-tenant creation attempts are rejected. Ensure you
still use RBACError::Anyhow(anyhow::Error::msg(...)) for the error return.
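The corrected predicate described above can be isolated as a small function; `is_cross_tenant` is an illustrative name, not code from the PR:

```rust
const DEFAULT_TENANT: &str = "default";

// Sketch: reject only when a non-default requested tenant differs from the
// requesting user's tenant (the original condition was inverted).
fn is_cross_tenant(req_tenant: &str, user_tenant: Option<&str>) -> bool {
    req_tenant != DEFAULT_TENANT && req_tenant != user_tenant.unwrap_or(DEFAULT_TENANT)
}

fn main() {
    assert!(is_cross_tenant("acme", Some("globex"))); // mismatched: reject
    assert!(!is_cross_tenant("acme", Some("acme"))); // same tenant: allow
    assert!(!is_cross_tenant("default", Some("acme"))); // default request: allow
}
```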
In `@src/handlers/http/modal/ingest/ingestor_role.rs`:
- Around line 45-51: The tenant-check condition is inverted: in the block around
get_tenant_id_from_request, req_tenant, DEFAULT_TENANT and sync_req.tenant_id
you currently reject when the request tenant equals the payload tenant and allow
cross-tenant operations; change it to reject when the request tenant is not the
default AND the request tenant does not match the payload tenant. Concretely, in
the if that uses req_tenant.ne(DEFAULT_TENANT) &&
(req_tenant_id.eq(&sync_req.tenant_id)), replace the second clause with a
comparison that verifies the request tenant and sync_req.tenant_id differ (e.g.
compare req_tenant != sync_req.tenant_id or req_tenant_id.as_deref() !=
Some(&sync_req.tenant_id)), keeping the same error return
(RoleError::Anyhow(...)) and using get_tenant_id_from_request, req_tenant_id,
req_tenant, DEFAULT_TENANT and sync_req.tenant_id to locate the change.
In `@src/handlers/http/role.rs`:
- Around line 154-199: get_default currently returns Null for non-default
tenants when DEFAULT_ROLE lacks an entry; fix by adding a fallback to read
persisted metadata: in get_default (and the similar oidc usage) after acquiring
tenant_id and checking DEFAULT_ROLE.read().get(tenant_id), if the map has no
entry for that tenant call get_metadata(&tenant_id).await?, extract
metadata.default_role, backfill DEFAULT_ROLE via
DEFAULT_ROLE.write().insert(tenant_id.to_owned(),
metadata.default_role.clone()), and then return that role (or Null) — this
mirrors put_default behavior and ensures persisted per-tenant defaults are
returned and the in-memory map is hydrated.
- Around line 23-37: The tenant header parsing in get_tenant_id_from_request
currently calls header_value.to_str().unwrap(), which will panic on non-UTF8
header values; change get_tenant_id_from_request to handle invalid UTF-8
gracefully by using to_str().ok() (or mapping the Result) and fall back to a
safe value or return an appropriate error instead of unwrapping; ensure you
reference DEFAULT_TENANT/PARSEABLE behavior so the function either returns
DEFAULT_TENANT when the header is missing/invalid or returns a controlled error
Result, and update any callers expecting a string to handle the non-UTF8 case
accordingly.
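The non-panicking parse asked for above could look like this sketch; `raw` stands in for the header bytes the framework would hand back, so this is not the crate's real signature:

```rust
// Invalid UTF-8 yields None instead of panicking via to_str().unwrap().
fn tenant_from_header(raw: Option<&[u8]>) -> Option<String> {
    raw.and_then(|bytes| std::str::from_utf8(bytes).ok())
        .map(str::to_owned)
}

fn main() {
    assert_eq!(tenant_from_header(Some(b"acme".as_slice())), Some("acme".to_owned()));
    assert_eq!(tenant_from_header(Some([0xffu8, 0xfe].as_slice())), None); // bad UTF-8
    assert_eq!(tenant_from_header(None), None); // missing header
}
```

Callers can then decide whether `None` maps to `DEFAULT_TENANT` or to a controlled error, as the instruction suggests.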
In `@src/handlers/livetail.rs`:
- Around line 119-122: The livetail handler currently calls
PARSEABLE.get_stream(stream, &None) which ignores tenant isolation; extract the
tenant id from the authorized SessionKey by calling get_tenant_id_from_key(&key)
(same pattern used in logstream/health_check/query) and pass that tenant context
to get_stream (e.g., use Some(tenant_id) instead of &None) so the lookup
respects tenant boundaries when obtaining the schema via PARSEABLE.get_stream.
In `@src/hottier.rs`:
- Around line 92-115: The aggregation in get_hot_tiers_size incorrectly excludes
streams when either the stream name equals current_stream or the tenant_id
differs from current_tenant_id; change the exclusion logic so only the current
stream in the same tenant is skipped: call check_stream_hot_tier_exists(&stream,
&tenant_id) as before, but replace the condition "stream != current_stream &&
tenant_id != *current_tenant_id" with a check that continues only if NOT (stream
== current_stream && tenant_id == Some(current_tenant_id.clone())), i.e. skip
only when both the stream name and tenant match the provided
current_stream/current_tenant_id; then await get_hot_tier(&stream, &tenant_id)
and add sizes as before.
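The corrected exclusion can be captured as a predicate; `should_skip` is an illustrative helper, not a function in the PR:

```rust
// Skip a stream only when BOTH its name and its tenant match the stream
// currently being evaluated; same-named streams in other tenants still count.
fn should_skip(
    stream: &str,
    tenant_id: &Option<String>,
    current_stream: &str,
    current_tenant_id: &str,
) -> bool {
    stream == current_stream && tenant_id.as_deref() == Some(current_tenant_id)
}

fn main() {
    assert!(should_skip("s1", &Some("t1".into()), "s1", "t1"));
    assert!(!should_skip("s1", &Some("t2".into()), "s1", "t1")); // other tenant: count it
    assert!(!should_skip("s2", &Some("t1".into()), "s1", "t1")); // other stream: count it
}
```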
In `@src/metastore/metastores/object_store_metastore.rs`:
- Around line 515-541: The get_dashboards method is overwriting dashboards per
user because dashboards.insert(tenant.to_owned(), dashboard_bytes) is called
inside the user loop; change this to aggregate dashboard_bytes into the existing
entry for that tenant (e.g., lookup dashboards.get_mut(&tenant) and
append/extend the Vec<Bytes> or create a new Vec when missing) so all users'
dashboards for a tenant are combined; keep the DEFAULT_TENANT normalization
(tenant.clone_from(&DEFAULT_TENANT.to_string())) before inserting/merging so the
map key is consistent.
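The aggregate-instead-of-overwrite fix can be sketched as follows; `collect_dashboards` and the `Blob` alias are hypothetical stand-ins for the real `Vec<Bytes>` plumbing:

```rust
use std::collections::HashMap;

const DEFAULT_TENANT: &str = "default";

// Stand-in for Bytes; the real code aggregates Vec<Bytes> per tenant.
type Blob = Vec<u8>;

// Extend the tenant's existing entry instead of overwriting it per user, and
// normalize the empty tenant to DEFAULT_TENANT before merging.
fn collect_dashboards(per_user: &[(String, Vec<Blob>)]) -> HashMap<String, Vec<Blob>> {
    let mut dashboards: HashMap<String, Vec<Blob>> = HashMap::new();
    for (tenant, user_dashboards) in per_user {
        let key = if tenant.is_empty() {
            DEFAULT_TENANT.to_owned()
        } else {
            tenant.clone()
        };
        dashboards
            .entry(key)
            .or_default()
            .extend(user_dashboards.iter().cloned());
    }
    dashboards
}

fn main() {
    let input = vec![
        ("acme".to_owned(), vec![vec![1u8]]),
        ("acme".to_owned(), vec![vec![2u8]]),
    ];
    let out = collect_dashboards(&input);
    assert_eq!(out["acme"].len(), 2); // both users' dashboards survive
}
```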
In `@src/rbac/mod.rs`:
- Around line 289-326: The BasicAuth tenant selection is ambiguous across
tenants; update get_user_tenant_from_basic and mutate_request_with_basic_user to
avoid arbitrarily picking a tenant: in get_user_tenant_from_basic (and any
callers like get_user_and_tenant_from_request) collect all tenants that match
username+password and return Some(tenant) only if exactly one tenant matched
(otherwise return None); similarly, change mutate_request_with_basic_user to
search for all matching (tenant, user) entries rather than using
find_any()/first-match and only insert the tenant header when a single
unambiguous match exists (if multiple matches, do not set the header and let
upstream validation fail or require explicit tenant header).
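The exactly-one-match rule above could be implemented like this sketch; the nested map and plaintext passwords are illustrative simplifications of the real credential store:

```rust
use std::collections::HashMap;

// Stand-in nested map: tenant -> username -> password (the real store holds
// hashed credentials). Returns a tenant only on an unambiguous single match.
fn tenant_for_basic(
    users: &HashMap<String, HashMap<String, String>>,
    username: &str,
    password: &str,
) -> Option<String> {
    let mut matches = users.iter().filter_map(|(tenant, creds)| {
        (creds.get(username).map(String::as_str) == Some(password)).then(|| tenant.clone())
    });
    match (matches.next(), matches.next()) {
        (Some(tenant), None) => Some(tenant), // exactly one tenant matched
        _ => None,                            // zero or several: stay ambiguous
    }
}

fn main() {
    let mut users: HashMap<String, HashMap<String, String>> = HashMap::new();
    users.entry("t1".into()).or_default().insert("alice".into(), "pw".into());
    users.entry("t2".into()).or_default().insert("alice".into(), "pw".into());
    assert_eq!(tenant_for_basic(&users, "alice", "pw"), None); // ambiguous
    users.remove("t2");
    assert_eq!(tenant_for_basic(&users, "alice", "pw"), Some("t1".into()));
}
```

Taking two items from the iterator is enough to distinguish zero, one, and many matches without collecting the whole set.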
In `@src/rbac/user.rs`:
- Around line 349-363: The validate method currently checks
read_user_groups().contains_key(&self.name) which tests tenant IDs rather than
group names; change the existence check to scope by tenant using the computed
tenant variable (let tenant =
tenant_id.as_deref().unwrap_or(DEFAULT_TENANT))—e.g., fetch
read_user_groups().get(tenant) and then check .contains_key(&self.name) on that
tenant's map (or .map_or(false, |m| m.contains_key(&self.name))) and return
RBACError::UserGroupExists(self.name.clone()) only if the group exists within
that tenant; keep the rest of validate (role checks, non_existent_roles)
unchanged.
In `@src/storage/localfs.rs`:
- Around line 442-460: delete_stream and other LocalFS methods build paths using
DEFAULT_TENANT so non-default tenants end up using the default path; change path
construction to include the tenant path segment (empty string for default
tenant) instead of DEFAULT_TENANT: compute let tenant_segment =
tenant_id.as_deref().unwrap_or("") and use
self.root.join(tenant_segment).join(stream_name) (or join the tenant_segment
before listing dirs in list_dates, list_hours, list_minutes), and keep the
existing metric calls but ensure they use the appropriate tenant identifier as
before; apply the same pattern to the other affected functions (list_dates,
list_hours, list_minutes) and return results unchanged.
In `@src/tenants/mod.rs`:
- Around line 70-112: The runtime suspension set (tenant.suspended_services) is
being mutated in suspend_service and resume_service but the persisted metadata
returned by get_tenants (tenant.meta.suspended_services) is left stale; update
the metadata whenever you change runtime state and/or derive the metadata on
read. Specifically, in suspend_service and resume_service (and any code that
mutates suspended_services) also update tenant.value().meta.suspended_services
to mirror tenant.suspended_services (or rebuild meta from suspended_services
before returning in get_tenants); ensure get_tenants reads the up-to-date
suspended_services into the returned StorageMetadata so listings reflect current
suspensions.
In `@src/users/filters.rs`:
- Around line 128-133: Filters::update currently no-ops when the tenant bucket
doesn't exist (using s.get_mut(...)), which drops newly created filters; change
the logic in Filters::update to use the map's entry API
(entry(tenant_key).or_default()) to obtain or create the Vec<Filter> for the
tenant (use tenant_id.as_deref().unwrap_or(DEFAULT_TENANT) as the key), then
perform the retain by filter.filter_id and push the cloned Filter so new-tenant
entries are created in-memory immediately.
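The entry-API fix described above, in miniature; the `Filter` struct is trimmed to the one field the logic needs:

```rust
use std::collections::HashMap;

const DEFAULT_TENANT: &str = "default";

#[derive(Clone, Debug, PartialEq)]
struct Filter {
    filter_id: Option<String>,
}

// or_default() creates the tenant bucket on first use, so a brand-new
// tenant's filter is cached instead of silently dropped by get_mut().
fn update(store: &mut HashMap<String, Vec<Filter>>, tenant_id: &Option<String>, filter: &Filter) {
    let bucket = store
        .entry(tenant_id.as_deref().unwrap_or(DEFAULT_TENANT).to_owned())
        .or_default();
    bucket.retain(|f| f.filter_id != filter.filter_id);
    bucket.push(filter.clone());
}

fn main() {
    let mut store = HashMap::new();
    let f = Filter { filter_id: Some("f1".into()) };
    update(&mut store, &Some("new-tenant".into()), &f);
    assert_eq!(store["new-tenant"].len(), 1); // bucket created on first use
    update(&mut store, &Some("new-tenant".into()), &f);
    assert_eq!(store["new-tenant"].len(), 1); // upsert, not duplicate
}
```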
In `@src/utils/mod.rs`:
- Around line 69-79: The function mutate_request_with_tenant currently calls
basic.password().unwrap(), which can panic if the password is omitted; change
this to safely handle missing passwords by using a safe default (e.g., empty
string) or passing the Option to Users.mutate_request_with_basic_user instead of
unwrapping. Locate mutate_request_with_tenant and the BasicAuth::extract result,
and replace the unwrap with a non-panicking branch (e.g.,
basic.password().map_or("", |p| p) or adjust
Users.mutate_request_with_basic_user to accept Option<&str>) so no panic occurs
when password is absent.
- Around line 118-121: The get_tenant_id_from_request function currently calls
to_str().unwrap() which can panic on invalid header bytes; change it to use
to_str().ok() and propagate the Option safely (e.g., map(|v|
v.to_str().ok().map(|s| s.to_owned())).flatten or and_then) so invalid header
values simply return None instead of panicking; update the function body for
safe header parsing consistent with the middleware's pattern.
🟡 Minor comments (3)
src/handlers/http/llm.rs-19-32 (1)
19-32: ⚠️ Potential issue | 🟡 Minor

Avoid panic on malformed `tenant` header.

`get_tenant_id_from_request` uses `to_str().unwrap()`, which can panic on non-UTF8 header values. Consider safe parsing to keep the handler from crashing on bad input.

🛠️ Proposed fix (safe header parsing)

```diff
-use crate::{
-    parseable::{PARSEABLE, StreamNotFound},
-    utils::get_tenant_id_from_request,
-};
+use crate::parseable::{PARSEABLE, StreamNotFound};
@@
-    let tenant_id = get_tenant_id_from_request(&req);
+    let tenant_id = req
+        .headers()
+        .get("tenant")
+        .and_then(|v| v.to_str().ok())
+        .map(|s| s.to_owned());
```

Also applies to: 92-104
src/cli.rs-523-529 (1)
523-529: ⚠️ Potential issue | 🟡 Minor

Add tenant ID validation to prevent directory traversal.

The `tenant_id` from the HTTP header is used directly in filesystem paths without validation. A malicious tenant value like `"../../../etc"` could escape the staging directory. Validate that `tenant_id` contains only safe characters (e.g., alphanumeric, hyphens, underscores) or use path normalization to prevent traversal before path operations.

src/parseable/streams.rs-1791-1795 (1)
1791-1795: ⚠️ Potential issue | 🟡 Minor

Guard against unwrap on an empty tenant map in the test.

The map is empty at that point, so `unwrap()` will panic.

🛠️ Suggested fix

```diff
-    assert!(!guard.get(DEFAULT_TENANT).unwrap().contains_key(stream_name));
+    assert!(guard.get(DEFAULT_TENANT).is_none());
```
🧹 Nitpick comments (10)
src/migration/mod.rs (1)
512-520: Use `PARSEABLE_METADATA_FILE_NAME` constant instead of hardcoded string.

`get_staging_metadata` uses the `PARSEABLE_METADATA_FILE_NAME` constant, but here `.parseable.json` is hardcoded. Use the constant for consistency and maintainability.

♻️ Suggested fix

```diff
 pub fn put_staging_metadata(
     config: &Parseable,
     metadata: &serde_json::Value,
     tenant_id: &Option<String>,
 ) -> anyhow::Result<()> {
     let path = if let Some(tenant) = tenant_id.as_ref() {
         config
             .options
             .staging_dir()
             .join(tenant)
-            .join(".parseable.json")
+            .join(PARSEABLE_METADATA_FILE_NAME)
     } else {
-        config.options.staging_dir().join(".parseable.json")
+        config.options.staging_dir().join(PARSEABLE_METADATA_FILE_NAME)
     };
```

src/query/stream_schema_provider.rs (2)
282-293: Commented-out tenant URL logic appears intentionally simplified.

The staging execution plan retrieves files from `staging.parquet_files()`, which returns absolute paths from the tenant-scoped `data_path`. Since `PartitionedFile` uses absolute paths, the `file:///` base URL should resolve correctly. Consider removing the commented code if this approach is confirmed as the final design.
193-193: Minor: Typo in function name `get_hottier_exectuion_plan`.

The function name has a typo: "exectuion" should be "execution". Consider renaming to `get_hottier_execution_plan` for clarity.

✏️ Suggested rename

```diff
-    async fn get_hottier_exectuion_plan(
+    async fn get_hottier_execution_plan(
```

Note: This would require updating the call site at line 619 as well.

src/handlers/http/correlation.rs (1)
src/handlers/http/correlation.rs (1)
73-75: Avoid redundant String allocations when hashing user IDs.
`get_hash` accepts `&str`, so `s.to_string()` is an unnecessary clone in all three call sites.
♻️ Suggested change
- let user_id = get_user_and_tenant_from_request(&req) - .map(|(s, _)| get_hash(&s.to_string())) + let user_id = get_user_and_tenant_from_request(&req) + .map(|(s, _)| get_hash(&s)) .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?;- correlation.user_id = get_user_and_tenant_from_request(&req) - .map(|(s, _)| get_hash(&s.to_string())) + correlation.user_id = get_user_and_tenant_from_request(&req) + .map(|(s, _)| get_hash(&s)) .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?;- let (user_id, tenant_id) = get_user_and_tenant_from_request(&req) - .map(|(s, t)| (get_hash(&s.to_string()), t)) + let (user_id, tenant_id) = get_user_and_tenant_from_request(&req) + .map(|(s, t)| (get_hash(&s), t)) .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?;Also applies to: 89-91, 106-112
src/handlers/http/query.rs (1)
118-145: Reuse tenant_id to avoid duplicate header parsing.
`get_tenant_id_from_request` is called twice; storing the result once reduces work and ensures consistent usage.
♻️ Suggested refactor
- create_streams_for_distributed(tables.clone(), &get_tenant_id_from_request(&req)).await?;
-
- let tenant_id = get_tenant_id_from_request(&req);
+ let tenant_id = get_tenant_id_from_request(&req);
+ create_streams_for_distributed(tables.clone(), &tenant_id).await?;
235-273: Consider unifying the tenant_id parameter types for consistency.
The `event_labels` and `storage_size_labels` functions accept `&Option<String>` while their date variants (`event_labels_date`, `storage_size_labels_date`) accept `&str`. This inconsistency may cause confusion for callers.
♻️ Suggested unification
Consider either:
- Make all label functions accept `&Option<String>` and handle the default internally, or
- Make all label functions accept `&str` and have callers resolve the default before calling.
Option 1 (preferred for internal consistency):
pub fn event_labels_date<'a>( stream_name: &'a str, format: &'static str, date: &'a str, - tenant_id: &'a str, + tenant_id: &'a Option<String>, ) -> [&'a str; 4] { - [stream_name, format, date, tenant_id] + let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); + [stream_name, format, date, tenant] }src/handlers/http/role.rs (2)
41-86: Avoid `tenant_id` shadowing (reduces tenant-scoping mistakes)
In `put`, `tenant_id` is first an `Option<String>` and later shadowed as `&str`. This is easy to misuse when threading tenant context through storage, in-memory maps, and session invalidation.
Suggested refactor
pub async fn put( req: HttpRequest, name: web::Path<String>, Json(privileges): Json<Vec<DefaultPrivilege>>, ) -> Result<impl Responder, RoleError> { let name = name.into_inner(); let tenant_id = get_tenant_id_from_request(&req); + let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); // validate the role name validator::user_role_name(&name).map_err(RoleError::ValidationError)?; let mut metadata = get_metadata(&tenant_id).await?; metadata.roles.insert(name.clone(), privileges.clone()); put_metadata(&metadata, &tenant_id).await?; - let tenant_id = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); mut_roles() - .entry(tenant_id.to_owned()) + .entry(tenant.to_owned()) .or_default() .insert(name.clone(), privileges.clone()); let mut session_refresh_users: HashSet<String> = HashSet::new(); - if let Some(groups) = read_user_groups().get(tenant_id) { + if let Some(groups) = read_user_groups().get(tenant) { for user_group in groups.values() { if user_group.roles.contains(&name) { session_refresh_users .extend(user_group.users.iter().map(|u| u.userid().to_string())); } } } - if let Some(users) = users().get(tenant_id) { + if let Some(users) = users().get(tenant) { for user in users.values() { if user.roles.contains(&name) { session_refresh_users.insert(user.userid().to_string()); } } } for userid in session_refresh_users { - mut_sessions().remove_user(&userid, tenant_id); + mut_sessions().remove_user(&userid, tenant); } Ok(HttpResponse::Ok().finish()) }
201-220: Consider deduplicating RBAC metadata helpers
This file defines `get_metadata`/`put_metadata`, while `src/handlers/http/modal/utils/rbac_utils.rs` provides equivalent helpers (per the snippets). Keeping two copies increases drift risk, especially with multi-tenant signature changes.
21-91: Tenant-scoped role updates look consistent; consider `sync_role_update` failure semantics
Good: the tenant is derived once (`tenant`), metadata and `mut_roles()` are updated under that tenant, and session invalidation only targets tenant-scoped users/groups. Passing `tenant_id` into `sync_role_update(...)` is the right multi-tenant fix.
Potential reliability nit: `sync_role_update(...).await?` happens after metadata and in-memory state are already updated. If sync fails, the handler returns an error although the change is committed locally, which can lead to confusing retries. If sync is best-effort, consider logging and returning `200` (or returning a partial-success response) instead of failing the request.
441-461: Stream existence check should only load from storage in Query mode.
The new condition attempts `create_stream_and_schema_from_storage` for any non-`Mode::All`, which can trigger unexpected storage access on ingest/index nodes. Restore the prior Query-only check to keep behavior consistent.
Based on learnings: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.
♻️ Suggested adjustment
- if PARSEABLE.options.mode != Mode::All { + if PARSEABLE.options.mode == Mode::Query { match PARSEABLE .create_stream_and_schema_from_storage(&stream_name, &tenant_id) .await { Ok(true) => {} Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()), } } else { return Err(StreamNotFound(stream_name.clone()).into()); }
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/metastore/metastores/object_store_metastore.rs (1)
940-954: ⚠️ Potential issue | 🟠 Major — Tenant prefix missing from date_path construction.
When `tenant_id` is provided, `root` correctly includes the tenant prefix (lines 925-929), but `date_path` at line 941 only uses `stream_name` without the tenant prefix. This causes subsequent storage operations to access incorrect paths.
Example with `tenant_id = Some("acme")`:
- `root` = `"acme/my_stream"` ✓
- `date_path` = `"my_stream/2024-01-01"` ✗ (should be `"acme/my_stream/2024-01-01"`)
🔧 Proposed fix
for date in dates { - let date_path = object_store::path::Path::from(format!("{}/{}", stream_name, &date)); + let date_path = if let Some(tenant) = tenant_id { + object_store::path::Path::from(format!("{}/{}/{}", tenant, stream_name, &date)) + } else { + object_store::path::Path::from(format!("{}/{}", stream_name, &date)) + }; let resp = self.storage.list_with_delimiter(Some(date_path)).await?;
🤖 Fix all issues with AI agents
In `@src/correlation.rs`:
- Line 213: The current delete call removes from the outer tenant map using
correlation.id; instead look up the tenant bucket and remove the correlation id
from that inner CorrelationMap. Replace
self.write().await.remove(&correlation.id) with logic that acquires a mutable
write lock, gets the bucket by the tenant key (e.g., correlation.tenant_id or
correlation.tenant), and then calls remove(&correlation.id) on that bucket
(handling the bucket-missing case appropriately); reference the types/methods
involved: the outer HashMap (tenant -> CorrelationMap), the inner
CorrelationMap, and the correlation.id / correlation.tenant_id fields.
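The shape of that fix can be sketched with simplified stand-in types (the real inner map holds `CorrelationConfig` values; a plain `String` payload is used here for illustration):

```rust
use std::collections::HashMap;

// Simplified stand-in: correlation id -> config payload.
type CorrelationMap = HashMap<String, String>;

/// Remove a correlation from the tenant's inner bucket, not from the
/// outer tenant map. Returns true if the correlation existed.
fn delete_correlation(
    outer: &mut HashMap<String, CorrelationMap>,
    tenant: &str,
    correlation_id: &str,
) -> bool {
    match outer.get_mut(tenant) {
        Some(bucket) => bucket.remove(correlation_id).is_some(),
        None => false, // missing bucket: nothing to delete
    }
}
```

Note that the outer key (the tenant) survives the delete; only the inner entry keyed by the correlation id is removed.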
In `@src/metastore/metastores/object_store_metastore.rs`:
- Around line 696-730: The migration loop is deleting the parent directory
(filters_path) instead of the specific v1 file because get_objects() only
returns file contents; update the loop that calls get_objects() and the
migration block (migrate_v1_v2, filter_value handling, and the delete_object
call) to track each file's path alongside its bytes and call delete_object with
that specific file path (not filters_path), or change get_objects() to return
(file_path, file_bytes) tuples and use the returned file_path when invoking
self.storage.delete_object and when deciding the new v2 path (filter_path) so
the original v1 file is removed after creating the v2 file.
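The tuple-returning variant described above might look like this sketch (a `HashMap` stands in for the object store; the real `get_objects` is async and returns `Bytes`):

```rust
use std::collections::HashMap;

/// Sketch: list JSON objects under a prefix, returning each file's path
/// alongside its bytes so the caller can later delete that exact file.
fn get_objects_with_paths(
    store: &HashMap<String, Vec<u8>>,
    prefix: &str,
) -> Vec<(String, Vec<u8>)> {
    let mut out: Vec<(String, Vec<u8>)> = store
        .iter()
        .filter(|(path, _)| path.starts_with(prefix) && path.ends_with(".json"))
        .map(|(path, bytes)| (path.clone(), bytes.clone()))
        .collect();
    out.sort(); // deterministic order for the migration loop
    out
}
```

With the path in hand, the migration loop can write the v2 file first, then call `delete_object` with the returned path — never with the parent directory.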
In `@src/rbac/user.rs`:
- Around line 349-377: The validate method currently skips existence checks when
roles().get(tenant) or users().get(tenant) returns None; change validate (in the
function validate) so that if roles().get(tenant) is None you treat all
requested roles as non-existent by pushing each role from self.roles into
non_existent_roles, and likewise if users().get(tenant) is None push each
group_user.userid() from self.users into non_existent_users; keep the existing
loops when the map exists (i.e., only branch to "mark all missing" when the
Option is None) and preserve the existing error reporting (e.g., RBACError
return) behavior.
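A condensed sketch of the intended branch for roles (simplified signatures assumed; the real code reports failures via `RBACError`):

```rust
use std::collections::{HashMap, HashSet};

/// Return the requested roles that do not exist for this tenant.
/// If the tenant has no role map at all, every requested role is treated
/// as missing, instead of the check silently passing.
fn missing_roles(
    roles_by_tenant: &HashMap<String, HashSet<String>>,
    tenant: &str,
    requested: &[String],
) -> Vec<String> {
    match roles_by_tenant.get(tenant) {
        Some(existing) => requested
            .iter()
            .filter(|role| !existing.contains(*role))
            .cloned()
            .collect(),
        None => requested.to_vec(), // unknown tenant: all roles are missing
    }
}
```

The same `None => everything is missing` branch applies to the users check.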
🧹 Nitpick comments (3)
src/correlation.rs (1)
180-185: `update` silently drops in-memory changes if the tenant bucket doesn't exist.
If the tenant bucket was removed from memory (e.g., during a reload race), the in-memory update is silently skipped even though the metastore was updated. Consider using `entry().or_default()` for consistency with `create`.
♻️ Suggested improvement
- let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); // Update in memory - if let Some(corrs) = self.write().await.get_mut(tenant) { - corrs.insert( - updated_correlation.id.to_owned(), - updated_correlation.clone(), - ); - } + let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT).to_string(); + self.write() + .await + .entry(tenant) + .or_default() + .insert(updated_correlation.id.to_owned(), updated_correlation.clone());src/parseable/mod.rs (1)
1099-1126: Consider adding storage cleanup in `delete_tenant`.
The `delete_tenant` function removes in-memory state (users, roles, tenant metadata) but doesn't appear to clean up storage artifacts (streams, stored data, parseable.json for the tenant). If tenants should be fully deleted including their data, this may need additional cleanup. If this is intentional (soft delete preserving data), consider adding a comment to clarify.
src/rbac/user.rs (1)
502-508: Consider removing the commented-out `update_in_metadata` block.
If it's deprecated, remove it or replace it with a short TODO and an issue link to avoid lingering dead code.
Force-pushed 8caed34 to ccb78d3
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (6)
src/prism/home/mod.rs (2)
381-383: ⚠️ Potential issue | 🟡 Minor — Potential panic if `dashboard_id` is `None`.
The `unwrap()` on `dashboard_id` will panic if a dashboard lacks an ID. Consider using `?` with `filter_map` to gracefully skip such entries.
🛡️ Proposed fix to handle missing dashboard_id gracefully
.filter_map(|dashboard| { - let dashboard_id = *dashboard.dashboard_id.as_ref().unwrap(); - let dashboard_id = dashboard_id.to_string(); + let dashboard_id = dashboard.dashboard_id.as_ref()?.to_string(); if dashboard.title.to_lowercase().contains(query_value) || dashboard_id.to_lowercase().contains(query_value) {
409-412: ⚠️ Potential issue | 🟡 Minor — Potential panic if `filter_id` is `None`.
Similar to the dashboard case, `unwrap()` on `filter_id` will panic if a filter lacks an ID.
🛡️ Proposed fix to handle missing filter_id gracefully
.filter_map(|filter| { - let filter_id = filter.filter_id.as_ref().unwrap().clone(); + let filter_id = filter.filter_id.as_ref()?.clone(); if filter.filter_name.to_lowercase().contains(query_value)src/correlation.rs (2)
56-75: ⚠️ Potential issue | 🟠 Major — Normalize the default-tenant key during load.
If the metastore returns an empty tenant ID for the default tenant, these correlations get stored under `""` and later lookups (which use `DEFAULT_TENANT`) will miss them. Normalize empty keys to `DEFAULT_TENANT` before inserting into the in-memory map.
🛠️ Proposed fix
- for (tenant_id, correlations_bytes) in all_correlations { + for (tenant_id, correlations_bytes) in all_correlations { + let tenant_key = if tenant_id.is_empty() { + DEFAULT_TENANT.to_string() + } else { + tenant_id + }; let mut corrs = HashMap::new(); for corr in correlations_bytes { let correlation = match serde_json::from_slice::<CorrelationConfig>(&corr) { Ok(c) => c, Err(e) => { error!("Unable to load correlation file : {e}"); continue; } }; corrs.insert(correlation.id.clone(), correlation); } - guard.insert(tenant_id, corrs); + guard.insert(tenant_key, corrs); }Based on learnings: In src/metastore/metastores/object_store_metastore.rs, default-tenant paths are intentionally built with an empty string segment (""), so no extra tenant directory is created. Passing Some("") to storage is acceptable for path creation; concerns, if any, are about metrics labeling rather than path resolution.
151-188: ⚠️ Potential issue | 🟠 Major — Update flow overwrites new data and validates the wrong config.
`updated_correlation.update(correlation)` copies the old correlation into the new one, effectively discarding user changes, and validation runs on the old payload. The flow should apply updates to the existing object, validate the updated payload, and persist that.
🛠️ Proposed fix
- let correlation = self + let mut correlation = self .get_correlation(&updated_correlation.id, &tenant_id) .await?; if correlation.user_id != updated_correlation.user_id { return Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!( r#"User "{}" isn't authorized to update correlation with ID - {}"#, updated_correlation.user_id, correlation.id )))); } - correlation.validate(session_key).await?; - updated_correlation.update(correlation); + correlation.update(updated_correlation); + correlation.validate(session_key).await?; // Update in metastore PARSEABLE .metastore - .put_correlation(&updated_correlation, &tenant_id) + .put_correlation(&correlation, &tenant_id) .await?; let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); // Update in memory if let Some(corrs) = self.write().await.get_mut(tenant) { - corrs.insert( - updated_correlation.id.to_owned(), - updated_correlation.clone(), - ); + corrs.insert(correlation.id.to_owned(), correlation.clone()); } - Ok(updated_correlation) + Ok(correlation)src/metastore/metastores/object_store_metastore.rs (2)
317-342: ⚠️ Potential issue | 🟠 Major — Default-tenant alert-state path uses the DEFAULT_TENANT directory.
When `tenant_id` is `None`, `base_path` is built with `DEFAULT_TENANT`, which will look under `default/alerts` even though default-tenant storage paths are intended to live at the root (`""`). This can make default-tenant alert states undiscoverable.
🔧 Suggested fix
- let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); + let tenant = tenant_id.as_deref().unwrap_or(""); let base_path = RelativePathBuf::from_iter([tenant, ALERTS_ROOT_DIRECTORY]);Based on learnings: In src/metastore/metastores/object_store_metastore.rs, default-tenant paths are intentionally built with an empty string segment ("") so no extra tenant directory is created.
931-976: ⚠️ Potential issue | 🟠 Major — Fix dropped tenant prefix when listing manifest dates for non-default tenants.
For non-default tenants, the initial listing is scoped to `{tenant}/{stream_name}`, but `list_with_delimiter` returns full paths (e.g., `tenant/stream/2024-01-01`). Calling `path.parts()` on these extracts all segments: `["tenant", "stream", "2024-01-01"]`. The filter removes `stream_name` but leaves `tenant`, so `dates` incorrectly contains the tenant name alongside actual dates. Subsequent `date_path` construction at line 953 uses only `stream_name/{date}`, producing paths like `stream_name/tenant` or `stream_name/2024-01-01`, both missing the tenant prefix. This causes manifest files for non-default tenants to be missed.
🔧 Suggested fix
- let root = if let Some(tenant) = tenant_id { - format!("{tenant}/{stream_name}") - } else { - stream_name.into() - }; - let resp = self.storage.list_with_delimiter(Some(root.into())).await?; + let base = if let Some(tenant) = tenant_id { + format!("{tenant}/{stream_name}") + } else { + stream_name.into() + }; + let resp = self.storage.list_with_delimiter(Some(base.clone().into())).await?; let dates = resp .common_prefixes .iter() - .flat_map(|path| path.parts()) - .filter(|name| name.as_ref() != stream_name && name.as_ref() != STREAM_ROOT_DIRECTORY) - .map(|name| name.as_ref().to_string()) + .filter_map(|path| path.filename().map(|name| name.as_ref().to_string())) + .filter(|name| name.as_ref() != STREAM_ROOT_DIRECTORY) .collect::<Vec<_>>(); for date in dates { - let date_path = object_store::path::Path::from(format!("{}/{}", stream_name, &date)); + let date_path = object_store::path::Path::from(format!("{base}/{date}")); let resp = self.storage.list_with_delimiter(Some(date_path)).await?;
🤖 Fix all issues with AI agents
In `@src/connectors/kafka/processor.rs`:
- Around line 53-57: The current code pulls tenant_id from the first record and
reuses it for the whole chunk (the let tenant_id = ... using records.first()),
which can mix tenants; change processing so each Kafka record is handled with
its own tenant_id: in the function that iterates over records (use the existing
records iterator and the record variable r), extract r.tenant_id per-record and
pass that value into whatever downstream calls you currently give the single
tenant_id (or, alternatively, validate inside the loop that all r.tenant_id
values are identical and reject the chunk if not). Update any usages of the
previous tenant_id variable to use the per-record tenant value (e.g., replace
references to tenant_id with r.tenant_id or a local let tenant_id = r.tenant_id
inside the record-processing loop) to ensure tenant isolation.
In `@src/parseable/mod.rs`:
- Around line 1099-1125: The delete_tenant implementation removes users/roles
and TENANT_METADATA but never updates the in-memory tenant registry nor calls
the persistent metastore cleanup; refactor pub fn delete_tenant into an async fn
delete_tenant(&self, tenant_id: &str) -> Result<(), anyhow::Error> so you can
call self.metastore.delete_tenant(tenant_id).await and also remove the tenant
from self.tenants (e.g., self.tenants.remove(tenant_id) or the appropriate map
accessor) so list_tenants() stops returning it; keep existing removals
(mut_users(), mut_roles(), TENANT_METADATA.delete_tenant, Users.delete_user) but
ensure ordering handles errors from metastore.delete_tenant and propagate
failures via Result.
- Around line 1128-1163: The load_tenants function unconditionally populates
self.tenants even when is_multi_tenant is false; modify load_tenants (the
is_multi_tenant check near the top) to only extend/write self.tenants when
is_multi_tenant is true (and keep the existing TENANT_METADATA insertion logic
gated as-is), and when not multi-tenant return Ok(None) (or preserve the
previous None/Some semantics) instead of extending the tenants list; ensure the
write lock on self.tenants is only acquired and used in the multi-tenant branch
so list_tenants remains None for single-tenant mode.
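The gating can be reduced to this sketch (the real function also populates `TENANT_METADATA` and takes a write lock; here a plain `Option` models the registry):

```rust
/// Sketch: only expose a tenant list in multi-tenant mode, so callers of
/// list_tenants() observe None for single-tenant deployments.
fn load_tenants(is_multi_tenant: bool, discovered: Vec<String>) -> Option<Vec<String>> {
    if !is_multi_tenant {
        // Single-tenant mode: leave the registry unset.
        return None;
    }
    Some(discovered)
}
```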
🧹 Nitpick comments (4)
src/storage/retention.rs (1)
111-126: Minor style improvement: use direct equality comparison.
`days.eq(&0)` can be simplified to `days == 0` for better readability.
♻️ Suggested simplification
impl Task { pub fn new(description: String, mut days: u32) -> Self { - if days.eq(&0) { + if days == 0 { days = 7; tracing::warn!( "Using default 7 days for retention since an invalid value of 0 was provided by the user" ); } let days = NonZeroU32::new(days).unwrap();src/connectors/kafka/processor.rs (1)
49-57: Consider a consistent extraction style for `stream_name` and `tenant_id`.
The `stream_name` extraction uses a fluent pattern, while `tenant_id` uses a more verbose if-let-else. For consistency and conciseness:
♻️ Suggested simplification
let stream_name = records .first() .map(|r| r.topic.as_str()) .unwrap_or_default(); - let tenant_id = if let Some(r) = records.first() { - &r.tenant_id - } else { - &None - }; + let tenant_id = records.first().map(|r| &r.tenant_id).unwrap_or(&None);src/rbac/user.rs (1)
506-512: Consider removing the stale commented-out method or tracking it via a TODO/issue.
src/handlers/http/ingest.rs (1)
284-338: Consider deriving tenant_id once for OTEL ingestion.
You currently re-parse the tenant header inside `process_otel_content`; passing a precomputed tenant_id from the handler avoids duplicate parsing and keeps a single source of truth.
♻️ Optional refactor sketch
-pub async fn setup_otel_stream( - req: &HttpRequest, +pub async fn setup_otel_stream( + req: &HttpRequest, + tenant_id: &Option<String>, expected_log_source: LogSource, known_fields: &[&str], telemetry_type: TelemetryType, ) -> Result<(String, LogSource, LogSourceEntry, Option<String>), PostError> { @@ - let tenant_id = get_tenant_id_from_request(req); PARSEABLE .create_stream_if_not_exists( @@ - &tenant_id, + tenant_id, None, ) .await?; @@ - if let Ok(stream) = PARSEABLE.get_stream(&stream_name, &tenant_id) { + if let Ok(stream) = PARSEABLE.get_stream(&stream_name, tenant_id) { @@ - PARSEABLE - .add_update_log_source(&stream_name, log_source_entry.clone(), &tenant_id) + PARSEABLE + .add_update_log_source(&stream_name, log_source_entry.clone(), tenant_id) .await?;-async fn process_otel_content( - req: &HttpRequest, +async fn process_otel_content( + req: &HttpRequest, + tenant_id: &Option<String>, body: web::Bytes, stream_name: &str, log_source: &LogSource, telemetry_type: TelemetryType, ) -> Result<(), PostError> { @@ - let tenant_id = get_tenant_id_from_request(req); if content_type == CONTENT_TYPE_JSON { @@ - &tenant_id, + tenant_id, ) .await {pub async fn handle_otel_logs_ingestion( req: HttpRequest, body: web::Bytes, ) -> Result<HttpResponse, PostError> { + let tenant_id = get_tenant_id_from_request(&req); let (stream_name, log_source, ..) = setup_otel_stream( &req, + &tenant_id, LogSource::OtelLogs, &OTEL_LOG_KNOWN_FIELD_LIST, TelemetryType::Logs, ) @@ - process_otel_content(&req, body, &stream_name, &log_source, TelemetryType::Logs).await?; + process_otel_content(&req, &tenant_id, body, &stream_name, &log_source, TelemetryType::Logs).await?;(Apply the same pattern to metrics and traces handlers.)
let tenant_id = if let Some(r) = records.first() {
    &r.tenant_id
} else {
    &None
};
🧩 Analysis chain
🏁 Scripts executed (repository: parseablehq/parseable):

# Search for ConsumerRecord struct definition and tenant_id field
rg -n 'struct ConsumerRecord|pub tenant_id' --type rust src/connectors/kafka/

# Look at the full context of the kafka processor file to understand the data flow
head -100 src/connectors/kafka/processor.rs | cat -n

# Search for where tenant_id is used in the processing flow
rg -n 'tenant_id' src/connectors/kafka/processor.rs -A 2 -B 2

# Find how records are created/consumed and if there's any tenant-aware partitioning
rg -n 'ConsumerRecord|consume|partition' src/connectors/kafka/ --type rust -A 1 | head -100

# Find the full ConsumerRecord struct and from_borrowed_msg implementation
sed -n '141,170p' src/connectors/kafka/mod.rs | cat -n

# Search for where tenant_id is set on ConsumerRecord
rg -n 'tenant_id.*=' src/connectors/kafka/ --type rust -B 3 -A 3

# Find where records are produced into the chunk stream and if there's any tenant-based filtering
rg -n 'ReceiverStream|process_partition|send.*ConsumerRecord' src/connectors/kafka/ --type rust -B 2 -A 2

# Check the partition_stream or consumer implementation to see if records are partitioned by tenant
rg -n 'partition_stream|fn.*consume|next.*message' src/connectors/kafka/ --type rust -A 3 | head -80
Process each record with its own tenant_id instead of assuming all records in a chunk share the same tenant.
Records are partitioned by Kafka topic/partition, not by tenant. Since tenant_id comes from individual message headers, a single chunk can contain messages with different tenant IDs. The current code at lines 53-57 extracts tenant_id only from the first record and applies it to all subsequent records, which causes cross-tenant data leakage if the chunk contains records from multiple tenants.
Iterate through records and either (1) process each record individually with its own tenant_id, (2) validate that all records in the chunk have the same tenant_id and reject mixed chunks, or (3) enforce tenant isolation at the producer level to guarantee records from different tenants never land in the same partition.
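Option (1) — processing each record under its own tenant — can be sketched by grouping a chunk per tenant before ingestion (a simplified `Record` stands in for the real `ConsumerRecord`):

```rust
use std::collections::HashMap;

// Simplified stand-in for ConsumerRecord: each message carries its tenant.
struct Record {
    tenant_id: Option<String>,
    payload: Vec<u8>,
}

/// Group a possibly mixed chunk by tenant so each group is ingested with
/// the correct tenant context, instead of borrowing the first record's.
fn group_by_tenant(records: Vec<Record>) -> HashMap<Option<String>, Vec<Vec<u8>>> {
    let mut groups: HashMap<Option<String>, Vec<Vec<u8>>> = HashMap::new();
    for record in records {
        groups.entry(record.tenant_id).or_default().push(record.payload);
    }
    groups
}
```

Each group can then be passed to the existing per-tenant ingestion path; the same loop could instead reject the chunk if it observes more than one key, matching option (2).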
Force-pushed ccb78d3 to 8f2b711
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/metastore/metastores/object_store_metastore.rs (2)
317-341: ⚠️ Potential issue | 🟠 Major — Use an empty path segment for the default tenant.
`tenant_id == None` currently resolves to `DEFAULT_TENANT`, which makes the code look under a `default/alerts` directory that won't exist in single-tenant mode. Use `""` for path construction and keep `tenant_id` for storage calls. Based on learnings, default-tenant paths are intentionally built with an empty string segment so no extra tenant directory is created.
🔧 Suggested fix
- let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); + let tenant = tenant_id.as_deref().unwrap_or(""); let base_path = RelativePathBuf::from_iter([tenant, ALERTS_ROOT_DIRECTORY]);
344-415: ⚠️ Potential issue | 🟠 Major — Alert state files are not tenant-scoped.
`alert_state_json_path` doesn't include tenant context, so alert state files can collide across tenants and won't be found by `get_alert_states` (which searches tenant-scoped directories).
🔧 Suggested fix
- let path = alert_state_json_path(*alert_id); + let tenant = tenant_id.as_deref().unwrap_or(""); + let path = RelativePathBuf::from_iter([ + tenant, + ALERTS_ROOT_DIRECTORY, + format!("alert_state_{alert_id}.json"), + ]);- let path = alert_state_json_path(id); + let tenant = tenant_id.as_deref().unwrap_or(""); + let path = RelativePathBuf::from_iter([ + tenant, + ALERTS_ROOT_DIRECTORY, + format!("alert_state_{id}.json"), + ]);
🤖 Fix all issues with AI agents
In `@src/metastore/metastores/object_store_metastore.rs`:
- Around line 75-88: Update the doc comment on the get_objects async method to
fix the spelling mistake: change "Fetch mutiple .json objects" to "Fetch
multiple .json objects" in the comment above fn get_objects (the method that
calls self.storage.get_objects with the .json filter) so the docstring reads
correctly.
- Around line 654-760: In get_filters the migrated v1→v2 writes use
filter_path(...) which builds a RelativePath lacking the tenant prefix, so
migrated filters land in the shared/default tenant; update the write to include
tenant context (same way reads use users_dir) by prefixing the constructed
filter_path with the tenant (e.g. RelativePathBuf::from_iter([&tenant,
<filter_path>]) before calling storage.put_object) or change filter_path to
accept a tenant parameter; modify the code paths around get_filters,
filter_path, and the put_object call to ensure the RelativePath passed to
put_object contains the tenant prefix.
In `@src/parseable/mod.rs`:
- Around line 1040-1059: The add_tenant function has a TOCTOU race: the read
lock on self.tenants is released before acquiring the write lock, allowing
another thread to insert the same tenant; change the logic to acquire a write
lock (e.g., tenants.write()) once and perform the contains check and push under
that same lock so the check-and-insert is atomic, then call
TENANT_METADATA.insert_tenant while still holding (or immediately after holding)
the write lock to ensure consistency; keep the initial options.is_multi_tenant()
early-return intact and reference add_tenant, self.tenants, and TENANT_METADATA
when locating the code to modify.
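The single-write-lock version of that check-and-insert can be sketched as follows (a `RwLock<Vec<String>>` stands in for `self.tenants`; the `TENANT_METADATA` call is omitted):

```rust
use std::sync::RwLock;

/// Sketch of the TOCTOU fix: the contains check and the push happen under
/// one write lock, so two concurrent calls cannot both insert the tenant.
fn add_tenant(tenants: &RwLock<Vec<String>>, tenant_id: &str) -> bool {
    let mut guard = tenants.write().expect("tenants lock poisoned");
    if guard.iter().any(|t| t == tenant_id) {
        return false; // already registered
    }
    guard.push(tenant_id.to_string());
    true
}
```

Holding the write lock across both steps makes the operation atomic with respect to other `add_tenant` callers; the metadata insert can then run while the lock is still held (or immediately after) to keep the two structures consistent.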
🧹 Nitpick comments (8)
src/storage/retention.rs (1)
112-118: Minor style: prefer `==` over `.eq(&0)` for integer comparison.
Using `days == 0` is more idiomatic Rust than `days.eq(&0)`.
♻️ Suggested simplification
- if days.eq(&0) { + if days == 0 { days = 7; tracing::warn!( "Using default 7 days for retention since an invalid value of 0 was provided by the user" ); }src/metastore/metastores/object_store_metastore.rs (1)
1241-1309: Remove the unnecessary `tenant_id` parameter or document its purpose.
`put_node_metadata` hardcodes `tenant_id = None`, but `get_node_metadata` accepts a `tenant_id` parameter passed by callers. Since the storage layer does not use `tenant_id` for path construction (it exists for trait compatibility only), this parameter creates inconsistency without adding value. Either remove the parameter and always pass `None`, or document why callers should pass specific tenant values.
125-172: Consider removing the commented-out legacy block.
It adds noise and can confuse the active serialization model.
🧹 Suggested cleanup
-// // Type for serialization and deserialization -// #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] -// pub struct TenantMetadata { -// pub tenant_id: String, -// pub version: String, -// pub mode: String, -// pub staging: PathBuf, -// pub storage: String, -// #[serde(default = "crate::utils::uid::generate_ulid")] -// pub deployment_id: uid::Uid, -// pub users: Vec<User>, -// pub user_groups: Vec<UserGroup>, -// pub streams: Vec<String>, -// pub server_mode: Mode, -// #[serde(default)] -// pub roles: HashMap<String, Vec<DefaultPrivilege>>, -// #[serde(default)] -// pub default_role: Option<String>, -// } - -// impl TenantMetadata { -// pub fn from_storage_meta(meta: StorageMetadata, id: &str) -> Self { -// Self { -// tenant_id: id.to_owned(), -// version: meta.version, -// mode: meta.mode, -// staging: meta.staging, -// storage: meta.storage, -// deployment_id: meta.deployment_id, -// users: meta.users, -// user_groups: meta.user_groups, -// streams: meta.streams, -// server_mode: meta.server_mode, -// roles: meta.roles, -// default_role: meta.default_role, -// } -// } -// } - -// impl MetastoreObject for TenantMetadata { -// fn get_object_path(&self) -> String { -// format!("{}/.parseable.json", &self.tenant_id) -// } - -// fn get_object_id(&self) -> String { -// self.tenant_id.clone() -// } -// }src/parseable/mod.rs (5)
180-182: Remove commented-out code.
The commented-out `tenant_storage` field should be removed to keep the codebase clean. If this is planned for future implementation, consider tracking it in an issue instead.
Proposed fix
```diff
 pub struct Parseable {
     /// Configuration variables for parseable
     pub options: Arc<Options>,
     /// Storage engine backing parseable
     pub storage: Arc<dyn ObjectStorageProvider>,
-    // /// ObjectStorageProvider for each tenant
-    // pub tenant_storage: Arc<DashMap<String, Arc<dyn ObjectStorageProvider>>>,
     /// Metadata and staging relating to each logstreams
```
1061-1097: Consider persisting to storage before updating in-memory state.
Both `suspend_tenant_service` and `resume_tenant_service` update in-memory state first (lines 1066, 1086), then persist to storage. If `put_remote_metadata` fails, the in-memory state will be inconsistent with storage. Consider reversing the order: persist first, then update in-memory on success.
Proposed pattern for suspend_tenant_service
```diff
 pub async fn suspend_tenant_service(
     &self,
     tenant_id: String,
     service: Service,
 ) -> Result<(), anyhow::Error> {
-    TENANT_METADATA.suspend_service(&tenant_id, service.clone());
-    // write to disk
     let tenant_id = &Some(tenant_id);
     let mut meta = get_metadata(tenant_id).await?;
     if let Some(sus) = meta.suspended_services.as_mut() {
-        sus.insert(service);
+        sus.insert(service.clone());
     } else {
-        meta.suspended_services = Some(HashSet::from_iter([service]));
+        meta.suspended_services = Some(HashSet::from_iter([service.clone()]));
     }
     put_remote_metadata(&meta, tenant_id).await?;
+
+    // Update in-memory state only after successful persistence
+    TENANT_METADATA.suspend_service(tenant_id.as_ref().unwrap(), service);
     Ok(())
 }
```
1100-1119: Remove commented-out code blocks.
Lines 1100, 1105-1107, and 1115-1118 contain commented-out code that should be removed. If this logic is needed for future implementation, track it in an issue instead.
Proposed cleanup
```diff
 pub fn delete_tenant(&self, tenant_id: &str) -> Result<(), anyhow::Error> {
-    // let mut metadata = get_metadata(&Some(tenant_id.to_owned())).await?;
     // delete users and sessions
     let users = mut_users().remove(tenant_id);
     if let Some(users) = users {
         for (userid, user) in users {
-            // metadata
-            //     .users
-            //     .retain(|u| u.tenant.eq(&Some(tenant_id.to_owned())));
-
             Users.delete_user(&userid, &user.tenant);
         }
     }
     // delete roles
     mut_roles().remove(tenant_id);
-    // if let Some(roles) = mut_roles().remove(tenant_id) {
-    //     for (role, _) in roles {
-    //         // metadata.roles.retain(|r, _| !role.eq(r));
-    //     }
-    // }
     // delete resources
```
1166-1175: Consider logging lock acquisition failures.
When the read lock cannot be acquired (the `else` branch at line 1172), the function silently returns `None`, which could mask lock contention issues. Consider logging this condition for debugging purposes.
Proposed improvement
```diff
 pub fn list_tenants(&self) -> Option<Vec<String>> {
-    if let Ok(t) = self.tenants.as_ref().read()
-        && !t.is_empty()
-    {
-        let t = t.clone();
-        Some(t)
-    } else {
-        None
+    match self.tenants.as_ref().read() {
+        Ok(t) if !t.is_empty() => Some(t.clone()),
+        Ok(_) => None,
+        Err(e) => {
+            tracing::warn!("Failed to acquire read lock for tenants: {e}");
+            None
+        }
     }
 }
```
570-572: Consider replacing `.expect()` with proper error handling.
Multiple methods (lines 570-572, 853-855, 908-910, 1032-1034) use `.expect(STREAM_EXISTS)`, which will panic if the stream is not found. While these paths assume the stream exists, race conditions could cause unexpected panics. Consider returning appropriate errors instead.
Example fix for add_update_log_source
```diff
- let stream = self
-     .get_stream(stream_name, tenant_id)
-     .expect(STREAM_EXISTS);
+ let stream = self.get_stream(stream_name, tenant_id)?;
```
This requires changing the return type or mapping `StreamNotFound` to `StreamError`.
```diff
 /// Fetch mutiple .json objects
-async fn get_objects(&self, parent_path: &str) -> Result<Vec<Bytes>, MetastoreError> {
+async fn get_objects(
+    &self,
+    parent_path: &str,
+    tenant_id: &Option<String>,
+) -> Result<Vec<Bytes>, MetastoreError> {
     Ok(self
         .storage
         .get_objects(
             Some(&RelativePathBuf::from(parent_path)),
             Box::new(|file_name| file_name.ends_with(".json")),
+            tenant_id,
         )
         .await?)
```
Fix typo in comment ("mutiple" → "multiple").
💡 Suggested edit
```diff
- /// Fetch mutiple .json objects
+ /// Fetch multiple .json objects
```
🤖 Prompt for AI Agents
In `@src/metastore/metastores/object_store_metastore.rs` around lines 75 - 88,
Update the doc comment on the get_objects async method to fix the spelling
mistake: change "Fetch mutiple .json objects" to "Fetch multiple .json objects"
in the comment above fn get_objects (the method that calls
self.storage.get_objects with the .json filter) so the docstring reads
correctly.
Force-pushed from f456045 to e8a40fd (Compare)
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/metastore/metastores/object_store_metastore.rs (1)
316-342: ⚠️ Potential issue | 🟠 Major
Default-tenant alert state path should not use `DEFAULT_TENANT`.
When `tenant_id` is `None`, using `DEFAULT_TENANT` yields a `default/alerts` prefix, which won’t match default-tenant storage paths that use the empty segment. Use `""` for the default tenant prefix instead.
🔧 Suggested fix
```diff
- let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
+ let tenant = tenant_id.as_deref().unwrap_or("");
  let base_path = RelativePathBuf::from_iter([tenant, ALERTS_ROOT_DIRECTORY]);
```
Based on learnings: In src/metastore/metastores/object_store_metastore.rs, default-tenant paths are intentionally built with an empty string segment (""), so no extra tenant directory is created. Passing Some("") to storage is acceptable for path creation; concerns, if any, are about metrics labeling rather than path resolution.
🤖 Fix all issues with AI agents
In `@src/rbac/user.rs`:
- Around line 370-382: The current control flow binds the else to if
!self.users.is_empty(), causing non_existent_users.extend(...) to run only when
self.users is empty; instead, when users().get(tenant) returns None we should
mark all requested users as non-existent. Fix by removing the outer else and
moving the extend(...) into the else branch of the if let Some(users) =
users().get(tenant) { ... } else {
non_existent_users.extend(self.users.iter().map(|u| u.userid().to_string())); },
keeping the initial check for self.users.is_empty() and the loop that checks
users.contains_key(group_user.userid()) intact (references: non_existent_users,
self.users, users(), tenant, userid()).
- Around line 408-418: The add_roles method inconsistently uses the external
tenant_id when calling mut_sessions().remove_user, which differs from
add_users/remove_roles/remove_users that use each group_user.tenant_id; update
add_roles (and its call site if needed) so after extending self.roles you
iterate group_user in &self.users and call
mut_sessions().remove_user(group_user.userid(), group_user.tenant_id()) (i.e.,
use the per-user tenant from group_user) to avoid invalidating sessions for the
wrong tenant while keeping the early-return for empty roles.
🧹 Nitpick comments (8)
src/storage/retention.rs (1)
111-126: Consider adding upper-bound validation for consistency.
`TryFrom<Vec<TaskView>>` validates that `days <= 36500`, but `Task::new` only validates `days != 0`. If `Task::new` is called directly with a large value, it bypasses the upper-bound check.
♻️ Suggested improvement for consistent validation
```diff
 impl Task {
     pub fn new(description: String, mut days: u32) -> Self {
         if days.eq(&0) {
             days = 7;
             tracing::warn!(
                 "Using default 7 days for retention since an invalid value of 0 was provided by the user"
             );
         }
+        if days > 36500 {
+            days = 36500;
+            tracing::warn!(
+                "Capping retention to 36500 days (approx 100 years) since an invalid value was provided"
+            );
+        }
         let days = NonZeroU32::new(days).unwrap();
```
Alternatively, if `Task::new` is only called internally with pre-validated values, this may be acceptable as-is.
src/rbac/user.rs (2)
161-165: Consider removing the commented-out code.
The salt generation change to an explicit 32-byte fill is fine, but the commented-out line (`// let salt = SaltString::generate(&mut OsRng);`) should be removed to avoid code clutter.
🧹 Suggested cleanup
```diff
 let mut bytes = [0u8; 32];
 let r = &mut OsRng;
 r.fill_bytes(&mut bytes);
 let salt = SaltString::encode_b64(&bytes).unwrap();
-// let salt = SaltString::generate(&mut OsRng);
 let argon2 = Argon2::default();
```
506-512: Remove commented-out code.
The `update_in_metadata` method is fully commented out. If it's no longer needed with the new tenant-aware architecture, remove it to avoid confusion.
🧹 Suggested cleanup
```diff
 pub fn remove_users_by_user_ids(&mut self, user_ids: HashSet<String>) -> Result<(), RBACError> {
     ...
     self.remove_users(users_to_remove)
 }
-
-    // pub async fn update_in_metadata(&self, tenant_id: &Option<String>) -> Result<(), RBACError> {
-    //     let mut metadata = get_metadata(tenant_id).await?;
-    //     metadata.user_groups.retain(|x| x.name != self.name);
-    //     metadata.user_groups.push(self.clone());
-    //     put_metadata(&metadata).await?;
-    //     Ok(())
-    // }
 }
```
src/parseable/mod.rs (5)
180-181: Consider removing commented-out code.
The commented `tenant_storage` field can be removed if it's no longer needed. Keeping dead code can lead to confusion.
♻️ Proposed removal
```diff
 pub struct Parseable {
     /// Configuration variables for parseable
     pub options: Arc<Options>,
     /// Storage engine backing parseable
     pub storage: Arc<dyn ObjectStorageProvider>,
-    // /// ObjectStorageProvider for each tenant
-    // pub tenant_storage: Arc<DashMap<String, Arc<dyn ObjectStorageProvider>>>,
     /// Metadata and staging relating to each logstreams
```
570-572: Consider returning errors instead of panicking on missing streams.
Multiple methods use `.expect(STREAM_EXISTS)` (here, and at lines 855, 910, 1034), which will panic if the stream doesn't exist. While the callers may validate stream existence beforehand, returning a proper `StreamError` would be more defensive and prevent potential runtime panics if invariants are violated.
♻️ Example refactor for this method
```diff
 pub async fn add_update_log_source(
     &self,
     stream_name: &str,
     log_source: LogSourceEntry,
     tenant_id: &Option<String>,
 ) -> Result<(), StreamError> {
-    let stream = self
-        .get_stream(stream_name, tenant_id)
-        .expect(STREAM_EXISTS);
+    let stream = self
+        .get_stream(stream_name, tenant_id)
+        .map_err(|_| StreamNotFound(stream_name.to_owned()))?;
```
1099-1129: Consider removing commented-out code in delete_tenant.
The implementation correctly handles in-memory cleanup as per the established contract (caller performs persistent cleanup via `Metastore::delete_tenant`). However, the commented-out code at lines 1105-1108 and 1115-1119 should be removed.
♻️ Proposed cleanup
```diff
 pub fn delete_tenant(&self, tenant_id: &str) -> Result<(), anyhow::Error> {
-    // let mut metadata = get_metadata(&Some(tenant_id.to_owned())).await?;
     // delete users and sessions
     let users = mut_users().remove(tenant_id);
     if let Some(users) = users {
         for (userid, user) in users {
-            // metadata
-            //     .users
-            //     .retain(|u| u.tenant.eq(&Some(tenant_id.to_owned())));
-
             Users.delete_user(&userid, &user.tenant);
         }
     }
     // delete roles
     mut_roles().remove(tenant_id);
-    // if let Some(roles) = mut_roles().remove(tenant_id) {
-    //     for (role, _) in roles {
-    //         // metadata.roles.retain(|r, _| !role.eq(r));
-    //     }
-    // }
     // delete resources
```
Based on learnings: In src/parseable/mod.rs, enforce the separation of concerns for tenant deletion: Metastore::delete_tenant(tenant_id) must perform the persistent cleanup, while Parseable::delete_tenant(&self, tenant_id: &str) should only perform in-memory cleanup.
1143-1158: Conditional structure is correct but could be clearer.
The logic correctly handles the three cases (tenant with metadata in multi-tenant mode, non-multi-tenant mode, invalid tenant directory). However, the empty `else if !is_multi_tenant {}` branch (line 1153) is confusing. Consider restructuring for clarity.
♻️ Suggested refactor for clarity
```diff
 for tenant_id in dirs.iter() {
-    if let Some(meta) = PARSEABLE
+    let meta = PARSEABLE
         .metastore
         .get_parseable_metadata(&Some(tenant_id.clone()))
-        .await?
-        && is_multi_tenant
-    {
-        let metadata: StorageMetadata = serde_json::from_slice(&meta)?;
+        .await?;
-        TENANT_METADATA.insert_tenant(tenant_id.clone(), metadata.clone());
-    } else if !is_multi_tenant {
-    } else {
-        return Err(anyhow::Error::msg(format!(
-            "Found invalid tenant directory with multi-tenant mode- {tenant_id}.\nExiting."
-        )));
+    match (meta, is_multi_tenant) {
+        (Some(meta), true) => {
+            let metadata: StorageMetadata = serde_json::from_slice(&meta)?;
+            TENANT_METADATA.insert_tenant(tenant_id.clone(), metadata.clone());
+        }
+        (None, true) => {
+            return Err(anyhow::Error::msg(format!(
+                "Found invalid tenant directory with multi-tenant mode- {tenant_id}.\nExiting."
+            )));
+        }
+        (_, false) => {
+            // Non-multi-tenant mode: skip tenant directory processing
+        }
     }
 }
```
290-302: Consider including the tenant ID in the error log for easier debugging. When a tenant fails stream listing, the log at line 294 includes the error but not which tenant failed. Adding the tenant ID would aid debugging in multi-tenant deployments.
♻️ Enhanced logging
```diff
 for tenant in tenants {
     if let Err(e) = PARSEABLE.metastore.list_streams(&Some(tenant)).await {
-        tracing::error!("{e}");
+        tracing::error!("Failed to list streams for tenant {tenant}: {e}");
         has_stream = false;
         break;
     };
```
Force-pushed from 58b2c1b to b039be5 (Compare)
@coderabbitai resume
✅ Actions performed: Reviews resumed.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (5)
src/metastore/metastores/object_store_metastore.rs (1)
344-415: ⚠️ Potential issue | 🔴 Critical
Alert state path is missing tenant context, causing multi-tenant data misplacement.
`get_alert_states` (lines 321-322) correctly constructs a tenant-prefixed path: `[tenant, ALERTS_ROOT_DIRECTORY]`. However, both `get_alert_state_entry` (line 349) and `put_alert_state` (line 373) call `alert_state_json_path(id)`, which does not include the tenant prefix.
This inconsistency means:
- Reading all alert states scans `{tenant}/.alerts/` ✓
- Reading/writing an individual alert state uses `.alerts/alert_state_{id}.json` ✗
In multi-tenant deployments, alert states will be written to and read from the wrong (shared) location.
Update `alert_state_json_path` to accept `tenant_id` (similar to `alert_json_path`) and update the call sites:
```diff
 // In src/storage/object_storage.rs
-pub fn alert_state_json_path(alert_id: Ulid) -> RelativePathBuf {
+pub fn alert_state_json_path(alert_id: Ulid, tenant_id: &Option<String>) -> RelativePathBuf {
+    if let Some(tenant) = tenant_id.as_ref() {
+        RelativePathBuf::from_iter([tenant, ALERTS_ROOT_DIRECTORY, &format!("alert_state_{alert_id}.json")])
+    } else {
         RelativePathBuf::from_iter([ALERTS_ROOT_DIRECTORY, &format!("alert_state_{alert_id}.json")])
+    }
 }

 // In get_alert_state_entry
-let path = alert_state_json_path(*alert_id);
+let path = alert_state_json_path(*alert_id, tenant_id);

 // In put_alert_state
-let path = alert_state_json_path(id);
+let path = alert_state_json_path(id, tenant_id);
```
src/correlation.rs (1)
157-185: ⚠️ Potential issue | 🟠 Major
Update path currently discards incoming changes.
`correlation.validate(session_key)` validates the old config, and `updated_correlation.update(correlation)` overwrites the new values with the old ones. This makes updates a no-op and skips validation for new tables. Validate the incoming config and apply it to the stored correlation before persisting.
🛠️ Proposed fix
```diff
-        correlation.validate(session_key).await?;
-        updated_correlation.update(correlation);
+        updated_correlation.validate(session_key).await?;
+        let mut correlation = correlation;
+        correlation.update(updated_correlation);

         // Update in metastore
         PARSEABLE
             .metastore
-            .put_correlation(&updated_correlation, &tenant_id)
+            .put_correlation(&correlation, &tenant_id)
             .await?;

         let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
         // Update in memory
         if let Some(corrs) = self.write().await.get_mut(tenant) {
-            corrs.insert(
-                updated_correlation.id.to_owned(),
-                updated_correlation.clone(),
-            );
+            corrs.insert(correlation.id.to_owned(), correlation.clone());
         }

-        Ok(updated_correlation)
+        Ok(correlation)
```
src/handlers/http/middleware.rs (1)
242-259: ⚠️ Potential issue | 🟠 Major
Re-check authorization after token refresh.
Line 242 computes `auth_result` synchronously before the async block. For non-BasicAuth sessions (e.g., an OIDC-issued `SessionId`), the `authorize()` function returns `Response::ReloadRequired` if the session is not found in `active_sessions` or if no permissions are cached (see rbac/mod.rs:232). Later, when `refresh_token()` is called at line 258, it creates/updates the session in `active_sessions`, but the already-computed `auth_result` remains unchanged. The middleware then matches against this stale result at line 261, causing the request to fail with "session expired" even though the refresh just succeeded.
Solution: either call the refresh before invoking `authorize()`, or re-run authorization after a successful refresh so the decision reflects the refreshed session state.
src/rbac/user.rs (1)
334-403: ⚠️ Potential issue | 🟠 Major
Enforce tenant-scoped UserGroup membership.
`validate` doesn’t reject users whose `tenant_id` differs from the group’s tenant, which breaks tenant isolation. Please guard against cross-tenant membership here.
Based on learnings: RBAC UserGroup is tenant-scoped in parseablehq/parseable (PR `#1545`): a group cannot contain users from different tenants. Avoid cross-tenant membership assumptions in group APIs.
🛠️ Possible fix
```diff
 pub fn validate(&self, tenant_id: &Option<String>) -> Result<(), RBACError> {
     let valid_name = is_valid_group_name(&self.name);
     let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
+    let mut cross_tenant_users = Vec::new();
+    for group_user in &self.users {
+        let user_tenant = group_user.tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
+        if user_tenant != tenant {
+            cross_tenant_users.push(group_user.userid().to_string());
+        }
+    }
     if let Some(tenant_ug) = read_user_groups().get(tenant)
         && tenant_ug.contains_key(&self.name)
     {
         return Err(RBACError::UserGroupExists(self.name.clone()));
     }
@@
     if !self.users.is_empty() {
         // validate that the users exist
         if let Some(users) = users().get(tenant) {
             for group_user in &self.users {
                 if !users.contains_key(group_user.userid()) {
                     non_existent_users.push(group_user.userid().to_string());
                 }
             }
         } else {
             non_existent_users.extend(self.users.iter().map(|u| u.userid().to_string()));
         }
     }
+    if !cross_tenant_users.is_empty() {
+        non_existent_users.extend(cross_tenant_users);
+    }
```
src/storage/object_storage.rs (1)
1120-1138: ⚠️ Potential issue | 🟡 Minor
Handle empty tenant_id as default in `stream_relative_path`.
If `tenant_id` is `Some("")`, the current format produces a leading slash. Treat empty tenant IDs like the default tenant to keep paths consistent.
Based on learnings: In src/metastore/metastores/object_store_metastore.rs, default-tenant paths are intentionally built with an empty string segment (""), so no extra tenant directory is created.
🛠️ Suggested fix
```diff
- if let Some(tenant) = tenant_id
-     && !tenant.eq(DEFAULT_TENANT)
- {
-     format!("{tenant}/{stream_name}/{file_suffix}")
- } else {
-     format!("{stream_name}/{file_suffix}")
- }
+ if let Some(tenant) = tenant_id.as_deref() {
+     if tenant.is_empty() || tenant == DEFAULT_TENANT {
+         format!("{stream_name}/{file_suffix}")
+     } else {
+         format!("{tenant}/{stream_name}/{file_suffix}")
+     }
+ } else {
+     format!("{stream_name}/{file_suffix}")
+ }
```
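The prefix rule above is small enough to check in isolation. Below is a self-contained sketch of the intended behavior; the `DEFAULT_TENANT` value and the free-function signature are assumptions for illustration, not the crate's actual definitions:

```rust
// Hypothetical stand-ins for the crate's constant and helper, for illustration only.
const DEFAULT_TENANT: &str = "default"; // assumed value

fn stream_relative_path(tenant_id: Option<&str>, stream_name: &str, file_suffix: &str) -> String {
    match tenant_id {
        // Only a non-empty, non-default tenant gets its own prefix.
        Some(t) if !t.is_empty() && t != DEFAULT_TENANT => {
            format!("{t}/{stream_name}/{file_suffix}")
        }
        // None, Some(""), and the default tenant all share the un-prefixed layout.
        _ => format!("{stream_name}/{file_suffix}"),
    }
}

fn main() {
    assert_eq!(
        stream_relative_path(Some("acme"), "s1", "a.parquet"),
        "acme/s1/a.parquet"
    );
    // The bug under discussion: Some("") must not yield "/s1/a.parquet".
    assert_eq!(stream_relative_path(Some(""), "s1", "a.parquet"), "s1/a.parquet");
    assert_eq!(stream_relative_path(None, "s1", "a.parquet"), "s1/a.parquet");
    println!("ok");
}
```

Collapsing `None`, `Some("")`, and the default tenant into one arm makes the "no leading slash" invariant hold by construction.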
🤖 Fix all issues with AI agents
In `@src/metastore/metastores/object_store_metastore.rs`:
- Around line 713-727: The three fragile .as_object().unwrap() calls on
filter_value after migrate_v1_v2 can panic if the migration returns non-object
JSON; update the code that extracts user_id, filter_id, and stream_name to first
verify filter_value.as_object() (e.g., if let Some(obj) =
filter_value.as_object() or match) and only then call .get(...).and_then(...);
if the value is not an object, handle it gracefully (log an error via the
existing logger, skip this entry, or return a controlled Err) so that code in
object_store_metastore.rs (the logic that calls migrate_v1_v2 and then reads
user_id/filter_id/stream_name) no longer uses unwrap and cannot panic on
malformed migrated data.
In `@src/parseable/mod.rs`:
- Around line 324-333: The docstring for validate_multi_tenancy is misleading:
it says the function should fail when multi-tenant traces exist but the flag is
off, yet the implementation simply delegates to load_tenants() which currently
no-ops for that case; either implement the failure logic here or change the
comment to accurately describe current behavior. Update the comment on
validate_multi_tenancy to state that it delegates to load_tenants() and that
load_tenants controls whether an error is returned (or, if you prefer enforcing,
modify validate_multi_tenancy to call load_tenants(), inspect tenant traces and
the multi-tenant start flag, and return an Err when traces exist but the flag is
not set, referencing validate_multi_tenancy and load_tenants by name).
🧹 Nitpick comments (3)
src/handlers/http/middleware.rs (1)
168-210: Tenant header normalization looks solid; update the inline comment to reflect header removal. The non‑ingest branch now clears any caller‑supplied tenant header when derivation fails, which blocks spoofing. Line 196’s comment still reads as if we only add the header; consider updating it to mention the removal path too.
✏️ Suggested comment tweak
```diff
- // tenant header should not be present, modify request to add
+ // derive tenant from session; set header when available, otherwise remove any caller-supplied header
```
Based on learnings: In src/handlers/http/middleware.rs, when running in multi-tenant mode, remove any incoming "tenant" header if the tenant cannot be derived from the authenticated session for non-ingest requests (excluding Login/SuperAdmin); and pay close attention to code comments for typos and semantic clarity during reviews for the Parseable project.
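As a minimal sketch of that add-or-strip rule (the header name and the map-based request model are assumptions for illustration, not the middleware's real types):

```rust
use std::collections::HashMap;

// Assumed header name for illustration; the real middleware uses its own constant.
const TENANT_HEADER: &str = "x-p-tenant-id";

/// Set the tenant header from the session-derived tenant when available;
/// otherwise strip any caller-supplied value so it cannot be spoofed.
fn normalize_tenant_header(headers: &mut HashMap<String, String>, derived: Option<&str>) {
    match derived {
        Some(tenant) => {
            headers.insert(TENANT_HEADER.to_string(), tenant.to_string());
        }
        None => {
            headers.remove(TENANT_HEADER);
        }
    }
}

fn main() {
    let mut headers =
        HashMap::from([(TENANT_HEADER.to_string(), "spoofed".to_string())]);

    // Derivation failed: the caller-supplied value must be stripped.
    normalize_tenant_header(&mut headers, None);
    assert!(headers.get(TENANT_HEADER).is_none());

    // Derivation succeeded: the session's tenant wins.
    normalize_tenant_header(&mut headers, Some("acme"));
    assert_eq!(headers.get(TENANT_HEADER).map(String::as_str), Some("acme"));
    println!("ok");
}
```

The point of the pattern is that the request never reaches downstream handlers with a tenant header the session did not vouch for.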
src/tenants/mod.rs (1)
125-172: Optional: remove the commented-out legacy TenantMetadata block.
🧹 Suggested cleanup
```diff
-// // Type for serialization and deserialization
-// #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
-// pub struct TenantMetadata {
-//     pub tenant_id: String,
-//     pub version: String,
-//     pub mode: String,
-//     pub staging: PathBuf,
-//     pub storage: String,
-//     #[serde(default = "crate::utils::uid::generate_ulid")]
-//     pub deployment_id: uid::Uid,
-//     pub users: Vec<User>,
-//     pub user_groups: Vec<UserGroup>,
-//     pub streams: Vec<String>,
-//     pub server_mode: Mode,
-//     #[serde(default)]
-//     pub roles: HashMap<String, Vec<DefaultPrivilege>>,
-//     #[serde(default)]
-//     pub default_role: Option<String>,
-// }
-//
-// impl TenantMetadata {
-//     pub fn from_storage_meta(meta: StorageMetadata, id: &str) -> Self {
-//         Self {
-//             tenant_id: id.to_owned(),
-//             version: meta.version,
-//             mode: meta.mode,
-//             staging: meta.staging,
-//             storage: meta.storage,
-//             deployment_id: meta.deployment_id,
-//             users: meta.users,
-//             user_groups: meta.user_groups,
-//             streams: meta.streams,
-//             server_mode: meta.server_mode,
-//             roles: meta.roles,
-//             default_role: meta.default_role,
-//         }
-//     }
-// }
-//
-// impl MetastoreObject for TenantMetadata {
-//     fn get_object_path(&self) -> String {
-//         format!("{}/.parseable.json", &self.tenant_id)
-//     }
-//
-//     fn get_object_id(&self) -> String {
-//         self.tenant_id.clone()
-//     }
-// }
```
src/rbac/user.rs (1)
494-500: Optional: drop the commented-out `update_in_metadata` block.
🧹 Suggested cleanup
```diff
- // pub async fn update_in_metadata(&self, tenant_id: &Option<String>) -> Result<(), RBACError> {
- //     let mut metadata = get_metadata(tenant_id).await?;
- //     metadata.user_groups.retain(|x| x.name != self.name);
- //     metadata.user_groups.push(self.clone());
- //     put_metadata(&metadata).await?;
- //     Ok(())
- // }
```
```rust
let user_id = filter_value
    .as_object()
    .unwrap()
    .get("user_id")
    .and_then(|user_id| user_id.as_str());
let filter_id = filter_value
    .as_object()
    .unwrap()
    .get("filter_id")
    .and_then(|filter_id| filter_id.as_str());
let stream_name = filter_value
    .as_object()
    .unwrap()
    .get("stream_name")
    .and_then(|stream_name| stream_name.as_str());
```
Fragile .unwrap() calls after filter migration may panic.
After migrate_v1_v2 transforms the filter value (line 712), the code calls .as_object().unwrap() three times (lines 715, 720, 725) without verifying the migrated value is still a valid object. If migrate_v1_v2 returns malformed data, this will panic.
🛡️ Suggested defensive handling
filter_value = migrate_v1_v2(filter_value, tenant_id);
- let user_id = filter_value
- .as_object()
- .unwrap()
- .get("user_id")
- .and_then(|user_id| user_id.as_str());
- let filter_id = filter_value
- .as_object()
- .unwrap()
- .get("filter_id")
- .and_then(|filter_id| filter_id.as_str());
- let stream_name = filter_value
- .as_object()
- .unwrap()
- .get("stream_name")
- .and_then(|stream_name| stream_name.as_str());
+ let migrated_obj = filter_value.as_object();
+ let user_id = migrated_obj
+ .and_then(|obj| obj.get("user_id"))
+ .and_then(|v| v.as_str());
+ let filter_id = migrated_obj
+ .and_then(|obj| obj.get("filter_id"))
+ .and_then(|v| v.as_str());
+ let stream_name = migrated_obj
+ .and_then(|obj| obj.get("stream_name"))
+ .and_then(|v| v.as_str());📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
let migrated_obj = filter_value.as_object();
let user_id = migrated_obj
    .and_then(|obj| obj.get("user_id"))
    .and_then(|v| v.as_str());
let filter_id = migrated_obj
    .and_then(|obj| obj.get("filter_id"))
    .and_then(|v| v.as_str());
let stream_name = migrated_obj
    .and_then(|obj| obj.get("stream_name"))
    .and_then(|v| v.as_str());
```
🤖 Prompt for AI Agents
In `@src/metastore/metastores/object_store_metastore.rs` around lines 713 - 727,
The three fragile .as_object().unwrap() calls on filter_value after
migrate_v1_v2 can panic if the migration returns non-object JSON; update the
code that extracts user_id, filter_id, and stream_name to first verify
filter_value.as_object() (e.g., if let Some(obj) = filter_value.as_object() or
match) and only then call .get(...).and_then(...); if the value is not an
object, handle it gracefully (log an error via the existing logger, skip this
entry, or return a controlled Err) so that code in object_store_metastore.rs
(the logic that calls migrate_v1_v2 and then reads
user_id/filter_id/stream_name) no longer uses unwrap and cannot panic on
malformed migrated data.
```rust
/// this function only gets called from enterprise main
/// If the server has traces of multi-tenancy AND is started with multi-tenant flag, then proceed
/// otherwise fail with error
///
/// if the server doesn't have traces of multi-tenancy AND is started without the flag, then proceed
/// otherwise fail with error
pub async fn validate_multi_tenancy(&self) -> Result<Option<()>, anyhow::Error> {
    self.load_tenants().await
}
```
Clarify validate_multi_tenancy doc vs behavior.
The comment states the function should fail when multi-tenant traces exist but the flag is off, yet load_tenants currently no-ops in that case. Either enforce the failure or update the comment to avoid misleading behavior.
Based on learnings: Pay close attention to code comments for typos and semantic clarity during reviews for the Parseable project.
🤖 Prompt for AI Agents
In `@src/parseable/mod.rs` around lines 324 - 333, The docstring for
validate_multi_tenancy is misleading: it says the function should fail when
multi-tenant traces exist but the flag is off, yet the implementation simply
delegates to load_tenants() which currently no-ops for that case; either
implement the failure logic here or change the comment to accurately describe
current behavior. Update the comment on validate_multi_tenancy to state that it
delegates to load_tenants() and that load_tenants controls whether an error is
returned (or, if you prefer enforcing, modify validate_multi_tenancy to call
load_tenants(), inspect tenant traces and the multi-tenant start flag, and
return an Err when traces exist but the flag is not set, referencing
validate_multi_tenancy and load_tenants by name).
Force-pushed from b039be5 to 3186ae1 (Compare)
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/storage/object_storage.rs (1)
1120-1138: ⚠️ Potential issue | 🟠 Major
Handle empty tenant_id to avoid leading-slash paths.
When tenant_id is `Some("")`, `format!("{tenant}/...")` yields a leading “/” and can create invalid relative paths for default-tenant uploads. Treat an empty tenant as no prefix.
🛠️ Proposed fix
```diff
- if let Some(tenant) = tenant_id
-     && !tenant.eq(DEFAULT_TENANT)
- {
-     format!("{tenant}/{stream_name}/{file_suffix}")
- } else {
-     format!("{stream_name}/{file_suffix}")
- }
+ if let Some(tenant) = tenant_id.as_deref()
+     && !tenant.is_empty()
+     && !tenant.eq(DEFAULT_TENANT)
+ {
+     format!("{tenant}/{stream_name}/{file_suffix}")
+ } else {
+     format!("{stream_name}/{file_suffix}")
+ }
```
Based on learnings: In src/metastore/metastores/object_store_metastore.rs, default-tenant paths are intentionally built with an empty string segment (""), so no extra tenant directory is created. Passing Some("") to storage is acceptable for path creation; concerns, if any, are about metrics labeling rather than path resolution.
src/parseable/mod.rs (1)
267-306: ⚠️ Potential issue | 🟠 Major
Call `validate_multi_tenancy()` before `load_metadata()` to populate the tenants registry in multi-tenant deployments.
In multi-tenant mode, `validate_storage()` relies on `list_tenants()` to iterate over tenant directories. However, `validate_multi_tenancy()` (which populates the tenants list via `load_tenants()`) is never invoked during startup, leaving the tenants registry empty. This causes `list_tenants()` to return `None`, making multi-tenant data appear as single-tenant data during initialization and leading to incorrect startup validation.
🤖 Fix all issues with AI agents
In `@src/parseable/mod.rs`:
- Around line 1100-1129: delete_tenant currently removes users and roles but
leaves tenant-scoped UserGroup entries behind; update the cleanup to remove user
groups for the tenant as well by calling the user-groups map remover. Add or use
a mut_user_groups() accessor (mirroring mut_users()/mut_roles()) and invoke
mut_user_groups().remove(tenant_id), or call
write_user_groups().0.remove(tenant_id) before returning in delete_tenant to
ensure UserGroup entries are cleared when a tenant is deleted.
In `@src/rbac/user.rs`:
- Around line 349-382: The validate() method currently only ensures users exist
somewhere but doesn't prevent adding users from other tenants; update
UserGroup::validate to enforce tenant-scoped membership by checking each group
user against the tenant-specific users map (use the local tenant variable and
users().get(tenant) as the primary lookup) and, if a user is found in a
different tenant, treat that as invalid: collect those cross-tenant user IDs and
return a validation error (e.g., an RBACError variant like
UserGroupContainsForeignUser or UserNotInTenant) instead of allowing the group
to be created; refer to validate(), users(), read_user_groups(), self.users and
group_user.userid() to locate where to add the additional check.
🧹 Nitpick comments (4)
src/correlation.rs (1)
157-185: Optional: make cache update resilient with `entry(...).or_default()`.

If the tenant bucket is missing between the read and write (e.g., tenant cleanup), the in-memory update is skipped. Using `entry` keeps the cache consistent in edge cases.

♻️ Suggested tweak

```diff
-        if let Some(corrs) = self.write().await.get_mut(tenant) {
-            corrs.insert(
-                updated_correlation.id.to_owned(),
-                updated_correlation.clone(),
-            );
-        }
+        let mut guard = self.write().await;
+        let corrs = guard.entry(tenant.to_string()).or_default();
+        corrs.insert(
+            updated_correlation.id.to_owned(),
+            updated_correlation.clone(),
+        );
```

src/storage/retention.rs (3)
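The difference between `get_mut` and `entry(...).or_default()` can be shown with a minimal, self-contained model (the types and names here are illustrative, not the repo's):

```rust
use std::collections::HashMap;

// Minimal model of the suggested tweak: `entry(...).or_default()` creates the
// tenant bucket if it vanished between the read and the write, instead of
// silently skipping the insert (which is what `get_mut` on a missing key does).
fn upsert(cache: &mut HashMap<String, HashMap<String, String>>, tenant: &str, id: &str, val: &str) {
    cache
        .entry(tenant.to_string())
        .or_default()
        .insert(id.to_string(), val.to_string());
}

fn main() {
    let mut cache: HashMap<String, HashMap<String, String>> = HashMap::new();
    // Tenant bucket does not exist yet; `get_mut` would skip, `entry` inserts.
    upsert(&mut cache, "t1", "corr-1", "v1");
    assert_eq!(cache["t1"]["corr-1"], "v1");
}
```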
48-75: Include tenant_id in retention-load warnings

In multi-tenant mode, identical stream names across tenants can make this warning ambiguous.
💡 Suggested log context
```diff
-            Err(err) => {
-                warn!("failed to load retention config for {stream_name} due to {err:?}")
-            }
+            Err(err) => {
+                warn!(
+                    "failed to load retention config for tenant={tenant_id:?} stream={stream_name} due to {err:?}"
+                )
+            }
```
111-126: Keep `Task::new` validation aligned with the 36500-day cap

`TryFrom<Vec<TaskView>>` rejects durations > 36500 days, but `Task::new` allows any value. Consider enforcing the same bound (or returning a Result) to avoid creating invalid tasks programmatically.
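A minimal sketch of what a bounded constructor could look like; the struct shape, field name, and error type are assumptions for illustration, not the repo's actual `Task`:

```rust
// Hypothetical sketch: enforce the same 36500-day cap in the constructor
// that TryFrom<Vec<TaskView>> already enforces.
const MAX_RETENTION_DAYS: u32 = 36_500;

struct Task {
    days: u32,
}

impl Task {
    fn new(days: u32) -> Result<Self, String> {
        if days == 0 || days > MAX_RETENTION_DAYS {
            return Err(format!(
                "retention days must be in 1..={MAX_RETENTION_DAYS}, got {days}"
            ));
        }
        Ok(Self { days })
    }
}

fn main() {
    assert!(Task::new(30).is_ok());
    assert!(Task::new(0).is_err());
    assert!(Task::new(36_501).is_err());
}
```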
211-245: Add tenant context to retention delete logs

The delete path now runs per tenant, but log lines don't include tenant_id, making troubleshooting noisy in multi-tenant deployments.
💡 Suggested log context
```diff
-        info!("running retention task - delete for stream={stream_name}");
+        info!(
+            "running retention task - delete for tenant={tenant_id:?} stream={stream_name}"
+        );
 ...
-                error!(
-                    "Failed to update snapshot for retention cleanup (stream={}): {}. Aborting delete.",
-                    stream_name, err
-                );
+                error!(
+                    "Failed to update snapshot for retention cleanup (tenant={tenant_id:?} stream={}): {}. Aborting delete.",
+                    stream_name, err
+                );
 ...
-            error!("Failed to run delete task {err:?}");
+            error!(
+                "Failed to run delete task for tenant={tenant_id:?} stream={stream_name}: {err:?}"
+            );
```
```rust
    pub fn delete_tenant(&self, tenant_id: &str) -> Result<(), anyhow::Error> {
        // let mut metadata = get_metadata(&Some(tenant_id.to_owned())).await?;
        // delete users and sessions
        let users = mut_users().remove(tenant_id);
        if let Some(users) = users {
            for (userid, user) in users {
                // metadata
                //     .users
                //     .retain(|u| u.tenant.eq(&Some(tenant_id.to_owned())));
                Users.delete_user(&userid, &user.tenant);
            }
        }

        // delete roles
        mut_roles().remove(tenant_id);
        // if let Some(roles) = mut_roles().remove(tenant_id) {
        //     for (role, _) in roles {
        //         // metadata.roles.retain(|r, _| !role.eq(r));
        //     }
        // }

        // delete resources

        // delete from in-mem
        if let Ok(mut tenants) = self.tenants.write() {
            tenants.retain(|t| t != tenant_id);
        }
        TENANT_METADATA.delete_tenant(tenant_id);
        Ok(())
```
🧩 Analysis chain
🏁 Script executed:

```shell
# Search for UserGroup definition and mut_user_groups accessor
rg -n "struct UserGroup|enum UserGroup" src/rbac --type rust -A 3
```

Repository: parseablehq/parseable
Length of output: 507

🏁 Script executed:

```shell
# Search for mut_user_groups in the codebase
rg -n "mut_user_groups|user_groups" src/rbac --type rust -B 2 -A 2
```

Repository: parseablehq/parseable
Length of output: 6852

🏁 Script executed:

```shell
# Check the rbac/map.rs file to understand the storage structure
rg -n "fn mut_" src/rbac/map.rs --type rust -A 1
```

Repository: parseablehq/parseable
Length of output: 403

🏁 Script executed:

```shell
# Check the UserGroups impl for remove method
rg -n "impl UserGroups" src/rbac/map.rs -A 20 --type rust
```

Repository: parseablehq/parseable
Length of output: 907

🏁 Script executed:

```shell
# Verify the correct pattern by checking how write_user_groups is used elsewhere
rg -n "write_user_groups()" src/ --type rust -B 2 -A 3
```

Repository: parseablehq/parseable
Length of output: 1346

🏁 Script executed:

```shell
# Check if there's any cleanup of user groups elsewhere
rg -n "USER_GROUPS|write_user_groups|read_user_groups" src/parseable/mod.rs --type rust
```

Repository: parseablehq/parseable
Length of output: 47

🏁 Script executed:

```shell
# Check the full UserGroups impl to see if there's a remove method
rg -n "impl UserGroups|pub fn" src/rbac/map.rs --type rust -A 2 | grep -A 10 "impl UserGroups"
```

Repository: parseablehq/parseable
Length of output: 225

🏁 Script executed:

```shell
# Check the UserGroups struct definition and all methods
sed -n '524,560p' src/rbac/map.rs
```

Repository: parseablehq/parseable
Length of output: 926

🏁 Script executed:

```shell
# Look at how UserGroups is accessed mutably elsewhere for cleanup patterns
rg -n "write_user_groups\(\)" src/ --type rust -A 5 | head -50
```

Repository: parseablehq/parseable
Length of output: 970

🏁 Script executed:

```shell
# Check the delete_tenant method in detail to see current cleanup
sed -n '1100,1130p' src/parseable/mod.rs
```

Repository: parseablehq/parseable
Length of output: 1118

🏁 Script executed:

```shell
# Check if UserGroups has any other methods we missed
rg -n "pub fn|fn " src/rbac/map.rs --type rust | grep -A 5 -B 5 "UserGroups"
```

Repository: parseablehq/parseable
Length of output: 499
Tenant deletion leaves user groups in memory; add cleanup to prevent stale groups on tenant ID reuse.
delete_tenant removes users and roles but leaves UserGroup entries behind. Since UserGroup is tenant-scoped, this creates a gap in in-memory cleanup—if the tenant ID is reused later, stale groups could be resurrected. Either add a mut_user_groups() accessor following the pattern of mut_users()/mut_roles(), then call .remove(tenant_id), or directly use write_user_groups().0.remove(tenant_id).
🛠️ Suggested fix (add mut_user_groups accessor)
```diff
 use crate::{
     rbac::{
         Users,
-        map::{mut_roles, mut_users},
+        map::{mut_roles, mut_user_groups, mut_users},
     },
     ...
 };

 pub fn delete_tenant(&self, tenant_id: &str) -> Result<(), anyhow::Error> {
     ...
     // delete roles
     mut_roles().remove(tenant_id);
+    // delete user groups
+    mut_user_groups().0.remove(tenant_id);
     ...
 }
```

Then in src/rbac/map.rs, add:
```diff
 pub fn mut_roles() -> RwLockWriteGuard<'static, Roles> {
     ROLES.get().expect("map is set").write()
 }
+pub fn mut_user_groups() -> RwLockWriteGuard<'static, UserGroups> {
+    USER_GROUPS.get().expect("map is set").write()
+}
```

🤖 Prompt for AI Agents
In `@src/parseable/mod.rs` around lines 1100 - 1129, delete_tenant currently
removes users and roles but leaves tenant-scoped UserGroup entries behind;
update the cleanup to remove user groups for the tenant as well by calling the
user-groups map remover. Add or use a mut_user_groups() accessor (mirroring
mut_users()/mut_roles()) and invoke mut_user_groups().remove(tenant_id), or call
write_user_groups().0.remove(tenant_id) before returning in delete_tenant to
ensure UserGroup entries are cleared when a tenant is deleted.
```rust
    pub fn validate(&self, tenant_id: &Option<String>) -> Result<(), RBACError> {
        let valid_name = is_valid_group_name(&self.name);

        if read_user_groups().contains_key(&self.name) {
        let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
        if let Some(tenant_ug) = read_user_groups().get(tenant)
            && tenant_ug.contains_key(&self.name)
        {
            return Err(RBACError::UserGroupExists(self.name.clone()));
        }
        let mut non_existent_roles = Vec::new();
        if !self.roles.is_empty() {
            // validate that the roles exist
            for role in &self.roles {
                if !roles().contains_key(role) {
                    non_existent_roles.push(role.clone());
            if let Some(tenant_roles) = roles().get(tenant) {
                for role in &self.roles {
                    if !tenant_roles.contains_key(role) {
                        non_existent_roles.push(role.clone());
                    }
                }
            } else {
                non_existent_roles.extend(self.roles.iter().cloned());
            }
        }
        let mut non_existent_users = Vec::new();
        if !self.users.is_empty() {
            // validate that the users exist
            for group_user in &self.users {
                if !users().contains_key(group_user.userid()) {
                    non_existent_users.push(group_user.userid().to_string());
            if let Some(users) = users().get(tenant) {
                for group_user in &self.users {
                    if !users.contains_key(group_user.userid()) {
                        non_existent_users.push(group_user.userid().to_string());
                    }
                }
            } else {
                non_existent_users.extend(self.users.iter().map(|u| u.userid().to_string()));
            }
        }
```
Enforce tenant‑scoped group membership during validation.
Right now a group can include users whose tenant_id doesn’t match the group’s tenant context (e.g., when user IDs overlap across tenants), which breaks the per‑tenant isolation guarantee and can lead to cross‑tenant session invalidation.
🛠️ Suggested fix
```diff
         let mut non_existent_users = Vec::new();
         if !self.users.is_empty() {
             // validate that the users exist
             if let Some(users) = users().get(tenant) {
                 for group_user in &self.users {
+                    if group_user
+                        .tenant_id
+                        .as_deref()
+                        .unwrap_or(DEFAULT_TENANT)
+                        != tenant
+                    {
+                        non_existent_users.push(group_user.userid().to_string());
+                        continue;
+                    }
                     if !users.contains_key(group_user.userid()) {
                         non_existent_users.push(group_user.userid().to_string());
                     }
                 }
```

Based on learnings: In parseablehq/parseable (PR #1545), RBAC UserGroup is tenant-scoped; groups must not contain users from different tenants.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
    pub fn validate(&self, tenant_id: &Option<String>) -> Result<(), RBACError> {
        let valid_name = is_valid_group_name(&self.name);

        let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
        if let Some(tenant_ug) = read_user_groups().get(tenant)
            && tenant_ug.contains_key(&self.name)
        {
            return Err(RBACError::UserGroupExists(self.name.clone()));
        }
        let mut non_existent_roles = Vec::new();
        if !self.roles.is_empty() {
            // validate that the roles exist
            if let Some(tenant_roles) = roles().get(tenant) {
                for role in &self.roles {
                    if !tenant_roles.contains_key(role) {
                        non_existent_roles.push(role.clone());
                    }
                }
            } else {
                non_existent_roles.extend(self.roles.iter().cloned());
            }
        }
        let mut non_existent_users = Vec::new();
        if !self.users.is_empty() {
            // validate that the users exist
            if let Some(users) = users().get(tenant) {
                for group_user in &self.users {
                    if group_user
                        .tenant_id
                        .as_deref()
                        .unwrap_or(DEFAULT_TENANT)
                        != tenant
                    {
                        non_existent_users.push(group_user.userid().to_string());
                        continue;
                    }
                    if !users.contains_key(group_user.userid()) {
                        non_existent_users.push(group_user.userid().to_string());
                    }
                }
            } else {
                non_existent_users.extend(self.users.iter().map(|u| u.userid().to_string()));
            }
        }
```
🤖 Prompt for AI Agents
In `@src/rbac/user.rs` around lines 349 - 382, The validate() method currently
only ensures users exist somewhere but doesn't prevent adding users from other
tenants; update UserGroup::validate to enforce tenant-scoped membership by
checking each group user against the tenant-specific users map (use the local
tenant variable and users().get(tenant) as the primary lookup) and, if a user is
found in a different tenant, treat that as invalid: collect those cross-tenant
user IDs and return a validation error (e.g., an RBACError variant like
UserGroupContainsForeignUser or UserNotInTenant) instead of allowing the group
to be created; refer to validate(), users(), read_user_groups(), self.users and
group_user.userid() to locate where to add the additional check.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (8)
src/storage/object_storage.rs (1)
1119-1138: ⚠️ Potential issue | 🟠 Major

Handle empty tenant IDs in stream paths (default-tenant).
If tenant_id is Some(""), the current formatting yields a leading “/” (e.g., "/stream/..."), which can misplace default-tenant uploads. Treat empty strings like None for path construction.
🔧 Suggested fix
```diff
-        if let Some(tenant) = tenant_id
-            && !tenant.eq(DEFAULT_TENANT)
-        {
-            format!("{tenant}/{stream_name}/{file_suffix}")
-        } else {
-            format!("{stream_name}/{file_suffix}")
-        }
+        if let Some(tenant) = tenant_id
+            .as_deref()
+            .filter(|t| !t.is_empty() && *t != DEFAULT_TENANT)
+        {
+            format!("{tenant}/{stream_name}/{file_suffix}")
+        } else {
+            format!("{stream_name}/{file_suffix}")
+        }
```

Based on learnings: In src/metastore/metastores/object_store_metastore.rs, default-tenant paths are intentionally built with an empty string segment (""), so no extra tenant directory is created. Passing Some("") to storage is acceptable for path creation; concerns, if any, are about metrics labeling rather than path resolution.
src/metastore/metastores/object_store_metastore.rs (1)
316-342: ⚠️ Potential issue | 🟠 Major

Default-tenant alert-state listing should not use DEFAULT_TENANT for None.
When tenant_id is None (single-tenant/default), this lists under "default/.alerts" instead of ".alerts", so alert-state files may be missed. Use empty root for None.
🔧 Suggested fix
```diff
-        let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
+        let tenant = tenant_id.as_deref().unwrap_or("");
         let base_path = RelativePathBuf::from_iter([tenant, ALERTS_ROOT_DIRECTORY]);
```

Based on learnings: In src/metastore/metastores/object_store_metastore.rs, default-tenant paths are intentionally built with an empty string segment (""), so no extra tenant directory is created. Passing Some("") to storage is acceptable for path creation; concerns, if any, are about metrics labeling rather than path resolution.
src/alerts/alert_structs.rs (1)
42-69: ⚠️ Potential issue | 🟡 Minor

Add tenant_id to reserved field list to avoid other_fields collisions.
Without this, a client can pass `tenant_id`/`tenantId` via other_fields and create conflicting flattened keys.

🔧 Suggested fix
```diff
 const RESERVED_FIELDS: &[&str] = &[
     "version", "id", "severity", "title", "query", "datasets",
     "alertType", "alert_type",
     "anomalyConfig", "anomaly_config",
     "forecastConfig", "forecast_config",
     "thresholdConfig", "threshold_config",
     "evalConfig", "eval_config",
     "targets", "state",
     "notificationState", "notification_state",
     "notificationConfig", "notification_config",
     "created", "tags",
     "lastTriggeredAt", "last_triggered_at",
+    "tenantId",
+    "tenant_id",
 ];
```

src/alerts/target.rs (2)
304-385: ⚠️ Potential issue | 🔴 Critical

Prevent `Retry::Finite(0)` underflow (`times - 1`) leading to wraparound / huge loops.

`for _ in 0..(times - 1)` is unsafe when `times == 0` (user-provided), and will underflow. Even if UI "should" send `>= 1`, this is a sharp edge.

Proposed fix (saturating subtraction + validate at parse)
```diff
-            Retry::Finite(times) => {
-                for _ in 0..(times - 1) {
+            Retry::Finite(times) => {
+                for _ in 0..times.saturating_sub(1) {
                     let current_state =
                         if let Ok(state) = alerts.get_state(alert_id, &tenant_id).await {
                             state
                         } else {
                             *state.lock().unwrap() = TimeoutState::default();
                             warn!(
                                 "Unable to fetch state for given alert_id- {alert_id}, stopping target notifs"
                             );
                             return;
                         };
                     ...
                 }
             }
```

And when parsing:
```diff
 if let Some(times) = notification_config.times {
-    timeout.times = Retry::Finite(times)
+    timeout.times = Retry::Finite(times.max(1))
 }
```

Also applies to: 441-444
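The underflow hazard and the `saturating_sub` behavior can be checked in isolation. This standalone snippet models only the loop-count arithmetic, not the repo's retry loop:

```rust
// Demonstrates why `0..(times - 1)` is dangerous for user-supplied `times`:
// unsigned subtraction panics in debug builds and wraps in release builds,
// while `saturating_sub` clamps at zero as in the proposed fix.
fn retry_iterations(times: u64) -> u64 {
    // Number of iterations `for _ in 0..times.saturating_sub(1)` performs.
    times.saturating_sub(1)
}

fn main() {
    assert_eq!(retry_iterations(0), 0); // `times - 1` here would wrap toward u64::MAX
    assert_eq!(retry_iterations(1), 0);
    assert_eq!(retry_iterations(3), 2);
}
```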
185-195: ⚠️ Potential issue | 🟠 Major

Fix `Target` serde/API mismatch (`tenant` vs `tenantId`) and DEFAULT_TENANT path inconsistency.

Inbound JSON uses `"tenantId"` (via `TargetVerifier` + `rename_all = "camelCase"`), but `Target` serializes it as `"tenant"`. This breaks round-tripping: a client cannot POST back a response from GET because the field name changes. Additionally, `target_json_path()` uses `self.tenant` directly without normalizing `Some("DEFAULT_TENANT")` to `None`, risking path mismatches when targets are initially stored with `tenant=None` then later loaded as `Some("DEFAULT_TENANT")`.

Rename the field to `tenant_id` with explicit `#[serde(rename = "tenantId")]` to fix both round-trip serialization and clarify the DEFAULT_TENANT contract. Apply the same change at lines 410–452 (TargetVerifier→Target conversion) and line 78 (put_target call).

Diff
```diff
 #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
 #[serde(rename_all = "camelCase")]
 #[serde(try_from = "TargetVerifier")]
 pub struct Target {
     pub name: String,
     #[serde(flatten)]
     pub target: TargetType,
     #[serde(default = "Ulid::new")]
     pub id: Ulid,
-    pub tenant: Option<String>,
+    #[serde(rename = "tenantId")]
+    pub tenant_id: Option<String>,
 }

 impl MetastoreObject for Target {
     fn get_object_path(&self) -> String {
-        target_json_path(&self.id, &self.tenant).to_string()
+        target_json_path(&self.id, &self.tenant_id).to_string()
     }

 Ok(Target {
     name: value.name,
     target: value.target,
     id: value.id,
-    tenant: value.tenant_id,
+    tenant_id: value.tenant_id,
 })

-    .put_target(&target, &target.tenant)
+    .put_target(&target, &target.tenant_id)

-        let tenant_id = target.tenant.as_deref().unwrap_or(DEFAULT_TENANT);
+        let tenant_id = target.tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);

-                    t.tenant = Some(tenant_id.clone());
+                    t.tenant_id = Some(tenant_id.clone());
```

src/alerts/mod.rs (3)
1055-1157: ⚠️ Potential issue | 🟠 Major

Default-tenant representation looks inconsistent (`Some(DEFAULT_TENANT)` vs `None`) and can break default schema selection.

`get_tenant_id_from_key()` returns `None` for the default tenant, and query helpers treat `None` as `"public"` (see `get_number_of_agg_exprs()`/`get_aggregate_projection()`). But in `Alerts::load()`, the `tenant` option is derived via `tenant_id.is_empty()`, and then `alert.tenant_id.clone_from(tenant)`. If the metastore returns `DEFAULT_TENANT` as the key (per repo behavior), `tenant` becomes `Some(DEFAULT_TENANT)`, which can later set DataFusion's `default_schema` to `"default"` instead of `"public"`.

Proposed fix (treat DEFAULT_TENANT like "no tenant" in Option fields)

```diff
-        for (tenant_id, raw_bytes) in raw_objects {
-            let tenant = if tenant_id.is_empty() {
-                &None
-            } else {
-                &Some(tenant_id.clone())
-            };
+        for (tenant_id, raw_bytes) in raw_objects {
+            let tenant = if tenant_id.is_empty() || tenant_id == DEFAULT_TENANT {
+                &None
+            } else {
+                &Some(tenant_id.clone())
+            };
             for alert_bytes in raw_bytes {
                 ...
                 // ensure that alert config's tenant is correctly set
                 alert.tenant_id.clone_from(tenant);
                 ...
                 map.entry(tenant_id.clone())
                     .or_default()
                     .insert(*alert.get_id(), alert);
             }
         }
```

Based on learnings, `get_tenant_id_from_key()` intentionally returns `None` for the default tenant (i.e., don't persist/propagate `Some(DEFAULT_TENANT)` as the "tenant option").

Also applies to: 733-775
616-651: ⚠️ Potential issue | 🔴 Critical

Prevent cross-tenant SSE notification leaks by filtering sessions to the alert's tenant.

`trigger_notifications()` currently broadcasts to any active session that passes `user_auth_for_query()`. But `user_auth_for_query()` derives tenant from the session, not from the alert. In multi-tenant mode, this can allow users in other tenants (with similarly named datasets/streams) to receive alerts that belong to a different tenant.

Proposed fix (gate by tenant equality before auth)
```diff
 // get active sessions
 let active_session = sessions().get_active_sessions();
 let mut broadcast_to = vec![];
 for (session, _, _) in active_session {
+    let session_tenant = get_tenant_id_from_key(&session);
+    let alert_tenant = self.tenant_id.clone();
+    let same_tenant = match (session_tenant.as_deref(), alert_tenant.as_deref()) {
+        (None, None) => true,
+        (Some(a), Some(b)) => a == b,
+        _ => false,
+    };
+    if !same_tenant {
+        continue;
+    }
     if user_auth_for_query(&session, &self.query).await.is_ok()
         && let SessionKey::SessionId(id) = &session
     {
         broadcast_to.push(*id);
     }
 }
```
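The tenant-equality gate in the diff above reduces to a small pure function that can be tested on its own; `None` stands for the default tenant on both sides. This is a standalone model, not the repo's code:

```rust
// Standalone version of the tenant-equality gate from the proposed fix:
// a session receives a notification only when its tenant matches the alert's.
fn same_tenant(session_tenant: Option<&str>, alert_tenant: Option<&str>) -> bool {
    match (session_tenant, alert_tenant) {
        (None, None) => true,          // both default tenant
        (Some(a), Some(b)) => a == b,  // same named tenant
        _ => false,                    // default vs named never matches
    }
}

fn main() {
    assert!(same_tenant(None, None));
    assert!(same_tenant(Some("t1"), Some("t1")));
    assert!(!same_tenant(Some("t1"), Some("t2")));
    assert!(!same_tenant(None, Some("t1")));
    assert!(!same_tenant(Some("t1"), None));
}
```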
733-775: ⚠️ Potential issue | 🟠 Major

Normalize tenant_id to "public" schema when it is `DEFAULT_TENANT` or empty.

The code sets `default_schema = tenant_id.as_deref().unwrap_or("public")`, which passes `DEFAULT_TENANT` through literally as a schema name. However:
- In single-tenant mode, only the "public" schema is registered (query/mod.rs:200)
- In multi-tenant mode, schemas are registered with actual tenant names (query/mod.rs:185-190)
- `metastore.get_alerts()` normalizes empty tenant IDs to `DEFAULT_TENANT` before returning (object_store_metastore.rs:276-278), so alerts may have `tenant_id = Some("DEFAULT_TENANT")`

Setting `default_schema = "DEFAULT_TENANT"` in single-tenant mode will cause schema resolution to fail. Normalize explicitly:

Suggested fix
```diff
-        session_state
-            .config_mut()
-            .options_mut()
-            .catalog
-            .default_schema = tenant_id.as_deref().unwrap_or("public").to_owned();
+        let schema = match tenant_id.as_deref() {
+            None | Some("") | Some(DEFAULT_TENANT) => "public",
+            Some(t) => t,
+        };
+        session_state
+            .config_mut()
+            .options_mut()
+            .catalog
+            .default_schema = schema.to_owned();
```

Note: `get_ctx().state()` returns a cloned `SessionContext`, so mutations are local to each call; no concurrency risk.
🤖 Fix all issues with AI agents
In `@src/alerts/target.rs`:
- Around line 53-70: The code is inconsistently handling default tenants: fix by
centralizing tenant normalization (e.g., a helper like normalize_tenant_option)
and use it everywhere; in TargetConfigs::load ensure you normalize tenant_id
before assigning t.tenant (set t.tenant = None for the default tenant instead of
Some(DEFAULT_TENANT) or Some("")), and update all lookups that currently use
tenant_id.as_deref().unwrap_or(DEFAULT_TENANT) and the delete() path to call the
normalization helper first and pass the normalized tenant (or normalized Option)
into alerts.get_all_alerts and other tenant-aware methods so callers with
Some("") cannot bypass checks.
```diff
     pub target_configs: RwLock<HashMap<String, HashMap<Ulid, Target>>>,
 }

 impl TargetConfigs {
     /// Loads alerts from disk, blocks
     pub async fn load(&self) -> anyhow::Result<()> {
         let targets = PARSEABLE.metastore.get_targets().await?;
         let mut map = self.target_configs.write().await;
-        for target in targets {
-            map.insert(target.id, target);
+        for (tenant_id, targets) in targets {
+            let inner = targets
+                .into_iter()
+                .map(|mut t| {
+                    t.tenant = Some(tenant_id.clone());
+                    (t.id, t)
+                })
+                .collect();
+            map.insert(tenant_id, inner);
+        }
```
Normalize tenant IDs consistently (handle None/""/DEFAULT_TENANT) to avoid default-tenant misrouting.
Right now most lookups use tenant_id.as_deref().unwrap_or(DEFAULT_TENANT), which treats Some("") as not default. Also load() always sets t.tenant = Some(tenant_id.clone()), which risks persisting/propagating Some(DEFAULT_TENANT) (or Some("")) into places that expect the default tenant to be None (e.g., path/schema selection).
This is especially important in delete(): you compute a normalized tenant string, but still call alerts.get_all_alerts(tenant_id) with the unnormalized option, which can let a caller bypass “target in use” checks if they pass Some("").
Proposed fix (centralize normalization + use it everywhere)
impl TargetConfigs {
+ fn tenant_key(tenant_id: &Option<String>) -> &str {
+ match tenant_id.as_deref() {
+ None | Some("") | Some(DEFAULT_TENANT) => DEFAULT_TENANT,
+ Some(t) => t,
+ }
+ }
+
+ fn tenant_opt_from_key(tenant_key: &str) -> Option<String> {
+ if tenant_key.is_empty() || tenant_key == DEFAULT_TENANT {
+ None
+ } else {
+ Some(tenant_key.to_owned())
+ }
+ }
+
/// Loads alerts from disk, blocks
pub async fn load(&self) -> anyhow::Result<()> {
let targets = PARSEABLE.metastore.get_targets().await?;
let mut map = self.target_configs.write().await;
- for (tenant_id, targets) in targets {
+ for (tenant_id, targets) in targets {
+ let tenant_key = if tenant_id.is_empty() { DEFAULT_TENANT } else { &tenant_id };
+ let tenant_opt = Self::tenant_opt_from_key(tenant_key);
let inner = targets
.into_iter()
.map(|mut t| {
- t.tenant = Some(tenant_id.clone());
+ t.tenant = tenant_opt.clone();
(t.id, t)
})
.collect();
- map.insert(tenant_id, inner);
+ map.insert(tenant_key.to_owned(), inner);
}
Ok(())
}
pub async fn update(&self, target: Target) -> Result<(), AlertError> {
PARSEABLE
.metastore
.put_target(&target, &target.tenant)
.await?;
let mut map = self.target_configs.write().await;
- let tenant_id = target.tenant.as_deref().unwrap_or(DEFAULT_TENANT);
+ let tenant_id = Self::tenant_key(&target.tenant);
map.entry(tenant_id.to_owned())
.or_default()
.insert(target.id, target);
Ok(())
}
pub async fn list(&self, tenant_id: &Option<String>) -> Result<Vec<Target>, AlertError> {
- let tenant_id = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
+ let tenant_id = Self::tenant_key(tenant_id);
let targets = if let Some(targets) = self.target_configs.read().await.get(tenant_id) {
targets.values().cloned().collect_vec()
} else {
vec![]
};
Ok(targets)
}
pub async fn get_target_by_id(
&self,
target_id: &Ulid,
tenant_id: &Option<String>,
) -> Result<Target, AlertError> {
- let tenant_id = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
+ let tenant_id = Self::tenant_key(tenant_id);
let target = if let Some(targets) = self.target_configs.read().await.get(tenant_id) {
targets
.get(target_id)
.ok_or(AlertError::InvalidTargetID(target_id.to_string()))
.cloned()?
} else {
return Err(AlertError::InvalidTargetID(target_id.to_string()));
};
Ok(target)
}
pub async fn delete(
&self,
target_id: &Ulid,
tenant_id: &Option<String>,
) -> Result<Target, AlertError> {
- let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
+ let tenant = Self::tenant_key(tenant_id);
// ensure that the target is not being used by any alert
let guard = ALERTS.read().await;
let alerts = if let Some(alerts) = guard.as_ref() {
alerts
} else {
return Err(AlertError::CustomError("No AlertManager set".into()));
};
- for (_, alert) in alerts.get_all_alerts(tenant_id).await.iter() {
+ let normalized_tenant_opt = Self::tenant_opt_from_key(tenant);
+ for (_, alert) in alerts.get_all_alerts(&normalized_tenant_opt).await.iter() {
if alert.get_targets().contains(target_id) {
return Err(AlertError::TargetInUse);
}
}
let target = if let Some(targets) = self.target_configs.write().await.get_mut(tenant) {
targets
.remove(target_id)
.ok_or(AlertError::InvalidTargetID(target_id.to_string()))?
} else {
return Err(AlertError::InvalidTargetID(target_id.to_string()));
};
PARSEABLE
.metastore
- .delete_target(&target, tenant_id)
+ .delete_target(&target, &normalized_tenant_opt)
.await?;
Ok(target)
}
}

Based on learnings, default-tenant lookups should rely on DEFAULT_TENANT, and tenant context should be derived/normalized rather than trusting arbitrary client inputs.
Also applies to: 75-85, 89-105, 106-129, 131-168
🤖 Prompt for AI Agents
In `@src/alerts/target.rs` around lines 53 - 70, The code is inconsistently
handling default tenants: fix by centralizing tenant normalization (e.g., a
helper like normalize_tenant_option) and use it everywhere; in
TargetConfigs::load ensure you normalize tenant_id before assigning t.tenant
(set t.tenant = None for the default tenant instead of Some(DEFAULT_TENANT) or
Some("")), and update all lookups that currently use
tenant_id.as_deref().unwrap_or(DEFAULT_TENANT) and the delete() path to call the
normalization helper first and pass the normalized tenant (or normalized Option)
into alerts.get_all_alerts and other tenant-aware methods so callers with
Some("") cannot bypass checks.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (5)
src/storage/gcs.rs (2)
180-207: ⚠️ Potential issue | 🟡 Minor

Record metrics only after successful GET and normalize empty tenant IDs.

Line 185-187 increments the call metric before confirming success, and empty `tenant_id` will label metrics with `""` instead of DEFAULT_TENANT. Consider moving the call metric into the Ok branch and normalizing empty strings.

Suggested fix
```diff
-        let resp = self.client.get(&to_object_store_path(path)).await;
-        let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
-        increment_object_store_calls_by_date("GET", &Utc::now().date_naive().to_string(), tenant);
+        let resp = self.client.get(&to_object_store_path(path)).await;
+        let tenant = tenant_id
+            .as_deref()
+            .filter(|t| !t.is_empty())
+            .unwrap_or(DEFAULT_TENANT);
         match resp {
             Ok(resp) => {
+                increment_object_store_calls_by_date(
+                    "GET",
+                    &Utc::now().date_naive().to_string(),
+                    tenant,
+                );
                 let body: Bytes = resp.bytes().await?;
```

Based on learnings: In Parseable's object storage implementations, metrics should only be captured when operations succeed, not when they are attempted; and default-tenant paths are intentionally built with an empty string segment (""), so metrics labeling should normalize empty tenant IDs to DEFAULT_TENANT.
232-283: ⚠️ Potential issue | 🟡 Minor

Count DELETE metrics only on successful delete.

Line 249-255 increments delete counts regardless of delete outcome. This inflates DELETE metrics and file counts when a delete fails. Move `files_deleted` increments and DELETE call metrics into the Ok branch.

Suggested fix
```diff
-                                Ok(obj) => {
-                                    files_deleted.fetch_add(1, Ordering::Relaxed);
-                                    let delete_resp = self.client.delete(&obj.location).await;
-                                    increment_object_store_calls_by_date(
-                                        "DELETE",
-                                        &Utc::now().date_naive().to_string(),
-                                        tenant,
-                                    );
-                                    if delete_resp.is_err() {
+                                Ok(obj) => {
+                                    let delete_resp = self.client.delete(&obj.location).await;
+                                    if delete_resp.is_ok() {
+                                        files_deleted.fetch_add(1, Ordering::Relaxed);
+                                        increment_object_store_calls_by_date(
+                                            "DELETE",
+                                            &Utc::now().date_naive().to_string(),
+                                            tenant,
+                                        );
+                                    } else {
                                         error!(
                                             "Failed to delete object during delete stream: {:?}",
                                             delete_resp
                                         );
                                     }
                                 }
```

Based on learnings: In Parseable's object storage implementations, metrics should only be captured when operations succeed, not when they are attempted.

src/storage/azure_blob.rs (1)
src/storage/azure_blob.rs (1)
`215-239`: ⚠️ Potential issue | 🟡 Minor

**Record object-store call metrics only after a successful GET.**
`increment_object_store_calls_by_date` runs even when GET fails (or the body read fails), skewing metrics. Move it into the success path after the body is read so only successful operations are counted.

Based on learnings: In Parseable's object storage implementations, metrics should only be captured when operations succeed, not when they are attempted. The `increment_object_store_calls_by_date()` calls should be placed after the `await?` to ensure they only execute on successful operations.

🔧 Suggested fix
```diff
-        let resp = self.client.get(&to_object_store_path(path)).await;
-        increment_object_store_calls_by_date("GET", &Utc::now().date_naive().to_string(), tenant);
-
-        match resp {
-            Ok(resp) => {
-                let body: Bytes = resp.bytes().await?;
-                increment_files_scanned_in_object_store_calls_by_date(
-                    "GET",
-                    1,
-                    &Utc::now().date_naive().to_string(),
-                    tenant,
-                );
-                increment_bytes_scanned_in_object_store_calls_by_date(
-                    "GET",
-                    body.len() as u64,
-                    &Utc::now().date_naive().to_string(),
-                    tenant,
-                );
-                Ok(body)
-            }
-            Err(err) => Err(err.into()),
-        }
+        let resp = self.client.get(&to_object_store_path(path)).await;
+        match resp {
+            Ok(resp) => {
+                let body: Bytes = resp.bytes().await?;
+                increment_object_store_calls_by_date(
+                    "GET",
+                    &Utc::now().date_naive().to_string(),
+                    tenant,
+                );
+                increment_files_scanned_in_object_store_calls_by_date(
+                    "GET",
+                    1,
+                    &Utc::now().date_naive().to_string(),
+                    tenant,
+                );
+                increment_bytes_scanned_in_object_store_calls_by_date(
+                    "GET",
+                    body.len() as u64,
+                    &Utc::now().date_naive().to_string(),
+                    tenant,
+                );
+                Ok(body)
+            }
+            Err(err) => Err(err.into()),
+        }
```

src/storage/s3.rs (2)
`924-958`: ⚠️ Potential issue | 🟡 Minor

**Use DEFAULT_TENANT label consistently for HEAD metrics.**
Line 956 uses an empty string for the tenant label, while the rest of `list_old_streams` uses `tenant_str` (`DEFAULT_TENANT`). This splits metrics across two labels for the same tenant.

🔧 Proposed fix
```diff
-                    increment_object_store_calls_by_date(
-                        "HEAD",
-                        &Utc::now().date_naive().to_string(),
-                        "",
-                    );
+                    increment_object_store_calls_by_date(
+                        "HEAD",
+                        &Utc::now().date_naive().to_string(),
+                        tenant_str,
+                    );
```
`973-1064`: ⚠️ Potential issue | 🔴 Critical

**Include tenant prefix in list_dates/list_hours/list_minutes path construction.**
The `stream_name` parameter is unprefixed (see `streams.list()`, which returns plain names keyed by tenant). In multi-tenant mode, the paths must be prefixed with `tenant/{stream_name}` when `tenant_id` is provided, consistent with the pattern already used in `delete_stream` (lines 874-889). Currently, `list_dates` (via `_list_dates`), `list_hours`, and `list_minutes` build paths without the tenant prefix, causing them to target incorrect paths. The extracted `tenant_str` is only used for metrics; add it to the path format. This pattern is consistent across all storage backends (S3, GCS, Azure, LocalFS).
327d44a to d2c9634
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
src/storage/azure_blob.rs (1)
`214-243`: ⚠️ Potential issue | 🟡 Minor

**Move object‑store call counters into success paths.**
Across these methods, `increment_object_store_calls_by_date` fires before confirming a successful `Result` (or regardless of delete success), so failed operations are counted. Gate the increment on `Ok` (and only count DELETE when `delete_resp` succeeds).

Proposed fix (example for GET)
```diff
-        let resp = self.client.get(&to_object_store_path(path)).await;
-        increment_object_store_calls_by_date("GET", &Utc::now().date_naive().to_string(), tenant);
-
-        match resp {
-            Ok(resp) => {
+        let resp = self.client.get(&to_object_store_path(path)).await;
+        match resp {
+            Ok(resp) => {
+                increment_object_store_calls_by_date(
+                    "GET",
+                    &Utc::now().date_naive().to_string(),
+                    tenant,
+                );
                 let body: Bytes = resp.bytes().await?;
```

Based on learnings, metrics should only be captured when operations succeed, not when they are attempted.
Also applies to: 245-266, 268-319, 321-357, 359-381, 383-472, 499-517, 639-661, 663-680, 697-722, 756-765, 896-926, 928-959
src/storage/gcs.rs (1)
`179-207`: ⚠️ Potential issue | 🟡 Minor

**Only increment object‑store metrics on successful operations.**
These blocks increment call counters before confirming success, so failed calls are counted. Move `increment_object_store_calls_by_date` into the `Ok` path (or after the `await?`) and only count DELETE on success.

Proposed fix (example for GET)
```diff
-        let resp = self.client.get(&to_object_store_path(path)).await;
-        let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
-        increment_object_store_calls_by_date("GET", &Utc::now().date_naive().to_string(), tenant);
-        match resp {
-            Ok(resp) => {
+        let resp = self.client.get(&to_object_store_path(path)).await;
+        let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
+        match resp {
+            Ok(resp) => {
+                increment_object_store_calls_by_date(
+                    "GET",
+                    &Utc::now().date_naive().to_string(),
+                    tenant,
+                );
                 let body: Bytes = resp.bytes().await?;
```

Based on learnings, metrics should only be captured when operations succeed, not when they are attempted.
Also applies to: 209-230, 232-283, 285-321, 323-345, 347-438, 441-470, 481-499, 621-643, 645-663, 680-705, 738-747, 874-905, 907-938
src/storage/s3.rs (2)
`342-374`: ⚠️ Potential issue | 🟡 Minor

**Gate object‑store call metrics on success.**
These blocks increment counters before confirming success, which counts failed calls. Move `increment_object_store_calls_by_date` into the `Ok` path (or after the `await?`) and only count DELETE on success.

Proposed fix (example for GET)
```diff
-        let resp = self.client.get(&to_object_store_path(path)).await;
-        increment_object_store_calls_by_date(
-            "GET",
-            &Utc::now().date_naive().to_string(),
-            tenant_str,
-        );
-
-        match resp {
-            Ok(resp) => {
+        let resp = self.client.get(&to_object_store_path(path)).await;
+        match resp {
+            Ok(resp) => {
+                increment_object_store_calls_by_date(
+                    "GET",
+                    &Utc::now().date_naive().to_string(),
+                    tenant_str,
+                );
                 let body = resp.bytes().await?;
```

Based on learnings, metrics should only be captured when operations succeed, not when they are attempted.
Also applies to: 376-401, 403-460, 462-502, 504-531, 533-626, 629-662, 673-695, 826-848, 850-872, 889-915, 1093-1127, 1129-1165
`924-958`: ⚠️ Potential issue | 🟡 Minor

**Avoid empty‑tenant metrics labels in list_old_streams HEAD checks.**
The HEAD metric uses `""` and is emitted before checking success. Use `tenant_str` (`DEFAULT_TENANT`) and increment only on `Ok`.

Proposed fix
```diff
-                        let result = self.client.head(&StorePath::from(key)).await;
-                        increment_object_store_calls_by_date(
-                            "HEAD",
-                            &Utc::now().date_naive().to_string(),
-                            "",
-                        );
-                        result.map(|_| ())
+                        let result = self.client.head(&StorePath::from(key)).await;
+                        if result.is_ok() {
+                            increment_object_store_calls_by_date(
+                                "HEAD",
+                                &Utc::now().date_naive().to_string(),
+                                tenant_str,
+                            );
+                        }
+                        result.map(|_| ())
```

Based on learnings, metrics should only be captured when operations succeed, not when they are attempted, and default-tenant paths use empty segments only for path creation (metrics labeling is the concern here).
Fixes #XXXX.
Description
This PR has:
Summary by CodeRabbit
New Features
Improvements
Chores