diff --git a/README.md b/README.md index b5207fb..d5d0a05 100644 --- a/README.md +++ b/README.md @@ -121,6 +121,7 @@ To configure the Quickwit datasource, you need to provide the following informat - The index name. - The log message field name (optional). This is the field displayed in the explorer view. - The log level field name (optional). It must be a fast field. +- The related logs or traces datasource (optional). This enables trace-to-logs and log-to-trace links when logs and traces are stored in separate Quickwit indexes. ### With Grafana UI @@ -141,6 +142,112 @@ datasources: logLevelField: severity_text ``` +### Logs and traces in separate indexes + +When logs and traces are stored in different Quickwit indexes, configure one datasource per index and link them with `logsDatasourceUid` and `tracesDatasourceUid`. + +```yaml +apiVersion: 1 + +datasources: + - name: Quickwit Logs + uid: quickwit-logs + type: quickwit-quickwit-datasource + url: http://localhost:7280/api/v1 + jsonData: + index: 'otel-logs-v0_9' + logMessageField: body.message + logLevelField: severity_text + tracesDatasourceUid: quickwit-traces + tracesDatasourceName: Quickwit Traces + + - name: Quickwit Traces + uid: quickwit-traces + type: quickwit-quickwit-datasource + url: http://localhost:7280/api/v1 + jsonData: + index: 'otel-traces-v0_9' + logsDatasourceUid: quickwit-logs + logsDatasourceName: Quickwit Logs +``` + +## Traces + +The query editor has two trace query types: + +- **Trace search** scans matching spans and returns one row per trace. Use this to find trace IDs by Lucene query, service, operation, status, or attributes. +- **Traces** returns a full trace frame for Grafana's trace viewer. Use this with a trace ID query such as `trace_id:abc123`. 
+
+The trace parser expects Quickwit OpenTelemetry trace fields such as:
+
+- `trace_id`
+- `span_id`
+- `parent_span_id`
+- `service_name`
+- `span_name`
+- `span_start_timestamp_nanos`
+- `span_duration_millis` or `span_end_timestamp_nanos`
+
+It also reads optional fields for richer trace rendering:
+
+- `resource_attributes` for service tags
+- `span_attributes` for span tags, including `service.peer.name` and `peer.service`
+- `span_status` for error status and warnings
+- `events` for span events and exception stack traces
+- `links` for span references
+- `scope_name` and `scope_version` for instrumentation library details
+
+Trace responses include:
+
+- Grafana trace frames for the trace viewer.
+- Node graph frames that summarize service-to-service calls.
+- Span warnings for error status and dropped attributes/events/links.
+- Span event details and exception stack traces.
+- Stable per-service node colors in the node graph.
+
+### Trace/log correlations
+
+Trace-to-logs links are attached to each trace span. They query the configured logs datasource with:
+
+```text
+trace_id:${__span.traceId} AND span_id:${__span.spanId}
+```
+
+Log-to-trace links are attached to log fields named:
+
+- `trace_id`
+- `traceID`
+- `traceId`
+- `attributes.trace_id`
+
+They open the configured traces datasource with:
+
+```text
+trace_id:${__value.raw}
+```
+
+### Local trace fixtures
+
+For local testing, use the fixture script:
+
+```bash
+QUICKWIT_URL=http://127.0.0.1:7280/api/v1 ./scripts/ingest-multi-service-traces.sh
+```
+
+It writes two multi-service traces into `otel-traces-v0_9` and matching correlated logs into `otel-logs-v0_9`.
+
+In **Trace search**, filter the fixture data with:
+
+```text
+span_attributes.fixture:multi-service-trace
+```
+
+In **Traces**, open a returned trace ID with:
+
+```text
+trace_id:<trace_id>
+```
+
 ## Features
 
 - Explore view.
@@ -149,6 +256,9 @@ datasources:
 - Adhoc filters.
 - Annotations
 - Explore Log Context.
+- Trace search and trace view. +- Trace-to-logs and log-to-trace links. +- Service node graph for trace results. - [Alerting](https://grafana.com/docs/grafana/latest/alerting/). ## FAQ and Limitations diff --git a/pkg/quickwit/client/client.go b/pkg/quickwit/client/client.go index 173d8f3..0fc006a 100644 --- a/pkg/quickwit/client/client.go +++ b/pkg/quickwit/client/client.go @@ -22,6 +22,12 @@ type ReadyStatus struct { type DatasourceInfo struct { ID int64 + UID string + Name string + LogsDatasourceUID string + LogsDatasourceName string + TracesDatasourceUID string + TracesDatasourceName string HTTPClient *http.Client URL string Database string diff --git a/pkg/quickwit/data_query.go b/pkg/quickwit/data_query.go index 0a5871a..8f26396 100644 --- a/pkg/quickwit/data_query.go +++ b/pkg/quickwit/data_query.go @@ -4,6 +4,7 @@ import ( "fmt" "regexp" "strconv" + "strings" es "github.com/quickwit-oss/quickwit-datasource/pkg/quickwit/client" "github.com/quickwit-oss/quickwit-datasource/pkg/quickwit/simplejson" @@ -25,11 +26,22 @@ func buildMSR(queries []*Query, defaultTimeField string) ([]*es.SearchRequest, e b := ms.Search(q.Interval) b.Size(0) filters := b.Query().Bool().Filter() + // Always pass Grafana's picker range through. Quickwit's metastore + // prunes splits whose timestamps fall outside this window, so even + // trace_id lookups go from "scan every split" to "scan a few" — the + // same speedup the native Jaeger endpoint gets via auto-derived bounds. 
filters.AddDateRangeFilter(defaultTimeField, q.RangeTo, q.RangeFrom) filters.AddQueryStringFilter(q.RawQuery, true, "AND") + if isTraceSearchQuery(q) { + filters.AddQueryStringFilter(traceSearchSettingsQuery(q), true, "AND") + } if isLogsQuery(q) { processLogsQuery(q, b, q.RangeFrom, q.RangeTo, defaultTimeField) + } else if isTraceSearchQuery(q) { + processTraceSearchQuery(q, b, defaultTimeField) + } else if isTracesQuery(q) { + processTracesQuery(q, b, defaultTimeField) } else if isDocumentQuery(q) { processDocumentQuery(q, b, q.RangeFrom, q.RangeTo, defaultTimeField) } else { @@ -269,8 +281,8 @@ func getPipelineAggField(m *MetricAgg) string { func isQueryWithError(query *Query) error { if len(query.BucketAggs) == 0 { - // If no aggregations, only document and logs queries are valid - if len(query.Metrics) == 0 || !(isLogsQuery(query) || isDocumentQuery(query)) { + // If no aggregations, only document, logs, and trace queries are valid + if len(query.Metrics) == 0 || !(isLogsQuery(query) || isTraceSearchQuery(query) || isTracesQuery(query) || isDocumentQuery(query)) { return fmt.Errorf("invalid query, missing metrics and aggregations") } } else { @@ -302,7 +314,15 @@ func isQueryWithError(query *Query) error { } func isLogsQuery(query *Query) bool { - return query.Metrics[0].Type == logsType + return queryMetricType(query) == logsType +} + +func isTracesQuery(query *Query) bool { + return queryMetricType(query) == tracesType +} + +func isTraceSearchQuery(query *Query) bool { + return queryMetricType(query) == traceSearchType } func isDocumentQuery(query *Query) bool { @@ -310,11 +330,169 @@ func isDocumentQuery(query *Query) bool { } func isRawDataQuery(query *Query) bool { - return query.Metrics[0].Type == rawDataType + return queryMetricType(query) == rawDataType } func isRawDocumentQuery(query *Query) bool { - return query.Metrics[0].Type == rawDocumentType + return queryMetricType(query) == rawDocumentType +} + +var ( + bareTraceIDPattern = 
regexp.MustCompile(`^[0-9a-fA-F]{32}$`) + durationPattern = regexp.MustCompile(`(?i)^\s*(\d+(?:\.\d+)?)\s*(ns|us|ms|s|m|h)?\s*$`) +) + +func firstMetricType(query *Query) string { + if query == nil || len(query.Metrics) == 0 { + return "" + } + return query.Metrics[0].Type +} + +func queryMetricType(query *Query) string { + return firstMetricType(query) +} + +func isBareTraceIDQuery(rawQuery string) bool { + return bareTraceIDPattern.MatchString(strings.TrimSpace(rawQuery)) +} + +func traceSearchSettingsQuery(query *Query) string { + if !isTraceSearchQuery(query) || len(query.Metrics) == 0 || query.Metrics[0].Settings == nil { + return "" + } + + settings := query.Metrics[0].Settings + clauses := []string{} + + if serviceName := strings.TrimSpace(settings.Get("serviceName").MustString()); serviceName != "" { + clauses = append(clauses, traceSearchPhraseClause("service_name", serviceName)) + } + if spanName := strings.TrimSpace(settings.Get("spanName").MustString()); spanName != "" { + clauses = append(clauses, traceSearchPhraseClause("span_name", spanName)) + } + if statusClause := traceSearchStatusClause(settings.Get("status").MustString()); statusClause != "" { + clauses = append(clauses, statusClause) + } + if minDuration, ok := traceSearchDurationMillis(settings.Get("minDuration").MustString()); ok { + clauses = append(clauses, "span_duration_millis:>="+minDuration) + } + if maxDuration, ok := traceSearchDurationMillis(settings.Get("maxDuration").MustString()); ok { + clauses = append(clauses, "span_duration_millis:<="+maxDuration) + } + + return strings.Join(clauses, " AND ") +} + +func traceSearchPhraseClause(fieldName, value string) string { + escaped := strings.ReplaceAll(value, `\`, `\\`) + escaped = strings.ReplaceAll(escaped, `"`, `\"`) + return fieldName + `:"` + escaped + `"` +} + +func traceSearchStatusClause(status string) string { + // span_status is mapped as `tokenizer: raw`, so matches are exact tokens. 
+ // OTel canonical strings are TitleCase ("Error"/"Ok"/"Unset") and the OTLP + // wire form is the integer enum (0/1/2) or "STATUS_CODE_*". Some pipelines + // lowercase, so cover all variants we've seen. + switch strings.ToLower(strings.TrimSpace(status)) { + case "error": + return `(span_status.code:Error OR span_status.code:ERROR OR span_status.code:error OR span_status.code:STATUS_CODE_ERROR OR span_status.code:2 OR span_attributes.error:true OR span_attributes.otel.status_code:ERROR)` + case "ok": + return `(span_status.code:Ok OR span_status.code:OK OR span_status.code:ok OR span_status.code:STATUS_CODE_OK OR span_status.code:1)` + case "unset": + return `(span_status.code:Unset OR span_status.code:UNSET OR span_status.code:unset OR span_status.code:STATUS_CODE_UNSET OR span_status.code:0)` + default: + return "" + } +} + +func traceSearchDurationMillis(duration string) (string, bool) { + matches := durationPattern.FindStringSubmatch(duration) + if matches == nil { + return "", false + } + + value, err := strconv.ParseFloat(matches[1], 64) + if err != nil { + return "", false + } + + switch strings.ToLower(matches[2]) { + case "ns": + value = value / 1000000 + case "us": + value = value / 1000 + case "s": + value = value * 1000 + case "m": + value = value * 60 * 1000 + case "h": + value = value * 60 * 60 * 1000 + case "ms", "": + default: + return "", false + } + + return strconv.FormatFloat(value, 'f', -1, 64), true +} + +func canInferTraceLink(query *Query) bool { + firstMetricType := firstMetricType(query) + return firstMetricType == "" || firstMetricType == logsType || firstMetricType == tracesType +} + +func canTraceQueryTypeNormalize(query *Query) bool { + // queryType is a transient hint from internal trace links. Once the request + // has an explicit non-log/non-trace metric, the metric type is authoritative. 
+ return query.QueryType == tracesType && canInferTraceLink(query) +} + +func normalizeInternalLinkTraceQuery(query *Query) { + if query == nil { + return + } + + if query.QueryType == tracesType && !canInferTraceLink(query) { + query.QueryType = "" + } + + if !canTraceQueryTypeNormalize(query) && + firstMetricType(query) != tracesType && + !(firstMetricType(query) == "" && isBareTraceIDQuery(query.RawQuery)) { + return + } + + rawQuery := strings.TrimSpace(query.RawQuery) + if isBareTraceIDQuery(rawQuery) { + query.RawQuery = "trace_id:" + rawQuery + } + query.QueryType = tracesType + query.BucketAggs = []*BucketAgg{} + + if len(query.Metrics) == 0 { + query.Metrics = []*MetricAgg{ + { + ID: "1", + Type: tracesType, + Settings: simplejson.NewFromAny(map[string]interface{}{"limit": "1000"}), + Meta: simplejson.New(), + }, + } + return + } + + query.Metrics = query.Metrics[:1] + query.Metrics[0].Type = tracesType + if query.Metrics[0].ID == "" { + query.Metrics[0].ID = "1" + } + if query.Metrics[0].Settings == nil { + query.Metrics[0].Settings = simplejson.New() + } + if query.Metrics[0].Settings.Get("limit").MustString() == "" { + query.Metrics[0].Settings.Set("limit", "1000") + } } func processLogsQuery(q *Query, b *es.SearchRequestBuilder, from, to int64, defaultTimeField string) { @@ -337,6 +515,18 @@ func processLogsQuery(q *Query, b *es.SearchRequestBuilder, from, to int64, defa } } +func processTracesQuery(q *Query, b *es.SearchRequestBuilder, defaultTimeField string) { + metric := q.Metrics[0] + b.Sort(es.SortOrderAsc, defaultTimeField, "epoch_nanos_int") + b.Size(stringToIntWithDefaultValue(metric.Settings.Get("limit").MustString(), defaultSize)) +} + +func processTraceSearchQuery(q *Query, b *es.SearchRequestBuilder, defaultTimeField string) { + metric := q.Metrics[0] + b.Sort(es.SortOrderDesc, defaultTimeField, "epoch_nanos_int") + b.Size(stringToIntWithDefaultValue(metric.Settings.Get("spanLimit").MustString(), 5000)) +} + func processDocumentQuery(q 
*Query, b *es.SearchRequestBuilder, from, to int64, defaultTimeField string) { metric := q.Metrics[0] b.Sort(es.SortOrderDesc, defaultTimeField, "epoch_nanos_int") diff --git a/pkg/quickwit/data_query_test.go b/pkg/quickwit/data_query_test.go index a9b8f81..b0f8a9c 100644 --- a/pkg/quickwit/data_query_test.go +++ b/pkg/quickwit/data_query_test.go @@ -12,6 +12,18 @@ import ( es "github.com/quickwit-oss/quickwit-datasource/pkg/quickwit/client" ) +func TestQueryTypeHelpersDoNotPanicWithoutMetrics(t *testing.T) { + query := &Query{} + + require.NotPanics(t, func() { + assert.False(t, isLogsQuery(query)) + assert.False(t, isTracesQuery(query)) + assert.False(t, isTraceSearchQuery(query)) + assert.False(t, isRawDataQuery(query)) + assert.False(t, isRawDocumentQuery(query)) + }) +} + func TestExecuteElasticsearchDataQuery(t *testing.T) { from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC) to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC) @@ -1321,6 +1333,87 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { require.Equal(t, sr.Size, 1000) }) + t.Run("With traces query should return query sorted by ascending time and apply the picker range", func(t *testing.T) { + c := newFakeClient() + _, err := executeElasticsearchDataQuery(c, `{ + "query": "trace_id:3c191d03fa8be0653c191d03fa8be065", + "metrics": [{ "type": "traces", "id": "1", "settings": { "limit": "5000" }}] + }`, from, to) + require.NoError(t, err) + sr := c.multisearchRequests[0][0] + require.Equal(t, sr.Size, 5000) + require.Equal(t, sr.Sort[0]["@timestamp"]["order"], "asc") + + require.Len(t, sr.Query.Bool.Filters, 2) + rangeFilter := sr.Query.Bool.Filters[0].(*es.DateRangeFilter) + require.Equal(t, rangeFilter.Lte, "2018-05-15T17:55:00Z") + require.Equal(t, rangeFilter.Gte, "2018-05-15T17:50:00Z") + queryFilter := sr.Query.Bool.Filters[1].(*es.QueryStringFilter) + require.Equal(t, queryFilter.Query, "trace_id:3c191d03fa8be0653c191d03fa8be065") + }) + + t.Run("With traces query using a non-trace-id 
filter should keep the time range", func(t *testing.T) { + c := newFakeClient() + _, err := executeElasticsearchDataQuery(c, `{ + "query": "service_name:quickwit", + "metrics": [{ "type": "traces", "id": "1", "settings": { "limit": "5000" }}] + }`, from, to) + require.NoError(t, err) + sr := c.multisearchRequests[0][0] + + rangeFilter := sr.Query.Bool.Filters[0].(*es.DateRangeFilter) + require.Equal(t, rangeFilter.Lte, "2018-05-15T17:55:00Z") + require.Equal(t, rangeFilter.Gte, "2018-05-15T17:50:00Z") + queryFilter := sr.Query.Bool.Filters[1].(*es.QueryStringFilter) + require.Equal(t, queryFilter.Query, "service_name:quickwit") + }) + + t.Run("With trace search query should scan spans sorted by descending time", func(t *testing.T) { + c := newFakeClient() + _, err := executeElasticsearchDataQuery(c, `{ + "query": "service_name:quickwit", + "metrics": [{ "type": "trace_search", "id": "1", "settings": { "limit": "20", "spanLimit": "2500" }}] + }`, from, to) + require.NoError(t, err) + sr := c.multisearchRequests[0][0] + require.Equal(t, sr.Size, 2500) + require.Equal(t, sr.Sort[0]["@timestamp"]["order"], "desc") + + rangeFilter := sr.Query.Bool.Filters[0].(*es.DateRangeFilter) + require.Equal(t, rangeFilter.Lte, "2018-05-15T17:55:00Z") + require.Equal(t, rangeFilter.Gte, "2018-05-15T17:50:00Z") + queryFilter := sr.Query.Bool.Filters[1].(*es.QueryStringFilter) + require.Equal(t, queryFilter.Query, "service_name:quickwit") + }) + + t.Run("With trace search builder settings should add structured trace filters", func(t *testing.T) { + c := newFakeClient() + _, err := executeElasticsearchDataQuery(c, `{ + "query": "span_attributes.http.method:GET", + "metrics": [{ + "type": "trace_search", + "id": "1", + "settings": { + "limit": "20", + "spanLimit": "2500", + "serviceName": "checkout", + "spanName": "GET /checkout", + "status": "error", + "minDuration": "100ms", + "maxDuration": "1.2s" + } + }] + }`, from, to) + require.NoError(t, err) + sr := c.multisearchRequests[0][0] 
+ + require.Len(t, sr.Query.Bool.Filters, 3) + queryFilter := sr.Query.Bool.Filters[1].(*es.QueryStringFilter) + require.Equal(t, "span_attributes.http.method:GET", queryFilter.Query) + traceSearchFilter := sr.Query.Bool.Filters[2].(*es.QueryStringFilter) + require.Equal(t, `service_name:"checkout" AND span_name:"GET /checkout" AND (span_status.code:Error OR span_status.code:ERROR OR span_status.code:error OR span_status.code:STATUS_CODE_ERROR OR span_status.code:2 OR span_attributes.error:true OR span_attributes.otel.status_code:ERROR) AND span_duration_millis:>=100 AND span_duration_millis:<=1200`, traceSearchFilter.Query) + }) + t.Run("With invalid query should return error", (func(t *testing.T) { c := newFakeClient() _, err := executeElasticsearchDataQuery(c, `{ @@ -1721,5 +1814,5 @@ func executeElasticsearchDataQuery(c es.Client, body string, from, to time.Time) return &backend.QueryDataResponse{}, err } - return parseResponse(res, queries, configuredFields) + return parseResponse(res, queries, configuredFields, nil) } diff --git a/pkg/quickwit/elasticsearch.go b/pkg/quickwit/elasticsearch.go index afb18f6..26bdaf0 100644 --- a/pkg/quickwit/elasticsearch.go +++ b/pkg/quickwit/elasticsearch.go @@ -45,7 +45,7 @@ func queryData(ctx context.Context, dataQueries []backend.DataQuery, dsInfo *es. 
return &backend.QueryDataResponse{}, err } - return parseResponse(res, queries, dsInfo.ConfiguredFields) + return parseResponse(res, queries, dsInfo.ConfiguredFields, dsInfo) } func handleQuickwitErrors(err error) (*backend.QueryDataResponse, error) { diff --git a/pkg/quickwit/models.go b/pkg/quickwit/models.go index 818908e..2029183 100644 --- a/pkg/quickwit/models.go +++ b/pkg/quickwit/models.go @@ -9,6 +9,7 @@ import ( // Query represents the time series query model of the datasource type Query struct { RawQuery string `json:"query"` + QueryType string `json:"queryType"` BucketAggs []*BucketAgg `json:"bucketAggs"` Metrics []*MetricAgg `json:"metrics"` Alias string `json:"alias"` @@ -60,6 +61,8 @@ var metricAggType = map[string]string{ "raw_data": "Raw Data", "rate": "Rate", "logs": "Logs", + "traces": "Traces", + "trace_search": "Trace search", } var extendedStats = map[string]string{ diff --git a/pkg/quickwit/parse_query.go b/pkg/quickwit/parse_query.go index 7f1632e..93f26ae 100644 --- a/pkg/quickwit/parse_query.go +++ b/pkg/quickwit/parse_query.go @@ -20,6 +20,7 @@ func parseQuery(tsdbQuery []backend.DataQuery) ([]*Query, error) { // please do not create a new field with that name, to avoid potential problems with old, persisted queries. 
rawQuery := model.Get("query").MustString() + queryType := model.Get("queryType").MustString() bucketAggs, err := parseBucketAggs(model) if err != nil { return nil, err @@ -35,8 +36,9 @@ func parseQuery(tsdbQuery []backend.DataQuery) ([]*Query, error) { from := q.TimeRange.From.UnixNano() / int64(time.Millisecond) to := q.TimeRange.To.UnixNano() / int64(time.Millisecond) - queries = append(queries, &Query{ + query := &Query{ RawQuery: rawQuery, + QueryType: queryType, BucketAggs: bucketAggs, Metrics: metrics, Alias: alias, @@ -46,7 +48,9 @@ func parseQuery(tsdbQuery []backend.DataQuery) ([]*Query, error) { MaxDataPoints: q.MaxDataPoints, RangeFrom: from, RangeTo: to, - }) + } + normalizeInternalLinkTraceQuery(query) + queries = append(queries, query) } return queries, nil diff --git a/pkg/quickwit/parse_query_test.go b/pkg/quickwit/parse_query_test.go index 44fb115..6b71f77 100644 --- a/pkg/quickwit/parse_query_test.go +++ b/pkg/quickwit/parse_query_test.go @@ -101,5 +101,155 @@ func TestParseQuery(t *testing.T) { require.Equal(t, q.BucketAggs[1].Settings.Get("min_doc_count").MustInt(), 0) require.Equal(t, q.BucketAggs[1].Settings.Get("trimEdges").MustInt(), 0) }) + + t.Run("Should normalize exemplar-style bare trace id links", func(t *testing.T) { + traceID := "75d7a6e5c07de26e0238cd17a281a190" + body := `{ + "query": "` + traceID + `", + "queryType": "traces", + "metrics": [{ "type": "logs", "id": "3", "settings": { "limit": "100" } }], + "bucketAggs": [{ "type": "date_histogram", "field": "@timestamp", "id": "2" }] + }` + dataQuery, err := newDataQuery(body) + require.NoError(t, err) + queries, err := parseQuery(dataQuery.Queries) + require.NoError(t, err) + require.Len(t, queries, 1) + + q := queries[0] + require.Equal(t, "trace_id:"+traceID, q.RawQuery) + require.Equal(t, tracesType, q.QueryType) + require.Len(t, q.Metrics, 1) + require.Equal(t, tracesType, q.Metrics[0].Type) + require.Equal(t, "3", q.Metrics[0].ID) + require.Equal(t, "100", 
q.Metrics[0].Settings.Get("limit").MustString()) + require.Empty(t, q.BucketAggs) + }) + + t.Run("Should create a traces metric for bare trace id links without metrics", func(t *testing.T) { + traceID := "75d7a6e5c07de26e0238cd17a281a190" + body := `{ + "query": "` + traceID + `", + "metrics": [], + "bucketAggs": [] + }` + dataQuery, err := newDataQuery(body) + require.NoError(t, err) + queries, err := parseQuery(dataQuery.Queries) + require.NoError(t, err) + require.Len(t, queries, 1) + + q := queries[0] + require.Equal(t, "trace_id:"+traceID, q.RawQuery) + require.Equal(t, tracesType, q.QueryType) + require.Len(t, q.Metrics, 1) + require.Equal(t, tracesType, q.Metrics[0].Type) + require.Equal(t, "1", q.Metrics[0].ID) + require.Equal(t, "1000", q.Metrics[0].Settings.Get("limit").MustString()) + }) + + t.Run("Should not let stale queryType override explicit trace search metrics", func(t *testing.T) { + traceID := "75d7a6e5c07de26e0238cd17a281a190" + body := `{ + "query": "trace_id:` + traceID + `", + "queryType": "traces", + "metrics": [{ "type": "trace_search", "id": "3", "settings": { "limit": "20", "spanLimit": "1000" } }], + "bucketAggs": [] + }` + dataQuery, err := newDataQuery(body) + require.NoError(t, err) + queries, err := parseQuery(dataQuery.Queries) + require.NoError(t, err) + require.Len(t, queries, 1) + + q := queries[0] + require.Equal(t, "trace_id:"+traceID, q.RawQuery) + require.Empty(t, q.QueryType) + require.Len(t, q.Metrics, 1) + require.Equal(t, traceSearchType, q.Metrics[0].Type) + require.Equal(t, "20", q.Metrics[0].Settings.Get("limit").MustString()) + require.Equal(t, "1000", q.Metrics[0].Settings.Get("spanLimit").MustString()) + }) + + t.Run("Should not normalize regular metric queries with bare hex filters", func(t *testing.T) { + traceID := "75d7a6e5c07de26e0238cd17a281a190" + body := `{ + "query": "` + traceID + `", + "metrics": [{ "type": "count", "id": "1" }], + "bucketAggs": [{ "type": "date_histogram", "field": "@timestamp", "id": 
"2" }] + }` + dataQuery, err := newDataQuery(body) + require.NoError(t, err) + queries, err := parseQuery(dataQuery.Queries) + require.NoError(t, err) + require.Len(t, queries, 1) + + q := queries[0] + require.Equal(t, traceID, q.RawQuery) + require.Empty(t, q.QueryType) + require.Len(t, q.Metrics, 1) + require.Equal(t, countType, q.Metrics[0].Type) + require.Len(t, q.BucketAggs, 1) + }) + + t.Run("Should not normalize trace_id log filters", func(t *testing.T) { + traceID := "75d7a6e5c07de26e0238cd17a281a190" + body := `{ + "query": "trace_id:` + traceID + `", + "metrics": [{ "type": "logs", "id": "3", "settings": { "limit": "100" } }], + "bucketAggs": [] + }` + dataQuery, err := newDataQuery(body) + require.NoError(t, err) + queries, err := parseQuery(dataQuery.Queries) + require.NoError(t, err) + require.Len(t, queries, 1) + + q := queries[0] + require.Equal(t, "trace_id:"+traceID, q.RawQuery) + require.Empty(t, q.QueryType) + require.Len(t, q.Metrics, 1) + require.Equal(t, logsType, q.Metrics[0].Type) + }) + + t.Run("Should not normalize bare trace ids when logs are explicitly selected", func(t *testing.T) { + traceID := "75d7a6e5c07de26e0238cd17a281a190" + body := `{ + "query": "` + traceID + `", + "metrics": [{ "type": "logs", "id": "3", "settings": { "limit": "100" } }], + "bucketAggs": [] + }` + dataQuery, err := newDataQuery(body) + require.NoError(t, err) + queries, err := parseQuery(dataQuery.Queries) + require.NoError(t, err) + require.Len(t, queries, 1) + + q := queries[0] + require.Equal(t, traceID, q.RawQuery) + require.Empty(t, q.QueryType) + require.Len(t, q.Metrics, 1) + require.Equal(t, logsType, q.Metrics[0].Type) + }) + + t.Run("Should not normalize span-specific log correlation queries", func(t *testing.T) { + traceID := "75d7a6e5c07de26e0238cd17a281a190" + body := `{ + "query": "trace_id:` + traceID + ` AND span_id:cccccccccccccccc", + "metrics": [{ "type": "logs", "id": "3", "settings": { "limit": "100" } }], + "bucketAggs": [] + }` + 
dataQuery, err := newDataQuery(body) + require.NoError(t, err) + queries, err := parseQuery(dataQuery.Queries) + require.NoError(t, err) + require.Len(t, queries, 1) + + q := queries[0] + require.Equal(t, "trace_id:"+traceID+" AND span_id:cccccccccccccccc", q.RawQuery) + require.Empty(t, q.QueryType) + require.Len(t, q.Metrics, 1) + require.Equal(t, logsType, q.Metrics[0].Type) + }) }) } diff --git a/pkg/quickwit/quickwit.go b/pkg/quickwit/quickwit.go index 05fa0ce..3dfe95d 100644 --- a/pkg/quickwit/quickwit.go +++ b/pkg/quickwit/quickwit.go @@ -67,6 +67,26 @@ func NewQuickwitDatasource(ctx context.Context, settings backend.DataSourceInsta logMessageField = "" } + logsDatasourceUID, ok := jsonData["logsDatasourceUid"].(string) + if !ok { + logsDatasourceUID = "" + } + + logsDatasourceName, ok := jsonData["logsDatasourceName"].(string) + if !ok { + logsDatasourceName = "" + } + + tracesDatasourceUID, ok := jsonData["tracesDatasourceUid"].(string) + if !ok { + tracesDatasourceUID = "" + } + + tracesDatasourceName, ok := jsonData["tracesDatasourceName"].(string) + if !ok { + tracesDatasourceName = "" + } + index, ok := jsonData["index"].(string) if !ok { index = "" @@ -99,6 +119,12 @@ func NewQuickwitDatasource(ctx context.Context, settings backend.DataSourceInsta model := es.DatasourceInfo{ ID: settings.ID, + UID: settings.UID, + Name: settings.Name, + LogsDatasourceUID: logsDatasourceUID, + LogsDatasourceName: logsDatasourceName, + TracesDatasourceUID: tracesDatasourceUID, + TracesDatasourceName: tracesDatasourceName, URL: settings.URL, HTTPClient: httpCli, Database: index, diff --git a/pkg/quickwit/response_parser.go b/pkg/quickwit/response_parser.go index b291c33..4da9ddc 100644 --- a/pkg/quickwit/response_parser.go +++ b/pkg/quickwit/response_parser.go @@ -41,7 +41,7 @@ const ( var searchWordsRegex = regexp.MustCompile(regexp.QuoteMeta(es.HighlightPreTagsString) + `(.*?)` + regexp.QuoteMeta(es.HighlightPostTagsString)) -func parseResponse(rawResponses 
[]*json.RawMessage, targets []*Query, configuredFields es.ConfiguredFields) (*backend.QueryDataResponse, error) { +func parseResponse(rawResponses []*json.RawMessage, targets []*Query, configuredFields es.ConfiguredFields, dsInfo *es.DatasourceInfo) (*backend.QueryDataResponse, error) { result := backend.QueryDataResponse{ Responses: backend.Responses{}, } @@ -54,7 +54,7 @@ func parseResponse(rawResponses []*json.RawMessage, targets []*Query, configured byteReader := bytes.NewReader(*rawRes) dec := json.NewDecoder(byteReader) - if isLogsQuery(target) { + if isLogsQuery(target) || isTraceSearchQuery(target) || isTracesQuery(target) { dec.UseNumber() } var res *es.SearchResponse @@ -92,6 +92,18 @@ func parseResponse(rawResponses []*json.RawMessage, targets []*Query, configured return &backend.QueryDataResponse{}, err } result.Responses[target.RefID] = queryRes + } else if isTraceSearchQuery(target) { + err := processTraceSearchResponse(res, target, dsInfo, &queryRes) + if err != nil { + return &backend.QueryDataResponse{}, err + } + result.Responses[target.RefID] = queryRes + } else if isTracesQuery(target) { + err := processTracesResponse(res, target, configuredFields, dsInfo, &queryRes) + if err != nil { + return &backend.QueryDataResponse{}, err + } + result.Responses[target.RefID] = queryRes } else { // Process as metric query result props := make(map[string]string) diff --git a/pkg/quickwit/response_parser_test.go b/pkg/quickwit/response_parser_test.go index 3a8b41a..fe9e1c6 100644 --- a/pkg/quickwit/response_parser_test.go +++ b/pkg/quickwit/response_parser_test.go @@ -3413,6 +3413,10 @@ func TestTrimEdges(t *testing.T) { } func parseTestResponse(tsdbQueries map[string]string, responseBody string) (*backend.QueryDataResponse, error) { + return parseTestResponseWithDatasourceInfo(tsdbQueries, responseBody, nil) +} + +func parseTestResponseWithDatasourceInfo(tsdbQueries map[string]string, responseBody string, dsInfo *es.DatasourceInfo) 
(*backend.QueryDataResponse, error) { from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC) to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC) configuredFields := es.ConfiguredFields{ @@ -3448,7 +3452,7 @@ func parseTestResponse(tsdbQueries map[string]string, responseBody string) (*bac return nil, err } - return parseResponse(response.Responses, queries, configuredFields) + return parseResponse(response.Responses, queries, configuredFields, dsInfo) } func requireTimeValue(t *testing.T, expected int64, frame *data.Frame, index int) { diff --git a/pkg/quickwit/response_parser_traces.go b/pkg/quickwit/response_parser_traces.go new file mode 100644 index 0000000..abb2085 --- /dev/null +++ b/pkg/quickwit/response_parser_traces.go @@ -0,0 +1,1253 @@ +package quickwit + +import ( + "encoding/json" + "fmt" + "sort" + "strconv" + "strings" + "time" + "unicode/utf8" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/data" + + es "github.com/quickwit-oss/quickwit-datasource/pkg/quickwit/client" + "github.com/quickwit-oss/quickwit-datasource/pkg/utils" +) + +const ( + tracesType = "traces" + traceSearchType = "trace_search" + quickwitPluginID = "quickwit-quickwit-datasource" + + maxTraceStackTraceBytes = 64 * 1024 + traceStackTraceTruncatedText = "\n... 
truncated" +) + +type traceKeyValuePair struct { + Key string `json:"key"` + Value interface{} `json:"value"` + Type string `json:"type,omitempty"` +} + +type traceLog struct { + Timestamp float64 `json:"timestamp"` + Fields []traceKeyValuePair `json:"fields"` + Name string `json:"name,omitempty"` +} + +type traceSpanReference struct { + TraceID string `json:"traceID"` + SpanID string `json:"spanID"` + Tags []traceKeyValuePair `json:"tags,omitempty"` +} + +type traceSearchSummary struct { + traceID string + startTimeMillis float64 + endTimeMillis float64 + latestMillis float64 + spanCount int + errorCount int + services map[string]bool + spanNames map[string]bool + rootServiceName string + rootSpanName string + rootStartMillis float64 +} + +type traceGraphSpan struct { + spanID string + parentSpanID string + serviceName string + durationMillis float64 + statusCode int64 +} + +type traceGraphNode struct { + id string + spanCount int + errorCount int + durationMillis float64 +} + +type traceGraphEdge struct { + id string + source string + target string + callCount int + errorCount int + durationMillis float64 +} + +func processTracesResponse(res *es.SearchResponse, target *Query, configuredFields es.ConfiguredFields, dsInfo *es.DatasourceInfo, queryRes *backend.DataResponse) error { + hits := []map[string]interface{}{} + if res.Hits != nil { + hits = res.Hits.Hits + } + + traceIDs := make([]string, 0, len(hits)) + spanIDs := make([]string, 0, len(hits)) + parentSpanIDs := make([]*string, 0, len(hits)) + operationNames := make([]string, 0, len(hits)) + serviceNames := make([]string, 0, len(hits)) + serviceTags := make([]json.RawMessage, 0, len(hits)) + startTimes := make([]float64, 0, len(hits)) + durations := make([]float64, 0, len(hits)) + logs := make([]json.RawMessage, 0, len(hits)) + references := make([]json.RawMessage, 0, len(hits)) + tags := make([]json.RawMessage, 0, len(hits)) + kinds := make([]string, 0, len(hits)) + statusCodes := make([]int64, 0, 
len(hits)) + statusMessages := make([]string, 0, len(hits)) + instrumentationLibraryNames := make([]string, 0, len(hits)) + instrumentationLibraryVersions := make([]string, 0, len(hits)) + traceStates := make([]string, 0, len(hits)) + errorIconColors := make([]string, 0, len(hits)) + warnings := make([]json.RawMessage, 0, len(hits)) + stackTraces := make([]json.RawMessage, 0, len(hits)) + graphSpans := make([]traceGraphSpan, 0, len(hits)) + + for _, hit := range hits { + source, ok := hit["_source"].(map[string]interface{}) + if !ok || source == nil { + continue + } + + traceID := traceString(source["trace_id"]) + spanID := traceString(source["span_id"]) + if traceID == "" || spanID == "" { + continue + } + + parentSpanID := traceParentSpanID(source["parent_span_id"]) + spanTags := traceSpanTags(source["span_attributes"]) + serviceName := traceString(source["service_name"]) + spanName := traceString(source["span_name"]) + durationMillis := traceDurationMillis(source) + statusCode, statusMessage, errorIconColor := traceSpanStatus(source) + + traceIDs = append(traceIDs, traceID) + spanIDs = append(spanIDs, spanID) + parentSpanIDs = append(parentSpanIDs, parentSpanID) + operationNames = append(operationNames, spanName) + serviceNames = append(serviceNames, serviceName) + serviceTags = append(serviceTags, traceJSONRawMessage(traceServiceTags(source["resource_attributes"], serviceName))) + startTimes = append(startTimes, traceStartTimeMillis(source, configuredFields)) + durations = append(durations, durationMillis) + logs = append(logs, traceJSONRawMessage(traceLogs(source["events"]))) + references = append(references, traceJSONRawMessage(traceReferences(source["links"]))) + tags = append(tags, traceJSONRawMessage(spanTags)) + kinds = append(kinds, traceSpanKind(source["span_kind"])) + statusCodes = append(statusCodes, statusCode) + statusMessages = append(statusMessages, statusMessage) + errorIconColors = append(errorIconColors, errorIconColor) + 
instrumentationLibraryNames = append(instrumentationLibraryNames, traceString(source["scope_name"])) + instrumentationLibraryVersions = append(instrumentationLibraryVersions, traceString(source["scope_version"])) + traceStates = append(traceStates, traceString(source["trace_state"])) + warnings = append(warnings, traceJSONRawMessage(traceWarnings(source, statusCode, statusMessage))) + stackTraces = append(stackTraces, traceJSONRawMessage(traceStackTraces(source))) + + parentID := "" + if parentSpanID != nil { + parentID = *parentSpanID + } + graphSpans = append(graphSpans, traceGraphSpan{ + spanID: spanID, + parentSpanID: parentID, + serviceName: serviceName, + durationMillis: durationMillis, + statusCode: statusCode, + }) + } + + spanIDField := data.NewField("spanID", nil, spanIDs) + if links := traceToLogsDataLinks(dsInfo); len(links) > 0 { + spanIDField.SetConfig(&data.FieldConfig{Links: links}) + } + + frame := data.NewFrame("", + data.NewField("traceID", nil, traceIDs), + spanIDField, + data.NewField("parentSpanID", nil, parentSpanIDs), + data.NewField("operationName", nil, operationNames), + data.NewField("serviceName", nil, serviceNames), + data.NewField("serviceTags", nil, serviceTags), + data.NewField("startTime", nil, startTimes), + data.NewField("duration", nil, durations), + data.NewField("logs", nil, logs), + data.NewField("references", nil, references), + data.NewField("tags", nil, tags), + data.NewField("kind", nil, kinds), + data.NewField("statusCode", nil, statusCodes), + data.NewField("statusMessage", nil, statusMessages), + data.NewField("instrumentationLibraryName", nil, instrumentationLibraryNames), + data.NewField("instrumentationLibraryVersion", nil, instrumentationLibraryVersions), + data.NewField("traceState", nil, traceStates), + data.NewField("errorIconColor", nil, errorIconColors), + data.NewField("warnings", nil, warnings), + data.NewField("stackTraces", nil, stackTraces), + ) + setPreferredVisType(frame, data.VisTypeTrace) + frames := 
data.Frames{frame} + frames = append(frames, traceNodeGraphFrames(graphSpans)...) + queryRes.Frames = frames + return nil +} + +func processTraceSearchResponse(res *es.SearchResponse, target *Query, dsInfo *es.DatasourceInfo, queryRes *backend.DataResponse) error { + hits := []map[string]interface{}{} + if res.Hits != nil { + hits = res.Hits.Hits + } + + summariesByTraceID := map[string]*traceSearchSummary{} + traceOrder := []string{} + + for _, hit := range hits { + source, ok := hit["_source"].(map[string]interface{}) + if !ok || source == nil { + continue + } + + traceID := traceString(source["trace_id"]) + spanID := traceString(source["span_id"]) + if traceID == "" || spanID == "" { + continue + } + + summary, exists := summariesByTraceID[traceID] + if !exists { + summary = &traceSearchSummary{ + traceID: traceID, + startTimeMillis: 0, + endTimeMillis: 0, + latestMillis: 0, + services: map[string]bool{}, + spanNames: map[string]bool{}, + rootStartMillis: 0, + } + summariesByTraceID[traceID] = summary + traceOrder = append(traceOrder, traceID) + } + + startTimeMillis := traceStartTimeMillis(source, es.ConfiguredFields{}) + endTimeMillis := traceEndTimeMillis(source, startTimeMillis) + if startTimeMillis > 0 && (summary.startTimeMillis == 0 || startTimeMillis < summary.startTimeMillis) { + summary.startTimeMillis = startTimeMillis + } + if endTimeMillis > summary.endTimeMillis { + summary.endTimeMillis = endTimeMillis + } + if startTimeMillis > summary.latestMillis { + summary.latestMillis = startTimeMillis + } + + summary.spanCount++ + if traceIsErrorSpan(source) { + summary.errorCount++ + } + if serviceName := traceString(source["service_name"]); serviceName != "" { + summary.services[serviceName] = true + } + if spanName := traceString(source["span_name"]); spanName != "" { + summary.spanNames[spanName] = true + } + + if traceParentSpanID(source["parent_span_id"]) == nil { + if summary.rootSpanName == "" || startTimeMillis < summary.rootStartMillis || 
summary.rootStartMillis == 0 { + summary.rootSpanName = traceString(source["span_name"]) + summary.rootServiceName = traceString(source["service_name"]) + summary.rootStartMillis = startTimeMillis + } + } + } + + summaries := make([]*traceSearchSummary, 0, len(traceOrder)) + for _, traceID := range traceOrder { + summary := summariesByTraceID[traceID] + if summary.rootSpanName == "" { + summary.rootSpanName = firstSortedMapKey(summary.spanNames) + summary.rootServiceName = firstSortedMapKey(summary.services) + } + summaries = append(summaries, summary) + } + sort.SliceStable(summaries, func(i, j int) bool { + return summaries[i].startTimeMillis > summaries[j].startTimeMillis + }) + + limit := defaultSize + if len(target.Metrics) > 0 { + limit = stringToIntWithDefaultValue(target.Metrics[0].Settings.Get("limit").MustString(), 20) + } + if limit > 0 && len(summaries) > limit { + summaries = summaries[:limit] + } + + traceIDs := make([]string, 0, len(summaries)) + startTimes := make([]time.Time, 0, len(summaries)) + durations := make([]float64, 0, len(summaries)) + spanCounts := make([]int64, 0, len(summaries)) + services := make([]string, 0, len(summaries)) + rootServices := make([]string, 0, len(summaries)) + rootSpans := make([]string, 0, len(summaries)) + matchedSpans := make([]string, 0, len(summaries)) + errorCounts := make([]int64, 0, len(summaries)) + + for _, summary := range summaries { + traceIDs = append(traceIDs, summary.traceID) + startTimes = append(startTimes, time.UnixMilli(int64(summary.startTimeMillis)).UTC()) + duration := summary.endTimeMillis - summary.startTimeMillis + if duration < 0 { + duration = 0 + } + durations = append(durations, duration) + spanCounts = append(spanCounts, int64(summary.spanCount)) + services = append(services, joinSortedMapKeys(summary.services, ", ")) + rootServices = append(rootServices, summary.rootServiceName) + rootSpans = append(rootSpans, summary.rootSpanName) + matchedSpans = append(matchedSpans, 
// traceParentSpanID normalizes a raw parent span id value. It returns nil
// when the span has no real parent: missing/empty ids and all-zero ids
// (OpenTelemetry encodes a root span's parent as zeroes).
func traceParentSpanID(value interface{}) *string {
	id := traceString(value)
	if id == "" {
		return nil
	}
	if strings.Trim(id, "0") == "" {
		return nil
	}
	return &id
}

// traceString renders an arbitrary JSON-decoded value as a string. nil maps
// to the empty string, json.Number keeps its literal form, and every other
// type falls back to fmt's default "%v" formatting.
func traceString(value interface{}) string {
	if value == nil {
		return ""
	}
	switch typed := value.(type) {
	case string:
		return typed
	case json.Number:
		return typed.String()
	}
	return fmt.Sprintf("%v", value)
}
return float64(v), true + case int64: + return float64(v), true + case int32: + return float64(v), true + case uint64: + return float64(v), true + case uint32: + return float64(v), true + case string: + parsed, err := strconv.ParseFloat(v, 64) + return parsed, err == nil + default: + return 0, false + } +} + +func traceTimestampMillis(value interface{}, outputFormat string) (float64, bool) { + switch typedValue := value.(type) { + case json.Number: + return traceUnixTimestampMillisFromString(typedValue.String(), outputFormat) + case string: + stringValue := strings.TrimSpace(typedValue) + if stringValue == "" { + return 0, false + } + if timestamp, ok := traceUnixTimestampMillisFromString(stringValue, outputFormat); ok { + return timestamp, true + } + if outputFormat != "" { + if parsedTime, err := utils.ParseTime(stringValue, outputFormat); err == nil { + return float64(parsedTime.UnixNano()) / 1e6, true + } + } + if parsedTime, err := time.Parse(time.RFC3339Nano, stringValue); err == nil { + return float64(parsedTime.UnixNano()) / 1e6, true + } + return 0, false + case int: + return traceUnixTimestampMillisFromInt(int64(typedValue), outputFormat) + case int64: + return traceUnixTimestampMillisFromInt(typedValue, outputFormat) + case int32: + return traceUnixTimestampMillisFromInt(int64(typedValue), outputFormat) + case uint64: + return traceUnixTimestampMillisFromFloat(float64(typedValue), outputFormat) + case uint32: + return traceUnixTimestampMillisFromFloat(float64(typedValue), outputFormat) + case float64: + return traceUnixTimestampMillisFromFloat(typedValue, outputFormat) + case float32: + return traceUnixTimestampMillisFromFloat(float64(typedValue), outputFormat) + } + + return 0, false +} + +func traceUnixTimestampMillisFromString(value string, outputFormat string) (float64, bool) { + switch outputFormat { + case TimestampSecs, TimestampMillis, TimestampMicros, TimestampNanos: + default: + return 0, false + } + + if parsed, err := strconv.ParseInt(value, 
10, 64); err == nil { + return traceUnixTimestampMillisFromInt(parsed, outputFormat) + } + if parsed, err := strconv.ParseFloat(value, 64); err == nil { + return traceUnixTimestampMillisFromFloat(parsed, outputFormat) + } + return 0, false +} + +func traceUnixTimestampMillisFromInt(value int64, outputFormat string) (float64, bool) { + switch outputFormat { + case TimestampNanos: + return float64(value/1_000_000) + float64(value%1_000_000)/1_000_000, true + case TimestampMicros: + return float64(value/1_000) + float64(value%1_000)/1_000, true + case TimestampMillis: + return float64(value), true + case TimestampSecs: + return float64(value) * 1000, true + default: + return 0, false + } +} + +func traceUnixTimestampMillisFromFloat(value float64, outputFormat string) (float64, bool) { + switch outputFormat { + case TimestampNanos: + return value / 1_000_000, true + case TimestampMicros: + return value / 1_000, true + case TimestampMillis: + return value, true + case TimestampSecs: + return value * 1000, true + default: + return 0, false + } +} + +func traceStartTimeMillis(source map[string]interface{}, configuredFields es.ConfiguredFields) float64 { + if startTime, ok := traceTimestampMillis(source["span_start_timestamp_nanos"], TimestampNanos); ok { + return startTime + } + if configuredFields.TimeField != "" { + if startTime, ok := traceTimestampMillis(source[configuredFields.TimeField], configuredFields.TimeOutputFormat); ok { + return startTime + } + } + return 0 +} + +func traceDurationMillis(source map[string]interface{}) float64 { + // Quickwit's otel-traces-v0_9 mapping types span_duration_millis as u64, + // which truncates sub-millisecond spans to 0. Prefer the nanos diff so we + // keep microsecond precision; fall back to the millis field only if the + // timestamps are missing. 
+ start, startOK := traceTimestampMillis(source["span_start_timestamp_nanos"], TimestampNanos) + end, endOK := traceTimestampMillis(source["span_end_timestamp_nanos"], TimestampNanos) + if startOK && endOK && end >= start { + return end - start + } + if duration, ok := traceNumber(source["span_duration_millis"]); ok { + return duration + } + return 0 +} + +func traceEndTimeMillis(source map[string]interface{}, startTimeMillis float64) float64 { + if endTimeMillis, ok := traceTimestampMillis(source["span_end_timestamp_nanos"], TimestampNanos); ok { + return endTimeMillis + } + return startTimeMillis + traceDurationMillis(source) +} + +func traceSpanKind(value interface{}) string { + kindValue, ok := traceNumber(value) + if !ok { + return "" + } + + switch int(kindValue) { + case 1: + return "internal" + case 2: + return "server" + case 3: + return "client" + case 4: + return "producer" + case 5: + return "consumer" + default: + return "" + } +} + +func traceSpanStatus(source map[string]interface{}) (int64, string, string) { + statusMap, _ := source["span_status"].(map[string]interface{}) + statusCodeValue := traceAttributeValue(statusMap["code"]) + statusMessage := traceString(traceAttributeValue(statusMap["message"])) + attributeError := traceStatusAttributesIndicateError(source["span_attributes"]) + + if code, ok := traceNumber(statusCodeValue); ok { + intCode := int64(code) + switch intCode { + case 2: + return intCode, statusMessage, "red" + case 1: + return intCode, statusMessage, "" + default: + if attributeError { + return 2, statusMessage, "red" + } + return 0, statusMessage, "" + } + } + + switch strings.ToLower(traceString(statusCodeValue)) { + case "error": + return 2, statusMessage, "red" + case "ok": + return 1, statusMessage, "" + case "unset": + if attributeError { + return 2, statusMessage, "red" + } + return 0, statusMessage, "" + default: + if attributeError { + return 2, statusMessage, "red" + } + return 0, statusMessage, "" + } +} + +func 
traceIsErrorSpan(source map[string]interface{}) bool { + statusCode, _, _ := traceSpanStatus(source) + return statusCode == 2 +} + +func traceAttributeBool(attributes interface{}, key string) bool { + attributesMap, ok := attributes.(map[string]interface{}) + if !ok { + return false + } + + value, exists := attributesMap[key] + if !exists { + return false + } + + switch typedValue := traceAttributeValue(value).(type) { + case bool: + return typedValue + case string: + return strings.EqualFold(typedValue, "true") || strings.EqualFold(typedValue, "error") + default: + return false + } +} + +func traceStatusAttributesIndicateError(attributes interface{}) bool { + return traceAttributeBool(attributes, "error") || traceAttributeBool(attributes, "otel.status_code") +} + +func traceWarnings(source map[string]interface{}, statusCode int64, statusMessage string) []string { + warnings := []string{} + if statusCode == 2 { + if statusMessage != "" { + warnings = append(warnings, fmt.Sprintf("Span status: ERROR - %s", statusMessage)) + } else { + warnings = append(warnings, "Span status: ERROR") + } + } + if traceAttributeBool(source["span_attributes"], "error") { + warnings = append(warnings, "Span attribute error=true") + } + + for _, dropped := range []struct { + field string + label string + }{ + {field: "span_dropped_attributes_count", label: "span attributes"}, + {field: "span_dropped_events_count", label: "span events"}, + {field: "span_dropped_links_count", label: "span links"}, + } { + if count, ok := traceNumber(source[dropped.field]); ok && count > 0 { + warnings = append(warnings, fmt.Sprintf("Dropped %s: %s", dropped.label, formatTraceCount(count))) + } + } + return warnings +} + +var traceStackTraceKeys = []string{ + "exception.stacktrace", + "exception.stack_trace", + "exception.stack", + "stacktrace", + "stack_trace", +} + +func traceStackTraces(source map[string]interface{}) []string { + stackTraces := []string{} + seen := map[string]bool{} + appendStackTrace 
:= func(value interface{}) { + stackTrace := traceStackTraceString(traceAttributeValue(value)) + stackTrace = truncateTraceStackTrace(stackTrace) + if stackTrace == "" || seen[stackTrace] { + return + } + seen[stackTrace] = true + stackTraces = append(stackTraces, stackTrace) + } + + if attributes, ok := source["span_attributes"].(map[string]interface{}); ok { + for _, key := range traceStackTraceKeys { + if value, exists := attributes[key]; exists { + appendStackTrace(value) + } + } + } + + events, _ := source["events"].([]interface{}) + for _, event := range events { + eventMap, ok := event.(map[string]interface{}) + if !ok { + continue + } + attributes, ok := eventMap["event_attributes"].(map[string]interface{}) + if !ok { + continue + } + for _, key := range traceStackTraceKeys { + if value, exists := attributes[key]; exists { + appendStackTrace(value) + } + } + } + return stackTraces +} + +func truncateTraceStackTrace(stackTrace string) string { + if len(stackTrace) <= maxTraceStackTraceBytes { + return stackTrace + } + + limit := maxTraceStackTraceBytes - len(traceStackTraceTruncatedText) + if limit <= 0 { + return stackTrace[:maxTraceStackTraceBytes] + } + for limit > 0 && !utf8.RuneStart(stackTrace[limit]) { + limit-- + } + return stackTrace[:limit] + traceStackTraceTruncatedText +} + +func traceStackTraceString(value interface{}) string { + switch typedValue := value.(type) { + case nil: + return "" + case string: + return typedValue + case []interface{}: + lines := make([]string, 0, len(typedValue)) + for _, line := range typedValue { + lineString := traceString(traceAttributeValue(line)) + if lineString != "" { + lines = append(lines, lineString) + } + } + return strings.Join(lines, "\n") + default: + bytes, err := json.Marshal(typedValue) + if err == nil { + return string(bytes) + } + return traceString(typedValue) + } +} + +func formatTraceCount(value float64) string { + if value == float64(int64(value)) { + return strconv.FormatInt(int64(value), 10) + 
} + return strconv.FormatFloat(value, 'f', -1, 64) +} + +func traceJSONRawMessage(value interface{}) json.RawMessage { + bytes, err := json.Marshal(value) + if err != nil { + return json.RawMessage("[]") + } + return json.RawMessage(bytes) +} + +func traceKeyValuePairs(value interface{}) []traceKeyValuePair { + switch typedValue := value.(type) { + case map[string]interface{}: + pairs := []traceKeyValuePair{} + traceAppendMapKeyValuePairs(&pairs, "", typedValue) + return pairs + case []interface{}: + pairs := make([]traceKeyValuePair, 0, len(typedValue)) + for _, item := range typedValue { + itemMap, ok := item.(map[string]interface{}) + if !ok { + continue + } + key := traceString(itemMap["key"]) + if key == "" { + continue + } + value := traceAttributeValue(itemMap["value"]) + pairs = append(pairs, traceKeyValuePair{Key: key, Value: value, Type: traceValueType(value)}) + } + return pairs + default: + return []traceKeyValuePair{} + } +} + +func traceServiceTags(value interface{}, serviceName string) []traceKeyValuePair { + pairs := traceKeyValuePairs(value) + if serviceName == "" { + return pairs + } + + if _, exists := traceFindKeyValuePair(pairs, "service.name"); exists { + return pairs + } + + return append(pairs, traceKeyValuePair{ + Key: "service.name", + Value: serviceName, + Type: traceValueType(serviceName), + }) +} + +func traceSpanTags(value interface{}) []traceKeyValuePair { + pairs := traceKeyValuePairs(value) + peerService, hasPeerService := traceFindKeyValuePair(pairs, "peer.service") + servicePeerName, hasServicePeerName := traceFindKeyValuePair(pairs, "service.peer.name") + + // Grafana's trace view still keys the uninstrumented peer-service hint on + // peer.service. Preserve the current OTel service.peer.name attribute and + // add the legacy alias only when needed. 
+ if hasServicePeerName && !hasPeerService { + pairs = append(pairs, traceKeyValuePair{ + Key: "peer.service", + Value: servicePeerName.Value, + Type: servicePeerName.Type, + }) + } + if hasPeerService && !hasServicePeerName { + pairs = append(pairs, traceKeyValuePair{ + Key: "service.peer.name", + Value: peerService.Value, + Type: peerService.Type, + }) + } + return pairs +} + +func traceFindKeyValuePair(pairs []traceKeyValuePair, key string) (traceKeyValuePair, bool) { + for _, pair := range pairs { + if pair.Key == key { + return pair, true + } + } + return traceKeyValuePair{}, false +} + +func traceAppendMapKeyValuePairs(pairs *[]traceKeyValuePair, prefix string, value map[string]interface{}) { + keys := make([]string, 0, len(value)) + for key := range value { + keys = append(keys, key) + } + sort.Strings(keys) + + for _, key := range keys { + fullKey := key + if prefix != "" { + fullKey = prefix + "." + key + } + + rawValue := traceAttributeValue(value[key]) + if nestedValue, ok := rawValue.(map[string]interface{}); ok { + traceAppendMapKeyValuePairs(pairs, fullKey, nestedValue) + continue + } + + *pairs = append(*pairs, traceKeyValuePair{Key: fullKey, Value: rawValue, Type: traceValueType(rawValue)}) + } +} + +func traceAttributeValue(value interface{}) interface{} { + valueMap, ok := value.(map[string]interface{}) + if !ok { + return value + } + + for _, key := range []string{"string_value", "int_value", "double_value", "bool_value", "bytes_value", "array_value", "kvlist_value"} { + if wrappedValue, exists := valueMap[key]; exists { + return wrappedValue + } + } + return value +} + +func traceValueType(value interface{}) string { + switch value.(type) { + case string: + return "string" + case json.Number, float64, float32, int, int64, int32, uint64, uint32: + return "number" + case bool: + return "boolean" + default: + return "" + } +} + +func traceLogs(value interface{}) []traceLog { + events, ok := value.([]interface{}) + if !ok { + return []traceLog{} + } 
+ + logs := make([]traceLog, 0, len(events)) + for _, event := range events { + eventMap, ok := event.(map[string]interface{}) + if !ok { + continue + } + + log := traceLog{ + Name: firstTraceString(eventMap, "event_name", "name"), + Fields: traceKeyValuePairs(eventMap["event_attributes"]), + } + if timestamp, ok := firstTraceTimestampMillis(eventMap, "event_timestamp_nanos", "timestamp_nanos", "time_unix_nano", "timestamp", "time"); ok { + log.Timestamp = timestamp + } + + traceAppendEventFields(&log.Fields, eventMap) + logs = append(logs, log) + } + return logs +} + +func traceAppendEventFields(fields *[]traceKeyValuePair, eventMap map[string]interface{}) { + keys := make([]string, 0, len(eventMap)) + for key := range eventMap { + switch key { + case "event_attributes", "event_name", "name", "event_timestamp_nanos", "timestamp_nanos", "time_unix_nano", "timestamp", "time": + continue + default: + keys = append(keys, key) + } + } + sort.Strings(keys) + + for _, key := range keys { + value := traceAttributeValue(eventMap[key]) + *fields = append(*fields, traceKeyValuePair{Key: key, Value: value, Type: traceValueType(value)}) + } +} + +func traceReferences(value interface{}) []traceSpanReference { + links, ok := value.([]interface{}) + if !ok { + return []traceSpanReference{} + } + + references := make([]traceSpanReference, 0, len(links)) + for _, link := range links { + linkMap, ok := link.(map[string]interface{}) + if !ok { + continue + } + + traceID := firstTraceString(linkMap, "trace_id", "traceID") + spanID := firstTraceString(linkMap, "span_id", "spanID") + if traceID == "" || spanID == "" { + continue + } + + tags := traceKeyValuePairs(linkMap["attributes"]) + if len(tags) == 0 { + tags = traceKeyValuePairs(linkMap["link_attributes"]) + } + references = append(references, traceSpanReference{ + TraceID: traceID, + SpanID: spanID, + Tags: tags, + }) + } + return references +} + +func firstSortedMapKey(values map[string]bool) string { + keys := 
sortedMapKeys(values) + if len(keys) == 0 { + return "" + } + return keys[0] +} + +func joinSortedMapKeys(values map[string]bool, separator string) string { + return strings.Join(sortedMapKeys(values), separator) +} + +func joinSortedMapKeysWithLimit(values map[string]bool, separator string, limit int) string { + keys := sortedMapKeys(values) + if limit > 0 && len(keys) > limit { + keys = keys[:limit] + } + return strings.Join(keys, separator) +} + +func sortedMapKeys(values map[string]bool) []string { + keys := make([]string, 0, len(values)) + for key := range values { + keys = append(keys, key) + } + sort.Strings(keys) + return keys +} + +func traceSearchDataLinks(dsInfo *es.DatasourceInfo) []data.DataLink { + if dsInfo == nil { + return nil + } + + datasourceUID := dsInfo.UID + datasourceName := dsInfo.Name + if dsInfo.TracesDatasourceUID != "" { + datasourceUID = dsInfo.TracesDatasourceUID + datasourceName = dsInfo.TracesDatasourceName + } + if datasourceUID == "" { + return nil + } + + return traceInternalDataLinks("Open trace", datasourceUID, datasourceName, "trace_id:${__value.raw}", tracesType, "1000") +} + +func traceToLogsDataLinks(dsInfo *es.DatasourceInfo) []data.DataLink { + if dsInfo == nil { + return nil + } + + datasourceUID := dsInfo.UID + datasourceName := dsInfo.Name + if dsInfo.LogsDatasourceUID != "" { + datasourceUID = dsInfo.LogsDatasourceUID + datasourceName = dsInfo.LogsDatasourceName + if datasourceName == "" { + datasourceName = "Quickwit logs" + } + } + if datasourceUID == "" { + return nil + } + + return traceInternalDataLinks("Logs for span", datasourceUID, datasourceName, "trace_id:${__span.traceId} AND span_id:${__span.spanId}", logsType, "100") +} + +func traceInternalDataLinks(title string, datasourceUID string, datasourceName string, query string, metricType string, limit string) []data.DataLink { + return []data.DataLink{ + { + Title: title, + Internal: &data.InternalDataLink{ + DatasourceUID: datasourceUID, + DatasourceName: 
datasourceName, + Query: map[string]interface{}{ + "refId": "A", + "query": query, + "queryType": metricType, + "datasource": map[string]string{"type": quickwitPluginID, "uid": datasourceUID}, + "filters": []interface{}{}, + "bucketAggs": []interface{}{}, + "metrics": []map[string]interface{}{ + { + "id": "1", + "type": metricType, + "settings": map[string]string{"limit": limit}, + }, + }, + }, + }, + }, + } +} + +func traceNodeGraphFrames(spans []traceGraphSpan) data.Frames { + if len(spans) == 0 { + return data.Frames{} + } + + nodesByID := map[string]*traceGraphNode{} + spanServiceByID := map[string]string{} + for _, span := range spans { + if span.serviceName == "" { + continue + } + spanServiceByID[span.spanID] = span.serviceName + node, exists := nodesByID[span.serviceName] + if !exists { + node = &traceGraphNode{id: span.serviceName} + nodesByID[span.serviceName] = node + } + node.spanCount++ + node.durationMillis += span.durationMillis + if span.statusCode == 2 { + node.errorCount++ + } + } + + edgesByID := map[string]*traceGraphEdge{} + for _, span := range spans { + sourceService := spanServiceByID[span.parentSpanID] + targetService := span.serviceName + if sourceService == "" || targetService == "" || sourceService == targetService { + continue + } + edgeID := sourceService + "->" + targetService + edge, exists := edgesByID[edgeID] + if !exists { + edge = &traceGraphEdge{id: edgeID, source: sourceService, target: targetService} + edgesByID[edgeID] = edge + } + edge.callCount++ + edge.durationMillis += span.durationMillis + if span.statusCode == 2 { + edge.errorCount++ + } + } + if len(nodesByID) == 0 { + return data.Frames{} + } + + return data.Frames{traceNodesFrame(nodesByID), traceEdgesFrame(edgesByID)} +} + +func traceNodesFrame(nodesByID map[string]*traceGraphNode) *data.Frame { + nodeIDs := sortedTraceGraphNodeIDs(nodesByID) + ids := make([]string, 0, len(nodeIDs)) + titles := make([]string, 0, len(nodeIDs)) + subtitles := make([]string, 0, 
len(nodeIDs)) + mainStats := make([]string, 0, len(nodeIDs)) + secondaryStats := make([]string, 0, len(nodeIDs)) + colors := make([]string, 0, len(nodeIDs)) + okArcs := make([]float64, 0, len(nodeIDs)) + errorArcs := make([]float64, 0, len(nodeIDs)) + errorDetails := make([]int64, 0, len(nodeIDs)) + durationDetails := make([]float64, 0, len(nodeIDs)) + + for _, id := range nodeIDs { + node := nodesByID[id] + ids = append(ids, id) + titles = append(titles, id) + subtitles = append(subtitles, "service") + mainStats = append(mainStats, fmt.Sprintf("%d spans", node.spanCount)) + secondaryStats = append(secondaryStats, fmt.Sprintf("%.1f ms", node.durationMillis)) + colors = append(colors, traceServiceColor(id)) + if node.spanCount > 0 { + errorRatio := float64(node.errorCount) / float64(node.spanCount) + errorArcs = append(errorArcs, errorRatio) + okArcs = append(okArcs, 1-errorRatio) + } else { + errorArcs = append(errorArcs, 0) + okArcs = append(okArcs, 1) + } + errorDetails = append(errorDetails, int64(node.errorCount)) + durationDetails = append(durationDetails, node.durationMillis) + } + + frame := data.NewFrame("nodes", + data.NewField("id", nil, ids), + data.NewField("title", nil, titles), + data.NewField("subtitle", nil, subtitles), + data.NewField("mainstat", nil, mainStats), + data.NewField("secondarystat", nil, secondaryStats), + data.NewField("color", nil, colors), + data.NewField("arc__ok", nil, okArcs).SetConfig(traceFixedColorFieldConfig("green")), + data.NewField("arc__errors", nil, errorArcs).SetConfig(traceFixedColorFieldConfig("red")), + data.NewField("detail__errors", nil, errorDetails).SetConfig(&data.FieldConfig{DisplayName: "Errors"}), + data.NewField("detail__duration_ms", nil, durationDetails).SetConfig(&data.FieldConfig{DisplayName: "Total duration", Unit: "ms"}), + ) + setPreferredVisType(frame, data.VisTypeNodeGraph) + return frame +} + +func traceEdgesFrame(edgesByID map[string]*traceGraphEdge) *data.Frame { + edgeIDs := 
sortedTraceGraphEdgeIDs(edgesByID) + ids := make([]string, 0, len(edgeIDs)) + sources := make([]string, 0, len(edgeIDs)) + targets := make([]string, 0, len(edgeIDs)) + mainStats := make([]string, 0, len(edgeIDs)) + secondaryStats := make([]string, 0, len(edgeIDs)) + thicknesses := make([]float64, 0, len(edgeIDs)) + colors := make([]string, 0, len(edgeIDs)) + errorDetails := make([]int64, 0, len(edgeIDs)) + + for _, id := range edgeIDs { + edge := edgesByID[id] + ids = append(ids, id) + sources = append(sources, edge.source) + targets = append(targets, edge.target) + mainStats = append(mainStats, fmt.Sprintf("%d calls", edge.callCount)) + secondaryStats = append(secondaryStats, fmt.Sprintf("%.1f ms", edge.durationMillis)) + thicknesses = append(thicknesses, 1+float64(edge.callCount-1)*0.5) + if edge.errorCount > 0 { + colors = append(colors, "#d44a3a") + } else { + colors = append(colors, "#7eb26d") + } + errorDetails = append(errorDetails, int64(edge.errorCount)) + } + + frame := data.NewFrame("edges", + data.NewField("id", nil, ids), + data.NewField("source", nil, sources), + data.NewField("target", nil, targets), + data.NewField("mainstat", nil, mainStats), + data.NewField("secondarystat", nil, secondaryStats), + data.NewField("thickness", nil, thicknesses), + data.NewField("color", nil, colors), + data.NewField("detail__errors", nil, errorDetails).SetConfig(&data.FieldConfig{DisplayName: "Errors"}), + ) + setPreferredVisType(frame, data.VisTypeNodeGraph) + return frame +} + +func traceFixedColorFieldConfig(color string) *data.FieldConfig { + return &data.FieldConfig{ + Color: map[string]interface{}{ + "mode": "fixed", + "fixedColor": color, + }, + } +} + +func sortedTraceGraphNodeIDs(nodesByID map[string]*traceGraphNode) []string { + ids := make([]string, 0, len(nodesByID)) + for id := range nodesByID { + ids = append(ids, id) + } + sort.Strings(ids) + return ids +} + +func sortedTraceGraphEdgeIDs(edgesByID map[string]*traceGraphEdge) []string { + ids := 
make([]string, 0, len(edgesByID)) + for id := range edgesByID { + ids = append(ids, id) + } + sort.Strings(ids) + return ids +} + +func traceServiceColor(serviceName string) string { + palette := []string{ + "#7eb26d", + "#eab839", + "#6ed0e0", + "#ef843c", + "#e24d42", + "#1f78c1", + "#ba43a9", + "#705da0", + "#508642", + "#cca300", + } + hash := 0 + for _, char := range serviceName { + hash = (hash*31 + int(char)) & 0x7fffffff + } + return palette[hash%len(palette)] +} + +func traceSearchErrorsFieldConfig() *data.FieldConfig { + return &data.FieldConfig{ + DisplayName: "Errors", + Color: map[string]interface{}{ + "mode": "thresholds", + }, + Thresholds: &data.ThresholdsConfig{ + Mode: data.ThresholdsModeAbsolute, + Steps: []data.Threshold{ + {Color: "green"}, + data.NewThreshold(1, "red", ""), + }, + }, + } +} + +func firstTraceString(values map[string]interface{}, keys ...string) string { + for _, key := range keys { + value := traceString(values[key]) + if value != "" { + return value + } + } + return "" +} + +func firstTraceTimestampMillis(values map[string]interface{}, keys ...string) (float64, bool) { + for _, key := range keys { + value, ok := values[key] + if !ok { + continue + } + if timestamp, ok := traceTimestampMillis(value, TimestampNanos); ok { + return timestamp, true + } + } + return 0, false +} diff --git a/pkg/quickwit/response_parser_traces_test.go b/pkg/quickwit/response_parser_traces_test.go new file mode 100644 index 0000000..e256e65 --- /dev/null +++ b/pkg/quickwit/response_parser_traces_test.go @@ -0,0 +1,544 @@ +package quickwit + +import ( + "encoding/json" + "strings" + "testing" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/stretchr/testify/require" + + es "github.com/quickwit-oss/quickwit-datasource/pkg/quickwit/client" +) + +func TestProcessTracesResponse(t *testing.T) { + query := []byte(` + [ + { + "refId": "A", + "metrics": [{ "type": "traces", "id": "1", "settings": { "limit": "1000" } }], + "query": 
"trace_id:3c191d03fa8be0653c191d03fa8be065" + } + ] + `) + + response := []byte(` + { + "responses": [ + { + "hits": { + "hits": [ + { + "_source": { + "trace_id": "3c191d03fa8be0653c191d03fa8be065", + "span_id": "1111111111111111", + "parent_span_id": "", + "service_name": "checkout", + "resource_attributes": { + "host.name": "node-1", + "service.namespace": "prod" + }, + "span_name": "GET /checkout", + "span_start_timestamp_nanos": 1678974011000000000, + "span_end_timestamp_nanos": 1678974011100000000, + "span_duration_millis": 100, + "span_attributes": { + "http.method": "GET", + "http.status_code": 200 + }, + "events": [ + { + "event_name": "exception", + "event_timestamp_nanos": 1678974011050000000, + "event_attributes": { + "exception.type": "panic" + } + } + ], + "links": [ + { + "trace_id": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "span_id": "2222222222222222", + "attributes": { + "link.type": "follows_from" + } + } + ] + } + }, + { + "_source": { + "trace_id": "3c191d03fa8be0653c191d03fa8be065", + "span_id": "3333333333333333", + "parent_span_id": "1111111111111111", + "service_name": "payments", + "resource_attributes": { + "host.name": "node-2" + }, + "span_name": "POST /charge", + "span_start_timestamp_nanos": 1678974011020000000, + "span_duration_millis": 25, + "span_status": { "code": "ERROR", "message": "declined" }, + "span_attributes": { + "db.system": "postgresql", + "service.peer.name": "stripe-api", + "exception.stacktrace": "SyntheticFailure: declined\n at authorizePayment" + } + } + } + ] + } + } + ] + } + `) + + result, err := queryDataTest(query, response) + require.NoError(t, err) + + require.Len(t, result.response.Responses, 1) + frames := result.response.Responses["A"].Frames + require.Len(t, frames, 3) + + traceFrame := frames[0] + require.Equal(t, data.VisTypeTrace, string(traceFrame.Meta.PreferredVisualization)) + + fields := make(map[string]*data.Field) + for _, field := range traceFrame.Fields { + fields[field.Name] = field + } + + 
require.Equal(t, "3c191d03fa8be0653c191d03fa8be065", fields["traceID"].At(0)) + require.Equal(t, "1111111111111111", fields["spanID"].At(0)) + require.Nil(t, fields["parentSpanID"].At(0)) + require.Equal(t, "1111111111111111", *fields["parentSpanID"].At(1).(*string)) + require.Equal(t, "GET /checkout", fields["operationName"].At(0)) + require.Equal(t, "checkout", fields["serviceName"].At(0)) + require.InDelta(t, 1678974011000.0, fields["startTime"].At(0).(float64), 0.01) + require.Equal(t, 100.0, fields["duration"].At(0)) + require.Equal(t, int64(0), fields["statusCode"].At(0)) + require.Equal(t, int64(2), fields["statusCode"].At(1)) + require.Equal(t, "declined", fields["statusMessage"].At(1)) + require.Equal(t, "red", fields["errorIconColor"].At(1)) + + serviceTags := string(fields["serviceTags"].At(0).(json.RawMessage)) + require.Contains(t, serviceTags, `"key":"host.name"`) + require.Contains(t, serviceTags, `"value":"node-1"`) + require.Contains(t, serviceTags, `"key":"service.name"`) + require.Contains(t, serviceTags, `"value":"checkout"`) + + spanTags := string(fields["tags"].At(0).(json.RawMessage)) + require.Contains(t, spanTags, `"key":"http.method"`) + require.Contains(t, spanTags, `"value":"GET"`) + + paymentSpanTags := string(fields["tags"].At(1).(json.RawMessage)) + require.Contains(t, paymentSpanTags, `"key":"service.peer.name"`) + require.Contains(t, paymentSpanTags, `"value":"stripe-api"`) + require.Contains(t, paymentSpanTags, `"key":"peer.service"`) + + logs := string(fields["logs"].At(0).(json.RawMessage)) + require.Contains(t, logs, `"name":"exception"`) + require.Contains(t, logs, `"key":"exception.type"`) + require.Contains(t, logs, `"timestamp":1678974011050`) + + references := string(fields["references"].At(0).(json.RawMessage)) + require.Contains(t, references, `"traceID":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"`) + require.Contains(t, references, `"spanID":"2222222222222222"`) + + warnings := string(fields["warnings"].At(1).(json.RawMessage)) + 
require.Contains(t, warnings, `Span status: ERROR - declined`) + + stackTraces := string(fields["stackTraces"].At(1).(json.RawMessage)) + require.Contains(t, stackTraces, `SyntheticFailure: declined`) + + nodesFrame := frames[1] + require.Equal(t, "nodes", nodesFrame.Name) + require.Equal(t, data.VisTypeNodeGraph, string(nodesFrame.Meta.PreferredVisualization)) + require.Equal(t, 2, nodesFrame.Rows()) + + edgesFrame := frames[2] + require.Equal(t, "edges", edgesFrame.Name) + require.Equal(t, data.VisTypeNodeGraph, string(edgesFrame.Meta.PreferredVisualization)) + require.Equal(t, 1, edgesFrame.Rows()) + edgeFields := make(map[string]*data.Field) + for _, field := range edgesFrame.Fields { + edgeFields[field.Name] = field + } + require.Equal(t, "checkout", edgeFields["source"].At(0)) + require.Equal(t, "payments", edgeFields["target"].At(0)) +} + +func TestProcessTraceSearchResponse(t *testing.T) { + query := []byte(` + [ + { + "refId": "A", + "metrics": [{ "type": "trace_search", "id": "1", "settings": { "limit": "1", "spanLimit": "1000" } }], + "query": "service_name:checkout" + } + ] + `) + + response := []byte(` + { + "responses": [ + { + "hits": { + "hits": [ + { + "_source": { + "trace_id": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "span_id": "1111111111111111", + "service_name": "checkout", + "span_name": "GET /checkout", + "span_start_timestamp_nanos": 1678974011000000000, + "span_duration_millis": 100 + } + }, + { + "_source": { + "trace_id": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "span_id": "2222222222222222", + "parent_span_id": "1111111111111111", + "service_name": "payments", + "span_name": "POST /charge", + "span_start_timestamp_nanos": 1678974011020000000, + "span_duration_millis": 25, + "span_status": { "code": "ERROR", "message": "declined" } + } + }, + { + "_source": { + "trace_id": "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", + "span_id": "3333333333333333", + "service_name": "checkout", + "span_name": "GET /cart", + "span_start_timestamp_nanos": 
1678974010000000000, + "span_duration_millis": 10 + } + }, + { + "_source": { + "trace_id": "cccccccccccccccccccccccccccccccc", + "span_id": "4444444444444444", + "service_name": "checkout", + "span_name": "GET /old-root", + "span_start_timestamp_nanos": 1678974010500000000, + "span_duration_millis": 10 + } + }, + { + "_source": { + "trace_id": "cccccccccccccccccccccccccccccccc", + "span_id": "5555555555555555", + "parent_span_id": "4444444444444444", + "service_name": "payments", + "span_name": "POST /late-child", + "span_start_timestamp_nanos": 1678974025000000000, + "span_duration_millis": 10 + } + } + ] + } + } + ] + } + `) + + result, err := queryDataTest(query, response) + require.NoError(t, err) + + frames := result.response.Responses["A"].Frames + require.Len(t, frames, 1) + traceSearchFrame := frames[0] + require.Equal(t, data.VisTypeTable, string(traceSearchFrame.Meta.PreferredVisualization)) + require.Equal(t, 1, traceSearchFrame.Rows()) + + fields := make(map[string]*data.Field) + for _, field := range traceSearchFrame.Fields { + fields[field.Name] = field + } + + require.Equal(t, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", fields["traceID"].At(0)) + require.Equal(t, time.UnixMilli(1678974011000).UTC(), fields["startTime"].At(0)) + require.Equal(t, 100.0, fields["duration"].At(0)) + require.Equal(t, int64(2), fields["spans"].At(0)) + require.Equal(t, "checkout, payments", fields["services"].At(0)) + require.Equal(t, "checkout", fields["rootService"].At(0)) + require.Equal(t, "GET /checkout", fields["rootSpan"].At(0)) + require.Equal(t, int64(1), fields["errors"].At(0)) +} + +func TestTraceTimestampMillisUsesDeclaredUnit(t *testing.T) { + nanos, ok := traceTimestampMillis(json.Number("1678974011123456789"), TimestampNanos) + require.True(t, ok) + require.InDelta(t, 1678974011123.4568, nanos, 0.0001) + + micros, ok := traceTimestampMillis(json.Number("1678974011123456"), TimestampMicros) + require.True(t, ok) + require.InDelta(t, 1678974011123.456, micros, 
0.0001) + + millis, ok := traceTimestampMillis(json.Number("1678974011123"), TimestampMillis) + require.True(t, ok) + require.Equal(t, 1678974011123.0, millis) + + seconds, ok := traceTimestampMillis(json.Number("1678974011"), TimestampSecs) + require.True(t, ok) + require.Equal(t, 1678974011000.0, seconds) + + _, ok = traceTimestampMillis(json.Number("1678974011123456789"), "") + require.False(t, ok) +} + +func TestTraceSpanStatusUsesErrorAttributeFallbackForUnsetStatus(t *testing.T) { + statusCode, _, errorColor := traceSpanStatus(map[string]interface{}{ + "span_status": map[string]interface{}{ + "code": json.Number("0"), + }, + "span_attributes": map[string]interface{}{ + "error": true, + }, + }) + require.Equal(t, int64(2), statusCode) + require.Equal(t, "red", errorColor) + + statusCode, _, errorColor = traceSpanStatus(map[string]interface{}{ + "span_status": map[string]interface{}{ + "code": "UNSET", + }, + "span_attributes": map[string]interface{}{ + "otel.status_code": "ERROR", + }, + }) + require.Equal(t, int64(2), statusCode) + require.Equal(t, "red", errorColor) + + statusCode, _, errorColor = traceSpanStatus(map[string]interface{}{ + "span_status": map[string]interface{}{ + "code": json.Number("1"), + }, + "span_attributes": map[string]interface{}{ + "error": true, + }, + }) + require.Equal(t, int64(1), statusCode) + require.Equal(t, "", errorColor) +} + +func TestTraceStackTracesDedupAndTruncate(t *testing.T) { + longStackTrace := strings.Repeat("x", maxTraceStackTraceBytes+1024) + stackTraces := traceStackTraces(map[string]interface{}{ + "span_attributes": map[string]interface{}{ + "exception.stacktrace": longStackTrace, + }, + "events": []interface{}{ + map[string]interface{}{ + "event_attributes": map[string]interface{}{ + "exception.stacktrace": longStackTrace, + }, + }, + }, + }) + + require.Len(t, stackTraces, 1) + require.LessOrEqual(t, len(stackTraces[0]), maxTraceStackTraceBytes) + require.True(t, strings.HasSuffix(stackTraces[0], 
traceStackTraceTruncatedText)) +} + +func TestTraceKeyValuePairsUnwrapsOtelValuesAndFlattensNestedMaps(t *testing.T) { + pairs := traceKeyValuePairs(map[string]interface{}{ + "http": map[string]interface{}{ + "method": map[string]interface{}{ + "string_value": "GET", + }, + "status_code": map[string]interface{}{ + "int_value": json.Number("200"), + }, + }, + }) + + method, ok := traceFindKeyValuePair(pairs, "http.method") + require.True(t, ok) + require.Equal(t, "GET", method.Value) + require.Equal(t, "string", method.Type) + + statusCode, ok := traceFindKeyValuePair(pairs, "http.status_code") + require.True(t, ok) + require.Equal(t, json.Number("200"), statusCode.Value) + require.Equal(t, "number", statusCode.Type) +} + +func TestTraceNodeGraphFramesCollapseSelfEdges(t *testing.T) { + frames := traceNodeGraphFrames([]traceGraphSpan{ + { + spanID: "root", + serviceName: "checkout", + durationMillis: 10, + }, + { + spanID: "child", + parentSpanID: "root", + serviceName: "checkout", + durationMillis: 5, + }, + }) + + require.Len(t, frames, 2) + require.Equal(t, 1, frames[0].Rows()) + require.Equal(t, 0, frames[1].Rows()) +} + +func TestTraceDataLinks(t *testing.T) { + t.Run("trace spans link to configured logs datasource", func(t *testing.T) { + targets := map[string]string{ + "A": `{ + "refId": "A", + "metrics": [{ "type": "traces", "id": "1", "settings": { "limit": "1000" } }], + "query": "trace_id:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + }`, + } + response := `{ + "responses": [ + { + "hits": { + "hits": [ + { + "_source": { + "trace_id": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "span_id": "1111111111111111", + "service_name": "checkout", + "span_name": "GET /checkout", + "span_start_timestamp_nanos": 1678974011000000000, + "span_duration_millis": 100 + } + } + ] + } + } + ] + }` + dsInfo := &es.DatasourceInfo{ + UID: "traces-uid", + Name: "Quickwit Traces", + LogsDatasourceUID: "logs-uid", + LogsDatasourceName: "Quickwit Logs", + } + + result, err := 
parseTestResponseWithDatasourceInfo(targets, response, dsInfo) + require.NoError(t, err) + + spanIDField, _ := result.Responses["A"].Frames[0].FieldByName("spanID") + require.NotNil(t, spanIDField) + require.NotNil(t, spanIDField.Config) + require.Len(t, spanIDField.Config.Links, 1) + + link := spanIDField.Config.Links[0] + require.Equal(t, "Logs for span", link.Title) + require.NotNil(t, link.Internal) + require.Equal(t, "logs-uid", link.Internal.DatasourceUID) + require.Equal(t, "Quickwit Logs", link.Internal.DatasourceName) + + query := link.Internal.Query.(map[string]interface{}) + require.Equal(t, "trace_id:${__span.traceId} AND span_id:${__span.spanId}", query["query"]) + require.Equal(t, logsType, query["queryType"]) + require.Equal(t, map[string]string{"type": quickwitPluginID, "uid": "logs-uid"}, query["datasource"]) + require.Equal(t, logsType, query["metrics"].([]map[string]interface{})[0]["type"]) + }) + + t.Run("trace search rows link to configured traces datasource", func(t *testing.T) { + targets := map[string]string{ + "A": `{ + "refId": "A", + "metrics": [{ "type": "trace_search", "id": "1", "settings": { "limit": "20", "spanLimit": "1000" } }], + "query": "service_name:checkout" + }`, + } + response := `{ + "responses": [ + { + "hits": { + "hits": [ + { + "_source": { + "trace_id": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "span_id": "1111111111111111", + "service_name": "checkout", + "span_name": "GET /checkout", + "span_start_timestamp_nanos": 1678974011000000000, + "span_duration_millis": 100 + } + } + ] + } + } + ] + }` + dsInfo := &es.DatasourceInfo{ + UID: "logs-uid", + Name: "Quickwit Logs", + TracesDatasourceUID: "traces-uid", + TracesDatasourceName: "Quickwit Traces", + } + + result, err := parseTestResponseWithDatasourceInfo(targets, response, dsInfo) + require.NoError(t, err) + + traceIDField, _ := result.Responses["A"].Frames[0].FieldByName("traceID") + require.NotNil(t, traceIDField) + require.NotNil(t, traceIDField.Config) + 
require.Len(t, traceIDField.Config.Links, 1) + + link := traceIDField.Config.Links[0] + require.Equal(t, "Open trace", link.Title) + require.NotNil(t, link.Internal) + require.Equal(t, "traces-uid", link.Internal.DatasourceUID) + require.Equal(t, "Quickwit Traces", link.Internal.DatasourceName) + + query := link.Internal.Query.(map[string]interface{}) + require.Equal(t, "trace_id:${__value.raw}", query["query"]) + require.Equal(t, tracesType, query["queryType"]) + require.Equal(t, map[string]string{"type": quickwitPluginID, "uid": "traces-uid"}, query["datasource"]) + require.Equal(t, tracesType, query["metrics"].([]map[string]interface{})[0]["type"]) + }) +} + +func TestTraceParserHelpers(t *testing.T) { + t.Run("service tags add service.name when Quickwit has only top-level service_name", func(t *testing.T) { + tags := traceServiceTags(map[string]interface{}{ + "service.namespace": "demo", + }, "checkout") + + serviceNameCount := 0 + for _, tag := range tags { + if tag.Key == "service.name" { + serviceNameCount++ + require.Equal(t, "checkout", tag.Value) + } + } + require.Equal(t, 1, serviceNameCount) + }) + + t.Run("service tags preserve existing service.name", func(t *testing.T) { + tags := traceServiceTags(map[string]interface{}{ + "service.name": "checkout-api", + }, "checkout") + + serviceNameCount := 0 + for _, tag := range tags { + if tag.Key == "service.name" { + serviceNameCount++ + require.Equal(t, "checkout-api", tag.Value) + } + } + require.Equal(t, 1, serviceNameCount) + }) + + t.Run("node graph skips spans without service names", func(t *testing.T) { + frames := traceNodeGraphFrames([]traceGraphSpan{ + {spanID: "1111111111111111", durationMillis: 10}, + }) + + require.Len(t, frames, 0) + }) +} diff --git a/scripts/ingest-multi-service-traces.sh b/scripts/ingest-multi-service-traces.sh new file mode 100755 index 0000000..7cfe73c --- /dev/null +++ b/scripts/ingest-multi-service-traces.sh @@ -0,0 +1,217 @@ +#!/usr/bin/env bash +set -euo pipefail + 
+QUICKWIT_URL="${QUICKWIT_URL:-http://127.0.0.1:7280/api/v1}" +INDEX="${INDEX:-otel-traces-v0_9}" +LOG_INDEX="${LOG_INDEX:-otel-logs-v0_9}" +INGEST_LOGS="${INGEST_LOGS:-1}" + +now_ns() { + date +%s%N +} + +new_trace_id() { + printf '%s-%s' "$1" "$(now_ns)" | sha256sum | awk '{print substr($1, 1, 32)}' +} + +TRACE_ID="${TRACE_ID:-$(new_trace_id healthy)}" +ERROR_TRACE_ID="${ERROR_TRACE_ID:-$(new_trace_id error)}" + +json_escape() { + printf '%s' "$1" | sed 's/\\/\\\\/g; s/"/\\"/g' +} + +span_doc() { + local trace_id="$1" + local span_id="$2" + local parent_span_id="$3" + local service_name="$4" + local span_name="$5" + local span_kind="$6" + local start_ns="$7" + local duration_ms="$8" + local status_code="$9" + local status_message="${10}" + local route="${11}" + local method="${12}" + local component="${13}" + local peer_service_name="${14:-}" + + local end_ns=$((start_ns + duration_ms * 1000000)) + local parent_field="" + local peer_service_attr="" + local is_root="false" + local status='{}' + local event_one_name="span.work.started" + local event_two_name="span.work.completed" + local event_status="ok" + local escaped_component + local escaped_method + local escaped_route + escaped_component="$(json_escape "${component}")" + escaped_method="$(json_escape "${method}")" + escaped_route="$(json_escape "${route}")" + + if [[ -n "${parent_span_id}" ]]; then + parent_field=",\"parent_span_id\":\"${parent_span_id}\"" + else + is_root="true" + fi + + if [[ -n "${peer_service_name}" ]]; then + peer_service_attr=",\"service.peer.name\":\"$(json_escape "${peer_service_name}")\"" + fi + + if [[ -n "${status_code}" ]]; then + status="{\"code\":\"${status_code}\",\"message\":\"$(json_escape "${status_message}")\"}" + fi + + case "${component}" in + http) + event_one_name="http.request.received" + event_two_name="http.response.sent" + ;; + grpc) + event_one_name="rpc.client.request" + event_two_name="rpc.client.response" + ;; + db) + event_one_name="db.statement.prepared" + 
event_two_name="db.statement.completed" + ;; + queue) + event_one_name="messaging.message.created" + event_two_name="messaging.message.sent" + ;; + esac + + if [[ "${status_code}" == "ERROR" ]]; then + event_status="error" + fi + + local event_one_ts=$((start_ns + duration_ms * 150000)) + local event_two_ts=$((start_ns + duration_ms * 850000)) + local event_names="[\"${event_one_name}\",\"${event_two_name}\"]" + local events + printf -v events '[{"event_name":"%s","event_timestamp_nanos":%s,"event_attributes":{"component":"%s","route":"%s","method":"%s","status":"started"}},{"event_name":"%s","event_timestamp_nanos":%s,"event_attributes":{"component":"%s","route":"%s","method":"%s","status":"%s","duration_ms":%s}}]' \ + "${event_one_name}" \ + "${event_one_ts}" \ + "${escaped_component}" \ + "${escaped_route}" \ + "${escaped_method}" \ + "${event_two_name}" \ + "${event_two_ts}" \ + "${escaped_component}" \ + "${escaped_route}" \ + "${escaped_method}" \ + "${event_status}" \ + "${duration_ms}" + + if [[ "${status_code}" == "ERROR" ]]; then + event_names="[\"${event_one_name}\",\"${event_two_name}\",\"exception\"]" + events="${events%]},"'{"event_name":"exception","event_timestamp_nanos":'"$((start_ns + duration_ms * 500000))"',"event_attributes":{"exception.type":"SyntheticFailure","exception.message":"'"$(json_escape "${status_message}")"'","exception.stacktrace":"SyntheticFailure: '"$(json_escape "${status_message}")"'\\n at quickwitFixture.checkout\\n at quickwitFixture.payment"}}'"]" + fi + + printf 
'{"trace_id":"%s","span_id":"%s"%s,"is_root":%s,"service_name":"%s","resource_attributes":{"service.name":"%s","service.namespace":"demo-shop","deployment.environment":"local","service.version":"dev"},"scope_name":"quickwit-datasource-fixture","scope_version":"0.1.0","span_kind":%s,"span_name":"%s","span_fingerprint":"%s\\u0000%s\\u0000%s","span_start_timestamp_nanos":%s,"span_end_timestamp_nanos":%s,"span_duration_millis":%s,"span_attributes":{"http.route":"%s","http.method":"%s","component":"%s","fixture":"multi-service-trace"%s},"span_status":%s,"event_names":%s,"events":%s,"links":[]}\n' \ + "${trace_id}" \ + "${span_id}" \ + "${parent_field}" \ + "${is_root}" \ + "$(json_escape "${service_name}")" \ + "$(json_escape "${service_name}")" \ + "${span_kind}" \ + "$(json_escape "${span_name}")" \ + "$(json_escape "${service_name}")" \ + "${span_kind}" \ + "$(json_escape "${span_name}")" \ + "${start_ns}" \ + "${end_ns}" \ + "${duration_ms}" \ + "$(json_escape "${route}")" \ + "$(json_escape "${method}")" \ + "$(json_escape "${component}")" \ + "${peer_service_attr}" \ + "${status}" \ + "${event_names}" \ + "${events}" +} + +log_doc() { + local trace_id="$1" + local span_id="$2" + local service_name="$3" + local span_name="$4" + local timestamp_ns="$5" + local severity_text="$6" + local severity_number="$7" + local message="$8" + local route="$9" + local method="${10}" + + printf 
'{"timestamp_nanos":%s,"observed_timestamp_nanos":%s,"service_name":"%s","severity_text":"%s","severity_number":%s,"body":{"message":"%s"},"attributes":{"http.route":"%s","http.method":"%s","span.name":"%s","fixture":"multi-service-trace"},"dropped_attributes_count":0,"trace_id":"%s","span_id":"%s","trace_flags":1,"resource_attributes":{"service.name":"%s","service.namespace":"demo-shop","deployment.environment":"local","service.version":"dev"},"resource_dropped_attributes_count":0,"scope_name":"quickwit-datasource-fixture","scope_version":"0.1.0","scope_attributes":{},"scope_dropped_attributes_count":0}\n' \ + "${timestamp_ns}" \ + "${timestamp_ns}" \ + "$(json_escape "${service_name}")" \ + "$(json_escape "${severity_text}")" \ + "${severity_number}" \ + "$(json_escape "${message}")" \ + "$(json_escape "${route}")" \ + "$(json_escape "${method}")" \ + "$(json_escape "${span_name}")" \ + "${trace_id}" \ + "${span_id}" \ + "$(json_escape "${service_name}")" +} + +base_ns="$(now_ns)" +trace_tmp_file="$(mktemp)" +log_tmp_file="$(mktemp)" +trap 'rm -f "${trace_tmp_file}" "${log_tmp_file}"' EXIT + +{ + span_doc "${TRACE_ID}" "1111111111111111" "" "web-frontend" "GET /checkout" 2 "${base_ns}" 420 "" "" "/checkout" "GET" "http" + span_doc "${TRACE_ID}" "2222222222222222" "1111111111111111" "checkout-api" "POST /api/checkout" 3 "$((base_ns + 35 * 1000000))" 310 "" "" "/api/checkout" "POST" "http" + span_doc "${TRACE_ID}" "3333333333333333" "2222222222222222" "inventory-service" "Reserve inventory" 3 "$((base_ns + 80 * 1000000))" 85 "" "" "inventory.reserve" "RPC" "grpc" "warehouse-system" + span_doc "${TRACE_ID}" "4444444444444444" "2222222222222222" "payment-service" "Authorize payment" 3 "$((base_ns + 145 * 1000000))" 135 "" "" "payment.authorize" "RPC" "grpc" + span_doc "${TRACE_ID}" "5555555555555555" "4444444444444444" "postgres-payments" "UPDATE payment_intents" 3 "$((base_ns + 175 * 1000000))" 42 "" "" "payment_intents" "SQL" "db" + span_doc "${TRACE_ID}" 
"6666666666666666" "2222222222222222" "email-service" "Send confirmation email" 4 "$((base_ns + 265 * 1000000))" 55 "" "" "email.confirmation" "RPC" "queue" + + error_base_ns=$((base_ns + 2 * 1000000000)) + span_doc "${ERROR_TRACE_ID}" "aaaaaaaaaaaaaaaa" "" "web-frontend" "GET /checkout" 2 "${error_base_ns}" 690 "ERROR" "checkout failed" "/checkout" "GET" "http" + span_doc "${ERROR_TRACE_ID}" "bbbbbbbbbbbbbbbb" "aaaaaaaaaaaaaaaa" "checkout-api" "POST /api/checkout" 3 "$((error_base_ns + 40 * 1000000))" 610 "ERROR" "payment authorization failed" "/api/checkout" "POST" "http" + span_doc "${ERROR_TRACE_ID}" "cccccccccccccccc" "bbbbbbbbbbbbbbbb" "payment-service" "Authorize payment" 3 "$((error_base_ns + 105 * 1000000))" 470 "ERROR" "card declined" "payment.authorize" "RPC" "grpc" + span_doc "${ERROR_TRACE_ID}" "dddddddddddddddd" "cccccccccccccccc" "fraud-service" "Score transaction risk" 3 "$((error_base_ns + 150 * 1000000))" 145 "" "" "fraud.score" "RPC" "grpc" "risk-engine" + span_doc "${ERROR_TRACE_ID}" "eeeeeeeeeeeeeeee" "cccccccccccccccc" "postgres-payments" "SELECT payment_method" 3 "$((error_base_ns + 350 * 1000000))" 70 "" "" "payment_methods" "SQL" "db" +} > "${trace_tmp_file}" + +curl -fsS "${QUICKWIT_URL}/${INDEX}/ingest?commit=wait_for" \ + -H 'Content-Type: application/json' \ + --data-binary "@${trace_tmp_file}" + +if [[ "${INGEST_LOGS}" == "1" ]]; then + { + log_doc "${TRACE_ID}" "1111111111111111" "web-frontend" "GET /checkout" "$((base_ns + 40 * 1000000))" "INFO" 9 "checkout page requested" "/checkout" "GET" + log_doc "${TRACE_ID}" "2222222222222222" "checkout-api" "POST /api/checkout" "$((base_ns + 90 * 1000000))" "INFO" 9 "checkout request accepted" "/api/checkout" "POST" + log_doc "${TRACE_ID}" "3333333333333333" "inventory-service" "Reserve inventory" "$((base_ns + 120 * 1000000))" "INFO" 9 "inventory reserved" "inventory.reserve" "RPC" + log_doc "${TRACE_ID}" "4444444444444444" "payment-service" "Authorize payment" "$((base_ns + 210 * 1000000))" 
"INFO" 9 "payment authorization approved" "payment.authorize" "RPC" + log_doc "${TRACE_ID}" "5555555555555555" "postgres-payments" "UPDATE payment_intents" "$((base_ns + 198 * 1000000))" "INFO" 9 "payment intent updated" "payment_intents" "SQL" + log_doc "${TRACE_ID}" "6666666666666666" "email-service" "Send confirmation email" "$((base_ns + 285 * 1000000))" "INFO" 9 "confirmation email queued" "email.confirmation" "RPC" + + log_doc "${ERROR_TRACE_ID}" "aaaaaaaaaaaaaaaa" "web-frontend" "GET /checkout" "$((error_base_ns + 80 * 1000000))" "ERROR" 17 "checkout failed" "/checkout" "GET" + log_doc "${ERROR_TRACE_ID}" "bbbbbbbbbbbbbbbb" "checkout-api" "POST /api/checkout" "$((error_base_ns + 130 * 1000000))" "ERROR" 17 "payment authorization failed" "/api/checkout" "POST" + log_doc "${ERROR_TRACE_ID}" "cccccccccccccccc" "payment-service" "Authorize payment" "$((error_base_ns + 250 * 1000000))" "ERROR" 17 "card declined by synthetic fixture" "payment.authorize" "RPC" + log_doc "${ERROR_TRACE_ID}" "dddddddddddddddd" "fraud-service" "Score transaction risk" "$((error_base_ns + 205 * 1000000))" "INFO" 9 "fraud risk score calculated" "fraud.score" "RPC" + log_doc "${ERROR_TRACE_ID}" "eeeeeeeeeeeeeeee" "postgres-payments" "SELECT payment_method" "$((error_base_ns + 380 * 1000000))" "INFO" 9 "payment method lookup completed" "payment_methods" "SQL" + } > "${log_tmp_file}" + + curl -fsS "${QUICKWIT_URL}/${LOG_INDEX}/ingest?commit=wait_for" \ + -H 'Content-Type: application/json' \ + --data-binary "@${log_tmp_file}" +fi + +printf 'Ingested multi-service trace fixtures into %s/%s\n' "${QUICKWIT_URL}" "${INDEX}" +if [[ "${INGEST_LOGS}" == "1" ]]; then + printf 'Ingested correlated log fixtures into %s/%s\n' "${QUICKWIT_URL}" "${LOG_INDEX}" +fi +printf 'Healthy trace: %s\n' "${TRACE_ID}" +printf 'Error trace: %s\n' "${ERROR_TRACE_ID}" diff --git a/scripts/test-local-traces.sh b/scripts/test-local-traces.sh new file mode 100755 index 0000000..2e1f1f1 --- /dev/null +++ 
b/scripts/test-local-traces.sh @@ -0,0 +1,235 @@ +#!/usr/bin/env bash +set -euo pipefail + +QUICKWIT_URL="${QUICKWIT_URL:-http://127.0.0.1:7280/api/v1}" +GRAFANA_URL="${GRAFANA_URL:-http://127.0.0.1:3000}" +GRAFANA_USER="${GRAFANA_USER:-admin}" +GRAFANA_PASSWORD="${GRAFANA_PASSWORD:-admin}" +LOGS_DATASOURCE_NAME="${LOGS_DATASOURCE_NAME:-Quickwit}" +TRACES_DATASOURCE_NAME="${TRACES_DATASOURCE_NAME:-Quickwit Traces}" + +require_cmd() { + local cmd="$1" + if ! command -v "${cmd}" >/dev/null 2>&1; then + printf 'Missing required command: %s\n' "${cmd}" >&2 + exit 1 + fi +} + +new_trace_id() { + printf '%s-%s' "$1" "$(date +%s%N)" | sha256sum | awk '{print substr($1, 1, 32)}' +} + +wait_for_grafana() { + local attempts=30 + local attempt + for ((attempt = 1; attempt <= attempts; attempt++)); do + if curl -fsS -u "${GRAFANA_USER}:${GRAFANA_PASSWORD}" "${GRAFANA_URL}/api/health" >/dev/null; then + return 0 + fi + sleep 1 + done + + printf 'Grafana did not become ready at %s\n' "${GRAFANA_URL}" >&2 + return 1 +} + +query_grafana() { + local datasource_uid="$1" + local query="$2" + local metrics_json="$3" + local bucket_aggs_json="${4:-[]}" + + local body + body="$( + jq -nc \ + --arg uid "${datasource_uid}" \ + --arg query "${query}" \ + --argjson metrics "${metrics_json}" \ + --argjson bucketAggs "${bucket_aggs_json}" \ + '{ + from: "now-15m", + to: "now", + queries: [ + { + refId: "A", + datasource: { type: "quickwit-quickwit-datasource", uid: $uid }, + query: $query, + bucketAggs: $bucketAggs, + metrics: $metrics, + maxDataPoints: 1000, + intervalMs: 1000 + } + ] + }' + )" + + curl -fsS \ + -u "${GRAFANA_USER}:${GRAFANA_PASSWORD}" \ + -H 'Content-Type: application/json' \ + --data "${body}" \ + "${GRAFANA_URL}/api/ds/query" +} + +assert_jq() { + local json="$1" + local filter="$2" + local message="$3" + + if ! jq -e "${filter}" >/dev/null <<<"${json}"; then + printf 'Assertion failed: %s\n' "${message}" >&2 + jq '.' 
<<<"${json}" >&2 + exit 1 + fi +} + +assert_jq_arg() { + local json="$1" + local arg_name="$2" + local arg_value="$3" + local filter="$4" + local message="$5" + + if ! jq -e --arg "${arg_name}" "${arg_value}" "${filter}" >/dev/null <<<"${json}"; then + printf 'Assertion failed: %s\n' "${message}" >&2 + jq '.' <<<"${json}" >&2 + exit 1 + fi +} + +value_for_trace() { + local json="$1" + local trace_id="$2" + local column_index="$3" + + jq -r \ + --arg traceId "${trace_id}" \ + --argjson columnIndex "${column_index}" \ + '.results.A.frames[0].data.values[0] as $traceIds + | ($traceIds | index($traceId)) as $idx + | .results.A.frames[0].data.values[$columnIndex][$idx]' \ + <<<"${json}" +} + +require_cmd curl +require_cmd jq +require_cmd sha256sum +require_cmd awk + +TRACE_ID="${TRACE_ID:-$(new_trace_id healthy-local-trace-test)}" +ERROR_TRACE_ID="${ERROR_TRACE_ID:-$(new_trace_id error-local-trace-test)}" + +printf 'Checking Grafana at %s\n' "${GRAFANA_URL}" +wait_for_grafana + +datasources="$( + curl -fsS -u "${GRAFANA_USER}:${GRAFANA_PASSWORD}" "${GRAFANA_URL}/api/datasources" +)" + +logs_uid="$( + jq -r --arg name "${LOGS_DATASOURCE_NAME}" \ + '.[] | select(.name == $name and .type == "quickwit-quickwit-datasource") | .uid' \ + <<<"${datasources}" | head -n 1 +)" +traces_uid="$( + jq -r --arg name "${TRACES_DATASOURCE_NAME}" \ + '.[] | select(.name == $name and .type == "quickwit-quickwit-datasource") | .uid' \ + <<<"${datasources}" | head -n 1 +)" + +if [[ -z "${logs_uid}" || "${logs_uid}" == "null" ]]; then + printf 'Could not find logs datasource named %s\n' "${LOGS_DATASOURCE_NAME}" >&2 + exit 1 +fi +if [[ -z "${traces_uid}" || "${traces_uid}" == "null" ]]; then + printf 'Could not find traces datasource named %s\n' "${TRACES_DATASOURCE_NAME}" >&2 + exit 1 +fi + +printf 'Using logs datasource %s and traces datasource %s\n' "${logs_uid}" "${traces_uid}" +printf 'Ingesting fixture traces into %s\n' "${QUICKWIT_URL}" +TRACE_ID="${TRACE_ID}" 
ERROR_TRACE_ID="${ERROR_TRACE_ID}" QUICKWIT_URL="${QUICKWIT_URL}" \ + ./scripts/ingest-multi-service-traces.sh >/dev/null + +trace_search_response="$( + query_grafana \ + "${traces_uid}" \ + "span_attributes.fixture:multi-service-trace AND (trace_id:${TRACE_ID} OR trace_id:${ERROR_TRACE_ID})" \ + '[{"id":"1","type":"trace_search","settings":{"limit":"20","spanLimit":"200"}}]' +)" + +assert_jq "${trace_search_response}" '.results.A.status == 200' 'trace search query returned HTTP 200' +assert_jq "${trace_search_response}" '.results.A.frames[0].schema.meta.preferredVisualisationType == "table"' 'trace search frame is a table' +assert_jq_arg "${trace_search_response}" traceId "${TRACE_ID}" '.results.A.frames[0].data.values[0] | index($traceId) != null' 'healthy trace appears in trace search' +assert_jq_arg "${trace_search_response}" traceId "${ERROR_TRACE_ID}" '.results.A.frames[0].data.values[0] | index($traceId) != null' 'error trace appears in trace search' +assert_jq "${trace_search_response}" '.results.A.frames[0].schema.fields[0].config.links[0].title == "Open trace"' 'trace search exposes Open trace link' +assert_jq "${trace_search_response}" '.results.A.frames[0].schema.fields[0].config.links[0].internal.query.queryType == "traces"' 'trace search link targets the traces query type' +assert_jq "${trace_search_response}" '.results.A.frames[0].schema.fields[0].config.links[0].internal.query.datasource.uid == .results.A.frames[0].schema.fields[0].config.links[0].internal.datasourceUid' 'trace search link embeds its datasource in the target query' + +healthy_spans="$(value_for_trace "${trace_search_response}" "${TRACE_ID}" 3)" +error_spans="$(value_for_trace "${trace_search_response}" "${ERROR_TRACE_ID}" 3)" +error_count="$(value_for_trace "${trace_search_response}" "${ERROR_TRACE_ID}" 8)" + +if [[ "${healthy_spans}" != "6" || "${error_spans}" != "5" || "${error_count}" != "3" ]]; then + printf 'Unexpected trace search summary: healthy_spans=%s error_spans=%s 
error_count=%s\n' \ + "${healthy_spans}" "${error_spans}" "${error_count}" >&2 + exit 1 +fi + +trace_response="$( + query_grafana \ + "${traces_uid}" \ + "trace_id:${ERROR_TRACE_ID}" \ + '[{"id":"1","type":"traces","settings":{"limit":"1000"}}]' +)" + +assert_jq "${trace_response}" '.results.A.status == 200' 'full trace query returned HTTP 200' +assert_jq "${trace_response}" '.results.A.frames[0].schema.meta.preferredVisualisationType == "trace"' 'full trace frame is a trace' +assert_jq "${trace_response}" '.results.A.frames[0].data.values[0] | length == 5' 'error trace has five spans' +assert_jq "${trace_response}" '.results.A.frames[0].schema.fields[1].config.links[0].title == "Logs for span"' 'trace spans expose Logs for span link' +assert_jq "${trace_response}" '.results.A.frames[0].schema.fields[1].config.links[0].internal.query.queryType == "logs"' 'span log link targets the logs query type' +assert_jq "${trace_response}" '.results.A.frames[0].schema.fields[1].config.links[0].internal.query.datasource.uid == .results.A.frames[0].schema.fields[1].config.links[0].internal.datasourceUid' 'span log link embeds its datasource in the target query' +assert_jq "${trace_response}" '.results.A.frames[1].schema.meta.preferredVisualisationType == "nodeGraph"' 'nodes frame is a node graph' +assert_jq "${trace_response}" '.results.A.frames[1].data.values[0] | length == 5' 'node graph has five service nodes' +assert_jq "${trace_response}" '.results.A.frames[2].data.values[0] | length == 4' 'node graph has four service edges' +assert_jq "${trace_response}" '.results.A.frames[0].data.values[12] | map(select(. 
== 2)) | length == 3' 'full trace marks three error spans' + +exemplar_link_response="$( + query_grafana \ + "${traces_uid}" \ + "${ERROR_TRACE_ID}" \ + '[{"id":"3","type":"logs","settings":{"limit":"100"}}]' +)" + +assert_jq "${exemplar_link_response}" '.results.A.status == 200' 'exemplar-style internal link query returned HTTP 200' +assert_jq "${exemplar_link_response}" '.results.A.frames[0].schema.meta.preferredVisualisationType == "trace"' 'exemplar-style internal link opens a trace frame' +assert_jq "${exemplar_link_response}" '.results.A.frames[0].data.values[0] | length == 5' 'exemplar-style internal link resolves the bare trace id' + +logs_response="$( + query_grafana \ + "${logs_uid}" \ + "trace_id:${ERROR_TRACE_ID} AND span_id:cccccccccccccccc" \ + '[{"id":"1","type":"logs","settings":{"limit":"100"}}]' +)" + +assert_jq "${logs_response}" '.results.A.status == 200' 'logs query returned HTTP 200' +assert_jq "${logs_response}" '.results.A.frames[0].schema.meta.preferredVisualisationType == "logs"' 'correlated logs frame is a logs frame' +assert_jq "${logs_response}" '.results.A.frames[0] as $frame + | ($frame.schema.fields | map(.name) | index("body.message")) as $messageField + | $frame.data.values[$messageField][0] == "card declined by synthetic fixture"' 'span log correlation returns the expected log line' + +metrics_response="$( + query_grafana \ + "${traces_uid}" \ + "span_attributes.fixture:multi-service-trace AND trace_id:${ERROR_TRACE_ID}" \ + '[{"id":"1","type":"count"}]' \ + '[{"id":"2","type":"date_histogram","settings":{"interval":"1s","min_doc_count":"1"}}]' +)" + +assert_jq "${metrics_response}" '.results.A.status == 200' 'metric query returned HTTP 200' +assert_jq "${metrics_response}" '.results.A.frames[0].schema.meta.type == "timeseries-multi"' 'metric query returns a time series' +assert_jq "${metrics_response}" '.results.A.frames[0].data.values[1] | add == 5' 'metric count over the error trace returns five spans' + +printf 'Local trace 
validation passed.\n' +printf 'Healthy trace: %s (%s spans)\n' "${TRACE_ID}" "${healthy_spans}" +printf 'Error trace: %s (%s spans, %s error spans)\n' "${ERROR_TRACE_ID}" "${error_spans}" "${error_count}" diff --git a/src/README.md b/src/README.md index d90a7bf..6cfb76d 100644 --- a/src/README.md +++ b/src/README.md @@ -17,6 +17,24 @@ You need a Quickwit standalone server or cluster. The Quickwit data source plugin works with dashboards and explore views. Alerts are also available. +### Logs and traces + +The query editor supports logs, trace search, full trace view, and raw data queries. + +For OpenTelemetry traces, use: + +- **Trace search** to find traces from matching spans. +- **Traces** to open a single trace by `trace_id`. + +Trace results include span events, span status, exception stack traces, service names, service tags, span tags, and service node graph frames. + +When logs and traces are stored in separate Quickwit indexes, create one Quickwit datasource per index and configure the related datasource fields: + +- On the logs datasource, set the traces datasource used by log-to-trace links. +- On the traces datasource, set the logs datasource used by trace-to-logs links. + +Log-to-trace links are added for log fields named `trace_id`, `traceID`, `traceId`, or `attributes.trace_id`. Trace-to-logs links query logs with both `trace_id` and `span_id`. 
+ ## Installation ### Installation on Grafana Cloud diff --git a/src/components/QueryEditor/ElasticsearchQueryContext.tsx b/src/components/QueryEditor/ElasticsearchQueryContext.tsx index 6c8dddf..ea911e6 100644 --- a/src/components/QueryEditor/ElasticsearchQueryContext.tsx +++ b/src/components/QueryEditor/ElasticsearchQueryContext.tsx @@ -5,6 +5,7 @@ import { CoreApp, TimeRange } from '@grafana/data'; import { BaseQuickwitDataSource } from '@/datasource/base'; import { combineReducers, useStatelessReducer, DispatchContext } from '@/hooks/useStatelessReducer'; import { ElasticsearchQuery } from '@/types'; +import { stripQueryType } from '@/queryModel'; import { createReducer as createBucketAggsReducer } from './BucketAggregationsEditor/state/reducer'; import { reducer as metricsReducer } from './MetricAggregationsEditor/state/reducer'; @@ -60,7 +61,7 @@ export const ElasticsearchProvider = withStore(({ const onStateChange = useCallback( (query: ElasticsearchQuery) => { - onChange(query); + onChange(stripQueryType(query)); }, [onChange] ); diff --git a/src/components/QueryEditor/MetricAggregationsEditor/SettingsEditor/TraceSearchSettingsEditor.tsx b/src/components/QueryEditor/MetricAggregationsEditor/SettingsEditor/TraceSearchSettingsEditor.tsx new file mode 100644 index 0000000..b84b44a --- /dev/null +++ b/src/components/QueryEditor/MetricAggregationsEditor/SettingsEditor/TraceSearchSettingsEditor.tsx @@ -0,0 +1,112 @@ +import React from 'react'; + +import { MetricFindValue, SelectableValue } from '@grafana/data'; +import { InlineField, Input, SegmentAsync, Select } from '@grafana/ui'; + +import { useDatasource, useRange } from '@/components/QueryEditor/ElasticsearchQueryContext'; +import { segmentStyles } from '@/components/QueryEditor/styles'; +import { useDispatch } from '@/hooks/useStatelessReducer'; +import { MetricAggregation } from '@/types'; +import { fuzzySearchSort } from '@/utils'; + +import { changeMetricSetting } from '../state/actions'; + +type 
TraceSearchStatus = '' | 'error' | 'ok' | 'unset'; + +type TraceSearchSettings = Extract['settings'] & { + serviceName?: string; + spanName?: string; + status?: TraceSearchStatus; + minDuration?: string; + maxDuration?: string; +}; + +type TraceSearchMetric = Extract & { + settings?: TraceSearchSettings; +}; + +interface Props { + metric: Extract; +} + +const statusOptions: Array> = [ + { label: 'Any', value: '' }, + { label: 'Error', value: 'error' }, + { label: 'Ok', value: 'ok' }, + { label: 'Unset', value: 'unset' }, +]; + +function toFuzzyOptions(values: MetricFindValue[], query?: string): Array> { + return fuzzySearchSort( + values.map((value) => String(value.text)), + (text) => text, + query + ).map((text) => ({ label: text, value: text })); +} + +export const TraceSearchSettingsEditor = ({ metric }: Props) => { + const typedMetric = metric as TraceSearchMetric; + const dispatch = useDispatch(); + const datasource = useDatasource(); + const range = useRange(); + + const changeSetting = (settingName: keyof TraceSearchSettings, newValue?: string) => { + dispatch(changeMetricSetting({ metric: typedMetric, settingName, newValue: newValue?.trim() || undefined })); + }; + + const loadFieldValues = (field: string) => async (query?: string): Promise>> => { + if (!datasource.getTagValues) { + return []; + } + const values = await datasource.getTagValues({ key: field, timeRange: range }); + return toFuzzyOptions(values, query); + }; + + return ( + <> + + changeSetting('serviceName', e.value)} + placeholder="All services" + value={typedMetric.settings?.serviceName} + /> + + + changeSetting('spanName', e.value)} + placeholder="All spans" + value={typedMetric.settings?.spanName} + /> + + + changeSetting('minDuration', e.target.value)} + defaultValue={typedMetric.settings?.minDuration} + placeholder="100ms" + /> + + + changeSetting('maxDuration', e.target.value)} + defaultValue={typedMetric.settings?.maxDuration} + placeholder="1.2s" + /> + + + ); +}; diff --git 
a/src/components/QueryEditor/MetricAggregationsEditor/SettingsEditor/index.test.tsx b/src/components/QueryEditor/MetricAggregationsEditor/SettingsEditor/index.test.tsx index a0b26ad..16ead91 100644 --- a/src/components/QueryEditor/MetricAggregationsEditor/SettingsEditor/index.test.tsx +++ b/src/components/QueryEditor/MetricAggregationsEditor/SettingsEditor/index.test.tsx @@ -4,7 +4,7 @@ import React from 'react'; import { CoreApp, getDefaultTimeRange } from '@grafana/data'; import { ElasticDatasource } from '@/datasource'; -import { ElasticsearchQuery } from '@/types'; +import { ElasticsearchQuery, MetricAggregation } from '@/types'; import { ElasticsearchProvider } from '../../ElasticsearchQueryContext'; import { SettingsEditor } from '.'; @@ -131,4 +131,60 @@ describe('Settings Editor', () => { expect(modeSelectElement).toBeInTheDocument(); }); }); + + describe('Trace search', () => { + it('renders structured trace filters and updates duration settings', () => { + const metricId = '1'; + const query: ElasticsearchQuery = { + refId: 'A', + query: '', + metrics: [ + { + id: metricId, + type: 'trace_search', + settings: { + limit: '20', + spanLimit: '5000', + serviceName: 'checkout', + }, + } as MetricAggregation, + ], + bucketAggs: [], + filters: [], + }; + + const onChange = jest.fn(); + + render( + {}} + range={getDefaultTimeRange()} + > + + + ); + + expect(screen.getByRole('button', { name: /Traces: 20, scan: 5000 spans/i })).toHaveAttribute( + 'aria-expanded', + 'true' + ); + expect(screen.getByText('Service name')).toBeInTheDocument(); + expect(screen.getByText('Span name')).toBeInTheDocument(); + expect(screen.getByText('Status')).toBeInTheDocument(); + + const minDurationInput = screen.getByLabelText('Min duration'); + fireEvent.change(minDurationInput, { target: { value: '250ms' } }); + fireEvent.blur(minDurationInput); + + expect(onChange).toHaveBeenCalledTimes(1); + expect(onChange.mock.calls[0][0].metrics[0].settings).toMatchObject({ + serviceName: 
'checkout', + minDuration: '250ms', + }); + }); + }); }); diff --git a/src/components/QueryEditor/MetricAggregationsEditor/SettingsEditor/index.tsx b/src/components/QueryEditor/MetricAggregationsEditor/SettingsEditor/index.tsx index 9b5b8c4..840e13c 100644 --- a/src/components/QueryEditor/MetricAggregationsEditor/SettingsEditor/index.tsx +++ b/src/components/QueryEditor/MetricAggregationsEditor/SettingsEditor/index.tsx @@ -17,6 +17,7 @@ import { SettingField } from './SettingField'; import { TopMetricsSettingsEditor } from './TopMetricsSettingsEditor'; import { useDescription } from './useDescription'; import { LogsSettingsEditor } from './LogsSettingsEditor'; +import { TraceSearchSettingsEditor } from './TraceSearchSettingsEditor'; // TODO: Move this somewhere and share it with BucketsAggregation Editor export const inlineFieldProps: Partial> = { @@ -52,7 +53,7 @@ export const SettingsEditor = ({ metric, previousMetrics }: Props) => { ]; return ( -