Commit 776cef0

jerjou authored and dpebot committed
Minute workaround (GoogleCloudPlatform#834)
* Working draft of continuous listening.
* Clean up continuous transcription.
* Indefinite transcoding of utterances
* Set WRAP_IT_UP to something more obvious.
* Reduce amount of audio lost between utterances
* Add audio overlap.
* Remove utterance sample, so there's only one.
* Add tests for minute-workaround.
* PR feedback.
1 parent 644fdcb commit 776cef0

3 files changed: +393 -0 lines changed
+298 lines changed: 298 additions & 0 deletions
@@ -0,0 +1,298 @@
#!/usr/bin/python

# Copyright (C) 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Sample that streams audio to the Google Cloud Speech API via gRPC.

This sample expands on transcribe_streaming.py to work around the 1-minute
limit on streaming requests. It does this by transcribing normally until
WRAP_IT_UP_SECS seconds before the 1-minute limit. At that point, it waits for
the end of an utterance and, once it hears one, closes the current stream and
opens a new one. It also keeps a buffer of trailing audio around while this is
happening, which it sends to the new stream in its initial request, to
minimize losing any speech that occurs during the switch.

Note that you could do this a little more simply by re-starting the stream
after every utterance, though that increases the possibility of audio being
missed between streams. For learning purposes (and robustness), the more
complex implementation is shown here.
"""

from __future__ import division

import argparse
import collections
import contextlib
import functools
import logging
import re
import signal
import sys
import time

import google.auth
import google.auth.transport.grpc
import google.auth.transport.requests
from google.cloud.proto.speech.v1beta1 import cloud_speech_pb2
from google.rpc import code_pb2
import grpc
import pyaudio
from six.moves import queue

# Seconds you have to wrap up your utterance
WRAP_IT_UP_SECS = 15
SECS_OVERLAP = 1

# Audio recording parameters
RATE = 16000
CHUNK = int(RATE / 10)  # 100ms

# The Speech API has a streaming limit of 60 seconds of audio*, so keep the
# connection alive for that long, plus some more to give the API time to figure
# out the transcription.
# * https://g.co/cloud/speech/limits#content
DEADLINE_SECS = 60 * 3 + 5
SPEECH_SCOPE = 'https://www.googleapis.com/auth/cloud-platform'


def make_channel(host):
    """Creates a secure channel with auth credentials from the environment."""
    # Grab application default credentials from the environment
    credentials, _ = google.auth.default(scopes=[SPEECH_SCOPE])

    # Create a secure channel using the credentials.
    http_request = google.auth.transport.requests.Request()

    return google.auth.transport.grpc.secure_authorized_channel(
        credentials, http_request, host)


def _audio_data_generator(buff, overlap_buffer):
    """A generator that yields all available data in the given buffer.

    Args:
        buff (Queue): A Queue where each element is a chunk of data.
        overlap_buffer (deque): A ring buffer for storing trailing data chunks.
    Yields:
        bytes: A chunk of data that is the aggregate of all chunks of data in
            `buff`. The function will block until at least one data chunk is
            available.
    """
    if overlap_buffer:
        yield b''.join(overlap_buffer)
        overlap_buffer.clear()

    while True:
        # Use a blocking get() to ensure there's at least one chunk of data.
        data = [buff.get()]

        # Now consume whatever other data's still buffered.
        while True:
            try:
                data.append(buff.get(block=False))
            except queue.Empty:
                break

        # `None` in the buffer signals that we should stop generating. Put the
        # data back into the buffer for the next generator.
        if None in data:
            data.remove(None)
            if data:
                buff.put(b''.join(data))
            break
        else:
            overlap_buffer.extend(data)

        yield b''.join(data)


def _fill_buffer(buff, in_data, frame_count, time_info, status_flags):
    """Continuously collect data from the audio stream, into the buffer."""
    buff.put(in_data)
    return None, pyaudio.paContinue


# [START audio_stream]
@contextlib.contextmanager
def record_audio(rate, chunk):
    """Opens a recording stream in a context manager."""
    # Create a thread-safe buffer of audio data
    buff = queue.Queue()

    audio_interface = pyaudio.PyAudio()
    audio_stream = audio_interface.open(
        format=pyaudio.paInt16,
        # The API currently only supports 1-channel (mono) audio
        # https://goo.gl/z757pE
        channels=1, rate=rate,
        input=True, frames_per_buffer=chunk,
        # Run the audio stream asynchronously to fill the buffer object.
        # This is necessary so that the input device's buffer doesn't overflow
        # while the calling thread makes network requests, etc.
        stream_callback=functools.partial(_fill_buffer, buff),
    )

    yield buff

    audio_stream.stop_stream()
    audio_stream.close()
    # Signal the _audio_data_generator to finish
    buff.put(None)
    audio_interface.terminate()
# [END audio_stream]


def request_stream(data_stream, rate, interim_results=True):
    """Yields `StreamingRecognizeRequest`s constructed from a recording audio
    stream.

    Args:
        data_stream (generator): The raw audio data to send.
        rate (int): The sampling rate in hertz.
        interim_results (boolean): Whether to return intermediate results,
            before the transcription is finalized.
    """
    # The initial request must contain metadata about the stream, so the
    # server knows how to interpret it.
    recognition_config = cloud_speech_pb2.RecognitionConfig(
        # There are a bunch of config options you can specify. See
        # https://goo.gl/KPZn97 for the full list.
        encoding='LINEAR16',  # raw 16-bit signed LE samples
        sample_rate=rate,  # the rate in hertz
        # See http://g.co/cloud/speech/docs/languages
        # for a list of supported languages.
        language_code='en-US',  # a BCP-47 language tag
    )
    streaming_config = cloud_speech_pb2.StreamingRecognitionConfig(
        interim_results=interim_results,
        config=recognition_config,
    )

    yield cloud_speech_pb2.StreamingRecognizeRequest(
        streaming_config=streaming_config)

    for data in data_stream:
        # Subsequent requests can all just have the content
        yield cloud_speech_pb2.StreamingRecognizeRequest(audio_content=data)


def listen_print_loop(
        recognize_stream, wrap_it_up_secs, buff, max_recog_secs=60):
    """Iterates through server responses and prints them.

    The recognize_stream passed is a generator that will block until a
    response is provided by the server. When the transcription response
    comes, print it.

    In this case, responses are provided for interim results as well. If the
    response is an interim one, print a carriage return at the end of it, to
    allow the next result to overwrite it, until the response is a final one.
    For the final one, print a newline to preserve the finalized
    transcription.
    """
    # What time should we switch to a new stream?
    time_to_switch = time.time() + max_recog_secs - wrap_it_up_secs
    graceful_exit = False
    num_chars_printed = 0
    for resp in recognize_stream:
        if resp.error.code != code_pb2.OK:
            raise RuntimeError('Server error: ' + resp.error.message)

        if not resp.results:
            if resp.endpointer_type is resp.END_OF_SPEECH and (
                    time.time() > time_to_switch):
                graceful_exit = True
                buff.put(None)
            continue

        # Display the top transcription
        result = resp.results[0]
        transcript = result.alternatives[0].transcript

        # If the previous result was longer than this one, we need to print
        # some extra spaces to overwrite the previous result
        overwrite_chars = ' ' * max(0, num_chars_printed - len(transcript))

        # Display interim results, but with a carriage return at the end of
        # the line, so subsequent lines will overwrite them.
        if not result.is_final:
            sys.stdout.write(transcript + overwrite_chars + '\r')
            sys.stdout.flush()

            num_chars_printed = len(transcript)

        else:
            print(transcript + overwrite_chars)

            # Exit recognition if any of the transcribed phrases could be
            # one of our keywords.
            if re.search(r'\b(exit|quit)\b', transcript, re.I):
                print('Exiting..')
                recognize_stream.cancel()

            elif graceful_exit:
                break

            num_chars_printed = 0


def main():
    service = cloud_speech_pb2.SpeechStub(
        make_channel('speech.googleapis.com'))

    # For streaming audio from the microphone, there are three threads.
    # First, a thread that collects audio data as it comes in
    with record_audio(RATE, CHUNK) as buff:
        # Second, a thread that sends requests with that data
        overlap_buffer = collections.deque(
            maxlen=int(SECS_OVERLAP * RATE / CHUNK))
        requests = request_stream(
            _audio_data_generator(buff, overlap_buffer), RATE)
        # Third, a thread that listens for transcription responses
        recognize_stream = service.StreamingRecognize(
            requests, DEADLINE_SECS)

        # Exit things cleanly on interrupt
        signal.signal(signal.SIGINT, lambda *_: recognize_stream.cancel())

        # Now, put the transcription responses to use.
        try:
            while True:
                listen_print_loop(recognize_stream, WRAP_IT_UP_SECS, buff)

                # Discard this stream and create a new one.
                # Note: calling .cancel() doesn't immediately raise an RpcError
                # - it only raises when the iterator's next() is requested
                recognize_stream.cancel()

                logging.debug('Starting new stream')
                requests = request_stream(_audio_data_generator(
                    buff, overlap_buffer), RATE)
                recognize_stream = service.StreamingRecognize(
                    requests, DEADLINE_SECS)

        except grpc.RpcError:
            # This happens because of the interrupt handler
            pass


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-v', '--verbose', help='increase output verbosity',
        action='store_true')
    args = parser.parse_args()
    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)

    main()
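The heart of the workaround is the SECS_OVERLAP ring buffer: a deque sized to hold the trailing second of audio chunks, which _audio_data_generator replays as the first yield of each new stream. Below is a minimal standalone sketch of that idea (illustrative only, not part of the committed sample; the constants simply mirror the ones defined in the file above):

import collections

RATE = 16000            # samples per second, as in the sample above
CHUNK = int(RATE / 10)  # 100ms of audio per chunk
SECS_OVERLAP = 1        # trailing audio to carry across stream restarts

# The deque holds at most SECS_OVERLAP seconds' worth of chunks;
# older chunks silently fall off the left end.
overlap_buffer = collections.deque(maxlen=int(SECS_OVERLAP * RATE / CHUNK))

# Pretend we streamed 5 seconds of audio: 100ms of 16-bit silence per chunk.
for _ in range(50):
    overlap_buffer.append(b'\0' * 2 * CHUNK)

# When the old stream is torn down, whatever is still in the deque is yielded
# first on the new stream, so at most ~1 second of audio is sent twice and
# (ideally) none is lost during the switch.
replayed = b''.join(overlap_buffer)
print(len(replayed) / (2.0 * RATE), 'seconds of audio replayed')  # -> 1.0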
+94 lines changed: 94 additions & 0 deletions
@@ -0,0 +1,94 @@
# Copyright 2016, Google, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import re
import threading
import time

import transcribe_streaming_minute as transcribe_streaming


class MockPyAudio(object):
    def __init__(self, *audio_filenames):
        self.audio_filenames = audio_filenames

    def __call__(self, *args):
        return self

    def open(self, stream_callback, *args, **kwargs):
        self.closed = threading.Event()
        self.stream_thread = threading.Thread(
            target=self.stream_audio, args=(
                self.audio_filenames, stream_callback, self.closed))
        self.stream_thread.start()
        return self

    def close(self):
        self.closed.set()

    def stop_stream(self):
        pass

    def terminate(self):
        pass

    @staticmethod
    def stream_audio(audio_filenames, callback, closed, num_frames=512):
        # audio is 16-bit samples, whereas python byte is 8-bit
        num_bytes = 2 * num_frames
        # Approximate realtime by sleeping for the appropriate time for the
        # requested number of frames
        sleep_secs = num_frames / float(transcribe_streaming.RATE)

        for audio_filename in audio_filenames:
            with open(audio_filename, 'rb') as audio_file:
                # While the audio stream hasn't been closed, give it chunks of
                # the audio file, until we run out of audio file.
                while not closed.is_set():
                    chunk = audio_file.read(num_bytes)
                    if not chunk:
                        break
                    time.sleep(sleep_secs)
                    callback(chunk, None, None, None)
                else:
                    break

            # Ran out of audio data. Give a second of silence between files
            for _ in range(int(1 + 1 / sleep_secs)):
                if closed.is_set():
                    break
                time.sleep(sleep_secs)
                callback(b'\0' * num_bytes, None, None, None)
        else:
            # No more audio left. Just give silence until we're done
            while not closed.is_set():
                time.sleep(sleep_secs)
                callback(b'\0' * num_bytes, None, None, None)


def test_main(resource, monkeypatch, capsys, caplog):
    caplog.setLevel(logging.DEBUG)
    monkeypatch.setattr(
        transcribe_streaming.pyaudio, 'PyAudio',
        MockPyAudio(resource('audio.raw'), resource('quit.raw')))
    monkeypatch.setattr(
        transcribe_streaming, 'WRAP_IT_UP_SECS', 59)

    transcribe_streaming.main()
    out, err = capsys.readouterr()

    assert re.search(
        r'old is the.*quit', out, re.DOTALL | re.I)
    assert 'Starting new stream' in caplog.text()
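For context on the test's timing, here is a back-of-the-envelope sketch (illustrative only, not part of the commit): the mock delivers 512-frame chunks of 16-bit audio and sleeps between callbacks to approximate real time, and WRAP_IT_UP_SECS is patched to 59 so that the stream-switching path fires about a second in, rather than after the default ~45 seconds.

RATE = 16000                 # matches transcribe_streaming_minute.RATE
NUM_FRAMES = 512             # frames per mocked callback
NUM_BYTES = 2 * NUM_FRAMES   # 16-bit samples are two bytes each

# The mock sleeps this long between callbacks to approximate real time.
sleep_secs = NUM_FRAMES / float(RATE)
print(sleep_secs)            # 0.032 seconds, i.e. ~31 callbacks per second

# listen_print_loop switches streams (max_recog_secs - wrap_it_up_secs)
# seconds after it starts. With WRAP_IT_UP_SECS patched to 59 that is
# 60 - 59 = 1 second, so 'Starting new stream' is logged even though the
# test audio files are short.
print(60 - 59)               # seconds until the switch point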
