Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
This repository was archived by the owner on Jan 26, 2026. It is now read-only.

Latest commit

 

History

History
History
148 lines (128 loc) · 4.36 KB

File metadata and controls

148 lines (128 loc) · 4.36 KB
Copy raw file
Download raw file
Open symbols panel
Edit and raw actions
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
// SPDX-License-Identifier: BSD-3-Clause
/*
Creation/destruction of Deferreds.
Implementation of worker loop processing deferred objects.
This worker loop is executed in a separate thread until the system
gets shut down.
*/
#include "include/sharpy/Deferred.hpp"
#include "include/sharpy/Mediator.hpp"
#include "include/sharpy/Registry.hpp"
#include "include/sharpy/Service.hpp"
#include "include/sharpy/Transceiver.hpp"
#include "include/sharpy/itac.hpp"
#include "include/sharpy/jit/mlir.hpp"
#include <oneapi/tbb/concurrent_queue.h>
#include <pybind11/pybind11.h>
namespace py = pybind11;
#include <iostream>
namespace SHARPY {
// Thread-safe FIFO queue holding deferred objects (runables).
// Declared here, defined in another translation unit. In this file the
// worker loop pops entries from it and Runable::fini() clears it.
extern tbb::concurrent_bounded_queue<Runable::ptr_type> _deferred;
// If needed, broadcast the object/promise to the worker processes
// (controller/worker mode).
// The broadcast happens only when a transceiver exists, it reports
// controller/worker mode (is_cw) and this process has rank 0; in every other
// case this function does nothing.
void _dist(const Runable *p) {
  if (!getTransceiver()) {
    return;
  }
  if (!getTransceiver()->is_cw()) {
    return;
  }
  if (getTransceiver()->rank() != 0) {
    return;
  }
  getMediator()->to_workers(p);
}
// Create an enriched future: the shared future obtained from the underlying
// promise, bundled with this object's guid, dtype, shape, device and team.
Deferred::future_type Deferred::get_future() {
  return future_type{promise_type::get_future().share(),
                     _guid,
                     _dtype,
                     _shape,
                     _device,
                     _team};
}
// Defer an array-producing computation by adding it to the queue and return
// a future for the resulting array.
//
// _d         the runable to enqueue; it must actually be a Deferred,
//            otherwise std::invalid_argument is thrown.
// is_global  pass false if the result is a local temporary which does not
//            need a guid. When true, the object is first handed to _dist()
//            and then, if it has no guid yet, gets one from the Registry.
//
// The future is registered with the Registry before the runable is pushed.
Deferred::future_type defer_array(Runable::ptr_type &&_d, bool is_global) {
  auto *deferred = dynamic_cast<Deferred *>(_d.get());
  if (deferred == nullptr) {
    throw std::invalid_argument("Expected Deferred Array promise");
  }

  if (is_global) {
    // distribute first, assign the guid afterwards (order kept as is)
    _dist(deferred);
    const bool lacks_guid = deferred->guid() == Registry::NOGUID;
    if (lacks_guid) {
      deferred->set_guid(Registry::get_guid());
    }
  }

  auto fut = deferred->get_future();
  Registry::put(fut);
  push_runable(std::move(_d));
  return fut;
}
// Defer a global array producer: forwards to defer_array with
// is_global=true. The future returned by defer_array is not used here.
void Deferred::defer(Runable::ptr_type &&p) {
  static_cast<void>(defer_array(std::move(p), true));
}
// Defer a plain runable: simply forwards it to push_runable.
void Runable::defer(Runable::ptr_type &&p) {
  push_runable(std::move(p));
}
// Finalization: clear the queue of deferred objects.
void Runable::fini() {
  _deferred.clear();
}
// Process promises as they arrive through calls to defer.
// This is run in a separate thread until shutdown is requested.
// Shutdown is indicated by a popped queue entry which evaluates to false.
//
// The loop repeatedly creates MLIR functions for jit-compilation by letting
// the popped objects add their MLIR code until an object can not produce MLIR
// but wants immediate execution (indicated by generate_mlir returning true).
// When execution is needed, the function signature (input args, return
// statement) is finalized, the function gets compiled and executed. The
// iteration completes by calling run() on the requesting object.
//
// Deleters are not compiled together with the batch they arrive in: they are
// set aside and get an outer iteration of their own, before anything else is
// popped from the queue.
//
// libidtr is passed unchanged to the jit::JIT constructor.
//
// NOTE(review): deleters collected in the same batch in which shutdown is
// signalled never generate MLIR; they are just destroyed when this function
// returns. Confirm that this is intended.
void process_promises(const std::string &libidtr) {
  int vtProcessSym, vtSHARPYClass, vtPopSym;
  VT(VT_classdef, "sharpy", &vtSHARPYClass);
  VT(VT_funcdef, "process", vtSHARPYClass, &vtProcessSym);
  VT(VT_funcdef, "pop", vtSHARPYClass, &vtPopSym);
  VT(VT_begin, vtProcessSym);

  bool done = false;
  jit::JIT jit(libidtr);
  // deleters seen while popping; handled at the start of the next iteration
  std::vector<Runable::ptr_type> deleters;

  do {
    // we need to keep runables/deferred/futures alive until we set their
    // values below
    std::vector<Runable::ptr_type> runables;
    jit::DepManager dm(jit);
    auto &builder = dm.getBuilder();
    auto loc = builder.getUnknownLoc();
    // the object requesting immediate execution, if any; stays empty when
    // this iteration only handles deleters or when shutdown is signalled
    Runable::ptr_type d;

    if (!deleters.empty()) {
      // pending deleters get a compiled function of their own
      for (auto &dl : deleters) {
        if (dl->generate_mlir(builder, loc, dm)) {
          assert(!"deleters must generate MLIR");
        }
        runables.emplace_back(std::move(dl));
      }
      deleters.clear();
    } else {
      while (true) {
        VT(VT_begin, vtPopSym);
        _deferred.pop(d);
        VT(VT_end, vtPopSym);
        if (d) {
          if (d->isDeleter()) {
            deleters.emplace_back(std::move(d));
          } else {
            if (d->generate_mlir(builder, loc, dm)) {
              // d can not be compiled: stop collecting and execute below
              break;
            }
            // keep alive for later set_value
            runables.emplace_back(std::move(d));
          }
        } else {
          // signals system shutdown
          done = true;
          break;
        }
      }
    }

    if (!runables.empty()) {
      dm.finalizeAndRun();
    } // no else needed

    // now we execute the deferred action which could not be compiled
    if (d) {
      // run() is executed while holding the Python GIL
      py::gil_scoped_acquire acquire;
      d->run();
      d.reset();
    }
  } while (!done);

  // close the "process" trace state opened at function entry
  VT(VT_end, vtProcessSym);
}
} // namespace SHARPY
Morty Proxy This is a proxified and sanitized view of the page, visit original site.