From ee88eb69e023fd934e54b1fb39d6094a66b8a6bd Mon Sep 17 00:00:00 2001 From: Patrik Cyvoct Date: Mon, 27 Apr 2026 02:32:24 +0200 Subject: [PATCH] feat: add trace support Add trace search and full trace rendering for OpenTelemetry trace indexes. This includes Grafana trace frames, service node graph frames, span events, exception stack traces, status/warning handling, service tags, peer service metadata, and stable per-service node colors. Add trace-to-logs and log-to-trace correlation links for separate Quickwit logs and traces indexes, plus configuration fields for related datasources. Split trace parsing into dedicated backend files, add parser and frontend link tests, document the feature, and add a local multi-service trace/log fixture script for manual testing. Signed-off-by: Patrik Cyvoct --- README.md | 110 ++ pkg/quickwit/client/client.go | 6 + pkg/quickwit/data_query.go | 200 ++- pkg/quickwit/data_query_test.go | 95 +- pkg/quickwit/elasticsearch.go | 2 +- pkg/quickwit/models.go | 3 + pkg/quickwit/parse_query.go | 8 +- pkg/quickwit/parse_query_test.go | 150 ++ pkg/quickwit/quickwit.go | 26 + pkg/quickwit/response_parser.go | 16 +- pkg/quickwit/response_parser_test.go | 6 +- pkg/quickwit/response_parser_traces.go | 1253 +++++++++++++++++ pkg/quickwit/response_parser_traces_test.go | 544 +++++++ scripts/ingest-multi-service-traces.sh | 217 +++ scripts/test-local-traces.sh | 235 ++++ src/README.md | 18 + .../QueryEditor/ElasticsearchQueryContext.tsx | 3 +- .../TraceSearchSettingsEditor.tsx | 112 ++ .../SettingsEditor/index.test.tsx | 58 +- .../SettingsEditor/index.tsx | 39 +- .../SettingsEditor/useDescription.ts | 22 +- .../MetricAggregationsEditor/aggregations.ts | 2 + .../MetricAggregationsEditor/index.tsx | 24 +- .../MetricAggregationsEditor/utils.ts | 35 +- .../QueryEditor/QueryTypeSelector.tsx | 4 + .../QueryEditor/SettingsEditorContainer.tsx | 10 +- src/components/QueryEditor/index.test.tsx | 37 +- src/components/QueryEditor/index.tsx | 27 +- 
src/configuration/ConfigEditor.test.tsx | 7 + src/configuration/ConfigEditor.tsx | 88 +- src/configuration/utils.ts | 4 + src/dataquery.gen.ts | 19 +- src/datasource/base.ts | 192 ++- src/datasource/processResponse.test.ts | 109 +- src/datasource/processResponse.ts | 104 +- src/plugin.json | 5 +- src/queryModel.test.ts | 139 ++ src/queryModel.ts | 80 ++ src/quickwit.ts | 10 +- src/store/defaults/conf.ts | 38 +- src/types.ts | 37 +- 41 files changed, 3910 insertions(+), 184 deletions(-) create mode 100644 pkg/quickwit/response_parser_traces.go create mode 100644 pkg/quickwit/response_parser_traces_test.go create mode 100755 scripts/ingest-multi-service-traces.sh create mode 100755 scripts/test-local-traces.sh create mode 100644 src/components/QueryEditor/MetricAggregationsEditor/SettingsEditor/TraceSearchSettingsEditor.tsx create mode 100644 src/queryModel.test.ts create mode 100644 src/queryModel.ts diff --git a/README.md b/README.md index b5207fb..d5d0a05 100644 --- a/README.md +++ b/README.md @@ -121,6 +121,7 @@ To configure the Quickwit datasource, you need to provide the following informat - The index name. - The log message field name (optional). This is the field displayed in the explorer view. - The log level field name (optional). It must be a fast field. +- The related logs or traces datasource (optional). This enables trace-to-logs and log-to-trace links when logs and traces are stored in separate Quickwit indexes. ### With Grafana UI @@ -141,6 +142,112 @@ datasources: logLevelField: severity_text ``` +### Logs and traces in separate indexes + +When logs and traces are stored in different Quickwit indexes, configure one datasource per index and link them with `logsDatasourceUid` and `tracesDatasourceUid`. 
+ +```yaml +apiVersion: 1 + +datasources: + - name: Quickwit Logs + uid: quickwit-logs + type: quickwit-quickwit-datasource + url: http://localhost:7280/api/v1 + jsonData: + index: 'otel-logs-v0_9' + logMessageField: body.message + logLevelField: severity_text + tracesDatasourceUid: quickwit-traces + tracesDatasourceName: Quickwit Traces + + - name: Quickwit Traces + uid: quickwit-traces + type: quickwit-quickwit-datasource + url: http://localhost:7280/api/v1 + jsonData: + index: 'otel-traces-v0_9' + logsDatasourceUid: quickwit-logs + logsDatasourceName: Quickwit Logs +``` + +## Traces + +The query editor has two trace query types: + +- **Trace search** scans matching spans and returns one row per trace. Use this to find trace IDs by Lucene query, service, operation, status, or attributes. +- **Traces** returns a full trace frame for Grafana's trace viewer. Use this with a trace ID query such as `trace_id:abc123`. + +The trace parser expects Quickwit OpenTelemetry trace fields such as: + +- `trace_id` +- `span_id` +- `parent_span_id` +- `service_name` +- `span_name` +- `span_start_timestamp_nanos` +- `span_duration_millis` or `span_end_timestamp_nanos` + +It also reads optional fields for richer trace rendering: + +- `resource_attributes` for service tags +- `span_attributes` for span tags, including `service.peer.name` and `peer.service` +- `span_status` for error status and warnings +- `events` for span events and exception stack traces +- `links` for span references +- `scope_name` and `scope_version` for instrumentation library details + +Trace responses include: + +- Grafana trace frames for the trace viewer. +- Node graph frames that summarize service-to-service calls. +- Span warnings for error status and dropped attributes/events/links. +- Span event details and exception stack traces. +- Stable per-service node colors in the node graph. + +### Trace/log correlations + +Trace-to-logs links are attached to each trace span. 
They query the configured logs datasource with: + +```text +trace_id:${__span.traceId} AND span_id:${__span.spanId} +``` + +Log-to-trace links are attached to log fields named: + +- `trace_id` +- `traceID` +- `traceId` +- `attributes.trace_id` + +They open the configured traces datasource with: + +```text +trace_id:${__value.raw} +``` + +### Local trace fixtures + +For local testing, use the fixture script: + +```bash +QUICKWIT_URL=http://127.0.0.1:7280/api/v1 ./scripts/ingest-multi-service-traces.sh +``` + +It writes two multi-service traces into `otel-traces-v0_9` and matching correlated logs into `otel-logs-v0_9`. + +In **Trace search**, filter the fixture data with: + +```text +span_attributes.fixture:multi-service-trace +``` + +In **Traces**, open a returned trace ID with: + +```text +trace_id:<trace-id> +``` + ## Features - Explore view. @@ -149,6 +256,9 @@ datasources: - Annotations - Explore Log Context. +- Trace search and trace view. +- Trace-to-logs and log-to-trace links. +- Service node graph for trace results. - [Alerting](https://grafana.com/docs/grafana/latest/alerting/). 
## FAQ and Limitations diff --git a/pkg/quickwit/client/client.go b/pkg/quickwit/client/client.go index 173d8f3..0fc006a 100644 --- a/pkg/quickwit/client/client.go +++ b/pkg/quickwit/client/client.go @@ -22,6 +22,12 @@ type ReadyStatus struct { type DatasourceInfo struct { ID int64 + UID string + Name string + LogsDatasourceUID string + LogsDatasourceName string + TracesDatasourceUID string + TracesDatasourceName string HTTPClient *http.Client URL string Database string diff --git a/pkg/quickwit/data_query.go b/pkg/quickwit/data_query.go index 0a5871a..8f26396 100644 --- a/pkg/quickwit/data_query.go +++ b/pkg/quickwit/data_query.go @@ -4,6 +4,7 @@ import ( "fmt" "regexp" "strconv" + "strings" es "github.com/quickwit-oss/quickwit-datasource/pkg/quickwit/client" "github.com/quickwit-oss/quickwit-datasource/pkg/quickwit/simplejson" @@ -25,11 +26,22 @@ func buildMSR(queries []*Query, defaultTimeField string) ([]*es.SearchRequest, e b := ms.Search(q.Interval) b.Size(0) filters := b.Query().Bool().Filter() + // Always pass Grafana's picker range through. Quickwit's metastore + // prunes splits whose timestamps fall outside this window, so even + // trace_id lookups go from "scan every split" to "scan a few" — the + // same speedup the native Jaeger endpoint gets via auto-derived bounds. 
filters.AddDateRangeFilter(defaultTimeField, q.RangeTo, q.RangeFrom) filters.AddQueryStringFilter(q.RawQuery, true, "AND") + if isTraceSearchQuery(q) { + filters.AddQueryStringFilter(traceSearchSettingsQuery(q), true, "AND") + } if isLogsQuery(q) { processLogsQuery(q, b, q.RangeFrom, q.RangeTo, defaultTimeField) + } else if isTraceSearchQuery(q) { + processTraceSearchQuery(q, b, defaultTimeField) + } else if isTracesQuery(q) { + processTracesQuery(q, b, defaultTimeField) } else if isDocumentQuery(q) { processDocumentQuery(q, b, q.RangeFrom, q.RangeTo, defaultTimeField) } else { @@ -269,8 +281,8 @@ func getPipelineAggField(m *MetricAgg) string { func isQueryWithError(query *Query) error { if len(query.BucketAggs) == 0 { - // If no aggregations, only document and logs queries are valid - if len(query.Metrics) == 0 || !(isLogsQuery(query) || isDocumentQuery(query)) { + // If no aggregations, only document, logs, and trace queries are valid + if len(query.Metrics) == 0 || !(isLogsQuery(query) || isTraceSearchQuery(query) || isTracesQuery(query) || isDocumentQuery(query)) { return fmt.Errorf("invalid query, missing metrics and aggregations") } } else { @@ -302,7 +314,15 @@ func isQueryWithError(query *Query) error { } func isLogsQuery(query *Query) bool { - return query.Metrics[0].Type == logsType + return queryMetricType(query) == logsType +} + +func isTracesQuery(query *Query) bool { + return queryMetricType(query) == tracesType +} + +func isTraceSearchQuery(query *Query) bool { + return queryMetricType(query) == traceSearchType } func isDocumentQuery(query *Query) bool { @@ -310,11 +330,169 @@ func isDocumentQuery(query *Query) bool { } func isRawDataQuery(query *Query) bool { - return query.Metrics[0].Type == rawDataType + return queryMetricType(query) == rawDataType } func isRawDocumentQuery(query *Query) bool { - return query.Metrics[0].Type == rawDocumentType + return queryMetricType(query) == rawDocumentType +} + +var ( + bareTraceIDPattern = 
regexp.MustCompile(`^[0-9a-fA-F]{32}$`) + durationPattern = regexp.MustCompile(`(?i)^\s*(\d+(?:\.\d+)?)\s*(ns|us|ms|s|m|h)?\s*$`) +) + +func firstMetricType(query *Query) string { + if query == nil || len(query.Metrics) == 0 { + return "" + } + return query.Metrics[0].Type +} + +func queryMetricType(query *Query) string { + return firstMetricType(query) +} + +func isBareTraceIDQuery(rawQuery string) bool { + return bareTraceIDPattern.MatchString(strings.TrimSpace(rawQuery)) +} + +func traceSearchSettingsQuery(query *Query) string { + if !isTraceSearchQuery(query) || len(query.Metrics) == 0 || query.Metrics[0].Settings == nil { + return "" + } + + settings := query.Metrics[0].Settings + clauses := []string{} + + if serviceName := strings.TrimSpace(settings.Get("serviceName").MustString()); serviceName != "" { + clauses = append(clauses, traceSearchPhraseClause("service_name", serviceName)) + } + if spanName := strings.TrimSpace(settings.Get("spanName").MustString()); spanName != "" { + clauses = append(clauses, traceSearchPhraseClause("span_name", spanName)) + } + if statusClause := traceSearchStatusClause(settings.Get("status").MustString()); statusClause != "" { + clauses = append(clauses, statusClause) + } + if minDuration, ok := traceSearchDurationMillis(settings.Get("minDuration").MustString()); ok { + clauses = append(clauses, "span_duration_millis:>="+minDuration) + } + if maxDuration, ok := traceSearchDurationMillis(settings.Get("maxDuration").MustString()); ok { + clauses = append(clauses, "span_duration_millis:<="+maxDuration) + } + + return strings.Join(clauses, " AND ") +} + +func traceSearchPhraseClause(fieldName, value string) string { + escaped := strings.ReplaceAll(value, `\`, `\\`) + escaped = strings.ReplaceAll(escaped, `"`, `\"`) + return fieldName + `:"` + escaped + `"` +} + +func traceSearchStatusClause(status string) string { + // span_status is mapped as `tokenizer: raw`, so matches are exact tokens. 
+ // OTel canonical strings are TitleCase ("Error"/"Ok"/"Unset") and the OTLP + // wire form is the integer enum (0/1/2) or "STATUS_CODE_*". Some pipelines + // lowercase, so cover all variants we've seen. + switch strings.ToLower(strings.TrimSpace(status)) { + case "error": + return `(span_status.code:Error OR span_status.code:ERROR OR span_status.code:error OR span_status.code:STATUS_CODE_ERROR OR span_status.code:2 OR span_attributes.error:true OR span_attributes.otel.status_code:ERROR)` + case "ok": + return `(span_status.code:Ok OR span_status.code:OK OR span_status.code:ok OR span_status.code:STATUS_CODE_OK OR span_status.code:1)` + case "unset": + return `(span_status.code:Unset OR span_status.code:UNSET OR span_status.code:unset OR span_status.code:STATUS_CODE_UNSET OR span_status.code:0)` + default: + return "" + } +} + +func traceSearchDurationMillis(duration string) (string, bool) { + matches := durationPattern.FindStringSubmatch(duration) + if matches == nil { + return "", false + } + + value, err := strconv.ParseFloat(matches[1], 64) + if err != nil { + return "", false + } + + switch strings.ToLower(matches[2]) { + case "ns": + value = value / 1000000 + case "us": + value = value / 1000 + case "s": + value = value * 1000 + case "m": + value = value * 60 * 1000 + case "h": + value = value * 60 * 60 * 1000 + case "ms", "": + default: + return "", false + } + + return strconv.FormatFloat(value, 'f', -1, 64), true +} + +func canInferTraceLink(query *Query) bool { + firstMetricType := firstMetricType(query) + return firstMetricType == "" || firstMetricType == logsType || firstMetricType == tracesType +} + +func canTraceQueryTypeNormalize(query *Query) bool { + // queryType is a transient hint from internal trace links. Once the request + // has an explicit non-log/non-trace metric, the metric type is authoritative. 
+ return query.QueryType == tracesType && canInferTraceLink(query) +} + +func normalizeInternalLinkTraceQuery(query *Query) { + if query == nil { + return + } + + if query.QueryType == tracesType && !canInferTraceLink(query) { + query.QueryType = "" + } + + if !canTraceQueryTypeNormalize(query) && + firstMetricType(query) != tracesType && + !(firstMetricType(query) == "" && isBareTraceIDQuery(query.RawQuery)) { + return + } + + rawQuery := strings.TrimSpace(query.RawQuery) + if isBareTraceIDQuery(rawQuery) { + query.RawQuery = "trace_id:" + rawQuery + } + query.QueryType = tracesType + query.BucketAggs = []*BucketAgg{} + + if len(query.Metrics) == 0 { + query.Metrics = []*MetricAgg{ + { + ID: "1", + Type: tracesType, + Settings: simplejson.NewFromAny(map[string]interface{}{"limit": "1000"}), + Meta: simplejson.New(), + }, + } + return + } + + query.Metrics = query.Metrics[:1] + query.Metrics[0].Type = tracesType + if query.Metrics[0].ID == "" { + query.Metrics[0].ID = "1" + } + if query.Metrics[0].Settings == nil { + query.Metrics[0].Settings = simplejson.New() + } + if query.Metrics[0].Settings.Get("limit").MustString() == "" { + query.Metrics[0].Settings.Set("limit", "1000") + } } func processLogsQuery(q *Query, b *es.SearchRequestBuilder, from, to int64, defaultTimeField string) { @@ -337,6 +515,18 @@ func processLogsQuery(q *Query, b *es.SearchRequestBuilder, from, to int64, defa } } +func processTracesQuery(q *Query, b *es.SearchRequestBuilder, defaultTimeField string) { + metric := q.Metrics[0] + b.Sort(es.SortOrderAsc, defaultTimeField, "epoch_nanos_int") + b.Size(stringToIntWithDefaultValue(metric.Settings.Get("limit").MustString(), defaultSize)) +} + +func processTraceSearchQuery(q *Query, b *es.SearchRequestBuilder, defaultTimeField string) { + metric := q.Metrics[0] + b.Sort(es.SortOrderDesc, defaultTimeField, "epoch_nanos_int") + b.Size(stringToIntWithDefaultValue(metric.Settings.Get("spanLimit").MustString(), 5000)) +} + func processDocumentQuery(q 
*Query, b *es.SearchRequestBuilder, from, to int64, defaultTimeField string) { metric := q.Metrics[0] b.Sort(es.SortOrderDesc, defaultTimeField, "epoch_nanos_int") diff --git a/pkg/quickwit/data_query_test.go b/pkg/quickwit/data_query_test.go index a9b8f81..b0f8a9c 100644 --- a/pkg/quickwit/data_query_test.go +++ b/pkg/quickwit/data_query_test.go @@ -12,6 +12,18 @@ import ( es "github.com/quickwit-oss/quickwit-datasource/pkg/quickwit/client" ) +func TestQueryTypeHelpersDoNotPanicWithoutMetrics(t *testing.T) { + query := &Query{} + + require.NotPanics(t, func() { + assert.False(t, isLogsQuery(query)) + assert.False(t, isTracesQuery(query)) + assert.False(t, isTraceSearchQuery(query)) + assert.False(t, isRawDataQuery(query)) + assert.False(t, isRawDocumentQuery(query)) + }) +} + func TestExecuteElasticsearchDataQuery(t *testing.T) { from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC) to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC) @@ -1321,6 +1333,87 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { require.Equal(t, sr.Size, 1000) }) + t.Run("With traces query should return query sorted by ascending time and apply the picker range", func(t *testing.T) { + c := newFakeClient() + _, err := executeElasticsearchDataQuery(c, `{ + "query": "trace_id:3c191d03fa8be0653c191d03fa8be065", + "metrics": [{ "type": "traces", "id": "1", "settings": { "limit": "5000" }}] + }`, from, to) + require.NoError(t, err) + sr := c.multisearchRequests[0][0] + require.Equal(t, sr.Size, 5000) + require.Equal(t, sr.Sort[0]["@timestamp"]["order"], "asc") + + require.Len(t, sr.Query.Bool.Filters, 2) + rangeFilter := sr.Query.Bool.Filters[0].(*es.DateRangeFilter) + require.Equal(t, rangeFilter.Lte, "2018-05-15T17:55:00Z") + require.Equal(t, rangeFilter.Gte, "2018-05-15T17:50:00Z") + queryFilter := sr.Query.Bool.Filters[1].(*es.QueryStringFilter) + require.Equal(t, queryFilter.Query, "trace_id:3c191d03fa8be0653c191d03fa8be065") + }) + + t.Run("With traces query using a non-trace-id 
filter should keep the time range", func(t *testing.T) { + c := newFakeClient() + _, err := executeElasticsearchDataQuery(c, `{ + "query": "service_name:quickwit", + "metrics": [{ "type": "traces", "id": "1", "settings": { "limit": "5000" }}] + }`, from, to) + require.NoError(t, err) + sr := c.multisearchRequests[0][0] + + rangeFilter := sr.Query.Bool.Filters[0].(*es.DateRangeFilter) + require.Equal(t, rangeFilter.Lte, "2018-05-15T17:55:00Z") + require.Equal(t, rangeFilter.Gte, "2018-05-15T17:50:00Z") + queryFilter := sr.Query.Bool.Filters[1].(*es.QueryStringFilter) + require.Equal(t, queryFilter.Query, "service_name:quickwit") + }) + + t.Run("With trace search query should scan spans sorted by descending time", func(t *testing.T) { + c := newFakeClient() + _, err := executeElasticsearchDataQuery(c, `{ + "query": "service_name:quickwit", + "metrics": [{ "type": "trace_search", "id": "1", "settings": { "limit": "20", "spanLimit": "2500" }}] + }`, from, to) + require.NoError(t, err) + sr := c.multisearchRequests[0][0] + require.Equal(t, sr.Size, 2500) + require.Equal(t, sr.Sort[0]["@timestamp"]["order"], "desc") + + rangeFilter := sr.Query.Bool.Filters[0].(*es.DateRangeFilter) + require.Equal(t, rangeFilter.Lte, "2018-05-15T17:55:00Z") + require.Equal(t, rangeFilter.Gte, "2018-05-15T17:50:00Z") + queryFilter := sr.Query.Bool.Filters[1].(*es.QueryStringFilter) + require.Equal(t, queryFilter.Query, "service_name:quickwit") + }) + + t.Run("With trace search builder settings should add structured trace filters", func(t *testing.T) { + c := newFakeClient() + _, err := executeElasticsearchDataQuery(c, `{ + "query": "span_attributes.http.method:GET", + "metrics": [{ + "type": "trace_search", + "id": "1", + "settings": { + "limit": "20", + "spanLimit": "2500", + "serviceName": "checkout", + "spanName": "GET /checkout", + "status": "error", + "minDuration": "100ms", + "maxDuration": "1.2s" + } + }] + }`, from, to) + require.NoError(t, err) + sr := c.multisearchRequests[0][0] 
+ + require.Len(t, sr.Query.Bool.Filters, 3) + queryFilter := sr.Query.Bool.Filters[1].(*es.QueryStringFilter) + require.Equal(t, "span_attributes.http.method:GET", queryFilter.Query) + traceSearchFilter := sr.Query.Bool.Filters[2].(*es.QueryStringFilter) + require.Equal(t, `service_name:"checkout" AND span_name:"GET /checkout" AND (span_status.code:Error OR span_status.code:ERROR OR span_status.code:error OR span_status.code:STATUS_CODE_ERROR OR span_status.code:2 OR span_attributes.error:true OR span_attributes.otel.status_code:ERROR) AND span_duration_millis:>=100 AND span_duration_millis:<=1200`, traceSearchFilter.Query) + }) + t.Run("With invalid query should return error", (func(t *testing.T) { c := newFakeClient() _, err := executeElasticsearchDataQuery(c, `{ @@ -1721,5 +1814,5 @@ func executeElasticsearchDataQuery(c es.Client, body string, from, to time.Time) return &backend.QueryDataResponse{}, err } - return parseResponse(res, queries, configuredFields) + return parseResponse(res, queries, configuredFields, nil) } diff --git a/pkg/quickwit/elasticsearch.go b/pkg/quickwit/elasticsearch.go index afb18f6..26bdaf0 100644 --- a/pkg/quickwit/elasticsearch.go +++ b/pkg/quickwit/elasticsearch.go @@ -45,7 +45,7 @@ func queryData(ctx context.Context, dataQueries []backend.DataQuery, dsInfo *es. 
return &backend.QueryDataResponse{}, err } - return parseResponse(res, queries, dsInfo.ConfiguredFields) + return parseResponse(res, queries, dsInfo.ConfiguredFields, dsInfo) } func handleQuickwitErrors(err error) (*backend.QueryDataResponse, error) { diff --git a/pkg/quickwit/models.go b/pkg/quickwit/models.go index 818908e..2029183 100644 --- a/pkg/quickwit/models.go +++ b/pkg/quickwit/models.go @@ -9,6 +9,7 @@ import ( // Query represents the time series query model of the datasource type Query struct { RawQuery string `json:"query"` + QueryType string `json:"queryType"` BucketAggs []*BucketAgg `json:"bucketAggs"` Metrics []*MetricAgg `json:"metrics"` Alias string `json:"alias"` @@ -60,6 +61,8 @@ var metricAggType = map[string]string{ "raw_data": "Raw Data", "rate": "Rate", "logs": "Logs", + "traces": "Traces", + "trace_search": "Trace search", } var extendedStats = map[string]string{ diff --git a/pkg/quickwit/parse_query.go b/pkg/quickwit/parse_query.go index 7f1632e..93f26ae 100644 --- a/pkg/quickwit/parse_query.go +++ b/pkg/quickwit/parse_query.go @@ -20,6 +20,7 @@ func parseQuery(tsdbQuery []backend.DataQuery) ([]*Query, error) { // please do not create a new field with that name, to avoid potential problems with old, persisted queries. 
rawQuery := model.Get("query").MustString() + queryType := model.Get("queryType").MustString() bucketAggs, err := parseBucketAggs(model) if err != nil { return nil, err @@ -35,8 +36,9 @@ func parseQuery(tsdbQuery []backend.DataQuery) ([]*Query, error) { from := q.TimeRange.From.UnixNano() / int64(time.Millisecond) to := q.TimeRange.To.UnixNano() / int64(time.Millisecond) - queries = append(queries, &Query{ + query := &Query{ RawQuery: rawQuery, + QueryType: queryType, BucketAggs: bucketAggs, Metrics: metrics, Alias: alias, @@ -46,7 +48,9 @@ func parseQuery(tsdbQuery []backend.DataQuery) ([]*Query, error) { MaxDataPoints: q.MaxDataPoints, RangeFrom: from, RangeTo: to, - }) + } + normalizeInternalLinkTraceQuery(query) + queries = append(queries, query) } return queries, nil diff --git a/pkg/quickwit/parse_query_test.go b/pkg/quickwit/parse_query_test.go index 44fb115..6b71f77 100644 --- a/pkg/quickwit/parse_query_test.go +++ b/pkg/quickwit/parse_query_test.go @@ -101,5 +101,155 @@ func TestParseQuery(t *testing.T) { require.Equal(t, q.BucketAggs[1].Settings.Get("min_doc_count").MustInt(), 0) require.Equal(t, q.BucketAggs[1].Settings.Get("trimEdges").MustInt(), 0) }) + + t.Run("Should normalize exemplar-style bare trace id links", func(t *testing.T) { + traceID := "75d7a6e5c07de26e0238cd17a281a190" + body := `{ + "query": "` + traceID + `", + "queryType": "traces", + "metrics": [{ "type": "logs", "id": "3", "settings": { "limit": "100" } }], + "bucketAggs": [{ "type": "date_histogram", "field": "@timestamp", "id": "2" }] + }` + dataQuery, err := newDataQuery(body) + require.NoError(t, err) + queries, err := parseQuery(dataQuery.Queries) + require.NoError(t, err) + require.Len(t, queries, 1) + + q := queries[0] + require.Equal(t, "trace_id:"+traceID, q.RawQuery) + require.Equal(t, tracesType, q.QueryType) + require.Len(t, q.Metrics, 1) + require.Equal(t, tracesType, q.Metrics[0].Type) + require.Equal(t, "3", q.Metrics[0].ID) + require.Equal(t, "100", 
q.Metrics[0].Settings.Get("limit").MustString()) + require.Empty(t, q.BucketAggs) + }) + + t.Run("Should create a traces metric for bare trace id links without metrics", func(t *testing.T) { + traceID := "75d7a6e5c07de26e0238cd17a281a190" + body := `{ + "query": "` + traceID + `", + "metrics": [], + "bucketAggs": [] + }` + dataQuery, err := newDataQuery(body) + require.NoError(t, err) + queries, err := parseQuery(dataQuery.Queries) + require.NoError(t, err) + require.Len(t, queries, 1) + + q := queries[0] + require.Equal(t, "trace_id:"+traceID, q.RawQuery) + require.Equal(t, tracesType, q.QueryType) + require.Len(t, q.Metrics, 1) + require.Equal(t, tracesType, q.Metrics[0].Type) + require.Equal(t, "1", q.Metrics[0].ID) + require.Equal(t, "1000", q.Metrics[0].Settings.Get("limit").MustString()) + }) + + t.Run("Should not let stale queryType override explicit trace search metrics", func(t *testing.T) { + traceID := "75d7a6e5c07de26e0238cd17a281a190" + body := `{ + "query": "trace_id:` + traceID + `", + "queryType": "traces", + "metrics": [{ "type": "trace_search", "id": "3", "settings": { "limit": "20", "spanLimit": "1000" } }], + "bucketAggs": [] + }` + dataQuery, err := newDataQuery(body) + require.NoError(t, err) + queries, err := parseQuery(dataQuery.Queries) + require.NoError(t, err) + require.Len(t, queries, 1) + + q := queries[0] + require.Equal(t, "trace_id:"+traceID, q.RawQuery) + require.Empty(t, q.QueryType) + require.Len(t, q.Metrics, 1) + require.Equal(t, traceSearchType, q.Metrics[0].Type) + require.Equal(t, "20", q.Metrics[0].Settings.Get("limit").MustString()) + require.Equal(t, "1000", q.Metrics[0].Settings.Get("spanLimit").MustString()) + }) + + t.Run("Should not normalize regular metric queries with bare hex filters", func(t *testing.T) { + traceID := "75d7a6e5c07de26e0238cd17a281a190" + body := `{ + "query": "` + traceID + `", + "metrics": [{ "type": "count", "id": "1" }], + "bucketAggs": [{ "type": "date_histogram", "field": "@timestamp", "id": 
"2" }] + }` + dataQuery, err := newDataQuery(body) + require.NoError(t, err) + queries, err := parseQuery(dataQuery.Queries) + require.NoError(t, err) + require.Len(t, queries, 1) + + q := queries[0] + require.Equal(t, traceID, q.RawQuery) + require.Empty(t, q.QueryType) + require.Len(t, q.Metrics, 1) + require.Equal(t, countType, q.Metrics[0].Type) + require.Len(t, q.BucketAggs, 1) + }) + + t.Run("Should not normalize trace_id log filters", func(t *testing.T) { + traceID := "75d7a6e5c07de26e0238cd17a281a190" + body := `{ + "query": "trace_id:` + traceID + `", + "metrics": [{ "type": "logs", "id": "3", "settings": { "limit": "100" } }], + "bucketAggs": [] + }` + dataQuery, err := newDataQuery(body) + require.NoError(t, err) + queries, err := parseQuery(dataQuery.Queries) + require.NoError(t, err) + require.Len(t, queries, 1) + + q := queries[0] + require.Equal(t, "trace_id:"+traceID, q.RawQuery) + require.Empty(t, q.QueryType) + require.Len(t, q.Metrics, 1) + require.Equal(t, logsType, q.Metrics[0].Type) + }) + + t.Run("Should not normalize bare trace ids when logs are explicitly selected", func(t *testing.T) { + traceID := "75d7a6e5c07de26e0238cd17a281a190" + body := `{ + "query": "` + traceID + `", + "metrics": [{ "type": "logs", "id": "3", "settings": { "limit": "100" } }], + "bucketAggs": [] + }` + dataQuery, err := newDataQuery(body) + require.NoError(t, err) + queries, err := parseQuery(dataQuery.Queries) + require.NoError(t, err) + require.Len(t, queries, 1) + + q := queries[0] + require.Equal(t, traceID, q.RawQuery) + require.Empty(t, q.QueryType) + require.Len(t, q.Metrics, 1) + require.Equal(t, logsType, q.Metrics[0].Type) + }) + + t.Run("Should not normalize span-specific log correlation queries", func(t *testing.T) { + traceID := "75d7a6e5c07de26e0238cd17a281a190" + body := `{ + "query": "trace_id:` + traceID + ` AND span_id:cccccccccccccccc", + "metrics": [{ "type": "logs", "id": "3", "settings": { "limit": "100" } }], + "bucketAggs": [] + }` + 
dataQuery, err := newDataQuery(body) + require.NoError(t, err) + queries, err := parseQuery(dataQuery.Queries) + require.NoError(t, err) + require.Len(t, queries, 1) + + q := queries[0] + require.Equal(t, "trace_id:"+traceID+" AND span_id:cccccccccccccccc", q.RawQuery) + require.Empty(t, q.QueryType) + require.Len(t, q.Metrics, 1) + require.Equal(t, logsType, q.Metrics[0].Type) + }) }) } diff --git a/pkg/quickwit/quickwit.go b/pkg/quickwit/quickwit.go index 05fa0ce..3dfe95d 100644 --- a/pkg/quickwit/quickwit.go +++ b/pkg/quickwit/quickwit.go @@ -67,6 +67,26 @@ func NewQuickwitDatasource(ctx context.Context, settings backend.DataSourceInsta logMessageField = "" } + logsDatasourceUID, ok := jsonData["logsDatasourceUid"].(string) + if !ok { + logsDatasourceUID = "" + } + + logsDatasourceName, ok := jsonData["logsDatasourceName"].(string) + if !ok { + logsDatasourceName = "" + } + + tracesDatasourceUID, ok := jsonData["tracesDatasourceUid"].(string) + if !ok { + tracesDatasourceUID = "" + } + + tracesDatasourceName, ok := jsonData["tracesDatasourceName"].(string) + if !ok { + tracesDatasourceName = "" + } + index, ok := jsonData["index"].(string) if !ok { index = "" @@ -99,6 +119,12 @@ func NewQuickwitDatasource(ctx context.Context, settings backend.DataSourceInsta model := es.DatasourceInfo{ ID: settings.ID, + UID: settings.UID, + Name: settings.Name, + LogsDatasourceUID: logsDatasourceUID, + LogsDatasourceName: logsDatasourceName, + TracesDatasourceUID: tracesDatasourceUID, + TracesDatasourceName: tracesDatasourceName, URL: settings.URL, HTTPClient: httpCli, Database: index, diff --git a/pkg/quickwit/response_parser.go b/pkg/quickwit/response_parser.go index b291c33..4da9ddc 100644 --- a/pkg/quickwit/response_parser.go +++ b/pkg/quickwit/response_parser.go @@ -41,7 +41,7 @@ const ( var searchWordsRegex = regexp.MustCompile(regexp.QuoteMeta(es.HighlightPreTagsString) + `(.*?)` + regexp.QuoteMeta(es.HighlightPostTagsString)) -func parseResponse(rawResponses 
[]*json.RawMessage, targets []*Query, configuredFields es.ConfiguredFields) (*backend.QueryDataResponse, error) { +func parseResponse(rawResponses []*json.RawMessage, targets []*Query, configuredFields es.ConfiguredFields, dsInfo *es.DatasourceInfo) (*backend.QueryDataResponse, error) { result := backend.QueryDataResponse{ Responses: backend.Responses{}, } @@ -54,7 +54,7 @@ func parseResponse(rawResponses []*json.RawMessage, targets []*Query, configured byteReader := bytes.NewReader(*rawRes) dec := json.NewDecoder(byteReader) - if isLogsQuery(target) { + if isLogsQuery(target) || isTraceSearchQuery(target) || isTracesQuery(target) { dec.UseNumber() } var res *es.SearchResponse @@ -92,6 +92,18 @@ func parseResponse(rawResponses []*json.RawMessage, targets []*Query, configured return &backend.QueryDataResponse{}, err } result.Responses[target.RefID] = queryRes + } else if isTraceSearchQuery(target) { + err := processTraceSearchResponse(res, target, dsInfo, &queryRes) + if err != nil { + return &backend.QueryDataResponse{}, err + } + result.Responses[target.RefID] = queryRes + } else if isTracesQuery(target) { + err := processTracesResponse(res, target, configuredFields, dsInfo, &queryRes) + if err != nil { + return &backend.QueryDataResponse{}, err + } + result.Responses[target.RefID] = queryRes } else { // Process as metric query result props := make(map[string]string) diff --git a/pkg/quickwit/response_parser_test.go b/pkg/quickwit/response_parser_test.go index 3a8b41a..fe9e1c6 100644 --- a/pkg/quickwit/response_parser_test.go +++ b/pkg/quickwit/response_parser_test.go @@ -3413,6 +3413,10 @@ func TestTrimEdges(t *testing.T) { } func parseTestResponse(tsdbQueries map[string]string, responseBody string) (*backend.QueryDataResponse, error) { + return parseTestResponseWithDatasourceInfo(tsdbQueries, responseBody, nil) +} + +func parseTestResponseWithDatasourceInfo(tsdbQueries map[string]string, responseBody string, dsInfo *es.DatasourceInfo) 
(*backend.QueryDataResponse, error) { from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC) to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC) configuredFields := es.ConfiguredFields{ @@ -3448,7 +3452,7 @@ func parseTestResponse(tsdbQueries map[string]string, responseBody string) (*bac return nil, err } - return parseResponse(response.Responses, queries, configuredFields) + return parseResponse(response.Responses, queries, configuredFields, dsInfo) } func requireTimeValue(t *testing.T, expected int64, frame *data.Frame, index int) { diff --git a/pkg/quickwit/response_parser_traces.go b/pkg/quickwit/response_parser_traces.go new file mode 100644 index 0000000..abb2085 --- /dev/null +++ b/pkg/quickwit/response_parser_traces.go @@ -0,0 +1,1253 @@ +package quickwit + +import ( + "encoding/json" + "fmt" + "sort" + "strconv" + "strings" + "time" + "unicode/utf8" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/data" + + es "github.com/quickwit-oss/quickwit-datasource/pkg/quickwit/client" + "github.com/quickwit-oss/quickwit-datasource/pkg/utils" +) + +const ( + tracesType = "traces" + traceSearchType = "trace_search" + quickwitPluginID = "quickwit-quickwit-datasource" + + maxTraceStackTraceBytes = 64 * 1024 + traceStackTraceTruncatedText = "\n... 
// traceKeyValuePair is one attribute rendered as a key/value entry. Values of
// this type are built by traceKeyValuePairs and JSON-encoded into the tags,
// serviceTags, logs and references columns of the trace frame.
type traceKeyValuePair struct {
	// Key is the attribute name; nested maps are flattened into dotted keys
	// by traceAppendMapKeyValuePairs.
	Key string `json:"key"`
	// Value is the attribute value after traceAttributeValue unwrapping.
	Value interface{} `json:"value"`
	// Type is the label computed by traceValueType ("string", "number",
	// "boolean") and is omitted from the JSON when empty.
	Type string `json:"type,omitempty"`
}

// traceLog is one span event as emitted in the "logs" column of the trace
// frame. It is built by traceLogs.
type traceLog struct {
	// Timestamp is taken from the first event timestamp key that parses; it
	// stays 0 when none does. NOTE(review): presumably milliseconds, going by
	// the helper name firstTraceTimestampMillis - helper not visible here.
	Timestamp float64 `json:"timestamp"`
	// Fields holds the event attributes followed by any remaining event keys.
	Fields []traceKeyValuePair `json:"fields"`
	// Name is the event name, omitted from the JSON when empty.
	Name string `json:"name,omitempty"`
}

// traceSpanReference is one span link as emitted in the "references" column
// of the trace frame. It is built by traceReferences.
type traceSpanReference struct {
	TraceID string `json:"traceID"`
	SpanID  string `json:"spanID"`
	// Tags holds the link attributes, omitted from the JSON when empty.
	Tags []traceKeyValuePair `json:"tags,omitempty"`
}

// traceSearchSummary accumulates per-trace statistics while
// processTraceSearchResponse walks the span hits of a trace-search query.
type traceSearchSummary struct {
	traceID string
	// startTimeMillis is the earliest non-zero span start seen for the trace;
	// 0 means no span carried a usable start time.
	startTimeMillis float64
	// endTimeMillis is the latest span end seen for the trace.
	endTimeMillis float64
	// latestMillis is the latest span start seen for the trace.
	// NOTE(review): it is written by processTraceSearchResponse, but no reader
	// is visible in this part of the file.
	latestMillis float64
	// spanCount counts the matched spans, errorCount those for which
	// traceIsErrorSpan reports true.
	spanCount  int
	errorCount int
	// services and spanNames are sets of the non-empty service and span names
	// seen on the matched spans.
	services  map[string]bool
	spanNames map[string]bool
	// rootServiceName, rootSpanName and rootStartMillis describe the span that
	// has no parent span ID, when such a span is among the hits.
	rootServiceName string
	rootSpanName    string
	rootStartMillis float64
}

// traceGraphSpan is the reduced view of a span that traceNodeGraphFrames needs
// to build the service node graph.
type traceGraphSpan struct {
	spanID string
	// parentSpanID is empty when the span has no parent.
	parentSpanID   string
	serviceName    string
	durationMillis float64
	// statusCode uses 2 for error spans (the value traceNodeGraphFrames
	// compares against).
	statusCode int64
}

// traceGraphNode aggregates the spans of one service; id is the service name.
type traceGraphNode struct {
	id         string
	spanCount  int
	errorCount int
	// durationMillis is the sum of the durations of the service's spans.
	durationMillis float64
}

// traceGraphEdge aggregates the calls from one service to another; id has the
// form "source->target".
type traceGraphEdge struct {
	id     string
	source string
	target string
	// callCount counts spans of the target service whose parent span belongs
	// to the source service; errorCount counts those with status code 2.
	callCount  int
	errorCount int
	// durationMillis is the sum of the durations of those spans.
	durationMillis float64
}
len(hits)) + statusMessages := make([]string, 0, len(hits)) + instrumentationLibraryNames := make([]string, 0, len(hits)) + instrumentationLibraryVersions := make([]string, 0, len(hits)) + traceStates := make([]string, 0, len(hits)) + errorIconColors := make([]string, 0, len(hits)) + warnings := make([]json.RawMessage, 0, len(hits)) + stackTraces := make([]json.RawMessage, 0, len(hits)) + graphSpans := make([]traceGraphSpan, 0, len(hits)) + + for _, hit := range hits { + source, ok := hit["_source"].(map[string]interface{}) + if !ok || source == nil { + continue + } + + traceID := traceString(source["trace_id"]) + spanID := traceString(source["span_id"]) + if traceID == "" || spanID == "" { + continue + } + + parentSpanID := traceParentSpanID(source["parent_span_id"]) + spanTags := traceSpanTags(source["span_attributes"]) + serviceName := traceString(source["service_name"]) + spanName := traceString(source["span_name"]) + durationMillis := traceDurationMillis(source) + statusCode, statusMessage, errorIconColor := traceSpanStatus(source) + + traceIDs = append(traceIDs, traceID) + spanIDs = append(spanIDs, spanID) + parentSpanIDs = append(parentSpanIDs, parentSpanID) + operationNames = append(operationNames, spanName) + serviceNames = append(serviceNames, serviceName) + serviceTags = append(serviceTags, traceJSONRawMessage(traceServiceTags(source["resource_attributes"], serviceName))) + startTimes = append(startTimes, traceStartTimeMillis(source, configuredFields)) + durations = append(durations, durationMillis) + logs = append(logs, traceJSONRawMessage(traceLogs(source["events"]))) + references = append(references, traceJSONRawMessage(traceReferences(source["links"]))) + tags = append(tags, traceJSONRawMessage(spanTags)) + kinds = append(kinds, traceSpanKind(source["span_kind"])) + statusCodes = append(statusCodes, statusCode) + statusMessages = append(statusMessages, statusMessage) + errorIconColors = append(errorIconColors, errorIconColor) + 
instrumentationLibraryNames = append(instrumentationLibraryNames, traceString(source["scope_name"])) + instrumentationLibraryVersions = append(instrumentationLibraryVersions, traceString(source["scope_version"])) + traceStates = append(traceStates, traceString(source["trace_state"])) + warnings = append(warnings, traceJSONRawMessage(traceWarnings(source, statusCode, statusMessage))) + stackTraces = append(stackTraces, traceJSONRawMessage(traceStackTraces(source))) + + parentID := "" + if parentSpanID != nil { + parentID = *parentSpanID + } + graphSpans = append(graphSpans, traceGraphSpan{ + spanID: spanID, + parentSpanID: parentID, + serviceName: serviceName, + durationMillis: durationMillis, + statusCode: statusCode, + }) + } + + spanIDField := data.NewField("spanID", nil, spanIDs) + if links := traceToLogsDataLinks(dsInfo); len(links) > 0 { + spanIDField.SetConfig(&data.FieldConfig{Links: links}) + } + + frame := data.NewFrame("", + data.NewField("traceID", nil, traceIDs), + spanIDField, + data.NewField("parentSpanID", nil, parentSpanIDs), + data.NewField("operationName", nil, operationNames), + data.NewField("serviceName", nil, serviceNames), + data.NewField("serviceTags", nil, serviceTags), + data.NewField("startTime", nil, startTimes), + data.NewField("duration", nil, durations), + data.NewField("logs", nil, logs), + data.NewField("references", nil, references), + data.NewField("tags", nil, tags), + data.NewField("kind", nil, kinds), + data.NewField("statusCode", nil, statusCodes), + data.NewField("statusMessage", nil, statusMessages), + data.NewField("instrumentationLibraryName", nil, instrumentationLibraryNames), + data.NewField("instrumentationLibraryVersion", nil, instrumentationLibraryVersions), + data.NewField("traceState", nil, traceStates), + data.NewField("errorIconColor", nil, errorIconColors), + data.NewField("warnings", nil, warnings), + data.NewField("stackTraces", nil, stackTraces), + ) + setPreferredVisType(frame, data.VisTypeTrace) + frames := 
data.Frames{frame} + frames = append(frames, traceNodeGraphFrames(graphSpans)...) + queryRes.Frames = frames + return nil +} + +func processTraceSearchResponse(res *es.SearchResponse, target *Query, dsInfo *es.DatasourceInfo, queryRes *backend.DataResponse) error { + hits := []map[string]interface{}{} + if res.Hits != nil { + hits = res.Hits.Hits + } + + summariesByTraceID := map[string]*traceSearchSummary{} + traceOrder := []string{} + + for _, hit := range hits { + source, ok := hit["_source"].(map[string]interface{}) + if !ok || source == nil { + continue + } + + traceID := traceString(source["trace_id"]) + spanID := traceString(source["span_id"]) + if traceID == "" || spanID == "" { + continue + } + + summary, exists := summariesByTraceID[traceID] + if !exists { + summary = &traceSearchSummary{ + traceID: traceID, + startTimeMillis: 0, + endTimeMillis: 0, + latestMillis: 0, + services: map[string]bool{}, + spanNames: map[string]bool{}, + rootStartMillis: 0, + } + summariesByTraceID[traceID] = summary + traceOrder = append(traceOrder, traceID) + } + + startTimeMillis := traceStartTimeMillis(source, es.ConfiguredFields{}) + endTimeMillis := traceEndTimeMillis(source, startTimeMillis) + if startTimeMillis > 0 && (summary.startTimeMillis == 0 || startTimeMillis < summary.startTimeMillis) { + summary.startTimeMillis = startTimeMillis + } + if endTimeMillis > summary.endTimeMillis { + summary.endTimeMillis = endTimeMillis + } + if startTimeMillis > summary.latestMillis { + summary.latestMillis = startTimeMillis + } + + summary.spanCount++ + if traceIsErrorSpan(source) { + summary.errorCount++ + } + if serviceName := traceString(source["service_name"]); serviceName != "" { + summary.services[serviceName] = true + } + if spanName := traceString(source["span_name"]); spanName != "" { + summary.spanNames[spanName] = true + } + + if traceParentSpanID(source["parent_span_id"]) == nil { + if summary.rootSpanName == "" || startTimeMillis < summary.rootStartMillis || 
summary.rootStartMillis == 0 { + summary.rootSpanName = traceString(source["span_name"]) + summary.rootServiceName = traceString(source["service_name"]) + summary.rootStartMillis = startTimeMillis + } + } + } + + summaries := make([]*traceSearchSummary, 0, len(traceOrder)) + for _, traceID := range traceOrder { + summary := summariesByTraceID[traceID] + if summary.rootSpanName == "" { + summary.rootSpanName = firstSortedMapKey(summary.spanNames) + summary.rootServiceName = firstSortedMapKey(summary.services) + } + summaries = append(summaries, summary) + } + sort.SliceStable(summaries, func(i, j int) bool { + return summaries[i].startTimeMillis > summaries[j].startTimeMillis + }) + + limit := defaultSize + if len(target.Metrics) > 0 { + limit = stringToIntWithDefaultValue(target.Metrics[0].Settings.Get("limit").MustString(), 20) + } + if limit > 0 && len(summaries) > limit { + summaries = summaries[:limit] + } + + traceIDs := make([]string, 0, len(summaries)) + startTimes := make([]time.Time, 0, len(summaries)) + durations := make([]float64, 0, len(summaries)) + spanCounts := make([]int64, 0, len(summaries)) + services := make([]string, 0, len(summaries)) + rootServices := make([]string, 0, len(summaries)) + rootSpans := make([]string, 0, len(summaries)) + matchedSpans := make([]string, 0, len(summaries)) + errorCounts := make([]int64, 0, len(summaries)) + + for _, summary := range summaries { + traceIDs = append(traceIDs, summary.traceID) + startTimes = append(startTimes, time.UnixMilli(int64(summary.startTimeMillis)).UTC()) + duration := summary.endTimeMillis - summary.startTimeMillis + if duration < 0 { + duration = 0 + } + durations = append(durations, duration) + spanCounts = append(spanCounts, int64(summary.spanCount)) + services = append(services, joinSortedMapKeys(summary.services, ", ")) + rootServices = append(rootServices, summary.rootServiceName) + rootSpans = append(rootSpans, summary.rootSpanName) + matchedSpans = append(matchedSpans, 
joinSortedMapKeysWithLimit(summary.spanNames, ", ", 5)) + errorCounts = append(errorCounts, int64(summary.errorCount)) + } + + traceIDField := data.NewField("traceID", nil, traceIDs).SetConfig(&data.FieldConfig{ + DisplayName: "Trace ID", + Links: traceSearchDataLinks(dsInfo), + }) + durationField := data.NewField("duration", nil, durations).SetConfig(&data.FieldConfig{ + DisplayName: "Duration", + Unit: "ms", + }) + errorField := data.NewField("errors", nil, errorCounts).SetConfig(traceSearchErrorsFieldConfig()) + + frame := data.NewFrame("Trace search", + traceIDField, + data.NewField("startTime", nil, startTimes).SetConfig(&data.FieldConfig{DisplayName: "Start time"}), + durationField, + data.NewField("spans", nil, spanCounts).SetConfig(&data.FieldConfig{DisplayName: "Spans"}), + data.NewField("services", nil, services).SetConfig(&data.FieldConfig{DisplayName: "Services"}), + data.NewField("rootService", nil, rootServices).SetConfig(&data.FieldConfig{DisplayName: "Root service"}), + data.NewField("rootSpan", nil, rootSpans).SetConfig(&data.FieldConfig{DisplayName: "Root span"}), + data.NewField("matchedSpans", nil, matchedSpans).SetConfig(&data.FieldConfig{DisplayName: "Matched spans"}), + errorField, + ) + setPreferredVisType(frame, data.VisTypeTable) + queryRes.Frames = data.Frames{frame} + return nil +} + +func traceParentSpanID(value interface{}) *string { + parentSpanID := traceString(value) + if parentSpanID == "" || strings.Trim(parentSpanID, "0") == "" { + return nil + } + return &parentSpanID +} + +func traceString(value interface{}) string { + switch v := value.(type) { + case nil: + return "" + case string: + return v + case json.Number: + return v.String() + default: + return fmt.Sprintf("%v", v) + } +} + +func traceNumber(value interface{}) (float64, bool) { + switch v := value.(type) { + case json.Number: + parsed, err := v.Float64() + return parsed, err == nil + case float64: + return v, true + case float32: + return float64(v), true + case int: + 
return float64(v), true + case int64: + return float64(v), true + case int32: + return float64(v), true + case uint64: + return float64(v), true + case uint32: + return float64(v), true + case string: + parsed, err := strconv.ParseFloat(v, 64) + return parsed, err == nil + default: + return 0, false + } +} + +func traceTimestampMillis(value interface{}, outputFormat string) (float64, bool) { + switch typedValue := value.(type) { + case json.Number: + return traceUnixTimestampMillisFromString(typedValue.String(), outputFormat) + case string: + stringValue := strings.TrimSpace(typedValue) + if stringValue == "" { + return 0, false + } + if timestamp, ok := traceUnixTimestampMillisFromString(stringValue, outputFormat); ok { + return timestamp, true + } + if outputFormat != "" { + if parsedTime, err := utils.ParseTime(stringValue, outputFormat); err == nil { + return float64(parsedTime.UnixNano()) / 1e6, true + } + } + if parsedTime, err := time.Parse(time.RFC3339Nano, stringValue); err == nil { + return float64(parsedTime.UnixNano()) / 1e6, true + } + return 0, false + case int: + return traceUnixTimestampMillisFromInt(int64(typedValue), outputFormat) + case int64: + return traceUnixTimestampMillisFromInt(typedValue, outputFormat) + case int32: + return traceUnixTimestampMillisFromInt(int64(typedValue), outputFormat) + case uint64: + return traceUnixTimestampMillisFromFloat(float64(typedValue), outputFormat) + case uint32: + return traceUnixTimestampMillisFromFloat(float64(typedValue), outputFormat) + case float64: + return traceUnixTimestampMillisFromFloat(typedValue, outputFormat) + case float32: + return traceUnixTimestampMillisFromFloat(float64(typedValue), outputFormat) + } + + return 0, false +} + +func traceUnixTimestampMillisFromString(value string, outputFormat string) (float64, bool) { + switch outputFormat { + case TimestampSecs, TimestampMillis, TimestampMicros, TimestampNanos: + default: + return 0, false + } + + if parsed, err := strconv.ParseInt(value, 
10, 64); err == nil { + return traceUnixTimestampMillisFromInt(parsed, outputFormat) + } + if parsed, err := strconv.ParseFloat(value, 64); err == nil { + return traceUnixTimestampMillisFromFloat(parsed, outputFormat) + } + return 0, false +} + +func traceUnixTimestampMillisFromInt(value int64, outputFormat string) (float64, bool) { + switch outputFormat { + case TimestampNanos: + return float64(value/1_000_000) + float64(value%1_000_000)/1_000_000, true + case TimestampMicros: + return float64(value/1_000) + float64(value%1_000)/1_000, true + case TimestampMillis: + return float64(value), true + case TimestampSecs: + return float64(value) * 1000, true + default: + return 0, false + } +} + +func traceUnixTimestampMillisFromFloat(value float64, outputFormat string) (float64, bool) { + switch outputFormat { + case TimestampNanos: + return value / 1_000_000, true + case TimestampMicros: + return value / 1_000, true + case TimestampMillis: + return value, true + case TimestampSecs: + return value * 1000, true + default: + return 0, false + } +} + +func traceStartTimeMillis(source map[string]interface{}, configuredFields es.ConfiguredFields) float64 { + if startTime, ok := traceTimestampMillis(source["span_start_timestamp_nanos"], TimestampNanos); ok { + return startTime + } + if configuredFields.TimeField != "" { + if startTime, ok := traceTimestampMillis(source[configuredFields.TimeField], configuredFields.TimeOutputFormat); ok { + return startTime + } + } + return 0 +} + +func traceDurationMillis(source map[string]interface{}) float64 { + // Quickwit's otel-traces-v0_9 mapping types span_duration_millis as u64, + // which truncates sub-millisecond spans to 0. Prefer the nanos diff so we + // keep microsecond precision; fall back to the millis field only if the + // timestamps are missing. 
+ start, startOK := traceTimestampMillis(source["span_start_timestamp_nanos"], TimestampNanos) + end, endOK := traceTimestampMillis(source["span_end_timestamp_nanos"], TimestampNanos) + if startOK && endOK && end >= start { + return end - start + } + if duration, ok := traceNumber(source["span_duration_millis"]); ok { + return duration + } + return 0 +} + +func traceEndTimeMillis(source map[string]interface{}, startTimeMillis float64) float64 { + if endTimeMillis, ok := traceTimestampMillis(source["span_end_timestamp_nanos"], TimestampNanos); ok { + return endTimeMillis + } + return startTimeMillis + traceDurationMillis(source) +} + +func traceSpanKind(value interface{}) string { + kindValue, ok := traceNumber(value) + if !ok { + return "" + } + + switch int(kindValue) { + case 1: + return "internal" + case 2: + return "server" + case 3: + return "client" + case 4: + return "producer" + case 5: + return "consumer" + default: + return "" + } +} + +func traceSpanStatus(source map[string]interface{}) (int64, string, string) { + statusMap, _ := source["span_status"].(map[string]interface{}) + statusCodeValue := traceAttributeValue(statusMap["code"]) + statusMessage := traceString(traceAttributeValue(statusMap["message"])) + attributeError := traceStatusAttributesIndicateError(source["span_attributes"]) + + if code, ok := traceNumber(statusCodeValue); ok { + intCode := int64(code) + switch intCode { + case 2: + return intCode, statusMessage, "red" + case 1: + return intCode, statusMessage, "" + default: + if attributeError { + return 2, statusMessage, "red" + } + return 0, statusMessage, "" + } + } + + switch strings.ToLower(traceString(statusCodeValue)) { + case "error": + return 2, statusMessage, "red" + case "ok": + return 1, statusMessage, "" + case "unset": + if attributeError { + return 2, statusMessage, "red" + } + return 0, statusMessage, "" + default: + if attributeError { + return 2, statusMessage, "red" + } + return 0, statusMessage, "" + } +} + +func 
traceIsErrorSpan(source map[string]interface{}) bool { + statusCode, _, _ := traceSpanStatus(source) + return statusCode == 2 +} + +func traceAttributeBool(attributes interface{}, key string) bool { + attributesMap, ok := attributes.(map[string]interface{}) + if !ok { + return false + } + + value, exists := attributesMap[key] + if !exists { + return false + } + + switch typedValue := traceAttributeValue(value).(type) { + case bool: + return typedValue + case string: + return strings.EqualFold(typedValue, "true") || strings.EqualFold(typedValue, "error") + default: + return false + } +} + +func traceStatusAttributesIndicateError(attributes interface{}) bool { + return traceAttributeBool(attributes, "error") || traceAttributeBool(attributes, "otel.status_code") +} + +func traceWarnings(source map[string]interface{}, statusCode int64, statusMessage string) []string { + warnings := []string{} + if statusCode == 2 { + if statusMessage != "" { + warnings = append(warnings, fmt.Sprintf("Span status: ERROR - %s", statusMessage)) + } else { + warnings = append(warnings, "Span status: ERROR") + } + } + if traceAttributeBool(source["span_attributes"], "error") { + warnings = append(warnings, "Span attribute error=true") + } + + for _, dropped := range []struct { + field string + label string + }{ + {field: "span_dropped_attributes_count", label: "span attributes"}, + {field: "span_dropped_events_count", label: "span events"}, + {field: "span_dropped_links_count", label: "span links"}, + } { + if count, ok := traceNumber(source[dropped.field]); ok && count > 0 { + warnings = append(warnings, fmt.Sprintf("Dropped %s: %s", dropped.label, formatTraceCount(count))) + } + } + return warnings +} + +var traceStackTraceKeys = []string{ + "exception.stacktrace", + "exception.stack_trace", + "exception.stack", + "stacktrace", + "stack_trace", +} + +func traceStackTraces(source map[string]interface{}) []string { + stackTraces := []string{} + seen := map[string]bool{} + appendStackTrace 
:= func(value interface{}) { + stackTrace := traceStackTraceString(traceAttributeValue(value)) + stackTrace = truncateTraceStackTrace(stackTrace) + if stackTrace == "" || seen[stackTrace] { + return + } + seen[stackTrace] = true + stackTraces = append(stackTraces, stackTrace) + } + + if attributes, ok := source["span_attributes"].(map[string]interface{}); ok { + for _, key := range traceStackTraceKeys { + if value, exists := attributes[key]; exists { + appendStackTrace(value) + } + } + } + + events, _ := source["events"].([]interface{}) + for _, event := range events { + eventMap, ok := event.(map[string]interface{}) + if !ok { + continue + } + attributes, ok := eventMap["event_attributes"].(map[string]interface{}) + if !ok { + continue + } + for _, key := range traceStackTraceKeys { + if value, exists := attributes[key]; exists { + appendStackTrace(value) + } + } + } + return stackTraces +} + +func truncateTraceStackTrace(stackTrace string) string { + if len(stackTrace) <= maxTraceStackTraceBytes { + return stackTrace + } + + limit := maxTraceStackTraceBytes - len(traceStackTraceTruncatedText) + if limit <= 0 { + return stackTrace[:maxTraceStackTraceBytes] + } + for limit > 0 && !utf8.RuneStart(stackTrace[limit]) { + limit-- + } + return stackTrace[:limit] + traceStackTraceTruncatedText +} + +func traceStackTraceString(value interface{}) string { + switch typedValue := value.(type) { + case nil: + return "" + case string: + return typedValue + case []interface{}: + lines := make([]string, 0, len(typedValue)) + for _, line := range typedValue { + lineString := traceString(traceAttributeValue(line)) + if lineString != "" { + lines = append(lines, lineString) + } + } + return strings.Join(lines, "\n") + default: + bytes, err := json.Marshal(typedValue) + if err == nil { + return string(bytes) + } + return traceString(typedValue) + } +} + +func formatTraceCount(value float64) string { + if value == float64(int64(value)) { + return strconv.FormatInt(int64(value), 10) + 
} + return strconv.FormatFloat(value, 'f', -1, 64) +} + +func traceJSONRawMessage(value interface{}) json.RawMessage { + bytes, err := json.Marshal(value) + if err != nil { + return json.RawMessage("[]") + } + return json.RawMessage(bytes) +} + +func traceKeyValuePairs(value interface{}) []traceKeyValuePair { + switch typedValue := value.(type) { + case map[string]interface{}: + pairs := []traceKeyValuePair{} + traceAppendMapKeyValuePairs(&pairs, "", typedValue) + return pairs + case []interface{}: + pairs := make([]traceKeyValuePair, 0, len(typedValue)) + for _, item := range typedValue { + itemMap, ok := item.(map[string]interface{}) + if !ok { + continue + } + key := traceString(itemMap["key"]) + if key == "" { + continue + } + value := traceAttributeValue(itemMap["value"]) + pairs = append(pairs, traceKeyValuePair{Key: key, Value: value, Type: traceValueType(value)}) + } + return pairs + default: + return []traceKeyValuePair{} + } +} + +func traceServiceTags(value interface{}, serviceName string) []traceKeyValuePair { + pairs := traceKeyValuePairs(value) + if serviceName == "" { + return pairs + } + + if _, exists := traceFindKeyValuePair(pairs, "service.name"); exists { + return pairs + } + + return append(pairs, traceKeyValuePair{ + Key: "service.name", + Value: serviceName, + Type: traceValueType(serviceName), + }) +} + +func traceSpanTags(value interface{}) []traceKeyValuePair { + pairs := traceKeyValuePairs(value) + peerService, hasPeerService := traceFindKeyValuePair(pairs, "peer.service") + servicePeerName, hasServicePeerName := traceFindKeyValuePair(pairs, "service.peer.name") + + // Grafana's trace view still keys the uninstrumented peer-service hint on + // peer.service. Preserve the current OTel service.peer.name attribute and + // add the legacy alias only when needed. 
+ if hasServicePeerName && !hasPeerService { + pairs = append(pairs, traceKeyValuePair{ + Key: "peer.service", + Value: servicePeerName.Value, + Type: servicePeerName.Type, + }) + } + if hasPeerService && !hasServicePeerName { + pairs = append(pairs, traceKeyValuePair{ + Key: "service.peer.name", + Value: peerService.Value, + Type: peerService.Type, + }) + } + return pairs +} + +func traceFindKeyValuePair(pairs []traceKeyValuePair, key string) (traceKeyValuePair, bool) { + for _, pair := range pairs { + if pair.Key == key { + return pair, true + } + } + return traceKeyValuePair{}, false +} + +func traceAppendMapKeyValuePairs(pairs *[]traceKeyValuePair, prefix string, value map[string]interface{}) { + keys := make([]string, 0, len(value)) + for key := range value { + keys = append(keys, key) + } + sort.Strings(keys) + + for _, key := range keys { + fullKey := key + if prefix != "" { + fullKey = prefix + "." + key + } + + rawValue := traceAttributeValue(value[key]) + if nestedValue, ok := rawValue.(map[string]interface{}); ok { + traceAppendMapKeyValuePairs(pairs, fullKey, nestedValue) + continue + } + + *pairs = append(*pairs, traceKeyValuePair{Key: fullKey, Value: rawValue, Type: traceValueType(rawValue)}) + } +} + +func traceAttributeValue(value interface{}) interface{} { + valueMap, ok := value.(map[string]interface{}) + if !ok { + return value + } + + for _, key := range []string{"string_value", "int_value", "double_value", "bool_value", "bytes_value", "array_value", "kvlist_value"} { + if wrappedValue, exists := valueMap[key]; exists { + return wrappedValue + } + } + return value +} + +func traceValueType(value interface{}) string { + switch value.(type) { + case string: + return "string" + case json.Number, float64, float32, int, int64, int32, uint64, uint32: + return "number" + case bool: + return "boolean" + default: + return "" + } +} + +func traceLogs(value interface{}) []traceLog { + events, ok := value.([]interface{}) + if !ok { + return []traceLog{} + } 
+ + logs := make([]traceLog, 0, len(events)) + for _, event := range events { + eventMap, ok := event.(map[string]interface{}) + if !ok { + continue + } + + log := traceLog{ + Name: firstTraceString(eventMap, "event_name", "name"), + Fields: traceKeyValuePairs(eventMap["event_attributes"]), + } + if timestamp, ok := firstTraceTimestampMillis(eventMap, "event_timestamp_nanos", "timestamp_nanos", "time_unix_nano", "timestamp", "time"); ok { + log.Timestamp = timestamp + } + + traceAppendEventFields(&log.Fields, eventMap) + logs = append(logs, log) + } + return logs +} + +func traceAppendEventFields(fields *[]traceKeyValuePair, eventMap map[string]interface{}) { + keys := make([]string, 0, len(eventMap)) + for key := range eventMap { + switch key { + case "event_attributes", "event_name", "name", "event_timestamp_nanos", "timestamp_nanos", "time_unix_nano", "timestamp", "time": + continue + default: + keys = append(keys, key) + } + } + sort.Strings(keys) + + for _, key := range keys { + value := traceAttributeValue(eventMap[key]) + *fields = append(*fields, traceKeyValuePair{Key: key, Value: value, Type: traceValueType(value)}) + } +} + +func traceReferences(value interface{}) []traceSpanReference { + links, ok := value.([]interface{}) + if !ok { + return []traceSpanReference{} + } + + references := make([]traceSpanReference, 0, len(links)) + for _, link := range links { + linkMap, ok := link.(map[string]interface{}) + if !ok { + continue + } + + traceID := firstTraceString(linkMap, "trace_id", "traceID") + spanID := firstTraceString(linkMap, "span_id", "spanID") + if traceID == "" || spanID == "" { + continue + } + + tags := traceKeyValuePairs(linkMap["attributes"]) + if len(tags) == 0 { + tags = traceKeyValuePairs(linkMap["link_attributes"]) + } + references = append(references, traceSpanReference{ + TraceID: traceID, + SpanID: spanID, + Tags: tags, + }) + } + return references +} + +func firstSortedMapKey(values map[string]bool) string { + keys := 
// joinSortedMapKeys joins all keys of values, in ascending order, with
// separator.
func joinSortedMapKeys(values map[string]bool, separator string) string {
	ordered := sortedMapKeys(values)
	return strings.Join(ordered, separator)
}

// joinSortedMapKeysWithLimit joins at most limit keys of values, in ascending
// order, with separator. A limit of zero or less means "no limit".
func joinSortedMapKeysWithLimit(values map[string]bool, separator string, limit int) string {
	ordered := sortedMapKeys(values)
	if limit <= 0 || len(ordered) <= limit {
		return strings.Join(ordered, separator)
	}
	return strings.Join(ordered[:limit], separator)
}

// sortedMapKeys returns every key of values in ascending order, regardless of
// the boolean stored under it. The result is never nil.
func sortedMapKeys(values map[string]bool) []string {
	ordered := make([]string, len(values))
	next := 0
	for name := range values {
		ordered[next] = name
		next++
	}
	sort.Strings(ordered)
	return ordered
}
datasourceName, + Query: map[string]interface{}{ + "refId": "A", + "query": query, + "queryType": metricType, + "datasource": map[string]string{"type": quickwitPluginID, "uid": datasourceUID}, + "filters": []interface{}{}, + "bucketAggs": []interface{}{}, + "metrics": []map[string]interface{}{ + { + "id": "1", + "type": metricType, + "settings": map[string]string{"limit": limit}, + }, + }, + }, + }, + }, + } +} + +func traceNodeGraphFrames(spans []traceGraphSpan) data.Frames { + if len(spans) == 0 { + return data.Frames{} + } + + nodesByID := map[string]*traceGraphNode{} + spanServiceByID := map[string]string{} + for _, span := range spans { + if span.serviceName == "" { + continue + } + spanServiceByID[span.spanID] = span.serviceName + node, exists := nodesByID[span.serviceName] + if !exists { + node = &traceGraphNode{id: span.serviceName} + nodesByID[span.serviceName] = node + } + node.spanCount++ + node.durationMillis += span.durationMillis + if span.statusCode == 2 { + node.errorCount++ + } + } + + edgesByID := map[string]*traceGraphEdge{} + for _, span := range spans { + sourceService := spanServiceByID[span.parentSpanID] + targetService := span.serviceName + if sourceService == "" || targetService == "" || sourceService == targetService { + continue + } + edgeID := sourceService + "->" + targetService + edge, exists := edgesByID[edgeID] + if !exists { + edge = &traceGraphEdge{id: edgeID, source: sourceService, target: targetService} + edgesByID[edgeID] = edge + } + edge.callCount++ + edge.durationMillis += span.durationMillis + if span.statusCode == 2 { + edge.errorCount++ + } + } + if len(nodesByID) == 0 { + return data.Frames{} + } + + return data.Frames{traceNodesFrame(nodesByID), traceEdgesFrame(edgesByID)} +} + +func traceNodesFrame(nodesByID map[string]*traceGraphNode) *data.Frame { + nodeIDs := sortedTraceGraphNodeIDs(nodesByID) + ids := make([]string, 0, len(nodeIDs)) + titles := make([]string, 0, len(nodeIDs)) + subtitles := make([]string, 0, 
len(nodeIDs)) + mainStats := make([]string, 0, len(nodeIDs)) + secondaryStats := make([]string, 0, len(nodeIDs)) + colors := make([]string, 0, len(nodeIDs)) + okArcs := make([]float64, 0, len(nodeIDs)) + errorArcs := make([]float64, 0, len(nodeIDs)) + errorDetails := make([]int64, 0, len(nodeIDs)) + durationDetails := make([]float64, 0, len(nodeIDs)) + + for _, id := range nodeIDs { + node := nodesByID[id] + ids = append(ids, id) + titles = append(titles, id) + subtitles = append(subtitles, "service") + mainStats = append(mainStats, fmt.Sprintf("%d spans", node.spanCount)) + secondaryStats = append(secondaryStats, fmt.Sprintf("%.1f ms", node.durationMillis)) + colors = append(colors, traceServiceColor(id)) + if node.spanCount > 0 { + errorRatio := float64(node.errorCount) / float64(node.spanCount) + errorArcs = append(errorArcs, errorRatio) + okArcs = append(okArcs, 1-errorRatio) + } else { + errorArcs = append(errorArcs, 0) + okArcs = append(okArcs, 1) + } + errorDetails = append(errorDetails, int64(node.errorCount)) + durationDetails = append(durationDetails, node.durationMillis) + } + + frame := data.NewFrame("nodes", + data.NewField("id", nil, ids), + data.NewField("title", nil, titles), + data.NewField("subtitle", nil, subtitles), + data.NewField("mainstat", nil, mainStats), + data.NewField("secondarystat", nil, secondaryStats), + data.NewField("color", nil, colors), + data.NewField("arc__ok", nil, okArcs).SetConfig(traceFixedColorFieldConfig("green")), + data.NewField("arc__errors", nil, errorArcs).SetConfig(traceFixedColorFieldConfig("red")), + data.NewField("detail__errors", nil, errorDetails).SetConfig(&data.FieldConfig{DisplayName: "Errors"}), + data.NewField("detail__duration_ms", nil, durationDetails).SetConfig(&data.FieldConfig{DisplayName: "Total duration", Unit: "ms"}), + ) + setPreferredVisType(frame, data.VisTypeNodeGraph) + return frame +} + +func traceEdgesFrame(edgesByID map[string]*traceGraphEdge) *data.Frame { + edgeIDs := 
sortedTraceGraphEdgeIDs(edgesByID) + ids := make([]string, 0, len(edgeIDs)) + sources := make([]string, 0, len(edgeIDs)) + targets := make([]string, 0, len(edgeIDs)) + mainStats := make([]string, 0, len(edgeIDs)) + secondaryStats := make([]string, 0, len(edgeIDs)) + thicknesses := make([]float64, 0, len(edgeIDs)) + colors := make([]string, 0, len(edgeIDs)) + errorDetails := make([]int64, 0, len(edgeIDs)) + + for _, id := range edgeIDs { + edge := edgesByID[id] + ids = append(ids, id) + sources = append(sources, edge.source) + targets = append(targets, edge.target) + mainStats = append(mainStats, fmt.Sprintf("%d calls", edge.callCount)) + secondaryStats = append(secondaryStats, fmt.Sprintf("%.1f ms", edge.durationMillis)) + thicknesses = append(thicknesses, 1+float64(edge.callCount-1)*0.5) + if edge.errorCount > 0 { + colors = append(colors, "#d44a3a") + } else { + colors = append(colors, "#7eb26d") + } + errorDetails = append(errorDetails, int64(edge.errorCount)) + } + + frame := data.NewFrame("edges", + data.NewField("id", nil, ids), + data.NewField("source", nil, sources), + data.NewField("target", nil, targets), + data.NewField("mainstat", nil, mainStats), + data.NewField("secondarystat", nil, secondaryStats), + data.NewField("thickness", nil, thicknesses), + data.NewField("color", nil, colors), + data.NewField("detail__errors", nil, errorDetails).SetConfig(&data.FieldConfig{DisplayName: "Errors"}), + ) + setPreferredVisType(frame, data.VisTypeNodeGraph) + return frame +} + +func traceFixedColorFieldConfig(color string) *data.FieldConfig { + return &data.FieldConfig{ + Color: map[string]interface{}{ + "mode": "fixed", + "fixedColor": color, + }, + } +} + +func sortedTraceGraphNodeIDs(nodesByID map[string]*traceGraphNode) []string { + ids := make([]string, 0, len(nodesByID)) + for id := range nodesByID { + ids = append(ids, id) + } + sort.Strings(ids) + return ids +} + +func sortedTraceGraphEdgeIDs(edgesByID map[string]*traceGraphEdge) []string { + ids := 
make([]string, 0, len(edgesByID)) + for id := range edgesByID { + ids = append(ids, id) + } + sort.Strings(ids) + return ids +} + +func traceServiceColor(serviceName string) string { + palette := []string{ + "#7eb26d", + "#eab839", + "#6ed0e0", + "#ef843c", + "#e24d42", + "#1f78c1", + "#ba43a9", + "#705da0", + "#508642", + "#cca300", + } + hash := 0 + for _, char := range serviceName { + hash = (hash*31 + int(char)) & 0x7fffffff + } + return palette[hash%len(palette)] +} + +func traceSearchErrorsFieldConfig() *data.FieldConfig { + return &data.FieldConfig{ + DisplayName: "Errors", + Color: map[string]interface{}{ + "mode": "thresholds", + }, + Thresholds: &data.ThresholdsConfig{ + Mode: data.ThresholdsModeAbsolute, + Steps: []data.Threshold{ + {Color: "green"}, + data.NewThreshold(1, "red", ""), + }, + }, + } +} + +func firstTraceString(values map[string]interface{}, keys ...string) string { + for _, key := range keys { + value := traceString(values[key]) + if value != "" { + return value + } + } + return "" +} + +func firstTraceTimestampMillis(values map[string]interface{}, keys ...string) (float64, bool) { + for _, key := range keys { + value, ok := values[key] + if !ok { + continue + } + if timestamp, ok := traceTimestampMillis(value, TimestampNanos); ok { + return timestamp, true + } + } + return 0, false +} diff --git a/pkg/quickwit/response_parser_traces_test.go b/pkg/quickwit/response_parser_traces_test.go new file mode 100644 index 0000000..e256e65 --- /dev/null +++ b/pkg/quickwit/response_parser_traces_test.go @@ -0,0 +1,544 @@ +package quickwit + +import ( + "encoding/json" + "strings" + "testing" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/stretchr/testify/require" + + es "github.com/quickwit-oss/quickwit-datasource/pkg/quickwit/client" +) + +func TestProcessTracesResponse(t *testing.T) { + query := []byte(` + [ + { + "refId": "A", + "metrics": [{ "type": "traces", "id": "1", "settings": { "limit": "1000" } }], + "query": 
"trace_id:3c191d03fa8be0653c191d03fa8be065" + } + ] + `) + + response := []byte(` + { + "responses": [ + { + "hits": { + "hits": [ + { + "_source": { + "trace_id": "3c191d03fa8be0653c191d03fa8be065", + "span_id": "1111111111111111", + "parent_span_id": "", + "service_name": "checkout", + "resource_attributes": { + "host.name": "node-1", + "service.namespace": "prod" + }, + "span_name": "GET /checkout", + "span_start_timestamp_nanos": 1678974011000000000, + "span_end_timestamp_nanos": 1678974011100000000, + "span_duration_millis": 100, + "span_attributes": { + "http.method": "GET", + "http.status_code": 200 + }, + "events": [ + { + "event_name": "exception", + "event_timestamp_nanos": 1678974011050000000, + "event_attributes": { + "exception.type": "panic" + } + } + ], + "links": [ + { + "trace_id": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "span_id": "2222222222222222", + "attributes": { + "link.type": "follows_from" + } + } + ] + } + }, + { + "_source": { + "trace_id": "3c191d03fa8be0653c191d03fa8be065", + "span_id": "3333333333333333", + "parent_span_id": "1111111111111111", + "service_name": "payments", + "resource_attributes": { + "host.name": "node-2" + }, + "span_name": "POST /charge", + "span_start_timestamp_nanos": 1678974011020000000, + "span_duration_millis": 25, + "span_status": { "code": "ERROR", "message": "declined" }, + "span_attributes": { + "db.system": "postgresql", + "service.peer.name": "stripe-api", + "exception.stacktrace": "SyntheticFailure: declined\n at authorizePayment" + } + } + } + ] + } + } + ] + } + `) + + result, err := queryDataTest(query, response) + require.NoError(t, err) + + require.Len(t, result.response.Responses, 1) + frames := result.response.Responses["A"].Frames + require.Len(t, frames, 3) + + traceFrame := frames[0] + require.Equal(t, data.VisTypeTrace, string(traceFrame.Meta.PreferredVisualization)) + + fields := make(map[string]*data.Field) + for _, field := range traceFrame.Fields { + fields[field.Name] = field + } + + 
require.Equal(t, "3c191d03fa8be0653c191d03fa8be065", fields["traceID"].At(0)) + require.Equal(t, "1111111111111111", fields["spanID"].At(0)) + require.Nil(t, fields["parentSpanID"].At(0)) + require.Equal(t, "1111111111111111", *fields["parentSpanID"].At(1).(*string)) + require.Equal(t, "GET /checkout", fields["operationName"].At(0)) + require.Equal(t, "checkout", fields["serviceName"].At(0)) + require.InDelta(t, 1678974011000.0, fields["startTime"].At(0).(float64), 0.01) + require.Equal(t, 100.0, fields["duration"].At(0)) + require.Equal(t, int64(0), fields["statusCode"].At(0)) + require.Equal(t, int64(2), fields["statusCode"].At(1)) + require.Equal(t, "declined", fields["statusMessage"].At(1)) + require.Equal(t, "red", fields["errorIconColor"].At(1)) + + serviceTags := string(fields["serviceTags"].At(0).(json.RawMessage)) + require.Contains(t, serviceTags, `"key":"host.name"`) + require.Contains(t, serviceTags, `"value":"node-1"`) + require.Contains(t, serviceTags, `"key":"service.name"`) + require.Contains(t, serviceTags, `"value":"checkout"`) + + spanTags := string(fields["tags"].At(0).(json.RawMessage)) + require.Contains(t, spanTags, `"key":"http.method"`) + require.Contains(t, spanTags, `"value":"GET"`) + + paymentSpanTags := string(fields["tags"].At(1).(json.RawMessage)) + require.Contains(t, paymentSpanTags, `"key":"service.peer.name"`) + require.Contains(t, paymentSpanTags, `"value":"stripe-api"`) + require.Contains(t, paymentSpanTags, `"key":"peer.service"`) + + logs := string(fields["logs"].At(0).(json.RawMessage)) + require.Contains(t, logs, `"name":"exception"`) + require.Contains(t, logs, `"key":"exception.type"`) + require.Contains(t, logs, `"timestamp":1678974011050`) + + references := string(fields["references"].At(0).(json.RawMessage)) + require.Contains(t, references, `"traceID":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"`) + require.Contains(t, references, `"spanID":"2222222222222222"`) + + warnings := string(fields["warnings"].At(1).(json.RawMessage)) + 
require.Contains(t, warnings, `Span status: ERROR - declined`) + + stackTraces := string(fields["stackTraces"].At(1).(json.RawMessage)) + require.Contains(t, stackTraces, `SyntheticFailure: declined`) + + nodesFrame := frames[1] + require.Equal(t, "nodes", nodesFrame.Name) + require.Equal(t, data.VisTypeNodeGraph, string(nodesFrame.Meta.PreferredVisualization)) + require.Equal(t, 2, nodesFrame.Rows()) + + edgesFrame := frames[2] + require.Equal(t, "edges", edgesFrame.Name) + require.Equal(t, data.VisTypeNodeGraph, string(edgesFrame.Meta.PreferredVisualization)) + require.Equal(t, 1, edgesFrame.Rows()) + edgeFields := make(map[string]*data.Field) + for _, field := range edgesFrame.Fields { + edgeFields[field.Name] = field + } + require.Equal(t, "checkout", edgeFields["source"].At(0)) + require.Equal(t, "payments", edgeFields["target"].At(0)) +} + +func TestProcessTraceSearchResponse(t *testing.T) { + query := []byte(` + [ + { + "refId": "A", + "metrics": [{ "type": "trace_search", "id": "1", "settings": { "limit": "1", "spanLimit": "1000" } }], + "query": "service_name:checkout" + } + ] + `) + + response := []byte(` + { + "responses": [ + { + "hits": { + "hits": [ + { + "_source": { + "trace_id": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "span_id": "1111111111111111", + "service_name": "checkout", + "span_name": "GET /checkout", + "span_start_timestamp_nanos": 1678974011000000000, + "span_duration_millis": 100 + } + }, + { + "_source": { + "trace_id": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "span_id": "2222222222222222", + "parent_span_id": "1111111111111111", + "service_name": "payments", + "span_name": "POST /charge", + "span_start_timestamp_nanos": 1678974011020000000, + "span_duration_millis": 25, + "span_status": { "code": "ERROR", "message": "declined" } + } + }, + { + "_source": { + "trace_id": "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", + "span_id": "3333333333333333", + "service_name": "checkout", + "span_name": "GET /cart", + "span_start_timestamp_nanos": 
1678974010000000000, + "span_duration_millis": 10 + } + }, + { + "_source": { + "trace_id": "cccccccccccccccccccccccccccccccc", + "span_id": "4444444444444444", + "service_name": "checkout", + "span_name": "GET /old-root", + "span_start_timestamp_nanos": 1678974010500000000, + "span_duration_millis": 10 + } + }, + { + "_source": { + "trace_id": "cccccccccccccccccccccccccccccccc", + "span_id": "5555555555555555", + "parent_span_id": "4444444444444444", + "service_name": "payments", + "span_name": "POST /late-child", + "span_start_timestamp_nanos": 1678974025000000000, + "span_duration_millis": 10 + } + } + ] + } + } + ] + } + `) + + result, err := queryDataTest(query, response) + require.NoError(t, err) + + frames := result.response.Responses["A"].Frames + require.Len(t, frames, 1) + traceSearchFrame := frames[0] + require.Equal(t, data.VisTypeTable, string(traceSearchFrame.Meta.PreferredVisualization)) + require.Equal(t, 1, traceSearchFrame.Rows()) + + fields := make(map[string]*data.Field) + for _, field := range traceSearchFrame.Fields { + fields[field.Name] = field + } + + require.Equal(t, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", fields["traceID"].At(0)) + require.Equal(t, time.UnixMilli(1678974011000).UTC(), fields["startTime"].At(0)) + require.Equal(t, 100.0, fields["duration"].At(0)) + require.Equal(t, int64(2), fields["spans"].At(0)) + require.Equal(t, "checkout, payments", fields["services"].At(0)) + require.Equal(t, "checkout", fields["rootService"].At(0)) + require.Equal(t, "GET /checkout", fields["rootSpan"].At(0)) + require.Equal(t, int64(1), fields["errors"].At(0)) +} + +func TestTraceTimestampMillisUsesDeclaredUnit(t *testing.T) { + nanos, ok := traceTimestampMillis(json.Number("1678974011123456789"), TimestampNanos) + require.True(t, ok) + require.InDelta(t, 1678974011123.4568, nanos, 0.0001) + + micros, ok := traceTimestampMillis(json.Number("1678974011123456"), TimestampMicros) + require.True(t, ok) + require.InDelta(t, 1678974011123.456, micros, 
0.0001) + + millis, ok := traceTimestampMillis(json.Number("1678974011123"), TimestampMillis) + require.True(t, ok) + require.Equal(t, 1678974011123.0, millis) + + seconds, ok := traceTimestampMillis(json.Number("1678974011"), TimestampSecs) + require.True(t, ok) + require.Equal(t, 1678974011000.0, seconds) + + _, ok = traceTimestampMillis(json.Number("1678974011123456789"), "") + require.False(t, ok) +} + +func TestTraceSpanStatusUsesErrorAttributeFallbackForUnsetStatus(t *testing.T) { + statusCode, _, errorColor := traceSpanStatus(map[string]interface{}{ + "span_status": map[string]interface{}{ + "code": json.Number("0"), + }, + "span_attributes": map[string]interface{}{ + "error": true, + }, + }) + require.Equal(t, int64(2), statusCode) + require.Equal(t, "red", errorColor) + + statusCode, _, errorColor = traceSpanStatus(map[string]interface{}{ + "span_status": map[string]interface{}{ + "code": "UNSET", + }, + "span_attributes": map[string]interface{}{ + "otel.status_code": "ERROR", + }, + }) + require.Equal(t, int64(2), statusCode) + require.Equal(t, "red", errorColor) + + statusCode, _, errorColor = traceSpanStatus(map[string]interface{}{ + "span_status": map[string]interface{}{ + "code": json.Number("1"), + }, + "span_attributes": map[string]interface{}{ + "error": true, + }, + }) + require.Equal(t, int64(1), statusCode) + require.Equal(t, "", errorColor) +} + +func TestTraceStackTracesDedupAndTruncate(t *testing.T) { + longStackTrace := strings.Repeat("x", maxTraceStackTraceBytes+1024) + stackTraces := traceStackTraces(map[string]interface{}{ + "span_attributes": map[string]interface{}{ + "exception.stacktrace": longStackTrace, + }, + "events": []interface{}{ + map[string]interface{}{ + "event_attributes": map[string]interface{}{ + "exception.stacktrace": longStackTrace, + }, + }, + }, + }) + + require.Len(t, stackTraces, 1) + require.LessOrEqual(t, len(stackTraces[0]), maxTraceStackTraceBytes) + require.True(t, strings.HasSuffix(stackTraces[0], 
traceStackTraceTruncatedText)) +} + +func TestTraceKeyValuePairsUnwrapsOtelValuesAndFlattensNestedMaps(t *testing.T) { + pairs := traceKeyValuePairs(map[string]interface{}{ + "http": map[string]interface{}{ + "method": map[string]interface{}{ + "string_value": "GET", + }, + "status_code": map[string]interface{}{ + "int_value": json.Number("200"), + }, + }, + }) + + method, ok := traceFindKeyValuePair(pairs, "http.method") + require.True(t, ok) + require.Equal(t, "GET", method.Value) + require.Equal(t, "string", method.Type) + + statusCode, ok := traceFindKeyValuePair(pairs, "http.status_code") + require.True(t, ok) + require.Equal(t, json.Number("200"), statusCode.Value) + require.Equal(t, "number", statusCode.Type) +} + +func TestTraceNodeGraphFramesCollapseSelfEdges(t *testing.T) { + frames := traceNodeGraphFrames([]traceGraphSpan{ + { + spanID: "root", + serviceName: "checkout", + durationMillis: 10, + }, + { + spanID: "child", + parentSpanID: "root", + serviceName: "checkout", + durationMillis: 5, + }, + }) + + require.Len(t, frames, 2) + require.Equal(t, 1, frames[0].Rows()) + require.Equal(t, 0, frames[1].Rows()) +} + +func TestTraceDataLinks(t *testing.T) { + t.Run("trace spans link to configured logs datasource", func(t *testing.T) { + targets := map[string]string{ + "A": `{ + "refId": "A", + "metrics": [{ "type": "traces", "id": "1", "settings": { "limit": "1000" } }], + "query": "trace_id:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + }`, + } + response := `{ + "responses": [ + { + "hits": { + "hits": [ + { + "_source": { + "trace_id": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "span_id": "1111111111111111", + "service_name": "checkout", + "span_name": "GET /checkout", + "span_start_timestamp_nanos": 1678974011000000000, + "span_duration_millis": 100 + } + } + ] + } + } + ] + }` + dsInfo := &es.DatasourceInfo{ + UID: "traces-uid", + Name: "Quickwit Traces", + LogsDatasourceUID: "logs-uid", + LogsDatasourceName: "Quickwit Logs", + } + + result, err := 
parseTestResponseWithDatasourceInfo(targets, response, dsInfo) + require.NoError(t, err) + + spanIDField, _ := result.Responses["A"].Frames[0].FieldByName("spanID") + require.NotNil(t, spanIDField) + require.NotNil(t, spanIDField.Config) + require.Len(t, spanIDField.Config.Links, 1) + + link := spanIDField.Config.Links[0] + require.Equal(t, "Logs for span", link.Title) + require.NotNil(t, link.Internal) + require.Equal(t, "logs-uid", link.Internal.DatasourceUID) + require.Equal(t, "Quickwit Logs", link.Internal.DatasourceName) + + query := link.Internal.Query.(map[string]interface{}) + require.Equal(t, "trace_id:${__span.traceId} AND span_id:${__span.spanId}", query["query"]) + require.Equal(t, logsType, query["queryType"]) + require.Equal(t, map[string]string{"type": quickwitPluginID, "uid": "logs-uid"}, query["datasource"]) + require.Equal(t, logsType, query["metrics"].([]map[string]interface{})[0]["type"]) + }) + + t.Run("trace search rows link to configured traces datasource", func(t *testing.T) { + targets := map[string]string{ + "A": `{ + "refId": "A", + "metrics": [{ "type": "trace_search", "id": "1", "settings": { "limit": "20", "spanLimit": "1000" } }], + "query": "service_name:checkout" + }`, + } + response := `{ + "responses": [ + { + "hits": { + "hits": [ + { + "_source": { + "trace_id": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "span_id": "1111111111111111", + "service_name": "checkout", + "span_name": "GET /checkout", + "span_start_timestamp_nanos": 1678974011000000000, + "span_duration_millis": 100 + } + } + ] + } + } + ] + }` + dsInfo := &es.DatasourceInfo{ + UID: "logs-uid", + Name: "Quickwit Logs", + TracesDatasourceUID: "traces-uid", + TracesDatasourceName: "Quickwit Traces", + } + + result, err := parseTestResponseWithDatasourceInfo(targets, response, dsInfo) + require.NoError(t, err) + + traceIDField, _ := result.Responses["A"].Frames[0].FieldByName("traceID") + require.NotNil(t, traceIDField) + require.NotNil(t, traceIDField.Config) + 
require.Len(t, traceIDField.Config.Links, 1) + + link := traceIDField.Config.Links[0] + require.Equal(t, "Open trace", link.Title) + require.NotNil(t, link.Internal) + require.Equal(t, "traces-uid", link.Internal.DatasourceUID) + require.Equal(t, "Quickwit Traces", link.Internal.DatasourceName) + + query := link.Internal.Query.(map[string]interface{}) + require.Equal(t, "trace_id:${__value.raw}", query["query"]) + require.Equal(t, tracesType, query["queryType"]) + require.Equal(t, map[string]string{"type": quickwitPluginID, "uid": "traces-uid"}, query["datasource"]) + require.Equal(t, tracesType, query["metrics"].([]map[string]interface{})[0]["type"]) + }) +} + +func TestTraceParserHelpers(t *testing.T) { + t.Run("service tags add service.name when Quickwit has only top-level service_name", func(t *testing.T) { + tags := traceServiceTags(map[string]interface{}{ + "service.namespace": "demo", + }, "checkout") + + serviceNameCount := 0 + for _, tag := range tags { + if tag.Key == "service.name" { + serviceNameCount++ + require.Equal(t, "checkout", tag.Value) + } + } + require.Equal(t, 1, serviceNameCount) + }) + + t.Run("service tags preserve existing service.name", func(t *testing.T) { + tags := traceServiceTags(map[string]interface{}{ + "service.name": "checkout-api", + }, "checkout") + + serviceNameCount := 0 + for _, tag := range tags { + if tag.Key == "service.name" { + serviceNameCount++ + require.Equal(t, "checkout-api", tag.Value) + } + } + require.Equal(t, 1, serviceNameCount) + }) + + t.Run("node graph skips spans without service names", func(t *testing.T) { + frames := traceNodeGraphFrames([]traceGraphSpan{ + {spanID: "1111111111111111", durationMillis: 10}, + }) + + require.Len(t, frames, 0) + }) +} diff --git a/scripts/ingest-multi-service-traces.sh b/scripts/ingest-multi-service-traces.sh new file mode 100755 index 0000000..7cfe73c --- /dev/null +++ b/scripts/ingest-multi-service-traces.sh @@ -0,0 +1,217 @@ +#!/usr/bin/env bash +set -euo pipefail + 
# Target Quickwit REST endpoint and index names; all overridable from the environment.
QUICKWIT_URL="${QUICKWIT_URL:-http://127.0.0.1:7280/api/v1}"
INDEX="${INDEX:-otel-traces-v0_9}"
LOG_INDEX="${LOG_INDEX:-otel-logs-v0_9}"
# Set to anything other than "1" to skip ingesting the correlated log fixtures.
INGEST_LOGS="${INGEST_LOGS:-1}"

# Print the current time as integer nanoseconds since the Unix epoch.
# Some `date` implementations do not support %N and echo it back literally;
# the result feeds shell arithmetic below, so fall back to whole seconds
# whenever the output is not purely numeric.
now_ns() {
  local ns
  ns="$(date +%s%N)"
  if [[ ! "${ns}" =~ ^[0-9]+$ ]]; then
    ns="$(date +%s)000000000"
  fi
  printf '%s\n' "${ns}"
}

# Hash stdin with SHA-256, preferring sha256sum and falling back to shasum.
sha256_hex() {
  if command -v sha256sum >/dev/null 2>&1; then
    sha256sum
  else
    shasum -a 256
  fi
}

# Print a 32-hex-character trace id derived from the label in $1 and the
# current time.
new_trace_id() {
  printf '%s-%s' "$1" "$(now_ns)" | sha256_hex | awk '{print substr($1, 1, 32)}'
}

TRACE_ID="${TRACE_ID:-$(new_trace_id healthy)}"
ERROR_TRACE_ID="${ERROR_TRACE_ID:-$(new_trace_id error)}"

# Escape $1 for embedding inside a JSON string literal: backslash, double
# quote, newline, carriage return and tab. Backslashes are escaped first so
# the escapes added afterwards are not doubled.
json_escape() {
  local text="$1"
  text="${text//\\/\\\\}"
  text="${text//\"/\\\"}"
  text="${text//$'\n'/\\n}"
  text="${text//$'\r'/\\r}"
  text="${text//$'\t'/\\t}"
  printf '%s' "${text}"
}

# Print one span document as a single JSON line.
# Arguments: trace_id span_id parent_span_id service_name span_name span_kind
#            start_ns duration_ms status_code status_message route method
#            component [peer_service_name]
# An empty parent_span_id marks the span as root; status_code "ERROR" adds an
# "exception" event carrying a synthetic stack trace.
span_doc() {
  local trace_id="$1"
  local span_id="$2"
  local parent_span_id="$3"
  local service_name="$4"
  local span_name="$5"
  local span_kind="$6"
  local start_ns="$7"
  local duration_ms="$8"
  local status_code="$9"
  local status_message="${10}"
  local route="${11}"
  local method="${12}"
  local component="${13}"
  local peer_service_name="${14:-}"

  local end_ns=$((start_ns + duration_ms * 1000000))
  local parent_field=""
  local peer_service_attr=""
  local is_root="false"
  local status='{}'
  local event_one_name="span.work.started"
  local event_two_name="span.work.completed"
  local event_status="ok"
  local escaped_component
  local escaped_method
  local escaped_route
  escaped_component="$(json_escape "${component}")"
  escaped_method="$(json_escape "${method}")"
  escaped_route="$(json_escape "${route}")"

  if [[ -n "${parent_span_id}" ]]; then
    parent_field=",\"parent_span_id\":\"${parent_span_id}\""
  else
    is_root="true"
  fi

  if [[ -n "${peer_service_name}" ]]; then
    peer_service_attr=",\"service.peer.name\":\"$(json_escape "${peer_service_name}")\""
  fi

  if [[ -n "${status_code}" ]]; then
    status="{\"code\":\"${status_code}\",\"message\":\"$(json_escape "${status_message}")\"}"
  fi

  # Pick event names that match the kind of work the span represents.
  case "${component}" in
    http)
      event_one_name="http.request.received"
      event_two_name="http.response.sent"
      ;;
    grpc)
      event_one_name="rpc.client.request"
      event_two_name="rpc.client.response"
      ;;
    db)
      event_one_name="db.statement.prepared"
      event_two_name="db.statement.completed"
      ;;
    queue)
      event_one_name="messaging.message.created"
      event_two_name="messaging.message.sent"
      ;;
  esac

  if [[ "${status_code}" == "ERROR" ]]; then
    event_status="error"
  fi

  # The two regular events sit at 15% and 85% of the span duration.
  local event_one_ts=$((start_ns + duration_ms * 150000))
  local event_two_ts=$((start_ns + duration_ms * 850000))
  local event_names="[\"${event_one_name}\",\"${event_two_name}\"]"
  local events
  printf -v events '[{"event_name":"%s","event_timestamp_nanos":%s,"event_attributes":{"component":"%s","route":"%s","method":"%s","status":"started"}},{"event_name":"%s","event_timestamp_nanos":%s,"event_attributes":{"component":"%s","route":"%s","method":"%s","status":"%s","duration_ms":%s}}]' \
    "${event_one_name}" \
    "${event_one_ts}" \
    "${escaped_component}" \
    "${escaped_route}" \
    "${escaped_method}" \
    "${event_two_name}" \
    "${event_two_ts}" \
    "${escaped_component}" \
    "${escaped_route}" \
    "${escaped_method}" \
    "${event_status}" \
    "${duration_ms}"

  if [[ "${status_code}" == "ERROR" ]]; then
    local exception_ts=$((start_ns + duration_ms * 500000))
    local escaped_message
    escaped_message="$(json_escape "${status_message}")"
    event_names="[\"${event_one_name}\",\"${event_two_name}\",\"exception\"]"
    # Drop the closing "]" and append the exception event. The stack trace is
    # assembled by plain concatenation (not a printf format), so a JSON
    # newline escape is written as a single-backslash \n here.
    events="${events%]},"'{"event_name":"exception","event_timestamp_nanos":'"${exception_ts}"',"event_attributes":{"exception.type":"SyntheticFailure","exception.message":"'"${escaped_message}"'","exception.stacktrace":"SyntheticFailure: '"${escaped_message}"'\n at quickwitFixture.checkout\n at quickwitFixture.payment"}}'"]"
  fi

  printf '{"trace_id":"%s","span_id":"%s"%s,"is_root":%s,"service_name":"%s","resource_attributes":{"service.name":"%s","service.namespace":"demo-shop","deployment.environment":"local","service.version":"dev"},"scope_name":"quickwit-datasource-fixture","scope_version":"0.1.0","span_kind":%s,"span_name":"%s","span_fingerprint":"%s\\u0000%s\\u0000%s","span_start_timestamp_nanos":%s,"span_end_timestamp_nanos":%s,"span_duration_millis":%s,"span_attributes":{"http.route":"%s","http.method":"%s","component":"%s","fixture":"multi-service-trace"%s},"span_status":%s,"event_names":%s,"events":%s,"links":[]}\n' \
    "${trace_id}" \
    "${span_id}" \
    "${parent_field}" \
    "${is_root}" \
    "$(json_escape "${service_name}")" \
    "$(json_escape "${service_name}")" \
    "${span_kind}" \
    "$(json_escape "${span_name}")" \
    "$(json_escape "${service_name}")" \
    "${span_kind}" \
    "$(json_escape "${span_name}")" \
    "${start_ns}" \
    "${end_ns}" \
    "${duration_ms}" \
    "$(json_escape "${route}")" \
    "$(json_escape "${method}")" \
    "$(json_escape "${component}")" \
    "${peer_service_attr}" \
    "${status}" \
    "${event_names}" \
    "${events}"
}

# Print one log document as a single JSON line, correlated to a span through
# trace_id and span_id.
# Arguments: trace_id span_id service_name span_name timestamp_ns
#            severity_text severity_number message route method
log_doc() {
  local trace_id="$1"
  local span_id="$2"
  local service_name="$3"
  local span_name="$4"
  local timestamp_ns="$5"
  local severity_text="$6"
  local severity_number="$7"
  local message="$8"
  local route="$9"
  local method="${10}"

  printf '{"timestamp_nanos":%s,"observed_timestamp_nanos":%s,"service_name":"%s","severity_text":"%s","severity_number":%s,"body":{"message":"%s"},"attributes":{"http.route":"%s","http.method":"%s","span.name":"%s","fixture":"multi-service-trace"},"dropped_attributes_count":0,"trace_id":"%s","span_id":"%s","trace_flags":1,"resource_attributes":{"service.name":"%s","service.namespace":"demo-shop","deployment.environment":"local","service.version":"dev"},"resource_dropped_attributes_count":0,"scope_name":"quickwit-datasource-fixture","scope_version":"0.1.0","scope_attributes":{},"scope_dropped_attributes_count":0}\n' \
    "${timestamp_ns}" \
    "${timestamp_ns}" \
    "$(json_escape "${service_name}")" \
    "$(json_escape "${severity_text}")" \
    "${severity_number}" \
    "$(json_escape "${message}")" \
    "$(json_escape "${route}")" \
    "$(json_escape "${method}")" \
    "$(json_escape "${span_name}")" \
    "${trace_id}" \
    "${span_id}" \
    "$(json_escape "${service_name}")"
}

base_ns="$(now_ns)"
trace_tmp_file="$(mktemp)"
log_tmp_file="$(mktemp)"
trap 'rm -f "${trace_tmp_file}" "${log_tmp_file}"' EXIT

# The error trace starts two seconds after the healthy one.
error_base_ns=$((base_ns + 2 * 1000000000))

{
  span_doc "${TRACE_ID}" "1111111111111111" "" "web-frontend" "GET /checkout" 2 "${base_ns}" 420 "" "" "/checkout" "GET" "http"
  span_doc "${TRACE_ID}" "2222222222222222" "1111111111111111" "checkout-api" "POST /api/checkout" 3 "$((base_ns + 35 * 1000000))" 310 "" "" "/api/checkout" "POST" "http"
  span_doc "${TRACE_ID}" "3333333333333333" "2222222222222222" "inventory-service" "Reserve inventory" 3 "$((base_ns + 80 * 1000000))" 85 "" "" "inventory.reserve" "RPC" "grpc" "warehouse-system"
  span_doc "${TRACE_ID}" "4444444444444444" "2222222222222222" "payment-service" "Authorize payment" 3 "$((base_ns + 145 * 1000000))" 135 "" "" "payment.authorize" "RPC" "grpc"
  span_doc "${TRACE_ID}" "5555555555555555" "4444444444444444" "postgres-payments" "UPDATE payment_intents" 3 "$((base_ns + 175 * 1000000))" 42 "" "" "payment_intents" "SQL" "db"
  span_doc "${TRACE_ID}" "6666666666666666" "2222222222222222" "email-service" "Send confirmation email" 4 "$((base_ns + 265 * 1000000))" 55 "" "" "email.confirmation" "RPC" "queue"

  span_doc "${ERROR_TRACE_ID}" "aaaaaaaaaaaaaaaa" "" "web-frontend" "GET /checkout" 2 "${error_base_ns}" 690 "ERROR" "checkout failed" "/checkout" "GET" "http"
  span_doc "${ERROR_TRACE_ID}" "bbbbbbbbbbbbbbbb" "aaaaaaaaaaaaaaaa" "checkout-api" "POST /api/checkout" 3 "$((error_base_ns + 40 * 1000000))" 610 "ERROR" "payment authorization failed" "/api/checkout" "POST" "http"
  span_doc "${ERROR_TRACE_ID}" "cccccccccccccccc" "bbbbbbbbbbbbbbbb" "payment-service" "Authorize payment" 3 "$((error_base_ns + 105 * 1000000))" 470 "ERROR" "card declined" "payment.authorize" "RPC" "grpc"
  span_doc "${ERROR_TRACE_ID}" "dddddddddddddddd" "cccccccccccccccc" "fraud-service" "Score transaction risk" 3 "$((error_base_ns + 150 * 1000000))" 145 "" "" "fraud.score" "RPC" "grpc" "risk-engine"
  span_doc "${ERROR_TRACE_ID}" "eeeeeeeeeeeeeeee" "cccccccccccccccc" "postgres-payments" "SELECT payment_method" 3 "$((error_base_ns + 350 * 1000000))" 70 "" "" "payment_methods" "SQL" "db"
} > "${trace_tmp_file}"

curl -fsS "${QUICKWIT_URL}/${INDEX}/ingest?commit=wait_for" \
  -H 'Content-Type: application/json' \
  --data-binary "@${trace_tmp_file}"

if [[ "${INGEST_LOGS}" == "1" ]]; then
  {
    log_doc "${TRACE_ID}" "1111111111111111" "web-frontend" "GET /checkout" "$((base_ns + 40 * 1000000))" "INFO" 9 "checkout page requested" "/checkout" "GET"
    log_doc "${TRACE_ID}" "2222222222222222" "checkout-api" "POST /api/checkout" "$((base_ns + 90 * 1000000))" "INFO" 9 "checkout request accepted" "/api/checkout" "POST"
    log_doc "${TRACE_ID}" "3333333333333333" "inventory-service" "Reserve inventory" "$((base_ns + 120 * 1000000))" "INFO" 9 "inventory reserved" "inventory.reserve" "RPC"
    log_doc "${TRACE_ID}" "4444444444444444" "payment-service" "Authorize payment" "$((base_ns + 210 * 1000000))" "INFO" 9 "payment authorization approved" "payment.authorize" "RPC"
    log_doc "${TRACE_ID}" "5555555555555555" "postgres-payments" "UPDATE payment_intents" "$((base_ns + 198 * 1000000))" "INFO" 9 "payment intent updated" "payment_intents" "SQL"
    log_doc "${TRACE_ID}" "6666666666666666" "email-service" "Send confirmation email" "$((base_ns + 285 * 1000000))" "INFO" 9 "confirmation email queued" "email.confirmation" "RPC"

    log_doc "${ERROR_TRACE_ID}" "aaaaaaaaaaaaaaaa" "web-frontend" "GET /checkout" "$((error_base_ns + 80 * 1000000))" "ERROR" 17 "checkout failed" "/checkout" "GET"
    log_doc "${ERROR_TRACE_ID}" "bbbbbbbbbbbbbbbb" "checkout-api" "POST /api/checkout" "$((error_base_ns + 130 * 1000000))" "ERROR" 17 "payment authorization failed" "/api/checkout" "POST"
    log_doc "${ERROR_TRACE_ID}" "cccccccccccccccc" "payment-service" "Authorize payment" "$((error_base_ns + 250 * 1000000))" "ERROR" 17 "card declined by synthetic fixture" "payment.authorize" "RPC"
    log_doc "${ERROR_TRACE_ID}" "dddddddddddddddd" "fraud-service" "Score transaction risk" "$((error_base_ns + 205 * 1000000))" "INFO" 9 "fraud risk score calculated" "fraud.score" "RPC"
    log_doc "${ERROR_TRACE_ID}" "eeeeeeeeeeeeeeee" "postgres-payments" "SELECT payment_method" "$((error_base_ns + 380 * 1000000))" "INFO" 9 "payment method lookup completed" "payment_methods" "SQL"
  } > "${log_tmp_file}"

  curl -fsS "${QUICKWIT_URL}/${LOG_INDEX}/ingest?commit=wait_for" \
    -H 'Content-Type: application/json' \
    --data-binary "@${log_tmp_file}"
fi

printf 'Ingested multi-service trace fixtures into %s/%s\n' "${QUICKWIT_URL}" "${INDEX}"
if [[ "${INGEST_LOGS}" == "1" ]]; then
  printf 'Ingested correlated log fixtures into %s/%s\n' "${QUICKWIT_URL}" "${LOG_INDEX}"
fi
printf 'Healthy trace: %s\n' "${TRACE_ID}"
printf 'Error trace: %s\n' "${ERROR_TRACE_ID}"
b/scripts/test-local-traces.sh
@@ -0,0 +1,276 @@
+#!/usr/bin/env bash
+#
+# End-to-end smoke test for Quickwit trace support against a local Grafana.
+#
+# Runs ingest-multi-service-traces.sh to load fixtures into Quickwit, then
+# queries them back through Grafana's /api/ds/query endpoint and asserts on the
+# returned frames: trace search, full trace, exemplar-style bare trace id
+# lookup, span-correlated logs, and a count metric over the error trace.
+#
+# Environment overrides (defaults in parentheses):
+#   QUICKWIT_URL            Quickwit API base URL (http://127.0.0.1:7280/api/v1)
+#   GRAFANA_URL             Grafana base URL (http://127.0.0.1:3000)
+#   GRAFANA_USER            Grafana basic-auth user (admin)
+#   GRAFANA_PASSWORD        Grafana basic-auth password (admin)
+#   LOGS_DATASOURCE_NAME    logs datasource name (Quickwit)
+#   TRACES_DATASOURCE_NAME  traces datasource name (Quickwit Traces)
+#   TRACE_ID                healthy trace id (generated when unset)
+#   ERROR_TRACE_ID          error trace id (generated when unset)
+set -euo pipefail
+
+QUICKWIT_URL="${QUICKWIT_URL:-http://127.0.0.1:7280/api/v1}"
+GRAFANA_URL="${GRAFANA_URL:-http://127.0.0.1:3000}"
+GRAFANA_USER="${GRAFANA_USER:-admin}"
+GRAFANA_PASSWORD="${GRAFANA_PASSWORD:-admin}"
+LOGS_DATASOURCE_NAME="${LOGS_DATASOURCE_NAME:-Quickwit}"
+TRACES_DATASOURCE_NAME="${TRACES_DATASOURCE_NAME:-Quickwit Traces}"
+
+# Directory holding this script; the ingest script is resolved against it so
+# the test works from any working directory, not only the repository root.
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+
+# Exit with an error unless the given command is available on PATH.
+require_cmd() {
+  local cmd="$1"
+  if ! command -v "${cmd}" >/dev/null 2>&1; then
+    printf 'Missing required command: %s\n' "${cmd}" >&2
+    exit 1
+  fi
+}
+
+# Print a 32-hex-character trace id: the first 32 characters of the sha256 of
+# "<label>-<output of date +%s%N>", where <label> is $1.
+new_trace_id() {
+  printf '%s-%s' "$1" "$(date +%s%N)" | sha256sum | awk '{print substr($1, 1, 32)}'
+}
+
+# Poll Grafana's /api/health until it answers successfully, up to 30 attempts
+# one second apart. Prints an error and returns 1 if it never does.
+wait_for_grafana() {
+  local attempts=30
+  local attempt
+  for ((attempt = 1; attempt <= attempts; attempt++)); do
+    # Bound each probe: without a timeout one stalled connection would block
+    # forever and the attempt limit would never be reached.
+    if curl -fsS --connect-timeout 2 --max-time 5 \
+      -u "${GRAFANA_USER}:${GRAFANA_PASSWORD}" "${GRAFANA_URL}/api/health" >/dev/null; then
+      return 0
+    fi
+    sleep 1
+  done
+
+  printf 'Grafana did not become ready at %s\n' "${GRAFANA_URL}" >&2
+  return 1
+}
+
+# Run one query (refId A) through Grafana's /api/ds/query over the last 15
+# minutes and print the raw JSON response.
+#   $1 datasource uid
+#   $2 query string
+#   $3 metrics as a JSON array
+#   $4 bucket aggregations as a JSON array (default: [])
+query_grafana() {
+  local datasource_uid="$1"
+  local query="$2"
+  local metrics_json="$3"
+  local bucket_aggs_json="${4:-[]}"
+
+  local body
+  body="$(
+    jq -nc \
+      --arg uid "${datasource_uid}" \
+      --arg query "${query}" \
+      --argjson metrics "${metrics_json}" \
+      --argjson bucketAggs "${bucket_aggs_json}" \
+      '{
+        from: "now-15m",
+        to: "now",
+        queries: [
+          {
+            refId: "A",
+            datasource: { type: "quickwit-quickwit-datasource", uid: $uid },
+            query: $query,
+            bucketAggs: $bucketAggs,
+            metrics: $metrics,
+            maxDataPoints: 1000,
+            intervalMs: 1000
+          }
+        ]
+      }'
+  )"
+
+  curl -fsS \
+    -u "${GRAFANA_USER}:${GRAFANA_PASSWORD}" \
+    -H 'Content-Type: application/json' \
+    --data "${body}" \
+    "${GRAFANA_URL}/api/ds/query"
+}
+
+# Exit 1, after printing the message ($3) and the pretty-printed JSON to stderr,
+# unless the jq filter ($2) yields a truthy result for the JSON document ($1).
+assert_jq() {
+  local json="$1"
+  local filter="$2"
+  local message="$3"
+
+  if ! jq -e "${filter}" >/dev/null <<<"${json}"; then
+    printf 'Assertion failed: %s\n' "${message}" >&2
+    jq '.' <<<"${json}" >&2
+    exit 1
+  fi
+}
+
+# Same as assert_jq, with one string variable bound for the filter:
+#   $1 JSON, $2 variable name, $3 variable value, $4 jq filter, $5 message.
+assert_jq_arg() {
+  local json="$1"
+  local arg_name="$2"
+  local arg_value="$3"
+  local filter="$4"
+  local message="$5"
+
+  if ! jq -e --arg "${arg_name}" "${arg_value}" "${filter}" >/dev/null <<<"${json}"; then
+    printf 'Assertion failed: %s\n' "${message}" >&2
+    jq '.' <<<"${json}" >&2
+    exit 1
+  fi
+}
+
+# Print the cell of column $3 in the first frame of result A, taken from the
+# row whose first-column value equals the trace id $2 (JSON document in $1).
+value_for_trace() {
+  local json="$1"
+  local trace_id="$2"
+  local column_index="$3"
+
+  jq -r \
+    --arg traceId "${trace_id}" \
+    --argjson columnIndex "${column_index}" \
+    '.results.A.frames[0].data.values[0] as $traceIds
+    | ($traceIds | index($traceId)) as $idx
+    | .results.A.frames[0].data.values[$columnIndex][$idx]' \
+    <<<"${json}"
+}
+
+require_cmd curl
+require_cmd jq
+require_cmd sha256sum
+require_cmd awk
+
+TRACE_ID="${TRACE_ID:-$(new_trace_id healthy-local-trace-test)}"
+ERROR_TRACE_ID="${ERROR_TRACE_ID:-$(new_trace_id error-local-trace-test)}"
+
+printf 'Checking Grafana at %s\n' "${GRAFANA_URL}"
+wait_for_grafana
+
+datasources="$(
+  curl -fsS -u "${GRAFANA_USER}:${GRAFANA_PASSWORD}" "${GRAFANA_URL}/api/datasources"
+)"
+
+logs_uid="$(
+  jq -r --arg name "${LOGS_DATASOURCE_NAME}" \
+    '.[] | select(.name == $name and .type == "quickwit-quickwit-datasource") | .uid' \
+    <<<"${datasources}" | head -n 1
+)"
+traces_uid="$(
+  jq -r --arg name "${TRACES_DATASOURCE_NAME}" \
+    '.[] | select(.name == $name and .type == "quickwit-quickwit-datasource") | .uid' \
+    <<<"${datasources}" | head -n 1
+)"
+
+if [[ -z "${logs_uid}" || "${logs_uid}" == "null" ]]; then
+  printf 'Could not find logs datasource named %s\n' "${LOGS_DATASOURCE_NAME}" >&2
+  exit 1
+fi
+if [[ -z "${traces_uid}" || "${traces_uid}" == "null" ]]; then
+  printf 'Could not find traces datasource named %s\n' "${TRACES_DATASOURCE_NAME}" >&2
+  exit 1
+fi
+
+printf 'Using logs datasource %s and traces datasource %s\n' "${logs_uid}" "${traces_uid}"
+printf 'Ingesting fixture traces into %s\n' "${QUICKWIT_URL}"
+TRACE_ID="${TRACE_ID}" ERROR_TRACE_ID="${ERROR_TRACE_ID}" QUICKWIT_URL="${QUICKWIT_URL}" \
+  "${SCRIPT_DIR}/ingest-multi-service-traces.sh" >/dev/null
+
+trace_search_response="$(
+  query_grafana \
+    "${traces_uid}" \
+    "span_attributes.fixture:multi-service-trace AND (trace_id:${TRACE_ID} OR trace_id:${ERROR_TRACE_ID})" \
+    '[{"id":"1","type":"trace_search","settings":{"limit":"20","spanLimit":"200"}}]'
+)"
+
+assert_jq "${trace_search_response}" '.results.A.status == 200' 'trace search query returned HTTP 200'
+assert_jq "${trace_search_response}" '.results.A.frames[0].schema.meta.preferredVisualisationType == "table"' 'trace search frame is a table'
+assert_jq_arg "${trace_search_response}" traceId "${TRACE_ID}" '.results.A.frames[0].data.values[0] | index($traceId) != null' 'healthy trace appears in trace search'
+assert_jq_arg "${trace_search_response}" traceId "${ERROR_TRACE_ID}" '.results.A.frames[0].data.values[0] | index($traceId) != null' 'error trace appears in trace search'
+assert_jq "${trace_search_response}" '.results.A.frames[0].schema.fields[0].config.links[0].title == "Open trace"' 'trace search exposes Open trace link'
+assert_jq "${trace_search_response}" '.results.A.frames[0].schema.fields[0].config.links[0].internal.query.queryType == "traces"' 'trace search link targets the traces query type'
+assert_jq "${trace_search_response}" '.results.A.frames[0].schema.fields[0].config.links[0].internal.query.datasource.uid == .results.A.frames[0].schema.fields[0].config.links[0].internal.datasourceUid' 'trace search link embeds its datasource in the target query'
+
+healthy_spans="$(value_for_trace "${trace_search_response}" "${TRACE_ID}" 3)"
+error_spans="$(value_for_trace "${trace_search_response}" "${ERROR_TRACE_ID}" 3)"
+error_count="$(value_for_trace "${trace_search_response}" "${ERROR_TRACE_ID}" 8)"
+
+if [[ "${healthy_spans}" != "6" || "${error_spans}" != "5" || "${error_count}" != "3" ]]; then
+  printf 'Unexpected trace search summary: healthy_spans=%s error_spans=%s error_count=%s\n' \
+    "${healthy_spans}" "${error_spans}" "${error_count}" >&2
+  exit 1
+fi
+
+trace_response="$(
+  query_grafana \
+    "${traces_uid}" \
+    "trace_id:${ERROR_TRACE_ID}" \
+    '[{"id":"1","type":"traces","settings":{"limit":"1000"}}]'
+)"
+
+assert_jq "${trace_response}" '.results.A.status == 200' 'full trace query returned HTTP 200'
+assert_jq "${trace_response}" '.results.A.frames[0].schema.meta.preferredVisualisationType == "trace"' 'full trace frame is a trace'
+assert_jq "${trace_response}" '.results.A.frames[0].data.values[0] | length == 5' 'error trace has five spans'
+assert_jq "${trace_response}" '.results.A.frames[0].schema.fields[1].config.links[0].title == "Logs for span"' 'trace spans expose Logs for span link'
+assert_jq "${trace_response}" '.results.A.frames[0].schema.fields[1].config.links[0].internal.query.queryType == "logs"' 'span log link targets the logs query type'
+assert_jq "${trace_response}" '.results.A.frames[0].schema.fields[1].config.links[0].internal.query.datasource.uid == .results.A.frames[0].schema.fields[1].config.links[0].internal.datasourceUid' 'span log link embeds its datasource in the target query'
+assert_jq "${trace_response}" '.results.A.frames[1].schema.meta.preferredVisualisationType == "nodeGraph"' 'nodes frame is a node graph'
+assert_jq "${trace_response}" '.results.A.frames[1].data.values[0] | length == 5' 'node graph has five service nodes'
+assert_jq "${trace_response}" '.results.A.frames[2].data.values[0] | length == 4' 'node graph has four service edges'
+assert_jq "${trace_response}" '.results.A.frames[0].data.values[12] | map(select(. == 2)) | length == 3' 'full trace marks three error spans'
+
+exemplar_link_response="$(
+  query_grafana \
+    "${traces_uid}" \
+    "${ERROR_TRACE_ID}" \
+    '[{"id":"3","type":"logs","settings":{"limit":"100"}}]'
+)"
+
+assert_jq "${exemplar_link_response}" '.results.A.status == 200' 'exemplar-style internal link query returned HTTP 200'
+assert_jq "${exemplar_link_response}" '.results.A.frames[0].schema.meta.preferredVisualisationType == "trace"' 'exemplar-style internal link opens a trace frame'
+assert_jq "${exemplar_link_response}" '.results.A.frames[0].data.values[0] | length == 5' 'exemplar-style internal link resolves the bare trace id'
+
+logs_response="$(
+  query_grafana \
+    "${logs_uid}" \
+    "trace_id:${ERROR_TRACE_ID} AND span_id:cccccccccccccccc" \
+    '[{"id":"1","type":"logs","settings":{"limit":"100"}}]'
+)"
+
+assert_jq "${logs_response}" '.results.A.status == 200' 'logs query returned HTTP 200'
+assert_jq "${logs_response}" '.results.A.frames[0].schema.meta.preferredVisualisationType == "logs"' 'correlated logs frame is a logs frame'
+assert_jq "${logs_response}" '.results.A.frames[0] as $frame
+  | ($frame.schema.fields | map(.name) | index("body.message")) as $messageField
+  | $frame.data.values[$messageField][0] == "card declined by synthetic fixture"' 'span log correlation returns the expected log line'
+
+metrics_response="$(
+  query_grafana \
+    "${traces_uid}" \
+    "span_attributes.fixture:multi-service-trace AND trace_id:${ERROR_TRACE_ID}" \
+    '[{"id":"1","type":"count"}]' \
+    '[{"id":"2","type":"date_histogram","settings":{"interval":"1s","min_doc_count":"1"}}]'
+)"
+
+assert_jq "${metrics_response}" '.results.A.status == 200' 'metric query returned HTTP 200'
+assert_jq "${metrics_response}" '.results.A.frames[0].schema.meta.type == "timeseries-multi"' 'metric query returns a time series'
+assert_jq "${metrics_response}" '.results.A.frames[0].data.values[1] | add == 5' 'metric count over the error trace returns five spans'
+
+printf 'Local trace validation passed.\n'
+printf 'Healthy trace: %s (%s spans)\n' "${TRACE_ID}" "${healthy_spans}"
+printf 'Error trace: %s (%s spans, %s error spans)\n' "${ERROR_TRACE_ID}" "${error_spans}" "${error_count}"
diff --git a/src/README.md b/src/README.md index d90a7bf..6cfb76d 100644 --- a/src/README.md +++ b/src/README.md @@ -17,6 +17,24 @@ You need a Quickwit standalone server or cluster. The Quickwit data source plugin works with dashboards and explore views. Alerts are also available. +### Logs and traces + +The query editor supports logs, trace search, full trace view, and raw data queries. + +For OpenTelemetry traces, use: + +- **Trace search** to find traces from matching spans. +- **Traces** to open a single trace by `trace_id`. + +Trace results include span events, span status, exception stack traces, service names, service tags, span tags, and service node graph frames. + +When logs and traces are stored in separate Quickwit indexes, create one Quickwit datasource per index and configure the related datasource fields: + +- On the logs datasource, set the traces datasource used by log-to-trace links. +- On the traces datasource, set the logs datasource used by trace-to-logs links. + +Log-to-trace links are added for log fields named `trace_id`, `traceID`, `traceId`, or `attributes.trace_id`. Trace-to-logs links query logs with both `trace_id` and `span_id`.
+ ## Installation ### Installation on Grafana Cloud diff --git a/src/components/QueryEditor/ElasticsearchQueryContext.tsx b/src/components/QueryEditor/ElasticsearchQueryContext.tsx index 6c8dddf..ea911e6 100644 --- a/src/components/QueryEditor/ElasticsearchQueryContext.tsx +++ b/src/components/QueryEditor/ElasticsearchQueryContext.tsx @@ -5,6 +5,7 @@ import { CoreApp, TimeRange } from '@grafana/data'; import { BaseQuickwitDataSource } from '@/datasource/base'; import { combineReducers, useStatelessReducer, DispatchContext } from '@/hooks/useStatelessReducer'; import { ElasticsearchQuery } from '@/types'; +import { stripQueryType } from '@/queryModel'; import { createReducer as createBucketAggsReducer } from './BucketAggregationsEditor/state/reducer'; import { reducer as metricsReducer } from './MetricAggregationsEditor/state/reducer'; @@ -60,7 +61,7 @@ export const ElasticsearchProvider = withStore(({ const onStateChange = useCallback( (query: ElasticsearchQuery) => { - onChange(query); + onChange(stripQueryType(query)); }, [onChange] ); diff --git a/src/components/QueryEditor/MetricAggregationsEditor/SettingsEditor/TraceSearchSettingsEditor.tsx b/src/components/QueryEditor/MetricAggregationsEditor/SettingsEditor/TraceSearchSettingsEditor.tsx new file mode 100644 index 0000000..b84b44a --- /dev/null +++ b/src/components/QueryEditor/MetricAggregationsEditor/SettingsEditor/TraceSearchSettingsEditor.tsx @@ -0,0 +1,112 @@ +import React from 'react'; + +import { MetricFindValue, SelectableValue } from '@grafana/data'; +import { InlineField, Input, SegmentAsync, Select } from '@grafana/ui'; + +import { useDatasource, useRange } from '@/components/QueryEditor/ElasticsearchQueryContext'; +import { segmentStyles } from '@/components/QueryEditor/styles'; +import { useDispatch } from '@/hooks/useStatelessReducer'; +import { MetricAggregation } from '@/types'; +import { fuzzySearchSort } from '@/utils'; + +import { changeMetricSetting } from '../state/actions'; + +type 
TraceSearchStatus = '' | 'error' | 'ok' | 'unset'; + +/** Settings of the trace search metric, intersected with the optional structured filter fields below. */ type TraceSearchSettings = Extract['settings'] & { + serviceName?: string; + spanName?: string; + status?: TraceSearchStatus; + minDuration?: string; + maxDuration?: string; +}; + +/** Trace search metric whose settings carry the structured filter fields. */ type TraceSearchMetric = Extract & { + settings?: TraceSearchSettings; +}; + +interface Props { + metric: Extract; +} + +const statusOptions: Array> = [ + { label: 'Any', value: '' }, + { label: 'Error', value: 'error' }, + { label: 'Ok', value: 'ok' }, + { label: 'Unset', value: 'unset' }, +]; + +/** Turns each value's text into a string, passes the strings through fuzzySearchSort with the optional query, and maps every resulting string to an option whose label and value are both that string. */ function toFuzzyOptions(values: MetricFindValue[], query?: string): Array> { + return fuzzySearchSort( + values.map((value) => String(value.text)), + (text) => text, + query + ).map((text) => ({ label: text, value: text })); +} + +/** Editor for the structured filter settings of a trace search metric; every change is dispatched through changeMetricSetting. */ export const TraceSearchSettingsEditor = ({ metric }: Props) => { + const typedMetric = metric as TraceSearchMetric; + const dispatch = useDispatch(); + const datasource = useDatasource(); + const range = useRange(); + + /* Dispatch the trimmed value for the setting; a missing, empty or whitespace-only value is stored as undefined. */ const changeSetting = (settingName: keyof TraceSearchSettings, newValue?: string) => { + dispatch(changeMetricSetting({ metric: typedMetric, settingName, newValue: newValue?.trim() || undefined })); + }; + + /* Build an async option loader for one field: it fetches the field's tag values over the current range and converts them with toFuzzyOptions, resolving to [] when the datasource has no getTagValues. */ const loadFieldValues = (field: string) => async (query?: string): Promise>> => { + if (!datasource.getTagValues) { + return []; + } + const values = await datasource.getTagValues({ key: field, timeRange: range }); + return toFuzzyOptions(values, query); + }; + + return ( + <> + + changeSetting('serviceName', e.value)} + placeholder="All services" + value={typedMetric.settings?.serviceName} + /> + + + changeSetting('spanName', e.value)} + placeholder="All spans" + value={typedMetric.settings?.spanName} + /> + + + changeSetting('minDuration', e.target.value)} + defaultValue={typedMetric.settings?.minDuration} + placeholder="100ms" + /> + + + changeSetting('maxDuration', e.target.value)} + defaultValue={typedMetric.settings?.maxDuration} + placeholder="1.2s" + /> + + + ); +}; diff --git 
a/src/components/QueryEditor/MetricAggregationsEditor/SettingsEditor/index.test.tsx b/src/components/QueryEditor/MetricAggregationsEditor/SettingsEditor/index.test.tsx index a0b26ad..16ead91 100644 --- a/src/components/QueryEditor/MetricAggregationsEditor/SettingsEditor/index.test.tsx +++ b/src/components/QueryEditor/MetricAggregationsEditor/SettingsEditor/index.test.tsx @@ -4,7 +4,7 @@ import React from 'react'; import { CoreApp, getDefaultTimeRange } from '@grafana/data'; import { ElasticDatasource } from '@/datasource'; -import { ElasticsearchQuery } from '@/types'; +import { ElasticsearchQuery, MetricAggregation } from '@/types'; import { ElasticsearchProvider } from '../../ElasticsearchQueryContext'; import { SettingsEditor } from '.'; @@ -131,4 +131,60 @@ describe('Settings Editor', () => { expect(modeSelectElement).toBeInTheDocument(); }); }); + + describe('Trace search', () => { + /* Renders the editor for a trace_search metric that already has serviceName set, checks the expanded summary button and the filter labels, then changes and blurs the Min duration input and expects exactly one onChange whose settings keep serviceName and add minDuration. */ it('renders structured trace filters and updates duration settings', () => { + const metricId = '1'; + const query: ElasticsearchQuery = { + refId: 'A', + query: '', + metrics: [ + { + id: metricId, + type: 'trace_search', + settings: { + limit: '20', + spanLimit: '5000', + serviceName: 'checkout', + }, + } as MetricAggregation, + ], + bucketAggs: [], + filters: [], + }; + + const onChange = jest.fn(); + + render( + {}} + range={getDefaultTimeRange()} + > + + + ); + + expect(screen.getByRole('button', { name: /Traces: 20, scan: 5000 spans/i })).toHaveAttribute( + 'aria-expanded', + 'true' + ); + expect(screen.getByText('Service name')).toBeInTheDocument(); + expect(screen.getByText('Span name')).toBeInTheDocument(); + expect(screen.getByText('Status')).toBeInTheDocument(); + + const minDurationInput = screen.getByLabelText('Min duration'); + fireEvent.change(minDurationInput, { target: { value: '250ms' } }); + fireEvent.blur(minDurationInput); + + expect(onChange).toHaveBeenCalledTimes(1); + expect(onChange.mock.calls[0][0].metrics[0].settings).toMatchObject({ + serviceName: 
'checkout', + minDuration: '250ms', + }); + }); + }); }); diff --git a/src/components/QueryEditor/MetricAggregationsEditor/SettingsEditor/index.tsx b/src/components/QueryEditor/MetricAggregationsEditor/SettingsEditor/index.tsx index 9b5b8c4..840e13c 100644 --- a/src/components/QueryEditor/MetricAggregationsEditor/SettingsEditor/index.tsx +++ b/src/components/QueryEditor/MetricAggregationsEditor/SettingsEditor/index.tsx @@ -17,6 +17,7 @@ import { SettingField } from './SettingField'; import { TopMetricsSettingsEditor } from './TopMetricsSettingsEditor'; import { useDescription } from './useDescription'; import { LogsSettingsEditor } from './LogsSettingsEditor'; +import { TraceSearchSettingsEditor } from './TraceSearchSettingsEditor'; // TODO: Move this somewhere and share it with BucketsAggregation Editor export const inlineFieldProps: Partial> = { @@ -52,7 +53,7 @@ export const SettingsEditor = ({ metric, previousMetrics }: Props) => { ]; return ( -