Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

[Process] Add InputStream to seamlessly feed running processes #18386

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 2, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions 90 src/Symfony/Component/Process/InputStream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Process;

use Symfony\Component\Process\Exception\RuntimeException;

/**
* Provides a way to continuously write to the input of a Process until the InputStream is closed.
*
* @author Nicolas Grekas <p@tchwork.com>
*/
class InputStream implements \IteratorAggregate
{
private $onEmpty = null;
private $input = array();
private $open = true;

/**
* Sets a callback that is called when the write buffer becomes empty.
*/
public function onEmpty(callable $onEmpty = null)
{
$this->onEmpty = $onEmpty;
}

/**
* Appends an input to the write buffer.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing phpdoc for the argument type

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*
* @param resource|scalar|\Traversable|null The input to append as stream resource, scalar or \Traversable
*/
public function write($input)
{
if (null === $input) {
return;
}
if ($this->isClosed()) {
throw new RuntimeException(sprintf('%s is closed', static::class));
}
$this->input[] = ProcessUtils::validateInput(__METHOD__, $input);
}

/**
* Closes the write buffer.
*/
public function close()
{
$this->open = false;
}

/**
* Tells whether the write buffer is closed or not.
*/
public function isClosed()
{
return !$this->open;
}

public function getIterator()
{
$this->open = true;

while ($this->open || $this->input) {
if (!$this->input) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this reach the onEmpty here ? Otherwise, it looks like this will never end when entering this condition, as input will stay empty

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The yield is what breaks the infinite loop: the state can change between two iterations. But you know that.
About your suggestion, that's how it was in an earlier version, but I ended up preferring this semantic for onEmpty: to be called when the buffer becomes empty.
Otherwise, the callback would be called both for initializing the buffer (assuming it started empty) and repeatedly when it's empty again, which makes it very hard to reliably know how many times it will be called.
In the submitted version, the responsibility looks more clear to me: repopulating the buffer when it's getting empty.
That's how I thought about it at least.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great. The behavior is indeed better (but I haven't figured it out just by reading the code previously).
Please expand the doc PR to explain usage of this class, so that such feature is understandable by users, not only by you.

Btw, is there a test ensuring that it is called only when becoming empty (so that someone doing the change I mentioned here would break the test, making it clear that it is a behavior change) ? If no, please add one

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Specific test added, even some of the existing ones fail also when doing it wrong.

yield '';
continue;
}
$current = array_shift($this->input);

if ($current instanceof \Iterator) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ths should actually check for Traversable

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is necessary for consistency with ProcessUtils::validateInput

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But validateInput wraps Traversable into IteratorIterator, which makes Iterator valid technically, isn't it? Then why prefer Traversable?

foreach ($current as $cur) {
yield $cur;
}
} else {
yield $current;
}
if (!$this->input && $this->open && null !== $onEmpty = $this->onEmpty) {
$this->write($onEmpty($this));
}
}
}
}
18 changes: 15 additions & 3 deletions 18 src/Symfony/Component/Process/Pipes/AbstractPipes.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

namespace Symfony\Component\Process\Pipes;

use Symfony\Component\Process\Exception\InvalidArgumentException;

/**
* @author Romain Neutron <imprec@gmail.com>
*
Expand All @@ -23,7 +25,7 @@ abstract class AbstractPipes implements PipesInterface

/** @var string */
private $inputBuffer = '';
/** @var resource|\Iterator|null */
/** @var resource|scalar|\Iterator|null */
private $input;
/** @var bool */
private $blocked = true;
Expand Down Expand Up @@ -84,6 +86,8 @@ protected function unblock()

/**
* Writes input to stdin.
*
* @throws InvalidArgumentException When an input iterator yields a non supported value
*/
protected function write()
{
Expand All @@ -97,10 +101,18 @@ protected function write()
$input = null;
} elseif (is_resource($input = $input->current())) {
stream_set_blocking($input, 0);
} else {
$this->inputBuffer .= $input;
} elseif (!isset($this->inputBuffer[0])) {
if (!is_string($input)) {
if (!is_scalar($input)) {
throw new InvalidArgumentException(sprintf('%s yielded a value of type "%s", but only scalars and stream resources are supported', get_class($this->input), gettype($input)));
}
$input = (string) $input;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not too familiar with streams, but the exception makes me believe that a stream will be casted to a string here.

$ php -r '$f = fopen("composer.json", "r"); echo (string) $f;'
Resource id #5

Is this what it's intended to to or am I missing something?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're missing #18386 (diff)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right, that explains the "resource" part of the exception, gotcha!

}
$this->inputBuffer = $input;
$this->input->next();
$input = null;
} else {
$input = null;
}
}

Expand Down
6 changes: 3 additions & 3 deletions 6 src/Symfony/Component/Process/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class Process
* @param string $commandline The command line to run
* @param string|null $cwd The working directory or null to use the working dir of the current PHP process
* @param array|null $env The environment variables or null to use the same environment as the current PHP process
* @param string|null $input The input
* @param mixed|null $input The input as stream resource, scalar or \Traversable, or null for no input
* @param int|float|null $timeout The timeout in seconds or null to disable
* @param array $options An array of options for proc_open
*
Expand Down Expand Up @@ -1027,7 +1027,7 @@ public function setEnv(array $env)
/**
* Gets the Process input.
*
* @return null|string The Process input
* @return resource|string|\Iterator|null The Process input
*/
public function getInput()
{
Expand All @@ -1039,7 +1039,7 @@ public function getInput()
*
* This content will be passed to the underlying process standard input.
*
* @param mixed $input The content
* @param resource|scalar|\Traversable|null $input The content
*
* @return self The current Process instance
*
Expand Down
2 changes: 1 addition & 1 deletion 2 src/Symfony/Component/Process/ProcessBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public function addEnvironmentVariables(array $variables)
/**
* Sets the input of the process.
*
* @param mixed $input The input as a string
* @param resource|scalar|\Traversable|null $input The input content
*
* @return ProcessBuilder
*
Expand Down
99 changes: 83 additions & 16 deletions 99 src/Symfony/Component/Process/Tests/ProcessTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use Symfony\Component\Process\Exception\LogicException;
use Symfony\Component\Process\Exception\ProcessTimedOutException;
use Symfony\Component\Process\Exception\RuntimeException;
use Symfony\Component\Process\InputStream;
use Symfony\Component\Process\PhpExecutableFinder;
use Symfony\Component\Process\Pipes\PipesInterface;
use Symfony\Component\Process\Process;
Expand Down Expand Up @@ -1176,33 +1177,99 @@ public function provideVariousIncrementals() {

public function testIteratorInput()
{
$nextData = 'ping';
$input = function () use (&$nextData) {
while (false !== $nextData) {
yield $nextData;
yield $nextData = '';
}
$input = function () {
yield 'ping';
yield 'pong';
};
$input = $input();

$process = new Process(self::$phpBin.' -r '.escapeshellarg('stream_copy_to_stream(STDIN, STDOUT);'));
$process = new Process(self::$phpBin.' -r '.escapeshellarg('stream_copy_to_stream(STDIN, STDOUT);'), null, null, $input());
$process->run();
$this->assertSame('pingpong', $process->getOutput());
}

public function testSimpleInputStream()
{
$input = new InputStream();

$process = new Process(self::$phpBin.' -r '.escapeshellarg('echo \'ping\'; stream_copy_to_stream(STDIN, STDOUT);'));
$process->setInput($input);
$process->start(function ($type, $data) use ($input, &$nextData) {

$process->start(function ($type, $data) use ($input) {
if ('ping' === $data) {
$h = fopen('php://memory', 'r+');
fwrite($h, 'pong');
rewind($h);
$nextData = $h;
$input->next();
} else {
$nextData = false;
$input->write('pang');
} elseif (!$input->isClosed()) {
$input->write('pong');
$input->close();
}
});

$process->wait();
$this->assertSame('pingpangpong', $process->getOutput());
}

public function testInputStreamWithCallable()
{
$i = 0;
$stream = fopen('php://memory', 'w+');
$stream = function () use ($stream, &$i) {
if ($i < 3) {
rewind($stream);
fwrite($stream, ++$i);
rewind($stream);

return $stream;
}
};

$input = new InputStream();
$input->onEmpty($stream);
$input->write($stream());

$process = new Process(self::$phpBin.' -r '.escapeshellarg('stream_copy_to_stream(STDIN, STDOUT);'));
$process->setInput($input);
$process->start(function ($type, $data) use ($input) {
$input->close();
});

$process->wait();
$this->assertSame('123', $process->getOutput());
}

public function testInputStreamWithGenerator()
{
$input = new InputStream();
$input->onEmpty(function ($input) {
yield 'pong';
$input->close();
});

$process = new Process(self::$phpBin.' -r '.escapeshellarg('stream_copy_to_stream(STDIN, STDOUT);'));
$process->setInput($input);
$process->start();
$input->write('ping');
$process->wait();
$this->assertSame('pingpong', $process->getOutput());
}

public function testInputStreamOnEmpty()
{
$i = 0;
$input = new InputStream();
$input->onEmpty(function () use (&$i) {++$i;});

$process = new Process(self::$phpBin.' -r '.escapeshellarg('echo 123; echo fread(STDIN, 1); echo 456;'));
$process->setInput($input);
$process->start(function ($type, $data) use ($input) {
if ('123' === $data) {
$input->close();
}
});
$process->wait();

$this->assertSame(0, $i, 'InputStream->onEmpty callback should be called only when the input *becomes* empty');
$this->assertSame('123456', $process->getOutput());
}

/**
* @param string $commandline
* @param null|string $cwd
Expand Down
Morty Proxy This is a proxified and sanitized view of the page, visit original site.