[pull] master from ray-project:master#4058
Merged
pull[bot] merged 7 commits into miqdigital:master from ray-project:master on Apr 16, 2026
#62629) `validate_actor_state_name` in `utils.py` maintained its own hard-coded list of valid actor states, duplicating `ACTOR_STATUS` from `custom_types.py`. This replaces it with a direct reference to the shared constant, which is already validated against the protobuf enum at import time. It also makes the error message dynamic, so it stays in sync automatically if states are added or removed in the future. Closes #50397

- Used a lazy import to avoid a circular dependency (`utils.py` is imported early)
- Behavior is identical: same inputs, same outputs, same error messages

---------

Signed-off-by: Rohan K <rohankmr414@gmail.com>
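The shape of the change can be sketched roughly as follows. This is a minimal illustration, not the actual Ray source: the module layout and the specific state names below stand in for `custom_types.ACTOR_STATUS`, and the shared constant is defined inline here so the sketch is self-contained (in the real code it is imported lazily at call time to avoid the circular dependency).

```python
# Illustrative state list standing in for custom_types.ACTOR_STATUS,
# which in Ray is validated against the protobuf enum at import time.
ACTOR_STATUS = (
    "DEPENDENCIES_UNREADY",
    "PENDING_CREATION",
    "ALIVE",
    "RESTARTING",
    "DEAD",
)


def validate_actor_state_name(actor_state_name):
    """Validate against the single shared constant instead of a
    locally hard-coded duplicate list."""
    if actor_state_name is None:
        return
    if actor_state_name not in ACTOR_STATUS:
        # Dynamic message: stays in sync automatically if states
        # are added to or removed from the shared constant.
        raise ValueError(
            f"Actor state name must be one of the following: {list(ACTOR_STATUS)}"
        )
```

Because the error message is built from the constant itself, adding a new state to `ACTOR_STATUS` updates both the validation and the message with no further edits.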
> source code for issue https://github.com/issues/created?issue=ray-project%7Cray%7C60200

### Current checkpoint:

<img width="2796" height="1198" alt="Image" src="https://github.com/user-attachments/assets/528ad72b-6975-4e96-8f01-39e373990647" />

The current implementation has two issues:

1. Each ReadTask copies an Arrow-typed checkpoint_id array and then converts it into a NumPy-typed array. This step is very time-consuming (see [previous testing](#60002)), and this most time-consuming operation is repeated in every ReadTask.
2. Each ReadTask holds a copy of the checkpoint_id array, resulting in high memory usage across the cluster.

### Improved Checkpoint (Initial design, single actor):

Maintain a global `checkpoint_filter` actor that holds the `checkpoint_ids` array; this actor is responsible for filtering all input blocks.

<img width="2096" height="1278" alt="Image" src="https://github.com/user-attachments/assets/b9956eff-c807-45c4-bc4c-f0497974370d" />

There are two advantages to this approach:

1. The most time-consuming operation, the conversion from an Arrow-typed array to a NumPy-typed array, is performed only once.
2. Reduced memory usage: each read task no longer needs to hold a large array; only the `checkpoint_filter` actor holds it.
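The single-actor design above can be sketched with a plain Python class in place of a Ray actor (a hedged illustration of the idea, not the PR's implementation — in the real code this would be a `@ray.remote` actor and each ReadTask would call it via `.remote(...)`):

```python
# Sketch: one central filter object holds the converted checkpoint ids,
# so the expensive Arrow -> NumPy/set conversion happens exactly once
# and read tasks no longer each hold a copy of the array.
class CheckpointFilter:
    def __init__(self, checkpoint_ids):
        # Expensive conversion performed a single time, at construction.
        self._done_ids = set(checkpoint_ids)

    def filter_block(self, block_ids):
        # Keep only the ids that are not already checkpointed.
        return [i for i in block_ids if i not in self._done_ids]
```

Every ReadTask then sends its block's id column to this one object instead of materializing its own copy of the 80M-row checkpoint array.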
### Performance test

Test code:

```
import shutil
from typing import Dict
import os
import time

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

import ray
from ray.data.checkpoint import CheckpointConfig

INPUT_PATH = "/tmp/ray_test/input/"
OUTPUT_PATH = "/tmp/ray_test/output/"
CKPT_PATH = "/tmp/ray_test/ckpt/"


class Qwen3ASRPredictor:
    def __init__(self):
        print("download ckpt")

    def __call__(self, batch_input: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        return batch_input


def setup():
    if os.path.exists(INPUT_PATH):
        shutil.rmtree(INPUT_PATH)
    if os.path.exists(CKPT_PATH):
        shutil.rmtree(CKPT_PATH)
    if os.path.exists(OUTPUT_PATH):
        shutil.rmtree(OUTPUT_PATH)

    # generate input data
    if not os.path.exists(INPUT_PATH):
        os.makedirs(INPUT_PATH)
        for i in range(10000):
            ids = [str(j) for j in range(i * 10000, (i + 1) * 10000)]
            df = pd.DataFrame({'id': ids})
            table = pa.Table.from_pandas(df)
            pq.write_table(table, os.path.join(INPUT_PATH, f"{i}.parquet"))

    # generate checkpoint
    if not os.path.exists(CKPT_PATH):
        os.makedirs(CKPT_PATH)
        ids = [str(i) for i in range(0, 80_000_000)]
        df = pd.DataFrame({'id': ids})
        table = pa.Table.from_pandas(df)
        pq.write_table(table, os.path.join(CKPT_PATH, "ckpt.parquet"))


if __name__ == "__main__":
    ray.init()
    setup()

    ctx = ray.data.DataContext.get_current()
    ctx.checkpoint_config = CheckpointConfig(
        id_column="id",
        checkpoint_path=CKPT_PATH,
        delete_checkpoint_on_success=False,
    )

    start_time = time.time()
    input = ray.data.read_parquet(
        INPUT_PATH,
        parallelism=1000,
        # memory=8 * 1024 ** 3  # set for origin ray to avoid oom
    )
    pred = input.map_batches(Qwen3ASRPredictor, batch_size=1000)
    pred.write_parquet(OUTPUT_PATH)
    end_time = time.time()
    print(f"costs: {end_time - start_time}s")

    # check result
    result_ds = ray.data.read_parquet(OUTPUT_PATH)
    assert result_ds.count() == 20_000_000
```

Node: 16 cores with 64 GB memory (make sure you have at least 16 GB of free memory to avoid OOM).

#### origin ray:

```
pip install ray==2.54.0
python test.py
```

#### Speedup:

```
pip install https://ray-wheel.oss-cn-beijing.aliyuncs.com/speedup/ray-3.0.0.dev0-cp310-cp310-manylinux2014_x86_64.whl
python test.py
```

#### Test Result

origin: 680s
speedup: 190s

Even the end-to-end running time of the whole task is accelerated by about 3.6x.

#### Memory

If we delete this row:

```
memory=8 * 1024 ** 3  # set for origin ray to avoid oom
```

the original Ray will OOM, while the fixed Ray passes. This demonstrates that this PR also enhances stability.

----

### Updated 20260225 (ActorPool)

As @owenowenisme mentioned, if filtering is performed by a single actor, that actor could become the bottleneck. Therefore, I extended the single actor into an ActorPool. For more details, please refer to the link: #60294 (comment)

--------

Co-authored-by: xiaowen.wxw <wxw403883@alibaba-inc.com>
Co-authored-by: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com>
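The ActorPool extension mentioned above can be sketched as follows (a hedged, in-process illustration with plain classes rather than Ray actors; in real Ray each pool member would be a separate actor process holding its own copy of the id set, or a shared object-store reference, rather than sharing one Python `set`):

```python
import itertools

# Sketch: several filter workers instead of one, with requests assigned
# round-robin, so no single worker becomes the filtering bottleneck.
class FilterWorker:
    def __init__(self, done_ids):
        self._done_ids = done_ids

    def filter_block(self, block_ids):
        return [i for i in block_ids if i not in self._done_ids]


class FilterPool:
    def __init__(self, checkpoint_ids, size=4):
        done = set(checkpoint_ids)  # convert once, reuse for every worker
        self._workers = [FilterWorker(done) for _ in range(size)]
        self._next = itertools.cycle(self._workers)

    def filter_block(self, block_ids):
        # Round-robin dispatch across the pool.
        return next(self._next).filter_block(block_ids)
```

The trade-off is memory versus throughput: more workers mean more parallel filtering capacity, but in a real actor pool each additional actor also holds (or references) the checkpoint id data.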
…wn, terminal ops, OutputSplitter) (#62456) ### Summary `Dataset.streaming_split()` can keep `ObjectRef`s alive across epochs because the split coordinator holds an iterator that closes over the executor topology, and because `OutputSplitter` can leave the metrics “shadow” input queue out of sync with the real buffer after `equal=True` truncation. This PR fixes both issues. ### What was going wrong 1. **Topology + queues across epochs** The coordinator keeps `_output_iterator` until the next epoch replaces it. Anything reachable from the old executor’s topology (including `OpState` queues) can pin the object store until that chain is dropped and queues are drained. 2. **Truncated remainder + metrics** After equalizing splits, a bare `_buffer.clear()` emptied the real buffer without `on_input_dequeued`, so `OpRuntimeMetrics` could still hold `RefBundle`s for the truncated remainder. 3. **Terminal `OutputSplitter` and `mark_execution_finished`** `mark_execution_finished()` (which clears internal queues via the mixin) was not run for operators with no downstream deps, so terminal splitters did not follow the same lifecycle path as other ops. ### What we changed - **`OutputSplitter.all_inputs_done`**: replace `_buffer.clear()` with `clear_internal_input_queue()` so truncation drains through metrics and releases refs consistently. - **`update_operator_states`**: treat fully completed terminal ops like split sinks as finished for execution and run `mark_execution_finished()` when `output_dependencies` is empty. - **`StreamingExecutor.shutdown`**: after `op.shutdown()`, call `_clear_topology_queues_post_shutdown(force)` to clear internal queues and topology `OpState` queues; **for the DAG sink with `num_output_splits > 1`, skip clearing the external output queues on cooperative (`force=False`) shutdown** so other splits keep their bundles; **`force=True` still clears everything** (e.g. epoch rollover in `SplitCoordinator._try_start_new_epoch`). 
- **Tests**: `test_output_split_shutdown_preserves_sibling_split_queues` covers fast vs slow split consumers. ### Links (for reviewers) - Cooperative shutdown from first exhausted split: [`streaming_executor.py` (`_ClosingIterator.get_next`)](https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/execution/streaming_executor.py#L688-L710) - Forced teardown on epoch advance: [`stream_split_iterator.py` (`_try_start_new_epoch`)](https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/iterator/stream_split_iterator.py#L216-L228) - Barrier / when the next epoch starts (not tied to first consumer finishing): [`start_epoch` / `_barrier`](https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/iterator/stream_split_iterator.py#L205-L214), [`_barrier` → `_try_start_new_epoch`](https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/iterator/stream_split_iterator.py#L357-L382) --------- Signed-off-by: Marwan Sarieddine <sarieddine.marwan@gmail.com>
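The metrics-desync bug described in point 2 can be illustrated with a toy model (hypothetical classes, not the actual Ray internals — only the method names `on_input_dequeued` and `clear_internal_input_queue` are taken from the description):

```python
# Toy model of a "shadow" metrics queue tracking a real buffer.
class Metrics:
    def __init__(self):
        self.queued = 0  # shadow count of bundles believed to be buffered

    def on_input_queued(self):
        self.queued += 1

    def on_input_dequeued(self):
        self.queued -= 1


class Splitter:
    def __init__(self, metrics):
        self._buffer = []
        self._metrics = metrics

    def add(self, bundle):
        self._buffer.append(bundle)
        self._metrics.on_input_queued()

    def clear_internal_input_queue(self):
        # Drain item by item through the metrics hook. A bare
        # self._buffer.clear() would empty the real buffer but leave
        # metrics.queued pinned at its old value, which is exactly the
        # kind of stale accounting that kept RefBundles alive.
        while self._buffer:
            self._buffer.pop()
            self._metrics.on_input_dequeued()
```

After `clear_internal_input_queue()`, the shadow count and the real buffer agree at zero; with `_buffer.clear()` they would diverge.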
…61789)

#### Summary

- Adds a shared `serve_test_env.bzl` that centralizes "always-on" env vars for all Ray Serve tests
- Sets `RAY_SERVE_LOG_CLIENT_ADDRESS=1` on every Serve test target (~32 targets across `tests/BUILD.bazel` and `tests/unit/BUILD.bazel`)
- Future global env vars only need a one-line addition to `SERVE_COMMON_ENV` instead of editing 30+ test blocks

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Signed-off-by: harshit <harshit@anyscale.com>
Signed-off-by: harshit-anyscale <harshit@anyscale.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
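Such a shared helper might look roughly like the sketch below. This is an assumed shape based only on the description (the helper function name and its signature are hypothetical; only `SERVE_COMMON_ENV` and the env var are named in the PR). Starlark in a `.bzl` file uses Python syntax, so the sketch is written as Python:

```python
# serve_test_env.bzl (hypothetical sketch, Starlark/Python syntax)

# One-line additions here apply to every Serve test target at once.
SERVE_COMMON_ENV = {
    "RAY_SERVE_LOG_CLIENT_ADDRESS": "1",
}

def serve_test_env(extra_env = {}):
    """Merge the always-on Serve test env vars with per-target extras."""
    env = dict(SERVE_COMMON_ENV)  # copy so the shared dict is never mutated
    env.update(extra_env)
    return env
```

A `py_test` target would then pass `env = serve_test_env()` (or `serve_test_env({"MY_VAR": "1"})`) instead of repeating the common dict in 30+ places.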
- Removes requirement & constraint files from the wanda file
- Copies only the lock file to the image
- Removes the `REQUIREMENTS_FILE` arg

Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
…testing with empty file (#62647)

## Description

The `ValueError` should be raised only when the contents of the file are invalid, not when the file is empty.

Signed-off-by: Goutam <goutam@anyscale.com>
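The intended behavior can be sketched with a hypothetical loader (the function name and the JSON format are illustrative assumptions, not the file format this PR actually parses):

```python
import json

def load_config(text):
    """Return parsed contents; an empty file is valid and yields an
    empty result, while genuinely invalid contents raise ValueError."""
    if not text.strip():
        return {}  # empty file: nothing to validate, so no error
    try:
        return json.loads(text)
    except json.JSONDecodeError as e:
        # Only truly malformed (non-empty) contents reach this branch.
        raise ValueError(f"Invalid file contents: {e}") from e
```

The key point is the order of the checks: the emptiness test runs before parsing, so the error path is reserved for content that exists but cannot be parsed.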
to pick up latest ubuntu apt source updates Signed-off-by: Lonnie Liu <lonnie@anyscale.com>
See Commits and Changes for more details.
Created by
pull[bot] (v2.0.0-alpha.4)