14
14
# See the License for the specific language governing permissions and
15
15
# limitations under the License.
16
16
17
- """Command-line app to perform synchronous queries with parameters in BigQuery.
17
+ """Command-line app to perform queries with parameters in BigQuery.
18
18
19
19
For more information, see the README.md under /bigquery.
20
20
21
21
Example invocation:
22
- $ python sync_query_params .py --use-named-params 'romeoandjuliet' 100
23
- $ python sync_query_params .py --use-positional-params 'romeoandjuliet' 100
22
+ $ python query_params .py --use-named-params 'romeoandjuliet' 100
23
+ $ python query_params .py --use-positional-params 'romeoandjuliet' 100
24
24
"""
25
25
26
26
import argparse
27
27
import datetime
28
+ import time
29
+ import uuid
28
30
29
31
from google .cloud import bigquery
30
32
import pytz
31
33
32
34
35
+ def wait_for_job (job ):
36
+ while True :
37
+ job .reload () # Refreshes the state via a GET request.
38
+ if job .state == 'DONE' :
39
+ if job .error_result :
40
+ raise RuntimeError (job .errors )
41
+ return
42
+ time .sleep (1 )
43
+
44
+
33
45
def print_results (query_results ):
34
46
"""Print the query results by requesting a page at a time."""
35
47
page_token = None
@@ -46,15 +58,16 @@ def print_results(query_results):
46
58
break
47
59
48
60
49
- def sync_query_positional_params (corpus , min_word_count ):
61
+ def query_positional_params (corpus , min_word_count ):
50
62
client = bigquery .Client ()
51
63
query = """SELECT word, word_count
52
64
FROM `bigquery-public-data.samples.shakespeare`
53
65
WHERE corpus = ?
54
66
AND word_count >= ?
55
67
ORDER BY word_count DESC;
56
68
"""
57
- query_results = client .run_sync_query (
69
+ query_job = client .run_async_query (
70
+ str (uuid .uuid4 ()),
58
71
query ,
59
72
query_parameters = (
60
73
bigquery .ScalarQueryParameter (
@@ -68,33 +81,40 @@ def sync_query_positional_params(corpus, min_word_count):
68
81
69
82
# Only standard SQL syntax supports parameters in queries.
70
83
# See: https://cloud.google.com/bigquery/sql-reference/
71
- query_results .use_legacy_sql = False
72
- query_results .run ()
73
- print_results (query_results )
84
+ query_job .use_legacy_sql = False
85
+
86
+ # Start the query and wait for the job to complete.
87
+ query_job .begin ()
88
+ wait_for_job (query_job )
89
+ print_results (query_job .results ())
74
90
75
91
76
- def sync_query_named_params (corpus , min_word_count ):
92
+ def query_named_params (corpus , min_word_count ):
77
93
client = bigquery .Client ()
78
94
query = """SELECT word, word_count
79
95
FROM `bigquery-public-data.samples.shakespeare`
80
96
WHERE corpus = @corpus
81
97
AND word_count >= @min_word_count
82
98
ORDER BY word_count DESC;
83
99
"""
84
- query_results = client .run_sync_query (
100
+ query_job = client .run_async_query (
101
+ str (uuid .uuid4 ()),
85
102
query ,
86
103
query_parameters = (
87
104
bigquery .ScalarQueryParameter ('corpus' , 'STRING' , corpus ),
88
105
bigquery .ScalarQueryParameter (
89
106
'min_word_count' ,
90
107
'INT64' ,
91
108
min_word_count )))
92
- query_results .use_legacy_sql = False
93
- query_results .run ()
94
- print_results (query_results )
109
+ query_job .use_legacy_sql = False
95
110
111
+ # Start the query and wait for the job to complete.
112
+ query_job .begin ()
113
+ wait_for_job (query_job )
114
+ print_results (query_job .results ())
96
115
97
- def sync_query_array_params (gender , states ):
116
+
117
+ def query_array_params (gender , states ):
98
118
client = bigquery .Client ()
99
119
query = """SELECT name, sum(number) as count
100
120
FROM `bigquery-public-data.usa_names.usa_1910_2013`
@@ -104,45 +124,57 @@ def sync_query_array_params(gender, states):
104
124
ORDER BY count DESC
105
125
LIMIT 10;
106
126
"""
107
- query_results = client .run_sync_query (
127
+ query_job = client .run_async_query (
128
+ str (uuid .uuid4 ()),
108
129
query ,
109
130
query_parameters = (
110
131
bigquery .ScalarQueryParameter ('gender' , 'STRING' , gender ),
111
132
bigquery .ArrayQueryParameter ('states' , 'STRING' , states )))
112
- query_results .use_legacy_sql = False
113
- query_results .run ()
114
- print_results (query_results )
133
+ query_job .use_legacy_sql = False
134
+
135
+ # Start the query and wait for the job to complete.
136
+ query_job .begin ()
137
+ wait_for_job (query_job )
138
+ print_results (query_job .results ())
115
139
116
140
117
- def sync_query_timestamp_params (year , month , day , hour , minute ):
141
+ def query_timestamp_params (year , month , day , hour , minute ):
118
142
client = bigquery .Client ()
119
143
query = 'SELECT TIMESTAMP_ADD(@ts_value, INTERVAL 1 HOUR);'
120
- query_results = client .run_sync_query (
144
+ query_job = client .run_async_query (
145
+ str (uuid .uuid4 ()),
121
146
query ,
122
147
query_parameters = [
123
148
bigquery .ScalarQueryParameter (
124
149
'ts_value' ,
125
150
'TIMESTAMP' ,
126
151
datetime .datetime (
127
152
year , month , day , hour , minute , tzinfo = pytz .UTC ))])
128
- query_results .use_legacy_sql = False
129
- query_results .run ()
130
- print_results (query_results )
153
+ query_job .use_legacy_sql = False
131
154
155
+ # Start the query and wait for the job to complete.
156
+ query_job .begin ()
157
+ wait_for_job (query_job )
158
+ print_results (query_job .results ())
132
159
133
- def sync_query_struct_params (x , y ):
160
+
161
+ def query_struct_params (x , y ):
134
162
client = bigquery .Client ()
135
163
query = 'SELECT @struct_value AS s;'
136
- query_results = client .run_sync_query (
164
+ query_job = client .run_async_query (
165
+ str (uuid .uuid4 ()),
137
166
query ,
138
167
query_parameters = [
139
168
bigquery .StructQueryParameter (
140
169
'struct_value' ,
141
170
bigquery .ScalarQueryParameter ('x' , 'INT64' , x ),
142
171
bigquery .ScalarQueryParameter ('y' , 'STRING' , y ))])
143
- query_results .use_legacy_sql = False
144
- query_results .run ()
145
- print_results (query_results )
172
+ query_job .use_legacy_sql = False
173
+
174
+ # Start the query and wait for the job to complete.
175
+ query_job .begin ()
176
+ wait_for_job (query_job )
177
+ print_results (query_job .results ())
146
178
147
179
148
180
if __name__ == '__main__' :
@@ -197,15 +229,15 @@ def sync_query_struct_params(x, y):
197
229
args = parser .parse_args ()
198
230
199
231
if args .sample == 'named' :
200
- sync_query_named_params (args .corpus , args .min_word_count )
232
+ query_named_params (args .corpus , args .min_word_count )
201
233
elif args .sample == 'positional' :
202
- sync_query_positional_params (args .corpus , args .min_word_count )
234
+ query_positional_params (args .corpus , args .min_word_count )
203
235
elif args .sample == 'array' :
204
- sync_query_array_params (args .gender , args .states )
236
+ query_array_params (args .gender , args .states )
205
237
elif args .sample == 'timestamp' :
206
- sync_query_timestamp_params (
238
+ query_timestamp_params (
207
239
args .year , args .month , args .day , args .hour , args .minute )
208
240
elif args .sample == 'struct' :
209
- sync_query_struct_params (args .x , args .y )
241
+ query_struct_params (args .x , args .y )
210
242
else :
211
243
print ('Unexpected value for sample' )
0 commit comments