MQTT Data Collection and B+B SmartWorx


This blog post was written by Stephen Dekker.

A look at how Business Intelligence is connecting devices and collecting data with B+B SmartWorx

MQTT (MQ Telemetry Transport) is a lightweight machine-to-machine transport protocol. Its messaging follows a publish/subscribe pattern, which helps reduce the network activity of a communicating system. MQTT defines methods that are performed on identified resources, similar to a RESTful web service: a client addresses a specific resource, or topic, and names the action to apply, and the system handles the request appropriately.

MQTT’s methods include Connect, Disconnect, Subscribe, Unsubscribe, and Publish. An MQTT broker acts as an exchange service that receives messages from devices in the network and routes them to the clients subscribed to the matching topics. Some MQTT brokers implement additional features on top of the standard MQTT functionality.
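To make the broker's role concrete, here is a minimal in-memory sketch of the subscribe, unsubscribe, and publish methods. It is only an illustration: the class name `TinyBroker`, the topic names, and the payloads are made up for this example, it matches topics exactly (no wildcards), and it involves no network at all.

```python
from collections import defaultdict


class TinyBroker:
    """Toy in-memory broker: routes each published message to the
    callbacks subscribed to that exact topic (no wildcards, no network)."""

    def __init__(self):
        self._subscribers = defaultdict(list)   # topic -> list of callbacks

    def subscribe(self, topic, callback):
        self._subscribers[topic].append(callback)

    def unsubscribe(self, topic, callback):
        if callback in self._subscribers[topic]:
            self._subscribers[topic].remove(callback)

    def publish(self, topic, payload):
        # Deliver to every current subscriber; return how many received it.
        receivers = list(self._subscribers[topic])
        for callback in receivers:
            callback(topic, payload)
        return len(receivers)


received = []
broker = TinyBroker()
handler = lambda topic, payload: received.append((topic, payload))

broker.subscribe("BB/node1/data", handler)
broker.publish("BB/node1/data", '{"tempint": 21}')   # delivered to handler
broker.publish("BB/node2/data", '{"tempint": 19}')   # no subscriber, dropped
broker.unsubscribe("BB/node1/data", handler)
broker.publish("BB/node1/data", '{"tempint": 22}')   # no longer delivered
```

Publishers and subscribers never talk to each other directly; they only share a topic name, which is what keeps the devices decoupled.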

The complete MQTT protocol specification is available at http://mqtt.org.

B+B SmartWorx, an Internet of Things mesh sensor platform, implements MQTT data transport for connecting to the system’s gateway and retrieving node data. The system supports two modes of MQTT data access: the gateway can act as a broker that serves the data itself, or as a bridge that passes the data through to a broker located on a different machine. A high-level diagram of the architecture of B+B SmartWorx is shown below:

When the gateway is serving as the MQTT broker, a client application must connect to the port the broker listens on and subscribe to the topics it needs. The standard TCP/IP port for MQTT is 1883, while TCP/IP port 8883 is registered for MQTT over SSL/TLS. With a library such as Paho-MQTT, a client application can quickly be developed to connect to the broker, subscribe to a topic, and begin receiving data from the broker. Data can then be transferred to a database for storage.

Some example Python code snippets for each of these operations are shown below:

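Using Paho-MQTT to connect and listen:

import paho.mqtt.client as mqtt

# on_connect and on_message are the callbacks defined further down.
# paho-mqtt 2.x: use mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
# to keep these callback signatures.
client = mqtt.Client()
client.on_connect = on_connect             # assign the on_connect callback
client.on_message = on_message             # assign the on_message callback

client.connect("192.168.24.150", 1883, 60)   # host, port, keepalive (seconds)

client.loop_forever()                      # block and process network traffic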

Overriding the on_connect method:

def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    client.subscribe("BB/+/data")   # Subscribe to all BB/<node>/data topics
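The `+` in `BB/+/data` is MQTT's single-level wildcard, and `#` is the multi-level one. The sketch below shows roughly how a subscription filter is compared against a topic. The function name `topic_matches` and the node names are made up for illustration, and the sketch ignores special cases such as topics that start with `$`, so treat it as a simplification rather than a full implementation of the matching rules.

```python
def topic_matches(subscription, topic):
    """Return True if `topic` matches the MQTT subscription filter.

    '+' matches exactly one level; '#' matches all remaining levels
    and is only valid as the last level of the filter.
    """
    sub_levels = subscription.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(sub_levels):
        if level == "#":
            # '#' must be the final level; it swallows everything after it.
            return i == len(sub_levels) - 1
        if i >= len(topic_levels):
            return False          # filter is longer than the topic
        if level != "+" and level != topic_levels[i]:
            return False          # literal level does not match
    return len(sub_levels) == len(topic_levels)


print(topic_matches("BB/+/data", "BB/node1/data"))     # one level for '+'
print(topic_matches("BB/+/data", "BB/node1/status"))   # last level differs
```

With this filter, one subscription covers the data topic of every node under `BB/` without the client having to know the node names in advance.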

Overriding the on_message method, parsing the JSON:

import json

def on_message(client, userdata, msg):
    obj = json.loads(msg.payload.decode("utf-8"))   # payload arrives as bytes
    tagname = msg.topic.replace('/', '.')
    obj['t'] = obj['t'].replace('T', ' ')           # split date and time with a space
    obj['t'] = obj['t'][:-1]                        # drop the trailing character of the timestamp
    if 'tempint' in obj:       # Only looking at tempint
        pass  # ... Execute arbitrary storage code (see below)
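The parsing step can also be pulled out of the callback into a plain function, which makes it easy to try without a broker. This is a sketch under assumptions: the function name `parse_reading` is made up, and the sample payload is hypothetical. Only the field names `t`, `s`, `q`, `c`, and `tempint` come from the code above, and the real gateway payload may look different.

```python
import json


def parse_reading(topic, payload):
    """Turn one MQTT message into a (tagname, record) pair, or None
    when the payload carries no 'tempint' value."""
    obj = json.loads(payload.decode("utf-8"))       # payload arrives as bytes
    if "tempint" not in obj:
        return None
    tagname = topic.replace("/", ".") + ".tempint"
    # Replace the 'T' separator with a space and drop the last character.
    obj["t"] = obj["t"].replace("T", " ")[:-1]
    return tagname, obj


# Hypothetical payload; the real gateway format may differ.
sample = b'{"t": "2016-01-01T12:00:00Z", "s": 7, "q": 192, "c": 1, "tempint": 23}'
result = parse_reading("BB/node1/data", sample)
print(result)
```

Keeping the parsing separate from the MQTT callback also means the storage code only ever sees a clean tag name and record.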

 

Writing the data to MySQL:

import mysql.connector

conn = mysql.connector.connect(user='test', password='admin', host='127.0.0.1', database='bbdata')
cursor = conn.cursor()

# Parameterized query: the connector quotes the values, which avoids SQL injection
data = ("INSERT INTO bbdatahist (TagName, Timestamp, Count, Quality, ConfigCount, Valued) "
        "VALUES (%s, %s, %s, %s, %s, %s)")
cursor.execute(data, (tagname + '.tempint', obj['t'], obj['s'], obj['q'], obj['c'], obj['tempint']))
conn.commit()
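To try the storage step without a MySQL server, the same insert can be run against an in-memory SQLite database from Python's standard library. Note the swap: this uses `sqlite3`, not MySQL, and the column types, the helper name `store_reading`, and the sample reading are assumptions made for this sketch. Only the table and column names are taken from the snippet above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")        # throwaway in-memory database
cursor = conn.cursor()
cursor.execute(
    "CREATE TABLE bbdatahist (TagName TEXT, Timestamp TEXT, Count INTEGER, "
    "Quality INTEGER, ConfigCount INTEGER, Valued REAL)"
)


def store_reading(cursor, tagname, obj):
    """Insert one reading using placeholders instead of string concatenation."""
    cursor.execute(
        "INSERT INTO bbdatahist (TagName, Timestamp, Count, Quality, ConfigCount, Valued) "
        "VALUES (?, ?, ?, ?, ?, ?)",
        (tagname + ".tempint", obj["t"], obj["s"], obj["q"], obj["c"], obj["tempint"]),
    )


# Hypothetical reading, shaped like the fields used in the snippet above.
reading = {"t": "2016-01-01 12:00:00", "s": 7, "q": 192, "c": 1, "tempint": 23}
store_reading(cursor, "BB.node1.data", reading)
conn.commit()

rows = cursor.execute("SELECT TagName, Timestamp, Valued FROM bbdatahist").fetchall()
print(rows)
```

Placeholders let the database driver handle quoting, so a topic or payload containing a quote character cannot break or alter the SQL statement.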

 

 

When the gateway serves as a bridge, it connects to a broker at a different address and port and forwards its data there. This mode is typically used to push data to cloud services such as RoboMQ, SeeControl, BlueMix, or other third-party applications. To demonstrate this form of data collection, RoboMQ was used to create an initial connection, collect data, and confirm that the data matched the data collected in broker mode. Consult the respective web application's documentation for proper connection details.
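Conceptually, a bridge just republishes what it receives locally to a remote broker. The toy sketch below shows that idea with plain Python callables. It is not how the SmartWorx gateway is implemented or configured: the `Bridge` class, the `prefix` option, and the topic names are all hypothetical, and a list stands in for the cloud broker.

```python
class Bridge:
    """Toy bridge: republishes every message it receives locally to a
    remote publish function, optionally under a topic prefix."""

    def __init__(self, remote_publish, prefix=""):
        self._remote_publish = remote_publish   # callable(topic, payload)
        self._prefix = prefix

    def on_local_message(self, topic, payload):
        # Forward the message unchanged, only the topic may gain a prefix.
        self._remote_publish(self._prefix + topic, payload)


forwarded = []                                   # stands in for the cloud broker
bridge = Bridge(lambda t, p: forwarded.append((t, p)), prefix="site1/")
bridge.on_local_message("BB/node1/data", b'{"tempint": 23}')
print(forwarded)
```

Because the payload is forwarded untouched, a client subscribed at the remote broker should see the same readings it would get from the gateway in broker mode.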
