kafka
This project's code is based on https://github.com/weiboad/kafka-php
Installation
composer require easyswoole/kafka
Register kafka service
namespace EasySwoole\EasySwoole;

use App\Producer\Process as ProducerProcess;
use App\Consumer\Process as ConsumerProcess;
use EasySwoole\EasySwoole\Swoole\EventRegister;
use EasySwoole\EasySwoole\AbstractInterface\Event;
use EasySwoole\Http\Request;
use EasySwoole\Http\Response;

/**
 * Global event hooks for the application.
 *
 * This excerpt shows how the Kafka producer and consumer are attached to the
 * Swoole server as custom processes when the main server is created.
 */
class EasySwooleEvent implements Event
{
    /**
     * Sets the default timezone used by PHP date/time functions.
     */
    public static function initialize()
    {
        // TODO: Implement initialize() method.
        date_default_timezone_set('Asia/Shanghai');
    }

    /**
     * Registers the producer and consumer processes on the Swoole server.
     *
     * ServerManager is referenced unqualified, so it resolves inside the
     * current EasySwoole\EasySwoole namespace.
     *
     * @param EventRegister $register Event register passed in by the caller (unused here).
     */
    public static function mainServerCreate(EventRegister $register)
    {
        // TODO: Implement mainServerCreate() method.

        // Producer
        ServerManager::getInstance()->getSwooleServer()->addProcess((new ProducerProcess())->getProcess());

        // consumer
        ServerManager::getInstance()->getSwooleServer()->addProcess((new ConsumerProcess())->getProcess());
    }

    // ...... (the rest of the class is omitted from this excerpt)
}
Producer
namespace App\Producer;

use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\Kafka\Config\ProducerConfig;
use EasySwoole\Kafka\kafka;

/**
 * Custom process that sends one sample message to the "test" topic.
 */
class Process extends AbstractProcess
{
    /**
     * Process entry point: builds the producer configuration inside a
     * coroutine, sends a single message, and dumps the result.
     *
     * @param mixed $arg Argument passed to the process (unused here).
     */
    protected function run($arg)
    {
        go(function () {
            // Broker connection and acknowledgement settings.
            $producerConfig = new ProducerConfig();
            $producerConfig->setMetadataBrokerList('127.0.0.1:9092,127.0.0.1:9093');
            $producerConfig->setBrokerVersion('0.9.0');
            $producerConfig->setRequiredAck(1);

            // The batch handed to send(); here it holds a single message.
            $messages = [
                [
                    'topic' => 'test',
                    'value' => 'message--',
                    'key' => 'key--',
                ],
            ];

            $client = new kafka($producerConfig);
            $producer = $client->producer();
            $sendResult = $producer->send($messages);

            var_dump($sendResult);
            var_dump('ok');
        });
    }
}
Consumer
namespace App\Consumer;

use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\Kafka\Config\ConsumerConfig;
use EasySwoole\Kafka\kafka;

/**
 * Custom process that subscribes to the "test" topic and dumps every
 * message handed to its callback.
 */
class Process extends AbstractProcess
{
    /**
     * Process entry point: builds the consumer configuration inside a
     * coroutine and subscribes with a callback.
     *
     * @param mixed $arg Argument passed to the process (unused here).
     */
    protected function run($arg)
    {
        go(function () {
            $config = new ConsumerConfig();
            $config->setRefreshIntervalMs(1000);
            $config->setMetadataBrokerList('127.0.0.1:9092,127.0.0.1:9093');
            $config->setBrokerVersion('0.9.0');
            $config->setGroupId('test');
            $config->setTopics(['test']);
            $config->setOffsetReset('earliest');

            $kafka = new kafka($config);

            // Set consumption callback
            $func = function ($topic, $partition, $message) {
                var_dump($topic);
                var_dump($partition);
                var_dump($message);
            };

            $kafka->consumer()->subscribe($func);
        });
    }
}
Bonus
- A docker-compose.yml for deploying a Kafka cluster is included; use it as follows:
- Ensure that ports 2181, 9092, 9093, and 9000 are not occupied (if any of them is in use, change the port numbers in the compose file)
- From the project root directory, run: docker-compose up -d
- Visit localhost:9000 to view the kafka cluster status.
Questions
If you have questions about using Kafka or find a bug, you are welcome to ask or give feedback in the EasySwoole Kafka QQ group. Group number: 827432930
