Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 170 additions & 0 deletions internal/commands/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/url"
"os"
"strings"
"time"

"github.com/Topline-com/os-cli/internal/output"
"github.com/Topline-com/os-cli/internal/topline"
Expand Down Expand Up @@ -66,6 +67,12 @@ func runQueryCommand(args []string, stdout io.Writer, globals globalOptions) err
return errors.New("usage: topline query sql --sql 'SELECT COUNT(*) FROM contacts' [--url https://os-mcp.topline.com]")
}
err = client.PostJSON(ctx, "/query/api/execute-sql", map[string]any{"sql": sqlText}, &result)
case "freshness":
return runQueryFreshness(ctx, client, stdout, globals)
case "snapshot":
return runQuerySnapshot(ctx, client, flags, stdout, globals)
case "audit":
return runQueryAudit(ctx, client, flags, stdout, globals)
default:
return fmt.Errorf("unknown query command %q; try topline query help", subcommand)
}
Expand All @@ -84,6 +91,13 @@ func printQueryHelp(w io.Writer) {
_, _ = fmt.Fprintln(w, " topline query catalog")
_, _ = fmt.Fprintln(w, " topline query explain --tables contacts,opportunities")
_, _ = fmt.Fprintln(w, " topline query sql --sql 'SELECT COUNT(*) AS n FROM contacts'")
_, _ = fmt.Fprintln(w, " topline query freshness")
_, _ = fmt.Fprintln(w, " topline query snapshot --pipeline <id>")
_, _ = fmt.Fprintln(w, " topline query audit --pipeline <id> [--since this-week-et] [--status open]")
_, _ = fmt.Fprintln(w, "")
_, _ = fmt.Fprintln(w, "Composite commands (Phase 3): one CLI call wraps the warehouse views")
_, _ = fmt.Fprintln(w, "shipped in Topline-com/os-mcp#1. Use these for pipeline audits instead")
_, _ = fmt.Fprintln(w, "of hand-stitched CTEs.")
_, _ = fmt.Fprintln(w, "")
_, _ = fmt.Fprintln(w, "Environment:")
_, _ = fmt.Fprintln(w, " TOPLINE_QUERY_TOKEN Connection-bound token from https://os-mcp.topline.com/connect")
Expand Down Expand Up @@ -221,3 +235,159 @@ func firstNonEmptyQueryFlag(values ...string) string {
}
return ""
}

// executeSQL runs one SQL against /query/api/execute-sql and returns the parsed
// response shape as map[string]any. Used by the composite commands to wrap the
// warehouse views shipped in Topline-com/os-mcp#1.
func executeSQL(ctx context.Context, client *topline.QueryClient, sql string) (map[string]any, error) {
var raw any
if err := client.PostJSON(ctx, "/query/api/execute-sql", map[string]any{"sql": sql}, &raw); err != nil {
return nil, err
}
if m, ok := raw.(map[string]any); ok {
return m, nil
}
return map[string]any{"raw": raw}, nil
}

func runQueryFreshness(ctx context.Context, client *topline.QueryClient, stdout io.Writer, globals globalOptions) error {
const sqlText = "SELECT table_name, row_count, last_synced_at, lag_seconds FROM warehouse_freshness ORDER BY table_name"
result, err := executeSQL(ctx, client, sqlText)
if err != nil {
return err
}
return output.WriteJSON(stdout, result, globals.MaskPII)
}

func runQuerySnapshot(ctx context.Context, client *topline.QueryClient, flags map[string]string, stdout io.Writer, globals globalOptions) error {
pipelineID := strings.TrimSpace(firstNonEmptyQueryFlag(flags["pipeline"], flags["pipelineId"], flags["pipelineID"]))
if pipelineID == "" {
return errors.New("usage: topline query snapshot --pipeline <pipeline_id>")
}
status := strings.TrimSpace(flags["status"])
if status == "" {
status = "open"
}
sqlText := fmt.Sprintf(
"SELECT pipeline_id, pipeline_name, pipeline_stage_id, stage_name, stage_position, opportunity_status, "+
"opportunity_count, pipeline_value, CAST(avg_days_in_stage AS INTEGER) AS avg_days_in_stage "+
"FROM pipeline_snapshot WHERE pipeline_id = %s%s ORDER BY stage_position",
sqlString(pipelineID), queryStatusClause("opportunity_status", status),
)
result, err := executeSQL(ctx, client, sqlText)
if err != nil {
return err
}
return output.WriteJSON(stdout, result, globals.MaskPII)
}

func runQueryAudit(ctx context.Context, client *topline.QueryClient, flags map[string]string, stdout io.Writer, globals globalOptions) error {
pipelineID := strings.TrimSpace(firstNonEmptyQueryFlag(flags["pipeline"], flags["pipelineId"], flags["pipelineID"]))
if pipelineID == "" {
return errors.New("usage: topline query audit --pipeline <pipeline_id> [--since this-week-et] [--status open]")
}
start, err := parseAuditTime(flags["since"], time.Now().AddDate(0, 0, -7))
if err != nil {
return err
}
end, err := parseAuditTime(flags["until"], time.Now())
if err != nil {
return err
}
since := start.UTC().Format(time.RFC3339)
until := end.UTC().Format(time.RFC3339)
status := strings.TrimSpace(flags["status"])
if status == "" {
status = "open"
}
dealLimit := parsePositiveInt(flags["limit"], 25)
if dealLimit > 100 {
dealLimit = 100
}
statusClause := queryStatusClause("opportunity_status", status)

freshnessSQL := "SELECT table_name, row_count, last_synced_at, lag_seconds FROM warehouse_freshness ORDER BY table_name"
snapshotSQL := fmt.Sprintf(
"SELECT pipeline_id, pipeline_name, pipeline_stage_id, stage_name, stage_position, opportunity_status, "+
"opportunity_count, pipeline_value, CAST(avg_days_in_stage AS INTEGER) AS avg_days_in_stage "+
"FROM pipeline_snapshot WHERE pipeline_id = %s%s ORDER BY stage_position",
sqlString(pipelineID), statusClause,
)
activitySQL := fmt.Sprintf(
"SELECT activity_class, direction, COUNT(DISTINCT source_id) AS unique_touches, "+
"COUNT(DISTINCT opportunity_id) AS opportunities_touched, COUNT(DISTINCT contact_id) AS contacts_touched, "+
"MIN(event_at) AS first_touch, MAX(event_at) AS last_touch "+
"FROM pipeline_activity_window WHERE pipeline_id = %s%s AND event_at >= %s AND event_at <= %s "+
"GROUP BY activity_class, direction ORDER BY unique_touches DESC",
sqlString(pipelineID), statusClause, sqlString(since), sqlString(until),
)
dealsSQL := fmt.Sprintf(
"SELECT opportunity_id, opportunity_name, contact_id, pipeline_stage_id, owner_user_id, ROUND(monetary_value, 2) AS monetary_value, "+
"COUNT(DISTINCT source_id) AS unique_touches, "+
"COUNT(DISTINCT CASE WHEN activity_class = 'message' THEN source_id END) AS message_touches, "+
"COUNT(DISTINCT CASE WHEN activity_class = 'call' THEN source_id END) AS call_touches, "+
"COUNT(DISTINCT CASE WHEN activity_class = 'appointment' THEN source_id END) AS appointment_touches, "+
"COUNT(DISTINCT CASE WHEN direction = 'inbound' THEN source_id END) AS inbound_touches, "+
"COUNT(DISTINCT CASE WHEN direction = 'outbound' THEN source_id END) AS outbound_touches, "+
"MIN(event_at) AS first_touch, MAX(event_at) AS last_touch "+
"FROM pipeline_activity_window WHERE pipeline_id = %s%s AND event_at >= %s AND event_at <= %s "+
"GROUP BY opportunity_id, opportunity_name, contact_id, pipeline_stage_id, owner_user_id, monetary_value "+
"ORDER BY unique_touches DESC, monetary_value DESC LIMIT %d",
sqlString(pipelineID), statusClause, sqlString(since), sqlString(until), dealLimit,
)
movementSQL := fmt.Sprintf(
"SELECT opportunity_id, opportunity_name, contact_id, pipeline_stage_id, stage_name, opportunity_status, monetary_value, "+
"last_movement_at, last_movement_kind "+
"FROM pipeline_movement_window WHERE pipeline_id = %s%s AND last_movement_at >= %s AND last_movement_at <= %s "+
"ORDER BY last_movement_at DESC",
sqlString(pipelineID), statusClause, sqlString(since), sqlString(until),
)

freshness, err := executeSQL(ctx, client, freshnessSQL)
if err != nil {
return err
}
snapshot, err := executeSQL(ctx, client, snapshotSQL)
if err != nil {
return err
}
activity, err := executeSQL(ctx, client, activitySQL)
if err != nil {
return err
}
deals, err := executeSQL(ctx, client, dealsSQL)
if err != nil {
return err
}
movement, err := executeSQL(ctx, client, movementSQL)
if err != nil {
return err
}

result := map[string]any{
"pipelineId": pipelineID,
"window": map[string]string{"since": since, "until": until},
"status": status,
"freshness": freshness,
"snapshot": snapshot,
"activity": activity,
"deals": deals,
"movement": movement,
}
return output.WriteJSON(stdout, result, globals.MaskPII)
}

// sqlString quotes a value as a SQLite single-quoted literal. Used because the
// query API doesn't yet accept bind parameters — pipeline IDs are caller-controlled
// CRM identifiers, not user input, but we still escape embedded single-quotes.
func sqlString(v string) string {
return "'" + strings.ReplaceAll(v, "'", "''") + "'"
}

func queryStatusClause(column string, status string) string {
trimmed := strings.TrimSpace(status)
if trimmed == "" || strings.EqualFold(trimmed, "all") || strings.EqualFold(trimmed, "any") {
return ""
}
return fmt.Sprintf(" AND %s = %s", column, sqlString(trimmed))
}
132 changes: 132 additions & 0 deletions internal/commands/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,135 @@ func TestQueryDoctorFlagsMissingTables(t *testing.T) {
t.Fatalf("recommendation should call out coverage gap; got %q", rec)
}
}

func TestQueryFreshnessHitsExecuteSQLEndpoint(t *testing.T) {
t.Setenv("TOPLINE_QUERY_TOKEN", "signed-query-token")

var gotSQL string
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/query/api/execute-sql" {
t.Fatalf("path = %q", r.URL.Path)
}
var body struct {
SQL string `json:"sql"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
t.Fatalf("decode body: %v", err)
}
gotSQL = body.SQL
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"columns":["table_name","row_count","last_synced_at","lag_seconds"],"rows":[]}`))
}))
defer server.Close()
t.Setenv("TOPLINE_QUERY_BASE_URL", server.URL)

var stdout bytes.Buffer
if err := Execute([]string{"query", "freshness"}, &stdout, io.Discard); err != nil {
t.Fatalf("Execute returned error: %v", err)
}
if !strings.Contains(gotSQL, "FROM warehouse_freshness") {
t.Fatalf("freshness SQL did not reference warehouse_freshness view; got %q", gotSQL)
}
}

func TestQuerySnapshotRequiresPipeline(t *testing.T) {
t.Setenv("TOPLINE_QUERY_TOKEN", "signed-query-token")
t.Setenv("TOPLINE_QUERY_BASE_URL", "http://127.0.0.1:1")

var stdout bytes.Buffer
err := Execute([]string{"query", "snapshot"}, &stdout, io.Discard)
if err == nil || !strings.Contains(err.Error(), "--pipeline") {
t.Fatalf("expected --pipeline usage error, got %v", err)
}
}

func TestQuerySnapshotQueriesPipelineSnapshotView(t *testing.T) {
t.Setenv("TOPLINE_QUERY_TOKEN", "signed-query-token")

var gotSQL string
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var body struct {
SQL string `json:"sql"`
}
_ = json.NewDecoder(r.Body).Decode(&body)
gotSQL = body.SQL
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"columns":[],"rows":[]}`))
}))
defer server.Close()
t.Setenv("TOPLINE_QUERY_BASE_URL", server.URL)

var stdout bytes.Buffer
if err := Execute([]string{"query", "snapshot", "--pipeline", "CLUy1QapsrEeBiNrmQiL"}, &stdout, io.Discard); err != nil {
t.Fatalf("Execute returned error: %v", err)
}
if !strings.Contains(gotSQL, "FROM pipeline_snapshot") {
t.Fatalf("snapshot SQL missing pipeline_snapshot view; got %q", gotSQL)
}
if !strings.Contains(gotSQL, "'CLUy1QapsrEeBiNrmQiL'") {
t.Fatalf("snapshot SQL did not bind pipeline id; got %q", gotSQL)
}
}

func TestQueryAuditCallsCompositeViewsInOneInvocation(t *testing.T) {
t.Setenv("TOPLINE_QUERY_TOKEN", "signed-query-token")

var sqls []string
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/query/api/execute-sql" {
t.Fatalf("path = %q", r.URL.Path)
}
var body struct {
SQL string `json:"sql"`
}
_ = json.NewDecoder(r.Body).Decode(&body)
sqls = append(sqls, body.SQL)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"columns":[],"rows":[]}`))
}))
defer server.Close()
t.Setenv("TOPLINE_QUERY_BASE_URL", server.URL)

var stdout bytes.Buffer
err := Execute(
[]string{"query", "audit", "--pipeline", "CLUy1QapsrEeBiNrmQiL", "--since", "2026-05-11", "--until", "2026-05-13"},
&stdout, io.Discard,
)
if err != nil {
t.Fatalf("Execute returned error: %v", err)
}
if len(sqls) != 5 {
t.Fatalf("expected 5 SQL calls (freshness/snapshot/activity/deals/movement), got %d", len(sqls))
}
joined := strings.Join(sqls, "\n")
for _, view := range []string{"warehouse_freshness", "pipeline_snapshot", "pipeline_activity_window", "pipeline_movement_window"} {
if !strings.Contains(joined, view) {
t.Fatalf("expected audit to hit %s view; got SQLs:\n%s", view, joined)
}
}
for _, required := range []string{"pipeline_stage_id", "opportunity_status = 'open'", "COUNT(DISTINCT source_id) AS unique_touches", "CAST(avg_days_in_stage AS INTEGER)"} {
if !strings.Contains(joined, required) {
t.Fatalf("expected audit SQL to include %q; got SQLs:\n%s", required, joined)
}
}
out := map[string]any{}
if err := json.Unmarshal(stdout.Bytes(), &out); err != nil {
t.Fatalf("decode audit output: %v", err)
}
for _, key := range []string{"freshness", "snapshot", "activity", "deals", "movement", "pipelineId", "window", "status"} {
if _, ok := out[key]; !ok {
t.Fatalf("audit output missing %q key: %s", key, stdout.String())
}
}
}

func TestQueryAuditRequiresPipeline(t *testing.T) {
t.Setenv("TOPLINE_QUERY_TOKEN", "signed-query-token")
t.Setenv("TOPLINE_QUERY_BASE_URL", "http://127.0.0.1:1")

var stdout bytes.Buffer
err := Execute([]string{"query", "audit"}, &stdout, io.Discard)
if err == nil || !strings.Contains(err.Error(), "--pipeline") {
t.Fatalf("expected --pipeline usage error, got %v", err)
}
}
Loading