performance upgrades by using clickhouse for logs

This commit is contained in:
2026-05-31 20:39:33 +02:00
parent d3d60dcaa9
commit 9e95fe7403
10 changed files with 570 additions and 139 deletions
+9
View File
@@ -4,8 +4,17 @@ return [
'db' => [ 'db' => [
'path' => '/app/data/logging.db', 'path' => '/app/data/logging.db',
], ],
'clickhouse' => [
'host' => 'clickhouse',
'port' => 8123,
'database' => 'jakach_logging',
'username' => 'default',
'password' => '',
],
'worker' => [ 'worker' => [
'file_check_interval' => 500000, 'file_check_interval' => 500000,
'buffer_flush_interval_ms' => 100,
'buffer_max_rows' => 1000,
], ],
'sources' => [], 'sources' => [],
'rules' => [ 'rules' => [
+22 -5
View File
@@ -11,7 +11,8 @@ services:
- ./composer.json:/app/composer.json - ./composer.json:/app/composer.json
- data:/app/data - data:/app/data
depends_on: depends_on:
- redis clickhouse:
condition: service_healthy
restart: unless-stopped restart: unless-stopped
nginx: nginx:
@@ -41,13 +42,29 @@ services:
- data:/app/data - data:/app/data
- log_collect:/collect - log_collect:/collect
depends_on: depends_on:
- redis clickhouse:
condition: service_healthy
command: ["php", "bin/consume", "--daemon"] command: ["php", "bin/consume", "--daemon"]
restart: unless-stopped restart: unless-stopped
redis: clickhouse:
image: redis:7-alpine image: clickhouse/clickhouse-server:24.3-alpine
ports:
- "8123:8123"
- "9000:9000"
volumes:
- clickhouse_data:/var/lib/clickhouse
ulimits:
nofile:
soft: 262144
hard: 262144
healthcheck:
test: ["CMD", "wget", "--spider", "-q", "http://localhost:8123/ping"]
interval: 5s
timeout: 3s
retries: 10
volumes: volumes:
data: data:
log_collect: log_collect:
clickhouse_data:
+2
View File
@@ -4,6 +4,8 @@ RUN apk add --no-cache linux-headers curl-dev \
&& docker-php-ext-install curl pcntl sockets || \ && docker-php-ext-install curl pcntl sockets || \
docker-php-ext-install curl pcntl docker-php-ext-install curl pcntl
RUN apk add --no-cache curl
COPY --from=composer:2 /usr/bin/composer /usr/bin/composer COPY --from=composer:2 /usr/bin/composer /usr/bin/composer
WORKDIR /app WORKDIR /app
+1 -1
View File
@@ -1,6 +1,6 @@
FROM php:8.3-cli-alpine FROM php:8.3-cli-alpine
RUN apk add --no-cache curl-dev git linux-headers RUN apk add --no-cache curl-dev git linux-headers curl
RUN docker-php-ext-install curl pcntl sockets RUN docker-php-ext-install curl pcntl sockets
+12
View File
@@ -2,4 +2,16 @@
mkdir -p /app/data mkdir -p /app/data
chmod -R 777 /app/data chmod -R 777 /app/data
rm -f /app/data/*.lock rm -f /app/data/*.lock
# Wait for ClickHouse to be ready
echo "Waiting for ClickHouse..."
for i in $(seq 1 30); do
if wget --spider -q http://clickhouse:8123/ping 2>/dev/null; then
echo "ClickHouse is ready"
break
fi
echo " attempt $i..."
sleep 1
done
exec docker-php-entrypoint "$@" exec docker-php-entrypoint "$@"
+14 -4
View File
@@ -164,7 +164,8 @@ class Router
private function health(): array private function health(): array
{ {
$dbOk = true; $sqliteOk = true;
$clickhouseOk = true;
$dbSize = 'unknown'; $dbSize = 'unknown';
try { try {
$this->repo->getAlerts(1); $this->repo->getAlerts(1);
@@ -176,12 +177,21 @@ class Router
: round($bytes / 1024, 1) . ' KB'); : round($bytes / 1024, 1) . ' KB');
} }
} catch (\Throwable) { } catch (\Throwable) {
$dbOk = false; $sqliteOk = false;
} }
try {
$this->repo->clickhouse()->query('SELECT 1');
} catch (\Throwable) {
$clickhouseOk = false;
}
$allOk = $sqliteOk && $clickhouseOk;
return [ return [
'status' => $dbOk ? 'ok' : 'degraded', 'status' => $allOk ? 'ok' : 'degraded',
'database' => $dbOk ? 'connected' : 'error', 'sqlite' => $sqliteOk ? 'connected' : 'error',
'clickhouse' => $clickhouseOk ? 'connected' : 'error',
'db_size' => $dbSize, 'db_size' => $dbSize,
'time' => date('c'), 'time' => date('c'),
]; ];
+128
View File
@@ -0,0 +1,128 @@
<?php
namespace Jakach\Logging\Storage;
class ClickHouseBuffer
{
private ClickHouseClient $client;
private int $maxRows;
private int $flushIntervalMs;
private array $logBuffer = [];
private array $alertBuffer = [];
private int $lastFlush;
private int $idCounter;
private const LOG_COLUMNS = ['id', 'line', 'source_id', 'source_name', 'level', 'created_at'];
private const ALERT_COLUMNS = ['id', 'rule_id', 'rule_name', 'severity', 'status', 'message', 'raw_line', 'source_id', 'source_name', 'created_at'];
public function __construct(
ClickHouseClient $client,
int $maxRows = 1000,
int $flushIntervalMs = 100,
) {
$this->client = $client;
$this->maxRows = $maxRows;
$this->flushIntervalMs = $flushIntervalMs;
$this->lastFlush = hrtime(true);
$this->idCounter = $this->loadMaxId();
}
private function loadMaxId(): int
{
try {
$rows = $this->client->query("SELECT max(id) as max_id FROM (SELECT max(id) as id FROM log_entries UNION ALL SELECT max(id) as id FROM alerts)");
$max = $rows[0]['max_id'] ?? null;
return $max ? (int) $max : 0;
} catch (\Throwable) {
return 0;
}
}
private function nextId(): int
{
return ++$this->idCounter;
}
public function pushLog(string $line, ?int $sourceId, ?string $sourceName, ?string $level): void
{
$this->logBuffer[] = [
'id' => $this->nextId(),
'line' => $line,
'source_id' => $sourceId,
'source_name' => $sourceName,
'level' => $level,
'created_at' => gmdate('Y-m-d H:i:s'),
];
if (count($this->logBuffer) >= $this->maxRows) {
$this->flush();
}
}
public function pushAlert(int $ruleId, string $ruleName, string $severity, string $status, string $message, string $rawLine, ?int $sourceId, ?string $sourceName): array
{
$id = $this->nextId();
$createdAt = gmdate('Y-m-d H:i:s');
$this->alertBuffer[] = [
'id' => $id,
'rule_id' => $ruleId,
'rule_name' => $ruleName,
'severity' => $severity,
'status' => $status,
'message' => $message,
'raw_line' => $rawLine,
'source_id' => $sourceId,
'source_name' => $sourceName,
'created_at' => $createdAt,
];
if (count($this->alertBuffer) >= $this->maxRows) {
$this->flush();
}
return ['id' => $id, 'created_at' => $createdAt];
}
public function tick(): void
{
$elapsed = (hrtime(true) - $this->lastFlush) / 1_000_000;
if ($elapsed >= $this->flushIntervalMs && ($this->logBuffer || $this->alertBuffer)) {
$this->flush();
}
}
public function flush(): void
{
$this->flushLogs();
$this->flushAlerts();
$this->lastFlush = hrtime(true);
}
public function flushLogs(): void
{
if (empty($this->logBuffer)) {
return;
}
$batch = $this->logBuffer;
$this->logBuffer = [];
try {
$this->client->insert('log_entries', self::LOG_COLUMNS, $batch);
} catch (\Throwable $e) {
error_log("ClickHouse log insert error: " . $e->getMessage());
}
}
public function flushAlerts(): void
{
if (empty($this->alertBuffer)) {
return;
}
$batch = $this->alertBuffer;
$this->alertBuffer = [];
try {
$this->client->insert('alerts', self::ALERT_COLUMNS, $batch);
} catch (\Throwable $e) {
error_log("ClickHouse alert insert error: " . $e->getMessage());
}
}
}
+208
View File
@@ -0,0 +1,208 @@
<?php
namespace Jakach\Logging\Storage;
class ClickHouseClient
{
private string $host;
private int $port;
private string $database;
private string $username;
private string $password;
private int $timeout;
public function __construct(
string $host = 'clickhouse',
int $port = 8123,
string $database = 'jakach_logging',
string $username = 'default',
string $password = '',
int $timeout = 5,
) {
$this->host = $host;
$this->port = $port;
$this->database = $database;
$this->username = $username;
$this->password = $password;
$this->timeout = $timeout;
}
private function url(): string
{
return "http://{$this->host}:{$this->port}/";
}
public function query(string $sql, array $params = []): array
{
if (!empty($params)) {
$sql = $this->formatParams($sql, $params);
}
$ch = curl_init();
curl_setopt_array($ch, [
CURLOPT_URL => $this->url() . '?' . http_build_query([
'database' => $this->database,
'default_format' => 'JSONCompact',
]),
CURLOPT_POST => true,
CURLOPT_POSTFIELDS => $sql,
CURLOPT_RETURNTRANSFER => true,
CURLOPT_TIMEOUT => $this->timeout,
CURLOPT_CONNECTTIMEOUT => $this->timeout,
CURLOPT_HTTPHEADER => ['Content-Type: text/plain'],
]);
if ($this->username) {
curl_setopt($ch, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
curl_setopt($ch, CURLOPT_USERPWD, $this->username . ':' . $this->password);
}
$response = curl_exec($ch);
$httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
$error = curl_error($ch);
curl_close($ch);
if ($error) {
throw new \RuntimeException("ClickHouse connection error: $error");
}
if ($httpCode !== 200) {
throw new \RuntimeException("ClickHouse error (HTTP $httpCode): $response");
}
$decoded = json_decode($response, true);
if (json_last_error() !== JSON_ERROR_NONE) {
return [];
}
return $decoded['data'] ?? [];
}
public function execute(string $sql): void
{
$ch = curl_init();
curl_setopt_array($ch, [
CURLOPT_URL => $this->url() . '?' . http_build_query([
'database' => $this->database,
]),
CURLOPT_POST => true,
CURLOPT_POSTFIELDS => $sql,
CURLOPT_RETURNTRANSFER => true,
CURLOPT_TIMEOUT => $this->timeout,
CURLOPT_CONNECTTIMEOUT => $this->timeout,
CURLOPT_HTTPHEADER => ['Content-Type: text/plain'],
]);
if ($this->username) {
curl_setopt($ch, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
curl_setopt($ch, CURLOPT_USERPWD, $this->username . ':' . $this->password);
}
$response = curl_exec($ch);
$httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
$error = curl_error($ch);
curl_close($ch);
if ($error) {
throw new \RuntimeException("ClickHouse connection error: $error");
}
if ($httpCode !== 200) {
if (str_contains($response, 'already exists')) {
return;
}
throw new \RuntimeException("ClickHouse error (HTTP $httpCode): $response");
}
}
public function insert(string $table, array $columns, array $rows): void
{
if (empty($rows)) {
return;
}
$escapedRows = [];
foreach ($rows as $row) {
$vals = [];
foreach ($columns as $col) {
$v = $row[$col] ?? null;
if ($v === null) {
$vals[] = 'NULL';
} elseif (is_int($v) || is_float($v)) {
$vals[] = (string) $v;
} else {
$vals[] = "'" . str_replace(['\\', "'"], ['\\\\', "\\'"], (string) $v) . "'";
}
}
$escapedRows[] = '(' . implode(',', $vals) . ')';
}
$sql = sprintf(
'INSERT INTO %s (%s) VALUES %s',
$table,
implode(', ', $columns),
implode(', ', $escapedRows)
);
$this->execute($sql);
}
private function formatParams(string $sql, array $params): string
{
$parts = explode('?', $sql);
$result = $parts[0];
for ($i = 0; $i < count($params); $i++) {
$v = $params[$i];
if ($v === null) {
$result .= 'NULL';
} elseif (is_int($v) || is_float($v)) {
$result .= (string) $v;
} else {
$result .= "'" . str_replace(["'", "\\"], ["\\'", "\\\\"], (string) $v) . "'";
}
$result .= $parts[$i + 1] ?? '';
}
return $result;
}
public function migrate(): void
{
$this->execute("CREATE DATABASE IF NOT EXISTS {$this->database}");
$this->execute("
CREATE TABLE IF NOT EXISTS log_entries (
id UInt64,
line String,
source_id Nullable(Int32),
source_name Nullable(String),
level Nullable(String),
created_at DateTime('UTC')
) ENGINE = MergeTree()
PARTITION BY toDate(created_at)
ORDER BY (created_at, id)
TTL toDate(created_at) + INTERVAL 90 DAY DELETE
SETTINGS index_granularity = 8192
");
$this->execute("
CREATE TABLE IF NOT EXISTS alerts (
id UInt64,
rule_id Int32,
rule_name String,
severity String,
status String,
message String,
raw_line String,
source_id Nullable(Int32),
source_name Nullable(String),
created_at DateTime('UTC')
) ENGINE = MergeTree()
PARTITION BY toDate(created_at)
ORDER BY (created_at, id)
TTL toDate(created_at) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192
");
$this->query("SELECT count() FROM log_entries");
}
}
+171 -129
View File
@@ -6,27 +6,54 @@ use Jakach\Logging\Model\{LogSource, Rule, Alert, AlertStatus, LogSourceType};
class Repository class Repository
{ {
private \PDO $pdo;
private ClickHouseClient $clickhouse;
private ClickHouseBuffer $buffer;
public function __construct( public function __construct(
private Database $db, private Database $db,
) {} ?ClickHouseClient $clickhouse = null,
?ClickHouseBuffer $buffer = null,
) {
$this->pdo = $db->pdo();
$this->clickhouse = $clickhouse ?? new ClickHouseClient();
$this->buffer = $buffer ?? new ClickHouseBuffer($this->clickhouse);
// --- Log Sources --- $this->clickhouse->migrate();
}
public function clickhouse(): ClickHouseClient
{
return $this->clickhouse;
}
public function buffer(): ClickHouseBuffer
{
return $this->buffer;
}
public function flush(): void
{
$this->buffer->flush();
}
// --- Log Sources (SQLite) ---
public function getSources(): array public function getSources(): array
{ {
$rows = $this->db->pdo()->query("SELECT * FROM log_sources ORDER BY name")->fetchAll(); $rows = $this->pdo->query("SELECT * FROM log_sources ORDER BY name")->fetchAll();
return array_map(fn(array $r) => LogSource::fromRow($r), $rows); return array_map(fn(array $r) => LogSource::fromRow($r), $rows);
} }
public function getActiveSources(): array public function getActiveSources(): array
{ {
$rows = $this->db->pdo()->query("SELECT * FROM log_sources WHERE active = 1 ORDER BY name")->fetchAll(); $rows = $this->pdo->query("SELECT * FROM log_sources WHERE active = 1 ORDER BY name")->fetchAll();
return array_map(fn(array $r) => LogSource::fromRow($r), $rows); return array_map(fn(array $r) => LogSource::fromRow($r), $rows);
} }
public function getSource(int $id): ?LogSource public function getSource(int $id): ?LogSource
{ {
$stmt = $this->db->pdo()->prepare("SELECT * FROM log_sources WHERE id = ?"); $stmt = $this->pdo->prepare("SELECT * FROM log_sources WHERE id = ?");
$stmt->execute([$id]); $stmt->execute([$id]);
$row = $stmt->fetch(); $row = $stmt->fetch();
return $row ? LogSource::fromRow($row) : null; return $row ? LogSource::fromRow($row) : null;
@@ -34,44 +61,44 @@ class Repository
public function createSource(string $name, LogSourceType $type, string $address, array $labels = []): LogSource public function createSource(string $name, LogSourceType $type, string $address, array $labels = []): LogSource
{ {
$stmt = $this->db->pdo()->prepare( $stmt = $this->pdo->prepare(
"INSERT INTO log_sources (name, type, address, labels) VALUES (?, ?, ?, ?)" "INSERT INTO log_sources (name, type, address, labels) VALUES (?, ?, ?, ?)"
); );
$stmt->execute([$name, $type->value, $address, json_encode($labels)]); $stmt->execute([$name, $type->value, $address, json_encode($labels)]);
return $this->getSource((int) $this->db->pdo()->lastInsertId()); return $this->getSource((int) $this->pdo->lastInsertId());
} }
public function deleteSource(int $id): void public function deleteSource(int $id): void
{ {
$this->db->pdo()->prepare("DELETE FROM log_sources WHERE id = ?")->execute([$id]); $this->pdo->prepare("DELETE FROM log_sources WHERE id = ?")->execute([$id]);
} }
public function updateSource(int $id, string $name, LogSourceType $type, string $address, array $labels = [], bool $active = true): LogSource public function updateSource(int $id, string $name, LogSourceType $type, string $address, array $labels = [], bool $active = true): LogSource
{ {
$stmt = $this->db->pdo()->prepare( $stmt = $this->pdo->prepare(
"UPDATE log_sources SET name = ?, type = ?, address = ?, labels = ?, active = ? WHERE id = ?" "UPDATE log_sources SET name = ?, type = ?, address = ?, labels = ?, active = ? WHERE id = ?"
); );
$stmt->execute([$name, $type->value, $address, json_encode($labels), (int) $active, $id]); $stmt->execute([$name, $type->value, $address, json_encode($labels), (int) $active, $id]);
return $this->getSource($id); return $this->getSource($id);
} }
// --- Rules --- // --- Rules (SQLite) ---
public function getRules(): array public function getRules(): array
{ {
$rows = $this->db->pdo()->query("SELECT * FROM rules ORDER BY name")->fetchAll(); $rows = $this->pdo->query("SELECT * FROM rules ORDER BY name")->fetchAll();
return array_map(fn(array $r) => Rule::fromRow($r), $rows); return array_map(fn(array $r) => Rule::fromRow($r), $rows);
} }
public function getActiveRules(): array public function getActiveRules(): array
{ {
$rows = $this->db->pdo()->query("SELECT * FROM rules WHERE active = 1 ORDER BY name")->fetchAll(); $rows = $this->pdo->query("SELECT * FROM rules WHERE active = 1 ORDER BY name")->fetchAll();
return array_map(fn(array $r) => Rule::fromRow($r), $rows); return array_map(fn(array $r) => Rule::fromRow($r), $rows);
} }
public function getRule(int $id): ?Rule public function getRule(int $id): ?Rule
{ {
$stmt = $this->db->pdo()->prepare("SELECT * FROM rules WHERE id = ?"); $stmt = $this->pdo->prepare("SELECT * FROM rules WHERE id = ?");
$stmt->execute([$id]); $stmt->execute([$id]);
$row = $stmt->fetch(); $row = $stmt->fetch();
return $row ? Rule::fromRow($row) : null; return $row ? Rule::fromRow($row) : null;
@@ -79,46 +106,68 @@ class Repository
public function createRule(string $name, string $pattern, string $severity, ?int $rateLimitSeconds = null): Rule public function createRule(string $name, string $pattern, string $severity, ?int $rateLimitSeconds = null): Rule
{ {
$stmt = $this->db->pdo()->prepare( $stmt = $this->pdo->prepare(
"INSERT INTO rules (name, pattern, severity, rate_limit_seconds) VALUES (?, ?, ?, ?)" "INSERT INTO rules (name, pattern, severity, rate_limit_seconds) VALUES (?, ?, ?, ?)"
); );
$stmt->execute([$name, $pattern, $severity, $rateLimitSeconds]); $stmt->execute([$name, $pattern, $severity, $rateLimitSeconds]);
return $this->getRule((int) $this->db->pdo()->lastInsertId()); return $this->getRule((int) $this->pdo->lastInsertId());
} }
public function deleteRule(int $id): void public function deleteRule(int $id): void
{ {
$this->db->pdo()->prepare("DELETE FROM rules WHERE id = ?")->execute([$id]); $this->pdo->prepare("DELETE FROM rules WHERE id = ?")->execute([$id]);
} }
public function updateRule(int $id, string $name, string $pattern, string $severity, ?int $rateLimitSeconds = null, bool $active = true): Rule public function updateRule(int $id, string $name, string $pattern, string $severity, ?int $rateLimitSeconds = null, bool $active = true): Rule
{ {
$stmt = $this->db->pdo()->prepare( $stmt = $this->pdo->prepare(
"UPDATE rules SET name = ?, pattern = ?, severity = ?, rate_limit_seconds = ?, active = ? WHERE id = ?" "UPDATE rules SET name = ?, pattern = ?, severity = ?, rate_limit_seconds = ?, active = ? WHERE id = ?"
); );
$stmt->execute([$name, $pattern, $severity, $rateLimitSeconds, (int) $active, $id]); $stmt->execute([$name, $pattern, $severity, $rateLimitSeconds, (int) $active, $id]);
return $this->getRule($id); return $this->getRule($id);
} }
// --- Alerts --- // --- Alerts (ClickHouse) ---
public function createAlert(int $ruleId, string $ruleName, string $severity, string $message, string $rawLine, ?int $sourceId = null, ?string $sourceName = null): Alert public function createAlert(int $ruleId, string $ruleName, string $severity, string $message, string $rawLine, ?int $sourceId = null, ?string $sourceName = null): Alert
{ {
$stmt = $this->db->pdo()->prepare( $result = $this->buffer->pushAlert(
"INSERT INTO alerts (rule_id, rule_name, severity, status, message, raw_line, source_id, source_name) ruleId: $ruleId,
VALUES (?, ?, ?, 'open', ?, ?, ?, ?)" ruleName: $ruleName,
severity: $severity,
status: 'open',
message: $message,
rawLine: $rawLine,
sourceId: $sourceId,
sourceName: $sourceName,
);
$createdAt = new \DateTimeImmutable($result['created_at']);
return new Alert(
id: $result['id'],
ruleId: $ruleId,
ruleName: $ruleName,
severity: AlertSeverity::from($severity),
status: AlertStatus::Open,
message: $message,
rawLine: $rawLine,
sourceId: $sourceId,
sourceName: $sourceName,
createdAt: $createdAt,
); );
$stmt->execute([$ruleId, $ruleName, $severity, $message, $rawLine, $sourceId, $sourceName]);
$id = (int) $this->db->pdo()->lastInsertId();
return $this->getAlert($id);
} }
public function getAlert(int $id): ?Alert public function getAlert(int $id): ?Alert
{ {
$stmt = $this->db->pdo()->prepare("SELECT * FROM alerts WHERE id = ?"); $rows = $this->clickhouse->query(
$stmt->execute([$id]); "SELECT * FROM alerts WHERE id = ? LIMIT 1",
$row = $stmt->fetch(); [$id]
return $row ? Alert::fromRow($row) : null; );
if (empty($rows)) {
return null;
}
return $this->alertFromRow($rows[0]);
} }
public function getAlerts(int $limit = 100, int $offset = 0, ?string $status = null, ?string $severity = null, ?string $since = null, ?string $until = null): array public function getAlerts(int $limit = 100, int $offset = 0, ?string $status = null, ?string $severity = null, ?string $since = null, ?string $until = null): array
@@ -127,19 +176,19 @@ class Repository
$params = []; $params = [];
if ($status) { if ($status) {
$where[] = 'status = ?'; $where[] = "status = ?";
$params[] = $status; $params[] = $status;
} }
if ($severity) { if ($severity) {
$where[] = 'severity = ?'; $where[] = "severity = ?";
$params[] = $severity; $params[] = $severity;
} }
if ($since) { if ($since) {
$where[] = 'created_at >= ?'; $where[] = "created_at >= ?";
$params[] = str_replace('T', ' ', $since); $params[] = str_replace('T', ' ', $since);
} }
if ($until) { if ($until) {
$where[] = 'created_at <= ?'; $where[] = "created_at <= ?";
$params[] = str_replace('T', ' ', $until); $params[] = str_replace('T', ' ', $until);
} }
@@ -147,52 +196,43 @@ class Repository
if ($where) { if ($where) {
$sql .= ' WHERE ' . implode(' AND ', $where); $sql .= ' WHERE ' . implode(' AND ', $where);
} }
$sql .= " ORDER BY created_at DESC LIMIT ? OFFSET ?"; $sql .= " ORDER BY created_at DESC, id DESC LIMIT ? OFFSET ?";
$params[] = $limit; $params[] = $limit;
$params[] = $offset; $params[] = $offset;
$stmt = $this->db->pdo()->prepare($sql); $rows = $this->clickhouse->query($sql, $params);
$stmt->execute($params); return array_map(fn(array $r) => $this->alertFromRow($r), $rows);
$rows = $stmt->fetchAll();
return array_map(fn(array $r) => Alert::fromRow($r), $rows);
} }
public function updateAlertStatus(int $id, AlertStatus $status): void public function updateAlertStatus(int $id, AlertStatus $status): void
{ {
$stmt = $this->db->pdo()->prepare("UPDATE alerts SET status = ? WHERE id = ?"); $this->clickhouse->execute(
$stmt->execute([$status->value, $id]); "ALTER TABLE alerts UPDATE status = '{$status->value}' WHERE id = {$id}"
);
} }
public function getAlertCounts(): array public function getAlertCounts(): array
{ {
return $this->db->pdo()->query( return $this->clickhouse->query(
"SELECT status, severity, COUNT(*) as count FROM alerts GROUP BY status, severity" "SELECT status, severity, count() as count FROM alerts GROUP BY status, severity"
)->fetchAll(); );
} }
public function searchAlerts(string $query, int $limit = 100): array public function searchAlerts(string $query, int $limit = 100): array
{ {
$stmt = $this->db->pdo()->prepare( $like = '%' . str_replace(['%', '_'], ['\%', '\_'], $query) . '%';
"SELECT a.* FROM alerts a $rows = $this->clickhouse->query(
JOIN alerts_fts fts ON a.id = fts.rowid "SELECT * FROM alerts WHERE message ILIKE ? OR raw_line ILIKE ? OR rule_name ILIKE ? ORDER BY created_at DESC LIMIT ?",
WHERE alerts_fts MATCH ? [$like, $like, $like, $limit]
ORDER BY rank
LIMIT ?"
); );
$stmt->execute([$query, $limit]); return array_map(fn(array $r) => $this->alertFromRow($r), $rows);
$rows = $stmt->fetchAll();
return array_map(fn(array $r) => Alert::fromRow($r), $rows);
} }
// --- Log Entries --- // --- Log Entries (ClickHouse) ---
public function storeLogEntry(string $line, ?int $sourceId = null, ?string $sourceName = null, ?string $level = null): void public function storeLogEntry(string $line, ?int $sourceId = null, ?string $sourceName = null, ?string $level = null): void
{ {
$stmt = $this->db->pdo()->prepare( $this->buffer->pushLog($line, $sourceId, $sourceName, $level);
"INSERT INTO log_entries (line, source_id, source_name, level) VALUES (?, ?, ?, ?)"
);
$stmt->execute([$line, $sourceId, $sourceName, $level]);
} }
public function searchLogEntries(string $query, int $limit = 200, int $offset = 0, ?string $since = null, ?string $until = null): array public function searchLogEntries(string $query, int $limit = 200, int $offset = 0, ?string $since = null, ?string $until = null): array
@@ -206,11 +246,11 @@ class Repository
$params = []; $params = [];
if ($since) { if ($since) {
$where[] = 'e.created_at >= ?'; $where[] = 'created_at >= ?';
$params[] = str_replace('T', ' ', $since); $params[] = str_replace('T', ' ', $since);
} }
if ($until) { if ($until) {
$where[] = 'e.created_at <= ?'; $where[] = 'created_at <= ?';
$params[] = str_replace('T', ' ', $until); $params[] = str_replace('T', ' ', $until);
} }
@@ -219,29 +259,25 @@ class Repository
if ($where) { if ($where) {
$sql .= ' WHERE ' . implode(' AND ', $where); $sql .= ' WHERE ' . implode(' AND ', $where);
} }
$sql .= " ORDER BY e.created_at DESC LIMIT ? OFFSET ?"; $sql .= " ORDER BY created_at DESC, id DESC LIMIT ? OFFSET ?";
$params[] = $limit; $params[] = $limit;
$params[] = $offset; $params[] = $offset;
$stmt = $this->db->pdo()->prepare($sql); return $this->clickhouse->query($sql, $params);
$stmt->execute($params);
return $stmt->fetchAll();
} }
$like = $this->toLikePattern($query); $like = $this->toLikePattern($query);
$where[] = 'e.line LIKE ?'; $where[] = 'line ILIKE ?';
$params[] = $like; $params[] = $like;
$sql = "SELECT e.* FROM log_entries e";
$sql = "SELECT * FROM log_entries";
if ($where) { if ($where) {
$sql .= ' WHERE ' . implode(' AND ', $where); $sql .= ' WHERE ' . implode(' AND ', $where);
} }
$sql .= " ORDER BY e.created_at DESC LIMIT ? OFFSET ?"; $sql .= " ORDER BY created_at DESC, id DESC LIMIT ? OFFSET ?";
$params[] = $limit; $params[] = $limit;
$params[] = $offset; $params[] = $offset;
$stmt = $this->db->pdo()->prepare($sql); return $this->clickhouse->query($sql, $params);
$stmt->execute($params);
return $stmt->fetchAll();
} }
private function toLikePattern(string $query): string private function toLikePattern(string $query): string
@@ -265,11 +301,11 @@ class Repository
return implode('%', $likeParts); return implode('%', $likeParts);
} }
// --- Config --- // --- Config (SQLite) ---
public function getAllowedUserTokens(): array public function getAllowedUserTokens(): array
{ {
$stmt = $this->db->pdo()->prepare("SELECT value FROM config WHERE key = ?"); $stmt = $this->pdo->prepare("SELECT value FROM config WHERE key = ?");
$stmt->execute(['allowed_user_tokens']); $stmt->execute(['allowed_user_tokens']);
$row = $stmt->fetch(); $row = $stmt->fetch();
if (!$row || empty($row['value'])) { if (!$row || empty($row['value'])) {
@@ -285,7 +321,7 @@ class Repository
public function getConfig(string $key, mixed $default = null): mixed public function getConfig(string $key, mixed $default = null): mixed
{ {
$stmt = $this->db->pdo()->prepare("SELECT value FROM config WHERE key = ?"); $stmt = $this->pdo->prepare("SELECT value FROM config WHERE key = ?");
$stmt->execute([$key]); $stmt->execute([$key]);
$row = $stmt->fetch(); $row = $stmt->fetch();
return $row ? $row['value'] : $default; return $row ? $row['value'] : $default;
@@ -293,27 +329,27 @@ class Repository
public function setConfig(string $key, string $value): void public function setConfig(string $key, string $value): void
{ {
$stmt = $this->db->pdo()->prepare( $stmt = $this->pdo->prepare(
"INSERT INTO config (key, value) VALUES (?, ?) "INSERT INTO config (key, value) VALUES (?, ?)
ON CONFLICT(key) DO UPDATE SET value = excluded.value" ON CONFLICT(key) DO UPDATE SET value = excluded.value"
); );
$stmt->execute([$key, $value]); $stmt->execute([$key, $value]);
} }
// --- Rate Limiting --- // --- Rate Limiting (SQLite) ---
public function checkRateLimit(int $ruleId, int $windowSeconds): bool public function checkRateLimit(int $ruleId, int $windowSeconds): bool
{ {
$now = time(); $now = time();
$window = intdiv($now, $windowSeconds) * $windowSeconds; $window = intdiv($now, $windowSeconds) * $windowSeconds;
$this->db->pdo()->prepare( $this->pdo->prepare(
"INSERT INTO rate_limiter (rule_id, window_start, count) "INSERT INTO rate_limiter (rule_id, window_start, count)
VALUES (?, ?, 1) VALUES (?, ?, 1)
ON CONFLICT(rule_id, window_start) DO UPDATE SET count = count + 1" ON CONFLICT(rule_id, window_start) DO UPDATE SET count = count + 1"
)->execute([$ruleId, $window]); )->execute([$ruleId, $window]);
$stmt = $this->db->pdo()->prepare( $stmt = $this->pdo->prepare(
"SELECT count FROM rate_limiter WHERE rule_id = ? AND window_start = ?" "SELECT count FROM rate_limiter WHERE rule_id = ? AND window_start = ?"
); );
$stmt->execute([$ruleId, $window]); $stmt->execute([$ruleId, $window]);
@@ -322,28 +358,28 @@ class Repository
return $row['count'] <= 1; return $row['count'] <= 1;
} }
// --- False Positives --- // --- False Positives (SQLite) ---
public function getFalsePositives(): array public function getFalsePositives(): array
{ {
return $this->db->pdo()->query( return $this->pdo->query(
"SELECT id, pattern, description, created_at FROM false_positives ORDER BY id" "SELECT id, pattern, description, created_at FROM false_positives ORDER BY id"
)->fetchAll(); )->fetchAll();
} }
public function createFalsePositive(string $pattern, string $description = ''): array public function createFalsePositive(string $pattern, string $description = ''): array
{ {
$stmt = $this->db->pdo()->prepare( $stmt = $this->pdo->prepare(
"INSERT INTO false_positives (pattern, description) VALUES (?, ?)" "INSERT INTO false_positives (pattern, description) VALUES (?, ?)"
); );
$stmt->execute([$pattern, $description]); $stmt->execute([$pattern, $description]);
$id = (int) $this->db->pdo()->lastInsertId(); $id = (int) $this->pdo->lastInsertId();
return $this->getFalsePositive($id); return $this->getFalsePositive($id);
} }
public function getFalsePositive(int $id): ?array public function getFalsePositive(int $id): ?array
{ {
$stmt = $this->db->pdo()->prepare( $stmt = $this->pdo->prepare(
"SELECT id, pattern, description, created_at FROM false_positives WHERE id = ?" "SELECT id, pattern, description, created_at FROM false_positives WHERE id = ?"
); );
$stmt->execute([$id]); $stmt->execute([$id]);
@@ -353,12 +389,12 @@ class Repository
public function deleteFalsePositive(int $id): void public function deleteFalsePositive(int $id): void
{ {
$this->db->pdo()->prepare("DELETE FROM false_positives WHERE id = ?")->execute([$id]); $this->pdo->prepare("DELETE FROM false_positives WHERE id = ?")->execute([$id]);
} }
public function isFalsePositive(string $line): bool public function isFalsePositive(string $line): bool
{ {
$patterns = $this->db->pdo()->query( $patterns = $this->pdo->query(
"SELECT pattern FROM false_positives" "SELECT pattern FROM false_positives"
)->fetchAll(\PDO::FETCH_COLUMN); )->fetchAll(\PDO::FETCH_COLUMN);
@@ -370,11 +406,11 @@ class Repository
return false; return false;
} }
// --- Audit Log --- // --- Audit Log (SQLite) ---
public function logAudit(string $action, string $entityType, ?int $entityId = null, ?string $details = null, ?string $username = null): void public function logAudit(string $action, string $entityType, ?int $entityId = null, ?string $details = null, ?string $username = null): void
{ {
$stmt = $this->db->pdo()->prepare( $stmt = $this->pdo->prepare(
"INSERT INTO audit_log (action, entity_type, entity_id, details, username) VALUES (?, ?, ?, ?, ?)" "INSERT INTO audit_log (action, entity_type, entity_id, details, username) VALUES (?, ?, ?, ?, ?)"
); );
$stmt->execute([$action, $entityType, $entityId, $details, $username]); $stmt->execute([$action, $entityType, $entityId, $details, $username]);
@@ -382,53 +418,41 @@ class Repository
public function getAuditLog(int $limit = 50): array public function getAuditLog(int $limit = 50): array
{ {
$stmt = $this->db->pdo()->prepare( $stmt = $this->pdo->prepare(
"SELECT * FROM audit_log ORDER BY created_at DESC LIMIT ?" "SELECT * FROM audit_log ORDER BY created_at DESC LIMIT ?"
); );
$stmt->execute([$limit]); $stmt->execute([$limit]);
return $stmt->fetchAll(); return $stmt->fetchAll();
} }
// --- Retention --- // --- Retention (ClickHouse TTL handles it; but keep purge for SQLite cleanup) ---
public function purgeOldData(int $logDays = 30, int $alertDays = 90): array public function purgeOldData(int $logDays = 30, int $alertDays = 90): array
{ {
$stmt = $this->db->pdo()->prepare( $this->pdo->exec("DELETE FROM rate_limiter WHERE window_start < " . (time() - 86400));
"DELETE FROM log_entries WHERE created_at < datetime('now', ?)" return ['log_entries_deleted' => -1, 'alerts_deleted' => -1, 'note' => 'ClickHouse TTL manages retention automatically'];
);
$stmt->execute(['-' . $logDays . ' days']);
$deletedLogs = $stmt->rowCount();
$stmt = $this->db->pdo()->prepare(
"DELETE FROM alerts WHERE status = 'resolved' AND created_at < datetime('now', ?)"
);
$stmt->execute(['-' . $alertDays . ' days']);
$deletedAlerts = $stmt->rowCount();
$this->db->pdo()->exec("DELETE FROM rate_limiter WHERE window_start < " . (time() - 86400));
return ['log_entries_deleted' => $deletedLogs, 'alerts_deleted' => $deletedAlerts];
} }
// --- Log Context --- // --- Log Context (ClickHouse) ---
public function getLogContext(int $id, int $before = 5, int $after = 5): array public function getLogContext(int $id, int $before = 5, int $after = 5): array
{ {
$beforeStmt = $this->db->pdo()->prepare( $beforeRows = $this->clickhouse->query(
"SELECT * FROM log_entries WHERE id < ? ORDER BY id DESC LIMIT ?" "SELECT * FROM log_entries WHERE id < ? ORDER BY id DESC LIMIT ?",
[$id, $before]
); );
$beforeStmt->execute([$id, $before]); $beforeRows = array_reverse($beforeRows);
$beforeRows = array_reverse($beforeStmt->fetchAll());
$current = $this->db->pdo()->prepare("SELECT * FROM log_entries WHERE id = ?"); $currentRows = $this->clickhouse->query(
$current->execute([$id]); "SELECT * FROM log_entries WHERE id = ? LIMIT 1",
$currentRow = $current->fetch(); [$id]
);
$afterStmt = $this->db->pdo()->prepare( $currentRow = $currentRows[0] ?? null;
"SELECT * FROM log_entries WHERE id > ? ORDER BY id ASC LIMIT ?"
$afterRows = $this->clickhouse->query(
"SELECT * FROM log_entries WHERE id > ? ORDER BY id ASC LIMIT ?",
[$id, $after]
); );
$afterStmt->execute([$id, $after]);
$afterRows = $afterStmt->fetchAll();
return [ return [
'before' => $beforeRows, 'before' => $beforeRows,
@@ -437,34 +461,52 @@ class Repository
]; ];
} }
// --- Bulk Operations --- // --- Bulk Operations (ClickHouse) ---
public function bulkUpdateAlertStatus(array $ids, AlertStatus $status): int public function bulkUpdateAlertStatus(array $ids, AlertStatus $status): int
{ {
if (empty($ids)) return 0; if (empty($ids)) return 0;
$placeholders = implode(',', array_fill(0, count($ids), '?')); $idList = implode(',', array_map('intval', $ids));
$stmt = $this->db->pdo()->prepare( $this->clickhouse->execute(
"UPDATE alerts SET status = ? WHERE id IN ($placeholders)" "ALTER TABLE alerts UPDATE status = '{$status->value}' WHERE id IN ({$idList})"
); );
$stmt->execute(array_merge([$status->value], $ids)); return count($ids);
return $stmt->rowCount();
} }
public function exportAlerts(array $ids): array public function exportAlerts(array $ids): array
{ {
if (empty($ids)) return []; if (empty($ids)) return [];
$placeholders = implode(',', array_fill(0, count($ids), '?')); $idList = implode(',', array_map('intval', $ids));
$stmt = $this->db->pdo()->prepare("SELECT * FROM alerts WHERE id IN ($placeholders) ORDER BY id"); $rows = $this->clickhouse->query(
$stmt->execute($ids); "SELECT * FROM alerts WHERE id IN ({$idList}) ORDER BY id"
return array_map(fn(array $r) => Alert::fromRow($r), $stmt->fetchAll()); );
return array_map(fn(array $r) => $this->alertFromRow($r), $rows);
} }
public function exportLogs(array $ids): array public function exportLogs(array $ids): array
{ {
if (empty($ids)) return []; if (empty($ids)) return [];
$placeholders = implode(',', array_fill(0, count($ids), '?')); $idList = implode(',', array_map('intval', $ids));
$stmt = $this->db->pdo()->prepare("SELECT * FROM log_entries WHERE id IN ($placeholders) ORDER BY id"); return $this->clickhouse->query(
$stmt->execute($ids); "SELECT * FROM log_entries WHERE id IN ({$idList}) ORDER BY id"
return $stmt->fetchAll(); );
}
// --- Helpers ---
private function alertFromRow(array $row): Alert
{
return new Alert(
id: (int) $row['id'],
ruleId: (int) $row['rule_id'],
ruleName: $row['rule_name'],
severity: AlertSeverity::from($row['severity']),
status: AlertStatus::from($row['status']),
message: $row['message'],
rawLine: $row['raw_line'],
sourceId: isset($row['source_id']) ? (int) $row['source_id'] : null,
sourceName: $row['source_name'] ?? null,
createdAt: new \DateTimeImmutable($row['created_at']),
);
} }
} }
+3
View File
@@ -48,6 +48,7 @@ class Orchestrator
pcntl_signal_dispatch(); pcntl_signal_dispatch();
$this->fileWatcher->tick(); $this->fileWatcher->tick();
$this->socketListener->tick(); $this->socketListener->tick();
$this->repo->buffer()->tick();
if (time() - $this->lastCleanup > 3600) { if (time() - $this->lastCleanup > 3600) {
$this->lastCleanup = time(); $this->lastCleanup = time();
@@ -103,6 +104,8 @@ class Orchestrator
{ {
$this->fileWatcher->stop(); $this->fileWatcher->stop();
$this->socketListener->stop(); $this->socketListener->stop();
fprintf(STDERR, "Flushing remaining buffers...\n");
$this->repo->flush();
fprintf(STDERR, "Worker stopped\n"); fprintf(STDERR, "Worker stopped\n");
} }
} }