diff --git a/.gitmodules b/.gitmodules index cfa2b86..e69de29 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +0,0 @@ -[submodule "examples/bmeg-dictionary"] - path = examples/bmeg-dictionary - url = https://github.com/bmeg/bmeg-dictionary.git diff --git a/README.md b/README.md index 9e79793..8074cfe 100644 --- a/README.md +++ b/README.md @@ -18,10 +18,9 @@ name: census_2010 params: census: - type: File + type: file default: ../data/census_2010_byzip.json date: - type: string default: "2010-01-01" schema: type: path @@ -37,6 +36,9 @@ outputs: json: path: census_data.ndjson +outputs: + + pipelines: transform: - from: censusData diff --git a/cmd/root.go b/cmd/root.go index d23161c..25e0bb2 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -5,6 +5,7 @@ import ( "github.com/bmeg/sifter/cmd/inspect" "github.com/bmeg/sifter/cmd/run" + "github.com/bmeg/sifter/cmd/web" "github.com/spf13/cobra" ) @@ -18,6 +19,7 @@ var RootCmd = &cobra.Command{ func init() { RootCmd.AddCommand(run.Cmd) RootCmd.AddCommand(inspect.Cmd) + RootCmd.AddCommand(web.Cmd) } var genBashCompletionCmd = &cobra.Command{ diff --git a/cmd/run/main.go b/cmd/run/main.go index 8b3f016..cbab677 100644 --- a/cmd/run/main.go +++ b/cmd/run/main.go @@ -14,6 +14,8 @@ var outDir string = "" var paramsFile string = "" var verbose bool = false var cmdParams map[string]string +var captureDir string = "" +var captureLimit int = 10 // Cmd is the declaration of the command line var Cmd = &cobra.Command{ @@ -46,11 +48,11 @@ var Cmd = &cobra.Command{ } pb := playbook.Playbook{} playbook.ParseBytes(yaml, "./playbook.yaml", &pb) - if err := Execute(pb, "./", "./", outDir, params); err != nil { + if err := Execute(pb, "./", "./", outDir, params, captureDir, captureLimit); err != nil { return err } } else { - if err := ExecuteFile(playFile, "./", outDir, params); err != nil { + if err := ExecuteFile(playFile, "./", outDir, params, captureDir, captureLimit); err != nil { return err } } @@ -65,4 +67,7 @@ func init() { flags.BoolVarP(&verbose, "verbose", "v", verbose, "Verbose logging") flags.StringToStringVarP(&cmdParams, "param", "p", cmdParams, "Parameter variable") flags.StringVarP(¶msFile, "params-file", "f", paramsFile, "Parameter file") + flags.StringVarP(&captureDir, "capture-dir", "d", "", "Directory for capture files (default: None)") + flags.IntVarP(&captureLimit, "capture-limit", "l", 10, "Max records to capture per step (0 = unlimited)") + flags.StringVarP(&outDir, "output", "o", outDir, "Output directory for playbook results (default: current directory or value specified in playbook)") } diff --git a/cmd/run/run.go b/cmd/run/run.go index 42651b2..0c491e0 100644 --- a/cmd/run/run.go +++ b/cmd/run/run.go @@ -1,6 +1,7 @@ package run import ( + "fmt" "os" "path/filepath" @@ -9,7 +10,7 @@ import ( "github.com/bmeg/sifter/task" ) -func ExecuteFile(playFile string, workDir string, outDir string, inputs map[string]string) error { +func ExecuteFile(playFile string, workDir string, outDir string, inputs map[string]string, debugDir string, debugLimit int) error { logger.Info("Starting", "playFile", playFile) pb := playbook.Playbook{} if err := playbook.ParseFile(playFile, &pb); err != nil { @@ -19,10 +20,10 @@ func ExecuteFile(playFile string, workDir string, outDir string, inputs map[stri a, _ := filepath.Abs(playFile) baseDir := filepath.Dir(a) logger.Debug("parsed file", "baseDir", baseDir, "playbook", pb) - return Execute(pb, baseDir, workDir, outDir, inputs) + return Execute(pb, baseDir, workDir, outDir, inputs, debugDir, debugLimit) } -func Execute(pb playbook.Playbook, baseDir string, workDir string, outDir string, params map[string]string) error { +func Execute(pb playbook.Playbook, baseDir string, workDir string, outDir string, params map[string]string, debugDir string, debugLimit int) error { if outDir == "" { outDir = pb.GetDefaultOutDir() @@ -32,6 +33,32 @@ func Execute(pb playbook.Playbook, baseDir string, workDir string, outDir string os.MkdirAll(outDir, 0777) } + // Setup debug capture directory if enabled + // Enable if: user explicitly set dir, OR user changed limit from default + enableDebug := debugDir != "" || (debugLimit != 10) + if enableDebug { + if debugDir == "" { + debugDir = filepath.Join(workDir, "debug-capture") + } else if !filepath.IsAbs(debugDir) { + debugDir = filepath.Join(workDir, debugDir) + } + if info, err := os.Stat(debugDir); err != nil { + if os.IsNotExist(err) { + if mkErr := os.MkdirAll(debugDir, 0777); mkErr != nil { + logger.Error("Failed to create debug directory", "error", mkErr) + return mkErr + } + } else { + logger.Error("Failed to access debug directory", "path", debugDir, "error", err) + return err + } + } else if !info.IsDir() { + logger.Error("Debug path exists but is not a directory", "path", debugDir) + return fmt.Errorf("debug path %s exists but is not a directory", debugDir) + } + logger.Info("Debug capture enabled", "dir", debugDir, "limit", debugLimit) + } + nInputs, err := pb.PrepConfig(params, workDir) if err != nil { return err @@ -39,6 +66,6 @@ func Execute(pb playbook.Playbook, baseDir string, workDir string, outDir string logger.Debug("Running", "outDir", outDir) t := task.NewTask(pb.Name, baseDir, workDir, outDir, nInputs) - err = pb.Execute(t) + err = pb.ExecuteWithCapture(t, debugDir, debugLimit) return err } diff --git a/cmd/web/main.go b/cmd/web/main.go new file mode 100644 index 0000000..4a4e9b9 --- /dev/null +++ b/cmd/web/main.go @@ -0,0 +1,183 @@ +package web + +import ( + "embed" + "encoding/json" + "fmt" + "io/fs" + "log" + "net/http" + "os" + "path/filepath" + "sort" + "strings" + + "sigs.k8s.io/yaml" + + "github.com/spf13/cobra" +) + +//go:embed static/* +var staticFS embed.FS + +var playbookDir string +var siteDir string +var port string = "8081" + +// Cmd is the declaration of the command line +var Cmd = &cobra.Command{ + Use: "web + + + + +
+ +
+
+
+
+ + + + + \ No newline at end of file diff --git a/cmd/web/static/style.css b/cmd/web/static/style.css new file mode 100644 index 0000000..5312646 --- /dev/null +++ b/cmd/web/static/style.css @@ -0,0 +1,89 @@ +/* Premium dark theme with glassmorphism and smooth gradients */ +:root { + --font-family: 'Inter', sans-serif; + --bg-gradient: linear-gradient(135deg, #1e1e2f, #2a2a3d); + --sidebar-bg: rgba(255, 255, 255, 0.08); + --content-bg: rgba(255, 255, 255, 0.04); + --text-color: #e0e0e0; + --accent-color: #4f9bff; +} + +* { + box-sizing: border-box; + margin: 0; + padding: 0; +} + +body { + font-family: var(--font-family); + background: var(--bg-gradient); + color: var(--text-color); + height: 100vh; + display: flex; + align-items: center; + justify-content: center; +} + +.container { + display: flex; + width: 90vw; + height: 80vh; + backdrop-filter: blur(12px); + border-radius: 12px; + overflow: hidden; + box-shadow: 0 8px 32px rgba(0,0,0,0.6); +} + +.sidebar { + flex: 0 0 250px; + background: var(--sidebar-bg); + padding: 20px; + overflow-y: auto; +} + +.sidebar h1 { + font-size: 1.4rem; + margin-bottom: 1rem; + color: var(--accent-color); +} + +.sidebar ul { + list-style: none; +} + +.sidebar li { + margin: 8px 0; + cursor: pointer; + padding: 6px 10px; + border-radius: 6px; + transition: background 0.2s, transform 0.1s; +} + +.sidebar li:hover { + background: rgba(255,255,255,0.1); + transform: translateX(4px); +} + +.content { + flex: 1; + background: var(--content-bg); + padding: 20px; + overflow-y: auto; +} + +pre { + margin: 0; + height: 100%; + overflow: auto; +} + +code { + font-family: 'Source Code Pro', monospace; + font-size: 0.9rem; +} + +@media (max-width: 768px) { + .container { flex-direction: column; height: auto; } + .sidebar { flex: none; width: 100%; } + .content { flex: none; height: 60vh; } +} diff --git a/examples/Makefile b/examples/Makefile deleted file mode 100644 index b9128fd..0000000 --- a/examples/Makefile +++ /dev/null @@ -1,10 +0,0 @@ - -cbio-example: out/gene.table.table.gz - sifter run cbio.yaml --inputs tar=chol_tcga_pan_can_atlas_2018.tar.gz,geneTable=out/gene.table.table.gz - -out/gene.table.table.gz: - sifter run gene-table.yaml - - -chol_tcga_pan_can_atlas_2018.tar.gz: - curl -O http://download.cbioportal.org/chol_tcga_pan_can_atlas_2018.tar.gz diff --git a/examples/bmeg-dictionary b/examples/bmeg-dictionary deleted file mode 160000 index 6edc8d8..0000000 --- a/examples/bmeg-dictionary +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 6edc8d805d08f3230d63660a7cbb1166410c6377 diff --git a/examples/cbio.yaml b/examples/cbio.yaml deleted file mode 100644 index ea36b31..0000000 --- a/examples/cbio.yaml +++ /dev/null @@ -1,144 +0,0 @@ - -class: sifter -name: CBioPortal - -params: - tar: "" - geneTable: "" - schema: bmeg-dictionary/gdcdictionary/schemas - -inputs: - untar: - plugin: - commandLine: tar -xzf {{params.tar}} - - patientReader: - tableLoad: - input: data_clinical_patient.txt - - sampleReader: - tableLoad: - input: data_clinical_sample.txt - - rnaReader: - transposeLoad: - input: data_RNA_Seq_expression_median.txt - rowSkip: 1 - - mutationReader: - tableLoad: - input: data_mutations_extended.txt - -pipelines: - cases: - - from: patientReader - - project: - mapping: - id: "{{row.PATIENT_ID}}" - submitter_id: "{{row.PATIENT_ID}}" - type: "case" - experiments: - submitter_id: "TCGA" - - objectValidate: - title: Case - schema: "{{params.schema}}" - - emit: - name: case - - samples: - - from: sampleReader - - project: - mapping: - id: "{{row.SAMPLE_ID}}" - submitter_id: "{{row.SAMPLE_ID}}" - cases: - submitter_id: "{{row.PATIENT_ID}}" - type: "sample" - - objectValidate: - title: Sample - schema: "{{params.schema}}" - - emit: - name: sample - - aliquots: - - from: sampleReader - - project: - mapping: - id: "{{row.SAMPLE_ID}}-0000" - submitter_id: "{{row.SAMPLE_ID}}-0000" - samples: - submitter_id: "{{row.SAMPLE_ID}}" - type: "aliquot" - - objectValidate: - title: Aliquot - schema: "{{params.schema}}" - - emit: - name: aliquot - - gene_expression: - - from: rnaReader - - project: - mapping: - id: "gexp:{{row.Entrez_Gene_Id}}" - aliquot_id: "{{row.Entrez_Gene_Id}}-0000" - - lookup: - tsv: - input: "{{params.geneTable}}" - lookup: "{{row.Entrez_Gene_Id}}" - - map: - method: nodeMap - python: | - def nodeMap(x): - values = {} - for k, v in x.items(): - if k not in ["id", "aliquot_id", "Entrez_Gene_Id"]: - values[k] = v - return { - "id": x["id"], - "aliquot_id": x["aliquot_id"], - "metric": "OTHER", - "values": values - } - - emit: - name: gene_expression - - mutations: - - from: mutationReader - - map: - method: alleleID - python: | - import hashlib - def alleleID(row): - s = "GRCh37" + row["Chromosome"] + str(row["Start_Position"]) + str(row["End_Position"]) + row["Reference_Allele"] + row["Tumor_Seq_Allele1"] - row["allele_id"] = "Allele:" + hashlib.sha1(s.encode()).hexdigest() - return row - - project: - mapping: - aliquot: "{{row.Tumor_Sample_Barcode}}-0000" - ref: "{{row.Reference_Allele}}" - alt: "{{row.Tumor_Seq_Allele1}}" - ensembl_transcript: "{{row.Transcript_ID}}" - - objectValidate: - title: SomaticVariant - schema: "{{params.schema}}" - - emit: - name: somatic_variant - - alleles: - - from: mutationReader - - project: - mapping: - genome: "{{row.NCBI_Build}}" - chromosome: "{{row.Chromosome}}" - start: "{{row.Start_Position}}" - end: "{{row.End_Position}}" - strand: "{{row.Strand}}" - reference_bases: "{{row.Reference_Allele}}" - alternate_bases: "{{row.Tumor_Seq_Allele1}}" - hugo_symbol: "{{row.Hugo_Symbol}}" - effect: "{{row.Variant_Classification}}" - - objectValidate: - title: Allele - schema: "{{params.schema}}" - - emit: - name: allele diff --git a/examples/gdc-convert.yaml b/examples/gdc-convert.yaml deleted file mode 100644 index c5c16ee..0000000 --- a/examples/gdc-convert.yaml +++ /dev/null @@ -1,98 +0,0 @@ - - -class: sifter -name: GDCConvert - -params: - schema: - type: path - default: bmeg-dictionary/gdcdictionary/schemas - -inputs: - projects_scrape: - plugin: - commandLine: docker run --rm bmeg/sifter-gdc-scan /opt/gdc-scan.py projects - cases_scrape: - plugin: - commandLine: docker run --rm bmeg/sifter-gdc-scan /opt/gdc-scan.py cases - projects_data: - jsonLoad: - input: out.projects.json - cases_data: - jsonLoad: - input: out.case.json - -pipelines: - projects: - - from: projects_data - - project: - mapping: - code: "{{row.project_id}}" - programs: "{{row.program.name}}" - - objectValidate: - title: project - schema: "{{params.schema}}" - - emit: - name: project - - experiments: - - from: projects_data - - project: - mapping: - code: "{{row.project_id}}" - programs: "{{row.program.name}}" - submitter_id: "{{row.program.name}}" - projects: "{{row.project_id}}" - type: experiment - - objectValidate: - title: experiment - schema: "{{params.schema}}" - - emit: - name: experiment - - cases: - - from: cases_data - - project: - mapping: - studies: "{{row.project.project_id}}" - experiments: "exp:{{row.project.project_id}}" - type: case - - objectValidate: - title: case - schema: "{{params.schema}}" - - emit: - name: case - - samples: - - from: cases_data - - fieldProcess: - field: samples - - project: - mapping: - type: sample - id: "{{row.sample_id}}" - - objectValidate: - title: sample - schema: "{{params.schema}}" - - emit: - name: sample - - aliquots: - - from: cases_data - - fieldProcess: - field: samples - - fieldProcess: - field: portions - - fieldProcess: - field: analytes - - fieldProcess: - field: aliquots - - project: - mapping: - type: aliquot - id: "{{row.aliquot_id}}" - - objectValidate: - title: aliquot - schema: "{{params.schema}}" - - emit: - name: aliquot diff --git a/examples/gdc-scan-docker/Dockerfile b/examples/gdc-scan-docker/Dockerfile deleted file mode 100644 index 39236f7..0000000 --- a/examples/gdc-scan-docker/Dockerfile +++ /dev/null @@ -1,4 +0,0 @@ -FROM python:3.7 - -RUN pip install requests -ADD gdc-scan.py /opt/ diff --git a/examples/gdc-scan-docker/gdc-scan.py b/examples/gdc-scan-docker/gdc-scan.py deleted file mode 100755 index b9a4cf2..0000000 --- a/examples/gdc-scan-docker/gdc-scan.py +++ /dev/null @@ -1,134 +0,0 @@ -#!/usr/bin/env python - -import json -import logging -import os -import sys - -import requests - -URL_BASE = "https://api.gdc.cancer.gov/" - -client = requests - -def query_gdc(endpoint, params): - """ - query_gdc makes a query to the GDC API while handling common issues - like pagination, retries, etc. - - The return value is an iterator. - """ - # Copy input params to avoid modification. - params = dict(params) - page_size = 100 - params['size'] = page_size - # With a GET request, the filters parameter needs to be converted - # from a dictionary to JSON-formatted string - if 'filters' in params: - params['filters'] = json.dumps(params['filters']) - - # Iterate through all the pages. - while True: - try: - req = client.get(URL_BASE + endpoint, params=params) - data = req.json() - data = data['data'] - - hits = data.get("hits", []) - if len(hits) == 0: - return - - for hit in hits: - yield hit - - # Get the next page. - params['from'] = data['pagination']['from'] + page_size - except Exception as e: - logging.warning(str(e)) - logging.warning(json.dumps(params)) - raise - - -# The GDC API requires you to request that nested fields be expanded. -# https://docs.gdc.cancer.gov/API/Users_Guide/Appendix_A_Available_Fields/#cases-field-groups -# -# Note that (as of this writing) we are expanding most but -# not all possible fields. Mostly we're skipping "files" data. -expand_case_fields = ",".join(""" -demographic -diagnoses -diagnoses.treatments -exposures -family_histories -project -project.program -samples -samples.annotations -samples.portions -samples.portions.analytes -samples.portions.analytes.aliquots -samples.portions.analytes.aliquots.annotations -samples.portions.analytes.aliquots.center -samples.portions.analytes.annotations -samples.portions.annotations -samples.portions.center -samples.portions.slides -samples.portions.slides.annotations -summary -summary.data_categories -summary.experimental_strategies -tissue_source_site -type -""".strip().split()) - -# These are the fields we want to keep from the GDC Case (BMEG Case). -keep_case_fields = """ -diagnoses -demographic -disease_type -primary_site -summary -project -""".strip().split() - -expand_project_fields = ",".join(""" -dbgap_accession_number -disease_type -name -primary_site -project_id -released -state -program -summary -""".strip().split()) - - -def scrapeProjects(): - projectOut = open("out.projects.json", "w") - for row in query_gdc("projects", {"expand": expand_project_fields}): - projectOut.write(json.dumps(row)) - projectOut.write("\n") - projectOut.close() - - -def scrapeCases(): - # Crawl all cases, samples, aliquots to generate - # BMEG Cases, Samples, and Aliquots. - parameters={} - parameters['expand'] = expand_case_fields - case_gids = [] - caseOut = open("out.case.json", "w") - - for row in query_gdc("cases", parameters): - caseOut.write(json.dumps(row)) - caseOut.write("\n") - - caseOut.close() - - -if __name__ == "__main__": - if sys.argv[1] == "projects": - scrapeProjects() - if sys.argv[1] == "cases": - scrapeCases() diff --git a/examples/gene-table.yaml b/examples/gene-table.yaml deleted file mode 100644 index 5126308..0000000 --- a/examples/gene-table.yaml +++ /dev/null @@ -1,31 +0,0 @@ - -class: sifter -name: gene-table - -params: - geneTSV: ftp://ftp.ncbi.nih.gov/gene/DATA/gene2ensembl.gz - -inputs: - geneReader: - tableLoad: - input: "{{params.geneTSV}}" - columns: - - tax_id - - GeneID - - Ensembl_gene_identifier - - RNA_nucleotide_accession.version - - Ensembl_rna_identifier - - protein_accession.version - - Ensembl_protein_identifier - -pipelines: - transform: - - from: geneReader - - filter: - field: tax_id - match: "9606" - - tableWrite: - output: gene.table - columns: - - GeneID - - Ensembl_gene_identifier diff --git a/examples/genome.yaml b/examples/genome.yaml deleted file mode 100644 index 510ee81..0000000 --- a/examples/genome.yaml +++ /dev/null @@ -1,92 +0,0 @@ - -class: sifter -name: RefGenome - -params: - gtfPath: ftp://ftp.ensembl.org/pub/grch37/release-96/gff3/homo_sapiens/Homo_sapiens.GRCh37.87.gff3.gz - schema: ./bmeg-dictionary/gdcdictionary/schemas - -inputs: - gtfReader: - tableLoad: - input: "{{params.gtfPath}}" - columns: - - seqid - - source - - type - - start - - end - - score - - strand - - phase - - attributes - -pipelines: - transform: - - from: gtfReader - - fieldParse: - field: attributes - sep: ";" - - fieldType: - start: integer - end: integer - - exons: - - from: transform - - filter: - field: type - match: exon - - regexReplace: - field: Parent - regex: "^transcript:" - replace: "" - dst: transcript_id - - project: - mapping: - exon_id: "{{row.exon_id}}" - transcript_id: ["{{row.transcript_id}}"] - - reduce: - field: exon_id - method: merge - python: | - def merge(x, y): - x['transcript_id'] = x['transcript_id'] + y['transcript_id'] - return x - - objectValidate: - title: exon - schema: "{{params.schema}}" - - emit: - name: exon - - genes: - - from: transform - - filter: - field: type - match: gene - - project: - mapping: - gene_id: "{{row.gene_id}}" - - objectValidate: - title: gene - schema: "{{params.schema}}" - - emit: - name: gene - - transcripts: - - from: transform - - filter: - field: type - match: mRNA - - regexReplace: - field: Parent - regex: "^gene:" - replace: "" - dst: gene_id - - project: - mapping: - transcript_id: "{{row.transcript_id}}" - - objectValidate: - title: transcript - schema: "{{params.schema}}" - - emit: - name: transcript diff --git a/examples/hugo-ensembl.yaml b/examples/hugo-ensembl.yaml deleted file mode 100644 index 295f68d..0000000 --- a/examples/hugo-ensembl.yaml +++ /dev/null @@ -1,25 +0,0 @@ - -class: sifter -name: hugo-ensembl - -params: - hugoJSON: ftp://ftp.ebi.ac.uk/pub/databases/genenames/hgnc/json/locus_types/gene_with_protein_product.json - -inputs: - hugoReader: - jsonLoad: - input: "{{params.hugoJSON}}" - -pipelines: - transform: - - from: hugoReader - - fieldProcess: - field: response.docs - - filter: - field: ensembl_gene_id - check: exists - - tableWrite: - output: hugo-ensembl.table - columns: - - symbol - - ensembl_gene_id diff --git a/extractors/interface.go b/extractors/interface.go index 9655dc8..bc99ae0 100644 --- a/extractors/interface.go +++ b/extractors/interface.go @@ -44,6 +44,20 @@ func (ex *Extractor) Start(t task.RuntimeTask) (chan map[string]interface{}, err return nil, fmt.Errorf(("Extractor not defined")) } +func (ex *Extractor) GetType() reflect.Type { + v := reflect.ValueOf(ex).Elem() + for i := 0; i < v.NumField(); i++ { + f := v.Field(i) + x := f.Interface() + if _, ok := x.(Source); ok { + if !f.IsNil() { + return f.Type() + } + } + } + return nil +} + func (ex *Extractor) GetRequiredParams() []config.ParamRequest { out := []config.ParamRequest{} v := reflect.ValueOf(ex).Elem() diff --git a/playbook/execute.go b/playbook/execute.go index ed126ab..518084b 100644 --- a/playbook/execute.go +++ b/playbook/execute.go @@ -1,9 +1,14 @@ package playbook import ( + "encoding/json" "fmt" + "os" "path/filepath" "strings" + "sync" + "sync/atomic" + "time" "github.com/bmeg/flame" "github.com/bmeg/sifter/logger" @@ -119,7 +124,73 @@ type joinStruct struct { proc transform.JoinProcessor } +// stepCaptureState tracks debug capture state for a single step +type stepCaptureState struct { + pipelineName string + stepIndex int + stepType string + count uint64 + limit int + file *os.File + mu sync.Mutex +} + +// captureRecord writes a debug record to the capture file +func (s *stepCaptureState) captureRecord(record map[string]any) { + var recordNum uint64 + + for { + currentCount := atomic.LoadUint64(&s.count) + + // Enforce limit strictly under concurrency + if s.limit > 0 && currentCount >= uint64(s.limit) { + return + } + + next := currentCount + 1 + if atomic.CompareAndSwapUint64(&s.count, currentCount, next) { + recordNum = next + break + } + } + + envelope := map[string]any{ + "pipeline": s.pipelineName, + "step_index": s.stepIndex, + "step_type": s.stepType, + "record_num": recordNum, + "timestamp": time.Now().UTC().Format(time.RFC3339), + "data": record, + } + + s.mu.Lock() + defer s.mu.Unlock() + + if s.file != nil { + data, err := json.Marshal(envelope) + if err == nil { + if _, writeErr := s.file.Write(data); writeErr != nil { + logger.Error("Failed to write debug record data", "error", writeErr) + return + } + if _, writeErr := s.file.Write([]byte("\n")); writeErr != nil { + logger.Error("Failed to write debug record newline", "error", writeErr) + return + } + } else { + logger.Error("Failed to marshal debug record", "error", err) + } + } +} + +// Execute runs the playbook without debug capture. +// This maintains the original public API signature for backward compatibility. func (pb *Playbook) Execute(task task.RuntimeTask) error { + return pb.ExecuteWithCapture(task, "", 0) +} + +// ExecuteWithCapture runs the playbook with optional debug capture configuration. +func (pb *Playbook) ExecuteWithCapture(task task.RuntimeTask, captureDir string, captureLimit int) error { logger.Debug("Running playbook") logger.Debug("Inputs", "config", task.GetConfig()) @@ -130,6 +201,73 @@ func (pb *Playbook) Execute(task task.RuntimeTask) error { } task.SetName(pb.Name) + procs := []transform.Processor{} + joins := []joinStruct{} + captureFiles := []*os.File{} // Track all open capture files for cleanup + defer func() { + for _, f := range captureFiles { + if f != nil { + _ = f.Close() + } + } + }() + + // Helper function to sanitize filename components + sanitizeFilename := func(s string) string { + s = strings.ReplaceAll(s, "*", "") + s = strings.ReplaceAll(s, "/", "_") + s = strings.ReplaceAll(s, "\\", "_") + s = strings.ReplaceAll(s, "transform.", "") // Remove package prefix for readability + s = strings.ReplaceAll(s, "extractors.", "") // Remove package prefix for readability + return s + } + + // Helper function to sanitize pipeline names used in filenames + sanitizePipelineName := func(s string) string { + // Use only the last path element to avoid directory traversal + s = filepath.Base(s) + + // Treat empty, current-dir, parent-dir, or bare separator as invalid and use a default + if s == "" || s == "." || s == ".." || s == string(os.PathSeparator) { + s = "pipeline" + } + + // Replace any remaining path separators with underscores + s = strings.ReplaceAll(s, string(os.PathSeparator), "_") + s = strings.ReplaceAll(s, "/", "_") + s = strings.ReplaceAll(s, "\\", "_") + + return s + } + + // Helper function to create capture state for a step + createCaptureState := func(pipelineName string, stepIndex int, stepType string) *stepCaptureState { + if captureDir == "" { + return nil + } + + filename := fmt.Sprintf("%s.%d.%s.ndjson", sanitizePipelineName(pipelineName), stepIndex, sanitizeFilename(stepType)) + filePath := filepath.Join(captureDir, filename) + + file, err := os.Create(filePath) + if err != nil { + logger.Error("Failed to create debug capture file", "path", filePath, "error", err) + return nil + } + + captureFiles = append(captureFiles, file) + logger.Debug("Created debug capture file", "path", filePath) + + return &stepCaptureState{ + pipelineName: pipelineName, + stepIndex: stepIndex, + stepType: stepType, + count: 0, + limit: captureLimit, + file: file, + } + } + outNodes := map[string]flame.Emitter[map[string]any]{} inNodes := map[string]flame.Receiver[map[string]any]{} outputs := map[string]OutputProcessor{} @@ -138,21 +276,30 @@ func (pb *Playbook) Execute(task task.RuntimeTask) error { logger.Debug("Setting Up", "name", n) s, err := v.Start(task) if err == nil { - c := flame.AddSourceChan(wf, s) - outNodes[n] = c + sourceNode := flame.AddSourceChan(wf, s) + + captureState := createCaptureState(n, 0, v.GetType().String()) + if captureState != nil { + captureMapper := flame.AddMapper(wf, func(record map[string]any) map[string]any { + captureState.captureRecord(record) + return record + }) + captureMapper.Connect(sourceNode) + outNodes[n] = captureMapper + } else { + outNodes[n] = sourceNode + } } else { logger.Error("Source error", "error", err) return err } } - procs := []transform.Processor{} - joins := []joinStruct{} - for k, v := range pb.Pipelines { var lastStep flame.Emitter[map[string]any] var firstStep flame.Receiver[map[string]any] for i, s := range v { + b, err := s.Init(task) if err != nil { logger.Error("Pipeline error", "name", k, "error", err) @@ -163,7 +310,25 @@ func (pb *Playbook) Execute(task task.RuntimeTask) error { if mProcess, ok := b.(transform.NodeProcessor); ok { logger.Debug("PipelineSetup", "name", k, "step", i, "processor", fmt.Sprintf("%T", mProcess)) - c := flame.AddFlatMapper(wf, mProcess.Process) + + // Create capture state for this step + captureState := createCaptureState(k, i, fmt.Sprintf("%T", mProcess)) + + // Wrap the process function if capture is enabled + var processFunc func(map[string]any) []map[string]any + if captureState != nil { + processFunc = func(record map[string]any) []map[string]any { + out := mProcess.Process(record) + for _, r := range out { + captureState.captureRecord(r) + } + return out + } + } else { + processFunc = mProcess.Process + } + + c := flame.AddFlatMapper(wf, processFunc) if lastStep != nil { c.Connect(lastStep) } @@ -178,12 +343,28 @@ func (pb *Playbook) Execute(task task.RuntimeTask) error { } } else if mProcess, ok := b.(transform.MapProcessor); ok { logger.Debug("Pipeline Pool", "name", k, "step", i, "processor", b) + + // Create capture state for this step + captureState := createCaptureState(k, i, fmt.Sprintf("%T", mProcess)) + + // Wrap the process function if capture is enabled + var processFunc func(map[string]any) map[string]any + if captureState != nil { + processFunc = func(record map[string]any) map[string]any { + out := mProcess.Process(record) + captureState.captureRecord(out) + return out + } + } else { + processFunc = mProcess.Process + } + var c flame.Node[map[string]any, map[string]any] if mProcess.PoolReady() { logger.Debug("Starting pool worker") - c = flame.AddMapperPool(wf, mProcess.Process, 4) // TODO: config pool count + c = flame.AddMapperPool(wf, processFunc, 4) // TODO: config pool count } else { - c = flame.AddMapper(wf, mProcess.Process) + c = flame.AddMapper(wf, processFunc) } if lastStep != nil { c.Connect(lastStep) @@ -199,12 +380,30 @@ func (pb *Playbook) Execute(task task.RuntimeTask) error { } } else if mProcess, ok := b.(transform.FlatMapProcessor); ok { logger.Debug("Pipeline flatmap", "name", k, "step", i, "processor", b) + + // Create capture state for this step + captureState := createCaptureState(k, i, fmt.Sprintf("%T", mProcess)) + + // Wrap the process function if capture is enabled + var processFunc func(map[string]any) []map[string]any + if captureState != nil { + processFunc = func(record map[string]any) []map[string]any { + out := mProcess.Process(record) + for _, r := range out { + captureState.captureRecord(r) + } + return out + } + } else { + processFunc = mProcess.Process + } + var c flame.Node[map[string]any, map[string]any] if mProcess.PoolReady() { // log.Printf("Starting pool worker") - c = flame.AddFlatMapperPool(wf, mProcess.Process, 4) // TODO: config pool count + c = flame.AddFlatMapperPool(wf, processFunc, 4) // TODO: config pool count } else { - c = flame.AddFlatMapper(wf, mProcess.Process) + c = flame.AddFlatMapper(wf, processFunc) } if lastStep != nil { c.Connect(lastStep) @@ -220,6 +419,8 @@ func (pb *Playbook) Execute(task task.RuntimeTask) error { } } else if mProcess, ok := b.(transform.StreamProcessor); ok { logger.Info("Pipeline stream %s step %d: %T", k, i, b) + // Note: StreamProcessor uses channels, not suitable for simple record capture + // Would need to wrap the entire channel processing, which is complex c := flame.AddStreamer(wf, mProcess.Process) if c != nil { if lastStep != nil { @@ -235,6 +436,8 @@ func (pb *Playbook) Execute(task task.RuntimeTask) error { } } else if jProcess, ok := b.(transform.JoinProcessor); ok { logger.Debug("Pipeline Join Step") + // Note: JoinProcessor uses channels, not suitable for simple record capture + // Would need to wrap the entire channel processing, which is complex c := flame.AddJoin(wf, jProcess.Process) if c != nil { if lastStep != nil { @@ -253,9 +456,38 @@ func (pb *Playbook) Execute(task task.RuntimeTask) error { } } else if rProcess, ok := b.(transform.ReduceProcessor); ok { logger.Debug("Pipeline reduce %s step %d: %T", k, i, b) + + // Create capture state for this step (pre-reduce) + captureStateInput := createCaptureState(k, i, fmt.Sprintf("%T-input", rProcess)) + captureStateOutput := createCaptureState(k, i, fmt.Sprintf("%T-output", rProcess)) + wrap := reduceWrapper{rProcess} - k := flame.AddMapper(wf, wrap.addKeyValue) - r := flame.AddReduceKey(wf, rProcess.Reduce, rProcess.GetInit()) + + // Wrap addKeyValue if capturing input + var addKeyValueFunc func(map[string]any) flame.KeyValue[string, map[string]any] + if captureStateInput != nil { + addKeyValueFunc = func(x map[string]any) flame.KeyValue[string, map[string]any] { + captureStateInput.captureRecord(x) + return wrap.addKeyValue(x) + } + } else { + addKeyValueFunc = wrap.addKeyValue + } + + // Wrap reduce function if capturing output + var reduceFunc func(string, map[string]any, map[string]any) map[string]any + if captureStateOutput != nil { + reduceFunc = func(key string, acc map[string]any, val map[string]any) map[string]any { + result := rProcess.Reduce(key, acc, val) + captureStateOutput.captureRecord(result) + return result + } + } else { + reduceFunc = rProcess.Reduce + } + + k := flame.AddMapper(wf, addKeyValueFunc) + r := flame.AddReduceKey(wf, reduceFunc, rProcess.GetInit()) c := flame.AddFlatMapper(wf, wrap.removeKeyValue) if lastStep != nil { k.Connect(lastStep) @@ -269,9 +501,37 @@ func (pb *Playbook) Execute(task task.RuntimeTask) error { } else if rProcess, ok := b.(transform.AccumulateProcessor); ok { logger.Debug("Pipeline accumulate %s step %d: %T", k, i, b) + // Create capture state for this step + captureStateInput := createCaptureState(k, i, fmt.Sprintf("%T-input", rProcess)) + captureStateOutput := createCaptureState(k, i, fmt.Sprintf("%T-output", rProcess)) + wrap := accumulateWrapper{rProcess} - k := flame.AddMapper(wf, wrap.addKeyValue) - r := flame.AddAccumulate(wf, rProcess.Accumulate) + + // Wrap addKeyValue if capturing input + var addKeyValueFunc func(map[string]any) flame.KeyValue[string, map[string]any] + if captureStateInput != nil { + addKeyValueFunc = func(x map[string]any) flame.KeyValue[string, map[string]any] { + captureStateInput.captureRecord(x) + return wrap.addKeyValue(x) + } + } else { + addKeyValueFunc = wrap.addKeyValue + } + + // Wrap accumulate function if capturing output + var accumulateFunc func(string, []map[string]any) map[string]any + if captureStateOutput != nil { + accumulateFunc = func(key string, vals []map[string]any) map[string]any { + result := rProcess.Accumulate(key, vals) + captureStateOutput.captureRecord(result) + return result + } + } else { + accumulateFunc = rProcess.Accumulate + } + + k := flame.AddMapper(wf, addKeyValueFunc) + r := flame.AddAccumulate(wf, accumulateFunc) c := flame.AddFlatMapper(wf, wrap.removeKeyValue) if lastStep != nil { k.Connect(lastStep) diff --git a/sifter-view/.gitignore b/sifter-view/.gitignore new file mode 100644 index 0000000..5ef6a52 --- /dev/null +++ b/sifter-view/.gitignore @@ -0,0 +1,41 @@ +# See https://help.github.com/articles/ignoring-files/ for more about ignoring files. + +# dependencies +/node_modules +/.pnp +.pnp.* +.yarn/* +!.yarn/patches +!.yarn/plugins +!.yarn/releases +!.yarn/versions + +# testing +/coverage + +# next.js +/.next/ +/out/ + +# production +/build + +# misc +.DS_Store +*.pem + +# debug +npm-debug.log* +yarn-debug.log* +yarn-error.log* +.pnpm-debug.log* + +# env files (can opt-in for committing if needed) +.env* + +# vercel +.vercel + +# typescript +*.tsbuildinfo +next-env.d.ts diff --git a/sifter-view/README.md b/sifter-view/README.md new file mode 100644 index 0000000..e215bc4 --- /dev/null +++ b/sifter-view/README.md @@ -0,0 +1,36 @@ +This is a [Next.js](https://nextjs.org) project bootstrapped with [`create-next-app`](https://nextjs.org/docs/app/api-reference/cli/create-next-app). + +## Getting Started + +First, run the development server: + +```bash +npm run dev +# or +yarn dev +# or +pnpm dev +# or +bun dev +``` + +Open [http://localhost:3000](http://localhost:3000) with your browser to see the result. + +You can start editing the page by modifying `app/page.tsx`. The page auto-updates as you edit the file. + +This project uses [`next/font`](https://nextjs.org/docs/app/building-your-application/optimizing/fonts) to automatically optimize and load [Geist](https://vercel.com/font), a new font family for Vercel. + +## Learn More + +To learn more about Next.js, take a look at the following resources: + +- [Next.js Documentation](https://nextjs.org/docs) - learn about Next.js features and API. +- [Learn Next.js](https://nextjs.org/learn) - an interactive Next.js tutorial. + +You can check out [the Next.js GitHub repository](https://github.com/vercel/next.js) - your feedback and contributions are welcome! + +## Deploy on Vercel + +The easiest way to deploy your Next.js app is to use the [Vercel Platform](https://vercel.com/new?utm_medium=default-template&filter=next.js&utm_source=create-next-app&utm_campaign=create-next-app-readme) from the creators of Next.js. + +Check out our [Next.js deployment documentation](https://nextjs.org/docs/app/building-your-application/deploying) for more details. diff --git a/sifter-view/app/favicon.ico b/sifter-view/app/favicon.ico new file mode 100644 index 0000000..718d6fe Binary files /dev/null and b/sifter-view/app/favicon.ico differ diff --git a/sifter-view/app/globals.css b/sifter-view/app/globals.css new file mode 100644 index 0000000..2e8d4fd --- /dev/null +++ b/sifter-view/app/globals.css @@ -0,0 +1,38 @@ +@import "tailwindcss"; + +:root { + --background: #ffffff; + --foreground: #171717; +} + +@theme inline { + --color-background: var(--background); + --color-foreground: var(--foreground); + --font-sans: var(--font-geist-sans); + --font-mono: var(--font-geist-mono); +} + +@media (prefers-color-scheme: dark) { + :root { + --background: #0a0a0a; + --foreground: #ededed; + } +} + +body { + background: var(--background); + color: var(--foreground); + font-family: Arial, Helvetica, sans-serif; +} + +.react-flow { + --xy-node-color: #000; + --xy-node-color-default: #000; +} + +.react-flow__node-input, +.react-flow__node-default, +.react-flow__node-output, +.react-flow__node-group { + color: #000; +} diff --git a/sifter-view/app/layout.tsx b/sifter-view/app/layout.tsx new file mode 100644 index 0000000..8b8b564 --- /dev/null +++ b/sifter-view/app/layout.tsx @@ -0,0 +1,36 @@ +import type { Metadata } from "next"; +import { Geist, Geist_Mono } from "next/font/google"; +import { MantineProvider } from "@mantine/core"; +import "@mantine/core/styles.css"; +import "./globals.css"; + +const geistSans = Geist({ + variable: "--font-geist-sans", + subsets: ["latin"], +}); + +const geistMono = Geist_Mono({ + variable: "--font-geist-mono", + subsets: ["latin"], +}); + +export const metadata: Metadata = { + title: "Create Next App", + description: "Generated by create next app", +}; + +export default function RootLayout({ + children, +}: Readonly<{ + children: React.ReactNode; +}>) { + return ( + + + {children} + + + ); +} diff --git a/sifter-view/app/page.tsx b/sifter-view/app/page.tsx new file mode 100644 index 0000000..be317a4 --- /dev/null +++ b/sifter-view/app/page.tsx @@ -0,0 +1,16 @@ + +'use client' + +import PlaybookViewer from "@/components/PlaybookViewer"; +import PlaybookFlow from "@/components/PlaybookFlow" +import Image from "next/image"; + +export default function Home() { + return ( +
+
+ +
+
+ ); +} diff --git a/sifter-view/components/PlaybookFlow.tsx b/sifter-view/components/PlaybookFlow.tsx new file mode 100644 index 0000000..8311da2 --- /dev/null +++ b/sifter-view/components/PlaybookFlow.tsx @@ -0,0 +1,387 @@ +'use client'; + +import React, { memo, useCallback, useEffect, useMemo, useState } from 'react'; + +import { + ReactFlow, + useNodesState, + useEdgesState, + addEdge, + Background, + Controls, + Handle, + Position, + NodeProps, + OnConnect, + Connection +} from "@xyflow/react"; + +// Import the necessary styles +import "@xyflow/react/dist/style.css"; + +import dagre from 'dagre'; +import type { Node, Edge } from '@xyflow/react'; +import { getFiles, getPlaybook, type Playbook, type PlaybookFileNode } from '@/lib/playbookApi'; +import { getStepCellComponent, STEP_OPERATIONS } from './playbook-steps/registry'; +import PlaybookInspectorPanel from './PlaybookInspectorPanel'; +import type { PipelineStep } from './playbook-steps/types'; +import { usePlaybookPipelineEditor } from './usePlaybookPipelineEditor'; + + +type PipelineNodeData = { + label: string; + steps: PipelineStep[]; +}; + +const BASE_NODE_WIDTH = 170; +const BASE_NODE_HEIGHT = 44; +const PIPELINE_NODE_WIDTH = 260; +const PIPELINE_CELL_HEIGHT = 34; +const PIPELINE_HEADER_HEIGHT = 36; + +const PipelineStackNode = memo(function PipelineStackNode({ data }: NodeProps) { + const typedData = data as PipelineNodeData; + const steps = typedData?.steps ?? []; + + return ( +
+ + +
+ {typedData.label} +
+ {steps.map((step: PipelineStep, index: number) => { + const CellComponent = getStepCellComponent(step.operation); + return ( + + ); + })} +
+ ); +}); + +// ------------------------------------------------------------------- +// Build a React‑Flow graph from a Playbook, using Dagre for layout +// ------------------------------------------------------------------- + +function getOutputSourcePipeline(outputDefinition: Record): string | undefined { + for (const value of Object.values(outputDefinition)) { + if (value && typeof value === 'object' && typeof value.from === 'string') { + return value.from; + } + } + return undefined; +} + +function buildGraph(pb: Playbook | null): { nodes: Node[]; edges: Edge[] } { + if (!pb || !pb.inputs || !pb.pipelines) { + return { nodes: [], edges: [] }; + } + + // Create a directed Dagre graph + const g = new dagre.graphlib.Graph({ directed: true }); + g.setGraph({ rankdir: 'LR', nodesep: 60, ranksep: 100 }); + + g.setDefaultEdgeLabel(function() { return {}; }); + + // ---- INPUT NODES --------------------------------------------------- + Object.keys(pb.inputs).forEach((name) => { + const id = `input-${name}`; + g.setNode(id, { + label: name, + width: BASE_NODE_WIDTH, + height: BASE_NODE_HEIGHT, + nodeType: 'default', + nodeData: { label: `INPUT · ${name}` }, + }); + }); + + // ---- OUTPUT NODES --------------------------------------------------- + if (pb.outputs) { + Object.keys(pb.outputs).forEach((name) => { + const id = `output-${name}`; + g.setNode(id, { + label: name, + width: BASE_NODE_WIDTH, + height: BASE_NODE_HEIGHT, + nodeType: 'default', + nodeData: { label: `OUTPUT · ${name}` }, + }); + }); + } + + // ---- PIPELINE NODES -------------------------------------------------- + Object.entries(pb.pipelines).forEach(([pipelineName, steps]) => { + const stepData: PipelineStep[] = steps.map((stepObj) => { + const [operation, config] = Object.entries(stepObj)[0] ?? ['unknown', undefined]; + return { operation, config }; + }); + const nodeHeight = PIPELINE_HEADER_HEIGHT + Math.max(1, stepData.length) * PIPELINE_CELL_HEIGHT; + + g.setNode(pipelineName, { + label: pipelineName, + width: PIPELINE_NODE_WIDTH, + height: nodeHeight, + nodeType: 'pipeline', + nodeData: { label: pipelineName, steps: stepData }, + }); + + const firstStep = steps[0] as any; + const fromName = firstStep?.from; + if (fromName) { + if (pb.inputs?.[fromName]) { + g.setEdge(`input-${fromName}`, pipelineName); + } else if (pb.pipelines?.[fromName]) { + g.setEdge(fromName, pipelineName); + } + } + }); + + if (pb.outputs) { + Object.entries(pb.outputs).forEach(([outputName, outputDefinition]) => { + const sourcePipeline = getOutputSourcePipeline(outputDefinition as Record); + if (sourcePipeline && pb.pipelines?.[sourcePipeline]) { + g.setEdge(sourcePipeline, `output-${outputName}`); + } + }); + } + + // Run the layout algorithm + dagre.layout(g); + + // Convert Dagre nodes/edges to React‑Flow structures + const nodes: Node[] = g.nodes().map((id) => { + const nodeDefinition = g.node(id) as { + x: number; + y: number; + width: number; + height: number; + label?: string; + nodeType?: string; + nodeData?: Record; + }; + + const { x, y, label, width, height, nodeType, nodeData } = nodeDefinition; + const isInputNode = id.startsWith('input-'); + const isOutputNode = id.startsWith('output-'); + + return { + id, + position: { x: x - width / 2, y: y - height / 2 }, + data: nodeData ?? { label }, + type: nodeType ?? 'default', + targetPosition: Position.Left, + sourcePosition: Position.Right, + style: nodeType === 'pipeline' + ? undefined + : { + width: BASE_NODE_WIDTH, + height: BASE_NODE_HEIGHT, + color: '#000', + fontWeight: 600, + borderWidth: 2, + borderStyle: 'solid', + borderColor: isInputNode ? '#3b82f6' : isOutputNode ? '#16a34a' : '#999', + backgroundColor: isInputNode ? '#eff6ff' : isOutputNode ? '#f0fdf4' : '#fff', + }, + } as Node; + }); + + const edges: Edge[] = g.edges().map((e) => ({ + id: `e-${e.v}-${e.w}`, + source: e.v, + target: e.w, + animated: true, + style: { + stroke: '#64748b', + strokeWidth: 2.5, + }, + })); + + return { nodes, edges }; +} + +function findFirstYamlFile(nodes: PlaybookFileNode[]): string | null { + for (const node of nodes) { + if (node.isDir && node.children) { + const childMatch = findFirstYamlFile(node.children); + if (childMatch) { + return childMatch; + } + continue; + } + + if (/\.ya?ml$/i.test(node.path)) { + return node.path; + } + } + + return null; +} + +// ------------------------------------------------------------------- +// React component – PlaybookFlow +// ------------------------------------------------------------------- + + +export default function PlaybookFlow() { + const [nodes, setNodes, onNodesChange] = useNodesState([]); + const [edges, setEdges, onEdgesChange] = useEdgesState([]); + const [playbook, setPlaybook] = useState(null); + const [selectedNodeId, setSelectedNodeId] = useState(null); + const [isLoading, setIsLoading] = useState(true); + const [loadError, setLoadError] = useState(null); + const nodeTypes = useMemo(() => ({ pipeline: PipelineStackNode }), []); + + const selectedNode = useMemo( + () => nodes.find((node) => node.id === selectedNodeId) ?? null, + [nodes, selectedNodeId] + ); + + const selectedPipelineName = useMemo(() => { + if (!selectedNodeId || !playbook) { + return null; + } + return playbook.pipelines[selectedNodeId] ? selectedNodeId : null; + }, [playbook, selectedNodeId]); + + const { + selectedPipelineSteps, + updateStepOperation, + updateStepConfig, + reorderSteps, + removeStep, + addStep, + } = usePlaybookPipelineEditor({ + playbook, + setPlaybook, + selectedPipelineName, + defaultStepOperation: STEP_OPERATIONS[0], + }); + + useEffect(() => { + let isMounted = true; + + const loadPlaybook = async () => { + try { + setLoadError(null); + setIsLoading(true); + const files = await getFiles(); + const playbookPath = findFirstYamlFile(files); + if (!playbookPath) { + throw new Error('No YAML playbook files found'); + } + + const loadedPlaybook = await getPlaybook(playbookPath); + if (!isMounted) { + return; + } + setPlaybook(loadedPlaybook); + } catch (error) { + if (!isMounted) { + return; + } + setLoadError(error instanceof Error ? error.message : 'Failed to load playbook'); + } finally { + if (isMounted) { + setIsLoading(false); + } + } + }; + + void loadPlaybook(); + + return () => { + isMounted = false; + }; + }, [setEdges, setNodes]); + + useEffect(() => { + if (!playbook) { + return; + } + + const graph = buildGraph(playbook); + setNodes(graph.nodes); + setEdges(graph.edges); + }, [playbook, setEdges, setNodes]); + + const onConnect: OnConnect = useCallback( + (connection: Connection) => setEdges((currentEdges) => addEdge(connection, currentEdges)), + [setEdges] + ); + + + if (loadError) { + return ( +
+ Failed to load playbook: {loadError} +
+ ); + } + + if (isLoading) { + return ( +
+ Loading playbook... +
+ ); + } + + return ( +
+
+ setSelectedNodeId(node.id)} + onPaneClick={() => setSelectedNodeId(null)} + fitView + > + + + +
+ +
+ ); +} \ No newline at end of file diff --git a/sifter-view/components/PlaybookInspectorPanel.tsx b/sifter-view/components/PlaybookInspectorPanel.tsx new file mode 100644 index 0000000..8740dfb --- /dev/null +++ b/sifter-view/components/PlaybookInspectorPanel.tsx @@ -0,0 +1,197 @@ +'use client'; + +import React, { useState } from 'react'; +import type { Node } from '@xyflow/react'; +import { FaTrashAlt } from 'react-icons/fa'; +import { MdDragIndicator } from 'react-icons/md'; +import { getStepEditorComponent } from './playbook-steps/registry'; +import type { PipelineStep } from './playbook-steps/types'; + +type PlaybookInspectorPanelProps = { + selectedNode: Node | null; + selectedPipelineName: string | null; + selectedPipelineSteps: PipelineStep[]; + stepOperations: readonly string[]; + onUpdateStepOperation: (index: number, operation: string) => void; + onUpdateStepConfig: (index: number, config: unknown) => void; + onReorderSteps: (fromIndex: number, toIndex: number) => void; + onRemoveStep: (index: number) => void; + onAddStep: () => void; +}; + +export default function PlaybookInspectorPanel({ + selectedNode, + selectedPipelineName, + selectedPipelineSteps, + stepOperations, + onUpdateStepOperation, + onUpdateStepConfig, + onReorderSteps, + onRemoveStep, + onAddStep, +}: PlaybookInspectorPanelProps) { + const [draggingStepIndex, setDraggingStepIndex] = useState(null); + const [dragOverStepIndex, setDragOverStepIndex] = useState(null); + + return ( + + ); +} diff --git a/sifter-view/components/PlaybookViewer.tsx b/sifter-view/components/PlaybookViewer.tsx new file mode 100644 index 0000000..9ac31b8 --- /dev/null +++ b/sifter-view/components/PlaybookViewer.tsx @@ -0,0 +1,110 @@ +"use client"; + +import React, { useEffect, useMemo, useState } from "react"; +import { Tree, type TreeNodeData, useTree } from "@mantine/core"; +import { FiChevronRight as IconChevronRight } from "react-icons/fi"; +import { getFiles, getPlaybook, type PlaybookFileNode } from "@/lib/playbookApi"; +import PlaybookFlow from "./PlaybookFlow"; + +// Types for API responses +type FileTree = PlaybookFileNode[]; + +export default function PlaybookViewer() { + const [playbooks, setPlaybooks] = useState([]); + const tree = useTree(); + + const toTreeData = (nodes: FileTree): TreeNodeData[] => + nodes.map((node) => ({ + value: node.path, + label: node.name, + children: node.children ? toTreeData(node.children) : undefined, + })); + + const indexNodes = (nodes: FileTree): Record => { + const indexed: Record = {}; + const walk = (items: FileTree) => { + items.forEach((item) => { + indexed[item.path] = item; + if (item.children && item.children.length > 0) { + walk(item.children); + } + }); + }; + walk(nodes); + return indexed; + }; + + const treeData = useMemo(() => toTreeData(playbooks), [playbooks]); + const playbookIndex = useMemo(() => indexNodes(playbooks), [playbooks]); + + // Load list of playbooks on mount + useEffect(() => { + getFiles() + .then((data: FileTree) => setPlaybooks(data)) + .catch((err) => console.error(err)); + }, []); + + // Load selected playbook content + const loadPlaybook = (name: string) => { + getPlaybook(name) + .then((playbook) => { + console.log("Loaded playbook", playbook); + }) + .catch((err) => console.error(err)); + }; + + const handleNodeClick = (value: string) => { + const node = playbookIndex[value]; + if (!node || node.isDir) { + return; + } + + if (/\.ya?ml$/i.test(node.path)) { + loadPlaybook(node.path); + } + }; + + return ( +
+ {/* Sidebar */} + + + {/* Content */} +
+ +
+
+ ); +} diff --git a/sifter-view/components/playbook-steps/cells/BaseStepCell.tsx b/sifter-view/components/playbook-steps/cells/BaseStepCell.tsx new file mode 100644 index 0000000..206908e --- /dev/null +++ b/sifter-view/components/playbook-steps/cells/BaseStepCell.tsx @@ -0,0 +1,45 @@ +import type { StepCellProps } from '../types'; +import { toStepLabel } from './utils'; + +const PIPELINE_CELL_HEIGHT = 34; + +type BaseStepCellProps = StepCellProps & { + secondaryText?: string; +}; + +export function BaseStepCell({ step, index, isLast, secondaryText }: BaseStepCellProps) { + return ( +
+
+ {index + 1}. + {step.operation} +
+ {secondaryText ? ( + + {secondaryText} + + ) : null} +
+ ); +} diff --git a/sifter-view/components/playbook-steps/cells/FieldProcessStepCell.tsx b/sifter-view/components/playbook-steps/cells/FieldProcessStepCell.tsx new file mode 100644 index 0000000..59cffca --- /dev/null +++ b/sifter-view/components/playbook-steps/cells/FieldProcessStepCell.tsx @@ -0,0 +1,10 @@ +import type { StepCellProps } from '../types'; +import { isRecord } from './utils'; +import { BaseStepCell } from './BaseStepCell'; + +export function FieldProcessStepCell(props: StepCellProps) { + const config = props.step.config; + const field = isRecord(config) && typeof config.field === 'string' ? config.field : undefined; + + return ; +} diff --git a/sifter-view/components/playbook-steps/cells/FromStepCell.tsx b/sifter-view/components/playbook-steps/cells/FromStepCell.tsx new file mode 100644 index 0000000..cea69fe --- /dev/null +++ b/sifter-view/components/playbook-steps/cells/FromStepCell.tsx @@ -0,0 +1,13 @@ +import type { StepCellProps } from '../types'; +import { isRecord } from './utils'; +import { BaseStepCell } from './BaseStepCell'; + +export function FromStepCell(props: StepCellProps) { + const source = typeof props.step.config === 'string' + ? props.step.config + : isRecord(props.step.config) && typeof props.step.config.from === 'string' + ? props.step.config.from + : undefined; + + return ; +} diff --git a/sifter-view/components/playbook-steps/cells/GenericStepCell.tsx b/sifter-view/components/playbook-steps/cells/GenericStepCell.tsx new file mode 100644 index 0000000..b587fdd --- /dev/null +++ b/sifter-view/components/playbook-steps/cells/GenericStepCell.tsx @@ -0,0 +1,7 @@ +import type { StepCellProps } from '../types'; +import { getStepSummaryValue } from './utils'; +import { BaseStepCell } from './BaseStepCell'; + +export function GenericStepCell(props: StepCellProps) { + return ; +} diff --git a/sifter-view/components/playbook-steps/cells/MapStepCell.tsx b/sifter-view/components/playbook-steps/cells/MapStepCell.tsx new file mode 100644 index 0000000..6fbfcb5 --- /dev/null +++ b/sifter-view/components/playbook-steps/cells/MapStepCell.tsx @@ -0,0 +1,10 @@ +import type { StepCellProps } from '../types'; +import { isRecord } from './utils'; +import { BaseStepCell } from './BaseStepCell'; + +export function MapStepCell(props: StepCellProps) { + const config = props.step.config; + const engine = isRecord(config) && typeof config.engine === 'string' ? config.engine : undefined; + + return ; +} diff --git a/sifter-view/components/playbook-steps/cells/ObjectValidateStepCell.tsx b/sifter-view/components/playbook-steps/cells/ObjectValidateStepCell.tsx new file mode 100644 index 0000000..a649b82 --- /dev/null +++ b/sifter-view/components/playbook-steps/cells/ObjectValidateStepCell.tsx @@ -0,0 +1,10 @@ +import type { StepCellProps } from '../types'; +import { isRecord } from './utils'; +import { BaseStepCell } from './BaseStepCell'; + +export function ObjectValidateStepCell(props: StepCellProps) { + const config = props.step.config; + const schema = isRecord(config) && typeof config.schema === 'string' ? config.schema : undefined; + + return ; +} diff --git a/sifter-view/components/playbook-steps/cells/ProjectStepCell.tsx b/sifter-view/components/playbook-steps/cells/ProjectStepCell.tsx new file mode 100644 index 0000000..3d6d034 --- /dev/null +++ b/sifter-view/components/playbook-steps/cells/ProjectStepCell.tsx @@ -0,0 +1,10 @@ +import type { StepCellProps } from '../types'; +import { isRecord } from './utils'; +import { BaseStepCell } from './BaseStepCell'; + +export function ProjectStepCell(props: StepCellProps) { + const config = props.step.config; + const projectionCount = isRecord(config) ? Object.keys(config).length : undefined; + + return ; +} diff --git a/sifter-view/components/playbook-steps/cells/index.ts b/sifter-view/components/playbook-steps/cells/index.ts new file mode 100644 index 0000000..f6d02dc --- /dev/null +++ b/sifter-view/components/playbook-steps/cells/index.ts @@ -0,0 +1,6 @@ +export { GenericStepCell } from './GenericStepCell'; +export { FromStepCell } from './FromStepCell'; +export { ProjectStepCell } from './ProjectStepCell'; +export { MapStepCell } from './MapStepCell'; +export { FieldProcessStepCell } from './FieldProcessStepCell'; +export { ObjectValidateStepCell } from './ObjectValidateStepCell'; diff --git a/sifter-view/components/playbook-steps/cells/utils.ts b/sifter-view/components/playbook-steps/cells/utils.ts new file mode 100644 index 0000000..d4ba609 --- /dev/null +++ b/sifter-view/components/playbook-steps/cells/utils.ts @@ -0,0 +1,22 @@ +export function isRecord(value: unknown): value is Record { + return value !== null && typeof value === 'object' && !Array.isArray(value); +} + +export function toStepLabel(operation: string): string { + return operation + .replace(/([a-z0-9])([A-Z])/g, '$1 $2') + .replace(/^./, (x) => x.toUpperCase()); +} + +export function getStepSummaryValue(config: unknown): string | undefined { + if (typeof config === 'string' || typeof config === 'number' || typeof config === 'boolean') { + return String(config); + } + if (Array.isArray(config)) { + return `${config.length} items`; + } + if (isRecord(config)) { + return `${Object.keys(config).length} fields`; + } + return undefined; +} \ No newline at end of file diff --git a/sifter-view/components/playbook-steps/editors/FilterStepEditor.tsx b/sifter-view/components/playbook-steps/editors/FilterStepEditor.tsx new file mode 100644 index 0000000..a84b1a5 --- /dev/null +++ b/sifter-view/components/playbook-steps/editors/FilterStepEditor.tsx @@ -0,0 +1,249 @@ +import React, { useEffect, useMemo, useState } from 'react'; + +import type { StepEditorProps } from '../types'; + +type FilterCheck = '' | 'exists' | 'hasValue' | 'not'; +type ScriptKind = 'python' | 'gpython'; + +function asObject(value: unknown): Record { + if (value && typeof value === 'object' && !Array.isArray(value)) { + return value as Record; + } + return {}; +} + +function toDisplayString(value: unknown): string { + if (typeof value === 'string') { + return value; + } + if (value === undefined || value === null) { + return ''; + } + try { + return JSON.stringify(value, null, 2); + } catch { + return String(value); + } +} + +export function FilterStepEditor({ step, onChangeConfig }: StepEditorProps) { + const configObject = useMemo(() => asObject(step.config), [step.config]); + + const initialField = typeof configObject.field === 'string' ? configObject.field : ''; + const initialValue = typeof configObject.value === 'string' ? configObject.value : ''; + const initialMatch = typeof configObject.match === 'string' ? configObject.match : ''; + const initialCheck = + configObject.check === 'exists' || configObject.check === 'hasValue' || configObject.check === 'not' + ? configObject.check + : ''; + const initialMethod = typeof configObject.method === 'string' ? configObject.method : ''; + const hasPython = typeof configObject.python === 'string' && configObject.python.length > 0; + const initialScriptKind: ScriptKind = hasPython ? 'python' : 'gpython'; + const initialScript = toDisplayString(configObject[initialScriptKind]); + + const [field, setField] = useState(initialField); + const [value, setValue] = useState(initialValue); + const [match, setMatch] = useState(initialMatch); + const [check, setCheck] = useState(initialCheck); + const [method, setMethod] = useState(initialMethod); + const [scriptKind, setScriptKind] = useState(initialScriptKind); + const [script, setScript] = useState(initialScript); + + useEffect(() => { + setField(initialField); + setValue(initialValue); + setMatch(initialMatch); + setCheck(initialCheck); + setMethod(initialMethod); + setScriptKind(initialScriptKind); + setScript(initialScript); + }, [initialField, initialValue, initialMatch, initialCheck, initialMethod, initialScriptKind, initialScript]); + + const commit = (next: { + fieldValue: string; + valueValue: string; + matchValue: string; + checkValue: FilterCheck; + methodValue: string; + scriptEngine: ScriptKind; + scriptValue: string; + }) => { + const nextConfig: Record = { + ...configObject, + field: next.fieldValue, + value: next.valueValue, + match: next.matchValue, + check: next.checkValue, + method: next.methodValue, + python: undefined, + gpython: undefined, + }; + + if (next.scriptValue.trim() !== '') { + nextConfig[next.scriptEngine] = next.scriptValue; + } + + onChangeConfig(nextConfig); + }; + + const commitCurrent = () => { + commit({ + fieldValue: field, + valueValue: value, + matchValue: match, + checkValue: check, + methodValue: method, + scriptEngine: scriptKind, + scriptValue: script, + }); + }; + + return ( +
+
+
+ + setField(event.target.value)} + onBlur={commitCurrent} + style={{ + width: '100%', + color: '#000', + background: '#fff', + border: '1px solid #cbd5e1', + borderRadius: 4, + padding: '6px 8px', + }} + /> +
+
+ + +
+
+ +
+
+ + setValue(event.target.value)} + onBlur={commitCurrent} + style={{ + width: '100%', + color: '#000', + background: '#fff', + border: '1px solid #cbd5e1', + borderRadius: 4, + padding: '6px 8px', + }} + /> +
+
+ + setMatch(event.target.value)} + onBlur={commitCurrent} + style={{ + width: '100%', + color: '#000', + background: '#fff', + border: '1px solid #cbd5e1', + borderRadius: 4, + padding: '6px 8px', + }} + /> +
+
+ +
+
+ + setMethod(event.target.value)} + onBlur={commitCurrent} + style={{ + width: '100%', + color: '#000', + background: '#fff', + border: '1px solid #cbd5e1', + borderRadius: 4, + padding: '6px 8px', + }} + /> +
+
+ + +
+
+ +
+ +