adding tcp input
Deploy / deploy (push) Has been cancelled

This commit is contained in:
2026-05-06 12:14:43 +02:00
parent edfc4181d2
commit 8db6ba6ce2
5 changed files with 118 additions and 16 deletions
+35
View File
@@ -21,4 +21,39 @@ foreach ($config['rules'] as $rule) {
echo sprintf(" + Rule #%d: %s (%s)\n", $r->id, $r->name, $r->severity->value);
}
echo "Seeding default log sources...\n";
$existing = $repo->getSources();
$existingNames = array_map(fn($s) => $s->name, $existing);
if (!in_array('syslog-tcp', $existingNames)) {
$s = $repo->createSource(
name: 'syslog-tcp',
type: LogSourceType::Tcp,
address: 'tcp://0.0.0.0:9514',
labels: ['protocol' => 'syslog'],
);
echo sprintf(" + Source #%d: syslog-tcp (TCP :9514)\n", $s->id);
}
if (!in_array('syslog-udp', $existingNames)) {
$s = $repo->createSource(
name: 'syslog-udp',
type: LogSourceType::Udp,
address: 'udp://0.0.0.0:9514',
labels: ['protocol' => 'syslog'],
);
echo sprintf(" + Source #%d: syslog-udp (UDP :9514)\n", $s->id);
}
if (!in_array('collect-volume', $existingNames)) {
$s = $repo->createSource(
name: 'collect-volume',
type: LogSourceType::File,
address: '/collect/*.log',
labels: ['type' => 'shared-volume'],
);
echo sprintf(" + Source #%d: collect-volume (/collect/*.log)\n", $s->id);
}
echo "Done.\n";
+5
View File
@@ -27,6 +27,9 @@ services:
build:
context: .
dockerfile: docker/Dockerfile.php
ports:
- "9514:9514/tcp"
- "9514:9514/udp"
volumes:
- ./src:/app/src
- ./bin:/app/bin
@@ -34,6 +37,7 @@ services:
- ./composer.json:/app/composer.json
- /var/log:/host/logs:ro
- data:/app/data
- log_collect:/collect
depends_on:
- redis
command: ["php", "bin/consume", "--daemon"]
@@ -43,3 +47,4 @@ services:
volumes:
data:
log_collect:
+2 -3
View File
@@ -1,9 +1,8 @@
FROM php:8.3-cli-alpine
RUN apk add --no-cache linux-headers curl-dev git
RUN apk add --no-cache curl-dev git linux-headers
RUN docker-php-ext-install curl pcntl sockets 2>/dev/null || \
docker-php-ext-install curl pcntl
RUN docker-php-ext-install curl pcntl sockets
COPY --from=composer:2 /usr/bin/composer /usr/bin/composer
+22 -1
View File
@@ -4,17 +4,20 @@ namespace Jakach\Logging\Api;
use Jakach\Logging\Model\{LogSourceType, AlertStatus};
use Jakach\Logging\Storage\{Database, Repository};
use Jakach\Logging\RuleEngine\Engine;
class Router
{
private Repository $repo;
private AuthMiddleware $auth;
private Engine $engine;
public function __construct()
{
$db = new Database();
$this->repo = new Repository($db);
$this->auth = new AuthMiddleware($this->repo);
$this->engine = new Engine($this->repo);
}
public function handle(): void
@@ -27,7 +30,7 @@ class Router
header('Content-Type: application/json');
try {
$publicPaths = ['/health', '/oauth', '/auth/me', '/auth/logout'];
$publicPaths = ['/health', '/oauth', '/auth/me', '/auth/logout', '/ingest'];
$isPublic = false;
foreach ($publicPaths as $pp) {
if ($path === $pp || str_starts_with($path, $pp . '/')) {
@@ -48,6 +51,8 @@ class Router
$result = match (true) {
$path === '/health' && $method === 'GET' => ['status' => 'ok'],
$path === '/ingest' && $method === 'POST' => $this->ingest(),
$path === '/auth/me' && $method === 'GET' => $this->getMe(),
$path === '/auth/logout' && $method === 'POST' => $this->logout(),
@@ -195,4 +200,20 @@ class Router
$this->repo->setAllowedUserTokens($tokens);
return ['status' => 'saved', 'tokens' => $this->repo->getAllowedUserTokens()];
}
private function ingest(): array
{
$body = json_decode(file_get_contents('php://input'), true);
$line = $body['line'] ?? '';
$source = $body['source'] ?? 'http';
if (empty($line)) {
http_response_code(400);
return ['error' => 'Missing "line" field'];
}
$this->engine->evaluate($line, null);
return ['status' => 'ingested', 'line' => substr($line, 0, 100)];
}
}
+53 -11
View File
@@ -8,6 +8,7 @@ class FileWatcher
{
private array $handles = [];
private array $inodes = [];
private array $patterns = [];
private int $checkInterval;
public function __construct(
@@ -25,11 +26,46 @@ class FileWatcher
$path = $source->address;
if (!file_exists($path)) {
fprintf(STDERR, "File not found: %s\n", $path);
if (str_contains($path, '*') || str_contains($path, '?')) {
$dir = dirname($path);
$pattern = basename($path);
if (!isset($this->patterns[$source->id])) {
$this->patterns[$source->id] = ['dir' => $dir, 'pattern' => $pattern, 'source' => $source];
}
fprintf(STDERR, "Watching pattern: %s (source: %s)\n", $path, $source->name);
$this->scanPattern($source->id, $dir, $pattern);
return;
}
if (!file_exists($path)) {
fprintf(STDERR, "File not found: %s (source: %s), will retry\n", $path, $source->name);
return;
}
$this->openFile($source->id, $path);
fprintf(STDERR, "Watching file: %s (source: %s)\n", $path, $source->name);
}
private function scanPattern(int $sourceId, string $dir, string $pattern): void
{
if (!is_dir($dir)) {
return;
}
$files = glob($dir . '/' . $pattern);
if ($files === false) {
return;
}
foreach ($files as $file) {
$key = $sourceId . ':' . $file;
if (!isset($this->handles[$key])) {
$this->openFile($key, $file);
fprintf(STDERR, " Watching matched file: %s\n", $file);
}
}
}
private function openFile(string|int $key, string $path): void
{
$handle = fopen($path, 'r');
if (!$handle) {
fprintf(STDERR, "Cannot open file: %s\n", $path);
@@ -38,29 +74,34 @@ class FileWatcher
fseek($handle, 0, SEEK_END);
$stat = fstat($handle);
$this->handles[$source->id] = $handle;
$this->inodes[$source->id] = $stat['ino'] ?? 0;
fprintf(STDERR, "Watching file: %s (source: %s)\n", $path, $source->name);
$this->handles[$key] = $handle;
$this->inodes[$key] = $stat['ino'] ?? 0;
}
public function tick(): void
{
foreach ($this->handles as $id => $handle) {
foreach ($this->patterns as $id => $p) {
$this->scanPattern($id, $p['dir'], $p['pattern']);
}
foreach ($this->handles as $key => $handle) {
if (feof($handle)) {
clearstatcache();
$source = null;
if (file_exists(stream_get_meta_data($handle)['uri'])) {
$uri = stream_get_meta_data($handle)['uri'];
if (file_exists($uri)) {
usleep(10000);
continue;
}
fclose($handle);
unset($this->handles[$key]);
continue;
}
while ($line = fgets($handle)) {
$line = rtrim($line, "\r\n");
if ($line !== '') {
($this->onLine)($line, $id);
$sourceId = is_string($key) ? (int) explode(':', $key)[0] : $key;
($this->onLine)($line, $sourceId);
}
}
}
@@ -74,5 +115,6 @@ class FileWatcher
fclose($handle);
}
$this->handles = [];
$this->patterns = [];
}
}