Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 80 additions & 2 deletions backend/backend/core/scheduler/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,19 @@ def _send_notification(user_task: UserTaskDetails, run: TaskRunHistory, success:
_send_slack_notification(user_task, run, success)


# ---------------------------------------------------------------------------
# BASE_RESULT cleanup helper
# ---------------------------------------------------------------------------

def _clear_base_result():
"""Clear the module-level BASE_RESULT global to prevent stale data across worker reuse."""
try:
from visitran.events.printer import BASE_RESULT
BASE_RESULT.clear()
except Exception:
pass


# ---------------------------------------------------------------------------
# Job chaining helper
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -264,6 +277,8 @@ def trigger_scheduled_run(
start_time=timezone.now(),
user_task_detail=user_task,
kwargs=run_kwargs,
trigger=trigger,
scope=scope,
)

# ── Mark task as running ──────────────────────────────────────────
Expand Down Expand Up @@ -321,11 +336,47 @@ def trigger_scheduled_run(
else:
app_context.execute_visitran_run_command(environment_id=environment_id)

# ── Capture execution metrics from BASE_RESULT ──────────────
try:
from visitran.events.printer import BASE_RESULT

# Snapshot and immediately clear the global to prevent stale
# data leaking into a subsequent run on the same worker process.
results_snapshot = list(BASE_RESULT)
BASE_RESULT.clear()

def _clean_name(raw):
if "'" in raw:
return raw.split("'")[1].split(".")[-1]
return raw

user_results = [
r for r in results_snapshot
if not _clean_name(r.node_name).startswith("Source")
]
run.result = {
"models": [
{
"name": _clean_name(r.node_name),
"status": r.status,
"end_status": r.end_status,
"sequence": r.sequence_num,
}
for r in user_results
],
"total": len(user_results),
"passed": sum(1 for r in user_results if r.end_status == "OK"),
"failed": sum(1 for r in user_results if r.end_status == "FAIL"),
}
except Exception:
_clear_base_result()
logger.debug("Could not capture BASE_RESULT metrics", exc_info=True)
Comment thread
greptile-apps[bot] marked this conversation as resolved.

# ── Mark success ──────────────────────────────────────────────
success = True
run.status = "SUCCESS"
run.end_time = timezone.now()
run.save(update_fields=["status", "end_time"])
run.save(update_fields=["status", "end_time", "result"])

user_task.status = TaskStatus.SUCCESS
user_task.task_completion_time = run.end_time
Expand All @@ -335,11 +386,13 @@ def trigger_scheduled_run(
except (_RunTimeout, SoftTimeLimitExceeded) as exc:
error_msg = str(exc) if str(exc) else f"Job exceeded timeout of {timeout}s"
logger.warning("Job %s timed out: %s", user_task.task_name, error_msg)
_clear_base_result()
_mark_failure(run, user_task, error_msg)

except Exception as exc:
error_msg = str(exc)
logger.exception("Job %s failed: %s", user_task.task_name, error_msg)
_clear_base_result()
_mark_failure(run, user_task, error_msg)

# ── Retry logic ───────────────────────────────────────────────────
Expand Down Expand Up @@ -380,10 +433,35 @@ def trigger_scheduled_run(

def _mark_failure(run: TaskRunHistory, user_task: UserTaskDetails, error_msg: str):
"""Helper to mark a run and its parent task as failed."""
try:
from visitran.events.printer import BASE_RESULT

def _clean(raw):
return raw.split("'")[1].split(".")[-1] if "'" in raw else raw

user_results = [
r for r in BASE_RESULT if not _clean(r.node_name).startswith("Source")
]
run.result = {
"models": [
{
"name": _clean(r.node_name),
"status": r.status,
"end_status": r.end_status,
"sequence": r.sequence_num,
}
for r in user_results
],
"total": len(user_results),
"passed": sum(1 for r in user_results if r.end_status == "OK"),
"failed": sum(1 for r in user_results if r.end_status == "FAIL"),
}
except Exception:
pass
run.status = "FAILURE"
run.end_time = timezone.now()
run.error_message = error_msg
run.save(update_fields=["status", "end_time", "error_message"])
run.save(update_fields=["status", "end_time", "error_message", "result"])
Comment on lines +436 to +464
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 _mark_failure always reads an already-cleared BASE_RESULT

Both exception handlers call _clear_base_result() immediately before _mark_failure(), so by the time _mark_failure reads BASE_RESULT (line 443), the list is empty. Every failed run will persist total: 0, passed: 0, failed: 0 — the partial-execution metrics feature silently produces nothing on failure.

The success path avoids this by snapshotting before clearing (results_snapshot = list(BASE_RESULT); BASE_RESULT.clear()). The failure path needs the same treatment: snapshot the results first, clear the global, then pass the snapshot to _mark_failure.

Prompt To Fix With AI
This is a comment left during a code review.
Path: backend/backend/core/scheduler/celery_tasks.py
Line: 436-464

Comment:
**`_mark_failure` always reads an already-cleared `BASE_RESULT`**

Both exception handlers call `_clear_base_result()` immediately before `_mark_failure()`, so by the time `_mark_failure` reads `BASE_RESULT` (line 443), the list is empty. Every failed run will persist `total: 0, passed: 0, failed: 0` — the partial-execution metrics feature silently produces nothing on failure.

The success path avoids this by snapshotting before clearing (`results_snapshot = list(BASE_RESULT); BASE_RESULT.clear()`). The failure path needs the same treatment: snapshot the results first, clear the global, then pass the snapshot to `_mark_failure`.

How can I resolve this? If you propose a fix, please make it concise.

Fix in Claude Code


user_task.status = TaskStatus.FAILED
user_task.task_completion_time = run.end_time
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from django.db import migrations, models


class Migration(migrations.Migration):
    """Add first-class ``trigger``/``scope`` columns to ``TaskRunHistory``.

    Previously these attributes were inferred at read time from the run's
    ``kwargs`` JSON; promoting them to real columns (with indexes) makes
    them filterable in queries. Defaults match the historical inference
    fallbacks ("scheduled" / "job") so existing rows stay consistent.
    """

    dependencies = [
        ("job_scheduler", "0001_initial"),
    ]

    operations = [
        # How the run was initiated: cron/interval schedule vs. manual dispatch.
        migrations.AddField(
            model_name="taskrunhistory",
            name="trigger",
            field=models.CharField(
                choices=[("scheduled", "Scheduled"), ("manual", "Manual")],
                default="scheduled",
                help_text="How the run was initiated: cron/interval schedule or manual dispatch.",
                max_length=20,
            ),
        ),
        # Whether the run covered the whole job or a single model.
        migrations.AddField(
            model_name="taskrunhistory",
            name="scope",
            field=models.CharField(
                choices=[("job", "Full job"), ("model", "Single model")],
                default="job",
                help_text="Whether the run executed all job models or a single model.",
                max_length=20,
            ),
        ),
        # Indexes support the trigger/scope query-string filters in the
        # run-history API endpoint.
        migrations.AddIndex(
            model_name="taskrunhistory",
            index=models.Index(
                fields=["trigger"], name="job_schedul_trigger_idx"
            ),
        ),
        migrations.AddIndex(
            model_name="taskrunhistory",
            index=models.Index(
                fields=["scope"], name="job_schedul_scope_idx"
            ),
        ),
    ]
14 changes: 14 additions & 0 deletions backend/backend/core/scheduler/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,18 @@ class TaskRunHistory(DefaultOrganizationMixin, BaseModel):
kwargs = models.JSONField(blank=True, null=True)
result = models.JSONField(blank=True, null=True)
error_message = models.TextField(blank=True, null=True)
trigger = models.CharField(
max_length=20,
choices=[("scheduled", "Scheduled"), ("manual", "Manual")],
default="scheduled",
help_text="How the run was initiated: cron/interval schedule or manual dispatch.",
)
scope = models.CharField(
max_length=20,
choices=[("job", "Full job"), ("model", "Single model")],
default="job",
help_text="Whether the run executed all job models or a single model.",
)

user_task_detail = models.ForeignKey(
UserTaskDetails,
Expand All @@ -154,6 +166,8 @@ class Meta:
models.Index(
fields=["user_task_detail"], name="job_schedul_user_ta_5cd43a_idx"
),
models.Index(fields=["trigger"], name="job_schedul_trigger_idx"),
models.Index(fields=["scope"], name="job_schedul_scope_idx"),
]

def __str__(self):
Expand Down
22 changes: 17 additions & 5 deletions backend/backend/core/scheduler/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,19 @@ def task_run_history(request, project_id, user_task_id):
if _is_valid_project_id(project_id):
query["project__project_uuid"] = project_id
task = UserTaskDetails.objects.get(**query)
runs = TaskRunHistory.objects.filter(user_task_detail=task).order_by("-start_time")
runs = TaskRunHistory.objects.filter(user_task_detail=task)

trigger_filter = request.GET.get("trigger")
scope_filter = request.GET.get("scope")
status_filter = request.GET.get("status")
if trigger_filter:
runs = runs.filter(trigger=trigger_filter)
if scope_filter:
runs = runs.filter(scope=scope_filter)
if status_filter:
runs = runs.filter(status=status_filter)

runs = runs.order_by("-start_time")
total = runs.count()

offset = (page - 1) * limit
Expand Down Expand Up @@ -747,13 +759,13 @@ def list_recent_runs_for_model(request, project_id, model_name):
env = task.environment
kwargs = run.kwargs or {}
models_override = kwargs.get("models_override") or []
# Back-compat: rows written before the trigger/scope split only
# carried kwargs.source=="quick_deploy" as their manual-model marker.
# Prefer first-class DB columns; fall back to kwargs for rows
# written before the trigger/scope migration.
legacy_source = kwargs.get("source")
trigger = kwargs.get("trigger") or (
trigger = run.trigger or kwargs.get("trigger") or (
"manual" if legacy_source == "quick_deploy" else "scheduled"
)
scope = kwargs.get("scope") or (
scope = run.scope or kwargs.get("scope") or (
"model" if models_override or legacy_source == "quick_deploy" else "job"
)
data.append({
Expand Down
99 changes: 89 additions & 10 deletions frontend/src/ide/editor/no-code-model/no-code-model.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ function NoCodeModel({ nodeData }) {
runTaskForModel,
runTask,
listRecentRunsForModel,
getLatestRunStatus,
} = useJobService();

const [quickDeployModal, setQuickDeployModal] = useState({
Expand All @@ -272,8 +273,10 @@ function NoCodeModel({ nodeData }) {
const [recentRunsState, setRecentRunsState] = useState({
loading: false,
runs: [],
fetchedFor: null, // model name the current runs are for
fetchedFor: null,
});
const [deployPolling, setDeployPolling] = useState(null);
const pollingRef = useRef(null);

const modelName =
nodeData?.node?.title ||
Expand Down Expand Up @@ -1829,16 +1832,29 @@ function NoCodeModel({ nodeData }) {
);
const envName = selected?.environment_name || "the selected environment";
const jobName = selected?.task_name || "";
const taskId = encodeURIComponent(quickDeployModal.selectedTaskId);
notify({
type: "success",
message: "Deploy Triggered",
description:
selectedScope === "job"
? `Job "${jobName}" is running on "${envName}" (all enabled models). Check Run History for progress.`
: `"${currentModelName}" is running on "${envName}" via job "${jobName}". Check Run History for progress.`,
renderMarkdown: false,
description: (
<span>
{selectedScope === "job"
? `Job "${jobName}" is running on "${envName}" (all enabled models). `
: `"${currentModelName}" is running on "${envName}" via job "${jobName}". `}
<a
href={`/project/job/history?task=${taskId}`}
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed
onClick={(e) => {
e.preventDefault();
navigate(`/project/job/history?task=${taskId}`);
}}
>
View in Run History →
</a>
</span>
),
});
setRefreshModels(true);
setRecentRunsState((prev) => ({ ...prev, fetchedFor: null }));
startDeployPolling(quickDeployModal.selectedTaskId);
setQuickDeployModal((prev) => ({
...prev,
open: false,
Expand All @@ -1850,9 +1866,71 @@ function NoCodeModel({ nodeData }) {
}
};

  // Poll the latest run for `taskId` every 5s until it reaches a terminal
  // state, then surface a success/failure notification with a deep link to
  // Run History. Assumes getLatestRunStatus returns the most recent run for
  // the task (or a falsy value when none exists yet) — TODO confirm.
  const startDeployPolling = (taskId) => {
    // Only one poller at a time: cancel any previous interval first.
    if (pollingRef.current) clearInterval(pollingRef.current);
    setDeployPolling({ taskId, status: "STARTED" });
    pollingRef.current = setInterval(async () => {
      try {
        const run = await getLatestRunStatus(projectId, taskId);
        // No run row yet (dispatch latency) — try again next tick.
        if (!run) return;
        const terminal = ["SUCCESS", "FAILURE", "REVOKED"].includes(run.status);
        if (terminal) {
          // Stop polling and refresh model/run state before notifying.
          clearInterval(pollingRef.current);
          pollingRef.current = null;
          setDeployPolling(null);
          setRefreshModels(true);
          // Invalidate the cached recent-runs list so it refetches.
          setRecentRunsState((prev) => ({ ...prev, fetchedFor: null }));
          notify({
            type: run.status === "SUCCESS" ? "success" : "error",
            message:
              run.status === "SUCCESS" ? "Deploy Completed" : "Deploy Failed",
            renderMarkdown: false,
            description: (
              <span>
                {run.status === "SUCCESS"
                  ? "Model deployed successfully."
                  : run.error_message || "Check Run History for details."}{" "}
                <a
                  href={`/project/job/history?task=${encodeURIComponent(
                    taskId
                  )}`}
                  onClick={(e) => {
                    // SPA navigation; href remains a working fallback for
                    // middle-click / open-in-new-tab.
                    e.preventDefault();
                    navigate(
                      `/project/job/history?task=${encodeURIComponent(taskId)}`
                    );
                  }}
                >
                  View in Run History →
                </a>
              </span>
            ),
          });
        } else {
          // Still running: keep the in-flight status label up to date.
          setDeployPolling((prev) =>
            prev ? { ...prev, status: run.status } : null
          );
        }
      } catch {
        // Silently retry on next interval
      }
    }, 5000);
  };

useEffect(() => {
return () => {
if (pollingRef.current) clearInterval(pollingRef.current);
};
}, []);

const goToScheduler = () => {
setQuickDeployModal((prev) => ({ ...prev, open: false }));
navigate("/project/job/list");
const params = new URLSearchParams();
params.set("create", "1");
if (projectId) params.set("project", projectId);
const modelTitle = nodeData?.node?.title;
if (modelTitle) params.set("model", modelTitle);
navigate(`/project/job/list?${params.toString()}`);
};

const runTransformation = (spec) => {
Expand Down Expand Up @@ -2823,9 +2901,10 @@ function NoCodeModel({ nodeData }) {
!can_write ||
!nodeData?.node?.title
}
icon={<PlayCircleOutlined />}
loading={!!deployPolling}
icon={!deployPolling ? <PlayCircleOutlined /> : undefined}
>
Quick Deploy
{deployPolling ? "Deploying…" : "Quick Deploy"}
</Button>
<Dropdown
trigger={["click"]}
Expand Down
Loading
Loading