Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit b8f73b6

Browse filesBrowse files
gguussandrewsg
authored andcommitted
MQTT Exponential backoff and manager updates (GoogleCloudPlatform#1345)
1 parent 9240951 commit b8f73b6
Copy full SHA for b8f73b6

File tree

Expand file treeCollapse file tree

4 files changed

+62
-10
lines changed
Filter options
Expand file treeCollapse file tree

4 files changed

+62
-10
lines changed

‎iot/api-client/manager/README.rst

Copy file name to clipboardExpand all lines: iot/api-client/manager/README.rst
+13-3Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ To run this sample:
7777
[--project_id PROJECT_ID] [--registry_id REGISTRY_ID]
7878
[--rsa_certificate_file RSA_CERTIFICATE_FILE]
7979
[--service_account_json SERVICE_ACCOUNT_JSON]
80-
[--version VERSION]
81-
{create-es256,create-registry,create-rsa256,create-topic,create-unauth,delete-device,delete-registry,get,get-registry,get-state,list,list-registries,patch-es256,patch-rs256,set-config}
80+
[--version VERSION] [--member MEMBER] [--role ROLE]
81+
{create-es256,create-registry,create-rsa256,create-topic,create-unauth,delete-device,delete-registry,get,get-config-versions,get-iam-permissions,get-registry,get-state,list,list-registries,patch-es256,patch-rs256,set-config,set-iam-permissions}
8282
...
8383
8484
Example of using the Google Cloud IoT Core device manager to administer
@@ -95,7 +95,7 @@ To run this sample:
9595
list
9696
9797
positional arguments:
98-
{create-es256,create-registry,create-rsa256,create-topic,create-unauth,delete-device,delete-registry,get,get-registry,get-state,list,list-registries,patch-es256,patch-rs256,set-config}
98+
{create-es256,create-registry,create-rsa256,create-topic,create-unauth,delete-device,delete-registry,get,get-config-versions,get-iam-permissions,get-registry,get-state,list,list-registries,patch-es256,patch-rs256,set-config,set-iam-permissions}
9999
create-es256 Create a new device with the given id, using ES256 for
100100
authentication.
101101
create-registry Gets or creates a device registry.
@@ -107,6 +107,11 @@ To run this sample:
107107
delete-device Delete the device with the given id.
108108
delete-registry Deletes the specified registry.
109109
get Retrieve the device with the given id.
110+
get-config-versions
111+
Lists versions of a device config in descending order
112+
(newest first).
113+
get-iam-permissions
114+
Retrieves IAM permissions for the given registry.
110115
get-registry Retrieves a device registry.
111116
get-state Retrieve a device's state blobs.
112117
list List all devices in the registry.
@@ -117,6 +122,9 @@ To run this sample:
117122
device.
118123
set-config Patch the device to add an RSA256 public key to the
119124
device.
125+
set-iam-permissions
126+
Sets IAM permissions for the given registry to a
127+
single role/member.
120128
121129
optional arguments:
122130
-h, --help show this help message and exit
@@ -139,6 +147,8 @@ To run this sample:
139147
--service_account_json SERVICE_ACCOUNT_JSON
140148
Path to service account json file.
141149
--version VERSION Version number for setting device configuration.
150+
--member MEMBER Member used for IAM commands.
151+
--role ROLE Role used for IAM commands.
142152
143153
144154

‎iot/api-client/manager/manager.py

Copy file name to clipboardExpand all lines: iot/api-client/manager/manager.py
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,7 @@ def get_iam_permissions(
438438
def set_iam_permissions(
439439
service_account_json, project_id, cloud_region, registry_id, role,
440440
member):
441-
"""Retrieves IAM permissions for the given registry."""
441+
"""Sets IAM permissions for the given registry to a single role/member."""
442442
client = get_client(service_account_json)
443443

444444
registry_path = 'projects/{}/locations/{}/registries/{}'.format(

‎iot/api-client/mqtt_example/cloudiot_mqtt_example.py

Copy file name to clipboardExpand all lines: iot/api-client/mqtt_example/cloudiot_mqtt_example.py
+41-6Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,23 @@
2424
import argparse
2525
import datetime
2626
import os
27+
import random
2728
import time
2829

2930
import jwt
3031
import paho.mqtt.client as mqtt
3132

3233

34+
# The initial backoff time after a disconnection occurs, in seconds.
35+
minimum_backoff_time = 1
36+
37+
# The maximum backoff time before giving up, in seconds.
38+
MAXIMUM_BACKOFF_TIME = 32
39+
40+
# Whether to wait with exponential backoff before publishing.
41+
should_backoff = False
42+
43+
3344
# [START iot_mqtt_jwt]
3445
def create_jwt(project_id, private_key_file, algorithm):
3546
"""Creates a JWT (https://jwt.io) to establish an MQTT connection.
@@ -76,11 +87,22 @@ def on_connect(unused_client, unused_userdata, unused_flags, rc):
7687
"""Callback for when a device connects."""
7788
print('on_connect', mqtt.connack_string(rc))
7889

90+
# After a successful connect, reset backoff time and stop backing off.
91+
global should_backoff
92+
global minimum_backoff_time
93+
should_backoff = False
94+
minimum_backoff_time = 1
95+
7996

8097
def on_disconnect(unused_client, unused_userdata, rc):
8198
"""Paho callback for when a device disconnects."""
8299
print('on_disconnect', error_str(rc))
83100

101+
# Since a disconnect occurred, the next loop iteration will wait with
102+
# exponential backoff.
103+
global should_backoff
104+
should_backoff = True
105+
84106

85107
def on_publish(unused_client, unused_userdata, unused_mid):
86108
"""Paho callback when a message is sent to the broker."""
@@ -134,9 +156,6 @@ def get_client(
134156
# Subscribe to the config topic.
135157
client.subscribe(mqtt_config_topic, qos=1)
136158

137-
# Start the network loop.
138-
client.loop_start()
139-
140159
return client
141160
# [END iot_mqtt_config]
142161

@@ -199,6 +218,8 @@ def parse_command_line_args():
199218

200219
# [START iot_mqtt_run]
201220
def main():
221+
global minimum_backoff_time
222+
202223
args = parse_command_line_args()
203224

204225
# Publish to the events or state topic based on the flag.
@@ -215,6 +236,23 @@ def main():
215236

216237
# Publish num_messages mesages to the MQTT bridge once per second.
217238
for i in range(1, args.num_messages + 1):
239+
# Process network events.
240+
client.loop()
241+
242+
# Wait if backoff is required.
243+
if should_backoff:
244+
# If backoff time is too large, give up.
245+
if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
246+
print('Exceeded maximum backoff time. Giving up.')
247+
break
248+
249+
# Otherwise, wait and connect again.
250+
delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
251+
print('Waiting for {} before reconnecting.'.format(delay))
252+
time.sleep(delay)
253+
minimum_backoff_time *= 2
254+
client.connect(args.mqtt_bridge_hostname, args.mqtt_bridge_port)
255+
218256
payload = '{}/{}-payload-{}'.format(
219257
args.registry_id, args.device_id, i)
220258
print('Publishing message {}/{}: \'{}\''.format(
@@ -223,7 +261,6 @@ def main():
223261
seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
224262
if seconds_since_issue > 60 * jwt_exp_mins:
225263
print('Refreshing token after {}s').format(seconds_since_issue)
226-
client.loop_stop()
227264
jwt_iat = datetime.datetime.utcnow()
228265
client = get_client(
229266
args.project_id, args.cloud_region,
@@ -239,8 +276,6 @@ def main():
239276
# Send events every second. State should not be updated as often
240277
time.sleep(1 if args.message_type == 'event' else 5)
241278

242-
# End the network loop and finish.
243-
client.loop_stop()
244279
print('Finished.')
245280
# [END iot_mqtt_run]
246281

‎iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py

Copy file name to clipboardExpand all lines: iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py
+7Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ def test_event(test_topic, capsys):
7474
rsa_private_path, 'RS256', ca_cert_path,
7575
'mqtt.googleapis.com', 443)
7676

77+
client.loop_start()
7778
client.publish(mqtt_topic, 'just test', qos=1)
7879
time.sleep(2)
7980
client.loop_stop()
@@ -115,7 +116,10 @@ def test_state(test_topic, capsys):
115116
rsa_private_path, 'RS256', ca_cert_path,
116117
'mqtt.googleapis.com', 443)
117118
client.publish(mqtt_topic, 'state test', qos=1)
119+
client.loop_start()
120+
118121
time.sleep(3)
122+
119123
client.loop_stop()
120124

121125
manager.get_state(
@@ -152,7 +156,10 @@ def test_config(test_topic, capsys):
152156
project_id, cloud_region, registry_id, device_id,
153157
rsa_private_path, 'RS256', ca_cert_path,
154158
'mqtt.googleapis.com', 443)
159+
client.loop_start()
160+
155161
time.sleep(5)
162+
156163
client.loop_stop()
157164

158165
manager.get_state(

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.