Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit cbf64e1

Browse filesBrowse files
authored
When prepared statements are disabled, avoid relying on them harder (#1065)
It appears that PgBouncer's `transaction` pooling mode does not consider implicit transactions properly, and so in a [`Parse`, `Flush`, `Bind`, `Execute`, `Sync`] sequence, `Flush` would be (incorrectly) considered by PgBouncer as a transaction boundary and it will happily send the following `Bind` / `Execute` messages to a different backend process. This makes it so that when `statement_cache_size` is set to `0`, asyncpg assumes a pessimistic stance on prepared statement persistence and does not rely on them even in implicit transactions. The above message sequence thus becomes `Parse`, `Flush`, `Parse` (a second time), `Bind`, `Execute`, `Sync`. This obviously has negative performance impact due to the extraneous `Parse`. Fixes: #1058 Fixes: #1041
1 parent 87ab143 commit cbf64e1
Copy full SHA for cbf64e1

File tree

Expand file treeCollapse file tree

6 files changed

+81
-25
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

6 files changed

+81
-25
lines changed
Open diff view settings
Collapse file

‎asyncpg/connection.py‎

Copy file name to clipboardExpand all lines: asyncpg/connection.py
+31-15Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ class Connection(metaclass=ConnectionMeta):
4747
__slots__ = ('_protocol', '_transport', '_loop',
4848
'_top_xact', '_aborted',
4949
'_pool_release_ctr', '_stmt_cache', '_stmts_to_close',
50+
'_stmt_cache_enabled',
5051
'_listeners', '_server_version', '_server_caps',
5152
'_intro_query', '_reset_query', '_proxy',
5253
'_stmt_exclusive_section', '_config', '_params', '_addr',
@@ -79,6 +80,7 @@ def __init__(self, protocol, transport, loop,
7980
max_lifetime=config.max_cached_statement_lifetime)
8081

8182
self._stmts_to_close = set()
83+
self._stmt_cache_enabled = config.statement_cache_size > 0
8284

8385
self._listeners = {}
8486
self._log_listeners = set()
@@ -381,11 +383,13 @@ async def _get_statement(
381383
# Only use the cache when:
382384
# * `statement_cache_size` is greater than 0;
383385
# * query size is less than `max_cacheable_statement_size`.
384-
use_cache = self._stmt_cache.get_max_size() > 0
385-
if (use_cache and
386-
self._config.max_cacheable_statement_size and
387-
len(query) > self._config.max_cacheable_statement_size):
388-
use_cache = False
386+
use_cache = (
387+
self._stmt_cache_enabled
388+
and (
389+
not self._config.max_cacheable_statement_size
390+
or len(query) <= self._config.max_cacheable_statement_size
391+
)
392+
)
389393

390394
if isinstance(named, str):
391395
stmt_name = named
@@ -434,14 +438,16 @@ async def _get_statement(
434438
# for the statement.
435439
statement._init_codecs()
436440

437-
if need_reprepare:
438-
await self._protocol.prepare(
439-
stmt_name,
440-
query,
441-
timeout,
442-
state=statement,
443-
record_class=record_class,
444-
)
441+
if (
442+
need_reprepare
443+
or (not statement.name and not self._stmt_cache_enabled)
444+
):
445+
# Mark this anonymous prepared statement as "unprepared",
446+
# causing it to get re-Parsed in next bind_execute.
447+
# We always do this when stmt_cache_size is set to 0 assuming
448+
# people are running PgBouncer which is mishandling implicit
449+
# transactions.
450+
statement.mark_unprepared()
445451

446452
if use_cache:
447453
self._stmt_cache.put(
@@ -1679,7 +1685,13 @@ async def __execute(
16791685
record_class=None
16801686
):
16811687
executor = lambda stmt, timeout: self._protocol.bind_execute(
1682-
stmt, args, '', limit, return_status, timeout)
1688+
state=stmt,
1689+
args=args,
1690+
portal_name='',
1691+
limit=limit,
1692+
return_extra=return_status,
1693+
timeout=timeout,
1694+
)
16831695
timeout = self._protocol._get_timeout(timeout)
16841696
return await self._do_execute(
16851697
query,
@@ -1691,7 +1703,11 @@ async def __execute(
16911703

16921704
async def _executemany(self, query, args, timeout):
16931705
executor = lambda stmt, timeout: self._protocol.bind_execute_many(
1694-
stmt, args, '', timeout)
1706+
state=stmt,
1707+
args=args,
1708+
portal_name='',
1709+
timeout=timeout,
1710+
)
16951711
timeout = self._protocol._get_timeout(timeout)
16961712
with self._stmt_exclusive_section:
16971713
result, _ = await self._do_execute(query, executor, timeout)
Collapse file

‎asyncpg/protocol/coreproto.pxd‎

Copy file name to clipboardExpand all lines: asyncpg/protocol/coreproto.pxd
+2-1Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,8 @@ cdef class CoreProtocol:
167167

168168

169169
cdef _connect(self)
170-
cdef _prepare(self, str stmt_name, str query)
170+
cdef _prepare_and_describe(self, str stmt_name, str query)
171+
cdef _send_parse_message(self, str stmt_name, str query)
171172
cdef _send_bind_message(self, str portal_name, str stmt_name,
172173
WriteBuffer bind_data, int32_t limit)
173174
cdef _bind_execute(self, str portal_name, str stmt_name,
Collapse file

‎asyncpg/protocol/coreproto.pyx‎

Copy file name to clipboardExpand all lines: asyncpg/protocol/coreproto.pyx
+17-1Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,10 @@ cdef class CoreProtocol:
237237
# ErrorResponse
238238
self._parse_msg_error_response(True)
239239

240+
elif mtype == b'1':
241+
# ParseComplete, in case `_bind_execute()` is reparsing
242+
self.buffer.discard_message()
243+
240244
elif mtype == b'2':
241245
# BindComplete
242246
self.buffer.discard_message()
@@ -269,6 +273,10 @@ cdef class CoreProtocol:
269273
# ErrorResponse
270274
self._parse_msg_error_response(True)
271275

276+
elif mtype == b'1':
277+
# ParseComplete, in case `_bind_execute_many()` is reparsing
278+
self.buffer.discard_message()
279+
272280
elif mtype == b'2':
273281
# BindComplete
274282
self.buffer.discard_message()
@@ -874,7 +882,15 @@ cdef class CoreProtocol:
874882
outbuf.write_buffer(buf)
875883
self._write(outbuf)
876884

877-
cdef _prepare(self, str stmt_name, str query):
885+
cdef _send_parse_message(self, str stmt_name, str query):
886+
cdef:
887+
WriteBuffer msg
888+
889+
self._ensure_connected()
890+
msg = self._build_parse_message(stmt_name, query)
891+
self._write(msg)
892+
893+
cdef _prepare_and_describe(self, str stmt_name, str query):
878894
cdef:
879895
WriteBuffer packet
880896
WriteBuffer buf
Collapse file

‎asyncpg/protocol/prepared_stmt.pxd‎

Copy file name to clipboardExpand all lines: asyncpg/protocol/prepared_stmt.pxd
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ cdef class PreparedStatementState:
1010
readonly str name
1111
readonly str query
1212
readonly bint closed
13+
readonly bint prepared
1314
readonly int refs
1415
readonly type record_class
1516
readonly bint ignore_custom_codec
Collapse file

‎asyncpg/protocol/prepared_stmt.pyx‎

Copy file name to clipboardExpand all lines: asyncpg/protocol/prepared_stmt.pyx
+7Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ cdef class PreparedStatementState:
2727
self.args_num = self.cols_num = 0
2828
self.cols_desc = None
2929
self.closed = False
30+
self.prepared = True
3031
self.refs = 0
3132
self.record_class = record_class
3233
self.ignore_custom_codec = ignore_custom_codec
@@ -101,6 +102,12 @@ cdef class PreparedStatementState:
101102
def mark_closed(self):
102103
self.closed = True
103104

105+
def mark_unprepared(self):
106+
if self.name:
107+
raise exceptions.InternalClientError(
108+
"named prepared statements cannot be marked unprepared")
109+
self.prepared = False
110+
104111
cdef _encode_bind_msg(self, args, int seqno = -1):
105112
cdef:
106113
int idx
Collapse file

‎asyncpg/protocol/protocol.pyx‎

Copy file name to clipboardExpand all lines: asyncpg/protocol/protocol.pyx
+23-8Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ cdef class BaseProtocol(CoreProtocol):
155155

156156
waiter = self._new_waiter(timeout)
157157
try:
158-
self._prepare(stmt_name, query) # network op
158+
self._prepare_and_describe(stmt_name, query) # network op
159159
self.last_query = query
160160
if state is None:
161161
state = PreparedStatementState(
@@ -168,10 +168,15 @@ cdef class BaseProtocol(CoreProtocol):
168168
return await waiter
169169

170170
@cython.iterable_coroutine
171-
async def bind_execute(self, PreparedStatementState state, args,
172-
str portal_name, int limit, return_extra,
173-
timeout):
174-
171+
async def bind_execute(
172+
self,
173+
state: PreparedStatementState,
174+
args,
175+
portal_name: str,
176+
limit: int,
177+
return_extra: bool,
178+
timeout,
179+
):
175180
if self.cancel_waiter is not None:
176181
await self.cancel_waiter
177182
if self.cancel_sent_waiter is not None:
@@ -184,6 +189,9 @@ cdef class BaseProtocol(CoreProtocol):
184189

185190
waiter = self._new_waiter(timeout)
186191
try:
192+
if not state.prepared:
193+
self._send_parse_message(state.name, state.query)
194+
187195
self._bind_execute(
188196
portal_name,
189197
state.name,
@@ -201,9 +209,13 @@ cdef class BaseProtocol(CoreProtocol):
201209
return await waiter
202210

203211
@cython.iterable_coroutine
204-
async def bind_execute_many(self, PreparedStatementState state, args,
205-
str portal_name, timeout):
206-
212+
async def bind_execute_many(
213+
self,
214+
state: PreparedStatementState,
215+
args,
216+
portal_name: str,
217+
timeout,
218+
):
207219
if self.cancel_waiter is not None:
208220
await self.cancel_waiter
209221
if self.cancel_sent_waiter is not None:
@@ -222,6 +234,9 @@ cdef class BaseProtocol(CoreProtocol):
222234

223235
waiter = self._new_waiter(timeout)
224236
try:
237+
if not state.prepared:
238+
self._send_parse_message(state.name, state.query)
239+
225240
more = self._bind_execute_many(
226241
portal_name,
227242
state.name,

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.