diff --git a/benchmark/logic/concurrent_insert.js b/benchmark/logic/concurrent_insert.js index e36c0a69..fe3596be 100644 --- a/benchmark/logic/concurrent_insert.js +++ b/benchmark/logic/concurrent_insert.js @@ -59,6 +59,7 @@ async.series( next(); }, function r() { + console.log("Done!") exit(0); } ], function (err) { diff --git a/benchmark/logic/concurrent_select.js b/benchmark/logic/concurrent_select.js index f8a8e524..e6382abb 100644 --- a/benchmark/logic/concurrent_select.js +++ b/benchmark/logic/concurrent_select.js @@ -52,7 +52,7 @@ async.series( }); } try { - const _result = await cassandra.concurrent.executeConcurrent(client, allParameters, {prepare: true}); + const _result = await cassandra.concurrent.executeConcurrent(client, allParameters, {prepare: true, collectResults: true }); } catch (err) { return next(err); } diff --git a/lib/client.js b/lib/client.js index affe24d1..590354c3 100644 --- a/lib/client.js +++ b/lib/client.js @@ -19,6 +19,7 @@ const ResultSet = require("./types/result-set.js"); const { parseParams, convertHints } = require("./types/cql-utils.js"); const queryOptions = require("./query-options.js"); const { PreparedCache } = require("./cache.js"); +const { ResultSetGroup } = require("./concurrent/index.js"); /** * Represents a database client that maintains multiple connections to the cluster nodes, providing methods to @@ -147,9 +148,33 @@ class Client extends events.EventEmitter { * @package */ async prepareQuery(query) { + await this.connect(); return await this.rustClient.prepareStatement(query); } + /** + * Manually prepare query into prepared statement + * @param {Array} queries + * @returns {Promise>} + * @package + */ + async prepareMultiple(queries) { + await this.connect(); + let mapped = {}; + let result = []; + let toPrepare = []; + for (let i = 0; i < queries.length; i++) { + if (!mapped[queries[i]]) { + mapped[queries[i]] = toPrepare.length; + toPrepare.push(queries[i]); + } + result[i] = mapped[queries[i]]; + } + let prepared = await this.rustClient.prepareMultiple(toPrepare); + let resp = result.map((e) => prepared[e]); + return resp; + } + /** * Attempts to connect to one of the [contactPoints]{@link ClientOptions} and discovers the rest the nodes of the * cluster. @@ -359,6 +384,26 @@ class Client extends events.EventEmitter { reject(e); } } + + /** + * + * @param {Array>} params + * @param {ExecOptions.ExecutionOptions} option + * @returns {ResultSetGroup} + */ + async executeMultiplePrepared(prepared, params, options, res) { + let rustOptions = options.getRustOptions(); + let parsedParams = []; + for (let i = 0; i < prepared.length; i++) { + parsedParams.push(parseParams(prepared[i].getExpectedTypes(), params[i])); + } + let queries = await this.rustClient.executeMultiple(prepared, parsedParams, rustOptions); + for (let i = 0; i < prepared.length; i++) { + res.setResultItem(i, new ResultSet(queries[i])); + } + return res; + } /** * @deprecated Not supported by the driver. Usage will throw an error. diff --git a/lib/concurrent/index.js b/lib/concurrent/index.js index 6ee96ce8..ee9507b8 100644 --- a/lib/concurrent/index.js +++ b/lib/concurrent/index.js @@ -106,6 +106,7 @@ class ArrayBasedExecutor { constructor(client, query, parameters, options) { this._client = client; this._query = query; + this._prepared = null; this._parameters = parameters; options = options || utils.emptyObject; this._raiseOnFirstError = options.raiseOnFirstError !== false; @@ -128,13 +129,23 @@ class ArrayBasedExecutor { } async execute() { - const promises = new Array(this._concurrencyLevel); + // Prepare all the queries + let allParams = this._parameters; + if (!this._query) { + let allQueries = this._parameters.map((e) => e.query); + allParams = this._parameters.map((e) => e.params); + this._prepared = await this._client.prepareMultiple(allQueries); + } else { + this._prepared = await this._client.prepareQuery(this._query); + } + return await this._client.executeMultiplePrepared(this._prepared, allParams, this._queryOptions, this._result); + /* const promises = new Array(this._concurrencyLevel); for (let i = 0; i < this._concurrencyLevel; i++) { promises[i] = this._executeOneAtATime(i, 0); } - return Promise.all(promises).then(() => this._result); + return Promise.all(promises).then(() => this._result); */ } async _executeOneAtATime(initialIndex, iteration) { @@ -149,10 +160,10 @@ class ArrayBasedExecutor { let params; if (this._query === null) { - query = item.query; + query = this._prepared[index]; params = item.params; } else { - query = this._query; + query = this._prepared; params = item; } diff --git a/src/requests/request.rs b/src/requests/request.rs index 4144ed1a..8d56a09b 100644 --- a/src/requests/request.rs +++ b/src/requests/request.rs @@ -4,6 +4,7 @@ use scylla::statement::prepared::PreparedStatement; use crate::{result::map_column_type_to_complex_type, types::type_wrappers::ComplexType}; #[napi] +#[derive(Clone)] pub struct PreparedStatementWrapper { pub(crate) prepared: PreparedStatement, } diff --git a/src/session.rs b/src/session.rs index 0da6901f..ee3f1005 100644 --- a/src/session.rs +++ b/src/session.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use scylla::client::caching_session::CachingSession; use scylla::client::session_builder::SessionBuilder; use scylla::client::SelfIdentity; @@ -6,6 +8,7 @@ use scylla::statement::batch::Batch; use scylla::statement::prepared::PreparedStatement; use scylla::statement::{Consistency, SerialConsistency, Statement}; use scylla::value::{CqlValue, MaybeUnset}; +use tokio::task::JoinSet; use crate::options; use crate::paging::{PagingResult, PagingStateWrapper}; @@ -36,7 +39,7 @@ pub struct BatchWrapper { #[napi] pub struct SessionWrapper { - inner: CachingSession, + inner: Arc, } #[napi] @@ -67,7 +70,9 @@ impl SessionWrapper { session, options.cache_size.unwrap_or(DEFAULT_CACHE_SIZE) as usize, ); - Ok(SessionWrapper { inner: session }) + Ok(SessionWrapper { + inner: session.into(), + }) } /// Returns the name of the current keyspace @@ -121,6 +126,51 @@ impl SessionWrapper { }) } + #[napi] + pub async fn prepare_multiple( + &self, + statements: Vec, + ) -> napi::Result> { + let mut set: JoinSet<( + Result, + usize, + )> = JoinSet::new(); + let mut i = 0; + let mut in_flight = 0; + let mut res: Vec> = Vec::new(); + res.resize(statements.len(), None); + + macro_rules! parse_result { + ($task_result: ident) => { + let (prepared, index) = $task_result.unwrap(); + let prepared = prepared.map_err(err_to_napi)?; + res[index] = Some(PreparedStatementWrapper { prepared }); + }; + } + + for e in statements { + if in_flight > 5000 { + let res = set.join_next().await.expect("Expected some result"); + parse_result!(res); + in_flight -= 1; + } + let x = e.into(); + let w = self.inner.clone(); + set.spawn(async move { (w.add_prepared_statement(&x).await, i) }); + in_flight += 1; + i += 1; + } + + while let Some(task_result) = set.join_next().await { + parse_result!(task_result); + } + let res: Vec = res.into_iter().flatten().collect(); + if res.len() != i { + return Err(js_error("Some queries were not prepared")); + } + Ok(res) + } + /// Execute a given prepared statement against the database with provided parameters. /// /// Returns a wrapper of the result provided by the rust driver @@ -237,6 +287,62 @@ impl SessionWrapper { paging_state: paging_state.into(), }) } + + #[napi] + pub async fn execute_multiple( + &self, + queries: Vec<&PreparedStatementWrapper>, + params: Vec>>, + options: &QueryOptionsWrapper, + ) -> napi::Result> { + let params_vec: Vec>>> = params + .into_iter() + .map(MaybeUnsetQueryParameterWrapper::extract_parameters) + .collect(); + let mut set: JoinSet<( + Result, + usize, + )> = JoinSet::new(); + let mut i = 0; + let mut in_flight = 0; + + let mut res: Vec> = Vec::new(); + while res.len() < queries.len() { + res.push(None); + } + + macro_rules! parse_result { + ($task_result: ident) => { + let (result, index) = $task_result.unwrap(); + let result = result.map_err(err_to_napi)?; + res[index] = Some(QueryResultWrapper::from_query(result)?); + }; + } + + for f in params_vec { + if in_flight > 5000 { + let res = set.join_next().await.expect("Expected some result"); + parse_result!(res); + in_flight -= 1; + } + + let e = queries.get(i).expect("msg"); + let query = apply_prepared_options(e.prepared.clone(), options)?; + let w = self.inner.clone(); + set.spawn(async move { (w.get_session().execute_unpaged(&query, f).await, i) }); + i += 1; + in_flight += 1; + } + + while let Some(task_result) = set.join_next().await { + parse_result!(task_result); + } + let res: Vec = res.into_iter().flatten().collect(); + if res.len() != i { + return Err(js_error("Some queries were not executed correctly")); + } + Ok(res) + } } /// Creates object representing a prepared batch of statements.