openrat-cms

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README

Mqtt.class.php (10150B)


      1 <?php
      2 // OpenRat Content Management System
      3 // Copyright (C) 2002-2012 Jan Dankert, cms@jandankert.de
      4 //
      5 // This program is free software; you can redistribute it and/or
      6 // modify it under the terms of the GNU General Public License
      7 // as published by the Free Software Foundation; either version 2
      8 // of the License, or (at your option) any later version.
      9 //
     10 // This program is distributed in the hope that it will be useful,
     11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
     12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     13 // GNU General Public License for more details.
     14 //
     15 // You should have received a copy of the GNU General Public License
     16 // along with this program; if not, write to the Free Software
     17 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
     18 
     19 
     20 namespace util;
     21 use logger\Logger;
     22 
     23 /**
     24  * MQTT client.
     25  *
     26  * @author Jan Dankert
     27  */
     28 class Mqtt {
     29 
     30 	const TYPE_CONNECT    =  1;
     31 	const TYPE_CONNACK    =  2;
     32 	const TYPE_PUBLISH    =  3;
     33 	const TYPE_PUBACK     =  4;
     34 	const TYPE_SUBSCRIBE  =  8;
     35 	const TYPE_SUBACK     =  9;
     36 	const TYPE_DISCONNECT = 14;
     37 
     38 	const FORMAT_1_BYTE = 'C';
     39 	const FORMAT_2_BYTE = 'n';
     40 
     41 	const CONNECT_ACCEPTED = 0;
     42 	const CONNECT_WRONG_PROTOCOL_VERSION = 1;
     43 	const CONNECT_IDENTIFIER_REJECTED = 2;
     44 	const CONNECT_SERVER_UNAVAILABLE = 3;
     45 	const CONNECT_BAD_USERNAME_OR_PASSWORD = 4;
     46 	const CONNECT_NOT_AUTHORIZED = 5;
     47 
     48 	protected $connection;
     49 
     50 
     51 	public function __construct( $url ) {
     52 
     53 		$urlParts = parse_url( $url );
     54 
     55 		$port = @$urlParts['port'] ?: (@$urlParts['scheme']=='mqtts'?8883:1883);
     56 
     57 		if ( @$urlParts['scheme'] == 'mqtts' )
     58 			$proto = 'ssl://'; // SSL
     59 		else
     60 			$proto = 'tcp://'; // Default
     61 
     62 		if   ( !@$urlParts['host'] )
     63 			throw new \Exception('MQTT-Host must be present');
     64 
     65 		$this->connection = @fsockopen($proto . $urlParts['host'], $port, $errno, $errstr, 5);
     66 
     67 		if (!$this->connection || !is_resource($this->connection))
     68 			// Keine Verbindung zum Host moeglich.
     69 			throw new \Exception("Connection refused: '" . $proto . $urlParts['host'] . ':' . $port . " - $errstr ($errno)" );
     70 	}
     71 
     72 
     73 	public function connect( $username,$password ) {
     74 
     75 		$clientID     = 'CMS';
     76 		$proto        = 'MQTT';
     77 		$protoVersion = 4; // MQTT 3.x
     78 		$connectFlag  = 0b11000010; // Username,Password,new session
     79 		$timeout      = 10;
     80 
     81 		$variableHeader =
     82 			pack(self::FORMAT_2_BYTE,strlen($proto)).
     83 			$proto.
     84 			pack(self::FORMAT_1_BYTE,$protoVersion).
     85 			pack(self::FORMAT_1_BYTE,$connectFlag ).
     86 			pack(self::FORMAT_2_BYTE,$timeout     );
     87 
     88 		$payload = array_reduce( [ $clientID,$username,$password ],function($carry,$item) {
     89 			return $carry.$this->wrapWithLength($item);
     90 		},'');
     91 
     92 		$this->sendCommand( self::TYPE_CONNECT,0,$variableHeader,$payload);
     93 		$r = $this->readPacketFromServer();
     94 
     95 		list( $commandType,$flags,$response ) = $r;
     96 
     97 		if   ( $commandType != self::TYPE_CONNACK )
     98 			throw new \Exception('Server did not respond with CONNACK after CONNECT but with: '.$commandType);
     99 
    100 		$connectAcknowledgeFlags = ord($response[0]);
    101 		$connectReturnCode = ord($response[1]);
    102 
    103 		switch( $connectReturnCode ) {
    104 
    105 			case self::CONNECT_ACCEPTED:
    106 				return;
    107 			case self::CONNECT_BAD_USERNAME_OR_PASSWORD:
    108 				throw new \Exception('Bad username or password');
    109 			case self::CONNECT_IDENTIFIER_REJECTED:
    110 				throw new \Exception('Identifier rejected');
    111 			case self::CONNECT_NOT_AUTHORIZED:
    112 				throw new \Exception('Not authorized');
    113 			case self::CONNECT_SERVER_UNAVAILABLE:
    114 				throw new \Exception('Server unavailable');
    115 			case self::CONNECT_WRONG_PROTOCOL_VERSION:
    116 				throw new \Exception('Wrong protocol version');
    117 			default:
    118 				throw new \Exception('CONNECT/CONNACK return code is : '.$connectReturnCode);
    119 		}
    120 	}
    121 
    122 
    123 	public function subscribe( $topic ) {
    124 
    125 		$packetId = 1;
    126 		$qos      = 0b01; // at least once.
    127 
    128 		$variableHeader = pack(self::FORMAT_2_BYTE,$packetId);
    129 		$payload        = $this->wrapWithLength($topic).pack(self::FORMAT_1_BYTE,$qos );
    130 
    131 		$this->sendCommand( self::TYPE_SUBSCRIBE,2,$variableHeader,$payload);
    132 		$r = $this->readPacketFromServer();
    133 
    134 		list( $commandType,$flags,$response ) = $r;
    135 
    136 		if   ( $commandType != self::TYPE_SUBACK )
    137 			throw new \Exception('Server did not respond with SUBACK after SUBSCRIBE but with: '.$commandType);
    138 
    139 		$returnCode = ord($response[2]);
    140 
    141 		switch( $returnCode ) {
    142 			case 0: // Success - Maximum QoS 0
    143 			case 1: // Success - Maximum QoS 1
    144 			case 2: // Maximum QoS 2
    145 				break;
    146 			default:
    147 				throw new \Exception('Returncode of SUBACK is not 0-2, but: '.$returnCode);
    148 		}
    149 
    150 		//if   ( $packetId != bindec($response[0].$response[1] ) )
    151 		//	throw new \Exception('Packet-Id does not match: '.$packetId.' vs '.bindec($response[0].$response[1])) ;
    152 
    153 
    154 
    155 
    156 
    157 		$r = $this->readPacketFromServer(); // get a retained message (hopefully)
    158 
    159 		list( $commandType,$flags,$response ) = $r;
    160 
    161 		if   ( $commandType != self::TYPE_PUBLISH )
    162 			throw new \Exception('Server did not sent a PUBLISH packet after SUBSCRIBE, but: '.$commandType);
    163 
    164 		$lengthTopic = hexdec(bin2hex(substr($response,0,2)));
    165 		$response = substr($response,2);
    166 
    167 		Logger::debug("Length of topic is ".$lengthTopic);
    168 
    169 		$topic = substr($response,0,$lengthTopic);
    170 		$response = substr($response,$lengthTopic);
    171 
    172 		$packetId = hexdec(bin2hex(substr($response,0,2)));
    173 		$response = substr($response,2);
    174 		Logger::debug("packet id ".$packetId);
    175 
    176 		return $response;
    177 
    178 		$lengthPayload = hexdec(bin2hex(substr($response,0,2)));
    179 		Logger::debug("Length of payload is ".$lengthPayload);
    180 		$response = substr($response,2);
    181 
    182 		$value = substr($response,0,$lengthPayload);
    183 		$response = substr($response,$lengthPayload);
    184 
    185 		if   ( strlen($response ) )
    186 			throw new \Exception("response has more bytes than expected");
    187 
    188 		return $value;
    189 	}
    190 
    191 
    192 	public function publish( $topic,$value ) {
    193 
    194 		$packetId = 1;
    195 		$variableHeader = $this->wrapWithLength($topic).pack(self::FORMAT_2_BYTE,$packetId);
    196 		$payload        = $this->wrapWithLength($value);
    197 		$controlFlags   = 0b0011; // at least once, retain
    198 		$this->sendCommand( self::TYPE_PUBLISH,$controlFlags,$variableHeader,$payload );
    199 		$r = $this->readPacketFromServer();
    200 
    201 		list( $commandType,$flags,$response ) = $r;
    202 
    203 		if   ( $commandType != self::TYPE_PUBACK )
    204 			throw new \Exception('Server did not respond with PUBACK after publishing but with: '.$commandType);
    205 	}
    206 
    207 
    208 	/**
    209 	 * @param $commandType integer
    210 	 * @param $controlFlag
    211 	 * @param $variableHeader
    212 	 * @param $payloads String[]
    213 	 * @throws \Exception
    214 	 */
    215 	protected function sendCommand($commandType, $controlFlag, $variableHeader, $payloadValue ) {
    216 
    217 		$controlHeader = ($commandType << 4) + $controlFlag;
    218 
    219 		//$payload  = pack(self::FORMAT_2_BYTE,strlen($payloadValue)) . $payloadValue;
    220 		$payload  = $payloadValue;
    221 
    222 		$remainingLength = $this->encodeMessageLength(strlen( $variableHeader ) + strlen( $payload ));
    223 
    224 		$packet = pack(self::FORMAT_1_BYTE,$controlHeader) . $remainingLength . $variableHeader . $payload;
    225 		Logger::debug( "MQTT Sending packet\n"         . Text::hexDump($packet) );
    226 		$writtenBytes = fwrite($this->connection, $packet );
    227 		if   ( $writtenBytes === false )
    228 			throw new \Exception('Could not write to MQTT tcp socket' );
    229 		Logger::debug( "MQTT Sent bytes: "         . $writtenBytes );
    230 	}
    231 
    232 
    233 	protected function readPacketFromServer() {
    234 
    235 		if (!is_resource($this->connection))
    236 			throw new \Exception('Connection lost during transfer' );
    237 
    238 		if (feof($this->connection))
    239 			throw new \Exception('Unexpected EOF while reading HTTP-Response');
    240 
    241 		// read the response
    242 		$responseControlHeader   = fread( $this->connection, 1);
    243 
    244 		if ($responseControlHeader === false || $responseControlHeader === '')
    245 			throw new \Exception('Could not read control header from response');
    246 
    247 		Logger::debug( "MQTT got response control header: ".$responseControlHeader.' ('.gettype($responseControlHeader).')'."\n".Text::hexDump($responseControlHeader) );
    248 
    249 		$responseCommandType     = ( ord($responseControlHeader) >> 4 );
    250 		$responseControlFlags    = ( ord($responseControlHeader) & 0b00001111 ); // get 4 bits from right
    251 		Logger::debug( "MQTT Getting response control Header : " . bin2hex($responseControlHeader).' => command type: '.$responseCommandType.', control flags: '.decbin($responseControlFlags) );
    252 
    253 		$responseRemainingLength  = $this->readRemainingLengthFromSocket();
    254 		Logger::debug( "MQTT Response length                 : " . $responseRemainingLength );
    255 
    256 		$response = fread( $this->connection, $responseRemainingLength );
    257 
    258 		if ($response === false || $response === '')
    259 			throw new \Exception('Could not read response data from socket');
    260 
    261 		Logger::debug( "MQTT Getting response packet\n" . Text::hexDump($response) );
    262 
    263 		return( [ $responseCommandType, $responseControlFlags,$response ] );
    264 	}
    265 
    266 	public function disconnect() {
    267 		$r = $this->sendCommand( self::TYPE_DISCONNECT,0,null,null );
    268 		fclose( $this->connection );
    269 	}
    270 
    271 
    272 	/**
    273 	 * Prepend a value with a 2-byte length header.
    274 	 *
    275 	 * @param $value
    276 	 * @return string
    277 	 */
    278 	protected function wrapWithLength( $value ) {
    279 
    280 		return pack(self::FORMAT_2_BYTE,strlen($value)).$value;
    281 	}
    282 
    283 
    284 	/**
    285 	 * Encodes the length of a message as string, so it can be transmitted
    286 	 * over the wire.
    287 	 *
    288 	 * @param int $length
    289 	 * @return string
    290 	 */
    291 	protected function encodeMessageLength(int $length): string
    292 	{
    293 		$result = '';
    294 
    295 		do {
    296 			$digit  = $length % 128;
    297 			$length = $length >> 7;
    298 
    299 			// if there are more digits to encode, set the top bit of this digit
    300 			if ($length > 0) {
    301 				$digit = ($digit | 0x80);
    302 			}
    303 
    304 			$result .= chr($digit);
    305 		} while ($length > 0);
    306 
    307 		return $result;
    308 	}
    309 
    310 
    311 
    312 	protected function readRemainingLengthFromSocket()
    313 	{
    314 		$byteIndex       = 1;
    315 		$remainingLength = 0;
    316 		$multiplier      = 1;
    317 
    318 		do {
    319 			// we can take seven bits to calculate the length and the remaining eighth bit
    320 			// as continuation bit.
    321 			$digit = fread( $this->connection,1 );
    322 			if   ( $digit === false || $digit === '' )
    323 				throw new \Exception('Cannot read the remaining length from the socket.');
    324 
    325 			$remainingLength += ( ord($digit) & 127) * $multiplier;
    326 			$multiplier *= 128;
    327 			$byteIndex++;
    328 		} while ((ord($digit) & 128) !== 0);
    329 
    330 		return $remainingLength;
    331 	}
    332 }