diff --git a/extensions/business/deeploy/deeploy_const.py b/extensions/business/deeploy/deeploy_const.py
index 2afd71ef..dc5d40f4 100644
--- a/extensions/business/deeploy/deeploy_const.py
+++ b/extensions/business/deeploy/deeploy_const.py
@@ -16,6 +16,9 @@ class DEEPLOY_KEYS:
   ERROR = "error"
   TRACE = "trace"
   REQUEST = "request"
+  REQUESTS = "requests"
+  RESULTS = "results"
+  ITEM_INDEX = "item_index"
   RETURN_REQUEST = "return_request"
   STATUS_DETAILS = "status_details"
   APPS = "apps"
@@ -46,6 +49,14 @@ class DEEPLOY_KEYS:
   ONLINE = "online"
   CHAIN_JOB = "chain_job"
   JOB_CONFIG = "job_config"
+  STACK_JOB_CONFIG = "stack_job_config"
+  STACK_ID = "stack_id"
+  STACK_ALIAS = "stack_alias"
+  STACK_INDEX = "stack_index"
+  STACK_SIZE = "stack_size"
+  STACK_CONTAINER_REF = "stack_container_ref"
+  STACK_CONTAINER_ALIAS = "stack_container_alias"
+  STACK_TYPE = "stack_type"
   # App params keys
   APP_PARAMS = "app_params"
   APP_PARAMS_IMAGE = "IMAGE"
@@ -68,6 +79,7 @@ class DEEPLOY_KEYS:
 
 class DEEPLOY_STATUS:
   SUCCESS = "success"
+  PARTIAL = "partial"
   FAIL = "fail"
   ERROR = "error"
   PENDING = "pending"
@@ -592,6 +604,12 @@ class JOB_APP_TYPES:
   ]
 }
 
+DEEPLOY_CREATE_BATCH_REQUEST = {
+  DEEPLOY_KEYS.REQUESTS: [
+    DEEPLOY_CREATE_REQUEST,
+  ],
+}
+
 ###################################################################################################################
 DEEPLOY_SCALE_UP_JOB_WORKERS_REQUEST = {
   "job_id" : 1, # The job ID to extend the workers for
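Review note: under the new DEEPLOY_CREATE_BATCH_REQUEST schema above, a batch payload is just a thin wrapper around ordinary create payloads. A minimal sketch, assuming each item follows the usual DEEPLOY_CREATE_REQUEST shape (aliases illustrative, per-item payloads abridged):

    batch_request = {
      "requests": [                  # DEEPLOY_KEYS.REQUESTS
        {"app_alias": "app-one"},    # any valid create_pipeline payload
        {"app_alias": "app-two"},
      ],
    }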
diff --git a/extensions/business/deeploy/deeploy_manager_api.py b/extensions/business/deeploy/deeploy_manager_api.py
index 36cb2bf5..ea5a1b36 100644
--- a/extensions/business/deeploy/deeploy_manager_api.py
+++ b/extensions/business/deeploy/deeploy_manager_api.py
@@ -15,7 +15,7 @@ from extensions.business.mixins.node_tags_mixin import _NodeTagsMixin
 from extensions.business.mixins.request_tracking_mixin import _RequestTrackingMixin
 
 from .deeploy_const import (
-  DEEPLOY_CREATE_REQUEST, DEEPLOY_CREATE_REQUEST_MULTI_PLUGIN, DEEPLOY_GET_APPS_REQUEST, DEEPLOY_DELETE_REQUEST,
+  DEEPLOY_CREATE_REQUEST, DEEPLOY_CREATE_REQUEST_MULTI_PLUGIN, DEEPLOY_CREATE_BATCH_REQUEST, DEEPLOY_GET_APPS_REQUEST, DEEPLOY_DELETE_REQUEST,
   DEEPLOY_ERRORS, DEEPLOY_KEYS, DEEPLOY_SCALE_UP_JOB_WORKERS_REQUEST, DEEPLOY_STATUS, DEEPLOY_INSTANCE_COMMAND_REQUEST,
   DEEPLOY_APP_COMMAND_REQUEST, DEEPLOY_GET_ORACLE_JOB_DETAILS_REQUEST, DEEPLOY_GET_R1FS_JOB_PIPELINE_REQUEST,
   DEEPLOY_PLUGIN_DATA, JOB_APP_TYPES, JOB_APP_TYPES_ALL,
@@ -521,6 +521,7 @@ def _process_pipeline_request(
     deeploy_specs_payload = self._ensure_deeploy_specs_job_config(
       deeploy_specs_payload,
       pipeline_params=pipeline_params,
+      stack_job_config=inputs.get(DEEPLOY_KEYS.STACK_JOB_CONFIG),
     )
 
     dct_status, str_status, response_keys, pipeline_to_persist = self.check_and_deploy_pipelines(
@@ -1032,6 +1033,76 @@ def create_pipeline(
       return self._register_pending_deploy_request(result['__pending__'])
     return result
 
+  @BasePlugin.endpoint(method="post")
+  # /create_pipelines_batch
+  def create_pipelines_batch(
+    self,
+    request: dict = DEEPLOY_CREATE_BATCH_REQUEST
+  ):
+    """
+    Create multiple pipelines in one API call.
+
+    Expects `request.requests` to be a list of normal create_pipeline payloads.
+    Returns per-item results plus an aggregate status:
+    - success: all items succeeded/command_delivered
+    - fail: all items failed/timeout/error
+    - partial: mixed outcomes
+    """
+    self.Pd("Called Deeploy create_pipelines_batch endpoint")
+
+    try:
+      requests = request.get(DEEPLOY_KEYS.REQUESTS, [])
+      if not isinstance(requests, list) or len(requests) == 0:
+        raise ValueError(f"{DEEPLOY_ERRORS.REQUEST3}: '{DEEPLOY_KEYS.REQUESTS}' must be a non-empty list.")
+
+      results = []
+      statuses = []
+
+      for idx, request_item in enumerate(requests):
+        if not isinstance(request_item, dict):
+          item_result = {
+            DEEPLOY_KEYS.STATUS: DEEPLOY_STATUS.FAIL,
+            DEEPLOY_KEYS.ERROR: f"{DEEPLOY_ERRORS.REQUEST3}: request[{idx}] must be an object.",
+          }
+        else:
+          item_result = self._process_pipeline_request(
+            request=request_item,
+            is_create=True,
+            async_mode=False,
+          )
+          if not isinstance(item_result, dict):
+            item_result = {
+              DEEPLOY_KEYS.STATUS: DEEPLOY_STATUS.FAIL,
+              DEEPLOY_KEYS.ERROR: f"{DEEPLOY_ERRORS.GENERIC}: Unexpected item response type.",
+            }
+
+        item_result = self.deepcopy(item_result)
+        item_result[DEEPLOY_KEYS.ITEM_INDEX] = idx
+        results.append(item_result)
+        statuses.append(item_result.get(DEEPLOY_KEYS.STATUS, DEEPLOY_STATUS.FAIL))
+
+      success_statuses = {DEEPLOY_STATUS.SUCCESS, DEEPLOY_STATUS.COMMAND_DELIVERED}
+      fail_statuses = {DEEPLOY_STATUS.FAIL, DEEPLOY_STATUS.TIMEOUT, DEEPLOY_STATUS.ERROR}
+
+      if statuses and all(status in success_statuses for status in statuses):
+        aggregate_status = DEEPLOY_STATUS.SUCCESS
+      elif statuses and all(status in fail_statuses for status in statuses):
+        aggregate_status = DEEPLOY_STATUS.FAIL
+      else:
+        aggregate_status = DEEPLOY_STATUS.PARTIAL
+
+      result = {
+        DEEPLOY_KEYS.STATUS: aggregate_status,
+        DEEPLOY_KEYS.RESULTS: results,
+      }
+    except Exception as e:
+      result = self.__handle_error(e, request)
+
+    response = self._get_response({
+      **result
+    })
+    return response
+
   @BasePlugin.endpoint(method="post")
   # /update_pipeline
   def update_pipeline(
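Review note: the aggregation in create_pipelines_batch reduces to a small pure function over the collected per-item statuses. A standalone sketch of the rule (hypothetical helper, not part of the patch; string literals stand in for the DEEPLOY_STATUS constants):

    def aggregate_batch_status(statuses):
      # All success-like -> success; all fail-like -> fail; anything mixed,
      # or a status outside both sets (e.g. pending), degrades to partial.
      success_like = {"success", "command_delivered"}
      fail_like = {"fail", "timeout", "error"}
      if statuses and all(s in success_like for s in statuses):
        return "success"
      if statuses and all(s in fail_like for s in statuses):
        return "fail"
      return "partial"

    assert aggregate_batch_status(["success", "fail"]) == "partial"

The empty-list case never reaches this reduction, because the endpoint rejects an empty `requests` list up front.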
""" @@ -984,6 +986,28 @@ def _ensure_deeploy_specs_job_config(self, deeploy_specs, pipeline_params=None): resolved_params = {} job_config[DEEPLOY_KEYS.PIPELINE_PARAMS] = self.deepcopy(resolved_params) + + if stack_job_config is None: + stack_job_config = job_config.get(DEEPLOY_KEYS.STACK_JOB_CONFIG) + + if isinstance(stack_job_config, dict): + for key in ( + DEEPLOY_KEYS.STACK_ID, + DEEPLOY_KEYS.STACK_ALIAS, + DEEPLOY_KEYS.STACK_INDEX, + DEEPLOY_KEYS.STACK_SIZE, + DEEPLOY_KEYS.STACK_CONTAINER_REF, + DEEPLOY_KEYS.STACK_CONTAINER_ALIAS, + DEEPLOY_KEYS.STACK_TYPE, + ): + if key in stack_job_config and stack_job_config[key] is not None: + job_config[key] = self.deepcopy(stack_job_config[key]) + elif stack_job_config is not None: + self.Pd( + "Invalid stack_job_config detected while normalizing deeploy_specs; expected a dictionary.", + color='y' + ) + specs[DEEPLOY_KEYS.JOB_CONFIG] = job_config return specs diff --git a/extensions/business/deeploy/test_deeploy.py b/extensions/business/deeploy/test_deeploy.py index e67831c7..7091b1f7 100644 --- a/extensions/business/deeploy/test_deeploy.py +++ b/extensions/business/deeploy/test_deeploy.py @@ -532,6 +532,71 @@ def _process_pipeline_request(request, is_create=True, async_mode=False): res = self.plugin.create_pipeline({}) self.assertEqual(res["status"], "ok") + def test_create_pipelines_batch_returns_success(self): + """ + Ensure batch endpoint returns success when all items succeed. + """ + def _process_pipeline_request(request, is_create=True, async_mode=False): + return { + DEEPLOY_KEYS.STATUS: DEEPLOY_STATUS.SUCCESS, + DEEPLOY_KEYS.APP_ID: request.get("app_alias", "app"), + } + + self.plugin._process_pipeline_request = _process_pipeline_request + res = self.plugin.create_pipelines_batch({ + DEEPLOY_KEYS.REQUESTS: [ + {"app_alias": "a1"}, + {"app_alias": "a2"}, + ] + }) + + self.assertEqual(res[DEEPLOY_KEYS.STATUS], DEEPLOY_STATUS.SUCCESS) + self.assertEqual(len(res[DEEPLOY_KEYS.RESULTS]), 2) + self.assertEqual(res[DEEPLOY_KEYS.RESULTS][0][DEEPLOY_KEYS.ITEM_INDEX], 0) + self.assertEqual(res[DEEPLOY_KEYS.RESULTS][1][DEEPLOY_KEYS.ITEM_INDEX], 1) + + def test_create_pipelines_batch_returns_partial(self): + """ + Ensure batch endpoint returns partial when outcomes are mixed. + """ + def _process_pipeline_request(request, is_create=True, async_mode=False): + if request.get("app_alias") == "ok": + return {DEEPLOY_KEYS.STATUS: DEEPLOY_STATUS.SUCCESS} + return {DEEPLOY_KEYS.STATUS: DEEPLOY_STATUS.FAIL} + + self.plugin._process_pipeline_request = _process_pipeline_request + res = self.plugin.create_pipelines_batch({ + DEEPLOY_KEYS.REQUESTS: [ + {"app_alias": "ok"}, + {"app_alias": "bad"}, + ] + }) + + self.assertEqual(res[DEEPLOY_KEYS.STATUS], DEEPLOY_STATUS.PARTIAL) + self.assertEqual(len(res[DEEPLOY_KEYS.RESULTS]), 2) + + def test_ensure_specs_job_config_merges_stack_fields(self): + """ + Ensure stack_job_config fields are merged into deeploy_specs.job_config. 
+ """ + normalized = self.plugin._ensure_deeploy_specs_job_config( + deeploy_specs={}, + pipeline_params={"A": "1"}, + stack_job_config={ + DEEPLOY_KEYS.STACK_ID: "stack-1", + DEEPLOY_KEYS.STACK_ALIAS: "My Stack", + DEEPLOY_KEYS.STACK_INDEX: 0, + DEEPLOY_KEYS.STACK_SIZE: 2, + DEEPLOY_KEYS.STACK_CONTAINER_REF: "container-1", + DEEPLOY_KEYS.STACK_CONTAINER_ALIAS: "frontend", + DEEPLOY_KEYS.STACK_TYPE: "Stack", + }, + ) + + self.assertEqual(normalized[DEEPLOY_KEYS.JOB_CONFIG][DEEPLOY_KEYS.PIPELINE_PARAMS], {"A": "1"}) + self.assertEqual(normalized[DEEPLOY_KEYS.JOB_CONFIG][DEEPLOY_KEYS.STACK_ID], "stack-1") + self.assertEqual(normalized[DEEPLOY_KEYS.JOB_CONFIG][DEEPLOY_KEYS.STACK_CONTAINER_ALIAS], "frontend") + def test_scale_up_job_workers_returns_postponed(self): """ Ensure scale_up_job_workers returns PostponedRequest when response keys exist.