From c50e9db0bddc7be40c98d089d9a2256040be2426 Mon Sep 17 00:00:00 2001 From: fresioAS Date: Wed, 29 Apr 2026 09:04:47 +0200 Subject: [PATCH 1/5] get_current_catalog --- sqlmesh/core/engine_adapter/fabric.py | 4 +++ tests/core/engine_adapter/test_fabric.py | 44 ++++++++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/sqlmesh/core/engine_adapter/fabric.py b/sqlmesh/core/engine_adapter/fabric.py index e1dffe88f4..9163c3f60a 100644 --- a/sqlmesh/core/engine_adapter/fabric.py +++ b/sqlmesh/core/engine_adapter/fabric.py @@ -108,6 +108,10 @@ def _drop_catalog(self, catalog_name: exp.Identifier) -> None: # will fail with an 'Authentication Failed' error unless we close all connections here, which also clears all the threadlocal data self.close() + def get_current_catalog(self) -> t.Optional[str]: + """Return the adapter-managed catalog for Fabric's stateless sessions.""" + return self._target_catalog or self._extra_config.get("database") + def set_current_catalog(self, catalog_name: str) -> None: """ Set the current catalog for Microsoft Fabric connections. diff --git a/tests/core/engine_adapter/test_fabric.py b/tests/core/engine_adapter/test_fabric.py index a52218a097..eb83e6b014 100644 --- a/tests/core/engine_adapter/test_fabric.py +++ b/tests/core/engine_adapter/test_fabric.py @@ -19,6 +19,50 @@ def adapter(make_mocked_engine_adapter: t.Callable) -> FabricEngineAdapter: return make_mocked_engine_adapter(FabricEngineAdapter) +def test_get_current_catalog_uses_target_catalog_or_configured_database( + make_mocked_engine_adapter: t.Callable, +): + adapter = make_mocked_engine_adapter( + FabricEngineAdapter, + database="default_catalog", + ) + + assert adapter.get_current_catalog() == "default_catalog" + + adapter._target_catalog = "switched_catalog" + + assert adapter.get_current_catalog() == "switched_catalog" + + adapter._connection_pool.close() + + assert adapter._connection_pool.get_attribute("target_catalog") is None + assert adapter.get_current_catalog() == "default_catalog" + adapter.cursor.execute.assert_not_called() + + +def test_get_current_catalog_returns_none_without_target_or_database( + make_mocked_engine_adapter: t.Callable, +): + adapter = make_mocked_engine_adapter(FabricEngineAdapter) + + assert adapter.get_current_catalog() is None + adapter.cursor.execute.assert_not_called() + + +def test_set_current_catalog_does_not_query_database( + make_mocked_engine_adapter: t.Callable, +): + adapter = make_mocked_engine_adapter( + FabricEngineAdapter, + database="default_catalog", + ) + + adapter.set_current_catalog("new_catalog") + + assert adapter.get_current_catalog() == "new_catalog" + adapter.cursor.execute.assert_not_called() + + def test_columns(adapter: FabricEngineAdapter): adapter.cursor.fetchall.return_value = [ ("decimal_ps", "decimal", None, 5, 4), From 6e1e980a9320fc0f150c4bb9f6acccdc2354d692 Mon Sep 17 00:00:00 2001 From: fresioAS Date: Wed, 29 Apr 2026 09:35:59 +0200 Subject: [PATCH 2/5] neutral state catalog --- sqlmesh/core/engine_adapter/fabric.py | 43 ++++++++++++++++------ tests/core/engine_adapter/test_fabric.py | 46 ++++++++++++++++++++++-- 2 files changed, 76 insertions(+), 13 deletions(-) diff --git a/sqlmesh/core/engine_adapter/fabric.py b/sqlmesh/core/engine_adapter/fabric.py index 9163c3f60a..80fa11e491 100644 --- a/sqlmesh/core/engine_adapter/fabric.py +++ b/sqlmesh/core/engine_adapter/fabric.py @@ -52,6 +52,23 @@ def _target_catalog(self) -> t.Optional[str]: def _target_catalog(self, value: t.Optional[str]) -> None: self._connection_pool.set_attribute("target_catalog", value) + def _normalize_catalog( + self, catalog_name: t.Optional[str] + ) -> t.Optional[str]: + if not catalog_name: + return None + + default_catalog = ( + self._default_catalog or self._extra_config.get("database") + ) + if default_catalog and catalog_name == default_catalog: + return None + + return catalog_name + + def _catalog_state_label(self, catalog_name: t.Optional[str]) -> str: + return catalog_name or "" + @property def api_client(self) -> FabricHttpClient: # the requests Session is not guaranteed to be threadsafe @@ -109,10 +126,10 @@ def _drop_catalog(self, catalog_name: exp.Identifier) -> None: self.close() def get_current_catalog(self) -> t.Optional[str]: - """Return the adapter-managed catalog for Fabric's stateless sessions.""" - return self._target_catalog or self._extra_config.get("database") + """Return the explicit Fabric catalog target for the current thread.""" + return self._normalize_catalog(self._target_catalog) - def set_current_catalog(self, catalog_name: str) -> None: + def set_current_catalog(self, catalog_name: t.Optional[str]) -> None: """ Set the current catalog for Microsoft Fabric connections. @@ -121,7 +138,8 @@ def set_current_catalog(self, catalog_name: str) -> None: recreate them with the new catalog in the connection configuration. Args: - catalog_name: The name of the catalog (warehouse) to switch to + catalog_name: The name of the catalog (warehouse) to switch to. + The configured default catalog is treated as the neutral state. Note: Fabric doesn't support catalog switching via USE statements because each @@ -132,13 +150,18 @@ def set_current_catalog(self, catalog_name: str) -> None: https://learn.microsoft.com/en-us/fabric/data-warehouse/sql-query-editor#limitations """ current_catalog = self.get_current_catalog() + target_catalog = self._normalize_catalog(catalog_name) # If already using the requested catalog, do nothing - if current_catalog and current_catalog == catalog_name: - logger.debug(f"Already using catalog '{catalog_name}', no action needed") + if current_catalog == target_catalog: + logger.debug("Already using the requested Fabric catalog state, no action needed") return - logger.info(f"Switching from catalog '{current_catalog}' to '{catalog_name}'") + logger.info( + "Switching from catalog '%s' to '%s'", + self._catalog_state_label(current_catalog), + self._catalog_state_label(target_catalog), + ) # commit the transaction before closing the connection to help prevent errors like: # > Snapshot isolation transaction failed in database because the object accessed by the statement has been modified by a @@ -149,14 +172,14 @@ def set_current_catalog(self, catalog_name: str) -> None: # note: we call close() on the connection pool instead of self.close() because self.close() calls close_all() # on the connection pool but we just want to close the connection for this thread self._connection_pool.close() - self._target_catalog = catalog_name # new connections will use this catalog + self._target_catalog = target_catalog catalog_after_switch = self.get_current_catalog() - if catalog_after_switch != catalog_name: + if catalog_after_switch != target_catalog: # We need to raise an error if the catalog switch failed to prevent the operation that needed the catalog switch from being run against the wrong catalog raise SQLMeshError( - f"Unable to switch catalog to {catalog_name}, catalog ended up as {catalog_after_switch}" + f"Unable to switch catalog to {target_catalog}, catalog ended up as {catalog_after_switch}" ) def alter_table( diff --git a/tests/core/engine_adapter/test_fabric.py b/tests/core/engine_adapter/test_fabric.py index eb83e6b014..98252912d3 100644 --- a/tests/core/engine_adapter/test_fabric.py +++ b/tests/core/engine_adapter/test_fabric.py @@ -19,7 +19,7 @@ def adapter(make_mocked_engine_adapter: t.Callable) -> FabricEngineAdapter: return make_mocked_engine_adapter(FabricEngineAdapter) -def test_get_current_catalog_uses_target_catalog_or_configured_database( +def test_get_current_catalog_uses_only_explicit_target_catalog( make_mocked_engine_adapter: t.Callable, ): adapter = make_mocked_engine_adapter( @@ -27,7 +27,7 @@ def test_get_current_catalog_uses_target_catalog_or_configured_database( database="default_catalog", ) - assert adapter.get_current_catalog() == "default_catalog" + assert adapter.get_current_catalog() is None adapter._target_catalog = "switched_catalog" @@ -36,7 +36,7 @@ def test_get_current_catalog_uses_target_catalog_or_configured_database( adapter._connection_pool.close() assert adapter._connection_pool.get_attribute("target_catalog") is None - assert adapter.get_current_catalog() == "default_catalog" + assert adapter.get_current_catalog() is None adapter.cursor.execute.assert_not_called() @@ -63,6 +63,46 @@ def test_set_current_catalog_does_not_query_database( adapter.cursor.execute.assert_not_called() +def test_set_current_catalog_to_default_clears_explicit_target( + make_mocked_engine_adapter: t.Callable, +): + adapter = make_mocked_engine_adapter( + FabricEngineAdapter, + default_catalog="core", + database="core", + ) + + adapter.set_current_catalog("planning") + adapter.set_current_catalog("core") + + assert adapter.get_current_catalog() is None + adapter.cursor.execute.assert_not_called() + + +def test_catalog_scoped_call_restores_to_neutral_state( + make_mocked_engine_adapter: t.Callable, + mocker: MockerFixture, +): + adapter = make_mocked_engine_adapter( + FabricEngineAdapter, + default_catalog="core", + database="core", + ) + set_current_catalog_spy = mocker.patch.object( + adapter, + "set_current_catalog", + wraps=adapter.set_current_catalog, + ) + adapter.cursor.fetchone.return_value = (1,) + + adapter.table_exists("planning.db.table") + + assert [ + call.args[0] for call in set_current_catalog_spy.call_args_list + ] == ["planning", None] + assert adapter.get_current_catalog() is None + + def test_columns(adapter: FabricEngineAdapter): adapter.cursor.fetchall.return_value = [ ("decimal_ps", "decimal", None, 5, 4), From 0b693115bf468bb3dd196e75bda3b75495e440b3 Mon Sep 17 00:00:00 2001 From: fresioAS Date: Wed, 29 Apr 2026 10:14:48 +0200 Subject: [PATCH 3/5] check connected catalog --- sqlmesh/core/engine_adapter/fabric.py | 94 ++++++++++++++++-------- tests/core/engine_adapter/test_fabric.py | 59 ++++++++++++--- 2 files changed, 113 insertions(+), 40 deletions(-) diff --git a/sqlmesh/core/engine_adapter/fabric.py b/sqlmesh/core/engine_adapter/fabric.py index 80fa11e491..259babe540 100644 --- a/sqlmesh/core/engine_adapter/fabric.py +++ b/sqlmesh/core/engine_adapter/fabric.py @@ -52,6 +52,15 @@ def _target_catalog(self) -> t.Optional[str]: def _target_catalog(self, value: t.Optional[str]) -> None: self._connection_pool.set_attribute("target_catalog", value) + @property + def _connected_catalog(self) -> t.Optional[str]: + """Catalog the currently-open thread-local connection is actually using.""" + return self._connection_pool.get_attribute("connected_catalog") + + @_connected_catalog.setter + def _connected_catalog(self, value: t.Optional[str]) -> None: + self._connection_pool.set_attribute("connected_catalog", value) + def _normalize_catalog( self, catalog_name: t.Optional[str] ) -> t.Optional[str]: @@ -112,17 +121,21 @@ def _create_catalog(self, catalog_name: exp.Identifier) -> None: def _drop_catalog(self, catalog_name: exp.Identifier) -> None: """Drop a catalog (warehouse) in Microsoft Fabric via REST API.""" warehouse_name = catalog_name.sql(dialect=self.dialect, identify=False) - current_catalog = self.get_current_catalog() logger.info(f"Deleting Fabric warehouse: {warehouse_name}") self.api_client.delete_warehouse(warehouse_name) - if warehouse_name == current_catalog: - # Somewhere around 2025-09-08, Fabric started validating the "Database=" connection argument and throwing 'Authentication failed' if the database doesnt exist - # In addition, set_current_catalog() is implemented using a threadlocal variable "target_catalog" - # So, when we drop a warehouse, and there are still threads with "target_catalog" set to reference it, any operations on those threads - # that use an either use an existing connection pointing to this warehouse or trigger a new connection - # will fail with an 'Authentication Failed' error unless we close all connections here, which also clears all the threadlocal data + # Close all connections if any thread may be using the dropped warehouse. + # We must check both the logical target and the physical connection catalog + # (falling back to the configured default when either is neutral) because + # Fabric validates the DATABASE= connection argument and raises + # 'Authentication Failed' when it points at a non-existent warehouse. + default_db = self._extra_config.get("database") + in_use = { + self.get_current_catalog() or default_db, + self._normalize_catalog(self._connected_catalog) or default_db, + } + if warehouse_name in in_use: self.close() def get_current_catalog(self) -> t.Optional[str]: @@ -149,39 +162,58 @@ def set_current_catalog(self, catalog_name: t.Optional[str]) -> None: See: https://learn.microsoft.com/en-us/fabric/data-warehouse/sql-query-editor#limitations """ - current_catalog = self.get_current_catalog() target_catalog = self._normalize_catalog(catalog_name) - # If already using the requested catalog, do nothing - if current_catalog == target_catalog: - logger.debug("Already using the requested Fabric catalog state, no action needed") + # No-op: the logical catalog state already matches. + if self.get_current_catalog() == target_catalog: + logger.debug( + "Already using requested Fabric catalog state, no action needed" + ) return - logger.info( - "Switching from catalog '%s' to '%s'", - self._catalog_state_label(current_catalog), - self._catalog_state_label(target_catalog), + # Decide whether the open connection needs to be replaced. + # + # The set_catalog decorator restores the previous catalog (often None) + # after every catalog-scoped call. For Fabric, a connection close + + # reopen is expensive because each new connection goes through ODBC and + # the Fabric gateway. We therefore apply lazy connection management: + # + # * When restoring to neutral (target=None): just update _target_catalog. + # The existing connection stays alive and will be reused or replaced + # on the next real switch, avoiding a pointless bounce through the + # default catalog. + # + # * When switching to a non-neutral catalog: only close/reopen if the + # open connection is already on a different catalog. If a previous + # restore-to-neutral left the connection on the right catalog, we + # skip the close entirely. + connected_catalog = self._normalize_catalog(self._connected_catalog) + needs_reconnect = ( + target_catalog is not None and connected_catalog != target_catalog ) - # commit the transaction before closing the connection to help prevent errors like: - # > Snapshot isolation transaction failed in database because the object accessed by the statement has been modified by a - # > DDL statement in another concurrent transaction since the start of this transaction - # on subsequent queries in the new connection - self._connection_pool.commit() + if needs_reconnect: + logger.info( + "Switching connection from catalog '%s' to '%s'", + self._catalog_state_label(connected_catalog), + self._catalog_state_label(target_catalog), + ) + # Commit before closing to avoid snapshot-isolation errors on + # subsequent queries in the new connection. + self._connection_pool.commit() + # note: close() on the pool (not self.close()) to only affect this + # thread's connection rather than all threads. + self._connection_pool.close() + self._connected_catalog = target_catalog + else: + logger.debug( + "Updating catalog target to '%s' (connection remains on '%s')", + self._catalog_state_label(target_catalog), + self._catalog_state_label(connected_catalog), + ) - # note: we call close() on the connection pool instead of self.close() because self.close() calls close_all() - # on the connection pool but we just want to close the connection for this thread - self._connection_pool.close() self._target_catalog = target_catalog - catalog_after_switch = self.get_current_catalog() - - if catalog_after_switch != target_catalog: - # We need to raise an error if the catalog switch failed to prevent the operation that needed the catalog switch from being run against the wrong catalog - raise SQLMeshError( - f"Unable to switch catalog to {target_catalog}, catalog ended up as {catalog_after_switch}" - ) - def alter_table( self, alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]] ) -> None: diff --git a/tests/core/engine_adapter/test_fabric.py b/tests/core/engine_adapter/test_fabric.py index 98252912d3..2ca0a9be01 100644 --- a/tests/core/engine_adapter/test_fabric.py +++ b/tests/core/engine_adapter/test_fabric.py @@ -79,30 +79,71 @@ def test_set_current_catalog_to_default_clears_explicit_target( adapter.cursor.execute.assert_not_called() -def test_catalog_scoped_call_restores_to_neutral_state( +def test_catalog_scoped_call_restores_to_neutral_without_close( make_mocked_engine_adapter: t.Callable, mocker: MockerFixture, ): + """Decorator's restore-to-neutral must not close the existing connection.""" adapter = make_mocked_engine_adapter( FabricEngineAdapter, default_catalog="core", database="core", ) - set_current_catalog_spy = mocker.patch.object( - adapter, - "set_current_catalog", - wraps=adapter.set_current_catalog, - ) + close_spy = mocker.spy(adapter._connection_pool, "close") adapter.cursor.fetchone.return_value = (1,) adapter.table_exists("planning.db.table") - assert [ - call.args[0] for call in set_current_catalog_spy.call_args_list - ] == ["planning", None] + # Decorator calls set_current_catalog("planning") then set_current_catalog(None). + # Only the first call (None→planning) should trigger a connection close. + assert close_spy.call_count == 1 + assert adapter._connected_catalog == "planning" assert adapter.get_current_catalog() is None +def test_repeated_same_catalog_reuses_connection( + make_mocked_engine_adapter: t.Callable, + mocker: MockerFixture, +): + """Two consecutive operations on the same catalog share one connection.""" + adapter = make_mocked_engine_adapter( + FabricEngineAdapter, + default_catalog="core", + database="core", + ) + close_spy = mocker.spy(adapter._connection_pool, "close") + adapter.cursor.fetchone.return_value = (1,) + + adapter.table_exists("planning.db.table") + adapter.table_exists("planning.db.table") + + # Only the very first switch (None→planning) should close. + # The restore to neutral keeps the connection alive and the second + # planning operation reuses it without another close. + assert close_spy.call_count == 1 + assert adapter._connected_catalog == "planning" + + +def test_switching_between_catalogs_closes_each_time( + make_mocked_engine_adapter: t.Callable, + mocker: MockerFixture, +): + """Switching to a different catalog always triggers a connection close.""" + adapter = make_mocked_engine_adapter( + FabricEngineAdapter, + default_catalog="core", + database="core", + ) + close_spy = mocker.spy(adapter._connection_pool, "close") + adapter.cursor.fetchone.return_value = (1,) + + adapter.table_exists("safran.db.table") # None→safran: 1 close + adapter.table_exists("planning.db.table") # safran→planning: 2nd close + + assert close_spy.call_count == 2 + assert adapter._connected_catalog == "planning" + + def test_columns(adapter: FabricEngineAdapter): adapter.cursor.fetchall.return_value = [ ("decimal_ps", "decimal", None, 5, 4), From 3f6cca8d045cf10533b5f6ad93f3eb27101ace1c Mon Sep 17 00:00:00 2001 From: fresioAS Date: Wed, 29 Apr 2026 10:30:59 +0200 Subject: [PATCH 4/5] show default --- sqlmesh/core/engine_adapter/fabric.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sqlmesh/core/engine_adapter/fabric.py b/sqlmesh/core/engine_adapter/fabric.py index 259babe540..4112c5099d 100644 --- a/sqlmesh/core/engine_adapter/fabric.py +++ b/sqlmesh/core/engine_adapter/fabric.py @@ -76,7 +76,7 @@ def _normalize_catalog( return catalog_name def _catalog_state_label(self, catalog_name: t.Optional[str]) -> str: - return catalog_name or "" + return catalog_name or self._default_catalog or self._extra_config.get("database") or "" @property def api_client(self) -> FabricHttpClient: From 72116e95cfbb787cd6fced9da107d986591048c7 Mon Sep 17 00:00:00 2001 From: fresioAS Date: Wed, 29 Apr 2026 21:24:47 +0200 Subject: [PATCH 5/5] style --- sqlmesh/core/engine_adapter/fabric.py | 23 ++++++++++------------- tests/core/engine_adapter/test_fabric.py | 4 ++-- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/sqlmesh/core/engine_adapter/fabric.py b/sqlmesh/core/engine_adapter/fabric.py index 4112c5099d..77fc045068 100644 --- a/sqlmesh/core/engine_adapter/fabric.py +++ b/sqlmesh/core/engine_adapter/fabric.py @@ -61,22 +61,23 @@ def _connected_catalog(self) -> t.Optional[str]: def _connected_catalog(self, value: t.Optional[str]) -> None: self._connection_pool.set_attribute("connected_catalog", value) - def _normalize_catalog( - self, catalog_name: t.Optional[str] - ) -> t.Optional[str]: + def _normalize_catalog(self, catalog_name: t.Optional[str]) -> t.Optional[str]: if not catalog_name: return None - default_catalog = ( - self._default_catalog or self._extra_config.get("database") - ) + default_catalog = self._default_catalog or self._extra_config.get("database") if default_catalog and catalog_name == default_catalog: return None return catalog_name def _catalog_state_label(self, catalog_name: t.Optional[str]) -> str: - return catalog_name or self._default_catalog or self._extra_config.get("database") or "" + return ( + catalog_name + or self._default_catalog + or self._extra_config.get("database") + or "" + ) @property def api_client(self) -> FabricHttpClient: @@ -166,9 +167,7 @@ def set_current_catalog(self, catalog_name: t.Optional[str]) -> None: # No-op: the logical catalog state already matches. if self.get_current_catalog() == target_catalog: - logger.debug( - "Already using requested Fabric catalog state, no action needed" - ) + logger.debug("Already using requested Fabric catalog state, no action needed") return # Decide whether the open connection needs to be replaced. @@ -188,9 +187,7 @@ def set_current_catalog(self, catalog_name: t.Optional[str]) -> None: # restore-to-neutral left the connection on the right catalog, we # skip the close entirely. connected_catalog = self._normalize_catalog(self._connected_catalog) - needs_reconnect = ( - target_catalog is not None and connected_catalog != target_catalog - ) + needs_reconnect = target_catalog is not None and connected_catalog != target_catalog if needs_reconnect: logger.info( diff --git a/tests/core/engine_adapter/test_fabric.py b/tests/core/engine_adapter/test_fabric.py index 2ca0a9be01..b4aefde6ba 100644 --- a/tests/core/engine_adapter/test_fabric.py +++ b/tests/core/engine_adapter/test_fabric.py @@ -137,8 +137,8 @@ def test_switching_between_catalogs_closes_each_time( close_spy = mocker.spy(adapter._connection_pool, "close") adapter.cursor.fetchone.return_value = (1,) - adapter.table_exists("safran.db.table") # None→safran: 1 close - adapter.table_exists("planning.db.table") # safran→planning: 2nd close + adapter.table_exists("safran.db.table") # None→safran: 1 close + adapter.table_exists("planning.db.table") # safran→planning: 2nd close assert close_spy.call_count == 2 assert adapter._connected_catalog == "planning"