Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 0bfd3d3

Browse filesBrowse files
committed
Streamlining server event streaming
1 parent 7e9ecaf commit 0bfd3d3
Copy full SHA for 0bfd3d3

File tree

4 files changed

+385
-0
lines changed
Filter options

4 files changed

+385
-0
lines changed

‎src/Symfony/Component/HttpFoundation/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/HttpFoundation/CHANGELOG.md
+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ CHANGELOG
55
---
66

77
* Add support for iterable of string in `StreamedResponse`
8+
* Add `EventStreamResponse` and `ServerEvent` classes to streamline server event streaming
89

910
7.2
1011
---
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\HttpFoundation;
13+
14+
/**
15+
* Represents a streaming HTTP response for sending server events
16+
* as part of the Server-Sent Events (SSE) streaming technique.
17+
*
18+
* To broadcast events to multiple users at once, for long-running
19+
* connections and for high-traffic websites, prefer using the Mercure
20+
* Symfony Component, which relies on Software designed for these use
21+
* cases: https://symfony.com/doc/current/mercure.html
22+
*
23+
* @see ServerEvent
24+
*
25+
* @author Yonel Ceruto <open@yceruto.dev>
26+
*
27+
* Example usage:
28+
*
29+
* return new EventStreamResponse(function () {
30+
* yield new ServerEvent(time());
31+
*
32+
* sleep(1);
33+
*
34+
* yield new ServerEvent(time());
35+
* });
36+
*/
37+
class EventStreamResponse extends StreamedResponse
38+
{
39+
/**
40+
* @param int|null $retry The number of milliseconds the client should wait
41+
* before reconnecting in case of network failure
42+
*/
43+
public function __construct(?callable $callback = null, int $status = 200, array $headers = [], private ?int $retry = null)
44+
{
45+
$headers += [
46+
'Connection' => 'keep-alive',
47+
'Content-Type' => 'text/event-stream',
48+
'Cache-Control' => 'private, no-cache, no-store, must-revalidate, max-age=0',
49+
'X-Accel-Buffering' => 'no',
50+
'Pragma' => 'no-cache',
51+
'Expire' => '0',
52+
];
53+
54+
parent::__construct($callback, $status, $headers);
55+
}
56+
57+
public function setCallback(callable $callback): static
58+
{
59+
if ($this->callback) {
60+
return parent::setCallback($callback);
61+
}
62+
63+
$this->callback = function () use ($callback) {
64+
if (is_iterable($events = $callback($this))) {
65+
foreach ($events as $event) {
66+
$this->sendEvent($event);
67+
68+
if (connection_aborted()) {
69+
break;
70+
}
71+
}
72+
}
73+
};
74+
75+
return $this;
76+
}
77+
78+
/**
79+
* Sends a server event to the client.
80+
*
81+
* @return $this
82+
*/
83+
public function sendEvent(ServerEvent $event): static
84+
{
85+
if ($this->retry > 0 && !$event->getRetry()) {
86+
$event->setRetry($this->retry);
87+
}
88+
89+
foreach ($event as $part) {
90+
echo $part;
91+
92+
if (!\in_array(\PHP_SAPI, ['cli', 'phpdbg', 'embed'], true)) {
93+
static::closeOutputBuffers(0, true);
94+
flush();
95+
}
96+
}
97+
98+
return $this;
99+
}
100+
101+
public function getRetry(): ?int
102+
{
103+
return $this->retry;
104+
}
105+
106+
public function setRetry(int $retry): void
107+
{
108+
$this->retry = $retry;
109+
}
110+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\HttpFoundation;
13+
14+
/**
15+
* An event generated on the server intended for streaming to the client
16+
* as part of the SSE streaming technique.
17+
*
18+
* @implements \IteratorAggregate<string>
19+
*
20+
* @author Yonel Ceruto <open@yceruto.dev>
21+
*/
22+
class ServerEvent implements \IteratorAggregate
23+
{
24+
/**
25+
* @param string|iterable<string> $data The event data field for the message
26+
* @param string|null $type The event type
27+
* @param int|null $retry The number of milliseconds the client should wait
28+
* before reconnecting in case of network failure
29+
* @param string|null $id The event ID to set the EventSource object's last event ID value
30+
* @param string|null $comment The event comment
31+
*/
32+
public function __construct(
33+
private string|iterable $data,
34+
private ?string $type = null,
35+
private ?int $retry = null,
36+
private ?string $id = null,
37+
private ?string $comment = null,
38+
) {
39+
}
40+
41+
public function getData(): iterable|string
42+
{
43+
return $this->data;
44+
}
45+
46+
/**
47+
* @return $this
48+
*/
49+
public function setData(iterable|string $data): static
50+
{
51+
$this->data = $data;
52+
53+
return $this;
54+
}
55+
56+
public function getType(): ?string
57+
{
58+
return $this->type;
59+
}
60+
61+
/**
62+
* @return $this
63+
*/
64+
public function setType(string $type): static
65+
{
66+
$this->type = $type;
67+
68+
return $this;
69+
}
70+
71+
public function getRetry(): ?int
72+
{
73+
return $this->retry;
74+
}
75+
76+
/**
77+
* @return $this
78+
*/
79+
public function setRetry(?int $retry): static
80+
{
81+
$this->retry = $retry;
82+
83+
return $this;
84+
}
85+
86+
public function getId(): ?string
87+
{
88+
return $this->id;
89+
}
90+
91+
/**
92+
* @return $this
93+
*/
94+
public function setId(string $id): static
95+
{
96+
$this->id = $id;
97+
98+
return $this;
99+
}
100+
101+
public function getComment(): ?string
102+
{
103+
return $this->comment;
104+
}
105+
106+
public function setComment(string $comment): static
107+
{
108+
$this->comment = $comment;
109+
110+
return $this;
111+
}
112+
113+
/**
114+
* @return \Traversable<string>
115+
*/
116+
public function getIterator(): \Traversable
117+
{
118+
static $lastRetry = null;
119+
120+
$head = '';
121+
if ($this->comment) {
122+
$head .= \sprintf(': %s', $this->comment)."\n";
123+
}
124+
if ($this->id) {
125+
$head .= \sprintf('id: %s', $this->id)."\n";
126+
}
127+
if ($this->retry > 0 && $this->retry !== $lastRetry) {
128+
$head .= \sprintf('retry: %s', $lastRetry = $this->retry)."\n";
129+
}
130+
if ($this->type) {
131+
$head .= \sprintf('event: %s', $this->type)."\n";
132+
}
133+
yield $head;
134+
135+
if ($this->data) {
136+
if (is_iterable($this->data)) {
137+
foreach ($this->data as $data) {
138+
yield \sprintf('data: %s', $data)."\n";
139+
}
140+
} else {
141+
yield \sprintf('data: %s', $this->data)."\n";
142+
}
143+
}
144+
145+
yield "\n";
146+
}
147+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\HttpFoundation\Tests;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\HttpFoundation\EventStreamResponse;
16+
use Symfony\Component\HttpFoundation\ServerEvent;
17+
18+
class EventStreamResponseTest extends TestCase
19+
{
20+
public function testInitializationWithDefaultValues()
21+
{
22+
$response = new EventStreamResponse();
23+
24+
$this->assertSame('text/event-stream', $response->headers->get('content-type'));
25+
$this->assertSame('max-age=0, must-revalidate, no-cache, no-store, private', $response->headers->get('cache-control'));
26+
$this->assertSame('keep-alive', $response->headers->get('connection'));
27+
28+
$this->assertSame(200, $response->getStatusCode());
29+
$this->assertNull($response->getRetry());
30+
}
31+
32+
public function testStreamSingleEvent()
33+
{
34+
$response = new EventStreamResponse(function () {
35+
yield new ServerEvent(
36+
data: 'foo',
37+
type: 'bar',
38+
retry: 100,
39+
id: '1',
40+
comment: 'bla bla',
41+
);
42+
});
43+
44+
$expected = <<<STR
45+
: bla bla
46+
id: 1
47+
retry: 100
48+
event: bar
49+
data: foo
50+
51+
52+
STR;
53+
54+
$this->assertSameResponseContent($expected, $response);
55+
}
56+
57+
public function testStreamEventsAndData()
58+
{
59+
$data = static function (): iterable {
60+
yield 'first line';
61+
yield 'second line';
62+
yield 'third line';
63+
};
64+
65+
$response = new EventStreamResponse(function () use ($data) {
66+
yield new ServerEvent('single line');
67+
yield new ServerEvent(['first line', 'second line']);
68+
yield new ServerEvent($data());
69+
});
70+
71+
$expected = <<<STR
72+
data: single line
73+
74+
data: first line
75+
data: second line
76+
77+
data: first line
78+
data: second line
79+
data: third line
80+
81+
82+
STR;
83+
84+
$this->assertSameResponseContent($expected, $response);
85+
}
86+
87+
public function testStreamEventsWithRetryFallback()
88+
{
89+
$response = new EventStreamResponse(function () {
90+
yield new ServerEvent('foo');
91+
yield new ServerEvent('bar');
92+
yield new ServerEvent('baz', retry: 1000);
93+
}, retry: 1500);
94+
95+
$expected = <<<STR
96+
retry: 1500
97+
data: foo
98+
99+
data: bar
100+
101+
retry: 1000
102+
data: baz
103+
104+
105+
STR;
106+
107+
$this->assertSameResponseContent($expected, $response);
108+
}
109+
110+
public function testStreamEventWithSendMethod()
111+
{
112+
$response = new EventStreamResponse(function (EventStreamResponse $response) {
113+
$response->sendEvent(new ServerEvent('foo'));
114+
});
115+
116+
$this->assertSameResponseContent("data: foo\n\n", $response);
117+
}
118+
119+
private function assertSameResponseContent(string $expected, EventStreamResponse $response): void
120+
{
121+
ob_start();
122+
$response->send();
123+
$actual = ob_get_clean();
124+
125+
$this->assertSame($expected, $actual);
126+
}
127+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.