File modules/util/Mqtt.class.php

Last commit: Mon Jun 27 00:40:42 2022 +0200	Jan Dankert	New: Marker interface 'Scriptable', Proxy class for MQTT, help() method in Scripts.
1 <?php 2 // OpenRat Content Management System 3 // Copyright (C) 2002-2012 Jan Dankert, cms@jandankert.de 4 // 5 // This program is free software; you can redistribute it and/or 6 // modify it under the terms of the GNU General Public License 7 // as published by the Free Software Foundation; either version 2 8 // of the License, or (at your option) any later version. 9 // 10 // This program is distributed in the hope that it will be useful, 11 // but WITHOUT ANY WARRANTY; without even the implied warranty of 12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 // GNU General Public License for more details. 14 // 15 // You should have received a copy of the GNU General Public License 16 // along with this program; if not, write to the Free Software 17 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 18 19 20 namespace util; 21 use logger\Logger; 22 23 /** 24 * MQTT client. 25 * 26 * @author Jan Dankert 27 */ 28 class Mqtt { 29 30 const TYPE_CONNECT = 1; 31 const TYPE_CONNACK = 2; 32 const TYPE_PUBLISH = 3; 33 const TYPE_PUBACK = 4; 34 const TYPE_SUBSCRIBE = 8; 35 const TYPE_SUBACK = 9; 36 const TYPE_DISCONNECT = 14; 37 38 const FORMAT_1_BYTE = 'C'; 39 const FORMAT_2_BYTE = 'n'; 40 41 const CONNECT_ACCEPTED = 0; 42 const CONNECT_WRONG_PROTOCOL_VERSION = 1; 43 const CONNECT_IDENTIFIER_REJECTED = 2; 44 const CONNECT_SERVER_UNAVAILABLE = 3; 45 const CONNECT_BAD_USERNAME_OR_PASSWORD = 4; 46 const CONNECT_NOT_AUTHORIZED = 5; 47 48 protected $connection; 49 50 51 public function __construct( $url ) { 52 53 $urlParts = parse_url( $url ); 54 55 $port = @$urlParts['port'] ?: (@$urlParts['scheme']=='mqtts'?8883:1883); 56 57 if ( @$urlParts['scheme'] == 'mqtts' ) 58 $proto = 'ssl://'; // SSL 59 else 60 $proto = 'tcp://'; // Default 61 62 if ( !@$urlParts['host'] ) 63 throw new \Exception('MQTT-Host must be present'); 64 65 $this->connection = @fsockopen($proto . $urlParts['host'], $port, $errno, $errstr, 5); 66 67 if (!$this->connection || !is_resource($this->connection)) 68 // Keine Verbindung zum Host moeglich. 69 throw new \Exception("Connection refused: '" . $proto . $urlParts['host'] . ':' . $port . " - $errstr ($errno)" ); 70 } 71 72 73 public function connect( $username,$password ) { 74 75 $clientID = 'CMS'; 76 $proto = 'MQTT'; 77 $protoVersion = 4; // MQTT 3.x 78 $connectFlag = 0b11000010; // Username,Password,new session 79 $timeout = 10; 80 81 $variableHeader = 82 pack(self::FORMAT_2_BYTE,strlen($proto)). 83 $proto. 84 pack(self::FORMAT_1_BYTE,$protoVersion). 85 pack(self::FORMAT_1_BYTE,$connectFlag ). 86 pack(self::FORMAT_2_BYTE,$timeout ); 87 88 $payload = array_reduce( [ $clientID,$username,$password ],function($carry,$item) { 89 return $carry.$this->wrapWithLength($item); 90 },''); 91 92 $this->sendCommand( self::TYPE_CONNECT,0,$variableHeader,$payload); 93 $r = $this->readPacketFromServer(); 94 95 list( $commandType,$flags,$response ) = $r; 96 97 if ( $commandType != self::TYPE_CONNACK ) 98 throw new \Exception('Server did not respond with CONNACK after CONNECT but with: '.$commandType); 99 100 $connectAcknowledgeFlags = ord($response[0]); 101 $connectReturnCode = ord($response[1]); 102 103 switch( $connectReturnCode ) { 104 105 case self::CONNECT_ACCEPTED: 106 return; 107 case self::CONNECT_BAD_USERNAME_OR_PASSWORD: 108 throw new \Exception('Bad username or password'); 109 case self::CONNECT_IDENTIFIER_REJECTED: 110 throw new \Exception('Identifier rejected'); 111 case self::CONNECT_NOT_AUTHORIZED: 112 throw new \Exception('Not authorized'); 113 case self::CONNECT_SERVER_UNAVAILABLE: 114 throw new \Exception('Server unavailable'); 115 case self::CONNECT_WRONG_PROTOCOL_VERSION: 116 throw new \Exception('Wrong protocol version'); 117 default: 118 throw new \Exception('CONNECT/CONNACK return code is : '.$connectReturnCode); 119 } 120 } 121 122 123 public function subscribe( $topic ) { 124 125 $packetId = 1; 126 $qos = 0b01; // at least once. 127 128 $variableHeader = pack(self::FORMAT_2_BYTE,$packetId); 129 $payload = $this->wrapWithLength($topic).pack(self::FORMAT_1_BYTE,$qos ); 130 131 $this->sendCommand( self::TYPE_SUBSCRIBE,2,$variableHeader,$payload); 132 $r = $this->readPacketFromServer(); 133 134 list( $commandType,$flags,$response ) = $r; 135 136 if ( $commandType != self::TYPE_SUBACK ) 137 throw new \Exception('Server did not respond with SUBACK after SUBSCRIBE but with: '.$commandType); 138 139 $returnCode = ord($response[2]); 140 141 switch( $returnCode ) { 142 case 0: // Success - Maximum QoS 0 143 case 1: // Success - Maximum QoS 1 144 case 2: // Maximum QoS 2 145 break; 146 default: 147 throw new \Exception('Returncode of SUBACK is not 0-2, but: '.$returnCode); 148 } 149 150 //if ( $packetId != bindec($response[0].$response[1] ) ) 151 // throw new \Exception('Packet-Id does not match: '.$packetId.' vs '.bindec($response[0].$response[1])) ; 152 153 154 155 156 157 $r = $this->readPacketFromServer(); // get a retained message (hopefully) 158 159 list( $commandType,$flags,$response ) = $r; 160 161 if ( $commandType != self::TYPE_PUBLISH ) 162 throw new \Exception('Server did not sent a PUBLISH packet after SUBSCRIBE, but: '.$commandType); 163 164 $lengthTopic = hexdec(bin2hex(substr($response,0,2))); 165 $response = substr($response,2); 166 167 Logger::debug("Length of topic is ".$lengthTopic); 168 169 $topic = substr($response,0,$lengthTopic); 170 $response = substr($response,$lengthTopic); 171 172 $packetId = hexdec(bin2hex(substr($response,0,2))); 173 $response = substr($response,2); 174 Logger::debug("packet id ".$packetId); 175 176 return $response; 177 178 $lengthPayload = hexdec(bin2hex(substr($response,0,2))); 179 Logger::debug("Length of payload is ".$lengthPayload); 180 $response = substr($response,2); 181 182 $value = substr($response,0,$lengthPayload); 183 $response = substr($response,$lengthPayload); 184 185 if ( strlen($response ) ) 186 throw new \Exception("response has more bytes than expected"); 187 188 return $value; 189 } 190 191 192 public function publish( $topic,$value ) { 193 194 $packetId = 1; 195 $variableHeader = $this->wrapWithLength($topic).pack(self::FORMAT_2_BYTE,$packetId); 196 $payload = $this->wrapWithLength($value); 197 $controlFlags = 0b0011; // at least once, retain 198 $this->sendCommand( self::TYPE_PUBLISH,$controlFlags,$variableHeader,$payload ); 199 $r = $this->readPacketFromServer(); 200 201 list( $commandType,$flags,$response ) = $r; 202 203 if ( $commandType != self::TYPE_PUBACK ) 204 throw new \Exception('Server did not respond with PUBACK after publishing but with: '.$commandType); 205 } 206 207 208 /** 209 * @param $commandType integer 210 * @param $controlFlag 211 * @param $variableHeader 212 * @param $payloads String[] 213 * @throws \Exception 214 */ 215 protected function sendCommand($commandType, $controlFlag, $variableHeader, $payloadValue ) { 216 217 $controlHeader = ($commandType << 4) + $controlFlag; 218 219 //$payload = pack(self::FORMAT_2_BYTE,strlen($payloadValue)) . $payloadValue; 220 $payload = $payloadValue; 221 222 $remainingLength = $this->encodeMessageLength(strlen( $variableHeader ) + strlen( $payload )); 223 224 $packet = pack(self::FORMAT_1_BYTE,$controlHeader) . $remainingLength . $variableHeader . $payload; 225 Logger::debug( "MQTT Sending packet\n" . Text::hexDump($packet) ); 226 $writtenBytes = fwrite($this->connection, $packet ); 227 if ( $writtenBytes === false ) 228 throw new \Exception('Could not write to MQTT tcp socket' ); 229 Logger::debug( "MQTT Sent bytes: " . $writtenBytes ); 230 } 231 232 233 protected function readPacketFromServer() { 234 235 if (!is_resource($this->connection)) 236 throw new \Exception('Connection lost during transfer' ); 237 238 if (feof($this->connection)) 239 throw new \Exception('Unexpected EOF while reading HTTP-Response'); 240 241 // read the response 242 $responseControlHeader = fread( $this->connection, 1); 243 244 if ($responseControlHeader === false || $responseControlHeader === '') 245 throw new \Exception('Could not read control header from response'); 246 247 Logger::debug( "MQTT got response control header: ".$responseControlHeader.' ('.gettype($responseControlHeader).')'."\n".Text::hexDump($responseControlHeader) ); 248 249 $responseCommandType = ( ord($responseControlHeader) >> 4 ); 250 $responseControlFlags = ( ord($responseControlHeader) & 0b00001111 ); // get 4 bits from right 251 Logger::debug( "MQTT Getting response control Header : " . bin2hex($responseControlHeader).' => command type: '.$responseCommandType.', control flags: '.decbin($responseControlFlags) ); 252 253 $responseRemainingLength = $this->readRemainingLengthFromSocket(); 254 Logger::debug( "MQTT Response length : " . $responseRemainingLength ); 255 256 $response = fread( $this->connection, $responseRemainingLength ); 257 258 if ($response === false || $response === '') 259 throw new \Exception('Could not read response data from socket'); 260 261 Logger::debug( "MQTT Getting response packet\n" . Text::hexDump($response) ); 262 263 return( [ $responseCommandType, $responseControlFlags,$response ] ); 264 } 265 266 public function disconnect() { 267 $r = $this->sendCommand( self::TYPE_DISCONNECT,0,null,null ); 268 fclose( $this->connection ); 269 } 270 271 272 /** 273 * Prepend a value with a 2-byte length header. 274 * 275 * @param $value 276 * @return string 277 */ 278 protected function wrapWithLength( $value ) { 279 280 return pack(self::FORMAT_2_BYTE,strlen($value)).$value; 281 } 282 283 284 /** 285 * Encodes the length of a message as string, so it can be transmitted 286 * over the wire. 287 * 288 * @param int $length 289 * @return string 290 */ 291 protected function encodeMessageLength(int $length): string 292 { 293 $result = ''; 294 295 do { 296 $digit = $length % 128; 297 $length = $length >> 7; 298 299 // if there are more digits to encode, set the top bit of this digit 300 if ($length > 0) { 301 $digit = ($digit | 0x80); 302 } 303 304 $result .= chr($digit); 305 } while ($length > 0); 306 307 return $result; 308 } 309 310 311 312 protected function readRemainingLengthFromSocket() 313 { 314 $byteIndex = 1; 315 $remainingLength = 0; 316 $multiplier = 1; 317 318 do { 319 // we can take seven bits to calculate the length and the remaining eighth bit 320 // as continuation bit. 321 $digit = fread( $this->connection,1 ); 322 if ( $digit === false || $digit === '' ) 323 throw new \Exception('Cannot read the remaining length from the socket.'); 324 325 $remainingLength += ( ord($digit) & 127) * $multiplier; 326 $multiplier *= 128; 327 $byteIndex++; 328 } while ((ord($digit) & 128) !== 0); 329 330 return $remainingLength; 331 } 332 }
Download modules/util/Mqtt.class.php
History Mon, 27 Jun 2022 00:40:42 +0200 Jan Dankert New: Marker interface 'Scriptable', Proxy class for MQTT, help() method in Scripts. Sun, 12 Jun 2022 05:27:30 +0200 Jan Dankert New: MQTT support in DSL.