From 06b6df1d05d3f852e8eaaa2caacc9efd42738c18 Mon Sep 17 00:00:00 2001 From: Favian Samatha Date: Fri, 19 Dec 2025 15:13:46 -0800 Subject: [PATCH 1/2] fix: mysql deadlock on is_closed during wrapper destructor --- aws_advanced_python_wrapper/driver_dialect.py | 2 +- .../mysql_driver_dialect.py | 13 +++++-- .../pg_driver_dialect.py | 2 +- aws_advanced_python_wrapper/plugin_service.py | 2 +- .../sqlalchemy_driver_dialect.py | 4 +- .../utils/decorators.py | 37 +++++++++++++++++++ 6 files changed, 51 insertions(+), 9 deletions(-) diff --git a/aws_advanced_python_wrapper/driver_dialect.py b/aws_advanced_python_wrapper/driver_dialect.py index c9df891a8..584e15727 100644 --- a/aws_advanced_python_wrapper/driver_dialect.py +++ b/aws_advanced_python_wrapper/driver_dialect.py @@ -107,7 +107,7 @@ def prepare_connect_info(self, host_info: HostInfo, props: Properties) -> Proper PropertiesUtils.remove_wrapper_props(prop_copy) return prop_copy - def is_closed(self, conn: Connection) -> bool: + def is_closed(self, conn: Connection, is_releasing_resources: bool = False) -> bool: raise UnsupportedOperationError( Messages.get_formatted("DriverDialect.UnsupportedOperationError", self._driver_name, "is_closed")) diff --git a/aws_advanced_python_wrapper/mysql_driver_dialect.py b/aws_advanced_python_wrapper/mysql_driver_dialect.py index dd7055c55..7dc4f5c41 100644 --- a/aws_advanced_python_wrapper/mysql_driver_dialect.py +++ b/aws_advanced_python_wrapper/mysql_driver_dialect.py @@ -26,7 +26,8 @@ from aws_advanced_python_wrapper.driver_dialect import DriverDialect from aws_advanced_python_wrapper.driver_dialect_codes import DriverDialectCodes from aws_advanced_python_wrapper.errors import UnsupportedOperationError -from aws_advanced_python_wrapper.utils.decorators import timeout +from aws_advanced_python_wrapper.utils.decorators import ( + timeout, timeout_with_new_thread) from aws_advanced_python_wrapper.utils.messages import Messages from aws_advanced_python_wrapper.utils.properties import (Properties, PropertiesUtils, @@ -86,16 +87,20 @@ def is_dialect(self, connect_func: Callable) -> bool: return MySQLDriverDialect.TARGET_DRIVER_CODE.lower() in (connect_func.__module__ + connect_func.__qualname__).lower() return True - def is_closed(self, conn: Connection) -> bool: + def is_closed(self, conn: Connection, is_releasing_resources: bool = False) -> bool: if MySQLDriverDialect._is_mysql_connection(conn): - # is_connected validates the connection using a ping(). # If there are any unread results from previous executions an error will be thrown. if self.can_execute_query(conn): socket_timeout = WrapperProperties.SOCKET_TIMEOUT_SEC.get_float(self._props) timeout_sec = socket_timeout if socket_timeout > 0 else MySQLDriverDialect.IS_CLOSED_TIMEOUT_SEC - is_connected_with_timeout = timeout(MySQLDriverDialect._executor, timeout_sec)(conn.is_connected) # type: ignore + is_connected_with_timeout = None + if not is_releasing_resources: + is_connected_with_timeout = timeout(MySQLDriverDialect._executor, timeout_sec)(conn.is_connected) # type: ignore + else: + # When the wrapper destructor is called the executor may also be deleted in the same time which could cause a deadlock. + is_connected_with_timeout = timeout_with_new_thread(timeout_sec)(conn.is_connected) # type: ignore try: return not is_connected_with_timeout() except TimeoutError: diff --git a/aws_advanced_python_wrapper/pg_driver_dialect.py b/aws_advanced_python_wrapper/pg_driver_dialect.py index fbe441f39..21f351a99 100644 --- a/aws_advanced_python_wrapper/pg_driver_dialect.py +++ b/aws_advanced_python_wrapper/pg_driver_dialect.py @@ -63,7 +63,7 @@ def is_dialect(self, connect_func: Callable) -> bool: return PgDriverDialect.TARGET_DRIVER_CODE.lower() in (connect_func.__module__ + connect_func.__qualname__).lower() return True - def is_closed(self, conn: Connection) -> bool: + def is_closed(self, conn: Connection, is_releasing_resources: bool = False) -> bool: if isinstance(conn, psycopg.Connection): return conn.closed diff --git a/aws_advanced_python_wrapper/plugin_service.py b/aws_advanced_python_wrapper/plugin_service.py index f3dc5fbce..bdc546a68 100644 --- a/aws_advanced_python_wrapper/plugin_service.py +++ b/aws_advanced_python_wrapper/plugin_service.py @@ -704,7 +704,7 @@ def is_plugin_in_use(self, plugin_class: Type[Plugin]) -> bool: def release_resources(self): try: if self.current_connection is not None and not self.driver_dialect.is_closed( - self.current_connection): + self.current_connection, True): self.current_connection.close() except Exception: # ignore diff --git a/aws_advanced_python_wrapper/sqlalchemy_driver_dialect.py b/aws_advanced_python_wrapper/sqlalchemy_driver_dialect.py index 290a8f86d..8c4ad382a 100644 --- a/aws_advanced_python_wrapper/sqlalchemy_driver_dialect.py +++ b/aws_advanced_python_wrapper/sqlalchemy_driver_dialect.py @@ -55,13 +55,13 @@ def set_autocommit(self, conn: Connection, autocommit: bool): return self._underlying_driver.set_autocommit(conn, autocommit) - def is_closed(self, conn: Connection) -> bool: + def is_closed(self, conn: Connection, is_releasing_resources: bool = False) -> bool: if isinstance(conn, PoolProxiedConnection): conn = conn.driver_connection if conn is None: return True - return self._underlying_driver.is_closed(conn) + return self._underlying_driver.is_closed(conn, is_releasing_resources) def abort_connection(self, conn: Connection): if isinstance(conn, PoolProxiedConnection): diff --git a/aws_advanced_python_wrapper/utils/decorators.py b/aws_advanced_python_wrapper/utils/decorators.py index 7f76fb984..f0fe768d4 100644 --- a/aws_advanced_python_wrapper/utils/decorators.py +++ b/aws_advanced_python_wrapper/utils/decorators.py @@ -15,6 +15,8 @@ from __future__ import annotations import functools +import threading +from concurrent.futures import TimeoutError from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -68,3 +70,38 @@ def func_wrapper(*args, **kwargs): return func_wrapper return timeout_decorator + + +def timeout_with_new_thread(timeout_sec): + """ + Timeout decorator using a new daemon thread, timeout in seconds + """ + + def timeout_decorator(func): + @functools.wraps(func) + def func_wrapper(*args, **kwargs): + result = [None] + exception = [None] + + def target(): + try: + result[0] = func(*args, **kwargs) + except Exception as e: + exception[0] = e + + thread = threading.Thread(target=target, daemon=True) + thread.start() + thread.join(timeout=timeout_sec) + + if thread.is_alive(): + # Timeout occurred + raise TimeoutError(f"Function timed out after {timeout_sec} seconds") + + if exception[0]: + raise exception[0] + + return result[0] + + return func_wrapper + + return timeout_decorator From c0eaac1797effc942ac4063a7a71b68e891c19db Mon Sep 17 00:00:00 2001 From: Favian Samatha Date: Fri, 19 Dec 2025 17:13:19 -0800 Subject: [PATCH 2/2] revert last commit and remove is_closed --- aws_advanced_python_wrapper/driver_dialect.py | 2 +- .../mysql_driver_dialect.py | 13 ++----- .../pg_driver_dialect.py | 2 +- aws_advanced_python_wrapper/plugin_service.py | 3 +- .../sqlalchemy_driver_dialect.py | 4 +- .../utils/decorators.py | 37 ------------------- 6 files changed, 9 insertions(+), 52 deletions(-) diff --git a/aws_advanced_python_wrapper/driver_dialect.py b/aws_advanced_python_wrapper/driver_dialect.py index 584e15727..c9df891a8 100644 --- a/aws_advanced_python_wrapper/driver_dialect.py +++ b/aws_advanced_python_wrapper/driver_dialect.py @@ -107,7 +107,7 @@ def prepare_connect_info(self, host_info: HostInfo, props: Properties) -> Proper PropertiesUtils.remove_wrapper_props(prop_copy) return prop_copy - def is_closed(self, conn: Connection, is_releasing_resources: bool = False) -> bool: + def is_closed(self, conn: Connection) -> bool: raise UnsupportedOperationError( Messages.get_formatted("DriverDialect.UnsupportedOperationError", self._driver_name, "is_closed")) diff --git a/aws_advanced_python_wrapper/mysql_driver_dialect.py b/aws_advanced_python_wrapper/mysql_driver_dialect.py index 7dc4f5c41..dd7055c55 100644 --- a/aws_advanced_python_wrapper/mysql_driver_dialect.py +++ b/aws_advanced_python_wrapper/mysql_driver_dialect.py @@ -26,8 +26,7 @@ from aws_advanced_python_wrapper.driver_dialect import DriverDialect from aws_advanced_python_wrapper.driver_dialect_codes import DriverDialectCodes from aws_advanced_python_wrapper.errors import UnsupportedOperationError -from aws_advanced_python_wrapper.utils.decorators import ( - timeout, timeout_with_new_thread) +from aws_advanced_python_wrapper.utils.decorators import timeout from aws_advanced_python_wrapper.utils.messages import Messages from aws_advanced_python_wrapper.utils.properties import (Properties, PropertiesUtils, @@ -87,20 +86,16 @@ def is_dialect(self, connect_func: Callable) -> bool: return MySQLDriverDialect.TARGET_DRIVER_CODE.lower() in (connect_func.__module__ + connect_func.__qualname__).lower() return True - def is_closed(self, conn: Connection, is_releasing_resources: bool = False) -> bool: + def is_closed(self, conn: Connection) -> bool: if MySQLDriverDialect._is_mysql_connection(conn): + # is_connected validates the connection using a ping(). # If there are any unread results from previous executions an error will be thrown. if self.can_execute_query(conn): socket_timeout = WrapperProperties.SOCKET_TIMEOUT_SEC.get_float(self._props) timeout_sec = socket_timeout if socket_timeout > 0 else MySQLDriverDialect.IS_CLOSED_TIMEOUT_SEC - is_connected_with_timeout = None + is_connected_with_timeout = timeout(MySQLDriverDialect._executor, timeout_sec)(conn.is_connected) # type: ignore - if not is_releasing_resources: - is_connected_with_timeout = timeout(MySQLDriverDialect._executor, timeout_sec)(conn.is_connected) # type: ignore - else: - # When the wrapper destructor is called the executor may also be deleted in the same time which could cause a deadlock. - is_connected_with_timeout = timeout_with_new_thread(timeout_sec)(conn.is_connected) # type: ignore try: return not is_connected_with_timeout() except TimeoutError: diff --git a/aws_advanced_python_wrapper/pg_driver_dialect.py b/aws_advanced_python_wrapper/pg_driver_dialect.py index 21f351a99..fbe441f39 100644 --- a/aws_advanced_python_wrapper/pg_driver_dialect.py +++ b/aws_advanced_python_wrapper/pg_driver_dialect.py @@ -63,7 +63,7 @@ def is_dialect(self, connect_func: Callable) -> bool: return PgDriverDialect.TARGET_DRIVER_CODE.lower() in (connect_func.__module__ + connect_func.__qualname__).lower() return True - def is_closed(self, conn: Connection, is_releasing_resources: bool = False) -> bool: + def is_closed(self, conn: Connection) -> bool: if isinstance(conn, psycopg.Connection): return conn.closed diff --git a/aws_advanced_python_wrapper/plugin_service.py b/aws_advanced_python_wrapper/plugin_service.py index bdc546a68..1fe04db9e 100644 --- a/aws_advanced_python_wrapper/plugin_service.py +++ b/aws_advanced_python_wrapper/plugin_service.py @@ -703,8 +703,7 @@ def is_plugin_in_use(self, plugin_class: Type[Plugin]) -> bool: def release_resources(self): try: - if self.current_connection is not None and not self.driver_dialect.is_closed( - self.current_connection, True): + if self.current_connection is not None: self.current_connection.close() except Exception: # ignore diff --git a/aws_advanced_python_wrapper/sqlalchemy_driver_dialect.py b/aws_advanced_python_wrapper/sqlalchemy_driver_dialect.py index 8c4ad382a..290a8f86d 100644 --- a/aws_advanced_python_wrapper/sqlalchemy_driver_dialect.py +++ b/aws_advanced_python_wrapper/sqlalchemy_driver_dialect.py @@ -55,13 +55,13 @@ def set_autocommit(self, conn: Connection, autocommit: bool): return self._underlying_driver.set_autocommit(conn, autocommit) - def is_closed(self, conn: Connection, is_releasing_resources: bool = False) -> bool: + def is_closed(self, conn: Connection) -> bool: if isinstance(conn, PoolProxiedConnection): conn = conn.driver_connection if conn is None: return True - return self._underlying_driver.is_closed(conn, is_releasing_resources) + return self._underlying_driver.is_closed(conn) def abort_connection(self, conn: Connection): if isinstance(conn, PoolProxiedConnection): diff --git a/aws_advanced_python_wrapper/utils/decorators.py b/aws_advanced_python_wrapper/utils/decorators.py index f0fe768d4..7f76fb984 100644 --- a/aws_advanced_python_wrapper/utils/decorators.py +++ b/aws_advanced_python_wrapper/utils/decorators.py @@ -15,8 +15,6 @@ from __future__ import annotations import functools -import threading -from concurrent.futures import TimeoutError from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -70,38 +68,3 @@ def func_wrapper(*args, **kwargs): return func_wrapper return timeout_decorator - - -def timeout_with_new_thread(timeout_sec): - """ - Timeout decorator using a new daemon thread, timeout in seconds - """ - - def timeout_decorator(func): - @functools.wraps(func) - def func_wrapper(*args, **kwargs): - result = [None] - exception = [None] - - def target(): - try: - result[0] = func(*args, **kwargs) - except Exception as e: - exception[0] = e - - thread = threading.Thread(target=target, daemon=True) - thread.start() - thread.join(timeout=timeout_sec) - - if thread.is_alive(): - # Timeout occurred - raise TimeoutError(f"Function timed out after {timeout_sec} seconds") - - if exception[0]: - raise exception[0] - - return result[0] - - return func_wrapper - - return timeout_decorator