From 39ee31debb33e42d6fa04070b166b1f6b6528eb1 Mon Sep 17 00:00:00 2001 From: janis steiner Date: Wed, 6 May 2026 11:28:41 +0200 Subject: [PATCH] initial commit --- .dockerignore | 4 + .gitignore | 5 + bin/consume | 15 + bin/seed | 24 ++ composer.json | 16 + config/default.php | 58 ++++ docker-compose.yml | 38 ++ docker/Dockerfile.api | 7 + docker/Dockerfile.php | 9 + docker/nginx.conf | 15 + public/index.html | 632 ++++++++++++++++++++++++++++++++++ public/index.php | 8 + src/Api/Router.php | 105 ++++++ src/Model/Alert.php | 51 +++ src/Model/Enums.php | 25 ++ src/Model/LogSource.php | 42 +++ src/Model/Rule.php | 42 +++ src/RuleEngine/Engine.php | 60 ++++ src/Storage/Database.php | 87 +++++ src/Storage/Repository.php | 169 +++++++++ src/Worker/FileWatcher.php | 78 +++++ src/Worker/Orchestrator.php | 83 +++++ src/Worker/SocketListener.php | 72 ++++ 23 files changed, 1645 insertions(+) create mode 100644 .dockerignore create mode 100644 .gitignore create mode 100755 bin/consume create mode 100755 bin/seed create mode 100644 composer.json create mode 100644 config/default.php create mode 100644 docker-compose.yml create mode 100644 docker/Dockerfile.api create mode 100644 docker/Dockerfile.php create mode 100644 docker/nginx.conf create mode 100644 public/index.html create mode 100644 public/index.php create mode 100644 src/Api/Router.php create mode 100644 src/Model/Alert.php create mode 100644 src/Model/Enums.php create mode 100644 src/Model/LogSource.php create mode 100644 src/Model/Rule.php create mode 100644 src/RuleEngine/Engine.php create mode 100644 src/Storage/Database.php create mode 100644 src/Storage/Repository.php create mode 100644 src/Worker/FileWatcher.php create mode 100644 src/Worker/Orchestrator.php create mode 100644 src/Worker/SocketListener.php diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..0004fd8 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,4 @@ +vendor +.git +data +*.md \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c1391f5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +/vendor/ +/data/ +.env +*.log +.DS_Store \ No newline at end of file diff --git a/bin/consume b/bin/consume new file mode 100755 index 0000000..7dbd0bd --- /dev/null +++ b/bin/consume @@ -0,0 +1,15 @@ +run(); \ No newline at end of file diff --git a/bin/seed b/bin/seed new file mode 100755 index 0000000..c08776c --- /dev/null +++ b/bin/seed @@ -0,0 +1,24 @@ +createRule( + name: $rule['name'], + pattern: $rule['pattern'], + severity: $rule['severity'], + rateLimitSeconds: $rule['rate_limit_seconds'] ?? null, + ); + echo sprintf(" + Rule #%d: %s (%s)\n", $r->id, $r->name, $r->severity->value); +} + +echo "Done.\n"; \ No newline at end of file diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..331c7d4 --- /dev/null +++ b/composer.json @@ -0,0 +1,16 @@ +{ + "name": "jakach/logging", + "description": "Log analysis and alerting system", + "type": "project", + "autoload": { + "psr-4": { + "Jakach\\Logging\\": "src/" + } + }, + "require": { + "php": ">=8.2" + }, + "scripts": { + "serve": "php -S 0.0.0.0:8080 -t public" + } +} \ No newline at end of file diff --git a/config/default.php b/config/default.php new file mode 100644 index 0000000..5242fc1 --- /dev/null +++ b/config/default.php @@ -0,0 +1,58 @@ + [ + 'path' => '/app/data/logging.db', + ], + 'worker' => [ + 'file_check_interval' => 500000, + ], + 'sources' => [], + 'rules' => [ + [ + 'name' => 'PHP Error', + 'pattern' => '/PHP (Fatal|Parse|Catchable|Notice|Warning)/i', + 'severity' => 'warning', + 'rate_limit_seconds' => 60, + ], + [ + 'name' => 'PHP Exception', + 'pattern' => '/Uncaught (Exception|Error)/', + 'severity' => 'critical', + 'rate_limit_seconds' => 30, + ], + [ + 'name' => 'HTTP 5xx', + 'pattern' => '/" (50[0-9]) /', + 'severity' => 'critical', + ], + [ + 'name' => 'HTTP 4xx', + 'pattern' => '/" (4[0-9]{2}) /', + 'severity' => 'warning', + 'rate_limit_seconds' => 60, + ], + [ + 'name' => 'Failed Login', + 'pattern' => '/Failed (login|password|authentication)/i', + 'severity' => 'critical', + ], + [ + 'name' => 'Out of Memory', + 'pattern' => '/out of memory/i', + 'severity' => 'critical', + 'rate_limit_seconds' => 60, + ], + [ + 'name' => 'Connection Refused', + 'pattern' => '/Connection (refused|reset|timed? out)/i', + 'severity' => 'warning', + ], + [ + 'name' => 'Disk Space', + 'pattern' => '/disk (full|space|usage|low)/i', + 'severity' => 'warning', + 'rate_limit_seconds' => 300, + ], + ], +]; \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..2062a03 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,38 @@ +services: + api: + build: + context: . + dockerfile: docker/Dockerfile.api + volumes: + - .:/app + - data:/app/data + depends_on: + - redis + + nginx: + image: nginx:alpine + ports: + - "8080:80" + volumes: + - .:/app + - ./docker/nginx.conf:/etc/nginx/conf.d/default.conf + depends_on: + - api + + worker: + build: + context: . + dockerfile: docker/Dockerfile.php + volumes: + - .:/app + - /var/log:/host/logs:ro + - data:/app/data + depends_on: + - redis + command: ["php", "bin/consume", "--daemon"] + + redis: + image: redis:7-alpine + +volumes: + data: \ No newline at end of file diff --git a/docker/Dockerfile.api b/docker/Dockerfile.api new file mode 100644 index 0000000..26fb5fb --- /dev/null +++ b/docker/Dockerfile.api @@ -0,0 +1,7 @@ +FROM php:8.3-fpm-alpine + +RUN docker-php-ext-install pcntl sockets + +COPY --from=composer:2 /usr/bin/composer /usr/bin/composer + +WORKDIR /app \ No newline at end of file diff --git a/docker/Dockerfile.php b/docker/Dockerfile.php new file mode 100644 index 0000000..76d37fc --- /dev/null +++ b/docker/Dockerfile.php @@ -0,0 +1,9 @@ +FROM php:8.3-cli-alpine + +RUN apk add --no-cache linux-headers git + +RUN docker-php-ext-install pcntl sockets + +COPY --from=composer:2 /usr/bin/composer /usr/bin/composer + +WORKDIR /app \ No newline at end of file diff --git a/docker/nginx.conf b/docker/nginx.conf new file mode 100644 index 0000000..cef55a2 --- /dev/null +++ b/docker/nginx.conf @@ -0,0 +1,15 @@ +server { + listen 80; + root /app/public; + index index.html; + + location / { + try_files $uri /index.html /index.php$is_args$args; + } + + location ~ \.php$ { + fastcgi_pass api:9000; + fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name; + include fastcgi_params; + } +} \ No newline at end of file diff --git a/public/index.html b/public/index.html new file mode 100644 index 0000000..b799859 --- /dev/null +++ b/public/index.html @@ -0,0 +1,632 @@ + + + + + +Jakach Logging + + + + + + + + + + +
+
+ + +
+
+
Dashboard
+ +
+
+
+
+
+
+ Recent Alerts + View all +
+
+

No alerts yet

+
+
+
+
+
+
Alert Distribution
+

No data

+
+
+
+
+ + +
+
+
Alerts
+
+ + + +
+
+
+
+
+ + + + + + + + + + + +
IDSeverityStatusMessageSourceCreated

No alerts

+
+
+ +
+
+ + +
+
+
Log Sources
+ +
+
+
+
+ + + + + +
NameTypeAddressLabelsStatus

No sources configured

+
+
+
+
+ + +
+
+
Alert Rules
+ +
+
+
+
+ + + + + +
NamePatternSeverityRate LimitStatus

No rules configured

+
+
+
+
+ + +
+
+
Settings
+
+
+
+
+
System Info
+
+
+
Health
checking...
+
DB Path
/app/data/logging.db
+
Worker
php bin/consume
+
+
+
+
+
+
+
Quick Reference
+
+

File sources — path to a log file on the worker container

+

TCP/UDP sourcestcp://0.0.0.0:9514 or udp://0.0.0.0:9514

+

Rules — use PHP regex patterns, e.g. /error/i

+
+
+
+
+
+ +
+
+ + + + + + + + + + + +
+ + + + + \ No newline at end of file diff --git a/public/index.php b/public/index.php new file mode 100644 index 0000000..72836f9 --- /dev/null +++ b/public/index.php @@ -0,0 +1,8 @@ +handle(); \ No newline at end of file diff --git a/src/Api/Router.php b/src/Api/Router.php new file mode 100644 index 0000000..2bd8bee --- /dev/null +++ b/src/Api/Router.php @@ -0,0 +1,105 @@ +repo = new Repository($db); + } + + public function handle(): void + { + $method = $_SERVER['REQUEST_METHOD']; + $path = parse_url($_SERVER['REQUEST_URI'], PHP_URL_PATH); + $path = rtrim($path, '/'); + + header('Content-Type: application/json'); + + try { + $result = match (true) { + $path === '/sources' && $method === 'GET' => $this->repo->getSources(), + $path === '/sources' && $method === 'POST' => $this->createSource(), + $path === '/rules' && $method === 'GET' => $this->repo->getRules(), + $path === '/rules' && $method === 'POST' => $this->createRule(), + $path === '/alerts' && $method === 'GET' => $this->getAlerts(), + preg_match('#^/alerts/(\d+)/ack$#', $path, $m) && $method === 'POST' => $this->ackAlert((int) $m[1]), + preg_match('#^/alerts/counts$#', $path) && $method === 'GET' => $this->repo->getAlertCounts(), + $path === '/health' && $method === 'GET' => ['status' => 'ok'], + default => throw new \RuntimeException('Not found', 404), + }; + + if ($result instanceof \UnitEnum || is_scalar($result)) { + $result = ['data' => $result]; + } elseif (is_array($result) && !empty($result) && $result[array_key_first($result)] instanceof \UnitEnum) { + $result = ['data' => $result]; + } elseif (is_array($result)) { + $needsWrap = false; + foreach ($result as $key => $val) { + if (is_object($val) && method_exists($val, 'toArray')) { + $result[$key] = $val->toArray(); + } else { + $needsWrap = true; + } + } + if ($needsWrap || empty($result)) { + $result = ['data' => $result]; + } + } elseif (is_object($result) && method_exists($result, 'toArray')) { + $result = ['data' => $result->toArray()]; + } + + http_response_code(200); + echo json_encode($result, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES); + + } catch (\RuntimeException $e) { + http_response_code($e->getCode() ?: 500); + echo json_encode(['error' => $e->getMessage()]); + } + } + + private function createSource(): array + { + $body = json_decode(file_get_contents('php://input'), true); + $type = LogSourceType::from($body['type'] ?? ''); + return $this->repo->createSource( + name: $body['name'], + type: $type, + address: $body['address'], + labels: $body['labels'] ?? [], + ); + } + + private function createRule(): array + { + $body = json_decode(file_get_contents('php://input'), true); + return $this->repo->createRule( + name: $body['name'], + pattern: $body['pattern'], + severity: $body['severity'] ?? 'warning', + rateLimitSeconds: $body['rate_limit_seconds'] ?? null, + ); + } + + private function getAlerts(): array + { + $limit = (int) ($_GET['limit'] ?? 100); + $offset = (int) ($_GET['offset'] ?? 0); + $status = $_GET['status'] ?? null; + $severity = $_GET['severity'] ?? null; + return $this->repo->getAlerts($limit, $offset, $status, $severity); + } + + private function ackAlert(int $id): array + { + $this->repo->updateAlertStatus($id, AlertStatus::Acknowledged); + return ['status' => 'acknowledged', 'id' => $id]; + } +} \ No newline at end of file diff --git a/src/Model/Alert.php b/src/Model/Alert.php new file mode 100644 index 0000000..d55630f --- /dev/null +++ b/src/Model/Alert.php @@ -0,0 +1,51 @@ + $this->id, + 'rule_id' => $this->ruleId, + 'rule_name' => $this->ruleName, + 'severity' => $this->severity->value, + 'status' => $this->status->value, + 'message' => $this->message, + 'raw_line' => $this->rawLine, + 'source_id' => $this->sourceId, + 'source_name' => $this->sourceName, + 'created_at' => $this->createdAt->format('c'), + ]; + } +} \ No newline at end of file diff --git a/src/Model/Enums.php b/src/Model/Enums.php new file mode 100644 index 0000000..cb3bb1d --- /dev/null +++ b/src/Model/Enums.php @@ -0,0 +1,25 @@ + $this->id, + 'name' => $this->name, + 'type' => $this->type->value, + 'address' => $this->address, + 'labels' => $this->labels, + 'active' => $this->active, + 'created_at' => $this->createdAt->format('c'), + ]; + } +} \ No newline at end of file diff --git a/src/Model/Rule.php b/src/Model/Rule.php new file mode 100644 index 0000000..8e550d9 --- /dev/null +++ b/src/Model/Rule.php @@ -0,0 +1,42 @@ + $this->id, + 'name' => $this->name, + 'pattern' => $this->pattern, + 'severity' => $this->severity->value, + 'rate_limit_seconds' => $this->rateLimitSeconds, + 'active' => $this->active, + 'created_at' => $this->createdAt->format('c'), + ]; + } +} \ No newline at end of file diff --git a/src/RuleEngine/Engine.php b/src/RuleEngine/Engine.php new file mode 100644 index 0000000..b155a81 --- /dev/null +++ b/src/RuleEngine/Engine.php @@ -0,0 +1,60 @@ +repo->getActiveRules(); + + foreach ($rules as $rule) { + if ($this->matches($line, $rule)) { + if ($rule->rateLimitSeconds !== null && !$this->repo->checkRateLimit($rule->id, $rule->rateLimitSeconds)) { + continue; + } + + $message = sprintf( + '[%s] Rule "%s" matched: %s', + strtoupper($rule->severity->value), + $rule->name, + substr($line, 0, 200) + ); + + $alert = $this->repo->createAlert( + ruleId: $rule->id, + ruleName: $rule->name, + severity: $rule->severity->value, + message: $message, + rawLine: $line, + sourceId: $source?->id, + sourceName: $source?->name, + ); + + return $alert; + } + } + + return null; + } + + private function matches(string $line, Rule $rule): bool + { + if (!isset($this->compiledPatterns[$rule->id])) { + $delimiter = $rule->pattern[0] ?? '/'; + $this->compiledPatterns[$rule->id] = $rule->pattern; + } + + return (bool) preg_match($this->compiledPatterns[$rule->id], $line); + } +} \ No newline at end of file diff --git a/src/Storage/Database.php b/src/Storage/Database.php new file mode 100644 index 0000000..248f72f --- /dev/null +++ b/src/Storage/Database.php @@ -0,0 +1,87 @@ +pdo = new \PDO("sqlite:$path"); + $this->pdo->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION); + $this->pdo->setAttribute(\PDO::ATTR_DEFAULT_FETCH_MODE, \PDO::FETCH_ASSOC); + $this->pdo->exec('PRAGMA journal_mode=WAL'); + $this->migrate(); + } + + public function pdo(): \PDO + { + return $this->pdo; + } + + private function migrate(): void + { + $this->pdo->exec(" + CREATE TABLE IF NOT EXISTS log_sources ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + type TEXT NOT NULL, + address TEXT NOT NULL, + labels TEXT DEFAULT '[]', + active INTEGER DEFAULT 1, + created_at TEXT DEFAULT (datetime('now')) + ) + "); + + $this->pdo->exec(" + CREATE TABLE IF NOT EXISTS rules ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + pattern TEXT NOT NULL, + severity TEXT NOT NULL DEFAULT 'warning', + rate_limit_seconds INTEGER, + active INTEGER DEFAULT 1, + created_at TEXT DEFAULT (datetime('now')) + ) + "); + + $this->pdo->exec(" + CREATE TABLE IF NOT EXISTS alerts ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + rule_id INTEGER NOT NULL, + rule_name TEXT NOT NULL, + severity TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'open', + message TEXT NOT NULL, + raw_line TEXT NOT NULL, + source_id INTEGER, + source_name TEXT, + created_at TEXT DEFAULT (datetime('now')), + FOREIGN KEY (rule_id) REFERENCES rules(id) + ) + "); + + $this->pdo->exec(" + CREATE INDEX IF NOT EXISTS idx_alerts_status ON alerts(status) + "); + + $this->pdo->exec(" + CREATE INDEX IF NOT EXISTS idx_alerts_created ON alerts(created_at) + "); + + $this->pdo->exec(" + CREATE TABLE IF NOT EXISTS rate_limiter ( + rule_id INTEGER NOT NULL, + window_start INTEGER NOT NULL, + count INTEGER DEFAULT 0, + PRIMARY KEY (rule_id, window_start) + ) + "); + } +} \ No newline at end of file diff --git a/src/Storage/Repository.php b/src/Storage/Repository.php new file mode 100644 index 0000000..ab83447 --- /dev/null +++ b/src/Storage/Repository.php @@ -0,0 +1,169 @@ +db->pdo()->query("SELECT * FROM log_sources ORDER BY name")->fetchAll(); + return array_map(fn(array $r) => LogSource::fromRow($r), $rows); + } + + public function getActiveSources(): array + { + $rows = $this->db->pdo()->query("SELECT * FROM log_sources WHERE active = 1 ORDER BY name")->fetchAll(); + return array_map(fn(array $r) => LogSource::fromRow($r), $rows); + } + + public function getSource(int $id): ?LogSource + { + $stmt = $this->db->pdo()->prepare("SELECT * FROM log_sources WHERE id = ?"); + $stmt->execute([$id]); + $row = $stmt->fetch(); + return $row ? LogSource::fromRow($row) : null; + } + + public function createSource(string $name, LogSourceType $type, string $address, array $labels = []): LogSource + { + $stmt = $this->db->pdo()->prepare( + "INSERT INTO log_sources (name, type, address, labels) VALUES (?, ?, ?, ?)" + ); + $stmt->execute([$name, $type->value, $address, json_encode($labels)]); + return $this->getSource((int) $this->db->pdo()->lastInsertId()); + } + + public function deleteSource(int $id): void + { + $this->db->pdo()->prepare("DELETE FROM log_sources WHERE id = ?")->execute([$id]); + } + + // --- Rules --- + + public function getRules(): array + { + $rows = $this->db->pdo()->query("SELECT * FROM rules ORDER BY name")->fetchAll(); + return array_map(fn(array $r) => Rule::fromRow($r), $rows); + } + + public function getActiveRules(): array + { + $rows = $this->db->pdo()->query("SELECT * FROM rules WHERE active = 1 ORDER BY name")->fetchAll(); + return array_map(fn(array $r) => Rule::fromRow($r), $rows); + } + + public function getRule(int $id): ?Rule + { + $stmt = $this->db->pdo()->prepare("SELECT * FROM rules WHERE id = ?"); + $stmt->execute([$id]); + $row = $stmt->fetch(); + return $row ? Rule::fromRow($row) : null; + } + + public function createRule(string $name, string $pattern, string $severity, ?int $rateLimitSeconds = null): Rule + { + $stmt = $this->db->pdo()->prepare( + "INSERT INTO rules (name, pattern, severity, rate_limit_seconds) VALUES (?, ?, ?, ?)" + ); + $stmt->execute([$name, $pattern, $severity, $rateLimitSeconds]); + return $this->getRule((int) $this->db->pdo()->lastInsertId()); + } + + public function deleteRule(int $id): void + { + $this->db->pdo()->prepare("DELETE FROM rules WHERE id = ?")->execute([$id]); + } + + // --- Alerts --- + + public function createAlert(int $ruleId, string $ruleName, string $severity, string $message, string $rawLine, ?int $sourceId = null, ?string $sourceName = null): Alert + { + $stmt = $this->db->pdo()->prepare( + "INSERT INTO alerts (rule_id, rule_name, severity, status, message, raw_line, source_id, source_name) + VALUES (?, ?, ?, 'open', ?, ?, ?, ?)" + ); + $stmt->execute([$ruleId, $ruleName, $severity, $message, $rawLine, $sourceId, $sourceName]); + $id = (int) $this->db->pdo()->lastInsertId(); + return $this->getAlert($id); + } + + public function getAlert(int $id): ?Alert + { + $stmt = $this->db->pdo()->prepare("SELECT * FROM alerts WHERE id = ?"); + $stmt->execute([$id]); + $row = $stmt->fetch(); + return $row ? Alert::fromRow($row) : null; + } + + public function getAlerts(int $limit = 100, int $offset = 0, ?string $status = null, ?string $severity = null): array + { + $where = []; + $params = []; + + if ($status) { + $where[] = 'status = ?'; + $params[] = $status; + } + if ($severity) { + $where[] = 'severity = ?'; + $params[] = $severity; + } + + $sql = "SELECT * FROM alerts"; + if ($where) { + $sql .= ' WHERE ' . implode(' AND ', $where); + } + $sql .= " ORDER BY created_at DESC LIMIT ? OFFSET ?"; + $params[] = $limit; + $params[] = $offset; + + $stmt = $this->db->pdo()->prepare($sql); + $stmt->execute($params); + $rows = $stmt->fetchAll(); + + return array_map(fn(array $r) => Alert::fromRow($r), $rows); + } + + public function updateAlertStatus(int $id, AlertStatus $status): void + { + $stmt = $this->db->pdo()->prepare("UPDATE alerts SET status = ? WHERE id = ?"); + $stmt->execute([$status->value, $id]); + } + + public function getAlertCounts(): array + { + return $this->db->pdo()->query( + "SELECT status, severity, COUNT(*) as count FROM alerts GROUP BY status, severity" + )->fetchAll(); + } + + // --- Rate Limiting --- + + public function checkRateLimit(int $ruleId, int $windowSeconds): bool + { + $now = time(); + $window = intdiv($now, $windowSeconds) * $windowSeconds; + + $this->db->pdo()->prepare( + "INSERT INTO rate_limiter (rule_id, window_start, count) + VALUES (?, ?, 1) + ON CONFLICT(rule_id, window_start) DO UPDATE SET count = count + 1" + )->execute([$ruleId, $window]); + + $stmt = $this->db->pdo()->prepare( + "SELECT count FROM rate_limiter WHERE rule_id = ? AND window_start = ?" + ); + $stmt->execute([$ruleId, $window]); + $row = $stmt->fetch(); + + return $row['count'] <= 1; + } +} \ No newline at end of file diff --git a/src/Worker/FileWatcher.php b/src/Worker/FileWatcher.php new file mode 100644 index 0000000..687e4d5 --- /dev/null +++ b/src/Worker/FileWatcher.php @@ -0,0 +1,78 @@ +checkInterval = $checkInterval; + } + + public function watch(LogSource $source): void + { + if ($source->type !== LogSourceType::File) { + return; + } + + $path = $source->address; + + if (!file_exists($path)) { + fprintf(STDERR, "File not found: %s\n", $path); + return; + } + + $handle = fopen($path, 'r'); + if (!$handle) { + fprintf(STDERR, "Cannot open file: %s\n", $path); + return; + } + + fseek($handle, 0, SEEK_END); + $stat = fstat($handle); + $this->handles[$source->id] = $handle; + $this->inodes[$source->id] = $stat['ino'] ?? 0; + + fprintf(STDERR, "Watching file: %s (source: %s)\n", $path, $source->name); + } + + public function tick(): void + { + foreach ($this->handles as $id => $handle) { + if (feof($handle)) { + clearstatcache(); + $source = null; + + if (file_exists(stream_get_meta_data($handle)['uri'])) { + usleep(10000); + continue; + } + } + + while ($line = fgets($handle)) { + $line = rtrim($line, "\r\n"); + if ($line !== '') { + ($this->onLine)($line, $id); + } + } + } + + usleep($this->checkInterval); + } + + public function stop(): void + { + foreach ($this->handles as $handle) { + fclose($handle); + } + $this->handles = []; + } +} \ No newline at end of file diff --git a/src/Worker/Orchestrator.php b/src/Worker/Orchestrator.php new file mode 100644 index 0000000..e0220f7 --- /dev/null +++ b/src/Worker/Orchestrator.php @@ -0,0 +1,83 @@ +repo = $repo; + $this->engine = $engine; + $this->socketListener = new SocketListener(function (string $line, int $sourceId) { + $this->handleLine($line, $sourceId); + }); + $this->fileWatcher = new FileWatcher(function (string $line, int $sourceId) { + $this->handleLine($line, $sourceId); + }); + } + + public function run(): void + { + $this->loadSources(); + + pcntl_signal(SIGTERM, function () { $this->running = false; }); + pcntl_signal(SIGINT, function () { $this->running = false; }); + + fprintf(STDERR, "Worker started, watching %d sources\n", count($this->sourceMap)); + + while ($this->running) { + pcntl_signal_dispatch(); + $this->fileWatcher->tick(); + $this->socketListener->tick(); + } + + $this->stop(); + } + + private function loadSources(): void + { + $sources = $this->repo->getActiveSources(); + + foreach ($sources as $source) { + $this->sourceMap[$source->id] = $source; + $this->fileWatcher->watch($source); + $this->socketListener->listen($source); + } + } + + private function handleLine(string $line, int $sourceId): void + { + $source = $this->sourceMap[$sourceId] ?? null; + $alert = $this->engine->evaluate($line, $source); + + if ($alert !== null) { + $msg = sprintf( + "[%s] ALERT #%d [%s] %s", + date('c'), + $alert->id, + strtoupper($alert->severity->value), + $alert->message + ); + fprintf(STDERR, "%s\n", $msg); + echo $msg . "\n"; + } + } + + private function stop(): void + { + $this->fileWatcher->stop(); + $this->socketListener->stop(); + fprintf(STDERR, "Worker stopped\n"); + } +} \ No newline at end of file diff --git a/src/Worker/SocketListener.php b/src/Worker/SocketListener.php new file mode 100644 index 0000000..fb91399 --- /dev/null +++ b/src/Worker/SocketListener.php @@ -0,0 +1,72 @@ +type, [LogSourceType::Tcp, LogSourceType::Udp])) { + return; + } + + $parts = parse_url($source->address); + $host = $parts['host'] ?? '0.0.0.0'; + $port = $parts['port'] ?? 9514; + + $protocol = $source->type === LogSourceType::Udp ? SOL_UDP : SOL_TCP; + $sockType = $source->type === LogSourceType::Udp ? SOCK_DGRAM : SOCK_STREAM; + + $sock = socket_create(AF_INET, $sockType, $protocol); + if (!$sock) { + fprintf(STDERR, "Cannot create socket for %s: %s\n", $source->name, socket_strerror(socket_last_error())); + return; + } + + socket_set_option($sock, SOL_SOCKET, SO_REUSEADDR, 1); + + if (!socket_bind($sock, $host, $port)) { + fprintf(STDERR, "Cannot bind %s on %s:%d: %s\n", $source->name, $host, $port, socket_strerror(socket_last_error())); + return; + } + + if ($source->type === LogSourceType::Tcp) { + socket_listen($sock, 5); + socket_set_nonblock($sock); + } + + $this->servers[$source->id] = $sock; + fprintf(STDERR, "Listening on %s://%s:%d (source: %s)\n", $source->type->value, $host, $port, $source->name); + } + + public function tick(): void + { + foreach ($this->servers as $id => $sock) { + $from = ''; + $port = 0; + + if (@socket_recvfrom($sock, $data, 65535, 0, $from, $port)) { + $line = rtrim($data, "\r\n"); + if ($line !== '') { + ($this->onLine)($line, $id); + } + } + } + } + + public function stop(): void + { + foreach ($this->servers as $sock) { + socket_close($sock); + } + $this->servers = []; + } +} \ No newline at end of file