Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

[monitoring] fix: use retrying module in the fixture class #3285

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions 1 monitoring/api/v3/alerts-client/requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pytest==5.3.2
retrying==1.3.3
107 changes: 74 additions & 33 deletions 107 monitoring/api/v3/alerts-client/snippets_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import random
import string

from google.api_core.exceptions import Aborted
from google.cloud import monitoring_v3
import google.protobuf.json_format
import pytest
from retrying import retry

import snippets

Expand All @@ -29,6 +31,10 @@ def random_name(length):
[random.choice(string.ascii_lowercase) for i in range(length)])


def retry_if_aborted(exception):
return isinstance(exception, Aborted)


class PochanFixture:
"""A test fixture that creates an alert POlicy and a notification CHANnel,
hence the name, pochan.
Expand All @@ -42,30 +48,40 @@ def __init__(self):
monitoring_v3.NotificationChannelServiceClient())

def __enter__(self):
# Create a policy.
policy = monitoring_v3.types.alert_pb2.AlertPolicy()
json = open('test_alert_policy.json').read()
google.protobuf.json_format.Parse(json, policy)
policy.display_name = 'snippets-test-' + random_name(10)
self.alert_policy = self.alert_policy_client.create_alert_policy(
self.project_name, policy)
# Create a notification channel.
notification_channel = (
monitoring_v3.types.notification_pb2.NotificationChannel())
json = open('test_notification_channel.json').read()
google.protobuf.json_format.Parse(json, notification_channel)
notification_channel.display_name = 'snippets-test-' + random_name(10)
self.notification_channel = (
self.notification_channel_client.create_notification_channel(
self.project_name, notification_channel))
@retry(wait_exponential_multiplier=1000, wait_exponential_max=10000,
stop_max_attempt_number=5, retry_on_exception=retry_if_aborted)
def setup():
# Create a policy.
policy = monitoring_v3.types.alert_pb2.AlertPolicy()
json = open('test_alert_policy.json').read()
google.protobuf.json_format.Parse(json, policy)
policy.display_name = 'snippets-test-' + random_name(10)
self.alert_policy = self.alert_policy_client.create_alert_policy(
self.project_name, policy)
# Create a notification channel.
notification_channel = (
monitoring_v3.types.notification_pb2.NotificationChannel())
json = open('test_notification_channel.json').read()
google.protobuf.json_format.Parse(json, notification_channel)
notification_channel.display_name = (
'snippets-test-' + random_name(10))
self.notification_channel = (
self.notification_channel_client.create_notification_channel(
self.project_name, notification_channel))
setup()
return self

def __exit__(self, type, value, traceback):
# Delete the policy and channel we created.
self.alert_policy_client.delete_alert_policy(self.alert_policy.name)
if self.notification_channel.name:
self.notification_channel_client.delete_notification_channel(
self.notification_channel.name)
@retry(wait_exponential_multiplier=1000, wait_exponential_max=10000,
stop_max_attempt_number=5, retry_on_exception=retry_if_aborted)
def teardown():
self.alert_policy_client.delete_alert_policy(
self.alert_policy.name)
if self.notification_channel.name:
self.notification_channel_client.delete_notification_channel(
self.notification_channel.name)
teardown()


@pytest.fixture(scope='session')
Expand All @@ -81,36 +97,55 @@ def test_list_alert_policies(capsys, pochan):


def test_enable_alert_policies(capsys, pochan):
snippets.enable_alert_policies(pochan.project_name, False)
out, _ = capsys.readouterr()
@retry(wait_exponential_multiplier=1000, wait_exponential_max=10000,
stop_max_attempt_number=5, retry_on_exception=retry_if_aborted)
def invoke_sample(val):
snippets.enable_alert_policies(pochan.project_name, val)

snippets.enable_alert_policies(pochan.project_name, False)
invoke_sample(False)
invoke_sample(False)
out, _ = capsys.readouterr()
assert "already disabled" in out

snippets.enable_alert_policies(pochan.project_name, True)
invoke_sample(True)
out, _ = capsys.readouterr()
assert "Enabled {0}".format(pochan.project_name) in out

snippets.enable_alert_policies(pochan.project_name, True)
invoke_sample(True)
out, _ = capsys.readouterr()
assert "already enabled" in out


def test_replace_channels(capsys, pochan):
alert_policy_id = pochan.alert_policy.name.split('/')[-1]
notification_channel_id = pochan.notification_channel.name.split('/')[-1]
snippets.replace_notification_channels(
pochan.project_name, alert_policy_id, [notification_channel_id])
@retry(wait_exponential_multiplier=1000, wait_exponential_max=10000,
stop_max_attempt_number=5, retry_on_exception=retry_if_aborted)
def invoke_sample():
alert_policy_id = pochan.alert_policy.name.split('/')[-1]
notification_channel_id = pochan.notification_channel.name.split(
'/')[-1]
snippets.replace_notification_channels(
pochan.project_name, alert_policy_id, [notification_channel_id])

invoke_sample()
out, _ = capsys.readouterr()
assert "Updated {0}".format(pochan.alert_policy.name) in out


def test_backup_and_restore(capsys, pochan):
snippets.backup(pochan.project_name, 'backup.json')
@retry(wait_exponential_multiplier=1000, wait_exponential_max=10000,
stop_max_attempt_number=5, retry_on_exception=retry_if_aborted)
def invoke_backup():
snippets.backup(pochan.project_name, 'backup.json')

invoke_backup()
out, _ = capsys.readouterr()

snippets.restore(pochan.project_name, 'backup.json')
@retry(wait_exponential_multiplier=1000, wait_exponential_max=10000,
stop_max_attempt_number=5, retry_on_exception=retry_if_aborted)
def invoke_restore():
snippets.restore(pochan.project_name, 'backup.json')

invoke_restore()
out, _ = capsys.readouterr()
assert "Updated {0}".format(pochan.alert_policy.name) in out
assert "Updating channel {0}".format(
Expand All @@ -119,8 +154,14 @@ def test_backup_and_restore(capsys, pochan):

def test_delete_channels(capsys, pochan):
notification_channel_id = pochan.notification_channel.name.split('/')[-1]
snippets.delete_notification_channels(
pochan.project_name, [notification_channel_id], force=True)

@retry(wait_exponential_multiplier=1000, wait_exponential_max=10000,
stop_max_attempt_number=5, retry_on_exception=retry_if_aborted)
def invoke_delete():
snippets.delete_notification_channels(
pochan.project_name, [notification_channel_id], force=True)

invoke_delete()
out, _ = capsys.readouterr()
assert "{0} deleted".format(notification_channel_id) in out
pochan.notification_channel.name = '' # So teardown is not tried
Morty Proxy This is a proxified and sanitized view of the page, visit original site.