File modules/mqtt/Mqtt.class.php

Last commit: Thu Dec 26 19:48:18 2024 +0100	Jan Dankert	New: Import the module 'phariable' via a new dependency script. The Variable Resolver now has its own repository.
<?php
namespace mqtt;

/**
 * Minimal MQTT 3.1.1 client.
 *
 * Implements just enough of the protocol to connect, publish a retained
 * value with QoS 1 and fetch a single (typically retained) value through
 * a subscription. All methods work synchronously on one blocking socket.
 *
 * Typical usage:
 *   (new Mqtt())->open('mqtt://broker')->connect('user','pass')->publish('a/b','1');
 *
 * @author Jan Dankert
 */
class Mqtt {

    // Control packet types (upper 4 bits of the fixed header).
    const TYPE_CONNECT     = 1;
    const TYPE_CONNACK     = 2;
    const TYPE_PUBLISH     = 3;
    const TYPE_PUBACK      = 4;
    const TYPE_SUBSCRIBE   = 8;
    const TYPE_SUBACK      = 9;
    const TYPE_DISCONNECT  = 14;

    // pack()/unpack() formats: unsigned char and unsigned 16 bit big endian.
    const FORMAT_1_BYTE = 'C';
    const FORMAT_2_BYTE = 'n';

    // CONNACK return codes.
    const CONNECT_ACCEPTED                 = 0;
    const CONNECT_WRONG_PROTOCOL_VERSION   = 1;
    const CONNECT_IDENTIFIER_REJECTED      = 2;
    const CONNECT_SERVER_UNAVAILABLE       = 3;
    const CONNECT_BAD_USERNAME_OR_PASSWORD = 4;
    const CONNECT_NOT_AUTHORIZED           = 5;

    /**
     * Stream resource of the open socket, or null if not connected.
     */
    protected $connection;

    /**
     * Optional logger, called with one string argument.
     *
     * @var Callable
     */
    protected $log;

    protected $clientId = "MQTTWEBCLIENT";


    /**
     * Sets a callable which receives debug messages.
     *
     * @param $log Callable
     * @return $this
     */
    public function setLog( $log )
    {
        $this->log = $log;
        return $this;
    }


    /**
     * Opens the TCP (mqtt://) or TLS (mqtts://) connection to the broker.
     *
     * If the URL has no port, 8883 is used for mqtts and 1883 otherwise.
     *
     * @param $url string broker URL
     * @return $this
     * @throws \Exception if the URL has no host or the socket cannot be opened
     */
    public function open( $url='mqtt://localhost' ) {

        $urlParts = parse_url( $url );

        // parse_url() returns false for seriously malformed URLs.
        if ( !is_array($urlParts) || empty($urlParts['host']) )
            throw new \Exception('MQTT-Host must be present');

        // URL schemes are case-insensitive.
        $secure = strtolower( isset($urlParts['scheme']) ? $urlParts['scheme'] : '' ) === 'mqtts';
        $proto  = $secure ? 'ssl://' : 'tcp://';
        $port   = !empty($urlParts['port']) ? $urlParts['port'] : ( $secure ? 8883 : 1883 );

        // fsockopen() emits a warning on failure; the failure is reported via the exception below.
        $this->connection = @fsockopen($proto . $urlParts['host'], $port, $errno, $errstr, 5);

        if ( !is_resource($this->connection) )
            // No connection to the host possible.
            throw new \Exception("Connection refused: '" . $proto . $urlParts['host'] . ':' . $port . " - $errstr ($errno)" );

        return $this;
    }


    /**
     * Sets the client identifier which is sent in the CONNECT packet.
     *
     * @param $clientId string
     * @return $this
     */
    public function setClientId( $clientId )
    {
        $this->clientId = $clientId;
        return $this;
    }


    /**
     * Connect to the server.
     *
     * Sends a CONNECT packet (clean session, keep alive of 10 seconds) and
     * waits for the CONNACK. The password is only transmitted if a username
     * is given, too.
     *
     * @param $username
     * @param $password
     * @return $this
     * @throws \Exception if the server does not accept the connection
     */
    public function connect( $username='',$password='' ) {

        $protocolName  = 'MQTT';
        $protocolLevel = 4;  // protocol level 4 is MQTT 3.1.1
        $keepAlive     = 10; // seconds

        $connectFlags = 0b00000010; // with new session
        $payload      = $this->wrapWithLength($this->clientId);

        if ( $username ) {
            $connectFlags |= 0b10000000;
            $payload      .= $this->wrapWithLength($username);

            if ( $password ) {
                $connectFlags |= 0b01000000;
                $payload      .= $this->wrapWithLength($password);
            }
        }

        $variableHeader =
            $this->wrapWithLength($protocolName).
            pack(self::FORMAT_1_BYTE,$protocolLevel).
            pack(self::FORMAT_1_BYTE,$connectFlags ).
            pack(self::FORMAT_2_BYTE,$keepAlive    );

        $this->sendCommand( self::TYPE_CONNECT,0,$variableHeader,$payload);

        list( $commandType, ,$response ) = $this->readPacketFromServer();

        if ( $commandType != self::TYPE_CONNACK )
            throw new \Exception('Server did not respond with CONNACK after CONNECT but with: '.$commandType);

        // CONNACK carries exactly 2 bytes: acknowledge flags and return code.
        if ( strlen($response) < 2 )
            throw new \Exception('Malformed CONNACK packet, length is '.strlen($response));

        $connectReturnCode = ord($response[1]);

        if ( $connectReturnCode === self::CONNECT_ACCEPTED )
            return $this;

        $reasons = [
            self::CONNECT_BAD_USERNAME_OR_PASSWORD => 'Bad username or password',
            self::CONNECT_IDENTIFIER_REJECTED      => 'Identifier rejected',
            self::CONNECT_NOT_AUTHORIZED           => 'Not authorized',
            self::CONNECT_SERVER_UNAVAILABLE       => 'Server unavailable',
            self::CONNECT_WRONG_PROTOCOL_VERSION   => 'Wrong protocol version',
        ];

        throw new \Exception( isset($reasons[$connectReturnCode])
            ? $reasons[$connectReturnCode]
            : 'CONNECT/CONNACK return code is : '.$connectReturnCode );
    }


    /**
     * Subscribes to a topic and returns the payload of the first message
     * the server delivers for it (usually the retained message).
     *
     * The call blocks until the server sends a PUBLISH packet. The topic of
     * that packet must be equal to $topic, so wildcard filters are not
     * supported. A message received with QoS 1 is acknowledged with PUBACK.
     *
     * @param $topic string
     * @return string the payload of the received message
     * @throws \Exception on protocol errors or unexpected packets
     */
    public function subscribe( $topic ) {

        $packetId = 1;
        $qos      = 0b01; // at least once.

        $variableHeader = pack(self::FORMAT_2_BYTE,$packetId);
        $payload        = $this->wrapWithLength($topic).pack(self::FORMAT_1_BYTE,$qos );

        // SUBSCRIBE requires the reserved flag bits 0010.
        $this->sendCommand( self::TYPE_SUBSCRIBE,2,$variableHeader,$payload);

        list( $commandType, ,$response ) = $this->readPacketFromServer();

        if ( $commandType != self::TYPE_SUBACK )
            throw new \Exception('Server did not respond with SUBACK after SUBSCRIBE but with: '.$commandType);

        // SUBACK: 2 bytes packet identifier, then one return code per topic filter.
        if ( strlen($response) < 3 )
            throw new \Exception('Malformed SUBACK packet, length is '.strlen($response));

        $returnCode = ord($response[2]);

        // 0-2 is the granted maximum QoS, everything else (0x80) is a failure.
        if ( $returnCode > 2 )
            throw new \Exception('Returncode of SUBACK is not 0-2, but: '.$returnCode);

        // get a retained message (hopefully)
        list( $commandType,$flags,$response ) = $this->readPacketFromServer();

        if ( $commandType != self::TYPE_PUBLISH )
            throw new \Exception('Server did not sent a PUBLISH packet after SUBSCRIBE, but: '.$commandType);

        if ( strlen($response) < 2 )
            throw new \Exception('Malformed PUBLISH packet, topic length is missing');

        $lengthTopic = unpack(self::FORMAT_2_BYTE,substr($response,0,2))[1];

        if ( $this->log ) $this->log("Length of topic is ".$lengthTopic);

        $topicFromResponse = (string) substr($response,2,$lengthTopic);

        if ( (string) $topic !== $topicFromResponse )
            throw new \Exception('Topic from Server does not match');

        $message = (string) substr($response,2+$lengthTopic);

        // Bits 1 and 2 of the flags are the QoS level. With QoS 1 or 2 a
        // packet identifier precedes the payload.
        $receivedQos = ( $flags >> 1 ) & 0b11;

        if ( $receivedQos > 0 ) {

            if ( strlen($message) < 2 )
                throw new \Exception('Malformed PUBLISH packet, packet identifier is missing');

            $receivedPacketId = unpack(self::FORMAT_2_BYTE,substr($message,0,2))[1];
            if ( $this->log ) $this->log("packet id ".$receivedPacketId);

            $message = (string) substr($message,2); // Truncate packet identifier from response

            // A QoS 1 message must be acknowledged, otherwise the server redelivers it.
            if ( $receivedQos === 1 )
                $this->sendCommand( self::TYPE_PUBACK,0,pack(self::FORMAT_2_BYTE,$receivedPacketId),'' );
        }

        // The payload has no length header of its own, it is simply the rest of the packet.
        return $message;
    }


    /**
     * Publishing a value to a MQTT topic.
     *
     * The value is published as retained message with QoS 1, the method
     * waits for the PUBACK of the server.
     *
     * @param $topic
     * @param $value
     * @return void
     * @throws \Exception
     */
    public function publish( $topic,$value ) {

        $packetId       = 1;
        $variableHeader = $this->wrapWithLength($topic).pack(self::FORMAT_2_BYTE,$packetId);

        // The length of the payload can be calculated by subtracting the length of the variable header
        // from the Remaining Length field that is in the Fixed Header.
        // It is valid for a PUBLISH Packet to contain a zero length payload.
        $payload      = (string) $value; // do not prepend a length
        $controlFlags = 0b0011; // at least once, retain

        $this->sendCommand( self::TYPE_PUBLISH,$controlFlags,$variableHeader,$payload );

        list( $commandType ) = $this->readPacketFromServer();

        if ( $commandType != self::TYPE_PUBACK )
            throw new \Exception('Server did not respond with PUBACK after publishing but with: '.$commandType);
    }


    /**
     * Builds a control packet and writes it completely to the socket.
     *
     * @param $commandType integer packet type (4 bit)
     * @param $controlFlag integer packet flags (4 bit)
     * @param $variableHeader string already encoded variable header
     * @param $payloadValue string already encoded payload
     * @throws \Exception if there is no connection or the packet cannot be written
     */
    protected function sendCommand($commandType, $controlFlag, $variableHeader, $payloadValue ) {

        // A closed socket is not a resource any more.
        if ( !is_resource($this->connection) )
            throw new \Exception("There is no open connection");

        // Control Header is the 1-byte header which contains
        // - Command type (4 bit)
        // - Control flag (4 bit)
        $controlHeader = pack(self::FORMAT_1_BYTE,(($commandType & 0b1111) << 4) | ($controlFlag & 0b1111));

        $payload         = (string) $payloadValue;
        $remainingLength = $this->encodeMessageLength(strlen( $variableHeader ) + strlen( $payload ));

        // Control Header : 1 byte
        // Packet Length  : 1 to 4 bytes (using 7 bits, 8th bit is continuation bit)
        // Variable Header: 0..n bytes
        // Payload        : 0..n bytes
        $packet = $controlHeader . $remainingLength . $variableHeader . $payload;
        if ( $this->log ) $this->log( "MQTT Sending packet\n" . self::hexDump($packet) );

        // Writing to a network stream may stop before the whole string is
        // written, so continue until everything is sent.
        $packetLength = strlen($packet);
        $writtenBytes = 0;

        while ( $writtenBytes < $packetLength ) {

            $count = fwrite($this->connection, substr($packet,$writtenBytes) );
            if ( $count === false || $count === 0 )
                throw new \Exception('Could not write to MQTT tcp socket' );

            $writtenBytes += $count;
        }

        if ( $this->log ) $this->log( "MQTT Sent bytes: " . $writtenBytes );
    }


    /**
     * Reads the next control packet from the socket.
     *
     * @return array [ command type (int), control flags (int), data after the fixed header (string, may be empty) ]
     * @throws \Exception if the connection is lost or the packet is incomplete
     */
    protected function readPacketFromServer() {

        if (!is_resource($this->connection))
            throw new \Exception('Connection lost during transfer' );

        if (feof($this->connection))
            throw new \Exception('Unexpected EOF while reading MQTT response');

        // read the response
        $responseControlHeader = $this->readBytes( 1,'Could not read control header from response' );

        if ( $this->log ) $this->log( "MQTT got response control header: ".$responseControlHeader.' ('.gettype($responseControlHeader).')'."\n".self::hexDump($responseControlHeader) );

        $headerByte           = ord($responseControlHeader);
        $responseCommandType  = $headerByte >> 4;
        $responseControlFlags = $headerByte & 0b00001111; // get 4 bits from right
        if ( $this->log ) $this->log( "MQTT Getting response control Header : " . bin2hex($responseControlHeader).' => command type: '.$responseCommandType.', control flags: '.str_pad(decbin($responseControlFlags),4,'0',STR_PAD_LEFT) );

        $responseRemainingLength = $this->readRemainingLengthFromSocket();
        if ( $this->log ) $this->log( "MQTT Response length : " . $responseRemainingLength );

        // A remaining length of 0 is valid, then there is nothing more to read.
        $response = $this->readBytes( $responseRemainingLength,'Could not read response data from socket' );

        if ( $this->log ) $this->log( "MQTT Getting response packet\n" . self::hexDump($response) );

        return [ $responseCommandType,$responseControlFlags,$response ];
    }


    /**
     * Sends a DISCONNECT packet and closes the socket.
     *
     * The socket is closed even if the DISCONNECT packet cannot be sent.
     *
     * @throws \Exception if there is no open connection or sending fails
     */
    public function disconnect() {

        try {
            $this->sendCommand( self::TYPE_DISCONNECT,0,'','' );
        }
        finally {
            if ( is_resource($this->connection) )
                fclose( $this->connection );

            $this->connection = null;
        }
    }


    /**
     * Prepend a value with a 2-byte length header.
     *
     * @param $value
     * @return string
     * @throws \Exception if the value is longer than 65535 bytes
     */
    protected function wrapWithLength( $value ) {

        $value  = (string) $value;
        $length = strlen($value);

        // A longer value would silently overflow the 16 bit length field.
        if ( $length > 0xFFFF )
            throw new \Exception('Value is too long for a 2-byte length header: '.$length.' bytes');

        return pack(self::FORMAT_2_BYTE,$length).$value;
    }


    /**
     * Encodes the length of a message.
     *
     * The length is written with 7 bits per byte, least significant group
     * first. The 8th bit marks that another byte follows.
     *
     * @param int $length 0 to 268435455
     * @return string 1 to 4 bytes
     * @throws \Exception if the length cannot be encoded in 4 bytes
     */
    protected function encodeMessageLength(int $length): string
    {
        // 4 bytes with 7 bits each: 2^28 - 1
        if ( $length < 0 || $length > 268435455 )
            throw new \Exception('Message length is out of range: '.$length);

        $result = '';

        do {
            $digit  = $length % 128;
            $length = $length >> 7; // 7 bits are used with the 8th bit being a continuation bit.

            // if there are more digits to encode, set the top bit of this digit
            if ($length > 0) {
                $digit |= 0x80;
            }

            $result .= chr($digit);
        } while ($length > 0);

        return $result;
    }


    /**
     * Reads and decodes the 'remaining length' field of the fixed header.
     *
     * @return int
     * @throws \Exception if the field cannot be read or is longer than 4 bytes
     */
    protected function readRemainingLengthFromSocket()
    {
        $remainingLength = 0;
        $multiplier      = 1;
        $byteCount       = 0;

        do {
            // The field has at most 4 bytes, do not follow an endless chain of continuation bits.
            if ( ++$byteCount > 4 )
                throw new \Exception('Malformed remaining length: more than 4 bytes');

            // we can take seven bits to calculate the length and the remaining eighth bit
            // as continuation bit.
            $digit = ord( $this->readBytes( 1,'Cannot read the remaining length from the socket.' ) );

            $remainingLength += ( $digit & 127 ) * $multiplier;
            $multiplier      *= 128;
        } while ( ($digit & 128) !== 0 );

        return $remainingLength;
    }


    /**
     * Reads exactly $length bytes from the socket.
     *
     * A single fread() on a network stream may return fewer bytes than
     * requested, so the read is repeated until the data is complete.
     *
     * @param int $length number of bytes, 0 returns an empty string
     * @param string $errorMessage message of the exception if the data cannot be read
     * @return string
     * @throws \Exception if the stream ends or times out before all bytes are read
     */
    private function readBytes(int $length, string $errorMessage): string
    {
        $data = '';

        while ( ($missing = $length - strlen($data)) > 0 ) {

            $chunk = fread( $this->connection,$missing );

            if ( $chunk === false || $chunk === '' )
                throw new \Exception($errorMessage);

            $data .= $chunk;
        }

        return $data;
    }


    /**
     * Creates a human readable dump of binary data for the log.
     *
     * For every 16 bytes a line with the printable characters is followed
     * by a line with the hexadecimal values.
     *
     * @param $data string binary data
     * @param $newline string line separator
     * @return string empty string for empty data
     */
    protected static function hexDump( $data, $newline="\n")
    {
        $width = 16;  # number of bytes per line
        $pad   = '.'; # padding for non-visible characters

        $data = (string) $data;

        if ( $data === '' )
            return '';

        // Everything outside of printable ASCII is shown as padding character.
        $printable = preg_replace('/[^\x20-\x7E]/',$pad,$data);

        $hex   = str_split(bin2hex($data), $width*2);
        $chars = str_split($printable, $width);

        $output = '';

        foreach ($hex as $i=>$line)
            $output .=
                implode(' ',array_pad(str_split($chars[$i]),$width,' ') ) . ' ['.str_pad($chars[$i],$width).']' . $newline .
                implode(' ',array_pad(str_split($line    ,2),$width,' ') ) . $newline;

        return $output;
    }


    /**
     * Passes a message to the logger, if one is set.
     *
     * @param $log string message
     */
    protected function log( $log )
    {
        if ( $this->log )
            call_user_func($this->log,$log );
    }

}
Download modules/mqtt/Mqtt.class.php
History Thu, 26 Dec 2024 19:48:18 +0100 Jan Dankert New: Import the module 'phariable' via a new dependency script. The Variable Resolver now has its own repository.