18 changes: 18 additions & 0 deletions extensions/business/deeploy/deeploy_const.py
@@ -16,6 +16,9 @@ class DEEPLOY_KEYS:
ERROR = "error"
TRACE = "trace"
REQUEST = "request"
REQUESTS = "requests"
RESULTS = "results"
ITEM_INDEX = "item_index"
RETURN_REQUEST = "return_request"
STATUS_DETAILS = "status_details"
APPS = "apps"
@@ -46,6 +49,14 @@ class DEEPLOY_KEYS:
ONLINE = "online"
CHAIN_JOB = "chain_job"
JOB_CONFIG = "job_config"
STACK_JOB_CONFIG = "stack_job_config"
STACK_ID = "stack_id"
STACK_ALIAS = "stack_alias"
STACK_INDEX = "stack_index"
STACK_SIZE = "stack_size"
STACK_CONTAINER_REF = "stack_container_ref"
STACK_CONTAINER_ALIAS = "stack_container_alias"
STACK_TYPE = "stack_type"
# App params keys
APP_PARAMS = "app_params"
APP_PARAMS_IMAGE = "IMAGE"
@@ -68,6 +79,7 @@ class DEEPLOY_KEYS:

class DEEPLOY_STATUS:
SUCCESS = "success"
PARTIAL = "partial"
FAIL = "fail"
ERROR = "error"
PENDING = "pending"
@@ -592,6 +604,12 @@ class JOB_APP_TYPES:
]
}

DEEPLOY_CREATE_BATCH_REQUEST = {
DEEPLOY_KEYS.REQUESTS: [
DEEPLOY_CREATE_REQUEST,
],
}

###################################################################################################################
DEEPLOY_SCALE_UP_JOB_WORKERS_REQUEST = {
"job_id" : 1, # The job ID to extend the workers for
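For reference, a minimal sketch of a batch payload built on these constants (the per-item fields are illustrative placeholders, not the full DEEPLOY_CREATE_REQUEST schema):

batch_request = {
  "requests": [                      # DEEPLOY_KEYS.REQUESTS
    {"app_alias": "frontend-app"},   # each item is a normal create_pipeline payload
    {"app_alias": "backend-app"},
  ],
}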
73 changes: 72 additions & 1 deletion extensions/business/deeploy/deeploy_manager_api.py
@@ -15,7 +15,7 @@
from extensions.business.mixins.node_tags_mixin import _NodeTagsMixin
from extensions.business.mixins.request_tracking_mixin import _RequestTrackingMixin
from .deeploy_const import (
DEEPLOY_CREATE_REQUEST, DEEPLOY_CREATE_REQUEST_MULTI_PLUGIN, DEEPLOY_GET_APPS_REQUEST, DEEPLOY_DELETE_REQUEST,
DEEPLOY_CREATE_REQUEST, DEEPLOY_CREATE_REQUEST_MULTI_PLUGIN, DEEPLOY_CREATE_BATCH_REQUEST, DEEPLOY_GET_APPS_REQUEST, DEEPLOY_DELETE_REQUEST,
DEEPLOY_ERRORS, DEEPLOY_KEYS, DEEPLOY_SCALE_UP_JOB_WORKERS_REQUEST, DEEPLOY_STATUS, DEEPLOY_INSTANCE_COMMAND_REQUEST,
DEEPLOY_APP_COMMAND_REQUEST, DEEPLOY_GET_ORACLE_JOB_DETAILS_REQUEST, DEEPLOY_GET_R1FS_JOB_PIPELINE_REQUEST,
DEEPLOY_PLUGIN_DATA, JOB_APP_TYPES, JOB_APP_TYPES_ALL,
@@ -521,6 +521,7 @@ def _process_pipeline_request(
deeploy_specs_payload = self._ensure_deeploy_specs_job_config(
deeploy_specs_payload,
pipeline_params=pipeline_params,
stack_job_config=inputs.get(DEEPLOY_KEYS.STACK_JOB_CONFIG),
)

dct_status, str_status, response_keys, pipeline_to_persist = self.check_and_deploy_pipelines(
@@ -1032,6 +1033,76 @@ def create_pipeline(
return self._register_pending_deploy_request(result['__pending__'])
return result

@BasePlugin.endpoint(method="post")
# /create_pipelines_batch
def create_pipelines_batch(
self,
request: dict = DEEPLOY_CREATE_BATCH_REQUEST
):
"""
Create multiple pipelines in one API call.
Contributor

I don't think we would like to have the stack jobs in multiple pipelines.
The main idea was having multiple CARs in the same pipeline and paying for it as the per-CAR price summed up (not as a native job, as we're doing it now).
For such apps it's really important to keep the CARs in the same pipeline, so they can share the same semaphoring mechanism and ENV variable injection.


Expects `request.requests` to be a list of normal create_pipeline payloads.
Returns per-item results plus an aggregate status:
- success: all items succeeded/command_delivered
- fail: all items failed/timeout/error
- partial: mixed outcomes
"""
self.Pd(f"Called Deeploy create_pipelines_batch endpoint")

try:
requests = request.get(DEEPLOY_KEYS.REQUESTS, [])
if not isinstance(requests, list) or len(requests) == 0:
raise ValueError(f"{DEEPLOY_ERRORS.REQUEST3}: '{DEEPLOY_KEYS.REQUESTS}' must be a non-empty list.")

Comment on lines +1054 to +1057

Copilot AI Apr 2, 2026

The batch endpoint currently accepts an unbounded requests list. A large payload can lead to excessive work (and long blocking time since each item is processed sequentially), which is a reliability/DoS risk. Consider enforcing a reasonable maximum batch size (configurable) and returning a validation error when exceeded.
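A minimal guard along these lines could be (sketch only, not part of this diff; MAX_BATCH_SIZE is a hypothetical, ideally configurable, cap):

MAX_BATCH_SIZE = 20  # hypothetical cap; ideally read from plugin config
if len(requests) > MAX_BATCH_SIZE:
  raise ValueError(
    f"{DEEPLOY_ERRORS.REQUEST3}: '{DEEPLOY_KEYS.REQUESTS}' exceeds the maximum batch size of {MAX_BATCH_SIZE}."
  )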
results = []
statuses = []

for idx, request_item in enumerate(requests):
Contributor

IMO, this is a no-go.
I think we should create the pipeline similarly to what we're doing for native apps right now, just by allowing ONLY CAR/WAR plugins.

if not isinstance(request_item, dict):
item_result = {
DEEPLOY_KEYS.STATUS: DEEPLOY_STATUS.FAIL,
DEEPLOY_KEYS.ERROR: f"{DEEPLOY_ERRORS.REQUEST3}: request[{idx}] must be an object.",
}
Comment on lines +1054 to +1066

Copilot AI Apr 2, 2026

New validation branches in create_pipelines_batch (non-list/empty requests, and non-dict items producing per-item FAIL results) aren’t covered by tests. Adding unit tests for these cases would help lock in the API contract and prevent regressions.
else:
item_result = self._process_pipeline_request(
request=request_item,
is_create=True,
async_mode=False,
Copilot AI Apr 2, 2026

create_pipelines_batch calls _process_pipeline_request(..., async_mode=False) for each item, which makes this endpoint fully synchronous and potentially long-running per request. Given _get_pipeline_responses() uses a tight loop without sleeping, a larger batch can peg CPU and hold the request open until timeouts. Consider making the batch endpoint async (return per-item pending ids / statuses) or adding bounded polling/backoff so batch calls don’t busy-wait.

Suggested change
async_mode=False,
async_mode=True,
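One shape the bounded polling/backoff could take, sketched outside this diff (the helper name and timing values are illustrative, not existing code):

import time

def poll_with_backoff(check_fn, deadline_s=30.0, initial_delay_s=0.1, max_delay_s=2.0):
  # check_fn returns the collected responses once ready, or None while pending.
  delay = initial_delay_s
  deadline = time.monotonic() + deadline_s
  while time.monotonic() < deadline:
    responses = check_fn()
    if responses is not None:
      return responses
    time.sleep(delay)                    # yield the CPU instead of busy-waiting
    delay = min(delay * 2, max_delay_s)  # exponential backoff, capped
  return None  # caller treats None as a timeout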
)
if not isinstance(item_result, dict):
item_result = {
DEEPLOY_KEYS.STATUS: DEEPLOY_STATUS.FAIL,
DEEPLOY_KEYS.ERROR: f"{DEEPLOY_ERRORS.GENERIC}: Unexpected item response type.",
}

item_result = self.deepcopy(item_result)
item_result[DEEPLOY_KEYS.ITEM_INDEX] = idx
results.append(item_result)
statuses.append(item_result.get(DEEPLOY_KEYS.STATUS, DEEPLOY_STATUS.FAIL))

success_statuses = {DEEPLOY_STATUS.SUCCESS, DEEPLOY_STATUS.COMMAND_DELIVERED}
fail_statuses = {DEEPLOY_STATUS.FAIL, DEEPLOY_STATUS.TIMEOUT, DEEPLOY_STATUS.ERROR}

if statuses and all(status in success_statuses for status in statuses):
aggregate_status = DEEPLOY_STATUS.SUCCESS
elif statuses and all(status in fail_statuses for status in statuses):
aggregate_status = DEEPLOY_STATUS.FAIL
else:
aggregate_status = DEEPLOY_STATUS.PARTIAL

result = {
DEEPLOY_KEYS.STATUS: aggregate_status,
DEEPLOY_KEYS.RESULTS: results,
}
except Exception as e:
result = self.__handle_error(e, request)

response = self._get_response({
**result
})
return response

@BasePlugin.endpoint(method="post")
# /update_pipeline
def update_pipeline(
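For illustration, a mixed-outcome batch would come back shaped roughly like this (values hypothetical; any extra envelope fields added by _get_response are omitted):

{
  "status": "partial",
  "results": [
    {"status": "success", "app_id": "a1", "item_index": 0},
    {"status": "fail", "error": "<DEEPLOY_ERRORS.REQUEST3>: request[1] must be an object.", "item_index": 1}
  ]
}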
26 changes: 25 additions & 1 deletion extensions/business/deeploy/deeploy_mixin.py
@@ -170,6 +170,7 @@ def __create_pipeline_on_nodes(self, nodes, inputs, app_id, app_alias, app_type,
dct_deeploy_specs = self._ensure_deeploy_specs_job_config(
dct_deeploy_specs,
pipeline_params=pipeline_params,
stack_job_config=inputs.get(DEEPLOY_KEYS.STACK_JOB_CONFIG),
)

detected_job_app_type = job_app_type or self.deeploy_detect_job_app_type(plugins)
@@ -295,6 +296,7 @@ def __update_pipeline_on_nodes(self, nodes, inputs, app_id, app_alias, app_type,
dct_deeploy_specs = self._ensure_deeploy_specs_job_config(
dct_deeploy_specs,
pipeline_params=pipeline_params,
stack_job_config=inputs.get(DEEPLOY_KEYS.STACK_JOB_CONFIG),
)

requested_by_instance_id, requested_by_signature, new_plugin_configs = self._organize_requested_plugins(inputs)
@@ -956,7 +958,7 @@ def _ensure_runner_cstore_auth_env(self, app_id, prepared_plugins):
return prepared_plugins


def _ensure_deeploy_specs_job_config(self, deeploy_specs, pipeline_params=None):
def _ensure_deeploy_specs_job_config(self, deeploy_specs, pipeline_params=None, stack_job_config=None):
"""
Ensure deeploy_specs contains a job_config section holding pipeline_params.
"""
@@ -984,6 +986,28 @@ def _ensure_deeploy_specs_job_config(self, deeploy_specs, pipeline_params=None):
resolved_params = {}

job_config[DEEPLOY_KEYS.PIPELINE_PARAMS] = self.deepcopy(resolved_params)

if stack_job_config is None:
stack_job_config = job_config.get(DEEPLOY_KEYS.STACK_JOB_CONFIG)

if isinstance(stack_job_config, dict):
for key in (
DEEPLOY_KEYS.STACK_ID,
DEEPLOY_KEYS.STACK_ALIAS,
DEEPLOY_KEYS.STACK_INDEX,
DEEPLOY_KEYS.STACK_SIZE,
DEEPLOY_KEYS.STACK_CONTAINER_REF,
DEEPLOY_KEYS.STACK_CONTAINER_ALIAS,
DEEPLOY_KEYS.STACK_TYPE,
):
if key in stack_job_config and stack_job_config[key] is not None:
job_config[key] = self.deepcopy(stack_job_config[key])
elif stack_job_config is not None:
self.Pd(
"Invalid stack_job_config detected while normalizing deeploy_specs; expected a dictionary.",
color='y'
)

specs[DEEPLOY_KEYS.JOB_CONFIG] = job_config
return specs

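To make the merge concrete, an illustrative call through the new path (values hypothetical) would behave like this:

specs = self._ensure_deeploy_specs_job_config(
  deeploy_specs={},
  pipeline_params={"A": "1"},
  stack_job_config={
    DEEPLOY_KEYS.STACK_ID: "stack-1",
    DEEPLOY_KEYS.STACK_INDEX: 0,
  },
)
# specs[DEEPLOY_KEYS.JOB_CONFIG] now holds deep copies of both the
# pipeline_params and the whitelisted, non-None stack fields:
#   {DEEPLOY_KEYS.PIPELINE_PARAMS: {"A": "1"},
#    DEEPLOY_KEYS.STACK_ID: "stack-1",
#    DEEPLOY_KEYS.STACK_INDEX: 0}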
65 changes: 65 additions & 0 deletions extensions/business/deeploy/test_deeploy.py
@@ -532,6 +532,71 @@ def _process_pipeline_request(request, is_create=True, async_mode=False):
res = self.plugin.create_pipeline({})
self.assertEqual(res["status"], "ok")

def test_create_pipelines_batch_returns_success(self):
"""
Ensure batch endpoint returns success when all items succeed.
"""
def _process_pipeline_request(request, is_create=True, async_mode=False):
return {
DEEPLOY_KEYS.STATUS: DEEPLOY_STATUS.SUCCESS,
DEEPLOY_KEYS.APP_ID: request.get("app_alias", "app"),
}

self.plugin._process_pipeline_request = _process_pipeline_request
res = self.plugin.create_pipelines_batch({
DEEPLOY_KEYS.REQUESTS: [
{"app_alias": "a1"},
{"app_alias": "a2"},
]
})

self.assertEqual(res[DEEPLOY_KEYS.STATUS], DEEPLOY_STATUS.SUCCESS)
self.assertEqual(len(res[DEEPLOY_KEYS.RESULTS]), 2)
self.assertEqual(res[DEEPLOY_KEYS.RESULTS][0][DEEPLOY_KEYS.ITEM_INDEX], 0)
self.assertEqual(res[DEEPLOY_KEYS.RESULTS][1][DEEPLOY_KEYS.ITEM_INDEX], 1)

def test_create_pipelines_batch_returns_partial(self):
"""
Ensure batch endpoint returns partial when outcomes are mixed.
"""
def _process_pipeline_request(request, is_create=True, async_mode=False):
if request.get("app_alias") == "ok":
return {DEEPLOY_KEYS.STATUS: DEEPLOY_STATUS.SUCCESS}
return {DEEPLOY_KEYS.STATUS: DEEPLOY_STATUS.FAIL}

self.plugin._process_pipeline_request = _process_pipeline_request
res = self.plugin.create_pipelines_batch({
DEEPLOY_KEYS.REQUESTS: [
{"app_alias": "ok"},
{"app_alias": "bad"},
]
})

self.assertEqual(res[DEEPLOY_KEYS.STATUS], DEEPLOY_STATUS.PARTIAL)
self.assertEqual(len(res[DEEPLOY_KEYS.RESULTS]), 2)

def test_ensure_specs_job_config_merges_stack_fields(self):
"""
Ensure stack_job_config fields are merged into deeploy_specs.job_config.
"""
normalized = self.plugin._ensure_deeploy_specs_job_config(
deeploy_specs={},
pipeline_params={"A": "1"},
stack_job_config={
DEEPLOY_KEYS.STACK_ID: "stack-1",
DEEPLOY_KEYS.STACK_ALIAS: "My Stack",
DEEPLOY_KEYS.STACK_INDEX: 0,
DEEPLOY_KEYS.STACK_SIZE: 2,
DEEPLOY_KEYS.STACK_CONTAINER_REF: "container-1",
DEEPLOY_KEYS.STACK_CONTAINER_ALIAS: "frontend",
DEEPLOY_KEYS.STACK_TYPE: "Stack",
},
)

self.assertEqual(normalized[DEEPLOY_KEYS.JOB_CONFIG][DEEPLOY_KEYS.PIPELINE_PARAMS], {"A": "1"})
self.assertEqual(normalized[DEEPLOY_KEYS.JOB_CONFIG][DEEPLOY_KEYS.STACK_ID], "stack-1")
self.assertEqual(normalized[DEEPLOY_KEYS.JOB_CONFIG][DEEPLOY_KEYS.STACK_CONTAINER_ALIAS], "frontend")

def test_scale_up_job_workers_returns_postponed(self):
"""
Ensure scale_up_job_workers returns PostponedRequest when response keys exist.