Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 212 additions & 1 deletion src/Audit/Adapter/ClickHouse.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class ClickHouse extends SQL

private const DEFAULT_TABLE = 'audits';

private const SLOW_QUERIES_TABLE = 'slow_queries';

private const DEFAULT_DATABASE = 'default';

private string $host;
Expand Down Expand Up @@ -59,7 +61,7 @@ public function __construct(
string $username = 'default',
string $password = '',
int $port = self::DEFAULT_PORT,
bool $secure = false
bool $secure = false,
) {
$this->validateHost($host);
$this->validatePort($port);
Expand Down Expand Up @@ -472,6 +474,22 @@ private function getTableName(): string
return $tableName;
}

/**
* Get the slow_queries table name with namespace prefix.
*
* @return string
*/
private function getSlowQueryTableName(): string
{
$tableName = self::SLOW_QUERIES_TABLE;

if (!empty($this->namespace)) {
$tableName = $this->namespace . '_' . $tableName;
}

return $tableName;
}

/**
* Execute a ClickHouse query via HTTP interface using Fetch Client.
*
Expand Down Expand Up @@ -1574,4 +1592,197 @@ public function cleanup(\DateTime $datetime): bool

return true;
}

/**
* Setup the slow_queries ClickHouse table.
*
* Creates a dedicated table for logging slow database queries with a
* purpose-built schema optimized for analytical queries on duration,
* action, and project.
*
* @throws Exception
*/
public function setupSlowQueries(): void
{
$escapedDatabase = $this->escapeIdentifier($this->database);
$this->query("CREATE DATABASE IF NOT EXISTS {$escapedDatabase}");

$columns = [
'id String',
'time DateTime64(3)',
'durationMs UInt32',
'method LowCardinality(String)',
'action LowCardinality(String)',
'path String',
'uri String',
'hostname String',
'projectId String',
'userId String',
'statusCode UInt16',
'plan LowCardinality(String)',
'ip String',
'userAgent String',
'data String',
];

if ($this->sharedTables) {
$columns[] = 'tenant Nullable(UInt64)';
}

$tableName = $this->getSlowQueryTableName();
$escapedTable = $this->escapeIdentifier($this->database) . '.' . $this->escapeIdentifier($tableName);

$createTableSql = "
CREATE TABLE IF NOT EXISTS {$escapedTable} (
" . implode(",\n ", $columns) . ",
INDEX idx_duration durationMs TYPE minmax GRANULARITY 4
)
ENGINE = MergeTree()
ORDER BY (time, id)
PARTITION BY toYYYYMM(time)
SETTINGS index_granularity = 8192
";

$this->query($createTableSql);
}

/**
* Log a slow query to the slow_queries table.
*
* @param array{
* durationMs: int,
* method: string,
* action: string,
* path: string,
* uri: string,
* hostname: string,
* projectId: string,
* userId: string,
* statusCode: int,
* plan: string,
* ip: string,
* userAgent: string,
* data: string,
* time?: \DateTime|string|null,
* } $data
* @throws Exception
*/
public function createSlowQuery(array $data): void
{
$tableName = $this->getSlowQueryTableName();
$escapedTable = $this->escapeIdentifier($this->database) . '.' . $this->escapeIdentifier($tableName);

$row = [
'id' => uniqid('', true),
'time' => $this->formatDateTime($data['time'] ?? null),
'durationMs' => $data['durationMs'],
'method' => $data['method'],
'action' => $data['action'],
'path' => $data['path'],
'uri' => $data['uri'],
'hostname' => $data['hostname'],
'projectId' => $data['projectId'],
'userId' => $data['userId'],
'statusCode' => $data['statusCode'],
'plan' => $data['plan'],
'ip' => $data['ip'],
'userAgent' => $data['userAgent'],
'data' => $data['data'],
];

if ($this->sharedTables) {
$row['tenant'] = $this->tenant;
}

$this->query("INSERT INTO {$escapedTable} FORMAT JSONEachRow", [], [$row]);
}

/**
* Find slow query logs within a time range.
*
* @param \DateTime|null $after Only return logs after this time
* @param \DateTime|null $before Only return logs before this time
* @param int $limit Maximum number of results
* @param int $offset Number of results to skip
* @return array<int, array<string, mixed>>
* @throws Exception
*/
public function findSlowQueries(
?\DateTime $after = null,
?\DateTime $before = null,
int $limit = 25,
int $offset = 0,
): array {
$tableName = $this->getSlowQueryTableName();
$escapedTable = $this->escapeIdentifier($this->database) . '.' . $this->escapeIdentifier($tableName);
$tenantFilter = $this->getTenantFilter();

$conditions = [];
$params = [];

if ($tenantFilter) {
$conditions[] = ltrim($tenantFilter, ' AND');
}

if ($after !== null) {
$conditions[] = 'time > {after:DateTime64(3)}';
$params['after'] = $this->formatDateTime($after);
}

if ($before !== null) {
$conditions[] = 'time < {before:DateTime64(3)}';
$params['before'] = $this->formatDateTime($before);
}

$whereClause = !empty($conditions) ? ' WHERE ' . implode(' AND ', $conditions) : '';
$params['limit'] = $limit;
$params['offset'] = $offset;

$sql = "
SELECT *
FROM {$escapedTable}{$whereClause}
ORDER BY time DESC
LIMIT {limit:UInt64} OFFSET {offset:UInt64}
FORMAT JSON
";

$result = $this->query($sql, $params);

if (empty(trim($result))) {
return [];
}

/** @var array<string, mixed>|null $decoded */
$decoded = json_decode($result, true);
if ($decoded === null || !isset($decoded['data']) || !is_array($decoded['data'])) {
return [];
}

return $decoded['data'];
}

/**
* Delete slow query logs older than the specified datetime.
*
* @param \DateTime $datetime Delete logs older than this time
* @return bool
* @throws Exception
*/
public function cleanupSlowQueries(\DateTime $datetime): bool
{
$tableName = $this->getSlowQueryTableName();
$tenantFilter = $this->getTenantFilter();
$escapedTable = $this->escapeIdentifier($this->database) . '.' . $this->escapeIdentifier($tableName);

$datetimeString = $datetime->format('Y-m-d H:i:s.v');

$sql = "
DELETE FROM {$escapedTable}
WHERE time < {datetime:String}{$tenantFilter}
";

$this->query($sql, ['datetime' => $datetimeString]);

return true;
}
}
Loading