Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 178 additions & 17 deletions internal/commands/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"io"
"net/url"
"os"
"regexp"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -92,13 +94,17 @@ func printQueryHelp(w io.Writer) {
_, _ = fmt.Fprintln(w, " topline query explain --tables contacts,opportunities")
_, _ = fmt.Fprintln(w, " topline query sql --sql 'SELECT COUNT(*) AS n FROM contacts'")
_, _ = fmt.Fprintln(w, " topline query freshness")
_, _ = fmt.Fprintln(w, " topline query snapshot --pipeline <id>")
_, _ = fmt.Fprintln(w, " topline query audit --pipeline <id> [--since this-week-et] [--status open]")
_, _ = fmt.Fprintln(w, " topline query snapshot --pipeline <id_or_name>")
_, _ = fmt.Fprintln(w, " topline query audit --pipeline <id_or_name> [--since this-week-et] [--status open]")
_, _ = fmt.Fprintln(w, "")
_, _ = fmt.Fprintln(w, "Composite commands (Phase 3): one CLI call wraps the warehouse views")
_, _ = fmt.Fprintln(w, "shipped in Topline-com/os-mcp#1. Use these for pipeline audits instead")
_, _ = fmt.Fprintln(w, "of hand-stitched CTEs.")
_, _ = fmt.Fprintln(w, "")
_, _ = fmt.Fprintln(w, "--pipeline accepts an opaque 20-char ID (e.g. CLUy1QapsrEeBiNrmQiL)")
_, _ = fmt.Fprintln(w, "or a fuzzy name (e.g. 'flex triage'). On ambiguous or missing names")
_, _ = fmt.Fprintln(w, "the CLI errors with the list of available pipelines.")
_, _ = fmt.Fprintln(w, "")
_, _ = fmt.Fprintln(w, "Environment:")
_, _ = fmt.Fprintln(w, " TOPLINE_QUERY_TOKEN Connection-bound token from https://os-mcp.topline.com/connect")
_, _ = fmt.Fprintln(w, " TOPLINE_QUERY_BASE_URL Defaults to https://os-mcp.topline.com")
Expand Down Expand Up @@ -260,10 +266,15 @@ func runQueryFreshness(ctx context.Context, client *topline.QueryClient, stdout
}

func runQuerySnapshot(ctx context.Context, client *topline.QueryClient, flags map[string]string, stdout io.Writer, globals globalOptions) error {
pipelineID := strings.TrimSpace(firstNonEmptyQueryFlag(flags["pipeline"], flags["pipelineId"], flags["pipelineID"]))
if pipelineID == "" {
return errors.New("usage: topline query snapshot --pipeline <pipeline_id>")
rawPipeline := strings.TrimSpace(firstNonEmptyQueryFlag(flags["pipeline"], flags["pipelineId"], flags["pipelineID"]))
if rawPipeline == "" {
return errors.New("usage: topline query snapshot --pipeline <pipeline_id_or_name>")
}
resolution, err := resolvePipelineID(ctx, client, rawPipeline)
if err != nil {
return err
}
pipelineID := resolution.MatchedID
status := strings.TrimSpace(flags["status"])
if status == "" {
status = "open"
Expand All @@ -274,18 +285,29 @@ func runQuerySnapshot(ctx context.Context, client *topline.QueryClient, flags ma
"FROM pipeline_snapshot WHERE pipeline_id = %s%s ORDER BY stage_position",
sqlString(pipelineID), queryStatusClause("opportunity_status", status),
)
result, err := executeSQL(ctx, client, sqlText)
rows, err := executeSQL(ctx, client, sqlText)
if err != nil {
return err
}
result := map[string]any{
"pipelineId": pipelineID,
"pipelineResolution": resolution,
"status": status,
"snapshot": rows,
}
return output.WriteJSON(stdout, result, globals.MaskPII)
}

func runQueryAudit(ctx context.Context, client *topline.QueryClient, flags map[string]string, stdout io.Writer, globals globalOptions) error {
pipelineID := strings.TrimSpace(firstNonEmptyQueryFlag(flags["pipeline"], flags["pipelineId"], flags["pipelineID"]))
if pipelineID == "" {
return errors.New("usage: topline query audit --pipeline <pipeline_id> [--since this-week-et] [--status open]")
rawPipeline := strings.TrimSpace(firstNonEmptyQueryFlag(flags["pipeline"], flags["pipelineId"], flags["pipelineID"]))
if rawPipeline == "" {
return errors.New("usage: topline query audit --pipeline <pipeline_id_or_name> [--since this-week-et] [--status open]")
}
resolution, err := resolvePipelineID(ctx, client, rawPipeline)
if err != nil {
return err
}
pipelineID := resolution.MatchedID
start, err := parseAuditTime(flags["since"], time.Now().AddDate(0, 0, -7))
if err != nil {
return err
Expand Down Expand Up @@ -365,18 +387,157 @@ func runQueryAudit(ctx context.Context, client *topline.QueryClient, flags map[s
}

result := map[string]any{
"pipelineId": pipelineID,
"window": map[string]string{"since": since, "until": until},
"status": status,
"freshness": freshness,
"snapshot": snapshot,
"activity": activity,
"deals": deals,
"movement": movement,
"pipelineId": pipelineID,
"pipelineResolution": resolution,
"window": map[string]string{"since": since, "until": until},
"status": status,
"freshness": freshness,
"snapshot": snapshot,
"activity": activity,
"deals": deals,
"movement": movement,
}
return output.WriteJSON(stdout, result, globals.MaskPII)
}

// pipelineIDPattern matches an opaque 20-char alphanumeric warehouse pipeline ID
// (e.g. "CLUy1QapsrEeBiNrmQiL", "bna6e9DoPgRchNsjeYS3"). Names are typically
// multi-word with spaces/punctuation, so this regex distinguishes the two
// without an extra HTTP round-trip.
var pipelineIDPattern = regexp.MustCompile(`^[A-Za-z0-9]{20}$`)

// PipelineResolution surfaces how `--pipeline` was interpreted so the agent
// can see whether it passed an opaque ID directly or matched a name.
type PipelineResolution struct {
Input string `json:"input"`
MatchedID string `json:"matchedId"`
MatchedName string `json:"matchedName,omitempty"`
MatchedBy string `json:"matchedBy"` // "id" | "name"
CandidateCnt int `json:"candidateCount,omitempty"`
}

// resolvePipelineID converts the --pipeline flag value into an opaque pipeline
// ID. If the input already matches the opaque-ID shape it is returned as-is;
// otherwise a case-insensitive LIKE lookup against the `pipelines` table is
// run. On 0 or >1 matches the error message lists candidate pipelines so the
// caller (agent or human) can disambiguate without dropping to raw SQL.
func resolvePipelineID(ctx context.Context, client *topline.QueryClient, input string) (PipelineResolution, error) {
input = strings.TrimSpace(input)
if input == "" {
return PipelineResolution{}, errors.New("--pipeline is required (id or name)")
}
if pipelineIDPattern.MatchString(input) {
return PipelineResolution{Input: input, MatchedID: input, MatchedBy: "id"}, nil
}
tokens := strings.Fields(strings.ToLower(input))
if len(tokens) == 0 {
return PipelineResolution{}, errors.New("--pipeline value is empty after whitespace trim")
}
clauses := make([]string, len(tokens))
for i, tok := range tokens {
clauses[i] = fmt.Sprintf("LOWER(name) LIKE %s", sqlString("%"+tok+"%"))
}
matchSQL := fmt.Sprintf(
"SELECT id, name FROM pipelines WHERE %s ORDER BY name",
strings.Join(clauses, " AND "),
)
matches, err := selectIDNamePairs(ctx, client, matchSQL)
if err != nil {
return PipelineResolution{}, fmt.Errorf("pipeline name resolution failed for %q: %w", input, err)
}
switch len(matches) {
case 1:
return PipelineResolution{
Input: input,
MatchedID: matches[0].ID,
MatchedName: matches[0].Name,
MatchedBy: "name",
}, nil
case 0:
all, listErr := selectIDNamePairs(ctx, client, "SELECT id, name FROM pipelines ORDER BY name")
if listErr != nil {
return PipelineResolution{}, fmt.Errorf("no pipeline matched %q (and listing failed: %v)", input, listErr)
}
return PipelineResolution{}, fmt.Errorf(
"no pipeline matched %q. Available pipelines: %s",
input, formatPipelineList(all),
)
default:
return PipelineResolution{}, fmt.Errorf(
"pipeline %q is ambiguous (%d matches). Pass the opaque id or a more specific name. Candidates: %s",
input, len(matches), formatPipelineList(matches),
)
}
}

type idNamePair struct {
ID string
Name string
}

func selectIDNamePairs(ctx context.Context, client *topline.QueryClient, sql string) ([]idNamePair, error) {
raw, err := executeSQL(ctx, client, sql)
if err != nil {
return nil, err
}
columns := stringSliceFromAny(raw["columns"])
idIdx, nameIdx := -1, -1
for i, col := range columns {
switch strings.ToLower(col) {
case "id":
idIdx = i
case "name":
nameIdx = i
}
}
if idIdx == -1 || nameIdx == -1 {
return nil, fmt.Errorf("pipeline lookup response missing id/name columns (got %v)", columns)
}
rowsAny, _ := raw["rows"].([]any)
out := make([]idNamePair, 0, len(rowsAny))
for _, r := range rowsAny {
row, ok := r.([]any)
if !ok || len(row) <= idIdx || len(row) <= nameIdx {
continue
}
id, _ := row[idIdx].(string)
name, _ := row[nameIdx].(string)
id = strings.TrimSpace(id)
name = strings.TrimSpace(name)
if id == "" {
continue
}
out = append(out, idNamePair{ID: id, Name: name})
}
sort.SliceStable(out, func(i, j int) bool { return strings.ToLower(out[i].Name) < strings.ToLower(out[j].Name) })
return out, nil
}

func stringSliceFromAny(v any) []string {
raw, ok := v.([]any)
if !ok {
return nil
}
out := make([]string, 0, len(raw))
for _, entry := range raw {
if s, ok := entry.(string); ok {
out = append(out, s)
}
}
return out
}

func formatPipelineList(pairs []idNamePair) string {
if len(pairs) == 0 {
return "(none found)"
}
parts := make([]string, len(pairs))
for i, p := range pairs {
parts[i] = fmt.Sprintf("%s (%s)", p.Name, p.ID)
}
return strings.Join(parts, ", ")
}

// sqlString quotes a value as a SQLite single-quoted literal. Used because the
// query API doesn't yet accept bind parameters — pipeline IDs are caller-controlled
// CRM identifiers, not user input, but we still escape embedded single-quotes.
Expand Down
Loading
Loading