Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 180 additions & 0 deletions internal/sync/pagination_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package sync

import (
"context"
"database/sql"
"encoding/json"
"net/http"
"net/http/httptest"
"path/filepath"
"sync"
"testing"

"github.com/Topline-com/os-cli/internal/topline"
)

// TestSync_NumericCursorPagination is a regression guard for the production bug
// where GHL returned meta.startAfter as a JSON number (epoch ms) and the syncer
// silently dropped it (stringField only matched string), causing every "page 2"
// request to replay the page-1 cursor and the loop to break after one upsert.
//
// The fake server hands out three single-row pages per entity and encodes
// startAfter as a number, like GHL prod. The test then asserts that all three
// rows of every entity landed in SQLite. A failure reading
// "expected 3 rows (pagination must follow numeric cursor), got 1" means the
// regression is back.
//
// NOTE(review): the fake serves pages in request order and never inspects the
// cursor the client sends back, so it only proves the syncer keeps requesting
// pages while a cursor is present — confirm this still fails against the
// pre-fix syncer, or extend the fake to validate the echoed cursor.
func TestSync_NumericCursorPagination(t *testing.T) {
	// page is one response of a fake search endpoint: a single row with the
	// given ID plus, when HasNext is set, the cursor pointing at the next page.
	type page struct {
		ID      string
		HasNext bool
		NextID  string
		NextCur float64 // numeric epoch-ms cursor — the actual prod shape
	}
	// pagesFor builds the three-page sequence <prefix>1..<prefix>3; only the
	// last page omits the cursor.
	pagesFor := func(prefix string) []page {
		return []page{
			{ID: prefix + "1", HasNext: true, NextID: prefix + "1", NextCur: 1700000001000},
			{ID: prefix + "2", HasNext: true, NextID: prefix + "2", NextCur: 1700000002000},
			{ID: prefix + "3", HasNext: false},
		}
	}

	// endpoint is the per-path state of the fake: which JSON key carries the
	// rows, the pages to hand out, how many were served so far, and how to
	// render a row for a given ID.
	type endpoint struct {
		listKey string
		pages   []page
		served  int
		row     func(id string) map[string]any
	}
	endpoints := map[string]*endpoint{
		"/opportunities/search": {
			listKey: "opportunities",
			pages:   pagesFor("O"),
			row: func(id string) map[string]any {
				return map[string]any{
					"id":              id,
					"name":            id,
					"status":          "open",
					"pipelineId":      "PIPE1",
					"pipelineStageId": "STG1",
					"contactId":       "X",
					"monetaryValue":   100.0,
				}
			},
		},
		"/contacts/search": {
			listKey: "contacts",
			pages:   pagesFor("C"),
			row: func(id string) map[string]any {
				return map[string]any{
					"id":        id,
					"firstName": id,
					"email":     id + "@example.com",
				}
			},
		},
		"/conversations/search": {
			listKey: "conversations",
			pages:   pagesFor("V"),
			row: func(id string) map[string]any {
				return map[string]any{
					"id":              id,
					"contactId":       "X",
					"lastMessageDate": "2026-05-13T00:00:00Z",
				}
			},
		},
	}

	// mu guards the served counters; the server may handle requests on
	// several goroutines.
	var mu sync.Mutex

	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		// t.Errorf (unlike t.Fatalf) is safe to call from the handler goroutine.
		write := func(body map[string]any) {
			if err := json.NewEncoder(w).Encode(body); err != nil {
				t.Errorf("encode response for %s: %v", r.URL.Path, err)
			}
		}

		mu.Lock()
		defer mu.Unlock()

		if r.URL.Path == "/opportunities/pipelines" {
			write(map[string]any{"pipelines": []any{}})
			return
		}
		ep, ok := endpoints[r.URL.Path]
		if !ok {
			// Every other path gets an empty message list.
			write(map[string]any{"messages": map[string]any{"messages": []any{}}})
			return
		}
		if ep.served >= len(ep.pages) {
			// Past the last page: empty list, no cursor.
			write(map[string]any{ep.listKey: []any{}})
			return
		}
		p := ep.pages[ep.served]
		ep.served++
		body := map[string]any{ep.listKey: []any{ep.row(p.ID)}}
		if p.HasNext {
			body["meta"] = map[string]any{
				"startAfterId": p.NextID,
				"startAfter":   p.NextCur,
			}
		}
		write(body)
	}))
	defer ts.Close()

	client := topline.NewClient(topline.Config{BaseURL: ts.URL, PIT: "test", LocationID: "LOC"})
	dbPath := filepath.Join(t.TempDir(), "t.db")
	if _, err := SyncAll(context.Background(), client, "LOC", dbPath); err != nil {
		t.Fatalf("SyncAll: %v", err)
	}

	db, err := sql.Open("sqlite", dbPath)
	if err != nil {
		t.Fatalf("open: %v", err)
	}
	defer db.Close()

	for _, c := range []struct {
		table string
		want  int
	}{
		{"opportunities", 3},
		{"contacts", 3},
		{"conversations", 3},
	} {
		var n int
		// Table names are the fixed literals above, so concatenation is safe here.
		if err := db.QueryRow(`SELECT COUNT(*) FROM ` + c.table).Scan(&n); err != nil {
			t.Fatalf("count %s: %v", c.table, err)
		}
		if n != c.want {
			// Errorf rather than Fatalf so every table's shortfall is reported in one run.
			t.Errorf("%s: expected %d rows (pagination must follow numeric cursor), got %d", c.table, c.want, n)
		}
	}
}

// TestAnyToString_HandlesNumericCursor pins the string form anyToString
// produces for each kind of value it may be handed. The important case is a
// large float64 (an epoch-ms cursor), which must come back as plain digits
// rather than scientific notation.
func TestAnyToString_HandlesNumericCursor(t *testing.T) {
	cases := []struct {
		name string
		in   any
		want string
	}{
		{"float64 epoch ms", 1700000001000.0, "1700000001000"},
		{"float64 fraction", 1.5, "1.5"},
		{"string", "abc", "abc"},
		{"nil", nil, ""},
		{"int", int(42), "42"},
		{"int64", int64(1700000001000), "1700000001000"},
		{"json.Number", json.Number("1700000001000"), "1700000001000"},
		{"bool", true, "true"},
		{"other integer type", uint8(7), "7"},
	}
	for _, c := range cases {
		// Subtests with Errorf so one failing case does not hide the others.
		t.Run(c.name, func(t *testing.T) {
			if got := anyToString(c.in); got != c.want {
				t.Errorf("anyToString(%v) = %q, want %q", c.in, got, c.want)
			}
		})
	}
}
53 changes: 47 additions & 6 deletions internal/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -219,11 +220,16 @@ func syncOpportunities(ctx context.Context, client *topline.Client, db *sql.DB,
if err := tx.Commit(); err != nil {
return r, err
}
// Pagination: GHL search endpoints expose startAfter (epoch ms) +
// startAfterId via meta.{startAfter,startAfterId} OR nextPageUrl.
// Pagination: GHL search endpoints expose startAfter (epoch ms, encoded
// as a JSON number) + startAfterId via meta. anyToString handles the
// numeric case — stringField alone returns "" for float64 values and
// silently drops the cursor, causing infinite same-page replays.
meta, _ := raw["meta"].(map[string]any)
nextID := stringField(meta, "startAfterId", "start_after_id")
nextAfter := stringField(meta, "startAfter", "start_after")
nextAfter := anyToString(meta["startAfter"])
if nextAfter == "" {
nextAfter = anyToString(meta["start_after"])
}
if nextID == "" || (nextID == startAfterID && nextAfter == startAfter) {
break
}
Expand Down Expand Up @@ -306,11 +312,16 @@ func syncContacts(ctx context.Context, client *topline.Client, db *sql.DB, locat
}
// Cursor: GHL contacts/search returns meta.startAfter (epoch ms) +
// meta.startAfterId on the next page. Feed those back as a
// searchAfter array.
// searchAfter array. Trust the cursor — not the page length — as the
// stop signal; a "short" page can still be followed by a full one when
// GHL filters server-side after the fetch.
meta, _ := raw["meta"].(map[string]any)
nextID := stringField(meta, "startAfterId", "start_after_id")
nextAfter := meta["startAfter"]
if nextID == "" || len(list) < pageLimit {
if nextAfter == nil {
nextAfter = meta["start_after"]
}
if nextID == "" {
break
}
next := []any{nextAfter, nextID}
Expand Down Expand Up @@ -394,7 +405,10 @@ func syncConversations(ctx context.Context, client *topline.Client, db *sql.DB,
}
meta, _ := raw["meta"].(map[string]any)
nextID := stringField(meta, "startAfterId", "start_after_id")
nextAfter := stringField(meta, "startAfter", "start_after")
nextAfter := anyToString(meta["startAfter"])
if nextAfter == "" {
nextAfter = anyToString(meta["start_after"])
}
if nextID == "" || (nextID == startAfterID && nextAfter == startAfter) {
break
}
Expand Down Expand Up @@ -545,6 +559,33 @@ func stringField(m map[string]any, keys ...string) string {
return ""
}

// anyToString converts a JSON-decoded value to its string form, handling the
// numeric case that stringField silently drops. GHL pagination cursors arrive
// as numbers (epoch ms) inside meta.startAfter — formatting them as a string
// without scientific notation is required so we can pass them back in the
// query string for the next page.
//
// A nil value yields "". Strings are returned unchanged. Floats are rendered
// in plain decimal notation with the fewest digits that round-trip. Integers,
// json.Number and booleans use their canonical text. Any other type falls
// back to fmt.Sprint.
func anyToString(v any) string {
	switch x := v.(type) {
	case nil:
		return ""
	case string:
		return x
	case float64:
		// 'f' with precision -1: no exponent, shortest exact representation.
		return strconv.FormatFloat(x, 'f', -1, 64)
	case float32:
		// Without this case float32 would reach fmt.Sprint, which switches to
		// exponent form for large values (e.g. "1e+09"). bitSize 32 keeps the
		// shortest digits that round-trip as a float32.
		return strconv.FormatFloat(float64(x), 'f', -1, 32)
	case int:
		return strconv.Itoa(x)
	case int64:
		return strconv.FormatInt(x, 10)
	case json.Number:
		return x.String()
	case bool:
		return strconv.FormatBool(x)
	default:
		return fmt.Sprint(x)
	}
}

func floatField(m map[string]any, keys ...string) float64 {
for _, k := range keys {
switch v := m[k].(type) {
Expand Down
7 changes: 7 additions & 0 deletions skills/claude-code/SKILL.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
---
name: topline-os-cli
description: Use the Topline OS CLI for SQL-first CRM analytics, pipeline audits, deal briefs, and agent-safe sales operations. Default to the composite `topline --agent query audit|snapshot|freshness` commands for standard analytics; use REST-backed commands for live drilldowns and approved writes. Triggers on Topline OS, CRM pipeline, opportunity, deal, sales activity, and `topline` CLI questions.
version: 1.6.1
---

# Topline OS CLI Skill
Expand Down Expand Up @@ -97,3 +98,9 @@ topline sync init --db topline.db
## Output rules

Prefer `--agent` for token-efficient, PII-masked output. Never print PIT or query token values. Avoid markdown tables in chat replies; use bullets. Keep activity separate from movement: conversation activity can happen without opportunity stage/status changes.

## Common pitfalls

- **Treating current pipeline as historical origin.** The `opportunities` warehouse table exposes current pipeline/stage state; a won Qualified opportunity is not proof the deal originated in Triage. For Flex conversion questions, answer current-state first (e.g. current pipeline = `Sales - Flex - Qualified`, status = won, created/closed in window), then add a lineage confidence caveat unless a history/audit table or activity event records the move.
- **Counting automated workflow touches as rep effort.** For manual outreach audits, exclude workflow/app automation — in the hosted warehouse, `raw_payload.source = 'app'` on `messages` is the automation exclusion signal. Break out calls/email/SMS separately and report contact counts.
- **Mislabeling SQL/native disagreements as "sync lag".** Only call it lag when `_synced_at` proves lag. Missing UNION branches or coverage gaps are `os-mcp` bugs, not lag — disclose and stop.
4 changes: 3 additions & 1 deletion skills/hermes/SKILL.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
name: topline-os-cli
description: Use the Topline OS CLI for SQL-first CRM analytics, pipeline audits, token-efficient reads, deal briefs, and agent-safe sales operations. Default to the composite `topline --agent query audit|snapshot|freshness` commands for standard analytics; use REST-backed commands for live drilldowns and approved writes.
version: 1.6.0
version: 1.6.1
---

# Topline OS CLI
Expand Down Expand Up @@ -170,6 +170,8 @@ topline raw request GET /opportunities/search --query '{"pipelineId":"PIPELINE_I
12. **Editing this skill (or `topline-os-crm-audits`) mid-audit via `skill_manage`.** The contract is read-only during execution. If the skill is wrong, finish the current run honestly (or stop and disclose the gap), then propose the edit in a follow-up turn.
13. **Prompt-rule tightening instead of primitive design.** If repeated runs keep finding new over-calling shapes (REST fan-out → python wrappers → over-decomposed SQL → bash heredocs), stop adding rules and move the workflow into a composite command/view. The standard pipeline audit is now `query doctor` → `query audit` → answer.
14. **Doing math on the audit JSON after the fact.** Real failure mode: agent runs `query doctor` + `query audit` cleanly, then opens a `python3 - <<'PY' vals=[...] PY` heredoc (or `jq` / `awk` / bash arithmetic) to compute averages/totals over the audit's `activity.by_stage`, `deals`, or `snapshot` rollups before answering. The audit response already carries those rollups — `activity.total_messages`, `activity.by_stage[*]`, `deals.open_count`, `deals.open_value_total`, `snapshot.avg_days_in_stage`, `movement.advances`/`movement.regresses`/`movement.stalls`. If a question genuinely needs a number not in the payload (e.g. p95 instead of avg), express it as `topline --agent query sql --sql ...` and disclose that it is non-standard analytics. Computing in Python/jq/bash on the audit JSON is the same anti-pattern as wrapping the CLI in Python — it just moves the violation one step past the CLI boundary.
15. **Treating current pipeline as historical origin.** The `opportunities` warehouse table exposes current pipeline/stage state; it does not, by itself, prove that a won Qualified opportunity started in Triage. For Flex conversion questions, answer in two layers: (a) direct/current-state query (current pipeline = `Sales - Flex - Qualified`, status = won, created/closed in window); (b) lineage confidence caveat unless a history/audit table or activity event explicitly records the pipeline move. Do not report "originated in Triage" as proven just because the deal is now in Qualified. See `references/flex-crm-lineage-and-manual-outreach.md`.
16. **Counting automated workflow touches as rep effort.** For manual outreach/activity audits, exclude workflow/app automation. In the hosted warehouse, use message/activity metadata such as `raw_payload.source = 'app'` as the automation exclusion signal when available, then break out calls/email/SMS separately and report contact counts.

## Reporting rule of thumb

Expand Down
75 changes: 75 additions & 0 deletions skills/hermes/references/flex-crm-lineage-and-manual-outreach.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Flex CRM lineage and manual outreach audit notes

Use this reference when Alex asks about Flex triage → qualified conversion, won deals, rep responsiveness, or manual outreach counts.

## Definitions

- **Manual outreach**: rep-authored outbound calls, emails, or SMS. Exclude workflow/app automation; in the warehouse, `raw_payload.source = 'app'` is the key automation signal observed in `messages`.
- **Response SLA**: for lead-response audits, the SLA is met when the first outbound call happens within 4 hours of opportunity creation, unless Alex specifies another SLA.
- **Owner attribution**: prefer `opportunities.assigned_to`; fall back to `contacts.assigned_to` when opportunity owner is missing.
- **Current month / QTD**: use the live date tool first, then build explicit date boundaries.

## Pipeline lineage caveat

The hosted warehouse `opportunities` table represents current opportunity state. A deal currently in `Sales - Flex - Qualified` with status `won` can be a good operational proxy for a triage-converted won deal, but it is not proof that the opportunity originally started in `Sales - Flex - Triage` unless a history/audit surface records the move.

Observed pipeline IDs:

- `Sales - Flex - Triage`: `bna6e9DoPgRchNsjeYS3`
- `Sales - Flex - Qualified`: `CLUy1QapsrEeBiNrmQiL`

When asked “how many Triage leads won,” do not only query current Triage; also query current Qualified wins in the window, then state the limitation:

- Current Triage + won: direct same-pipeline wins.
- Current Qualified + won + created/closed in window: operational proxy for moved-forward wins.
- True origin lineage: requires opportunity-history/audit table or explicit activity event; if unavailable, say so plainly.

## Useful SQL patterns

Manual outreach by rep/month:

```sql
SELECT
assigned_user_name,
COUNT(*) AS manual_touches,
COUNT(DISTINCT contact_id) AS contacts_touched,
SUM(CASE WHEN channel = 'call' THEN 1 ELSE 0 END) AS calls,
SUM(CASE WHEN channel = 'email' THEN 1 ELSE 0 END) AS emails,
SUM(CASE WHEN channel = 'sms' THEN 1 ELSE 0 END) AS sms
FROM messages
WHERE direction = 'outbound'
AND created_at >= 'YYYY-MM-01'
AND created_at < 'YYYY-MM-NEXT-01'
-- adapt to the warehouse JSON/text dialect; exclude automation where raw_payload.source = 'app'
AND COALESCE(JSON_EXTRACT(raw_payload, '$.source'), '') <> 'app'
GROUP BY assigned_user_name;
```

Flex Qualified won proxy:

```sql
SELECT
contact_name,
company_name,
value,
source,
created_at,
closed_at
FROM opportunities
WHERE pipeline_id = 'CLUy1QapsrEeBiNrmQiL'
AND status = 'won'
AND created_at >= 'YYYY-01-01'
ORDER BY closed_at;
```

Before relying on history, check whether the warehouse exposes a movement surface:

```sql
SELECT name
FROM sqlite_master
WHERE LOWER(name) LIKE '%history%'
OR LOWER(name) LIKE '%audit%'
OR LOWER(name) LIKE '%movement%';
```

If no history surface exists and activity rows do not include stage/pipeline move payloads, qualify the answer as current-state/proxy analysis, not proven origin lineage.
Loading