Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 2f2cd86

Browse filesBrowse files
Ilya Gurovmf2199c24t
authored
feat: support transactions management (#535)
Add transaction management, including utils for handling spanner sessions and connections. Co-authored-by: MF2199 <38331387+mf2199@users.noreply.github.com> Co-authored-by: Chris Kleinknecht <libc@google.com>
1 parent 5b5ea3c commit 2f2cd86
Copy full SHA for 2f2cd86

File tree

Expand file treeCollapse file tree

6 files changed

+463
-28
lines changed
Filter options
Expand file treeCollapse file tree

6 files changed

+463
-28
lines changed

‎google/cloud/spanner_dbapi/__init__.py

Copy file name to clipboardExpand all lines: google/cloud/spanner_dbapi/__init__.py
+14-4Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,12 @@
4949

5050

5151
def connect(
52-
instance_id, database_id, project=None, credentials=None, user_agent=None
52+
instance_id,
53+
database_id,
54+
project=None,
55+
credentials=None,
56+
pool=None,
57+
user_agent=None,
5358
):
5459
"""
5560
Create a connection to Cloud Spanner database.
@@ -71,6 +76,13 @@ def connect(
7176
If none are specified, the client will attempt to ascertain
7277
the credentials from the environment.
7378
79+
:type pool: Concrete subclass of
80+
:class:`~google.cloud.spanner_v1.pool.AbstractSessionPool`.
81+
:param pool: (Optional). Session pool to be used by database.
82+
83+
:type user_agent: :class:`str`
84+
:param user_agent: (Optional) User agent to be used with this connection requests.
85+
7486
:rtype: :class:`google.cloud.spanner_dbapi.connection.Connection`
7587
:returns: Connection object associated with the given Cloud Spanner resource.
7688
@@ -87,9 +99,7 @@ def connect(
8799
if not instance.exists():
88100
raise ValueError("instance '%s' does not exist." % instance_id)
89101

90-
database = instance.database(
91-
database_id, pool=spanner_v1.pool.BurstyPool()
92-
)
102+
database = instance.database(database_id, pool=pool)
93103
if not database.exists():
94104
raise ValueError("database '%s' does not exist." % database_id)
95105

‎google/cloud/spanner_dbapi/connection.py

Copy file name to clipboardExpand all lines: google/cloud/spanner_dbapi/connection.py
+110-12Lines changed: 110 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,7 @@
1414
from .cursor import Cursor
1515
from .exceptions import InterfaceError
1616

17-
AUTOCOMMIT_MODE_WARNING = (
18-
"This method is non-operational, as Cloud Spanner"
19-
"DB API always works in `autocommit` mode."
20-
"See https://github.com/googleapis/python-spanner-django#transaction-management-isnt-supported"
21-
)
17+
AUTOCOMMIT_MODE_WARNING = "This method is non-operational in autocommit mode"
2218

2319
ColumnDetails = namedtuple("column_details", ["null_ok", "spanner_type"])
2420

@@ -37,11 +33,98 @@ class Connection:
3733
"""
3834

3935
def __init__(self, instance, database):
40-
self.instance = instance
41-
self.database = database
42-
self.is_closed = False
36+
self._instance = instance
37+
self._database = database
4338

4439
self._ddl_statements = []
40+
self._transaction = None
41+
self._session = None
42+
43+
self.is_closed = False
44+
self._autocommit = False
45+
46+
@property
47+
def autocommit(self):
48+
"""Autocommit mode flag for this connection.
49+
50+
:rtype: bool
51+
:returns: Autocommit mode flag value.
52+
"""
53+
return self._autocommit
54+
55+
@autocommit.setter
56+
def autocommit(self, value):
57+
"""Change this connection autocommit mode.
58+
59+
:type value: bool
60+
:param value: New autocommit mode state.
61+
"""
62+
if value and not self._autocommit:
63+
self.commit()
64+
65+
self._autocommit = value
66+
67+
@property
68+
def database(self):
69+
"""Database to which this connection relates.
70+
71+
:rtype: :class:`~google.cloud.spanner_v1.database.Database`
72+
:returns: The related database object.
73+
"""
74+
return self._database
75+
76+
@property
77+
def instance(self):
78+
"""Instance to which this connection relates.
79+
80+
:rtype: :class:`~google.cloud.spanner_v1.instance.Instance`
81+
:returns: The related instance object.
82+
"""
83+
return self._instance
84+
85+
def _session_checkout(self):
86+
"""Get a Cloud Spanner session from the pool.
87+
88+
If there is already a session associated with
89+
this connection, it'll be used instead.
90+
91+
:rtype: :class:`google.cloud.spanner_v1.session.Session`
92+
:returns: Cloud Spanner session object ready to use.
93+
"""
94+
if not self._session:
95+
self._session = self.database._pool.get()
96+
97+
return self._session
98+
99+
def _release_session(self):
100+
"""Release the currently used Spanner session.
101+
102+
The session will be returned into the sessions pool.
103+
"""
104+
self.database._pool.put(self._session)
105+
self._session = None
106+
107+
def transaction_checkout(self):
108+
"""Get a Cloud Spanner transaction.
109+
110+
Begin a new transaction, if there is no transaction in
111+
this connection yet. Return the begun one otherwise.
112+
113+
The method is non operational in autocommit mode.
114+
115+
:rtype: :class:`google.cloud.spanner_v1.transaction.Transaction`
116+
:returns: A Cloud Spanner transaction object, ready to use.
117+
"""
118+
if not self.autocommit:
119+
if (
120+
not self._transaction
121+
or self._transaction.committed
122+
or self._transaction.rolled_back
123+
):
124+
self._transaction = self._session_checkout().transaction()
125+
self._transaction.begin()
126+
127+
return self._transaction
45128

46129
def cursor(self):
47130
self._raise_if_closed()
@@ -142,18 +225,33 @@ def get_table_column_schema(self, table_name):
142225
def close(self):
143226
"""Close this connection.
144227
145-
The connection will be unusable from this point forward.
228+
The connection will be unusable from this point forward. If the
229+
connection has an active transaction, it will be rolled back.
146230
"""
147-
self.__dbhandle = None
231+
if (
232+
self._transaction
233+
and not self._transaction.committed
234+
and not self._transaction.rolled_back
235+
):
236+
self._transaction.rollback()
237+
148238
self.is_closed = True
149239

150240
def commit(self):
151241
"""Commit all the pending transactions."""
152-
warnings.warn(AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2)
242+
if self.autocommit:
243+
warnings.warn(AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2)
244+
elif self._transaction:
245+
self._transaction.commit()
246+
self._release_session()
153247

154248
def rollback(self):
155249
"""Rollback all the pending transactions."""
156-
warnings.warn(AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2)
250+
if self.autocommit:
251+
warnings.warn(AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2)
252+
elif self._transaction:
253+
self._transaction.rollback()
254+
self._release_session()
157255

158256
def __enter__(self):
159257
return self

‎google/cloud/spanner_dbapi/cursor.py

Copy file name to clipboardExpand all lines: google/cloud/spanner_dbapi/cursor.py
+12Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ def execute(self, sql, args=None):
9191
# Classify whether this is a read-only SQL statement.
9292
try:
9393
classification = classify_stmt(sql)
94+
9495
if classification == STMT_DDL:
9596
self._connection.append_ddl_statement(sql)
9697
return
@@ -99,6 +100,17 @@ def execute(self, sql, args=None):
99100
# any prior DDL statements were run.
100101
self._run_prior_DDL_statements()
101102

103+
if not self._connection.autocommit:
104+
transaction = self._connection.transaction_checkout()
105+
106+
sql, params = sql_pyformat_args_to_spanner(sql, args)
107+
108+
self._res = transaction.execute_sql(
109+
sql, params, param_types=get_param_types(params)
110+
)
111+
self._itr = PeekIterator(self._res)
112+
return
113+
102114
if classification == STMT_NON_UPDATING:
103115
self.__handle_DQL(sql, args or None)
104116
elif classification == STMT_INSERT:

‎tests/spanner_dbapi/test_connect.py

Copy file name to clipboardExpand all lines: tests/spanner_dbapi/test_connect.py
+26-1Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import google.auth.credentials
1313
from google.api_core.gapic_v1.client_info import ClientInfo
1414
from google.cloud.spanner_dbapi import connect, Connection
15+
from google.cloud.spanner_v1.pool import FixedSizePool
1516

1617

1718
def _make_credentials():
@@ -43,7 +44,7 @@ def test_connect(self):
4344
"test-database",
4445
PROJECT,
4546
CREDENTIALS,
46-
USER_AGENT,
47+
user_agent=USER_AGENT,
4748
)
4849

4950
self.assertIsInstance(connection, Connection)
@@ -108,3 +109,27 @@ def test_connect_database_id(self):
108109
database_mock.assert_called_once_with(DATABASE, pool=mock.ANY)
109110

110111
self.assertIsInstance(connection, Connection)
112+
113+
def test_default_sessions_pool(self):
114+
with mock.patch("google.cloud.spanner_v1.instance.Instance.database"):
115+
with mock.patch(
116+
"google.cloud.spanner_v1.instance.Instance.exists",
117+
return_value=True,
118+
):
119+
connection = connect("test-instance", "test-database")
120+
121+
self.assertIsNotNone(connection.database._pool)
122+
123+
def test_sessions_pool(self):
124+
database_id = "test-database"
125+
pool = FixedSizePool()
126+
127+
with mock.patch(
128+
"google.cloud.spanner_v1.instance.Instance.database"
129+
) as database_mock:
130+
with mock.patch(
131+
"google.cloud.spanner_v1.instance.Instance.exists",
132+
return_value=True,
133+
):
134+
connect("test-instance", database_id, pool=pool)
135+
database_mock.assert_called_once_with(database_id, pool=pool)

‎tests/spanner_dbapi/test_connection.py

Copy file name to clipboardExpand all lines: tests/spanner_dbapi/test_connection.py
+18-1Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,9 @@ def test_close(self):
4949
connection.cursor()
5050

5151
@mock.patch("warnings.warn")
52-
def test_transaction_management_warnings(self, warn_mock):
52+
def test_transaction_autocommit_warnings(self, warn_mock):
5353
connection = self._make_connection()
54+
connection.autocommit = True
5455

5556
connection.commit()
5657
warn_mock.assert_called_with(
@@ -60,3 +61,19 @@ def test_transaction_management_warnings(self, warn_mock):
6061
warn_mock.assert_called_with(
6162
AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2
6263
)
64+
65+
def test_database_property(self):
66+
connection = self._make_connection()
67+
self.assertIsInstance(connection.database, Database)
68+
self.assertEqual(connection.database, connection._database)
69+
70+
with self.assertRaises(AttributeError):
71+
connection.database = None
72+
73+
def test_instance_property(self):
74+
connection = self._make_connection()
75+
self.assertIsInstance(connection.instance, Instance)
76+
self.assertEqual(connection.instance, connection._instance)
77+
78+
with self.assertRaises(AttributeError):
79+
connection.instance = None

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.