diff --git a/src/Api/Router.php b/src/Api/Router.php index 7f77481..2fc6d4f 100644 --- a/src/Api/Router.php +++ b/src/Api/Router.php @@ -85,8 +85,10 @@ class Router $this->respond(200, $result); - } catch (\RuntimeException $e) { - $this->respond($e->getCode() ?: 500, ['error' => $e->getMessage()]); + } catch (\Throwable $e) { + $code = is_int($e->getCode()) && $e->getCode() >= 100 && $e->getCode() < 600 + ? $e->getCode() : 500; + $this->respond($code, ['error' => $e->getMessage()]); } } diff --git a/src/Worker/SocketListener.php b/src/Worker/SocketListener.php index fb91399..299493b 100644 --- a/src/Worker/SocketListener.php +++ b/src/Worker/SocketListener.php @@ -7,6 +7,8 @@ use Jakach\Logging\Model\{LogSource, LogSourceType}; class SocketListener { private array $servers = []; + private array $tcpClients = []; + private int $maxClients = 64; public function __construct( private \Closure $onLine, @@ -22,50 +24,128 @@ class SocketListener $host = $parts['host'] ?? '0.0.0.0'; $port = $parts['port'] ?? 9514; - $protocol = $source->type === LogSourceType::Udp ? SOL_UDP : SOL_TCP; - $sockType = $source->type === LogSourceType::Udp ? SOCK_DGRAM : SOCK_STREAM; + if ($source->type === LogSourceType::Udp) { + $this->listenUdp($source->id, $host, $port); + } else { + $this->listenTcp($source->id, $host, $port); + } + } - $sock = socket_create(AF_INET, $sockType, $protocol); + private function listenUdp(int $id, string $host, int $port): void + { + $sock = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP); if (!$sock) { - fprintf(STDERR, "Cannot create socket for %s: %s\n", $source->name, socket_strerror(socket_last_error())); + fprintf(STDERR, "Cannot create UDP socket for source #%d: %s\n", $id, socket_strerror(socket_last_error())); return; } - socket_set_option($sock, SOL_SOCKET, SO_REUSEADDR, 1); - if (!socket_bind($sock, $host, $port)) { - fprintf(STDERR, "Cannot bind %s on %s:%d: %s\n", $source->name, $host, $port, socket_strerror(socket_last_error())); + fprintf(STDERR, "Cannot bind UDP on %s:%d: %s\n", $host, $port, socket_strerror(socket_last_error())); return; } + socket_set_nonblock($sock); + $this->servers[$id] = $sock; + fprintf(STDERR, "Listening on UDP %s:%d (source #%d)\n", $host, $port, $id); + } - if ($source->type === LogSourceType::Tcp) { - socket_listen($sock, 5); - socket_set_nonblock($sock); + private function listenTcp(int $id, string $host, int $port): void + { + $sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); + if (!$sock) { + fprintf(STDERR, "Cannot create TCP socket for source #%d: %s\n", $id, socket_strerror(socket_last_error())); + return; } - - $this->servers[$source->id] = $sock; - fprintf(STDERR, "Listening on %s://%s:%d (source: %s)\n", $source->type->value, $host, $port, $source->name); + socket_set_option($sock, SOL_SOCKET, SO_REUSEADDR, 1); + if (!socket_bind($sock, $host, $port)) { + fprintf(STDERR, "Cannot bind TCP on %s:%d: %s\n", $host, $port, socket_strerror(socket_last_error())); + return; + } + socket_listen($sock, 5); + socket_set_nonblock($sock); + $this->servers[$id] = $sock; + $this->tcpClients[$id] = []; + fprintf(STDERR, "Listening on TCP %s:%d (source #%d)\n", $host, $port, $id); } public function tick(): void { foreach ($this->servers as $id => $sock) { - $from = ''; - $port = 0; + $protocol = get_resource_type($sock) === 'Socket' ? socket_get_option($sock, SOL_SOCKET, SO_TYPE) : -1; + $this->processServer($id, $sock); + } + } - if (@socket_recvfrom($sock, $data, 65535, 0, $from, $port)) { - $line = rtrim($data, "\r\n"); - if ($line !== '') { - ($this->onLine)($line, $id); + private function processServer(int $id, \Socket $sock): void + { + if (!isset($this->tcpClients[$id])) { + $this->readUdp($id, $sock); + return; + } + + while (count($this->tcpClients[$id]) < $this->maxClients) { + $client = @socket_accept($sock); + if ($client === false) { + break; + } + socket_set_nonblock($client); + $this->tcpClients[$id][] = $client; + } + + foreach ($this->tcpClients[$id] as $i => $client) { + if (!is_resource($client) && !$client instanceof \Socket) { + unset($this->tcpClients[$id][$i]); + continue; + } + $data = @socket_read($client, 65535, PHP_BINARY_READ); + if ($data === false) { + $err = socket_last_error($client); + if ($err !== SOCKET_EWOULDBLOCK && $err !== SOCKET_EAGAIN) { + @socket_close($client); + unset($this->tcpClients[$id][$i]); } + } elseif ($data === '') { + @socket_close($client); + unset($this->tcpClients[$id][$i]); + } else { + $lines = explode("\n", $data); + foreach ($lines as $line) { + $line = rtrim($line, "\r\n"); + if ($line !== '') { + ($this->onLine)($line, $id); + } + } + } + } + $this->tcpClients[$id] = array_values($this->tcpClients[$id]); + } + + private function readUdp(int $id, \Socket $sock): void + { + $from = ''; + $port = 0; + $data = @socket_recvfrom($sock, $buf, 65535, 0, $from, $port); + if ($data === false) { + return; + } + $lines = explode("\n", rtrim($buf, "\r\n")); + foreach ($lines as $line) { + $line = rtrim($line, "\r\n"); + if ($line !== '') { + ($this->onLine)($line, $id); } } } public function stop(): void { + foreach ($this->tcpClients as $id => $clients) { + foreach ($clients as $client) { + @socket_close($client); + } + } + $this->tcpClients = []; foreach ($this->servers as $sock) { - socket_close($sock); + @socket_close($sock); } $this->servers = []; }