Skip to main content

Stack Exchange Network

Stack Exchange network consists of 183 Q&A communities including Stack Overflow, the largest, most trusted online community for developers to learn, share their knowledge, and build their careers.

Visit Stack Exchange
Asked
Viewed 76 times
1
\$\begingroup\$

I had to design a connection pool that manages Boost.MySQL connections so that I can use the library's asynchronous functions.

Here is the code:


#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/context.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/mysql.hpp>
/// Interface for one operation that is carried out on a MySQL connection.
///
/// Implementations in this file move the connection they receive into
/// themselves and start an asynchronous operation on it.
class IConnector {
   public:
    /// Virtual so that implementations are destroyed correctly through an
    /// IConnector pointer.
    virtual ~IConnector() = default;

    /// Starts the operation on the given connection.
    /// The argument is an rvalue: the implementation is expected to take it over.
    virtual void run(std::unique_ptr<boost::mysql::tcp_ssl_connection>&&) = 0;
};

/// Interface through which connectors hand connections back to their pool.
///
/// The descriptions below reflect how Connection_pool in this file implements
/// each member.
class IConnection_pool {
   public:
    /// Virtual so that implementations are destroyed correctly through an
    /// IConnection_pool pointer.
    virtual ~IConnection_pool() = default;

    /// Returns a connection to the pool so it can be handed out again.
    virtual void rejoin(std::unique_ptr<boost::mysql::tcp_ssl_connection>&&) = 0;

    /// Gives up on a connection: it is closed and a new one is opened in its place.
    virtual void replace(std::unique_ptr<boost::mysql::tcp_ssl_connection>&&) = 0;

    /// Queues a connector to be run on a pooled connection.
    virtual void push(std::shared_ptr<IConnector>) = 0;
};

class Connector : public IConnector, public std::enable_shared_from_this<Connector> {
   public:
    Connector(boost::asio::ip::tcp::resolver::results_type& end_point, boost::mysql::handshake_params params,
              IConnection_pool* connection_pool)
        : end_point_(end_point), params_(params), connection_pool_(connection_pool) {}
    void run(std::unique_ptr<boost::mysql::tcp_ssl_connection>&& conn) {
        conn_ = std::move(conn);
        do_connect();
    }

   private:
    void do_connect() {
        conn_->async_connect(*end_point_.begin(), params_, diag_,
                             std::bind(&Connector::on_connect, shared_from_this(), std::placeholders::_1));
    }
    void on_connect(boost::mysql::error_code ec) {
        if (ec) {
            connection_pool_->replace(std::move(conn_));
            return;
        }
        connection_pool_->rejoin(std::move(conn_));
    }
    std::unique_ptr<boost::mysql::tcp_ssl_connection> conn_;
    boost::asio::ip::tcp::resolver::results_type& end_point_;
    boost::mysql::handshake_params params_;
    boost::mysql::diagnostics diag_;
    IConnection_pool* connection_pool_;
};

class Connector_ping : public IConnector, public std::enable_shared_from_this<Connector_ping> {
   public:
   Connector_ping(IConnection_pool* connection_pool) : connection_pool_(connection_pool) {}
 void run(std::unique_ptr<boost::mysql::tcp_ssl_connection>&& conn);    

   private:
    void do_ping() {
        conn_->async_ping(diag_, std::bind(&Connector_ping::on_ping, shared_from_this(), std::placeholders::_1));
    }
    void on_ping(boost::mysql::error_code ec) {
        if (ec) {
            connection_pool_->replace(std::move(conn_));
            return;
        }
        connection_pool_->rejoin(std::move(conn_));
    }
    std::unique_ptr<boost::mysql::tcp_ssl_connection> conn_;
    boost::mysql::diagnostics diag_;
    IConnection_pool* connection_pool_;
};

class Connector_close : public IConnector, public std::enable_shared_from_this<Connector_close> {
   public:
    Connector_close() {}
    void run(std::unique_ptr<boost::mysql::tcp_ssl_connection>&& conn) {
        conn_ = std::move(conn);
        do_close();
    }

   private:
    void do_close() {
        conn_->async_close(diagn_, std::bind(&Connector_close::on_close, shared_from_this(), std::placeholders::_1));
    }
    void on_close(boost::mysql::error_code ec) {
        if (ec) {
        }
    }

    std::unique_ptr<boost::mysql::tcp_ssl_connection> conn_;
    boost::mysql::diagnostics diagn_;
};

class Connector_query : public IConnector, public std::enable_shared_from_this<Connector_query> {
   public:
    Connector_query(std::tuple<> tuple , IConnection_pool& connection_pool) : tuple_(tuple) , connection_pool_(connection_pool){}
    void run(std::unique_ptr<boost::mysql::tcp_ssl_connection>&& conn) {
        conn_ = std::move(conn);
        do_statment();
    }

   private:
    void do_statment() {
        conn_->async_prepare_statement(
            statement_str_, diagn_,
            std::bind(&Connector_query::on_statment, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
    }
    void on_statment(boost::mysql::error_code ec, boost::mysql::statement stm) {
        if (ec) {
            connection_pool_.replace(std::move(conn_));
            return;
        }
        stmt_ = stm;
        do_query();
    }
    void do_query() {
        conn_->async_execute_statement(
            stmt_, tuple_, result_, diagn_,
            std::bind(&Connector_query::on_query, shared_from_this(), std::placeholders::_1));
    }
    void on_query(boost::mysql::error_code ec) {
        if (ec) {
            connection_pool_.replace(std::move(conn_));
            return;
        }

        do_close_statment();
    }
    void do_close_statment() {
        conn_->async_close_statement(
            stmt_, diagn_, std::bind(&Connector_query::on_close_statmemt, shared_from_this(), std::placeholders::_1));
    }
    void on_close_statmemt(boost::mysql::error_code ec) {
        if (ec) {
            connection_pool_.replace(std::move(conn_));
        }
        do_leave();
    }
    void do_leave() { connection_pool_.rejoin(std::move(conn_)); }

    std::unique_ptr<boost::mysql::tcp_ssl_connection> conn_;
    std::tuple<> tuple_;

    IConnection_pool& connection_pool_;
    const std::string statement_str_{"SELECT 1;"};
    boost::mysql::statement stmt_;
    boost::mysql::diagnostics diagn_;
    boost::mysql::results result_;
};

class Connection_pool : public IConnection_pool {
   public:
    Connection_pool(boost::asio::io_context& io_context, boost::asio::ssl::context& ssl_context, std::string hostname,
                    std::string username, std::string password, std::string database, std::size_t connection_nb)
        : io_context_(io_context),
          ssl_context_(ssl_context),
          hostname_(hostname),
          username_(username),
          password_(password),
          database_(database),
          connections_nb_(connection_nb),
          resolver_(io_context.get_executor()),
          params_(username_, password_, database_),
          timer_(io_context_, std::chrono::milliseconds(1)) {}
    ~Connection_pool() {
        std::lock_guard<std::mutex> lock(mutex_conn_);
        for (std::size_t index = 0, size = connections_.size(); index < size; ++index) {
            std::make_shared<Connector_close>()->run(std::move(connections_.back()));
            connections_.pop_back();
        }
    }
    void rejoin(std::unique_ptr<boost::mysql::tcp_ssl_connection>&& conn) {
        std::lock_guard<std::mutex> lock(mutex_conn_);
        connections_.emplace_back(std::move(conn));
        condition_conn_.notify_one();
    }
    void replace(std::unique_ptr<boost::mysql::tcp_ssl_connection>&& conn) {
        std::make_shared<Connector_close>()->run(std::move(conn));
        std::make_shared<Connector>(endpoint_, params_, this)
            ->run(std::move(
                std::make_unique<boost::mysql::tcp_ssl_connection>(io_context_.get_executor(), ssl_context_)));
    }
    void push(std::shared_ptr<IConnector> connector) {
        std::lock_guard<std::mutex> lock(mutex_queue_);
        connector_queue_.push(connector);
        condition_queue_.notify_one();
    }
    void run() {
        intitialize();
        io_context_.post(std::bind(&Connection_pool::process, this));
    }

   private:
    void process() {
        while (is_full()) {
            std::shared_ptr<IConnector> ptr;
            {
                std::lock_guard<std::mutex> lock(mutex_queue_);
                ptr = connector_queue_.front();
                connector_queue_.pop();
            }
            {
                std::unique_lock<std::mutex> lock(mutex_conn_);
                if (connections_.empty()) {
                    condition_conn_.wait(lock, [this] { return !connections_.empty(); });
                }
                ptr->run(std::move(connections_.back()));
                connections_.pop_back();
            }
        }
    }
    void intitialize() {
        boost::system::error_code ec;
        endpoint_ = resolver_.resolve(hostname_, boost::mysql::default_port_string, ec);
        if (ec) {
            return;
        }
        connections_.reserve(connections_nb_);
        for (std::size_t index = 0; index < connections_nb_; ++index) {
            std::make_shared<Connector>(endpoint_, params_, this)
                ->run(std::move(
                    std::make_unique<boost::mysql::tcp_ssl_connection>(io_context_.get_executor(), ssl_context_)));
        }
        do_ping_all();
    }
    void do_ping_all() {
        timer_.expires_from_now(std::chrono::milliseconds(250));
        timer_.async_wait([this](boost::mysql::error_code ec) {
            if (ec) {
                return;
            }
            {
                std::lock_guard<std::mutex> lock(mutex_conn_);
                for (std::size_t index = 0, size = connections_.size(); index < size; ++index) {
                    std::make_shared<Connector_ping>(this)->run(std::move(connections_.back()));
                    connections_.pop_back();
                }
            }
            do_ping_all();
        });
    }
    bool is_full() {
        std::unique_lock<std::mutex> lock(mutex_queue_);
        if (connector_queue_.empty()) condition_queue_.wait(lock, [this] { return !connector_queue_.empty(); });
        return true;
    }
    std::string hostname_{};
    std::string username_{};
    std::string password_{};
    std::string database_{};
    std::size_t connections_nb_{};
    boost::asio::io_context& io_context_;
    boost::asio::ssl::context& ssl_context_;
    boost::mysql::handshake_params params_;
    boost::asio::ip::tcp::resolver resolver_;
    boost::asio::ip::tcp::resolver::results_type endpoint_;
    boost::asio::steady_timer timer_;
    std::mutex mutex_queue_;
    std::mutex mutex_conn_;

    std::vector<std::unique_ptr<boost::mysql::tcp_ssl_connection>> connections_;
    std::queue<std::shared_ptr<IConnector>> connector_queue_;
    std::condition_variable condition_conn_;
    std::condition_variable condition_queue_;
};


int main(){  
    boost::asio::io_context io_context_;
    boost::asio::ssl::context ssl_ctx_(boost::asio::ssl::context::tls_client);
    std::string hostname_{}; 
    std::string username_{};
    std::string password_{};
    std::string database_{};
    Connection_pool connection_pool_(io_context_,ssl_ctx_,hostname_,username_,password_,database_,4);
    connection_pool_.run();
    io_context_.run();
    return 0 ;

}

The full code is here https://godbolt.org/z/sT1hTW3TK. I am looking to see if it can be improved in terms of performance and safety.

\$\endgroup\$
1

0

You must log in to answer this question.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.

Morty Proxy This is a proxified and sanitized view of the page, visit original site.