diff --git a/src/Symfony/Component/Process/InputStream.php b/src/Symfony/Component/Process/InputStream.php
new file mode 100644
index 0000000000000..831b10932599d
--- /dev/null
+++ b/src/Symfony/Component/Process/InputStream.php
@@ -0,0 +1,90 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Process;
+
+use Symfony\Component\Process\Exception\RuntimeException;
+
+/**
+ * Provides a way to continuously write to the input of a Process until the InputStream is closed.
+ *
+ * @author Nicolas Grekas
+ */
+class InputStream implements \IteratorAggregate
+{
+ private $onEmpty = null;
+ private $input = array();
+ private $open = true;
+
+ /**
+ * Sets a callback that is called when the write buffer becomes empty.
+ */
+ public function onEmpty(callable $onEmpty = null)
+ {
+ $this->onEmpty = $onEmpty;
+ }
+
+ /**
+ * Appends an input to the write buffer.
+ *
+ * @param resource|scalar|\Traversable|null The input to append as stream resource, scalar or \Traversable
+ */
+ public function write($input)
+ {
+ if (null === $input) {
+ return;
+ }
+ if ($this->isClosed()) {
+ throw new RuntimeException(sprintf('%s is closed', static::class));
+ }
+ $this->input[] = ProcessUtils::validateInput(__METHOD__, $input);
+ }
+
+ /**
+ * Closes the write buffer.
+ */
+ public function close()
+ {
+ $this->open = false;
+ }
+
+ /**
+ * Tells whether the write buffer is closed or not.
+ */
+ public function isClosed()
+ {
+ return !$this->open;
+ }
+
+ public function getIterator()
+ {
+ $this->open = true;
+
+ while ($this->open || $this->input) {
+ if (!$this->input) {
+ yield '';
+ continue;
+ }
+ $current = array_shift($this->input);
+
+ if ($current instanceof \Iterator) {
+ foreach ($current as $cur) {
+ yield $cur;
+ }
+ } else {
+ yield $current;
+ }
+ if (!$this->input && $this->open && null !== $onEmpty = $this->onEmpty) {
+ $this->write($onEmpty($this));
+ }
+ }
+ }
+}
diff --git a/src/Symfony/Component/Process/Pipes/AbstractPipes.php b/src/Symfony/Component/Process/Pipes/AbstractPipes.php
index 73c4e5cf5a507..3bf2cd469c874 100644
--- a/src/Symfony/Component/Process/Pipes/AbstractPipes.php
+++ b/src/Symfony/Component/Process/Pipes/AbstractPipes.php
@@ -11,6 +11,8 @@
namespace Symfony\Component\Process\Pipes;
+use Symfony\Component\Process\Exception\InvalidArgumentException;
+
/**
* @author Romain Neutron
*
@@ -23,7 +25,7 @@ abstract class AbstractPipes implements PipesInterface
/** @var string */
private $inputBuffer = '';
- /** @var resource|\Iterator|null */
+ /** @var resource|scalar|\Iterator|null */
private $input;
/** @var bool */
private $blocked = true;
@@ -84,6 +86,8 @@ protected function unblock()
/**
* Writes input to stdin.
+ *
+ * @throws InvalidArgumentException When an input iterator yields a non supported value
*/
protected function write()
{
@@ -97,10 +101,18 @@ protected function write()
$input = null;
} elseif (is_resource($input = $input->current())) {
stream_set_blocking($input, 0);
- } else {
- $this->inputBuffer .= $input;
+ } elseif (!isset($this->inputBuffer[0])) {
+ if (!is_string($input)) {
+ if (!is_scalar($input)) {
+ throw new InvalidArgumentException(sprintf('%s yielded a value of type "%s", but only scalars and stream resources are supported', get_class($this->input), gettype($input)));
+ }
+ $input = (string) $input;
+ }
+ $this->inputBuffer = $input;
$this->input->next();
$input = null;
+ } else {
+ $input = null;
}
}
diff --git a/src/Symfony/Component/Process/Process.php b/src/Symfony/Component/Process/Process.php
index 11cffca4dfc52..4820892fadb74 100644
--- a/src/Symfony/Component/Process/Process.php
+++ b/src/Symfony/Component/Process/Process.php
@@ -132,7 +132,7 @@ class Process
* @param string $commandline The command line to run
* @param string|null $cwd The working directory or null to use the working dir of the current PHP process
* @param array|null $env The environment variables or null to use the same environment as the current PHP process
- * @param string|null $input The input
+ * @param mixed|null $input The input as stream resource, scalar or \Traversable, or null for no input
* @param int|float|null $timeout The timeout in seconds or null to disable
* @param array $options An array of options for proc_open
*
@@ -1027,7 +1027,7 @@ public function setEnv(array $env)
/**
* Gets the Process input.
*
- * @return null|string The Process input
+ * @return resource|string|\Iterator|null The Process input
*/
public function getInput()
{
@@ -1039,7 +1039,7 @@ public function getInput()
*
* This content will be passed to the underlying process standard input.
*
- * @param mixed $input The content
+ * @param resource|scalar|\Traversable|null $input The content
*
* @return self The current Process instance
*
diff --git a/src/Symfony/Component/Process/ProcessBuilder.php b/src/Symfony/Component/Process/ProcessBuilder.php
index 69843caeb94ca..32fd2ed67903d 100644
--- a/src/Symfony/Component/Process/ProcessBuilder.php
+++ b/src/Symfony/Component/Process/ProcessBuilder.php
@@ -167,7 +167,7 @@ public function addEnvironmentVariables(array $variables)
/**
* Sets the input of the process.
*
- * @param mixed $input The input as a string
+ * @param resource|scalar|\Traversable|null $input The input content
*
* @return ProcessBuilder
*
diff --git a/src/Symfony/Component/Process/Tests/ProcessTest.php b/src/Symfony/Component/Process/Tests/ProcessTest.php
index db9a01051fec0..a488e3d110f67 100644
--- a/src/Symfony/Component/Process/Tests/ProcessTest.php
+++ b/src/Symfony/Component/Process/Tests/ProcessTest.php
@@ -14,6 +14,7 @@
use Symfony\Component\Process\Exception\LogicException;
use Symfony\Component\Process\Exception\ProcessTimedOutException;
use Symfony\Component\Process\Exception\RuntimeException;
+use Symfony\Component\Process\InputStream;
use Symfony\Component\Process\PhpExecutableFinder;
use Symfony\Component\Process\Pipes\PipesInterface;
use Symfony\Component\Process\Process;
@@ -1176,33 +1177,99 @@ public function provideVariousIncrementals() {
public function testIteratorInput()
{
- $nextData = 'ping';
- $input = function () use (&$nextData) {
- while (false !== $nextData) {
- yield $nextData;
- yield $nextData = '';
- }
+ $input = function () {
+ yield 'ping';
+ yield 'pong';
};
- $input = $input();
- $process = new Process(self::$phpBin.' -r '.escapeshellarg('stream_copy_to_stream(STDIN, STDOUT);'));
+ $process = new Process(self::$phpBin.' -r '.escapeshellarg('stream_copy_to_stream(STDIN, STDOUT);'), null, null, $input());
+ $process->run();
+ $this->assertSame('pingpong', $process->getOutput());
+ }
+
+ public function testSimpleInputStream()
+ {
+ $input = new InputStream();
+
+ $process = new Process(self::$phpBin.' -r '.escapeshellarg('echo \'ping\'; stream_copy_to_stream(STDIN, STDOUT);'));
$process->setInput($input);
- $process->start(function ($type, $data) use ($input, &$nextData) {
+
+ $process->start(function ($type, $data) use ($input) {
if ('ping' === $data) {
- $h = fopen('php://memory', 'r+');
- fwrite($h, 'pong');
- rewind($h);
- $nextData = $h;
- $input->next();
- } else {
- $nextData = false;
+ $input->write('pang');
+ } elseif (!$input->isClosed()) {
+ $input->write('pong');
+ $input->close();
+ }
+ });
+
+ $process->wait();
+ $this->assertSame('pingpangpong', $process->getOutput());
+ }
+
+ public function testInputStreamWithCallable()
+ {
+ $i = 0;
+ $stream = fopen('php://memory', 'w+');
+ $stream = function () use ($stream, &$i) {
+ if ($i < 3) {
+ rewind($stream);
+ fwrite($stream, ++$i);
+ rewind($stream);
+
+ return $stream;
}
+ };
+
+ $input = new InputStream();
+ $input->onEmpty($stream);
+ $input->write($stream());
+
+ $process = new Process(self::$phpBin.' -r '.escapeshellarg('stream_copy_to_stream(STDIN, STDOUT);'));
+ $process->setInput($input);
+ $process->start(function ($type, $data) use ($input) {
+ $input->close();
+ });
+
+ $process->wait();
+ $this->assertSame('123', $process->getOutput());
+ }
+
+ public function testInputStreamWithGenerator()
+ {
+ $input = new InputStream();
+ $input->onEmpty(function ($input) {
+ yield 'pong';
+ $input->close();
});
+ $process = new Process(self::$phpBin.' -r '.escapeshellarg('stream_copy_to_stream(STDIN, STDOUT);'));
+ $process->setInput($input);
+ $process->start();
+ $input->write('ping');
$process->wait();
$this->assertSame('pingpong', $process->getOutput());
}
+ public function testInputStreamOnEmpty()
+ {
+ $i = 0;
+ $input = new InputStream();
+ $input->onEmpty(function () use (&$i) {++$i;});
+
+ $process = new Process(self::$phpBin.' -r '.escapeshellarg('echo 123; echo fread(STDIN, 1); echo 456;'));
+ $process->setInput($input);
+ $process->start(function ($type, $data) use ($input) {
+ if ('123' === $data) {
+ $input->close();
+ }
+ });
+ $process->wait();
+
+ $this->assertSame(0, $i, 'InputStream->onEmpty callback should be called only when the input *becomes* empty');
+ $this->assertSame('123456', $process->getOutput());
+ }
+
/**
* @param string $commandline
* @param null|string $cwd