-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Process] Add InputStream to seamlessly feed running processes #18386
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
<?php | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Process; | ||
|
||
use Symfony\Component\Process\Exception\RuntimeException; | ||
|
||
/** | ||
* Provides a way to continuously write to the input of a Process until the InputStream is closed. | ||
* | ||
* @author Nicolas Grekas <p@tchwork.com> | ||
*/ | ||
class InputStream implements \IteratorAggregate | ||
{ | ||
private $onEmpty = null; | ||
private $input = array(); | ||
private $open = true; | ||
|
||
/** | ||
* Sets a callback that is called when the write buffer becomes empty. | ||
*/ | ||
public function onEmpty(callable $onEmpty = null) | ||
{ | ||
$this->onEmpty = $onEmpty; | ||
} | ||
|
||
/** | ||
* Appends an input to the write buffer. | ||
* | ||
* @param resource|scalar|\Traversable|null The input to append as stream resource, scalar or \Traversable | ||
*/ | ||
public function write($input) | ||
{ | ||
if (null === $input) { | ||
return; | ||
} | ||
if ($this->isClosed()) { | ||
throw new RuntimeException(sprintf('%s is closed', static::class)); | ||
} | ||
$this->input[] = ProcessUtils::validateInput(__METHOD__, $input); | ||
} | ||
|
||
/** | ||
* Closes the write buffer. | ||
*/ | ||
public function close() | ||
{ | ||
$this->open = false; | ||
} | ||
|
||
/** | ||
* Tells whether the write buffer is closed or not. | ||
*/ | ||
public function isClosed() | ||
{ | ||
return !$this->open; | ||
} | ||
|
||
public function getIterator() | ||
{ | ||
$this->open = true; | ||
|
||
while ($this->open || $this->input) { | ||
if (!$this->input) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shouldn't this reach the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The yield is what breaks the infinite loop: the state can change between two iterations. But you know that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great. The behavior is indeed better (but I haven't figured it out just by reading the code previously). Btw, is there a test ensuring that it is called only when becoming empty (so that someone doing the change I mentioned here would break the test, making it clear that it is a behavior change) ? If no, please add one There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Specific test added, even some of the existing ones fail also when doing it wrong. |
||
yield ''; | ||
continue; | ||
} | ||
$current = array_shift($this->input); | ||
|
||
if ($current instanceof \Iterator) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ths should actually check for Traversable There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is necessary for consistency with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But validateInput wraps Traversable into IteratorIterator, which makes Iterator valid technically, isn't it? Then why prefer Traversable? |
||
foreach ($current as $cur) { | ||
yield $cur; | ||
} | ||
} else { | ||
yield $current; | ||
} | ||
if (!$this->input && $this->open && null !== $onEmpty = $this->onEmpty) { | ||
$this->write($onEmpty($this)); | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,8 @@ | |
|
||
namespace Symfony\Component\Process\Pipes; | ||
|
||
use Symfony\Component\Process\Exception\InvalidArgumentException; | ||
|
||
/** | ||
* @author Romain Neutron <imprec@gmail.com> | ||
* | ||
|
@@ -23,7 +25,7 @@ abstract class AbstractPipes implements PipesInterface | |
|
||
/** @var string */ | ||
private $inputBuffer = ''; | ||
/** @var resource|\Iterator|null */ | ||
/** @var resource|scalar|\Iterator|null */ | ||
private $input; | ||
/** @var bool */ | ||
private $blocked = true; | ||
|
@@ -84,6 +86,8 @@ protected function unblock() | |
|
||
/** | ||
* Writes input to stdin. | ||
* | ||
* @throws InvalidArgumentException When an input iterator yields a non supported value | ||
*/ | ||
protected function write() | ||
{ | ||
|
@@ -97,10 +101,18 @@ protected function write() | |
$input = null; | ||
} elseif (is_resource($input = $input->current())) { | ||
stream_set_blocking($input, 0); | ||
} else { | ||
$this->inputBuffer .= $input; | ||
} elseif (!isset($this->inputBuffer[0])) { | ||
if (!is_string($input)) { | ||
if (!is_scalar($input)) { | ||
throw new InvalidArgumentException(sprintf('%s yielded a value of type "%s", but only scalars and stream resources are supported', get_class($this->input), gettype($input))); | ||
} | ||
$input = (string) $input; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not too familiar with streams, but the exception makes me believe that a stream will be casted to a string here.
Is this what it's intended to to or am I missing something? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you're missing #18386 (diff) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah right, that explains the "resource" part of the exception, gotcha! |
||
} | ||
$this->inputBuffer = $input; | ||
$this->input->next(); | ||
$input = null; | ||
} else { | ||
$input = null; | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing phpdoc for the argument type
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.