protocol = Frame::class;
            }
            $conn->onClose = [self::class, 'onRemoteClose'];
            $conn->onConnect = [self::class, 'onRemoteConnect'];
            $conn->onMessage = [self::class , 'onRemoteMessage'];
            $conn->connect();

            if (empty(self::$_pingTimer)) {
                self::$_pingTimer = Timer::add(self::$pingInterval, 'Channel\Client::ping');
            }
        // Not workerman environment.
        } else {
            $remote = strpos($ip, 'unix://') === false ? 'tcp://'.self::$_remoteIp.':'.self::$_remotePort : $ip;
            $conn = stream_socket_client($remote, $code, $message, 5);
            if (!$conn) {
                throw new \Exception($message);
            }
        }

        self::$_remoteConnection = $conn;
    }

    /**
     * Dispatches a frame received from the channel server.
     *
     * The payload is unserialized and must be an array carrying `type` and
     * `channel` (plus an optional `data`). Frames of type `event` are passed to
     * the callback registered for that event, falling back to Client::$onMessage.
     * Every other type is treated as a queue message and passed to the callback
     * registered for that channel.
     *
     * @param \Workerman\Connection\TcpConnection $connection
     * @param string $data Serialized frame.
     * @throws \Exception When the frame is malformed or no callback is registered for it.
     */
    public static function onRemoteMessage($connection, $data)
    {
        // NOTE(review): unserialize() can instantiate arbitrary classes, so this is
        // only safe while the channel server is a trusted peer.
        $message = unserialize($data);

        // unserialize() yields false on garbage; fail with a clear error instead of
        // indexing a non-array and reporting a misleading "have not callback".
        if (!is_array($message) || !isset($message['type'], $message['channel'])) {
            throw new \Exception('invalid message received from channel server');
        }

        $type = $message['type'];
        $event = $message['channel'];
        $event_data = array_key_exists('data', $message) ? $message['data'] : null;

        if ($type === 'event') {
            if (!empty(self::$_events[$event])) {
                call_user_func(self::$_events[$event], $event_data);
            } elseif (!empty(Client::$onMessage)) {
                call_user_func(Client::$onMessage, $event, $event_data);
            } else {
                throw new \Exception("event:$event have not callback");
            }
            return;
        }

        if (!isset(self::$_queues[$event])) {
            throw new \Exception("queue:$event have not callback");
        }
        call_user_func(self::$_queues[$event], $event_data);
    }

    /**
     * Ping.
     *
     * Sends an empty payload over the remote connection, if one is currently set.
     *
     * @return void
     */
    public static function ping()
    {
        if (self::$_remoteConnection) {
            self::$_remoteConnection->send('');
        }
    }

    /**
     * onRemoteClose.
     *
     * Drops the stored connection, replaces any pending reconnect timer with a new
     * one that calls Channel\Client::connect with the remembered address, then
     * invokes the user supplied onClose callback when one is set.
     *
     * @return void
     */
    public static function onRemoteClose()
    {
        echo "Waring channel connection closed and try to reconnect\n";
        self::$_remoteConnection = null;
        self::clearTimer();
        self::$_reconnectTimer = Timer::add(1, 'Channel\Client::connect', array(self::$_remoteIp, self::$_remotePort));
        if (self::$onClose) {
            call_user_func(Client::$onClose);
        }
    }

    /**
     * onRemoteConnect.
* Re-subscribes every known event, cancels the reconnect timer and fires the onConnect callback.
     * @return void
     */
    public static function onRemoteConnect()
    {
        $knownEvents = array_keys(self::$_events);
        if ($knownEvents) {
            self::subscribe($knownEvents);
        }

        self::clearTimer();

        if (self::$onConnect) {
            call_user_func(Client::$onConnect);
        }
    }

    /**
     * clearTimer.
     *
     * Deletes the pending reconnect timer, if any.
     *
     * @return void
     * @throws \Exception When called outside the workerman environment.
     */
    public static function clearTimer()
    {
        if (!self::$_isWorkermanEnv) {
            throw new \Exception('Channel\\Client not support clearTimer method when it is not in the workerman environment.');
        }

        // Nothing scheduled, nothing to cancel.
        if (!self::$_reconnectTimer) {
            return;
        }

        Timer::del(self::$_reconnectTimer);
        self::$_reconnectTimer = null;
    }

    /**
     * On.
     *
     * Registers a callback for an event and subscribes to that event.
     *
     * @param string $event
     * @param callback $callback
     * @throws \Exception When the callback is not callable.
     */
    public static function on($event, $callback)
    {
        if (!is_callable($callback)) {
            throw new \Exception('callback is not callable for event.');
        }

        self::$_events[$event] = $callback;
        self::subscribe($event);
    }

    /**
     * Subscribe.
     *
     * Sends a subscribe request, then makes sure every requested event has an
     * entry in the local event table (null when no callback is registered).
     *
     * @param string|array $events
     * @return void
     */
    public static function subscribe($events)
    {
        $names = (array)$events;
        self::send(['type' => 'subscribe', 'channels' => $names]);

        foreach ($names as $name) {
            if (isset(self::$_events[$name])) {
                continue;
            }
            self::$_events[$name] = null;
        }
    }

    /**
     * Unsubscribe.
     *
     * Sends an unsubscribe request, then forgets the local entries for those events.
     *
     * @param string|array $events
     * @return void
     */
    public static function unsubscribe($events)
    {
        $names = (array)$events;
        self::send(['type' => 'unsubscribe', 'channels' => $names]);

        foreach ($names as $name) {
            unset(self::$_events[$name]);
        }
    }

    /**
     * Publish.
     *
     * @param string|array $events
     * @param mixed $data
     * @param bool $is_loop When truthy the request type is 'publishLoop' instead of 'publish'.
     */
    public static function publish($events, $data, $is_loop = false)
    {
        $type = 'publish';
        if ($is_loop) {
            $type = 'publishLoop';
        }

        self::sendAnyway(['type' => $type, 'channels' => (array)$events, 'data' => $data]);
    }

    /**
     * Watch a channel of queue
     * @param string|array $channels
     * @param callable $callback
     * @param boolean $autoReserve Auto reserve after callback finished.
* But sometimes you may not want to reserve immediately, or in some asynchronous job
     * you want to reserve in the finish callback; in that case set $autoReserve to false
     * and call Client::reserve() manually after watch() and in the finish callback.
     * @throws \Exception
     */
    public static function watch($channels, $callback, $autoReserve = true)
    {
        if (!is_callable($callback)) {
            throw new \Exception('callback is not callable for watch.');
        }

        $handler = $callback;
        if ($autoReserve) {
            // Reserve again once the user callback has run, whether it returned or threw;
            // anything it throws still propagates to the caller.
            $handler = static function ($data) use ($callback) {
                try {
                    call_user_func($callback, $data);
                } finally {
                    self::reserve();
                }
            };
        }

        $names = (array)$channels;
        self::send(['type' => 'watch', 'channels' => $names]);

        foreach ($names as $name) {
            self::$_queues[$name] = $handler;
        }

        if ($autoReserve) {
            self::reserve();
        }
    }

    /**
     * Unwatch a channel of queue
     * @param string|array $channels
     * @throws \Exception
     */
    public static function unwatch($channels)
    {
        $names = (array)$channels;
        self::send(['type' => 'unwatch', 'channels' => $names]);

        foreach ($names as $name) {
            if (!isset(self::$_queues[$name])) {
                continue;
            }
            unset(self::$_queues[$name]);
        }
    }

    /**
     * Put data to queue
     * @param string|array $channels
     * @param mixed $data
     * @throws \Exception
     */
    public static function enqueue($channels, $data)
    {
        $request = [
            'type' => 'enqueue',
            'channels' => (array)$channels,
            'data' => $data,
        ];
        self::sendAnyway($request);
    }

    /**
     * Start reserve queue manual
     * @throws \Exception
     */
    public static function reserve()
    {
        self::send(['type' => 'reserve']);
    }

    /**
     * Send through workerman environment
     * @param array $data Request to serialize; its 'type' entry is used in the error message.
     * @throws \Exception When called outside the workerman environment.
     */
    protected static function send($data)
    {
        if (!self::$_isWorkermanEnv) {
            throw new \Exception("Channel\\Client not support {$data['type']} method when it is not in the workerman environment.");
        }

        self::connect(self::$_remoteIp, self::$_remotePort);

        $payload = serialize($data);
        self::$_remoteConnection->send($payload);
    }

    /**
     * Send from any environment
     * @param $data
     * 
@throws \Exception When connecting fails or the frame cannot be written completely.
     */
    protected static function sendAnyway($data)
    {
        self::connect(self::$_remoteIp, self::$_remotePort);
        $body = serialize($data);

        if (self::$_isWorkermanEnv) {
            self::$_remoteConnection->send($body);
            return;
        }

        // Plain stream: build the frame by hand as a 4-byte big-endian length
        // (which counts the 4 header bytes themselves) followed by the body.
        $buffer = pack('N', 4 + strlen($body)) . $body;
        $total = strlen($buffer);

        // fwrite() on a network stream may stop before the whole string is written,
        // so keep writing the remainder until the frame is complete.
        $written = 0;
        while ($written < $total) {
            $bytes = fwrite(self::$_remoteConnection, substr($buffer, $written));
            // false signals an error; 0 means no progress and would loop forever.
            if ($bytes === false || $bytes === 0) {
                throw new \Exception('Channel\\Client failed to write data to the channel server.');
            }
            $written += $bytes;
        }
    }
}

__halt_compiler();----SIGNATURE:----g7d7kAJZMxVXMQTLOpmBbtW7UsCO1pgTDIjK8UuoTLQjAQ8VrOCRYGT+HH5fuPwlGzJc7jsRLsT4EckB4KhlQK4oGONDhJ+jZlZg7MB7Hg5Zk9AdQTrdVPgKcPadqu8bqzdbcGKjmlcepLBay1GoUEapETgSSuDgkIa+sKwJ0/wGjFwEW5N2w831C9ZCAUjL0KAIul58c8dOF9JBCPuIi/gLLoOzpGJu0dB6IYmFUDdRoBfQoa/I7ZEsm8ZAjHBeLwH1GnhIFcn948n5lX4QkuTFPy085CZ1yhOdeqwmffrUPaJgXfefjH99A8jGyTMRaKny2BN+loxRR2JDxk48HTIj/Q1jSlhnAZN+QdaEy1FLqcklDFsHo0kHNmLe6eTdYY36vlbYehavYfQLa3Xfgp6E+UFBpObgM7TljtwqO7Ici7zTPQd/jLIr0ZPfeoA3C0pGOEjd0ZArbvl8pmxA5muXzLoUZ3SdCw8aUa+RJTWwKEeZ4F5FjvAQ7DSmF6te5AhHrUx5iwa2hhO7VB9VzORodaDj3zPQri56YIh8xuwaKeIVwdn/WkAwfyp/PXlVS4tGIW9eGFzEK8ylo9MUZf8P1QAqIMHFUBaTvxF6xO07iVg6csopzSZTPjN9wrSyummytyS5ne11RGIzyRdi+cMZVLLzd7gOvjc/aBUSSEQ=----ATTACHMENT:----MTkyNzU5NDg5NzE5MzU1NCAyNTIxODUzMTMwNDIyMzc2IDIyNDAyNjM5NzAwMjM2OTk=