Coverage for providers/src/airflow/providers/teradata/transfers/teradata_to_teradata.py: 94%
41 statements
« prev ^ index » next coverage.py v7.6.10, created at 2024-12-27 08:27 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2024-12-27 08:27 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20from collections.abc import Sequence
21from functools import cached_property
22from typing import TYPE_CHECKING
24from airflow.models import BaseOperator
25from airflow.providers.teradata.hooks.teradata import TeradataHook
27if TYPE_CHECKING:
28 from airflow.utils.context import Context
class TeradataToTeradataOperator(BaseOperator):
    """
    Moves data from Teradata source database to Teradata destination database.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:TeradataToTeradataOperator`

    :param dest_teradata_conn_id: destination Teradata connection.
    :param destination_table: destination table to insert rows.
    :param source_teradata_conn_id: :ref:`Source Teradata connection <howto/connection:Teradata>`.
    :param sql: SQL query to execute against the source Teradata database
    :param sql_params: Parameters to use in sql query.
    :param rows_chunk: number of rows per chunk to commit.
    """

    template_fields: Sequence[str] = (
        "sql",
        "sql_params",
    )
    template_ext: Sequence[str] = (".sql",)
    template_fields_renderers = {"sql": "sql", "sql_params": "py"}
    ui_color = "#e07c24"

    def __init__(
        self,
        *,
        dest_teradata_conn_id: str,
        destination_table: str,
        source_teradata_conn_id: str,
        sql: str,
        sql_params: dict | None = None,
        rows_chunk: int = 5000,
        **kwargs,
    ) -> None:
        """Store the transfer settings; ``sql_params`` falls back to an empty dict when omitted."""
        super().__init__(**kwargs)
        if sql_params is None:
            # A fresh dict per instance avoids sharing a mutable default between operators.
            sql_params = {}
        self.dest_teradata_conn_id = dest_teradata_conn_id
        self.destination_table = destination_table
        self.source_teradata_conn_id = source_teradata_conn_id
        self.sql = sql
        self.sql_params = sql_params
        self.rows_chunk = rows_chunk

    @cached_property
    def src_hook(self) -> TeradataHook:
        """Hook for ``source_teradata_conn_id``, created on first access and then cached."""
        return TeradataHook(teradata_conn_id=self.source_teradata_conn_id)

    @cached_property
    def dest_hook(self) -> TeradataHook:
        """Hook for ``dest_teradata_conn_id``, created on first access and then cached."""
        return TeradataHook(teradata_conn_id=self.dest_teradata_conn_id)

    def execute(self, context: Context) -> None:
        """
        Run ``sql`` on the source connection and insert the result into ``destination_table``.

        Rows are fetched from the source cursor in batches of ``rows_chunk`` and each batch
        is handed to the destination hook's ``insert_rows`` with ``commit_every=rows_chunk``.
        The total number of transferred rows is logged once the cursor is exhausted.

        :param context: Airflow task context (not used by this operator).
        """
        src_hook = self.src_hook
        dest_hook = self.dest_hook
        with src_hook.get_conn() as src_conn:
            cursor = src_conn.cursor()
            # try/finally guarantees the source cursor is released even if the query,
            # a fetch, or an insert on the destination side raises.
            try:
                cursor.execute(self.sql, self.sql_params)
                # DB-API 2.0: ``description`` is None when the statement produced no
                # result set, so fall back to an empty sequence instead of failing.
                target_fields = [field[0] for field in cursor.description or ()]
                rows_total = 0
                if target_fields:
                    # Stop on any empty batch (list, tuple, ...) rather than comparing
                    # against ``[]`` only, which could never match a non-list result.
                    while rows := cursor.fetchmany(self.rows_chunk):
                        dest_hook.insert_rows(
                            self.destination_table,
                            rows,
                            target_fields=target_fields,
                            commit_every=self.rows_chunk,
                        )
                        rows_total += len(rows)
                self.log.info("Finished data transfer. Total number of rows transferred - %s", rows_total)
            finally:
                cursor.close()