Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

[Process] Add support for Fiber #43678

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: 7.3
Choose a base branch
Loading
from
Open

Conversation

lyrixx
Copy link
Member

@lyrixx lyrixx commented Oct 23, 2021

Q A
Branch? 7.1
Bug fix? no
New feature? yes
Deprecations? no
Tickets
License MIT
Doc PR

Example:

(Note: The loop is very naive, but it's for demo purpose)

<?php

use Symfony\Component\Process\Process;

require __DIR__ . '/vendor/autoload.php';

function logg(string $message, string ...$params)
{
    dump(sprintf($message, ...$params));
}

class Loop
{
    public function __construct(
        private array $fibers
    ) {
    }

    public function run()
    {
        foreach ($this->fibers as $k => $fiber) {
            logg('Fiber %d started', $k);
            $fiber->start();
        }

        while ($this->fibers) {
            foreach ($this->fibers as $k => $fiber) {
                if ($fiber->isTerminated()) {
                    logg('Fiber %d finished', $k);
                    unset($this->fibers[$k]);
                    continue;
                }

                $fiber->resume();
            }

            usleep(100_000);
        }
    }
}

// // Sync
// $process = new Process(['sleep', 1]);
// $process->mustRun();

// Async
$fiber1 = new Fiber(function () {
    $process = new Process(['sleep', 1]);
    $process->mustRun();
});
$fiber2 = new Fiber(function () {
    $process = new Process(['sleep', 2]);
    $process->mustRun();
});
$fiber3 = new Fiber(function () {
    $process = new Process(['sleep', 3]);
    $process->mustRun();
});

// dump($fiber1);die();


$loop = new Loop([$fiber3, $fiber2, $fiber1]);
$loop->run();
process-fiber.mp4

@nicolas-grekas

This comment has been minimized.

@lyrixx

This comment has been minimized.

@b-viguier
Copy link

Glad to see that you consider using Fibers! 🤗
Just some thoughts about this:

  • What if the fiber is never resumed?
  • What if the fiber is resumed with throw?
  • What if the fiber is resumed without calling usleep in the meantime? (To be honest, I'm not sure Process actually needs this…)

My point is that it could be interesting to define a contract about "how to use safely a SF Process in a Fiber".
A part of this contract could also be transmitted via the suspend function, a kind of promise to be resumed in n microseconds, or when a goal is reached… 🤷

I have no particular idea about this, I'm not sure if Symfony needs a complete async tools set, but I'm just wondering if this suspend can be an open door to wrong usages.

Anyway, this PR has my full attention 😉👍

@lyrixx
Copy link
Member Author

lyrixx commented Oct 23, 2021

Glad to see that you consider using Fibers!

Thanks a lot for your feedback

  • What if the fiber is never resumed?

The process will last, but it's exactly the same as when you write the follow code

$p = new Process['sleep', '10'];
$p->start();
// Nothing here

So IMHO, nothing has to be done here

  • What if the fiber is resumed with throw?

What would be the use case to resume it with throw?

  • What if the fiber is resumed without calling usleep in the meantime? (To be honest, I'm not sure Process actually needs this…)

Indeed, I was not sure about the use case, but actually, it eases the async handling of process.

My point is that it could be interesting to define a contract about "how to use safely a SF Process in a Fiber".

I totally agree with that 👍🏼, and more generally in Symfony

I'm just wondering if this suspend can be an open door to wrong usages.

Yes, it can. I said that, because we keeps reading issue where people do really strange things :) I guess it's how OSS works :)

@b-viguier
Copy link

The process will last, but it's exactly the same as when you write the follow code [...]
So IMHO, nothing has to be done here

👍

What would be the use case to resume it with throw?

Since it's possible, we can expect that someone will try 😅.
But it could a way to stop a too long process (timeout), or to cancel it for other external reasons (too much opened fibers, too much memory... 🤷‍♂️)

Yes, it can. I said that, because we keeps reading issue where people do really strange things :) I guess it's how OSS works :)

😅😉

@fabpot fabpot modified the milestones: 5.4, 6.1 Oct 29, 2021
@fabpot
Copy link
Member

fabpot commented Mar 26, 2022

What's the status here? @lyrixx?

@lyrixx
Copy link
Member Author

lyrixx commented Mar 28, 2022

I think there is consensus yet on how to add fibrer support in Symfony. Let's close it

But if someone want it, we could also merge it.

@lyrixx lyrixx closed this Mar 28, 2022
@lyrixx lyrixx deleted the process-fiber branch February 27, 2024 12:57
@soyuka
Copy link
Contributor

soyuka commented Feb 27, 2024

mhh wait why is this not merged?

@lyrixx
Copy link
Member Author

lyrixx commented Feb 27, 2024

Wep, too bad. People are not ready for that!

And we need that in castor

cc @joelwurtz

@fabpot
Copy link
Member

fabpot commented Feb 28, 2024

@lyrixx Not sure how you came to that conclusion as you were the one closing this PR.

@lyrixx lyrixx restored the process-fiber branch February 28, 2024 07:32
@lyrixx
Copy link
Member Author

lyrixx commented Feb 28, 2024

Let's re-open the PR 👍🏼 (rebased, and updated)

@lyrixx lyrixx changed the base branch from 6.1 to 7.1 February 28, 2024 07:36
Comment on lines +434 to +437
$startedAt = microtime(true);
$fiber->suspend();
$sleepFor = (int) (1000 - (microtime(true) - $startedAt) * 1000000);
if (0 < $sleepFor) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(same below)

Suggested change
$startedAt = microtime(true);
$fiber->suspend();
$sleepFor = (int) (1000 - (microtime(true) - $startedAt) * 1000000);
if (0 < $sleepFor) {
$suspendedAt = microtime(true);
$fiber->suspend();
if (0 < $sleepFor = (int) (1000 - (microtime(true) - $suspendedAt) * 1000000)) {

But I would also make readPipe return true/false whether the process did yield some activity or not. This way, we won't force a sleep while there are things to do.

@joelwurtz
Copy link
Contributor

joelwurtz commented Feb 28, 2024

After playing a bit with the process and fiber, i think there is no need to add fiber support in this class, the only things the process class need would be getter for the stdout and stderr stream.

Here is an example on how it may work (i use closure binding to get the streams) :

require 'vendor/autoload.php';

$pid = null;
$running = true;

// listen for sigchld
pcntl_signal(SIGCHLD, function ($signo, $data) use (&$pid, &$running) {
    if ($pid === $data['pid']) {
        $running = false;
    }
});

pcntl_async_signals(true);

$fiber = new Fiber(function () {
    $process = \Symfony\Component\Process\Process::fromShellCommandline('echo -n "test"; echo -n "bar" 1>&2 ; sleep 2');

    // bind a function to get the output stream
    $getStdout = Closure::bind(function () {
        return $this->stdout;
    }, $process, $process);
    $getStderr = Closure::bind(function () {
        return $this->stderr;
    }, $process, $process);

    $process->start();

    $stdout = $getStdout();
    $stderr = $getStderr();

    stream_set_blocking($stdout, 0);
    stream_set_blocking($stderr, 0);

    while ($process->isRunning()) {
        $action = \Fiber::suspend([$stdout, $stderr]);

        if ($action === 'stdout') {
            $content = $process->getIncrementalOutput();

            if ($content !== '') {
                var_dump('stdout : ' . $content);
            }
        }

        if ($action === 'stderr') {
            $content = $process->getIncrementalErrorOutput();

            if ($content !== '') {
                var_dump('stderr : ' . $content);
            }
        }
    }

    return $process->getExitCode();
});

[$stdout, $stderr] = $fiber->start();
$streams = [$stdout, $stderr];

while (!$fiber->isTerminated()) {
    $write = null;
    $except = null;

    $read = $streams;
    $modified = stream_select($read, $write, $except, 0, 100_000);

    if ($modified === false) {
        break;
    }

    if ($modified > 0) {
        foreach ($read as $stream) {
            if ($fiber->isTerminated()) {
                break;
            }

            if ($stream === $stdout) {
                $fiber->resume('stdout');
            }

            if ($stream === $stderr) {
                $fiber->resume('stderr');
            }
        }
    }
}

var_dump('exit code : ' . $fiber->getReturn());

There may be some adjustement to do in the process class, but checking for fibers may not be needed IMHO

Also this totally remove sleep inside the process, (it may sleep but it depends only on the event loop implementation)

Exposing those streams make it be compatible with Revolt also, and i believe it would be better to provider a helper around the Process class that make it compatible with revolt ?

@joelwurtz
Copy link
Contributor

joelwurtz commented Feb 28, 2024

Didn't see this was in the wait function, it can make sense for it for existing library / app that use this method.

Part of this example can be reuse, like instead of doing a sleep it could do a stream_select with a timeout so it wakes up as soon as there is something to read / write or use the SIGCHLD signal to wake up also when the process has stopped (only with pcntl extension) ?

@@ -429,12 +429,31 @@ public function wait(?callable $callback = null): int
do {
$this->checkTimeout();
$running = $this->isRunning() && ('\\' === \DIRECTORY_SEPARATOR || $this->processPipes->areOpen());
$this->readPipes($running, '\\' !== \DIRECTORY_SEPARATOR || !$running);
if ($fiber = \Fiber::getCurrent()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe there should be an additional flag to enable this support otherwise it would be a bc break for people already using this method in a Fiber

$fiber->suspend();
$sleepFor = (int) (1000 - (microtime(true) - $startedAt) * 1000000);
if (0 < $sleepFor) {
usleep($sleepFor);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe it could use stream_select on the pipes instead of usleep ? so it would directly read / write if there is something to do instead of sleeping ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do use stream_select already so I guess we added this to address a situation where stream_select doesn't apply. git blame could help remember :)

@lyrixx lyrixx modified the milestones: 6.1, 7.1 Mar 11, 2024
@xabbuh xabbuh modified the milestones: 7.1, 7.2 May 15, 2024
@fabpot fabpot modified the milestones: 7.2, 7.3 Nov 20, 2024
@fabpot
Copy link
Member

fabpot commented Mar 28, 2025

Is it still relevant?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Morty Proxy This is a proxified and sanitized view of the page, visit original site.