MQTT

https://www.osys.it/mqtt/

Introduzione

Cos’è MQTT?

Il protocollo MQTT (Message Queuing Telemetry Transport) è nato nel 1999, sviluppato da IBM e Arcom (oggi parte di Eurotech). È stato pensato per fornire una soluzione di messaggistica leggera, adatta a dispositivi con risorse limitate e reti instabili o a bassa larghezza di banda.

Progettato inizialmente per soddisfare le esigenze della monitoraggio remoto degli oleodotti:

  • Comunicazioni via satellitari (costose, poca banda).
  • Necessario protocollo robusto, semplice, overhead minimo e affidabile anche su reti poco stabili.

Risultato: oggi è il protocollo principale per IoT e applicazioni M2M

Standardizzazione: Nel 2013, IBM ha donato MQTT all’OASIS (Organization for the Advancement of Structured Information Standards).

OASIS ha poi pubblicato lo standard MQTT 3.1.1 nel 2014, che è anche diventato uno standard ISO/IEC (20922) nel 2016.

Si parte direttamente dalla versione 3.1.1?

Si, la versione formalizzata da OASIS, ma esiste la versione 3.1 che è la prima rilasciata pubblicamente.

La precedente versione del protocollo era chiamata MQIsdp (MQ Telemetry Transport Sensor Network Device Protocol), usata internamente da IBM ed Eurotech

Successivamente è stato cambiato in nome di MQTT e standardizzato in 3.1.1

Quindi la versione successiva è la 4 ?

No...ma si

Il byte di versione presente alla connessione di un client MQTT:

MQTT ver. byte ver CONNECT Rilascio
3.1 0x3 IBM ed Eurotech 2010
3.1.1 0x4 OASIS 2014
5.0 0x5 OASIS 2019

Architettura di MQTT: Pub/Sub

https://www.twilio.com/en-us/blog/what-is-mqtt

Paradigma Pro Contro
Request/Response - Semplice da implementare
- Adatto per API e servizi web
- Ampio supporto
- Accoppiamento stretto client/server
- Non adatto per eventi asincroni
Publish/Subscribe - Decoupling tra publisher e subscriber
- Scalabile
- Ideale per eventi
- Nessuna garanzia che i messaggi vengano ricevuti
- Dipende da un broker
Message Queue - Affidabile e persistente
- Load balancing naturale
- Supporto a retry
- Accoppiamento moderato
- Più adatto a comunicazioni punto-punto
Event Streaming - Alta scalabilità
- Eventi persistenti
- Ottimo per analytics e replay
- Complessità di gestione
- Overhead infrastrutturale
Remote Procedure Call (RPC) - Astrazione potente
- Sembra una chiamata locale
- Buone performance con gRPC
- Può mascherare latenza e fallimenti
- Accoppiamento forte
Shared Memory - Altissima velocità
- Nessun overhead di rete
- Difficile da gestire (sincronizzazione)
- Scalabilità limitata
Tuple Space (Linda) - Massimo decoupling (spaziale e temporale)
- Concettualmente elegante
- Poco supporto moderno
- Performance non adatte a scenari real-time

Struttura di un messaggio MQTT

  • Fixed header, present in all MQTT Control Packets
  • Variable header, present in some MQTT Control Packets
  • Payload, present in some MQTT Control Packets

Dimensione massima di un pacchetto 256MB

Tipi di pacchetti MQTT

Codice (Dec) Codice (Hex) Nome Descrizione
1 0x01 CONNECT Il client richiede una connessione al server
2 0x02 CONNACK Il server risponde a una richiesta di connessione
3 0x03 PUBLISH Client o Server pubblica un messaggio
4 0x04 PUBACK ACK per messaggi QoS 1
5 0x05 PUBREC Ricevuta pubblicazione (QoS 2, fase 1)
6 0x06 PUBREL Rilascio pubblicazione (QoS 2, fase 2)
7 0x07 PUBCOMP Completamento pubblicazione (QoS 2, fase 3)
8 0x08 SUBSCRIBE Il client si iscrive a uno o più topic
9 0x09 SUBACK Risposta del server a SUBSCRIBE
10 0x0A UNSUBSCRIBE Il client cancella una o più sottoscrizioni
11 0x0B UNSUBACK Risposta del server a UNSUBSCRIBE
12 0x0C PINGREQ Ping del client al server
13 0x0D PINGRESP Risposta del server a PINGREQ
14 0x0E DISCONNECT Client (o server in ver 5.0) chiude la connessione
15 0x0F AUTH Riautentica la sessione (ver 5.0)

Topic

Rappresentano il percorso per la localizzazione del dato, separando il path con il carattere "/"

A differenza di altri protocolli, NON è necessario definirli a priori

È possibile usare qualunque carattere ma se iniziano per $ in genere sono riservati al funzionamento interno dei broker

Sono case sensitive

“/finance” is different from “finance”

Come strutturarli al meglio:

  • Usare una struttura gerarchica chiara
  • Nomi dei topic descrittivi
  • Separare i topic per area/funzionalità
  • Sfruttare la sicurezza e i permessi sui topic
  • Stato e controllo separati
  • Pianificare per scalabilità

In pubblicazione:

È necesario specificare un topic completo

In sottoscrizione:

È possibile sottoscriversi ad un topic specifico

Usare il carattere + invece che un subpath per indicare che si è interessati a qualunque subpath (Single-level wildcard)

Usare il carattere # per dire che si è interessati a qualunque subpath discendente (Multi-level wildcard)

Topic: Esempi

PUBLISH SUBSCRIBE Match
home/kitchen/temperature home/kitchen/temperature Sì ✅
home/kitchen/temperature home/+/temperature Sì ✅
home/livingroom/temperature home/+/temperature Sì ✅
home/livingroom/humidity home/+/temperature No ❌
home/livingroom/temperature home/# Sì ✅
home home/# Sì ✅
home home/+/temperature No ❌
factory/line1/machine1/status factory/+/machine1/status Sì ✅
factory/line2/machine3/status factory/+/machine1/status No ❌
office/rome/printer/status office/+/printer/# Sì ✅
office/rome/printer/status office/+/+/status Sì ✅
office/rome/printer/error/paperjam office/+/printer/# Sì ✅
office/rome/scanner/status office/+/printer/# No ❌

Topic: Esempi reali

  • _model_/_serial_/command/subcommand/v1
  • _model_/_serial_/state/subsystem/v1
  • _model_/_serial_/telemetry/v1
  • _model_/_serial_/config/v1
  • _model_/_serial_/config/set

Payload dei messaggi

Non è definito nessuno schema per il messaggi scambiati, che possono essere testo o dati binari

IIoT: Industrial IoT

Ci sono altre specifiche che definiscono sia topic che payload come Eclipse Sparkplug o Eclipse Kura

Permettono interoperabilità immediata (plug&play) tra produttori diversi.

Quality of Service (QoS)

Definisce il livello di affidabilità con cui il messaggio viene scambiato

QoS 0: At most once

QoS 1: At least once

QoS 2: Exactly once

Ha senso usarli quando si abbiato TCP come trasporto?

Si, e non solo per eventi di disconnessione...

Anche per regolare il flusso di messaggi (backpressure) quando QoS > 0

Attenzione ai sequence diagram ERRATI:

https://www.mathworks.com/help/coder/nvidia/ug/publish-and-subscribe-to-mqtt-messages.html

Attenzione ai sequence diagram ERRATI:

https://www.4next.eu/news/livelli-qos/

Il QoS viene stabilito tra il Sender ed il Receiver di volta in volta.

Attenzione ai sequence diagram CORRETTO:

https://www.hivemq.com/sb-assets/f/243938/1774x782/705e7b49d8/debunking-common-mqtt-qos-misconceptions-image-2.webp

MQTT QoS e QoS di Sottoscrizione

QoS Mess. Significato QoS Sub. Comportamento
0 (At most once) Il messaggio viene inviato una sola volta senza conferma 0 Il subscriber riceve il messaggio con QoS 0
0 (At most once) Il messaggio viene inviato una sola volta senza conferma 1 Il subscriber riceve il messaggio con QoS 0
0 (At most once) Il messaggio viene inviato una sola volta senza conferma 2 Il subscriber riceve il messaggio con QoS 0
1 (At least once) Il messaggio viene confermato almeno una volta 0 Il subscriber riceve il messaggio con QoS 0
1 (At least once) Il messaggio viene confermato almeno una volta 1 Il subscriber riceve il messaggio con QoS 1
1 (At least once) Il messaggio viene confermato almeno una volta 2 Il subscriber riceve il messaggio con QoS 1
2 (Exactly once) Il messaggio viene consegnato esattamente una volta 0 Il subscriber riceve il messaggio con QoS 0
2 (Exactly once) Il messaggio viene consegnato esattamente una volta 1 Il subscriber riceve il messaggio con QoS 1
2 (Exactly once) Il messaggio viene consegnato esattamente una volta 2 Il subscriber riceve il messaggio con QoS 2

Sessioni in MQTT

Esistono le code in MQTT?

Il flag Clean Session / Clean Start in accoppiata al QoS > 0 sia in pubblicazione che in sottoscrizione

Da usare quando:

  • ci sono dei dati accumulabili per elaborazioni successive
  • la mancanza di un campione può portare ad errori (!)

Inutile quando:

  • quando il dato va consumato "fresco": accendere una luce
  • nel caso in cui la perdita di campioni non comporti problemi

Domanda, come trasmettereste i valori di consumo di energia elettrica della vostra abitazione?

Messaggi retained

Per ogni topic è possibile manterene UN messaggio da inviare a tutti i sottoscrittori anche quando la sottoscrizione viene fatta dopo la pubblicazione dello stesso: il messaggio deve avere il flag retained attivo.

In altre parole è "l'ultimo valore noto" per quel topic.

Può essere eliminato pubblicando sullo stesso topic un messaggio con flag retained vuoto.

Quando è utile?

  • Inivio di configurazioni anche quando il dispositivo è offline
  • Dare l'ultimo stato a nuovi sottoscrittori (es: stato luce, temperatura, ecc...)
  • Stato di connessione di un dispositivo (vedi LWT)

LWT (o solo Will in 5.0)

Alla connessione al Broker è possibile dire di, in caso di disconnessione non pulita, pubblicare un messaggio su un determinato topic

Può avere sia un QoS che flag Retained

Esempi pratici di utilizzo

  • Conoscere lo stato connessione di un dispositivo
  • Eseguire azioni di pulizia (es. chiudere file, disattivare segnali, liberare memoria in un altro sistema)

Keep Alive di MQTT

Chi invia il PING request?

Quando vengono inviati i ping?

Quando il broker invia l'LWT?

Ordine di ricezione dei messaggi

I messaggi con QoS > 0 sono recapitati nello stesso ordine di pubblicazione

Ma attenzione che i messaggi retained non fanno parte delle coda di invio e possono essere pubblicati prima, durante o dopo lo scodamneto dei messaggi

Per questo è importante non basarsi sull'ordine di arrivo dei messaggi in presenza di messaggi retained

Ancora più importante nel caso di Broker in cluster

Domanda: come evitare il problema visto in precedenza?

Shared Subscriptions (MQTT 5.0 e backport su VerneMQ Mqtt 3.1.1)

Permette di distributire i messaggi di un determinato topic verso diversi client

Basta sottoscriversi in questo modo: $share/sharename/topic

MQTT 3.1.1 vs MQTT 5.0

Caratteristica MQTT 3.1.1 MQTT 5.0
Formato dei pacchetti Compatto ma limitato Più estensibile grazie agli attributi opzionali
Reason Codes Assenti Inclusi per migliorare il debugging e la gestione degli errori
Proprietà dei messaggi Limitate Estese con chiavi/valori per maggiore flessibilità
Sessioni persistenti Basate su "Clean Session" Più flessibili con "Session Expiry Interval"
Flusso dei messaggi Basic Migliorato con "Flow Control"
Supporto per autenticazione Username/Password Autenticazione avanzata (es. challenge-response)
Messaggi di disconnessione Non specificano il motivo Reason Codes per diagnosi dettagliata
Topic Alias Non supportati Supportati per ridurre overhead nei pacchetti
Shared Subscriptions Non supportate Supportate per bilanciare il carico
Fine protocollo! Altri aspetti...

Autenticazione e Sicurezza

  • Accesso anonimo
  • Autenticazione tramite username/password (file, database, ecc.)
  • Autenticazione tramite PSK
  • Autenticazione tramite Certificati SSL

Sicurezza

Uso di TLS per la crittografia

Access Control List (ACL)

Occhio a non finire su SHODAN!

DEMO

Istruzioni su PAD

Realizziamo un sistema di chat, e vedremo aspetti quali:

  • clean session e qos ricevere i messaggi quando siamo offline
  • retained welcome message per gruppi o persone
  • acl per sicurezza e privacy
  • lwt stato online / offline di un utente
User: Topic: Canale pubblico: chat/public/NOME_CANALE/welcome sub chat/public/NOME_CANALE/+/messages pub chat/public/NOME_CANALE/IO/messages Welcome message utente: pub+sub welcome/IO Stato utente online/offline (retained + lwt): pub onlinestatus/IO Chat privata: pub chat/private/ALTRO/IO/messages sub chat/private/IO/+/messages
Esempio comandi con mosquitto:
							mosquitto_sub -d -h mqtt.osys.it -u domenico -i domenico_sub -c --will-payload 'offline' --will-topic 'onlinestatus/domenico' --will-retain  -t 'onlinestatus/+' -t 'welcome/+' -t 'chat/private/domenico/+/messages'  -F '%t %I %j' -q 1

							mosquitto_pub -d -h mqtt.osys.it -u cristina -i cristina_pub -t 'chat/private/domenico/cristina/messages' -m 'ciao!'
							
							mosquitto_sub -d -h mqtt.osys.it -u cristina -i cristina_sub -c --will-payload 'offline' --will-topic 'onlinestatus/cristina' --will-retain  -t 'onlinestatus/+' -t 'welcome/+' -t 'chat/private/cristina/+/messages'  -F '%t %I %j' -q 1
						

ACL in file di testo o database (o WebHook) con VerneMQ

SSL offloading con HAProxy

Possibilità di usare certificati client

Uso di Proxy Protocol

Clustering di VerneMQ: Alta affidabilità o Alto throughput

Soggetto al CAP Theorem

Split-brain: Si ferma tutto! (sub e pub)

Bridge di Broker con VerneMQ

Esempio di bridge con mqtt.eclipseprojects.io


							- DOCKER_VERNEMQ_PLUGINS__VMQ_BRIDGE=on
							- DOCKER_VERNEMQ_VMQ_BRIDGE__TCP__SBR0=mqtt.eclipseprojects.io:1883
							- DOCKER_VERNEMQ_VMQ_BRIDGE__TCP__SBR0__CLEANSESSION=on
							- DOCKER_VERNEMQ_VMQ_BRIDGE__TCP__SBR0__CLIENT_ID=auto
							- DOCKER_VERNEMQ_VMQ_BRIDGE__TCP__SBR0__KEEPALIVE_INTERVAL=60
							- DOCKER_VERNEMQ_VMQ_BRIDGE__TCP__SBR0__TOPIC__1=hacklabcormano/* both 0
							- DOCKER_VERNEMQ_VMQ_BRIDGE__TCP__SBR0__TRY_PRIVATE=on
						

Gestire riconnessioni massive dal campo

A seguito di manutenzioni / riavvii / crash

Uso HAProxy con un rate limit sulle nuove sessioni

Attenzione al parametro del S.O. net.core.somaxconn...