diff --git a/backend/backend/core/scheduler/celery_tasks.py b/backend/backend/core/scheduler/celery_tasks.py index add5a2f..1ce72d8 100644 --- a/backend/backend/core/scheduler/celery_tasks.py +++ b/backend/backend/core/scheduler/celery_tasks.py @@ -147,6 +147,19 @@ def _send_notification(user_task: UserTaskDetails, run: TaskRunHistory, success: _send_slack_notification(user_task, run, success) +# --------------------------------------------------------------------------- +# BASE_RESULT cleanup helper +# --------------------------------------------------------------------------- + +def _clear_base_result(): + """Clear the module-level BASE_RESULT global to prevent stale data across worker reuse.""" + try: + from visitran.events.printer import BASE_RESULT + BASE_RESULT.clear() + except Exception: + pass + + # --------------------------------------------------------------------------- # Job chaining helper # --------------------------------------------------------------------------- @@ -264,6 +277,8 @@ def trigger_scheduled_run( start_time=timezone.now(), user_task_detail=user_task, kwargs=run_kwargs, + trigger=trigger, + scope=scope, ) # ── Mark task as running ────────────────────────────────────────── @@ -321,11 +336,47 @@ def trigger_scheduled_run( else: app_context.execute_visitran_run_command(environment_id=environment_id) + # ── Capture execution metrics from BASE_RESULT ────────────── + try: + from visitran.events.printer import BASE_RESULT + + # Snapshot and immediately clear the global to prevent stale + # data leaking into a subsequent run on the same worker process. 
+ results_snapshot = list(BASE_RESULT) + BASE_RESULT.clear() + + def _clean_name(raw): + if "'" in raw: + return raw.split("'")[1].split(".")[-1] + return raw + + user_results = [ + r for r in results_snapshot + if not _clean_name(r.node_name).startswith("Source") + ] + run.result = { + "models": [ + { + "name": _clean_name(r.node_name), + "status": r.status, + "end_status": r.end_status, + "sequence": r.sequence_num, + } + for r in user_results + ], + "total": len(user_results), + "passed": sum(1 for r in user_results if r.end_status == "OK"), + "failed": sum(1 for r in user_results if r.end_status == "FAIL"), + } + except Exception: + _clear_base_result() + logger.debug("Could not capture BASE_RESULT metrics", exc_info=True) + # ── Mark success ────────────────────────────────────────────── success = True run.status = "SUCCESS" run.end_time = timezone.now() - run.save(update_fields=["status", "end_time"]) + run.save(update_fields=["status", "end_time", "result"]) user_task.status = TaskStatus.SUCCESS user_task.task_completion_time = run.end_time @@ -336,11 +387,13 @@ def trigger_scheduled_run( error_msg = str(exc) if str(exc) else f"Job exceeded timeout of {timeout}s" logger.warning("Job %s timed out: %s", user_task.task_name, error_msg) _mark_failure(run, user_task, error_msg) + _clear_base_result() except Exception as exc: error_msg = str(exc) logger.exception("Job %s failed: %s", user_task.task_name, error_msg) _mark_failure(run, user_task, error_msg) + _clear_base_result() # ── Retry logic ─────────────────────────────────────────────────── if not success and user_task.max_retries > 0 and retry_num < user_task.max_retries: @@ -380,10 +433,35 @@ def trigger_scheduled_run( def _mark_failure(run: TaskRunHistory, user_task: UserTaskDetails, error_msg: str): """Helper to mark a run and its parent task as failed.""" + try: + from visitran.events.printer import BASE_RESULT + + def _clean(raw): + return raw.split("'")[1].split(".")[-1] if "'" in raw else raw + + 
user_results = [ + r for r in BASE_RESULT if not _clean(r.node_name).startswith("Source") + ] + run.result = { + "models": [ + { + "name": _clean(r.node_name), + "status": r.status, + "end_status": r.end_status, + "sequence": r.sequence_num, + } + for r in user_results + ], + "total": len(user_results), + "passed": sum(1 for r in user_results if r.end_status == "OK"), + "failed": sum(1 for r in user_results if r.end_status == "FAIL"), + } + except Exception: + pass run.status = "FAILURE" run.end_time = timezone.now() run.error_message = error_msg - run.save(update_fields=["status", "end_time", "error_message"]) + run.save(update_fields=["status", "end_time", "error_message", "result"]) user_task.status = TaskStatus.FAILED user_task.task_completion_time = run.end_time diff --git a/backend/backend/core/scheduler/migrations/0002_taskrunhistory_trigger_scope.py b/backend/backend/core/scheduler/migrations/0002_taskrunhistory_trigger_scope.py new file mode 100644 index 0000000..aee8c7a --- /dev/null +++ b/backend/backend/core/scheduler/migrations/0002_taskrunhistory_trigger_scope.py @@ -0,0 +1,43 @@ +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("job_scheduler", "0001_initial"), + ] + + operations = [ + migrations.AddField( + model_name="taskrunhistory", + name="trigger", + field=models.CharField( + choices=[("scheduled", "Scheduled"), ("manual", "Manual")], + default="scheduled", + help_text="How the run was initiated: cron/interval schedule or manual dispatch.", + max_length=20, + ), + ), + migrations.AddField( + model_name="taskrunhistory", + name="scope", + field=models.CharField( + choices=[("job", "Full job"), ("model", "Single model")], + default="job", + help_text="Whether the run executed all job models or a single model.", + max_length=20, + ), + ), + migrations.AddIndex( + model_name="taskrunhistory", + index=models.Index( + fields=["trigger"], name="job_schedul_trigger_idx" + ), + ), + 
migrations.AddIndex( + model_name="taskrunhistory", + index=models.Index( + fields=["scope"], name="job_schedul_scope_idx" + ), + ), + ] diff --git a/backend/backend/core/scheduler/models.py b/backend/backend/core/scheduler/models.py index a2130d7..799ac84 100644 --- a/backend/backend/core/scheduler/models.py +++ b/backend/backend/core/scheduler/models.py @@ -132,6 +132,18 @@ class TaskRunHistory(DefaultOrganizationMixin, BaseModel): kwargs = models.JSONField(blank=True, null=True) result = models.JSONField(blank=True, null=True) error_message = models.TextField(blank=True, null=True) + trigger = models.CharField( + max_length=20, + choices=[("scheduled", "Scheduled"), ("manual", "Manual")], + default="scheduled", + help_text="How the run was initiated: cron/interval schedule or manual dispatch.", + ) + scope = models.CharField( + max_length=20, + choices=[("job", "Full job"), ("model", "Single model")], + default="job", + help_text="Whether the run executed all job models or a single model.", + ) user_task_detail = models.ForeignKey( UserTaskDetails, @@ -154,6 +166,8 @@ class Meta: models.Index( fields=["user_task_detail"], name="job_schedul_user_ta_5cd43a_idx" ), + models.Index(fields=["trigger"], name="job_schedul_trigger_idx"), + models.Index(fields=["scope"], name="job_schedul_scope_idx"), ] def __str__(self): diff --git a/backend/backend/core/scheduler/views.py b/backend/backend/core/scheduler/views.py index c0c341e..6fcd35b 100644 --- a/backend/backend/core/scheduler/views.py +++ b/backend/backend/core/scheduler/views.py @@ -586,7 +586,19 @@ def task_run_history(request, project_id, user_task_id): if _is_valid_project_id(project_id): query["project__project_uuid"] = project_id task = UserTaskDetails.objects.get(**query) - runs = TaskRunHistory.objects.filter(user_task_detail=task).order_by("-start_time") + runs = TaskRunHistory.objects.filter(user_task_detail=task) + + trigger_filter = request.GET.get("trigger") + scope_filter = request.GET.get("scope") + 
status_filter = request.GET.get("status") + if trigger_filter: + runs = runs.filter(trigger=trigger_filter) + if scope_filter: + runs = runs.filter(scope=scope_filter) + if status_filter: + runs = runs.filter(status=status_filter) + + runs = runs.order_by("-start_time") total = runs.count() offset = (page - 1) * limit @@ -747,13 +759,14 @@ def list_recent_runs_for_model(request, project_id, model_name): env = task.environment kwargs = run.kwargs or {} models_override = kwargs.get("models_override") or [] - # Back-compat: rows written before the trigger/scope split only - # carried kwargs.source=="quick_deploy" as their manual-model marker. + # Legacy rows predate the trigger/scope migration and were backfilled + # with the column defaults, so check their kwargs markers first and + # only fall back to the first-class DB columns for new rows. legacy_source = kwargs.get("source") - trigger = kwargs.get("trigger") or ( - "manual" if legacy_source == "quick_deploy" else "scheduled" - ) - scope = kwargs.get("scope") or ( - "model" if models_override or legacy_source == "quick_deploy" else "job" - ) + trigger = kwargs.get("trigger") or ( + "manual" if legacy_source == "quick_deploy" else run.trigger + ) + scope = kwargs.get("scope") or ( + "model" if models_override or legacy_source == "quick_deploy" else run.scope + ) data.append({ diff --git a/frontend/src/ide/editor/no-code-model/no-code-model.jsx b/frontend/src/ide/editor/no-code-model/no-code-model.jsx index fe7aa9e..ee73807 100644 --- a/frontend/src/ide/editor/no-code-model/no-code-model.jsx +++ b/frontend/src/ide/editor/no-code-model/no-code-model.jsx @@ -258,6 +258,7 @@ function NoCodeModel({ nodeData }) { runTaskForModel, runTask, listRecentRunsForModel, + getLatestRunStatus, } = useJobService(); const [quickDeployModal, setQuickDeployModal] = useState({ @@ -272,8 +273,10 @@ function NoCodeModel({ nodeData }) { const [recentRunsState, setRecentRunsState] = useState({ loading: false, runs: [], - fetchedFor: null, // model name the current runs are for + fetchedFor: null, }); + const [deployPolling, setDeployPolling] = useState(null); + const pollingRef = useRef(null); const modelName = nodeData?.node?.title || @@ -1829,16 +1832,29 
@@ function NoCodeModel({ nodeData }) { ); const envName = selected?.environment_name || "the selected environment"; const jobName = selected?.task_name || ""; + const taskId = encodeURIComponent(quickDeployModal.selectedTaskId); notify({ type: "success", message: "Deploy Triggered", - description: - selectedScope === "job" - ? `Job "${jobName}" is running on "${envName}" (all enabled models). Check Run History for progress.` - : `"${currentModelName}" is running on "${envName}" via job "${jobName}". Check Run History for progress.`, + renderMarkdown: false, + description: ( + + {selectedScope === "job" + ? `Job "${jobName}" is running on "${envName}" (all enabled models). ` + : `"${currentModelName}" is running on "${envName}" via job "${jobName}". `} + { + e.preventDefault(); + navigate(`/project/job/history?task=${taskId}`); + }} + > + View in Run History → + + + ), }); - setRefreshModels(true); - setRecentRunsState((prev) => ({ ...prev, fetchedFor: null })); + startDeployPolling(quickDeployModal.selectedTaskId); setQuickDeployModal((prev) => ({ ...prev, open: false, @@ -1850,9 +1866,71 @@ function NoCodeModel({ nodeData }) { } }; + const startDeployPolling = (taskId) => { + if (pollingRef.current) clearInterval(pollingRef.current); + setDeployPolling({ taskId, status: "STARTED" }); + pollingRef.current = setInterval(async () => { + try { + const run = await getLatestRunStatus(projectId, taskId); + if (!run) return; + const terminal = ["SUCCESS", "FAILURE", "REVOKED"].includes(run.status); + if (terminal) { + clearInterval(pollingRef.current); + pollingRef.current = null; + setDeployPolling(null); + setRefreshModels(true); + setRecentRunsState((prev) => ({ ...prev, fetchedFor: null })); + notify({ + type: run.status === "SUCCESS" ? "success" : "error", + message: + run.status === "SUCCESS" ? "Deploy Completed" : "Deploy Failed", + renderMarkdown: false, + description: ( + + {run.status === "SUCCESS" + ? "Model deployed successfully." 
+ : run.error_message || "Check Run History for details."}{" "} + { + e.preventDefault(); + navigate( + `/project/job/history?task=${encodeURIComponent(taskId)}` + ); + }} + > + View in Run History → + + + ), + }); + } else { + setDeployPolling((prev) => + prev ? { ...prev, status: run.status } : null + ); + } + } catch { + // Silently retry on next interval + } + }, 5000); + }; + + useEffect(() => { + return () => { + if (pollingRef.current) clearInterval(pollingRef.current); + }; + }, []); + const goToScheduler = () => { setQuickDeployModal((prev) => ({ ...prev, open: false })); - navigate("/project/job/list"); + const params = new URLSearchParams(); + params.set("create", "1"); + if (projectId) params.set("project", projectId); + const modelTitle = nodeData?.node?.title; + if (modelTitle) params.set("model", modelTitle); + navigate(`/project/job/list?${params.toString()}`); }; const runTransformation = (spec) => { @@ -2823,9 +2901,10 @@ function NoCodeModel({ nodeData }) { !can_write || !nodeData?.node?.title } - icon={} + loading={!!deployPolling} + icon={!deployPolling ? : undefined} > - Quick Deploy + {deployPolling ? "Deploying…" : "Quick Deploy"} { const kw = row?.kwargs || {}; const legacyQuick = kw.source === "quick_deploy"; const models = kw.models_override || []; - const trigger = kw.trigger || (legacyQuick ? "manual" : "scheduled"); + const trigger = + row?.trigger || kw.trigger || (legacyQuick ? "manual" : "scheduled"); const scope = - kw.scope || (models.length > 0 || legacyQuick ? "model" : "job"); + row?.scope || + kw.scope || + (models.length > 0 || legacyQuick ? 
"model" : "job"); return { trigger, scope, models }; }; @@ -114,15 +117,19 @@ const Runhistory = () => { /* ─── API calls ─── */ const getRunHistoryList = useCallback( - async (Id, page = currentPage, limit = pageSize) => { + async (Id, page = currentPage, limit = pageSize, filters = {}) => { setLoading(true); try { + const params = { page, limit }; + if (filters.status) params.status = filters.status; + if (filters.trigger) params.trigger = filters.trigger; + if (filters.scope) params.scope = filters.scope; const res = await axios({ method: "GET", url: `/api/v1/visitran/${ selectedOrgId || "default_org" }/project/_all/jobs/run-history/${Id}`, - params: { page, limit }, + params, }); const { page_items, total_items, current_page } = res.data.data; setTotalCount(total_items); @@ -137,7 +144,7 @@ const Runhistory = () => { setLoading(false); } }, - [axios, selectedOrgId, currentPage, pageSize, notify] + [axios, selectedOrgId, notify] ); const getJobList = async () => { @@ -184,37 +191,37 @@ const Runhistory = () => { getJobList(); }, []); - /* ─── client-side status + trigger + scope filters ─── */ + /* ─── server-side filtering: refetch when filters change ─── */ useEffect(() => { - let filtered = backUpData; - if (filterQueries.status) { - filtered = filtered.filter((el) => el.status === filterQueries.status); - } - if (filterQueries.trigger) { - filtered = filtered.filter( - (el) => getRunTriggerScope(el).trigger === filterQueries.trigger - ); - } - if (filterQueries.scope) { - filtered = filtered.filter( - (el) => getRunTriggerScope(el).scope === filterQueries.scope - ); - } - setJobHistoryData(filtered); + if (!filterQueries.job) return; + getRunHistoryList(filterQueries.job, 1, pageSize, { + status: filterQueries.status, + trigger: filterQueries.trigger, + scope: filterQueries.scope, + }); }, [ filterQueries.status, filterQueries.trigger, filterQueries.scope, - backUpData, + filterQueries.job, + getRunHistoryList, + pageSize, ]); - /* ─── auto-expand failed 
rows on fresh data load (not on filter changes) ─── */ + /* ─── auto-expand on fresh data load ─── */ useEffect(() => { - const failedIds = (backUpData || []) + const ids = []; + const fromDeepLink = searchParams.has("task"); + if (fromDeepLink && backUpData.length > 0) { + ids.push(backUpData[0].id); + } + (backUpData || []) .filter((r) => r.status === "FAILURE" && r.error_message) - .map((r) => r.id); - setExpandedRowKeys(failedIds); - }, [backUpData]); + .forEach((r) => { + if (!ids.includes(r.id)) ids.push(r.id); + }); + setExpandedRowKeys(ids); + }, [backUpData, searchParams]); /* ─── handlers ─── */ const handleJobChange = useCallback( @@ -248,19 +255,27 @@ const Runhistory = () => { const handleRefresh = useCallback(() => { if (filterQueries.job) { - getRunHistoryList(filterQueries.job); + getRunHistoryList(filterQueries.job, currentPage, pageSize, { + status: filterQueries.status, + trigger: filterQueries.trigger, + scope: filterQueries.scope, + }); } - }, [filterQueries.job]); + }, [filterQueries, currentPage, pageSize, getRunHistoryList]); const handlePagination = useCallback( (newPage, newPageSize) => { if (currentPage !== newPage || pageSize !== newPageSize) { setCurrentPage(newPage); setPageSize(newPageSize); - getRunHistoryList(envInfo.id, newPage, newPageSize); + getRunHistoryList(envInfo.id, newPage, newPageSize, { + status: filterQueries.status, + trigger: filterQueries.trigger, + scope: filterQueries.scope, + }); } }, - [currentPage, pageSize, envInfo.id] + [currentPage, pageSize, envInfo.id, filterQueries, getRunHistoryList] ); const handleExpand = useCallback((expanded, record) => { @@ -590,6 +605,37 @@ const Runhistory = () => { : "No model configuration recorded for this run."} + {record.result?.total > 0 && ( +
+ + {record.result.total || 0} models + attempted + + + {record.result.passed || 0} passed + + + {record.result.failed || 0} failed + + {record.result.models?.length > 0 && ( + + {record.result.models + .map((m) => `${m.name} (${m.end_status})`) + .join(", ")} + + )} +
+ )} {isFailure && record.error_message && ( { + if (!open || !prefillProject || isEditMode) return; + form.setFieldsValue({ project: prefillProject }); + setSelectedProjectId(prefillProject); + }, [open, prefillProject, isEditMode, form]); + + /* ─── pre-fill model from Quick Deploy CTA ─── */ + useEffect(() => { + if (!open || !prefillModel || isEditMode) return; + setModelConfigs((prev) => ({ + ...prev, + [prefillModel]: { + ...prev[prefillModel], + enabled: true, + materialization: prev[prefillModel]?.materialization || "TABLE", + }, + })); + setModelConfigActiveKey((prev) => + prev.includes("model-config") ? prev : ["model-config"] + ); + }, [open, prefillModel, isEditMode]); + /* ─── load existing job when editing ─── */ useEffect(() => { if (!open || !selectedJobDeployId) return; @@ -779,6 +804,8 @@ JobDeploy.propTypes = { PropTypes.number, ]), setIsJobListModified: PropTypes.func.isRequired, + prefillModel: PropTypes.string, + prefillProject: PropTypes.string, }; JobDeploy.displayName = "JobDeploy"; diff --git a/frontend/src/ide/scheduler/JobList.jsx b/frontend/src/ide/scheduler/JobList.jsx index ca0a521..7bca882 100644 --- a/frontend/src/ide/scheduler/JobList.jsx +++ b/frontend/src/ide/scheduler/JobList.jsx @@ -1,7 +1,7 @@ import { useEffect, useState, useCallback, useMemo } from "react"; import { Alert, Button, Space, Typography, Modal, Pagination } from "antd"; import debounce from "lodash/debounce"; -import { useNavigate } from "react-router-dom"; +import { useNavigate, useSearchParams } from "react-router-dom"; import { checkPermission } from "../../common/helpers"; import { useNotificationService } from "../../service/notification-service"; @@ -38,6 +38,9 @@ const JobList = () => { const [openJobDeploy, setOpenJobDeploy] = useState(false); const [selectedJobId, setSelectedJobId] = useState(null); const [searchQuery, setSearchQuery] = useState(""); + const [searchParams, setSearchParams] = useSearchParams(); + const [prefillModel, setPrefillModel] = 
useState(null); + const [prefillProject, setPrefillProject] = useState(null); const [filters, setFilters] = useState({ proj: "all", env: "all" }); const { currentPage, @@ -154,9 +157,22 @@ const JobList = () => { }, []); useEffect(() => { - if (!openJobDeploy) setSelectedJobId(null); + if (!openJobDeploy) { + setSelectedJobId(null); + setPrefillModel(null); + setPrefillProject(null); + } }, [openJobDeploy]); + useEffect(() => { + if (searchParams.get("create") === "1") { + setPrefillModel(searchParams.get("model") || null); + setPrefillProject(searchParams.get("project") || null); + setOpenJobDeploy(true); + setSearchParams({}, { replace: true }); + } + }, [searchParams, setSearchParams]); + const onDelete = async () => { try { await deleteTask(delTaskDetail.projectId, delTaskDetail.taskId); @@ -259,6 +275,8 @@ const JobList = () => { setOpen={setOpenJobDeploy} selectedJobDeployId={selectedJobId} setIsJobListModified={setIsJobListModified} + prefillModel={prefillModel} + prefillProject={prefillProject} /> { + const url = `${jobsUrl(projId)}/run-history/${taskId}`; + const response = await axiosPrivate.get(url, { + params: { page: 1, limit: 1 }, + }); + const runs = response.data?.data?.page_items?.run_history || []; + return runs.length > 0 ? runs[0] : null; + }; + const listRecentRunsForModel = async (projId, modelName, limit = 5) => { const url = `${jobsUrl( projId @@ -144,6 +153,7 @@ export function useJobService() { runTaskForModel, listDeployCandidates, listRecentRunsForModel, + getLatestRunStatus, getProjects, getEnvironments, getProjectModels,