28
28
# [START import_libraries]
29
29
from __future__ import division
30
30
31
+ import argparse
31
32
import collections
32
33
import itertools
33
34
import re
34
35
import sys
36
+ import threading
37
+ import time
35
38
36
39
from google .cloud import speech
37
40
from google .cloud .speech import enums
40
43
import grpc
41
44
import pyaudio
42
45
from six .moves import queue
46
+ import six
47
+
48
+ import transcribe_streaming_mic
43
49
# [END import_libraries]
44
50
45
- # Audio recording parameters
46
- RATE = 16000
47
- CHUNK = int ( RATE / 10 ) # 100ms
51
+
52
def duration_to_secs(duration):
    """Return a protobuf Duration (seconds + nanos) as a float of seconds."""
    return duration.seconds + duration.nanos / 1e9
48
54
49
55
50
- class MicrophoneStream ( object ):
56
class ResumableMicrophoneStream(transcribe_streaming_mic.MicrophoneStream):
    """Opens a recording stream as a generator yielding the audio chunks."""

    def __init__(self, rate, chunk_size, max_replay_secs=5):
        """Set up the stream and the replay buffer of untranscribed audio.

        Args:
            rate: audio sample rate, in hertz.
            chunk_size: frames per buffered audio chunk.
            max_replay_secs: how many seconds of already-sent audio to keep
                for replay when the stream is resumed.
        """
        super(ResumableMicrophoneStream, self).__init__(rate, chunk_size)
        self._max_replay_secs = max_replay_secs

        # Some useful numbers.
        # 2 bytes per sample in 16-bit audio; _num_channels, _rate and
        # _chunk_size are presumably set by the parent class's __init__ —
        # confirm against transcribe_streaming_mic.MicrophoneStream.
        self._bytes_per_sample = 2 * self._num_channels
        self._bytes_per_second = self._rate * self._bytes_per_sample

        self._bytes_per_chunk = (self._chunk_size * self._bytes_per_sample)
        self._chunks_per_second = (
            self._bytes_per_second / self._bytes_per_chunk)

        # Replay buffer of (chunk, end_time) pairs not yet transcribed.
        # maxlen must be an integer: with true division enabled,
        # _chunks_per_second is a float, which deque rejects on Python 3.
        self._untranscribed = collections.deque(
            maxlen=int(self._max_replay_secs * self._chunks_per_second))
101
72
102
73
def on_transcribe (self , end_time ):
103
74
while self ._untranscribed and end_time > self ._untranscribed [0 ][1 ]:
@@ -106,145 +77,160 @@ def on_transcribe(self, end_time):
106
77
def generator (self , resume = False ):
107
78
total_bytes_sent = 0
108
79
if resume :
80
+ # Make a copy, in case on_transcribe is called while yielding them
81
+ catchup = list (self ._untranscribed )
109
82
# Yield all the untranscribed chunks first
110
- for chunk , _ in self . _untranscribed :
83
+ for chunk , _ in catchup :
111
84
yield chunk
112
- while not self .closed :
113
- # Use a blocking get() to ensure there's at least one chunk of
114
- # data, and stop iteration if the chunk is None, indicating the
115
- # end of the audio stream.
116
- chunk = self ._buff .get ()
117
- if chunk is None :
118
- return
119
- data = [chunk ]
120
-
121
- # Now consume whatever other data's still buffered.
122
- while True :
123
- try :
124
- chunk = self ._buff .get (block = False )
125
- if chunk is None :
126
- return
127
- data .append (chunk )
128
- except queue .Empty :
129
- break
130
-
131
- byte_data = b'' .join (data )
132
85
86
+ for byte_data in super (ResumableMicrophoneStream , self ).generator ():
133
87
# Populate the replay buffer of untranscribed audio bytes
134
88
total_bytes_sent += len (byte_data )
135
89
chunk_end_time = total_bytes_sent / self ._bytes_per_second
136
90
self ._untranscribed .append ((byte_data , chunk_end_time ))
137
91
138
92
yield byte_data
139
- # [END audio_stream]
140
93
141
94
142
- def duration_to_secs (duration ):
143
- return duration .seconds + (duration .nanos / float (1e9 ))
95
class SimulatedMicrophoneStream(ResumableMicrophoneStream):
    """Simulates a microphone by streaming a file's audio at real-time pace.

    Feeds the file's contents into the stream buffer no faster than a real
    microphone would, then follows with 10 seconds of silence.
    """

    def __init__(self, audio_src, *args, **kwargs):
        """Args:
            audio_src: path of the audio file to simulate streaming from.
            *args, **kwargs: forwarded to ResumableMicrophoneStream.
        """
        super(SimulatedMicrophoneStream, self).__init__(*args, **kwargs)
        self._audio_src = audio_src

    def _delayed(self, get_data):
        """Yield chunks from get_data, throttled to real-time capture speed.

        Args:
            get_data: callable taking a byte count and returning up to that
                many bytes, or a falsy value at end of data.
        """
        total_bytes_read = 0
        start_time = time.time()

        chunk = get_data(self._bytes_per_chunk)

        while chunk and not self.closed:
            total_bytes_read += len(chunk)
            # Sleep until the wall-clock moment at which a real microphone
            # would have finished capturing this chunk.
            expected_yield_time = start_time + (
                total_bytes_read / self._bytes_per_second)
            now = time.time()
            if expected_yield_time > now:
                time.sleep(expected_yield_time - now)

            yield chunk

            chunk = get_data(self._bytes_per_chunk)

    def _stream_from_file(self, audio_src):
        """Yield the file's audio in real time, then 10s of silence."""
        with open(audio_src, 'rb') as f:
            for chunk in self._delayed(
                    lambda b_per_chunk: f.read(b_per_chunk)):
                yield chunk

        # Continue sending silence - 10s worth. BytesIO, not StringIO: the
        # audio is bytes, and StringIO rejects bytes on Python 3.
        trailing_silence = six.BytesIO(
            b'\0' * self._bytes_per_second * 10)
        for chunk in self._delayed(trailing_silence.read):
            yield chunk

    def _thread(self):
        # Background thread body: push simulated audio into the inherited
        # buffer, then signal end-of-stream with None.
        # NOTE(review): _fill_buffer is called with a single argument here;
        # confirm the parent's signature accepts that.
        for chunk in self._stream_from_file(self._audio_src):
            self._fill_buffer(chunk)
        self._fill_buffer(None)

    def __enter__(self):
        self.closed = False

        threading.Thread(target=self._thread).start()

        return self

    def __exit__(self, type, value, traceback):
        # Flips the flag that _delayed polls, letting the feeder thread end.
        self.closed = True
197
144
198
- num_chars_printed = 0
199
145
146
def _record_keeper(responses, stream):
    """Yield each response, notifying the stream of final transcriptions.

    Args:
        responses: a generator of responses, already filtered down to those
            that have results with alternatives.
        stream: a ResumableMicrophoneStream; its on_transcribe callback is
            invoked with the end time of each final result.
    """
    for response in responses:
        first_result = response.results[0]
        if first_result.is_final:
            best = first_result.alternatives[0]
            # Record how far transcription has progressed, so a resumed
            # stream can skip audio that was already transcribed.
            last_word_end = best.words[-1].end_time
            stream.on_transcribe(duration_to_secs(last_word_end))
        yield response
163
+
204
164
165
def listen_print_loop(responses, stream):
    """Iterates through server responses and prints them.

    Same as in transcribe_streaming_mic, but keeps track of when a sent
    audio_chunk has been transcribed.
    """
    usable = (
        response for response in responses
        if response.results and response.results[0].alternatives)
    transcribe_streaming_mic.listen_print_loop(
        _record_keeper(usable, stream))
175
+
176
+
177
def main(sample_rate, audio_src):
    """Transcribe streaming audio, resuming across stream-limit errors.

    Args:
        sample_rate: audio sample rate, in hertz.
        audio_src: path of an audio file to simulate streaming from, or a
            falsy value to stream from the microphone instead.
    """
    # See http://g.co/cloud/speech/docs/languages
    # for a list of supported languages.
    language_code = 'en-US'  # a BCP-47 language tag

    client = speech.SpeechClient()
    config = types.RecognitionConfig(
        encoding=enums.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=sample_rate,
        language_code=language_code,
        max_alternatives=1,
        enable_word_time_offsets=True)
    streaming_config = types.StreamingRecognitionConfig(
        config=config,
        interim_results=True)

    if audio_src:
        mic_manager = SimulatedMicrophoneStream(
            audio_src, sample_rate, int(sample_rate / 10))
    else:
        mic_manager = ResumableMicrophoneStream(
            sample_rate, int(sample_rate / 10))

    with mic_manager as stream:
        resume = False
        while True:
            audio_generator = stream.generator(resume=resume)
            requests = (types.StreamingRecognizeRequest(audio_content=content)
                        for content in audio_generator)

            responses = client.streaming_recognize(streaming_config, requests)

            try:
                # Now, put the transcription responses to use.
                listen_print_loop(responses, stream)
                break
            # 'except X as e' (not the Python-2-only 'except X, e') is
            # valid on both Python 2 and Python 3.
            except grpc.RpcError as e:
                # Only resume on the errors that indicate the stream hit a
                # server-side time limit; re-raise everything else.
                if e.code() not in (grpc.StatusCode.INVALID_ARGUMENT,
                                    grpc.StatusCode.OUT_OF_RANGE):
                    raise

                details = e.details()
                if e.code() == grpc.StatusCode.INVALID_ARGUMENT:
                    if 'deadline too short' not in details:
                        raise
                else:
                    if 'maximum allowed stream duration' not in details:
                        raise

                print('Resuming..')
                resume = True
247
227
248
228
249
229
if __name__ == '__main__':
    # Command-line entry point: parse options, then run the transcriber.
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument(
        '--rate', type=int, default=16000, help='Sample rate.')
    parser.add_argument(
        '--audio_src', help='File to simulate streaming of.')
    args = parser.parse_args()
    main(args.rate, args.audio_src)