+4
-2
@@ -85,8 +85,10 @@ class Router
|
|||||||
|
|
||||||
$this->respond(200, $result);
|
$this->respond(200, $result);
|
||||||
|
|
||||||
} catch (\RuntimeException $e) {
|
} catch (\Throwable $e) {
|
||||||
$this->respond($e->getCode() ?: 500, ['error' => $e->getMessage()]);
|
$code = is_int($e->getCode()) && $e->getCode() >= 100 && $e->getCode() < 600
|
||||||
|
? $e->getCode() : 500;
|
||||||
|
$this->respond($code, ['error' => $e->getMessage()]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -7,6 +7,8 @@ use Jakach\Logging\Model\{LogSource, LogSourceType};
|
|||||||
class SocketListener
|
class SocketListener
|
||||||
{
|
{
|
||||||
private array $servers = [];
|
private array $servers = [];
|
||||||
|
private array $tcpClients = [];
|
||||||
|
private int $maxClients = 64;
|
||||||
|
|
||||||
public function __construct(
|
public function __construct(
|
||||||
private \Closure $onLine,
|
private \Closure $onLine,
|
||||||
@@ -22,50 +24,128 @@ class SocketListener
|
|||||||
$host = $parts['host'] ?? '0.0.0.0';
|
$host = $parts['host'] ?? '0.0.0.0';
|
||||||
$port = $parts['port'] ?? 9514;
|
$port = $parts['port'] ?? 9514;
|
||||||
|
|
||||||
$protocol = $source->type === LogSourceType::Udp ? SOL_UDP : SOL_TCP;
|
if ($source->type === LogSourceType::Udp) {
|
||||||
$sockType = $source->type === LogSourceType::Udp ? SOCK_DGRAM : SOCK_STREAM;
|
$this->listenUdp($source->id, $host, $port);
|
||||||
|
} else {
|
||||||
|
$this->listenTcp($source->id, $host, $port);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
$sock = socket_create(AF_INET, $sockType, $protocol);
|
private function listenUdp(int $id, string $host, int $port): void
|
||||||
|
{
|
||||||
|
$sock = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);
|
||||||
if (!$sock) {
|
if (!$sock) {
|
||||||
fprintf(STDERR, "Cannot create socket for %s: %s\n", $source->name, socket_strerror(socket_last_error()));
|
fprintf(STDERR, "Cannot create UDP socket for source #%d: %s\n", $id, socket_strerror(socket_last_error()));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
socket_set_option($sock, SOL_SOCKET, SO_REUSEADDR, 1);
|
socket_set_option($sock, SOL_SOCKET, SO_REUSEADDR, 1);
|
||||||
|
|
||||||
if (!socket_bind($sock, $host, $port)) {
|
if (!socket_bind($sock, $host, $port)) {
|
||||||
fprintf(STDERR, "Cannot bind %s on %s:%d: %s\n", $source->name, $host, $port, socket_strerror(socket_last_error()));
|
fprintf(STDERR, "Cannot bind UDP on %s:%d: %s\n", $host, $port, socket_strerror(socket_last_error()));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
socket_set_nonblock($sock);
|
||||||
|
$this->servers[$id] = $sock;
|
||||||
|
fprintf(STDERR, "Listening on UDP %s:%d (source #%d)\n", $host, $port, $id);
|
||||||
|
}
|
||||||
|
|
||||||
if ($source->type === LogSourceType::Tcp) {
|
private function listenTcp(int $id, string $host, int $port): void
|
||||||
|
{
|
||||||
|
$sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
|
||||||
|
if (!$sock) {
|
||||||
|
fprintf(STDERR, "Cannot create TCP socket for source #%d: %s\n", $id, socket_strerror(socket_last_error()));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
socket_set_option($sock, SOL_SOCKET, SO_REUSEADDR, 1);
|
||||||
|
if (!socket_bind($sock, $host, $port)) {
|
||||||
|
fprintf(STDERR, "Cannot bind TCP on %s:%d: %s\n", $host, $port, socket_strerror(socket_last_error()));
|
||||||
|
return;
|
||||||
|
}
|
||||||
socket_listen($sock, 5);
|
socket_listen($sock, 5);
|
||||||
socket_set_nonblock($sock);
|
socket_set_nonblock($sock);
|
||||||
}
|
$this->servers[$id] = $sock;
|
||||||
|
$this->tcpClients[$id] = [];
|
||||||
$this->servers[$source->id] = $sock;
|
fprintf(STDERR, "Listening on TCP %s:%d (source #%d)\n", $host, $port, $id);
|
||||||
fprintf(STDERR, "Listening on %s://%s:%d (source: %s)\n", $source->type->value, $host, $port, $source->name);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public function tick(): void
|
public function tick(): void
|
||||||
{
|
{
|
||||||
foreach ($this->servers as $id => $sock) {
|
foreach ($this->servers as $id => $sock) {
|
||||||
$from = '';
|
$protocol = get_resource_type($sock) === 'Socket' ? socket_get_option($sock, SOL_SOCKET, SO_TYPE) : -1;
|
||||||
$port = 0;
|
$this->processServer($id, $sock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (@socket_recvfrom($sock, $data, 65535, 0, $from, $port)) {
|
private function processServer(int $id, \Socket $sock): void
|
||||||
$line = rtrim($data, "\r\n");
|
{
|
||||||
|
if (!isset($this->tcpClients[$id])) {
|
||||||
|
$this->readUdp($id, $sock);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (count($this->tcpClients[$id]) < $this->maxClients) {
|
||||||
|
$client = @socket_accept($sock);
|
||||||
|
if ($client === false) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
socket_set_nonblock($client);
|
||||||
|
$this->tcpClients[$id][] = $client;
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach ($this->tcpClients[$id] as $i => $client) {
|
||||||
|
if (!is_resource($client) && !$client instanceof \Socket) {
|
||||||
|
unset($this->tcpClients[$id][$i]);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
$data = @socket_read($client, 65535, PHP_BINARY_READ);
|
||||||
|
if ($data === false) {
|
||||||
|
$err = socket_last_error($client);
|
||||||
|
if ($err !== SOCKET_EWOULDBLOCK && $err !== SOCKET_EAGAIN) {
|
||||||
|
@socket_close($client);
|
||||||
|
unset($this->tcpClients[$id][$i]);
|
||||||
|
}
|
||||||
|
} elseif ($data === '') {
|
||||||
|
@socket_close($client);
|
||||||
|
unset($this->tcpClients[$id][$i]);
|
||||||
|
} else {
|
||||||
|
$lines = explode("\n", $data);
|
||||||
|
foreach ($lines as $line) {
|
||||||
|
$line = rtrim($line, "\r\n");
|
||||||
if ($line !== '') {
|
if ($line !== '') {
|
||||||
($this->onLine)($line, $id);
|
($this->onLine)($line, $id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
$this->tcpClients[$id] = array_values($this->tcpClients[$id]);
|
||||||
|
}
|
||||||
|
|
||||||
|
private function readUdp(int $id, \Socket $sock): void
|
||||||
|
{
|
||||||
|
$from = '';
|
||||||
|
$port = 0;
|
||||||
|
$data = @socket_recvfrom($sock, $buf, 65535, 0, $from, $port);
|
||||||
|
if ($data === false) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
$lines = explode("\n", rtrim($buf, "\r\n"));
|
||||||
|
foreach ($lines as $line) {
|
||||||
|
$line = rtrim($line, "\r\n");
|
||||||
|
if ($line !== '') {
|
||||||
|
($this->onLine)($line, $id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public function stop(): void
|
public function stop(): void
|
||||||
{
|
{
|
||||||
|
foreach ($this->tcpClients as $id => $clients) {
|
||||||
|
foreach ($clients as $client) {
|
||||||
|
@socket_close($client);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
$this->tcpClients = [];
|
||||||
foreach ($this->servers as $sock) {
|
foreach ($this->servers as $sock) {
|
||||||
socket_close($sock);
|
@socket_close($sock);
|
||||||
}
|
}
|
||||||
$this->servers = [];
|
$this->servers = [];
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user