initial commit
This commit is contained in:
@@ -0,0 +1,105 @@
|
||||
<?php
|
||||
|
||||
namespace Jakach\Logging\Api;
|
||||
|
||||
use Jakach\Logging\Model\{LogSourceType, AlertStatus};
|
||||
use Jakach\Logging\Storage\{Database, Repository};
|
||||
|
||||
class Router
|
||||
{
|
||||
private Repository $repo;
|
||||
|
||||
public function __construct()
|
||||
{
|
||||
$db = new Database();
|
||||
$this->repo = new Repository($db);
|
||||
}
|
||||
|
||||
public function handle(): void
|
||||
{
|
||||
$method = $_SERVER['REQUEST_METHOD'];
|
||||
$path = parse_url($_SERVER['REQUEST_URI'], PHP_URL_PATH);
|
||||
$path = rtrim($path, '/');
|
||||
|
||||
header('Content-Type: application/json');
|
||||
|
||||
try {
|
||||
$result = match (true) {
|
||||
$path === '/sources' && $method === 'GET' => $this->repo->getSources(),
|
||||
$path === '/sources' && $method === 'POST' => $this->createSource(),
|
||||
$path === '/rules' && $method === 'GET' => $this->repo->getRules(),
|
||||
$path === '/rules' && $method === 'POST' => $this->createRule(),
|
||||
$path === '/alerts' && $method === 'GET' => $this->getAlerts(),
|
||||
preg_match('#^/alerts/(\d+)/ack$#', $path, $m) && $method === 'POST' => $this->ackAlert((int) $m[1]),
|
||||
preg_match('#^/alerts/counts$#', $path) && $method === 'GET' => $this->repo->getAlertCounts(),
|
||||
$path === '/health' && $method === 'GET' => ['status' => 'ok'],
|
||||
default => throw new \RuntimeException('Not found', 404),
|
||||
};
|
||||
|
||||
if ($result instanceof \UnitEnum || is_scalar($result)) {
|
||||
$result = ['data' => $result];
|
||||
} elseif (is_array($result) && !empty($result) && $result[array_key_first($result)] instanceof \UnitEnum) {
|
||||
$result = ['data' => $result];
|
||||
} elseif (is_array($result)) {
|
||||
$needsWrap = false;
|
||||
foreach ($result as $key => $val) {
|
||||
if (is_object($val) && method_exists($val, 'toArray')) {
|
||||
$result[$key] = $val->toArray();
|
||||
} else {
|
||||
$needsWrap = true;
|
||||
}
|
||||
}
|
||||
if ($needsWrap || empty($result)) {
|
||||
$result = ['data' => $result];
|
||||
}
|
||||
} elseif (is_object($result) && method_exists($result, 'toArray')) {
|
||||
$result = ['data' => $result->toArray()];
|
||||
}
|
||||
|
||||
http_response_code(200);
|
||||
echo json_encode($result, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES);
|
||||
|
||||
} catch (\RuntimeException $e) {
|
||||
http_response_code($e->getCode() ?: 500);
|
||||
echo json_encode(['error' => $e->getMessage()]);
|
||||
}
|
||||
}
|
||||
|
||||
private function createSource(): array
|
||||
{
|
||||
$body = json_decode(file_get_contents('php://input'), true);
|
||||
$type = LogSourceType::from($body['type'] ?? '');
|
||||
return $this->repo->createSource(
|
||||
name: $body['name'],
|
||||
type: $type,
|
||||
address: $body['address'],
|
||||
labels: $body['labels'] ?? [],
|
||||
);
|
||||
}
|
||||
|
||||
private function createRule(): array
|
||||
{
|
||||
$body = json_decode(file_get_contents('php://input'), true);
|
||||
return $this->repo->createRule(
|
||||
name: $body['name'],
|
||||
pattern: $body['pattern'],
|
||||
severity: $body['severity'] ?? 'warning',
|
||||
rateLimitSeconds: $body['rate_limit_seconds'] ?? null,
|
||||
);
|
||||
}
|
||||
|
||||
private function getAlerts(): array
|
||||
{
|
||||
$limit = (int) ($_GET['limit'] ?? 100);
|
||||
$offset = (int) ($_GET['offset'] ?? 0);
|
||||
$status = $_GET['status'] ?? null;
|
||||
$severity = $_GET['severity'] ?? null;
|
||||
return $this->repo->getAlerts($limit, $offset, $status, $severity);
|
||||
}
|
||||
|
||||
private function ackAlert(int $id): array
|
||||
{
|
||||
$this->repo->updateAlertStatus($id, AlertStatus::Acknowledged);
|
||||
return ['status' => 'acknowledged', 'id' => $id];
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,51 @@
|
||||
<?php
|
||||
|
||||
namespace Jakach\Logging\Model;
|
||||
|
||||
class Alert
|
||||
{
|
||||
public function __construct(
|
||||
public readonly int $id,
|
||||
public readonly int $ruleId,
|
||||
public readonly string $ruleName,
|
||||
public readonly AlertSeverity $severity,
|
||||
public readonly AlertStatus $status,
|
||||
public readonly string $message,
|
||||
public readonly string $rawLine,
|
||||
public readonly ?int $sourceId = null,
|
||||
public readonly ?string $sourceName = null,
|
||||
public readonly \DateTimeImmutable $createdAt = new \DateTimeImmutable(),
|
||||
) {}
|
||||
|
||||
public static function fromRow(array $row): self
|
||||
{
|
||||
return new self(
|
||||
id: (int) $row['id'],
|
||||
ruleId: (int) $row['rule_id'],
|
||||
ruleName: $row['rule_name'],
|
||||
severity: AlertSeverity::from($row['severity']),
|
||||
status: AlertStatus::from($row['status']),
|
||||
message: $row['message'],
|
||||
rawLine: $row['raw_line'],
|
||||
sourceId: isset($row['source_id']) ? (int) $row['source_id'] : null,
|
||||
sourceName: $row['source_name'] ?? null,
|
||||
createdAt: new \DateTimeImmutable($row['created_at']),
|
||||
);
|
||||
}
|
||||
|
||||
public function toArray(): array
|
||||
{
|
||||
return [
|
||||
'id' => $this->id,
|
||||
'rule_id' => $this->ruleId,
|
||||
'rule_name' => $this->ruleName,
|
||||
'severity' => $this->severity->value,
|
||||
'status' => $this->status->value,
|
||||
'message' => $this->message,
|
||||
'raw_line' => $this->rawLine,
|
||||
'source_id' => $this->sourceId,
|
||||
'source_name' => $this->sourceName,
|
||||
'created_at' => $this->createdAt->format('c'),
|
||||
];
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,25 @@
|
||||
<?php
|
||||
|
||||
namespace Jakach\Logging\Model;
|
||||
|
||||
enum LogSourceType: string
|
||||
{
|
||||
case File = 'file';
|
||||
case Tcp = 'tcp';
|
||||
case Udp = 'udp';
|
||||
case Http = 'http';
|
||||
}
|
||||
|
||||
enum AlertSeverity: string
|
||||
{
|
||||
case Info = 'info';
|
||||
case Warning = 'warning';
|
||||
case Critical = 'critical';
|
||||
}
|
||||
|
||||
enum AlertStatus: string
|
||||
{
|
||||
case Open = 'open';
|
||||
case Acknowledged = 'acknowledged';
|
||||
case Resolved = 'resolved';
|
||||
}
|
||||
@@ -0,0 +1,42 @@
|
||||
<?php
|
||||
|
||||
namespace Jakach\Logging\Model;
|
||||
|
||||
class LogSource
|
||||
{
|
||||
public function __construct(
|
||||
public readonly int $id,
|
||||
public readonly string $name,
|
||||
public readonly LogSourceType $type,
|
||||
public readonly string $address,
|
||||
public readonly array $labels = [],
|
||||
public readonly bool $active = true,
|
||||
public readonly \DateTimeImmutable $createdAt = new \DateTimeImmutable(),
|
||||
) {}
|
||||
|
||||
public static function fromRow(array $row): self
|
||||
{
|
||||
return new self(
|
||||
id: (int) $row['id'],
|
||||
name: $row['name'],
|
||||
type: LogSourceType::from($row['type']),
|
||||
address: $row['address'],
|
||||
labels: json_decode($row['labels'] ?? '[]', true),
|
||||
active: (bool) $row['active'],
|
||||
createdAt: new \DateTimeImmutable($row['created_at']),
|
||||
);
|
||||
}
|
||||
|
||||
public function toArray(): array
|
||||
{
|
||||
return [
|
||||
'id' => $this->id,
|
||||
'name' => $this->name,
|
||||
'type' => $this->type->value,
|
||||
'address' => $this->address,
|
||||
'labels' => $this->labels,
|
||||
'active' => $this->active,
|
||||
'created_at' => $this->createdAt->format('c'),
|
||||
];
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,42 @@
|
||||
<?php
|
||||
|
||||
namespace Jakach\Logging\Model;
|
||||
|
||||
class Rule
|
||||
{
|
||||
public function __construct(
|
||||
public readonly int $id,
|
||||
public readonly string $name,
|
||||
public readonly string $pattern,
|
||||
public readonly AlertSeverity $severity,
|
||||
public readonly ?int $rateLimitSeconds = null,
|
||||
public readonly bool $active = true,
|
||||
public readonly \DateTimeImmutable $createdAt = new \DateTimeImmutable(),
|
||||
) {}
|
||||
|
||||
public static function fromRow(array $row): self
|
||||
{
|
||||
return new self(
|
||||
id: (int) $row['id'],
|
||||
name: $row['name'],
|
||||
pattern: $row['pattern'],
|
||||
severity: AlertSeverity::from($row['severity']),
|
||||
rateLimitSeconds: isset($row['rate_limit_seconds']) ? (int) $row['rate_limit_seconds'] : null,
|
||||
active: (bool) $row['active'],
|
||||
createdAt: new \DateTimeImmutable($row['created_at']),
|
||||
);
|
||||
}
|
||||
|
||||
public function toArray(): array
|
||||
{
|
||||
return [
|
||||
'id' => $this->id,
|
||||
'name' => $this->name,
|
||||
'pattern' => $this->pattern,
|
||||
'severity' => $this->severity->value,
|
||||
'rate_limit_seconds' => $this->rateLimitSeconds,
|
||||
'active' => $this->active,
|
||||
'created_at' => $this->createdAt->format('c'),
|
||||
];
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,60 @@
|
||||
<?php
|
||||
|
||||
namespace Jakach\Logging\RuleEngine;
|
||||
|
||||
use Jakach\Logging\Model\{Rule, Alert, LogSource};
|
||||
use Jakach\Logging\Storage\Repository;
|
||||
|
||||
class Engine
|
||||
{
|
||||
private array $rateCache = [];
|
||||
private array $compiledPatterns = [];
|
||||
|
||||
public function __construct(
|
||||
private Repository $repo,
|
||||
) {}
|
||||
|
||||
public function evaluate(string $line, ?LogSource $source = null): ?Alert
|
||||
{
|
||||
$rules = $this->repo->getActiveRules();
|
||||
|
||||
foreach ($rules as $rule) {
|
||||
if ($this->matches($line, $rule)) {
|
||||
if ($rule->rateLimitSeconds !== null && !$this->repo->checkRateLimit($rule->id, $rule->rateLimitSeconds)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$message = sprintf(
|
||||
'[%s] Rule "%s" matched: %s',
|
||||
strtoupper($rule->severity->value),
|
||||
$rule->name,
|
||||
substr($line, 0, 200)
|
||||
);
|
||||
|
||||
$alert = $this->repo->createAlert(
|
||||
ruleId: $rule->id,
|
||||
ruleName: $rule->name,
|
||||
severity: $rule->severity->value,
|
||||
message: $message,
|
||||
rawLine: $line,
|
||||
sourceId: $source?->id,
|
||||
sourceName: $source?->name,
|
||||
);
|
||||
|
||||
return $alert;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private function matches(string $line, Rule $rule): bool
|
||||
{
|
||||
if (!isset($this->compiledPatterns[$rule->id])) {
|
||||
$delimiter = $rule->pattern[0] ?? '/';
|
||||
$this->compiledPatterns[$rule->id] = $rule->pattern;
|
||||
}
|
||||
|
||||
return (bool) preg_match($this->compiledPatterns[$rule->id], $line);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,87 @@
|
||||
<?php
|
||||
|
||||
namespace Jakach\Logging\Storage;
|
||||
|
||||
class Database
|
||||
{
|
||||
private \PDO $pdo;
|
||||
|
||||
public function __construct(string $path = '/app/data/logging.db')
|
||||
{
|
||||
$dir = dirname($path);
|
||||
if (!is_dir($dir)) {
|
||||
mkdir($dir, 0755, true);
|
||||
}
|
||||
|
||||
$this->pdo = new \PDO("sqlite:$path");
|
||||
$this->pdo->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION);
|
||||
$this->pdo->setAttribute(\PDO::ATTR_DEFAULT_FETCH_MODE, \PDO::FETCH_ASSOC);
|
||||
$this->pdo->exec('PRAGMA journal_mode=WAL');
|
||||
$this->migrate();
|
||||
}
|
||||
|
||||
public function pdo(): \PDO
|
||||
{
|
||||
return $this->pdo;
|
||||
}
|
||||
|
||||
private function migrate(): void
|
||||
{
|
||||
$this->pdo->exec("
|
||||
CREATE TABLE IF NOT EXISTS log_sources (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
name TEXT NOT NULL,
|
||||
type TEXT NOT NULL,
|
||||
address TEXT NOT NULL,
|
||||
labels TEXT DEFAULT '[]',
|
||||
active INTEGER DEFAULT 1,
|
||||
created_at TEXT DEFAULT (datetime('now'))
|
||||
)
|
||||
");
|
||||
|
||||
$this->pdo->exec("
|
||||
CREATE TABLE IF NOT EXISTS rules (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
name TEXT NOT NULL,
|
||||
pattern TEXT NOT NULL,
|
||||
severity TEXT NOT NULL DEFAULT 'warning',
|
||||
rate_limit_seconds INTEGER,
|
||||
active INTEGER DEFAULT 1,
|
||||
created_at TEXT DEFAULT (datetime('now'))
|
||||
)
|
||||
");
|
||||
|
||||
$this->pdo->exec("
|
||||
CREATE TABLE IF NOT EXISTS alerts (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
rule_id INTEGER NOT NULL,
|
||||
rule_name TEXT NOT NULL,
|
||||
severity TEXT NOT NULL,
|
||||
status TEXT NOT NULL DEFAULT 'open',
|
||||
message TEXT NOT NULL,
|
||||
raw_line TEXT NOT NULL,
|
||||
source_id INTEGER,
|
||||
source_name TEXT,
|
||||
created_at TEXT DEFAULT (datetime('now')),
|
||||
FOREIGN KEY (rule_id) REFERENCES rules(id)
|
||||
)
|
||||
");
|
||||
|
||||
$this->pdo->exec("
|
||||
CREATE INDEX IF NOT EXISTS idx_alerts_status ON alerts(status)
|
||||
");
|
||||
|
||||
$this->pdo->exec("
|
||||
CREATE INDEX IF NOT EXISTS idx_alerts_created ON alerts(created_at)
|
||||
");
|
||||
|
||||
$this->pdo->exec("
|
||||
CREATE TABLE IF NOT EXISTS rate_limiter (
|
||||
rule_id INTEGER NOT NULL,
|
||||
window_start INTEGER NOT NULL,
|
||||
count INTEGER DEFAULT 0,
|
||||
PRIMARY KEY (rule_id, window_start)
|
||||
)
|
||||
");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,169 @@
|
||||
<?php
|
||||
|
||||
namespace Jakach\Logging\Storage;
|
||||
|
||||
use Jakach\Logging\Model\{LogSource, Rule, Alert, AlertStatus, LogSourceType};
|
||||
|
||||
class Repository
|
||||
{
|
||||
public function __construct(
|
||||
private Database $db,
|
||||
) {}
|
||||
|
||||
// --- Log Sources ---
|
||||
|
||||
public function getSources(): array
|
||||
{
|
||||
$rows = $this->db->pdo()->query("SELECT * FROM log_sources ORDER BY name")->fetchAll();
|
||||
return array_map(fn(array $r) => LogSource::fromRow($r), $rows);
|
||||
}
|
||||
|
||||
public function getActiveSources(): array
|
||||
{
|
||||
$rows = $this->db->pdo()->query("SELECT * FROM log_sources WHERE active = 1 ORDER BY name")->fetchAll();
|
||||
return array_map(fn(array $r) => LogSource::fromRow($r), $rows);
|
||||
}
|
||||
|
||||
public function getSource(int $id): ?LogSource
|
||||
{
|
||||
$stmt = $this->db->pdo()->prepare("SELECT * FROM log_sources WHERE id = ?");
|
||||
$stmt->execute([$id]);
|
||||
$row = $stmt->fetch();
|
||||
return $row ? LogSource::fromRow($row) : null;
|
||||
}
|
||||
|
||||
public function createSource(string $name, LogSourceType $type, string $address, array $labels = []): LogSource
|
||||
{
|
||||
$stmt = $this->db->pdo()->prepare(
|
||||
"INSERT INTO log_sources (name, type, address, labels) VALUES (?, ?, ?, ?)"
|
||||
);
|
||||
$stmt->execute([$name, $type->value, $address, json_encode($labels)]);
|
||||
return $this->getSource((int) $this->db->pdo()->lastInsertId());
|
||||
}
|
||||
|
||||
public function deleteSource(int $id): void
|
||||
{
|
||||
$this->db->pdo()->prepare("DELETE FROM log_sources WHERE id = ?")->execute([$id]);
|
||||
}
|
||||
|
||||
// --- Rules ---
|
||||
|
||||
public function getRules(): array
|
||||
{
|
||||
$rows = $this->db->pdo()->query("SELECT * FROM rules ORDER BY name")->fetchAll();
|
||||
return array_map(fn(array $r) => Rule::fromRow($r), $rows);
|
||||
}
|
||||
|
||||
public function getActiveRules(): array
|
||||
{
|
||||
$rows = $this->db->pdo()->query("SELECT * FROM rules WHERE active = 1 ORDER BY name")->fetchAll();
|
||||
return array_map(fn(array $r) => Rule::fromRow($r), $rows);
|
||||
}
|
||||
|
||||
public function getRule(int $id): ?Rule
|
||||
{
|
||||
$stmt = $this->db->pdo()->prepare("SELECT * FROM rules WHERE id = ?");
|
||||
$stmt->execute([$id]);
|
||||
$row = $stmt->fetch();
|
||||
return $row ? Rule::fromRow($row) : null;
|
||||
}
|
||||
|
||||
public function createRule(string $name, string $pattern, string $severity, ?int $rateLimitSeconds = null): Rule
|
||||
{
|
||||
$stmt = $this->db->pdo()->prepare(
|
||||
"INSERT INTO rules (name, pattern, severity, rate_limit_seconds) VALUES (?, ?, ?, ?)"
|
||||
);
|
||||
$stmt->execute([$name, $pattern, $severity, $rateLimitSeconds]);
|
||||
return $this->getRule((int) $this->db->pdo()->lastInsertId());
|
||||
}
|
||||
|
||||
public function deleteRule(int $id): void
|
||||
{
|
||||
$this->db->pdo()->prepare("DELETE FROM rules WHERE id = ?")->execute([$id]);
|
||||
}
|
||||
|
||||
// --- Alerts ---
|
||||
|
||||
public function createAlert(int $ruleId, string $ruleName, string $severity, string $message, string $rawLine, ?int $sourceId = null, ?string $sourceName = null): Alert
|
||||
{
|
||||
$stmt = $this->db->pdo()->prepare(
|
||||
"INSERT INTO alerts (rule_id, rule_name, severity, status, message, raw_line, source_id, source_name)
|
||||
VALUES (?, ?, ?, 'open', ?, ?, ?, ?)"
|
||||
);
|
||||
$stmt->execute([$ruleId, $ruleName, $severity, $message, $rawLine, $sourceId, $sourceName]);
|
||||
$id = (int) $this->db->pdo()->lastInsertId();
|
||||
return $this->getAlert($id);
|
||||
}
|
||||
|
||||
public function getAlert(int $id): ?Alert
|
||||
{
|
||||
$stmt = $this->db->pdo()->prepare("SELECT * FROM alerts WHERE id = ?");
|
||||
$stmt->execute([$id]);
|
||||
$row = $stmt->fetch();
|
||||
return $row ? Alert::fromRow($row) : null;
|
||||
}
|
||||
|
||||
public function getAlerts(int $limit = 100, int $offset = 0, ?string $status = null, ?string $severity = null): array
|
||||
{
|
||||
$where = [];
|
||||
$params = [];
|
||||
|
||||
if ($status) {
|
||||
$where[] = 'status = ?';
|
||||
$params[] = $status;
|
||||
}
|
||||
if ($severity) {
|
||||
$where[] = 'severity = ?';
|
||||
$params[] = $severity;
|
||||
}
|
||||
|
||||
$sql = "SELECT * FROM alerts";
|
||||
if ($where) {
|
||||
$sql .= ' WHERE ' . implode(' AND ', $where);
|
||||
}
|
||||
$sql .= " ORDER BY created_at DESC LIMIT ? OFFSET ?";
|
||||
$params[] = $limit;
|
||||
$params[] = $offset;
|
||||
|
||||
$stmt = $this->db->pdo()->prepare($sql);
|
||||
$stmt->execute($params);
|
||||
$rows = $stmt->fetchAll();
|
||||
|
||||
return array_map(fn(array $r) => Alert::fromRow($r), $rows);
|
||||
}
|
||||
|
||||
public function updateAlertStatus(int $id, AlertStatus $status): void
|
||||
{
|
||||
$stmt = $this->db->pdo()->prepare("UPDATE alerts SET status = ? WHERE id = ?");
|
||||
$stmt->execute([$status->value, $id]);
|
||||
}
|
||||
|
||||
public function getAlertCounts(): array
|
||||
{
|
||||
return $this->db->pdo()->query(
|
||||
"SELECT status, severity, COUNT(*) as count FROM alerts GROUP BY status, severity"
|
||||
)->fetchAll();
|
||||
}
|
||||
|
||||
// --- Rate Limiting ---
|
||||
|
||||
public function checkRateLimit(int $ruleId, int $windowSeconds): bool
|
||||
{
|
||||
$now = time();
|
||||
$window = intdiv($now, $windowSeconds) * $windowSeconds;
|
||||
|
||||
$this->db->pdo()->prepare(
|
||||
"INSERT INTO rate_limiter (rule_id, window_start, count)
|
||||
VALUES (?, ?, 1)
|
||||
ON CONFLICT(rule_id, window_start) DO UPDATE SET count = count + 1"
|
||||
)->execute([$ruleId, $window]);
|
||||
|
||||
$stmt = $this->db->pdo()->prepare(
|
||||
"SELECT count FROM rate_limiter WHERE rule_id = ? AND window_start = ?"
|
||||
);
|
||||
$stmt->execute([$ruleId, $window]);
|
||||
$row = $stmt->fetch();
|
||||
|
||||
return $row['count'] <= 1;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,78 @@
|
||||
<?php
|
||||
|
||||
namespace Jakach\Logging\Worker;
|
||||
|
||||
use Jakach\Logging\Model\{LogSource, LogSourceType, Alert};
|
||||
|
||||
class FileWatcher
|
||||
{
|
||||
private array $handles = [];
|
||||
private array $inodes = [];
|
||||
private int $checkInterval;
|
||||
|
||||
public function __construct(
|
||||
private \Closure $onLine,
|
||||
int $checkInterval = 500000,
|
||||
) {
|
||||
$this->checkInterval = $checkInterval;
|
||||
}
|
||||
|
||||
public function watch(LogSource $source): void
|
||||
{
|
||||
if ($source->type !== LogSourceType::File) {
|
||||
return;
|
||||
}
|
||||
|
||||
$path = $source->address;
|
||||
|
||||
if (!file_exists($path)) {
|
||||
fprintf(STDERR, "File not found: %s\n", $path);
|
||||
return;
|
||||
}
|
||||
|
||||
$handle = fopen($path, 'r');
|
||||
if (!$handle) {
|
||||
fprintf(STDERR, "Cannot open file: %s\n", $path);
|
||||
return;
|
||||
}
|
||||
|
||||
fseek($handle, 0, SEEK_END);
|
||||
$stat = fstat($handle);
|
||||
$this->handles[$source->id] = $handle;
|
||||
$this->inodes[$source->id] = $stat['ino'] ?? 0;
|
||||
|
||||
fprintf(STDERR, "Watching file: %s (source: %s)\n", $path, $source->name);
|
||||
}
|
||||
|
||||
public function tick(): void
|
||||
{
|
||||
foreach ($this->handles as $id => $handle) {
|
||||
if (feof($handle)) {
|
||||
clearstatcache();
|
||||
$source = null;
|
||||
|
||||
if (file_exists(stream_get_meta_data($handle)['uri'])) {
|
||||
usleep(10000);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
while ($line = fgets($handle)) {
|
||||
$line = rtrim($line, "\r\n");
|
||||
if ($line !== '') {
|
||||
($this->onLine)($line, $id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
usleep($this->checkInterval);
|
||||
}
|
||||
|
||||
public function stop(): void
|
||||
{
|
||||
foreach ($this->handles as $handle) {
|
||||
fclose($handle);
|
||||
}
|
||||
$this->handles = [];
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,83 @@
|
||||
<?php
|
||||
|
||||
namespace Jakach\Logging\Worker;
|
||||
|
||||
use Jakach\Logging\Model\{LogSource, Alert};
|
||||
use Jakach\Logging\RuleEngine\Engine;
|
||||
use Jakach\Logging\Storage\Repository;
|
||||
|
||||
class Orchestrator
|
||||
{
|
||||
private FileWatcher $fileWatcher;
|
||||
private SocketListener $socketListener;
|
||||
private Repository $repo;
|
||||
private Engine $engine;
|
||||
private array $sourceMap = [];
|
||||
private bool $running = true;
|
||||
|
||||
public function __construct(Repository $repo, Engine $engine)
|
||||
{
|
||||
$this->repo = $repo;
|
||||
$this->engine = $engine;
|
||||
$this->socketListener = new SocketListener(function (string $line, int $sourceId) {
|
||||
$this->handleLine($line, $sourceId);
|
||||
});
|
||||
$this->fileWatcher = new FileWatcher(function (string $line, int $sourceId) {
|
||||
$this->handleLine($line, $sourceId);
|
||||
});
|
||||
}
|
||||
|
||||
public function run(): void
|
||||
{
|
||||
$this->loadSources();
|
||||
|
||||
pcntl_signal(SIGTERM, function () { $this->running = false; });
|
||||
pcntl_signal(SIGINT, function () { $this->running = false; });
|
||||
|
||||
fprintf(STDERR, "Worker started, watching %d sources\n", count($this->sourceMap));
|
||||
|
||||
while ($this->running) {
|
||||
pcntl_signal_dispatch();
|
||||
$this->fileWatcher->tick();
|
||||
$this->socketListener->tick();
|
||||
}
|
||||
|
||||
$this->stop();
|
||||
}
|
||||
|
||||
private function loadSources(): void
|
||||
{
|
||||
$sources = $this->repo->getActiveSources();
|
||||
|
||||
foreach ($sources as $source) {
|
||||
$this->sourceMap[$source->id] = $source;
|
||||
$this->fileWatcher->watch($source);
|
||||
$this->socketListener->listen($source);
|
||||
}
|
||||
}
|
||||
|
||||
private function handleLine(string $line, int $sourceId): void
|
||||
{
|
||||
$source = $this->sourceMap[$sourceId] ?? null;
|
||||
$alert = $this->engine->evaluate($line, $source);
|
||||
|
||||
if ($alert !== null) {
|
||||
$msg = sprintf(
|
||||
"[%s] ALERT #%d [%s] %s",
|
||||
date('c'),
|
||||
$alert->id,
|
||||
strtoupper($alert->severity->value),
|
||||
$alert->message
|
||||
);
|
||||
fprintf(STDERR, "%s\n", $msg);
|
||||
echo $msg . "\n";
|
||||
}
|
||||
}
|
||||
|
||||
private function stop(): void
|
||||
{
|
||||
$this->fileWatcher->stop();
|
||||
$this->socketListener->stop();
|
||||
fprintf(STDERR, "Worker stopped\n");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,72 @@
|
||||
<?php
|
||||
|
||||
namespace Jakach\Logging\Worker;
|
||||
|
||||
use Jakach\Logging\Model\{LogSource, LogSourceType};
|
||||
|
||||
class SocketListener
|
||||
{
|
||||
private array $servers = [];
|
||||
|
||||
public function __construct(
|
||||
private \Closure $onLine,
|
||||
) {}
|
||||
|
||||
public function listen(LogSource $source): void
|
||||
{
|
||||
if (!in_array($source->type, [LogSourceType::Tcp, LogSourceType::Udp])) {
|
||||
return;
|
||||
}
|
||||
|
||||
$parts = parse_url($source->address);
|
||||
$host = $parts['host'] ?? '0.0.0.0';
|
||||
$port = $parts['port'] ?? 9514;
|
||||
|
||||
$protocol = $source->type === LogSourceType::Udp ? SOL_UDP : SOL_TCP;
|
||||
$sockType = $source->type === LogSourceType::Udp ? SOCK_DGRAM : SOCK_STREAM;
|
||||
|
||||
$sock = socket_create(AF_INET, $sockType, $protocol);
|
||||
if (!$sock) {
|
||||
fprintf(STDERR, "Cannot create socket for %s: %s\n", $source->name, socket_strerror(socket_last_error()));
|
||||
return;
|
||||
}
|
||||
|
||||
socket_set_option($sock, SOL_SOCKET, SO_REUSEADDR, 1);
|
||||
|
||||
if (!socket_bind($sock, $host, $port)) {
|
||||
fprintf(STDERR, "Cannot bind %s on %s:%d: %s\n", $source->name, $host, $port, socket_strerror(socket_last_error()));
|
||||
return;
|
||||
}
|
||||
|
||||
if ($source->type === LogSourceType::Tcp) {
|
||||
socket_listen($sock, 5);
|
||||
socket_set_nonblock($sock);
|
||||
}
|
||||
|
||||
$this->servers[$source->id] = $sock;
|
||||
fprintf(STDERR, "Listening on %s://%s:%d (source: %s)\n", $source->type->value, $host, $port, $source->name);
|
||||
}
|
||||
|
||||
public function tick(): void
|
||||
{
|
||||
foreach ($this->servers as $id => $sock) {
|
||||
$from = '';
|
||||
$port = 0;
|
||||
|
||||
if (@socket_recvfrom($sock, $data, 65535, 0, $from, $port)) {
|
||||
$line = rtrim($data, "\r\n");
|
||||
if ($line !== '') {
|
||||
($this->onLine)($line, $id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public function stop(): void
|
||||
{
|
||||
foreach ($this->servers as $sock) {
|
||||
socket_close($sock);
|
||||
}
|
||||
$this->servers = [];
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user