19
19
import time
20
20
21
21
import backoff
22
- from google .cloud import logging
22
+ from google .cloud import logging , storage
23
23
import pytest
24
24
25
25
import export
34
34
# old sink, in seconds
35
35
CLEANUP_THRESHOLD = 7200 # 2 hours
36
36
37
+ # Max buckets to delete at a time, to mitigate operation timeout
38
+ # issues. To turn off in the future, set to None.
39
+ MAX_BUCKETS = 1500
40
+
37
41
38
42
def _random_id ():
39
43
return "" .join (
@@ -46,8 +50,8 @@ def _create_sink_name():
46
50
47
51
48
52
@backoff .on_exception (backoff .expo , Exception , max_time = 60 , raise_on_giveup = False )
49
- def _delete_sink ( sink ):
50
- sink .delete ()
53
+ def _delete_object ( obj ):
54
+ obj .delete ()
51
55
52
56
53
57
# Runs once for entire test suite
@@ -62,7 +66,20 @@ def cleanup_old_sinks():
62
66
if match :
63
67
sink_timestamp = int (match .group (1 ))
64
68
if TIMESTAMP - sink_timestamp > CLEANUP_THRESHOLD :
65
- _delete_sink (sink )
69
+ _delete_object (sink )
70
+
71
+ storage_client = storage .Client ()
72
+
73
+ # See _sink_storage_setup in usage_guide.py for details about how
74
+ # sinks are named.
75
+ test_bucket_name_regex = r"^sink\-storage\-(\d+)$"
76
+ for bucket in storage_client .list_buckets (max_results = MAX_BUCKETS ):
77
+ match = re .match (test_bucket_name_regex , bucket .name )
78
+ if match :
79
+ # Bucket timestamp is int(time.time() * 1000)
80
+ bucket_timestamp = int (match .group (1 ))
81
+ if TIMESTAMP - bucket_timestamp // 1000 > CLEANUP_THRESHOLD :
82
+ _delete_object (bucket )
66
83
67
84
68
85
@pytest .fixture
@@ -79,7 +96,7 @@ def example_sink(cleanup_old_sinks):
79
96
80
97
yield sink
81
98
82
- _delete_sink (sink )
99
+ _delete_object (sink )
83
100
84
101
85
102
def test_list (example_sink , capsys ):
@@ -99,7 +116,7 @@ def test_create(capsys):
99
116
export .create_sink (sink_name , BUCKET , TEST_SINK_FILTER )
100
117
# Clean-up the temporary sink.
101
118
finally :
102
- _delete_sink (logging .Client ().sink (sink_name ))
119
+ _delete_object (logging .Client ().sink (sink_name ))
103
120
104
121
out , _ = capsys .readouterr ()
105
122
assert sink_name in out
0 commit comments