Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

[WIP] rust concurrent #192

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: second-iteration
Choose a base branch
Loading
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions 1 benchmark/logic/concurrent_insert.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ async.series(
next();
},
function r() {
console.log("Done!")
exit(0);
}
], function (err) {
Expand Down
2 changes: 1 addition & 1 deletion 2 benchmark/logic/concurrent_select.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ async.series(
});
}
try {
const _result = await cassandra.concurrent.executeConcurrent(client, allParameters, {prepare: true});
const _result = await cassandra.concurrent.executeConcurrent(client, allParameters, {prepare: true, collectResults: true });
} catch (err) {
return next(err);
}
Expand Down
45 changes: 45 additions & 0 deletions 45 lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const ResultSet = require("./types/result-set.js");
const { parseParams, convertHints } = require("./types/cql-utils.js");
const queryOptions = require("./query-options.js");
const { PreparedCache } = require("./cache.js");
const { ResultSetGroup } = require("./concurrent/index.js");

/**
* Represents a database client that maintains multiple connections to the cluster nodes, providing methods to
Expand Down Expand Up @@ -147,9 +148,33 @@ class Client extends events.EventEmitter {
* @package
*/
async prepareQuery(query) {
await this.connect();
return await this.rustClient.prepareStatement(query);
}

/**
* Manually prepare query into prepared statement
* @param {Array<string>} queries
* @returns {Promise<Array<rust.PreparedStatementWrapper>>}
* @package
*/
async prepareMultiple(queries) {
await this.connect();
let mapped = {};
let result = [];
let toPrepare = [];
for (let i = 0; i < queries.length; i++) {
if (!mapped[queries[i]]) {
mapped[queries[i]] = toPrepare.length;
toPrepare.push(queries[i]);
}
result[i] = mapped[queries[i]];
}
let prepared = await this.rustClient.prepareMultiple(toPrepare);
let resp = result.map((e) => prepared[e]);
return resp;
}

/**
* Attempts to connect to one of the [contactPoints]{@link ClientOptions} and discovers the rest the nodes of the
* cluster.
Expand Down Expand Up @@ -359,6 +384,26 @@ class Client extends events.EventEmitter {
reject(e);
}
}

/**
*
* @param {Array<rust.PreparedStatementWrapper} prepared
* @param {Array<Array<any>>} params
* @param {ExecOptions.ExecutionOptions} option
* @returns {ResultSetGroup}
*/
async executeMultiplePrepared(prepared, params, options, res) {
let rustOptions = options.getRustOptions();
let parsedParams = [];
for (let i = 0; i < prepared.length; i++) {
parsedParams.push(parseParams(prepared[i].getExpectedTypes(), params[i]));
}
let queries = await this.rustClient.executeMultiple(prepared, parsedParams, rustOptions);
for (let i = 0; i < prepared.length; i++) {
res.setResultItem(i, new ResultSet(queries[i]));
}
return res;
}

/**
* @deprecated Not supported by the driver. Usage will throw an error.
Expand Down
19 changes: 15 additions & 4 deletions 19 lib/concurrent/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class ArrayBasedExecutor {
constructor(client, query, parameters, options) {
this._client = client;
this._query = query;
this._prepared = null;
this._parameters = parameters;
options = options || utils.emptyObject;
this._raiseOnFirstError = options.raiseOnFirstError !== false;
Expand All @@ -128,13 +129,23 @@ class ArrayBasedExecutor {
}

async execute() {
const promises = new Array(this._concurrencyLevel);
// Prepare all the queries
let allParams = this._parameters;
if (!this._query) {
let allQueries = this._parameters.map((e) => e.query);
allParams = this._parameters.map((e) => e.params);
this._prepared = await this._client.prepareMultiple(allQueries);
} else {
this._prepared = await this._client.prepareQuery(this._query);
}

return await this._client.executeMultiplePrepared(this._prepared, allParams, this._queryOptions, this._result);
/* const promises = new Array(this._concurrencyLevel);
for (let i = 0; i < this._concurrencyLevel; i++) {
promises[i] = this._executeOneAtATime(i, 0);
}

return Promise.all(promises).then(() => this._result);
return Promise.all(promises).then(() => this._result); */
}

async _executeOneAtATime(initialIndex, iteration) {
Expand All @@ -149,10 +160,10 @@ class ArrayBasedExecutor {
let params;

if (this._query === null) {
query = item.query;
query = this._prepared[index];
params = item.params;
} else {
query = this._query;
query = this._prepared;
params = item;
}

Expand Down
1 change: 1 addition & 0 deletions 1 src/requests/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use scylla::statement::prepared::PreparedStatement;
use crate::{result::map_column_type_to_complex_type, types::type_wrappers::ComplexType};

#[napi]
#[derive(Clone)]
pub struct PreparedStatementWrapper {
pub(crate) prepared: PreparedStatement,
}
Expand Down
110 changes: 108 additions & 2 deletions 110 src/session.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use scylla::client::caching_session::CachingSession;
use scylla::client::session_builder::SessionBuilder;
use scylla::client::SelfIdentity;
Expand All @@ -6,6 +8,7 @@ use scylla::statement::batch::Batch;
use scylla::statement::prepared::PreparedStatement;
use scylla::statement::{Consistency, SerialConsistency, Statement};
use scylla::value::{CqlValue, MaybeUnset};
use tokio::task::JoinSet;

use crate::options;
use crate::paging::{PagingResult, PagingStateWrapper};
Expand Down Expand Up @@ -36,7 +39,7 @@ pub struct BatchWrapper {

#[napi]
pub struct SessionWrapper {
inner: CachingSession,
inner: Arc<CachingSession>,
}

#[napi]
Expand Down Expand Up @@ -67,7 +70,9 @@ impl SessionWrapper {
session,
options.cache_size.unwrap_or(DEFAULT_CACHE_SIZE) as usize,
);
Ok(SessionWrapper { inner: session })
Ok(SessionWrapper {
inner: session.into(),
})
}

/// Returns the name of the current keyspace
Expand Down Expand Up @@ -121,6 +126,51 @@ impl SessionWrapper {
})
}

#[napi]
pub async fn prepare_multiple(
&self,
statements: Vec<String>,
) -> napi::Result<Vec<PreparedStatementWrapper>> {
let mut set: JoinSet<(
Result<PreparedStatement, scylla::errors::PrepareError>,
usize,
)> = JoinSet::new();
let mut i = 0;
let mut in_flight = 0;
let mut res: Vec<Option<PreparedStatementWrapper>> = Vec::new();
res.resize(statements.len(), None);

macro_rules! parse_result {
($task_result: ident) => {
let (prepared, index) = $task_result.unwrap();
let prepared = prepared.map_err(err_to_napi)?;
res[index] = Some(PreparedStatementWrapper { prepared });
};
}

for e in statements {
if in_flight > 5000 {
let res = set.join_next().await.expect("Expected some result");
parse_result!(res);
in_flight -= 1;
}
let x = e.into();
let w = self.inner.clone();
set.spawn(async move { (w.add_prepared_statement(&x).await, i) });
in_flight += 1;
i += 1;
}

while let Some(task_result) = set.join_next().await {
parse_result!(task_result);
}
let res: Vec<PreparedStatementWrapper> = res.into_iter().flatten().collect();
if res.len() != i {
return Err(js_error("Some queries were not prepared"));
}
Ok(res)
}

/// Execute a given prepared statement against the database with provided parameters.
///
/// Returns a wrapper of the result provided by the rust driver
Expand Down Expand Up @@ -237,6 +287,62 @@ impl SessionWrapper {
paging_state: paging_state.into(),
})
}

#[napi]
pub async fn execute_multiple(
&self,
queries: Vec<&PreparedStatementWrapper>,
params: Vec<Vec<Option<&MaybeUnsetQueryParameterWrapper>>>,
options: &QueryOptionsWrapper,
) -> napi::Result<Vec<QueryResultWrapper>> {
let params_vec: Vec<Vec<Option<MaybeUnset<CqlValue>>>> = params
.into_iter()
.map(MaybeUnsetQueryParameterWrapper::extract_parameters)
.collect();
let mut set: JoinSet<(
Result<scylla::response::query_result::QueryResult, scylla::errors::ExecutionError>,
usize,
)> = JoinSet::new();
let mut i = 0;
let mut in_flight = 0;

let mut res: Vec<Option<QueryResultWrapper>> = Vec::new();
while res.len() < queries.len() {
res.push(None);
}
Comment on lines +310 to +312
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
while res.len() < queries.len() {
res.push(None);
}
res.resize(queries.len(), None);

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you agree to add clone trait for QueryRowsResult, then sure.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugh, really? If this is the reason you did it this way, then please write that in a comment.


macro_rules! parse_result {
($task_result: ident) => {
let (result, index) = $task_result.unwrap();
let result = result.map_err(err_to_napi)?;
res[index] = Some(QueryResultWrapper::from_query(result)?);
};
}

for f in params_vec {
if in_flight > 5000 {
let res = set.join_next().await.expect("Expected some result");
parse_result!(res);
in_flight -= 1;
}

let e = queries.get(i).expect("msg");
let query = apply_prepared_options(e.prepared.clone(), options)?;
let w = self.inner.clone();
set.spawn(async move { (w.get_session().execute_unpaged(&query, f).await, i) });
i += 1;
in_flight += 1;
}

while let Some(task_result) = set.join_next().await {
parse_result!(task_result);
}
let res: Vec<QueryResultWrapper> = res.into_iter().flatten().collect();
if res.len() != i {
return Err(js_error("Some queries were not executed correctly"));
}
Ok(res)
}
}

/// Creates object representing a prepared batch of statements.
Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.