Coverage for providers/src/airflow/providers/teradata/transfers/teradata_to_teradata.py: 94%

41 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2024-12-27 08:27 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20from collections.abc import Sequence 

21from functools import cached_property 

22from typing import TYPE_CHECKING 

23 

24from airflow.models import BaseOperator 

25from airflow.providers.teradata.hooks.teradata import TeradataHook 

26 

27if TYPE_CHECKING: 

28 from airflow.utils.context import Context 

29 

30 

class TeradataToTeradataOperator(BaseOperator):
    """
    Moves data from Teradata source database to Teradata destination database.

    The ``sql`` query is executed on the source connection and its result set is
    read in chunks of ``rows_chunk`` rows; every chunk is passed to the
    destination hook's ``insert_rows`` for ``destination_table``.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:TeradataToTeradataOperator`

    :param dest_teradata_conn_id: destination Teradata connection.
    :param destination_table: destination table to insert rows.
    :param source_teradata_conn_id: :ref:`Source Teradata connection <howto/connection:Teradata>`.
    :param sql: SQL query to execute against the source Teradata database
    :param sql_params: Parameters to use in sql query.
    :param rows_chunk: number of rows per chunk to commit.
    """

    # Operator attributes that are declared as templated, the file extension
    # associated with templates, and the renderer name used for each field.
    template_fields: Sequence[str] = (
        "sql",
        "sql_params",
    )
    template_ext: Sequence[str] = (".sql",)
    template_fields_renderers = {"sql": "sql", "sql_params": "py"}
    ui_color = "#e07c24"

    def __init__(
        self,
        *,
        dest_teradata_conn_id: str,
        destination_table: str,
        source_teradata_conn_id: str,
        sql: str,
        sql_params: dict | None = None,
        rows_chunk: int = 5000,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.dest_teradata_conn_id = dest_teradata_conn_id
        self.destination_table = destination_table
        self.source_teradata_conn_id = source_teradata_conn_id
        self.sql = sql
        # A fresh dict per instance; never share a mutable default between operators.
        self.sql_params = {} if sql_params is None else sql_params
        self.rows_chunk = rows_chunk

    @cached_property
    def src_hook(self) -> TeradataHook:
        """Return the hook built from ``source_teradata_conn_id`` (created once, then cached)."""
        return TeradataHook(teradata_conn_id=self.source_teradata_conn_id)

    @cached_property
    def dest_hook(self) -> TeradataHook:
        """Return the hook built from ``dest_teradata_conn_id`` (created once, then cached)."""
        return TeradataHook(teradata_conn_id=self.dest_teradata_conn_id)

    def execute(self, context: Context) -> None:
        """
        Run ``sql`` on the source connection and insert the result into ``destination_table``.

        Rows are fetched ``rows_chunk`` at a time and forwarded to
        ``dest_hook.insert_rows`` with ``commit_every=rows_chunk``. The total
        number of transferred rows is logged once the result set is exhausted.

        :param context: task execution context (not used by this method).
        """
        src_hook = self.src_hook
        dest_hook = self.dest_hook
        with src_hook.get_conn() as src_conn:
            cursor = src_conn.cursor()
            try:
                cursor.execute(self.sql, self.sql_params)
                # DB-API 2.0: ``description`` is None when the statement produced
                # no result set; treat that the same as "no columns".
                target_fields = [field[0] for field in cursor.description or ()]
                rows_total = 0
                if target_fields:
                    # Stop on any empty batch ([], (), None) instead of relying on
                    # equality with a ``[]`` sentinel.
                    while rows := cursor.fetchmany(self.rows_chunk):
                        dest_hook.insert_rows(
                            self.destination_table,
                            rows,
                            target_fields=target_fields,
                            commit_every=self.rows_chunk,
                        )
                        rows_total += len(rows)
                self.log.info("Finished data transfer. Total number of rows transferred - %s", rows_total)
            finally:
                # Release the cursor even when the query or an insert fails.
                cursor.close()