diff --git a/config/default.php b/config/default.php index b6373d6..c1db007 100644 --- a/config/default.php +++ b/config/default.php @@ -4,8 +4,17 @@ return [ 'db' => [ 'path' => '/app/data/logging.db', ], + 'clickhouse' => [ + 'host' => 'clickhouse', + 'port' => 8123, + 'database' => 'jakach_logging', + 'username' => 'default', + 'password' => '', + ], 'worker' => [ 'file_check_interval' => 500000, + 'buffer_flush_interval_ms' => 100, + 'buffer_max_rows' => 1000, ], 'sources' => [], 'rules' => [ diff --git a/docker-compose.yml b/docker-compose.yml index 403355c..c750835 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -11,7 +11,8 @@ services: - ./composer.json:/app/composer.json - data:/app/data depends_on: - - redis + clickhouse: + condition: service_healthy restart: unless-stopped nginx: @@ -41,13 +42,29 @@ services: - data:/app/data - log_collect:/collect depends_on: - - redis + clickhouse: + condition: service_healthy command: ["php", "bin/consume", "--daemon"] restart: unless-stopped - redis: - image: redis:7-alpine + clickhouse: + image: clickhouse/clickhouse-server:24.3-alpine + ports: + - "8123:8123" + - "9000:9000" + volumes: + - clickhouse_data:/var/lib/clickhouse + ulimits: + nofile: + soft: 262144 + hard: 262144 + healthcheck: + test: ["CMD", "wget", "--spider", "-q", "http://localhost:8123/ping"] + interval: 5s + timeout: 3s + retries: 10 volumes: data: - log_collect: \ No newline at end of file + log_collect: + clickhouse_data: \ No newline at end of file diff --git a/docker/Dockerfile.api b/docker/Dockerfile.api index 7b2f849..8cb17a1 100644 --- a/docker/Dockerfile.api +++ b/docker/Dockerfile.api @@ -4,6 +4,8 @@ RUN apk add --no-cache linux-headers curl-dev \ && docker-php-ext-install curl pcntl sockets || \ docker-php-ext-install curl pcntl +RUN apk add --no-cache curl + COPY --from=composer:2 /usr/bin/composer /usr/bin/composer WORKDIR /app diff --git a/docker/Dockerfile.php b/docker/Dockerfile.php index f149f7a..facc7c7 100644 --- a/docker/Dockerfile.php +++ b/docker/Dockerfile.php @@ -1,6 +1,6 @@ FROM php:8.3-cli-alpine -RUN apk add --no-cache curl-dev git linux-headers +RUN apk add --no-cache curl-dev git linux-headers curl RUN docker-php-ext-install curl pcntl sockets diff --git a/docker/entrypoint-api.sh b/docker/entrypoint-api.sh index 7892d9b..43ccf2f 100755 --- a/docker/entrypoint-api.sh +++ b/docker/entrypoint-api.sh @@ -2,4 +2,16 @@ mkdir -p /app/data chmod -R 777 /app/data rm -f /app/data/*.lock + +# Wait for ClickHouse to be ready +echo "Waiting for ClickHouse..." +for i in $(seq 1 30); do + if wget --spider -q http://clickhouse:8123/ping 2>/dev/null; then + echo "ClickHouse is ready" + break + fi + echo " attempt $i..." + sleep 1 +done + exec docker-php-entrypoint "$@" \ No newline at end of file diff --git a/src/Api/Router.php b/src/Api/Router.php index 383efe8..77b4d4a 100644 --- a/src/Api/Router.php +++ b/src/Api/Router.php @@ -164,7 +164,8 @@ class Router private function health(): array { - $dbOk = true; + $sqliteOk = true; + $clickhouseOk = true; $dbSize = 'unknown'; try { $this->repo->getAlerts(1); @@ -176,12 +177,21 @@ class Router : round($bytes / 1024, 1) . ' KB'); } } catch (\Throwable) { - $dbOk = false; + $sqliteOk = false; } + try { + $this->repo->clickhouse()->query('SELECT 1'); + } catch (\Throwable) { + $clickhouseOk = false; + } + + $allOk = $sqliteOk && $clickhouseOk; + return [ - 'status' => $dbOk ? 'ok' : 'degraded', - 'database' => $dbOk ? 'connected' : 'error', + 'status' => $allOk ? 'ok' : 'degraded', + 'sqlite' => $sqliteOk ? 'connected' : 'error', + 'clickhouse' => $clickhouseOk ? 'connected' : 'error', 'db_size' => $dbSize, 'time' => date('c'), ]; diff --git a/src/Storage/ClickHouseBuffer.php b/src/Storage/ClickHouseBuffer.php new file mode 100644 index 0000000..7e782a6 --- /dev/null +++ b/src/Storage/ClickHouseBuffer.php @@ -0,0 +1,128 @@ +client = $client; + $this->maxRows = $maxRows; + $this->flushIntervalMs = $flushIntervalMs; + $this->lastFlush = hrtime(true); + $this->idCounter = $this->loadMaxId(); + } + + private function loadMaxId(): int + { + try { + $rows = $this->client->query("SELECT max(id) as max_id FROM (SELECT max(id) as id FROM log_entries UNION ALL SELECT max(id) as id FROM alerts)"); + $max = $rows[0]['max_id'] ?? null; + return $max ? (int) $max : 0; + } catch (\Throwable) { + return 0; + } + } + + private function nextId(): int + { + return ++$this->idCounter; + } + + public function pushLog(string $line, ?int $sourceId, ?string $sourceName, ?string $level): void + { + $this->logBuffer[] = [ + 'id' => $this->nextId(), + 'line' => $line, + 'source_id' => $sourceId, + 'source_name' => $sourceName, + 'level' => $level, + 'created_at' => gmdate('Y-m-d H:i:s'), + ]; + + if (count($this->logBuffer) >= $this->maxRows) { + $this->flush(); + } + } + + public function pushAlert(int $ruleId, string $ruleName, string $severity, string $status, string $message, string $rawLine, ?int $sourceId, ?string $sourceName): array + { + $id = $this->nextId(); + $createdAt = gmdate('Y-m-d H:i:s'); + $this->alertBuffer[] = [ + 'id' => $id, + 'rule_id' => $ruleId, + 'rule_name' => $ruleName, + 'severity' => $severity, + 'status' => $status, + 'message' => $message, + 'raw_line' => $rawLine, + 'source_id' => $sourceId, + 'source_name' => $sourceName, + 'created_at' => $createdAt, + ]; + + if (count($this->alertBuffer) >= $this->maxRows) { + $this->flush(); + } + + return ['id' => $id, 'created_at' => $createdAt]; + } + + public function tick(): void + { + $elapsed = (hrtime(true) - $this->lastFlush) / 1_000_000; + if ($elapsed >= $this->flushIntervalMs && ($this->logBuffer || $this->alertBuffer)) { + $this->flush(); + } + } + + public function flush(): void + { + $this->flushLogs(); + $this->flushAlerts(); + $this->lastFlush = hrtime(true); + } + + public function flushLogs(): void + { + if (empty($this->logBuffer)) { + return; + } + $batch = $this->logBuffer; + $this->logBuffer = []; + try { + $this->client->insert('log_entries', self::LOG_COLUMNS, $batch); + } catch (\Throwable $e) { + error_log("ClickHouse log insert error: " . $e->getMessage()); + } + } + + public function flushAlerts(): void + { + if (empty($this->alertBuffer)) { + return; + } + $batch = $this->alertBuffer; + $this->alertBuffer = []; + try { + $this->client->insert('alerts', self::ALERT_COLUMNS, $batch); + } catch (\Throwable $e) { + error_log("ClickHouse alert insert error: " . $e->getMessage()); + } + } +} \ No newline at end of file diff --git a/src/Storage/ClickHouseClient.php b/src/Storage/ClickHouseClient.php new file mode 100644 index 0000000..6847b37 --- /dev/null +++ b/src/Storage/ClickHouseClient.php @@ -0,0 +1,208 @@ +host = $host; + $this->port = $port; + $this->database = $database; + $this->username = $username; + $this->password = $password; + $this->timeout = $timeout; + } + + private function url(): string + { + return "http://{$this->host}:{$this->port}/"; + } + + public function query(string $sql, array $params = []): array + { + if (!empty($params)) { + $sql = $this->formatParams($sql, $params); + } + + $ch = curl_init(); + curl_setopt_array($ch, [ + CURLOPT_URL => $this->url() . '?' . http_build_query([ + 'database' => $this->database, + 'default_format' => 'JSONCompact', + ]), + CURLOPT_POST => true, + CURLOPT_POSTFIELDS => $sql, + CURLOPT_RETURNTRANSFER => true, + CURLOPT_TIMEOUT => $this->timeout, + CURLOPT_CONNECTTIMEOUT => $this->timeout, + CURLOPT_HTTPHEADER => ['Content-Type: text/plain'], + ]); + + if ($this->username) { + curl_setopt($ch, CURLOPT_HTTPAUTH, CURLAUTH_BASIC); + curl_setopt($ch, CURLOPT_USERPWD, $this->username . ':' . $this->password); + } + + $response = curl_exec($ch); + $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE); + $error = curl_error($ch); + curl_close($ch); + + if ($error) { + throw new \RuntimeException("ClickHouse connection error: $error"); + } + + if ($httpCode !== 200) { + throw new \RuntimeException("ClickHouse error (HTTP $httpCode): $response"); + } + + $decoded = json_decode($response, true); + if (json_last_error() !== JSON_ERROR_NONE) { + return []; + } + + return $decoded['data'] ?? []; + } + + public function execute(string $sql): void + { + $ch = curl_init(); + curl_setopt_array($ch, [ + CURLOPT_URL => $this->url() . '?' . http_build_query([ + 'database' => $this->database, + ]), + CURLOPT_POST => true, + CURLOPT_POSTFIELDS => $sql, + CURLOPT_RETURNTRANSFER => true, + CURLOPT_TIMEOUT => $this->timeout, + CURLOPT_CONNECTTIMEOUT => $this->timeout, + CURLOPT_HTTPHEADER => ['Content-Type: text/plain'], + ]); + + if ($this->username) { + curl_setopt($ch, CURLOPT_HTTPAUTH, CURLAUTH_BASIC); + curl_setopt($ch, CURLOPT_USERPWD, $this->username . ':' . $this->password); + } + + $response = curl_exec($ch); + $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE); + $error = curl_error($ch); + curl_close($ch); + + if ($error) { + throw new \RuntimeException("ClickHouse connection error: $error"); + } + + if ($httpCode !== 200) { + if (str_contains($response, 'already exists')) { + return; + } + throw new \RuntimeException("ClickHouse error (HTTP $httpCode): $response"); + } + } + + public function insert(string $table, array $columns, array $rows): void + { + if (empty($rows)) { + return; + } + + $escapedRows = []; + foreach ($rows as $row) { + $vals = []; + foreach ($columns as $col) { + $v = $row[$col] ?? null; + if ($v === null) { + $vals[] = 'NULL'; + } elseif (is_int($v) || is_float($v)) { + $vals[] = (string) $v; + } else { + $vals[] = "'" . str_replace(['\\', "'"], ['\\\\', "\\'"], (string) $v) . "'"; + } + } + $escapedRows[] = '(' . implode(',', $vals) . ')'; + } + + $sql = sprintf( + 'INSERT INTO %s (%s) VALUES %s', + $table, + implode(', ', $columns), + implode(', ', $escapedRows) + ); + + $this->execute($sql); + } + + private function formatParams(string $sql, array $params): string + { + $parts = explode('?', $sql); + $result = $parts[0]; + for ($i = 0; $i < count($params); $i++) { + $v = $params[$i]; + if ($v === null) { + $result .= 'NULL'; + } elseif (is_int($v) || is_float($v)) { + $result .= (string) $v; + } else { + $result .= "'" . str_replace(["'", "\\"], ["\\'", "\\\\"], (string) $v) . "'"; + } + $result .= $parts[$i + 1] ?? ''; + } + return $result; + } + + public function migrate(): void + { + $this->execute("CREATE DATABASE IF NOT EXISTS {$this->database}"); + + $this->execute(" + CREATE TABLE IF NOT EXISTS log_entries ( + id UInt64, + line String, + source_id Nullable(Int32), + source_name Nullable(String), + level Nullable(String), + created_at DateTime('UTC') + ) ENGINE = MergeTree() + PARTITION BY toDate(created_at) + ORDER BY (created_at, id) + TTL toDate(created_at) + INTERVAL 90 DAY DELETE + SETTINGS index_granularity = 8192 + "); + + $this->execute(" + CREATE TABLE IF NOT EXISTS alerts ( + id UInt64, + rule_id Int32, + rule_name String, + severity String, + status String, + message String, + raw_line String, + source_id Nullable(Int32), + source_name Nullable(String), + created_at DateTime('UTC') + ) ENGINE = MergeTree() + PARTITION BY toDate(created_at) + ORDER BY (created_at, id) + TTL toDate(created_at) + INTERVAL 365 DAY DELETE + SETTINGS index_granularity = 8192 + "); + + $this->query("SELECT count() FROM log_entries"); + } +} \ No newline at end of file diff --git a/src/Storage/Repository.php b/src/Storage/Repository.php index 3ab7adc..7cff9c5 100644 --- a/src/Storage/Repository.php +++ b/src/Storage/Repository.php @@ -6,27 +6,54 @@ use Jakach\Logging\Model\{LogSource, Rule, Alert, AlertStatus, LogSourceType}; class Repository { + private \PDO $pdo; + private ClickHouseClient $clickhouse; + private ClickHouseBuffer $buffer; + public function __construct( private Database $db, - ) {} + ?ClickHouseClient $clickhouse = null, + ?ClickHouseBuffer $buffer = null, + ) { + $this->pdo = $db->pdo(); + $this->clickhouse = $clickhouse ?? new ClickHouseClient(); + $this->buffer = $buffer ?? new ClickHouseBuffer($this->clickhouse); - // --- Log Sources --- + $this->clickhouse->migrate(); + } + + public function clickhouse(): ClickHouseClient + { + return $this->clickhouse; + } + + public function buffer(): ClickHouseBuffer + { + return $this->buffer; + } + + public function flush(): void + { + $this->buffer->flush(); + } + + // --- Log Sources (SQLite) --- public function getSources(): array { - $rows = $this->db->pdo()->query("SELECT * FROM log_sources ORDER BY name")->fetchAll(); + $rows = $this->pdo->query("SELECT * FROM log_sources ORDER BY name")->fetchAll(); return array_map(fn(array $r) => LogSource::fromRow($r), $rows); } public function getActiveSources(): array { - $rows = $this->db->pdo()->query("SELECT * FROM log_sources WHERE active = 1 ORDER BY name")->fetchAll(); + $rows = $this->pdo->query("SELECT * FROM log_sources WHERE active = 1 ORDER BY name")->fetchAll(); return array_map(fn(array $r) => LogSource::fromRow($r), $rows); } public function getSource(int $id): ?LogSource { - $stmt = $this->db->pdo()->prepare("SELECT * FROM log_sources WHERE id = ?"); + $stmt = $this->pdo->prepare("SELECT * FROM log_sources WHERE id = ?"); $stmt->execute([$id]); $row = $stmt->fetch(); return $row ? LogSource::fromRow($row) : null; @@ -34,44 +61,44 @@ class Repository public function createSource(string $name, LogSourceType $type, string $address, array $labels = []): LogSource { - $stmt = $this->db->pdo()->prepare( + $stmt = $this->pdo->prepare( "INSERT INTO log_sources (name, type, address, labels) VALUES (?, ?, ?, ?)" ); $stmt->execute([$name, $type->value, $address, json_encode($labels)]); - return $this->getSource((int) $this->db->pdo()->lastInsertId()); + return $this->getSource((int) $this->pdo->lastInsertId()); } public function deleteSource(int $id): void { - $this->db->pdo()->prepare("DELETE FROM log_sources WHERE id = ?")->execute([$id]); + $this->pdo->prepare("DELETE FROM log_sources WHERE id = ?")->execute([$id]); } public function updateSource(int $id, string $name, LogSourceType $type, string $address, array $labels = [], bool $active = true): LogSource { - $stmt = $this->db->pdo()->prepare( + $stmt = $this->pdo->prepare( "UPDATE log_sources SET name = ?, type = ?, address = ?, labels = ?, active = ? WHERE id = ?" ); $stmt->execute([$name, $type->value, $address, json_encode($labels), (int) $active, $id]); return $this->getSource($id); } - // --- Rules --- + // --- Rules (SQLite) --- public function getRules(): array { - $rows = $this->db->pdo()->query("SELECT * FROM rules ORDER BY name")->fetchAll(); + $rows = $this->pdo->query("SELECT * FROM rules ORDER BY name")->fetchAll(); return array_map(fn(array $r) => Rule::fromRow($r), $rows); } public function getActiveRules(): array { - $rows = $this->db->pdo()->query("SELECT * FROM rules WHERE active = 1 ORDER BY name")->fetchAll(); + $rows = $this->pdo->query("SELECT * FROM rules WHERE active = 1 ORDER BY name")->fetchAll(); return array_map(fn(array $r) => Rule::fromRow($r), $rows); } public function getRule(int $id): ?Rule { - $stmt = $this->db->pdo()->prepare("SELECT * FROM rules WHERE id = ?"); + $stmt = $this->pdo->prepare("SELECT * FROM rules WHERE id = ?"); $stmt->execute([$id]); $row = $stmt->fetch(); return $row ? Rule::fromRow($row) : null; @@ -79,46 +106,68 @@ class Repository public function createRule(string $name, string $pattern, string $severity, ?int $rateLimitSeconds = null): Rule { - $stmt = $this->db->pdo()->prepare( + $stmt = $this->pdo->prepare( "INSERT INTO rules (name, pattern, severity, rate_limit_seconds) VALUES (?, ?, ?, ?)" ); $stmt->execute([$name, $pattern, $severity, $rateLimitSeconds]); - return $this->getRule((int) $this->db->pdo()->lastInsertId()); + return $this->getRule((int) $this->pdo->lastInsertId()); } public function deleteRule(int $id): void { - $this->db->pdo()->prepare("DELETE FROM rules WHERE id = ?")->execute([$id]); + $this->pdo->prepare("DELETE FROM rules WHERE id = ?")->execute([$id]); } public function updateRule(int $id, string $name, string $pattern, string $severity, ?int $rateLimitSeconds = null, bool $active = true): Rule { - $stmt = $this->db->pdo()->prepare( + $stmt = $this->pdo->prepare( "UPDATE rules SET name = ?, pattern = ?, severity = ?, rate_limit_seconds = ?, active = ? WHERE id = ?" ); $stmt->execute([$name, $pattern, $severity, $rateLimitSeconds, (int) $active, $id]); return $this->getRule($id); } - // --- Alerts --- + // --- Alerts (ClickHouse) --- public function createAlert(int $ruleId, string $ruleName, string $severity, string $message, string $rawLine, ?int $sourceId = null, ?string $sourceName = null): Alert { - $stmt = $this->db->pdo()->prepare( - "INSERT INTO alerts (rule_id, rule_name, severity, status, message, raw_line, source_id, source_name) - VALUES (?, ?, ?, 'open', ?, ?, ?, ?)" + $result = $this->buffer->pushAlert( + ruleId: $ruleId, + ruleName: $ruleName, + severity: $severity, + status: 'open', + message: $message, + rawLine: $rawLine, + sourceId: $sourceId, + sourceName: $sourceName, + ); + + $createdAt = new \DateTimeImmutable($result['created_at']); + + return new Alert( + id: $result['id'], + ruleId: $ruleId, + ruleName: $ruleName, + severity: AlertSeverity::from($severity), + status: AlertStatus::Open, + message: $message, + rawLine: $rawLine, + sourceId: $sourceId, + sourceName: $sourceName, + createdAt: $createdAt, ); - $stmt->execute([$ruleId, $ruleName, $severity, $message, $rawLine, $sourceId, $sourceName]); - $id = (int) $this->db->pdo()->lastInsertId(); - return $this->getAlert($id); } public function getAlert(int $id): ?Alert { - $stmt = $this->db->pdo()->prepare("SELECT * FROM alerts WHERE id = ?"); - $stmt->execute([$id]); - $row = $stmt->fetch(); - return $row ? Alert::fromRow($row) : null; + $rows = $this->clickhouse->query( + "SELECT * FROM alerts WHERE id = ? LIMIT 1", + [$id] + ); + if (empty($rows)) { + return null; + } + return $this->alertFromRow($rows[0]); } public function getAlerts(int $limit = 100, int $offset = 0, ?string $status = null, ?string $severity = null, ?string $since = null, ?string $until = null): array @@ -127,19 +176,19 @@ class Repository $params = []; if ($status) { - $where[] = 'status = ?'; + $where[] = "status = ?"; $params[] = $status; } if ($severity) { - $where[] = 'severity = ?'; + $where[] = "severity = ?"; $params[] = $severity; } if ($since) { - $where[] = 'created_at >= ?'; + $where[] = "created_at >= ?"; $params[] = str_replace('T', ' ', $since); } if ($until) { - $where[] = 'created_at <= ?'; + $where[] = "created_at <= ?"; $params[] = str_replace('T', ' ', $until); } @@ -147,52 +196,43 @@ class Repository if ($where) { $sql .= ' WHERE ' . implode(' AND ', $where); } - $sql .= " ORDER BY created_at DESC LIMIT ? OFFSET ?"; + $sql .= " ORDER BY created_at DESC, id DESC LIMIT ? OFFSET ?"; $params[] = $limit; $params[] = $offset; - $stmt = $this->db->pdo()->prepare($sql); - $stmt->execute($params); - $rows = $stmt->fetchAll(); - - return array_map(fn(array $r) => Alert::fromRow($r), $rows); + $rows = $this->clickhouse->query($sql, $params); + return array_map(fn(array $r) => $this->alertFromRow($r), $rows); } public function updateAlertStatus(int $id, AlertStatus $status): void { - $stmt = $this->db->pdo()->prepare("UPDATE alerts SET status = ? WHERE id = ?"); - $stmt->execute([$status->value, $id]); + $this->clickhouse->execute( + "ALTER TABLE alerts UPDATE status = '{$status->value}' WHERE id = {$id}" + ); } public function getAlertCounts(): array { - return $this->db->pdo()->query( - "SELECT status, severity, COUNT(*) as count FROM alerts GROUP BY status, severity" - )->fetchAll(); + return $this->clickhouse->query( + "SELECT status, severity, count() as count FROM alerts GROUP BY status, severity" + ); } public function searchAlerts(string $query, int $limit = 100): array { - $stmt = $this->db->pdo()->prepare( - "SELECT a.* FROM alerts a - JOIN alerts_fts fts ON a.id = fts.rowid - WHERE alerts_fts MATCH ? - ORDER BY rank - LIMIT ?" + $like = '%' . str_replace(['%', '_'], ['\%', '\_'], $query) . '%'; + $rows = $this->clickhouse->query( + "SELECT * FROM alerts WHERE message ILIKE ? OR raw_line ILIKE ? OR rule_name ILIKE ? ORDER BY created_at DESC LIMIT ?", + [$like, $like, $like, $limit] ); - $stmt->execute([$query, $limit]); - $rows = $stmt->fetchAll(); - return array_map(fn(array $r) => Alert::fromRow($r), $rows); + return array_map(fn(array $r) => $this->alertFromRow($r), $rows); } - // --- Log Entries --- + // --- Log Entries (ClickHouse) --- public function storeLogEntry(string $line, ?int $sourceId = null, ?string $sourceName = null, ?string $level = null): void { - $stmt = $this->db->pdo()->prepare( - "INSERT INTO log_entries (line, source_id, source_name, level) VALUES (?, ?, ?, ?)" - ); - $stmt->execute([$line, $sourceId, $sourceName, $level]); + $this->buffer->pushLog($line, $sourceId, $sourceName, $level); } public function searchLogEntries(string $query, int $limit = 200, int $offset = 0, ?string $since = null, ?string $until = null): array @@ -206,11 +246,11 @@ class Repository $params = []; if ($since) { - $where[] = 'e.created_at >= ?'; + $where[] = 'created_at >= ?'; $params[] = str_replace('T', ' ', $since); } if ($until) { - $where[] = 'e.created_at <= ?'; + $where[] = 'created_at <= ?'; $params[] = str_replace('T', ' ', $until); } @@ -219,29 +259,25 @@ class Repository if ($where) { $sql .= ' WHERE ' . implode(' AND ', $where); } - $sql .= " ORDER BY e.created_at DESC LIMIT ? OFFSET ?"; + $sql .= " ORDER BY created_at DESC, id DESC LIMIT ? OFFSET ?"; $params[] = $limit; $params[] = $offset; - $stmt = $this->db->pdo()->prepare($sql); - $stmt->execute($params); - return $stmt->fetchAll(); + return $this->clickhouse->query($sql, $params); } $like = $this->toLikePattern($query); - $where[] = 'e.line LIKE ?'; + $where[] = 'line ILIKE ?'; $params[] = $like; - $sql = "SELECT e.* FROM log_entries e"; + $sql = "SELECT * FROM log_entries"; if ($where) { $sql .= ' WHERE ' . implode(' AND ', $where); } - $sql .= " ORDER BY e.created_at DESC LIMIT ? OFFSET ?"; + $sql .= " ORDER BY created_at DESC, id DESC LIMIT ? OFFSET ?"; $params[] = $limit; $params[] = $offset; - $stmt = $this->db->pdo()->prepare($sql); - $stmt->execute($params); - return $stmt->fetchAll(); + return $this->clickhouse->query($sql, $params); } private function toLikePattern(string $query): string @@ -265,11 +301,11 @@ class Repository return implode('%', $likeParts); } - // --- Config --- + // --- Config (SQLite) --- public function getAllowedUserTokens(): array { - $stmt = $this->db->pdo()->prepare("SELECT value FROM config WHERE key = ?"); + $stmt = $this->pdo->prepare("SELECT value FROM config WHERE key = ?"); $stmt->execute(['allowed_user_tokens']); $row = $stmt->fetch(); if (!$row || empty($row['value'])) { @@ -285,7 +321,7 @@ class Repository public function getConfig(string $key, mixed $default = null): mixed { - $stmt = $this->db->pdo()->prepare("SELECT value FROM config WHERE key = ?"); + $stmt = $this->pdo->prepare("SELECT value FROM config WHERE key = ?"); $stmt->execute([$key]); $row = $stmt->fetch(); return $row ? $row['value'] : $default; @@ -293,27 +329,27 @@ class Repository public function setConfig(string $key, string $value): void { - $stmt = $this->db->pdo()->prepare( + $stmt = $this->pdo->prepare( "INSERT INTO config (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value = excluded.value" ); $stmt->execute([$key, $value]); } - // --- Rate Limiting --- + // --- Rate Limiting (SQLite) --- public function checkRateLimit(int $ruleId, int $windowSeconds): bool { $now = time(); $window = intdiv($now, $windowSeconds) * $windowSeconds; - $this->db->pdo()->prepare( + $this->pdo->prepare( "INSERT INTO rate_limiter (rule_id, window_start, count) VALUES (?, ?, 1) ON CONFLICT(rule_id, window_start) DO UPDATE SET count = count + 1" )->execute([$ruleId, $window]); - $stmt = $this->db->pdo()->prepare( + $stmt = $this->pdo->prepare( "SELECT count FROM rate_limiter WHERE rule_id = ? AND window_start = ?" ); $stmt->execute([$ruleId, $window]); @@ -322,28 +358,28 @@ class Repository return $row['count'] <= 1; } - // --- False Positives --- + // --- False Positives (SQLite) --- public function getFalsePositives(): array { - return $this->db->pdo()->query( + return $this->pdo->query( "SELECT id, pattern, description, created_at FROM false_positives ORDER BY id" )->fetchAll(); } public function createFalsePositive(string $pattern, string $description = ''): array { - $stmt = $this->db->pdo()->prepare( + $stmt = $this->pdo->prepare( "INSERT INTO false_positives (pattern, description) VALUES (?, ?)" ); $stmt->execute([$pattern, $description]); - $id = (int) $this->db->pdo()->lastInsertId(); + $id = (int) $this->pdo->lastInsertId(); return $this->getFalsePositive($id); } public function getFalsePositive(int $id): ?array { - $stmt = $this->db->pdo()->prepare( + $stmt = $this->pdo->prepare( "SELECT id, pattern, description, created_at FROM false_positives WHERE id = ?" ); $stmt->execute([$id]); @@ -353,12 +389,12 @@ class Repository public function deleteFalsePositive(int $id): void { - $this->db->pdo()->prepare("DELETE FROM false_positives WHERE id = ?")->execute([$id]); + $this->pdo->prepare("DELETE FROM false_positives WHERE id = ?")->execute([$id]); } public function isFalsePositive(string $line): bool { - $patterns = $this->db->pdo()->query( + $patterns = $this->pdo->query( "SELECT pattern FROM false_positives" )->fetchAll(\PDO::FETCH_COLUMN); @@ -370,11 +406,11 @@ class Repository return false; } - // --- Audit Log --- + // --- Audit Log (SQLite) --- public function logAudit(string $action, string $entityType, ?int $entityId = null, ?string $details = null, ?string $username = null): void { - $stmt = $this->db->pdo()->prepare( + $stmt = $this->pdo->prepare( "INSERT INTO audit_log (action, entity_type, entity_id, details, username) VALUES (?, ?, ?, ?, ?)" ); $stmt->execute([$action, $entityType, $entityId, $details, $username]); @@ -382,53 +418,41 @@ class Repository public function getAuditLog(int $limit = 50): array { - $stmt = $this->db->pdo()->prepare( + $stmt = $this->pdo->prepare( "SELECT * FROM audit_log ORDER BY created_at DESC LIMIT ?" ); $stmt->execute([$limit]); return $stmt->fetchAll(); } - // --- Retention --- + // --- Retention (ClickHouse TTL handles it; but keep purge for SQLite cleanup) --- public function purgeOldData(int $logDays = 30, int $alertDays = 90): array { - $stmt = $this->db->pdo()->prepare( - "DELETE FROM log_entries WHERE created_at < datetime('now', ?)" - ); - $stmt->execute(['-' . $logDays . ' days']); - $deletedLogs = $stmt->rowCount(); - - $stmt = $this->db->pdo()->prepare( - "DELETE FROM alerts WHERE status = 'resolved' AND created_at < datetime('now', ?)" - ); - $stmt->execute(['-' . $alertDays . ' days']); - $deletedAlerts = $stmt->rowCount(); - - $this->db->pdo()->exec("DELETE FROM rate_limiter WHERE window_start < " . (time() - 86400)); - - return ['log_entries_deleted' => $deletedLogs, 'alerts_deleted' => $deletedAlerts]; + $this->pdo->exec("DELETE FROM rate_limiter WHERE window_start < " . (time() - 86400)); + return ['log_entries_deleted' => -1, 'alerts_deleted' => -1, 'note' => 'ClickHouse TTL manages retention automatically']; } - // --- Log Context --- + // --- Log Context (ClickHouse) --- public function getLogContext(int $id, int $before = 5, int $after = 5): array { - $beforeStmt = $this->db->pdo()->prepare( - "SELECT * FROM log_entries WHERE id < ? ORDER BY id DESC LIMIT ?" + $beforeRows = $this->clickhouse->query( + "SELECT * FROM log_entries WHERE id < ? ORDER BY id DESC LIMIT ?", + [$id, $before] ); - $beforeStmt->execute([$id, $before]); - $beforeRows = array_reverse($beforeStmt->fetchAll()); + $beforeRows = array_reverse($beforeRows); - $current = $this->db->pdo()->prepare("SELECT * FROM log_entries WHERE id = ?"); - $current->execute([$id]); - $currentRow = $current->fetch(); - - $afterStmt = $this->db->pdo()->prepare( - "SELECT * FROM log_entries WHERE id > ? ORDER BY id ASC LIMIT ?" + $currentRows = $this->clickhouse->query( + "SELECT * FROM log_entries WHERE id = ? LIMIT 1", + [$id] + ); + $currentRow = $currentRows[0] ?? null; + + $afterRows = $this->clickhouse->query( + "SELECT * FROM log_entries WHERE id > ? ORDER BY id ASC LIMIT ?", + [$id, $after] ); - $afterStmt->execute([$id, $after]); - $afterRows = $afterStmt->fetchAll(); return [ 'before' => $beforeRows, @@ -437,34 +461,52 @@ class Repository ]; } - // --- Bulk Operations --- + // --- Bulk Operations (ClickHouse) --- public function bulkUpdateAlertStatus(array $ids, AlertStatus $status): int { if (empty($ids)) return 0; - $placeholders = implode(',', array_fill(0, count($ids), '?')); - $stmt = $this->db->pdo()->prepare( - "UPDATE alerts SET status = ? WHERE id IN ($placeholders)" + $idList = implode(',', array_map('intval', $ids)); + $this->clickhouse->execute( + "ALTER TABLE alerts UPDATE status = '{$status->value}' WHERE id IN ({$idList})" ); - $stmt->execute(array_merge([$status->value], $ids)); - return $stmt->rowCount(); + return count($ids); } public function exportAlerts(array $ids): array { if (empty($ids)) return []; - $placeholders = implode(',', array_fill(0, count($ids), '?')); - $stmt = $this->db->pdo()->prepare("SELECT * FROM alerts WHERE id IN ($placeholders) ORDER BY id"); - $stmt->execute($ids); - return array_map(fn(array $r) => Alert::fromRow($r), $stmt->fetchAll()); + $idList = implode(',', array_map('intval', $ids)); + $rows = $this->clickhouse->query( + "SELECT * FROM alerts WHERE id IN ({$idList}) ORDER BY id" + ); + return array_map(fn(array $r) => $this->alertFromRow($r), $rows); } public function exportLogs(array $ids): array { if (empty($ids)) return []; - $placeholders = implode(',', array_fill(0, count($ids), '?')); - $stmt = $this->db->pdo()->prepare("SELECT * FROM log_entries WHERE id IN ($placeholders) ORDER BY id"); - $stmt->execute($ids); - return $stmt->fetchAll(); + $idList = implode(',', array_map('intval', $ids)); + return $this->clickhouse->query( + "SELECT * FROM log_entries WHERE id IN ({$idList}) ORDER BY id" + ); + } + + // --- Helpers --- + + private function alertFromRow(array $row): Alert + { + return new Alert( + id: (int) $row['id'], + ruleId: (int) $row['rule_id'], + ruleName: $row['rule_name'], + severity: AlertSeverity::from($row['severity']), + status: AlertStatus::from($row['status']), + message: $row['message'], + rawLine: $row['raw_line'], + sourceId: isset($row['source_id']) ? (int) $row['source_id'] : null, + sourceName: $row['source_name'] ?? null, + createdAt: new \DateTimeImmutable($row['created_at']), + ); } } \ No newline at end of file diff --git a/src/Worker/Orchestrator.php b/src/Worker/Orchestrator.php index 2e3a20e..8c213c2 100644 --- a/src/Worker/Orchestrator.php +++ b/src/Worker/Orchestrator.php @@ -48,6 +48,7 @@ class Orchestrator pcntl_signal_dispatch(); $this->fileWatcher->tick(); $this->socketListener->tick(); + $this->repo->buffer()->tick(); if (time() - $this->lastCleanup > 3600) { $this->lastCleanup = time(); @@ -103,6 +104,8 @@ class Orchestrator { $this->fileWatcher->stop(); $this->socketListener->stop(); + fprintf(STDERR, "Flushing remaining buffers...\n"); + $this->repo->flush(); fprintf(STDERR, "Worker stopped\n"); } } \ No newline at end of file