Coverage for providers/src/airflow/providers/teradata/operators/teradata_compute_cluster.py: 87%

205 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2024-12-27 08:27 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import re 

21from abc import abstractmethod 

22from enum import Enum 

23from functools import cached_property 

24from typing import TYPE_CHECKING 

25 

26from airflow.models import BaseOperator 

27from airflow.providers.teradata.hooks.teradata import TeradataHook 

28from airflow.providers.teradata.utils.constants import Constants 

29 

30if TYPE_CHECKING: 

31 from airflow.utils.context import Context 

32 

33from collections.abc import Sequence 

34from datetime import timedelta 

35from typing import TYPE_CHECKING, Any, cast 

36 

37from airflow.providers.teradata.triggers.teradata_compute_cluster import TeradataComputeClusterSyncTrigger 

38 

39if TYPE_CHECKING: 

40 from airflow.utils.context import Context 

41 

42from airflow.exceptions import AirflowException 

43 

44 

45# Represents 

# 1. Compute Cluster Setup - Provision and Decommission operations

47# 2. Compute Cluster State - Resume and Suspend operations 

48class _Operation(Enum): 

49 SETUP = 1 

50 STATE = 2 

51 

52 

53# Handler to handle single result set of a SQL query 

54def _single_result_row_handler(cursor): 

55 records = cursor.fetchone() 

56 if isinstance(records, list): 

57 return records[0] 

58 if records is None: 

59 return records 

60 raise TypeError(f"Unexpected results: {cursor.fetchone()!r}") 

61 

62 

# Tells whether the given operation is a setup operation or a state operation

def _determine_operation_context(operation):
    """Classify *operation* as a setup or a state operation.

    CREATE and DROP are setup operations; anything else (resume/suspend)
    is treated as a state operation.
    """
    setup_operations = (Constants.CC_CREATE_OPR, Constants.CC_DROP_OPR)
    return _Operation.SETUP if operation in setup_operations else _Operation.STATE

68 

69 

class _TeradataComputeClusterOperator(BaseOperator):
    """
    Teradata Compute Cluster Base Operator to set up and status operations of compute cluster.

    Concrete subclasses implement :meth:`execute`; the helpers here run SQL
    through the Teradata hook and defer to ``TeradataComputeClusterSyncTrigger``
    until the cluster reaches the requested state.

    :param compute_profile_name: Name of the Compute Profile to manage.
    :param compute_group_name: Name of compute group to which compute profile belongs.
    :param teradata_conn_id: The :ref:`Teradata connection id <howto/connection:teradata>`
        reference to a specific Teradata database.
    :param timeout: Time elapsed before the task times out and fails.
    """

    template_fields: Sequence[str] = (
        "compute_profile_name",
        "compute_group_name",
        "teradata_conn_id",
        "timeout",
    )

    ui_color = "#e07c24"

    def __init__(
        self,
        compute_profile_name: str,
        compute_group_name: str | None = None,
        teradata_conn_id: str = TeradataHook.default_conn_name,
        timeout: int = Constants.CC_OPR_TIME_OUT,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.compute_profile_name = compute_profile_name
        self.compute_group_name = compute_group_name
        self.teradata_conn_id = teradata_conn_id
        self.timeout = timeout

    @cached_property
    def hook(self) -> TeradataHook:
        # Built lazily (and cached) so the templated teradata_conn_id is
        # rendered before the hook is created.
        return TeradataHook(teradata_conn_id=self.teradata_conn_id)

    @abstractmethod
    def execute(self, context: Context):
        """Entry point implemented by each concrete compute-cluster operator."""
        pass

    def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
        """
        Execute when the trigger fires - returns immediately.

        Relies on trigger to throw an exception, otherwise it assumes execution was successful.
        """
        self._compute_cluster_execute(event)

    def _compute_cluster_execute(self):
        """Validate the profile name and verify the instance is a Vantage Cloud Lake one.

        :raises AirflowException: If the profile name is empty/None, the instance
            has no lake storage, or the database version cannot be determined.
        """
        # Verifies the provided compute profile name.
        if (
            self.compute_profile_name is None
            or self.compute_profile_name == "None"
            or self.compute_profile_name == ""
        ):
            self.log.info("Invalid compute cluster profile name")
            raise AirflowException(Constants.CC_OPR_EMPTY_PROFILE_ERROR_MSG)
        # Verifies if the provided Teradata instance belongs to Vantage Cloud Lake.
        lake_support_find_sql = "SELECT count(1) from DBC.StorageV WHERE StorageName='TD_OFSSTORAGE'"
        lake_support_result = self.hook.run(lake_support_find_sql, handler=_single_result_row_handler)
        if lake_support_result is None:
            raise AirflowException(Constants.CC_GRP_LAKE_SUPPORT_ONLY_MSG)
        # Getting teradata db version. Considering teradata instance is Lake when db version is 20 or above
        db_version_get_sql = "SELECT InfoData AS Version FROM DBC.DBCInfoV WHERE InfoKey = 'VERSION'"
        try:
            db_version_result = self.hook.run(db_version_get_sql, handler=_single_result_row_handler)
            if db_version_result is not None:
                db_version_result = str(db_version_result)
                # Major version is the first dotted component, e.g. "20.00.01" -> "20".
                db_version = db_version_result.split(".")[0]
                # NOTE(review): this AirflowException is caught by the broad
                # ``except`` below and re-raised with the generic version
                # message, so CC_GRP_LAKE_SUPPORT_ONLY_MSG never reaches the
                # caller from this branch — confirm whether that is intended.
                if db_version is not None and int(db_version) < 20:
                    raise AirflowException(Constants.CC_GRP_LAKE_SUPPORT_ONLY_MSG)
            else:
                raise AirflowException("Error occurred while getting teradata database version")
        except Exception as ex:
            self.log.error("Error occurred while getting teradata database version: %s ", str(ex))
            raise AirflowException("Error occurred while getting teradata database version")

    def _compute_cluster_execute_complete(self, event: dict[str, Any]) -> str | None:
        # Trigger reports its terminal status in the event payload: pass the
        # message through on success, surface it as an exception on error.
        if event["status"] == "success":
            return event["message"]
        elif event["status"] == "error":
            raise AirflowException(event["message"])

    def _handle_cc_status(self, operation_type, sql):
        """Run *sql*, then defer to the sync trigger to poll cluster status in the DB.

        :param operation_type: One of the ``Constants.CC_*_OPR`` operation names.
        :param sql: The compute-cluster DDL statement to execute.
        :return: Result of the executed statement; the task defers right after.
        """
        create_sql_result = self._hook_run(sql, handler=_single_result_row_handler)
        self.log.info(
            "%s query ran successfully. Differing to trigger to check status in db. Result from sql: %s",
            operation_type,
            create_sql_result,
        )
        # Hand off status polling to the triggerer; execute_complete resumes the task.
        self.defer(
            timeout=timedelta(minutes=self.timeout),
            trigger=TeradataComputeClusterSyncTrigger(
                teradata_conn_id=cast(str, self.teradata_conn_id),
                compute_profile_name=self.compute_profile_name,
                compute_group_name=self.compute_group_name,
                operation_type=operation_type,
                poll_interval=Constants.CC_POLL_INTERVAL,
            ),
            method_name="execute_complete",
        )

        return create_sql_result

    def _hook_run(self, query, handler=None):
        """Run *query* through the hook, logging and re-raising any failure."""
        try:
            if handler is not None:
                return self.hook.run(query, handler=handler)
            else:
                return self.hook.run(query)
        except Exception as ex:
            self.log.error(str(ex))
            raise

    def _get_initially_suspended(self, create_cp_query):
        """Extract the INITIALLY_SUSPENDED('TRUE'|'FALSE') flag from a CREATE statement.

        :param create_cp_query: The CREATE COMPUTE PROFILE statement to inspect.
        :return: ``"TRUE"`` or ``"FALSE"`` (the default when the clause is absent).
        """
        initially_suspended = "FALSE"
        pattern = r"INITIALLY_SUSPENDED\s*\(\s*'(TRUE|FALSE)'\s*\)"
        # Search for the pattern in the input string
        match = re.search(pattern, create_cp_query, re.IGNORECASE)
        if match:
            # Get the value of INITIALLY_SUSPENDED
            initially_suspended = match.group(1).strip().upper()
        return initially_suspended

195 

196 

class TeradataComputeClusterProvisionOperator(_TeradataComputeClusterOperator):
    """
    Creates the new Computer Cluster with specified Compute Group Name and Compute Profile Name.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:TeradataComputeClusterProvisionOperator`

    :param compute_profile_name: Name of the Compute Profile to manage.
    :param compute_group_name: Name of compute group to which compute profile belongs.
    :param query_strategy: Query strategy to use. Refers to the approach or method used by the
        Teradata Optimizer to execute SQL queries efficiently within a Teradata computer cluster.
        Valid query_strategy value is either 'STANDARD' or 'ANALYTIC'. Default at database level is STANDARD.
    :param compute_map: ComputeMapName of the compute map. The compute_map in a compute cluster profile refers
        to the mapping of compute resources to a specific node or set of nodes within the cluster.
    :param compute_attribute: Optional attributes of compute profile. Example compute attribute
        MIN_COMPUTE_COUNT(1) MAX_COMPUTE_COUNT(5) INITIALLY_SUSPENDED('FALSE')
    :param teradata_conn_id: The :ref:`Teradata connection id <howto/connection:teradata>`
        reference to a specific Teradata database.
    :param timeout: Time elapsed before the task times out and fails.
    """

    template_fields: Sequence[str] = (
        "compute_profile_name",
        "compute_group_name",
        "query_strategy",
        "compute_map",
        "compute_attribute",
        "teradata_conn_id",
        "timeout",
    )

    ui_color = "#e07c24"

    def __init__(
        self,
        query_strategy: str | None = None,
        compute_map: str | None = None,
        compute_attribute: str | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.query_strategy = query_strategy
        self.compute_map = compute_map
        self.compute_attribute = compute_attribute

    def _build_ccp_setup_query(self):
        """Assemble the CREATE COMPUTE PROFILE statement from the operator options."""
        create_cp_query = "CREATE COMPUTE PROFILE " + self.compute_profile_name
        if self.compute_group_name:
            create_cp_query = create_cp_query + " IN " + self.compute_group_name
        if self.compute_map is not None:
            create_cp_query = create_cp_query + ", INSTANCE = " + self.compute_map
        if self.query_strategy is not None:
            create_cp_query = create_cp_query + ", INSTANCE TYPE = " + self.query_strategy
        if self.compute_attribute is not None:
            create_cp_query = create_cp_query + " USING " + self.compute_attribute
        return create_cp_query

    def execute(self, context: Context):
        """
        Initiate the execution of CREATE COMPUTE SQL statement.

        Initiate the execution of the SQL statement for provisioning the compute cluster within Teradata Vantage
        Lake, effectively creates the compute cluster.
        Airflow runs this method on the worker and defers using the trigger.
        """
        super().execute(context)
        return self._compute_cluster_execute()

    def _compute_cluster_execute(self):
        """Create the compute group (if needed) and then the compute profile.

        If the profile already exists, its current status is returned without
        re-creating it; otherwise the CREATE statement runs and the task defers
        until the cluster reaches the expected state.
        """
        super()._compute_cluster_execute()
        if self.compute_group_name:
            self._create_compute_group_if_missing()
        cp_status_query = (
            "SEL ComputeProfileState FROM DBC.ComputeProfilesVX WHERE UPPER(ComputeProfileName) = UPPER('"
            + self.compute_profile_name
            + "')"
        )
        if self.compute_group_name:
            cp_status_query += " AND UPPER(ComputeGroupName) = UPPER('" + self.compute_group_name + "')"
        cp_status_result = self._hook_run(cp_status_query, handler=_single_result_row_handler)
        if cp_status_result is not None:
            cp_status_result = str(cp_status_result)
            # Bug fix: log message read "is already exists"; corrected grammar.
            msg = f"Compute Profile {self.compute_profile_name} already exists under Compute Group {self.compute_group_name}. Status is {cp_status_result}"
            self.log.info(msg)
            return cp_status_result
        create_cp_query = self._build_ccp_setup_query()
        operation = Constants.CC_CREATE_OPR
        # A profile created INITIALLY_SUSPENDED('TRUE') never reaches the running
        # state, so the trigger must wait for the suspended state instead.
        if self._get_initially_suspended(create_cp_query) == "TRUE":
            operation = Constants.CC_CREATE_SUSPEND_OPR
        return self._handle_cc_status(operation, create_cp_query)

    def _create_compute_group_if_missing(self):
        """Create the compute group when it is absent from DBC.ComputeGroups."""
        cg_status_query = (
            "SELECT count(1) FROM DBC.ComputeGroups WHERE UPPER(ComputeGroupName) = UPPER('"
            + self.compute_group_name
            + "')"
        )
        cg_status_result = self._hook_run(cg_status_query, _single_result_row_handler)
        if cg_status_result is not None:
            cg_status_result = str(cg_status_result)
        else:
            cg_status_result = 0
        if int(cg_status_result) == 0:
            create_cg_query = "CREATE COMPUTE GROUP " + self.compute_group_name
            if self.query_strategy is not None:
                create_cg_query = create_cg_query + " USING QUERY_STRATEGY ('" + self.query_strategy + "')"
            self._hook_run(create_cg_query, _single_result_row_handler)

307 

308 

class TeradataComputeClusterDecommissionOperator(_TeradataComputeClusterOperator):
    """
    Drops the compute cluster with specified Compute Group Name and Compute Profile Name.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:TeradataComputeClusterDecommissionOperator`

    :param compute_profile_name: Name of the Compute Profile to manage.
    :param compute_group_name: Name of compute group to which compute profile belongs.
    :param delete_compute_group: Indicates whether the compute group should be deleted.
        When set to True, it signals the system to remove the specified compute group.
        Conversely, when set to False, no action is taken on the compute group.
    :param teradata_conn_id: The :ref:`Teradata connection id <howto/connection:teradata>`
        reference to a specific Teradata database.
    :param timeout: Time elapsed before the task times out and fails.
    """

    template_fields: Sequence[str] = (
        "compute_profile_name",
        "compute_group_name",
        "delete_compute_group",
        "teradata_conn_id",
        "timeout",
    )

    ui_color = "#e07c24"

    def __init__(
        self,
        delete_compute_group: bool = False,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.delete_compute_group = delete_compute_group

    def execute(self, context: Context):
        """
        Initiate the execution of DROP COMPUTE SQL statement.

        Initiate the execution of the SQL statement for decommissioning the compute cluster within Teradata Vantage
        Lake, effectively drops the compute cluster.
        Airflow runs this method on the worker and defers using the trigger.
        """
        super().execute(context)
        return self._compute_cluster_execute()

    def _compute_cluster_execute(self):
        """Drop the compute profile and, when requested, its compute group.

        :raises AirflowException: If ``delete_compute_group`` is True but no
            compute group name was provided.
        """
        super()._compute_cluster_execute()
        # Bug fix: previously a missing group name with delete_compute_group=True
        # surfaced as a TypeError from `str + None`; fail with a clear error instead.
        if self.delete_compute_group and not self.compute_group_name:
            raise AirflowException("compute_group_name is required when delete_compute_group is True")
        cp_drop_query = "DROP COMPUTE PROFILE " + self.compute_profile_name
        if self.compute_group_name:
            cp_drop_query = cp_drop_query + " IN COMPUTE GROUP " + self.compute_group_name
        self._hook_run(cp_drop_query, handler=_single_result_row_handler)
        self.log.info(
            "Compute Profile %s IN Compute Group %s is successfully dropped",
            self.compute_profile_name,
            self.compute_group_name,
        )
        if self.delete_compute_group:
            cg_drop_query = "DROP COMPUTE GROUP " + self.compute_group_name
            self._hook_run(cg_drop_query, handler=_single_result_row_handler)
            self.log.info("Compute Group %s is successfully dropped", self.compute_group_name)

371 

372 

class TeradataComputeClusterResumeOperator(_TeradataComputeClusterOperator):
    """
    Teradata Compute Cluster Operator to Resume the specified Teradata Vantage Cloud Lake Compute Cluster.

    Issues the RESUME COMPUTE statement through the Teradata Vantage Lake
    Compute Cluster SQL interface, then defers until the cluster is running.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:TeradataComputeClusterResumeOperator`

    :param compute_profile_name: Name of the Compute Profile to manage.
    :param compute_group_name: Name of compute group to which compute profile belongs.
    :param teradata_conn_id: The :ref:`Teradata connection id <howto/connection:teradata>`
        reference to a specific Teradata database.
    :param timeout: Time elapsed before the task times out and fails. Time is in minutes.
    """

    template_fields: Sequence[str] = (
        "compute_profile_name",
        "compute_group_name",
        "teradata_conn_id",
        "timeout",
    )

    ui_color = "#e07c24"

    def __init__(
        self,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)

    def execute(self, context: Context):
        """
        Initiate the execution of RESUME COMPUTE SQL statement.

        Initiate the execution of the SQL statement for resuming the compute cluster within Teradata Vantage
        Lake, effectively resumes the compute cluster.
        Airflow runs this method on the worker and defers using the trigger.
        """
        super().execute(context)
        return self._compute_cluster_execute()

    def _compute_cluster_execute(self):
        """Resume the compute profile unless it is already in the resumed state."""
        super()._compute_cluster_execute()
        status_sql = (
            f"SEL ComputeProfileState FROM DBC.ComputeProfilesVX "
            f"WHERE UPPER(ComputeProfileName) = UPPER('{self.compute_profile_name}')"
        )
        if self.compute_group_name:
            status_sql += f" AND UPPER(ComputeGroupName) = UPPER('{self.compute_group_name}')"
        current_state = self._hook_run(status_sql, handler=_single_result_row_handler)
        # No row means the profile/group combination does not exist - treat as error.
        if current_state is None:
            self.log.info(Constants.CC_GRP_PRP_NON_EXISTS_MSG)
            raise AirflowException(Constants.CC_GRP_PRP_NON_EXISTS_MSG)
        if str(current_state) == Constants.CC_RESUME_DB_STATUS:
            # Nothing to do: the cluster is already running.
            self.log.info(
                "Compute Cluster %s already %s", self.compute_profile_name, Constants.CC_RESUME_DB_STATUS
            )
            return None
        resume_sql = f"RESUME COMPUTE FOR COMPUTE PROFILE {self.compute_profile_name}"
        if self.compute_group_name:
            resume_sql = f"{resume_sql} IN COMPUTE GROUP {self.compute_group_name}"
        return self._handle_cc_status(Constants.CC_RESUME_OPR, resume_sql)

444 

class TeradataComputeClusterSuspendOperator(_TeradataComputeClusterOperator):
    """
    Teradata Compute Cluster Operator to suspend the specified Teradata Vantage Cloud Lake Compute Cluster.

    Issues the SUSPEND COMPUTE statement through the Teradata Vantage Lake
    Compute Cluster SQL interface, then defers until the cluster is suspended.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:TeradataComputeClusterSuspendOperator`

    :param compute_profile_name: Name of the Compute Profile to manage.
    :param compute_group_name: Name of compute group to which compute profile belongs.
    :param teradata_conn_id: The :ref:`Teradata connection id <howto/connection:teradata>`
        reference to a specific Teradata database.
    :param timeout: Time elapsed before the task times out and fails.
    """

    template_fields: Sequence[str] = (
        "compute_profile_name",
        "compute_group_name",
        "teradata_conn_id",
        "timeout",
    )

    ui_color = "#e07c24"

    def __init__(
        self,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)

    def execute(self, context: Context):
        """
        Initiate the execution of SUSPEND COMPUTE SQL statement.

        Initiate the execution of the SQL statement for suspending the compute cluster within Teradata Vantage
        Lake, effectively suspends the compute cluster.
        Airflow runs this method on the worker and defers using the trigger.
        """
        super().execute(context)
        return self._compute_cluster_execute()

    def _compute_cluster_execute(self):
        """Suspend the compute profile unless it is already suspended."""
        super()._compute_cluster_execute()
        state_query = (
            f"SEL ComputeProfileState FROM DBC.ComputeProfilesVX "
            f"WHERE UPPER(ComputeProfileName) = UPPER('{self.compute_profile_name}')"
        )
        if self.compute_group_name:
            state_query += f" AND UPPER(ComputeGroupName) = UPPER('{self.compute_group_name}')"
        profile_state = self._hook_run(state_query, handler=_single_result_row_handler)
        # An empty result set means the requested profile/group pair does not exist.
        if profile_state is None:
            self.log.info(Constants.CC_GRP_PRP_NON_EXISTS_MSG)
            raise AirflowException(Constants.CC_GRP_PRP_NON_EXISTS_MSG)
        if str(profile_state) == Constants.CC_SUSPEND_DB_STATUS:
            # Already suspended - nothing to submit.
            self.log.info(
                "Compute Cluster %s already %s", self.compute_profile_name, Constants.CC_SUSPEND_DB_STATUS
            )
            return None
        suspend_sql = f"SUSPEND COMPUTE FOR COMPUTE PROFILE {self.compute_profile_name}"
        if self.compute_group_name:
            suspend_sql = f"{suspend_sql} IN COMPUTE GROUP {self.compute_group_name}"
        return self._handle_cc_status(Constants.CC_SUSPEND_OPR, suspend_sql)