Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
6615ff6
Move delete_instance_health_checks to a separate module
r4victor Mar 2, 2026
85f8866
Move utils/provisioning.py to ssh_fleets/provisioning.py
r4victor Mar 2, 2026
0c74816
Use SSHProvisioningError for ssh instances errors
r4victor Mar 2, 2026
81ac9a2
Fix _add_remote() nested try-excepts
r4victor Mar 2, 2026
18921ed
Refactor _resolve_ssh_instance_network
r4victor Mar 2, 2026
20d1e08
Refactor _process_instance() into thin dispatcher
r4victor Mar 2, 2026
c16a502
Refactor instance check code
r4victor Mar 2, 2026
66f9722
Add fetchers tests
r4victor Mar 2, 2026
f781cc3
Add TestInstanceWorker
r4victor Mar 3, 2026
a11567b
Run pyright for pipeline tests
r4victor Mar 3, 2026
d4d3147
WIP: InstanceWorker
r4victor Mar 3, 2026
bb11237
Fix volumes pipeline processing active
r4victor Mar 3, 2026
3bf9006
Refactor log_lock_token
r4victor Mar 3, 2026
b508c34
Build instance events from update map
r4victor Mar 3, 2026
3a0b7dd
Rename
r4victor Mar 3, 2026
1ae0aae
Refactor instance pipeline into modules
r4victor Mar 3, 2026
197c5f6
Inline _get_effective_ helpers
r4victor Mar 3, 2026
0b3acbb
Process new instance immediately
r4victor Mar 3, 2026
07c484b
Do not refetch status
r4victor Mar 3, 2026
5f8980c
Fix sibling_update_rows
r4victor Mar 3, 2026
bd9342b
Drop redundant synchronize_session=False
r4victor Mar 3, 2026
28fa000
Add ProcessContext
r4victor Mar 3, 2026
899c18c
Simplify placement groups code
r4victor Mar 3, 2026
316fd8a
Drop _PlacementGroupState
r4victor Mar 3, 2026
2d0090b
Restore comments
r4victor Mar 3, 2026
24c74cc
Fix result.sibling_update_rows append
r4victor Mar 3, 2026
8bd898d
Fix unset typing
r4victor Mar 3, 2026
07e83e4
Add migration
r4victor Mar 4, 2026
1b99cfc
Lock instances in fleet pipeline
r4victor Mar 4, 2026
5725576
Optimize instance lock in fleet pipeline
r4victor Mar 4, 2026
8955d68
Respect instance lock in delete_fleets
r4victor Mar 4, 2026
91bbced
Skip locked instances in process_next_terminating_job
r4victor Mar 4, 2026
168b9bd
Respect instance lock in submitted_jobs
r4victor Mar 4, 2026
9cf36a4
Add ix_instances_pipeline_fetch_q_index
r4victor Mar 4, 2026
9941652
Wire instance pipeline
r4victor Mar 4, 2026
8e3a019
Set current_master_instance
r4victor Mar 4, 2026
30ca68f
Refactor current_master_instance
r4victor Mar 5, 2026
8367521
Terminate instances with MASTER_FAILED if the master dies with NO_OFFERS
r4victor Mar 5, 2026
edb225b
Fix instance unlock in fleet pipeline
r4victor Mar 5, 2026
e2152d8
Remove extra fleet_model_to_fleet
r4victor Mar 5, 2026
6e26763
Wire pipeline_hinter
r4victor Mar 5, 2026
baa79e3
Remove extra fleet_model_to_fleet
r4victor Mar 5, 2026
1bcbca0
Fix index name
r4victor Mar 5, 2026
7cac4e4
Merge branch 'master' into issue_3551_instance_pipeline
r4victor Mar 5, 2026
0767d6c
Rebase migrations
r4victor Mar 5, 2026
e14fcd4
Fix redundant fleet.instances loads in instance pipeline
r4victor Mar 5, 2026
32a2418
Do not lock all instances in delete_fleets
r4victor Mar 5, 2026
631c086
Add FIXME
r4victor Mar 5, 2026
08ca18f
Fix tests
r4victor Mar 5, 2026
c6f18eb
Retry instance lock in delete_fleets
r4victor Mar 6, 2026
f9a9338
Retry lock in all delete endpoints
r4victor Mar 6, 2026
caa0abd
Fix created_at and last_processed_at init values
r4victor Mar 6, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
- Python targets 3.9+ with 4-space indentation and max line length of 99 (see `ruff.toml`; `E501` is ignored but keep lines readable).
- Imports are sorted via Ruff’s isort settings (`dstack` treated as first-party).
- Keep primary/public functions before local helper functions in a module section.
- Keep private classes, exceptions, and similar implementation-specific types close to the private functions that use them unless they are shared more broadly in the module.
- Prefer pydantic-style models in `core/models`.
- Tests use `test_*.py` modules and `test_*` functions; fixtures live near usage.

Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ include = [
"src/dstack/_internal/core/backends/runpod",
"src/dstack/_internal/cli/services/configurators",
"src/dstack/_internal/cli/commands",
"src/tests/_internal/server/background/pipeline_tasks",
]
ignore = [
"src/dstack/_internal/server/migrations/versions",
Expand Down
4 changes: 4 additions & 0 deletions src/dstack/_internal/core/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ class ConfigurationError(DstackError):
pass


class SSHProvisioningError(DstackError):
pass


class SSHError(DstackError):
pass

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from dstack._internal.server.background.pipeline_tasks.compute_groups import ComputeGroupPipeline
from dstack._internal.server.background.pipeline_tasks.fleets import FleetPipeline
from dstack._internal.server.background.pipeline_tasks.gateways import GatewayPipeline
from dstack._internal.server.background.pipeline_tasks.instances import InstancePipeline
from dstack._internal.server.background.pipeline_tasks.placement_groups import (
PlacementGroupPipeline,
)
Expand All @@ -19,6 +20,7 @@ def __init__(self) -> None:
ComputeGroupPipeline(),
FleetPipeline(),
GatewayPipeline(),
InstancePipeline(),
PlacementGroupPipeline(),
VolumePipeline(),
]
Expand Down
47 changes: 46 additions & 1 deletion src/dstack/_internal/server/background/pipeline_tasks/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
import logging
import math
import random
import time
import uuid
from abc import ABC, abstractmethod
from collections.abc import Iterable, Sequence
Expand Down Expand Up @@ -331,14 +333,20 @@ async def start(self):
self._running = True
while self._running:
item = await self._queue.get()
start_time = time.time()
logger.debug("Processing %s item %s", item.__tablename__, item.id)
try:
await self.process(item)
except Exception:
logger.exception("Unexpected exception when processing item")
finally:
await self._heartbeater.untrack(item)
logger.debug("Processed %s item %s", item.__tablename__, item.id)
logger.debug(
"Processed %s item %s in %.3f",
item.__tablename__,
item.id,
time.time() - start_time,
)

def stop(self):
self._running = False
Expand Down Expand Up @@ -416,3 +424,40 @@ def resolve_now_placeholders(update_values: _ResolveNowInput, now: datetime):
for key, value in update_values.items():
if value is NOW_PLACEHOLDER:
update_values[key] = now


def log_lock_token_mismatch(
logger: logging.Logger,
item: PipelineItem,
action: str = "process",
) -> None:
logger.warning(
"Failed to %s %s item %s: lock_token mismatch."
" The item is expected to be processed and updated on another fetch iteration.",
action,
item.__tablename__,
item.id,
)


def log_lock_token_changed_after_processing(
logger: logging.Logger,
item: PipelineItem,
action: str = "update",
expected_outcome: str = "updated",
) -> None:
logger.warning(
"Failed to %s %s item %s after processing: lock_token changed."
" The item is expected to be processed and %s on another fetch iteration.",
action,
item.__tablename__,
item.id,
expected_outcome,
)


def log_lock_token_changed_on_reset(logger: logging.Logger) -> None:
logger.warning(
"Failed to reset lock: lock_token changed."
" The item is expected to be processed and updated on another fetch iteration."
)
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
PipelineItem,
UpdateMapDateTime,
Worker,
log_lock_token_changed_after_processing,
log_lock_token_mismatch,
resolve_now_placeholders,
set_processed_update_map_fields,
set_unlock_update_map_fields,
Expand Down Expand Up @@ -194,12 +196,7 @@ async def process(self, item: PipelineItem):
)
compute_group_model = res.unique().scalar_one_or_none()
if compute_group_model is None:
logger.warning(
"Failed to process %s item %s: lock_token mismatch."
" The item is expected to be processed and updated on another fetch iteration.",
item.__tablename__,
item.id,
)
log_lock_token_mismatch(logger, item)
return

result = _TerminateResult()
Expand Down Expand Up @@ -228,12 +225,7 @@ async def process(self, item: PipelineItem):
)
updated_ids = list(res.scalars().all())
if len(updated_ids) == 0:
logger.warning(
"Failed to update %s item %s after processing: lock_token changed."
" The item is expected to be processed and updated on another fetch iteration.",
item.__tablename__,
item.id,
)
log_lock_token_changed_after_processing(logger, item)
return
if not result.instances_update_map:
return
Expand All @@ -249,6 +241,8 @@ async def process(self, item: PipelineItem):
instance_model=instance_model,
old_status=instance_model.status,
new_status=InstanceStatus.TERMINATED,
termination_reason=instance_model.termination_reason,
termination_reason_message=instance_model.termination_reason_message,
)


Expand Down
Loading