|
| 1 | +<?php |
| 2 | + |
| 3 | +namespace Gos\Component\WebSocketClient\Wamp; |
| 4 | + |
| 5 | +use Gos\Component\WebSocketClient\Exception\BadResponseException; |
| 6 | +use Gos\Component\WebSocketClient\Exception\WebsocketException; |
| 7 | + |
| 8 | +/** |
| 9 | + * WS Client |
| 10 | + * |
| 11 | + * @author Martin Bažík <[email protected]> |
| 12 | + * @author Johann Saunier <[email protected]> |
| 13 | + */ |
| 14 | +class Client |
| 15 | +{ |
| 16 | + /** @var string */ |
| 17 | + protected $endpoint; |
| 18 | + |
| 19 | + /** @var string */ |
| 20 | + protected $serverHost; |
| 21 | + |
| 22 | + /** @var int */ |
| 23 | + protected $serverPort; |
| 24 | + |
| 25 | + /** @var resource */ |
| 26 | + protected $socket; |
| 27 | + |
| 28 | + /** @var bool */ |
| 29 | + protected $connected; |
| 30 | + |
| 31 | + /** @var string */ |
| 32 | + protected $sessionId; |
| 33 | + |
| 34 | + /** |
| 35 | + * @param string $endpoint |
| 36 | + */ |
| 37 | + public function __construct($endpoint) |
| 38 | + { |
| 39 | + $this->endpoint = $endpoint; |
| 40 | + $this->parseUrl(); |
| 41 | + $this->connected = false; |
| 42 | + $this->serverPort = 80; |
| 43 | + } |
| 44 | + |
| 45 | + /** |
| 46 | + * @param string $target |
| 47 | + * |
| 48 | + * @return string |
| 49 | + * @throws BadResponseException |
| 50 | + * @throws WebsocketException |
| 51 | + */ |
| 52 | + public function connect($target = '/websocket/') |
| 53 | + { |
| 54 | + if ($this->connected) { |
| 55 | + return $this->sessionId; |
| 56 | + } |
| 57 | + |
| 58 | + $this->socket = stream_socket_client($this->serverHost . ':' . $this->serverPort, $errno, $errstr); |
| 59 | + |
| 60 | + if (!$this->socket) { |
| 61 | + throw new BadResponseException('Could not open socket. Reason: ' . $errstr); |
| 62 | + } |
| 63 | + |
| 64 | + $response = $this->upgradeProtocol($target); |
| 65 | + |
| 66 | + $this->verifyResponse($response); |
| 67 | + |
| 68 | + $payload = json_decode($this->read()); |
| 69 | + |
| 70 | + if ($payload[0] != Protocol::MSG_WELCOME) { |
| 71 | + throw new BadResponseException('WAMP Server did not send welcome message.'); |
| 72 | + } |
| 73 | + |
| 74 | + $this->sessionId = $payload[1]; |
| 75 | + |
| 76 | + return $this->sessionId; |
| 77 | + } |
| 78 | + |
| 79 | + /** |
| 80 | + * @param string $target |
| 81 | + * |
| 82 | + * @return string |
| 83 | + * |
| 84 | + * @throws WebsocketException |
| 85 | + */ |
| 86 | + protected function upgradeProtocol($target) |
| 87 | + { |
| 88 | + $key = $this->generateKey(); |
| 89 | + |
| 90 | + if (false === strpos($target, '/')) { |
| 91 | + throw new WebsocketException('Wamp Server Target is wrong.'); |
| 92 | + } |
| 93 | + |
| 94 | + $out = "GET " . $target . " HTTP/1.1\r\n"; |
| 95 | + $out .= "Host: {$this->serverHost} \r\n"; |
| 96 | + $out .= "Upgrade: WebSocket\r\n"; |
| 97 | + $out .= "Connection: Upgrade\r\n"; |
| 98 | + $out .= "Sec-WebSocket-Key: $key \r\n"; |
| 99 | + $out .= "Sec-WebSocket-Version: 13\r\n"; |
| 100 | + $out .= "Origin: *\r\n\r\n"; |
| 101 | + |
| 102 | + fwrite($this->socket, $out); |
| 103 | + |
| 104 | + return fgets($this->socket); |
| 105 | + } |
| 106 | + |
| 107 | + /** |
| 108 | + * @param $response |
| 109 | + * |
| 110 | + * @throws BadResponseException |
| 111 | + */ |
| 112 | + protected function verifyResponse($response) |
| 113 | + { |
| 114 | + if (false === $response) { |
| 115 | + throw new BadResponseException('WAMP Server did not respond properly'); |
| 116 | + } |
| 117 | + |
| 118 | + $subres = substr($response, 0, 12); |
| 119 | + |
| 120 | + if ($subres != 'HTTP/1.1 101') { |
| 121 | + throw new BadResponseException('Unexpected Response. Expected HTTP/1.1 101 got ' . $subres); |
| 122 | + } |
| 123 | + } |
| 124 | + |
| 125 | + /** |
| 126 | + * Read the buffer and return the oldest event in stack |
| 127 | + * |
| 128 | + * @see https://tools.ietf.org/html/rfc6455#section-5.2 |
| 129 | + * @return string |
| 130 | + */ |
| 131 | + protected function read() |
| 132 | + { |
| 133 | + // Ignore first byte |
| 134 | + fread($this->socket, 1); |
| 135 | + |
| 136 | + // There is also masking bit, as MSB, bit it's 0 |
| 137 | + $payloadLength = ord(fread($this->socket, 1)); |
| 138 | + |
| 139 | + switch ($payloadLength) { |
| 140 | + case 126: |
| 141 | + $payloadLength = unpack("n", fread($this->socket, 2)); |
| 142 | + $payloadLength = $payloadLength[1]; |
| 143 | + break; |
| 144 | + case 127: |
| 145 | + //$this->stdout('error', "Next 8 bytes are 64bit uint payload length, not yet implemented, since PHP can't handle 64bit longs!"); |
| 146 | + break; |
| 147 | + } |
| 148 | + |
| 149 | + return fread($this->socket, $payloadLength); |
| 150 | + } |
| 151 | + |
| 152 | + /** |
| 153 | + * Disconnect |
| 154 | + * |
| 155 | + * @return boolean |
| 156 | + */ |
| 157 | + public function disconnect() |
| 158 | + { |
| 159 | + if ($this->socket) { |
| 160 | + fclose($this->socket); |
| 161 | + |
| 162 | + return true; |
| 163 | + } |
| 164 | + |
| 165 | + return false; |
| 166 | + } |
| 167 | + |
| 168 | + /** |
| 169 | + * Send message to the websocket |
| 170 | + * |
| 171 | + * @access private |
| 172 | + * @param array $data |
| 173 | + * @return $this|Client |
| 174 | + */ |
| 175 | + protected function send($data) |
| 176 | + { |
| 177 | + $rawMessage = json_encode($data); |
| 178 | + $payload = new WebsocketPayload(); |
| 179 | + $payload |
| 180 | + ->setOpcode(WebsocketPayload::OPCODE_TEXT) |
| 181 | + ->setMask(true) |
| 182 | + ->setPayload($rawMessage); |
| 183 | + |
| 184 | + $encoded = $payload->encodePayload(); |
| 185 | + fwrite($this->socket, $encoded); |
| 186 | + |
| 187 | + return $this; |
| 188 | + } |
| 189 | + |
| 190 | + /** |
| 191 | + * Establish a prefix on server |
| 192 | + * @see http://wamp.ws/spec#prefix_message |
| 193 | + * @param string $prefix |
| 194 | + * @param string $uri |
| 195 | + */ |
| 196 | + public function prefix($prefix, $uri) |
| 197 | + { |
| 198 | + $type = Protocol::MSG_PREFIX; |
| 199 | + $data = [$type, $prefix, $uri]; |
| 200 | + $this->send($data); |
| 201 | + } |
| 202 | + |
| 203 | + /** |
| 204 | + * Call a procedure on server |
| 205 | + * @see http://wamp.ws/spec#call_message |
| 206 | + * @param string $procURI |
| 207 | + * @param mixed $arguments |
| 208 | + */ |
| 209 | + public function call($procUri, $arguments = []) |
| 210 | + { |
| 211 | + $args = func_get_args(); |
| 212 | + array_shift($args); |
| 213 | + $type = Protocol::MSG_CALL; |
| 214 | + $callId = uniqid("", $moreEntropy = true); |
| 215 | + $data = array_merge(array($type, $callId, $procUri), $args); |
| 216 | + |
| 217 | + $this->send($data); |
| 218 | + } |
| 219 | + |
| 220 | + /** |
| 221 | + * The client will send an event to all clients connected to the server who have subscribed to the topicURI |
| 222 | + * @see http://wamp.ws/spec#publish_message |
| 223 | + * @param string $topicUri |
| 224 | + * @param string $payload |
| 225 | + * @param string $exclude |
| 226 | + * @param string $eligible |
| 227 | + */ |
| 228 | + public function publish($topicUri, $payload, $exclude = [], $eligible = []) |
| 229 | + { |
| 230 | + $type = Protocol::MSG_PUBLISH; |
| 231 | + $data = array($type, $topicUri, $payload, $exclude, $eligible); |
| 232 | + $this->send($data); |
| 233 | + } |
| 234 | + |
| 235 | + /** |
| 236 | + * Subscribers receive PubSub events published by subscribers via the EVENT message. The EVENT message contains the topicURI, the topic under which the event was published, and event, the PubSub event payload. |
| 237 | + * @param string $topicUri |
| 238 | + * @param string $payload |
| 239 | + */ |
| 240 | + public function event($topicUri, $payload) |
| 241 | + { |
| 242 | + $type = Protocol::MSG_EVENT; |
| 243 | + $data = array($type, $topicUri, $payload); |
| 244 | + $this->send($data); |
| 245 | + } |
| 246 | + |
| 247 | + /** |
| 248 | + * @param int $length |
| 249 | + * |
| 250 | + * @return string |
| 251 | + */ |
| 252 | + protected function generateKey($length = 16) |
| 253 | + { |
| 254 | + $c = 0; |
| 255 | + $tmp = ''; |
| 256 | + |
| 257 | + while ($c++ * 16 < $length) { |
| 258 | + $tmp .= md5(mt_rand(), true); |
| 259 | + } |
| 260 | + |
| 261 | + return base64_encode(substr($tmp, 0, $length)); |
| 262 | + } |
| 263 | + |
| 264 | + /** |
| 265 | + * Parse the url and set server parameters |
| 266 | + * |
| 267 | + * @access private |
| 268 | + * @return bool |
| 269 | + */ |
| 270 | + protected function parseUrl() |
| 271 | + { |
| 272 | + $url = parse_url($this->endpoint); |
| 273 | + |
| 274 | + $this->serverHost = $url['host']; |
| 275 | + $this->serverPort = isset($url['port']) ? $url['port'] : null; |
| 276 | + |
| 277 | + if (array_key_exists('scheme', $url) && $url['scheme'] == 'https') { |
| 278 | + $this->serverHost = 'ssl://' . $this->serverHost; |
| 279 | + if (!$this->serverPort) { |
| 280 | + $this->serverPort = 443; |
| 281 | + } |
| 282 | + } |
| 283 | + |
| 284 | + return true; |
| 285 | + } |
| 286 | +} |
0 commit comments