diff --git a/.gitignore b/.gitignore
index cb64eaa..701dcbb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,3 +13,5 @@
 local_settings.py
 seatable-python-runner/
 seatable-python-runner.zip
+
+.python-version
diff --git a/scheduler/app/faas_scheduler/models.py b/scheduler/app/faas_scheduler/models.py
index 5f76c20..24cb2be 100644
--- a/scheduler/app/faas_scheduler/models.py
+++ b/scheduler/app/faas_scheduler/models.py
@@ -31,6 +31,12 @@ class ScriptLog(Base):
     return_code = Column(Integer, nullable=True)
     output = Column(Text, nullable=True)
     operate_from = Column(String(255))
+    state = Column(String(10))
+    created_at = Column(DateTime, index=True)
+
+    PENDING = "pending"
+    RUNNING = "running"
+    FINISHED = "finished"
 
     def __init__(
         self,
@@ -39,7 +45,8 @@ def __init__(
         org_id,
         script_name,
         context_data,
-        started_at,
+        state,
+        created_at,
         operate_from=None,
     ):
         self.dtable_uuid = dtable_uuid
@@ -47,7 +54,8 @@ def __init__(
         self.org_id = org_id
         self.script_name = script_name
         self.context_data = context_data
-        self.started_at = started_at
+        self.state = state
+        self.created_at = created_at
         self.operate_from = operate_from
 
     def to_dict(self, include_context_data=True, include_output=True):
@@ -57,6 +65,7 @@ def to_dict(self, include_context_data=True, include_output=True):
             "id": self.id,
             "dtable_uuid": self.dtable_uuid,
             "owner": self.owner,
+            "org_id": self.org_id,
             "script_name": self.script_name,
             "started_at": datetime_to_isoformat_timestr(self.started_at),
             "finished_at": self.finished_at
@@ -64,6 +73,9 @@
             "success": self.success,
             "return_code": self.return_code,
             "operate_from": self.operate_from,
+            "state": self.state,
+            "created_at": self.created_at
+            and datetime_to_isoformat_timestr(self.created_at),
         }
 
         if include_context_data:
diff --git a/scheduler/app/faas_scheduler/utils.py b/scheduler/app/faas_scheduler/utils.py
index 331a3f1..32db4ba 100644
--- a/scheduler/app/faas_scheduler/utils.py
+++ b/scheduler/app/faas_scheduler/utils.py
@@ -2,19 +2,23 @@
 import json
 import logging
 import requests
-from datetime import datetime
+from datetime import datetime, timedelta
 from typing import List, Optional, Tuple
 from uuid import UUID
 
 from tzlocal import get_localzone
-from sqlalchemy import case, desc, func, text
+from sqlalchemy import case, desc, func, text, and_
 from sqlalchemy.orm import load_only
 
-from faas_scheduler.models import ScriptLog
+from faas_scheduler.models import (
+    ScriptLog,
+    UserRunScriptStatistics,
+    OrgRunScriptStatistics,
+    DTableRunScriptStatistics,
+)
 
 import sys
 
 sys.path.append("/opt/scheduler")
-from database import DBSession
 
 logger = logging.getLogger(__name__)
@@ -68,6 +72,10 @@ class ScriptInvalidException(Exception):
     pass
 
 
+class RunScriptError(Exception):
+    pass
+
+
 ## part of ping to get the check if the python starter can be reached.
 def ping_starter():
     response = requests.get(STARTER_URL.rstrip("/") + "/ping/", timeout=30)
@@ -122,6 +130,34 @@ def delete_statistics_after_days(db_session):
         db_session.close()
 
 
+def update_running_scripts_timeout(db_session):
+    deadline = datetime.now() - timedelta(seconds=SUB_PROCESS_TIMEOUT)
+    updated_count = (
+        db_session.query(ScriptLog)
+        .filter(
+            and_(
+                ScriptLog.started_at.isnot(None),
+                ScriptLog.started_at <= deadline,
+                ScriptLog.state == "running",
+            )
+        )
+        .update(
+            {
+                ScriptLog.output: "timeout",
+                ScriptLog.return_code: -1,
+                ScriptLog.success: False,
+                ScriptLog.finished_at: datetime.now(),
+                ScriptLog.state: "finished",
+            },
+            synchronize_session=False,
+        )
+    )
+
+    db_session.commit()
+
+    logger.info("updated %s script logs", updated_count)
+
+
 def check_auth_token(request):
     value = request.headers.get("Authorization", "")
     if (
@@ -174,6 +210,7 @@ def call_faas_func(script_url, temp_api_token, context_data, script_id=None):
         },
         "context_data": context_data,
         "script_id": script_id,
+        "timeout": int(SUB_PROCESS_TIMEOUT),
     }
     headers = {"User-Agent": "python-scheduler/" + VERSION}
     logger.debug("I call starter at url %s", RUN_FUNC_URL)
@@ -182,6 +219,14 @@
 
         # script will be executed asynchronously, so there will be nothing in response
         # so only check response
+    except Exception as e:
+        logger.error(
+            "Fail to call scheduler: %s, data: %s, error: %s", RUN_FUNC_URL, data, e
+        )
+        raise RunScriptError(
+            f"Failed to call scheduler {RUN_FUNC_URL} data {data} error {e}"
+        ) from e
+    else:
         if response.status_code != 200:
             logger.error(
                 "Fail to call scheduler: %s, data: %s, error response: %s, %s",
@@ -190,74 +235,145 @@
                 RUN_FUNC_URL,
                 data,
                 response.status_code,
                 response.text,
             )
+            raise RunScriptError(
+                f"Failed to call scheduler {RUN_FUNC_URL} data {data} error response status {response.status_code} text {response.text}"
+            )
+
+
+def update_stats_run_count(db_session, dtable_uuid, owner, org_id):
+    run_date = datetime.today().strftime("%Y-%m-%d")
+    try:
+        dtable_stats = (
+            db_session.query(DTableRunScriptStatistics)
+            .filter_by(dtable_uuid=dtable_uuid, run_date=run_date)
+            .first()
+        )
+        if not dtable_stats:
+            dtable_stats = DTableRunScriptStatistics(
+                dtable_uuid=dtable_uuid,
+                run_date=run_date,
+                total_run_count=1,
+                total_run_time=0,
+                update_at=datetime.now(),
+            )
+            db_session.add(dtable_stats)
+        else:
+            dtable_stats.total_run_count += 1
+            dtable_stats.update_at = datetime.now()
+        if org_id == -1:
+            if "@seafile_group" not in owner:
+                user_stats = (
+                    db_session.query(UserRunScriptStatistics)
+                    .filter_by(username=owner, run_date=run_date)
+                    .first()
+                )
+                if not user_stats:
+                    user_stats = UserRunScriptStatistics(
+                        username=owner,
+                        run_date=run_date,
+                        total_run_count=1,
+                        total_run_time=0,
+                        update_at=datetime.now(),
+                    )
+                    db_session.add(user_stats)
+                else:
+                    user_stats.total_run_count += 1
+                    user_stats.update_at = datetime.now()
+        else:
+            org_stats = (
+                db_session.query(OrgRunScriptStatistics)
+                .filter_by(org_id=org_id, run_date=run_date)
+                .first()
+            )
+            if not org_stats:
+                org_stats = OrgRunScriptStatistics(
+                    org_id=org_id,
+                    run_date=run_date,
+                    total_run_count=1,
+                    total_run_time=0,
+                    update_at=datetime.now(),
+                )
+                db_session.add(org_stats)
+            else:
+                org_stats.total_run_count += 1
+                org_stats.update_at = datetime.now()
+        db_session.commit()
     except Exception as e:
-        logger.error(
-            "Fail to call scheduler: %s, data: %s, error: %s", RUN_FUNC_URL, data, e
+        logger.exception(
+            "update stats for org_id %s owner %s dtable %s run count error %s",
+            org_id,
+            owner,
+            dtable_uuid,
+            e,
         )
-        return None
-
-
-def update_statistics(db_session, dtable_uuid, owner, org_id, spend_time):
-    if not spend_time:
-        return
-    username = owner
-    # dtable_run_script_statistcis
-    sqls = [
-        """
-    INSERT INTO dtable_run_script_statistics(dtable_uuid, run_date, total_run_count, total_run_time, update_at) VALUES
-    (:dtable_uuid, :run_date, 1, :spend_time, :update_at)
-    ON DUPLICATE KEY UPDATE
-    total_run_count=total_run_count+1,
-    total_run_time=total_run_time+:spend_time,
-    update_at=:update_at;
-    """
-    ]
-
-    # org_run_script_statistics
-    if org_id and org_id != -1:
-        sqls += [
-            """
-        INSERT INTO org_run_script_statistics(org_id, run_date, total_run_count, total_run_time, update_at) VALUES
-        (:org_id, :run_date, 1, :spend_time, :update_at)
-        ON DUPLICATE KEY UPDATE
-        total_run_count=total_run_count+1,
-        total_run_time=total_run_time+:spend_time,
-        update_at=:update_at;
-        """
-        ]
-
-    # user_run_script_statistics
-    if "@seafile_group" not in username:
-        sqls += [
-            """
-        INSERT INTO user_run_script_statistics(username, org_id, run_date, total_run_count, total_run_time, update_at) VALUES
-        (:username, :org_id, :run_date, 1, :spend_time, :update_at)
-        ON DUPLICATE KEY UPDATE
-        org_id=:org_id,
-        total_run_count=total_run_count+1,
-        total_run_time=total_run_time+:spend_time,
-        update_at=:update_at;
-        """
-        ]
 
+def update_stats_run_time(db_session, dtable_uuid, owner, org_id, spend_time):
+    run_date = datetime.today().strftime("%Y-%m-%d")
     try:
-        for sql in sqls:
-            db_session.execute(
-                text(sql),
-                {
-                    "dtable_uuid": dtable_uuid,
-                    "username": username,
-                    "org_id": org_id,
-                    "run_date": datetime.today(),
-                    "spend_time": spend_time,
-                    "update_at": datetime.now(),
-                },
+        dtable_stats = (
+            db_session.query(DTableRunScriptStatistics)
+            .filter_by(dtable_uuid=dtable_uuid, run_date=run_date)
+            .first()
+        )
+        if not dtable_stats:
+            dtable_stats = DTableRunScriptStatistics(
+                dtable_uuid=dtable_uuid,
+                run_date=run_date,
+                total_run_count=1,
+                total_run_time=spend_time,
+                update_at=datetime.now(),
+            )
+            db_session.add(dtable_stats)
+        else:
+            dtable_stats.total_run_time += spend_time
+            dtable_stats.update_at = datetime.now()
+        if org_id == -1:
+            if "@seafile_group" not in owner:
+                user_stats = (
+                    db_session.query(UserRunScriptStatistics)
+                    .filter_by(username=owner, run_date=run_date)
+                    .first()
+                )
+                if not user_stats:
+                    user_stats = UserRunScriptStatistics(
+                        username=owner,
+                        run_date=run_date,
+                        total_run_count=1,
+                        total_run_time=spend_time,
+                        update_at=datetime.now(),
+                    )
+                    db_session.add(user_stats)
+                else:
+                    user_stats.total_run_time += spend_time
+                    user_stats.update_at = datetime.now()
+        else:
+            org_stats = (
+                db_session.query(OrgRunScriptStatistics)
+                .filter_by(org_id=org_id, run_date=run_date)
+                .first()
             )
+            if not org_stats:
+                org_stats = OrgRunScriptStatistics(
+                    org_id=org_id,
+                    run_date=run_date,
+                    total_run_count=1,
+                    total_run_time=spend_time,
+                    update_at=datetime.now(),
+                )
+                db_session.add(org_stats)
+            else:
+                org_stats.total_run_time += spend_time
+                org_stats.update_at = datetime.now()
         db_session.commit()
     except Exception as e:
-        logger.exception("update statistics sql error: %s", e)
+        logger.exception(
+            "update stats for org_id %s owner %s dtable %s run time error %s",
+            org_id,
+            owner,
+            dtable_uuid,
+            e,
+        )
 
 
 # required to get "script logs" in dtable-web
@@ -381,20 +497,33 @@ def add_script(
         org_id,
         script_name,
         context_data,
+        ScriptLog.PENDING,
         datetime.now(),
         operate_from,
     )
     db_session.add(script)
     db_session.commit()
 
+    update_stats_run_count(db_session, dtable_uuid, owner, org_id)
+
     return script
 
 
-def update_script(db_session, script, success, return_code, output):
-    script.finished_at = datetime.now()
+def update_script_running(db_session, started_at, script):
+    script.started_at = started_at
+    script.state = ScriptLog.RUNNING
+    db_session.commit()
+
+
+def update_script(
+    db_session, script, success, return_code, output, started_at, finished_at
+):
+    script.started_at = started_at
+    script.finished_at = finished_at
     script.success = success
     script.return_code = return_code
     script.output = output
+    script.state = ScriptLog.FINISHED
     db_session.commit()
 
     return script
@@ -405,28 +534,23 @@ def run_script(
     script_id, dtable_uuid, script_name, script_url, temp_api_token, context_data
 ):
     """Only for flask-server"""
-    # from faas_scheduler import DBSession
-    db_session = DBSession()  # for multithreading
+    if not script_url:
+        script_file = get_script_file(dtable_uuid, script_name)
+        script_url = script_file.get("script_url", "")
+    logger.debug("run_script executed...")
+    call_faas_func(script_url, temp_api_token, context_data, script_id=script_id)
 
-    try:
-        if not script_url:
-            script_file = get_script_file(dtable_uuid, script_name)
-            script_url = script_file.get("script_url", "")
-        logger.debug("run_script executed...")
-        call_faas_func(script_url, temp_api_token, context_data, script_id=script_id)
-    except Exception as e:
-        logger.exception("Run script %d error: %s", script_id, e)
-    finally:
-        db_session.close()
-
-    return True
-
-def hook_update_script(db_session, script_id, success, return_code, output, spend_time):
+def hook_update_script(
+    db_session, script_id, success, return_code, output, started_at, spend_time
+):
     script = db_session.query(ScriptLog).filter_by(id=script_id).first()
     if script:
-        update_script(db_session, script, success, return_code, output)
-        update_statistics(
+        finished_at = started_at + timedelta(seconds=spend_time)
+        update_script(
+            db_session, script, success, return_code, output, started_at, finished_at
+        )
+        update_stats_run_time(
             db_session, script.dtable_uuid, script.owner, script.org_id, spend_time
         )
diff --git a/scheduler/app/flask_server.py b/scheduler/app/flask_server.py
index fd1047e..ac5494e 100644
--- a/scheduler/app/flask_server.py
+++ b/scheduler/app/flask_server.py
@@ -8,7 +8,6 @@
 from datetime import datetime, timedelta
 from flask import Flask, request, make_response
 from gevent.pywsgi import WSGIServer
-from concurrent.futures import ThreadPoolExecutor
 
 from database import DBSession
 from faas_scheduler.utils import (
@@ -17,11 +16,8 @@
     get_statistics_grouped_by_base,
     get_statistics_grouped_by_day,
     is_date_yyyy_mm_dd,
-    run_script,
     get_script,
-    add_script,
     get_run_script_statistics_by_month,
-    hook_update_script,
     can_run_task,
     get_run_scripts_count_monthly,
     ping_starter,
@@ -29,22 +25,29 @@
     list_task_logs,
     uuid_str_to_32_chars,
     basic_log,
+    add_script,
+    hook_update_script,
+    update_script_running,
 )
-
+from scheduler import Scheduler
 
 basic_log("scheduler.log")
 
+scheduler = Scheduler(
+    int(os.environ.get("PYTHON_SCHEDULER_WINDOW_SECS", 300)),
+    float(os.environ.get("PYTHON_SCHEDULER_RATE_LIMIT_PERCENT", 25)),
+)
+
 # defaults...
-SCRIPT_WORKERS = int(os.environ.get("PYTHON_SCHEDULER_SCRIPT_WORKERS", 5))
 SUB_PROCESS_TIMEOUT = int(os.environ.get("PYTHON_PROCESS_TIMEOUT", 60 * 15))
 TIMEOUT_OUTPUT = (
     "The script's running time exceeded the limit and the execution was aborted."
 )
+HOST = os.environ.get("PYTHON_SCHEDULER_BIND_HOST", "127.0.0.1")
 
 app = Flask(__name__)
 logger = logging.getLogger(__name__)
-executor = ThreadPoolExecutor(max_workers=SCRIPT_WORKERS)
 
 
 @app.route("/ping/", methods=["GET"])
@@ -78,8 +81,6 @@ def scripts_api():
         context_data = data.get("context_data")
         owner = data.get("owner")
         org_id = data.get("org_id")
-        script_url = data.get("script_url")
-        temp_api_token = data.get("temp_api_token")
         scripts_running_limit = data.get("scripts_running_limit", -1)
         operate_from = data.get("operate_from", "manualy")
 
         if not dtable_uuid or not script_name or not owner:
@@ -93,27 +94,19 @@ def scripts_api():
             owner, org_id, db_session, scripts_running_limit=scripts_running_limit
         ):
             return make_response(("The number of runs exceeds the limit"), 400)
-        script = add_script(
+
+        script_log = add_script(
             db_session,
-            dtable_uuid,
+            uuid_str_to_32_chars(dtable_uuid),
             owner,
             org_id,
             script_name,
            context_data,
             operate_from,
         )
-        logger.debug("lets call the starter to fire up the runner...")
-        executor.submit(
-            run_script,
-            script.id,
-            dtable_uuid,
-            script_name,
-            script_url,
-            temp_api_token,
-            context_data,
-        )
+        scheduler.add_script(script_log.to_dict())
 
-        return make_response(({"script_id": script.id}, 200))
+        return make_response(({"script_id": script_log.id}, 200))
     except Exception as e:
         logger.exception(e)
         return make_response(("Internal server error", 500))
@@ -146,18 +139,21 @@ def script_api(script_id):
         script = get_script(db_session, script_id)
         if not script:
             return make_response(("Not found", 404))
-        if dtable_uuid != script.dtable_uuid or script_name != script.script_name:
+        if (
+            uuid_str_to_32_chars(dtable_uuid) != script.dtable_uuid
+            or script_name != script.script_name
+        ):
             return make_response(("Bad request", 400))
 
-        if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int):
-            now = datetime.now()
-            duration_seconds = (now - script.started_at).seconds
-            if duration_seconds > SUB_PROCESS_TIMEOUT:
-                script.success = False
-                script.return_code = -1
-                script.finished_at = now
-                script.output = TIMEOUT_OUTPUT
-                db_session.commit()
+        # if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int):
+        #     now = datetime.now()
+        #     duration_seconds = (now - script.created_at).seconds
+        #     if duration_seconds > SUB_PROCESS_TIMEOUT:
+        #         script.success = False
+        #         script.return_code = -1
+        #         script.finished_at = now
+        #         script.output = TIMEOUT_OUTPUT
+        #         db_session.commit()
 
         return make_response(({"script": script.to_dict()}, 200))
@@ -190,7 +186,9 @@ def task_logs_api(dtable_uuid, script_name):
 
     db_session = DBSession()
     try:
-        task_logs = list_task_logs(db_session, dtable_uuid, script_name, order_by)
+        task_logs = list_task_logs(
+            db_session, uuid_str_to_32_chars(dtable_uuid), script_name, order_by
+        )
         count = task_logs.count()
         task_logs = task_logs[start:end]
         task_log_list = [task_log.to_dict() for task_log in task_logs]
@@ -277,6 +275,36 @@ def scripts_running_count():
     return make_response(({"count": count}, 200))
 
 
+# endpoint to be informed that a script starts to run. (from starter)
+@app.route("/script-running-callback/", methods=["POST"])
+def callback_script_running():
+    """
+    Mark the script_log as running; called from python-starter
+    """
+    try:
+        data = request.get_json()
+    except Exception:
+        return make_response("Bad Request.", 400)
+    script_id = data.get("script_id")
+    started_at = data.get("started_at")
+
+    db_session = DBSession()
+    try:
+        script_log = get_script(db_session, script_id)
+        if not script_log:
+            return {"error_msg": "Script not found"}, 404
+        update_script_running(
+            db_session, datetime.fromtimestamp(started_at), script_log
+        )
+        scheduler.on_script_start(script_log.to_dict(), started_at)
+    except Exception as e:
+        logger.exception("update script %s running error %s", script_id, e)
+    finally:
+        db_session.close()
+
+    return {"success": True}, 200
+
+
 # endpoint to be informed that the execution of python code is done. (from starter)
 @app.route("/script-result/", methods=["POST"])
 def record_script_result():
@@ -290,17 +318,26 @@ def record_script_result():
     success = data.get("success", False)
     return_code = data.get("return_code")
     output = data.get("output")
-    spend_time = data.get("spend_time")
+    started_at = data.get("started_at")
+    spend_time = data.get("spend_time") or 0
     script_id = data.get("script_id")
-    db_session = DBSession()
 
-    # update script_log and run-time statistics
+    db_session = DBSession()
     try:
-        if script_id:
-            hook_update_script(
-                db_session, script_id, success, return_code, output, spend_time
-            )
+        script_log = get_script(db_session, script_id)
+        if not script_log:
+            return {"error_msg": "Script not found"}, 404
+        hook_update_script(
+            db_session,
+            script_id,
+            success,
+            return_code,
+            output,
+            datetime.fromtimestamp(started_at),
+            spend_time,
+        )
+        scheduler.on_script_done(script_log.to_dict(), started_at + spend_time)
     except Exception as e:
         logger.exception(e)
@@ -308,7 +345,7 @@ def record_script_result():
     finally:
         db_session.close()
 
-    return "success"
+    return {"success": True}, 200
 
 
 # internal function...
@@ -578,5 +615,6 @@ def get_run_statistics_grouped_by_day():
 
 
 if __name__ == "__main__":
-    http_server = WSGIServer(("127.0.0.1", 5055), app)
+    scheduler.start()
+    http_server = WSGIServer((HOST, 5055), app)
     http_server.serve_forever()
diff --git a/scheduler/app/scheduler.py b/scheduler/app/scheduler.py
index 7d6bfe1..99bf4c1 100644
--- a/scheduler/app/scheduler.py
+++ b/scheduler/app/scheduler.py
@@ -1,56 +1,268 @@
+import logging
 import os
-import gc
 import time
-import logging
-from threading import Thread
+from collections import deque, defaultdict
+from datetime import datetime, timedelta
+from threading import Thread, Lock, Event
 
 from database import DBSession
+from faas_scheduler.models import ScriptLog
 from faas_scheduler.utils import (
-    check_and_set_tasks_timeout,
+    run_script,
+    get_script_file,
+    hook_update_script,
     delete_log_after_days,
     delete_statistics_after_days,
-    basic_log,
+    update_running_scripts_timeout,
 )
 
-basic_log("scheduler.log")
+logger = logging.getLogger(__name__)
 
-SUB_PROCESS_TIMEOUT = int(
-    os.environ.get("PYTHON_PROCESS_TIMEOUT", 60 * 15)
-)  # 15 minutes
 
-logger = logging.getLogger(__name__)
+class Scheduler:
+    def __init__(self, window_secs: int, rate_limit_percent: float):
+        self.window_secs = window_secs
+        self.rate_limit_percent = rate_limit_percent
 
-class FAASTaskTimeoutSetter(Thread):
+        self.queue = deque()
 
-    def __init__(self):
-        super(FAASTaskTimeoutSetter, self).__init__()
-        self.interval = 60 * 30  # every half an hour
+        self.key_history = defaultdict(deque)  # key -> [(start, end)]
+        self.key_running = defaultdict(dict)  # key -> {script_log_id: start_time}
+        self.key_dispatched = defaultdict(dict)  # key -> {script_log_id: dispatch_time}
 
-    def run(self):
-        if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int):
-            while True:
-                logger.info("Start automatic cleanup ...")
-                db_session = DBSession()
-                try:
-                    check_and_set_tasks_timeout(db_session)
-                except Exception as e:
-                    logger.exception("task cleaner error: %s", e)
-                finally:
-                    db_session.close()
+        self.lock = Lock()
 
-                # python garbage collection
-                logger.info("gc.collect: %s", str(gc.collect()))
+        self.wakeup_event = Event()
+        self.clean_db_records_event = Event()
+        self.update_running_scripts_timeout_event = Event()
 
-                # remove old script_logs and statistics
-                delete_log_after_days(db_session)
-                delete_statistics_after_days(db_session)
+    # -------- public apis --------
+    def add_script(self, script_log_info: dict):
+        with self.lock:
+            self.queue.append(script_log_info)
+        self.wakeup_event.set()
+
+    def on_script_start(self, script_log_info: dict, start_time: float):
+        key = self.get_script_key(script_log_info)
+        with self.lock:
+            self.key_dispatched[key].pop(script_log_info["id"], None)
+            self.key_running[key][script_log_info["id"]] = start_time
+        self.wakeup_event.set()
+
+    def on_script_done(self, script_log_info: dict, end_time: float):
+        key = self.get_script_key(script_log_info)
+        with self.lock:
+            # a failed dispatch never reaches key_running; drop its dispatch-time
+            # entry here so it cannot be charged against the key forever
+            self.key_dispatched[key].pop(script_log_info["id"], None)
+            start_time = self.key_running[key].pop(script_log_info["id"], None)
+            if start_time is not None:
+                self.key_history[key].append((start_time, end_time))
+        self.wakeup_event.set()
+
+    def start(self):
+        self.load_pending_script_logs()
+        Thread(target=self.schedule, daemon=True).start()
+        Thread(target=self.loop_clean_db_records, daemon=True).start()
+        Thread(target=self.loop_update_running_scripts_timeout, daemon=True).start()
+
+    # -------- private methods --------
+    def get_script_key(self, script_log_info: dict):
+        if script_log_info["org_id"] == -1:
+            return script_log_info["owner"]
+        else:
+            return script_log_info["org_id"]
+
+    def clean_up(self, now):
+        for runs in self.key_history.values():
+            while runs and runs[0][1] <= now - self.window_secs:
+                runs.popleft()
+
+    def get_used_time_by_key(self, key, now):
+        used = 0.0
+
+        for s, e in self.key_history[key]:
+            used += e - max(s, now - self.window_secs)
+
+        # running scripts
+        for s in self.key_running[key].values():
+            used += now - s
+
+        # dispatched but not started scripts, use (now - dispatch_time) as used time temporarily
+        for d in self.key_dispatched[key].values():
+            used += now - d
+
+        return used
+
+    def get_earliest_release_time(self, key, now):
+        times = []
 
-                # sleep
-                logger.info("Sleep for %d seconds ...", self.interval)
-                time.sleep(self.interval)
+        for _, e in self.key_history[key]:
+            times.append(e + self.window_secs)
 
+        for s in self.key_running[key].values():
+            times.append(s + self.window_secs)
+
+        for s in self.key_dispatched[key].values():
+            times.append(s + self.window_secs)
+
+        return min(times) if times else now
+
+    def run_script(self, script_log_info: dict):
+        now = time.time()
+        key = self.get_script_key(script_log_info)
+        self.key_dispatched[key][script_log_info["id"]] = now
+
+        try:
+            script_file_info = get_script_file(
+                script_log_info["dtable_uuid"], script_log_info["script_name"]
+            )
+            run_script(
+                script_log_info["id"],
+                script_log_info["dtable_uuid"],
+                script_log_info["script_name"],
+                script_file_info["script_url"],
+                script_file_info["temp_api_token"],
+                script_log_info["context_data"],
+            )
+        except Exception as e:
+            logger.exception(
+                "run script %s %s %s %s %s error %s",
+                script_log_info["id"],
+                script_log_info["org_id"],
+                script_log_info["owner"],
+                script_log_info["dtable_uuid"],
+                script_log_info["script_name"],
+                e,
+            )
+            # record script result
+            db_session = DBSession()
+            try:
+                hook_update_script(
+                    db_session,
+                    script_log_info["id"],
+                    False,
+                    -1,
+                    "Failed",
+                    datetime.fromtimestamp(now),
+                    0,
+                )
+            except Exception as ee:
+                logger.exception(
+                    "record script %s %s %s %s %s result error %s",
+                    script_log_info["id"],
+                    script_log_info["org_id"],
+                    script_log_info["owner"],
+                    script_log_info["dtable_uuid"],
+                    script_log_info["script_name"],
+                    ee,
+                )
+            finally:
+                db_session.close()
+            self.on_script_done(script_log_info, time.time())
+
+    def load_pending_script_logs(self):
+        db_session = DBSession()
+        try:
+            script_logs = list(
+                db_session.query(ScriptLog)
+                .filter_by(state=ScriptLog.PENDING)
+                .filter(ScriptLog.created_at > (datetime.now() - timedelta(minutes=5)))
+                .order_by(ScriptLog.id)
+            )
+            logger.info(
+                "load %s pending scripts created within 5 mins", len(script_logs)
+            )
+            for script_log in script_logs:
+                self.add_script(script_log.to_dict())
+        except Exception as e:
+            logger.exception("load pending script logs error %s", e)
+        finally:
+            db_session.close()
+
+    # -------- scheduler loop --------
+    def schedule(self):
+        logger.info(
+            "Start scheduler loop, window_secs: %s rate_limit_percent: %s%%",
+            self.window_secs,
+            self.rate_limit_percent,
+        )
+        while True:
+            now = time.time()
+            started = False
+            sleep_until = None
+            with self.lock:
+                self.clean_up(now)
+
+                skipped = deque()
+
+                while self.queue:
+                    script_log_info = self.queue.popleft()
+                    key = self.get_script_key(script_log_info)
+                    used = self.get_used_time_by_key(key, now)
+
+                    logger.debug(
+                        "script %s key %s used %s used_percent %s%%",
+                        script_log_info["id"],
+                        key,
+                        used,
+                        used / self.window_secs * 100,
+                    )
+
+                    if used / self.window_secs * 100 < self.rate_limit_percent:
+                        logger.info(
+                            "script %s key %s used %s start to dispatch",
+                            script_log_info["id"],
+                            key,
+                            used,
+                        )
+                        self.run_script(script_log_info)
+                        started = True
+                        break
+                    else:
+                        skipped.append(script_log_info)
+                        t = self.get_earliest_release_time(key, now)
+                        sleep_until = t if sleep_until is None else min(sleep_until, t)
+
+                self.queue = skipped + self.queue
+
+            if started:
+                continue
+
+            if sleep_until:
+                logger.info(
+                    "No script can run, sleep until %s",
+                    datetime.fromtimestamp(sleep_until),
+                )
+                timeout = max(0.0, sleep_until - time.time())
+                self.wakeup_event.wait(timeout=timeout)
+            else:
+                self.wakeup_event.wait(timeout=0.1)
+
+            self.wakeup_event.clear()
+
+    # -------- clean db records loop --------
+    def loop_clean_db_records(self):
+        while True:
+            db_session = DBSession()
+            try:
+                delete_log_after_days(db_session)
+                delete_statistics_after_days(db_session)
+            except Exception as e:
+                logger.exception("clean db records error %s", e)
+            finally:
+                db_session.close()
+            self.clean_db_records_event.wait(timeout=24 * 60 * 60)
 
-if __name__ == "__main__":
-    task_timeout_setter = FAASTaskTimeoutSetter()
-    task_timeout_setter.start()
+    # -------- update state loop --------
+    def loop_update_running_scripts_timeout(self):
+        while True:
+            db_session = DBSession()
+            try:
+                update_running_scripts_timeout(db_session)
+            except Exception as e:
+                logger.exception("update running scripts timeout error %s", e)
+            finally:
+                db_session.close()
+            self.update_running_scripts_timeout_event.wait(timeout=15 * 60)
diff --git a/starter/runner.py b/starter/runner.py
index e4cb429..fe3e73f 100644
--- a/starter/runner.py
+++ b/starter/runner.py
@@ -70,6 +70,9 @@
 SEATABLE_USER_UID = 1000
 SEATABLE_USER_GID = 1000
 
+# bind host
+HOST = os.environ.get("PYTHON_STARTER_BIND_HOST", "127.0.0.1")
+
 
 def get_log_level(level):
     if level.lower() == "info":
@@ -132,12 +135,19 @@ def to_python_bool(value):
     return value.lower() == "true"
 
 
-def send_to_scheduler(success, return_code, output, spend_time, request_data):
+class CallbackScriptRunningError(Exception):
+    pass
+
+
+def send_to_scheduler(
+    success, return_code, output, started_at, spend_time, request_data
+):
     """
     This function is used to send result of script to scheduler
     - success: whether script running successfully
     - return_code: return-code of subprocess
     - output: output of subprocess or error message
+    - started_at: start timestamp
     - spend_time: time subprocess took
     - request_data: data from request
     """
@@ -152,7 +162,8 @@ def send_to_scheduler(success, return_code, output, spend_time, request_data):
         "success": success,
         "return_code": return_code,
         "output": output,
-        "spend_time": spend_time,
+        "started_at": started_at,
+        "spend_time": spend_time or 0,
     }
     result_data.update(
         {
@@ -182,13 +193,32 @@ def send_to_scheduler(success, return_code, output, spend_time, request_data):
     )
 
 
+def callback_script_running(script_id, started_at):
+    url = PYTHON_SCHEDULER_URL.strip("/") + "/script-running-callback/"
+    headers = {"User-Agent": "python-starter/" + VERSION}
+    resp = requests.post(
+        url,
+        headers=headers,
+        json={"script_id": script_id, "started_at": started_at},
+        timeout=30,
+    )
+    if not resp.ok:
+        raise CallbackScriptRunningError(
+            f"script {script_id} callback script running error status {resp.status_code} content {resp.content}"
+        )
+
+
 def run_python(data):
     logging.info("New python run initalized... (v%s)", VERSION)
 
+    started_at = time.time()
+
+    callback_script_running(data.get("script_id"), started_at)
+
     script_url = data.get("script_url")
     if not script_url:
-        send_to_scheduler(False, None, "Script URL is missing", None, data)
+        send_to_scheduler(False, None, "Script URL is missing", started_at, None, data)
         return
 
     if (
         to_python_bool(USE_ALTERNATIVE_FILE_SERVER_ROOT)
@@ -228,11 +258,11 @@ def run_python(data):
         logging.error(
             "Failed to get script from %s, response: %s", script_url, resp
         )
-        send_to_scheduler(False, None, "Fail to get script", None, data)
+        send_to_scheduler(False, None, "Fail to get script", started_at, None, data)
         return
     except Exception as e:
         logging.error("Failed to get script from %s, error: %s", script_url, e)
-        send_to_scheduler(False, None, "Fail to get script", None, data)
+        send_to_scheduler(False, None, "Fail to get script", started_at, None, data)
         return
 
     logging.debug("Generate temporary random folder directory")
@@ -264,6 +294,7 @@ def run_python(data):
         return_code, output = None, ""  # init output
     except Exception as e:
         logging.error("Failed to save script %s, error: %s", script_url, e)
+        send_to_scheduler(False, -1, "", started_at, 0, data)
         return
 
     try:
@@ -271,6 +302,7 @@ def run_python(data):
         os.chown(tmp_dir, SEATABLE_USER_UID, SEATABLE_USER_GID)
     except Exception as e:
         logging.error("Failed to chown %s, error: %s", tmp_dir, e)
+        send_to_scheduler(False, -1, "", started_at, 0, data)
         return
 
     logging.debug("prepare the command to start the python runner")
@@ -339,8 +371,6 @@ def run_python(data):
         command.append("run")  # override command
     logging.debug("command: %s", command)
 
-    start_at = time.time()
-
     logging.debug("try to start the python runner image")
     try:
         result = subprocess.run(
@@ -371,6 +401,7 @@ def run_python(data):
             False,
            -1,
             "The script's running time exceeded the limit and the execution was aborted.",
+            started_at,
             DEFAULT_SUB_PROCESS_TIMEOUT,
             data,
         )
@@ -378,7 +409,7 @@ def run_python(data):
         return
     except Exception as e:
         logging.exception(e)
         logging.error("Failed to run file %s error: %s", script_url, e)
-        send_to_scheduler(False, None, None, None, data)
+        send_to_scheduler(False, None, None, started_at, None, data)
         return
     else:
         logging.debug(
@@ -388,7 +419,12 @@ def run_python(data):
         if os.path.isfile(output_file_path):
             if os.path.islink(output_file_path):
                 send_to_scheduler(
-                    False, -1, "Script invalid!", time.time() - start_at, data
+                    False,
+                    -1,
+                    "Script invalid!",
+                    started_at,
+                    time.time() - started_at,
+                    data,
                 )
                 return
             with open(output_file_path, "r") as f:
@@ -418,7 +454,7 @@ def run_python(data):
     except Exception as e:
         logging.warning("Fail to remove container error: %s", e)
 
-    spend_time = time.time() - start_at
+    spend_time = time.time() - started_at
     logging.info("python run finished successful. duration was: %s", spend_time)
     logging.debug(
         "send this to the scheduler. return_code: %s, output: %s, spend_time: %s, data: %s",
@@ -427,7 +463,9 @@ def run_python(data):
         return_code,
         output,
         spend_time,
         data,
     )
-    send_to_scheduler(return_code == 0, return_code, output, spend_time, data)
+    send_to_scheduler(
+        return_code == 0, return_code, output, started_at, spend_time, data
+    )
 
 
 ####################
@@ -459,4 +497,4 @@ def health_check():
 
 
 if __name__ == "__main__":
-    app.run(port=8088, debug=False)
+    app.run(host=HOST, port=8088, debug=False)