[pull] master from ray-project:master#4058

Merged
pull[bot] merged 7 commits into miqdigital:master from
ray-project:master
Apr 16, 2026

Conversation


pull[bot] commented Apr 16, 2026

See Commits and Changes for more details.


Created by pull[bot] (v2.0.0-alpha.4)


rohankmr414 and others added 7 commits April 16, 2026 14:18
#62629)

`validate_actor_state_name` in `utils.py` maintained its own hard-coded list of valid actor states, duplicating `ACTOR_STATUS` from `custom_types.py`. This replaces it with a direct reference to the shared constant, which is already validated against the protobuf enum at import time.

Also makes the error message dynamic, so it stays in sync automatically if states are added or removed in the future.

Closes #50397

- Used a lazy import to avoid a circular dependency (`utils.py` is imported early)
- Behavior is identical: same inputs, same outputs, same error messages

---------

Signed-off-by: Rohan K <rohankmr414@gmail.com>
> source code for issue
https://github.com/issues/created?issue=ray-project%7Cray%7C60200

### Current checkpoint:

<img width="2796" height="1198" alt="Image"
src="https://github.com/user-attachments/assets/528ad72b-6975-4e96-8f01-39e373990647"
/>

The current implementation has two issues:
1. Each ReadTask copies an Arrow-typed checkpoint_id array and then converts it into a NumPy-typed array. This conversion is very time-consuming (see [previous testing](#60002)), and it is repeated in every ReadTask.
2. Each ReadTask holds its own copy of the checkpoint_id array, resulting in high memory usage across the cluster.


### Improved Checkpoint (Initial design, single actor):
Maintain a global `checkpoint_filter` actor that holds the
`checkpoint_ids` array; this actor is responsible for filtering all
input blocks.

<img width="2096" height="1278" alt="Image"
src="https://github.com/user-attachments/assets/b9956eff-c807-45c4-bc4c-f0497974370d"
/>

There are two advantages to this approach:
1. The most time-consuming operation, converting the Arrow-typed array to a NumPy-typed array, is performed only once.
2. Reduced memory usage: each read task no longer needs to hold a large array; only the `checkpoint_filter` actor holds it.
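The core filtering idea can be illustrated with a small NumPy-only sketch (hypothetical class name; in the PR this logic lives inside a Ray actor rather than a plain object): the checkpoint ids are materialized as a NumPy array once, and every input block is filtered against that single copy.

```python
import numpy as np

# Illustrative sketch of the single-filter idea (not Ray's actual actor
# code). The expensive Arrow -> NumPy conversion happens once, here in
# __init__, instead of once per ReadTask.
class CheckpointFilter:
    def __init__(self, checkpoint_ids):
        # One-time materialization of the checkpointed ids.
        self._done = np.asarray(checkpoint_ids)

    def filter_block(self, block_ids):
        # Keep only ids that have not been checkpointed yet.
        block = np.asarray(block_ids)
        return block[~np.isin(block, self._done)]

ckpt = CheckpointFilter([str(i) for i in range(3)])
print(ckpt.filter_block(["1", "3", "4"]).tolist())  # → ['3', '4']
```

In the actual design the filter object is a Ray actor, so read tasks send blocks to it instead of holding the array themselves.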

### Performance test
test code:
```python
import shutil
from typing import Dict
import os
import time

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import ray
from ray.data.checkpoint import CheckpointConfig

INPUT_PATH="/tmp/ray_test/input/"
OUTPUT_PATH="/tmp/ray_test/output/"
CKPT_PATH="/tmp/ray_test/ckpt/"


class Qwen3ASRPredictor:
    def __init__(self):
        print("download ckpt")

    def __call__(self, batch_input: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        return batch_input

def setup():
    if os.path.exists(INPUT_PATH):
        shutil.rmtree(INPUT_PATH)
    if os.path.exists(CKPT_PATH):
        shutil.rmtree(CKPT_PATH)
    if os.path.exists(OUTPUT_PATH):
        shutil.rmtree(OUTPUT_PATH)

    # generate input data
    if not os.path.exists(INPUT_PATH):
        os.makedirs(INPUT_PATH)
    for i in range(10000):
        ids = [str(i) for i in range(i * 10000, (i + 1) * 10000)]
        df = pd.DataFrame({'id': ids})
        table = pa.Table.from_pandas(df)
        pq.write_table(table, os.path.join(INPUT_PATH, f"{i}.parquet"))

    # generate checkpoint
    if not os.path.exists(CKPT_PATH):
        os.makedirs(CKPT_PATH)
    ids = [str(i) for i in range(0, 80_000_000)]
    df = pd.DataFrame({'id': ids})
    table = pa.Table.from_pandas(df)
    pq.write_table(table, os.path.join(CKPT_PATH, "ckpt.parquet"))



if __name__ == "__main__":
    ray.init()

    setup()

    ctx = ray.data.DataContext.get_current()
    ctx.checkpoint_config = CheckpointConfig(
        id_column="id",
        checkpoint_path=CKPT_PATH,
        delete_checkpoint_on_success=False,
    )

    start_time = time.time()

    input = ray.data.read_parquet(
        INPUT_PATH,
        parallelism=1000,
        # memory=8 * 1024 **3 # set for origin ray to avoid oom
    )

    pred = input.map_batches(Qwen3ASRPredictor, batch_size=1000)

    pred.write_parquet(OUTPUT_PATH)

    end_time = time.time()

    print(f"costs: {end_time - start_time}s")

    # check result
    result_ds = ray.data.read_parquet(OUTPUT_PATH)
    assert result_ds.count() == 20_000_000
```

Node: 16 cores with 64 GB memory (make sure you have at least 16 GB of free memory to avoid OOM).

#### Original Ray:
```
pip install ray==2.54.0
python test.py
```
#### Speedup:
```
pip install https://ray-wheel.oss-cn-beijing.aliyuncs.com/speedup/ray-3.0.0.dev0-cp310-cp310-manylinux2014_x86_64.whl
python test.py
```

#### Test Result
original: 680 s
speedup: 190 s
The end-to-end running time of the task is accelerated by roughly 3.6×.

#### Memory
If we remove this line:
```
memory=8 * 1024 **3 # set for origin ray to avoid oom
```
the original Ray OOMs, while the fixed Ray passes. This demonstrates that the PR also improves stability.


----
### Updated 20260225 (ActorPool)
As @owenowenisme mentioned, if filtering is performed by a single actor, that actor could become the bottleneck. Therefore, I extended the single actor into an ActorPool. For more details, please refer to the link.
#60294 (comment)
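The ActorPool extension can be sketched in a few lines (hypothetical names, not the PR's code): instead of one filter actor, blocks are assigned round-robin over several workers so no single actor becomes the bottleneck.

```python
import itertools

# Minimal round-robin sketch of the ActorPool idea (hypothetical names).
class FilterPool:
    def __init__(self, workers):
        self._workers = itertools.cycle(workers)

    def assign(self, block):
        # In Ray this would be next(self._workers).filter.remote(block);
        # here we just return which worker would receive the block.
        return next(self._workers)

pool = FilterPool(["w0", "w1", "w2"])
print([pool.assign(b) for b in range(5)])  # → ['w0', 'w1', 'w2', 'w0', 'w1']
```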


![20260223162250](https://github.com/user-attachments/assets/9bd1067f-f2a8-47dd-8f99-e232be64155e)

---------

Co-authored-by: xiaowen.wxw <wxw403883@alibaba-inc.com>
Co-authored-by: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com>
…wn, terminal ops, OutputSplitter) (#62456)

### Summary

`Dataset.streaming_split()` can keep `ObjectRef`s alive across epochs
because the split coordinator holds an iterator that closes over the
executor topology, and because `OutputSplitter` can leave the metrics
“shadow” input queue out of sync with the real buffer after `equal=True`
truncation. This PR fixes both issues.

### What was going wrong

1. **Topology + queues across epochs**  
The coordinator keeps `_output_iterator` until the next epoch replaces
it. Anything reachable from the old executor’s topology (including
`OpState` queues) can pin the object store until that chain is dropped
and queues are drained.

2. **Truncated remainder + metrics**  
After equalizing splits, a bare `_buffer.clear()` emptied the real
buffer without `on_input_dequeued`, so `OpRuntimeMetrics` could still
hold `RefBundle`s for the truncated remainder.

3. **Terminal `OutputSplitter` and `mark_execution_finished`**  
`mark_execution_finished()` (which clears internal queues via the mixin)
was not run for operators with no downstream deps, so terminal splitters
did not follow the same lifecycle path as other ops.

### What we changed

- **`OutputSplitter.all_inputs_done`**: replace `_buffer.clear()` with
`clear_internal_input_queue()` so truncation drains through metrics and
releases refs consistently.
- **`update_operator_states`**: treat fully completed terminal ops like
split sinks as finished for execution and run
`mark_execution_finished()` when `output_dependencies` is empty.
- **`StreamingExecutor.shutdown`**: after `op.shutdown()`, call
`_clear_topology_queues_post_shutdown(force)` to clear internal queues
and topology `OpState` queues; **for the DAG sink with
`num_output_splits > 1`, skip clearing the external output queues on
cooperative (`force=False`) shutdown** so other splits keep their
bundles; **`force=True` still clears everything** (e.g. epoch rollover
in `SplitCoordinator._try_start_new_epoch`).
- **Tests**: `test_output_split_shutdown_preserves_sibling_split_queues`
covers fast vs slow split consumers.
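The metrics "shadow queue" problem from point 2 above can be sketched with toy classes (hypothetical names, not the real `OpRuntimeMetrics` or `OutputSplitter`): a bare `clear()` on the buffer leaves the metrics tracker still holding bundles, while draining through the dequeue callback releases them.

```python
# Toy sketch (hypothetical classes) of the shadow-queue leak and its fix.
class Metrics:
    def __init__(self):
        self.held = []  # stands in for RefBundles tracked by metrics

    def on_input_queued(self, bundle):
        self.held.append(bundle)

    def on_input_dequeued(self, bundle):
        self.held.remove(bundle)

class Splitter:
    def __init__(self):
        self.metrics = Metrics()
        self._buffer = []

    def add_input(self, bundle):
        self._buffer.append(bundle)
        self.metrics.on_input_queued(bundle)

    def clear_internal_input_queue(self):
        # The fix: drain through the metrics path so refs are released.
        while self._buffer:
            self.metrics.on_input_dequeued(self._buffer.pop())

buggy = Splitter()
buggy.add_input("b0"); buggy.add_input("b1")
buggy._buffer.clear()               # the bug: metrics still holds both
print(len(buggy.metrics.held))      # → 2

fixed = Splitter()
fixed.add_input("b0"); fixed.add_input("b1")
fixed.clear_internal_input_queue()  # the fix: drained through metrics
print(len(fixed.metrics.held))      # → 0
```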

### Links (for reviewers)

- Cooperative shutdown from first exhausted split:
[`streaming_executor.py`
(`_ClosingIterator.get_next`)](https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/execution/streaming_executor.py#L688-L710)
- Forced teardown on epoch advance: [`stream_split_iterator.py`
(`_try_start_new_epoch`)](https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/iterator/stream_split_iterator.py#L216-L228)
- Barrier / when the next epoch starts (not tied to first consumer
finishing): [`start_epoch` /
`_barrier`](https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/iterator/stream_split_iterator.py#L205-L214),
[`_barrier` →
`_try_start_new_epoch`](https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/iterator/stream_split_iterator.py#L357-L382)

---------

Signed-off-by: Marwan Sarieddine <sarieddine.marwan@gmail.com>
…61789)

#### Summary
- Adds a shared `serve_test_env.bzl` that centralizes "always-on" env
vars for all Ray Serve tests
- Sets `RAY_SERVE_LOG_CLIENT_ADDRESS=1` on every Serve test target (~32
targets across `tests/BUILD.bazel` and `tests/unit/BUILD.bazel`)
- Future global env vars only need a one-line addition to
`SERVE_COMMON_ENV` instead of editing 30+ test blocks
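The centralization pattern can be shown in plain Python (the real file is Starlark; `SERVE_COMMON_ENV` is the name from the PR, while the macro shape below is an assumption for illustration):

```python
# Plain-Python sketch of the Starlark pattern. SERVE_COMMON_ENV comes
# from the PR description; the serve_test macro shape is hypothetical.
SERVE_COMMON_ENV = {
    "RAY_SERVE_LOG_CLIENT_ADDRESS": "1",
}

def serve_test(name, env=None, **kwargs):
    # Per-target env is layered on top of the shared always-on env, so a
    # future global variable is a one-line change to SERVE_COMMON_ENV.
    return {"name": name, "env": {**SERVE_COMMON_ENV, **(env or {})}, **kwargs}

t = serve_test("test_deploy", env={"FOO": "bar"})
print(t["env"]["RAY_SERVE_LOG_CLIENT_ADDRESS"])  # → 1
```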

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Signed-off-by: harshit <harshit@anyscale.com>
Signed-off-by: harshit-anyscale <harshit@anyscale.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Removing requirement & constraint files from the wanda file
- Only copying the lock file to the image
- Removing the REQUIREMENTS_FILE arg

Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
…testing with empty file (#62647)

## Description
The `ValueError` should be raised only when the contents of the file are invalid, not when the file is empty.
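The intended behavior can be sketched as follows (hypothetical parser, not the code touched by the PR): an empty file yields an empty result, and only genuinely invalid contents raise `ValueError`.

```python
# Hypothetical sketch of the rule described above: empty input is fine,
# invalid input raises ValueError.
def parse_ids(text):
    if not text.strip():
        return []  # empty file: no error
    try:
        return [int(tok) for tok in text.split()]
    except ValueError:
        raise ValueError("file contents are invalid")

print(parse_ids(""))       # → []
print(parse_ids("1 2 3"))  # → [1, 2, 3]
```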


Signed-off-by: Goutam <goutam@anyscale.com>
to pick up latest ubuntu apt source updates

Signed-off-by: Lonnie Liu <lonnie@anyscale.com>
pull[bot] locked and limited conversation to collaborators Apr 16, 2026
pull[bot] added the ⤵️ pull label Apr 16, 2026
pull[bot] merged commit 4c9a9d8 into miqdigital:master Apr 16, 2026