From 47b9ad1d295feb0c00f6b4c07e755e22b9d0f85b Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Mon, 28 Aug 2023 20:20:29 -0500 Subject: [PATCH 1/5] Configurable routing for queries without routing comment --- pgcat.toml | 11 +- src/config.rs | 65 ++++++++++++ src/pool.rs | 59 ++++++++++- src/query_router.rs | 67 ++++++++++-- tests/docker/Dockerfile | 2 + tests/ruby/helpers/auth_query_helper.rb | 16 +-- tests/ruby/helpers/pg_instance.rb | 24 ++++- tests/ruby/helpers/pgcat_helper.rb | 16 +-- tests/ruby/helpers/pgcat_process.rb | 27 +++-- tests/ruby/mirrors_spec.rb | 12 +-- tests/ruby/misc_spec.rb | 14 +-- tests/ruby/sharding_spec.rb | 134 +++++++++++++++++++++++- 12 files changed, 380 insertions(+), 67 deletions(-) diff --git a/pgcat.toml b/pgcat.toml index ae5d74dc..f7b7fecf 100644 --- a/pgcat.toml +++ b/pgcat.toml @@ -171,12 +171,17 @@ query_parser_read_write_splitting = true # queries. The primary can always be explicitly selected with our custom protocol. primary_reads_enabled = true -# Allow sharding commands to be passed as statement comments instead of -# separate commands. If these are unset this functionality is disabled. # sharding_key_regex = '/\* sharding_key: (\d+) \*/' -# shard_id_regex = '/\* shard_id: (\d+) \*/' +# shard_id_regex = '/\*shard_id:(\d+)\*/' # regex_search_limit = 1000 # only look at the first 1000 characters of SQL statements +# Defines the behavior when no shard_id or sharding_key are specified for a query against +# a sharded system with either sharding_key_regex or shard_id_regex specified. +# `random`: picks a shard at random +# `random_healthy`: picks a shard at random favoring shards with the least number of recent errors +# `shard_`: e.g. shard_0, shard_4, etc. picks a specific shard, everytime +# no_shard_specified_behavior = "random" + # So what if you wanted to implement a different hashing function, # or you've already built one and you want this pooler to use it? # Current options: diff --git a/src/config.rs b/src/config.rs index dc915d57..53bf3925 100644 --- a/src/config.rs +++ b/src/config.rs @@ -3,11 +3,14 @@ use arc_swap::ArcSwap; use log::{error, info}; use once_cell::sync::Lazy; use regex::Regex; +use serde::{Deserializer, Serializer}; use serde_derive::{Deserialize, Serialize}; + use std::collections::hash_map::DefaultHasher; use std::collections::{BTreeMap, HashMap, HashSet}; use std::hash::{Hash, Hasher}; use std::path::Path; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use tokio::fs::File; use tokio::io::AsyncReadExt; @@ -101,6 +104,9 @@ pub struct Address { /// Address stats pub stats: Arc, + + /// Number of errors encountered + pub error_count: Arc, } impl Default for Address { @@ -118,6 +124,7 @@ impl Default for Address { pool_name: String::from("pool_name"), mirrors: Vec::new(), stats: Arc::new(AddressStats::default()), + error_count: Arc::new(AtomicU64::new(0)), } } } @@ -182,6 +189,18 @@ impl Address { ), } } + + pub fn error_count(&self) -> u64 { + self.error_count.load(Ordering::Relaxed) + } + + pub fn increment_error_count(&self) { + self.error_count.fetch_add(1, Ordering::Relaxed); + } + + pub fn reset_error_count(&self) { + self.error_count.store(0, Ordering::Relaxed); + } } /// PostgreSQL user. @@ -539,6 +558,7 @@ pub struct Pool { pub sharding_key_regex: Option, pub shard_id_regex: Option, pub regex_search_limit: Option, + pub no_shard_specified_behavior: Option, pub auth_query: Option, pub auth_query_user: Option, @@ -693,6 +713,7 @@ impl Default for Pool { sharding_key_regex: None, shard_id_regex: None, regex_search_limit: Some(1000), + no_shard_specified_behavior: None, auth_query: None, auth_query_user: None, auth_query_password: None, @@ -711,6 +732,50 @@ pub struct ServerConfig { pub role: Role, } +// No Shard Specified handling. +#[derive(Debug, PartialEq, Clone, Eq, Hash, Copy)] +pub enum NoShardSpecifiedHandling { + Shard(usize), + Random, + RandomHealthy, +} +impl Default for NoShardSpecifiedHandling { + fn default() -> Self { + NoShardSpecifiedHandling::Shard(0) + } +} +impl serde::Serialize for NoShardSpecifiedHandling { + fn serialize(&self, serializer: S) -> Result { + match self { + NoShardSpecifiedHandling::Shard(shard) => { + serializer.serialize_str(&format!("shard_{}", &shard.to_string())) + } + NoShardSpecifiedHandling::Random => serializer.serialize_str("random"), + NoShardSpecifiedHandling::RandomHealthy => serializer.serialize_str("random_healthy"), + } + } +} +impl<'de> serde::Deserialize<'de> for NoShardSpecifiedHandling { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + if s.starts_with("shard_") { + let shard = s[6..].parse::().map_err(serde::de::Error::custom)?; + return Ok(NoShardSpecifiedHandling::Shard(shard)); + } + + match s.as_str() { + "random" => Ok(NoShardSpecifiedHandling::Random), + "random_healthy" => Ok(NoShardSpecifiedHandling::RandomHealthy), + _ => Err(serde::de::Error::custom( + "invalid value for no_shard_specified_behavior", + )), + } + } +} + #[derive(Clone, PartialEq, Serialize, Deserialize, Debug, Hash, Eq)] pub struct MirrorServerConfig { pub host: String, diff --git a/src/pool.rs b/src/pool.rs index 7e110ce2..2c245c84 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -10,6 +10,7 @@ use rand::thread_rng; use regex::Regex; use std::collections::HashMap; use std::fmt::{Display, Formatter}; +use std::sync::atomic::AtomicU64; use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -18,7 +19,8 @@ use std::time::Instant; use tokio::sync::Notify; use crate::config::{ - get_config, Address, General, LoadBalancingMode, Plugins, PoolMode, Role, User, + get_config, Address, General, LoadBalancingMode, NoShardSpecifiedHandling, Plugins, PoolMode, + Role, User, }; use crate::errors::Error; @@ -34,6 +36,7 @@ pub type ServerHost = String; pub type ServerPort = u16; pub type BanList = Arc>>>; +//pub type ErrorList = Arc>>>; pub type ClientServerMap = Arc>>; pub type PoolMap = HashMap; @@ -140,6 +143,9 @@ pub struct PoolSettings { // Regex for searching for the shard id in SQL statements pub shard_id_regex: Option, + // What to do when no shard is specified in the SQL statement + pub no_shard_specified_behavior: Option, + // Limit how much of each query is searched for a potential shard regex match pub regex_search_limit: usize, @@ -173,6 +179,7 @@ impl Default for PoolSettings { sharding_key_regex: None, shard_id_regex: None, regex_search_limit: 1000, + no_shard_specified_behavior: None, auth_query: None, auth_query_user: None, auth_query_password: None, @@ -299,6 +306,7 @@ impl ConnectionPool { pool_name: pool_name.clone(), mirrors: vec![], stats: Arc::new(AddressStats::default()), + error_count: Arc::new(AtomicU64::new(0)), }); address_id += 1; } @@ -317,6 +325,7 @@ impl ConnectionPool { pool_name: pool_name.clone(), mirrors: mirror_addresses, stats: Arc::new(AddressStats::default()), + error_count: Arc::new(AtomicU64::new(0)), }; address_id += 1; @@ -429,6 +438,7 @@ impl ConnectionPool { shards.push(pools); addresses.push(servers); + banlist.push(HashMap::new()); } @@ -482,6 +492,9 @@ impl ConnectionPool { .clone() .map(|regex| Regex::new(regex.as_str()).unwrap()), regex_search_limit: pool_config.regex_search_limit.unwrap_or(1000), + no_shard_specified_behavior: pool_config + .no_shard_specified_behavior + .clone(), auth_query: pool_config.auth_query.clone(), auth_query_user: pool_config.auth_query_user.clone(), auth_query_password: pool_config.auth_query_password.clone(), @@ -651,7 +664,10 @@ impl ConnectionPool { .get() .await { - Ok(conn) => conn, + Ok(conn) => { + address.reset_error_count(); + conn + } Err(err) => { error!( "Connection checkout error for instance {:?}, error: {:?}", @@ -766,11 +782,22 @@ impl ConnectionPool { /// traffic for any new transactions. Existing transactions on that replica /// will finish successfully or error out to the clients. pub fn ban(&self, address: &Address, reason: BanReason, client_info: Option<&ClientStats>) { + // Count the number of errors since the last successful checkout + // This is used to determine if the shard is down + match reason { + BanReason::FailedHealthCheck + | BanReason::FailedCheckout + | BanReason::MessageSendFailed + | BanReason::MessageReceiveFailed => { + address.increment_error_count(); + } + _ => (), + }; + // Primary can never be banned if address.role == Role::Primary { return; } - error!("Banning instance {:?}, reason: {:?}", address, reason); let now = chrono::offset::Utc::now().naive_utc(); @@ -920,6 +947,32 @@ impl ConnectionPool { self.original_server_parameters.read().clone() } + pub fn get_random_healthy_shard_id(&self) -> usize { + let mut shards = Vec::new(); + for shard in 0..self.shards() { + shards.push(shard); + } + + // Shuffle to avoid always picking the same shard when error counts are equal + shards.shuffle(&mut thread_rng()); + shards.sort_by(|a, b| { + let err_count_a = self.addresses[*a] + .iter() + .fold(0, |acc, address| acc + address.error_count()); + + let err_count_b = self.addresses[*b] + .iter() + .fold(0, |acc, address| acc + address.error_count()); + + err_count_a.partial_cmp(&err_count_b).unwrap() + }); + + match shards.first() { + Some(shard) => *shard, + None => 0, + } + } + fn busy_connection_count(&self, address: &Address) -> u32 { let state = self.pool_state(address.shard, address.address_index); let idle = state.idle_connections; diff --git a/src/query_router.rs b/src/query_router.rs index efca499f..ba1c3055 100644 --- a/src/query_router.rs +++ b/src/query_router.rs @@ -1,7 +1,7 @@ /// Route queries automatically based on explicitly requested /// or implied query characteristics. use bytes::{Buf, BytesMut}; -use log::{debug, error}; +use log::{debug, error, warn}; use once_cell::sync::OnceCell; use regex::{Regex, RegexSet}; use sqlparser::ast::Statement::{Query, StartTransaction}; @@ -12,11 +12,11 @@ use sqlparser::ast::{ use sqlparser::dialect::PostgreSqlDialect; use sqlparser::parser::Parser; -use crate::config::Role; +use crate::config::{NoShardSpecifiedHandling, Role}; use crate::errors::Error; use crate::messages::BytesMutReader; use crate::plugins::{Intercept, Plugin, PluginOutput, QueryLogger, TableAccess}; -use crate::pool::PoolSettings; +use crate::pool::{get_pool, PoolSettings}; use crate::sharding::Sharder; use std::collections::BTreeSet; @@ -143,13 +143,14 @@ impl QueryRouter { let code = message_cursor.get_u8() as char; let len = message_cursor.get_i32() as usize; + let comment_shard_routing_enabled = self.pool_settings.shard_id_regex.is_some() + || self.pool_settings.sharding_key_regex.is_some(); + // Check for any sharding regex matches in any queries - match code as char { - // For Parse and Query messages peek to see if they specify a shard_id as a comment early in the statement - 'P' | 'Q' => { - if self.pool_settings.shard_id_regex.is_some() - || self.pool_settings.sharding_key_regex.is_some() - { + if comment_shard_routing_enabled { + match code as char { + // For Parse and Query messages peek to see if they specify a shard_id as a comment early in the statement + 'P' | 'Q' => { // Check only the first block of bytes configured by the pool settings let seg = cmp::min(len - 5, self.pool_settings.regex_search_limit); @@ -188,8 +189,8 @@ impl QueryRouter { } } } + _ => {} } - _ => {} } // Only simple protocol supported for commands processed below @@ -215,6 +216,50 @@ impl QueryRouter { // server it'll go to if the query parser is enabled. if matches.len() != 1 { debug!("Regular query, not a command"); + if comment_shard_routing_enabled { + // comment_shard_routing_enabled is true, but no sharding comment was found + match self.pool_settings.no_shard_specified_behavior { + Some(behavior) => { + match behavior { + NoShardSpecifiedHandling::Random => { + self.set_shard(rand::random::() % self.pool_settings.shards); + } + + NoShardSpecifiedHandling::RandomHealthy => { + let pool = get_pool( + &self.pool_settings.db, + &self.pool_settings.user.username, + ); + match pool { + Some(pool) => { + self.set_shard( + //pool.get_random_shard_id_with_least_errors(), + pool.get_random_healthy_shard_id(), + ); + } + None => { + // Query is using an obselete pool (old pool from before config reload) + // so we fallback to just random + self.set_shard( + rand::random::() % self.pool_settings.shards, + ); + } + } + } + + NoShardSpecifiedHandling::Shard(shard_id) => { + if shard_id >= self.pool_settings.shards { + warn!("Shard id {} is out of range, defaulting to 0", shard_id); + self.set_shard(0); + } else { + self.set_shard(shard_id); + } + } + } + } + None => self.set_shard(0), + } + } return None; } @@ -1204,6 +1249,7 @@ mod test { ban_time: PoolSettings::default().ban_time, sharding_key_regex: None, shard_id_regex: None, + no_shard_specified_behavior: None, regex_search_limit: 1000, auth_query: None, auth_query_password: None, @@ -1281,6 +1327,7 @@ mod test { ban_time: PoolSettings::default().ban_time, sharding_key_regex: Some(Regex::new(r"/\* sharding_key: (\d+) \*/").unwrap()), shard_id_regex: Some(Regex::new(r"/\* shard_id: (\d+) \*/").unwrap()), + no_shard_specified_behavior: None, regex_search_limit: 1000, auth_query: None, auth_query_password: None, diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index 99fd694d..261adb05 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -1,5 +1,7 @@ FROM rust:bullseye +COPY --from=sclevine/yj /bin/yj /bin/yj +RUN /bin/yj -h RUN apt-get update && apt-get install llvm-11 psmisc postgresql-contrib postgresql-client ruby ruby-dev libpq-dev python3 python3-pip lcov curl sudo iproute2 -y RUN cargo install cargo-binutils rustfilt RUN rustup component add llvm-tools-preview diff --git a/tests/ruby/helpers/auth_query_helper.rb b/tests/ruby/helpers/auth_query_helper.rb index 60e85713..43d7c785 100644 --- a/tests/ruby/helpers/auth_query_helper.rb +++ b/tests/ruby/helpers/auth_query_helper.rb @@ -33,18 +33,18 @@ def self.single_shard_auth_query( "0" => { "database" => "shard0", "servers" => [ - ["localhost", primary.port.to_s, "primary"], - ["localhost", replica.port.to_s, "replica"], + ["localhost", primary.port.to_i, "primary"], + ["localhost", replica.port.to_i, "replica"], ] }, }, "users" => { "0" => user.merge(config_user) } } } - pgcat_cfg["general"]["port"] = pgcat.port + pgcat_cfg["general"]["port"] = pgcat.port.to_i pgcat.update_config(pgcat_cfg) pgcat.start - + pgcat.wait_until_ready( pgcat.connection_string( "sharded_db", @@ -92,13 +92,13 @@ def self.two_pools_auth_query( "0" => { "database" => database, "servers" => [ - ["localhost", primary.port.to_s, "primary"], - ["localhost", replica.port.to_s, "replica"], + ["localhost", primary.port.to_i, "primary"], + ["localhost", replica.port.to_i, "replica"], ] }, }, "users" => { "0" => user.merge(config_user) } - } + } end # Main proxy configs pgcat_cfg["pools"] = { @@ -109,7 +109,7 @@ def self.two_pools_auth_query( pgcat_cfg["general"]["port"] = pgcat.port pgcat.update_config(pgcat_cfg.deep_merge(extra_conf)) pgcat.start - + pgcat.wait_until_ready(pgcat.connection_string("sharded_db0", pg_user['username'], pg_user['password'])) OpenStruct.new.tap do |struct| diff --git a/tests/ruby/helpers/pg_instance.rb b/tests/ruby/helpers/pg_instance.rb index a3828248..53617c24 100644 --- a/tests/ruby/helpers/pg_instance.rb +++ b/tests/ruby/helpers/pg_instance.rb @@ -7,10 +7,24 @@ class PgInstance attr_reader :password attr_reader :database_name + def self.mass_takedown(databases) + raise StandardError "block missing" unless block_given? + + databases.each do |database| + database.toxiproxy.toxic(:limit_data, bytes: 1).toxics.each(&:save) + end + sleep 0.1 + yield + ensure + databases.each do |database| + database.toxiproxy.toxics.each(&:destroy) + end + end + def initialize(port, username, password, database_name) - @original_port = port + @original_port = port.to_i @toxiproxy_port = 10000 + port.to_i - @port = @toxiproxy_port + @port = @toxiproxy_port.to_i @username = username @password = password @@ -48,9 +62,9 @@ def toxiproxy def take_down if block_given? - Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 5).apply { yield } + Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 1).apply { yield } else - Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 5).toxics.each(&:save) + Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 1).toxics.each(&:save) end end @@ -89,6 +103,6 @@ def count_query(query) end def count_select_1_plus_2 - with_connection { |c| c.async_exec("SELECT SUM(calls) FROM pg_stat_statements WHERE query = 'SELECT $1 + $2'")[0]["sum"].to_i } + with_connection { |c| c.async_exec("SELECT SUM(calls) FROM pg_stat_statements WHERE query LIKE '%SELECT $1 + $2%'")[0]["sum"].to_i } end end diff --git a/tests/ruby/helpers/pgcat_helper.rb b/tests/ruby/helpers/pgcat_helper.rb index 9b764d87..9b95dbfa 100644 --- a/tests/ruby/helpers/pgcat_helper.rb +++ b/tests/ruby/helpers/pgcat_helper.rb @@ -38,9 +38,9 @@ def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mod "automatic_sharding_key" => "data.id", "sharding_function" => "pg_bigint_hash", "shards" => { - "0" => { "database" => "shard0", "servers" => [["localhost", primary0.port.to_s, "primary"]] }, - "1" => { "database" => "shard1", "servers" => [["localhost", primary1.port.to_s, "primary"]] }, - "2" => { "database" => "shard2", "servers" => [["localhost", primary2.port.to_s, "primary"]] }, + "0" => { "database" => "shard0", "servers" => [["localhost", primary0.port.to_i, "primary"]] }, + "1" => { "database" => "shard1", "servers" => [["localhost", primary1.port.to_i, "primary"]] }, + "2" => { "database" => "shard2", "servers" => [["localhost", primary2.port.to_i, "primary"]] }, }, "users" => { "0" => user }, "plugins" => { @@ -100,7 +100,7 @@ def self.single_instance_setup(pool_name, pool_size, pool_mode="transaction", lb "0" => { "database" => "shard0", "servers" => [ - ["localhost", primary.port.to_s, "primary"] + ["localhost", primary.port.to_i, "primary"] ] }, }, @@ -146,10 +146,10 @@ def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mo "0" => { "database" => "shard0", "servers" => [ - ["localhost", primary.port.to_s, "primary"], - ["localhost", replica0.port.to_s, "replica"], - ["localhost", replica1.port.to_s, "replica"], - ["localhost", replica2.port.to_s, "replica"] + ["localhost", primary.port.to_i, "primary"], + ["localhost", replica0.port.to_i, "replica"], + ["localhost", replica1.port.to_i, "replica"], + ["localhost", replica2.port.to_i, "replica"] ] }, }, diff --git a/tests/ruby/helpers/pgcat_process.rb b/tests/ruby/helpers/pgcat_process.rb index dd3fd052..30af6b3a 100644 --- a/tests/ruby/helpers/pgcat_process.rb +++ b/tests/ruby/helpers/pgcat_process.rb @@ -1,5 +1,5 @@ require 'pg' -require 'toml' +require 'json' require 'fileutils' require 'securerandom' @@ -18,9 +18,9 @@ def self.finalize(pid, log_filename, config_filename) end def initialize(log_level) - @env = {"RUST_LOG" => log_level} + @env = {} @port = rand(20000..32760) - @log_level = log_level + @log_level = "ERROR" #log_level @log_filename = "/tmp/pgcat_log_#{SecureRandom.urlsafe_base64}.log" @config_filename = "/tmp/pgcat_cfg_#{SecureRandom.urlsafe_base64}.toml" @@ -30,7 +30,7 @@ def initialize(log_level) '../../target/debug/pgcat' end - @command = "#{command_path} #{@config_filename}" + @command = "#{command_path} #{@config_filename} --log-level #{@log_level}" FileUtils.cp("../../pgcat.toml", @config_filename) cfg = current_config @@ -46,17 +46,16 @@ def logs def update_config(config_hash) @original_config = current_config - output_to_write = TOML::Generator.new(config_hash).body - output_to_write = output_to_write.gsub(/,\s*["|'](\d+)["|']\s*,/, ',\1,') - output_to_write = output_to_write.gsub(/,\s*["|'](\d+)["|']\s*\]/, ',\1]') - File.write(@config_filename, output_to_write) + File.write("/tmp/4", config_hash.to_json) + `cat /tmp/4 | yj -jt > #{@config_filename}` end def current_config - loadable_string = File.read(@config_filename) - loadable_string = loadable_string.gsub(/,\s*(\d+)\s*,/, ', "\1",') - loadable_string = loadable_string.gsub(/,\s*(\d+)\s*\]/, ', "\1"]') - TOML.load(loadable_string) + JSON.parse(`cat #{@config_filename} | yj -tj`) + end + + def raw_config_file + File.read(@config_filename) end def reload_config @@ -116,11 +115,11 @@ def connection_string(pool_name, username, password = nil, parameters: {}) cfg = current_config user_idx, user_obj = cfg["pools"][pool_name]["users"].detect { |k, user| user["username"] == username } connection_string = "postgresql://#{username}:#{password || user_obj["password"]}@0.0.0.0:#{@port}/#{pool_name}" - + # Add the additional parameters to the connection string parameter_string = parameters.map { |key, value| "#{key}=#{value}" }.join("&") connection_string += "?#{parameter_string}" unless parameter_string.empty? - + connection_string end diff --git a/tests/ruby/mirrors_spec.rb b/tests/ruby/mirrors_spec.rb index 898d0d71..3294ed9e 100644 --- a/tests/ruby/mirrors_spec.rb +++ b/tests/ruby/mirrors_spec.rb @@ -11,9 +11,9 @@ before do new_configs = processes.pgcat.current_config new_configs["pools"]["sharded_db"]["shards"]["0"]["mirrors"] = [ - [mirror_host, mirror_pg.port.to_s, "0"], - [mirror_host, mirror_pg.port.to_s, "0"], - [mirror_host, mirror_pg.port.to_s, "0"], + [mirror_host, mirror_pg.port.to_i, "0"], + [mirror_host, mirror_pg.port.to_i, "0"], + [mirror_host, mirror_pg.port.to_i, "0"], ] processes.pgcat.update_config(new_configs) processes.pgcat.reload_config @@ -42,9 +42,9 @@ new_configs = processes.pgcat.current_config new_configs["pools"]["sharded_db"]["idle_timeout"] = 5000 + i new_configs["pools"]["sharded_db"]["shards"]["0"]["mirrors"] = [ - [mirror_host, mirror_pg.port.to_s, "0"], - [mirror_host, mirror_pg.port.to_s, "0"], - [mirror_host, mirror_pg.port.to_s, "0"], + [mirror_host, mirror_pg.port.to_i, "0"], + [mirror_host, mirror_pg.port.to_i, "0"], + [mirror_host, mirror_pg.port.to_i, "0"], ] processes.pgcat.update_config(new_configs) processes.pgcat.reload_config diff --git a/tests/ruby/misc_spec.rb b/tests/ruby/misc_spec.rb index 1d4ade4c..075bd799 100644 --- a/tests/ruby/misc_spec.rb +++ b/tests/ruby/misc_spec.rb @@ -252,7 +252,7 @@ end expect(processes.primary.count_query("RESET ROLE")).to eq(10) - end + end end context "transaction mode" do @@ -317,7 +317,7 @@ conn.async_exec("SET statement_timeout to 1500") expect(conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]).to eq(orignal_statement_timeout) end - + end context "transaction mode with transactions" do @@ -377,9 +377,9 @@ current_configs = processes.pgcat.current_config correct_idle_client_transaction_timeout = current_configs["general"]["idle_client_in_transaction_timeout"] puts(current_configs["general"]["idle_client_in_transaction_timeout"]) - + current_configs["general"]["idle_client_in_transaction_timeout"] = 0 - + processes.pgcat.update_config(current_configs) # with timeout 0 processes.pgcat.reload_config end @@ -397,9 +397,9 @@ context "idle transaction timeout set to 500ms" do before do current_configs = processes.pgcat.current_config - correct_idle_client_transaction_timeout = current_configs["general"]["idle_client_in_transaction_timeout"] + correct_idle_client_transaction_timeout = current_configs["general"]["idle_client_in_transaction_timeout"] current_configs["general"]["idle_client_in_transaction_timeout"] = 500 - + processes.pgcat.update_config(current_configs) # with timeout 500 processes.pgcat.reload_config end @@ -418,7 +418,7 @@ conn.async_exec("BEGIN") conn.async_exec("SELECT 1") sleep(1) # above 500ms - expect{ conn.async_exec("COMMIT") }.to raise_error(PG::SystemError, /idle transaction timeout/) + expect{ conn.async_exec("COMMIT") }.to raise_error(PG::SystemError, /idle transaction timeout/) conn.async_exec("SELECT 1") # should be able to send another query conn.close end diff --git a/tests/ruby/sharding_spec.rb b/tests/ruby/sharding_spec.rb index 123c10dc..c35f5ef4 100644 --- a/tests/ruby/sharding_spec.rb +++ b/tests/ruby/sharding_spec.rb @@ -7,11 +7,11 @@ before do conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) - # Setup the sharding data 3.times do |i| conn.exec("SET SHARD TO '#{i}'") - conn.exec("DELETE FROM data WHERE id > 0") + + conn.exec("DELETE FROM data WHERE id > 0") rescue nil end 18.times do |i| @@ -19,10 +19,11 @@ conn.exec("SET SHARDING KEY TO '#{i}'") conn.exec("INSERT INTO data (id, value) VALUES (#{i}, 'value_#{i}')") end + + conn.close end after do - processes.all_databases.map(&:reset) processes.pgcat.shutdown end @@ -48,4 +49,131 @@ end end end + + describe "comment-based routing" do + context "when no configs are set" do + it "routes queries with a shard_id comment to the default shard" do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + 10.times { conn.async_exec("/* shard_id: 2 */ SELECT 1 + 2") } + + expect(processes.all_databases.map(&:count_select_1_plus_2)).to eq([10, 0, 0]) + end + + it "does not honor no_shard_specified_behavior directives" do + end + end + + [ + ["shard_id_regex", "/\\* the_shard_id: (\\d+) \\*/", "/* the_shard_id: 1 */"], + ["sharding_key_regex", "/\\* the_sharding_key: (\\d+) \\*/", "/* the_sharding_key: 3 */"], + ].each do |config_name, config_value, comment_to_use| + context "when #{config_name} config is set" do + let(:no_shard_specified_behavior) { nil } + + before do + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + + current_configs = processes.pgcat.current_config + current_configs["pools"]["sharded_db"][config_name] = config_value + if no_shard_specified_behavior + current_configs["pools"]["sharded_db"]["no_shard_specified_behavior"] = no_shard_specified_behavior + end + processes.pgcat.update_config(current_configs) + processes.pgcat.reload_config + end + + it "routes queries with a shard_id comment to the correct shard" do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + 25.times { conn.async_exec("#{comment_to_use} SELECT 1 + 2") } + + expect(processes.all_databases.map(&:count_select_1_plus_2)).to eq([0, 25, 0]) + end + + context "when no_shard_specified_behavior config is set to random" do + let(:no_shard_specified_behavior) { "random" } + + context "with no shard comment" do + it "sends queries to random shard" do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + 25.times { conn.async_exec("SELECT 1 + 2") } + + expect(processes.all_databases.map(&:count_select_1_plus_2).all?(&:positive?)).to be true + end + end + + context "with a shard comment" do + it "honors the comment" do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + 25.times { conn.async_exec("#{comment_to_use} SELECT 1 + 2") } + + expect(processes.all_databases.map(&:count_select_1_plus_2)).to eq([0, 25, 0]) + end + end + end + + context "when no_shard_specified_behavior config is set to random_healthy" do + let(:no_shard_specified_behavior) { "random_healthy" } + + context "with no shard comment" do + it "sends queries to random healthy shard" do + + good_databases = [processes.all_databases[0], processes.all_databases[2]] + bad_database = processes.all_databases[1] + + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + 250.times { conn.async_exec("SELECT 99") } + bad_database.take_down do + 250.times do + conn.async_exec("SELECT 99") + rescue PG::ConnectionBad => e + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + end + end + + # Routes traffic away from bad shard + 25.times { conn.async_exec("SELECT 1 + 2") } + expect(good_databases.map(&:count_select_1_plus_2).all?(&:positive?)).to be true + expect(bad_database.count_select_1_plus_2).to eq(0) + + # Routes traffic to the bad shard if the shard_id is specified + 25.times { conn.async_exec("#{comment_to_use} SELECT 1 + 2") } + bad_database = processes.all_databases[1] + expect(bad_database.count_select_1_plus_2).to eq(25) + end + end + + context "with a shard comment" do + it "honors the comment" do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + 25.times { conn.async_exec("#{comment_to_use} SELECT 1 + 2") } + + expect(processes.all_databases.map(&:count_select_1_plus_2)).to eq([0, 25, 0]) + end + end + end + + context "when no_shard_specified_behavior config is set to shard_x" do + let(:no_shard_specified_behavior) { "shard_2" } + + context "with no shard comment" do + it "sends queries to the specified shard" do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + 25.times { conn.async_exec("SELECT 1 + 2") } + + expect(processes.all_databases.map(&:count_select_1_plus_2)).to eq([0, 0, 25]) + end + end + + context "with a shard comment" do + it "honors the comment" do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + 25.times { conn.async_exec("#{comment_to_use} SELECT 1 + 2") } + + expect(processes.all_databases.map(&:count_select_1_plus_2)).to eq([0, 25, 0]) + end + end + end + end + end + end end From 48ef27b062e0c493818bab194ef392585858e85a Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Mon, 28 Aug 2023 20:37:18 -0500 Subject: [PATCH 2/5] fix tests --- Dockerfile.ci | 2 ++ src/query_router.rs | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/Dockerfile.ci b/Dockerfile.ci index 4503e870..a750612c 100644 --- a/Dockerfile.ci +++ b/Dockerfile.ci @@ -1,4 +1,6 @@ FROM cimg/rust:1.67.1 +COPY --from=sclevine/yj /bin/yj /bin/yj +RUN /bin/yj -h RUN sudo apt-get update && \ sudo apt-get install -y \ psmisc postgresql-contrib-14 postgresql-client-14 libpq-dev \ diff --git a/src/query_router.rs b/src/query_router.rs index ba1c3055..29a8fd99 100644 --- a/src/query_router.rs +++ b/src/query_router.rs @@ -257,7 +257,7 @@ impl QueryRouter { } } } - None => self.set_shard(0), + None => (), } } return None; From 5e1bb5f3c6d5332f6fd0e0914b6cab6f091b1ddd Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Mon, 28 Aug 2023 20:48:30 -0500 Subject: [PATCH 3/5] cleanup --- src/pool.rs | 3 +-- tests/ruby/helpers/pgcat_process.rb | 10 +++++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/pool.rs b/src/pool.rs index 2c245c84..3f81ff03 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -36,7 +36,6 @@ pub type ServerHost = String; pub type ServerPort = u16; pub type BanList = Arc>>>; -//pub type ErrorList = Arc>>>; pub type ClientServerMap = Arc>>; pub type PoolMap = HashMap; @@ -438,7 +437,6 @@ impl ConnectionPool { shards.push(pools); addresses.push(servers); - banlist.push(HashMap::new()); } @@ -798,6 +796,7 @@ impl ConnectionPool { if address.role == Role::Primary { return; } + error!("Banning instance {:?}, reason: {:?}", address, reason); let now = chrono::offset::Utc::now().naive_utc(); diff --git a/tests/ruby/helpers/pgcat_process.rb b/tests/ruby/helpers/pgcat_process.rb index 30af6b3a..cc015594 100644 --- a/tests/ruby/helpers/pgcat_process.rb +++ b/tests/ruby/helpers/pgcat_process.rb @@ -1,5 +1,6 @@ require 'pg' require 'json' +require 'tempfile' require 'fileutils' require 'securerandom' @@ -20,7 +21,7 @@ def self.finalize(pid, log_filename, config_filename) def initialize(log_level) @env = {} @port = rand(20000..32760) - @log_level = "ERROR" #log_level + @log_level = log_level @log_filename = "/tmp/pgcat_log_#{SecureRandom.urlsafe_base64}.log" @config_filename = "/tmp/pgcat_cfg_#{SecureRandom.urlsafe_base64}.toml" @@ -46,8 +47,11 @@ def logs def update_config(config_hash) @original_config = current_config - File.write("/tmp/4", config_hash.to_json) - `cat /tmp/4 | yj -jt > #{@config_filename}` + Tempfile.create('json_out', '/tmp') do |f| + f.write(config_hash.to_json) + f.flush + `cat #{f.path} | yj -jt > #{@config_filename}` + end end def current_config From bc699d2d5c4a440aaf94a1912d4112c6846b99ba Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Mon, 28 Aug 2023 21:32:12 -0500 Subject: [PATCH 4/5] bump From 5e877674ca41c0e5246e90aff46ae52495c770ac Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Mon, 28 Aug 2023 21:45:44 -0500 Subject: [PATCH 5/5] use latest image --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index c7f5c9fa..9fe3c256 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -9,7 +9,7 @@ jobs: # Specify the execution environment. You can specify an image from Dockerhub or use one of our Convenience Images from CircleCI's Developer Hub. # See: https://circleci.com/docs/2.0/configuration-reference/#docker-machine-macos-windows-executor docker: - - image: ghcr.io/levkk/pgcat-ci:1.67 + - image: ghcr.io/levkk/pgcat-ci:latest environment: RUST_LOG: info LLVM_PROFILE_FILE: /tmp/pgcat-%m-%p.profraw