Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 8e1698a

Browse filesBrowse files
committed
worker: allow transferring/cloning generic BaseObjects
Extend support for transferring objects à la `MessagePort` to other types of `BaseObject` subclasses, as well as implement cloning support for cases in which destructive transferring is not needed or optional. PR-URL: #33772 Backport-PR-URL: #33965 Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
1 parent 667d520 commit 8e1698a
Copy full SHA for 8e1698a

File tree

Expand file treeCollapse file tree

5 files changed

+194
-56
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

5 files changed

+194
-56
lines changed
Open diff view settings
Collapse file

‎doc/api/errors.md‎

Copy file name to clipboardExpand all lines: doc/api/errors.md
+3-2Lines changed: 3 additions & 2 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -1570,8 +1570,9 @@ is thrown if a required option is missing.
15701570
<a id="ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST"></a>
15711571
### `ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST`
15721572

1573-
A `MessagePort` was found in the object passed to a `postMessage()` call,
1574-
but not provided in the `transferList` for that call.
1573+
An object that needs to be explicitly listed in the `transferList` argument
1574+
was found in the object passed to a `postMessage()` call, but not provided in
1575+
the `transferList` for that call. Usually, this is a `MessagePort`.
15751576

15761577
<a id="ERR_MISSING_PASSPHRASE"></a>
15771578
### `ERR_MISSING_PASSPHRASE`
Collapse file

‎src/base_object.h‎

Copy file name to clipboardExpand all lines: src/base_object.h
+37-1Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ class Environment;
3434
template <typename T, bool kIsWeak>
3535
class BaseObjectPtrImpl;
3636

37+
namespace worker {
38+
class TransferData;
39+
}
40+
3741
class BaseObject : public MemoryRetainer {
3842
public:
3943
enum InternalFields { kSlot, kInternalFieldCount };
@@ -101,7 +105,39 @@ class BaseObject : public MemoryRetainer {
101105
static v8::Local<v8::FunctionTemplate> GetConstructorTemplate(
102106
Environment* env);
103107

104-
protected:
108+
// Interface for transferring BaseObject instances using the .postMessage()
109+
// method of MessagePorts (and, by extension, Workers).
110+
// GetTransferMode() returns a transfer mode that indicates how to deal with
111+
// the current object:
112+
// - kUntransferable:
113+
// No transfer is possible, either because this type of BaseObject does
114+
// not know how to be transfered, or because it is not in a state in
115+
// which it is possible to do so (e.g. because it has already been
116+
// transfered).
117+
// - kTransferable:
118+
// This object can be transfered in a destructive fashion, i.e. will be
119+
// rendered unusable on the sending side of the channel in the process
120+
// of being transfered. (In C++ this would be referred to as movable but
121+
// not copyable.) Objects of this type need to be listed in the
122+
// `transferList` argument of the relevant postMessage() call in order to
123+
// make sure that they are not accidentally destroyed on the sending side.
124+
// TransferForMessaging() will be called to get a representation of the
125+
// object that is used for subsequent deserialization.
126+
// - kCloneable:
127+
// This object can be cloned without being modified.
128+
// CloneForMessaging() will be called to get a representation of the
129+
// object that is used for subsequent deserialization, unless the
130+
// object is listed in transferList, in which case TransferForMessaging()
131+
// is attempted first.
132+
enum class TransferMode {
133+
kUntransferable,
134+
kTransferable,
135+
kCloneable
136+
};
137+
virtual TransferMode GetTransferMode() const;
138+
virtual std::unique_ptr<worker::TransferData> TransferForMessaging();
139+
virtual std::unique_ptr<worker::TransferData> CloneForMessaging() const;
140+
105141
virtual inline void OnGCCollect();
106142

107143
private:
Collapse file

‎src/node_messaging.cc‎

Copy file name to clipboardExpand all lines: src/node_messaging.cc
+117-47Lines changed: 117 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,19 @@ using v8::ValueSerializer;
3636
using v8::WasmModuleObject;
3737

3838
namespace node {
39+
40+
BaseObject::TransferMode BaseObject::GetTransferMode() const {
41+
return BaseObject::TransferMode::kUntransferable;
42+
}
43+
44+
std::unique_ptr<worker::TransferData> BaseObject::TransferForMessaging() {
45+
return CloneForMessaging();
46+
}
47+
48+
std::unique_ptr<worker::TransferData> BaseObject::CloneForMessaging() const {
49+
return {};
50+
}
51+
3952
namespace worker {
4053

4154
Message::Message(MallocedBuffer<char>&& buffer)
@@ -54,21 +67,20 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
5467
DeserializerDelegate(
5568
Message* m,
5669
Environment* env,
57-
const std::vector<MessagePort*>& message_ports,
70+
const std::vector<BaseObjectPtr<BaseObject>>& host_objects,
5871
const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
5972
const std::vector<WasmModuleObject::TransferrableModule>& wasm_modules)
60-
: message_ports_(message_ports),
73+
: host_objects_(host_objects),
6174
shared_array_buffers_(shared_array_buffers),
6275
wasm_modules_(wasm_modules) {}
6376

6477
MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
65-
// Currently, only MessagePort hosts objects are supported, so identifying
66-
// by the index in the message's MessagePort array is sufficient.
78+
// Identifying the index in the message's BaseObject array is sufficient.
6779
uint32_t id;
6880
if (!deserializer->ReadUint32(&id))
6981
return MaybeLocal<Object>();
70-
CHECK_LE(id, message_ports_.size());
71-
return message_ports_[id]->object(isolate);
82+
CHECK_LE(id, host_objects_.size());
83+
return host_objects_[id]->object(isolate);
7284
}
7385

7486
MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
@@ -87,7 +99,7 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
8799
ValueDeserializer* deserializer = nullptr;
88100

89101
private:
90-
const std::vector<MessagePort*>& message_ports_;
102+
const std::vector<BaseObjectPtr<BaseObject>>& host_objects_;
91103
const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
92104
const std::vector<WasmModuleObject::TransferrableModule>& wasm_modules_;
93105
};
@@ -101,22 +113,25 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
101113
EscapableHandleScope handle_scope(env->isolate());
102114
Context::Scope context_scope(context);
103115

104-
// Create all necessary MessagePort handles.
105-
std::vector<MessagePort*> ports(message_ports_.size());
106-
for (uint32_t i = 0; i < message_ports_.size(); ++i) {
107-
ports[i] = MessagePort::New(env,
108-
context,
109-
std::move(message_ports_[i]));
110-
if (ports[i] == nullptr) {
111-
for (MessagePort* port : ports) {
112-
// This will eventually release the MessagePort object itself.
113-
if (port != nullptr)
114-
port->Close();
116+
// Create all necessary objects for transferables, e.g. MessagePort handles.
117+
std::vector<BaseObjectPtr<BaseObject>> host_objects(transferables_.size());
118+
for (uint32_t i = 0; i < transferables_.size(); ++i) {
119+
TransferData* data = transferables_[i].get();
120+
host_objects[i] = data->Deserialize(
121+
env, context, std::move(transferables_[i]));
122+
if (!host_objects[i]) {
123+
for (BaseObjectPtr<BaseObject> object : host_objects) {
124+
if (!object) continue;
125+
126+
// Since creating one of the objects failed, we don't want to have the
127+
// other objects lying around in memory. We act as if the object has
128+
// been garbage-collected.
129+
object->Detach();
115130
}
116131
return MaybeLocal<Value>();
117132
}
118133
}
119-
message_ports_.clear();
134+
transferables_.clear();
120135

121136
std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
122137
// Attach all transferred SharedArrayBuffers to their new Isolate.
@@ -130,7 +145,7 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
130145
shared_array_buffers_.clear();
131146

132147
DeserializerDelegate delegate(
133-
this, env, ports, shared_array_buffers, wasm_modules_);
148+
this, env, host_objects, shared_array_buffers, wasm_modules_);
134149
ValueDeserializer deserializer(
135150
env->isolate(),
136151
reinterpret_cast<const uint8_t*>(main_message_buf_.data),
@@ -175,8 +190,8 @@ void Message::AddSharedArrayBuffer(
175190
shared_array_buffers_.push_back(reference);
176191
}
177192

178-
void Message::AddMessagePort(std::unique_ptr<MessagePortData>&& data) {
179-
message_ports_.emplace_back(std::move(data));
193+
void Message::AddTransferable(std::unique_ptr<TransferData>&& data) {
194+
transferables_.emplace_back(std::move(data));
180195
}
181196

182197
uint32_t Message::AddWASMModule(WasmModuleObject::TransferrableModule&& mod) {
@@ -242,8 +257,8 @@ class SerializerDelegate : public ValueSerializer::Delegate {
242257
}
243258

244259
Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
245-
if (env_->message_port_constructor_template()->HasInstance(object)) {
246-
return WriteMessagePort(Unwrap<MessagePort>(object));
260+
if (env_->base_object_ctor_template()->HasInstance(object)) {
261+
return WriteHostObject(Unwrap<BaseObject>(object));
247262
}
248263

249264
ThrowDataCloneError(env_->clone_unsupported_type_str());
@@ -282,32 +297,61 @@ class SerializerDelegate : public ValueSerializer::Delegate {
282297
void Finish() {
283298
// Only close the MessagePort handles and actually transfer them
284299
// once we know that serialization succeeded.
285-
for (MessagePort* port : ports_) {
286-
port->Close();
287-
msg_->AddMessagePort(port->Detach());
300+
for (uint32_t i = 0; i < host_objects_.size(); i++) {
301+
BaseObject* host_object = host_objects_[i];
302+
std::unique_ptr<TransferData> data;
303+
if (i < first_cloned_object_index_)
304+
data = host_object->TransferForMessaging();
305+
if (!data)
306+
data = host_object->CloneForMessaging();
307+
CHECK(data);
308+
msg_->AddTransferable(std::move(data));
288309
}
289310
}
290311

312+
inline void AddHostObject(BaseObject* host_object) {
313+
// Make sure we have not started serializing the value itself yet.
314+
CHECK_EQ(first_cloned_object_index_, SIZE_MAX);
315+
host_objects_.push_back(host_object);
316+
}
317+
291318
ValueSerializer* serializer = nullptr;
292319

293320
private:
294-
Maybe<bool> WriteMessagePort(MessagePort* port) {
295-
for (uint32_t i = 0; i < ports_.size(); i++) {
296-
if (ports_[i] == port) {
321+
Maybe<bool> WriteHostObject(BaseObject* host_object) {
322+
for (uint32_t i = 0; i < host_objects_.size(); i++) {
323+
if (host_objects_[i] == host_object) {
297324
serializer->WriteUint32(i);
298325
return Just(true);
299326
}
300327
}
301328

302-
THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_);
303-
return Nothing<bool>();
329+
BaseObject::TransferMode mode = host_object->GetTransferMode();
330+
if (mode == BaseObject::TransferMode::kUntransferable) {
331+
ThrowDataCloneError(env_->clone_unsupported_type_str());
332+
return Nothing<bool>();
333+
} else if (mode == BaseObject::TransferMode::kTransferable) {
334+
// TODO(addaleax): This message code is too specific. Fix that in a
335+
// semver-major follow-up.
336+
THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_);
337+
return Nothing<bool>();
338+
}
339+
340+
CHECK_EQ(mode, BaseObject::TransferMode::kCloneable);
341+
uint32_t index = host_objects_.size();
342+
if (first_cloned_object_index_ == SIZE_MAX)
343+
first_cloned_object_index_ = index;
344+
serializer->WriteUint32(index);
345+
host_objects_.push_back(host_object);
346+
return Just(true);
304347
}
305348

306349
Environment* env_;
307350
Local<Context> context_;
308351
Message* msg_;
309352
std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
310-
std::vector<MessagePort*> ports_;
353+
std::vector<BaseObject*> host_objects_;
354+
size_t first_cloned_object_index_ = SIZE_MAX;
311355

312356
friend class worker::Message;
313357
};
@@ -366,8 +410,7 @@ Maybe<bool> Message::Serialize(Environment* env,
366410
array_buffers.push_back(ab);
367411
serializer.TransferArrayBuffer(id, ab);
368412
continue;
369-
} else if (env->message_port_constructor_template()
370-
->HasInstance(entry)) {
413+
} else if (env->base_object_ctor_template()->HasInstance(entry)) {
371414
// Check if the source MessagePort is being transferred.
372415
if (!source_port.IsEmpty() && entry == source_port) {
373416
ThrowDataCloneException(
@@ -376,26 +419,34 @@ Maybe<bool> Message::Serialize(Environment* env,
376419
"Transfer list contains source port"));
377420
return Nothing<bool>();
378421
}
379-
MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
380-
if (port == nullptr || port->IsDetached()) {
422+
BaseObject* host_object = Unwrap<BaseObject>(entry.As<Object>());
423+
if (env->message_port_constructor_template()->HasInstance(entry) &&
424+
(host_object == nullptr ||
425+
static_cast<MessagePort*>(host_object)->IsDetached())) {
381426
ThrowDataCloneException(
382427
context,
383428
FIXED_ONE_BYTE_STRING(
384429
env->isolate(),
385430
"MessagePort in transfer list is already detached"));
386431
return Nothing<bool>();
387432
}
388-
if (std::find(delegate.ports_.begin(), delegate.ports_.end(), port) !=
389-
delegate.ports_.end()) {
433+
if (std::find(delegate.host_objects_.begin(),
434+
delegate.host_objects_.end(),
435+
host_object) != delegate.host_objects_.end()) {
390436
ThrowDataCloneException(
391437
context,
392-
FIXED_ONE_BYTE_STRING(
393-
env->isolate(),
394-
"Transfer list contains duplicate MessagePort"));
438+
String::Concat(env->isolate(),
439+
FIXED_ONE_BYTE_STRING(
440+
env->isolate(),
441+
"Transfer list contains duplicate "),
442+
entry.As<Object>()->GetConstructorName()));
395443
return Nothing<bool>();
396444
}
397-
delegate.ports_.push_back(port);
398-
continue;
445+
if (host_object != nullptr && host_object->GetTransferMode() !=
446+
BaseObject::TransferMode::kUntransferable) {
447+
delegate.AddHostObject(host_object);
448+
continue;
449+
}
399450
}
400451

401452
THROW_ERR_INVALID_TRANSFER_OBJECT(env);
@@ -436,7 +487,7 @@ void Message::MemoryInfo(MemoryTracker* tracker) const {
436487
tracker->TrackField("array_buffer_contents", array_buffer_contents_);
437488
tracker->TrackFieldWithSize("shared_array_buffers",
438489
shared_array_buffers_.size() * sizeof(shared_array_buffers_[0]));
439-
tracker->TrackField("message_ports", message_ports_);
490+
tracker->TrackField("transferables", transferables_);
440491
}
441492

442493
MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { }
@@ -702,6 +753,25 @@ std::unique_ptr<MessagePortData> MessagePort::Detach() {
702753
return std::move(data_);
703754
}
704755

756+
BaseObject::TransferMode MessagePort::GetTransferMode() const {
757+
if (IsDetached())
758+
return BaseObject::TransferMode::kUntransferable;
759+
return BaseObject::TransferMode::kTransferable;
760+
}
761+
762+
std::unique_ptr<TransferData> MessagePort::TransferForMessaging() {
763+
Close();
764+
return Detach();
765+
}
766+
767+
BaseObjectPtr<BaseObject> MessagePortData::Deserialize(
768+
Environment* env,
769+
Local<Context> context,
770+
std::unique_ptr<TransferData> self) {
771+
return BaseObjectPtr<MessagePort> { MessagePort::New(
772+
env, context,
773+
static_unique_pointer_cast<MessagePortData>(std::move(self))) };
774+
}
705775

706776
Maybe<bool> MessagePort::PostMessage(Environment* env,
707777
Local<Value> message_v,
@@ -729,8 +799,8 @@ Maybe<bool> MessagePort::PostMessage(Environment* env,
729799

730800
// Check if the target port is posted to itself.
731801
if (data_->sibling_ != nullptr) {
732-
for (const auto& port_data : msg.message_ports()) {
733-
if (data_->sibling_ == port_data.get()) {
802+
for (const auto& transferable : msg.transferables()) {
803+
if (data_->sibling_ == transferable.get()) {
734804
doomed = true;
735805
ProcessEmitWarning(env, "The target port was posted to itself, and "
736806
"the communication channel was lost");

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.