Coverage for providers/src/airflow/providers/teradata/operators/teradata_compute_cluster.py: 87% (205 statements)

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import re
from abc import abstractmethod
from enum import Enum
from functools import cached_property
from typing import TYPE_CHECKING

from airflow.models import BaseOperator
from airflow.providers.teradata.hooks.teradata import TeradataHook
from airflow.providers.teradata.utils.constants import Constants

if TYPE_CHECKING:
    from airflow.utils.context import Context

from collections.abc import Sequence
from datetime import timedelta
from typing import TYPE_CHECKING, Any, cast

from airflow.providers.teradata.triggers.teradata_compute_cluster import TeradataComputeClusterSyncTrigger

if TYPE_CHECKING:
    from airflow.utils.context import Context

from airflow.exceptions import AirflowException


# Represents
# 1. Compute Cluster Setup - Provision and Decommission operations
# 2. Compute Cluster State - Resume and Suspend operations
class _Operation(Enum):
    SETUP = 1
    STATE = 2


# Handler to process the single-row result set of a SQL query
def _single_result_row_handler(cursor):
    records = cursor.fetchone()
    if isinstance(records, list):
        return records[0]
    if records is None:
        return records
    raise TypeError(f"Unexpected results: {records!r}")
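
# Illustrative usage of the handler (mirrors how it is passed to hook.run later in this module):
#     hook.run("SELECT count(1) FROM DBC.ComputeGroups", handler=_single_result_row_handler)
# returns the first column of the single result row, or None when the query returns no rows.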


# Determines whether the given operation is a setup or a state operation
def _determine_operation_context(operation):
    if operation == Constants.CC_CREATE_OPR or operation == Constants.CC_DROP_OPR:
        return _Operation.SETUP
    return _Operation.STATE


class _TeradataComputeClusterOperator(BaseOperator):
    """
    Teradata Compute Cluster base operator for compute cluster setup and state operations.

    :param compute_profile_name: Name of the Compute Profile to manage.
    :param compute_group_name: Name of the compute group to which the compute profile belongs.
    :param teradata_conn_id: The :ref:`Teradata connection id <howto/connection:teradata>`
        reference to a specific Teradata database.
    :param timeout: Time elapsed before the task times out and fails.
    """

    template_fields: Sequence[str] = (
        "compute_profile_name",
        "compute_group_name",
        "teradata_conn_id",
        "timeout",
    )

    ui_color = "#e07c24"

    def __init__(
        self,
        compute_profile_name: str,
        compute_group_name: str | None = None,
        teradata_conn_id: str = TeradataHook.default_conn_name,
        timeout: int = Constants.CC_OPR_TIME_OUT,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.compute_profile_name = compute_profile_name
        self.compute_group_name = compute_group_name
        self.teradata_conn_id = teradata_conn_id
        self.timeout = timeout

    @cached_property
    def hook(self) -> TeradataHook:
        return TeradataHook(teradata_conn_id=self.teradata_conn_id)

    @abstractmethod
    def execute(self, context: Context):
        pass

    def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
        """
        Execute when the trigger fires - returns immediately.

        Relies on trigger to throw an exception, otherwise it assumes execution was successful.
        """
        self._compute_cluster_execute_complete(event)

    def _compute_cluster_execute(self):
        # Verifies the provided compute profile name.
        if (
            self.compute_profile_name is None
            or self.compute_profile_name == "None"
            or self.compute_profile_name == ""
        ):
            self.log.info("Invalid compute cluster profile name")
            raise AirflowException(Constants.CC_OPR_EMPTY_PROFILE_ERROR_MSG)
        # Verifies whether the provided Teradata instance belongs to Vantage Cloud Lake.
        lake_support_find_sql = "SELECT count(1) from DBC.StorageV WHERE StorageName='TD_OFSSTORAGE'"
        lake_support_result = self.hook.run(lake_support_find_sql, handler=_single_result_row_handler)
        if lake_support_result is None:
            raise AirflowException(Constants.CC_GRP_LAKE_SUPPORT_ONLY_MSG)
        # Get the Teradata database version. The instance is considered Lake when the version is 20 or above.
        db_version_get_sql = "SELECT InfoData AS Version FROM DBC.DBCInfoV WHERE InfoKey = 'VERSION'"
        try:
            db_version_result = self.hook.run(db_version_get_sql, handler=_single_result_row_handler)
            if db_version_result is not None:
                db_version_result = str(db_version_result)
                db_version = db_version_result.split(".")[0]
                if db_version is not None and int(db_version) < 20:
                    raise AirflowException(Constants.CC_GRP_LAKE_SUPPORT_ONLY_MSG)
            else:
                raise AirflowException("Error occurred while getting teradata database version")
        except Exception as ex:
            self.log.error("Error occurred while getting teradata database version: %s ", str(ex))
            raise AirflowException("Error occurred while getting teradata database version")

    def _compute_cluster_execute_complete(self, event: dict[str, Any]) -> None:
        if event["status"] == "success":
            return event["message"]
        elif event["status"] == "error":
            raise AirflowException(event["message"])

    def _handle_cc_status(self, operation_type, sql):
        create_sql_result = self._hook_run(sql, handler=_single_result_row_handler)
        self.log.info(
            "%s query ran successfully. Deferring to trigger to check status in db. Result from sql: %s",
            operation_type,
            create_sql_result,
        )
        self.defer(
            timeout=timedelta(minutes=self.timeout),
            trigger=TeradataComputeClusterSyncTrigger(
                teradata_conn_id=cast(str, self.teradata_conn_id),
                compute_profile_name=self.compute_profile_name,
                compute_group_name=self.compute_group_name,
                operation_type=operation_type,
                poll_interval=Constants.CC_POLL_INTERVAL,
            ),
            method_name="execute_complete",
        )

        return create_sql_result
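
    # Illustrative note on the deferral handshake (shape inferred from the handlers above, not an
    # exhaustive contract): TeradataComputeClusterSyncTrigger is expected to fire an event such as
    #     {"status": "success", "message": "..."}   or   {"status": "error", "message": "..."}
    # which Airflow passes to execute_complete / _compute_cluster_execute_complete.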

    def _hook_run(self, query, handler=None):
        try:
            if handler is not None:
                return self.hook.run(query, handler=handler)
            else:
                return self.hook.run(query)
        except Exception as ex:
            self.log.error(str(ex))
            raise

    def _get_initially_suspended(self, create_cp_query):
        initially_suspended = "FALSE"
        pattern = r"INITIALLY_SUSPENDED\s*\(\s*'(TRUE|FALSE)'\s*\)"
        # Search for the pattern in the input string
        match = re.search(pattern, create_cp_query, re.IGNORECASE)
        if match:
            # Get the value of INITIALLY_SUSPENDED
            initially_suspended = match.group(1).strip().upper()
        return initially_suspended
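
# Illustrative example of the INITIALLY_SUSPENDED extraction above (hypothetical query text):
# for "CREATE COMPUTE PROFILE my_profile USING MIN_COMPUTE_COUNT(1) INITIALLY_SUSPENDED('TRUE')"
# the regex captures "TRUE", so provisioning is treated as a create-and-suspend operation;
# when the attribute is absent, the default "FALSE" is returned.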


class TeradataComputeClusterProvisionOperator(_TeradataComputeClusterOperator):
    """
    Creates a new Compute Cluster with the specified Compute Group Name and Compute Profile Name.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:TeradataComputeClusterProvisionOperator`

    :param compute_profile_name: Name of the Compute Profile to manage.
    :param compute_group_name: Name of the compute group to which the compute profile belongs.
    :param query_strategy: Query strategy to use. Refers to the approach or method used by the
        Teradata Optimizer to execute SQL queries efficiently within a Teradata compute cluster.
        Valid query_strategy values are 'STANDARD' and 'ANALYTIC'. The default at database level is STANDARD.
    :param compute_map: ComputeMapName of the compute map. The compute_map in a compute cluster profile refers
        to the mapping of compute resources to a specific node or set of nodes within the cluster.
    :param compute_attribute: Optional attributes of the compute profile. Example compute attribute:
        MIN_COMPUTE_COUNT(1) MAX_COMPUTE_COUNT(5) INITIALLY_SUSPENDED('FALSE')
    :param teradata_conn_id: The :ref:`Teradata connection id <howto/connection:teradata>`
        reference to a specific Teradata database.
    :param timeout: Time elapsed before the task times out and fails.
    """

    template_fields: Sequence[str] = (
        "compute_profile_name",
        "compute_group_name",
        "query_strategy",
        "compute_map",
        "compute_attribute",
        "teradata_conn_id",
        "timeout",
    )

    ui_color = "#e07c24"

    def __init__(
        self,
        query_strategy: str | None = None,
        compute_map: str | None = None,
        compute_attribute: str | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.query_strategy = query_strategy
        self.compute_map = compute_map
        self.compute_attribute = compute_attribute

    def _build_ccp_setup_query(self):
        create_cp_query = "CREATE COMPUTE PROFILE " + self.compute_profile_name
        if self.compute_group_name:
            create_cp_query = create_cp_query + " IN " + self.compute_group_name
        if self.compute_map is not None:
            create_cp_query = create_cp_query + ", INSTANCE = " + self.compute_map
        if self.query_strategy is not None:
            create_cp_query = create_cp_query + ", INSTANCE TYPE = " + self.query_strategy
        if self.compute_attribute is not None:
            create_cp_query = create_cp_query + " USING " + self.compute_attribute
        return create_cp_query
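
    # Illustrative example of the SQL assembled above, with hypothetical values
    # compute_profile_name="my_profile", compute_group_name="my_group",
    # compute_map="TD_COMPUTE_MEDIUM", query_strategy="STANDARD",
    # compute_attribute="MIN_COMPUTE_COUNT(1) MAX_COMPUTE_COUNT(5)":
    #     CREATE COMPUTE PROFILE my_profile IN my_group, INSTANCE = TD_COMPUTE_MEDIUM,
    #     INSTANCE TYPE = STANDARD USING MIN_COMPUTE_COUNT(1) MAX_COMPUTE_COUNT(5)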

    def execute(self, context: Context):
        """
        Initiate the execution of the CREATE COMPUTE SQL statement.

        Initiate the execution of the SQL statement for provisioning the compute cluster within Teradata Vantage
        Lake, effectively creating the compute cluster.
        Airflow runs this method on the worker and defers using the trigger.
        """
        super().execute(context)
        return self._compute_cluster_execute()

    def _compute_cluster_execute(self):
        super()._compute_cluster_execute()
        if self.compute_group_name:
            cg_status_query = (
                "SELECT count(1) FROM DBC.ComputeGroups WHERE UPPER(ComputeGroupName) = UPPER('"
                + self.compute_group_name
                + "')"
            )
            cg_status_result = self._hook_run(cg_status_query, _single_result_row_handler)
            if cg_status_result is not None:
                cg_status_result = str(cg_status_result)
            else:
                cg_status_result = 0
            if int(cg_status_result) == 0:
                create_cg_query = "CREATE COMPUTE GROUP " + self.compute_group_name
                if self.query_strategy is not None:
                    create_cg_query = (
                        create_cg_query + " USING QUERY_STRATEGY ('" + self.query_strategy + "')"
                    )
                self._hook_run(create_cg_query, _single_result_row_handler)
        cp_status_query = (
            "SEL ComputeProfileState FROM DBC.ComputeProfilesVX WHERE UPPER(ComputeProfileName) = UPPER('"
            + self.compute_profile_name
            + "')"
        )
        if self.compute_group_name:
            cp_status_query += " AND UPPER(ComputeGroupName) = UPPER('" + self.compute_group_name + "')"
        cp_status_result = self._hook_run(cp_status_query, handler=_single_result_row_handler)
        if cp_status_result is not None:
            cp_status_result = str(cp_status_result)
            msg = f"Compute Profile {self.compute_profile_name} already exists under Compute Group {self.compute_group_name}. Status is {cp_status_result}"
            self.log.info(msg)
            return cp_status_result
        else:
            create_cp_query = self._build_ccp_setup_query()
            operation = Constants.CC_CREATE_OPR
            initially_suspended = self._get_initially_suspended(create_cp_query)
            if initially_suspended == "TRUE":
                operation = Constants.CC_CREATE_SUSPEND_OPR
            return self._handle_cc_status(operation, create_cp_query)


class TeradataComputeClusterDecommissionOperator(_TeradataComputeClusterOperator):
    """
    Drops the compute cluster with the specified Compute Group Name and Compute Profile Name.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:TeradataComputeClusterDecommissionOperator`

    :param compute_profile_name: Name of the Compute Profile to manage.
    :param compute_group_name: Name of the compute group to which the compute profile belongs.
    :param delete_compute_group: Indicates whether the compute group should be deleted.
        When set to True, it signals the system to remove the specified compute group.
        Conversely, when set to False, no action is taken on the compute group.
    :param teradata_conn_id: The :ref:`Teradata connection id <howto/connection:teradata>`
        reference to a specific Teradata database.
    :param timeout: Time elapsed before the task times out and fails.
    """

    template_fields: Sequence[str] = (
        "compute_profile_name",
        "compute_group_name",
        "delete_compute_group",
        "teradata_conn_id",
        "timeout",
    )

    ui_color = "#e07c24"

    def __init__(
        self,
        delete_compute_group: bool = False,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.delete_compute_group = delete_compute_group

    def execute(self, context: Context):
        """
        Initiate the execution of the DROP COMPUTE SQL statement.

        Initiate the execution of the SQL statement for decommissioning the compute cluster within Teradata Vantage
        Lake, effectively dropping the compute cluster.
        Airflow runs this method on the worker and defers using the trigger.
        """
        super().execute(context)
        return self._compute_cluster_execute()

    def _compute_cluster_execute(self):
        super()._compute_cluster_execute()
        cp_drop_query = "DROP COMPUTE PROFILE " + self.compute_profile_name
        if self.compute_group_name:
            cp_drop_query = cp_drop_query + " IN COMPUTE GROUP " + self.compute_group_name
        self._hook_run(cp_drop_query, handler=_single_result_row_handler)
        self.log.info(
            "Compute Profile %s IN Compute Group %s is successfully dropped",
            self.compute_profile_name,
            self.compute_group_name,
        )
        if self.delete_compute_group:
            cg_drop_query = "DROP COMPUTE GROUP " + self.compute_group_name
            self._hook_run(cg_drop_query, handler=_single_result_row_handler)
            self.log.info("Compute Group %s is successfully dropped", self.compute_group_name)
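
# Illustrative example of the SQL issued above (hypothetical names): with compute_profile_name="my_profile",
# compute_group_name="my_group" and delete_compute_group=True, the operator runs
#     DROP COMPUTE PROFILE my_profile IN COMPUTE GROUP my_group
# followed by
#     DROP COMPUTE GROUP my_group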


class TeradataComputeClusterResumeOperator(_TeradataComputeClusterOperator):
    """
    Teradata Compute Cluster Operator to resume the specified Teradata Vantage Cloud Lake Compute Cluster.

    Resumes the Teradata Vantage Lake Compute Cluster by employing the RESUME SQL statement within the
    Teradata Vantage Lake Compute Cluster SQL Interface.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:TeradataComputeClusterResumeOperator`

    :param compute_profile_name: Name of the Compute Profile to manage.
    :param compute_group_name: Name of the compute group to which the compute profile belongs.
    :param teradata_conn_id: The :ref:`Teradata connection id <howto/connection:teradata>`
        reference to a specific Teradata database.
    :param timeout: Time elapsed before the task times out and fails. Time is in minutes.
    """

    template_fields: Sequence[str] = (
        "compute_profile_name",
        "compute_group_name",
        "teradata_conn_id",
        "timeout",
    )

    ui_color = "#e07c24"

    def __init__(
        self,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)

    def execute(self, context: Context):
        """
        Initiate the execution of the RESUME COMPUTE SQL statement.

        Initiate the execution of the SQL statement for resuming the compute cluster within Teradata Vantage
        Lake, effectively resuming the compute cluster.
        Airflow runs this method on the worker and defers using the trigger.
        """
        super().execute(context)
        return self._compute_cluster_execute()

    def _compute_cluster_execute(self):
        super()._compute_cluster_execute()
        cc_status_query = (
            "SEL ComputeProfileState FROM DBC.ComputeProfilesVX WHERE UPPER(ComputeProfileName) = UPPER('"
            + self.compute_profile_name
            + "')"
        )
        if self.compute_group_name:
            cc_status_query += " AND UPPER(ComputeGroupName) = UPPER('" + self.compute_group_name + "')"
        cc_status_result = self._hook_run(cc_status_query, handler=_single_result_row_handler)
        if cc_status_result is not None:
            cp_status_result = str(cc_status_result)
        # Generates an error message if the compute cluster does not exist for the specified
        # compute profile and compute group.
        else:
            self.log.info(Constants.CC_GRP_PRP_NON_EXISTS_MSG)
            raise AirflowException(Constants.CC_GRP_PRP_NON_EXISTS_MSG)
        if cp_status_result != Constants.CC_RESUME_DB_STATUS:
            cp_resume_query = f"RESUME COMPUTE FOR COMPUTE PROFILE {self.compute_profile_name}"
            if self.compute_group_name:
                cp_resume_query = f"{cp_resume_query} IN COMPUTE GROUP {self.compute_group_name}"
            return self._handle_cc_status(Constants.CC_RESUME_OPR, cp_resume_query)
        else:
            self.log.info(
                "Compute Cluster %s already %s", self.compute_profile_name, Constants.CC_RESUME_DB_STATUS
            )


class TeradataComputeClusterSuspendOperator(_TeradataComputeClusterOperator):
    """
    Teradata Compute Cluster Operator to suspend the specified Teradata Vantage Cloud Lake Compute Cluster.

    Suspends the Teradata Vantage Lake Compute Cluster by employing the SUSPEND SQL statement within the
    Teradata Vantage Lake Compute Cluster SQL Interface.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:TeradataComputeClusterSuspendOperator`

    :param compute_profile_name: Name of the Compute Profile to manage.
    :param compute_group_name: Name of the compute group to which the compute profile belongs.
    :param teradata_conn_id: The :ref:`Teradata connection id <howto/connection:teradata>`
        reference to a specific Teradata database.
    :param timeout: Time elapsed before the task times out and fails.
    """

    template_fields: Sequence[str] = (
        "compute_profile_name",
        "compute_group_name",
        "teradata_conn_id",
        "timeout",
    )

    ui_color = "#e07c24"

    def __init__(
        self,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)

    def execute(self, context: Context):
        """
        Initiate the execution of the SUSPEND COMPUTE SQL statement.

        Initiate the execution of the SQL statement for suspending the compute cluster within Teradata Vantage
        Lake, effectively suspending the compute cluster.
        Airflow runs this method on the worker and defers using the trigger.
        """
        super().execute(context)
        return self._compute_cluster_execute()

    def _compute_cluster_execute(self):
        super()._compute_cluster_execute()
        sql = (
            "SEL ComputeProfileState FROM DBC.ComputeProfilesVX WHERE UPPER(ComputeProfileName) = UPPER('"
            + self.compute_profile_name
            + "')"
        )
        if self.compute_group_name:
            sql += " AND UPPER(ComputeGroupName) = UPPER('" + self.compute_group_name + "')"
        result = self._hook_run(sql, handler=_single_result_row_handler)
        if result is not None:
            result = str(result)
        # Generates an error message if the compute cluster does not exist for the specified
        # compute profile and compute group.
        else:
            self.log.info(Constants.CC_GRP_PRP_NON_EXISTS_MSG)
            raise AirflowException(Constants.CC_GRP_PRP_NON_EXISTS_MSG)
        if result != Constants.CC_SUSPEND_DB_STATUS:
            sql = f"SUSPEND COMPUTE FOR COMPUTE PROFILE {self.compute_profile_name}"
            if self.compute_group_name:
                sql = f"{sql} IN COMPUTE GROUP {self.compute_group_name}"
            return self._handle_cc_status(Constants.CC_SUSPEND_OPR, sql)
        else:
            self.log.info(
                "Compute Cluster %s already %s", self.compute_profile_name, Constants.CC_SUSPEND_DB_STATUS
            )
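

# Illustrative lifecycle sketch (hypothetical task IDs, profile/group names, and connection ID) chaining
# the operators defined above inside a DAG; see the provider guide for the authoritative examples:
#     provision = TeradataComputeClusterProvisionOperator(
#         task_id="provision", compute_profile_name="my_profile", compute_group_name="my_group",
#         teradata_conn_id="teradata_lake",
#     )
#     suspend = TeradataComputeClusterSuspendOperator(
#         task_id="suspend", compute_profile_name="my_profile", compute_group_name="my_group",
#         teradata_conn_id="teradata_lake",
#     )
#     resume = TeradataComputeClusterResumeOperator(
#         task_id="resume", compute_profile_name="my_profile", compute_group_name="my_group",
#         teradata_conn_id="teradata_lake",
#     )
#     decommission = TeradataComputeClusterDecommissionOperator(
#         task_id="decommission", compute_profile_name="my_profile", compute_group_name="my_group",
#         delete_compute_group=True, teradata_conn_id="teradata_lake",
#     )
#     provision >> suspend >> resume >> decommission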