-
Notifications
You must be signed in to change notification settings - Fork 8
feat: Deploy & Run History follow-ups — polling, metrics, deep-links, DB columns, server-side filters #64
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
9ab3ae6
349a442
09fd9b5
04db1af
0ff6aa0
b607842
f1f3f93
e8465cb
4009b8b
40ddbbc
2eedc37
bede6a9
3c44484
8d25507
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -147,6 +147,19 @@ def _send_notification(user_task: UserTaskDetails, run: TaskRunHistory, success: | |
| _send_slack_notification(user_task, run, success) | ||
|
|
||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # BASE_RESULT cleanup helper | ||
| # --------------------------------------------------------------------------- | ||
|
|
||
| def _clear_base_result(): | ||
| """Clear the module-level BASE_RESULT global to prevent stale data across worker reuse.""" | ||
| try: | ||
| from visitran.events.printer import BASE_RESULT | ||
| BASE_RESULT.clear() | ||
| except Exception: | ||
| pass | ||
|
|
||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # Job chaining helper | ||
| # --------------------------------------------------------------------------- | ||
|
|
@@ -264,6 +277,8 @@ def trigger_scheduled_run( | |
| start_time=timezone.now(), | ||
| user_task_detail=user_task, | ||
| kwargs=run_kwargs, | ||
| trigger=trigger, | ||
| scope=scope, | ||
| ) | ||
|
|
||
| # ── Mark task as running ────────────────────────────────────────── | ||
|
|
@@ -321,11 +336,47 @@ def trigger_scheduled_run( | |
| else: | ||
| app_context.execute_visitran_run_command(environment_id=environment_id) | ||
|
|
||
| # ── Capture execution metrics from BASE_RESULT ────────────── | ||
| try: | ||
| from visitran.events.printer import BASE_RESULT | ||
|
|
||
| # Snapshot and immediately clear the global to prevent stale | ||
| # data leaking into a subsequent run on the same worker process. | ||
| results_snapshot = list(BASE_RESULT) | ||
| BASE_RESULT.clear() | ||
|
|
||
| def _clean_name(raw): | ||
| if "'" in raw: | ||
| return raw.split("'")[1].split(".")[-1] | ||
| return raw | ||
|
|
||
| user_results = [ | ||
| r for r in results_snapshot | ||
| if not _clean_name(r.node_name).startswith("Source") | ||
| ] | ||
| run.result = { | ||
| "models": [ | ||
| { | ||
| "name": _clean_name(r.node_name), | ||
| "status": r.status, | ||
| "end_status": r.end_status, | ||
| "sequence": r.sequence_num, | ||
| } | ||
| for r in user_results | ||
| ], | ||
| "total": len(user_results), | ||
| "passed": sum(1 for r in user_results if r.end_status == "OK"), | ||
| "failed": sum(1 for r in user_results if r.end_status == "FAIL"), | ||
| } | ||
| except Exception: | ||
| _clear_base_result() | ||
| logger.debug("Could not capture BASE_RESULT metrics", exc_info=True) | ||
|
|
||
| # ── Mark success ────────────────────────────────────────────── | ||
| success = True | ||
| run.status = "SUCCESS" | ||
| run.end_time = timezone.now() | ||
| run.save(update_fields=["status", "end_time"]) | ||
| run.save(update_fields=["status", "end_time", "result"]) | ||
|
|
||
| user_task.status = TaskStatus.SUCCESS | ||
| user_task.task_completion_time = run.end_time | ||
|
|
@@ -335,11 +386,13 @@ def trigger_scheduled_run( | |
| except (_RunTimeout, SoftTimeLimitExceeded) as exc: | ||
| error_msg = str(exc) if str(exc) else f"Job exceeded timeout of {timeout}s" | ||
| logger.warning("Job %s timed out: %s", user_task.task_name, error_msg) | ||
| _clear_base_result() | ||
| _mark_failure(run, user_task, error_msg) | ||
|
|
||
| except Exception as exc: | ||
| error_msg = str(exc) | ||
| logger.exception("Job %s failed: %s", user_task.task_name, error_msg) | ||
| _clear_base_result() | ||
| _mark_failure(run, user_task, error_msg) | ||
|
|
||
| # ── Retry logic ─────────────────────────────────────────────────── | ||
|
|
@@ -380,10 +433,35 @@ def trigger_scheduled_run( | |
|
|
||
def _mark_failure(run: TaskRunHistory, user_task: UserTaskDetails, error_msg: str, results=None):
    """Mark a run and its parent task as failed, persisting partial metrics.

    Args:
        run: The ``TaskRunHistory`` row to finalize.
        user_task: The parent task (updated by the caller after return).
        error_msg: Human-readable failure reason stored on the run.
        results: Optional pre-captured snapshot of ``BASE_RESULT`` entries.
            When ``None``, this function snapshots and clears the global
            itself.

    Bug fix: callers previously invoked ``_clear_base_result()`` right
    before this function, so reading ``BASE_RESULT`` here always saw an
    empty list and every failed run persisted ``total/passed/failed = 0``.
    The snapshot-and-clear now happens here (mirroring the success path),
    so callers must NOT clear the global before calling ``_mark_failure``.
    """
    try:
        if results is None:
            from visitran.events.printer import BASE_RESULT

            # Snapshot first, then clear, so partial-execution metrics
            # survive while stale data cannot leak into the next run on
            # the same worker process.
            results = list(BASE_RESULT)
            BASE_RESULT.clear()

        def _clean(raw):
            # Node names may look like "<Model 'pkg.model_name'>"; extract
            # the trailing identifier — TODO confirm exact format upstream.
            return raw.split("'")[1].split(".")[-1] if "'" in raw else raw

        # "Source*" nodes are infrastructure, not user models — exclude them.
        user_results = [
            r for r in results if not _clean(r.node_name).startswith("Source")
        ]
        run.result = {
            "models": [
                {
                    "name": _clean(r.node_name),
                    "status": r.status,
                    "end_status": r.end_status,
                    "sequence": r.sequence_num,
                }
                for r in user_results
            ],
            "total": len(user_results),
            "passed": sum(1 for r in user_results if r.end_status == "OK"),
            "failed": sum(1 for r in user_results if r.end_status == "FAIL"),
        }
    except Exception:
        # Metrics capture is best-effort; never let it mask the real failure.
        pass
    run.status = "FAILURE"
    run.end_time = timezone.now()
    run.error_message = error_msg
    run.save(update_fields=["status", "end_time", "error_message", "result"])
|
Comment on lines
+436
to
+464
There was a problem hiding this comment. Choose a reason for hiding this comment; the reason will be displayed to describe this comment to others. Learn more.
**`_mark_failure` always reads an already-cleared `BASE_RESULT`.** Both exception handlers call `_clear_base_result()` immediately before `_mark_failure()`, so the metrics list is empty by the time it is read. The success path avoids this by snapshotting before clearing; the failure path needs the same treatment. Prompt To Fix With AI — this is a comment left during a code review.
Path: backend/backend/core/scheduler/celery_tasks.py
Line: 436-464
Comment:
**`_mark_failure` always reads an already-cleared `BASE_RESULT`**
Both exception handlers call `_clear_base_result()` immediately before `_mark_failure()`, so by the time `_mark_failure` reads `BASE_RESULT` (line 443), the list is empty. Every failed run will persist `total: 0, passed: 0, failed: 0` — the partial-execution metrics feature silently produces nothing on failure.
The success path avoids this by snapshotting before clearing (`results_snapshot = list(BASE_RESULT); BASE_RESULT.clear()`). The failure path needs the same treatment: snapshot the results first, clear the global, then pass the snapshot to `_mark_failure`.
How can I resolve this? If you propose a fix, please make it concise. |
||
|
|
||
| user_task.status = TaskStatus.FAILED | ||
| user_task.task_completion_time = run.end_time | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| from django.db import migrations, models | ||
|
|
||
|
|
||
class Migration(migrations.Migration):
    """Add ``trigger`` and ``scope`` columns (plus indexes) to TaskRunHistory.

    Backs the server-side run-history filters: how a run was started
    (scheduled vs. manual) and what it covered (full job vs. single model).
    """

    dependencies = [
        ("job_scheduler", "0001_initial"),
    ]

    operations = [
        # How the run was initiated; existing rows default to "scheduled".
        migrations.AddField(
            model_name="taskrunhistory",
            name="trigger",
            field=models.CharField(
                choices=[("scheduled", "Scheduled"), ("manual", "Manual")],
                default="scheduled",
                help_text="How the run was initiated: cron/interval schedule or manual dispatch.",
                max_length=20,
            ),
        ),
        # Whether the run executed the whole job or a single model.
        migrations.AddField(
            model_name="taskrunhistory",
            name="scope",
            field=models.CharField(
                choices=[("job", "Full job"), ("model", "Single model")],
                default="job",
                help_text="Whether the run executed all job models or a single model.",
                max_length=20,
            ),
        ),
        # Indexes support server-side filtering on the new columns.
        migrations.AddIndex(
            model_name="taskrunhistory",
            index=models.Index(
                fields=["trigger"], name="job_schedul_trigger_idx"
            ),
        ),
        migrations.AddIndex(
            model_name="taskrunhistory",
            index=models.Index(
                fields=["scope"], name="job_schedul_scope_idx"
            ),
        ),
    ]
Uh oh!
There was an error while loading. Please reload this page.