在多个协程间共用同一个协程客户端

与同步阻塞程序不同,协程是并发处理请求的,因此同一时间可能会有很多个请求在并行处理,一旦共用客户端连接,就会导致不同协程之间发生数据错乱。

❌错误的代码

  1. $server = new Swoole\Http\Server('127.0.0.1', 9501);
  2. $server->on('Receive', function ($serv, $fd, $rid, $data) {
  3. $redis = RedisFactory::getRedis();
  4. $result = $redis->hgetall('key');
  5. $resp->end(var_export($result, true));
  6. });
  7. $server->start();
  8. class RedisFactory
  9. {
  10. private static $_redis = null;
  11. public static function getRedis()
  12. {
  13. if (null === self::$_redis) {
  14. $redis = new \Swoole\Coroutine\Redis();
  15. $redis->connect('127.0.0.1', 6379);
  16. self::$_redis = $redis;
  17. }
  18. return self::$_redis;
  19. }
  20. }

✅正确的代码

基于SplQueue实现协程客户端的连接池,可以复用协程客户端,实现长连接。

  1. $pool = new RedisPool();
  2. $server = new Swoole\Http\Server('127.0.0.1', 9501);
  3. $server->set([
  4. // 如开启异步安全重启, 需要在workerExit释放连接池资源
  5. 'reload_async' => true
  6. ]);
  7. $server->on('start', function (swoole_http_server $server) {
  8. var_dump($server->master_pid);
  9. });
  10. $server->on('workerExit', function (swoole_http_server $server) use ($pool) {
  11. $pool->destruct();
  12. });
  13. $server->on('request', function (swoole_http_request $req, swoole_http_response $resp) use ($pool) {
  14. //从连接池中获取一个Redis协程客户端
  15. $redis = $pool->get();
  16. //连接失败
  17. if ($redis === false) {
  18. $resp->end("ERROR");
  19. return;
  20. }
  21. $result = $redis->hgetall('key');
  22. $resp->end(var_export($result, true));
  23. //释放客户端,其他协程可复用此对象
  24. $pool->put($redis);
  25. });
  26. $server->start();
  27. class RedisPool
  28. {
  29. protected $available = true;
  30. protected $pool;
  31. public function __construct()
  32. {
  33. $this->pool = new SplQueue;
  34. }
  35. public function put($redis)
  36. {
  37. $this->pool->push($redis);
  38. }
  39. /**
  40. * @return bool|mixed|\Swoole\Coroutine\Redis
  41. */
  42. public function get()
  43. {
  44. //有空闲连接且连接池处于可用状态
  45. if ($this->available && count($this->pool) > 0) {
  46. return $this->pool->pop();
  47. }
  48. //无空闲连接,创建新连接
  49. $redis = new Swoole\Coroutine\Redis();
  50. $res = $redis->connect('127.0.0.1', 6379);
  51. if ($res == false) {
  52. return false;
  53. } else {
  54. return $redis;
  55. }
  56. }
  57. public function destruct()
  58. {
  59. // 连接池销毁, 置不可用状态, 防止新的客户端进入常驻连接池, 导致服务器无法平滑退出
  60. $this->available = false;
  61. while (!$this->pool->isEmpty()) {
  62. $this->pool->pop();
  63. }
  64. }
  65. }