Created
August 21, 2024 09:22
-
-
Save contrid/1229a4665235874a16a4986a5e235119 to your computer and use it in GitHub Desktop.
Kafka Streaming with PHP Socket Connection
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
require 'vendor/autoload.php'; | |
use RdKafka\Producer; | |
use RdKafka\Conf; | |
use Ratchet\Server\IoServer; | |
use Ratchet\Http\HttpServer; | |
use Ratchet\WebSocket\WsServer; | |
use Ratchet\WebSocket\MessageComponentInterface; | |
// Kafka configuration | |
$conf = new Conf(); | |
$conf->set('bootstrap.servers', 'localhost:9092'); | |
$producer = new Producer($conf); | |
// WebSocket server | |
class WebSocketServer implements MessageComponentInterface | |
{ | |
protected $clients; | |
public function __construct() | |
{ | |
$this->clients = new SplObjectHash(); | |
} | |
public function onOpen(ConnectionInterface $conn) | |
{ | |
$this->clients->attach($conn); | |
} | |
public function onMessage(ConnectionInterface $conn, $msg) | |
{ | |
// Handle incoming messages from clients (if needed) | |
} | |
public function onClose(ConnectionInterface $conn) | |
{ | |
$this->clients->detach($conn); | |
} | |
public function onError(ConnectionInterface $conn, \Exception $e) | |
{ | |
$conn->close(); | |
} | |
public function broadcast($data) | |
{ | |
foreach ($this->clients as $client) { | |
if ($client->send($data) === false) { | |
$client->close(); | |
} | |
} | |
} | |
} | |
// Start the WebSocket server | |
$server = IoServer::factory( | |
new HttpServer( | |
new WsServer( | |
new WebSocketServer() | |
) | |
), | |
8080 | |
); | |
$server->run(); | |
// Kafka producer (example) | |
$producer->produce(RD_KAFKA_PARTITION_UA, 0, json_encode(['message' => 'Hello from Kafka']), 'my-topic'); | |
$producer->flush(10000); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment