Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 1c71119

Browse files
author
Jerjou Cheng
committed
Clean up & refactor of indefinite speech transcription
1 parent c6f5f11 commit 1c71119
Copy full SHA for 1c71119

File tree

Expand file treeCollapse file tree

2 files changed

+143
-154
lines changed
Filter options
Expand file treeCollapse file tree

2 files changed

+143
-154
lines changed

‎speech/cloud-client/transcribe_streaming_indefinite.py

Copy file name to clipboardExpand all lines: speech/cloud-client/transcribe_streaming_indefinite.py
+119-133Lines changed: 119 additions & 133 deletions
Original file line number | Diff line number | Diff line change
@@ -28,10 +28,13 @@
2828
# [START import_libraries]
2929
from __future__ import division
3030

31+
import argparse
3132
import collections
3233
import itertools
3334
import re
3435
import sys
36+
import threading
37+
import time
3538

3639
from google.cloud import speech
3740
from google.cloud.speech import enums
@@ -40,64 +43,32 @@
4043
import grpc
4144
import pyaudio
4245
from six.moves import queue
46+
import six
47+
48+
import transcribe_streaming_mic
4349
# [END import_libraries]
4450

45-
# Audio recording parameters
46-
RATE = 16000
47-
CHUNK = int(RATE / 10) # 100ms
51+
52+
def duration_to_secs(duration):
    """Convert a duration message to fractional seconds.

    Args:
        duration: an object exposing numeric ``seconds`` and ``nanos``
            attributes (for example the ``end_time`` of a recognized word).

    Returns:
        The duration expressed as a float number of seconds.
    """
    # 1e9 is already a float literal, so this is a true division regardless
    # of the interpreter's integer-division rules.
    return duration.seconds + duration.nanos / 1e9
4854

4955

50-
class MicrophoneStream(object):
56+
class ResumableMicrophoneStream(transcribe_streaming_mic.MicrophoneStream):
5157
"""Opens a recording stream as a generator yielding the audio chunks."""
5258
def __init__(self, rate, chunk_size, max_replay_secs=5):
    """Set up the stream and size the replay buffer.

    Args:
        rate: audio sample rate, forwarded to the parent stream.
        chunk_size: number of audio frames per chunk, forwarded to the
            parent stream.
        max_replay_secs: how many seconds' worth of chunks the replay
            buffer may hold.
    """
    super(ResumableMicrophoneStream, self).__init__(rate, chunk_size)
    self._max_replay_secs = max_replay_secs

    # Some useful numbers
    # 2 bytes in 16 bit samples
    # NOTE(review): self._num_channels, self._rate and self._chunk_size are
    # expected to be set by the parent __init__ -- confirm against
    # transcribe_streaming_mic.MicrophoneStream.
    self._bytes_per_sample = 2 * self._num_channels
    self._bytes_per_second = self._rate * self._bytes_per_sample

    self._bytes_per_chunk = self._chunk_size * self._bytes_per_sample
    self._chunks_per_second = (
        self._bytes_per_second / self._bytes_per_chunk)

    # With true division the chunk rate is a float, but deque only accepts
    # an integer maxlen, so truncate explicitly.
    self._untranscribed = collections.deque(
        maxlen=int(self._max_replay_secs * self._chunks_per_second))
10172

10273
def on_transcribe(self, end_time):
10374
while self._untranscribed and end_time > self._untranscribed[0][1]:
@@ -106,145 +77,160 @@ def on_transcribe(self, end_time):
10677
def generator(self, resume=False):
    """Yield audio chunks, optionally replaying buffered audio first.

    Args:
        resume: when true, every chunk currently held in the replay buffer
            is yielded before any audio from the parent generator.

    Yields:
        Byte strings of audio data.
    """
    bytes_sent = 0
    if resume:
        # Iterate over a snapshot, in case on_transcribe is called while the
        # buffered chunks are being yielded.
        for replayed, _ in list(self._untranscribed):
            yield replayed

    fresh_audio = super(ResumableMicrophoneStream, self).generator()
    for byte_data in fresh_audio:
        # Record each chunk in the replay buffer together with the time (in
        # seconds of audio sent by this generator) at which it ends.
        bytes_sent += len(byte_data)
        ends_at = bytes_sent / self._bytes_per_second
        self._untranscribed.append((byte_data, ends_at))

        yield byte_data
139-
# [END audio_stream]
14093

14194

142-
def duration_to_secs(duration):
143-
return duration.seconds + (duration.nanos / float(1e9))
95+
class SimulatedMicrophoneStream(ResumableMicrophoneStream):
    """A resumable stream fed from an audio file instead of a microphone.

    On entering the context a background thread reads the file and pushes
    its chunks into the stream's buffer, paced by the configured byte rate.
    """

    def __init__(self, audio_src, *args, **kwargs):
        """Remember the source file; other arguments go to the parent.

        Args:
            audio_src: path of the audio file to stream.
            *args, **kwargs: forwarded to ResumableMicrophoneStream.
        """
        super(SimulatedMicrophoneStream, self).__init__(*args, **kwargs)
        self._audio_src = audio_src

    def _delayed(self, get_data):
        """Yield chunks from ``get_data``, paced by the stream's byte rate.

        Args:
            get_data: callable taking a byte count and returning at most
                that many bytes; a falsy result ends the iteration.

        Yields:
            The chunks returned by ``get_data``, each one held back until
            the time its audio would have finished playing.
        """
        total_bytes_read = 0
        start_time = time.time()

        chunk = get_data(self._bytes_per_chunk)

        # Stop on end of data or as soon as the stream has been closed.
        while chunk and not self.closed:
            total_bytes_read += len(chunk)
            # Relies on the file-level ``from __future__ import division``
            # for a fractional result on Python 2.
            expected_yield_time = start_time + (
                total_bytes_read / self._bytes_per_second)
            now = time.time()
            if expected_yield_time > now:
                time.sleep(expected_yield_time - now)

            yield chunk

            chunk = get_data(self._bytes_per_chunk)

    def _stream_from_file(self, audio_src):
        """Yield the file's audio in real time, followed by 10s of silence.

        Args:
            audio_src: path of the file to read, opened in binary mode.
        """
        with open(audio_src, 'rb') as f:
            for chunk in self._delayed(f.read):
                yield chunk

        # Continue sending silence - 10s worth
        # The silence is bytes, so it needs a bytes buffer: six.StringIO
        # maps to io.StringIO on Python 3, which rejects bytes input.
        trailing_silence = six.BytesIO(
            b'\0' * self._bytes_per_second * 10)
        for chunk in self._delayed(trailing_silence.read):
            yield chunk

    def _thread(self):
        """Feed the buffer from the file, then signal end of stream."""
        for chunk in self._stream_from_file(self._audio_src):
            self._fill_buffer(chunk)
        # None is the sentinel marking the end of the audio.
        self._fill_buffer(None)

    def __enter__(self):
        """Mark the stream open and start the background reader."""
        self.closed = False

        threading.Thread(target=self._thread).start()

        return self

    def __exit__(self, type, value, traceback):
        """Mark the stream closed, which makes the reader loop stop."""
        self.closed = True
197144

198-
num_chars_printed = 0
199145

146+
def _record_keeper(responses, stream):
147+
"""Calls the stream's on_transcribe callback for each final response.
148+
149+
Args:
150+
responses - a generator of responses. The responses must already be
151+
filtered for ones with results and alternatives.
152+
stream - a ResumableMicrophoneStream.
153+
"""
154+
for r in responses:
155+
result = r.results[0]
156+
if result.is_final:
157+
top_alternative = result.alternatives[0]
200158
# Keep track of what transcripts we've received, so we can resume
201159
# intelligently when we hit the deadline
202160
stream.on_transcribe(duration_to_secs(
203161
top_alternative.words[-1].end_time))
162+
yield r
163+
204164

165+
def listen_print_loop(responses, stream):
    """Iterates through server responses and prints them.

    Same as in transcribe_streaming_mic, but keeps track of when a sent
    audio_chunk has been transcribed.

    Args:
        responses: iterable of streaming responses from the server.
        stream: the stream whose on_transcribe callback is called for
            final results.
    """
    def _has_transcript(response):
        # Only responses with a result that has at least one alternative
        # are passed on.
        return response.results and response.results[0].alternatives

    usable = (r for r in responses if _has_transcript(r))
    tracked = _record_keeper(usable, stream)
    transcribe_streaming_mic.listen_print_loop(tracked)
175+
176+
177+
def main(sample_rate, audio_src):
    """Stream audio for transcription, resuming when a request is cut off.

    Args:
        sample_rate: sample rate of the audio, in hertz.
        audio_src: path of a file to stream as if it were a microphone; if
            falsy, a ResumableMicrophoneStream is used instead.

    Raises:
        grpc.RpcError: for any RPC failure other than the two recognised
            "request ran too long" conditions handled below.
    """
    # See http://g.co/cloud/speech/docs/languages
    # for a list of supported languages.
    language_code = 'en-US'  # a BCP-47 language tag

    client = speech.SpeechClient()
    config = types.RecognitionConfig(
        encoding=enums.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=sample_rate,
        language_code=language_code,
        max_alternatives=1,
        enable_word_time_offsets=True)
    streaming_config = types.StreamingRecognitionConfig(
        config=config,
        interim_results=True)

    chunk_size = int(sample_rate / 10)  # 100ms
    if audio_src:
        mic_manager = SimulatedMicrophoneStream(
            audio_src, sample_rate, chunk_size)
    else:
        mic_manager = ResumableMicrophoneStream(sample_rate, chunk_size)

    with mic_manager as stream:
        resume = False
        while True:
            audio_generator = stream.generator(resume=resume)
            requests = (types.StreamingRecognizeRequest(audio_content=content)
                        for content in audio_generator)

            responses = client.streaming_recognize(streaming_config, requests)

            try:
                # Now, put the transcription responses to use.
                listen_print_loop(responses, stream)
                break
            except grpc.RpcError as e:
                # ``except X, e`` is Python-2-only syntax; ``as`` works on
                # both Python 2.6+ and Python 3.
                code = e.code()
                if code == grpc.StatusCode.INVALID_ARGUMENT:
                    expected_detail = 'deadline too short'
                elif code == grpc.StatusCode.OUT_OF_RANGE:
                    expected_detail = 'maximum allowed stream duration'
                else:
                    raise

                # Only the specific "ran too long" errors are resumable;
                # anything else with the same status code is re-raised.
                if expected_detail not in e.details():
                    raise

                print('Resuming..')
                resume = True
247227

248228

249229
if __name__ == '__main__':
    # Command-line entry point: parse the sample rate and the optional
    # file to simulate, then hand both to main().
    cli = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__)
    cli.add_argument('--rate', type=int, default=16000, help='Sample rate.')
    cli.add_argument('--audio_src', help='File to simulate streaming of.')
    options = cli.parse_args()
    main(options.rate, options.audio_src)

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.