diff --git a/pj_base/CMakeLists.txt b/pj_base/CMakeLists.txt index 12876b9..b3dece2 100644 --- a/pj_base/CMakeLists.txt +++ b/pj_base/CMakeLists.txt @@ -56,6 +56,8 @@ if(PJ_BUILD_TESTS) tests/platform_test.cpp tests/arrow_holders_test.cpp tests/media_metadata_test.cpp + tests/object_ingest_policy_test.cpp + tests/push_message_v2_test.cpp ) foreach(test_src ${PJ_BASE_TESTS}) diff --git a/pj_base/include/pj_base/canonical_object_abi.h b/pj_base/include/pj_base/canonical_object_abi.h new file mode 100644 index 0000000..f42b2e9 --- /dev/null +++ b/pj_base/include/pj_base/canonical_object_abi.h @@ -0,0 +1,65 @@ +/** + * @file canonical_object_abi.h + * @brief C ABI vocabulary for schema classification. + * + * The host invokes classify_schema (a slot in PJ_message_parser_vtable_t) + * after bind_schema to learn what kind of canonical object the parser will + * produce for that schema. The parser returns a PJ_schema_classification_t + * carrying a PJ_canonical_object_kind_t. + * + * Canonical-object production (sdk::Image / sdk::CompressedImage / + * sdk::PointCloud) and the pure-functional scalar production + * (Expected>) are C++ SDK contracts: plugins + * inheriting from MessageParserPluginBase register handlers in + * SchemaHandler, and the in-process host consumes them via + * MessageParserPluginBase::parseObject() and parseScalars() called + * directly on the C++ pointer. Pure-C plugins emit scalars via the + * parse() slot (writing to writeHost). + */ +#ifndef PJ_CANONICAL_OBJECT_ABI_H +#define PJ_CANONICAL_OBJECT_ABI_H + +#include +#include +#include + +#include "pj_base/plugin_data_api.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Canonical object kinds. Numeric values are stable across releases — never + * renumber. Returned by the classify_schema slot to advertise what kind of + * canonical object the parser will produce for this schema (or kNone if + * the parser only produces scalars). 
+ */ +typedef enum PJ_canonical_object_kind_t { + PJ_CANONICAL_OBJECT_KIND_NONE = 0, + PJ_CANONICAL_OBJECT_KIND_IMAGE = 1, + PJ_CANONICAL_OBJECT_KIND_COMPRESSED_IMAGE = 2, + PJ_CANONICAL_OBJECT_KIND_POINTCLOUD = 3, + /* Reserve future kinds; appended at the tail. */ + /* PJ_CANONICAL_OBJECT_KIND_MARKERS = 4, */ + /* PJ_CANONICAL_OBJECT_KIND_OCCUPANCY_GRID = 5, */ +} PJ_canonical_object_kind_t; + +/** + * Schema classification — what kind a parser declares for a given schema. + * Returned a priori (without parsing payload) by the classify_schema slot. + * + * Single field plus reserved padding to keep the struct size stable across + * future minor extensions. The 16-bit reserved field must be zero today; readers + * accept any value (forward compat). + */ +typedef struct PJ_schema_classification_t { + uint16_t object_kind; /**< PJ_canonical_object_kind_t. */ + uint16_t reserved; +} PJ_schema_classification_t; + +#ifdef __cplusplus +} +#endif + +#endif /* PJ_CANONICAL_OBJECT_ABI_H */ diff --git a/pj_base/include/pj_base/data_source_protocol.h b/pj_base/include/pj_base/data_source_protocol.h index 584ede1..0161c91 100644 --- a/pj_base/include/pj_base/data_source_protocol.h +++ b/pj_base/include/pj_base/data_source_protocol.h @@ -128,6 +128,55 @@ typedef struct { uint32_t id; } PJ_parser_binding_handle_t; +/** + * Ownership token kept alive while a non-owning byte buffer is in use. + * `ctx` is opaque to the host; `release(ctx)` is invoked once when the host + * no longer needs the bytes referenced by the buffer. `ctx` MAY be NULL — + * meaning the buffer was static / borrowed from an external lifetime — in + * which case `release` is also expected to be NULL. + */ +typedef struct PJ_payload_anchor_t { + void* ctx; + void (*release)(void* ctx); +} PJ_payload_anchor_t; + +/** + * Payload bytes plus an ownership anchor. The host treats `data` as a + * non-owning view, valid until `anchor.release(anchor.ctx)` is invoked.
+ * + * For zero-copy ingest, the producer (DataSource plugin) returns a payload + * whose anchor keeps the source buffer (mcap chunk, mmap, …) alive. The + * host hands the same payload to a parser (which can build canonical + * objects holding spans into the buffer) and only releases the anchor when + * everyone is done with the bytes. + */ +typedef struct PJ_payload_t { + const uint8_t* data; + uint64_t size; + PJ_payload_anchor_t anchor; +} PJ_payload_t; + +/** + * Idempotent fetcher of payload bytes. The host invokes `fetch(ctx, &out, + * &err)` zero, one, or many times depending on the active + * ObjectIngestPolicy and on consumer pulls. Returns true and populates + * `*out` on success; returns false and (optionally) populates `*err` on + * failure (file read error, source torn down, etc.). + * + * The host ALWAYS calls `release(ctx)` exactly once when it no longer + * needs the fetcher — at the end of ingest for kEager, when the + * corresponding ObjectStore entry is dropped for lazy modes. `release` + * MAY be NULL if the plugin manages the ctx via some external mechanism. + * + * `fetch` MUST be thread-safe: the host may invoke it from the ingest + * thread (kEager) or from consumer threads (lazy pull). + */ +typedef struct PJ_payload_fetcher_t { + void* ctx; + bool (*fetch)(void* ctx, PJ_payload_t* out_payload, PJ_error_t* out_error) PJ_NOEXCEPT; + void (*release)(void* ctx); +} PJ_payload_fetcher_t; + /** * Request to bind (or look up) a parser for a given topic. * All string views must remain valid for the duration of the call. @@ -205,6 +254,12 @@ typedef struct PJ_data_source_runtime_host_vtable_t { * @p handle must have been obtained from ensure_parser_binding. * @p host_timestamp_ns is nanoseconds since the Unix epoch * (1970-01-01T00:00:00Z). Returns false + error on failure. + * + * Eager-only push: the host parses immediately and the bytes are not + * retained for later replay.
Plugins that need lazy materialization or + * ObjectIngestPolicy dispatch should use push_message_v2 instead. This + * slot remains for sources that fan-out raw bytes without an associated + * fetcher (streaming or eager-only consumers). */ bool (*push_raw_message)( void* ctx, PJ_parser_binding_handle_t handle, int64_t host_timestamp_ns, PJ_bytes_view_t payload, @@ -232,6 +287,51 @@ typedef struct PJ_data_source_runtime_host_vtable_t { * are loaded. */ const char* (*list_available_encodings)(void* ctx)PJ_NOEXCEPT; + + /* --------------------------------------------------------------------- + * Tail slots — appended after v4.0. Readers MUST gate access on + * `vtable->struct_size > offsetof(slot)` before calling. + * --------------------------------------------------------------------- */ + + /** + * [stream-thread] Push a message via a deferred byte fetcher. The plugin + * hands the host a callable that produces the payload bytes when + * invoked; the host applies the active ObjectIngestPolicy (resolved via + * the application-configured ObjectIngestPolicyResolver against + * source_id, topic, and the parser's classifySchema kind) to decide: + * + * - kEager: invoke fetcher now, parser.parseScalars + * writes columns, parser.parseObject + * materializes the canonical object into + * the ObjectStore via pushOwned. Fetcher + * released after. + * - kLazyObjectsEagerScalars: invoke fetcher now, parser.parseScalars + * writes columns. ObjectStore.pushLazy + * retains the fetcher closure for pull-time + * re-invocation; bytes dropped after + * parseScalars. + * - kPureLazy: do not invoke fetcher at ingest. Register + * ObjectStore entry that defers fetcher + * invocation until consumer pull. No + * scalar columns produced. + * + * The plugin is policy-agnostic: it does not query the policy nor + * track which mode is active. Just constructs the fetcher and hands + * it off via this slot. + * + * Lifetime: the fetcher's `ctx` is allocated by the plugin. 
The host + * is responsible for calling `fetcher.release(fetcher.ctx)` exactly + * once when the fetcher is no longer needed (kEager: after the + * single fetch; lazy modes: when the ObjectStore entry it backs is + * dropped). `fetcher.fetch` must be thread-safe. + * + * Returns false + error on failure (binding handle invalid, + * ObjectStore push failed, etc.). On failure the host still calls + * `fetcher.release` so the plugin's ctx leaks no resources. + */ + bool (*push_message_v2)( + void* ctx, PJ_parser_binding_handle_t handle, int64_t host_timestamp_ns, PJ_payload_fetcher_t fetcher, + PJ_error_t* out_error) PJ_NOEXCEPT; } PJ_data_source_runtime_host_vtable_t; /** Fat pointer pairing a runtime host context with its vtable. */ diff --git a/pj_base/include/pj_base/message_parser_protocol.h b/pj_base/include/pj_base/message_parser_protocol.h index e3c06ae..fc34507 100644 --- a/pj_base/include/pj_base/message_parser_protocol.h +++ b/pj_base/include/pj_base/message_parser_protocol.h @@ -8,9 +8,16 @@ * append_arrow_ipc — see plugin_data_api.h. Parsers stay per-record; * the host coalesces into Arrow batches internally. * + * Pure-functional production (scalars by value, canonical objects by + * value with BufferAnchor) is a C++ SDK contract: parsers inheriting from + * MessageParserPluginBase register handlers in SchemaHandler and the + * in-process host calls parseScalars() / parseObject() directly on the + * C++ pointer. Pure-C plugins use the parse() slot to write scalars to + * writeHost. + * * The host obtains the plugin's vtable via `PJ_get_message_parser_vtable()` * and drives the plugin through: create -> bind(registry) -> - * (bind_schema) -> parse* -> destroy. + * (bind_schema) -> (classify_schema) -> parse -> destroy. 
*/ #ifndef PJ_MESSAGE_PARSER_PROTOCOL_H #define PJ_MESSAGE_PARSER_PROTOCOL_H @@ -19,6 +26,7 @@ #include #include +#include "pj_base/canonical_object_abi.h" #include "pj_base/plugin_data_api.h" #ifdef __cplusplus @@ -110,6 +118,20 @@ typedef struct PJ_message_parser_vtable_t { * Tail slots beyond here are OPTIONAL. Host reads MUST check both * struct_size and slot-nullability via PJ_HAS_TAIL_SLOT. * ==================================================================== */ + + /** + * [thread-safe] A priori classification of the bound schema. Cheap; no + * payload required. Host invokes this after bind_schema(). Returns + * @p out_classification by value (POD). + * + * NULL or absent (struct_size too small) → host treats as + * PJ_CANONICAL_OBJECT_KIND_NONE. + * + * Pure-functional contract: no host side-effects. + */ + bool (*classify_schema)( + void* ctx, PJ_string_view_t type_name, PJ_bytes_view_t schema, PJ_schema_classification_t* out_classification, + PJ_error_t* out_error) PJ_NOEXCEPT; } PJ_message_parser_vtable_t; /* The vtable above is ABI-APPENDABLE: new slots may be added at the tail; * host reads guard with PJ_HAS_TAIL_SLOT. See PJ_MESSAGE_PARSER_MIN_VTABLE_SIZE. */ diff --git a/pj_base/include/pj_base/sdk/canonical_object.hpp b/pj_base/include/pj_base/sdk/canonical_object.hpp new file mode 100644 index 0000000..5e5aae0 --- /dev/null +++ b/pj_base/include/pj_base/sdk/canonical_object.hpp @@ -0,0 +1,284 @@ +/** + * @file canonical_object.hpp + * @brief Canonical object types produced by MessageParser plugins and consumed + * by widgets and toolboxes. + * + * This header defines the vocabulary that bridges parser plugins (which + * understand wire formats: ROS, Foxglove, Protobuf, etc.) and consumer code + * (widgets, toolboxes) that renders or processes the result. 
The ObjectStore + * itself remains agnostic to these types — it stores opaque bytes; the + * decoding into a CanonicalObject happens in the consumer at pull time, by + * invoking the parser's parseObject() against the bytes. + */ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "pj_base/span.hpp" +#include "pj_base/types.hpp" + +namespace PJ { +namespace sdk { + +// ----------------------------------------------------------------------------- +// Schema classification +// ----------------------------------------------------------------------------- + +// ----------------------------------------------------------------------------- +// Buffer anchor — type-erased ownership token shared between a payload buffer +// and any non-owning views derived from it. Carries no data, only keeps the +// underlying allocation alive while at least one anchor copy exists. Concrete +// typical type erased here is std::shared_ptr>; consumers +// never need to know. +// ----------------------------------------------------------------------------- + +using BufferAnchor = std::shared_ptr; + +/// Non-owning view + ownership anchor of a payload buffer. Used by the host +/// to hand a parser a message payload without committing to a copy: the parser +/// reads `bytes` and, in the canonical object it returns, may keep a Span into +/// the same memory plus a copy of `anchor` so the bytes outlive the parse call. +/// +/// `anchor` may be empty when the caller does not share ownership — in that +/// case the parser must materialize any bytes it wants to retain (the C ABI +/// trampoline path is the typical case; in-process direct calls are expected +/// to provide a non-empty anchor). +struct PayloadView { + Span bytes; + BufferAnchor anchor; +}; + +/// What kind of canonical object a parser produces for a given schema. +/// Returned a priori (without parsing payload) by classifySchema(). 
kNone means +/// the parser only produces scalars for the Datastore — no ObjectTopic to +/// register. +enum class CanonicalObjectKind : uint16_t { + kNone = 0, + kImage = 1, ///< sdk::Image — pixels already in canonical PixelFormat. + kCompressedImage = 2, ///< sdk::CompressedImage — JPEG/PNG/QOI bytes, undecoded. + kPointCloud = 3, ///< sdk::PointCloud — packed points + per-channel field layout. + // Reserved for future kinds; keep numeric values stable across releases. + // kMarkers = 4, + // kOccupancyGrid = 5, +}; + +/// A priori classification of a schema, returned by MessageParser::classifySchema(). +/// Currently a single field; struct (vs raw enum) leaves room to attach +/// declarative metadata later (preferred cache size, expected rate, etc.) without +/// breaking the API. What deliberately does NOT belong here: parse cost hints +/// (the DataSource knows the payload size), retention policy, eager/lazy choice. +struct SchemaClassification { + CanonicalObjectKind object_kind = CanonicalObjectKind::kNone; +}; + +// ----------------------------------------------------------------------------- +// Pixel formats — canonical for sdk::Image +// ----------------------------------------------------------------------------- + +/// Canonical pixel format for sdk::Image. The buffer may include row padding +/// (sdk::Image::row_step >= width * bytesPerPixel(format)); consumers must +/// honor row_step rather than assuming tightly-packed. +/// +/// Both R-G-B and B-G-R orderings are first-class citizens. ROS bgr8/bgra8 +/// (and many machine-vision sources) deliver bytes in B-G-R order natively; +/// keeping the byte order in the format tag (instead of swizzling at parse +/// time) lets the consumer hand bytes straight to a renderer that supports +/// GL_BGR / GL_BGRA texture uploads — zero-copy all the way. +/// +/// Note: pj_scene2D (and other consumers) currently define their own pixel +/// format. 
Harmonizing on this canonical enum is part of consumer-side +/// migration; this header defines the SDK-level vocabulary. +enum class PixelFormat : uint16_t { + kUnknown = 0, + kRGB888 = 1, ///< 3 bytes/pixel, R-G-B order. + kRGBA8888 = 2, ///< 4 bytes/pixel, R-G-B-A order. + kMono8 = 3, ///< 1 byte/pixel, grayscale. + kMono16 = 4, ///< 2 bytes/pixel, grayscale (depth, etc.); see is_bigendian. + kBGR888 = 5, ///< 3 bytes/pixel, B-G-R order (ROS bgr8, OpenCV native). + kBGRA8888 = 6, ///< 4 bytes/pixel, B-G-R-A order (ROS bgra8). +}; + +/// Bytes per pixel for a given format. Returns 0 for kUnknown. +[[nodiscard]] constexpr uint32_t bytesPerPixel(PixelFormat format) noexcept { + switch (format) { + case PixelFormat::kRGB888: + case PixelFormat::kBGR888: + return 3; + case PixelFormat::kRGBA8888: + case PixelFormat::kBGRA8888: + return 4; + case PixelFormat::kMono8: + return 1; + case PixelFormat::kMono16: + return 2; + case PixelFormat::kUnknown: + return 0; + } + return 0; +} + +// ----------------------------------------------------------------------------- +// sdk::Image — already-decoded image +// ----------------------------------------------------------------------------- + +/// Image already decoded into a canonical pixel format. If the producer +/// (parser) returns this, the consumer can upload the pixels directly to a +/// renderer (QRhi or otherwise) without going through any codec. +/// +/// Layout: `pixels` is a non-owning view of size at least `row_step * height`. +/// `row_step` may exceed `width * bytesPerPixel(pixel_format)` when the wire +/// format included per-row padding; consumers must honor it. `anchor` keeps +/// the underlying buffer alive — the parser may have made `pixels` a view +/// into the source payload (zero-copy) or into a freshly-allocated vector +/// (when the wire format required conversion); consumers don't need to know +/// which. 
+/// +/// For mono16 buffers `is_bigendian` indicates the byte order of each sample; +/// otherwise it is unused. RGB/BGR ordering is encoded in `pixel_format`. +struct Image { + uint32_t width = 0; + uint32_t height = 0; + PixelFormat pixel_format = PixelFormat::kUnknown; + uint32_t row_step = 0; + bool is_bigendian = false; + Span pixels; + BufferAnchor anchor; + Timestamp timestamp_ns = 0; +}; + +// ----------------------------------------------------------------------------- +// sdk::CompressedImage — undecoded compressed image bytes +// ----------------------------------------------------------------------------- + +/// Image still in compressed wire format (JPEG/PNG/QOI). The consumer is +/// expected to run it through the appropriate codec (pj_scene2D::JpegCodec, +/// PngCodec, etc.) to obtain an sdk::Image. +/// +/// The parser does NOT decompress: it only extracts the compressed payload +/// from whatever wrapper the wire format used (CDR for ROS2, etc.) and tags it +/// with the format. +struct CompressedImage { + enum class Format : uint8_t { + kUnknown = 0, + kJPEG = 1, + kPNG = 2, + kQOI = 3, + }; + + /// Auxiliary metadata that some wrappers attach to the compressed bytes + /// and that the consumer needs to decode correctly. The parser fills the + /// fields it can; consumers ignore those they don't care about. + struct Extras { + /// For ROS compressedDepth: the depth-quantization range to use after + /// PNG decoding. Both nullopt for non-depth compressed images. 
+ std::optional compressed_depth_min; + std::optional compressed_depth_max; + }; + + Format format = Format::kUnknown; + Span bytes; + BufferAnchor anchor; + Timestamp timestamp_ns = 0; + Extras extras; +}; + +// ----------------------------------------------------------------------------- +// sdk::PointCloud — packed point cloud +// ----------------------------------------------------------------------------- + +/// Description of one channel inside a packed point cloud (x, y, z, intensity, +/// rgb, ring, time, …). Mirrors the shape of sensor_msgs/PointField but the +/// type is canonical PJ vocabulary, not a ROS-specific enum. +struct PointField { + enum class Datatype : uint8_t { + kUnknown = 0, + kInt8 = 1, + kUint8 = 2, + kInt16 = 3, + kUint16 = 4, + kInt32 = 5, + kUint32 = 6, + kFloat32 = 7, + kFloat64 = 8, + }; + + std::string name; + uint32_t offset = 0; ///< Byte offset of this field within a single point. + Datatype datatype = Datatype::kUnknown; + uint32_t count = 1; ///< Number of elements of `datatype` (typically 1). +}; + +/// Bytes per element for a given PointField datatype. Returns 0 for kUnknown. +[[nodiscard]] constexpr uint32_t bytesPerElement(PointField::Datatype dt) noexcept { + switch (dt) { + case PointField::Datatype::kInt8: + case PointField::Datatype::kUint8: + return 1; + case PointField::Datatype::kInt16: + case PointField::Datatype::kUint16: + return 2; + case PointField::Datatype::kInt32: + case PointField::Datatype::kUint32: + case PointField::Datatype::kFloat32: + return 4; + case PointField::Datatype::kFloat64: + return 8; + case PointField::Datatype::kUnknown: + return 0; + } + return 0; +} + +/// Packed point cloud. The `data` buffer holds `width * height` points, each +/// occupying `point_step` bytes laid out per `fields`. `is_dense=false` means +/// some points may be invalid (typically NaN-filled). +struct PointCloud { + uint32_t width = 0; + uint32_t height = 1; + uint32_t point_step = 0; ///< Bytes per point. 
+ uint32_t row_step = 0; ///< Bytes per row (= point_step * width when no padding). + bool is_bigendian = false; + bool is_dense = true; + std::vector fields; + Span data; + BufferAnchor anchor; + Timestamp timestamp_ns = 0; +}; + +// ----------------------------------------------------------------------------- +// CanonicalObject — variant carried by parser->parseObject() +// ----------------------------------------------------------------------------- + +/// Sum type of all canonical objects a parser may produce. New alternatives +/// (kMarkers, kOccupancyGrid, …) are appended at the tail and announced via +/// CanonicalObjectKind. Plugins built against an older SDK keep producing +/// the alternatives they know; hosts built against an older SDK that receive +/// an unknown kind reject the message rather than crashing. Forward-compatible +/// — no protocol bump required. +using CanonicalObject = std::variant; + +/// Helper: get the kind tag for a CanonicalObject without unpacking it. +[[nodiscard]] inline CanonicalObjectKind kindOf(const CanonicalObject& obj) noexcept { + return std::visit( + [](const auto& concrete) -> CanonicalObjectKind { + using T = std::decay_t; + if constexpr (std::is_same_v) { + return CanonicalObjectKind::kImage; + } else if constexpr (std::is_same_v) { + return CanonicalObjectKind::kCompressedImage; + } else if constexpr (std::is_same_v) { + return CanonicalObjectKind::kPointCloud; + } else { + return CanonicalObjectKind::kNone; + } + }, + obj); +} + +} // namespace sdk +} // namespace PJ diff --git a/pj_base/include/pj_base/sdk/data_source_host_views.hpp b/pj_base/include/pj_base/sdk/data_source_host_views.hpp index 7dc191d..bce4836 100644 --- a/pj_base/include/pj_base/sdk/data_source_host_views.hpp +++ b/pj_base/include/pj_base/sdk/data_source_host_views.hpp @@ -18,9 +18,11 @@ #include #include #include +#include #include "pj_base/data_source_protocol.h" #include "pj_base/expected.hpp" +#include "pj_base/sdk/canonical_object.hpp" 
#include "pj_base/sdk/plugin_data_api.hpp" namespace PJ { @@ -217,6 +219,96 @@ class DataSourceRuntimeHostView { return okStatus(); } + /// Push a message via a deferred byte fetcher. The DataSource hands the + /// host a callable that produces the payload bytes when invoked. The host + /// applies the active ObjectIngestPolicy (resolved via the + /// ObjectIngestPolicyResolver below for source_id, topic, kind) to decide + /// whether to invoke the fetcher at ingest, only on consumer pull, or + /// never. The DataSource is policy-agnostic — it neither queries the + /// policy nor tracks which mode is active. + /// + /// The fetcher MUST be idempotent — the host may invoke it zero, one, or + /// many times depending on policy and consumer pulls. It MUST be + /// thread-safe: invocations may come from the ingest thread (kEager) or + /// from consumer threads (lazy pulls). Capture by shared_ptr (file + /// readers, mcap chunks) so the source buffer outlives every pending + /// pull. + /// + /// Fetcher return type: + /// - sdk::PayloadView { bytes, anchor } — preferred, zero-copy. The + /// anchor is propagated through the C ABI as a heap-held shared_ptr + /// copy that the host releases when no longer needed. + /// - std::vector — legacy form. The vector is + /// heap-relocated and used as its own anchor; bytes survive across + /// the C ABI boundary at the cost of one alloc-and-move. + /// + /// The host MUST advertise the push_message_v2 tail slot. We wrap the + /// closure into a PJ_payload_fetcher_t and hand it over verbatim; the + /// host applies ObjectIngestPolicy and decides when (and whether) to + /// invoke it. There is no legacy fallback: a host that doesn't expose + /// the slot returns an explicit error here rather than silently + /// degrading to a kEager push_raw_message. 
+ template + [[nodiscard]] Status pushMessage(ParserBindingHandle handle, Timestamp host_timestamp_ns, Fetcher&& fetcher) const { + using FetcherT = std::decay_t; + using FetcherResult = std::decay_t>; + static_assert( + std::is_same_v || std::is_same_v>, + "Fetcher must return sdk::PayloadView (zero-copy) or std::vector"); + + if (!valid()) { + return unexpected(std::string("runtime host is not bound")); + } + if (!PJ_HAS_TAIL_SLOT(PJ_data_source_runtime_host_vtable_t, host_.vtable, push_message_v2)) { + return unexpected(std::string("runtime host does not expose push_message_v2")); + } + + auto* ctx = new FetcherT(std::forward(fetcher)); + + PJ_payload_fetcher_t abi_fetcher{ + .ctx = ctx, + .fetch = +[](void* c, PJ_payload_t* out, PJ_error_t* err) noexcept -> bool { + try { + auto& fn = *static_cast(c); + using Result = std::decay_t; + if constexpr (std::is_same_v) { + // Zero-copy path: hold a heap copy of the BufferAnchor so it + // survives across the C ABI; release_fn deletes the holder + // (and decrements the underlying shared_ptr ref count). + auto pv = fn(); + auto* held = new sdk::BufferAnchor(std::move(pv.anchor)); + out->data = pv.bytes.data(); + out->size = pv.bytes.size(); + out->anchor.ctx = held; + out->anchor.release = +[](void* h) noexcept { delete static_cast(h); }; + } else { + // Closure returns std::vector: heap-hold the vector; + // it owns its bytes. + auto* held = new std::vector(fn()); + out->data = held->data(); + out->size = held->size(); + out->anchor.ctx = held; + out->anchor.release = +[](void* h) noexcept { delete static_cast*>(h); }; + } + return true; + } catch (const std::exception& e) { + sdk::fillError(err, 1, "plugin", e.what()); + return false; + } catch (...) 
{ + sdk::fillError(err, 1, "plugin", "unknown exception in payload fetcher"); + return false; + } + }, + .release = +[](void* c) noexcept { delete static_cast(c); }, + }; + + PJ_error_t err{}; + if (!host_.vtable->push_message_v2(host_.ctx, handle, host_timestamp_ns, abi_fetcher, &err)) { + return unexpected(errorToString(err)); + } + return okStatus(); + } + /** * Display a modal message box and wait for user response. * @return The button clicked, or kOk if the host does not support dialogs. diff --git a/pj_base/include/pj_base/sdk/detail/message_parser_trampolines.hpp b/pj_base/include/pj_base/sdk/detail/message_parser_trampolines.hpp index caa56b6..90cd721 100644 --- a/pj_base/include/pj_base/sdk/detail/message_parser_trampolines.hpp +++ b/pj_base/include/pj_base/sdk/detail/message_parser_trampolines.hpp @@ -127,4 +127,32 @@ inline const void* MessageParserPluginBase::trampoline_get_plugin_extension(void } } +// ----------------------------------------------------------------------------- +// Pure-functional API trampolines (canonical-object tail of the vtable) +// ----------------------------------------------------------------------------- + +inline bool MessageParserPluginBase::trampoline_classify_schema( + void* ctx, PJ_string_view_t type_name, PJ_bytes_view_t schema, PJ_schema_classification_t* out_classification, + PJ_error_t* out_error) noexcept { + auto* self = static_cast(ctx); + if (out_classification == nullptr) { + self->storeError(out_error, 2, "plugin", "classify_schema called with null out_classification"); + return false; + } + try { + auto name_sv = type_name.data == nullptr ? 
std::string_view{} : std::string_view(type_name.data, type_name.size); + Span schema_span(schema.data, schema.size); + const auto cls = self->classifySchema(name_sv, schema_span); + out_classification->object_kind = static_cast(cls.object_kind); + out_classification->reserved = 0; + return true; + } catch (const std::exception& e) { + self->storeError(out_error, 1, "plugin", std::string("classify_schema threw: ") + e.what()); + return false; + } catch (...) { + self->storeError(out_error, 1, "plugin", "unknown exception in classify_schema"); + return false; + } +} + } // namespace PJ diff --git a/pj_base/include/pj_base/sdk/message_parser_plugin_base.hpp b/pj_base/include/pj_base/sdk/message_parser_plugin_base.hpp index f2b88a9..e5ca74e 100644 --- a/pj_base/include/pj_base/sdk/message_parser_plugin_base.hpp +++ b/pj_base/include/pj_base/sdk/message_parser_plugin_base.hpp @@ -13,18 +13,49 @@ #include #include +#include #include #include +#include #include +#include #include "pj_base/expected.hpp" #include "pj_base/message_parser_protocol.h" #include "pj_base/plugin_abi_export.h" +#include "pj_base/sdk/canonical_object.hpp" #include "pj_base/sdk/plugin_data_api.hpp" #include "pj_base/sdk/service_registry.hpp" #include "pj_base/sdk/service_traits.hpp" namespace PJ { +namespace sdk { + +/// Per-schema handler bundle: classification + the two parse routes for one +/// schema type. Plugins build a table of these in their constructor; the +/// MessageParserPluginBase base class then implements classifySchema / +/// parseScalars / parseObject as final lookups into the table. +/// +/// Either parse_scalars or parse_object may be null (or both), reflecting +/// schemas that produce only scalars, only objects, or that the plugin +/// recognizes but routes through the legacy parse() path. 
+struct SchemaHandler { + CanonicalObjectKind object_kind = CanonicalObjectKind::kNone; + + /// Scalar route: returns owned column data — no anchor needed because the + /// returned vector and any string_views inside it are materialized by the + /// parser, independent of the caller's payload buffer. + std::function>(Timestamp, Span)> parse_scalars; + + /// Canonical-object route: takes a PayloadView so the parser can return a + /// CanonicalObject whose internal Span(s) reference the same underlying + /// buffer (zero-copy). The parser propagates `payload.anchor` into the + /// returned object so its bytes outlive this call. When the caller passes + /// an empty anchor, the parser must materialize whatever it wants to retain. + std::function(Timestamp, PayloadView)> parse_object; +}; + +} // namespace sdk /** * Base class for MessageParser plugins (protocol v4). @@ -59,10 +90,26 @@ class MessageParserPluginBase { return okStatus(); } - /// Bind a message schema. Default is no-op (for parsers that don't need schema). + /// Bind a message schema. The base implementation records the type name + /// verbatim so subsequent parseScalars / parseObject calls can dispatch + /// against the registered handler table without needing it as a parameter. + /// + /// The base does NO domain-specific normalization on the type name — + /// the SDK has no idea whether a name like "pkg/msg/Type" is valid or + /// equivalent to "pkg/Type" in some plugin's domain (that's a ROS-2 + /// convention, not a general one). Plugins that have their own naming + /// convention should apply it here, in their override, before delegating + /// to MessageParserPluginBase::bindSchema with the canonical form. They + /// must also use that same canonical form when calling + /// registerSchemaHandler.
+ /// + /// Subclasses that override this MUST call MessageParserPluginBase::bindSchema() + /// first (or set bound_type_name_ themselves) before any plugin-specific + /// schema setup, otherwise the table-based dispatch will fail to find the + /// schema's handler. virtual Status bindSchema(std::string_view type_name, Span schema) { - (void)type_name; (void)schema; + bound_type_name_.assign(type_name); return okStatus(); } @@ -75,8 +122,142 @@ class MessageParserPluginBase { return okStatus(); } - /// Parse one raw message and write decoded fields via writeHost(). PURE VIRTUAL. - virtual Status parse(Timestamp timestamp_ns, Span payload) = 0; + /// Parse one raw message and write decoded fields via writeHost(). + /// + /// The default implementation dispatches through the SchemaHandler table: + /// it invokes parseScalars() (which looks up the registered handler for + /// bound_type_name_) and shovels the returned vector to + /// writeHost().appendRecord(). Plugins that register all their schemas + /// via registerSchemaHandler() therefore inherit a working parse() for + /// free — no override needed. + /// + /// Subclasses MAY override to (a) add a fallback for type names not in + /// the registered table (e.g. a ROS-style generic flattener that handles + /// any message whose schema definition is known to the plugin), or + /// (b) retain a fully imperative implementation during migration to the + /// table-based dispatch. Plugins that have already migrated do not need + /// to override. + /// + /// This entry point exists for compatibility with the legacy v4 ingest + /// path (host calls parser.parse() directly to push fields to writeHost). + /// New host code should prefer pushing through parseScalars() / parseObject() + /// — the pure-functional pair enables lazy materialization, because the + /// caller (DataSource / app) needs the result returned, not pushed. Once + /// every host migrates to that path, parse() will be deprecated. 
+ virtual Status parse(Timestamp timestamp_ns, Span payload) { + if (!writeHostBound()) { + return unexpected(std::string("write host not bound")); + } + auto fields = parseScalars(timestamp_ns, payload); + if (!fields) { + return unexpected(std::move(fields).error()); + } + if (fields->empty()) { + return okStatus(); + } + return writeHost().appendRecord(timestamp_ns, Span(fields->data(), fields->size())); + } + + // --------------------------------------------------------------------------- + // Pure-functional API + // --------------------------------------------------------------------------- + // + // Design principle: the parser does NOT decide push policy (eager vs lazy) + // and does NOT decide where the result goes (Datastore, ObjectStore, none). + // Both decisions belong to the caller (DataSource / app). The parser is + // strictly a translator: bytes in, typed values out. Always eager when + // invoked — there is no internal deferral. Laziness is modeled by callers + // wrapping these methods inside a lambda that fires on pull. + // + // Plugins extend the parser by populating a per-schema handler table in + // the constructor (registerSchemaHandler). The base class implements + // classifySchema / parseScalars / parseObject as `final` lookups into that + // table, invoked by the host directly on a MessageParserPluginBase* pointer + // (no vtable indirection, no cross-ABI copy). + + /// Register a handler for one schema type name. Typically called once per + /// supported schema in the plugin's constructor. + /// + /// The type_name is stored verbatim — the base class does no domain- + /// specific normalization. Plugins that have their own naming convention + /// (e.g. ROS-2 "pkg/msg/Type" vs ROS-1 "pkg/Type") must register and + /// look up using a single canonical form they pick. The base class will + /// look up handlers using the bound_type_name_ value the plugin set in + /// bindSchema, so the two must agree on the convention.
+ /// + /// Either `handler.parse_scalars` or `handler.parse_object` may be null — + /// the base class returns the appropriate unexpected when an absent route + /// is invoked for that schema. + void registerSchemaHandler(std::string_view type_name, sdk::SchemaHandler handler) { + handlers_.insert_or_assign(std::string(type_name), std::move(handler)); + } + + /// Strict lookup — returns nullptr if no handler is registered for this + /// exact type name. Caller must not retain the pointer past the next + /// mutation of the handler table. There is no fallback / default + /// mechanism in the SDK: a plugin that wants behaviour for unknown + /// types is expected to register a handler under the bound name itself + /// (typically inside its bindSchema override). + [[nodiscard]] const sdk::SchemaHandler* findSchemaHandler(std::string_view type_name) const { + auto it = handlers_.find(std::string(type_name)); + if (it == handlers_.end()) { + return nullptr; + } + return &it->second; + } + + /// Lookup against the registered handler table. Marked `final`: plugins + /// populate the table via registerSchemaHandler() rather than overriding. + /// The C ABI trampolines call this on MessageParserPluginBase*; a derived + /// override would never be invoked, so the compiler rejects it explicitly. + /// Returns kNone when no handler is registered for this type name. + /// + /// `type_name` is passed as a parameter (rather than using bound_type_name_) + /// because classification may be queried for any schema this parser handles, + /// including before bindSchema has fixed the instance to one. + virtual sdk::SchemaClassification classifySchema(std::string_view type_name, Span schema) const final { + (void)schema; + if (const auto* h = findSchemaHandler(type_name)) { + return {h->object_kind}; + } + return {}; + } + + /// Invoke the registered scalar handler for the currently-bound schema. 
+ /// Returns unexpected if no handler is registered, or if the registered + /// handler did not provide a parse_scalars callable. Marked `final` — see + /// classifySchema above for the rationale. + virtual Expected> parseScalars( + Timestamp timestamp_ns, Span payload) final { + const auto* h = findSchemaHandler(bound_type_name_); + if (h == nullptr) { + return unexpected(std::string("parser does not register schema: ") + bound_type_name_); + } + if (!h->parse_scalars) { + return unexpected(std::string("registered handler has no parse_scalars: ") + bound_type_name_); + } + return h->parse_scalars(timestamp_ns, payload); + } + + /// Invoke the registered object handler for the currently-bound schema. + /// Returns unexpected if no handler is registered, or if the registered + /// handler did not provide a parse_object callable (i.e. this schema + /// produces only scalars). Marked `final` — see classifySchema above. + /// + /// `payload.anchor` may be empty; in that case the parser is expected to + /// materialize anything it wants to outlive this call. In-process callers + /// that already own the payload buffer should pass a non-empty anchor so + /// the parser can return a zero-copy CanonicalObject. + virtual Expected parseObject(Timestamp timestamp_ns, sdk::PayloadView payload) final { + const auto* h = findSchemaHandler(bound_type_name_); + if (h == nullptr) { + return unexpected(std::string("parser does not register schema: ") + bound_type_name_); + } + if (!h->parse_object) { + return unexpected(std::string("registered handler has no parse_object: ") + bound_type_name_); + } + return h->parse_object(timestamp_ns, payload); + } /// Return a pointer to a static plugin-exposed extension for @p id, or /// nullptr if unknown. Default returns nullptr. 
@@ -104,6 +285,7 @@ class MessageParserPluginBase { trampoline_load_config, trampoline_parse, trampoline_get_plugin_extension, + trampoline_classify_schema, }; return &vt; } @@ -130,12 +312,26 @@ class MessageParserPluginBase { return write_host_view_.valid(); } + protected: + /// Last type name received by bindSchema, stored verbatim. Used by the + /// table-based dispatch in parseScalars / parseObject (classifySchema takes + /// its type name as a parameter): the base looks up this string in the table. + /// + /// Subclasses that override bindSchema must either call the base class + /// implementation or set this member themselves. If the plugin has its + /// own naming convention, the canonical form it picks must be the same + /// here and at registerSchemaHandler — the base does not normalize. + std::string bound_type_name_; + private: sdk::ServiceRegistry service_registry_{}; sdk::ParserWriteHostView write_host_view_{PJ_parser_write_host_t{}}; sdk::ParserObjectWriteHostView object_write_host_view_{}; std::string config_buf_; + // Schema handler table populated by the plugin via registerSchemaHandler().
+ std::unordered_map handlers_; + static void storeError(PJ_error_t* out_error, int32_t code, std::string_view domain, std::string_view message) { sdk::fillError(out_error, code, domain, message); } @@ -149,6 +345,9 @@ class MessageParserPluginBase { static bool trampoline_parse( void* ctx, int64_t timestamp_ns, PJ_bytes_view_t payload, PJ_error_t* out_error) noexcept; static const void* trampoline_get_plugin_extension(void* ctx, PJ_string_view_t id) noexcept; + static bool trampoline_classify_schema( + void* ctx, PJ_string_view_t type_name, PJ_bytes_view_t schema, PJ_schema_classification_t* out_classification, + PJ_error_t* out_error) noexcept; }; } // namespace PJ diff --git a/pj_base/include/pj_base/sdk/object_ingest_policy.hpp b/pj_base/include/pj_base/sdk/object_ingest_policy.hpp new file mode 100644 index 0000000..7ab5bda --- /dev/null +++ b/pj_base/include/pj_base/sdk/object_ingest_policy.hpp @@ -0,0 +1,115 @@ +/** + * @file object_ingest_policy.hpp + * @brief Configurable policy that the host applies when a DataSource hands + * it a deferred byte fetcher via DataSourceRuntimeHostView::pushMessage. + * + * The DataSource is policy-agnostic: it only fabricates a callable that + * produces the raw payload bytes when invoked. The host decides — based on + * the policy resolved for (source_id, topic, kind) — whether to invoke the + * fetcher immediately (parse and store now), invoke it once for scalars + * and again on each pull, or never invoke it during ingest and only on + * consumer pulls. + */ +#pragma once + +#include +#include +#include + +#include "pj_base/sdk/canonical_object.hpp" + +namespace PJ { +namespace sdk { + +enum class ObjectIngestPolicy : uint8_t { + /// Host never invokes the fetcher during ingest. The (timestamp, fetcher) + /// pair is registered in the ObjectStore and the fetcher fires only when a + /// consumer pulls. 
No scalar timeseries are produced for this topic — its + /// scalar fields (header.stamp, width, height, …) do not appear in the + /// Datastore. The topic shows up as an ObjectTopic without children in the + /// unified curve tree. Best for very large blobs (point clouds, 4K video) + /// when scalar timeseries are not interesting. + kPureLazy, + + /// Host invokes the fetcher once during ingest to obtain bytes; parser's + /// parseScalars runs and writes scalar fields to the Datastore; bytes are + /// then dropped from RAM. The ObjectStore retains only the fetcher closure + /// for re-invocation on pull (which means the file/source is read again). + /// Best for the common case: scalar timeseries appear in the tree, the + /// blob does not stay in RAM, and pulls re-read on demand. + kLazyObjectsEagerScalars, + + /// Host invokes the fetcher once during ingest, parser's parseScalars and + /// parseObject both run, the canonical object is serialized into the + /// ObjectStore via pushOwned. Pull is trivial — bytes are already there. + /// Highest memory cost; the only viable mode for streaming sources that + /// have no persistent reader to re-read from. Streaming-only fallback. + kEager, +}; + +/// Resolver with hierarchical overrides: +/// +/// topic > data_source > kind > default +/// +/// The application sets the levels it cares about during setup; the host +/// queries resolve(source_id, topic, kind) for each message. The resolver +/// is intentionally an opaque carrier — its policy decisions are the +/// host's concern, not the DataSource plugin's. 
+/// +/// Typical setup: +/// +/// resolver.setDefault(kLazyObjectsEagerScalars); +/// resolver.setForKind(CanonicalObjectKind::kCompressedImage, kPureLazy); +/// resolver.setForKind(CanonicalObjectKind::kPointCloud, kPureLazy); +/// // kImage stays at kLazyObjectsEagerScalars: width/height/encoding columns are useful +/// +class ObjectIngestPolicyResolver { + public: + /// Default policy applied when no more specific override matches. + void setDefault(ObjectIngestPolicy policy) { + default_ = policy; + } + + /// Override the default for a specific canonical object kind. Useful when + /// (e.g.) all PointCloud2 topics should be lazy regardless of source. + void setForKind(CanonicalObjectKind kind, ObjectIngestPolicy policy) { + by_kind_[kind] = policy; + } + + /// Override the default for all topics of a specific DataSource, keyed by + /// the plugin manifest "id". + void setForDataSource(std::string_view source_id, ObjectIngestPolicy policy) { + by_source_[std::string(source_id)] = policy; + } + + /// Override the default for a specific topic name. Highest precedence. + void setForTopic(std::string_view topic_name, ObjectIngestPolicy policy) { + by_topic_[std::string(topic_name)] = policy; + } + + /// Resolve the policy for a given (source_id, topic_name, object_kind). + /// Precedence: topic > source > kind > default. The first match wins — + /// no merging or composition between levels. 
+ [[nodiscard]] ObjectIngestPolicy resolve( + std::string_view source_id, std::string_view topic_name, CanonicalObjectKind object_kind) const { + if (auto it = by_topic_.find(std::string(topic_name)); it != by_topic_.end()) { + return it->second; + } + if (auto it = by_source_.find(std::string(source_id)); it != by_source_.end()) { + return it->second; + } + if (auto it = by_kind_.find(object_kind); it != by_kind_.end()) { + return it->second; + } + return default_; + } + + private: + ObjectIngestPolicy default_ = ObjectIngestPolicy::kLazyObjectsEagerScalars; + std::unordered_map by_kind_; + std::unordered_map by_source_; + std::unordered_map by_topic_; +}; + +} // namespace sdk +} // namespace PJ diff --git a/pj_base/tests/abi_layout_sentinels_test.cpp b/pj_base/tests/abi_layout_sentinels_test.cpp index ccbf2fe..59777ef 100644 --- a/pj_base/tests/abi_layout_sentinels_test.cpp +++ b/pj_base/tests/abi_layout_sentinels_test.cpp @@ -79,7 +79,8 @@ static_assert(offsetof(PJ_message_parser_vtable_t, struct_size) == 4, "v4 prefix static_assert(offsetof(PJ_message_parser_vtable_t, bind) == 32, "v4 bind slot pinned"); static_assert(offsetof(PJ_message_parser_vtable_t, parse) == 64, "v4 parse slot pinned"); static_assert(offsetof(PJ_message_parser_vtable_t, get_plugin_extension) == 72, "v4 last baseline slot pinned"); -static_assert(sizeof(PJ_message_parser_vtable_t) == 80, "MessageParser vtable size (update deliberately on append)"); +// 80 baseline (v4.0) + 1 tail slot × 8 bytes = 88. 
+static_assert(sizeof(PJ_message_parser_vtable_t) == 88, "MessageParser vtable size (update deliberately on append)"); static_assert(PJ_MESSAGE_PARSER_MIN_VTABLE_SIZE == 80, "MIN vtable size is pinned at v4.0 — NEVER INCREASE"); static_assert(PJ_MESSAGE_PARSER_MIN_VTABLE_SIZE <= sizeof(PJ_message_parser_vtable_t), "MIN must never exceed current"); @@ -93,6 +94,44 @@ static_assert(sizeof(PJ_toolbox_vtable_t) == 88, "Toolbox vtable size (update de static_assert(PJ_TOOLBOX_MIN_VTABLE_SIZE == 88, "MIN vtable size is pinned at v4.0 — NEVER INCREASE"); static_assert(PJ_TOOLBOX_MIN_VTABLE_SIZE <= sizeof(PJ_toolbox_vtable_t), "MIN must never exceed current"); +// --- Canonical-object pipeline structs --------------------------------------- +// Public ABI types crossing the boundary for the v4 canonical-object pipeline. +// Sizes and offsets are pinned; any change is a deliberate ABI revision. +static_assert(sizeof(PJ_canonical_object_kind_t) == 4, "enum layout pinned"); +static_assert(sizeof(PJ_schema_classification_t) == 4, "PJ_schema_classification_t layout pinned"); +static_assert(offsetof(PJ_schema_classification_t, object_kind) == 0, "object_kind at offset 0"); +static_assert(offsetof(PJ_schema_classification_t, reserved) == 2, "reserved at offset 2"); + +static_assert(sizeof(PJ_payload_anchor_t) == 16, "PJ_payload_anchor_t pinned (ctx + release fn ptr)"); +static_assert(offsetof(PJ_payload_anchor_t, ctx) == 0, "ctx at offset 0"); +static_assert(offsetof(PJ_payload_anchor_t, release) == 8, "release at offset 8"); + +static_assert(sizeof(PJ_payload_t) == 32, "PJ_payload_t pinned (data + size + anchor)"); +static_assert(offsetof(PJ_payload_t, data) == 0, "data at offset 0"); +static_assert(offsetof(PJ_payload_t, size) == 8, "size at offset 8"); +static_assert(offsetof(PJ_payload_t, anchor) == 16, "anchor at offset 16"); + +static_assert(sizeof(PJ_payload_fetcher_t) == 24, "PJ_payload_fetcher_t pinned (ctx + fetch + release)"); 
+static_assert(offsetof(PJ_payload_fetcher_t, ctx) == 0, "ctx at offset 0"); +static_assert(offsetof(PJ_payload_fetcher_t, fetch) == 8, "fetch at offset 8"); +static_assert(offsetof(PJ_payload_fetcher_t, release) == 16, "release at offset 16"); + +// --- DataSource runtime host vtable (ABI-APPENDABLE within v4) --------------- +// The vtable the host exposes to plugins under "pj.runtime.v1". Offsets of +// existing slots are pinned; size grows deliberately as tail slots append. +static_assert(offsetof(PJ_data_source_runtime_host_vtable_t, protocol_version) == 0, "v1 prefix pinned"); +static_assert(offsetof(PJ_data_source_runtime_host_vtable_t, struct_size) == 4, "v1 prefix pinned"); +static_assert(offsetof(PJ_data_source_runtime_host_vtable_t, report_message) == 8, "v1 first slot pinned"); +static_assert( + offsetof(PJ_data_source_runtime_host_vtable_t, push_raw_message) == 72, "v1 push_raw_message slot pinned"); +static_assert( + offsetof(PJ_data_source_runtime_host_vtable_t, list_available_encodings) == 88, + "v1 list_available_encodings slot pinned"); +static_assert( + offsetof(PJ_data_source_runtime_host_vtable_t, push_message_v2) == 96, "v1 push_message_v2 tail slot pinned"); +static_assert( + sizeof(PJ_data_source_runtime_host_vtable_t) == 104, "Runtime host vtable size (update deliberately on append)"); + // --- ABI version symbol ------------------------------------------------------ static_assert(PJ_ABI_VERSION == 4, "v4 ABI version"); diff --git a/pj_base/tests/object_ingest_policy_test.cpp b/pj_base/tests/object_ingest_policy_test.cpp new file mode 100644 index 0000000..a6c9f68 --- /dev/null +++ b/pj_base/tests/object_ingest_policy_test.cpp @@ -0,0 +1,82 @@ +#include "pj_base/sdk/object_ingest_policy.hpp" + +#include + +using PJ::sdk::CanonicalObjectKind; +using PJ::sdk::ObjectIngestPolicy; +using PJ::sdk::ObjectIngestPolicyResolver; + +TEST(ObjectIngestPolicyResolverTest, DefaultPolicyIsLazyObjectsEagerScalars) { + ObjectIngestPolicyResolver r; + 
EXPECT_EQ( + r.resolve("any_source", "/any/topic", CanonicalObjectKind::kImage), ObjectIngestPolicy::kLazyObjectsEagerScalars); +} + +TEST(ObjectIngestPolicyResolverTest, SetDefaultIsRespected) { + ObjectIngestPolicyResolver r; + r.setDefault(ObjectIngestPolicy::kEager); + EXPECT_EQ(r.resolve("any_source", "/any/topic", CanonicalObjectKind::kImage), ObjectIngestPolicy::kEager); +} + +TEST(ObjectIngestPolicyResolverTest, KindOverrideFiresOnMatch) { + ObjectIngestPolicyResolver r; + r.setDefault(ObjectIngestPolicy::kLazyObjectsEagerScalars); + r.setForKind(CanonicalObjectKind::kPointCloud, ObjectIngestPolicy::kPureLazy); + + EXPECT_EQ(r.resolve("src", "/lidar/points", CanonicalObjectKind::kPointCloud), ObjectIngestPolicy::kPureLazy); + // Different kind falls through to default. + EXPECT_EQ(r.resolve("src", "/cam/image", CanonicalObjectKind::kImage), ObjectIngestPolicy::kLazyObjectsEagerScalars); +} + +TEST(ObjectIngestPolicyResolverTest, SourceOverridesKind) { + ObjectIngestPolicyResolver r; + r.setDefault(ObjectIngestPolicy::kLazyObjectsEagerScalars); + r.setForKind(CanonicalObjectKind::kPointCloud, ObjectIngestPolicy::kPureLazy); + r.setForDataSource("mcap_source", ObjectIngestPolicy::kEager); + + // Source matches → kEager beats the kPointCloud kind override. + EXPECT_EQ(r.resolve("mcap_source", "/lidar/points", CanonicalObjectKind::kPointCloud), ObjectIngestPolicy::kEager); + // Different source → kind override fires. + EXPECT_EQ(r.resolve("ros2_stream", "/lidar/points", CanonicalObjectKind::kPointCloud), ObjectIngestPolicy::kPureLazy); +} + +TEST(ObjectIngestPolicyResolverTest, TopicOverridesEverything) { + ObjectIngestPolicyResolver r; + r.setDefault(ObjectIngestPolicy::kLazyObjectsEagerScalars); + r.setForKind(CanonicalObjectKind::kPointCloud, ObjectIngestPolicy::kPureLazy); + r.setForDataSource("mcap_source", ObjectIngestPolicy::kEager); + r.setForTopic("/diagnostics/lidar", ObjectIngestPolicy::kPureLazy); + + // Topic match wins over source and kind. 
+ EXPECT_EQ( + r.resolve("mcap_source", "/diagnostics/lidar", CanonicalObjectKind::kPointCloud), ObjectIngestPolicy::kPureLazy); + // Different topic → source override fires. + EXPECT_EQ(r.resolve("mcap_source", "/other/lidar", CanonicalObjectKind::kPointCloud), ObjectIngestPolicy::kEager); +} + +TEST(ObjectIngestPolicyResolverTest, TypicalApplicationSetup) { + // Mirror the recommended setup: large blobs lazy by default, raw images keep + // their metadata as columns. + ObjectIngestPolicyResolver r; + r.setDefault(ObjectIngestPolicy::kLazyObjectsEagerScalars); + r.setForKind(CanonicalObjectKind::kCompressedImage, ObjectIngestPolicy::kPureLazy); + r.setForKind(CanonicalObjectKind::kPointCloud, ObjectIngestPolicy::kPureLazy); + + EXPECT_EQ(r.resolve("mcap", "/cam/raw", CanonicalObjectKind::kImage), ObjectIngestPolicy::kLazyObjectsEagerScalars); + EXPECT_EQ(r.resolve("mcap", "/cam/jpeg", CanonicalObjectKind::kCompressedImage), ObjectIngestPolicy::kPureLazy); + EXPECT_EQ(r.resolve("mcap", "/lidar", CanonicalObjectKind::kPointCloud), ObjectIngestPolicy::kPureLazy); + // Scalar-only topic (no canonical) takes the default. 
+ EXPECT_EQ( + r.resolve("mcap", "/diagnostics", CanonicalObjectKind::kNone), ObjectIngestPolicy::kLazyObjectsEagerScalars); +} + +TEST(ObjectIngestPolicyResolverTest, LastWriteWinsForSameKey) { + ObjectIngestPolicyResolver r; + r.setForKind(CanonicalObjectKind::kImage, ObjectIngestPolicy::kEager); + r.setForKind(CanonicalObjectKind::kImage, ObjectIngestPolicy::kPureLazy); + EXPECT_EQ(r.resolve("src", "/topic", CanonicalObjectKind::kImage), ObjectIngestPolicy::kPureLazy); + + r.setForTopic("/x", ObjectIngestPolicy::kLazyObjectsEagerScalars); + r.setForTopic("/x", ObjectIngestPolicy::kEager); + EXPECT_EQ(r.resolve("src", "/x", CanonicalObjectKind::kImage), ObjectIngestPolicy::kEager); +} diff --git a/pj_base/tests/push_message_v2_test.cpp b/pj_base/tests/push_message_v2_test.cpp new file mode 100644 index 0000000..509650a --- /dev/null +++ b/pj_base/tests/push_message_v2_test.cpp @@ -0,0 +1,214 @@ +// Tests for the SDK template `DataSourceRuntimeHostView::pushMessage` and +// its delegation to the C ABI slot `push_message_v2`. We exercise: +// +// 1. Vector closure → captured fetcher in the host yields the same bytes. +// 2. PayloadView closure → ditto, with the producer-supplied anchor +// flowing through the C ABI. +// 3. Multiple fetcher invocations are idempotent (same bytes each time). +// 4. The heap-held closure context is destroyed exactly once when the +// host calls fetcher.release. +// 5. When the host does not expose push_message_v2 (struct_size short +// or field NULL), pushMessage returns an explicit error rather than +// degrading silently. + +#include + +#include +#include +#include +#include + +#include "pj_base/data_source_protocol.h" +#include "pj_base/sdk/canonical_object.hpp" +#include "pj_base/sdk/data_source_host_views.hpp" + +namespace { + +// Captured state from a push_message_v2 invocation. 
+struct CapturedPush { + PJ_parser_binding_handle_t handle{}; + int64_t timestamp_ns = 0; + PJ_payload_fetcher_t fetcher{}; + bool received = false; +}; + +// Mock runtime host — exposes a vtable that captures push_message_v2 calls and, +// separately, push_raw_message calls (to assert no legacy fallback occurs). +class MockHost { + public: + MockHost() { + vtable_.protocol_version = PJ_DATA_SOURCE_PROTOCOL_VERSION; + vtable_.struct_size = sizeof(PJ_data_source_runtime_host_vtable_t); + vtable_.push_raw_message = &MockHost::pushRawMessageThunk; + vtable_.push_message_v2 = &MockHost::pushMessageV2Thunk; + host_.ctx = this; + host_.vtable = &vtable_; + } + + // Drop the v2 slot — both clearing the field and shrinking struct_size, + // matching the runtime scenario where the host predates the addition. + void disablePushMessageV2() { + vtable_.push_message_v2 = nullptr; + vtable_.struct_size = offsetof(PJ_data_source_runtime_host_vtable_t, push_message_v2); + } + + PJ::DataSourceRuntimeHostView view() const { + return PJ::DataSourceRuntimeHostView(host_); + } + + CapturedPush& captured() { + return captured_; + } + std::vector& receivedRawBytes() { + return raw_bytes_; + } + + private: + static bool pushRawMessageThunk( + void* ctx, PJ_parser_binding_handle_t /*handle*/, int64_t /*ts*/, PJ_bytes_view_t payload, + PJ_error_t* /*err*/) noexcept { + auto* self = static_cast(ctx); + self->raw_bytes_.assign(payload.data, payload.data + payload.size); + return true; + } + + static bool pushMessageV2Thunk( + void* ctx, PJ_parser_binding_handle_t handle, int64_t ts, PJ_payload_fetcher_t fetcher, + PJ_error_t* /*err*/) noexcept { + auto* self = static_cast(ctx); + self->captured_.handle = handle; + self->captured_.timestamp_ns = ts; + self->captured_.fetcher = fetcher; + self->captured_.received = true; + return true; + } + + PJ_data_source_runtime_host_vtable_t vtable_{}; + PJ_data_source_runtime_host_t host_{}; + CapturedPush captured_; + std::vector raw_bytes_; +}; + +// Helper:
invoke a captured fetcher and assert the produced bytes match +// the expected content. Releases the payload anchor. +void invokeFetcherAndExpect(PJ_payload_fetcher_t& fetcher, const std::vector& expected) { + PJ_payload_t payload{}; + PJ_error_t err{}; + ASSERT_NE(fetcher.fetch, nullptr); + ASSERT_TRUE(fetcher.fetch(fetcher.ctx, &payload, &err)); + ASSERT_EQ(payload.size, expected.size()); + EXPECT_EQ(0, std::memcmp(payload.data, expected.data(), expected.size())); + if (payload.anchor.release) { + payload.anchor.release(payload.anchor.ctx); + } +} + +// ---------- Tests against the new push_message_v2 path ---------- + +TEST(PushMessageV2Test, VectorClosureFlowsThroughSlot) { + MockHost host; + std::vector expected{1, 2, 3, 4, 5}; + + auto status = host.view().pushMessage(PJ::ParserBindingHandle{42}, 1000, [bytes = expected]() { return bytes; }); + + ASSERT_TRUE(status); + ASSERT_TRUE(host.captured().received); + EXPECT_EQ(host.captured().handle.id, 42U); + EXPECT_EQ(host.captured().timestamp_ns, 1000); + invokeFetcherAndExpect(host.captured().fetcher, expected); + host.captured().fetcher.release(host.captured().fetcher.ctx); +} + +TEST(PushMessageV2Test, PayloadViewClosureFlowsThroughSlot) { + MockHost host; + std::vector expected{10, 20, 30}; + auto owned = std::make_shared>(expected); + + auto status = host.view().pushMessage(PJ::ParserBindingHandle{7}, 2000, [owned]() -> PJ::sdk::PayloadView { + return {PJ::Span(owned->data(), owned->size()), owned}; + }); + + ASSERT_TRUE(status); + invokeFetcherAndExpect(host.captured().fetcher, expected); + host.captured().fetcher.release(host.captured().fetcher.ctx); +} + +TEST(PushMessageV2Test, FetchIsIdempotent) { + MockHost host; + std::vector expected{0x42, 0x43}; + + ASSERT_TRUE(host.view().pushMessage(PJ::ParserBindingHandle{1}, 0, [bytes = expected]() { return bytes; })); + + // Multiple invocations must yield the same bytes each time. 
+ for (int i = 0; i < 3; ++i) { + invokeFetcherAndExpect(host.captured().fetcher, expected); + } + host.captured().fetcher.release(host.captured().fetcher.ctx); +} + +TEST(PushMessageV2Test, FetcherCtxReleasedAfterHostCalls) { + MockHost host; + auto canary = std::make_shared(42); + std::weak_ptr witness = canary; + + ASSERT_TRUE(host.view().pushMessage(PJ::ParserBindingHandle{1}, 0, [canary]() { return std::vector{}; })); + + // Drop our local reference; the heap-held closure copy keeps the canary + // alive while the fetcher is owned by the host. + canary.reset(); + EXPECT_FALSE(witness.expired()) << "closure should still keep the canary alive (held in heap fetcher ctx)"; + + // Host releases the fetcher → closure destroyed → captured shared_ptr + // destroyed → canary's last reference drops. + host.captured().fetcher.release(host.captured().fetcher.ctx); + EXPECT_TRUE(witness.expired()) << "after release, the captured shared_ptr should have been the last reference"; +} + +TEST(PushMessageV2Test, PayloadAnchorPropagates) { + MockHost host; + auto owned = std::make_shared>(std::vector{0x99, 0x9A}); + std::weak_ptr> witness = owned; + + ASSERT_TRUE(host.view().pushMessage(PJ::ParserBindingHandle{1}, 0, [owned]() -> PJ::sdk::PayloadView { + return {PJ::Span(owned->data(), owned->size()), owned}; + })); + + // The closure holds the owned vector via its shared_ptr capture. + // After releasing our local owned, the closure's copy keeps it alive. + owned.reset(); + EXPECT_FALSE(witness.expired()); + + // Invoke the fetcher: it builds a PayloadView into the same buffer; the + // anchor returned to the host is yet another shared_ptr copy, so the + // buffer survives even past the closure's release. 
+ PJ_payload_t payload{}; + PJ_error_t err{}; + ASSERT_TRUE(host.captured().fetcher.fetch(host.captured().fetcher.ctx, &payload, &err)); + EXPECT_EQ(payload.size, 2U); + + // Releasing the fetcher (closure dies) does NOT kill the buffer because + // the active payload anchor still holds a reference. + host.captured().fetcher.release(host.captured().fetcher.ctx); + EXPECT_FALSE(witness.expired()) << "active payload anchor should still keep the buffer alive"; + + // Releasing the payload anchor drops the last reference. + if (payload.anchor.release) { + payload.anchor.release(payload.anchor.ctx); + } + EXPECT_TRUE(witness.expired()); +} + +// ---------- Host without push_message_v2 returns explicit error ---------- + +TEST(PushMessageV2Test, ReturnsErrorWhenSlotMissing) { + MockHost host; + host.disablePushMessageV2(); + + std::vector expected{0xA, 0xB, 0xC}; + auto status = host.view().pushMessage(PJ::ParserBindingHandle{1}, 100, [bytes = expected]() { return bytes; }); + EXPECT_FALSE(status); // explicit failure — no silent fallback to push_raw_message + EXPECT_FALSE(host.captured().received); + EXPECT_TRUE(host.receivedRawBytes().empty()); +} + +} // namespace diff --git a/pj_plugins/include/pj_plugins/host/message_parser_handle.hpp b/pj_plugins/include/pj_plugins/host/message_parser_handle.hpp index 4d7e50d..f299e6f 100644 --- a/pj_plugins/include/pj_plugins/host/message_parser_handle.hpp +++ b/pj_plugins/include/pj_plugins/host/message_parser_handle.hpp @@ -4,10 +4,12 @@ */ #pragma once +#include #include #include #include +#include #include #include #include @@ -103,6 +105,24 @@ class MessageParserHandle { return okStatus(); } + /// A priori classification of the bound schema. Tail-slot gated; when + /// the plugin doesn't expose classify_schema (older protocol header) + /// returns kNone, matching the host contract documented in + /// message_parser_protocol.h. 
+ [[nodiscard]] sdk::CanonicalObjectKind classifySchema(std::string_view type_name, Span schema) const { + if (!PJ_HAS_TAIL_SLOT(PJ_message_parser_vtable_t, vt_, classify_schema)) { + return sdk::CanonicalObjectKind::kNone; + } + PJ_string_view_t tn{type_name.data(), type_name.size()}; + PJ_bytes_view_t sc{schema.data(), schema.size()}; + PJ_schema_classification_t out{}; + PJ_error_t err{}; + if (!vt_->classify_schema(ctx_, tn, sc, &out, &err)) { + return sdk::CanonicalObjectKind::kNone; + } + return static_cast(out.object_kind); + } + /// Query a plugin-exposed extension by reverse-DNS id. Tail-slot gated. [[nodiscard]] const void* getPluginExtension(std::string_view id) const { if (!PJ_HAS_TAIL_SLOT(PJ_message_parser_vtable_t, vt_, get_plugin_extension)) { diff --git a/pj_plugins/tests/data_source_library_test.cpp b/pj_plugins/tests/data_source_library_test.cpp index 2f6854d..b2cd9c8 100644 --- a/pj_plugins/tests/data_source_library_test.cpp +++ b/pj_plugins/tests/data_source_library_test.cpp @@ -100,6 +100,7 @@ PJ_data_source_runtime_host_t makeRuntimeHost(bool with_encodings) { .push_raw_message = rhPushRawMessage, .show_message_box = rhShowMessageBox, .list_available_encodings = rhListEncodings, + .push_message_v2 = nullptr, }; static const PJ_data_source_runtime_host_vtable_t no_enc_vt = { .protocol_version = 1, @@ -115,6 +116,7 @@ PJ_data_source_runtime_host_t makeRuntimeHost(bool with_encodings) { .push_raw_message = rhPushRawMessage, .show_message_box = rhShowMessageBox, .list_available_encodings = nullptr, + .push_message_v2 = nullptr, }; return PJ_data_source_runtime_host_t{ .ctx = reinterpret_cast(0x2), diff --git a/pj_plugins/tests/file_source_integration_test.cpp b/pj_plugins/tests/file_source_integration_test.cpp index d1fbef5..99b1954 100644 --- a/pj_plugins/tests/file_source_integration_test.cpp +++ b/pj_plugins/tests/file_source_integration_test.cpp @@ -154,6 +154,7 @@ PJ_data_source_runtime_host_t makeRuntimeHost(RuntimeHostState* state) { 
.push_raw_message = rhPushRawMessage, .show_message_box = rhShowMessageBox, .list_available_encodings = nullptr, + .push_message_v2 = nullptr, }; return PJ_data_source_runtime_host_t{.ctx = state, .vtable = &vtable}; }