Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions scheduler/app/faas_scheduler/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import json
import logging
import re
import requests
from datetime import datetime, timedelta
from typing import List, Optional, Tuple
Expand All @@ -27,6 +28,12 @@
# Values below are kept exactly as read from the environment (strings).
DELETE_LOG_DAYS = os.getenv("DELETE_LOG_DAYS", "30")
DELETE_STATISTICS_DAYS = os.getenv("DELETE_STATISTICS_DAYS", "90")
LOG_LEVEL = os.getenv("PYTHON_SCHEDULER_LOG_LEVEL", "INFO")
# Base URL that get_script_content() substitutes for the scheme and host of a
# script's download URL. An empty value disables the substitution.
ALTERNATIVE_FILE_SERVER_ROOT = os.getenv(
    "PYTHON_STARTER_ALTERNATIVE_FILE_SERVER_ROOT", ""
)
# Switch for the substitution; get_script_content() enables it only when this
# string equals "true" (compared case-insensitively).
USE_ALTERNATIVE_FILE_SERVER_ROOT = os.getenv(
    "PYTHON_STARTER_USE_ALTERNATIVE_FILE_SERVER_ROOT", ""
)

# defaults...
LOG_DIR = "/opt/scheduler/logs/"
Expand Down Expand Up @@ -192,6 +199,43 @@ def get_script_file(dtable_uuid, script_name):
return response.json()


def _apply_alternative_file_server_root(script_url):
    """Return script_url re-quoted, with its leading scheme and host replaced.

    The replacement is ALTERNATIVE_FILE_SERVER_ROOT stripped of surrounding
    slashes. Only a scheme+host prefix at the very start of the URL is
    rewritten; a URL that does not start with http:// or https:// is returned
    re-quoted but otherwise unchanged.
    """
    alternative_root = ALTERNATIVE_FILE_SERVER_ROOT.strip("/") + "/"
    return re.sub(
        # Anchored and limited to one substitution so that URLs embedded in
        # the path or query string are left alone.
        r"^https?://[^/]*/",
        # A callable keeps backslashes in the configured root literal instead
        # of having them parsed as regex replacement escapes.
        lambda _match: alternative_root,
        requests.utils.requote_uri(script_url),
        count=1,
    )


def get_script_content(dtable_uuid, script_name):
    """Download the source of a stored script and return it with its API token.

    The download URL comes from get_script_file(). When
    USE_ALTERNATIVE_FILE_SERVER_ROOT is "true" (case-insensitive) and
    ALTERNATIVE_FILE_SERVER_ROOT is non-empty, the URL's scheme and host are
    replaced by that root before the request is made.

    Returns:
        A dict with "script_content" (the response body as text) and
        "temp_api_token" (taken from the get_script_file() result).

    Raises:
        ValueError: if the download answers with a non-2xx status code.
    """
    script_file_info = get_script_file(dtable_uuid, script_name)
    script_url = script_file_info["script_url"]
    if (
        USE_ALTERNATIVE_FILE_SERVER_ROOT.lower() == "true"
        and ALTERNATIVE_FILE_SERVER_ROOT
    ):
        logger.info("old script_url: %s", script_url)
        script_url = _apply_alternative_file_server_root(script_url)
        logger.info("new script_url: %s", script_url)

    headers = {"User-Agent": "python-scheduler/" + VERSION}
    response = requests.get(script_url, headers=headers, timeout=60)
    if response.status_code < 200 or response.status_code >= 300:
        logger.error(
            "Fail to get script content: %s %s, error response: %s, %s",
            dtable_uuid,
            script_name,
            response.status_code,
            response.text,
        )
        raise ValueError("script content not found")

    return {
        "script_content": response.text,
        "temp_api_token": script_file_info["temp_api_token"],
    }


def update_stats(db_session, dtable_uuid, owner, org_id, spend_time):
run_date = datetime.today().strftime("%Y-%m-%d")
try:
Expand Down
61 changes: 61 additions & 0 deletions scheduler/app/flask_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,67 @@ def scripts_api():
db_session.close()


@app.route("/run-draft-script/", methods=["POST"])
def draft_scripts_api():
    """Accept a draft script run whose source code is supplied in the request.

    The JSON body must be an object carrying non-empty "dtable_uuid",
    "script_name", "owner" and "temp_api_token" plus a string
    "script_content". "context_data", "org_id", "scripts_running_limit"
    (default -1) and "operate_from" (default "draft") are optional.

    Responses:
        403 - check_auth_token() rejected the request.
        400 - body is not a JSON object, a required field is missing, or
              can_run_task() refused the run while a limit other than -1
              was given.
        200 - {"script_id": <id of the created log entry>}.
        500 - any exception raised while creating or queueing the run.
    """
    if not check_auth_token(request):
        return make_response(("Forbidden: the auth token is not correct.", 403))

    logger.debug("dtable-web initialized the execution of a draft python script...")
    try:
        payload = json.loads(request.data)
    except Exception:
        return make_response(("Bad request", 400))
    if not isinstance(payload, dict):
        return make_response(("Bad request", 400))

    dtable_uuid = payload.get("dtable_uuid")
    script_name = payload.get("script_name")
    script_content = payload.get("script_content")
    context_data = payload.get("context_data")
    owner = payload.get("owner")
    org_id = payload.get("org_id")
    temp_api_token = payload.get("temp_api_token")
    scripts_running_limit = payload.get("scripts_running_limit", -1)
    operate_from = payload.get("operate_from", "draft")

    # An empty string is a valid script body, so only its type is checked.
    required_ok = (
        dtable_uuid
        and script_name
        and owner
        and isinstance(script_content, str)
        and temp_api_token
    )
    if not required_ok:
        return make_response(("Parameters invalid", 400))

    db_session = DBSession()
    logger.debug("create a database entry for this draft python run...")
    try:
        # -1 means "no limit": can_run_task() is not consulted at all.
        limit_enforced = scripts_running_limit != -1
        if limit_enforced and not can_run_task(
            owner, org_id, db_session, scripts_running_limit=scripts_running_limit
        ):
            return make_response(("The number of runs exceeds the limit"), 400)

        script_log = add_script(
            db_session,
            dtable_uuid,
            owner,
            org_id,
            script_name,
            context_data,
            operate_from,
        )
        # The source and token are attached to the queued entry only; they are
        # not passed to add_script().
        queued_entry = script_log.to_dict()
        queued_entry["script_content"] = script_content
        queued_entry["temp_api_token"] = temp_api_token
        scheduler.add_script(queued_entry)

        return make_response(({"script_id": script_log.id}, 200))
    except Exception as e:
        logger.exception(e)
        return make_response(("Internal server error", 500))
    finally:
        db_session.close()


# called from dtable-web to get the status of a specific run.
@app.route("/run-script/<script_id>/", methods=["GET"])
def script_api(script_id):
Expand Down
113 changes: 108 additions & 5 deletions scheduler/app/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from faas_scheduler.models import ScriptLog
from faas_scheduler.redis_client import RedisClient
from faas_scheduler.utils import (
get_script_file,
get_script_content,
update_script,
delete_log_after_days,
delete_statistics_after_days,
Expand All @@ -25,6 +25,7 @@

# No default: this is None when the variable is unset.
# NOTE(review): the dispatch code calls .rstrip("/") on it - confirm the
# variable is always provided in deployments.
SEATABLE_SERVER_URL = os.getenv("SEATABLE_SERVER_URL")
REDIS_SCRIPTS_QUEUE_MAX_SIZE = int(os.getenv("REDIS_SCRIPTS_QUEUE_MAX_SIZE", "1"))
# Used both as the exception message raised when a draft entry no longer
# carries its source/token and as the output stored on the failed run.
DRAFT_CONTENT_LOST_ERROR = "Draft script content was lost after scheduler restart"


class Scheduler:
Expand Down Expand Up @@ -117,19 +118,19 @@ def run_script(self, script_log_info: dict):
ScriptLog.id == script_log_info["id"]
).update({ScriptLog.state: ScriptLog.RUNNING})
db_session.commit()
script_file_info = get_script_file(
script_content_info = get_script_content(
script_log_info["dtable_uuid"], script_log_info["script_name"]
)
self.redis_client.lpush(
SCRIPTS_KEY,
json.dumps(
{
"script_id": script_log_info["id"],
"script_url": script_file_info["script_url"],
"script_content": script_content_info["script_content"],
"dtable_uuid": script_log_info["dtable_uuid"],
"env": {
"dtable_web_url": SEATABLE_SERVER_URL.rstrip("/"),
"api_token": script_file_info["temp_api_token"],
"api_token": script_content_info["temp_api_token"],
},
"context_data": script_log_info["context_data"],
}
Expand Down Expand Up @@ -182,6 +183,89 @@ def run_script(self, script_log_info: dict):
db_session.close()
self.on_script_done(script_log_info, 0)

def run_draft_script(self, script_log_info: dict):
    """Dispatch a draft script whose source is carried inside script_log_info.

    Reads "script_content" and "temp_api_token" from script_log_info, sets
    the matching ScriptLog row to RUNNING, and pushes the job as JSON onto
    the SCRIPTS_KEY Redis list.

    On any failure the row is handed to update_script() with False, -1 and an
    output message (DRAFT_CONTENT_LOST_ERROR when the content or token is
    missing, otherwise "Failed"); the session is then closed and
    self.on_script_done(script_log_info, 0) is called. On success only the
    session is closed.
    """
    started_at = time.time()

    session = DBSession()

    def log_fields():
        # Evaluated at each logging call, not up front, so key lookups happen
        # at the point where the message is emitted.
        return (
            script_log_info["id"],
            script_log_info["org_id"],
            script_log_info["owner"],
            script_log_info["dtable_uuid"],
            script_log_info["script_name"],
        )

    try:
        draft_source = script_log_info.get("script_content")
        api_token = script_log_info.get("temp_api_token")
        # An empty string is accepted as source; a non-string or a missing
        # token means the in-memory draft data is gone.
        if not (isinstance(draft_source, str) and api_token):
            raise ValueError(DRAFT_CONTENT_LOST_ERROR)

        matching_rows = session.query(ScriptLog).filter(
            ScriptLog.id == script_log_info["id"]
        )
        matching_rows.update({ScriptLog.state: ScriptLog.RUNNING})
        session.commit()

        job_message = json.dumps(
            {
                "script_id": script_log_info["id"],
                "script_content": draft_source,
                "dtable_uuid": script_log_info["dtable_uuid"],
                "env": {
                    "dtable_web_url": SEATABLE_SERVER_URL.rstrip("/"),
                    "api_token": api_token,
                },
                "context_data": script_log_info["context_data"],
            }
        )
        self.redis_client.lpush(SCRIPTS_KEY, job_message)
        logger.info(
            "dispatched script id %s org_id %s owner %s dtable_uuid %s script_name %s",
            *log_fields(),
        )
    except Exception as e:
        if str(e) == DRAFT_CONTENT_LOST_ERROR:
            output = DRAFT_CONTENT_LOST_ERROR
        else:
            output = "Failed"
        logger.exception(
            "dispatched script id %s org_id %s owner %s dtable_uuid %s script_name %s error %s",
            *log_fields(),
            e,
        )
        try:
            failed_log = (
                session.query(ScriptLog)
                .filter(ScriptLog.id == script_log_info["id"])
                .first()
            )
            # The same instant is passed for both timestamp arguments.
            failed_at = datetime.fromtimestamp(started_at)
            update_script(
                session,
                failed_log,
                False,
                -1,
                output,
                failed_at,
                failed_at,
            )
        except Exception as ee:
            logger.exception(
                "update script id %s org_id %s owner %s dtable_uuid %s script_name %s finished error %s",
                *log_fields(),
                ee,
            )
        finally:
            session.close()
            self.on_script_done(script_log_info, 0)
    else:
        session.close()

def load_pending_script_logs(self):
db_session = DBSession()
try:
Expand All @@ -191,7 +275,23 @@ def load_pending_script_logs(self):
.order_by(ScriptLog.id)
)
logger.info("load %s pending scripts", len(script_logs))
now = datetime.now()
for script_log in script_logs:
if script_log.operate_from == "draft":
update_script(
db_session,
script_log,
False,
-1,
DRAFT_CONTENT_LOST_ERROR,
now,
now,
)
logger.info(
"mark pending draft script %s as failed after scheduler restart",
script_log.id,
)
continue
self.add_script(script_log.to_dict())
except Exception as e:
logger.exception("load pending script logs error %s", e)
Expand Down Expand Up @@ -237,7 +337,10 @@ def schedule(self):
usage / (self.window_secs * self.python_starter_total_thread_count)
< self.rate_limit_percent
):
self.run_script(script_log_info)
if script_log_info.get("operate_from") == "draft":
self.run_draft_script(script_log_info)
else:
self.run_script(script_log_info)
else:
db_session = DBSession()
try:
Expand Down
Loading
Loading