From 8db6ba6ce26496ff02a05b9164bb703a98d84699 Mon Sep 17 00:00:00 2001 From: janis steiner Date: Wed, 6 May 2026 12:14:43 +0200 Subject: [PATCH] adding tcp input --- bin/seed | 35 +++++++++++++++++++++ docker-compose.yml | 7 ++++- docker/Dockerfile.php | 5 ++- src/Api/Router.php | 23 +++++++++++++- src/Worker/FileWatcher.php | 64 +++++++++++++++++++++++++++++++------- 5 files changed, 118 insertions(+), 16 deletions(-) diff --git a/bin/seed b/bin/seed index c08776c..e4f7ef3 100755 --- a/bin/seed +++ b/bin/seed @@ -21,4 +21,39 @@ foreach ($config['rules'] as $rule) { echo sprintf(" + Rule #%d: %s (%s)\n", $r->id, $r->name, $r->severity->value); } +echo "Seeding default log sources...\n"; + +$existing = $repo->getSources(); +$existingNames = array_map(fn($s) => $s->name, $existing); + +if (!in_array('syslog-tcp', $existingNames)) { + $s = $repo->createSource( + name: 'syslog-tcp', + type: LogSourceType::Tcp, + address: 'tcp://0.0.0.0:9514', + labels: ['protocol' => 'syslog'], + ); + echo sprintf(" + Source #%d: syslog-tcp (TCP :9514)\n", $s->id); +} + +if (!in_array('syslog-udp', $existingNames)) { + $s = $repo->createSource( + name: 'syslog-udp', + type: LogSourceType::Udp, + address: 'udp://0.0.0.0:9514', + labels: ['protocol' => 'syslog'], + ); + echo sprintf(" + Source #%d: syslog-udp (UDP :9514)\n", $s->id); +} + +if (!in_array('collect-volume', $existingNames)) { + $s = $repo->createSource( + name: 'collect-volume', + type: LogSourceType::File, + address: '/collect/*.log', + labels: ['type' => 'shared-volume'], + ); + echo sprintf(" + Source #%d: collect-volume (/collect/*.log)\n", $s->id); +} + echo "Done.\n"; \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 0cc84cf..8d547d2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -27,6 +27,9 @@ services: build: context: . dockerfile: docker/Dockerfile.php + ports: + - "9514:9514/tcp" + - "9514:9514/udp" volumes: - ./src:/app/src - ./bin:/app/bin @@ -34,6 +37,7 @@ services: - ./composer.json:/app/composer.json - /var/log:/host/logs:ro - data:/app/data + - log_collect:/collect depends_on: - redis command: ["php", "bin/consume", "--daemon"] @@ -42,4 +46,5 @@ services: image: redis:7-alpine volumes: - data: \ No newline at end of file + data: + log_collect: \ No newline at end of file diff --git a/docker/Dockerfile.php b/docker/Dockerfile.php index 95d818e..f149f7a 100644 --- a/docker/Dockerfile.php +++ b/docker/Dockerfile.php @@ -1,9 +1,8 @@ FROM php:8.3-cli-alpine -RUN apk add --no-cache linux-headers curl-dev git +RUN apk add --no-cache curl-dev git linux-headers -RUN docker-php-ext-install curl pcntl sockets 2>/dev/null || \ - docker-php-ext-install curl pcntl +RUN docker-php-ext-install curl pcntl sockets COPY --from=composer:2 /usr/bin/composer /usr/bin/composer diff --git a/src/Api/Router.php b/src/Api/Router.php index 3d2836d..4317df9 100644 --- a/src/Api/Router.php +++ b/src/Api/Router.php @@ -4,17 +4,20 @@ namespace Jakach\Logging\Api; use Jakach\Logging\Model\{LogSourceType, AlertStatus}; use Jakach\Logging\Storage\{Database, Repository}; +use Jakach\Logging\RuleEngine\Engine; class Router { private Repository $repo; private AuthMiddleware $auth; + private Engine $engine; public function __construct() { $db = new Database(); $this->repo = new Repository($db); $this->auth = new AuthMiddleware($this->repo); + $this->engine = new Engine($this->repo); } public function handle(): void @@ -27,7 +30,7 @@ class Router header('Content-Type: application/json'); try { - $publicPaths = ['/health', '/oauth', '/auth/me', '/auth/logout']; + $publicPaths = ['/health', '/oauth', '/auth/me', '/auth/logout', '/ingest']; $isPublic = false; foreach ($publicPaths as $pp) { if ($path === $pp || str_starts_with($path, $pp . '/')) { @@ -48,6 +51,8 @@ class Router $result = match (true) { $path === '/health' && $method === 'GET' => ['status' => 'ok'], + $path === '/ingest' && $method === 'POST' => $this->ingest(), + $path === '/auth/me' && $method === 'GET' => $this->getMe(), $path === '/auth/logout' && $method === 'POST' => $this->logout(), @@ -195,4 +200,20 @@ class Router $this->repo->setAllowedUserTokens($tokens); return ['status' => 'saved', 'tokens' => $this->repo->getAllowedUserTokens()]; } + + private function ingest(): array + { + $body = json_decode(file_get_contents('php://input'), true); + $line = $body['line'] ?? ''; + $source = $body['source'] ?? 'http'; + + if (empty($line)) { + http_response_code(400); + return ['error' => 'Missing "line" field']; + } + + $this->engine->evaluate($line, null); + + return ['status' => 'ingested', 'line' => substr($line, 0, 100)]; + } } \ No newline at end of file diff --git a/src/Worker/FileWatcher.php b/src/Worker/FileWatcher.php index 687e4d5..0663921 100644 --- a/src/Worker/FileWatcher.php +++ b/src/Worker/FileWatcher.php @@ -8,6 +8,7 @@ class FileWatcher { private array $handles = []; private array $inodes = []; + private array $patterns = []; private int $checkInterval; public function __construct( @@ -25,11 +26,46 @@ class FileWatcher $path = $source->address; - if (!file_exists($path)) { - fprintf(STDERR, "File not found: %s\n", $path); + if (str_contains($path, '*') || str_contains($path, '?')) { + $dir = dirname($path); + $pattern = basename($path); + if (!isset($this->patterns[$source->id])) { + $this->patterns[$source->id] = ['dir' => $dir, 'pattern' => $pattern, 'source' => $source]; + } + fprintf(STDERR, "Watching pattern: %s (source: %s)\n", $path, $source->name); + $this->scanPattern($source->id, $dir, $pattern); return; } + if (!file_exists($path)) { + fprintf(STDERR, "File not found: %s (source: %s), will retry\n", $path, $source->name); + return; + } + + $this->openFile($source->id, $path); + fprintf(STDERR, "Watching file: %s (source: %s)\n", $path, $source->name); + } + + private function scanPattern(int $sourceId, string $dir, string $pattern): void + { + if (!is_dir($dir)) { + return; + } + $files = glob($dir . '/' . $pattern); + if ($files === false) { + return; + } + foreach ($files as $file) { + $key = $sourceId . ':' . $file; + if (!isset($this->handles[$key])) { + $this->openFile($key, $file); + fprintf(STDERR, " Watching matched file: %s\n", $file); + } + } + } + + private function openFile(string|int $key, string $path): void + { $handle = fopen($path, 'r'); if (!$handle) { fprintf(STDERR, "Cannot open file: %s\n", $path); @@ -38,29 +74,34 @@ class FileWatcher fseek($handle, 0, SEEK_END); $stat = fstat($handle); - $this->handles[$source->id] = $handle; - $this->inodes[$source->id] = $stat['ino'] ?? 0; - - fprintf(STDERR, "Watching file: %s (source: %s)\n", $path, $source->name); + $this->handles[$key] = $handle; + $this->inodes[$key] = $stat['ino'] ?? 0; } public function tick(): void { - foreach ($this->handles as $id => $handle) { + foreach ($this->patterns as $id => $p) { + $this->scanPattern($id, $p['dir'], $p['pattern']); + } + + foreach ($this->handles as $key => $handle) { if (feof($handle)) { clearstatcache(); - $source = null; - - if (file_exists(stream_get_meta_data($handle)['uri'])) { + $uri = stream_get_meta_data($handle)['uri']; + if (file_exists($uri)) { usleep(10000); continue; } + fclose($handle); + unset($this->handles[$key]); + continue; } while ($line = fgets($handle)) { $line = rtrim($line, "\r\n"); if ($line !== '') { - ($this->onLine)($line, $id); + $sourceId = is_string($key) ? (int) explode(':', $key)[0] : $key; + ($this->onLine)($line, $sourceId); } } } @@ -74,5 +115,6 @@ class FileWatcher fclose($handle); } $this->handles = []; + $this->patterns = []; } } \ No newline at end of file