diff --git a/lib/cache.js b/lib/cache.js new file mode 100644 index 00000000..d31787a2 --- /dev/null +++ b/lib/cache.js @@ -0,0 +1,32 @@ +const _rust = require("../index"); + +class PreparedCache { + /** + * @type {Map} + */ + #cache; + + constructor() { + this.#cache = {}; + } + + /** + * + * @param {string} key + * @returns {_rust.PreparedStatementWrapper} + */ + getElement(key) { + return this.#cache[key]; + } + + /** + * + * @param {string} key + * @param {_rust.PreparedStatementWrapper} element + */ + storeElement(key, element) { + this.#cache[key] = element; + } +} + +module.exports.PreparedCache = PreparedCache; diff --git a/lib/client.js b/lib/client.js index f44e3b17..2e2b152d 100644 --- a/lib/client.js +++ b/lib/client.js @@ -18,6 +18,7 @@ const rust = require("../index"); const ResultSet = require("./types/result-set.js"); const { parseParams, convertHints } = require("./types/cql-utils.js"); const queryOptions = require("./query-options.js"); +const { PreparedCache } = require("./cache.js"); /** * Represents a database client that maintains multiple connections to the cluster nodes, providing methods to @@ -139,6 +140,16 @@ class Client extends events.EventEmitter { return fullOptions; } + /** + * Manually prepare query into prepared statement + * @param {string} query + * @returns {Promise} + * @package + */ + async prepareQuery(query) { + return await this.rustClient.prepareStatement(query); + } + /** * Attempts to connect to one of the [contactPoints]{@link ClientOptions} and discovers the rest the nodes of the * cluster. @@ -256,7 +267,7 @@ class Client extends events.EventEmitter { try { const execOptions = this.createOptions(options); return promiseUtils.optionalCallback( - this.#rustyExecute(query, params, execOptions), + this.rustyExecute(query, params, execOptions), callback, ); } catch (err) { @@ -557,6 +568,7 @@ class Client extends events.EventEmitter { let allQueries = []; let parametersRows = []; let hints = execOptions.getHints() || []; + let preparedCache = new PreparedCache(); for (let i = 0; i < queries.length; i++) { let element = queries[i]; @@ -566,17 +578,24 @@ class Client extends events.EventEmitter { /** * @type {rust.PreparedStatementWrapper | string} */ - let query = typeof element === "string" ? element : element.query; + let statement = + typeof element === "string" ? element : element.query; let params = element.params || []; let types; - if (!query) { + if (!statement) { throw new errors.ArgumentError(`Invalid query at index ${i}`); } if (shouldBePrepared) { - query = await this.rustClient.prepareStatement(query); - types = query.getExpectedTypes(); + let prepared = preparedCache.getElement(statement); + if (!prepared) { + prepared = + await this.rustClient.prepareStatement(statement); + preparedCache.storeElement(statement, prepared); + } + types = prepared.getExpectedTypes(); + statement = prepared; } else { types = convertHints(hints[i] || []); } @@ -584,7 +603,7 @@ class Client extends events.EventEmitter { if (params) { params = parseParams(types, params, shouldBePrepared === false); } - allQueries.push(query); + allQueries.push(statement); parametersRows.push(params); } @@ -658,13 +677,13 @@ class Client extends events.EventEmitter { /** * Wrapper for executing queries by rust driver - * @param {string} query + * @param {string | rust.PreparedStatementWrapper} query * @param {Array} params * @param {ExecOptions.ExecutionOptions} execOptions * @returns {Promise} - * @private + * @package */ - async #rustyExecute(query, params, execOptions) { + async rustyExecute(query, params, execOptions) { if ( // !execOptions.isPrepared() && params && @@ -709,7 +728,7 @@ class Client extends events.EventEmitter { * Core part of executing rust queries * @param {ResolveCallback} resolve * @param {RejectCallback} reject - * @param {string} query + * @param {string | rust.PreparedStatementWrapper} query * @param {Array} params * @param {ExecOptions.ExecutionOptions} execOptions */ @@ -719,7 +738,10 @@ class Client extends events.EventEmitter { let result; if (execOptions.isPrepared()) { // Execute prepared statement, as requested by the user - let statement = await this.rustClient.prepareStatement(query); + let statement = + query instanceof rust.PreparedStatementWrapper + ? query + : await this.rustClient.prepareStatement(query); let parsedParams = parseParams( statement.getExpectedTypes(), params, @@ -730,6 +752,11 @@ class Client extends events.EventEmitter { rustOptions, ); } else { + if (query instanceof rust.PreparedStatementWrapper) { + throw new Error( + "Unexpected prepared statement wrapper for unprepared queries", + ); + } let expectedTypes = convertHints(execOptions.getHints() || []); let parsedParams = parseParams(expectedTypes, params, true); result = await this.rustClient.queryUnpaged( diff --git a/lib/concurrent/index.js b/lib/concurrent/index.js index 8835a5f1..e5f472df 100644 --- a/lib/concurrent/index.js +++ b/lib/concurrent/index.js @@ -5,6 +5,7 @@ const utils = require("../utils"); const { Stream } = require("stream"); const { Mutex } = require("async-mutex"); const { env } = require("process"); +const { PreparedCache } = require("../cache"); /** * Utilities for concurrent query execution with the DataStax Node.js Driver. @@ -123,9 +124,10 @@ class ArrayBasedExecutor { }); this._result = new ResultSetGroup(options); this._stop = false; + this._cache = new PreparedCache(); } - execute() { + async execute() { const promises = new Array(this._concurrencyLevel); for (let i = 0; i < this._concurrencyLevel; i++) { @@ -135,7 +137,7 @@ class ArrayBasedExecutor { return Promise.all(promises).then(() => this._result); } - _executeOneAtATime(initialIndex, iteration) { + async _executeOneAtATime(initialIndex, iteration) { const index = initialIndex + this._concurrencyLevel * iteration; if (index >= this._parameters.length || this._stop) { @@ -154,8 +156,14 @@ class ArrayBasedExecutor { params = item; } + let prepared = this._cache.getElement(query); + if (!prepared) { + prepared = await (this._client.prepareQuery(query)); + this._cache.storeElement(query, prepared); + } + return this._client - .execute(query, params, this._queryOptions) + .rustyExecute(prepared, params, this._queryOptions) .then((rs) => this._result.setResultItem(index, rs)) .catch((err) => this._setError(index, err)) .then(() => this._executeOneAtATime(initialIndex, iteration + 1));