From 9846e4ddf8f41e995f4873063804f3a8eeb21c68 Mon Sep 17 00:00:00 2001 From: Sofia Donato Ferreira Date: Thu, 12 Mar 2026 12:53:27 -0300 Subject: [PATCH 01/12] feat: add run uid to start document logging line Signed-off-by: Sofia Donato Ferreira --- src/sophys/common/utils/kafka/monitor.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/sophys/common/utils/kafka/monitor.py b/src/sophys/common/utils/kafka/monitor.py index 53c5f16..0b89be1 100644 --- a/src/sophys/common/utils/kafka/monitor.py +++ b/src/sophys/common/utils/kafka/monitor.py @@ -401,10 +401,13 @@ def handle_event(self, event): return if data[0] == "start": - self._logger.info("Received a 'start' document.") - self.__documents.append(data) + new_run_uid = self.__documents[data].identifier + self._logger.info( + "Run '{}': Received a 'start' document.".format(new_run_uid) + ) + self.__incomplete_documents.append(new_run_uid) self.__documents[data].subscribe( From a5e0ee670f8106ac7dde34abb424e3ef4d026ac7 Mon Sep 17 00:00:00 2001 From: Sofia Donato Ferreira Date: Thu, 20 Mar 2025 15:34:11 -0300 Subject: [PATCH 02/12] meta: replace kafka-python-ng with kafka-python It seems like development has picked up again in the original kafka-python project, and some good bug fixes are in on the newest versions. Thus, we can switch our dependency. Signed-off-by: Sofia Donato Ferreira --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 79d5de2..e86728e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,7 @@ dev = [ "caproto!=1.2.0", ] kafka = [ - "kafka-python-ng", + "kafka-python", "msgpack", "msgpack-numpy" ] From e0610d5cb4ebd018dfde952a18eed827b411b2bf Mon Sep 17 00:00:00 2001 From: Sofia Donato Ferreira Date: Wed, 19 Mar 2025 17:08:54 -0300 Subject: [PATCH 03/12] feat: improve 'seek_start' logic in kafka monitor Now the seeking will work on any document type, including the 'stop' document, which previously wouldn't work at all. The function was moved out of the class for ease of testing, in the next commit of this patchset. Signed-off-by: Sofia Donato Ferreira --- src/sophys/common/utils/kafka/monitor.py | 56 ++++++++++++++++-------- 1 file changed, 37 insertions(+), 19 deletions(-) diff --git a/src/sophys/common/utils/kafka/monitor.py b/src/sophys/common/utils/kafka/monitor.py index 0b89be1..2f67fbb 100644 --- a/src/sophys/common/utils/kafka/monitor.py +++ b/src/sophys/common/utils/kafka/monitor.py @@ -36,6 +36,38 @@ def _get_start_uid_from_event_data(event_data: dict): return event_data.get("run_start", None) +def seek_start( + consumer: KafkaConsumer, + topic: str, + partition_id: int, + offset: int, + event_name: str, + event_data: dict, +) -> int: + """Attempt to seek into the start document of the current run.""" + topic_partition = TopicPartition(topic, partition_id) + + beginning_offset = consumer.beginning_offsets([topic_partition])[topic_partition] + while event_name != "start" and offset != beginning_offset: + if "seq_num" in event_data: + offset = offset - event_data["seq_num"] - 1 + else: + offset -= 1 + consumer.seek(topic_partition, offset) + + records = consumer.poll(timeout_ms=5_000, max_records=1, update_offsets=False) + assert ( + topic_partition in records + ), "Could not retrieve data from Kafka in seek_start." + + event_name, event_data = records[topic_partition][0].value + + if consumer.position(topic_partition) != offset: + consumer.seek(topic_partition, offset) + + return offset + + class DocumentDictionary(dict): """Auxiliary class for accumulating Document entries.""" @@ -332,20 +364,6 @@ def topic(self): """Get the name of the Kafka topic monitored by this object.""" return "".join(self.subscription()) - def seek_start( - self, topic: str, partition_id: int, offset: int, event_data: dict - ) -> None: - """Attempt to seek into the start document of the current run. May not seek if the current event does not have a sequence number.""" - if "seq_num" not in event_data: - self._logger.debug( - "Sequence numbers are not available! o.O\n {}".format(str(event_data)) - ) - # Hopefully a future event will have it! - return - self.seek( - TopicPartition(topic, partition_id), offset - event_data["seq_num"] - 1 - ) - def _commit_pending_documents(self): """Commit pending documents to the save queue, when possible.""" if self.__save_queue is None: @@ -389,7 +407,7 @@ def _commit_pending_documents(self): def handle_event(self, event): self._logger.debug("Event received.") - seek_start = False + should_seek_start = False try: data = event.value @@ -419,13 +437,13 @@ def handle_event(self, event): try: if len(self.__documents[data]) == 0: # In the middle of a run, try to go back to the beginning - seek_start = True + should_seek_start = True except KeyError: # In the middle of a run, try to go back to the beginning - seek_start = True + should_seek_start = True - if seek_start: - self.seek_start(event.topic, event.partition, event.offset, data[1]) + if should_seek_start: + seek_start(self, event.topic, event.partition, event.offset, *data) return self.__documents[data].append(*data) From 75d941677c6cd6a0d31117c5c95668cacc881b08 Mon Sep 17 00:00:00 2001 From: Sofia Donato Ferreira Date: Wed, 19 Mar 2025 17:11:32 -0300 Subject: [PATCH 04/12] test: add unit tests for 'seek_start' logic in kafka monitor Signed-off-by: Sofia Donato Ferreira --- tests/utils/test_kafka_monitor.py | 52 +++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/tests/utils/test_kafka_monitor.py b/tests/utils/test_kafka_monitor.py index ac8337d..4f652ab 100644 --- a/tests/utils/test_kafka_monitor.py +++ b/tests/utils/test_kafka_monitor.py @@ -15,6 +15,8 @@ from sophys.common.utils.kafka.monitor import ThreadedMonitor +from sophys.common.utils.kafka.monitor import seek_start + from . import _wait @@ -368,3 +370,53 @@ def custom_plan(): # One start doc, one descriptor doc, one event doc, one stop doc assert len(docs) == 4, docs.get_raw_data() + + +def test_seek_start(kafka_producer, kafka_consumer, kafka_topic, run_engine_without_md): + _hw = hw() + det = _hw.det + + partition_number = list(kafka_consumer.partitions_for_topic(kafka_topic))[0] + topic_partition = TopicPartition(kafka_topic, partition_number) + original_offset = kafka_consumer.position(topic_partition) + + uid, *_ = run_engine_without_md(bp.count([det], num=10)) + + kafka_producer.flush(timeout=5.0) + kafka_consumer.poll(timeout_ms=5_000) + + new_offset = kafka_consumer.position(topic_partition) + # start (1) + descriptor (1) + events (10) + stop (1) + assert new_offset - original_offset == 13 + + # From stop to start + new_offset = seek_start( + kafka_consumer, kafka_topic, partition_number, new_offset, "stop", {} + ) + assert kafka_consumer.position(topic_partition) == new_offset + assert new_offset == original_offset + + # From event in the middle to start + kafka_consumer.seek(topic_partition, original_offset + 5) + records = kafka_consumer.poll(timeout_ms=5_000, max_records=1) + new_offset = seek_start( + kafka_consumer, + kafka_topic, + partition_number, + original_offset + 5, + *records[topic_partition][0].value, + ) + assert kafka_consumer.position(topic_partition) == new_offset + assert new_offset == original_offset + + # From start to start (do nothing) + records = kafka_consumer.poll(timeout_ms=5_000, max_records=1) + new_offset = seek_start( + kafka_consumer, + kafka_topic, + partition_number, + original_offset, + *records[topic_partition][0].value, + ) + assert kafka_consumer.position(topic_partition) == new_offset + assert new_offset == original_offset From 97257e4a08d25df371629c4bb38fe2156b674c0d Mon Sep 17 00:00:00 2001 From: Sofia Donato Ferreira Date: Thu, 15 May 2025 16:50:51 -0300 Subject: [PATCH 05/12] test: add special-case for kafka-python-ng in seek test Signed-off-by: Sofia Donato Ferreira --- tests/utils/test_kafka_monitor.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/utils/test_kafka_monitor.py b/tests/utils/test_kafka_monitor.py index 4f652ab..07bbf04 100644 --- a/tests/utils/test_kafka_monitor.py +++ b/tests/utils/test_kafka_monitor.py @@ -411,6 +411,11 @@ def test_seek_start(kafka_producer, kafka_consumer, kafka_topic, run_engine_with # From start to start (do nothing) records = kafka_consumer.poll(timeout_ms=5_000, max_records=1) + if topic_partition not in records: + # NOTE: This test won't work for kafka-python-ng, + # which doesn't return the correct thing here. + return + new_offset = seek_start( kafka_consumer, kafka_topic, From 551935819f90b019c6c08298c706cf5f49d15023 Mon Sep 17 00:00:00 2001 From: Sofia Donato Ferreira Date: Thu, 12 Mar 2026 13:09:00 -0300 Subject: [PATCH 06/12] refactor: downgrade error to warning when save queue is full Signed-off-by: Sofia Donato Ferreira --- src/sophys/common/utils/kafka/monitor.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/sophys/common/utils/kafka/monitor.py b/src/sophys/common/utils/kafka/monitor.py index 2f67fbb..a5a39cb 100644 --- a/src/sophys/common/utils/kafka/monitor.py +++ b/src/sophys/common/utils/kafka/monitor.py @@ -377,11 +377,16 @@ def _commit_pending_documents(self): self.__save_queue.put(doc, block=True, timeout=1.0) self.__saved_document_uids.add(id) except Exception as e: - self._logger.error( - "Unhandled exception while trying to save documents. Will try to continue regardless." - ) - self._logger.error("Exception if you're into that:") - self._logger.exception(e) + if isinstance(e, QueueFullException): + self._logger.warning( + "Save queue is full. Failed to add run '%s'.", id + ) + else: + self._logger.error( + "Unhandled exception while trying to save documents. Will try to continue regardless." + ) + self._logger.error("Exception if you're into that:") + self._logger.exception(e) self.__to_save_documents_save_attempts[id] += 1 From 3bc075fd9a011fa4f9fc383afdba836eb4f891a5 Mon Sep 17 00:00:00 2001 From: Sofia Donato Ferreira Date: Thu, 12 Mar 2026 14:14:56 -0300 Subject: [PATCH 07/12] fix: overhaul seek_start test logic and remove unneeded seek Signed-off-by: Sofia Donato Ferreira --- src/sophys/common/utils/kafka/monitor.py | 3 - tests/utils/test_kafka_monitor.py | 71 ++++++++++++------------ 2 files changed, 36 insertions(+), 38 deletions(-) diff --git a/src/sophys/common/utils/kafka/monitor.py b/src/sophys/common/utils/kafka/monitor.py index a5a39cb..8e431ab 100644 --- a/src/sophys/common/utils/kafka/monitor.py +++ b/src/sophys/common/utils/kafka/monitor.py @@ -62,9 +62,6 @@ def seek_start( event_name, event_data = records[topic_partition][0].value - if consumer.position(topic_partition) != offset: - consumer.seek(topic_partition, offset) - return offset diff --git a/tests/utils/test_kafka_monitor.py b/tests/utils/test_kafka_monitor.py index 07bbf04..a643220 100644 --- a/tests/utils/test_kafka_monitor.py +++ b/tests/utils/test_kafka_monitor.py @@ -376,52 +376,53 @@ def test_seek_start(kafka_producer, kafka_consumer, kafka_topic, run_engine_with _hw = hw() det = _hw.det + uid, *_ = run_engine_without_md(bp.count([det], num=10)) + partition_number = list(kafka_consumer.partitions_for_topic(kafka_topic))[0] topic_partition = TopicPartition(kafka_topic, partition_number) original_offset = kafka_consumer.position(topic_partition) - uid, *_ = run_engine_without_md(bp.count([det], num=10)) + def seek_and_assert_positions(offset: int, seeked_event_name: str): + kafka_consumer.seek(topic_partition, offset) + + records = kafka_consumer.poll( + timeout_ms=1_000, max_records=1, update_offsets=False + ) + event_name, event_data = records[topic_partition][0].value + + assert event_name == seeked_event_name + + seek_start( + kafka_consumer, + kafka_topic, + partition_number, + offset, + event_name, + event_data, + ) + + records = kafka_consumer.poll( + timeout_ms=1_000, max_records=1, update_offsets=False + ) + event_name, _ = records[topic_partition][0].value + + assert event_name == "start" - kafka_producer.flush(timeout=5.0) - kafka_consumer.poll(timeout_ms=5_000) + kafka_producer.flush(timeout=1.0) + kafka_consumer.poll(timeout_ms=1_000) new_offset = kafka_consumer.position(topic_partition) # start (1) + descriptor (1) + events (10) + stop (1) assert new_offset - original_offset == 13 # From stop to start - new_offset = seek_start( - kafka_consumer, kafka_topic, partition_number, new_offset, "stop", {} - ) - assert kafka_consumer.position(topic_partition) == new_offset - assert new_offset == original_offset + seek_and_assert_positions(new_offset - 1, "stop") + + # From start to start (do nothing) + seek_and_assert_positions(original_offset, "start") # From event in the middle to start - kafka_consumer.seek(topic_partition, original_offset + 5) - records = kafka_consumer.poll(timeout_ms=5_000, max_records=1) - new_offset = seek_start( - kafka_consumer, - kafka_topic, - partition_number, - original_offset + 5, - *records[topic_partition][0].value, - ) - assert kafka_consumer.position(topic_partition) == new_offset - assert new_offset == original_offset + seek_and_assert_positions(original_offset + 5, "event") - # From start to start (do nothing) - records = kafka_consumer.poll(timeout_ms=5_000, max_records=1) - if topic_partition not in records: - # NOTE: This test won't work for kafka-python-ng, - # which doesn't return the correct thing here. - return - - new_offset = seek_start( - kafka_consumer, - kafka_topic, - partition_number, - original_offset, - *records[topic_partition][0].value, - ) - assert kafka_consumer.position(topic_partition) == new_offset - assert new_offset == original_offset + # From descriptor to start + seek_and_assert_positions(original_offset + 1, "descriptor") From f2bf456952f29ec3d2a787f814347045044bbdef Mon Sep 17 00:00:00 2001 From: Sofia Donato Ferreira Date: Thu, 12 Mar 2026 15:39:52 -0300 Subject: [PATCH 08/12] refactor: streamline kafka event handling and improve seek_start testing Signed-off-by: Sofia Donato Ferreira --- src/sophys/common/utils/kafka/monitor.py | 106 ++++++++++++----------- tests/utils/test_kafka_monitor.py | 56 +++++++++++- 2 files changed, 109 insertions(+), 53 deletions(-) diff --git a/src/sophys/common/utils/kafka/monitor.py b/src/sophys/common/utils/kafka/monitor.py index 8e431ab..f5ef552 100644 --- a/src/sophys/common/utils/kafka/monitor.py +++ b/src/sophys/common/utils/kafka/monitor.py @@ -361,6 +361,22 @@ def topic(self): """Get the name of the Kafka topic monitored by this object.""" return "".join(self.subscription()) + def _seek_start_if_needed(self, event) -> bool: + should_seek_start = False + + try: + if event.value[0] != "start" and len(self.__documents[event.value]) == 0: + # In the middle of a run, try to go back to the beginning + should_seek_start = True + except KeyError: + # In the middle of a run, try to go back to the beginning + should_seek_start = True + + if should_seek_start: + seek_start(self, event.topic, event.partition, event.offset, *event.value) + + return should_seek_start + def _commit_pending_documents(self): """Commit pending documents to the save queue, when possible.""" if self.__save_queue is None: @@ -406,64 +422,50 @@ def _commit_pending_documents(self): if id in self.__to_save_documents_save_attempts: del self.__to_save_documents_save_attempts[id] - def handle_event(self, event): - self._logger.debug("Event received.") - - should_seek_start = False + def _handle_kafka_event(self, event): + data = event.value + if len(data) != 2: + self._logger.warning( + "Event data does not have two elements.\n {}".format(str(data)) + ) + return try: - data = event.value - - if len(data) != 2: - self._logger.warning( - "Event data does not have two elements.\n {}".format(str(data)) - ) + if self._seek_start_if_needed(event): return - if data[0] == "start": - self.__documents.append(data) + match data: + case ("start", _): + self.__documents.append(data) - new_run_uid = self.__documents[data].identifier - self._logger.info( - "Run '{}': Received a 'start' document.".format(new_run_uid) - ) - - self.__incomplete_documents.append(new_run_uid) - - self.__documents[data].subscribe( - partial(self._run_subscriptions, new_run_uid) - ) - - return - - try: - if len(self.__documents[data]) == 0: - # In the middle of a run, try to go back to the beginning - should_seek_start = True - except KeyError: - # In the middle of a run, try to go back to the beginning - should_seek_start = True - - if should_seek_start: - seek_start(self, event.topic, event.partition, event.offset, *data) - return + new_run_uid = self.__documents[data].identifier + self._logger.info( + "Run '{}': Received a 'start' document.".format(new_run_uid) + ) - self.__documents[data].append(*data) + self.__incomplete_documents.append(new_run_uid) - if data[0] == "stop": - self._logger.info( - "Run '{}': Received a 'stop' document.".format( - self.__documents[data].identifier + self.__documents[data].subscribe( + partial(self._run_subscriptions, new_run_uid) ) - ) + case ("stop", _): + self._logger.info( + "Run '{}': Received a 'stop' document.".format( + self.__documents[data].identifier + ) + ) + + self.__documents[data].append(*data) - self.__documents[data].clear_subscriptions() - self.__to_save_documents.append(self.__documents[data].identifier) + self.__documents[data].clear_subscriptions() + self.__to_save_documents.append(self.__documents[data].identifier) - # TODO: Validate number of saved entries via the stop document's num_events - # TODO: Validate successful run via the stop document's exit_status + # TODO: Validate number of saved entries via the stop document's num_events + # TODO: Validate successful run via the stop document's exit_status - self._commit_pending_documents() + self._commit_pending_documents() + case (_, _): + self.__documents[data].append(*data) except Exception as e: self._logger.error("Unhandled exception. Will try to continue regardless.") @@ -475,15 +477,17 @@ def handle_event(self, event): def run(self): """Start monitoring the Kafka topic.""" - partition_number = list(self.partitions_for_topic(self.topic()))[0] - self._update_fetch_positions([TopicPartition(self.topic(), partition_number)]) + # NOTE: Configure the current offset before setting the 'running' flag. + # The timeout time here doesn't matter too much, as we don't care + # whether we received new data or not. + self.poll(timeout_ms=100, max_records=1, update_offsets=False) self.running.set() while not self._closed: try: for event in self: - self.handle_event(event) - except StopIteration: + self._handle_kafka_event(event) + except (StopIteration, AssertionError): pass self.running.clear() diff --git a/tests/utils/test_kafka_monitor.py b/tests/utils/test_kafka_monitor.py index a643220..aec85c4 100644 --- a/tests/utils/test_kafka_monitor.py +++ b/tests/utils/test_kafka_monitor.py @@ -132,8 +132,7 @@ def run_engine_without_md(kafka_producer, kafka_topic): return RE -@pytest.fixture(scope="function") -def good_monitor( +def _create_good_monitor( save_queue, incomplete_documents, kafka_topic, kafka_bootstrap_ip ) -> ThreadedMonitor: mon = ThreadedMonitor( @@ -150,6 +149,15 @@ def good_monitor( return mon +@pytest.fixture(scope="function") +def good_monitor( + save_queue, incomplete_documents, kafka_topic, kafka_bootstrap_ip +) -> ThreadedMonitor: + return _create_good_monitor( + save_queue, incomplete_documents, kafka_topic, kafka_bootstrap_ip + ) + + # # Tests # @@ -426,3 +434,47 @@ def seek_and_assert_positions(offset: int, seeked_event_name: str): # From descriptor to start seek_and_assert_positions(original_offset + 1, "descriptor") + + +def test_seek_start_in_monitor( + run_engine_without_md, + incomplete_documents, + save_queue: queue.Queue, + kafka_topic, + kafka_bootstrap_ip, +): + det = hw().det + + # NOTE: Add another run before this one just to check it doesn't skip into the previous run. + run_engine_without_md(bp.count([det], num=1)) + + def custom_plan(): + yield from bps.open_run({}) + yield from bps.declare_stream(det, name="primary") + for _ in range(5): + yield from bps.create() + yield from bps.read(det) + yield from bps.save() + + monitor = _create_good_monitor( + save_queue, incomplete_documents, kafka_topic, kafka_bootstrap_ip + ) + assert monitor.is_alive() + + for _ in range(5): + yield from bps.create() + yield from bps.read(det) + yield from bps.save() + yield from bps.close_run("success") + + # Only populated if 'monitor' is working properly, and rewinded to the start document. + assert save_queue.get(True, timeout=2.0) is not None + + monitor.close() + _wait( + lambda: not monitor.running.is_set(), + timeout=2.0, + timeout_msg="Monitor took too long to close.", + ) + + run_engine_without_md(custom_plan()) From 1a0932ea8126b92cce32ce26093221dee5870b10 Mon Sep 17 00:00:00 2001 From: Sofia Donato Ferreira Date: Thu, 12 Mar 2026 17:14:07 -0300 Subject: [PATCH 09/12] feat: add 'seek_back_in_time' function for kafka consumers Signed-off-by: Sofia Donato Ferreira --- src/sophys/common/utils/kafka/monitor.py | 26 ++++++++ tests/utils/test_kafka_monitor.py | 75 ++++++++++++++++++------ 2 files changed, 84 insertions(+), 17 deletions(-) diff --git a/src/sophys/common/utils/kafka/monitor.py b/src/sophys/common/utils/kafka/monitor.py index f5ef552..c66c389 100644 --- a/src/sophys/common/utils/kafka/monitor.py +++ b/src/sophys/common/utils/kafka/monitor.py @@ -1,6 +1,7 @@ import logging import json from collections import defaultdict +from datetime import datetime, timedelta, timezone, tzinfo from functools import wraps, partial from typing import Optional @@ -65,6 +66,31 @@ def seek_start( return offset +def seek_back_in_time( + consumer: KafkaConsumer, + rewind_time: timedelta, + server_timezone: tzinfo = timezone.utc, +): + """Rewind the consumer by 'rewind_time'.""" + now = datetime.now(server_timezone) + + all_partitions = [ + TopicPartition(topic_name, p) + for topic_name in consumer.subscription() + for p in consumer.partitions_for_topic(topic_name) + ] + + # NOTE: offsets_for_times expects timestamps in ms, so we multiply by 1000. + rewind_timestamp = int((now - rewind_time).timestamp() * 1000) + timestamp_offsets = consumer.offsets_for_times( + {p: rewind_timestamp for p in all_partitions} + ) + + for partition, offset_ts in timestamp_offsets.items(): + if offset_ts is not None: + consumer.seek(partition, offset_ts.offset) + + class DocumentDictionary(dict): """Auxiliary class for accumulating Document entries.""" diff --git a/tests/utils/test_kafka_monitor.py b/tests/utils/test_kafka_monitor.py index aec85c4..dd12e34 100644 --- a/tests/utils/test_kafka_monitor.py +++ b/tests/utils/test_kafka_monitor.py @@ -1,7 +1,7 @@ import pytest import queue -import time +from datetime import datetime, timezone import msgpack import msgpack_numpy as _m @@ -10,12 +10,14 @@ from kafka.consumer import KafkaConsumer from kafka.structs import TopicPartition +import numpy as np + from ophyd.sim import hw from bluesky import RunEngine, plans as bp, plan_stubs as bps, preprocessors as bpp from sophys.common.utils.kafka.monitor import ThreadedMonitor -from sophys.common.utils.kafka.monitor import seek_start +from sophys.common.utils.kafka.monitor import seek_start, seek_back_in_time from . import _wait @@ -95,17 +97,13 @@ def kafka_producer(kafka_bootstrap_ip): @pytest.fixture(scope="function") def kafka_consumer(kafka_bootstrap_ip, kafka_topic): consumer = KafkaConsumer( - bootstrap_servers=[kafka_bootstrap_ip], value_deserializer=msgpack.unpackb + kafka_topic, + bootstrap_servers=[kafka_bootstrap_ip], + value_deserializer=msgpack.unpackb, ) # Connect the consumer properly to the topic - partition = TopicPartition(kafka_topic, 0) - consumer.assign([partition]) - print("Starting offset:") - # Fun fact: this is actually required for the tests to work properly, - # because otherwise it doesn't update the current offset before the - # producer starts throwing events at the topic. :))))) - print(consumer.position(partition)) + consumer.poll(timeout_ms=100, max_records=1, update_offsets=False) return consumer @@ -380,16 +378,15 @@ def custom_plan(): assert len(docs) == 4, docs.get_raw_data() -def test_seek_start(kafka_producer, kafka_consumer, kafka_topic, run_engine_without_md): - _hw = hw() - det = _hw.det - - uid, *_ = run_engine_without_md(bp.count([det], num=10)) - +def test_seek_start( + kafka_producer, kafka_consumer: KafkaConsumer, kafka_topic, run_engine_without_md +): partition_number = list(kafka_consumer.partitions_for_topic(kafka_topic))[0] topic_partition = TopicPartition(kafka_topic, partition_number) original_offset = kafka_consumer.position(topic_partition) + uid, *_ = run_engine_without_md(bp.count([hw().det], num=10)) + def seek_and_assert_positions(offset: int, seeked_event_name: str): kafka_consumer.seek(topic_partition, offset) @@ -417,7 +414,8 @@ def seek_and_assert_positions(offset: int, seeked_event_name: str): assert event_name == "start" kafka_producer.flush(timeout=1.0) - kafka_consumer.poll(timeout_ms=1_000) + while kafka_consumer.poll(timeout_ms=100) != {}: + pass new_offset = kafka_consumer.position(topic_partition) # start (1) + descriptor (1) + events (10) + stop (1) @@ -478,3 +476,46 @@ def custom_plan(): ) run_engine_without_md(custom_plan()) + + +def test_seek_back_in_time( + kafka_producer, kafka_consumer: KafkaConsumer, kafka_topic, run_engine_without_md +): + partition_number = list(kafka_consumer.partitions_for_topic(kafka_topic))[0] + topic_partition = TopicPartition(kafka_topic, partition_number) + + kafka_consumer.seek_to_beginning() + oldest_offset = kafka_consumer.position(topic_partition) + + kafka_consumer.seek_to_end() + newest_offset = kafka_consumer.position(topic_partition) + + if newest_offset - oldest_offset < 5: + # Add some new time-spaced data. + for _ in range(5): + run_engine_without_md(bp.count([hw().det], num=2, delay=1)) + while kafka_consumer.poll(timeout_ms=100) != {}: + pass + newest_offset = kafka_consumer.position(topic_partition) + + offsets = [round(x) for x in np.linspace(oldest_offset, newest_offset - 1, num=5)] + timestamps = list() + for offset in offsets: + kafka_consumer.seek(topic_partition, offset) + + record = kafka_consumer.poll( + timeout_ms=1_000, max_records=1, update_offsets=False + )[topic_partition][0] + timestamps.append(record.timestamp // 1000) + + for expected_timestamp in timestamps: + time_delta = datetime.now(timezone.utc) - datetime.fromtimestamp( + expected_timestamp, tz=timezone.utc + ) + seek_back_in_time(kafka_consumer, time_delta) + + record = kafka_consumer.poll( + timeout_ms=1_000, max_records=1, update_offsets=False + )[topic_partition][0] + + assert np.isclose(record.timestamp // 1000, expected_timestamp, atol=1) From 27794ceb15ecf4e1119f6c8cbed51f0a11a30662 Mon Sep 17 00:00:00 2001 From: Sofia Donato Ferreira Date: Thu, 12 Mar 2026 17:14:36 -0300 Subject: [PATCH 10/12] feat: add 'hour_offset' option for kafka monitors Signed-off-by: Sofia Donato Ferreira --- src/sophys/common/utils/kafka/monitor.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/sophys/common/utils/kafka/monitor.py b/src/sophys/common/utils/kafka/monitor.py index c66c389..f3e71f5 100644 --- a/src/sophys/common/utils/kafka/monitor.py +++ b/src/sophys/common/utils/kafka/monitor.py @@ -321,6 +321,7 @@ def __init__( incomplete_documents: list, topic_name: str, logger_name: str, + hour_offset: Optional[float] = None, **configs, ): """ @@ -337,6 +338,8 @@ def __init__( The Kafka topic to monitor. logger_name : str, optional Name of the logger to use for info / debug during the monitor processing. + hour_offset : float, optional + Time in hours to look back in Kafka right after the start of monitoring. **configs : dict or keyword arguments Extra arguments to pass to the KafkaConsumer's constructor. """ @@ -345,6 +348,8 @@ def __init__( self.name = repr(self) self.running = Event() + self.__hour_offset = hour_offset + self.__documents = MultipleDocumentDictionary() self.__save_queue = save_queue @@ -508,6 +513,9 @@ def run(self): # whether we received new data or not. self.poll(timeout_ms=100, max_records=1, update_offsets=False) + if self.__hour_offset is not None: + seek_back_in_time(self, timedelta(hours=self.__hour_offset)) + self.running.set() while not self._closed: try: From 2f859e70178fc698ee1a516c44c7342a962cd172 Mon Sep 17 00:00:00 2001 From: Sofia Donato Ferreira Date: Thu, 12 Mar 2026 17:40:51 -0300 Subject: [PATCH 11/12] refactor: move general kafka consumer utilities into separate module Signed-off-by: Sofia Donato Ferreira --- src/sophys/common/utils/kafka/consumer.py | 57 ++++++++ src/sophys/common/utils/kafka/monitor.py | 61 +-------- tests/conftest.py | 47 +++++++ tests/utils/test_kafka_consumer.py | 110 +++++++++++++++ tests/utils/test_kafka_monitor.py | 155 ---------------------- 5 files changed, 218 insertions(+), 212 deletions(-) create mode 100644 src/sophys/common/utils/kafka/consumer.py create mode 100644 tests/utils/test_kafka_consumer.py diff --git a/src/sophys/common/utils/kafka/consumer.py b/src/sophys/common/utils/kafka/consumer.py new file mode 100644 index 0000000..72a5004 --- /dev/null +++ b/src/sophys/common/utils/kafka/consumer.py @@ -0,0 +1,57 @@ +from datetime import datetime, timedelta, timezone, tzinfo + +from kafka import KafkaConsumer +from kafka.consumer.fetcher import ConsumerRecord +from kafka.structs import TopicPartition + + +def seek_start_document(consumer: KafkaConsumer, record: ConsumerRecord): + """ + Attempt to seek into the start document of the current run, based on data from the last received document. + """ + topic_partition = TopicPartition(record.topic, record.partition) + + offset = record.offset + event_name, event_data = record.value + + beginning_offset = consumer.beginning_offsets([topic_partition])[topic_partition] + while event_name != "start" and offset != beginning_offset: + if "seq_num" in event_data: + offset = offset - event_data["seq_num"] - 1 + else: + offset -= 1 + consumer.seek(topic_partition, offset) + + records = consumer.poll(timeout_ms=5_000, max_records=1, update_offsets=False) + assert ( + topic_partition in records + ), "Could not retrieve data from Kafka in seek_start." + + event_name, event_data = records[topic_partition][0].value + + +def seek_back_in_time( + consumer: KafkaConsumer, + rewind_time: timedelta, + server_timezone: tzinfo = timezone.utc, +): + """ + Rewind the consumer by 'rewind_time', up to the beginning offset. + """ + now = datetime.now(server_timezone) + + all_partitions = [ + TopicPartition(topic_name, p) + for topic_name in consumer.subscription() + for p in consumer.partitions_for_topic(topic_name) + ] + + # NOTE: offsets_for_times expects timestamps in ms, so we multiply by 1000. + rewind_timestamp = int((now - rewind_time).timestamp() * 1000) + timestamp_offsets = consumer.offsets_for_times( + {p: rewind_timestamp for p in all_partitions} + ) + + for partition, offset_ts in timestamp_offsets.items(): + if offset_ts is not None: + consumer.seek(partition, offset_ts.offset) diff --git a/src/sophys/common/utils/kafka/monitor.py b/src/sophys/common/utils/kafka/monitor.py index f3e71f5..acb0d27 100644 --- a/src/sophys/common/utils/kafka/monitor.py +++ b/src/sophys/common/utils/kafka/monitor.py @@ -1,7 +1,7 @@ import logging import json from collections import defaultdict -from datetime import datetime, timedelta, timezone, tzinfo +from datetime import timedelta from functools import wraps, partial from typing import Optional @@ -11,10 +11,11 @@ import msgpack_numpy as _m from kafka import KafkaConsumer -from kafka.structs import TopicPartition from event_model import EventPage, unpack_event_page +from .consumer import seek_start_document, seek_back_in_time + def _get_uid_from_event_data(event_data: dict): return event_data.get("uid", None) @@ -37,60 +38,6 @@ def _get_start_uid_from_event_data(event_data: dict): return event_data.get("run_start", None) -def seek_start( - consumer: KafkaConsumer, - topic: str, - partition_id: int, - offset: int, - event_name: str, - event_data: dict, -) -> int: - """Attempt to seek into the start document of the current run.""" - topic_partition = TopicPartition(topic, partition_id) - - beginning_offset = consumer.beginning_offsets([topic_partition])[topic_partition] - while event_name != "start" and offset != beginning_offset: - if "seq_num" in event_data: - offset = offset - event_data["seq_num"] - 1 - else: - offset -= 1 - consumer.seek(topic_partition, offset) - - records = consumer.poll(timeout_ms=5_000, max_records=1, update_offsets=False) - assert ( - topic_partition in records - ), "Could not retrieve data from Kafka in seek_start." - - event_name, event_data = records[topic_partition][0].value - - return offset - - -def seek_back_in_time( - consumer: KafkaConsumer, - rewind_time: timedelta, - server_timezone: tzinfo = timezone.utc, -): - """Rewind the consumer by 'rewind_time'.""" - now = datetime.now(server_timezone) - - all_partitions = [ - TopicPartition(topic_name, p) - for topic_name in consumer.subscription() - for p in consumer.partitions_for_topic(topic_name) - ] - - # NOTE: offsets_for_times expects timestamps in ms, so we multiply by 1000. - rewind_timestamp = int((now - rewind_time).timestamp() * 1000) - timestamp_offsets = consumer.offsets_for_times( - {p: rewind_timestamp for p in all_partitions} - ) - - for partition, offset_ts in timestamp_offsets.items(): - if offset_ts is not None: - consumer.seek(partition, offset_ts.offset) - - class DocumentDictionary(dict): """Auxiliary class for accumulating Document entries.""" @@ -404,7 +351,7 @@ def _seek_start_if_needed(self, event) -> bool: should_seek_start = True if should_seek_start: - seek_start(self, event.topic, event.partition, event.offset, *event.value) + seek_start_document(self, event) return should_seek_start diff --git a/tests/conftest.py b/tests/conftest.py index 4e4a8a2..77f7ed3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,11 @@ import pytest +import msgpack_numpy as msgpack + +from kafka import KafkaConsumer, KafkaProducer + +from bluesky import RunEngine + from .soft_ioc import start_soft_ioc @@ -8,3 +14,44 @@ def soft_ioc(): soft_ioc_prefix, stop_soft_ioc = start_soft_ioc() yield soft_ioc_prefix stop_soft_ioc() + + +@pytest.fixture(scope="session") +def kafka_bootstrap_ip(): + return "localhost:9092" + + +@pytest.fixture(scope="session") +def kafka_topic(): + return "test_bluesky_raw_docs" + + +@pytest.fixture(scope="function") +def kafka_producer(kafka_bootstrap_ip): + producer = KafkaProducer( + bootstrap_servers=[kafka_bootstrap_ip], value_serializer=msgpack.dumps + ) + yield producer + producer.flush() + producer.close() + + +@pytest.fixture(scope="function") +def kafka_consumer(kafka_bootstrap_ip, kafka_topic): + consumer = KafkaConsumer( + kafka_topic, + bootstrap_servers=[kafka_bootstrap_ip], + value_deserializer=msgpack.unpackb, + ) + + # Connect the consumer properly to the topic + consumer.poll(timeout_ms=100, max_records=1, update_offsets=False) + + return consumer + + +@pytest.fixture(scope="function") +def run_engine_without_md(kafka_producer, kafka_topic): + RE = RunEngine() + RE.subscribe(lambda name, doc: kafka_producer.send(kafka_topic, (name, doc))) + return RE diff --git a/tests/utils/test_kafka_consumer.py b/tests/utils/test_kafka_consumer.py new file mode 100644 index 0000000..3685d55 --- /dev/null +++ b/tests/utils/test_kafka_consumer.py @@ -0,0 +1,110 @@ +from datetime import datetime, timezone + +from kafka.producer import KafkaProducer +from kafka.consumer import KafkaConsumer +from kafka.structs import TopicPartition + +import numpy as np + +from ophyd.sim import hw +from bluesky import plans as bp + +from sophys.common.utils.kafka.monitor import seek_start_document, seek_back_in_time + + +def test_seek_start( + kafka_producer: KafkaProducer, + kafka_consumer: KafkaConsumer, + kafka_topic, + run_engine_without_md, +): + partition_number = list(kafka_consumer.partitions_for_topic(kafka_topic))[0] + topic_partition = TopicPartition(kafka_topic, partition_number) + original_offset = kafka_consumer.position(topic_partition) + + uid, *_ = run_engine_without_md(bp.count([hw().det], num=10)) + + def seek_and_assert_positions(offset: int, seeked_event_name: str): + kafka_consumer.seek(topic_partition, offset) + + record = kafka_consumer.poll( + timeout_ms=1_000, max_records=1, update_offsets=False + )[topic_partition][0] + + event_name, _ = record.value + assert event_name == seeked_event_name + + seek_start_document(kafka_consumer, record) + + record = kafka_consumer.poll( + timeout_ms=1_000, max_records=1, update_offsets=False + )[topic_partition][0] + + event_name, _ = record.value + assert event_name == "start" + + kafka_producer.flush(timeout=1.0) + while kafka_consumer.poll(timeout_ms=100) != {}: + pass + + new_offset = kafka_consumer.position(topic_partition) + # start (1) + descriptor (1) + events (10) + stop (1) + assert new_offset - original_offset == 13 + + # From stop to start + seek_and_assert_positions(new_offset - 1, "stop") + + # From start to start (do nothing) + seek_and_assert_positions(original_offset, "start") + + # From event in the middle to start + seek_and_assert_positions(original_offset + 5, "event") + + # From descriptor to start + seek_and_assert_positions(original_offset + 1, "descriptor") + + +def test_seek_back_in_time( + kafka_producer: KafkaProducer, + kafka_consumer: KafkaConsumer, + kafka_topic, + run_engine_without_md, +): + partition_number = list(kafka_consumer.partitions_for_topic(kafka_topic))[0] + topic_partition = TopicPartition(kafka_topic, partition_number) + + kafka_consumer.seek_to_beginning() + oldest_offset = kafka_consumer.position(topic_partition) + + kafka_consumer.seek_to_end() + newest_offset = kafka_consumer.position(topic_partition) + + if newest_offset - oldest_offset < 5: + # Add some new time-spaced data. + for _ in range(5): + run_engine_without_md(bp.count([hw().det], num=2, delay=1)) + while kafka_consumer.poll(timeout_ms=100) != {}: + pass + newest_offset = kafka_consumer.position(topic_partition) + + offsets = [round(x) for x in np.linspace(oldest_offset, newest_offset - 1, num=5)] + timestamps = list() + for offset in offsets: + kafka_consumer.seek(topic_partition, offset) + + record = kafka_consumer.poll( + timeout_ms=1_000, max_records=1, update_offsets=False + )[topic_partition][0] + timestamps.append(record.timestamp // 1000) + + for expected_timestamp in timestamps: + time_delta = datetime.now(timezone.utc) - datetime.fromtimestamp( + expected_timestamp, tz=timezone.utc + ) + seek_back_in_time(kafka_consumer, time_delta) + + record = kafka_consumer.poll( + timeout_ms=1_000, max_records=1, update_offsets=False + )[topic_partition][0] + + assert np.isclose(record.timestamp // 1000, expected_timestamp, atol=1) diff --git a/tests/utils/test_kafka_monitor.py b/tests/utils/test_kafka_monitor.py index dd12e34..0d35c84 100644 --- a/tests/utils/test_kafka_monitor.py +++ b/tests/utils/test_kafka_monitor.py @@ -1,40 +1,15 @@ import pytest import queue -from datetime import datetime, timezone - -import msgpack -import msgpack_numpy as _m - -from kafka.producer import KafkaProducer -from kafka.consumer import KafkaConsumer -from kafka.structs import TopicPartition - -import numpy as np from ophyd.sim import hw from bluesky import RunEngine, plans as bp, plan_stubs as bps, preprocessors as bpp from sophys.common.utils.kafka.monitor import ThreadedMonitor -from sophys.common.utils.kafka.monitor import seek_start, seek_back_in_time - from . import _wait -_m.patch() - - -@pytest.fixture(scope="session") -def kafka_bootstrap_ip(): - return "localhost:9092" - - -@pytest.fixture(scope="session") -def kafka_topic(): - return "test_bluesky_raw_docs" - - @pytest.fixture(scope="function") def save_queue_size() -> int: return 4 @@ -84,30 +59,6 @@ def incomplete_documents(_incomplete_documents): return _incomplete_documents -@pytest.fixture(scope="function") -def kafka_producer(kafka_bootstrap_ip): - producer = KafkaProducer( - bootstrap_servers=[kafka_bootstrap_ip], value_serializer=msgpack.dumps - ) - yield producer - producer.flush() - producer.close() - - -@pytest.fixture(scope="function") -def kafka_consumer(kafka_bootstrap_ip, kafka_topic): - consumer = KafkaConsumer( - kafka_topic, - bootstrap_servers=[kafka_bootstrap_ip], - value_deserializer=msgpack.unpackb, - ) - - # Connect the consumer properly to the topic - consumer.poll(timeout_ms=100, max_records=1, update_offsets=False) - - return consumer - - @pytest.fixture(scope="function") def base_md(tmp_path_factory): return { @@ -123,13 +74,6 @@ def run_engine_with_md(base_md, kafka_producer, kafka_topic): return RE -@pytest.fixture(scope="function") -def run_engine_without_md(kafka_producer, kafka_topic): - RE = RunEngine() - RE.subscribe(lambda name, doc: kafka_producer.send(kafka_topic, (name, doc))) - return RE - - def _create_good_monitor( save_queue, incomplete_documents, kafka_topic, kafka_bootstrap_ip ) -> ThreadedMonitor: @@ -378,62 +322,6 @@ def custom_plan(): assert len(docs) == 4, docs.get_raw_data() -def test_seek_start( - kafka_producer, kafka_consumer: KafkaConsumer, kafka_topic, run_engine_without_md -): - partition_number = list(kafka_consumer.partitions_for_topic(kafka_topic))[0] - topic_partition = TopicPartition(kafka_topic, partition_number) - original_offset = kafka_consumer.position(topic_partition) - - uid, *_ = run_engine_without_md(bp.count([hw().det], num=10)) - - def seek_and_assert_positions(offset: int, seeked_event_name: str): - kafka_consumer.seek(topic_partition, offset) - - records = kafka_consumer.poll( - timeout_ms=1_000, max_records=1, update_offsets=False - ) - event_name, event_data = records[topic_partition][0].value - - assert event_name == seeked_event_name - - seek_start( - kafka_consumer, - kafka_topic, - partition_number, - offset, - event_name, - event_data, - ) - - records = kafka_consumer.poll( - timeout_ms=1_000, max_records=1, update_offsets=False - ) - event_name, _ = records[topic_partition][0].value - - assert event_name == "start" - - kafka_producer.flush(timeout=1.0) - while kafka_consumer.poll(timeout_ms=100) != {}: - pass - - new_offset = kafka_consumer.position(topic_partition) - # start (1) + descriptor (1) + events (10) + stop (1) - assert new_offset - original_offset == 13 - - # From stop to start - seek_and_assert_positions(new_offset - 1, "stop") - - # From start to start (do nothing) - seek_and_assert_positions(original_offset, "start") - - # From event in the middle to start - seek_and_assert_positions(original_offset + 5, "event") - - # From descriptor to start - seek_and_assert_positions(original_offset + 1, "descriptor") - - def test_seek_start_in_monitor( run_engine_without_md, incomplete_documents, @@ -476,46 +364,3 @@ def custom_plan(): ) run_engine_without_md(custom_plan()) - - -def test_seek_back_in_time( - kafka_producer, kafka_consumer: KafkaConsumer, kafka_topic, run_engine_without_md -): - partition_number = list(kafka_consumer.partitions_for_topic(kafka_topic))[0] - topic_partition = TopicPartition(kafka_topic, partition_number) - - kafka_consumer.seek_to_beginning() - oldest_offset = kafka_consumer.position(topic_partition) - - kafka_consumer.seek_to_end() - newest_offset = kafka_consumer.position(topic_partition) - - if newest_offset - oldest_offset < 5: - # Add some new time-spaced data. - for _ in range(5): - run_engine_without_md(bp.count([hw().det], num=2, delay=1)) - while kafka_consumer.poll(timeout_ms=100) != {}: - pass - newest_offset = kafka_consumer.position(topic_partition) - - offsets = [round(x) for x in np.linspace(oldest_offset, newest_offset - 1, num=5)] - timestamps = list() - for offset in offsets: - kafka_consumer.seek(topic_partition, offset) - - record = kafka_consumer.poll( - timeout_ms=1_000, max_records=1, update_offsets=False - )[topic_partition][0] - timestamps.append(record.timestamp // 1000) - - for expected_timestamp in timestamps: - time_delta = datetime.now(timezone.utc) - datetime.fromtimestamp( - expected_timestamp, tz=timezone.utc - ) - seek_back_in_time(kafka_consumer, time_delta) - - record = kafka_consumer.poll( - timeout_ms=1_000, max_records=1, update_offsets=False - )[topic_partition][0] - - assert np.isclose(record.timestamp // 1000, expected_timestamp, atol=1) From 422eca2cddacc963c08fbb1df8c43603476d02b9 Mon Sep 17 00:00:00 2001 From: Sofia Donato Ferreira Date: Wed, 6 May 2026 09:32:55 -0300 Subject: [PATCH 12/12] fix: add retries and runtime handling of failures in seek start document logic Signed-off-by: Sofia Donato Ferreira --- src/sophys/common/utils/kafka/consumer.py | 17 +++++++--- src/sophys/common/utils/kafka/monitor.py | 41 ++++++++++++++++++++--- 2 files changed, 50 insertions(+), 8 deletions(-) diff --git a/src/sophys/common/utils/kafka/consumer.py b/src/sophys/common/utils/kafka/consumer.py index 72a5004..37bb864 100644 --- a/src/sophys/common/utils/kafka/consumer.py +++ b/src/sophys/common/utils/kafka/consumer.py @@ -22,10 +22,19 @@ def seek_start_document(consumer: KafkaConsumer, record: ConsumerRecord): offset -= 1 consumer.seek(topic_partition, offset) - records = consumer.poll(timeout_ms=5_000, max_records=1, update_offsets=False) - assert ( - topic_partition in records - ), "Could not retrieve data from Kafka in seek_start." + for attempt_number in range(1, 4): + timeout = 1_000 * attempt_number + records = consumer.poll( + timeout_ms=timeout, max_records=1, update_offsets=False + ) + + if topic_partition in records: + break + + if topic_partition not in records: + raise RuntimeError( + f"Failed to retrieve records for the current partition ('{record.partition}' for '{record.topic}')" + ) event_name, event_data = records[topic_partition][0].value diff --git a/src/sophys/common/utils/kafka/monitor.py b/src/sophys/common/utils/kafka/monitor.py index acb0d27..7441d79 100644 --- a/src/sophys/common/utils/kafka/monitor.py +++ b/src/sophys/common/utils/kafka/monitor.py @@ -1,3 +1,4 @@ +import enum import logging import json from collections import defaultdict @@ -261,6 +262,12 @@ def __getitem__(self, data: tuple): return super().__getitem__(self.find_with_resource(resource_uid)) +class SeekStartResult(enum.Enum): + NO_SEEK = enum.auto() + SEEK_SUCCEEDED = enum.auto() + SEEK_FAILED = enum.auto() + + class MonitorBase(KafkaConsumer): def __init__( self, @@ -295,6 +302,8 @@ def __init__( self.name = repr(self) self.running = Event() + self._runs_to_ignore = set() + self.__hour_offset = hour_offset self.__documents = MultipleDocumentDictionary() @@ -339,7 +348,7 @@ def topic(self): """Get the name of the Kafka topic monitored by this object.""" return "".join(self.subscription()) - def _seek_start_if_needed(self, event) -> bool: + def _seek_start_if_needed(self, event) -> SeekStartResult: should_seek_start = False try: @@ -350,10 +359,19 @@ def _seek_start_if_needed(self, event) -> bool: # In the middle of a run, try to go back to the beginning should_seek_start = True + seek_start_succeeded = True if should_seek_start: - seek_start_document(self, event) + try: + seek_start_document(self, event) + except RuntimeError: + self._logger.error("Run '{}': {}", event) + seek_start_succeeded = False - return should_seek_start + if not should_seek_start: + return SeekStartResult.NO_SEEK + if not seek_start_succeeded: + return SeekStartResult.SEEK_FAILED + return SeekStartResult.SEEK_SUCCEEDED def _commit_pending_documents(self): """Commit pending documents to the save queue, when possible.""" @@ -409,7 +427,19 @@ def _handle_kafka_event(self, event): return try: - if self._seek_start_if_needed(event): + match self._seek_start_if_needed(event): + case SeekStartResult.NO_SEEK: + pass + case SeekStartResult.SEEK_SUCCEEDED: + return + case SeekStartResult.SEEK_FAILED: + start_uid = _get_start_uid_from_event_data(data[1]) + if start_uid is not None: + self._runs_to_ignore.add(start_uid) + return + + start_uid = _get_start_uid_from_event_data(data[1]) + if start_uid in self._runs_to_ignore: return match data: @@ -438,6 +468,9 @@ def _handle_kafka_event(self, event): self.__documents[data].clear_subscriptions() self.__to_save_documents.append(self.__documents[data].identifier) + start_uid = self.__documents[data].identifier + self._runs_to_ignore.discard(start_uid) + # TODO: Validate number of saved entries via the stop document's num_events # TODO: Validate successful run via the stop document's exit_status