@@ -160,28 +160,103 @@ impl CommandExecutor for std::process::Command {
160
160
impl AsyncCommandExecutor for tokio:: process:: Command {
161
161
/// Execute the command and return the stdout and stderr
162
162
async fn execute ( & mut self , timeout : Option < Duration > ) -> Result < ( String , String ) > {
163
+ #[ tracing:: instrument( level = "debug" , skip( reader) ) ]
164
+ async fn read_process (
165
+ mut reader : Option < impl tokio:: io:: AsyncRead + Unpin > ,
166
+ mut exit_anyway_broadcast_receiver : tokio:: sync:: broadcast:: Receiver < ( ) > ,
167
+ ) -> Result < String > {
168
+ let Some ( reader) = reader. as_mut ( ) else {
169
+ return Ok ( String :: new ( ) ) ;
170
+ } ;
171
+ let mut vec = Vec :: new ( ) ;
172
+ loop {
173
+ use tokio:: io:: AsyncReadExt as _;
174
+ tokio:: select! {
175
+ n = reader. read_buf( & mut vec) => {
176
+ if n? == 0 {
177
+ return Ok ( String :: from_utf8_lossy( & * vec) . into_owned( ) ) ;
178
+ }
179
+ } ,
180
+ _ = exit_anyway_broadcast_receiver. recv( ) => {
181
+ return Ok ( String :: from_utf8_lossy( & * vec) . into_owned( ) ) ;
182
+ } ,
183
+ }
184
+ }
185
+ }
186
+
163
187
debug ! ( "Executing command: {}" , self . to_command_string( ) ) ;
164
- let output = match timeout {
165
- Some ( duration) => tokio:: time:: timeout ( duration, self . output ( ) ) . await ?,
166
- None => self . output ( ) . await ,
167
- } ?;
168
188
169
- let stdout = String :: from_utf8_lossy ( & output. stdout ) . into_owned ( ) ;
170
- let stderr = String :: from_utf8_lossy ( & output. stderr ) . into_owned ( ) ;
171
- debug ! (
172
- "Result: {}\n stdout: {}\n stderr: {}" ,
173
- output
174
- . status
175
- . code( )
176
- . map_or( "None" . to_string( ) , |c| c. to_string( ) ) ,
177
- stdout,
178
- stderr
179
- ) ;
189
+ let res_fut = async {
190
+ let mut child = self
191
+ . stdout ( std:: process:: Stdio :: piped ( ) )
192
+ . stderr ( std:: process:: Stdio :: piped ( ) )
193
+ . spawn ( ) ?;
194
+
195
+ let stdout = child. stdout . take ( ) ;
196
+ let stderr = child. stderr . take ( ) ;
197
+
198
+ // on windows, pg_ctl start will appear to hang if you try to read out all of stdout
199
+ // and stderr. so, on windows do a horrible hack and forcibly end reading of stdout
200
+ // and stderr 50ms after the process exits. on not-windows, this early exit mechanism
201
+ // is set up but never sent to, resulting in the same behavior as `read_to_end`.
202
+
203
+ let ( exit_anyway_broadcast_sender, exit_anyway_broadcast_receiver_stdout) =
204
+ tokio:: sync:: broadcast:: channel ( 1 ) ;
205
+ let exit_anyway_broadcast_receiver_stderr = exit_anyway_broadcast_sender. subscribe ( ) ;
206
+ let stdout = tokio:: spawn ( async {
207
+ read_process ( stdout, exit_anyway_broadcast_receiver_stdout) . await
208
+ } ) ;
209
+ let stderr = tokio:: spawn ( async {
210
+ read_process ( stderr, exit_anyway_broadcast_receiver_stderr) . await
211
+ } ) ;
212
+ let exit_status = child. wait ( ) . await ;
213
+ #[ cfg( target_os = "windows" ) ]
214
+ {
215
+ tokio:: time:: sleep ( Duration :: from_millis ( 50 ) ) . await ;
216
+ let _ = exit_anyway_broadcast_sender. send ( ( ) ) ;
217
+ }
218
+ let ( stdout, stderr) = tokio:: join!( stdout, stderr) ;
219
+ std:: mem:: drop ( exit_anyway_broadcast_sender) ;
220
+
221
+ let exit_status = exit_status?;
222
+ fn debug_render (
223
+ which : & ' static str ,
224
+ res : & std:: result:: Result < Result < String > , tokio:: task:: JoinError > ,
225
+ ) -> String {
226
+ match res {
227
+ Ok ( Ok ( s) ) => s. into ( ) ,
228
+ Ok ( Err ( io_err) ) => format ! ( "<failed to read {}: {:?}>" , which, io_err) ,
229
+ Err ( join_err) => format ! ( "<failed to read {}: {:?}>" , which, join_err) ,
230
+ }
231
+ }
232
+ debug ! (
233
+ "Result: {}\n stdout: {}\n stderr: {}" ,
234
+ exit_status
235
+ . code( )
236
+ . map_or( "None" . to_string( ) , |c| c. to_string( ) ) ,
237
+ debug_render( "stdout" , & stdout) ,
238
+ debug_render( "stderr" , & stderr)
239
+ ) ;
240
+
241
+ fn unwrap2_or_empty_string < E , E2 > (
242
+ r : std:: result:: Result < std:: result:: Result < String , E > , E2 > ,
243
+ ) -> String {
244
+ r. map_or_else ( |_| String :: new ( ) , |r| r. unwrap_or_else ( |_| String :: new ( ) ) )
245
+ }
180
246
181
- if output. status . success ( ) {
182
- Ok ( ( stdout, stderr) )
183
- } else {
184
- Err ( Error :: CommandError { stdout, stderr } )
247
+ let stdout = unwrap2_or_empty_string ( stdout) ;
248
+ let stderr = unwrap2_or_empty_string ( stderr) ;
249
+
250
+ if exit_status. success ( ) {
251
+ Ok ( ( stdout, stderr) )
252
+ } else {
253
+ Err ( Error :: CommandError { stdout, stderr } )
254
+ }
255
+ } ;
256
+
257
+ match timeout {
258
+ Some ( duration) => tokio:: time:: timeout ( duration, res_fut) . await ?,
259
+ None => res_fut. await ,
185
260
}
186
261
}
187
262
}
0 commit comments