Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions backend/backend/application/file_explorer/file_explorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,25 @@ def load_models(self, session: Session):
# Sort models by execution order (DAG order)
sorted_model_names = topological_sort_models(models_with_refs)

# Build a lookup from model name -> model object for status fields
model_lookup = {m.model_name: m for m in all_models}

# Build the model structure in sorted order
no_code_model_structure = []
for no_code_model_name in sorted_model_names:
no_code_model_structure.append(
{
"extension": no_code_model_name,
"title": no_code_model_name,
"key": f"{self.project_name}/models/no_code/{no_code_model_name}",
"is_folder": False,
"type": "NO_CODE_MODEL",
}
)
model = model_lookup.get(no_code_model_name)
model_data = {
"extension": no_code_model_name,
"title": no_code_model_name,
"key": f"{self.project_name}/models/no_code/{no_code_model_name}",
"is_folder": False,
"type": "NO_CODE_MODEL",
"run_status": getattr(model, "run_status", None),
"failure_reason": getattr(model, "failure_reason", None),
"last_run_at": model.last_run_at.isoformat() if getattr(model, "last_run_at", None) else None,
"run_duration": getattr(model, "run_duration", None),
}
no_code_model_structure.append(model_data)
model_structure: dict[str, Any] = {
"title": "models",
"key": f"{self.project_name}/models",
Expand Down
62 changes: 62 additions & 0 deletions backend/backend/core/migrations/0003_add_model_run_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from django.db import migrations, models
Comment thread
tahierhussain marked this conversation as resolved.


class Migration(migrations.Migration):

dependencies = [
("core", "0002_seed_data"),
]

operations = [
migrations.AddField(
model_name="configmodels",
name="run_status",
field=models.CharField(
choices=[
("NOT_STARTED", "Not Started"),
("RUNNING", "Running"),
("SUCCESS", "Success"),
("FAILED", "Failed"),
],
default="NOT_STARTED",
help_text="Current execution status of the model",
max_length=20,
),
),
migrations.AddField(
model_name="configmodels",
name="failure_reason",
field=models.TextField(
blank=True,
help_text="Error message if the model execution failed",
null=True,
),
),
migrations.AddField(
model_name="configmodels",
name="last_run_at",
field=models.DateTimeField(
blank=True,
help_text="Timestamp of the last execution",
null=True,
),
),
migrations.AddField(
model_name="configmodels",
name="run_duration",
field=models.FloatField(
blank=True,
help_text="Duration of last execution in seconds",
null=True,
),
),
migrations.AlterField(
model_name="projectdetails",
name="project_schema",
field=models.CharField(
max_length=1024,
blank=True,
null=True,
),
),
]
29 changes: 29 additions & 0 deletions backend/backend/core/models/config_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ class ConfigModels(DefaultOrganizationMixin, BaseModel):
This model is used to store the no code models.
"""

class RunStatus(models.TextChoices):
NOT_STARTED = "NOT_STARTED", "Not Started"
RUNNING = "RUNNING", "Running"
SUCCESS = "SUCCESS", "Success"
FAILED = "FAILED", "Failed"

def get_model_upload_path(self, filename: str) -> str:
"""
This returns the file path based on the org and project dynamically.
Expand Down Expand Up @@ -94,6 +100,29 @@ class Meta:
last_modified_by = models.JSONField(default=dict)
last_modified_at = models.DateTimeField(auto_now=True)

# Execution status tracking
run_status = models.CharField(
max_length=20,
choices=RunStatus.choices,
default=RunStatus.NOT_STARTED,
help_text="Current execution status of the model",
)
failure_reason = models.TextField(
null=True,
blank=True,
help_text="Error message if the model execution failed",
)
last_run_at = models.DateTimeField(
null=True,
blank=True,
help_text="Timestamp of the last execution",
)
run_duration = models.FloatField(
null=True,
blank=True,
help_text="Duration of last execution in seconds",
)

# Current Manager
config_objects = models.Manager()

Expand Down
2 changes: 1 addition & 1 deletion backend/backend/core/models/project_details.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def delete(self, *args, **kwargs):
project_path = models.CharField(max_length=100)
profile_path = models.CharField(max_length=100)

project_schema = models.CharField(max_length=20, blank=True, null=True)
project_schema = models.CharField(max_length=1024, blank=True, null=True)
# User specific access control fields
created_by = models.JSONField(default=dict)
created_at = models.DateTimeField(auto_now_add=True)
Expand Down
18 changes: 12 additions & 6 deletions backend/backend/core/routers/execute/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,18 @@ def execute_run_command(request: Request, project_id: str) -> Response:
)
logger.info(f"[execute_run_command] API called - project_id={project_id}, file_name={file_name}, environment_id={environment_id}")
app = ApplicationContext(project_id=project_id)
app.execute_visitran_run_command(current_model=file_name, environment_id=environment_id)
app.visitran_context.close_db_connection()
app.backup_current_no_code_model()
logger.info(f"[execute_run_command] Completed successfully for file_name={file_name}")
_data = {"status": "success"}
return Response(data=_data)
try:
app.execute_visitran_run_command(current_model=file_name, environment_id=environment_id)
app.backup_current_no_code_model()
logger.info(f"[execute_run_command] Completed successfully for file_name={file_name}")
_data = {"status": "success"}
return Response(data=_data)
except Exception:
logger.exception(f"[execute_run_command] DAG execution failed for file_name={file_name}")
_data = {"status": "failed", "error_message": "Model execution failed. Check server logs for details."}
return Response(data=_data, status=status.HTTP_400_BAD_REQUEST)
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed
Comment thread
greptile-apps[bot] marked this conversation as resolved.
finally:
app.visitran_context.close_db_connection()



Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def wrapped(view_or_request, *args, **kwargs):

except Exception as e:
Logger.exception("Error executing view function")
raise

return response

Expand Down
82 changes: 82 additions & 0 deletions backend/visitran/visitran.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import concurrent.futures
import datetime
import time
import importlib
import logging
import re
Expand All @@ -15,6 +16,15 @@

import ibis
import networkx as nx
try:
from django.utils import timezone
except ImportError:
from datetime import datetime, timezone as _tz

class timezone:
@staticmethod
def now():
return datetime.now(_tz.utc)
Comment thread
abhizipstack marked this conversation as resolved.
from visitran import utils
from visitran.adapters.adapter import BaseAdapter
from visitran.adapters.seed import BaseSeed
Expand Down Expand Up @@ -71,6 +81,8 @@
from visitran.templates.model import VisitranModel
from visitran.templates.snapshot import VisitranSnapshot

from backend.core.models.config_models import ConfigModels

warnings.filterwarnings("ignore", message=".*?pkg_resources.*?")
from matplotlib import pyplot as plt # noqa: E402

Expand Down Expand Up @@ -228,6 +240,53 @@ def sort_func(node_key: str):
self.sorted_dag_nodes = list(nx.lexicographical_topological_sort(self.dag, key=sort_func))
fire_event(SortedDAGNodes(sorted_dag_nodes=str(self.sorted_dag_nodes)))

def _update_model_status(
self,
model_name: str,
run_status: str,
failure_reason: str = None,
run_duration: float = None,
) -> None:
"""Update the run status of a model in the database."""
try:
# node_name str looks like: "<class 'project.models.stg_order_summaries.StgOrderSummaries'>"
# ConfigModels.model_name stores the module/file name (e.g. 'stg_order_summaries'),
# which is the second-to-last dotted segment — not the CamelCase class name.
class_name = model_name.split("'")[1].split(".")[-2] if "'" in model_name else model_name

session = getattr(self.context, "session", None)
if not session:
raise ValueError(
f"Cannot update status for model '{class_name}': no session on execution context"
)

project_id = session.project_id
if not project_id:
raise ValueError(
f"Cannot update status for model '{class_name}': session has no project_id"
)

model_instance = ConfigModels.objects.get(
project_instance__project_uuid=project_id,
model_name=class_name,
)
model_instance.run_status = run_status
model_instance.last_run_at = timezone.now()

if run_status == ConfigModels.RunStatus.FAILED:
model_instance.failure_reason = failure_reason
elif run_status == ConfigModels.RunStatus.SUCCESS:
model_instance.failure_reason = None

update_fields = ["run_status", "last_run_at", "failure_reason"]
if run_duration is not None:
model_instance.run_duration = run_duration
update_fields.append("run_duration")

model_instance.save(update_fields=update_fields)
except Exception:
logging.exception(f"Failed to update model status for {model_name}")
Comment thread
abhizipstack marked this conversation as resolved.

def execute_graph(self) -> None:
"""Executes the sorted DAG elements one by one."""
dag_nodes = self.sorted_dag_nodes
Expand All @@ -236,7 +295,11 @@ def execute_graph(self) -> None:
node_name: VisitranModel = dag_nodes.pop(0)
node = self.dag.nodes[node_name]["model_object"]
is_executable = self.dag.nodes[node_name].get("executable", True)
start_time = time.monotonic()
try:
if is_executable:
self._update_model_status(str(node_name), ConfigModels.RunStatus.RUNNING)

# Apply model_configs override from deployment configuration
self._apply_model_config_override(node)

Expand Down Expand Up @@ -270,6 +333,12 @@ def execute_graph(self) -> None:
self.db_adapter.db_connection.create_schema(node.destination_schema_name) # create if not exists
self.db_adapter.run_model(visitran_model=node)

self._update_model_status(
str(node_name),
ConfigModels.RunStatus.SUCCESS,
run_duration=time.monotonic() - start_time,
)

base_result = BaseResult(
node_name=str(node_name),
sequence_num=sequence_number,
Expand All @@ -282,11 +351,24 @@ def execute_graph(self) -> None:
sequence_number += 1
BASE_RESULT.append(base_result)
except VisitranBaseExceptions as visitran_err:
self._update_model_status(
str(node_name),
ConfigModels.RunStatus.FAILED,
failure_reason=str(visitran_err),
run_duration=time.monotonic() - start_time,
)
Comment thread
tahierhussain marked this conversation as resolved.
raise visitran_err
except Exception as err:
dest_table = node.destination_table_name
sch_name = node.destination_schema_name
err_trace = repr(err)

self._update_model_status(
str(node_name),
ConfigModels.RunStatus.FAILED,
failure_reason=err_trace,
run_duration=time.monotonic() - start_time,
)
Comment thread
abhizipstack marked this conversation as resolved.
base_result = BaseResult(
node_name=str(node_name),
sequence_num=sequence_number,
Expand Down
2 changes: 2 additions & 0 deletions frontend/src/ide/editor/no-code-model/no-code-model.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -1602,6 +1602,7 @@ function NoCodeModel({ nodeData }) {
axios(requestOptions)
.then(() => {
getSampleData(undefined, undefined, spec);
setRefreshModels(true);
})
.catch((error) => {
const notifKey = notify({
Expand Down Expand Up @@ -1636,6 +1637,7 @@ function NoCodeModel({ nodeData }) {
});
setTransformationErrorFlag(true);
setIsLoading(false);
setRefreshModels(true);
});
};

Expand Down
Loading
Loading