Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 4ee1a63

Browse filesBrowse files
author
Ross MacLeod
committed
work around issue where stdout/stderr never EOF when running pg_ctl start on windows by giving a 50ms grace period after subprocess exit and returning without the EOF
1 parent 9d6f088 commit 4ee1a63
Copy full SHA for 4ee1a63

File tree

Expand file treeCollapse file tree

1 file changed

+94
-19
lines changed
Filter options
Expand file treeCollapse file tree

1 file changed

+94
-19
lines changed

‎postgresql_commands/src/traits.rs

Copy file name to clipboardExpand all lines: postgresql_commands/src/traits.rs
+94-19Lines changed: 94 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -160,28 +160,103 @@ impl CommandExecutor for std::process::Command {
160160
impl AsyncCommandExecutor for tokio::process::Command {
161161
/// Execute the command and return the stdout and stderr
162162
async fn execute(&mut self, timeout: Option<Duration>) -> Result<(String, String)> {
163+
#[tracing::instrument(level = "debug", skip(reader))]
164+
async fn read_process(
165+
mut reader: Option<impl tokio::io::AsyncRead + Unpin>,
166+
mut exit_anyway_broadcast_receiver: tokio::sync::broadcast::Receiver<()>,
167+
) -> Result<String> {
168+
let Some(reader) = reader.as_mut() else {
169+
return Ok(String::new());
170+
};
171+
let mut vec = Vec::new();
172+
loop {
173+
use tokio::io::AsyncReadExt as _;
174+
tokio::select! {
175+
n = reader.read_buf(&mut vec) => {
176+
if n? == 0 {
177+
return Ok(String::from_utf8_lossy(&*vec).into_owned());
178+
}
179+
},
180+
_ = exit_anyway_broadcast_receiver.recv() => {
181+
return Ok(String::from_utf8_lossy(&*vec).into_owned());
182+
},
183+
}
184+
}
185+
}
186+
163187
debug!("Executing command: {}", self.to_command_string());
164-
let output = match timeout {
165-
Some(duration) => tokio::time::timeout(duration, self.output()).await?,
166-
None => self.output().await,
167-
}?;
168188

169-
let stdout = String::from_utf8_lossy(&output.stdout).into_owned();
170-
let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
171-
debug!(
172-
"Result: {}\nstdout: {}\nstderr: {}",
173-
output
174-
.status
175-
.code()
176-
.map_or("None".to_string(), |c| c.to_string()),
177-
stdout,
178-
stderr
179-
);
189+
let res_fut = async {
190+
let mut child = self
191+
.stdout(std::process::Stdio::piped())
192+
.stderr(std::process::Stdio::piped())
193+
.spawn()?;
194+
195+
let stdout = child.stdout.take();
196+
let stderr = child.stderr.take();
197+
198+
// on windows, pg_ctl start will appear to hang if you try to read out all of stdout
199+
// and stderr. so, on windows do a horrible hack and forcibly end reading of stdout
200+
// and stderr 50ms after the process exits. on not-windows, this early exit mechanism
201+
// is set up but never sent to, resulting in the same behavior as `read_to_end`.
202+
203+
let (exit_anyway_broadcast_sender, exit_anyway_broadcast_receiver_stdout) =
204+
tokio::sync::broadcast::channel(1);
205+
let exit_anyway_broadcast_receiver_stderr = exit_anyway_broadcast_sender.subscribe();
206+
let stdout = tokio::spawn(async {
207+
read_process(stdout, exit_anyway_broadcast_receiver_stdout).await
208+
});
209+
let stderr = tokio::spawn(async {
210+
read_process(stderr, exit_anyway_broadcast_receiver_stderr).await
211+
});
212+
let exit_status = child.wait().await;
213+
#[cfg(target_os = "windows")]
214+
{
215+
tokio::time::sleep(Duration::from_millis(50)).await;
216+
let _ = exit_anyway_broadcast_sender.send(());
217+
}
218+
let (stdout, stderr) = tokio::join!(stdout, stderr);
219+
std::mem::drop(exit_anyway_broadcast_sender);
220+
221+
let exit_status = exit_status?;
222+
fn debug_render(
223+
which: &'static str,
224+
res: &std::result::Result<Result<String>, tokio::task::JoinError>,
225+
) -> String {
226+
match res {
227+
Ok(Ok(s)) => s.into(),
228+
Ok(Err(io_err)) => format!("<failed to read {}: {:?}>", which, io_err),
229+
Err(join_err) => format!("<failed to read {}: {:?}>", which, join_err),
230+
}
231+
}
232+
debug!(
233+
"Result: {}\nstdout: {}\nstderr: {}",
234+
exit_status
235+
.code()
236+
.map_or("None".to_string(), |c| c.to_string()),
237+
debug_render("stdout", &stdout),
238+
debug_render("stderr", &stderr)
239+
);
240+
241+
fn unwrap2_or_empty_string<E, E2>(
242+
r: std::result::Result<std::result::Result<String, E>, E2>,
243+
) -> String {
244+
r.map_or_else(|_| String::new(), |r| r.unwrap_or_else(|_| String::new()))
245+
}
180246

181-
if output.status.success() {
182-
Ok((stdout, stderr))
183-
} else {
184-
Err(Error::CommandError { stdout, stderr })
247+
let stdout = unwrap2_or_empty_string(stdout);
248+
let stderr = unwrap2_or_empty_string(stderr);
249+
250+
if exit_status.success() {
251+
Ok((stdout, stderr))
252+
} else {
253+
Err(Error::CommandError { stdout, stderr })
254+
}
255+
};
256+
257+
match timeout {
258+
Some(duration) => tokio::time::timeout(duration, res_fut).await?,
259+
None => res_fut.await,
185260
}
186261
}
187262
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.