diff --git a/knip.json b/knip.json index 61c243b96..3c4f4a32c 100644 --- a/knip.json +++ b/knip.json @@ -6,8 +6,7 @@ "src/api/**", "src/components/ui/**", "src/config/announcements.ts", - "src/components/shared/BetaFeatureWrapper/BetaFeatureWrapper.tsx", - "src/utils/publicAsset.ts" + "src/components/shared/BetaFeatureWrapper/BetaFeatureWrapper.tsx" ], "ignoreDependencies": [ "@tanstack/react-query-devtools", diff --git a/public/example-pipelines/Intro-Data Flow.pipeline.component.png b/public/example-pipelines/Intro-Data Flow.pipeline.component.png new file mode 100644 index 000000000..643913dac Binary files /dev/null and b/public/example-pipelines/Intro-Data Flow.pipeline.component.png differ diff --git a/public/example-pipelines/Intro-Data Flow.pipeline.component.yaml b/public/example-pipelines/Intro-Data Flow.pipeline.component.yaml new file mode 100644 index 000000000..0b721f1a2 --- /dev/null +++ b/public/example-pipelines/Intro-Data Flow.pipeline.component.yaml @@ -0,0 +1,908 @@ +name: 'Intro: Data Flow' +description: | + A simple Tangle pipeline: five tasks in a straight line. Generates a synthetic regression dataset, splits into train/test, trains a linear regression model, predicts on the test set, and evaluates with standard metrics (MAE, RMSE, R²). +metadata: + annotations: + flex-nodes: '[{"id":"note-simple","properties":{"title":"Data Flow","content":"Five tasks chained in a straight line, wired up with taskOutput references. Data flows left to right: dataset, split, model, predictions, metrics. Tangle handles the flow of data between components. 
The output of one task can directly become the input of another.","color":"#E3F2FD"},"metadata":{"createdAt":"2026-05-19T00:00:00.000Z","createdBy":"tangle-examples"},"size":{"width":320,"height":130},"position":{"x":200,"y":-120},"zIndex":0}]' + editor.flow-direction: left-to-right +implementation: + graph: + tasks: + Split: + componentRef: + name: Split csv + digest: 7dbbe3ac41f4e820f0d168ef355ada703716f4593eb5e70664746eebe0fe79e7 + spec: + name: Split csv + description: |- + Split a CSV dataset into train and test sets. + + Randomly shuffles rows, then splits by the given fraction. + Both output files keep the same header row. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: split_csv.component.yaml + python_original_code: | + from cloud_pipelines import components + import csv + import random + + + def split_csv( + input_data: components.InputPath("CSV"), + train_data: components.OutputPath("CSV"), + test_data: components.OutputPath("CSV"), + train_fraction: float = 0.8, + random_seed: int = 42, + ): + """Split a CSV dataset into train and test sets. + + Randomly shuffles rows, then splits by the given fraction. + Both output files keep the same header row. + + Args: + input_data: Input CSV file. + train_data: Output CSV for the training split. + test_data: Output CSV for the test split. + train_fraction: Fraction of rows for training (0.0 to 1.0). + random_seed: Seed for reproducible shuffling. 
+ """ + random.seed(random_seed) + + with open(input_data, "r") as f: + reader = csv.reader(f) + header = next(reader) + rows = list(reader) + + random.shuffle(rows) + split_idx = int(len(rows) * train_fraction) + train_rows = rows[:split_idx] + test_rows = rows[split_idx:] + + for path, subset in [(train_data, train_rows), (test_data, test_rows)]: + with open(path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(header) + writer.writerows(subset) + + print(f"Split {len(rows)} rows -> {len(train_rows)} train, {len(test_rows)} test") + python_original_code_path: split_csv.py + components new regenerate python-function-component: 'true' + inputs: + - name: input_data + type: CSV + description: Input CSV file. + - name: train_fraction + type: Float + description: Fraction of rows for training (0.0 to 1.0). + default: '0.8' + optional: true + - name: random_seed + type: Integer + description: Seed for reproducible shuffling. + default: '42' + optional: true + outputs: + - name: train_data + type: CSV + description: Output CSV for the training split. + - name: test_data + type: CSV + description: Output CSV for the test split. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import csv + import random + + def split_csv( + input_data, + train_data, + test_data, + train_fraction = 0.8, + random_seed = 42, + ): + """Split a CSV dataset into train and test sets. + + Randomly shuffles rows, then splits by the given fraction. + Both output files keep the same header row. + + Args: + input_data: Input CSV file. + train_data: Output CSV for the training split. + test_data: Output CSV for the test split. + train_fraction: Fraction of rows for training (0.0 to 1.0). 
+ random_seed: Seed for reproducible shuffling. + """ + random.seed(random_seed) + + with open(input_data, "r") as f: + reader = csv.reader(f) + header = next(reader) + rows = list(reader) + + random.shuffle(rows) + split_idx = int(len(rows) * train_fraction) + train_rows = rows[:split_idx] + test_rows = rows[split_idx:] + + for path, subset in [(train_data, train_rows), (test_data, test_rows)]: + with open(path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(header) + writer.writerows(subset) + + print(f"Split {len(rows)} rows -> {len(train_rows)} train, {len(test_rows)} test") + + import argparse + _parser = argparse.ArgumentParser(prog='Split csv', description='Split a CSV dataset into train and test sets.\n\nRandomly shuffles rows, then splits by the given fraction.\nBoth output files keep the same header row.') + _parser.add_argument("--input-data", dest="input_data", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--train-fraction", dest="train_fraction", type=float, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--random-seed", dest="random_seed", type=int, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--train-data", dest="train_data", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--test-data", dest="test_data", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = split_csv(**_parsed_args) + args: + - '--input-data' + - inputPath: input_data + - if: + cond: + isPresent: train_fraction + then: + - '--train-fraction' + - inputValue: train_fraction + - if: + cond: + isPresent: random_seed + then: + - '--random-seed' + - inputValue: random_seed + - '--train-data' + - outputPath: train_data + - '--test-data' + - outputPath: test_data + arguments: + input_data: + taskOutput: + outputName: output_data + taskId: Generate Data + 
train_fraction: '0.8' + annotations: + editor.position: '{"x": 350, "y": 100}' + Train: + componentRef: + name: Train regression + digest: e4292a5974ba0c989f95fff77d993e75eb9c6b26ebe23d8df775f804d22309f0 + spec: + name: Train regression + description: |- + Train a simple linear regression model using ordinary least squares. + + Fits weights and bias to minimise squared error. Uses only Python stdlib + (no numpy/sklearn). The trained model is saved as a JSON file containing + the weight vector, bias, feature names, and training metrics. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: train_regression.component.yaml + python_original_code: | + from cloud_pipelines import components + import csv + import json + + + def train_regression( + training_data: components.InputPath("CSV"), + model: components.OutputPath("JSON"), + target_column: str = "target", + ): + """Train a simple linear regression model using ordinary least squares. + + Fits weights and bias to minimise squared error. Uses only Python stdlib + (no numpy/sklearn). The trained model is saved as a JSON file containing + the weight vector, bias, feature names, and training metrics. + + Args: + training_data: Input CSV with feature columns and a target column. + model: Output JSON file with trained model parameters. + target_column: Name of the column to predict. 
+ """ + with open(training_data, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + feature_names = [k for k in rows[0].keys() if k != target_column] + n = len(rows) + p = len(feature_names) + + X = [[float(row[f]) for f in feature_names] for row in rows] + y = [float(row[target_column]) for row in rows] + + # Add bias column (column of 1s) + X_aug = [[1.0] + row for row in X] + cols = p + 1 + + # Normal equation: (X^T X)^-1 X^T y + # Compute X^T X + XtX = [[0.0] * cols for _ in range(cols)] + for i in range(cols): + for j in range(cols): + XtX[i][j] = sum(X_aug[k][i] * X_aug[k][j] for k in range(n)) + + # Compute X^T y + Xty = [sum(X_aug[k][i] * y[k] for k in range(n)) for i in range(cols)] + + # Solve via Gaussian elimination + aug = [XtX[i][:] + [Xty[i]] for i in range(cols)] + for i in range(cols): + max_row = max(range(i, cols), key=lambda r: abs(aug[r][i])) + aug[i], aug[max_row] = aug[max_row], aug[i] + pivot = aug[i][i] + if abs(pivot) < 1e-12: + continue + for j in range(i, cols + 1): + aug[i][j] /= pivot + for r in range(cols): + if r != i: + factor = aug[r][i] + for j in range(i, cols + 1): + aug[r][j] -= factor * aug[i][j] + + params = [aug[i][cols] for i in range(cols)] + bias = params[0] + weights = params[1:] + + # Training RMSE + predictions = [bias + sum(w * x for w, x in zip(weights, row)) for row in X] + mse = sum((p - a) ** 2 for p, a in zip(predictions, y)) / n + rmse = mse ** 0.5 + + model_data = { + "feature_names": feature_names, + "weights": [round(w, 6) for w in weights], + "bias": round(bias, 6), + "training_rmse": round(rmse, 6), + "training_rows": n, + } + + with open(model, "w") as f: + json.dump(model_data, f, indent=2) + + print(f"Trained on {n} rows, {p} features") + print(f"Weights: {dict(zip(feature_names, weights))}") + print(f"Bias: {bias:.4f}, Training RMSE: {rmse:.4f}") + python_original_code_path: train_regression.py + components new regenerate python-function-component: 'true' + inputs: + - name: training_data 
+ type: CSV + description: Input CSV with feature columns and a target column. + - name: target_column + type: String + description: Name of the column to predict. + default: target + optional: true + outputs: + - name: model + type: JSON + description: Output JSON file with trained model parameters. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import csv + import json + + def train_regression( + training_data, + model, + target_column = "target", + ): + """Train a simple linear regression model using ordinary least squares. + + Fits weights and bias to minimise squared error. Uses only Python stdlib + (no numpy/sklearn). The trained model is saved as a JSON file containing + the weight vector, bias, feature names, and training metrics. + + Args: + training_data: Input CSV with feature columns and a target column. + model: Output JSON file with trained model parameters. + target_column: Name of the column to predict. 
+ """ + with open(training_data, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + feature_names = [k for k in rows[0].keys() if k != target_column] + n = len(rows) + p = len(feature_names) + + X = [[float(row[f]) for f in feature_names] for row in rows] + y = [float(row[target_column]) for row in rows] + + # Add bias column (column of 1s) + X_aug = [[1.0] + row for row in X] + cols = p + 1 + + # Normal equation: (X^T X)^-1 X^T y + # Compute X^T X + XtX = [[0.0] * cols for _ in range(cols)] + for i in range(cols): + for j in range(cols): + XtX[i][j] = sum(X_aug[k][i] * X_aug[k][j] for k in range(n)) + + # Compute X^T y + Xty = [sum(X_aug[k][i] * y[k] for k in range(n)) for i in range(cols)] + + # Solve via Gaussian elimination + aug = [XtX[i][:] + [Xty[i]] for i in range(cols)] + for i in range(cols): + max_row = max(range(i, cols), key=lambda r: abs(aug[r][i])) + aug[i], aug[max_row] = aug[max_row], aug[i] + pivot = aug[i][i] + if abs(pivot) < 1e-12: + continue + for j in range(i, cols + 1): + aug[i][j] /= pivot + for r in range(cols): + if r != i: + factor = aug[r][i] + for j in range(i, cols + 1): + aug[r][j] -= factor * aug[i][j] + + params = [aug[i][cols] for i in range(cols)] + bias = params[0] + weights = params[1:] + + # Training RMSE + predictions = [bias + sum(w * x for w, x in zip(weights, row)) for row in X] + mse = sum((p - a) ** 2 for p, a in zip(predictions, y)) / n + rmse = mse ** 0.5 + + model_data = { + "feature_names": feature_names, + "weights": [round(w, 6) for w in weights], + "bias": round(bias, 6), + "training_rmse": round(rmse, 6), + "training_rows": n, + } + + with open(model, "w") as f: + json.dump(model_data, f, indent=2) + + print(f"Trained on {n} rows, {p} features") + print(f"Weights: {dict(zip(feature_names, weights))}") + print(f"Bias: {bias:.4f}, Training RMSE: {rmse:.4f}") + + import argparse + _parser = argparse.ArgumentParser(prog='Train regression', description='Train a simple linear regression model using 
ordinary least squares.\n\nFits weights and bias to minimise squared error. Uses only Python stdlib\n(no numpy/sklearn). The trained model is saved as a JSON file containing\nthe weight vector, bias, feature names, and training metrics.') + _parser.add_argument("--training-data", dest="training_data", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--target-column", dest="target_column", type=str, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--model", dest="model", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = train_regression(**_parsed_args) + args: + - '--training-data' + - inputPath: training_data + - if: + cond: + isPresent: target_column + then: + - '--target-column' + - inputValue: target_column + - '--model' + - outputPath: model + arguments: + target_column: target + training_data: + taskOutput: + outputName: train_data + taskId: Split + annotations: + editor.position: '{"x": 700, "y": 100}' + Predict: + componentRef: + name: Predict + digest: 4841c31fc75f2d26a5a7d3123d6fa6fc6b43d8badd549ad3ac3e20119860938d + spec: + name: Predict + description: |- + Apply a trained linear regression model to produce predictions. + + Reads the model JSON (weights + bias) and the test CSV, computes + predicted values, and writes a CSV with columns: actual, predicted. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: predict.component.yaml + python_original_code: | + from cloud_pipelines import components + import csv + import json + + + def predict( + test_data: components.InputPath("CSV"), + model: components.InputPath("JSON"), + predictions: components.OutputPath("CSV"), + ): + """Apply a trained linear regression model to produce predictions. + + Reads the model JSON (weights + bias) and the test CSV, computes + predicted values, and writes a CSV with columns: actual, predicted. 
+ + Args: + test_data: Input CSV with the same feature columns used in training. + model: Trained model JSON (from train_regression). + predictions: Output CSV with actual and predicted values. + """ + with open(model, "r") as f: + model_data = json.load(f) + + feature_names = model_data["feature_names"] + weights = model_data["weights"] + bias = model_data["bias"] + + with open(test_data, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + target_col = [c for c in rows[0].keys() if c not in feature_names][0] + + with open(predictions, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["actual", "predicted"]) + for row in rows: + features = [float(row[feat]) for feat in feature_names] + pred = bias + sum(w * x for w, x in zip(weights, features)) + actual = float(row[target_col]) + writer.writerow([round(actual, 4), round(pred, 4)]) + + print(f"Generated {len(rows)} predictions") + python_original_code_path: predict.py + components new regenerate python-function-component: 'true' + inputs: + - name: test_data + type: CSV + description: Input CSV with the same feature columns used in training. + - name: model + type: JSON + description: Trained model JSON (from train_regression). + outputs: + - name: predictions + type: CSV + description: Output CSV with actual and predicted values. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import csv + import json + + def predict( + test_data, + model, + predictions, + ): + """Apply a trained linear regression model to produce predictions. + + Reads the model JSON (weights + bias) and the test CSV, computes + predicted values, and writes a CSV with columns: actual, predicted. 
+ + Args: + test_data: Input CSV with the same feature columns used in training. + model: Trained model JSON (from train_regression). + predictions: Output CSV with actual and predicted values. + """ + with open(model, "r") as f: + model_data = json.load(f) + + feature_names = model_data["feature_names"] + weights = model_data["weights"] + bias = model_data["bias"] + + with open(test_data, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + target_col = [c for c in rows[0].keys() if c not in feature_names][0] + + with open(predictions, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["actual", "predicted"]) + for row in rows: + features = [float(row[feat]) for feat in feature_names] + pred = bias + sum(w * x for w, x in zip(weights, features)) + actual = float(row[target_col]) + writer.writerow([round(actual, 4), round(pred, 4)]) + + print(f"Generated {len(rows)} predictions") + + import argparse + _parser = argparse.ArgumentParser(prog='Predict', description='Apply a trained linear regression model to produce predictions.\n\nReads the model JSON (weights + bias) and the test CSV, computes\npredicted values, and writes a CSV with columns: actual, predicted.') + _parser.add_argument("--test-data", dest="test_data", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--model", dest="model", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--predictions", dest="predictions", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = predict(**_parsed_args) + args: + - '--test-data' + - inputPath: test_data + - '--model' + - inputPath: model + - '--predictions' + - outputPath: predictions + arguments: + model: + taskOutput: + outputName: model + taskId: Train + test_data: + taskOutput: + outputName: test_data + taskId: Split + annotations: + editor.position: '{"x": 1050, "y": 100}' + Evaluate: + 
componentRef: + name: Evaluate + digest: c26e9e058d298c1c57dd96e15ea4261b99439a53fa9b323db4e9ef783933954c + spec: + name: Evaluate + description: |- + Compute regression metrics from a predictions CSV. + + Expects columns: actual, predicted. Outputs a JSON file with + MAE, MSE, RMSE, R-squared, and row count. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: evaluate.component.yaml + python_original_code: | + from cloud_pipelines import components + import csv + import json + import math + + + def evaluate( + predictions: components.InputPath("CSV"), + metrics: components.OutputPath("JSON"), + ): + """Compute regression metrics from a predictions CSV. + + Expects columns: actual, predicted. Outputs a JSON file with + MAE, MSE, RMSE, R-squared, and row count. + + Args: + predictions: Input CSV with actual and predicted columns. + metrics: Output JSON with computed regression metrics. + """ + with open(predictions, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + actuals = [float(r["actual"]) for r in rows] + preds = [float(r["predicted"]) for r in rows] + n = len(actuals) + + errors = [a - p for a, p in zip(actuals, preds)] + abs_errors = [abs(e) for e in errors] + sq_errors = [e ** 2 for e in errors] + + mae = sum(abs_errors) / n + mse = sum(sq_errors) / n + rmse = math.sqrt(mse) + + mean_actual = sum(actuals) / n + ss_tot = sum((a - mean_actual) ** 2 for a in actuals) + ss_res = sum(sq_errors) + r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0.0 + + result = { + "num_samples": n, + "mae": round(mae, 6), + "mse": round(mse, 6), + "rmse": round(rmse, 6), + "r_squared": round(r_squared, 6), + "max_error": round(max(abs_errors), 6), + } + + with open(metrics, "w") as f: + json.dump(result, f, indent=2) + + print(f"Evaluated {n} predictions:") + for k, v in result.items(): + print(f" {k}: {v}") + python_original_code_path: evaluate.py + components new regenerate python-function-component: 'true' + inputs: + - name: 
predictions + type: CSV + description: Input CSV with actual and predicted columns. + outputs: + - name: metrics + type: JSON + description: Output JSON with computed regression metrics. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import csv + import json + import math + + def evaluate( + predictions, + metrics, + ): + """Compute regression metrics from a predictions CSV. + + Expects columns: actual, predicted. Outputs a JSON file with + MAE, MSE, RMSE, R-squared, and row count. + + Args: + predictions: Input CSV with actual and predicted columns. + metrics: Output JSON with computed regression metrics. + """ + with open(predictions, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + actuals = [float(r["actual"]) for r in rows] + preds = [float(r["predicted"]) for r in rows] + n = len(actuals) + + errors = [a - p for a, p in zip(actuals, preds)] + abs_errors = [abs(e) for e in errors] + sq_errors = [e ** 2 for e in errors] + + mae = sum(abs_errors) / n + mse = sum(sq_errors) / n + rmse = math.sqrt(mse) + + mean_actual = sum(actuals) / n + ss_tot = sum((a - mean_actual) ** 2 for a in actuals) + ss_res = sum(sq_errors) + r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0.0 + + result = { + "num_samples": n, + "mae": round(mae, 6), + "mse": round(mse, 6), + "rmse": round(rmse, 6), + "r_squared": round(r_squared, 6), + "max_error": round(max(abs_errors), 6), + } + + with open(metrics, "w") as f: + json.dump(result, f, indent=2) + + print(f"Evaluated {n} predictions:") + for k, v in result.items(): + print(f" {k}: {v}") + + import argparse + _parser = argparse.ArgumentParser(prog='Evaluate', description='Compute regression metrics from a predictions 
CSV.\n\nExpects columns: actual, predicted. Outputs a JSON file with\nMAE, MSE, RMSE, R-squared, and row count.') + _parser.add_argument("--predictions", dest="predictions", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--metrics", dest="metrics", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = evaluate(**_parsed_args) + args: + - '--predictions' + - inputPath: predictions + - '--metrics' + - outputPath: metrics + arguments: + predictions: + taskOutput: + outputName: predictions + taskId: Predict + annotations: + editor.position: '{"x": 1400, "y": 100}' + Generate Data: + componentRef: + name: Generate dataset + digest: 7f837011088acc8e081f5f2ae5c981cc3bb73ed28bf4b2aea3134bc5297e1674 + spec: + name: Generate dataset + description: |- + Generate a synthetic regression dataset with 4 features and a target. + + Creates a CSV with columns: feature_1, feature_2, feature_3, feature_4, target. + The target is a noisy linear combination of the features, suitable for + demonstrating regression workflows. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: generate_dataset.component.yaml + python_original_code: | + from cloud_pipelines import components + import random + import csv + import math + + + def generate_dataset( + output_data: components.OutputPath("CSV"), + num_rows: int = 500, + random_seed: int = 42, + ): + """Generate a synthetic regression dataset with 4 features and a target. + + Creates a CSV with columns: feature_1, feature_2, feature_3, feature_4, target. + The target is a noisy linear combination of the features, suitable for + demonstrating regression workflows. + + Args: + output_data: Output CSV file path. + num_rows: Number of rows to generate. + random_seed: Seed for reproducibility. 
+ """ + random.seed(random_seed) + + weights = [1.5, -2.0, 0.8, 3.2] + bias = 5.0 + noise_scale = 0.5 + + with open(output_data, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["feature_1", "feature_2", "feature_3", "feature_4", "target"]) + + for _ in range(num_rows): + features = [random.gauss(0, 1) for _ in range(4)] + target = bias + sum(w * x for w, x in zip(weights, features)) + target += random.gauss(0, noise_scale) + writer.writerow([round(x, 4) for x in features] + [round(target, 4)]) + python_original_code_path: generate_dataset.py + components new regenerate python-function-component: 'true' + inputs: + - name: num_rows + type: Integer + description: Number of rows to generate. + default: '500' + optional: true + - name: random_seed + type: Integer + description: Seed for reproducibility. + default: '42' + optional: true + outputs: + - name: output_data + type: CSV + description: Output CSV file path. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import random + import csv + import math + + def generate_dataset( + output_data, + num_rows = 500, + random_seed = 42, + ): + """Generate a synthetic regression dataset with 4 features and a target. + + Creates a CSV with columns: feature_1, feature_2, feature_3, feature_4, target. + The target is a noisy linear combination of the features, suitable for + demonstrating regression workflows. + + Args: + output_data: Output CSV file path. + num_rows: Number of rows to generate. + random_seed: Seed for reproducibility. 
+ """ + random.seed(random_seed) + + weights = [1.5, -2.0, 0.8, 3.2] + bias = 5.0 + noise_scale = 0.5 + + with open(output_data, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["feature_1", "feature_2", "feature_3", "feature_4", "target"]) + + for _ in range(num_rows): + features = [random.gauss(0, 1) for _ in range(4)] + target = bias + sum(w * x for w, x in zip(weights, features)) + target += random.gauss(0, noise_scale) + writer.writerow([round(x, 4) for x in features] + [round(target, 4)]) + + import argparse + _parser = argparse.ArgumentParser(prog='Generate dataset', description='Generate a synthetic regression dataset with 4 features and a target.\n\nCreates a CSV with columns: feature_1, feature_2, feature_3, feature_4, target.\nThe target is a noisy linear combination of the features, suitable for\ndemonstrating regression workflows.') + _parser.add_argument("--num-rows", dest="num_rows", type=int, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--random-seed", dest="random_seed", type=int, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--output-data", dest="output_data", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = generate_dataset(**_parsed_args) + args: + - if: + cond: + isPresent: num_rows + then: + - '--num-rows' + - inputValue: num_rows + - if: + cond: + isPresent: random_seed + then: + - '--random-seed' + - inputValue: random_seed + - '--output-data' + - outputPath: output_data + arguments: + num_rows: '500' + random_seed: '42' + annotations: + editor.position: '{"x": 0, "y": 100}' diff --git a/public/example-pipelines/Intro-Hello World.pipeline.component.png b/public/example-pipelines/Intro-Hello World.pipeline.component.png new file mode 100644 index 000000000..a2e3f55a8 Binary files /dev/null and b/public/example-pipelines/Intro-Hello World.pipeline.component.png differ diff --git 
a/public/example-pipelines/Intro-Hello World.pipeline.component.yaml b/public/example-pipelines/Intro-Hello World.pipeline.component.yaml new file mode 100644 index 000000000..6649a0690 --- /dev/null +++ b/public/example-pipelines/Intro-Hello World.pipeline.component.yaml @@ -0,0 +1,134 @@ +name: Hello World +description: A simple pipeline that demonstrates the hello world component. +inputs: + - name: name + type: String + description: The name to greet. + default: '' + annotations: + editor.position: '{"x":140,"y":0}' + value: '' + optional: false +outputs: + - name: greeting + type: Text + description: The generated greeting message. + annotations: + editor.position: '{"x": 850, "y": 0}' +implementation: + graph: + tasks: + Greet: + componentRef: + name: Hello world + digest: 8cf1bcc31ce9cd9be6982a7ba95cf233f23efcb7769842b6b03e917ad5dfdd0a + spec: + name: Hello world + description: A simple hello world component that generates a greeting. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: hello_world.component.yaml + python_original_code: | + from cloud_pipelines import components + + + def hello_world( + name: str, + greeting_output: components.OutputPath("Text"), + greeting_prefix: str = "Hello", + ): + """A simple hello world component that generates a greeting. + + Args: + name: The name to greet. + greeting_output: Output file containing the greeting message. + greeting_prefix: Prefix for the greeting (default: Hello). + """ + greeting = f"{greeting_prefix}, {name}! Welcome to Tangle." + print(greeting) + with open(greeting_output, "w") as f: + f.write(greeting) + components new regenerate python-function-component: 'true' + inputs: + - name: name + type: String + description: The name to greet. + - name: greeting_prefix + type: String + description: 'Prefix for the greeting (default: Hello).' 
+ default: Hello + optional: true + outputs: + - name: greeting_output + type: Text + description: Output file containing the greeting message. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + def hello_world( + name, + greeting_output, + greeting_prefix = "Hello", + ): + """A simple hello world component that generates a greeting. + + Args: + name: The name to greet. + greeting_output: Output file containing the greeting message. + greeting_prefix: Prefix for the greeting (default: Hello). + """ + greeting = f"{greeting_prefix}, {name}! Welcome to Tangle." + print(greeting) + with open(greeting_output, "w") as f: + f.write(greeting) + + import argparse + _parser = argparse.ArgumentParser(prog='Hello world', description='A simple hello world component that generates a greeting.') + _parser.add_argument("--name", dest="name", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--greeting-prefix", dest="greeting_prefix", type=str, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--greeting-output", dest="greeting_output", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = hello_world(**_parsed_args) + args: + - '--name' + - inputValue: name + - if: + cond: + isPresent: greeting_prefix + then: + - '--greeting-prefix' + - inputValue: greeting_prefix + - '--greeting-output' + - outputPath: greeting_output + arguments: + name: + graphInput: + inputName: name + annotations: + editor.position: '{"x":430,"y":0}' + editor.collapsed: 'true' + outputValues: + greeting: + taskOutput: + outputName: greeting_output + taskId: Greet +metadata: + annotations: + 
editor.flow-direction: left-to-right + notes: Enter a name in the Input Node or configure it when submitting the pipeline. diff --git a/public/example-pipelines/Intro-Input and Output Nodes.pipeline.component.png b/public/example-pipelines/Intro-Input and Output Nodes.pipeline.component.png new file mode 100644 index 000000000..9ad8ca831 Binary files /dev/null and b/public/example-pipelines/Intro-Input and Output Nodes.pipeline.component.png differ diff --git a/public/example-pipelines/Intro-Input and Output Nodes.pipeline.component.yaml b/public/example-pipelines/Intro-Input and Output Nodes.pipeline.component.yaml new file mode 100644 index 000000000..e74738f6b --- /dev/null +++ b/public/example-pipelines/Intro-Input and Output Nodes.pipeline.component.yaml @@ -0,0 +1,977 @@ +name: 'Intro: Input & Output Nodes' +description: | + Pipeline-level inputs and outputs. Inputs parameterise the pipeline so the same YAML can be re-run with different settings (dataset size, train/test split ratio, target column). Outputs expose the trained model and evaluation metrics at the pipeline boundary. +metadata: + annotations: + flex-nodes: '[{"id":"note-inputs","properties":{"title":"Pipeline Inputs","content":"Inputs show up as configurable parameters when you launch a run. Tasks read them through graphInput wiring, so you can tweak values without touching the pipeline itself.","color":"#E8F5E9"},"metadata":{"createdAt":"2026-05-19T00:00:00.000Z","createdBy":"tangle-examples"},"size":{"width":250,"height":130},"position":{"x":0,"y":-150},"zIndex":0},{"id":"note-outputs","properties":{"title":"Pipeline Outputs","content":"outputValues lift task results up to the pipeline boundary. 
Downstream pipelines (or the UI) can grab them directly, no need to poke around inside the tasks.","color":"#FFF3E0"},"metadata":{"createdAt":"2026-05-19T00:00:00.000Z","createdBy":"tangle-examples"},"size":{"width":250,"height":130},"position":{"x":1500,"y":-150},"zIndex":0}]' + editor.flow-direction: left-to-right +inputs: + - name: Num Rows + type: Integer + description: Number of data points to generate + default: '500' + annotations: + editor.position: '{"x":17.5,"y":320}' + value: '500' + - name: Train Fraction + type: Float + description: Fraction of data for training (remainder is test) + default: '0.8' + annotations: + editor.position: '{"x":360,"y":0}' + value: '0.8' + - name: Target Column + type: String + description: Name of the column to predict + default: target + annotations: + editor.position: '{"x":763.5,"y":525.75}' + value: target + - name: Random Seed + type: Integer + description: Seed for reproducible data generation and splitting + default: '42' + annotations: + editor.position: '{"x":5,"y":120}' + value: '42' +outputs: + - name: Trained Model + type: JSON + description: Model parameters (weights, bias) as JSON + annotations: + editor.position: '{"x":1500,"y":498.75}' + - name: Predictions + type: CSV + description: Actual vs predicted values + annotations: + editor.position: '{"x":1915.5,"y":376.5}' + - name: Metrics + type: JSON + description: Regression metrics — MAE, RMSE, R², max error + annotations: + editor.position: '{"x":2230,"y":168}' +implementation: + graph: + tasks: + Split: + componentRef: + name: Split csv + digest: 7dbbe3ac41f4e820f0d168ef355ada703716f4593eb5e70664746eebe0fe79e7 + spec: + name: Split csv + description: |- + Split a CSV dataset into train and test sets. + + Randomly shuffles rows, then splits by the given fraction. + Both output files keep the same header row. 
+ metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: split_csv.component.yaml + python_original_code: | + from cloud_pipelines import components + import csv + import random + + + def split_csv( + input_data: components.InputPath("CSV"), + train_data: components.OutputPath("CSV"), + test_data: components.OutputPath("CSV"), + train_fraction: float = 0.8, + random_seed: int = 42, + ): + """Split a CSV dataset into train and test sets. + + Randomly shuffles rows, then splits by the given fraction. + Both output files keep the same header row. + + Args: + input_data: Input CSV file. + train_data: Output CSV for the training split. + test_data: Output CSV for the test split. + train_fraction: Fraction of rows for training (0.0 to 1.0). + random_seed: Seed for reproducible shuffling. + """ + random.seed(random_seed) + + with open(input_data, "r") as f: + reader = csv.reader(f) + header = next(reader) + rows = list(reader) + + random.shuffle(rows) + split_idx = int(len(rows) * train_fraction) + train_rows = rows[:split_idx] + test_rows = rows[split_idx:] + + for path, subset in [(train_data, train_rows), (test_data, test_rows)]: + with open(path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(header) + writer.writerows(subset) + + print(f"Split {len(rows)} rows -> {len(train_rows)} train, {len(test_rows)} test") + python_original_code_path: split_csv.py + components new regenerate python-function-component: 'true' + inputs: + - name: input_data + type: CSV + description: Input CSV file. + - name: train_fraction + type: Float + description: Fraction of rows for training (0.0 to 1.0). + default: '0.8' + optional: true + - name: random_seed + type: Integer + description: Seed for reproducible shuffling. + default: '42' + optional: true + outputs: + - name: train_data + type: CSV + description: Output CSV for the training split. + - name: test_data + type: CSV + description: Output CSV for the test split. 
+ implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import csv + import random + + def split_csv( + input_data, + train_data, + test_data, + train_fraction = 0.8, + random_seed = 42, + ): + """Split a CSV dataset into train and test sets. + + Randomly shuffles rows, then splits by the given fraction. + Both output files keep the same header row. + + Args: + input_data: Input CSV file. + train_data: Output CSV for the training split. + test_data: Output CSV for the test split. + train_fraction: Fraction of rows for training (0.0 to 1.0). + random_seed: Seed for reproducible shuffling. + """ + random.seed(random_seed) + + with open(input_data, "r") as f: + reader = csv.reader(f) + header = next(reader) + rows = list(reader) + + random.shuffle(rows) + split_idx = int(len(rows) * train_fraction) + train_rows = rows[:split_idx] + test_rows = rows[split_idx:] + + for path, subset in [(train_data, train_rows), (test_data, test_rows)]: + with open(path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(header) + writer.writerows(subset) + + print(f"Split {len(rows)} rows -> {len(train_rows)} train, {len(test_rows)} test") + + import argparse + _parser = argparse.ArgumentParser(prog='Split csv', description='Split a CSV dataset into train and test sets.\n\nRandomly shuffles rows, then splits by the given fraction.\nBoth output files keep the same header row.') + _parser.add_argument("--input-data", dest="input_data", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--train-fraction", dest="train_fraction", type=float, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--random-seed", dest="random_seed", type=int, required=False, 
default=argparse.SUPPRESS) + _parser.add_argument("--train-data", dest="train_data", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--test-data", dest="test_data", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = split_csv(**_parsed_args) + args: + - '--input-data' + - inputPath: input_data + - if: + cond: + isPresent: train_fraction + then: + - '--train-fraction' + - inputValue: train_fraction + - if: + cond: + isPresent: random_seed + then: + - '--random-seed' + - inputValue: random_seed + - '--train-data' + - outputPath: train_data + - '--test-data' + - outputPath: test_data + arguments: + input_data: + taskOutput: + outputName: output_data + taskId: Generate Data + random_seed: + graphInput: + inputName: Random Seed + train_fraction: + graphInput: + inputName: Train Fraction + annotations: + editor.position: '{"x":710,"y":53.5}' + Train: + componentRef: + name: Train regression + digest: e4292a5974ba0c989f95fff77d993e75eb9c6b26ebe23d8df775f804d22309f0 + spec: + name: Train regression + description: |- + Train a simple linear regression model using ordinary least squares. + + Fits weights and bias to minimise squared error. Uses only Python stdlib + (no numpy/sklearn). The trained model is saved as a JSON file containing + the weight vector, bias, feature names, and training metrics. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: train_regression.component.yaml + python_original_code: | + from cloud_pipelines import components + import csv + import json + + + def train_regression( + training_data: components.InputPath("CSV"), + model: components.OutputPath("JSON"), + target_column: str = "target", + ): + """Train a simple linear regression model using ordinary least squares. + + Fits weights and bias to minimise squared error. Uses only Python stdlib + (no numpy/sklearn). 
The trained model is saved as a JSON file containing + the weight vector, bias, feature names, and training metrics. + + Args: + training_data: Input CSV with feature columns and a target column. + model: Output JSON file with trained model parameters. + target_column: Name of the column to predict. + """ + with open(training_data, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + feature_names = [k for k in rows[0].keys() if k != target_column] + n = len(rows) + p = len(feature_names) + + X = [[float(row[f]) for f in feature_names] for row in rows] + y = [float(row[target_column]) for row in rows] + + # Add bias column (column of 1s) + X_aug = [[1.0] + row for row in X] + cols = p + 1 + + # Normal equation: (X^T X)^-1 X^T y + # Compute X^T X + XtX = [[0.0] * cols for _ in range(cols)] + for i in range(cols): + for j in range(cols): + XtX[i][j] = sum(X_aug[k][i] * X_aug[k][j] for k in range(n)) + + # Compute X^T y + Xty = [sum(X_aug[k][i] * y[k] for k in range(n)) for i in range(cols)] + + # Solve via Gaussian elimination + aug = [XtX[i][:] + [Xty[i]] for i in range(cols)] + for i in range(cols): + max_row = max(range(i, cols), key=lambda r: abs(aug[r][i])) + aug[i], aug[max_row] = aug[max_row], aug[i] + pivot = aug[i][i] + if abs(pivot) < 1e-12: + continue + for j in range(i, cols + 1): + aug[i][j] /= pivot + for r in range(cols): + if r != i: + factor = aug[r][i] + for j in range(i, cols + 1): + aug[r][j] -= factor * aug[i][j] + + params = [aug[i][cols] for i in range(cols)] + bias = params[0] + weights = params[1:] + + # Training RMSE + predictions = [bias + sum(w * x for w, x in zip(weights, row)) for row in X] + mse = sum((p - a) ** 2 for p, a in zip(predictions, y)) / n + rmse = mse ** 0.5 + + model_data = { + "feature_names": feature_names, + "weights": [round(w, 6) for w in weights], + "bias": round(bias, 6), + "training_rmse": round(rmse, 6), + "training_rows": n, + } + + with open(model, "w") as f: + json.dump(model_data, f, indent=2) + + 
print(f"Trained on {n} rows, {p} features") + print(f"Weights: {dict(zip(feature_names, weights))}") + print(f"Bias: {bias:.4f}, Training RMSE: {rmse:.4f}") + python_original_code_path: train_regression.py + components new regenerate python-function-component: 'true' + inputs: + - name: training_data + type: CSV + description: Input CSV with feature columns and a target column. + - name: target_column + type: String + description: Name of the column to predict. + default: target + optional: true + outputs: + - name: model + type: JSON + description: Output JSON file with trained model parameters. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import csv + import json + + def train_regression( + training_data, + model, + target_column = "target", + ): + """Train a simple linear regression model using ordinary least squares. + + Fits weights and bias to minimise squared error. Uses only Python stdlib + (no numpy/sklearn). The trained model is saved as a JSON file containing + the weight vector, bias, feature names, and training metrics. + + Args: + training_data: Input CSV with feature columns and a target column. + model: Output JSON file with trained model parameters. + target_column: Name of the column to predict. 
+ """ + with open(training_data, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + feature_names = [k for k in rows[0].keys() if k != target_column] + n = len(rows) + p = len(feature_names) + + X = [[float(row[f]) for f in feature_names] for row in rows] + y = [float(row[target_column]) for row in rows] + + # Add bias column (column of 1s) + X_aug = [[1.0] + row for row in X] + cols = p + 1 + + # Normal equation: (X^T X)^-1 X^T y + # Compute X^T X + XtX = [[0.0] * cols for _ in range(cols)] + for i in range(cols): + for j in range(cols): + XtX[i][j] = sum(X_aug[k][i] * X_aug[k][j] for k in range(n)) + + # Compute X^T y + Xty = [sum(X_aug[k][i] * y[k] for k in range(n)) for i in range(cols)] + + # Solve via Gaussian elimination + aug = [XtX[i][:] + [Xty[i]] for i in range(cols)] + for i in range(cols): + max_row = max(range(i, cols), key=lambda r: abs(aug[r][i])) + aug[i], aug[max_row] = aug[max_row], aug[i] + pivot = aug[i][i] + if abs(pivot) < 1e-12: + continue + for j in range(i, cols + 1): + aug[i][j] /= pivot + for r in range(cols): + if r != i: + factor = aug[r][i] + for j in range(i, cols + 1): + aug[r][j] -= factor * aug[i][j] + + params = [aug[i][cols] for i in range(cols)] + bias = params[0] + weights = params[1:] + + # Training RMSE + predictions = [bias + sum(w * x for w, x in zip(weights, row)) for row in X] + mse = sum((p - a) ** 2 for p, a in zip(predictions, y)) / n + rmse = mse ** 0.5 + + model_data = { + "feature_names": feature_names, + "weights": [round(w, 6) for w in weights], + "bias": round(bias, 6), + "training_rmse": round(rmse, 6), + "training_rows": n, + } + + with open(model, "w") as f: + json.dump(model_data, f, indent=2) + + print(f"Trained on {n} rows, {p} features") + print(f"Weights: {dict(zip(feature_names, weights))}") + print(f"Bias: {bias:.4f}, Training RMSE: {rmse:.4f}") + + import argparse + _parser = argparse.ArgumentParser(prog='Train regression', description='Train a simple linear regression model using 
ordinary least squares.\n\nFits weights and bias to minimise squared error. Uses only Python stdlib\n(no numpy/sklearn). The trained model is saved as a JSON file containing\nthe weight vector, bias, feature names, and training metrics.') + _parser.add_argument("--training-data", dest="training_data", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--target-column", dest="target_column", type=str, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--model", dest="model", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = train_regression(**_parsed_args) + args: + - '--training-data' + - inputPath: training_data + - if: + cond: + isPresent: target_column + then: + - '--target-column' + - inputValue: target_column + - '--model' + - outputPath: model + arguments: + target_column: + graphInput: + inputName: Target Column + training_data: + taskOutput: + outputName: train_data + taskId: Split + annotations: + editor.position: '{"x":1090,"y":480.25}' + Predict: + componentRef: + name: Predict + digest: 4841c31fc75f2d26a5a7d3123d6fa6fc6b43d8badd549ad3ac3e20119860938d + spec: + name: Predict + description: |- + Apply a trained linear regression model to produce predictions. + + Reads the model JSON (weights + bias) and the test CSV, computes + predicted values, and writes a CSV with columns: actual, predicted. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: predict.component.yaml + python_original_code: | + from cloud_pipelines import components + import csv + import json + + + def predict( + test_data: components.InputPath("CSV"), + model: components.InputPath("JSON"), + predictions: components.OutputPath("CSV"), + ): + """Apply a trained linear regression model to produce predictions. 
+ + Reads the model JSON (weights + bias) and the test CSV, computes + predicted values, and writes a CSV with columns: actual, predicted. + + Args: + test_data: Input CSV with the same feature columns used in training. + model: Trained model JSON (from train_regression). + predictions: Output CSV with actual and predicted values. + """ + with open(model, "r") as f: + model_data = json.load(f) + + feature_names = model_data["feature_names"] + weights = model_data["weights"] + bias = model_data["bias"] + + with open(test_data, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + target_col = [c for c in rows[0].keys() if c not in feature_names][0] + + with open(predictions, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["actual", "predicted"]) + for row in rows: + features = [float(row[feat]) for feat in feature_names] + pred = bias + sum(w * x for w, x in zip(weights, features)) + actual = float(row[target_col]) + writer.writerow([round(actual, 4), round(pred, 4)]) + + print(f"Generated {len(rows)} predictions") + python_original_code_path: predict.py + components new regenerate python-function-component: 'true' + inputs: + - name: test_data + type: CSV + description: Input CSV with the same feature columns used in training. + - name: model + type: JSON + description: Trained model JSON (from train_regression). + outputs: + - name: predictions + type: CSV + description: Output CSV with actual and predicted values. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import csv + import json + + def predict( + test_data, + model, + predictions, + ): + """Apply a trained linear regression model to produce predictions. 
+ + Reads the model JSON (weights + bias) and the test CSV, computes + predicted values, and writes a CSV with columns: actual, predicted. + + Args: + test_data: Input CSV with the same feature columns used in training. + model: Trained model JSON (from train_regression). + predictions: Output CSV with actual and predicted values. + """ + with open(model, "r") as f: + model_data = json.load(f) + + feature_names = model_data["feature_names"] + weights = model_data["weights"] + bias = model_data["bias"] + + with open(test_data, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + target_col = [c for c in rows[0].keys() if c not in feature_names][0] + + with open(predictions, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["actual", "predicted"]) + for row in rows: + features = [float(row[feat]) for feat in feature_names] + pred = bias + sum(w * x for w, x in zip(weights, features)) + actual = float(row[target_col]) + writer.writerow([round(actual, 4), round(pred, 4)]) + + print(f"Generated {len(rows)} predictions") + + import argparse + _parser = argparse.ArgumentParser(prog='Predict', description='Apply a trained linear regression model to produce predictions.\n\nReads the model JSON (weights + bias) and the test CSV, computes\npredicted values, and writes a CSV with columns: actual, predicted.') + _parser.add_argument("--test-data", dest="test_data", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--model", dest="model", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--predictions", dest="predictions", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = predict(**_parsed_args) + args: + - '--test-data' + - inputPath: test_data + - '--model' + - inputPath: model + - '--predictions' + - outputPath: predictions + arguments: + model: + taskOutput: + outputName: model + taskId: Train + 
test_data: + taskOutput: + outputName: test_data + taskId: Split + annotations: + editor.position: '{"x":1470,"y":269.75}' + Evaluate: + componentRef: + name: Evaluate + digest: c26e9e058d298c1c57dd96e15ea4261b99439a53fa9b323db4e9ef783933954c + spec: + name: Evaluate + description: |- + Compute regression metrics from a predictions CSV. + + Expects columns: actual, predicted. Outputs a JSON file with + MAE, MSE, RMSE, R-squared, and row count. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: evaluate.component.yaml + python_original_code: | + from cloud_pipelines import components + import csv + import json + import math + + + def evaluate( + predictions: components.InputPath("CSV"), + metrics: components.OutputPath("JSON"), + ): + """Compute regression metrics from a predictions CSV. + + Expects columns: actual, predicted. Outputs a JSON file with + MAE, MSE, RMSE, R-squared, and row count. + + Args: + predictions: Input CSV with actual and predicted columns. + metrics: Output JSON with computed regression metrics. 
+ """ + with open(predictions, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + actuals = [float(r["actual"]) for r in rows] + preds = [float(r["predicted"]) for r in rows] + n = len(actuals) + + errors = [a - p for a, p in zip(actuals, preds)] + abs_errors = [abs(e) for e in errors] + sq_errors = [e ** 2 for e in errors] + + mae = sum(abs_errors) / n + mse = sum(sq_errors) / n + rmse = math.sqrt(mse) + + mean_actual = sum(actuals) / n + ss_tot = sum((a - mean_actual) ** 2 for a in actuals) + ss_res = sum(sq_errors) + r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0.0 + + result = { + "num_samples": n, + "mae": round(mae, 6), + "mse": round(mse, 6), + "rmse": round(rmse, 6), + "r_squared": round(r_squared, 6), + "max_error": round(max(abs_errors), 6), + } + + with open(metrics, "w") as f: + json.dump(result, f, indent=2) + + print(f"Evaluated {n} predictions:") + for k, v in result.items(): + print(f" {k}: {v}") + python_original_code_path: evaluate.py + components new regenerate python-function-component: 'true' + inputs: + - name: predictions + type: CSV + description: Input CSV with actual and predicted columns. + outputs: + - name: metrics + type: JSON + description: Output JSON with computed regression metrics. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import csv + import json + import math + + def evaluate( + predictions, + metrics, + ): + """Compute regression metrics from a predictions CSV. + + Expects columns: actual, predicted. Outputs a JSON file with + MAE, MSE, RMSE, R-squared, and row count. + + Args: + predictions: Input CSV with actual and predicted columns. + metrics: Output JSON with computed regression metrics. 
+ """ + with open(predictions, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + actuals = [float(r["actual"]) for r in rows] + preds = [float(r["predicted"]) for r in rows] + n = len(actuals) + + errors = [a - p for a, p in zip(actuals, preds)] + abs_errors = [abs(e) for e in errors] + sq_errors = [e ** 2 for e in errors] + + mae = sum(abs_errors) / n + mse = sum(sq_errors) / n + rmse = math.sqrt(mse) + + mean_actual = sum(actuals) / n + ss_tot = sum((a - mean_actual) ** 2 for a in actuals) + ss_res = sum(sq_errors) + r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0.0 + + result = { + "num_samples": n, + "mae": round(mae, 6), + "mse": round(mse, 6), + "rmse": round(rmse, 6), + "r_squared": round(r_squared, 6), + "max_error": round(max(abs_errors), 6), + } + + with open(metrics, "w") as f: + json.dump(result, f, indent=2) + + print(f"Evaluated {n} predictions:") + for k, v in result.items(): + print(f" {k}: {v}") + + import argparse + _parser = argparse.ArgumentParser(prog='Evaluate', description='Compute regression metrics from a predictions CSV.\n\nExpects columns: actual, predicted. 
Outputs a JSON file with\nMAE, MSE, RMSE, R-squared, and row count.') + _parser.add_argument("--predictions", dest="predictions", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--metrics", dest="metrics", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = evaluate(**_parsed_args) + args: + - '--predictions' + - inputPath: predictions + - '--metrics' + - outputPath: metrics + arguments: + predictions: + taskOutput: + outputName: predictions + taskId: Predict + annotations: + editor.position: '{"x":1850,"y":183.5}' + Generate Data: + componentRef: + name: Generate dataset + digest: 7f837011088acc8e081f5f2ae5c981cc3bb73ed28bf4b2aea3134bc5297e1674 + spec: + name: Generate dataset + description: |- + Generate a synthetic regression dataset with 4 features and a target. + + Creates a CSV with columns: feature_1, feature_2, feature_3, feature_4, target. + The target is a noisy linear combination of the features, suitable for + demonstrating regression workflows. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: generate_dataset.component.yaml + python_original_code: | + from cloud_pipelines import components + import random + import csv + import math + + + def generate_dataset( + output_data: components.OutputPath("CSV"), + num_rows: int = 500, + random_seed: int = 42, + ): + """Generate a synthetic regression dataset with 4 features and a target. + + Creates a CSV with columns: feature_1, feature_2, feature_3, feature_4, target. + The target is a noisy linear combination of the features, suitable for + demonstrating regression workflows. + + Args: + output_data: Output CSV file path. + num_rows: Number of rows to generate. + random_seed: Seed for reproducibility. 
+ """ + random.seed(random_seed) + + weights = [1.5, -2.0, 0.8, 3.2] + bias = 5.0 + noise_scale = 0.5 + + with open(output_data, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["feature_1", "feature_2", "feature_3", "feature_4", "target"]) + + for _ in range(num_rows): + features = [random.gauss(0, 1) for _ in range(4)] + target = bias + sum(w * x for w, x in zip(weights, features)) + target += random.gauss(0, noise_scale) + writer.writerow([round(x, 4) for x in features] + [round(target, 4)]) + python_original_code_path: generate_dataset.py + components new regenerate python-function-component: 'true' + inputs: + - name: num_rows + type: Integer + description: Number of rows to generate. + default: '500' + optional: true + - name: random_seed + type: Integer + description: Seed for reproducibility. + default: '42' + optional: true + outputs: + - name: output_data + type: CSV + description: Output CSV file path. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import random + import csv + import math + + def generate_dataset( + output_data, + num_rows = 500, + random_seed = 42, + ): + """Generate a synthetic regression dataset with 4 features and a target. + + Creates a CSV with columns: feature_1, feature_2, feature_3, feature_4, target. + The target is a noisy linear combination of the features, suitable for + demonstrating regression workflows. + + Args: + output_data: Output CSV file path. + num_rows: Number of rows to generate. + random_seed: Seed for reproducibility. 
+ """ + random.seed(random_seed) + + weights = [1.5, -2.0, 0.8, 3.2] + bias = 5.0 + noise_scale = 0.5 + + with open(output_data, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["feature_1", "feature_2", "feature_3", "feature_4", "target"]) + + for _ in range(num_rows): + features = [random.gauss(0, 1) for _ in range(4)] + target = bias + sum(w * x for w, x in zip(weights, features)) + target += random.gauss(0, noise_scale) + writer.writerow([round(x, 4) for x in features] + [round(target, 4)]) + + import argparse + _parser = argparse.ArgumentParser(prog='Generate dataset', description='Generate a synthetic regression dataset with 4 features and a target.\n\nCreates a CSV with columns: feature_1, feature_2, feature_3, feature_4, target.\nThe target is a noisy linear combination of the features, suitable for\ndemonstrating regression workflows.') + _parser.add_argument("--num-rows", dest="num_rows", type=int, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--random-seed", dest="random_seed", type=int, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--output-data", dest="output_data", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = generate_dataset(**_parsed_args) + args: + - if: + cond: + isPresent: num_rows + then: + - '--num-rows' + - inputValue: num_rows + - if: + cond: + isPresent: random_seed + then: + - '--random-seed' + - inputValue: random_seed + - '--output-data' + - outputPath: output_data + arguments: + num_rows: + graphInput: + inputName: Num Rows + random_seed: + graphInput: + inputName: Random Seed + annotations: + editor.position: '{"x":330,"y":289.5}' + outputValues: + Metrics: + taskOutput: + outputName: metrics + taskId: Evaluate + Predictions: + taskOutput: + outputName: predictions + taskId: Predict + Trained Model: + taskOutput: + outputName: model + taskId: Train diff --git 
a/public/example-pipelines/Intro-Multinode.pipeline.component.png b/public/example-pipelines/Intro-Multinode.pipeline.component.png new file mode 100644 index 000000000..f07c156d8 Binary files /dev/null and b/public/example-pipelines/Intro-Multinode.pipeline.component.png differ diff --git a/public/example-pipelines/Intro-Multinode.pipeline.component.yaml b/public/example-pipelines/Intro-Multinode.pipeline.component.yaml new file mode 100644 index 000000000..329505030 --- /dev/null +++ b/public/example-pipelines/Intro-Multinode.pipeline.component.yaml @@ -0,0 +1,625 @@ +name: 'Intro: Multinode Pipelines' +implementation: + graph: + tasks: + Generate Data: + componentRef: + spec: + name: Generate data v01 + description: |- + Generate synthetic classification data, split into shards for distributed training. + + Creates a multi-shard dataset where each shard is a CSV file suitable for + parallel loading across multiple training nodes. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: generate-data.yaml + python_dependencies: '["torch>=2.0.0"]' + python_original_code_path: generate_data.py + components new regenerate python-function-component: 'true' + inputs: + - name: num_shards + type: Integer + description: Number of data shards to create. + default: '4' + optional: true + - name: rows_per_shard + type: Integer + description: Number of rows per shard. + default: '10000' + optional: true + - name: num_features + type: Integer + description: Number of input features. + default: '20' + optional: true + - name: seed + type: Integer + description: Random seed for reproducibility. + default: '42' + optional: true + outputs: + - name: output_data_dir + type: Directory + description: Directory containing the sharded CSV files. 
def generate_data_v01(
    output_data_dir,
    num_shards = 4,
    rows_per_shard = 10000,
    num_features = 20,
    seed = 42,
):
    """Generate synthetic classification data, split into shards for distributed training.

    Creates a multi-shard dataset where each shard is a CSV file suitable for
    parallel loading across multiple training nodes. A ``metadata.json`` file
    describing the dataset is written next to the shards.

    Labels are drawn from a Bernoulli distribution whose probability is the
    sigmoid of a noisy random linear function of the features.

    Args:
        output_data_dir: Directory containing the sharded CSV files.
        num_shards: Number of data shards to create.
        rows_per_shard: Number of rows per shard.
        num_features: Number of input features.
        seed: Random seed for reproducibility.

    Raises:
        ValueError: If num_shards, rows_per_shard or num_features is negative.
    """
    import json
    import math
    import os
    import random

    # Negative counts would silently produce an empty dataset whose metadata
    # reports a negative total_rows; reject them up front instead.
    for arg_name, arg_value in (
        ("num_shards", num_shards),
        ("rows_per_shard", rows_per_shard),
        ("num_features", num_features),
    ):
        if arg_value < 0:
            raise ValueError(f"{arg_name} must be non-negative, got {arg_value}")

    random.seed(seed)
    os.makedirs(output_data_dir, exist_ok=True)

    feature_names = [f"feat_{i}" for i in range(num_features)]
    header = ",".join(feature_names + ["label"])
    total_rows = num_shards * rows_per_shard

    # Generate a random linear decision boundary
    weights = [random.gauss(0, 1) for _ in range(num_features)]
    bias = random.gauss(0, 0.5)

    for shard_idx in range(num_shards):
        shard_path = os.path.join(output_data_dir, f"shard_{shard_idx:04d}.csv")
        with open(shard_path, "w") as f:
            f.write(header + "\n")
            for _ in range(rows_per_shard):
                # The order of random draws (features, noise, label) is kept
                # exactly as before so a given seed reproduces the same data.
                features = [random.gauss(0, 1) for _ in range(num_features)]
                logit = sum(w * x for w, x in zip(weights, features)) + bias
                # Add noise to make it non-trivial
                logit += random.gauss(0, 0.3)
                try:
                    prob = 1.0 / (1.0 + math.exp(-logit))
                except OverflowError:
                    # exp(-logit) exceeds the float range for very negative
                    # logits (likely with many features); the sigmoid is
                    # effectively 0 there.
                    prob = 0.0
                label = 1 if random.random() < prob else 0
                # Join features and label in one pass so that a feature-less
                # dataset yields "label" rows, not rows with a leading comma.
                fields = [f"{x:.6f}" for x in features]
                fields.append(str(label))
                f.write(",".join(fields) + "\n")

        print(f"Wrote shard {shard_idx}: {shard_path} ({rows_per_shard} rows)")

    # Write metadata
    meta = {
        "num_shards": num_shards,
        "rows_per_shard": rows_per_shard,
        "total_rows": total_rows,
        "num_features": num_features,
        "feature_names": feature_names,
        "seed": seed,
    }
    with open(os.path.join(output_data_dir, "metadata.json"), "w") as f:
        json.dump(meta, f, indent=2)

    print(f"\nDataset ready: {num_shards} shards, {total_rows} total rows")
It demonstrates: + - torch.distributed init_process_group with NCCL/gloo backend + - DistributedDataParallel model wrapping + - Shard-based data partitioning across nodes + - Gradient synchronization and synchronized evaluation + - Checkpoint saving on the master node + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: distributed-train.yaml + python_dependencies: '["torch>=2.0.0"]' + python_original_code_path: distributed_train.py + components new regenerate python-function-component: 'true' + inputs: + - name: data_dir + type: Directory + description: Directory with sharded CSV data files. + - name: num_nodes + type: Integer + description: Total number of nodes in the distributed job. + default: '1' + optional: true + - name: node_rank + type: Integer + description: Rank of the current node (0-indexed). + default: '0' + optional: true + - name: all_node_addresses + type: String + description: Comma-separated DNS/IP addresses for all nodes. + default: '' + optional: true + - name: num_epochs + type: Integer + description: Number of training epochs. + default: '10' + optional: true + - name: learning_rate + type: Float + description: SGD learning rate. + default: '0.01' + optional: true + - name: batch_size + type: Integer + description: Per-node batch size. + default: '64' + optional: true + - name: hidden_size + type: Integer + description: Hidden layer size in the MLP. + default: '64' + optional: true + outputs: + - name: output_model_dir + type: Directory + description: Directory to save the trained model checkpoint. + - name: output_metrics + type: JSON + description: JSON file with training and evaluation metrics. 
def distributed_train_v01(
    data_dir,
    output_model_dir,
    output_metrics,
    num_nodes = 1,
    node_rank = 0,
    all_node_addresses = "",
    num_epochs = 10,
    learning_rate = 0.01,
    batch_size = 64,
    hidden_size = 64,
):
    """Distributed gradient-descent training across multiple nodes using PyTorch DDP.

    Each node loads its assigned data shards and participates in synchronized
    gradient averaging via torch.distributed. Node 0 acts as the master and
    saves the final model + metrics.

    This is a minimal but real multi-node training example using only PyTorch
    (no external ML frameworks). It demonstrates:
    - torch.distributed init_process_group with NCCL/gloo backend
    - DistributedDataParallel model wrapping
    - Shard-based data partitioning across nodes
    - Gradient synchronization and synchronized evaluation
    - Checkpoint saving on the master node

    Only the node with ``node_rank == 0`` writes the checkpoint and the metrics
    JSON; every node creates ``output_model_dir``.

    Args:
        data_dir: Directory with sharded CSV data files.
        output_model_dir: Directory to save the trained model checkpoint.
        output_metrics: JSON file with training and evaluation metrics.
        num_nodes: Total number of nodes in the distributed job.
        node_rank: Rank of the current node (0-indexed).
        all_node_addresses: Comma-separated DNS/IP addresses for all nodes.
            The first entry is used as the rendezvous master; when empty,
            127.0.0.1 is used.
        num_epochs: Number of training epochs.
        learning_rate: Learning rate passed to the Adam optimizer.
        batch_size: Per-node batch size.
        hidden_size: Hidden layer size in the MLP.
    """
    import csv
    import json
    # NOTE(review): `math` and `socket` are not referenced anywhere in this
    # function body.
    import math
    import os
    import socket
    import time

    import torch
    import torch.distributed as dist
    import torch.nn as nn
    from torch.nn.parallel import DistributedDataParallel as DDP
    from torch.utils.data import DataLoader, TensorDataset

    # ── Setup distributed ──────────────────────────────────────────────
    os.makedirs(output_model_dir, exist_ok=True)

    # The rendezvous port is fixed; the master is the first listed address.
    master_port = "29500"
    if all_node_addresses:
        addrs = all_node_addresses.split(",")
        master_addr = addrs[0].strip()
    else:
        master_addr = "127.0.0.1"

    # No init_method is passed to init_process_group below, so these variables
    # are presumably what the default environment-variable rendezvous reads.
    os.environ["MASTER_ADDR"] = master_addr
    os.environ["MASTER_PORT"] = master_port
    os.environ["RANK"] = str(node_rank)
    os.environ["WORLD_SIZE"] = str(num_nodes)

    backend = "nccl" if torch.cuda.is_available() else "gloo"
    print(f"[Node {node_rank}/{num_nodes}] Initializing distributed: "
          f"backend={backend}, master={master_addr}:{master_port}")

    dist.init_process_group(
        backend=backend,
        rank=node_rank,
        world_size=num_nodes,
    )
    print(f"[Node {node_rank}] Distributed initialized successfully")

    # Always CUDA device index 0 when CUDA is available (the index is the
    # literal 0, not derived from node_rank).
    device = torch.device(f"cuda:{0}" if torch.cuda.is_available() else "cpu")

    # ── Load data shards for this node ─────────────────────────────────
    shard_files = sorted(
        f for f in os.listdir(data_dir)
        if f.startswith("shard_") and f.endswith(".csv")
    )
    # Partition shards round-robin across nodes
    my_shards = [f for i, f in enumerate(shard_files) if i % num_nodes == node_rank]
    print(f"[Node {node_rank}] Loading {len(my_shards)}/{len(shard_files)} shards: {my_shards}")

    all_features = []
    all_labels = []
    for shard_name in my_shards:
        shard_path = os.path.join(data_dir, shard_name)
        with open(shard_path, "r") as f:
            reader = csv.reader(f)
            # The header row is consumed only to skip it; its value is unused.
            header = next(reader)
            for row in reader:
                # Last column is the integer label, everything before it is
                # a float feature.
                features = [float(x) for x in row[:-1]]
                label = int(row[-1])
                all_features.append(features)
                all_labels.append(label)

    X = torch.tensor(all_features, dtype=torch.float32)
    # Labels are stored as float32 because they are fed to BCEWithLogitsLoss.
    y = torch.tensor(all_labels, dtype=torch.float32)
    # NOTE(review): if this node was assigned no shards (more nodes than
    # shards), all_features is empty and X presumably has no second
    # dimension, so this lookup would fail — confirm and guard if needed.
    num_features = X.shape[1]

    print(f"[Node {node_rank}] Loaded {len(X)} samples, {num_features} features")

    # Split 80/20 train/eval
    # The split is positional: the first 80% of the loaded rows train, the
    # remainder evaluates. Rows are not shuffled before splitting.
    n_train = int(0.8 * len(X))
    X_train, X_eval = X[:n_train], X[n_train:]
    y_train, y_eval = y[:n_train], y[n_train:]

    train_dataset = TensorDataset(X_train, y_train)
    eval_dataset = TensorDataset(X_eval, y_eval)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    eval_loader = DataLoader(eval_dataset, batch_size=batch_size)

    # ── Define model ───────────────────────────────────────────────────
    class MLP(nn.Module):
        """Two-hidden-layer MLP producing one logit per sample."""

        def __init__(self, in_features, hidden_size):
            super().__init__()
            self.net = nn.Sequential(
                nn.Linear(in_features, hidden_size),
                nn.ReLU(),
                nn.BatchNorm1d(hidden_size),
                nn.Dropout(0.2),
                nn.Linear(hidden_size, hidden_size),
                nn.ReLU(),
                nn.BatchNorm1d(hidden_size),
                nn.Dropout(0.2),
                nn.Linear(hidden_size, 1),
            )

        def forward(self, x):
            # Drop the trailing size-1 dimension so the output lines up with
            # the 1-D label tensor.
            return self.net(x).squeeze(-1)

    model = MLP(num_features, hidden_size).to(device)
    model = DDP(model)
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.BCEWithLogitsLoss()

    # ── Training loop ──────────────────────────────────────────────────
    epoch_metrics = []
    start_time = time.time()

    # NOTE(review): DDP synchronizes during backward, so every node presumably
    # needs to run the same number of training batches per epoch; uneven shard
    # sizes across nodes may stall the job — confirm.
    for epoch in range(num_epochs):
        model.train()
        epoch_loss = 0.0
        epoch_correct = 0
        epoch_total = 0

        for batch_X, batch_y in train_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            optimizer.zero_grad()
            logits = model(batch_X)
            loss = criterion(logits, batch_y)
            loss.backward()
            optimizer.step()

            # Accumulate the summed (not mean) loss so the later division by
            # the sample count yields a per-sample average.
            epoch_loss += loss.item() * len(batch_X)
            # Training accuracy is computed from the train-mode forward pass.
            preds = (torch.sigmoid(logits) > 0.5).float()
            epoch_correct += (preds == batch_y).sum().item()
            epoch_total += len(batch_X)

        # Aggregate metrics across nodes
        metrics_tensor = torch.tensor(
            [epoch_loss, epoch_correct, epoch_total], dtype=torch.float64, device=device
        )
        dist.all_reduce(metrics_tensor, op=dist.ReduceOp.SUM)

        # Unlike the eval metrics below, these divisions are not guarded
        # against a zero global sample count.
        global_loss = metrics_tensor[0].item() / metrics_tensor[2].item()
        global_acc = metrics_tensor[1].item() / metrics_tensor[2].item()

        # Evaluation
        model.eval()
        eval_loss = 0.0
        eval_correct = 0
        eval_total = 0

        with torch.no_grad():
            for batch_X, batch_y in eval_loader:
                batch_X, batch_y = batch_X.to(device), batch_y.to(device)
                logits = model(batch_X)
                loss = criterion(logits, batch_y)
                eval_loss += loss.item() * len(batch_X)
                preds = (torch.sigmoid(logits) > 0.5).float()
                eval_correct += (preds == batch_y).sum().item()
                eval_total += len(batch_X)

        eval_metrics_tensor = torch.tensor(
            [eval_loss, eval_correct, eval_total], dtype=torch.float64, device=device
        )
        dist.all_reduce(eval_metrics_tensor, op=dist.ReduceOp.SUM)

        global_eval_loss = eval_metrics_tensor[0].item() / max(eval_metrics_tensor[2].item(), 1)
        global_eval_acc = eval_metrics_tensor[1].item() / max(eval_metrics_tensor[2].item(), 1)

        # Every node records the globally reduced metrics; only node 0 prints.
        em = {
            "epoch": epoch + 1,
            "train_loss": round(global_loss, 6),
            "train_accuracy": round(global_acc, 6),
            "eval_loss": round(global_eval_loss, 6),
            "eval_accuracy": round(global_eval_acc, 6),
        }
        epoch_metrics.append(em)

        if node_rank == 0:
            print(f"[Epoch {epoch+1}/{num_epochs}] "
                  f"train_loss={em['train_loss']:.4f} train_acc={em['train_accuracy']:.4f} "
                  f"eval_loss={em['eval_loss']:.4f} eval_acc={em['eval_accuracy']:.4f}")

    elapsed = time.time() - start_time

    # ── Save model and metrics (node 0 only) ───────────────────────────
    # NOTE(review): this section reads epoch_metrics[-1] and metrics_tensor,
    # both of which are only populated inside the epoch loop; num_epochs=0
    # would leave them empty/undefined.
    if node_rank == 0:
        checkpoint_path = os.path.join(output_model_dir, "model.pt")
        # model.module is the MLP wrapped by DDP, so the saved state dict is
        # taken from the inner module rather than the DDP wrapper.
        torch.save({
            "model_state_dict": model.module.state_dict(),
            "optimizer_state_dict": optimizer.state_dict(),
            "num_features": num_features,
            "hidden_size": hidden_size,
            "num_epochs": num_epochs,
            "final_eval_accuracy": epoch_metrics[-1]["eval_accuracy"],
        }, checkpoint_path)

        results = {
            "training": {
                "num_nodes": num_nodes,
                # metrics_tensor holds the last epoch's reduced training
                # counters, so this is the global number of training samples
                # (the eval portion is not included).
                "total_samples": int(metrics_tensor[2].item()),
                "samples_per_node": int(metrics_tensor[2].item()) // num_nodes,
                "num_epochs": num_epochs,
                "learning_rate": learning_rate,
                "batch_size": batch_size,
                "hidden_size": hidden_size,
                "elapsed_seconds": round(elapsed, 2),
                "backend": backend,
            },
            "final_metrics": epoch_metrics[-1],
            "epoch_history": epoch_metrics,
        }

        with open(output_metrics, "w") as f:
            json.dump(results, f, indent=2)

        print(f"\n[Node 0] Training complete in {elapsed:.1f}s")
        print(f"[Node 0] Final eval accuracy: {epoch_metrics[-1]['eval_accuracy']:.4f}")
        print(f"[Node 0] Model saved to {checkpoint_path}")

    dist.destroy_process_group()
num_epochs + - if: + cond: + isPresent: learning_rate + then: + - '--learning-rate' + - inputValue: learning_rate + - if: + cond: + isPresent: batch_size + then: + - '--batch-size' + - inputValue: batch_size + - if: + cond: + isPresent: hidden_size + then: + - '--hidden-size' + - inputValue: hidden_size + - '--output-model-dir' + - outputPath: output_model_dir + - '--output-metrics' + - outputPath: output_metrics + arguments: + data_dir: + taskOutput: + taskId: Generate Data + outputName: output_data_dir + node_rank: + dynamicData: + system/multi_node/node_index: {} + num_nodes: + dynamicData: + system/multi_node/number_of_nodes: {} + batch_size: '64' + num_epochs: '10' + hidden_size: '64' + learning_rate: '0.01' + all_node_addresses: + dynamicData: + system/multi_node/all_node_addresses: {} + annotations: + editor.position: '{"x":400,"y":0}' + tangleml.com/launchers/kubernetes/multi_node/number_of_nodes: '2' + cloud-pipelines.net/launchers/generic/resources.shared_memory: 2Gi + outputValues: + trained_model: + taskOutput: + taskId: Distributed Train + outputName: output_model_dir + training_metrics: + taskOutput: + taskId: Distributed Train + outputName: output_metrics +description: Open-source demo of Tangle multi-node distributed training. Generates synthetic classification data, then trains a PyTorch MLP across multiple nodes using DistributedDataParallel with synchronized gradient averaging. +outputs: + - name: trained_model + type: Directory + description: Directory containing the trained model checkpoint. + annotations: + editor.position: '{"x":800,"y":200}' + - name: training_metrics + type: JSON + description: JSON file with training and evaluation metrics. + annotations: + editor.position: '{"x":800,"y":430}' +metadata: + annotations: + author: tangle-demo + flex-nodes: '[{"id":"mn-overview","properties":{"title":"Multi-Node Training","content":"This pipeline shows off Tangle''s multi-node feature. 
The ''Distributed Train'' task runs on 2 pods at the same time. Each pod gets its own node_rank (0 or 1) plus the DNS addresses of all its peers, injected through dynamicData:\n\n• system/multi_node/node_index → per-pod rank\n• system/multi_node/number_of_nodes → total pods\n• system/multi_node/all_node_addresses → comma-separated DNS\n\nThe tangleml.com/.../number_of_nodes annotation on the task tells Tangle how many pods to spawn. Every pod runs the same container image but with different injected values, which is what lets PyTorch DistributedDataParallel (DDP) sync gradients across nodes.","color":"#E3F2FD","borderColor":"#1565C0"},"metadata":{"createdAt":"2026-05-30T00:15:00.000Z","createdBy":"tangle-demo"},"size":{"width":320,"height":380},"position":{"x":390,"y":-430},"zIndex":0},{"id":"mn-data-flow","properties":{"title":"Data Partitioning","content":"Shards get handed out round-robin:\n\n• Node 0 takes shards 0, 2, 4, 6\n• Node 1 takes shards 1, 3, 5, 7\n\nAfter each training step, gradients are averaged across all nodes with torch.distributed.all_reduce(). Only Node 0 (the master) writes out the final checkpoint and metrics.","color":"#FFF9C4","borderColor":"#F9A825"},"metadata":{"createdAt":"2026-05-30T00:15:00.000Z","createdBy":"tangle-demo"},"size":{"width":280,"height":220},"position":{"x":770,"y":-270},"zIndex":0},{"id":"flex_mpvikik96fvb_9","properties":{"title":"How to Set Up Multi-Node in Tangle UI","content":"How to turn on multi-node for any task:\n\n1. Select the task and open its Settings panel.\n\n2. Under Annotations, add:\n tangleml.com/launchers/kubernetes/\n multi_node/number_of_nodes = \"2\"\n (or however many pods you want)\n\n3. Add the shared memory annotation:\n cloud-pipelines.net/launchers/\n generic/resources.shared_memory\n = \"2Gi\" (NCCL/gloo IPC needs this)\n\n4. Add 3 component inputs of type String/Integer\n for: num_nodes, node_rank, all_node_addresses.\n\n5. 
Wire each input to Dynamic Data:\n • num_nodes → system/multi_node/\n number_of_nodes\n • node_rank → system/multi_node/\n node_index\n • all_node_addresses → system/\n multi_node/all_node_addresses\n\n6. In your container code, feed these values\n into torch.distributed.init_process_group(),\n using MASTER_ADDR from the first address\n and RANK from node_rank.\n\nThat''s it. Tangle spawns N identical pods\nand injects the right values into each one.","color":"#E8F5E9","borderColor":"#BCBCBC"},"metadata":{"createdAt":"2026-06-01T18:00:10.233Z","createdBy":"tangle-demo"},"size":{"width":290,"height":530},"position":{"x":1150,"y":70},"zIndex":0}]' + editor.flow-direction: left-to-right + cloned_from_run_id: 019e8446133b96b877b7 diff --git a/public/example-pipelines/Intro-Secrets.pipeline.component.png b/public/example-pipelines/Intro-Secrets.pipeline.component.png new file mode 100644 index 000000000..286b2b7aa Binary files /dev/null and b/public/example-pipelines/Intro-Secrets.pipeline.component.png differ diff --git a/public/example-pipelines/Intro-Secrets.pipeline.component.yaml b/public/example-pipelines/Intro-Secrets.pipeline.component.yaml new file mode 100644 index 000000000..cf8cb9c66 --- /dev/null +++ b/public/example-pipelines/Intro-Secrets.pipeline.component.yaml @@ -0,0 +1,133 @@ +name: 'Intro: Secrets' +description: | + Demonstrates Tangle's secret-injection feature: a pipeline input is bound to a + named secret at submit time via `dynamicData.secret`, and the value lands in + the running container as a file. The demo reads the secret, makes an + authenticated HTTP GET to https://httpbin.org/bearer using the secret as a + Bearer token, and prints httpbin's response. + + Before submitting, create the secret in Settings > Secrets: + + DEMO_BEARER_TOKEN = 'any-string-works' + + Then assign to an argument using Dynamic Data, or assign to an empty input at pipeline submission. 
+ +metadata: + annotations: + flex-nodes: '[{"id":"note-secrets","properties":{"title":"Secrets Feature","content":"Pipeline inputs can be bound to named secrets when you submit a run. The value gets written into the container as a file at the path passed via inputPath. It''s never set as an env var or put on the command line, so it stays out of pod specs, run YAML, and anything visible in the Tangle UI.","color":"#E3F2FD"},"metadata":{"createdAt":"2026-05-22T00:00:00.000Z","createdBy":"tangle-demo"},"size":{"width":340,"height":180},"position":{"x":40,"y":-220},"zIndex":0},{"id":"note-secrets-flow","properties":{"title":"Two-step setup","content":"1) Create the secret in your Tangle instance from the Settings -> Secrets menu.\n2) Assign it to an argument via Dynamic Data, or attach it to an empty input when you submit the pipeline. The actual value never ends up in the YAML; only the name shows up, and only at submit time.","color":"#FFF3E0"},"metadata":{"createdAt":"2026-05-22T00:00:00.000Z","createdBy":"tangle-demo"},"size":{"width":340,"height":200},"position":{"x":420,"y":-220},"zIndex":0,"readOnly":false,"highlighted":false}]' + editor.flow-direction: left-to-right + cloned_from_run_id: 019e50eb1c5468741cad +inputs: + - name: DEMO_TOKEN + description: | + Bearer token, supplied at submit time via dynamicData.secret. Read by the + `Authenticated Request` task as a file (inputPath) — never logged. + annotations: + editor.position: '{"x": -150, "y": 100}' +outputs: + - name: HTTP Response + annotations: + editor.position: '{"x":980,"y":100}' +implementation: + graph: + tasks: + Show Response: + componentRef: + name: Show Response + spec: + name: Show Response + description: Echoes the masked HTTP response to stdout and to the pipeline output. 
+ inputs: + - name: response + type: String + outputs: + - name: echoed + type: String + implementation: + container: + image: python:3.12-slim + command: + - sh + - '-ec' + - | + # $0 is the first positional arg (inputPath), $1 the second (outputPath) + cat "$0" + mkdir -p "$(dirname "$1")" + cp "$0" "$1" + - inputPath: response + - outputPath: echoed + arguments: + response: + taskOutput: + outputName: response + taskId: Authenticated Request + annotations: + editor.position: '{"x": 600, "y": 100}' + Authenticated Request: + componentRef: + name: Authenticated Request + spec: + name: Authenticated Request + description: | + Reads a bearer token from the secret file, sends a GET to + https://httpbin.org/bearer with `Authorization: Bearer <token>`, and + writes the JSON response body to its output. Prints the token length + (never the value) and the HTTP status to stdout. + inputs: + - name: token + description: Bearer token, injected from a Tangle secret. + outputs: + - name: response + type: String + description: httpbin's JSON response body. + implementation: + container: + image: python:3.12-slim + command: + - python3 + - '-u' + - '-c' + - | + import json, os, sys, urllib.request + + token_path, response_path = sys.argv[1], sys.argv[2] + with open(token_path) as f: + token = f.read().strip() + + print(f"loaded token: length={len(token)} (value not logged)") + + req = urllib.request.Request( + "https://httpbin.org/bearer", + headers={"Authorization": f"Bearer {token}"}, + ) + with urllib.request.urlopen(req, timeout=30) as resp: + status = resp.status + body = resp.read().decode("utf-8") + + print(f"http status: {status}") + parsed = json.loads(body) + # httpbin echoes the token back — mask it before writing to the output.
+ if "token" in parsed: + parsed["token"] = f"<{len(parsed['token'])} chars>" + masked = json.dumps(parsed, indent=2) + print(masked) + + os.makedirs(os.path.dirname(response_path), exist_ok=True) + with open(response_path, "w") as f: + f.write(masked + "\n") + args: + - inputPath: token + - outputPath: response + arguments: + token: + graphInput: + inputName: DEMO_TOKEN + annotations: + editor.position: '{"x": 200, "y": 100}' + cloud-pipelines.net/launchers/generic/resources.cpu: '1' + cloud-pipelines.net/launchers/generic/resources.memory: 512Mi + outputValues: + HTTP Response: + taskOutput: + outputName: echoed + taskId: Show Response diff --git a/public/example-pipelines/Intro-Subgraphs.pipeline.component.png b/public/example-pipelines/Intro-Subgraphs.pipeline.component.png new file mode 100644 index 000000000..4c5fd5175 Binary files /dev/null and b/public/example-pipelines/Intro-Subgraphs.pipeline.component.png differ diff --git a/public/example-pipelines/Intro-Subgraphs.pipeline.component.yaml b/public/example-pipelines/Intro-Subgraphs.pipeline.component.yaml new file mode 100644 index 000000000..faf857c31 --- /dev/null +++ b/public/example-pipelines/Intro-Subgraphs.pipeline.component.yaml @@ -0,0 +1,1036 @@ +name: 'Intro: Subgraphs' +description: | + Organising work with subgraphs (nested pipelines). Three top-level tasks are themselves subgraphs — each with its own internal task graph, inputs, and outputs. Data Preparation generates and splits the data; Training fits a linear model; Evaluation predicts and scores. Double-click a subgraph in the UI to see inside it. +metadata: + annotations: + flex-nodes: '[{"id":"note-sg","properties":{"title":"Subgraphs","content":"Each node here is actually a subgraph: a task whose componentRef.spec has its own implementation.graph nested inside. 
from cloud_pipelines import components
import csv
import json


def train_regression(
    training_data: components.InputPath("CSV"),
    model: components.OutputPath("JSON"),
    target_column: str = "target",
):
    """Train a simple linear regression model using ordinary least squares.

    Fits weights and bias to minimise squared error. Uses only Python stdlib
    (no numpy/sklearn). The trained model is saved as a JSON file containing
    the weight vector, bias, feature names, and training metrics.

    Every column other than ``target_column`` is treated as a numeric feature.

    Args:
        training_data: Input CSV with feature columns and a target column.
        model: Output JSON file with trained model parameters.
        target_column: Name of the column to predict.
    """
    with open(training_data, "r") as f:
        reader = csv.DictReader(f)
        rows = list(reader)

    # Feature names come from the first data row, so at least one row is
    # required; NOTE(review): an empty CSV would fail on rows[0].
    feature_names = [k for k in rows[0].keys() if k != target_column]
    n = len(rows)
    p = len(feature_names)

    X = [[float(row[f]) for f in feature_names] for row in rows]
    y = [float(row[target_column]) for row in rows]

    # Add bias column (column of 1s)
    X_aug = [[1.0] + row for row in X]
    cols = p + 1

    # Normal equation: (X^T X)^-1 X^T y
    # Compute X^T X
    XtX = [[0.0] * cols for _ in range(cols)]
    for i in range(cols):
        for j in range(cols):
            XtX[i][j] = sum(X_aug[k][i] * X_aug[k][j] for k in range(n))

    # Compute X^T y
    Xty = [sum(X_aug[k][i] * y[k] for k in range(n)) for i in range(cols)]

    # Solve via Gaussian elimination
    # aug is the augmented matrix [X^T X | X^T y]; it is reduced in place so
    # that its last column ends up holding the solution.
    aug = [XtX[i][:] + [Xty[i]] for i in range(cols)]
    for i in range(cols):
        # Partial pivoting: bring the row with the largest magnitude in
        # column i (at or below the diagonal) up to row i.
        max_row = max(range(i, cols), key=lambda r: abs(aug[r][i]))
        aug[i], aug[max_row] = aug[max_row], aug[i]
        pivot = aug[i][i]
        # A near-zero pivot means this column cannot be eliminated; the row is
        # skipped as-is. NOTE(review): the parameter read from this row below
        # is then not a solved value — confirm this is acceptable.
        if abs(pivot) < 1e-12:
            continue
        # Normalize the pivot row, then clear column i from every other row.
        for j in range(i, cols + 1):
            aug[i][j] /= pivot
        for r in range(cols):
            if r != i:
                factor = aug[r][i]
                for j in range(i, cols + 1):
                    aug[r][j] -= factor * aug[i][j]

    # First parameter belongs to the prepended column of 1s (the bias); the
    # rest follow the order of feature_names.
    params = [aug[i][cols] for i in range(cols)]
    bias = params[0]
    weights = params[1:]

    # Training RMSE
    predictions = [bias + sum(w * x for w, x in zip(weights, row)) for row in X]
    # The generator's loop variable `p` is local to the generator, so the
    # feature count `p` printed below is unaffected.
    mse = sum((p - a) ** 2 for p, a in zip(predictions, y)) / n
    rmse = mse ** 0.5

    # Stored values are rounded to 6 decimals; the prints below use the
    # unrounded weights.
    model_data = {
        "feature_names": feature_names,
        "weights": [round(w, 6) for w in weights],
        "bias": round(bias, 6),
        "training_rmse": round(rmse, 6),
        "training_rows": n,
    }

    with open(model, "w") as f:
        json.dump(model_data, f, indent=2)

    print(f"Trained on {n} rows, {p} features")
    print(f"Weights: {dict(zip(feature_names, weights))}")
    print(f"Bias: {bias:.4f}, Training RMSE: {rmse:.4f}")
+ """ + with open(training_data, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + feature_names = [k for k in rows[0].keys() if k != target_column] + n = len(rows) + p = len(feature_names) + + X = [[float(row[f]) for f in feature_names] for row in rows] + y = [float(row[target_column]) for row in rows] + + # Add bias column (column of 1s) + X_aug = [[1.0] + row for row in X] + cols = p + 1 + + # Normal equation: (X^T X)^-1 X^T y + # Compute X^T X + XtX = [[0.0] * cols for _ in range(cols)] + for i in range(cols): + for j in range(cols): + XtX[i][j] = sum(X_aug[k][i] * X_aug[k][j] for k in range(n)) + + # Compute X^T y + Xty = [sum(X_aug[k][i] * y[k] for k in range(n)) for i in range(cols)] + + # Solve via Gaussian elimination + aug = [XtX[i][:] + [Xty[i]] for i in range(cols)] + for i in range(cols): + max_row = max(range(i, cols), key=lambda r: abs(aug[r][i])) + aug[i], aug[max_row] = aug[max_row], aug[i] + pivot = aug[i][i] + if abs(pivot) < 1e-12: + continue + for j in range(i, cols + 1): + aug[i][j] /= pivot + for r in range(cols): + if r != i: + factor = aug[r][i] + for j in range(i, cols + 1): + aug[r][j] -= factor * aug[i][j] + + params = [aug[i][cols] for i in range(cols)] + bias = params[0] + weights = params[1:] + + # Training RMSE + predictions = [bias + sum(w * x for w, x in zip(weights, row)) for row in X] + mse = sum((p - a) ** 2 for p, a in zip(predictions, y)) / n + rmse = mse ** 0.5 + + model_data = { + "feature_names": feature_names, + "weights": [round(w, 6) for w in weights], + "bias": round(bias, 6), + "training_rmse": round(rmse, 6), + "training_rows": n, + } + + with open(model, "w") as f: + json.dump(model_data, f, indent=2) + + print(f"Trained on {n} rows, {p} features") + print(f"Weights: {dict(zip(feature_names, weights))}") + print(f"Bias: {bias:.4f}, Training RMSE: {rmse:.4f}") + + import argparse + _parser = argparse.ArgumentParser(prog='Train regression', description='Train a simple linear regression model using 
ordinary least squares.\n\nFits weights and bias to minimise squared error. Uses only Python stdlib\n(no numpy/sklearn). The trained model is saved as a JSON file containing\nthe weight vector, bias, feature names, and training metrics.') + _parser.add_argument("--training-data", dest="training_data", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--target-column", dest="target_column", type=str, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--model", dest="model", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = train_regression(**_parsed_args) + args: + - '--training-data' + - inputPath: training_data + - if: + cond: + isPresent: target_column + then: + - '--target-column' + - inputValue: target_column + - '--model' + - outputPath: model + arguments: + target_column: + graphInput: + inputName: target_column + training_data: + graphInput: + inputName: training_data + annotations: + editor.position: '{"x":301,"y":55.5}' + outputValues: + model: + taskOutput: + outputName: model + taskId: Fit Model + arguments: + training_data: + taskOutput: + outputName: train_data + taskId: Data Preparation + annotations: + editor.position: '{"x": 500, "y": 0}' + Evaluation: + componentRef: + name: Evaluation + digest: 3e29013959318f296132514881cba7efaf948bc0618098527f6c086c3f24c66f + spec: + name: Evaluation + description: Predict on test data and compute regression metrics + inputs: + - name: test_data + type: CSV + annotations: + editor.position: '{"x":0,"y":25.25}' + - name: model + type: JSON + annotations: + editor.position: '{"x":0,"y":201.25}' + outputs: + - name: predictions + type: CSV + annotations: + editor.position: '{"x":749.5,"y":0}' + - name: metrics + type: JSON + annotations: + editor.position: '{"x":1061,"y":202.5}' + implementation: + graph: + tasks: + Predict: + componentRef: + name: Predict + digest: 
4841c31fc75f2d26a5a7d3123d6fa6fc6b43d8badd549ad3ac3e20119860938d + spec: + name: Predict + description: |- + Apply a trained linear regression model to produce predictions. + + Reads the model JSON (weights + bias) and the test CSV, computes + predicted values, and writes a CSV with columns: actual, predicted. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: predict.component.yaml + python_original_code: | + from cloud_pipelines import components + import csv + import json + + + def predict( + test_data: components.InputPath("CSV"), + model: components.InputPath("JSON"), + predictions: components.OutputPath("CSV"), + ): + """Apply a trained linear regression model to produce predictions. + + Reads the model JSON (weights + bias) and the test CSV, computes + predicted values, and writes a CSV with columns: actual, predicted. + + Args: + test_data: Input CSV with the same feature columns used in training. + model: Trained model JSON (from train_regression). + predictions: Output CSV with actual and predicted values. 
+ """ + with open(model, "r") as f: + model_data = json.load(f) + + feature_names = model_data["feature_names"] + weights = model_data["weights"] + bias = model_data["bias"] + + with open(test_data, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + target_col = [c for c in rows[0].keys() if c not in feature_names][0] + + with open(predictions, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["actual", "predicted"]) + for row in rows: + features = [float(row[feat]) for feat in feature_names] + pred = bias + sum(w * x for w, x in zip(weights, features)) + actual = float(row[target_col]) + writer.writerow([round(actual, 4), round(pred, 4)]) + + print(f"Generated {len(rows)} predictions") + python_original_code_path: predict.py + components new regenerate python-function-component: 'true' + inputs: + - name: test_data + type: CSV + description: Input CSV with the same feature columns used in training. + - name: model + type: JSON + description: Trained model JSON (from train_regression). + outputs: + - name: predictions + type: CSV + description: Output CSV with actual and predicted values. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import csv + import json + + def predict( + test_data, + model, + predictions, + ): + """Apply a trained linear regression model to produce predictions. + + Reads the model JSON (weights + bias) and the test CSV, computes + predicted values, and writes a CSV with columns: actual, predicted. + + Args: + test_data: Input CSV with the same feature columns used in training. + model: Trained model JSON (from train_regression). + predictions: Output CSV with actual and predicted values. 
+ """ + with open(model, "r") as f: + model_data = json.load(f) + + feature_names = model_data["feature_names"] + weights = model_data["weights"] + bias = model_data["bias"] + + with open(test_data, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + target_col = [c for c in rows[0].keys() if c not in feature_names][0] + + with open(predictions, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["actual", "predicted"]) + for row in rows: + features = [float(row[feat]) for feat in feature_names] + pred = bias + sum(w * x for w, x in zip(weights, features)) + actual = float(row[target_col]) + writer.writerow([round(actual, 4), round(pred, 4)]) + + print(f"Generated {len(rows)} predictions") + + import argparse + _parser = argparse.ArgumentParser(prog='Predict', description='Apply a trained linear regression model to produce predictions.\n\nReads the model JSON (weights + bias) and the test CSV, computes\npredicted values, and writes a CSV with columns: actual, predicted.') + _parser.add_argument("--test-data", dest="test_data", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--model", dest="model", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--predictions", dest="predictions", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = predict(**_parsed_args) + args: + - '--test-data' + - inputPath: test_data + - '--model' + - inputPath: model + - '--predictions' + - outputPath: predictions + arguments: + model: + graphInput: + inputName: model + test_data: + graphInput: + inputName: test_data + annotations: + editor.position: '{"x":301,"y":86.75}' + Compute Metrics: + componentRef: + name: Evaluate + digest: c26e9e058d298c1c57dd96e15ea4261b99439a53fa9b323db4e9ef783933954c + spec: + name: Evaluate + description: |- + Compute regression metrics from a predictions CSV. 
+ + Expects columns: actual, predicted. Outputs a JSON file with + MAE, MSE, RMSE, R-squared, and row count. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: evaluate.component.yaml + python_original_code: | + from cloud_pipelines import components + import csv + import json + import math + + + def evaluate( + predictions: components.InputPath("CSV"), + metrics: components.OutputPath("JSON"), + ): + """Compute regression metrics from a predictions CSV. + + Expects columns: actual, predicted. Outputs a JSON file with + MAE, MSE, RMSE, R-squared, and row count. + + Args: + predictions: Input CSV with actual and predicted columns. + metrics: Output JSON with computed regression metrics. + """ + with open(predictions, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + actuals = [float(r["actual"]) for r in rows] + preds = [float(r["predicted"]) for r in rows] + n = len(actuals) + + errors = [a - p for a, p in zip(actuals, preds)] + abs_errors = [abs(e) for e in errors] + sq_errors = [e ** 2 for e in errors] + + mae = sum(abs_errors) / n + mse = sum(sq_errors) / n + rmse = math.sqrt(mse) + + mean_actual = sum(actuals) / n + ss_tot = sum((a - mean_actual) ** 2 for a in actuals) + ss_res = sum(sq_errors) + r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0.0 + + result = { + "num_samples": n, + "mae": round(mae, 6), + "mse": round(mse, 6), + "rmse": round(rmse, 6), + "r_squared": round(r_squared, 6), + "max_error": round(max(abs_errors), 6), + } + + with open(metrics, "w") as f: + json.dump(result, f, indent=2) + + print(f"Evaluated {n} predictions:") + for k, v in result.items(): + print(f" {k}: {v}") + python_original_code_path: evaluate.py + components new regenerate python-function-component: 'true' + inputs: + - name: predictions + type: CSV + description: Input CSV with actual and predicted columns. + outputs: + - name: metrics + type: JSON + description: Output JSON with computed regression metrics. 
+ implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import csv + import json + import math + + def evaluate( + predictions, + metrics, + ): + """Compute regression metrics from a predictions CSV. + + Expects columns: actual, predicted. Outputs a JSON file with + MAE, MSE, RMSE, R-squared, and row count. + + Args: + predictions: Input CSV with actual and predicted columns. + metrics: Output JSON with computed regression metrics. + """ + with open(predictions, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + actuals = [float(r["actual"]) for r in rows] + preds = [float(r["predicted"]) for r in rows] + n = len(actuals) + + errors = [a - p for a, p in zip(actuals, preds)] + abs_errors = [abs(e) for e in errors] + sq_errors = [e ** 2 for e in errors] + + mae = sum(abs_errors) / n + mse = sum(sq_errors) / n + rmse = math.sqrt(mse) + + mean_actual = sum(actuals) / n + ss_tot = sum((a - mean_actual) ** 2 for a in actuals) + ss_res = sum(sq_errors) + r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0.0 + + result = { + "num_samples": n, + "mae": round(mae, 6), + "mse": round(mse, 6), + "rmse": round(rmse, 6), + "r_squared": round(r_squared, 6), + "max_error": round(max(abs_errors), 6), + } + + with open(metrics, "w") as f: + json.dump(result, f, indent=2) + + print(f"Evaluated {n} predictions:") + for k, v in result.items(): + print(f" {k}: {v}") + + import argparse + _parser = argparse.ArgumentParser(prog='Evaluate', description='Compute regression metrics from a predictions CSV.\n\nExpects columns: actual, predicted. 
Outputs a JSON file with\nMAE, MSE, RMSE, R-squared, and row count.') + _parser.add_argument("--predictions", dest="predictions", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--metrics", dest="metrics", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = evaluate(**_parsed_args) + args: + - '--predictions' + - inputPath: predictions + - '--metrics' + - outputPath: metrics + arguments: + predictions: + taskOutput: + outputName: predictions + taskId: Predict + annotations: + editor.position: '{"x":681,"y":200}' + outputValues: + metrics: + taskOutput: + outputName: metrics + taskId: Compute Metrics + predictions: + taskOutput: + outputName: predictions + taskId: Predict + arguments: + model: + taskOutput: + outputName: model + taskId: Training + test_data: + taskOutput: + outputName: test_data + taskId: Data Preparation + annotations: + editor.position: '{"x": 500, "y": 300}' + Data Preparation: + componentRef: + name: Data Preparation + digest: 653c0dddb09515926be064acab5ae1fc9ad0f25f4b9376882532740b91ca0658 + spec: + name: Data Preparation + description: Generate synthetic data and split into train/test + inputs: + - name: num_rows + type: Integer + default: '500' + annotations: + editor.position: '{"x":0,"y":32.5}' + - name: train_fraction + type: Float + default: '0.8' + annotations: + editor.position: '{"x":288,"y":241}' + outputs: + - name: train_data + type: CSV + annotations: + editor.position: '{"x":954,"y":24.75}' + - name: test_data + type: CSV + annotations: + editor.position: '{"x":958,"y":224.75}' + implementation: + graph: + tasks: + Split: + componentRef: + name: Split csv + digest: 7dbbe3ac41f4e820f0d168ef355ada703716f4593eb5e70664746eebe0fe79e7 + spec: + name: Split csv + description: |- + Split a CSV dataset into train and test sets. + + Randomly shuffles rows, then splits by the given fraction. 
+ Both output files keep the same header row. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: split_csv.component.yaml + python_original_code: | + from cloud_pipelines import components + import csv + import random + + + def split_csv( + input_data: components.InputPath("CSV"), + train_data: components.OutputPath("CSV"), + test_data: components.OutputPath("CSV"), + train_fraction: float = 0.8, + random_seed: int = 42, + ): + """Split a CSV dataset into train and test sets. + + Randomly shuffles rows, then splits by the given fraction. + Both output files keep the same header row. + + Args: + input_data: Input CSV file. + train_data: Output CSV for the training split. + test_data: Output CSV for the test split. + train_fraction: Fraction of rows for training (0.0 to 1.0). + random_seed: Seed for reproducible shuffling. + """ + random.seed(random_seed) + + with open(input_data, "r") as f: + reader = csv.reader(f) + header = next(reader) + rows = list(reader) + + random.shuffle(rows) + split_idx = int(len(rows) * train_fraction) + train_rows = rows[:split_idx] + test_rows = rows[split_idx:] + + for path, subset in [(train_data, train_rows), (test_data, test_rows)]: + with open(path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(header) + writer.writerows(subset) + + print(f"Split {len(rows)} rows -> {len(train_rows)} train, {len(test_rows)} test") + python_original_code_path: split_csv.py + components new regenerate python-function-component: 'true' + inputs: + - name: input_data + type: CSV + description: Input CSV file. + - name: train_fraction + type: Float + description: Fraction of rows for training (0.0 to 1.0). + default: '0.8' + optional: true + - name: random_seed + type: Integer + description: Seed for reproducible shuffling. + default: '42' + optional: true + outputs: + - name: train_data + type: CSV + description: Output CSV for the training split. 
+ - name: test_data + type: CSV + description: Output CSV for the test split. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import csv + import random + + def split_csv( + input_data, + train_data, + test_data, + train_fraction = 0.8, + random_seed = 42, + ): + """Split a CSV dataset into train and test sets. + + Randomly shuffles rows, then splits by the given fraction. + Both output files keep the same header row. + + Args: + input_data: Input CSV file. + train_data: Output CSV for the training split. + test_data: Output CSV for the test split. + train_fraction: Fraction of rows for training (0.0 to 1.0). + random_seed: Seed for reproducible shuffling. + """ + random.seed(random_seed) + + with open(input_data, "r") as f: + reader = csv.reader(f) + header = next(reader) + rows = list(reader) + + random.shuffle(rows) + split_idx = int(len(rows) * train_fraction) + train_rows = rows[:split_idx] + test_rows = rows[split_idx:] + + for path, subset in [(train_data, train_rows), (test_data, test_rows)]: + with open(path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(header) + writer.writerows(subset) + + print(f"Split {len(rows)} rows -> {len(train_rows)} train, {len(test_rows)} test") + + import argparse + _parser = argparse.ArgumentParser(prog='Split csv', description='Split a CSV dataset into train and test sets.\n\nRandomly shuffles rows, then splits by the given fraction.\nBoth output files keep the same header row.') + _parser.add_argument("--input-data", dest="input_data", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--train-fraction", dest="train_fraction", type=float, required=False, default=argparse.SUPPRESS) + 
_parser.add_argument("--random-seed", dest="random_seed", type=int, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--train-data", dest="train_data", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--test-data", dest="test_data", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = split_csv(**_parsed_args) + args: + - '--input-data' + - inputPath: input_data + - if: + cond: + isPresent: train_fraction + then: + - '--train-fraction' + - inputValue: train_fraction + - if: + cond: + isPresent: random_seed + then: + - '--random-seed' + - inputValue: random_seed + - '--train-data' + - outputPath: train_data + - '--test-data' + - outputPath: test_data + arguments: + input_data: + taskOutput: + outputName: output_data + taskId: Generate + train_fraction: + graphInput: + inputName: train_fraction + annotations: + editor.position: '{"x":574,"y":68.25}' + Generate: + componentRef: + name: Generate dataset + digest: 7f837011088acc8e081f5f2ae5c981cc3bb73ed28bf4b2aea3134bc5297e1674 + spec: + name: Generate dataset + description: |- + Generate a synthetic regression dataset with 4 features and a target. + + Creates a CSV with columns: feature_1, feature_2, feature_3, feature_4, target. + The target is a noisy linear combination of the features, suitable for + demonstrating regression workflows. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: generate_dataset.component.yaml + python_original_code: | + from cloud_pipelines import components + import random + import csv + import math + + + def generate_dataset( + output_data: components.OutputPath("CSV"), + num_rows: int = 500, + random_seed: int = 42, + ): + """Generate a synthetic regression dataset with 4 features and a target. + + Creates a CSV with columns: feature_1, feature_2, feature_3, feature_4, target. 
+ The target is a noisy linear combination of the features, suitable for + demonstrating regression workflows. + + Args: + output_data: Output CSV file path. + num_rows: Number of rows to generate. + random_seed: Seed for reproducibility. + """ + random.seed(random_seed) + + weights = [1.5, -2.0, 0.8, 3.2] + bias = 5.0 + noise_scale = 0.5 + + with open(output_data, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["feature_1", "feature_2", "feature_3", "feature_4", "target"]) + + for _ in range(num_rows): + features = [random.gauss(0, 1) for _ in range(4)] + target = bias + sum(w * x for w, x in zip(weights, features)) + target += random.gauss(0, noise_scale) + writer.writerow([round(x, 4) for x in features] + [round(target, 4)]) + python_original_code_path: generate_dataset.py + components new regenerate python-function-component: 'true' + inputs: + - name: num_rows + type: Integer + description: Number of rows to generate. + default: '500' + optional: true + - name: random_seed + type: Integer + description: Seed for reproducibility. + default: '42' + optional: true + outputs: + - name: output_data + type: CSV + description: Output CSV file path. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import random + import csv + import math + + def generate_dataset( + output_data, + num_rows = 500, + random_seed = 42, + ): + """Generate a synthetic regression dataset with 4 features and a target. + + Creates a CSV with columns: feature_1, feature_2, feature_3, feature_4, target. + The target is a noisy linear combination of the features, suitable for + demonstrating regression workflows. + + Args: + output_data: Output CSV file path. 
+ num_rows: Number of rows to generate. + random_seed: Seed for reproducibility. + """ + random.seed(random_seed) + + weights = [1.5, -2.0, 0.8, 3.2] + bias = 5.0 + noise_scale = 0.5 + + with open(output_data, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["feature_1", "feature_2", "feature_3", "feature_4", "target"]) + + for _ in range(num_rows): + features = [random.gauss(0, 1) for _ in range(4)] + target = bias + sum(w * x for w, x in zip(weights, features)) + target += random.gauss(0, noise_scale) + writer.writerow([round(x, 4) for x in features] + [round(target, 4)]) + + import argparse + _parser = argparse.ArgumentParser(prog='Generate dataset', description='Generate a synthetic regression dataset with 4 features and a target.\n\nCreates a CSV with columns: feature_1, feature_2, feature_3, feature_4, target.\nThe target is a noisy linear combination of the features, suitable for\ndemonstrating regression workflows.') + _parser.add_argument("--num-rows", dest="num_rows", type=int, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--random-seed", dest="random_seed", type=int, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--output-data", dest="output_data", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = generate_dataset(**_parsed_args) + args: + - if: + cond: + isPresent: num_rows + then: + - '--num-rows' + - inputValue: num_rows + - if: + cond: + isPresent: random_seed + then: + - '--random-seed' + - inputValue: random_seed + - '--output-data' + - outputPath: output_data + arguments: + num_rows: + graphInput: + inputName: num_rows + annotations: + editor.position: '{"x":194,"y":0}' + outputValues: + test_data: + taskOutput: + outputName: test_data + taskId: Split + train_data: + taskOutput: + outputName: train_data + taskId: Split + annotations: + editor.position: '{"x": 0, "y": 100}' diff --git 
a/src/components/Home/PipelineSection/PipelineSection.tsx b/src/components/Home/PipelineSection/PipelineSection.tsx index 6f97480c9..96b2b5608 100644 --- a/src/components/Home/PipelineSection/PipelineSection.tsx +++ b/src/components/Home/PipelineSection/PipelineSection.tsx @@ -1,10 +1,10 @@ import { Link } from "@tanstack/react-router"; import { useEffect, useState } from "react"; +import { ExamplePipelines } from "@/components/Learn/ExamplePipelines"; import { LoadingScreen } from "@/components/shared/LoadingScreen"; import NewPipelineButton from "@/components/shared/NewPipelineButton"; import { PaginationControls } from "@/components/shared/PaginationControls"; -import QuickStartCards from "@/components/shared/QuickStart/QuickStartCards"; import { withSuspenseWrapper } from "@/components/shared/SuspenseWrapper"; import { Button } from "@/components/ui/button"; import { Checkbox } from "@/components/ui/checkbox"; @@ -21,7 +21,7 @@ import { } from "@/components/ui/table"; import { Paragraph, Text } from "@/components/ui/typography"; import { usePagination } from "@/hooks/usePagination"; -import { QUICK_START_PATH } from "@/routes/router"; +import { APP_ROUTES } from "@/routes/router"; import { type ComponentFileEntry, getAllComponentFilesFromList, @@ -125,7 +125,7 @@ export const PipelineSection = withSuspenseWrapper( You don't have any pipelines yet. Get started with a template below. - + Or start from scratch with @@ -143,7 +143,7 @@ export const PipelineSection = withSuspenseWrapper( } + actions={} /> @@ -220,10 +220,10 @@ export const PipelineSection = withSuspenseWrapper( PipelineSectionSkeleton, ); -function QuickStartButton() { +function ExamplePipelineButton() { return (