Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pj_base/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ if(PJ_BUILD_TESTS)
tests/platform_test.cpp
tests/arrow_holders_test.cpp
tests/media_metadata_test.cpp
tests/object_ingest_policy_test.cpp
tests/push_message_v2_test.cpp
)

foreach(test_src ${PJ_BASE_TESTS})
Expand Down
136 changes: 136 additions & 0 deletions pj_base/include/pj_base/canonical_object_abi.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/**
* @file canonical_object_abi.h
* @brief C ABI representation of canonical objects produced by parsers.
*
* The C++ vocabulary lives in pj_base/sdk/canonical_object.hpp
* (sdk::CanonicalObject = std::variant<Image, CompressedImage, PointCloud>).
* This file defines the wire format used to cross the plugin C ABI boundary
* for that variant: parser plugins produce a flat byte blob with a small
* header describing the kind, and the host deserializes it back to the
* C++ type.
*
* The blob layout is little-endian, packed, with no implementation-defined
* padding. Trampolines and host loader use it directly.
*/
#ifndef PJ_CANONICAL_OBJECT_ABI_H
#define PJ_CANONICAL_OBJECT_ABI_H

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#include "pj_base/plugin_data_api.h"

#ifdef __cplusplus
extern "C" {
#endif

/**
* Owned buffer of named field values produced by the parse_scalars slot.
* The plugin owns the @p fields array; the host calls @p release(alloc_handle)
* when done. release MAY be NULL if the plugin manages the buffer in a way
* that does not require explicit release between calls.
*/
typedef struct PJ_named_field_value_buffer_t {
const PJ_named_field_value_t* fields;
size_t count;
void* alloc_handle;
void (*release)(void* alloc_handle);
} PJ_named_field_value_buffer_t;

/**
* Canonical object kinds. Numeric values are stable across releases — never
* renumber. Mirror of PJ::sdk::CanonicalObjectKind for use across the C ABI.
*/
typedef enum PJ_canonical_object_kind_t {
PJ_CANONICAL_OBJECT_KIND_NONE = 0,
PJ_CANONICAL_OBJECT_KIND_IMAGE = 1,
PJ_CANONICAL_OBJECT_KIND_COMPRESSED_IMAGE = 2,
PJ_CANONICAL_OBJECT_KIND_POINTCLOUD = 3,
/* Reserve future kinds; appended at the tail. */
/* PJ_CANONICAL_OBJECT_KIND_MARKERS = 4, */
/* PJ_CANONICAL_OBJECT_KIND_OCCUPANCY_GRID = 5, */
} PJ_canonical_object_kind_t;

/**
* Schema classification — what kind a parser declares for a given schema.
* Returned a priori (without parsing payload) by the classify_schema slot.
*
* Currently a single field plus reserved padding to keep the struct size
* stable across future minor extensions (declarative metadata can attach
* via additional structs returned by other slots, not by growing this one).
*/
typedef struct PJ_schema_classification_t {
uint16_t object_kind; /**< PJ_canonical_object_kind_t. */
uint16_t reserved; /**< Must be zero. */
} PJ_schema_classification_t;

/**
* Canonical object as a flat byte blob produced by the parse_object slot.
*
* Layout of @p data:
*
* header (12 bytes, little-endian):
* uint16_t kind // PJ_canonical_object_kind_t
* uint16_t reserved
* int64_t timestamp_ns
*
* body (varies by kind, immediately follows the header):
*
* KIND_IMAGE:
* uint32_t width
* uint32_t height
* uint16_t pixel_format
* uint16_t reserved
* uint32_t pixels_size
* uint8_t pixels[pixels_size] // tightly packed, no row stride
*
* KIND_COMPRESSED_IMAGE:
* uint8_t format // 0=unknown, 1=JPEG, 2=PNG, 3=QOI
* uint8_t has_depth_min
* uint8_t has_depth_max
* uint8_t reserved
* float depth_min // valid iff has_depth_min
* float depth_max // valid iff has_depth_max
* uint32_t bytes_size
* uint8_t bytes[bytes_size]
*
* KIND_POINTCLOUD:
* uint32_t width
* uint32_t height
* uint32_t point_step
* uint32_t row_step
* uint8_t is_bigendian
* uint8_t is_dense
* uint16_t fields_count
* fields[fields_count]:
* uint32_t name_size
* char name[name_size]
* uint32_t offset
* uint8_t datatype // 0=unknown,1=i8,2=u8,3=i16,4=u16,
* // 5=i32,6=u32,7=f32,8=f64
* uint8_t reserved[3]
* uint32_t count
* uint32_t data_size
* uint8_t data[data_size]
*
* Memory ownership:
* The blob's @p data is owned by the parser plugin. The plugin allocates
* it during parse_object and the host calls @p release(ctx, data) when it
* is done with the bytes. release MAY be NULL if data points into a
* plugin-internal buffer that the plugin manages itself across calls.
*/
typedef struct PJ_canonical_object_blob_t {
const uint8_t* data;
uint64_t size;
/** Opaque handle the plugin uses to identify the allocation. */
void* alloc_handle;
/** Release callback invoked by the host. NULL means no release needed. */
void (*release)(void* alloc_handle);
} PJ_canonical_object_blob_t;

#ifdef __cplusplus
}
#endif

#endif /* PJ_CANONICAL_OBJECT_ABI_H */
97 changes: 97 additions & 0 deletions pj_base/include/pj_base/data_source_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,58 @@ typedef struct {
uint32_t id;
} PJ_parser_binding_handle_t;

/**
* Ownership token kept alive while a non-owning byte buffer is in use.
* `ctx` is opaque to the host; `release(ctx)` is invoked once when the host
* no longer needs the bytes referenced by the buffer. `ctx` MAY be NULL —
* meaning the buffer was static / borrowed from an external lifetime — in
* which case `release` is also expected to be NULL.
*
* Mirrors the pattern of PJ_canonical_object_blob_t but applies to raw
* payload bytes, not to serialized canonical objects.
*/
typedef struct PJ_payload_anchor_t {
void* ctx;
void (*release)(void* ctx);
} PJ_payload_anchor_t;

/**
* Payload bytes plus an ownership anchor. The host treats `data` as a
* non-owning view, valid until `anchor.release(anchor.ctx)` is invoked.
*
* For zero-copy ingest, the producer (DataSource plugin) returns a payload
* whose anchor keeps the source buffer (mcap chunk, mmap, …) alive. The
* host hands the same payload to a parser (which can build canonical
* objects holding spans into the buffer) and only releases the anchor when
* everyone done with the bytes.
*/
typedef struct PJ_payload_t {
const uint8_t* data;
size_t size;
PJ_payload_anchor_t anchor;
} PJ_payload_t;

/**
* Idempotent fetcher of payload bytes. The host invokes `fetch(ctx, &out,
* &err)` zero, one, or many times depending on the active
* ObjectIngestPolicy and on consumer pulls. Returns true and populates
* `*out` on success; returns false and (optionally) populates `*err` on
* failure (file read error, source torn down, etc.).
*
* The host ALWAYS calls `release(ctx)` exactly once when it no longer
* needs the fetcher — at the end of ingest for kEager, when the
* corresponding ObjectStore entry is dropped for lazy modes. `release`
* MAY be NULL if the plugin manages the ctx via some external mechanism.
*
* `fetch` MUST be thread-safe: the host may invoke it from the ingest
* thread (kEager) or from consumer threads (lazy pull).
*/
typedef struct PJ_payload_fetcher_t {
void* ctx;
bool (*fetch)(void* ctx, PJ_payload_t* out_payload, PJ_error_t* out_error) PJ_NOEXCEPT;
void (*release)(void* ctx);
} PJ_payload_fetcher_t;

/**
* Request to bind (or look up) a parser for a given topic.
* All string views must remain valid for the duration of the call.
Expand Down Expand Up @@ -232,6 +284,51 @@ typedef struct PJ_data_source_runtime_host_vtable_t {
* are loaded.
*/
const char* (*list_available_encodings)(void* ctx)PJ_NOEXCEPT;

/* ---------------------------------------------------------------------
* Tail slots — appended after v4.0. Readers MUST gate access on
* `vtable->struct_size > offsetof(slot)` before calling.
* --------------------------------------------------------------------- */

/**
* [stream-thread] Push a message via a deferred byte fetcher. The plugin
* hands the host a callable that produces the payload bytes when
* invoked; the host applies the active ObjectIngestPolicy (resolved via
* the application-configured ObjectIngestPolicyResolver against
* source_id, topic, and the parser's classifySchema kind) to decide:
*
* - kEager: invoke fetcher now, parser.parseScalars
* writes columns, parser.parseObject
* materializes the canonical object into
* the ObjectStore via pushOwned. Fetcher
* released after.
* - kLazyObjectsEagerScalars: invoke fetcher now, parser.parseScalars
* writes columns. ObjectStore.pushLazy
* retains the fetcher closure for pull-time
* re-invocation; bytes dropped after
* parseScalars.
* - kPureLazy: do not invoke fetcher at ingest. Register
* ObjectStore entry that defers fetcher
* invocation until consumer pull. No
* scalar columns produced.
*
* The plugin is policy-agnostic: it does not query the policy nor
* track which mode is active. Just constructs the fetcher and hands
* it off via this slot.
*
* Lifetime: the fetcher's `ctx` is allocated by the plugin. The host
* is responsible for calling `fetcher.release(fetcher.ctx)` exactly
* once when the fetcher is no longer needed (kEager: after the
* single fetch; lazy modes: when the ObjectStore entry it backs is
* dropped). `fetcher.fetch` must be thread-safe.
*
* Returns false + error on failure (binding handle invalid,
* ObjectStore push failed, etc.). On failure the host still calls
* `fetcher.release` so the plugin's ctx leaks no resources.
*/
bool (*push_message_v2)(
void* ctx, PJ_parser_binding_handle_t handle, int64_t host_timestamp_ns, PJ_payload_fetcher_t fetcher,
PJ_error_t* out_error) PJ_NOEXCEPT;
} PJ_data_source_runtime_host_vtable_t;

/** Fat pointer pairing a runtime host context with its vtable. */
Expand Down
51 changes: 50 additions & 1 deletion pj_base/include/pj_base/message_parser_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,16 @@
* append_arrow_ipc — see plugin_data_api.h. Parsers stay per-record;
* the host coalesces into Arrow batches internally.
*
* v4 appendable tail (no version bump — protocol stays at 4):
* - classify_schema, parse_scalars, parse_object: pure-functional API
* that returns typed values instead of writing to host views. Enables
* lazy materialization and removes the parser's coupling to push policy.
* See pj_base/canonical_object_abi.h for the wire format.
*
* The host obtains the plugin's vtable via `PJ_get_message_parser_vtable()`
* and drives the plugin through: create -> bind(registry) ->
* (bind_schema) -> parse* -> destroy.
* (bind_schema) -> (classify_schema) -> parse* / parseScalars / parseObject
* -> destroy.
*/
#ifndef PJ_MESSAGE_PARSER_PROTOCOL_H
#define PJ_MESSAGE_PARSER_PROTOCOL_H
Expand All @@ -19,6 +26,7 @@
#include <stddef.h>
#include <stdint.h>

#include "pj_base/canonical_object_abi.h"
#include "pj_base/plugin_data_api.h"

#ifdef __cplusplus
Expand Down Expand Up @@ -110,6 +118,47 @@ typedef struct PJ_message_parser_vtable_t {
* Tail slots beyond here are OPTIONAL. Host reads MUST check both
* struct_size and slot-nullability via PJ_HAS_TAIL_SLOT.
* ==================================================================== */

/**
* [thread-safe] A priori classification of the bound schema. Cheap; no
* payload required. Host invokes this after bind_schema(). Returns
* @p out_classification by value (POD).
*
* NULL or absent (struct_size too small) → host treats as
* PJ_CANONICAL_OBJECT_KIND_NONE.
*
* Pure-functional contract: no host side-effects.
*/
bool (*classify_schema)(
void* ctx, PJ_string_view_t type_name, PJ_bytes_view_t schema, PJ_schema_classification_t* out_classification,
PJ_error_t* out_error) PJ_NOEXCEPT;

/**
* [stream-thread] Pure-functional alternative to parse(): returns the
* scalar fields by value (out parameter) instead of writing them to the
* parser write host. The host invokes this in preference to parse() when
* available; legacy plugins keep using parse().
*
* The plugin owns @p out_fields.fields buffer; @p out_fields.release is
* called by the host when done. release MAY be NULL.
*/
bool (*parse_scalars)(
void* ctx, int64_t timestamp_ns, PJ_bytes_view_t payload, PJ_named_field_value_buffer_t* out_fields,
PJ_error_t* out_error) PJ_NOEXCEPT;

/**
* [stream-thread] Pure-functional production of a canonical object from
* the payload. Fills @p out_blob with the serialized object (see layout
* in canonical_object_abi.h). Only meaningful when classify_schema()
* returned a non-zero kind.
*
* Pure-functional contract: no writes to the object write host. The
* caller (DataSource / app) decides whether to push the blob eagerly,
* capture it inside a lazy lambda, or hand it directly to a consumer.
*/
bool (*parse_object)(
void* ctx, int64_t timestamp_ns, PJ_bytes_view_t payload, PJ_canonical_object_blob_t* out_blob,
PJ_error_t* out_error) PJ_NOEXCEPT;
} PJ_message_parser_vtable_t;
/* The vtable above is ABI-APPENDABLE: new slots may be added at the tail;
* host reads guard with PJ_HAS_TAIL_SLOT. See PJ_MESSAGE_PARSER_MIN_VTABLE_SIZE. */
Expand Down
Loading
Loading