Coverage for providers/src/airflow/providers/teradata/operators/teradata.py: 90%

28 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2024-12-27 08:27 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20from collections.abc import Sequence 

21from typing import TYPE_CHECKING, ClassVar 

22 

23from airflow.models import BaseOperator 

24from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator 

25from airflow.providers.teradata.hooks.teradata import TeradataHook 

26 

27if TYPE_CHECKING: 

28 from airflow.utils.context import Context 

29 

30 

31class TeradataOperator(SQLExecuteQueryOperator): 

32 """ 

33 General Teradata Operator to execute queries on Teradata Database. 

34 

35 Executes sql statements in the Teradata SQL Database using Teradata Python SQL Driver 

36 

37 .. seealso:: 

38 For more information on how to use this operator, take a look at the guide: 

39 :ref:`howto/operator:TeradataOperator` 

40 

41 :param sql: the SQL query to be executed as a single string, or a list of str (sql statements) 

42 :param teradata_conn_id: reference to a predefined database 

43 :param autocommit: if True, each command is automatically committed.(default value: False) 

44 :param parameters: (optional) the parameters to render the SQL query with. 

45 :param schema: The Teradata database to connect to. 

46 """ 

47 

48 template_fields: Sequence[str] = ( 

49 "sql", 

50 "parameters", 

51 ) 

52 template_ext: Sequence[str] = (".sql",) 

53 template_fields_renderers: ClassVar[dict] = {"sql": "sql"} 

54 ui_color = "#e07c24" 

55 

56 def __init__( 

57 self, 

58 teradata_conn_id: str = TeradataHook.default_conn_name, 

59 schema: str | None = None, 

60 **kwargs, 

61 ) -> None: 

62 if schema: 62 ↛ 63line 62 didn't jump to line 63 because the condition on line 62 was never true

63 hook_params = kwargs.pop("hook_params", {}) 

64 kwargs["hook_params"] = { 

65 "database": schema, 

66 **hook_params, 

67 } 

68 super().__init__(**kwargs) 

69 self.conn_id = teradata_conn_id 

70 

71 

72class TeradataStoredProcedureOperator(BaseOperator): 

73 """ 

74 Executes stored procedure in a specific Teradata database. 

75 

76 :param procedure: name of stored procedure to call (templated) 

77 :param teradata_conn_id: The :ref:`Teradata connection id <howto/connection:teradata>` 

78 reference to a specific Teradata database. 

79 :param parameters: (optional, templated) the parameters provided in the call 

80 

81 """ 

82 

83 template_fields: Sequence[str] = ( 

84 "procedure", 

85 "parameters", 

86 ) 

87 ui_color = "#ededed" 

88 

89 def __init__( 

90 self, 

91 *, 

92 procedure: str, 

93 teradata_conn_id: str = TeradataHook.default_conn_name, 

94 parameters: dict | list | None = None, 

95 **kwargs, 

96 ) -> None: 

97 super().__init__(**kwargs) 

98 self.teradata_conn_id = teradata_conn_id 

99 self.procedure = procedure 

100 self.parameters = parameters 

101 

102 def execute(self, context: Context): 

103 hook = TeradataHook(teradata_conn_id=self.teradata_conn_id) 

104 return hook.callproc(self.procedure, autocommit=True, parameters=self.parameters)