From 777b1b02c7e151746dcbdd2a0a91a4c6fc913d8f Mon Sep 17 00:00:00 2001 From: caiyanbin Date: Sat, 16 May 2026 10:59:39 +0800 Subject: [PATCH 1/4] enhancement: redis dump script content --- scheduler/app/faas_scheduler/utils.py | 43 ++++++++++++++++++++ scheduler/app/scheduler.py | 8 ++-- starter/runner.py | 58 +++++---------------------- 3 files changed, 56 insertions(+), 53 deletions(-) diff --git a/scheduler/app/faas_scheduler/utils.py b/scheduler/app/faas_scheduler/utils.py index 2ea87ad..5c14a45 100644 --- a/scheduler/app/faas_scheduler/utils.py +++ b/scheduler/app/faas_scheduler/utils.py @@ -27,6 +27,12 @@ DELETE_LOG_DAYS = os.environ.get("DELETE_LOG_DAYS", "30") DELETE_STATISTICS_DAYS = os.environ.get("DELETE_STATISTICS_DAYS", "90") LOG_LEVEL = os.environ.get("PYTHON_SCHEDULER_LOG_LEVEL", "INFO") +ALTERNATIVE_FILE_SERVER_ROOT = os.environ.get( + "PYTHON_STARTER_ALTERNATIVE_FILE_SERVER_ROOT", "" +) +USE_ALTERNATIVE_FILE_SERVER_ROOT = os.environ.get( + "PYTHON_STARTER_USE_ALTERNATIVE_FILE_SERVER_ROOT", "" +) # defaults... 
LOG_DIR = "/opt/scheduler/logs/" @@ -192,6 +198,43 @@ def get_script_file(dtable_uuid, script_name): return response.json() +def get_script_content(dtable_uuid, script_name): + script_file_info = get_script_file(dtable_uuid, script_name) + script_url = script_file_info["script_url"] + if ( + USE_ALTERNATIVE_FILE_SERVER_ROOT.lower() == "true" + and ALTERNATIVE_FILE_SERVER_ROOT + ): + logger.info("old script_url: %s", script_url) + script_url = requests.utils.requote_uri( + script_url.replace( + script_url.split("/", 3)[:3][-1], + script_url.split("/", 3)[:3][-1], + ) + ) + script_url = requests.compat.re.sub( + r"https?://.*?/", ALTERNATIVE_FILE_SERVER_ROOT.strip("/") + "/", script_url + ) + logger.info("new script_url: %s", script_url) + + headers = {"User-Agent": "python-scheduler/" + VERSION} + response = requests.get(script_url, headers=headers, timeout=60) + if response.status_code < 200 or response.status_code >= 300: + logger.error( + "Fail to get script content: %s %s, error response: %s, %s", + dtable_uuid, + script_name, + response.status_code, + response.text, + ) + raise ValueError("script content not found") + + return { + "script_content": response.text, + "temp_api_token": script_file_info["temp_api_token"], + } + + def update_stats(db_session, dtable_uuid, owner, org_id, spend_time): run_date = datetime.today().strftime("%Y-%m-%d") try: diff --git a/scheduler/app/scheduler.py b/scheduler/app/scheduler.py index b9bfeae..2188204 100644 --- a/scheduler/app/scheduler.py +++ b/scheduler/app/scheduler.py @@ -12,7 +12,7 @@ from faas_scheduler.models import ScriptLog from faas_scheduler.redis_client import RedisClient from faas_scheduler.utils import ( - get_script_file, + get_script_content, update_script, delete_log_after_days, delete_statistics_after_days, @@ -117,7 +117,7 @@ def run_script(self, script_log_info: dict): ScriptLog.id == script_log_info["id"] ).update({ScriptLog.state: ScriptLog.RUNNING}) db_session.commit() - script_file_info = 
get_script_file( + script_content_info = get_script_content( script_log_info["dtable_uuid"], script_log_info["script_name"] ) self.redis_client.lpush( @@ -125,11 +125,11 @@ def run_script(self, script_log_info: dict): json.dumps( { "script_id": script_log_info["id"], - "script_url": script_file_info["script_url"], + "script_content": script_content_info["script_content"], "dtable_uuid": script_log_info["dtable_uuid"], "env": { "dtable_web_url": SEATABLE_SERVER_URL.rstrip("/"), - "api_token": script_file_info["temp_api_token"], + "api_token": script_content_info["temp_api_token"], }, "context_data": script_log_info["context_data"], } diff --git a/starter/runner.py b/starter/runner.py index 1a3f8e0..5a46472 100644 --- a/starter/runner.py +++ b/starter/runner.py @@ -1,7 +1,6 @@ import json import logging import os -import re import shutil import subprocess import time @@ -26,12 +25,6 @@ THREAD_COUNT = int(os.environ.get("PYTHON_STARTER_THREAD_COUNT", 10)) SUB_PROCESS_TIMEOUT = int(os.environ.get("PYTHON_PROCESS_TIMEOUT", 60 * 15)) # 15 mins -ALTERNATIVE_FILE_SERVER_ROOT = os.environ.get( - "PYTHON_STARTER_ALTERNATIVE_FILE_SERVER_ROOT", "" -) -USE_ALTERNATIVE_FILE_SERVER_ROOT = os.environ.get( - "PYTHON_STARTER_USE_ALTERNATIVE_FILE_SERVER_ROOT", "" -) OUTPUT_LIMIT = int(os.environ.get("PYTHON_RUNNER_OUTPUT_LIMIT", 1000000)) CONTAINER_MEMORY = os.environ.get( @@ -128,15 +121,6 @@ def basic_log(log_file): time_zone_str = time_zone_str.strip() SYSTEM_TIMEZONE_COMMAND = ["-e", "TZ=%s" % time_zone_str] - -def to_python_bool(value): - if isinstance(value, bool): - return value - if not isinstance(value, str): - return False - return value.lower() == "true" - - class CallbackScriptRunningError(Exception): pass @@ -218,19 +202,12 @@ def run_python(data): callback_script_running(data.get("script_id"), started_at) - script_url = data.get("script_url") - if not script_url: - send_to_scheduler(False, None, "Script URL is missing", started_at, None, data) - return - if ( - 
to_python_bool(USE_ALTERNATIVE_FILE_SERVER_ROOT) - and ALTERNATIVE_FILE_SERVER_ROOT - ): - logging.info("old script_url: %s", script_url) - script_url = re.sub( - r"https?://.*?/", ALTERNATIVE_FILE_SERVER_ROOT.strip("/") + "/", script_url + script_content = data.get("script_content") + if not isinstance(script_content, str): + send_to_scheduler( + False, None, "Script content is missing", started_at, None, data ) - logging.info("new script_url: %s", script_url) + return # env must be map env = data.get("env") @@ -251,22 +228,6 @@ def run_python(data): context_data = None context_data = json.dumps(context_data) if context_data else None - logging.debug("try to get script from seatable server") - try: - headers = {"User-Agent": "python-starter/" + VERSION} - resp = requests.get(script_url, headers=headers, timeout=60) - logging.debug("response from seatable server: resp: %s", resp) - if resp.status_code < 200 or resp.status_code >= 300: - logging.error( - "Failed to get script from %s, response: %s", script_url, resp - ) - send_to_scheduler(False, None, "Fail to get script", started_at, None, data) - return - except Exception as e: - logging.error("Failed to get script from %s, error: %s", script_url, e) - send_to_scheduler(False, None, "Fail to get script", started_at, None, data) - return - logging.debug("Generate temporary random folder directory") tmp_id = uuid4().hex tmp_dir = os.path.join(PYTHON_TRANSFER_DIRECTORY, tmp_id) @@ -281,7 +242,7 @@ def run_python(data): logging.debug("try to save the script and env.list to the temporary directory") try: with open(os.path.join(tmp_dir, "index.py"), "wb") as f: - f.write(resp.content) + f.write(script_content.encode("utf-8")) # save env env_file = os.path.join(tmp_dir, "env.list") with open(env_file, "w") as f: @@ -295,7 +256,7 @@ def run_python(data): return_code, output = None, "" # init output except Exception as e: - logging.error("Failed to save script %s, error: %s", script_url, e) + logging.error("Failed to save 
script content, error: %s", e) send_to_scheduler(False, -1, "", started_at, 0, data) return @@ -394,8 +355,7 @@ def run_python(data): ) except Exception as stop_e: logging.warning( - "stop script: %s container: %s, error: %s", - script_url, + "stop script content container: %s, error: %s", container_name, stop_e, ) @@ -410,7 +370,7 @@ def run_python(data): return except Exception as e: logging.exception(e) - logging.error("Failed to run file %s error: %s", script_url, e) + logging.error("Failed to run script content error: %s", e) send_to_scheduler(False, None, None, started_at, None, data) return else: From 486290c4aa360ce8201da06e6dcbfd47e3a2f9e8 Mon Sep 17 00:00:00 2001 From: caiyanbin Date: Sat, 16 May 2026 15:32:32 +0800 Subject: [PATCH 2/4] feat: add run draft script api --- scheduler/app/flask_server.py | 61 ++++++++++++++++++++ scheduler/app/scheduler.py | 105 +++++++++++++++++++++++++++++++++- 2 files changed, 165 insertions(+), 1 deletion(-) diff --git a/scheduler/app/flask_server.py b/scheduler/app/flask_server.py index a4b7182..a245c12 100644 --- a/scheduler/app/flask_server.py +++ b/scheduler/app/flask_server.py @@ -99,6 +99,67 @@ def scripts_api(): db_session.close() +@app.route("/run-draft-script/", methods=["POST"]) +def draft_scripts_api(): + if not check_auth_token(request): + return make_response(("Forbidden: the auth token is not correct.", 403)) + + logger.debug("dtable-web initialized the execution of a draft python script...") + try: + data = json.loads(request.data) + if not isinstance(data, dict): + return make_response(("Bad request", 400)) + except Exception: + return make_response(("Bad request", 400)) + + dtable_uuid = data.get("dtable_uuid") + script_name = data.get("script_name") + script_content = data.get("script_content") + context_data = data.get("context_data") + owner = data.get("owner") + org_id = data.get("org_id") + temp_api_token = data.get("temp_api_token") + scripts_running_limit = data.get("scripts_running_limit", -1) + 
operate_from = data.get("operate_from", "draft") + if ( + not dtable_uuid + or not script_name + or not owner + or not isinstance(script_content, str) + or not temp_api_token + ): + return make_response(("Parameters invalid", 400)) + + db_session = DBSession() + logger.debug("create a database entry for this draft python run...") + try: + if scripts_running_limit != -1 and not can_run_task( + owner, org_id, db_session, scripts_running_limit=scripts_running_limit + ): + return make_response(("The number of runs exceeds the limit"), 400) + + script_log = add_script( + db_session, + dtable_uuid, + owner, + org_id, + script_name, + context_data, + operate_from, + ) + script_log_info = script_log.to_dict() + script_log_info["script_content"] = script_content + script_log_info["temp_api_token"] = temp_api_token + scheduler.add_script(script_log_info) + + return make_response(({"script_id": script_log.id}, 200)) + except Exception as e: + logger.exception(e) + return make_response(("Internal server error", 500)) + finally: + db_session.close() + + # called from dtable-web to get the status of a specific run. 
@app.route("/run-script//", methods=["GET"]) def script_api(script_id): diff --git a/scheduler/app/scheduler.py b/scheduler/app/scheduler.py index 2188204..7a9e4ee 100644 --- a/scheduler/app/scheduler.py +++ b/scheduler/app/scheduler.py @@ -25,6 +25,7 @@ SEATABLE_SERVER_URL = os.environ.get("SEATABLE_SERVER_URL") REDIS_SCRIPTS_QUEUE_MAX_SIZE = int(os.environ.get("REDIS_SCRIPTS_QUEUE_MAX_SIZE", "1")) +DRAFT_CONTENT_LOST_ERROR = "Draft script content was lost after scheduler restart" class Scheduler: @@ -182,6 +183,89 @@ def run_script(self, script_log_info: dict): db_session.close() self.on_script_done(script_log_info, 0) + def run_draft_script(self, script_log_info: dict): + now = time.time() + + db_session = DBSession() + try: + script_content = script_log_info.get("script_content") + temp_api_token = script_log_info.get("temp_api_token") + if not isinstance(script_content, str) or not temp_api_token: + raise ValueError(DRAFT_CONTENT_LOST_ERROR) + + db_session.query(ScriptLog).filter( + ScriptLog.id == script_log_info["id"] + ).update({ScriptLog.state: ScriptLog.RUNNING}) + db_session.commit() + self.redis_client.lpush( + SCRIPTS_KEY, + json.dumps( + { + "script_id": script_log_info["id"], + "script_content": script_content, + "dtable_uuid": script_log_info["dtable_uuid"], + "env": { + "dtable_web_url": SEATABLE_SERVER_URL.rstrip("/"), + "api_token": temp_api_token, + }, + "context_data": script_log_info["context_data"], + } + ), + ) + logger.info( + "dispatched script id %s org_id %s owner %s dtable_uuid %s script_name %s", + script_log_info["id"], + script_log_info["org_id"], + script_log_info["owner"], + script_log_info["dtable_uuid"], + script_log_info["script_name"], + ) + except Exception as e: + output = ( + DRAFT_CONTENT_LOST_ERROR + if str(e) == DRAFT_CONTENT_LOST_ERROR + else "Failed" + ) + logger.exception( + "dispatched script id %s org_id %s owner %s dtable_uuid %s script_name %s error %s", + script_log_info["id"], + script_log_info["org_id"], + 
script_log_info["owner"], + script_log_info["dtable_uuid"], + script_log_info["script_name"], + e, + ) + try: + script_log = ( + db_session.query(ScriptLog) + .filter(ScriptLog.id == script_log_info["id"]) + .first() + ) + update_script( + db_session, + script_log, + False, + -1, + output, + datetime.fromtimestamp(now), + datetime.fromtimestamp(now), + ) + except Exception as ee: + logger.exception( + "update script id %s org_id %s owner %s dtable_uuid %s script_name %s finished error %s", + script_log_info["id"], + script_log_info["org_id"], + script_log_info["owner"], + script_log_info["dtable_uuid"], + script_log_info["script_name"], + ee, + ) + finally: + db_session.close() + self.on_script_done(script_log_info, 0) + else: + db_session.close() + def load_pending_script_logs(self): db_session = DBSession() try: @@ -191,7 +275,23 @@ def load_pending_script_logs(self): .order_by(ScriptLog.id) ) logger.info("load %s pending scripts", len(script_logs)) + now = datetime.now() for script_log in script_logs: + if script_log.operate_from == "draft": + update_script( + db_session, + script_log, + False, + -1, + DRAFT_CONTENT_LOST_ERROR, + now, + now, + ) + logger.info( + "mark pending draft script %s as failed after scheduler restart", + script_log.id, + ) + continue self.add_script(script_log.to_dict()) except Exception as e: logger.exception("load pending script logs error %s", e) @@ -237,7 +337,10 @@ def schedule(self): usage / (self.window_secs * self.python_starter_total_thread_count) < self.rate_limit_percent ): - self.run_script(script_log_info) + if script_log_info.get("operate_from") == "draft": + self.run_draft_script(script_log_info) + else: + self.run_script(script_log_info) else: db_session = DBSession() try: From 8a7a611049a8e1a19ec99cbdeb4c0794944f8ddc Mon Sep 17 00:00:00 2001 From: caiyanbin Date: Sat, 16 May 2026 15:43:54 +0800 Subject: [PATCH 3/4] update --- starter/runner.py | 1 + 1 file changed, 1 insertion(+) diff --git a/starter/runner.py 
b/starter/runner.py index 5a46472..347bb23 100644 --- a/starter/runner.py +++ b/starter/runner.py @@ -121,6 +121,7 @@ def basic_log(log_file): time_zone_str = time_zone_str.strip() SYSTEM_TIMEZONE_COMMAND = ["-e", "TZ=%s" % time_zone_str] + class CallbackScriptRunningError(Exception): pass From 69ee1067e8705cc86bf7566d27337162f2ddafe5 Mon Sep 17 00:00:00 2001 From: caiyanbin Date: Sat, 16 May 2026 15:53:22 +0800 Subject: [PATCH 4/4] update --- scheduler/app/faas_scheduler/utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scheduler/app/faas_scheduler/utils.py b/scheduler/app/faas_scheduler/utils.py index 5c14a45..097c09a 100644 --- a/scheduler/app/faas_scheduler/utils.py +++ b/scheduler/app/faas_scheduler/utils.py @@ -1,6 +1,7 @@ import os import json import logging +import re import requests from datetime import datetime, timedelta from typing import List, Optional, Tuple @@ -212,7 +213,7 @@ def get_script_content(dtable_uuid, script_name): script_url.split("/", 3)[:3][-1], ) ) - script_url = requests.compat.re.sub( + script_url = re.sub( r"https?://.*?/", ALTERNATIVE_FILE_SERVER_ROOT.strip("/") + "/", script_url ) logger.info("new script_url: %s", script_url)