Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ To configure the Quickwit datasource, you need to provide the following informat
- The index name.
- The log message field name (optional). This is the field displayed in the explorer view.
- The log level field name (optional). It must be a fast field.
- The related logs or traces datasource (optional). This enables trace-to-logs and log-to-trace links when logs and traces are stored in separate Quickwit indexes.

### With Grafana UI

Expand All @@ -141,6 +142,112 @@ datasources:
logLevelField: severity_text
```

### Logs and traces in separate indexes

When logs and traces are stored in different Quickwit indexes, configure one datasource per index and link them with `logsDatasourceUid` and `tracesDatasourceUid`.

```yaml
apiVersion: 1

datasources:
- name: Quickwit Logs
uid: quickwit-logs
type: quickwit-quickwit-datasource
url: http://localhost:7280/api/v1
jsonData:
index: 'otel-logs-v0_9'
logMessageField: body.message
logLevelField: severity_text
tracesDatasourceUid: quickwit-traces
tracesDatasourceName: Quickwit Traces

- name: Quickwit Traces
uid: quickwit-traces
type: quickwit-quickwit-datasource
url: http://localhost:7280/api/v1
jsonData:
index: 'otel-traces-v0_9'
logsDatasourceUid: quickwit-logs
logsDatasourceName: Quickwit Logs
```

## Traces

The query editor has two trace query types:

- **Trace search** scans matching spans and returns one row per trace. Use this to find trace IDs by Lucene query, service, operation, status, or attributes.
- **Traces** returns a full trace frame for Grafana's trace viewer. Use this with a trace ID query such as `trace_id:0123456789abcdef0123456789abcdef` (a bare 32-character hex trace ID is also accepted).

The trace parser expects Quickwit OpenTelemetry trace fields such as:

- `trace_id`
- `span_id`
- `parent_span_id`
- `service_name`
- `span_name`
- `span_start_timestamp_nanos`
- `span_duration_millis` or `span_end_timestamp_nanos`

It also reads optional fields for richer trace rendering:

- `resource_attributes` for service tags
- `span_attributes` for span tags, including `service.peer.name` and `peer.service`
- `span_status` for error status and warnings
- `events` for span events and exception stack traces
- `links` for span references
- `scope_name` and `scope_version` for instrumentation library details

Trace responses include:

- Grafana trace frames for the trace viewer.
- Node graph frames that summarize service-to-service calls.
- Span warnings for error status and dropped attributes/events/links.
- Span event details and exception stack traces.
- Stable per-service node colors in the node graph.

### Trace/log correlations

Trace-to-logs links are attached to each trace span. They query the configured logs datasource with:

```text
trace_id:${__span.traceId} AND span_id:${__span.spanId}
```

Log-to-trace links are attached to log fields named:

- `trace_id`
- `traceID`
- `traceId`
- `attributes.trace_id`

They open the configured traces datasource with:

```text
trace_id:${__value.raw}
```

### Local trace fixtures

For local testing, use the fixture script:

```bash
QUICKWIT_URL=http://127.0.0.1:7280/api/v1 ./scripts/ingest-multi-service-traces.sh
```

It writes two multi-service traces into `otel-traces-v0_9` and the correlated log records into `otel-logs-v0_9`.

In **Trace search**, filter the fixture data with:

```text
span_attributes.fixture:multi-service-trace
```

In **Traces**, open a returned trace ID with:

```text
trace_id:<trace id>
```

## Features

- Explore view.
Expand All @@ -149,6 +256,9 @@ datasources:
- Adhoc filters.
- Annotations
- Explore Log Context.
- Trace search and trace view.
- Trace-to-logs and log-to-trace links.
- Service node graph for trace results.
- [Alerting](https://grafana.com/docs/grafana/latest/alerting/).

## FAQ and Limitations
Expand Down
6 changes: 6 additions & 0 deletions pkg/quickwit/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ type ReadyStatus struct {

type DatasourceInfo struct {
ID int64
UID string
Name string
LogsDatasourceUID string
LogsDatasourceName string
TracesDatasourceUID string
TracesDatasourceName string
HTTPClient *http.Client
URL string
Database string
Expand Down
200 changes: 195 additions & 5 deletions pkg/quickwit/data_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"regexp"
"strconv"
"strings"

es "github.com/quickwit-oss/quickwit-datasource/pkg/quickwit/client"
"github.com/quickwit-oss/quickwit-datasource/pkg/quickwit/simplejson"
Expand All @@ -25,11 +26,22 @@ func buildMSR(queries []*Query, defaultTimeField string) ([]*es.SearchRequest, e
b := ms.Search(q.Interval)
b.Size(0)
filters := b.Query().Bool().Filter()
// Always pass Grafana's picker range through. Quickwit's metastore
// prunes splits whose timestamps fall outside this window, so even
// trace_id lookups go from "scan every split" to "scan a few" — the
// same speedup the native Jaeger endpoint gets via auto-derived bounds.
filters.AddDateRangeFilter(defaultTimeField, q.RangeTo, q.RangeFrom)
filters.AddQueryStringFilter(q.RawQuery, true, "AND")
if isTraceSearchQuery(q) {
filters.AddQueryStringFilter(traceSearchSettingsQuery(q), true, "AND")
}

if isLogsQuery(q) {
processLogsQuery(q, b, q.RangeFrom, q.RangeTo, defaultTimeField)
} else if isTraceSearchQuery(q) {
processTraceSearchQuery(q, b, defaultTimeField)
} else if isTracesQuery(q) {
processTracesQuery(q, b, defaultTimeField)
} else if isDocumentQuery(q) {
processDocumentQuery(q, b, q.RangeFrom, q.RangeTo, defaultTimeField)
} else {
Expand Down Expand Up @@ -269,8 +281,8 @@ func getPipelineAggField(m *MetricAgg) string {

func isQueryWithError(query *Query) error {
if len(query.BucketAggs) == 0 {
// If no aggregations, only document and logs queries are valid
if len(query.Metrics) == 0 || !(isLogsQuery(query) || isDocumentQuery(query)) {
// If no aggregations, only document, logs, and trace queries are valid
if len(query.Metrics) == 0 || !(isLogsQuery(query) || isTraceSearchQuery(query) || isTracesQuery(query) || isDocumentQuery(query)) {
return fmt.Errorf("invalid query, missing metrics and aggregations")
}
} else {
Expand Down Expand Up @@ -302,19 +314,185 @@ func isQueryWithError(query *Query) error {
}

// isLogsQuery reports whether the query's first metric is a logs metric.
func isLogsQuery(query *Query) bool {
	return queryMetricType(query) == logsType
}

// isTracesQuery reports whether the query's first metric selects the
// trace-viewer ("traces") query type.
func isTracesQuery(query *Query) bool {
	metricType := queryMetricType(query)
	return metricType == tracesType
}

// isTraceSearchQuery reports whether the query's first metric selects the
// trace-search query type.
func isTraceSearchQuery(query *Query) bool {
	metricType := queryMetricType(query)
	return metricType == traceSearchType
}

// isDocumentQuery reports whether the query is a document-style query, i.e.
// either a raw-data or a raw-document metric query.
func isDocumentQuery(query *Query) bool {
	switch queryMetricType(query) {
	case rawDataType, rawDocumentType:
		return true
	default:
		return false
	}
}

// isRawDataQuery reports whether the query's first metric is a raw-data metric.
func isRawDataQuery(query *Query) bool {
	return queryMetricType(query) == rawDataType
}

// isRawDocumentQuery reports whether the query's first metric is a
// raw-document metric.
func isRawDocumentQuery(query *Query) bool {
	return queryMetricType(query) == rawDocumentType
}

var (
	// bareTraceIDPattern matches a bare trace ID written as exactly 32 hex
	// characters (the OTel/W3C textual trace-id form) with nothing around it.
	bareTraceIDPattern = regexp.MustCompile(`^[0-9a-fA-F]{32}$`)
	// durationPattern matches a duration filter value: a decimal number with
	// an optional, case-insensitive unit suffix (ns|us|ms|s|m|h) and optional
	// surrounding whitespace. Group 1 is the number, group 2 the unit.
	durationPattern = regexp.MustCompile(`(?i)^\s*(\d+(?:\.\d+)?)\s*(ns|us|ms|s|m|h)?\s*$`)
)

// firstMetricType returns the type of the query's first metric, or the empty
// string when the query is nil or has no metrics.
func firstMetricType(query *Query) string {
	if query != nil && len(query.Metrics) > 0 {
		return query.Metrics[0].Type
	}
	return ""
}

// queryMetricType returns the metric type used to classify the query; it
// simply delegates to firstMetricType.
func queryMetricType(query *Query) string {
	return firstMetricType(query)
}

// isBareTraceIDQuery reports whether rawQuery consists of nothing but a
// 32-hex-digit trace ID, ignoring surrounding whitespace.
func isBareTraceIDQuery(rawQuery string) bool {
	candidate := strings.TrimSpace(rawQuery)
	return bareTraceIDPattern.MatchString(candidate)
}

// traceSearchSettingsQuery translates the trace-search metric settings
// (service name, span name, status, min/max duration) into an AND-joined
// Quickwit query-string fragment. It returns "" when the query is not a
// trace-search query, has no metric settings, or no filter is set.
func traceSearchSettingsQuery(query *Query) string {
	if !isTraceSearchQuery(query) || len(query.Metrics) == 0 {
		return ""
	}
	settings := query.Metrics[0].Settings
	if settings == nil {
		return ""
	}

	var clauses []string
	// addPhrase appends an exact-phrase clause for a non-empty setting value.
	addPhrase := func(fieldName, settingKey string) {
		if value := strings.TrimSpace(settings.Get(settingKey).MustString()); value != "" {
			clauses = append(clauses, traceSearchPhraseClause(fieldName, value))
		}
	}

	addPhrase("service_name", "serviceName")
	addPhrase("span_name", "spanName")
	if clause := traceSearchStatusClause(settings.Get("status").MustString()); clause != "" {
		clauses = append(clauses, clause)
	}
	if millis, ok := traceSearchDurationMillis(settings.Get("minDuration").MustString()); ok {
		clauses = append(clauses, "span_duration_millis:>="+millis)
	}
	if millis, ok := traceSearchDurationMillis(settings.Get("maxDuration").MustString()); ok {
		clauses = append(clauses, "span_duration_millis:<="+millis)
	}

	return strings.Join(clauses, " AND ")
}

// traceSearchPhraseClause builds a quoted phrase clause of the form
// fieldName:"value", escaping backslashes and double quotes in the value.
func traceSearchPhraseClause(fieldName, value string) string {
	escaper := strings.NewReplacer(`\`, `\\`, `"`, `\"`)
	return fieldName + `:"` + escaper.Replace(value) + `"`
}

// traceSearchStatusClause maps a user-selected span status ("error", "ok",
// "unset", case/space-insensitive) to an OR-clause over the status encodings
// seen in practice. It returns "" for any other input.
//
// span_status is mapped as `tokenizer: raw`, so matches are exact tokens.
// OTel canonical strings are TitleCase ("Error"/"Ok"/"Unset") and the OTLP
// wire form is the integer enum (0/1/2) or "STATUS_CODE_*". Some pipelines
// lowercase, so cover all variants we've seen.
func traceSearchStatusClause(status string) string {
	var codes, extras []string
	switch strings.ToLower(strings.TrimSpace(status)) {
	case "error":
		codes = []string{"Error", "ERROR", "error", "STATUS_CODE_ERROR", "2"}
		extras = []string{"span_attributes.error:true", "span_attributes.otel.status_code:ERROR"}
	case "ok":
		codes = []string{"Ok", "OK", "ok", "STATUS_CODE_OK", "1"}
	case "unset":
		codes = []string{"Unset", "UNSET", "unset", "STATUS_CODE_UNSET", "0"}
	default:
		return ""
	}

	terms := make([]string, 0, len(codes)+len(extras))
	for _, code := range codes {
		terms = append(terms, "span_status.code:"+code)
	}
	terms = append(terms, extras...)
	return "(" + strings.Join(terms, " OR ") + ")"
}

// traceSearchDurationMillis parses a duration filter value (e.g. "250ms",
// "1.5s", "100") and converts it to milliseconds, returned as a decimal
// string for embedding in a Quickwit query. A missing unit means
// milliseconds. The boolean result is false when the input does not parse.
func traceSearchDurationMillis(duration string) (string, bool) {
	parts := durationPattern.FindStringSubmatch(duration)
	if parts == nil {
		return "", false
	}

	amount, err := strconv.ParseFloat(parts[1], 64)
	if err != nil {
		return "", false
	}

	// Keep the exact arithmetic (operation order) so results are bit-for-bit
	// stable for float inputs.
	switch strings.ToLower(parts[2]) {
	case "ns":
		amount = amount / 1000000
	case "us":
		amount = amount / 1000
	case "ms", "":
		// already in milliseconds
	case "s":
		amount = amount * 1000
	case "m":
		amount = amount * 60 * 1000
	case "h":
		amount = amount * 60 * 60 * 1000
	default:
		// Unreachable while durationPattern only admits the units above.
		return "", false
	}

	return strconv.FormatFloat(amount, 'f', -1, 64), true
}

// canInferTraceLink reports whether the query may still be interpreted as a
// trace query originating from an internal trace link: its first metric is
// absent, a logs metric, or a traces metric.
func canInferTraceLink(query *Query) bool {
	// The previous version shadowed the firstMetricType helper with a local
	// variable of the same name; a switch avoids the shadow entirely.
	switch firstMetricType(query) {
	case "", logsType, tracesType:
		return true
	default:
		return false
	}
}

// canTraceQueryTypeNormalize reports whether the query's transient
// queryType hint ("traces", set by internal trace links) should be honored.
// Once the request carries an explicit non-log/non-trace metric, the metric
// type is authoritative and the hint is ignored.
func canTraceQueryTypeNormalize(query *Query) bool {
	if query.QueryType != tracesType {
		return false
	}
	return canInferTraceLink(query)
}

// normalizeInternalLinkTraceQuery coerces a query that arrived through an
// internal trace link (or that is just a bare 32-hex trace ID) into a
// canonical traces query: queryType "traces", no bucket aggregations, a
// single traces metric with a default result limit, and a RawQuery of the
// form "trace_id:<id>". Queries that are not eligible are left untouched.
func normalizeInternalLinkTraceQuery(query *Query) {
	if query == nil {
		return
	}

	// Drop the transient queryType hint when an explicit metric overrides it.
	if query.QueryType == tracesType && !canInferTraceLink(query) {
		query.QueryType = ""
	}

	metricType := firstMetricType(query)
	eligible := canTraceQueryTypeNormalize(query) ||
		metricType == tracesType ||
		(metricType == "" && isBareTraceIDQuery(query.RawQuery))
	if !eligible {
		return
	}

	// A bare trace ID becomes an explicit trace_id field query.
	if trimmed := strings.TrimSpace(query.RawQuery); isBareTraceIDQuery(trimmed) {
		query.RawQuery = "trace_id:" + trimmed
	}
	query.QueryType = tracesType
	query.BucketAggs = []*BucketAgg{}

	// No metric yet: synthesize a default traces metric.
	if len(query.Metrics) == 0 {
		query.Metrics = []*MetricAgg{{
			ID:       "1",
			Type:     tracesType,
			Settings: simplejson.NewFromAny(map[string]interface{}{"limit": "1000"}),
			Meta:     simplejson.New(),
		}}
		return
	}

	// Otherwise keep only the first metric and force it into shape.
	metric := query.Metrics[0]
	query.Metrics = query.Metrics[:1]
	metric.Type = tracesType
	if metric.ID == "" {
		metric.ID = "1"
	}
	if metric.Settings == nil {
		metric.Settings = simplejson.New()
	}
	if metric.Settings.Get("limit").MustString() == "" {
		metric.Settings.Set("limit", "1000")
	}
}

func processLogsQuery(q *Query, b *es.SearchRequestBuilder, from, to int64, defaultTimeField string) {
Expand All @@ -337,6 +515,18 @@ func processLogsQuery(q *Query, b *es.SearchRequestBuilder, from, to int64, defa
}
}

// processTracesQuery configures the search request for a trace-viewer query:
// spans sorted ascending by time so the trace assembles in order, with the
// result size taken from the metric's "limit" setting (falling back to the
// package default size).
func processTracesQuery(q *Query, b *es.SearchRequestBuilder, defaultTimeField string) {
	limit := q.Metrics[0].Settings.Get("limit").MustString()
	b.Sort(es.SortOrderAsc, defaultTimeField, "epoch_nanos_int")
	b.Size(stringToIntWithDefaultValue(limit, defaultSize))
}

// processTraceSearchQuery configures the search request for a trace-search
// query: most recent spans first, with the scan size taken from the metric's
// "spanLimit" setting (falling back to 5000 spans).
func processTraceSearchQuery(q *Query, b *es.SearchRequestBuilder, defaultTimeField string) {
	spanLimit := q.Metrics[0].Settings.Get("spanLimit").MustString()
	b.Sort(es.SortOrderDesc, defaultTimeField, "epoch_nanos_int")
	b.Size(stringToIntWithDefaultValue(spanLimit, 5000))
}

func processDocumentQuery(q *Query, b *es.SearchRequestBuilder, from, to int64, defaultTimeField string) {
metric := q.Metrics[0]
b.Sort(es.SortOrderDesc, defaultTimeField, "epoch_nanos_int")
Expand Down
Loading
Loading