diff --git a/extensions/business/container_apps/container_app_runner.py b/extensions/business/container_apps/container_app_runner.py index 04eac0fd..87d46b77 100644 --- a/extensions/business/container_apps/container_app_runner.py +++ b/extensions/business/container_apps/container_app_runner.py @@ -66,6 +66,7 @@ import os import requests import shutil +import signal import threading import time import socket @@ -365,6 +366,9 @@ def from_dict(cls, config_dict: dict) -> "HealthCheckConfig": "MAX_LOG_LINES" : 10_000, # max lines to keep in memory # When container is STOPPED_MANUALLY (PAUSED state), this will define how often we log its existance "PAUSED_STATE_LOG_INTERVAL": 60, + # Container apps can need more than the core plugin default to stop Docker, + # tunnel processes, runtime readers, and loop-backed fixed volumes safely. + "PLUGIN_STOP_TIMEOUT": 45, # Semaphore synchronization for paired plugins # List of semaphore keys to wait for before starting container @@ -599,6 +603,8 @@ def __reset_vars(self): # Container state machine self.container_state = ContainerState.UNINITIALIZED self.stop_reason = StopReason.UNKNOWN + self._cleanup_failed = False + self._manual_stop_pending = False # Restart policy and retry logic self._consecutive_failures = 0 @@ -640,6 +646,8 @@ def __reset_vars(self): # Volume-sync state. SyncManager is lazy-init'd by _ensure_sync_manager # the first time a tick fires (or on_init for early provisioning). self._sync_manager = None + self._sync_unavailable = False + self._runtime_stop_degraded = False # Image update tracking self.current_image_hash = None @@ -1077,7 +1085,7 @@ def _validate_runner_config(self): def _validate_sync_config(self): """ Validate the SYNC config block when ENABLED. Disables SYNC with a - warning rather than raising — the system volume itself is independent + warning rather than raising; the system volume itself is independent and the rest of the plugin must keep running. """ if not self._sync_enabled(): @@ -1087,14 +1095,14 @@ def _validate_sync_config(self): role = sync.get("TYPE") if not key or not isinstance(key, str): self.P( - "[sync] SYNC.ENABLED but SYNC.KEY missing/empty — disabling SYNC.", + "[sync] SYNC.ENABLED but SYNC.KEY missing/empty; disabling SYNC.", color="r", ) sync["ENABLED"] = False return if role not in ("provider", "consumer"): self.P( - f"[sync] SYNC.TYPE must be 'provider' or 'consumer' (got {role!r}) — disabling SYNC.", + f"[sync] SYNC.TYPE must be 'provider' or 'consumer' (got {role!r}); disabling SYNC.", color="r", ) sync["ENABLED"] = False @@ -1182,7 +1190,6 @@ def on_init(self): # If a prior plugin run crashed mid-publish, request.json.processing may # be left over inside volume-sync/. Rename it back so the next tick retries. self._recover_stale_processing() - self._validate_sync_config() # If we have semaphored keys, defer _setup_env_and_ports() until semaphores are ready @@ -1259,16 +1266,27 @@ def on_command(self, data, **kwargs): if data == "RESTART": self.P("Restarting container...") self._clear_manual_stop_state() # Clear persistent stop state + # RESTART is an explicit operator override for a previously failed STOP. + # Clear the in-memory pause intent too, otherwise a later cleanup retry + # could incorrectly persist PAUSED instead of relaunching the container. 
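      # A minimal model of the flag interplay this override fixes (illustrative
      # only; a throwaway dict stands in for the plugin's state):
      #
      #     state = {"manual_stop_pending": True, "cleanup_failed": True}
      #     state["manual_stop_pending"] = False   # operator RESTART wins
      #     # ...a later successful cleanup retry now relaunches the container
      #     # instead of persisting PAUSED from the stale manual-stop intent.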
+ self._manual_stop_pending = False self._set_container_state(ContainerState.RESTARTING, StopReason.CONFIG_UPDATE) - self._stop_container_and_save_logs_to_disk() self._restart_container(StopReason.CONFIG_UPDATE) return elif data == "STOP": self.P("Stopping container (manual stop - restart policy will not trigger)...") - self._save_persistent_state(manually_stopped=True) # Save persistent stop state - self._stop_container_and_save_logs_to_disk() - self._set_container_state(ContainerState.PAUSED, StopReason.MANUAL_STOP) + self._manual_stop_pending = True + cleanup_ok = self._stop_container_and_save_logs_to_disk() + if cleanup_ok: + self._save_persistent_state(manually_stopped=True) # Persist only after cleanup succeeds. + self._manual_stop_pending = False + self._set_container_state(ContainerState.PAUSED, StopReason.MANUAL_STOP) + else: + # Keep the failed cleanup retryable without persisting a paused state + # that would later make config restarts look intentionally ignored. + self._clear_manual_stop_state() + self._set_container_state(ContainerState.FAILED, StopReason.UNKNOWN) return else: self.P(f"Unknown plugin command: {data}") @@ -1307,6 +1325,16 @@ def _handle_config_restart(self, restart_callable): ) return + if self._manual_stop_pending: + self.P( + "Manual STOP cleanup is still pending. Ignoring config restart; " + "send RESTART to override the pending stop intent.", + color='y', + ) + if self._cleanup_failed: + self._retry_failed_cleanup() + return + # Check persistent state as fallback (in case container_state not yet set) if self._load_manual_stop_state(): self.P( @@ -1316,7 +1344,12 @@ def _handle_config_restart(self, restart_callable): ) return - self._stop_container_and_save_logs_to_disk() + cleanup_ok = self._stop_container_and_save_logs_to_disk() + if not cleanup_ok: + self.P("Config restart aborted because previous runtime cleanup failed.", color='r') + self._set_container_state(ContainerState.FAILED, StopReason.UNKNOWN) + self._record_restart_failure() + return restart_callable() return @@ -1338,7 +1371,7 @@ def on_config(self, *args, **kwargs): ------- None """ - return self._handle_config_restart(lambda: self._restart_container(StopReason.CONFIG_UPDATE)) + return self._handle_config_restart(lambda: self._restart_container(StopReason.CONFIG_UPDATE, cleanup_first=False)) def on_post_container_start(self): @@ -1426,6 +1459,112 @@ def get_cloudflare_protocol(self): return super(ContainerAppRunnerPlugin, self).get_cloudflare_protocol() + def _remember_process_group(self, process): + """ + Record tunnel process-group ids even when deployed with an older core. + + Newer cores provide this on ``BaseTunnelEnginePlugin``. Keeping a local + fallback lets the edge PR roll out before the matching core PR without + breaking extra tunnel startup after ``subprocess.Popen`` succeeds. + """ + base_method = getattr(super(ContainerAppRunnerPlugin, self), "_remember_process_group", None) + if callable(base_method): + return base_method(process) + if process is not None and os.name != "nt": + try: + process._r1_process_group_id = os.getpgid(process.pid) + except Exception as exc: + self.P(f"Could not record tunnel process group: {exc}", color='r') + return process + + + def _terminate_subprocess_tree(self, process, label="subprocess", terminate_timeout=5, kill_timeout=5): + """ + Terminate a tunnel subprocess tree with a compatibility fallback. 
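    A sketch of the POSIX building blocks the fallback leans on (standalone
    and illustrative; ``sleep`` is just a placeholder child)::

        import os, signal, subprocess

        # start_new_session=True gives the child its own process group, so a
        # single killpg() reaches the child and everything it spawned.
        proc = subprocess.Popen(["sleep", "60"], start_new_session=True)
        pgid = os.getpgid(proc.pid)
        os.killpg(pgid, signal.SIGTERM)  # signal the whole tree at once
        proc.wait(timeout=5)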
+ + Prefer the core implementation when present; otherwise use the same bounded + POSIX process-group shutdown strategy locally so mixed-version deployments + do not leak extra tunnel children. + """ + base_method = getattr(super(ContainerAppRunnerPlugin, self), "_terminate_subprocess_tree", None) + if callable(base_method): + return base_method( + process, + label=label, + terminate_timeout=terminate_timeout, + kill_timeout=kill_timeout, + ) + if process is None: + return True + + pgid = getattr(process, "_r1_process_group_id", None) + + def is_process_group_alive(): + if os.name == "nt" or pgid is None: + return False + try: + os.killpg(pgid, 0) + return True + except ProcessLookupError: + return False + except Exception as exc: + self.P(f"Could not probe {label} process group {pgid}: {exc}", color='r') + return True + + def wait_process_tree(timeout): + deadline = time.monotonic() + timeout + process_stopped = process.poll() is not None + if not process_stopped: + try: + process.wait(timeout=timeout) + process_stopped = True + except subprocess.TimeoutExpired: + process_stopped = False + except Exception as exc: + self.P(f"Error waiting for {label}: {exc}", color='r') + process_stopped = process.poll() is not None + if os.name == "nt" or pgid is None: + return process_stopped + while time.monotonic() < deadline: + if not is_process_group_alive(): + return process_stopped + time.sleep(0.05) + return process_stopped and not is_process_group_alive() + + def send_signal(sig, fallback): + if os.name != "nt" and pgid is not None and sig is not None: + try: + os.killpg(pgid, sig) + return True + except ProcessLookupError: + return True + except Exception as exc: + self.P(f"Error signaling {label} process group {pgid}: {exc}", color='r') + if process.poll() is None: + try: + fallback() + return True + except Exception as exc: + self.P(f"Error signaling {label}: {exc}", color='r') + return False + return True + + if process.poll() is None or is_process_group_alive(): + if not send_signal(signal.SIGTERM, process.terminate): + return False + if wait_process_tree(terminate_timeout): + return True + + self.P(f"{label} did not stop after terminate; killing it.", color='r') + kill_signal = getattr(signal, "SIGKILL", None) + if not send_signal(kill_signal, process.kill): + return False + if wait_process_tree(kill_timeout): + return True + self.P(f"{label} did not exit after kill; continuing shutdown.", color='r') + return False + + def stop_tunnel_engine(self): """ Stop the main tunnel engine. @@ -1434,16 +1573,28 @@ def stop_tunnel_engine(self): Returns ------- - None + bool + True when the tunnel process and log readers stopped, False otherwise. 
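    Notes
    -----
    A hedged sketch of the caller pattern this boolean enables; the retry
    wiring shown is illustrative, not this plugin's actual plumbing::

        if not self.stop_tunnel_engine():
            # self.tunnel_process is intentionally preserved, so the next
            # cleanup pass can stop the same process instead of leaking it.
            self._cleanup_failed = True  # picked up by the process()-side retry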
""" if self.tunnel_process: engine_name = "Cloudflare" if self.use_cloudflare() else "ngrok" self.P(f"Stopping {engine_name} tunnel...") - self.stop_tunnel_command(self.tunnel_process) - self.tunnel_process = None - self.P(f"{engine_name} tunnel stopped") + process = self.tunnel_process + result = True + try: + result = self.stop_tunnel_command(process) + except Exception as exc: + result = False + self.P(f"Error stopping {engine_name} tunnel: {exc}", color='r') + finally: + if result: + self.tunnel_process = None + self.P(f"{engine_name} tunnel stopped") + else: + self.P(f"{engine_name} tunnel did not fully stop; preserving process handle for retry.", color='r') + return result # end if - return + return True def get_tunnel_engine_ping_data(self): @@ -1704,12 +1855,16 @@ def _start_extra_tunnel(self, container_port, tunnel_config): self.Pd(f" Command: {' '.join(command)}") # Use list-based subprocess to prevent shell injection - process = subprocess.Popen( - command, + popen_kwargs = dict( + args=command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, - bufsize=0 + bufsize=0, ) + if os.name != "nt": + popen_kwargs["start_new_session"] = True + process = subprocess.Popen(**popen_kwargs) + self._remember_process_group(process) # Create log readers for this tunnel logs_reader = self.LogReader(process.stdout, size=100, daemon=None) @@ -1778,12 +1933,16 @@ def _stop_extra_tunnel(self, container_port): Returns ------- - None + bool + True when the tunnel process and log readers stopped, False otherwise. """ process = self.extra_tunnel_processes.get(container_port) if not process: - return + return True + result = True + process_stopped = process.poll() is not None + readers_stopped = True try: self.P(f"Stopping extra tunnel for port {container_port}...") @@ -1792,12 +1951,11 @@ def _stop_extra_tunnel(self, container_port): # Stop process if process.poll() is None: # Still running - process.terminate() - try: - process.wait(timeout=5) - except Exception: - process.kill() - process.wait() + process_stopped = self._terminate_subprocess_tree( + process, + label=f"Extra tunnel for port {container_port}", + ) + result = process_stopped and result # Clean up log readers (following base class pattern) log_readers = self.extra_tunnel_log_readers.get(container_port, {}) @@ -1806,36 +1964,49 @@ def _stop_extra_tunnel(self, container_port): stdout_reader = log_readers.get("stdout") if stdout_reader: try: - stdout_reader.stop() + reader_stopped = stdout_reader.stop() + readers_stopped = reader_stopped and readers_stopped + result = reader_stopped and result # Read any remaining logs before cleanup remaining_logs = stdout_reader.get_next_characters() if remaining_logs: self._process_extra_tunnel_log(container_port, remaining_logs, is_error=False) except Exception as e: + readers_stopped = False + result = False self.Pd(f"Error stopping stdout reader: {e}") # Stop stderr reader and read remaining logs stderr_reader = log_readers.get("stderr") if stderr_reader: try: - stderr_reader.stop() + reader_stopped = stderr_reader.stop() + readers_stopped = reader_stopped and readers_stopped + result = reader_stopped and result # Read any remaining error logs before cleanup remaining_err_logs = stderr_reader.get_next_characters() if remaining_err_logs: self._process_extra_tunnel_log(container_port, remaining_err_logs, is_error=True) except Exception as e: + readers_stopped = False + result = False self.Pd(f"Error stopping stderr reader: {e}") - # Clean up references - self.extra_tunnel_processes.pop(container_port, 
None) - self.extra_tunnel_log_readers.pop(container_port, None) - self.extra_tunnel_urls.pop(container_port, None) - self.extra_tunnel_start_times.pop(container_port, None) + if result: + self.extra_tunnel_processes.pop(container_port, None) + self.extra_tunnel_log_readers.pop(container_port, None) + self.extra_tunnel_urls.pop(container_port, None) + self.extra_tunnel_start_times.pop(container_port, None) - self.P(f"Extra tunnel for port {container_port} stopped") + if result: + self.P(f"Extra tunnel for port {container_port} stopped") + else: + self.P(f"Extra tunnel for port {container_port} did not fully stop; preserving live handles for retry.", color='r') except Exception as e: + result = False self.P(f"Error stopping extra tunnel for port {container_port}: {e}", color='r') + return result def stop_extra_tunnels(self): @@ -1847,17 +2018,23 @@ def stop_extra_tunnels(self): Returns ------- - None + bool + True when all extra tunnels stopped, False otherwise. """ if not self.extra_tunnel_processes: - return + return True self.P(f"Stopping {len(self.extra_tunnel_processes)} extra tunnel(s)...") + result = True for container_port in list(self.extra_tunnel_processes.keys()): - self._stop_extra_tunnel(container_port) + result = self._stop_extra_tunnel(container_port) and result - self.P("All extra tunnels stopped") + if result: + self.P("All extra tunnels stopped", color='g') + else: + self.P("One or more extra tunnels failed to stop; preserving live handles for retry.", color='r') + return result def _read_extra_tunnel_logs(self, container_port): @@ -2146,11 +2323,15 @@ def stop_container(self): ) self.container = None self.container_id = None + else: + # Keep the handle so a later cleanup retry can remove the same Docker + # object instead of losing track of a possibly still-running container. + self.P("Preserving container handle after failed stop/remove for retry.", color='r') # end try return removed_ok - def _stream_logs(self, log_stream): + def _stream_logs(self, log_stream, stop_event=None): """ Consume a log iterator from container logs and print its output. 
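Why the stop event becomes a parameter, modeled with plain threading
primitives (standalone sketch with hypothetical names):

    import threading

    def reader(stop_event):
        while not stop_event.is_set():
            pass  # consume stream chunks here

    event = threading.Event()
    t = threading.Thread(target=reader, args=(event,), daemon=True)
    t.start()
    # The runtime-stop path replaces self._stop_event with a fresh Event()
    # after a clean shutdown; a reader that re-read the attribute could
    # observe the new, never-set event and spin forever. Binding the Event
    # object at thread start pins the reader to the one that gets set.
    event.set()
    t.join(timeout=5)
    assert not t.is_alive()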
@@ -2167,6 +2348,9 @@ def _stream_logs(self, log_stream): self.P("No log stream provided", color='r') return + if stop_event is None: + stop_event = self._stop_event + try: for log_bytes in log_stream: if log_bytes is None: @@ -2180,7 +2364,7 @@ def _stream_logs(self, log_stream): self.P(f"[CONTAINER] {log_str}", end='') self.container_logs.append(log_str) - if self._stop_event.is_set(): + if stop_event.is_set(): self.P("Log streaming stopped by stop event") break except Exception as e: @@ -2207,7 +2391,7 @@ def _start_container_log_stream(self): log_stream = self.container.logs(stream=True, follow=True) self.log_thread = threading.Thread( target=self._stream_logs, - args=(log_stream,), + args=(log_stream, self._stop_event), daemon=True, ) self.log_thread.start() @@ -2294,7 +2478,7 @@ def _run_container_exec(self, shell_cmd): ) thread = threading.Thread( target=self._stream_logs, - args=(exec_result.output,), + args=(exec_result.output, self._stop_event), daemon=True, ) thread.start() @@ -2757,75 +2941,120 @@ def _stop_container_runtime_for_restart(self): """ self.P(f"Stopping container app '{self.container_id}' ...") + cleanup_errors = [] + + def safe_cleanup_step(step_name, callback): + try: + result = callback() + if result is False: + cleanup_errors.append(step_name) + self.P(f"Container cleanup step '{step_name}' reported failure.", color='r') + except Exception as exc: + cleanup_errors.append(step_name) + self.P(f"Container cleanup step '{step_name}' failed: {exc}", color='r') + # Clear semaphore and reset signaling state for potential restart - self._semaphore_reset_signal() + safe_cleanup_step("semaphore reset", self._semaphore_reset_signal) - # Stop log streaming - self._stop_event.set() - if self.log_thread: - self.log_thread.join(timeout=5) - self.log_thread = None + def signal_runtime_threads(): + self._stop_event.set() + self._commands_started = False + return True - if getattr(self, 'exec_threads', None): - for thread in self.exec_threads: - if thread and thread.is_alive(): - thread.join(timeout=5) - self.exec_threads = [] + def join_runtime_threads(): + result = True + stop_deadline = time.monotonic() + 5 - self._stop_event = threading.Event() - self._commands_started = False + if self.log_thread: + self.log_thread.join(timeout=max(0, stop_deadline - time.monotonic())) + if self.log_thread.is_alive(): + result = False + self.P("Container log thread is still alive after stop timeout.", color='r') + else: + self.log_thread = None + + if getattr(self, 'exec_threads', None): + alive_threads = [] + for thread in self.exec_threads: + remaining = max(0, stop_deadline - time.monotonic()) + if thread and thread.is_alive() and remaining > 0: + thread.join(timeout=remaining) + if thread and thread.is_alive(): + result = False + alive_threads.append(thread) + self.exec_threads = alive_threads + + if result: + self._stop_event = threading.Event() + self._commands_started = False + return result + + # Signal log/exec readers early, but join after Docker stop; quiet Docker + # streams usually unblock only when the container stops. 
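    # Compressed model of the sequence below (illustrative; the callables are
    # placeholders for the real callbacks):
    #
    #     errors = []
    #     for name, fn in [("signal readers", ...), ("main tunnel", ...),
    #                      ("extra tunnels", ...), ("docker container", ...),
    #                      ("runtime threads", ...)]:
    #         safe_cleanup_step(name, fn)   # collect failures, never abort early
    #     cleanup_ok = not errors           # the method's return value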
+ safe_cleanup_step("runtime thread signal", signal_runtime_threads) # Stop tunnel engine if needed - self.stop_tunnel_engine() + safe_cleanup_step("main tunnel", self.stop_tunnel_engine) # Stop extra tunnels - self.stop_extra_tunnels() + safe_cleanup_step("extra tunnels", self.stop_extra_tunnels) - # Stop the container if it's running - stopped = self.stop_container() - if not stopped: - self._runtime_stop_degraded = True - self.P( - "Container runtime stop failed after sidecars were stopped; container " - "may still be running and volume mutation/cleanup must be skipped.", - color='r', - ) - else: - self._runtime_stop_degraded = False + def stop_runtime_container(): + stopped = self.stop_container() + self._runtime_stop_degraded = not stopped + if not stopped: + self.P( + "Container runtime stop failed after sidecars were stopped; container " + "may still be running and volume mutation/cleanup must be skipped.", + color='r', + ) + return stopped + + # Stop the container if it's running. A false result preserves the Docker + # handle for retry and prevents volume mutation against a possibly live app. + safe_cleanup_step("docker container", stop_runtime_container) - return stopped + # Stop log streaming threads after Docker stop has had a chance to unblock + # the log/exec streams. + safe_cleanup_step("runtime threads", join_runtime_threads) + + if cleanup_errors: + self._cleanup_failed = True + self.P("Container runtime cleanup completed with failed step(s): {}.".format(", ".join(cleanup_errors)), color='r') + return False + self._runtime_stop_degraded = False + self._cleanup_failed = False + return True def _stop_container_and_save_logs_to_disk(self): """ - Stop the container and all tunnels, then save logs to disk. - - Performs full shutdown sequence: - - Clears semaphore (signals dependent plugins container is stopping) - - Stops log streaming threads - - Stops main tunnel engine - - Stops all extra tunnels - - Stops and removes container - - Cleans fixed-size volumes - - Saves logs to disk + Stop the container, all tunnels, fixed volumes, then save logs to disk. Returns ------- - None + bool + True when cleanup completed without required-step failures, False otherwise. """ - stopped = self._stop_container_runtime_for_restart() + runtime_ok = self._stop_container_runtime_for_restart() + cleanup_errors = [] - # Cleanup fixed-size volumes (unmount + detach loop devices) - if stopped: - self._cleanup_fixed_size_volumes() + if runtime_ok: + try: + if self._cleanup_fixed_size_volumes() is False: + cleanup_errors.append("fixed-size volumes") + except Exception as exc: + cleanup_errors.append("fixed-size volumes") + self.P(f"Container cleanup step 'fixed-size volumes' failed: {exc}", color='r') else: + cleanup_errors.append("runtime") self.P( "Skipping fixed-size volume cleanup because container stop/remove failed.", color='r', ) - # Save logs to disk under the instance's `logs/` sibling folder - # (resolves to pipelines_data/{sid}/{iid}/logs/container_logs.pkl) + # Save logs to disk even when cleanup is degraded; logs are diagnostic data + # and should not be lost just because Docker/tunnel teardown needs a retry. 
try: self.diskapi_save_pickle_to_data( obj=list(self.container_logs), @@ -2835,7 +3064,13 @@ def _stop_container_and_save_logs_to_disk(self): self.P("Container logs saved to disk.") except Exception as exc: self.P(f"Failed to save logs: {exc}", color='r') - return + + if cleanup_errors: + self._cleanup_failed = True + self.P("Container cleanup completed with failed step(s): {}.".format(", ".join(cleanup_errors)), color='r') + return False + self._cleanup_failed = False + return True def on_close(self): @@ -3148,7 +3383,7 @@ def _reset_runtime_state_post_start(self): - run image-defined build/run commands Shared between ``_restart_container`` and the volume-sync ticks - (``_SyncMixin._sync_safe_start_container``) so they stay in lockstep — + (``_SyncMixin._sync_safe_start_container``) so they stay in lockstep; sync slices stop+start the container inline (to keep the system volume mounted), and without this helper the readiness/probe state would still point at the previous container instance. @@ -3162,7 +3397,7 @@ def _reset_runtime_state_post_start(self): self._maybe_execute_build_and_run() - def _restart_container(self, stop_reason=None): + def _restart_container(self, stop_reason=None, cleanup_first=True): """ Restart the container from scratch. @@ -3170,10 +3405,15 @@ def _restart_container(self, stop_reason=None): ---------- stop_reason : StopReason, optional Optional StopReason enum indicating why restart was triggered + cleanup_first : bool, optional + If True, stop the existing runtime before resetting state. Set to False + when the caller already performed cleanup and checked its result. Returns ------- - None + bool + True when restart setup succeeded or was deferred waiting for + semaphores, False when cleanup or start failed. """ self.P("Restarting container from scratch...") @@ -3183,7 +3423,14 @@ def _restart_container(self, stop_reason=None): preserved_last_image_check = self._last_image_check preserved_current_hash = self.current_image_hash - self._stop_container_and_save_logs_to_disk() + if cleanup_first: + cleanup_ok = self._stop_container_and_save_logs_to_disk() + if not cleanup_ok: + self.P("Restart aborted because previous runtime cleanup failed.", color='r') + self._set_container_state(ContainerState.FAILED, stop_reason or StopReason.UNKNOWN) + self._record_restart_failure() + return False + self.__reset_vars() # Reset chainstore response for restart cycle @@ -3222,7 +3469,7 @@ def _restart_container(self, stop_reason=None): self._validate_extra_tunnels_config() self._validate_runner_config() self.P("Consumer container with semaphore dependencies: deferring start until providers are ready") - return + return True # Non-semaphored containers (providers): configure env and start immediately self._configure_dynamic_env() @@ -3239,15 +3486,15 @@ def _restart_container(self, stop_reason=None): self.P("Failed to ensure image availability during restart, cannot start container", color='r') self._set_container_state(ContainerState.FAILED, StopReason.CRASH) self._record_restart_failure() - return + return False self.container = self.start_container() if not self.container: # start_container already recorded the failure - return + return False self._reset_runtime_state_post_start() - return + return True def _ensure_image_always_pull(self): @@ -3506,7 +3753,7 @@ def _perform_additional_checks(self, current_time): # mount survives the archive/extract window. 
We must not return a # StopReason from here because that would route through _restart_container, # which calls _cleanup_fixed_size_volumes() and unmounts before our work - # can run. See plan Step 1 verification for the full rationale. + # can run. if self._sync_enabled(): role = self._sync_role() if role == "provider": @@ -3516,6 +3763,46 @@ def _perform_additional_checks(self, current_time): return None + def _retry_failed_cleanup(self): + """ + Retry a previously failed cleanup cycle from the normal process loop. + + Cleanup failure is a backoff/retry state, not a permanent latch. This keeps + transient Docker, tunnel, log-reader, or fixed-volume failures visible while + still giving the plugin an automatic recovery path. + """ + if not self._cleanup_failed: + return True + + if self._has_exceeded_max_retries(): + self.P( + "Container cleanup retry abandoned after {} consecutive failure(s).".format( + self._consecutive_failures + ), + color='r', + ) + return False + + if self._is_restart_backoff_active(): + return False + + self.P("Retrying previously failed container cleanup...", color='y') + cleanup_ok = self._stop_container_and_save_logs_to_disk() + if cleanup_ok: + self._cleanup_failed = False + self.P("Previously failed container cleanup succeeded.", color='g') + if self._manual_stop_pending: + self._save_persistent_state(manually_stopped=True) + self._manual_stop_pending = False + self._set_container_state(ContainerState.PAUSED, StopReason.MANUAL_STOP) + return False + return True + + self._record_restart_failure() + self._set_container_state(ContainerState.FAILED, self.stop_reason or StopReason.UNKNOWN) + return False + + def process(self): """ Main process loop for the plugin. @@ -3547,6 +3834,9 @@ def process(self): self._last_paused_log = current_time return + if self._cleanup_failed and not self._retry_failed_cleanup(): + return + if not self.container: # Check if we're in backoff period if self._is_restart_backoff_active(): diff --git a/extensions/business/container_apps/fixed_volume.py b/extensions/business/container_apps/fixed_volume.py index f584f08f..6e38a5bc 100644 --- a/extensions/business/container_apps/fixed_volume.py +++ b/extensions/business/container_apps/fixed_volume.py @@ -48,6 +48,33 @@ def _log(logger: Optional[Callable], level: str, message: str) -> None: print(f"[FixedVolume] [{level}] {message}", flush=True) +def _decode_proc_mount_field(value: str) -> str: + """Decode the octal escapes used by /proc/mounts fields.""" + return (value.replace("\\040", " ") + .replace("\\011", "\t") + .replace("\\012", "\n") + .replace("\\134", "\\")) + + +def _get_mount_source(mount_path) -> Optional[str]: + """Return the exact source device mounted at ``mount_path``, if any.""" + try: + with open("/proc/mounts", "r", encoding="utf-8") as f: + lines = f.readlines() + except OSError: + return None + target = str(mount_path).rstrip("/") + for line in lines: + parts = line.split() + if len(parts) < 2: + continue + source = _decode_proc_mount_field(parts[0]) + mp = _decode_proc_mount_field(parts[1]) + if mp.rstrip("/") == target: + return source + return None + + def _is_path_mounted(mount_path) -> bool: """Return True iff `mount_path` is an exact mountpoint in /proc/mounts. 
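A worked example of the octal decoding these helpers share; /proc/mounts
escapes spaces in its fields as \040, so a loop-backed volume mounted at a
path containing a space round-trips like this (values illustrative):

    line = "/dev/loop7 /srv/fixed\\040vol ext4 rw,relatime 0 0"
    fields = line.split()
    assert _decode_proc_mount_field(fields[0]) == "/dev/loop7"
    assert _decode_proc_mount_field(fields[1]) == "/srv/fixed vol"
    # _get_mount_source("/srv/fixed vol") would then return "/dev/loop7",
    # which cleanup() treats as a stronger identity than sidecar metadata.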
@@ -72,11 +99,7 @@ def _is_path_mounted(mount_path) -> bool: parts = line.split() if len(parts) < 2: continue - mp = parts[1] - mp = (mp.replace("\\040", " ") - .replace("\\011", "\t") - .replace("\\012", "\n") - .replace("\\134", "\\")) + mp = _decode_proc_mount_field(parts[1]) if mp.rstrip("/") == target: return True return False @@ -387,39 +410,90 @@ def provision( def cleanup( vol: FixedVolume, logger: Optional[Callable] = None, -) -> None: +) -> bool: """Unmount and detach the loop device for a volume. Graceful -- never raises. All errors are caught and logged as warnings. + Returns False when unmount/detach could not be confirmed so callers can + preserve cleanup handles and retry later. """ _log( logger, "STEP", f"Cleaning up volume={vol.name} mount_path={vol.mount_path}", ) + result = True loop_dev = None + metadata_error = False if vol.meta_path.exists(): try: meta = json.loads(vol.meta_path.read_text(encoding="utf-8")) loop_dev = meta.get("loop_dev") _log(logger, "INFO", f"Loaded metadata loop_dev={loop_dev}") except Exception as exc: + metadata_error = True _log(logger, "WARN", f"Failed to read metadata error={exc}") - try: - _run(["umount", str(vol.mount_path)], logger=logger) - except Exception as exc: - _log(logger, "WARN", f"Unmount failed mount_path={vol.mount_path} error={exc}") + mount_source = _get_mount_source(vol.mount_path) + mount_source_is_loop = mount_source and str(mount_source).startswith("/dev/loop") + if mount_source_is_loop and loop_dev is None: + # A mounted loop source is a stronger identity than the sidecar metadata: + # it lets us unmount and detach safely even when metadata was lost/corrupt. + loop_dev = mount_source + metadata_error = False + _log(logger, "WARN", f"Recovered loop device from /proc/mounts loop_dev={loop_dev}") + elif mount_source_is_loop and loop_dev != mount_source: + # Metadata can be stale after interrupted cleanup/restart. The mounted + # source is the device that must be detached after unmount, so prefer it. + _log( + logger, "WARN", + f"Metadata loop_dev={loop_dev} differs from mounted source={mount_source}; using mounted source.", + ) + loop_dev = mount_source + elif mount_source and loop_dev is None: + # A mounted path without a positive loop-device identity must not be + # reported as a clean fixed-volume teardown; callers need to retain it for + # operator inspection/retry instead of dropping cleanup tracking. + result = False + _log(logger, "WARN", f"Mounted path has no loop metadata source={mount_source}") + elif mount_source: + # A fixed-size volume should be mounted from a loop device. If /proc/mounts + # says otherwise, fail closed instead of detaching a possibly unrelated + # metadata loop device and reporting success. 
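    # Decision summary for the loop-device identity (illustrative recap):
    #   mounted source    metadata loop_dev    action
    #   /dev/loopN        missing or stale     trust mounted source, detach it
    #   non-loop device   anything             fail closed, detach nothing
    #   not mounted       /dev/loopN           detach the metadata device
    #   not mounted       missing              nothing left to tear down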
+ result = False + _log( + logger, "WARN", + f"Mounted path source is not a loop device source={mount_source}; refusing metadata loop detach.", + ) + loop_dev = None + + if metadata_error: + result = False + + if mount_source is not None: + try: + _run(["umount", str(vol.mount_path)], logger=logger) + except Exception as exc: + result = False + _log(logger, "WARN", f"Unmount failed mount_path={vol.mount_path} error={exc}") + else: + _log(logger, "INFO", f"Mount path is not mounted mount_path={vol.mount_path}") if loop_dev: try: _run(["losetup", "-d", loop_dev], logger=logger) except Exception as exc: + result = False _log(logger, "WARN", f"Detach loop failed loop_dev={loop_dev} error={exc}") + if _is_path_mounted(vol.mount_path): + result = False + _log(logger, "WARN", f"Mount path is still mounted mount_path={vol.mount_path}") + _log( logger, "INFO", - f"Cleanup complete mount_path={vol.mount_path} loop_dev={loop_dev}", + f"Cleanup complete mount_path={vol.mount_path} loop_dev={loop_dev} ok={result}", ) + return result def docker_bind_spec(vol: FixedVolume, container_target: str) -> Dict[str, Dict[str, str]]: diff --git a/extensions/business/container_apps/mixins/fixed_size_volumes.py b/extensions/business/container_apps/mixins/fixed_size_volumes.py index ef9bf868..b077aacd 100644 --- a/extensions/business/container_apps/mixins/fixed_size_volumes.py +++ b/extensions/business/container_apps/mixins/fixed_size_volumes.py @@ -226,12 +226,19 @@ def _cleanup_fixed_size_volumes(self): Called during container stop/close to free loop device resources. """ if not hasattr(self, '_fixed_volumes') or not self._fixed_volumes: - return + return True + result = True + remaining_volumes = [] for vol in self._fixed_volumes: try: - fixed_volume.cleanup(vol, logger=self.P) + cleaned = fixed_volume.cleanup(vol, logger=self.P) + if not cleaned: + result = False + remaining_volumes.append(vol) except Exception as exc: + result = False + remaining_volumes.append(vol) self.P(f"Failed to cleanup fixed volume '{vol.name}': {exc}", color='r') - self._fixed_volumes = [] - return + self._fixed_volumes = remaining_volumes + return result diff --git a/extensions/business/container_apps/tests/support.py b/extensions/business/container_apps/tests/support.py index fce7f5a8..dc09bf67 100644 --- a/extensions/business/container_apps/tests/support.py +++ b/extensions/business/container_apps/tests/support.py @@ -8,6 +8,41 @@ import numpy as _np +def install_docker_stub_if_needed(): + """Provide the tiny docker-py surface these unit tests need.""" + if "docker" in sys.modules and "docker.errors" in sys.modules and "docker.types" in sys.modules: + return + + docker_mod = types.ModuleType("docker") + errors_mod = types.ModuleType("docker.errors") + types_mod = types.ModuleType("docker.types") + + class DockerException(Exception): + pass + + class NotFound(DockerException): + pass + + class DeviceRequest: + def __init__(self, **kwargs): + self.kwargs = kwargs + + errors_mod.DockerException = DockerException + errors_mod.NotFound = NotFound + types_mod.DeviceRequest = DeviceRequest + docker_mod.errors = errors_mod + docker_mod.types = types_mod + docker_mod.from_env = MagicMock() + + sys.modules.setdefault("docker", docker_mod) + sys.modules.setdefault("docker.errors", errors_mod) + sys.modules.setdefault("docker.types", types_mod) + return + + +install_docker_stub_if_needed() + + class _DummyBasePlugin: CONFIG = {'VALIDATION_RULES': {}} @@ -210,6 +245,7 @@ def _log(*args, **kwargs): plugin.cfg_extra_tunnels_ping_interval = 30 
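    # The stop-timeout knob added by this PR surfaces as cfg_plugin_stop_timeout
    # (stubbed a few lines below at the new 45-second default) so lifecycle
    # tests exercise the same teardown budget as the runner.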
plugin.cfg_health_check = {} plugin.cfg_restart_policy = "always" + plugin.cfg_plugin_stop_timeout = 45 plugin.volumes = {} plugin.extra_ports_mapping = {} plugin.inverted_ports_mapping = {} @@ -225,6 +261,8 @@ def _log(*args, **kwargs): plugin._health_probing_disabled = False plugin._normalized_exposed_ports = {} plugin._normalized_main_exposed_port = None + plugin._cleanup_failed = False + plugin._manual_stop_pending = False plugin.container = object() plugin.container_name = "car_instance" plugin.log = types.SimpleNamespace(get_localhost_ip=lambda: "127.0.0.1") @@ -328,6 +366,8 @@ def make_lifecycle_runner(docker_client=None, mock_container=None, **cfg_overrid # State machine plugin.container_state = ContainerState.UNINITIALIZED plugin.stop_reason = StopReason.UNKNOWN + plugin._cleanup_failed = False + plugin._manual_stop_pending = False # Restart/backoff plugin._consecutive_failures = 0 @@ -371,6 +411,7 @@ def make_lifecycle_runner(docker_client=None, mock_container=None, **cfg_overrid plugin._last_extra_tunnels_ping = 0 plugin._last_paused_log = 0 plugin.cfg_paused_state_log_interval = 60 + plugin.cfg_plugin_stop_timeout = 45 plugin.cfg_show_log_each = 60 plugin.cfg_show_log_last_lines = 5 plugin.cfg_semaphore_log_interval = 10 diff --git a/extensions/business/container_apps/tests/test_container_lifecycle.py b/extensions/business/container_apps/tests/test_container_lifecycle.py index 5e0f19ca..20141f97 100644 --- a/extensions/business/container_apps/tests/test_container_lifecycle.py +++ b/extensions/business/container_apps/tests/test_container_lifecycle.py @@ -10,13 +10,19 @@ """ import unittest +import subprocess from pathlib import Path from unittest.mock import patch, MagicMock +from extensions.business.container_apps.tests.support import install_docker_stub_if_needed + +install_docker_stub_if_needed() + import docker.errors import docker.types from extensions.business.container_apps.tests.support import ( + make_container_app_runner, make_lifecycle_runner, make_mock_container, make_mock_docker_client, @@ -220,6 +226,27 @@ def test_container_none_returns_false(self): self.assertFalse(plugin._check_container_status()) +class TestExtraTunnelCleanup(unittest.TestCase): + + def test_stop_extra_tunnels_logs_failure_when_any_tunnel_fails(self): + plugin = make_container_app_runner() + plugin.extra_tunnel_processes = { + 8001: object(), + 8002: object(), + } + plugin._stop_extra_tunnel = MagicMock(side_effect=[False, True]) + + result = plugin.stop_extra_tunnels() + + self.assertFalse(result) + self.assertEqual(plugin._stop_extra_tunnel.call_count, 2) + self.assertIn( + "One or more extra tunnels failed to stop", + plugin.logged_messages[-1], + ) + self.assertNotIn("All extra tunnels stopped", plugin.logged_messages[-1]) + + # =========================================================================== # Restart # =========================================================================== @@ -431,10 +458,14 @@ def test_runtime_stop_cleans_sidecars_without_fixed_volume_cleanup(self): self.assertTrue(result) plugin._semaphore_reset_signal.assert_called_once() self.assertEqual(log_thread.join_calls, 1) - self.assertEqual(log_thread.join_timeout, 5) + # Runtime shutdown uses one shared deadline, so each join receives the + # remaining budget rather than exactly the original 5 second timeout. 
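        # Deadline arithmetic being asserted (illustrative numbers):
        #   stop_deadline = t0 + 5
        #   log_thread.join(timeout ~ 5)            # first join: near-full budget
        #   exec_thread.join(timeout = remainder)   # later joins: what is left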
+ self.assertLessEqual(log_thread.join_timeout, 5) + self.assertGreater(log_thread.join_timeout, 0) self.assertIsNone(plugin.log_thread) self.assertEqual(exec_thread.join_calls, 1) - self.assertEqual(exec_thread.join_timeout, 5) + self.assertLessEqual(exec_thread.join_timeout, 5) + self.assertGreater(exec_thread.join_timeout, 0) self.assertEqual(plugin.exec_threads, []) self.assertFalse(plugin._stop_event.is_set()) self.assertFalse(plugin._commands_started) @@ -605,6 +636,107 @@ def test_process_respects_max_retries(self): errors = [m for m in plugin.logged_messages if "abandoned" in m.lower()] self.assertTrue(len(errors) > 0) + def test_process_retries_failed_cleanup_then_restarts(self): + """A transient cleanup failure must not permanently block process().""" + clock = {"now": 100} + plugin, client, _ = make_lifecycle_runner(cfg_restart_backoff_initial=0) + plugin.time = lambda: clock["now"] + plugin._cleanup_failed = True + plugin.container_state = ContainerState.FAILED + + attempts = {"count": 0} + + def retry_cleanup(): + attempts["count"] += 1 + plugin._cleanup_failed = attempts["count"] == 1 + return not plugin._cleanup_failed + + plugin._stop_container_and_save_logs_to_disk = retry_cleanup + + plugin.process() + self.assertTrue(plugin._cleanup_failed) + client.containers.run.assert_not_called() + + with _patch_docker_module(client): + plugin.process() + + self.assertFalse(plugin._cleanup_failed) + client.containers.run.assert_called_once() + self.assertEqual(plugin.container_state, ContainerState.RUNNING) + + def test_manual_stop_persists_only_after_cleanup_success(self): + plugin, _, _ = make_lifecycle_runner(cfg_restart_backoff_initial=0) + plugin._save_persistent_state = MagicMock() + plugin._clear_manual_stop_state = MagicMock() + plugin._stop_container_and_save_logs_to_disk = MagicMock(return_value=False) + + plugin.on_command("STOP") + + plugin._save_persistent_state.assert_not_called() + plugin._clear_manual_stop_state.assert_called_once() + self.assertTrue(plugin._manual_stop_pending) + self.assertEqual(plugin.container_state, ContainerState.FAILED) + + def test_pending_manual_stop_pauses_after_cleanup_retry_success(self): + plugin, client, _ = make_lifecycle_runner(cfg_restart_backoff_initial=0) + plugin._cleanup_failed = True + plugin._manual_stop_pending = True + plugin.container_state = ContainerState.FAILED + plugin._save_persistent_state = MagicMock() + plugin._stop_container_and_save_logs_to_disk = MagicMock(return_value=True) + + plugin.process() + + plugin._save_persistent_state.assert_called_once_with(manually_stopped=True) + client.containers.run.assert_not_called() + self.assertFalse(plugin._manual_stop_pending) + self.assertEqual(plugin.container_state, ContainerState.PAUSED) + + def test_restart_clears_pending_manual_stop_before_cleanup_retry(self): + plugin, client, _ = make_lifecycle_runner(cfg_restart_backoff_initial=0) + plugin._manual_stop_pending = True + plugin._cleanup_failed = True + plugin._save_persistent_state = MagicMock() + plugin._clear_manual_stop_state = MagicMock() + attempts = {"count": 0} + + def cleanup(): + attempts["count"] += 1 + plugin._cleanup_failed = attempts["count"] == 1 + return not plugin._cleanup_failed + + plugin._stop_container_and_save_logs_to_disk = cleanup + + plugin.on_command("RESTART") + + plugin._clear_manual_stop_state.assert_called_once() + plugin._save_persistent_state.assert_not_called() + self.assertFalse(plugin._manual_stop_pending) + self.assertTrue(plugin._cleanup_failed) + + with _patch_docker_module(client): 
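            # The docker module is patched for this tick because the successful
            # cleanup retry falls straight through to a relaunch: RESTART
            # overrode the pending STOP, so PAUSED must not be persisted here.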
+ plugin.process() + + plugin._save_persistent_state.assert_not_called() + self.assertFalse(plugin._cleanup_failed) + self.assertEqual(plugin.container_state, ContainerState.RUNNING) + + def test_config_restart_respects_pending_manual_stop_cleanup(self): + plugin, _, _ = make_lifecycle_runner(cfg_restart_backoff_initial=0) + plugin._manual_stop_pending = True + plugin._cleanup_failed = True + plugin._save_persistent_state = MagicMock() + plugin._stop_container_and_save_logs_to_disk = MagicMock(return_value=True) + restart_callable = MagicMock() + + plugin._handle_config_restart(restart_callable) + + restart_callable.assert_not_called() + plugin._save_persistent_state.assert_called_once_with(manually_stopped=True) + self.assertFalse(plugin._manual_stop_pending) + self.assertFalse(plugin._cleanup_failed) + self.assertEqual(plugin.container_state, ContainerState.PAUSED) + def test_process_multiple_iterations_running(self): """Multiple process() calls with a healthy container should all succeed.""" plugin, _, container = make_lifecycle_runner() @@ -617,6 +749,45 @@ def test_process_multiple_iterations_running(self): self.assertEqual(plugin.container_state, ContainerState.RUNNING) +class _FakeProcess: + def __init__(self): + self.terminated = False + self.killed = False + self.wait_calls = 0 + + def poll(self): + return 0 if self.killed else None + + def terminate(self): + self.terminated = True + return + + def kill(self): + self.killed = True + return + + def wait(self, timeout=None): + self.wait_calls += 1 + if self.wait_calls == 1: + raise subprocess.TimeoutExpired(cmd="fake", timeout=timeout) + self.killed = True + return 0 + + +class TestTunnelCompatibilityFallbacks(unittest.TestCase): + """The edge PR must work even before the matching core PR is deployed.""" + + def test_local_subprocess_termination_fallback_without_core_helper(self): + plugin, _, _ = make_lifecycle_runner() + process = _FakeProcess() + + with patch("extensions.business.container_apps.container_app_runner.os.name", "nt"): + self.assertTrue(plugin._terminate_subprocess_tree(process, terminate_timeout=0, kill_timeout=0)) + + self.assertTrue(process.terminated) + self.assertTrue(process.killed) + + # =========================================================================== # Fixed-Size Volume Integration # =========================================================================== diff --git a/extensions/business/container_apps/tests/test_fixed_volume.py b/extensions/business/container_apps/tests/test_fixed_volume.py index cdc37bf7..20845054 100644 --- a/extensions/business/container_apps/tests/test_fixed_volume.py +++ b/extensions/business/container_apps/tests/test_fixed_volume.py @@ -250,6 +250,67 @@ def test_handles_missing_metadata(self, mock_run): # Should not raise even if meta_path doesn't exist cleanup(vol) + @patch("extensions.business.container_apps.fixed_volume._run") + def test_missing_metadata_with_mounted_path_reports_failure(self, mock_run): + vol = FixedVolume(name="data", size="100M", root=Path("/r")) + proc = f"/dev/sdb1 {vol.mount_path} ext4 rw 0 0\n" + with patch.object(Path, "exists", return_value=False), \ + patch("builtins.open", mock_open(read_data=proc)): + result = cleanup(vol) + self.assertFalse(result) + mock_run.assert_called_once_with(["umount", str(vol.mount_path)], logger=None) + + @patch("extensions.business.container_apps.fixed_volume._run") + def test_missing_metadata_with_loop_mount_recovers_and_detaches(self, mock_run): + vol = FixedVolume(name="data", size="100M", root=Path("/r")) + 
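        # Metadata is absent here, but the stubbed _get_mount_source still
        # reports /dev/loop7 from /proc/mounts; cleanup() should recover that
        # identity, umount first, then detach exactly that loop device.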
with patch.object(Path, "exists", return_value=False), \ + patch("extensions.business.container_apps.fixed_volume._get_mount_source", return_value="/dev/loop7"), \ + patch("extensions.business.container_apps.fixed_volume._is_path_mounted", return_value=False): + result = cleanup(vol) + self.assertTrue(result) + self.assertEqual( + [call_args.args[0] for call_args in mock_run.call_args_list], + [["umount", str(vol.mount_path)], ["losetup", "-d", "/dev/loop7"]], + ) + + @patch("extensions.business.container_apps.fixed_volume._run") + def test_malformed_metadata_reports_failure(self, mock_run): + vol = FixedVolume(name="data", size="100M", root=Path("/r")) + with patch.object(Path, "exists", return_value=True), \ + patch.object(Path, "read_text", return_value="{not-json"), \ + patch("builtins.open", mock_open(read_data="")): + result = cleanup(vol) + self.assertFalse(result) + + @patch("extensions.business.container_apps.fixed_volume._run") + def test_malformed_metadata_with_loop_mount_recovers_and_detaches(self, mock_run): + vol = FixedVolume(name="data", size="100M", root=Path("/r")) + with patch.object(Path, "exists", return_value=True), \ + patch.object(Path, "read_text", return_value="{not-json"), \ + patch("extensions.business.container_apps.fixed_volume._get_mount_source", return_value="/dev/loop9"), \ + patch("extensions.business.container_apps.fixed_volume._is_path_mounted", return_value=False): + result = cleanup(vol) + self.assertTrue(result) + self.assertEqual( + [call_args.args[0] for call_args in mock_run.call_args_list], + [["umount", str(vol.mount_path)], ["losetup", "-d", "/dev/loop9"]], + ) + + @patch("extensions.business.container_apps.fixed_volume._run") + def test_mounted_loop_source_overrides_stale_metadata_loop(self, mock_run): + vol = FixedVolume(name="data", size="100M", root=Path("/r")) + meta = {"loop_dev": "/dev/loop3"} + with patch.object(Path, "exists", return_value=True), \ + patch.object(Path, "read_text", return_value=json.dumps(meta)), \ + patch("extensions.business.container_apps.fixed_volume._get_mount_source", return_value="/dev/loop7"), \ + patch("extensions.business.container_apps.fixed_volume._is_path_mounted", return_value=False): + result = cleanup(vol) + self.assertTrue(result) + self.assertEqual( + [call_args.args[0] for call_args in mock_run.call_args_list], + [["umount", str(vol.mount_path)], ["losetup", "-d", "/dev/loop7"]], + ) + @patch("extensions.business.container_apps.fixed_volume._run") def test_handles_umount_failure(self, mock_run): vol = FixedVolume(name="data", size="100M", root=Path("/tmp/fv")) @@ -432,7 +493,18 @@ def test_calls_cleanup_for_each_volume(self, mock_cleanup): self.assertEqual(plugin._fixed_volumes, []) @patch("extensions.business.container_apps.fixed_volume.cleanup", - side_effect=[Exception("fail"), None]) + side_effect=[False, True]) + def test_retains_volume_when_cleanup_returns_false(self, mock_cleanup): + plugin = make_container_app_runner() + vol1 = FixedVolume(name="a", size="50M", root=Path("/r")) + vol2 = FixedVolume(name="b", size="50M", root=Path("/r")) + plugin._fixed_volumes = [vol1, vol2] + self.assertFalse(plugin._cleanup_fixed_size_volumes()) + self.assertEqual(mock_cleanup.call_count, 2) + self.assertEqual(plugin._fixed_volumes, [vol1]) + + @patch("extensions.business.container_apps.fixed_volume.cleanup", + side_effect=[Exception("fail"), True]) def test_continues_on_failure(self, mock_cleanup): plugin = make_container_app_runner() vol1 = FixedVolume(name="a", size="50M", root=Path("/r")) @@ -440,7 
+512,7 @@ def test_continues_on_failure(self, mock_cleanup): plugin._fixed_volumes = [vol1, vol2] plugin._cleanup_fixed_size_volumes() # should not raise self.assertEqual(mock_cleanup.call_count, 2) - self.assertEqual(plugin._fixed_volumes, []) + self.assertEqual(plugin._fixed_volumes, [vol1]) if __name__ == "__main__": diff --git a/ver.py b/ver.py index b8401daa..6257bc8a 100644 --- a/ver.py +++ b/ver.py @@ -1 +1,2 @@ -__VER__ = '2.10.220' +__VER__ = '2.10.221' +
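Viewed end to end, the runner-side changes reduce to a small retry state
machine. A condensed standalone model, with hypothetical names and the
backoff timing elided (not plugin API):

    class CleanupModel:
        def __init__(self):
            self.cleanup_failed = True       # a prior teardown reported failure
            self.manual_stop_pending = False
            self.failures = 0

        def tick(self, cleanup_ok, max_failures=3):
            if not self.cleanup_failed:
                return "run"                 # normal path: (re)launch container
            if self.failures >= max_failures:
                return "abandoned"           # mirrors _has_exceeded_max_retries()
            if not cleanup_ok:
                self.failures += 1
                return "failed"              # stay FAILED, retry on a later tick
            self.cleanup_failed = False
            if self.manual_stop_pending:     # a STOP finally completed:
                self.manual_stop_pending = False
                return "paused"              # persist PAUSED, do not relaunch
            return "cleaned"                 # next tick may relaunch

    m = CleanupModel()
    assert m.tick(cleanup_ok=False) == "failed"
    assert m.tick(cleanup_ok=True) == "cleaned"
    assert m.tick(cleanup_ok=True) == "run"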