I had to design a connection pool that handles Boost MySQL connection to use the asynchronous functions.
Here is the code:
#include <mutex>
#include <tuple>
#include <string>
#include <memory>
#include <queue>
#include <condition_variable>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ssl/context.hpp>
#include <boost/mysql.hpp>
/// The Interface that handle the different operation with a Connection
class IConnector {
public:
virtual void run(std::unique_ptr<boost::mysql::tcp_ssl_connection>&&) = 0;
};
class IConnection_pool {
public:
virtual void rejoin(std::unique_ptr<boost::mysql::tcp_ssl_connection>&&) = 0;
virtual void replace(std::unique_ptr<boost::mysql::tcp_ssl_connection>&&) = 0;
virtual void push(std::shared_ptr<IConnector>) = 0;
};
class Connector : public IConnector, public std::enable_shared_from_this<Connector> {
public:
Connector(boost::asio::ip::tcp::resolver::results_type& end_point, boost::mysql::handshake_params params,
IConnection_pool* connection_pool)
: end_point_(end_point), params_(params), connection_pool_(connection_pool) {}
void run(std::unique_ptr<boost::mysql::tcp_ssl_connection>&& conn) {
conn_ = std::move(conn);
do_connect();
}
private:
void do_connect() {
conn_->async_connect(*end_point_.begin(), params_, diag_,
std::bind(&Connector::on_connect, shared_from_this(), std::placeholders::_1));
}
void on_connect(boost::mysql::error_code ec) {
if (ec) {
connection_pool_->replace(std::move(conn_));
return;
}
connection_pool_->rejoin(std::move(conn_));
}
std::unique_ptr<boost::mysql::tcp_ssl_connection> conn_;
boost::asio::ip::tcp::resolver::results_type& end_point_;
boost::mysql::handshake_params params_;
boost::mysql::diagnostics diag_;
IConnection_pool* connection_pool_;
};
class Connector_ping : public IConnector, public std::enable_shared_from_this<Connector_ping> {
public:
Connector_ping(IConnection_pool* connection_pool) : connection_pool_(connection_pool) {}
void run(std::unique_ptr<boost::mysql::tcp_ssl_connection>&& conn);
private:
void do_ping() {
conn_->async_ping(diag_, std::bind(&Connector_ping::on_ping, shared_from_this(), std::placeholders::_1));
}
void on_ping(boost::mysql::error_code ec) {
if (ec) {
connection_pool_->replace(std::move(conn_));
return;
}
connection_pool_->rejoin(std::move(conn_));
}
std::unique_ptr<boost::mysql::tcp_ssl_connection> conn_;
boost::mysql::diagnostics diag_;
IConnection_pool* connection_pool_;
};
class Connector_close : public IConnector, public std::enable_shared_from_this<Connector_close> {
public:
Connector_close() {}
void run(std::unique_ptr<boost::mysql::tcp_ssl_connection>&& conn) {
conn_ = std::move(conn);
do_close();
}
private:
void do_close() {
conn_->async_close(diagn_, std::bind(&Connector_close::on_close, shared_from_this(), std::placeholders::_1));
}
void on_close(boost::mysql::error_code ec) {
if (ec) {
}
}
std::unique_ptr<boost::mysql::tcp_ssl_connection> conn_;
boost::mysql::diagnostics diagn_;
};
class Connector_query : public IConnector, public std::enable_shared_from_this<Connector_query> {
public:
Connector_query(std::tuple<> tuple , IConnection_pool& connection_pool) : tuple_(tuple) , connection_pool_(connection_pool){}
void run(std::unique_ptr<boost::mysql::tcp_ssl_connection>&& conn) {
conn_ = std::move(conn);
do_statment();
}
private:
void do_statment() {
conn_->async_prepare_statement(
statement_str_, diagn_,
std::bind(&Connector_query::on_statment, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
}
void on_statment(boost::mysql::error_code ec, boost::mysql::statement stm) {
if (ec) {
connection_pool_.replace(std::move(conn_));
return;
}
stmt_ = stm;
do_query();
}
void do_query() {
conn_->async_execute_statement(
stmt_, tuple_, result_, diagn_,
std::bind(&Connector_query::on_query, shared_from_this(), std::placeholders::_1));
}
void on_query(boost::mysql::error_code ec) {
if (ec) {
connection_pool_.replace(std::move(conn_));
return;
}
do_close_statment();
}
void do_close_statment() {
conn_->async_close_statement(
stmt_, diagn_, std::bind(&Connector_query::on_close_statmemt, shared_from_this(), std::placeholders::_1));
}
void on_close_statmemt(boost::mysql::error_code ec) {
if (ec) {
connection_pool_.replace(std::move(conn_));
}
do_leave();
}
void do_leave() { connection_pool_.rejoin(std::move(conn_)); }
std::unique_ptr<boost::mysql::tcp_ssl_connection> conn_;
std::tuple<> tuple_;
IConnection_pool& connection_pool_;
const std::string statement_str_{"SELECT 1;"};
boost::mysql::statement stmt_;
boost::mysql::diagnostics diagn_;
boost::mysql::results result_;
};
class Connection_pool : public IConnection_pool {
public:
Connection_pool(boost::asio::io_context& io_context, boost::asio::ssl::context& ssl_context, std::string hostname,
std::string username, std::string password, std::string database, std::size_t connection_nb)
: io_context_(io_context),
ssl_context_(ssl_context),
hostname_(hostname),
username_(username),
password_(password),
database_(database),
connections_nb_(connection_nb),
resolver_(io_context.get_executor()),
params_(username_, password_, database_),
timer_(io_context_, std::chrono::milliseconds(1)) {}
~Connection_pool() {
std::lock_guard<std::mutex> lock(mutex_conn_);
for (std::size_t index = 0, size = connections_.size(); index < size; ++index) {
std::make_shared<Connector_close>()->run(std::move(connections_.back()));
connections_.pop_back();
}
}
void rejoin(std::unique_ptr<boost::mysql::tcp_ssl_connection>&& conn) {
std::lock_guard<std::mutex> lock(mutex_conn_);
connections_.emplace_back(std::move(conn));
condition_conn_.notify_one();
}
void replace(std::unique_ptr<boost::mysql::tcp_ssl_connection>&& conn) {
std::make_shared<Connector_close>()->run(std::move(conn));
std::make_shared<Connector>(endpoint_, params_, this)
->run(std::move(
std::make_unique<boost::mysql::tcp_ssl_connection>(io_context_.get_executor(), ssl_context_)));
}
void push(std::shared_ptr<IConnector> connector) {
std::lock_guard<std::mutex> lock(mutex_queue_);
connector_queue_.push(connector);
condition_queue_.notify_one();
}
void run() {
intitialize();
io_context_.post(std::bind(&Connection_pool::process, this));
}
private:
void process() {
while (is_full()) {
std::shared_ptr<IConnector> ptr;
{
std::lock_guard<std::mutex> lock(mutex_queue_);
ptr = connector_queue_.front();
connector_queue_.pop();
}
{
std::unique_lock<std::mutex> lock(mutex_conn_);
if (connections_.empty()) {
condition_conn_.wait(lock, [this] { return !connections_.empty(); });
}
ptr->run(std::move(connections_.back()));
connections_.pop_back();
}
}
}
void intitialize() {
boost::system::error_code ec;
endpoint_ = resolver_.resolve(hostname_, boost::mysql::default_port_string, ec);
if (ec) {
return;
}
connections_.reserve(connections_nb_);
for (std::size_t index = 0; index < connections_nb_; ++index) {
std::make_shared<Connector>(endpoint_, params_, this)
->run(std::move(
std::make_unique<boost::mysql::tcp_ssl_connection>(io_context_.get_executor(), ssl_context_)));
}
do_ping_all();
}
void do_ping_all() {
timer_.expires_from_now(std::chrono::milliseconds(250));
timer_.async_wait([this](boost::mysql::error_code ec) {
if (ec) {
return;
}
{
std::lock_guard<std::mutex> lock(mutex_conn_);
for (std::size_t index = 0, size = connections_.size(); index < size; ++index) {
std::make_shared<Connector_ping>(this)->run(std::move(connections_.back()));
connections_.pop_back();
}
}
do_ping_all();
});
}
bool is_full() {
std::unique_lock<std::mutex> lock(mutex_queue_);
if (connector_queue_.empty()) condition_queue_.wait(lock, [this] { return !connector_queue_.empty(); });
return true;
}
std::string hostname_{};
std::string username_{};
std::string password_{};
std::string database_{};
std::size_t connections_nb_{};
boost::asio::io_context& io_context_;
boost::asio::ssl::context& ssl_context_;
boost::mysql::handshake_params params_;
boost::asio::ip::tcp::resolver resolver_;
boost::asio::ip::tcp::resolver::results_type endpoint_;
boost::asio::steady_timer timer_;
std::mutex mutex_queue_;
std::mutex mutex_conn_;
std::vector<std::unique_ptr<boost::mysql::tcp_ssl_connection>> connections_;
std::queue<std::shared_ptr<IConnector>> connector_queue_;
std::condition_variable condition_conn_;
std::condition_variable condition_queue_;
};
int main(){
boost::asio::io_context io_context_;
boost::asio::ssl::context ssl_ctx_(boost::asio::ssl::context::tls_client);
std::string hostname_{};
std::string username_{};
std::string password_{};
std::string database_{};
Connection_pool connection_pool_(io_context_,ssl_ctx_,hostname_,username_,password_,database_,4);
connection_pool_.run();
io_context_.run();
return 0 ;
}
The full code is here https://godbolt.org/z/sT1hTW3TK. I am looking to see if it can be improved in terms of performance and safety.