Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 65830eb

Browse filesBrowse files
authored
Migrates Gateway code to MQTT example (GoogleCloudPlatform#1977)
* Migrates Gateway code to MQTT example * Refactors attach device and updates tests
1 parent 2fea7b6 commit 65830eb
Copy full SHA for 65830eb

File tree

Expand file treeCollapse file tree

4 files changed

+382
-29
lines changed
Filter options
Expand file treeCollapse file tree

4 files changed

+382
-29
lines changed

‎iot/api-client/manager/requirements.txt

Copy file name to clipboardExpand all lines: iot/api-client/manager/requirements.txt
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
cryptography==2.4.2
2-
flaky==3.4.0
2+
flaky==3.5.3
33
gcp-devrel-py-tools==0.0.15
44
google-api-python-client==1.7.5
55
google-auth-httplib2==0.0.3

‎iot/api-client/mqtt_example/cloudiot_mqtt_example.py

Copy file name to clipboardExpand all lines: iot/api-client/mqtt_example/cloudiot_mqtt_example.py
+238-27Lines changed: 238 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
# [START iot_mqtt_includes]
2525
import argparse
2626
import datetime
27+
import logging
2728
import os
2829
import random
2930
import ssl
@@ -33,6 +34,8 @@
3334
import paho.mqtt.client as mqtt
3435
# [END iot_mqtt_includes]
3536

37+
logging.getLogger('googleapiclient.discovery_cache').setLevel(logging.CRITICAL)
38+
3639
# The initial backoff time after a disconnection occurs, in seconds.
3740
minimum_backoff_time = 1
3841

@@ -169,37 +172,193 @@ def get_client(
169172
# [END iot_mqtt_config]
170173

171174

175+
def detach_device(client, device_id):
176+
"""Detach the device from the gateway."""
177+
# [START detach_device]
178+
detach_topic = '/devices/{}/detach'.format(device_id)
179+
print('Detaching: {}'.format(detach_topic))
180+
client.publish(detach_topic, '{}', qos=1)
181+
# [END detach_device]
182+
183+
184+
def attach_device(client, device_id, auth):
185+
"""Attach the device to the gateway."""
186+
# [START attach_device]
187+
attach_topic = '/devices/{}/attach'.format(device_id)
188+
attach_payload = '{{"authorization" : "{}"}}'.format(auth)
189+
client.publish(attach_topic, attach_payload, qos=1)
190+
# [END attach_device]
191+
192+
193+
def listen_for_messages(
194+
service_account_json, project_id, cloud_region, registry_id, device_id,
195+
gateway_id, num_messages, private_key_file, algorithm, ca_certs,
196+
mqtt_bridge_hostname, mqtt_bridge_port, jwt_expires_minutes, duration,
197+
cb=None):
198+
"""Listens for messages sent to the gateway and bound devices."""
199+
# [START listen_for_messages]
200+
global minimum_backoff_time
201+
202+
jwt_iat = datetime.datetime.utcnow()
203+
jwt_exp_mins = jwt_expires_minutes
204+
# Use gateway to connect to server
205+
client = get_client(
206+
project_id, cloud_region, registry_id, gateway_id,
207+
private_key_file, algorithm, ca_certs, mqtt_bridge_hostname,
208+
mqtt_bridge_port)
209+
210+
attach_device(client, device_id, '')
211+
print('Waiting for device to attach.')
212+
time.sleep(5)
213+
214+
# The topic devices receive configuration updates on.
215+
device_config_topic = '/devices/{}/config'.format(device_id)
216+
client.subscribe(device_config_topic, qos=1)
217+
218+
# The topic gateways receive configuration updates on.
219+
gateway_config_topic = '/devices/{}/config'.format(gateway_id)
220+
client.subscribe(gateway_config_topic, qos=1)
221+
222+
# The topic gateways receive error updates on. QoS must be 0.
223+
error_topic = '/devices/{}/errors'.format(gateway_id)
224+
client.subscribe(error_topic, qos=0)
225+
226+
# Wait for about a minute for config messages.
227+
for i in range(1, duration):
228+
client.loop()
229+
if cb is not None:
230+
cb(client)
231+
232+
if should_backoff:
233+
# If backoff time is too large, give up.
234+
if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
235+
print('Exceeded maximum backoff time. Giving up.')
236+
break
237+
238+
delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
239+
time.sleep(delay)
240+
minimum_backoff_time *= 2
241+
client.connect(mqtt_bridge_hostname, mqtt_bridge_port)
242+
243+
seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
244+
if seconds_since_issue > 60 * jwt_exp_mins:
245+
print('Refreshing token after {}s').format(seconds_since_issue)
246+
jwt_iat = datetime.datetime.utcnow()
247+
client = get_client(
248+
project_id, cloud_region, registry_id, gateway_id,
249+
private_key_file, algorithm, ca_certs, mqtt_bridge_hostname,
250+
mqtt_bridge_port)
251+
252+
time.sleep(1)
253+
254+
detach_device(client, device_id)
255+
256+
print('Finished.')
257+
# [END listen_for_messages]
258+
259+
260+
def send_data_from_bound_device(
261+
service_account_json, project_id, cloud_region, registry_id, device_id,
262+
gateway_id, num_messages, private_key_file, algorithm, ca_certs,
263+
mqtt_bridge_hostname, mqtt_bridge_port, jwt_expires_minutes, payload):
264+
"""Sends data from a gateway on behalf of a device that is bound to it."""
265+
# [START send_data_from_bound_device]
266+
global minimum_backoff_time
267+
268+
# Publish device events and gateway state.
269+
device_topic = '/devices/{}/{}'.format(device_id, 'state')
270+
gateway_topic = '/devices/{}/{}'.format(gateway_id, 'state')
271+
272+
jwt_iat = datetime.datetime.utcnow()
273+
jwt_exp_mins = jwt_expires_minutes
274+
# Use gateway to connect to server
275+
client = get_client(
276+
project_id, cloud_region, registry_id, gateway_id,
277+
private_key_file, algorithm, ca_certs, mqtt_bridge_hostname,
278+
mqtt_bridge_port)
279+
280+
attach_device(client, device_id, '')
281+
print('Waiting for device to attach.')
282+
time.sleep(5)
283+
284+
# Publish state to gateway topic
285+
gateway_state = 'Starting gateway at: {}'.format(time.time())
286+
print(gateway_state)
287+
client.publish(gateway_topic, gateway_state, qos=1)
288+
289+
# Publish num_messages mesages to the MQTT bridge
290+
for i in range(1, num_messages + 1):
291+
client.loop()
292+
293+
if should_backoff:
294+
# If backoff time is too large, give up.
295+
if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
296+
print('Exceeded maximum backoff time. Giving up.')
297+
break
298+
299+
delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
300+
time.sleep(delay)
301+
minimum_backoff_time *= 2
302+
client.connect(mqtt_bridge_hostname, mqtt_bridge_port)
303+
304+
payload = '{}/{}-{}-payload-{}'.format(
305+
registry_id, gateway_id, device_id, i)
306+
307+
print('Publishing message {}/{}: \'{}\' to {}'.format(
308+
i, num_messages, payload, device_topic))
309+
client.publish(
310+
device_topic, '{} : {}'.format(device_id, payload), qos=1)
311+
312+
seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
313+
if seconds_since_issue > 60 * jwt_exp_mins:
314+
print('Refreshing token after {}s').format(seconds_since_issue)
315+
jwt_iat = datetime.datetime.utcnow()
316+
client = get_client(
317+
project_id, cloud_region, registry_id, gateway_id,
318+
private_key_file, algorithm, ca_certs, mqtt_bridge_hostname,
319+
mqtt_bridge_port)
320+
321+
time.sleep(5)
322+
323+
detach_device(client, device_id)
324+
325+
print('Finished.')
326+
# [END send_data_from_bound_device]
327+
328+
172329
def parse_command_line_args():
173330
"""Parse command line arguments."""
174331
parser = argparse.ArgumentParser(description=(
175332
'Example Google Cloud IoT Core MQTT device connection code.'))
176-
parser.add_argument(
177-
'--project_id',
178-
default=os.environ.get('GOOGLE_CLOUD_PROJECT'),
179-
help='GCP cloud project name')
180-
parser.add_argument(
181-
'--registry_id', required=True, help='Cloud IoT Core registry id')
182-
parser.add_argument(
183-
'--device_id', required=True, help='Cloud IoT Core device id')
184-
parser.add_argument(
185-
'--private_key_file',
186-
required=True, help='Path to private key file.')
187333
parser.add_argument(
188334
'--algorithm',
189335
choices=('RS256', 'ES256'),
190336
required=True,
191337
help='Which encryption algorithm to use to generate the JWT.')
192-
parser.add_argument(
193-
'--cloud_region', default='us-central1', help='GCP cloud region')
194338
parser.add_argument(
195339
'--ca_certs',
196340
default='roots.pem',
197341
help=('CA root from https://pki.google.com/roots.pem'))
198342
parser.add_argument(
199-
'--num_messages',
343+
'--cloud_region', default='us-central1', help='GCP cloud region')
344+
parser.add_argument(
345+
'--data',
346+
default='Hello there',
347+
help='The telemetry data sent on behalf of a device')
348+
parser.add_argument(
349+
'--device_id', required=True, help='Cloud IoT Core device id')
350+
parser.add_argument(
351+
'--gateway_id', required=False, help='Gateway identifier.')
352+
parser.add_argument(
353+
'--jwt_expires_minutes',
354+
default=20,
200355
type=int,
201-
default=100,
202-
help='Number of messages to publish.')
356+
help=('Expiration time, in minutes, for JWT tokens.'))
357+
parser.add_argument(
358+
'--listen_dur',
359+
default=60,
360+
type=int,
361+
help='Duration (seconds) to listen for configuration messages')
203362
parser.add_argument(
204363
'--message_type',
205364
choices=('event', 'state'),
@@ -217,19 +376,48 @@ def parse_command_line_args():
217376
type=int,
218377
help='MQTT bridge port.')
219378
parser.add_argument(
220-
'--jwt_expires_minutes',
221-
default=20,
379+
'--num_messages',
222380
type=int,
223-
help=('Expiration time, in minutes, for JWT tokens.'))
381+
default=100,
382+
help='Number of messages to publish.')
383+
parser.add_argument(
384+
'--private_key_file',
385+
required=True,
386+
help='Path to private key file.')
387+
parser.add_argument(
388+
'--project_id',
389+
default=os.environ.get('GOOGLE_CLOUD_PROJECT'),
390+
help='GCP cloud project name')
391+
parser.add_argument(
392+
'--registry_id', required=True, help='Cloud IoT Core registry id')
393+
parser.add_argument(
394+
'--service_account_json',
395+
default=os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"),
396+
help='Path to service account json file.')
397+
398+
# Command subparser
399+
command = parser.add_subparsers(dest='command')
400+
401+
command.add_parser(
402+
'device_demo',
403+
help=mqtt_device_demo.__doc__)
404+
405+
command.add_parser(
406+
'gateway_send',
407+
help=send_data_from_bound_device.__doc__)
408+
409+
command.add_parser(
410+
'gateway_listen',
411+
help=listen_for_messages.__doc__)
224412

225413
return parser.parse_args()
226414

227415

228-
# [START iot_mqtt_run]
229-
def main():
416+
def mqtt_device_demo(args):
417+
"""Connects a device, sends data, and receives data."""
418+
# [START iot_mqtt_run]
230419
global minimum_backoff_time
231-
232-
args = parse_command_line_args()
420+
global MAXIMUM_BACKOFF_TIME
233421

234422
# Publish to the events or state topic based on the flag.
235423
sub_topic = 'events' if args.message_type == 'event' else 'state'
@@ -239,9 +427,9 @@ def main():
239427
jwt_iat = datetime.datetime.utcnow()
240428
jwt_exp_mins = args.jwt_expires_minutes
241429
client = get_client(
242-
args.project_id, args.cloud_region, args.registry_id, args.device_id,
243-
args.private_key_file, args.algorithm, args.ca_certs,
244-
args.mqtt_bridge_hostname, args.mqtt_bridge_port)
430+
args.project_id, args.cloud_region, args.registry_id,
431+
args.device_id, args.private_key_file, args.algorithm,
432+
args.ca_certs, args.mqtt_bridge_hostname, args.mqtt_bridge_port)
245433

246434
# Publish num_messages mesages to the MQTT bridge once per second.
247435
for i in range(1, args.num_messages + 1):
@@ -284,9 +472,32 @@ def main():
284472

285473
# Send events every second. State should not be updated as often
286474
time.sleep(1 if args.message_type == 'event' else 5)
475+
# [END iot_mqtt_run]
476+
287477

478+
def main():
479+
args = parse_command_line_args()
480+
481+
if args.command == 'gateway_listen':
482+
listen_for_messages(
483+
args.service_account_json, args.project_id,
484+
args.cloud_region, args.registry_id, args.device_id,
485+
args.gateway_id, args.num_messages, args.private_key_file,
486+
args.algorithm, args.ca_certs, args.mqtt_bridge_hostname,
487+
args.mqtt_bridge_port, args.jwt_expires_minutes,
488+
args.listen_dur)
489+
return
490+
elif args.command == 'gateway_send':
491+
send_data_from_bound_device(
492+
args.service_account_json, args.project_id,
493+
args.cloud_region, args.registry_id, args.device_id,
494+
args.gateway_id, args.num_messages, args.private_key_file,
495+
args.algorithm, args.ca_certs, args.mqtt_bridge_hostname,
496+
args.mqtt_bridge_port, args.jwt_expires_minutes, args.data)
497+
return
498+
else:
499+
mqtt_device_demo(args)
288500
print('Finished.')
289-
# [END iot_mqtt_run]
290501

291502

292503
if __name__ == '__main__':

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.