diff --git a/examples/sqlalchemy.py b/examples/sqlalchemy.py index 650fb293b..f2162d873 100644 --- a/examples/sqlalchemy.py +++ b/examples/sqlalchemy.py @@ -39,7 +39,6 @@ - Constraints: with the addition of information_schema to Unity Catalog, Databricks SQL supports foreign key and primary key constraints. This dialect can write these constraints but the ability for alembic to reflect and modify them programmatically has not been tested. - - Delta IDENTITY columns are not yet supported. """ import os diff --git a/src/databricks/sql/utils.py b/src/databricks/sql/utils.py index 5265380fd..e2a361d04 100644 --- a/src/databricks/sql/utils.py +++ b/src/databricks/sql/utils.py @@ -534,9 +534,15 @@ def named_parameters_to_dbsqlparams_v2(parameters: List[Any]): def resolve_databricks_sql_integer_type(integer): - """Returns the smallest Databricks SQL integer type that can contain the passed integer""" + """Returns DbsqlType.INTEGER unless the passed int() requires a BIGINT. + + Note: TINYINT is never inferred here because it is a rarely used type and clauses like LIMIT and OFFSET + cannot accept TINYINT bound parameter values. 
If you need to bind a TINYINT value, you can explicitly + declare its type in a DbsqlParameter object, which will bypass this inference logic.""" if -128 <= integer <= 127: - return DbSqlType.TINYINT + # If DBR is ever updated to permit TINYINT values passed to LIMIT and OFFSET + # then we can change this line to return DbSqlType.TINYINT + return DbSqlType.INTEGER elif -2147483648 <= integer <= 2147483647: return DbSqlType.INTEGER else: diff --git a/src/databricks/sqlalchemy/__init__.py b/src/databricks/sqlalchemy/__init__.py index 95b6c5169..9a81bda56 100644 --- a/src/databricks/sqlalchemy/__init__.py +++ b/src/databricks/sqlalchemy/__init__.py @@ -1,24 +1,23 @@ -"""This module's layout loosely follows example of SQLAlchemy's postgres dialect -""" - -import decimal, re, datetime -from dateutil.parser import parse +import re +from typing import Any, Optional import sqlalchemy -from sqlalchemy import types, event -from sqlalchemy.engine import default, Engine +from sqlalchemy import event +from sqlalchemy.engine import Engine, default, reflection +from sqlalchemy.engine.interfaces import ( + ReflectedForeignKeyConstraint, + ReflectedPrimaryKeyConstraint, +) from sqlalchemy.exc import DatabaseError, SQLAlchemyError -from sqlalchemy.engine import reflection -from databricks import sql +import databricks.sqlalchemy._ddl as dialect_ddl_impl # This import is required to process our @compiles decorators import databricks.sqlalchemy._types as dialect_type_impl - - -from databricks.sqlalchemy.base import ( - DatabricksDDLCompiler, - DatabricksIdentifierPreparer, +from databricks import sql +from databricks.sqlalchemy.utils import ( + extract_identifier_groups_from_string, + extract_identifiers_from_string, ) try: @@ -39,13 +38,16 @@ class DatabricksDialect(default.DefaultDialect): name: str = "databricks" driver: str = "databricks" default_schema_name: str = "default" - preparer = DatabricksIdentifierPreparer # type: ignore - ddl_compiler = DatabricksDDLCompiler + preparer 
= dialect_ddl_impl.DatabricksIdentifierPreparer # type: ignore + ddl_compiler = dialect_ddl_impl.DatabricksDDLCompiler + statement_compiler = dialect_ddl_impl.DatabricksStatementCompiler supports_statement_cache: bool = True supports_multivalues_insert: bool = True supports_native_decimal: bool = True supports_sane_rowcount: bool = False non_native_boolean_check_constraint: bool = False + supports_identity_columns: bool = True + supports_schemas: bool = True paramstyle: str = "named" colspecs = { @@ -149,25 +151,43 @@ def get_columns(self, connection, table_name, schema=None, **kwargs): return columns - def get_pk_constraint(self, connection, table_name, schema=None, **kw): + @reflection.cache + def get_pk_constraint( + self, + connection, + table_name: str, + schema: Optional[str] = None, + **kw: Any, + ) -> ReflectedPrimaryKeyConstraint: """Return information about the primary key constraint on table_name`. - - Given a :class:`_engine.Connection`, a string - `table_name`, and an optional string `schema`, return primary - key information as a dictionary with these keys: - - constrained_columns - a list of column names that make up the primary key - - name - optional name of the primary key constraint. - """ - # TODO: implement this behaviour - return {"constrained_columns": []} - def get_foreign_keys(self, connection, table_name, schema=None, **kw): + with self.get_connection_cursor(connection) as cursor: + # DESCRIBE TABLE EXTENDED doesn't support parameterised inputs :( + result = cursor.execute(f"DESCRIBE TABLE EXTENDED {table_name}").fetchall() + + # DESCRIBE TABLE EXTENDED doesn't give a deterministic name to the field where + # a primary key constraint will be found in its output. So we cycle through its + # output looking for a match that includes "PRIMARY KEY". This is brittle. 
We + # could optionally make two roundtrips: the first would query information_schema + # for the name of the primary key constraint on this table, and a second to + # DESCRIBE TABLE EXTENDED, at which point we would know the name of the constraint. + # But for now we instead assume that Python list comprehension is faster than a + # network roundtrip. + dte_dict = {row["col_name"]: row["data_type"] for row in result} + target = [(k, v) for k, v in dte_dict.items() if "PRIMARY KEY" in v] + if target: + name, _constraint_string = target[0] + column_list = extract_identifiers_from_string(_constraint_string) + else: + name, column_list = None, None + + return {"constrained_columns": column_list, "name": name} + + def get_foreign_keys( + self, connection, table_name, schema=None, **kw + ) -> ReflectedForeignKeyConstraint: """Return information about foreign_keys in `table_name`. Given a :class:`_engine.Connection`, a string @@ -190,8 +210,60 @@ def get_foreign_keys(self, connection, table_name, schema=None, **kw): a list of column names in the referred table that correspond to constrained_columns """ - # TODO: Implement this behaviour - return [] + """Return information about the primary key constraint on + table_name`. + """ + + with self.get_connection_cursor(connection) as cursor: + # DESCRIBE TABLE EXTENDED doesn't support parameterised inputs :( + result = cursor.execute( + f"DESCRIBE TABLE EXTENDED {schema + '.' if schema else ''}{table_name}" + ).fetchall() + + # DESCRIBE TABLE EXTENDED doesn't give a deterministic name to the field where + # a foreign key constraint will be found in its output. So we cycle through its + # output looking for a match that includes "FOREIGN KEY". This is brittle. We + # could optionally make two roundtrips: the first would query information_schema + # for the name of the foreign key constraint on this table, and a second to + # DESCRIBE TABLE EXTENDED, at which point we would know the name of the constraint. 
+ # But for now we instead assume that Python list comprehension is faster than a + # network roundtrip. + dte_dict = {row["col_name"]: row["data_type"] for row in result} + target = [(k, v) for k, v in dte_dict.items() if "FOREIGN KEY" in v] + + def extract_constraint_dict_from_target(target): + if target: + name, _constraint_string = target + _extracted = extract_identifier_groups_from_string(_constraint_string) + constrained_columns_str, referred_columns_str = ( + _extracted[0], + _extracted[1], + ) + + constrained_columns = extract_identifiers_from_string( + constrained_columns_str + ) + referred_columns = extract_identifiers_from_string(referred_columns_str) + referred_table = str(table_name) + else: + name, constrained_columns, referred_columns, referred_table = ( + None, + None, + None, + None, + ) + + return { + "constrained_columns": constrained_columns, + "name": name, + "referred_table": referred_table, + "referred_columns": referred_columns, + } + + if target: + return [extract_constraint_dict_from_target(i) for i in target] + else: + return [] def get_indexes(self, connection, table_name, schema=None, **kw): """Return information about indexes in `table_name`. 
@@ -238,6 +310,7 @@ def do_rollback(self, dbapi_connection): # Databricks SQL Does not support transactions pass + @reflection.cache def has_table( self, connection, table_name, schema=None, catalog=None, **kwargs ) -> bool: @@ -252,7 +325,9 @@ def has_table( try: res = connection.execute( - sqlalchemy.text(f"DESCRIBE TABLE {_catalog}.{_schema}.{table_name}") + sqlalchemy.text( + f"DESCRIBE TABLE `{_catalog}`.`{_schema}`.`{table_name}`" + ) ) return True except DatabaseError as e: diff --git a/src/databricks/sqlalchemy/_ddl.py b/src/databricks/sqlalchemy/_ddl.py new file mode 100644 index 000000000..4d825c9fe --- /dev/null +++ b/src/databricks/sqlalchemy/_ddl.py @@ -0,0 +1,69 @@ +import re +from sqlalchemy.sql import compiler +import logging + +logger = logging.getLogger(__name__) + + +class DatabricksIdentifierPreparer(compiler.IdentifierPreparer): + """https://docs.databricks.com/en/sql/language-manual/sql-ref-identifiers.html""" + + legal_characters = re.compile(r"^[A-Z0-9_]+$", re.I) + + def __init__(self, dialect): + super().__init__(dialect, initial_quote="`") + + +class DatabricksDDLCompiler(compiler.DDLCompiler): + def post_create_table(self, table): + return " USING DELTA" + + def visit_unique_constraint(self, constraint, **kw): + logger.warn("Databricks does not support unique constraints") + pass + + def visit_check_constraint(self, constraint, **kw): + logger.warn("Databricks does not support check constraints") + pass + + def visit_identity_column(self, identity, **kw): + """When configuring an Identity() with Databricks, only the always option is supported. + All other options are ignored. + + Note: IDENTITY columns must always be defined as BIGINT. An exception will be raised if INT is used. 
+ + https://www.databricks.com/blog/2022/08/08/identity-columns-to-generate-surrogate-keys-are-now-available-in-a-lakehouse-near-you.html + """ + text = "GENERATED %s AS IDENTITY" % ( + "ALWAYS" if identity.always else "BY DEFAULT", + ) + return text + + def get_column_specification(self, column, **kwargs): + """Currently we override this method only to emit a log message if a user attempts to set + autoincrement=True on a column. See comments in test_suite.py. We may implement implicit + IDENTITY using this feature in the future, similar to the Microsoft SQL Server dialect. + """ + if column is column.table._autoincrement_column or column.autoincrement is True: + logger.warn( + "Databricks dialect ignores SQLAlchemy's autoincrement semantics. Use explicit Identity() instead." + ) + + return super().get_column_specification(column, **kwargs) + + +class DatabricksStatementCompiler(compiler.SQLCompiler): + def limit_clause(self, select, **kw): + """Identical to the default implementation of SQLCompiler.limit_clause except it writes LIMIT ALL instead of LIMIT -1, + since Databricks SQL doesn't support the latter. 
+ + https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-qry-select-limit.html + """ + text = "" + if select._limit_clause is not None: + text += "\n LIMIT " + self.process(select._limit_clause, **kw) + if select._offset_clause is not None: + if select._limit_clause is None: + text += "\n LIMIT ALL" + text += " OFFSET " + self.process(select._offset_clause, **kw) + return text diff --git a/src/databricks/sqlalchemy/base.py b/src/databricks/sqlalchemy/base.py deleted file mode 100644 index 080f04106..000000000 --- a/src/databricks/sqlalchemy/base.py +++ /dev/null @@ -1,17 +0,0 @@ -import re -from sqlalchemy.sql import compiler - - -class DatabricksIdentifierPreparer(compiler.IdentifierPreparer): - # SparkSQL identifier specification: - # ref: https://spark.apache.org/docs/latest/sql-ref-identifier.html - - legal_characters = re.compile(r"^[A-Z0-9_]+$", re.I) - - def __init__(self, dialect): - super().__init__(dialect, initial_quote="`") - - -class DatabricksDDLCompiler(compiler.DDLCompiler): - def post_create_table(self, table): - return " USING DELTA" diff --git a/src/databricks/sqlalchemy/pytest.ini b/src/databricks/sqlalchemy/pytest.ini new file mode 100644 index 000000000..e69de29bb diff --git a/src/databricks/sqlalchemy/requirements.py b/src/databricks/sqlalchemy/requirements.py index e639d19b7..d8229dace 100644 --- a/src/databricks/sqlalchemy/requirements.py +++ b/src/databricks/sqlalchemy/requirements.py @@ -9,6 +9,7 @@ in test_suite.py with a Databricks-specific reason. See the special note about the array_type exclusion below. +See special note about has_temp_table exclusion below. """ import sqlalchemy.testing.requirements @@ -93,4 +94,53 @@ def array_type(self): test runner will crash the pytest process due to an AttributeError """ + # TODO: Implement array type using inline? 
return sqlalchemy.testing.exclusions.closed() + + @property + def table_ddl_if_exists(self): + """target platform supports IF NOT EXISTS / IF EXISTS for tables.""" + + return sqlalchemy.testing.exclusions.open() + + @property + def identity_columns(self): + """If a backend supports GENERATED { ALWAYS | BY DEFAULT } + AS IDENTITY""" + return sqlalchemy.testing.exclusions.open() + + @property + def identity_columns_standard(self): + """If a backend supports GENERATED { ALWAYS | BY DEFAULT } + AS IDENTITY with a standard syntax. + This is mainly to exclude MSSql. + """ + return sqlalchemy.testing.exclusions.open() + + @property + def has_temp_table(self): + """target dialect supports checking a single temp table name + + unfortunately this is not the same as temp_table_names + + SQLAlchemy's HasTableTest is not normalised in such a way that temp table tests + are separate from temp view and normal table tests. If those tests were split out, + we would just add detailed skip markers in test_suite.py. But since we'd like to + run the HasTableTest group for the features we support, we must set this exclusion + to closed(). + + It would be ideal if there were a separate requirement for has_temp_view. Without it, + we're in a bind. 
+ """ + return sqlalchemy.testing.exclusions.closed() + + @property + def temporary_views(self): + """target database supports temporary views""" + return sqlalchemy.testing.exclusions.open() + + @property + def views(self): + """Target database must support VIEWs.""" + + return sqlalchemy.testing.exclusions.open() \ No newline at end of file diff --git a/src/databricks/sqlalchemy/test/test_suite.py b/src/databricks/sqlalchemy/test/test_suite.py index c9ba48b42..a16e97096 100644 --- a/src/databricks/sqlalchemy/test/test_suite.py +++ b/src/databricks/sqlalchemy/test/test_suite.py @@ -122,252 +122,209 @@ class DateHistoricTest(DateHistoricTest): pass -class FetchLimitOffsetTest(FetchLimitOffsetTest): - @pytest.mark.skip( - reason="Dialect should advertise which offset rules Databricks supports. Offset handling needs work." - ) - def test_bound_offset(self): - """ - Exception: - sqlalchemy.exc.DatabaseError: (databricks.sql.exc.ServerOperationError) [INVALID_LIMIT_LIKE_EXPRESSION.IS_NEGATIVE] The limit like expression "-1" is invalid. The limit expression must be equal to or greater than 0, but got -1.; line 3 pos 7 - """ +@pytest.mark.reviewed +class RowFetchTest(RowFetchTest): + pass + +@pytest.mark.reviewed +class FetchLimitOffsetTest(FetchLimitOffsetTest): + @pytest.mark.flaky @pytest.mark.skip( - reason="Dialect should advertise which offset rules Databricks supports. Offset handling needs work." + reason="Insertion order on Databricks is not deterministic. See comment in test_suite.py." ) def test_limit_render_multiple_times(self): - """ - Exception: - AssertionError: [(5,)] != [(1,)] - """ + """This test depends on the order that records are inserted into the table. Its passing criteria require that + a record inserted with id=1 is the first record returned when no ORDER BY clause is specified. But Databricks occasionally + INSERTS in a different order, which makes this test seem to fail. 
The test is flaky, but the underlying functionality + (can multiple LIMIT clauses be rendered) is not broken. - @pytest.mark.skip( - reason="Dialect should advertise which offset rules Databricks supports. Offset handling needs work." - ) - def test_simple_offset(self): - """ - Exception: - sqlalchemy.exc.DatabaseError: (databricks.sql.exc.ServerOperationError) [INVALID_LIMIT_LIKE_EXPRESSION.IS_NEGATIVE] The limit like expression "-1" is invalid. The limit expression must be equal to or greater than 0, but got -1.; line 3 pos 7 + Unclear if this is a bug in Databricks, Delta, or some race-condition in the test itself. """ + pass - @pytest.mark.skip( - reason="Dialect should advertise which offset rules Databricks supports. Offset handling needs work." - ) - def test_simple_offset_zero(self): - """ - Exception: - sqlalchemy.exc.DatabaseError: (databricks.sql.exc.ServerOperationError) [INVALID_LIMIT_LIKE_EXPRESSION.IS_NEGATIVE] The limit like expression "-1" is invalid. The limit expression must be equal to or greater than 0, but got -1.; line 3 pos 7 - """ + @pytest.mark.skip(reason="Databricks doesn't support FETCH clauses") + def test_bound_fetch_offset(self): + pass - @pytest.mark.skip(reason="Error during execution. Requires investigation.") - def test_expr_offset(self): - """ - Exception: - - sqlalchemy.exc.DatabaseError: (databricks.sql.exc.ServerOperationError) [INVALID_LIMIT_LIKE_EXPRESSION.IS_NEGATIVE] The limit like expression "-1" is invalid. 
The limit expression must be equal to or greater than 0, but got -1.; line 3 pos 7 - """ + @pytest.mark.skip(reason="Databricks doesn't support FETCH clauses") + def test_fetch_offset_no_order(self): + pass + @pytest.mark.skip(reason="Databricks doesn't support FETCH clauses") + def test_fetch_offset_nobinds(self): + pass + @pytest.mark.skip(reason="Databricks doesn't support FETCH clauses") + def test_simple_fetch(self): + pass + + @pytest.mark.skip(reason="Databricks doesn't support FETCH clauses") + def test_simple_fetch_offset(self): + pass + + @pytest.mark.skip(reason="Databricks doesn't support FETCH clauses") + def test_simple_fetch_percent(self): + pass + + @pytest.mark.skip(reason="Databricks doesn't support FETCH clauses") + def test_simple_fetch_percent_ties(self): + pass + + @pytest.mark.skip(reason="Databricks doesn't support FETCH clauses") + def test_simple_fetch_ties(self): + pass + + @pytest.mark.skip(reason="Databricks doesn't support FETCH clauses") + def test_expr_fetch_offset(self): + pass + + @pytest.mark.skip(reason="Databricks doesn't support FETCH clauses") + def test_fetch_offset_percent(self): + pass + + @pytest.mark.skip(reason="Databricks doesn't support FETCH clauses") + def test_fetch_offset_percent_ties(self): + pass + + @pytest.mark.skip(reason="Databricks doesn't support FETCH clauses") + def test_fetch_offset_ties(self): + pass + + @pytest.mark.skip(reason="Databricks doesn't support FETCH clauses") + def test_fetch_offset_ties_exact_number(self): + pass + + +@pytest.mark.reviewed class FutureTableDDLTest(FutureTableDDLTest): @pytest.mark.skip( - reason="Internal bug. DESCRIBE TABLE function should deliver an executable object." + reason="Comment reflection is possible but not implemented in this dialect." 
) def test_add_table_comment(self): - """ - Exception: - sqlalchemy.exc.ObjectNotExecutableError: Not an executable object: 'DESCRIBE TABLE main.pysql_sqlalchemy.test_table' - """ + """We could use requirements.comment_reflection here to disable this but prefer a more meaningful skip message""" + pass @pytest.mark.skip( - reason="Internal bug. DESCRIBE TABLE function should deliver an executable object." + reason="Comment reflection is possible but not implemented in this dialect." ) - def test_create_table(self): - """ - Exception: - sqlalchemy.exc.ObjectNotExecutableError: Not an executable object: 'DESCRIBE TABLE main.pysql_sqlalchemy.test_table' - """ + def test_drop_table_comment(self): + """We could use requirements.comment_reflection here to disable this but prefer a more meaningful skip message""" + pass - @pytest.mark.skip( - reason="Internal bug. DESCRIBE TABLE function should deliver an executable object." - ) - def test_drop_table(self): - """ - Exception: - sqlalchemy.exc.ObjectNotExecutableError: Not an executable object: 'DESCRIBE TABLE main.pysql_sqlalchemy.test_table' + @pytest.mark.skip(reason="Databricks does not support indexes.") + def test_create_index_if_not_exists(self): + """We could use requirements.index_reflection and requirements.index_ddl_if_exists + here to disable this but prefer a more meaningful skip message """ + pass - @pytest.mark.skip( - reason="Internal bug. DESCRIBE TABLE function should deliver an executable object." - ) - def test_drop_table_comment(self): - """ - Exception: - sqlalchemy.exc.ObjectNotExecutableError: Not an executable object: 'DESCRIBE TABLE main.pysql_sqlalchemy.test_table' + @pytest.mark.skip(reason="Databricks does not support indexes.") + def test_drop_index_if_exists(self): + """We could use requirements.index_reflection and requirements.index_ddl_if_exists + here to disable this but prefer a more meaningful skip message """ + pass - @pytest.mark.skip( - reason="Internal bug. 
DESCRIBE TABLE function should deliver an executable object." - ) - def test_underscore_names(self): - """ - Exception: - sqlalchemy.exc.ObjectNotExecutableError: Not an executable object: 'DESCRIBE TABLE main.pysql_sqlalchemy._test_table' - """ - @pytest.mark.skip(reason="Error during execution. Requires investigation.") - def test_create_table_schema(self): - """ - Exception: - - sqlalchemy.exc.ObjectNotExecutableError: Not an executable object: 'DESCRIBE TABLE main.test_schema.test_table' - """ +@pytest.mark.reviewed +@pytest.mark.skip(reason="Identity works. Test needs rewrite for Databricks. See comments in test_suite.py") +class IdentityColumnTest(IdentityColumnTest): + """The setup for these tests tries to create a table with a DELTA IDENTITY column but has two problems: + 1. It uses an Integer() type for the column. Whereas DELTA IDENTITY columns must be BIGINT. + 2. It tries to set the start == 42, which Databricks doesn't support + I can get the tests to _run_ by patching the table fixture to use BigInteger(). But it asserts that the + identity of two rows are 42 and 43, which is not possible since they will be rows 1 and 2 instead. -class IdentityAutoincrementTest(IdentityAutoincrementTest): - @pytest.mark.skip(reason="Identity column handling needs work.") - def test_autoincrement_with_identity(self): - """ - Exception: - sqlalchemy.exc.DatabaseError: (databricks.sql.exc.ServerOperationError) Column id is not specified in INSERT - """ + I'm satisfied through manual testing that our implementation of visit_identity_column works but a better test is needed. + """ + pass -class LongNameBlowoutTest(LongNameBlowoutTest): +@pytest.mark.reviewed +class IdentityAutoincrementTest(IdentityAutoincrementTest): @pytest.mark.skip( - reason="CreateIndex is not supported in Unity Catalog + parameters cannot exceed 255 characters in length" + reason="Identity works. Test needs rewrite for Databricks. 
See comments in test_suite.py" ) - def test_long_convention_name(self): - """ - This test is parameterized. It receives the following failures from Databricks compute - Exception: - [fk-_exclusions0] sqlalchemy.exc.DatabaseError: (databricks.sql.exc.ServerOperationError) [RequestId=9e4262cc-05bc-4086-b17d-0c8082599218 ErrorClass=INVALID_PARAMETER_VALUE.INVALID_FIELD_LENGTH] CreateTable foreign_key.name too long. Maximum length is 255 characters. - [ix-_exclusions2] sqlalchemy.exc.DatabaseError: (databricks.sql.exc.ServerOperationError) [UC_COMMAND_NOT_SUPPORTED.WITHOUT_RECOMMENDATION] The command(s): CreateIndex are not supported in Unity Catalog. - [pk-_exclusions1] sqlalchemy.exc.DatabaseError: (databricks.sql.exc.ServerOperationError) [RequestId=f3e6940b-bd69-455d-9314-87522bcf8cef ErrorClass=INVALID_PARAMETER_VALUE.INVALID_FIELD_LENGTH] CreateTable primary_key.name too long. Maximum length is 255 characters. + def test_autoincrement_with_identity(self): + """This test has the same issue as IdentityColumnTest.test_select_all in that it creates a table with identity + using an Integer() rather than a BigInteger(). If I override this behaviour to use a BigInteger() instead, the + test passes. """ -class RowFetchTest(RowFetchTest): +@pytest.mark.reviewed +class LongNameBlowoutTest(LongNameBlowoutTest): + """These tests all include assertions that the tested name > 255 characters""" + @pytest.mark.skip( - reason="Date type implementation needs work. Timezone information not preserved." 
+ reason="Databricks constraint names are limited to 255 characters" ) - def test_row_w_scalar_select(self): - """ - Exception: - AssertionError: datetime.datetime(2006, 5, 12, 12, 0, tzinfo=) != datetime.datetime(2006, 5, 12, 12, 0) - """ + def test_long_convention_name(self): + pass +@pytest.mark.reviewed class ExceptionTest(ExceptionTest): - @pytest.mark.skip(reason="Databricks may not support this method.") + @pytest.mark.skip(reason="Databricks doesn't enforce primary key constraints.") def test_integrity_error(self): - """ - Exception: - databricks.sql.exc.ServerOperationError: Column id is not specified in INSERT - """ - - -class HasTableTest(HasTableTest): - @pytest.mark.skip(reason="Schema is not properly configured for this test.") - def test_has_table(self): - """ - Exception - - databricks.sql.exc.ServerOperationError: [SCHEMA_NOT_FOUND] The schema `main.test_schema` cannot be found. Verify the spelling and correctness of the schema and catalog. - If you did not qualify the name with a catalog, verify the current_schema() output, or qualify the name with the correct catalog. - To tolerate the error on drop use DROP SCHEMA IF EXISTS. + """Per Databricks documentation, primary and foreign key constraints are informational only + and are not enforced. + https://docs.databricks.com/api/workspace/tableconstraints """ + pass - @pytest.mark.skip(reason="Schema is not properly configured for this test.") - def test_has_table_schema(self): - """ - Exception - databricks.sql.exc.ServerOperationError: [SCHEMA_NOT_FOUND] The schema `main.test_schema` cannot be found. Verify the spelling and correctness of the schema and catalog. - If you did not qualify the name with a catalog, verify the current_schema() output, or qualify the name with the correct catalog. - To tolerate the error on drop use DROP SCHEMA IF EXISTS. 
- """ +@pytest.mark.reviewed +class HasTableTest(HasTableTest): + """Databricks does not support temporary tables.""" - @pytest.mark.skip(reason="Schema is not properly configured for this test.") + @pytest.mark.skip(reason="Databricks does not support temporary tables.") def test_has_table_temp_table(self): - """ - Exception - databricks.sql.exc.ServerOperationError: [SCHEMA_NOT_FOUND] The schema `main.test_schema` cannot be found. Verify the spelling and correctness of the schema and catalog. - If you did not qualify the name with a catalog, verify the current_schema() output, or qualify the name with the correct catalog. - To tolerate the error on drop use DROP SCHEMA IF EXISTS. - - """ + pass - @pytest.mark.skip(reason="Schema is not properly configured for this test.") + @pytest.mark.skip(reason="Strange test design. See comments in test_suite.py") def test_has_table_temp_view(self): - """ - Exception - databricks.sql.exc.ServerOperationError: [SCHEMA_NOT_FOUND] The schema `main.test_schema` cannot be found. Verify the spelling and correctness of the schema and catalog. - If you did not qualify the name with a catalog, verify the current_schema() output, or qualify the name with the correct catalog. - To tolerate the error on drop use DROP SCHEMA IF EXISTS. - - """ - - @pytest.mark.skip(reason="Schema is not properly configured for this test.") - def test_has_table_view(self): - """ - Exception - databricks.sql.exc.ServerOperationError: [SCHEMA_NOT_FOUND] The schema `main.test_schema` cannot be found. Verify the spelling and correctness of the schema and catalog. - If you did not qualify the name with a catalog, verify the current_schema() output, or qualify the name with the correct catalog. - To tolerate the error on drop use DROP SCHEMA IF EXISTS. + """Databricks supports temporary views but this test depends on requirements.has_temp_table, which we + explicitly close so that we can run other tests in this group. 
See the comment under has_temp_table in + requirements.py for details. - """ - - @pytest.mark.skip(reason="Schema is not properly configured for this test.") - def test_has_table_view_schema(self): - """ - Exception - databricks.sql.exc.ServerOperationError: [SCHEMA_NOT_FOUND] The schema `main.test_schema` cannot be found. Verify the spelling and correctness of the schema and catalog. - If you did not qualify the name with a catalog, verify the current_schema() output, or qualify the name with the correct catalog. - To tolerate the error on drop use DROP SCHEMA IF EXISTS. + From what I can see, there is no way to run this test since it will fail during setup if we mark has_temp_table + open(). It _might_ be possible to hijack this behaviour by implementing temp_table_keyword_args in our own + provision.py. Doing so would mean creating a real table during this class setup instead of a temp table. Then + we could just skip the temp table tests but run the temp view tests. But this test fixture doesn't cleanup its + temp tables and has no hook to do so. + It would be ideal for SQLAlchemy to define a separate requirements.has_temp_views. """ + pass +@pytest.mark.reviewed +@pytest.mark.skip( + reason="This dialect does not support implicit autoincrement. See comments in test_suite.py" +) class LastrowidTest(LastrowidTest): - @pytest.mark.skip(reason="DDL for INSERT requires adjustment") - def test_autoincrement_on_insert(self): - """ - Exception - databricks.sql.exc.ServerOperationError: Column id is not specified in INSERT + """SQLAlchemy docs describe that a column without an explicit Identity() may implicitly create one if autoincrement=True. + That is what this method tests. Databricks supports auto-incrementing IDENTITY columns but they must be explicitly + declared. This limitation is present in our dialect as well. Which means that SQLAlchemy's autoincrement setting of a column + is ignored. We emit a logging.WARN message if you try it. 
- """ + In the future we could handle this autoincrement by implicitly calling the visit_identity_column() method of our DDLCompiler + when autoincrement=True. There is an example of this in the Microsoft SQL Server dialect: MSSDDLCompiler.get_column_specification - @pytest.mark.skip(reason="DDL for INSERT requires adjustment") - def test_last_inserted_id(self): - """ - Exception: - databricks.sql.exc.ServerOperationError: Column id is not specified in INSERT + For now, if you need to create a SQLAlchemy column with an auto-incrementing identity, you must set this explicitly in your column + definition by passing an Identity() to the column constructor. + """ - """ + pass +@pytest.mark.reviewed class CompositeKeyReflectionTest(CompositeKeyReflectionTest): - @pytest.mark.skip(reason="Primary key handling needs work.") - def test_pk_column_order(self): - """ - Exception: - AssertionError: [] != ['name', 'id', 'attr'] - assert [] == ['name', 'id', 'attr'] - Right contains 3 more items, first extra item: 'name' - Full diff: - - ['name', 'id', 'attr'] - + [] - """ - - @pytest.mark.skip( - reason="Composite key implementation needs. Work may not be supported by Databricks." - ) - def test_fk_column_order(self): - """ - Excpetion: - AssertionError: 0 != 1 - assert 0 == 1 - """ - + pass class ComponentReflectionTestExtra(ComponentReflectionTestExtra): @pytest.mark.skip(reason="Test setup needs adjustment.") @@ -445,51 +402,6 @@ def test_empty_insert_multiple(self): """ -class TableDDLTest(TableDDLTest): - @pytest.mark.skip(reason="Error during execution. Requires investigation.") - def test_create_table(self): - """ - Exception: - - sqlalchemy.exc.DatabaseError: (databricks.sql.exc.ServerOperationError) [TABLE_OR_VIEW_ALREADY_EXISTS] Cannot create table or view `pysql_sqlalchemy`.`test_table` because it already exists. - """ - - @pytest.mark.skip(reason="Error during execution. 
Requires investigation.") - def test_create_table_schema(self): - """ - Exception: - - sqlalchemy.exc.DatabaseError: (databricks.sql.exc.ServerOperationError) [SCHEMA_NOT_FOUND] The schema `main.test_schema` cannot be found. Verify the spelling and correctness of the schema and catalog. - """ - - @pytest.mark.skip( - reason="DDL handling needs work. Some features not implemented in dialect." - ) - def test_add_table_comment(self): - """ - Exception: - NotImplementedError - """ - - @pytest.mark.skip( - reason="DDL handling needs work. Some features not implemented in dialect." - ) - def test_drop_table_comment(self): - """ - Exception: - NotImplementedError - """ - - @pytest.mark.skip( - reason="DDL handling needs work. Some features not implemented in dialect." - ) - def test_underscore_names(self): - """ - This exception may require this test to simply be rewritten as it appears to be a race condition. - - Exception: - sqlalchemy.exc.DatabaseError: (databricks.sql.exc.ServerOperationError) [TABLE_OR_VIEW_ALREADY_EXISTS] Cannot create table or view `pysql_sqlalchemy`.`_test_table` because it already exists. - """ - - class ComponentReflectionTest(ComponentReflectionTest): @pytest.mark.skip(reason="Error during execution. Requires investigation.") def test_autoincrement_col(self): @@ -674,79 +586,50 @@ def test_get_view_definition(self): """ -class HasIndexTest(HasIndexTest): - @pytest.mark.skip(reason="Error during execution. Requires investigation.") - def test_has_index_schema(self): - """ - Exception: - - sqlalchemy.exc.DatabaseError: (databricks.sql.exc.ServerOperationError) [UC_COMMAND_NOT_SUPPORTED.WITHOUT_RECOMMENDATION] The command(s): CreateIndex are not supported in Unity Catalog. - """ - - @pytest.mark.skip(reason="Dialect doesn't know how to handle indexes.") - def test_has_index(self): - """ - Exception: - AssertionError: assert False - """ - - -class QuotedNameArgumentTest(QuotedNameArgumentTest): - @pytest.mark.skip(reason="Error during execution. 
Requires investigation.") - def test_get_check_constraints(self): - """ - Exception: - - sqlalchemy.exc.DatabaseError: (databricks.sql.exc.ServerOperationError) - """ - - @pytest.mark.skip(reason="Error during execution. Requires investigation.") - def test_get_columns(self): - """ - Exception: - - sqlalchemy.exc.DatabaseError: (databricks.sql.exc.ServerOperationError) +@pytest.mark.reviewed +class TableDDLTest(TableDDLTest): + @pytest.mark.skip(reason="Databricks does not support indexes.") + def test_create_index_if_not_exists(self, connection): + """We could use requirements.index_reflection and requirements.index_ddl_if_exists + here to disable this but prefer a more meaningful skip message """ + pass - @pytest.mark.skip(reason="Error during execution. Requires investigation.") - def test_get_foreign_keys(self): - """ - Exception: - - sqlalchemy.exc.DatabaseError: (databricks.sql.exc.ServerOperationError) + @pytest.mark.skip(reason="Databricks does not support indexes.") + def test_drop_index_if_exists(self, connection): + """We could use requirements.index_reflection and requirements.index_ddl_if_exists + here to disable this but prefer a more meaningful skip message """ + pass - @pytest.mark.skip(reason="Error during execution. Requires investigation.") - def test_get_indexes(self): - """ - Exception: - - sqlalchemy.exc.DatabaseError: (databricks.sql.exc.ServerOperationError) - """ + @pytest.mark.skip( + reason="Comment reflection is possible but not implemented in this dialect." + ) + def test_add_table_comment(self, connection): + """We could use requirements.comment_reflection here to disable this but prefer a more meaningful skip message""" + pass - @pytest.mark.skip(reason="Error during execution. 
Requires investigation.") - def test_get_pk_constraint(self): - """ - Exception: - - sqlalchemy.exc.DatabaseError: (databricks.sql.exc.ServerOperationError) - """ + @pytest.mark.skip( + reason="Comment reflection is possible but not implemented in this dialect." + ) + def test_drop_table_comment(self, connection): + """We could use requirements.comment_reflection here to disable this but prefer a more meaningful skip message""" + pass - @pytest.mark.skip(reason="Error during execution. Requires investigation.") - def test_get_table_comment(self): - """ - Exception: - - sqlalchemy.exc.DatabaseError: (databricks.sql.exc.ServerOperationError) - """ - @pytest.mark.skip(reason="Error during execution. Requires investigation.") - def test_get_table_options(self): - """ - Exception: - - sqlalchemy.exc.DatabaseError: (databricks.sql.exc.ServerOperationError) - """ +@pytest.mark.reviewed +@pytest.mark.skip(reason="Databricks does not support indexes.") +class HasIndexTest(HasIndexTest): + pass - @pytest.mark.skip(reason="Error during execution. Requires investigation.") - def test_get_view_definition(self): - """ - Exception: - - sqlalchemy.exc.DatabaseError: (databricks.sql.exc.ServerOperationError) - """ - @pytest.mark.skip(reason="Error during execution. Requires investigation.") - def test_get_unique_constraints(self): - pass +@pytest.mark.reviewed +@pytest.mark.skip( + reason="Databricks does not support spaces in table names. See comment in test_suite.py" +) +class QuotedNameArgumentTest(QuotedNameArgumentTest): + """These tests are challenging. The whole test setup depends on a table with a name like `quote ' one` + which will never work on Databricks because table names can't contain spaces. But QuotedNameArgumentTest + also checks the behaviour of the DDL identifier preparation process. We need to override some of the IdentifierPreparer + methods because these are the ultimate control for whether or not CHECK and UNIQUE constraints are emitted. 
+ """ diff --git a/src/databricks/sqlalchemy/test_local/test_utils.py b/src/databricks/sqlalchemy/test_local/test_utils.py new file mode 100644 index 000000000..ecb9fd437 --- /dev/null +++ b/src/databricks/sqlalchemy/test_local/test_utils.py @@ -0,0 +1,38 @@ +import pytest +from databricks.sqlalchemy.utils import ( + extract_identifiers_from_string, + extract_identifier_groups_from_string, +) + + +# These are outputs from DESCRIBE TABLE EXTENDED +@pytest.mark.parametrize( + "input, expected", + [ + ("PRIMARY KEY (`pk1`, `pk2`)", ["pk1", "pk2"]), + ("PRIMARY KEY (`a`, `b`, `c`)", ["a", "b", "c"]), + ("PRIMARY KEY (`name`, `id`, `attr`)", ["name", "id", "attr"]), + ], +) +def test_extract_identifiers(input, expected): + assert ( + extract_identifiers_from_string(input) == expected + ), "Failed to extract identifiers from string" + + +@pytest.mark.parametrize( + "input, expected", + [ + ( + "FOREIGN KEY (`pname`, `pid`, `pattr`) REFERENCES `main`.`pysql_sqlalchemy`.`tb1` (`name`, `id`, `attr`)", + [ + "(`pname`, `pid`, `pattr`)", + "(`name`, `id`, `attr`)", + ], + ) + ], +) +def test_extract_identifer_batches(input, expected): + assert ( + extract_identifier_groups_from_string(input) == expected + ), "Failed to extract identifier groups from string" diff --git a/src/databricks/sqlalchemy/utils.py b/src/databricks/sqlalchemy/utils.py new file mode 100644 index 000000000..d13dbdd1f --- /dev/null +++ b/src/databricks/sqlalchemy/utils.py @@ -0,0 +1,23 @@ +from typing import List +import re + + +def extract_identifiers_from_string(input_str: str) -> List[str]: + """For a string input resembling (`a`, `b`, `c`) return a list of identifiers ['a', 'b', 'c']""" + + # This matches the valid character list contained in DatabricksIdentifierPreparer + pattern = re.compile(r"`([A-Za-z0-9_]+)`") + matches = pattern.findall(input_str) + return [i for i in matches] + + +def extract_identifier_groups_from_string(input_str: str) -> List[str]: + """For a string input resembling : + + 
FOREIGN KEY (`pname`, `pid`, `pattr`) REFERENCES `main`.`pysql_sqlalchemy`.`tb1` (`name`, `id`, `attr`) + + Return ['(`pname`, `pid`, `pattr`)', '(`name`, `id`, `attr`)'] + """ + pattern = re.compile(r"\([`A-Za-z0-9_,\s]*\)") + matches = pattern.findall(input_str) + return [i for i in matches] diff --git a/tests/unit/test_parameters.py b/tests/unit/test_parameters.py index 1370def35..b14f8948b 100644 --- a/tests/unit/test_parameters.py +++ b/tests/unit/test_parameters.py @@ -23,7 +23,7 @@ class TestTSparkParameterConversion(object): "input_value, expected_type", [ ("a", "STRING"), - (1, "TINYINT"), + (1, "INTEGER"), (1000, "INTEGER"), (9223372036854775807, "BIGINT"), # Max value of a signed 64-bit integer (True, "BOOLEAN"), @@ -80,8 +80,8 @@ def test_infer_types_dict(self): @pytest.mark.parametrize( "input_value, expected_type", [ - (-128, DbSqlType.TINYINT), - (127, DbSqlType.TINYINT), + (-128, DbSqlType.INTEGER), + (127, DbSqlType.INTEGER), (-2147483649, DbSqlType.BIGINT), (-2147483648, DbSqlType.INTEGER), (2147483647, DbSqlType.INTEGER),