From 9b387e6e0272b37933c595d0f4ea840316c57e15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stanis=C5=82aw=20Czech?= Date: Fri, 16 May 2025 10:09:16 +0200 Subject: [PATCH 1/2] Limit iterations v2 --- benchmark/logic_rust/concurrent_insert.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/benchmark/logic_rust/concurrent_insert.rs b/benchmark/logic_rust/concurrent_insert.rs index 73d0b7fb..db50f331 100644 --- a/benchmark/logic_rust/concurrent_insert.rs +++ b/benchmark/logic_rust/concurrent_insert.rs @@ -4,24 +4,29 @@ use scylla::client::session_builder::SessionBuilder; use scylla::statement::prepared::PreparedStatement; use std::env; use std::sync::Arc; +use tokio::sync::Barrier; use uuid::Uuid; const CONCURRENCY: usize = 2000; +const STEP: usize = 200000; async fn insert_data( session: Arc, start_index: usize, n: i32, insert_query: &PreparedStatement, + barrier: Arc, ) -> Result<(), Box> { let mut index = start_index; while index < n as usize { let id = Uuid::new_v4(); session.execute_unpaged(insert_query, (id, 100)).await?; + if index / STEP != (index + CONCURRENCY) / STEP { + barrier.wait().await; + } index += CONCURRENCY; } - Ok(()) } @@ -60,12 +65,14 @@ async fn main() -> Result<(), Box> { let mut handles = vec![]; let session = Arc::new(session); + let barrier = Arc::new(Barrier::new(CONCURRENCY)); for i in 0..CONCURRENCY { let session_clone = Arc::clone(&session); let insert_query_clone = insert_query.clone(); + let barrier_clone = barrier.clone(); handles.push(tokio::spawn(async move { - insert_data(session_clone, i, n, &insert_query_clone) + insert_data(session_clone, i, n, &insert_query_clone, barrier_clone) .await .unwrap(); })); From 6cc431a31f437277578472cdf891c93cd8337bba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stanis=C5=82aw=20Czech?= Date: Fri, 16 May 2025 10:27:43 +0200 Subject: [PATCH 2/2] Debug --- benchmark/logic_rust/concurrent_insert.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/benchmark/logic_rust/concurrent_insert.rs b/benchmark/logic_rust/concurrent_insert.rs index db50f331..6e15dcd9 100644 --- a/benchmark/logic_rust/concurrent_insert.rs +++ b/benchmark/logic_rust/concurrent_insert.rs @@ -3,12 +3,14 @@ use scylla::client::session::Session; use scylla::client::session_builder::SessionBuilder; use scylla::statement::prepared::PreparedStatement; use std::env; +use std::sync::atomic::AtomicI32; use std::sync::Arc; use tokio::sync::Barrier; use uuid::Uuid; -const CONCURRENCY: usize = 2000; +const CONCURRENCY: usize = 500; const STEP: usize = 200000; +static COUNTER: AtomicI32 = AtomicI32::new(0); async fn insert_data( session: Arc, @@ -23,6 +25,8 @@ async fn insert_data( let id = Uuid::new_v4(); session.execute_unpaged(insert_query, (id, 100)).await?; if index / STEP != (index + CONCURRENCY) / STEP { + let w = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Acquire); + println!("Waiting: {w}"); barrier.wait().await; } index += CONCURRENCY;