Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 8bf7f9a

Browse filesBrowse files
Merge pull request theseus-rs#67 from Dridus/rmm/windows-io-hang
work around hang starting database on windows
2 parents 2783485 + 0df2e49 commit 8bf7f9a
Copy full SHA for 8bf7f9a

File tree

Expand file treeCollapse file tree

1 file changed

+94
-19
lines changed
Filter options
Expand file treeCollapse file tree

1 file changed

+94
-19
lines changed

‎postgresql_commands/src/traits.rs

Copy file name to clipboardExpand all lines: postgresql_commands/src/traits.rs
+94-19Lines changed: 94 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -160,28 +160,103 @@ impl CommandExecutor for std::process::Command {
160160
impl AsyncCommandExecutor for tokio::process::Command {
161161
/// Execute the command and return the stdout and stderr
162162
async fn execute(&mut self, timeout: Option<Duration>) -> Result<(String, String)> {
163+
#[tracing::instrument(level = "debug", skip(reader))]
164+
async fn read_process(
165+
mut reader: Option<impl tokio::io::AsyncRead + Unpin>,
166+
mut exit_anyway_broadcast_receiver: tokio::sync::broadcast::Receiver<()>,
167+
) -> Result<String> {
168+
let Some(reader) = reader.as_mut() else {
169+
return Ok(String::new());
170+
};
171+
let mut vec = Vec::new();
172+
loop {
173+
use tokio::io::AsyncReadExt as _;
174+
tokio::select! {
175+
n = reader.read_buf(&mut vec) => {
176+
if n? == 0 {
177+
return Ok(String::from_utf8_lossy(&vec).into_owned());
178+
}
179+
},
180+
_ = exit_anyway_broadcast_receiver.recv() => {
181+
return Ok(String::from_utf8_lossy(&vec).into_owned());
182+
},
183+
}
184+
}
185+
}
186+
163187
debug!("Executing command: {}", self.to_command_string());
164-
let output = match timeout {
165-
Some(duration) => tokio::time::timeout(duration, self.output()).await?,
166-
None => self.output().await,
167-
}?;
168188

169-
let stdout = String::from_utf8_lossy(&output.stdout).into_owned();
170-
let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
171-
debug!(
172-
"Result: {}\nstdout: {}\nstderr: {}",
173-
output
174-
.status
175-
.code()
176-
.map_or("None".to_string(), |c| c.to_string()),
177-
stdout,
178-
stderr
179-
);
189+
let res_fut = async {
190+
let mut child = self
191+
.stdout(std::process::Stdio::piped())
192+
.stderr(std::process::Stdio::piped())
193+
.spawn()?;
194+
195+
let stdout = child.stdout.take();
196+
let stderr = child.stderr.take();
197+
198+
// on windows, pg_ctl start will appear to hang if you try to read out all of stdout
199+
// and stderr. so, on windows do a horrible hack and forcibly end reading of stdout
200+
// and stderr 50ms after the process exits. on not-windows, this early exit mechanism
201+
// is set up but never sent to, resulting in the same behavior as `read_to_end`.
202+
203+
let (exit_anyway_broadcast_sender, exit_anyway_broadcast_receiver_stdout) =
204+
tokio::sync::broadcast::channel(1);
205+
let exit_anyway_broadcast_receiver_stderr = exit_anyway_broadcast_sender.subscribe();
206+
let stdout = tokio::spawn(async {
207+
read_process(stdout, exit_anyway_broadcast_receiver_stdout).await
208+
});
209+
let stderr = tokio::spawn(async {
210+
read_process(stderr, exit_anyway_broadcast_receiver_stderr).await
211+
});
212+
let exit_status = child.wait().await;
213+
#[cfg(target_os = "windows")]
214+
{
215+
tokio::time::sleep(Duration::from_millis(50)).await;
216+
let _ = exit_anyway_broadcast_sender.send(());
217+
}
218+
let (stdout, stderr) = tokio::join!(stdout, stderr);
219+
std::mem::drop(exit_anyway_broadcast_sender);
220+
221+
let exit_status = exit_status?;
222+
fn debug_render(
223+
which: &'static str,
224+
res: &std::result::Result<Result<String>, tokio::task::JoinError>,
225+
) -> String {
226+
match res {
227+
Ok(Ok(s)) => s.into(),
228+
Ok(Err(io_err)) => format!("<failed to read {}: {:?}>", which, io_err),
229+
Err(join_err) => format!("<failed to read {}: {:?}>", which, join_err),
230+
}
231+
}
232+
debug!(
233+
"Result: {}\nstdout: {}\nstderr: {}",
234+
exit_status
235+
.code()
236+
.map_or("None".to_string(), |c| c.to_string()),
237+
debug_render("stdout", &stdout),
238+
debug_render("stderr", &stderr)
239+
);
240+
241+
fn unwrap2_or_empty_string<E, E2>(
242+
r: std::result::Result<std::result::Result<String, E>, E2>,
243+
) -> String {
244+
r.map_or_else(|_| String::new(), |r| r.unwrap_or_else(|_| String::new()))
245+
}
180246

181-
if output.status.success() {
182-
Ok((stdout, stderr))
183-
} else {
184-
Err(Error::CommandError { stdout, stderr })
247+
let stdout = unwrap2_or_empty_string(stdout);
248+
let stderr = unwrap2_or_empty_string(stderr);
249+
250+
if exit_status.success() {
251+
Ok((stdout, stderr))
252+
} else {
253+
Err(Error::CommandError { stdout, stderr })
254+
}
255+
};
256+
257+
match timeout {
258+
Some(duration) => tokio::time::timeout(duration, res_fut).await?,
259+
None => res_fut.await,
185260
}
186261
}
187262
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.