Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 67d4720

Browse filesBrowse files
gusinaciolutter
authored andcommitted
graph, store: Create database sql executor
1 parent 59d9d63 commit 67d4720
Copy full SHA for 67d4720

File tree

Expand file treeCollapse file tree

15 files changed

+1146
-10
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

15 files changed

+1146
-10
lines changed
Open diff view settings
Collapse file

‎Cargo.lock‎

Copy file name to clipboardExpand all lines: Cargo.lock
+14Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Collapse file

‎Cargo.toml‎

Copy file name to clipboardExpand all lines: Cargo.toml
+2-5Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,8 @@ serde_derive = "1.0.125"
8181
serde_json = { version = "1.0", features = ["arbitrary_precision"] }
8282
serde_regex = "1.1.0"
8383
serde_yaml = "0.9.21"
84-
slog = { version = "2.7.0", features = [
85-
"release_max_level_trace",
86-
"max_level_trace",
87-
] }
88-
sqlparser = "0.46.0"
84+
slog = { version = "2.7.0", features = ["release_max_level_trace", "max_level_trace"] }
85+
sqlparser = { version = "0.46.0", features = ["visitor"] }
8986
strum = { version = "0.26", features = ["derive"] }
9087
syn = { version = "2.0.106", features = ["full"] }
9188
test-store = { path = "./store/test-store" }
Collapse file

‎graph/src/components/store/traits.rs‎

Copy file name to clipboardExpand all lines: graph/src/components/store/traits.rs
+3-1Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use crate::components::transaction_receipt;
1616
use crate::components::versions::ApiVersion;
1717
use crate::data::query::Trace;
1818
use crate::data::store::ethereum::call;
19-
use crate::data::store::QueryObject;
19+
use crate::data::store::{QueryObject, SqlQueryObject};
2020
use crate::data::subgraph::{status, DeploymentFeatures};
2121
use crate::data::{query::QueryTarget, subgraph::schema::*};
2222
use crate::prelude::{DeploymentState, NodeId, QueryExecutionError, SubgraphName};
@@ -660,6 +660,8 @@ pub trait QueryStore: Send + Sync {
660660
query: EntityQuery,
661661
) -> Result<(Vec<QueryObject>, Trace), QueryExecutionError>;
662662

663+
fn execute_sql(&self, sql: &str) -> Result<Vec<SqlQueryObject>, QueryExecutionError>;
664+
663665
async fn is_deployment_synced(&self) -> Result<bool, Error>;
664666

665667
async fn block_ptr(&self) -> Result<Option<BlockPtr>, StoreError>;
Collapse file

‎graph/src/data/query/error.rs‎

Copy file name to clipboardExpand all lines: graph/src/data/query/error.rs
+4-1Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ pub enum QueryExecutionError {
7373
InvalidSubgraphManifest,
7474
ResultTooBig(usize, usize),
7575
DeploymentNotFound(String),
76+
SqlError(String),
7677
IdMissing,
7778
IdNotString,
7879
InternalError(String),
@@ -135,6 +136,7 @@ impl QueryExecutionError {
135136
| IdMissing
136137
| IdNotString
137138
| InternalError(_) => false,
139+
SqlError(_) => false,
138140
}
139141
}
140142
}
@@ -213,7 +215,7 @@ impl fmt::Display for QueryExecutionError {
213215
}
214216
InvalidFilterError => write!(f, "Filter must by an object"),
215217
InvalidOrFilterStructure(fields, example) => {
216-
write!(f, "Cannot mix column filters with 'or' operator at the same level. Found column filter(s) {} alongside 'or' operator.\n\n{}",
218+
write!(f, "Cannot mix column filters with 'or' operator at the same level. Found column filter(s) {} alongside 'or' operator.\n\n{}",
217219
fields.join(", "), example)
218220
}
219221
EntityFieldError(e, a) => {
@@ -281,6 +283,7 @@ impl fmt::Display for QueryExecutionError {
281283
IdMissing => write!(f, "entity is missing an `id` attribute"),
282284
IdNotString => write!(f, "entity `id` attribute is not a string"),
283285
InternalError(msg) => write!(f, "internal error: {}", msg),
286+
SqlError(e) => write!(f, "sql error: {}", e),
284287
}
285288
}
286289
}
Collapse file

‎graph/src/data/store/mod.rs‎

Copy file name to clipboardExpand all lines: graph/src/data/store/mod.rs
+3Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1079,6 +1079,9 @@ pub struct QueryObject {
10791079
pub entity: r::Object,
10801080
}
10811081

1082+
/// An object that is returned from a SQL query. It wraps an `r::Value`
1083+
pub struct SqlQueryObject(pub r::Value);
1084+
10821085
impl CacheWeight for QueryObject {
10831086
fn indirect_weight(&self) -> usize {
10841087
self.parent.indirect_weight() + self.entity.indirect_weight()
Collapse file

‎store/postgres/Cargo.toml‎

Copy file name to clipboardExpand all lines: store/postgres/Cargo.toml
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ git-testament = "0.2.6"
3232
itertools = "0.14.0"
3333
hex = "0.4.3"
3434
pretty_assertions = "1.4.1"
35+
sqlparser = { workspace = true }
36+
thiserror = { workspace = true }
3537

3638
[dev-dependencies]
3739
clap.workspace = true
Collapse file

‎store/postgres/src/deployment_store.rs‎

Copy file name to clipboardExpand all lines: store/postgres/src/deployment_store.rs
+21-2Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@ use graph::components::store::{
1212
PruningStrategy, QueryPermit, StoredDynamicDataSource, VersionStats,
1313
};
1414
use graph::components::versions::VERSIONS;
15+
use graph::data::graphql::IntoValue;
1516
use graph::data::query::Trace;
16-
use graph::data::store::IdList;
17+
use graph::data::store::{IdList, SqlQueryObject};
1718
use graph::data::subgraph::{status, SPEC_VERSION_0_0_6};
1819
use graph::data_source::CausalityRegion;
1920
use graph::derive::CheapClone;
@@ -54,7 +55,7 @@ use crate::dynds::DataSourcesTable;
5455
use crate::primary::{DeploymentId, Primary};
5556
use crate::relational::index::{CreateIndex, IndexList, Method};
5657
use crate::relational::{self, Layout, LayoutCache, SqlName, Table};
57-
use crate::relational_queries::FromEntityData;
58+
use crate::relational_queries::{FromEntityData, JSONData};
5859
use crate::{advisory_lock, catalog, retry};
5960
use crate::{detail, ConnectionPool};
6061
use crate::{dynds, primary::Site};
@@ -290,6 +291,24 @@ impl DeploymentStore {
290291
layout.query(&logger, conn, query)
291292
}
292293

294+
pub(crate) fn execute_sql(
295+
&self,
296+
conn: &mut PgConnection,
297+
query: &str,
298+
) -> Result<Vec<SqlQueryObject>, QueryExecutionError> {
299+
let query = diesel::sql_query(query);
300+
301+
// Execute the provided SQL query
302+
let results = query
303+
.load::<JSONData>(conn)
304+
.map_err(|e| QueryExecutionError::SqlError(e.to_string()))?;
305+
306+
Ok(results
307+
.into_iter()
308+
.map(|e| SqlQueryObject(e.into_value()))
309+
.collect::<Vec<_>>())
310+
}
311+
293312
fn check_intf_uniqueness(
294313
&self,
295314
conn: &mut PgConnection,
Collapse file

‎store/postgres/src/lib.rs‎

Copy file name to clipboardExpand all lines: store/postgres/src/lib.rs
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ pub mod query_store;
3030
mod relational;
3131
mod relational_queries;
3232
mod retry;
33+
mod sql;
3334
mod store;
3435
mod store_events;
3536
mod subgraph_store;
Collapse file

‎store/postgres/src/query_store.rs‎

Copy file name to clipboardExpand all lines: store/postgres/src/query_store.rs
+26-1Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ use std::collections::HashMap;
22
use std::time::Instant;
33

44
use crate::deployment_store::{DeploymentStore, ReplicaId};
5+
use crate::sql::Parser;
56
use graph::components::store::{DeploymentId, QueryPermit, QueryStore as QueryStoreTrait};
67
use graph::data::query::Trace;
7-
use graph::data::store::QueryObject;
8+
use graph::data::store::{QueryObject, SqlQueryObject};
89
use graph::prelude::*;
910
use graph::schema::{ApiSchema, InputSchema};
1011

@@ -16,6 +17,7 @@ pub(crate) struct QueryStore {
1617
store: Arc<DeploymentStore>,
1718
chain_store: Arc<crate::ChainStore>,
1819
api_version: Arc<ApiVersion>,
20+
sql_parser: Result<Parser, StoreError>,
1921
}
2022

2123
impl QueryStore {
@@ -26,12 +28,16 @@ impl QueryStore {
2628
replica_id: ReplicaId,
2729
api_version: Arc<ApiVersion>,
2830
) -> Self {
31+
let sql_parser = store
32+
.find_layout(site.clone())
33+
.map(|layout| Parser::new(layout));
2934
QueryStore {
3035
site,
3136
replica_id,
3237
store,
3338
chain_store,
3439
api_version,
40+
sql_parser,
3541
}
3642
}
3743
}
@@ -57,6 +63,25 @@ impl QueryStoreTrait for QueryStore {
5763
})
5864
}
5965

66+
fn execute_sql(
67+
&self,
68+
sql: &str,
69+
) -> Result<Vec<SqlQueryObject>, graph::prelude::QueryExecutionError> {
70+
let mut conn = self
71+
.store
72+
.get_replica_conn(self.replica_id)
73+
.map_err(|e| QueryExecutionError::SqlError(format!("SQL error: {}", e)))?;
74+
75+
let parser = self
76+
.sql_parser
77+
.as_ref()
78+
.map_err(|e| QueryExecutionError::SqlError(format!("SQL error: {}", e)))?;
79+
80+
let sql = parser.parse_and_validate(sql)?;
81+
82+
self.store.execute_sql(&mut conn, &sql)
83+
}
84+
6085
/// Return true if the deployment with the given id is fully synced,
6186
/// and return false otherwise. Errors from the store are passed back up
6287
async fn is_deployment_synced(&self) -> Result<bool, Error> {
Collapse file

‎store/postgres/src/relational_queries.rs‎

Copy file name to clipboardExpand all lines: store/postgres/src/relational_queries.rs
+43Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ use diesel::sql_types::{Array, BigInt, Binary, Bool, Int8, Integer, Jsonb, Text,
1414
use diesel::QuerySource as _;
1515
use graph::components::store::write::{EntityWrite, RowGroup, WriteChunk};
1616
use graph::components::store::{Child as StoreChild, DerivedEntityQuery};
17+
18+
use graph::data::graphql::IntoValue;
1719
use graph::data::store::{Id, IdType, NULL};
1820
use graph::data::store::{IdList, IdRef, QueryObject};
1921
use graph::data::value::{Object, Word};
@@ -439,6 +441,47 @@ pub fn parse_id(id_type: IdType, json: serde_json::Value) -> Result<Id, StoreErr
439441
}
440442
}
441443

444+
#[derive(QueryableByName, Debug)]
445+
pub struct JSONData {
446+
#[diesel(sql_type = Jsonb)]
447+
pub data: serde_json::Value,
448+
}
449+
450+
impl IntoValue for JSONData {
451+
fn into_value(self) -> r::Value {
452+
JSONData::to_value(self.data)
453+
}
454+
}
455+
456+
impl JSONData {
457+
pub fn to_value(data: serde_json::Value) -> r::Value {
458+
match data {
459+
serde_json::Value::Null => r::Value::Null,
460+
serde_json::Value::Bool(b) => r::Value::Boolean(b),
461+
serde_json::Value::Number(n) => {
462+
if let Some(i) = n.as_i64() {
463+
r::Value::Int(i)
464+
} else {
465+
r::Value::Float(n.as_f64().unwrap())
466+
}
467+
}
468+
serde_json::Value::String(s) => r::Value::String(s),
469+
serde_json::Value::Array(vals) => {
470+
let vals: Vec<_> = vals.into_iter().map(JSONData::to_value).collect::<Vec<_>>();
471+
r::Value::List(vals)
472+
}
473+
serde_json::Value::Object(map) => {
474+
let mut m = std::collections::BTreeMap::new();
475+
for (k, v) in map {
476+
let value = JSONData::to_value(v);
477+
m.insert(Word::from(k), value);
478+
}
479+
r::Value::object(m)
480+
}
481+
}
482+
}
483+
}
484+
442485
/// Helper struct for retrieving entities from the database. With diesel, we
443486
/// can only run queries that return columns whose number and type are known
444487
/// at compile time. Because of that, we retrieve the actual data for an

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.