Coverage for providers/src/airflow/providers/teradata/transfers/s3_to_teradata.py: 78%
44 statements
« prev ^ index » next coverage.py v7.6.10, created at 2024-12-27 08:27 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20from collections.abc import Sequence
21from textwrap import dedent
22from typing import TYPE_CHECKING
24from airflow.models import BaseOperator
26try:
27 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
28except ModuleNotFoundError as e:
29 from airflow.exceptions import AirflowOptionalProviderFeatureException
31 raise AirflowOptionalProviderFeatureException(e)
32from airflow.providers.teradata.hooks.teradata import TeradataHook
34if TYPE_CHECKING:
35 from airflow.utils.context import Context
class S3ToTeradataOperator(BaseOperator):
    """
    Loads CSV, JSON and Parquet format data from Amazon S3 to Teradata.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:S3ToTeradataOperator`

    :param s3_source_key: The URI format specifying the location of the S3 bucket.(templated)
        The URI format is /s3/YOUR-BUCKET.s3.amazonaws.com/YOUR-BUCKET-NAME.
        Refer to
        https://docs.teradata.com/search/documents?query=native+object+store&sort=last_update&virtual-field=title_only&content-lang=en-US
    :param public_bucket: Specifies whether the provided S3 bucket is public. If the bucket is public,
        it means that anyone can access the objects within it via a URL without requiring authentication.
        If the bucket is private and authentication is not provided, the operator will throw an exception.
    :param teradata_table: The name of the teradata table to which the data is transferred.(templated)
    :param aws_conn_id: The Airflow AWS connection used for AWS credentials.
    :param teradata_conn_id: The connection ID used to connect to Teradata
        :ref:`Teradata connection <howto/connection:Teradata>`.
    :param teradata_authorization_name: The name of Teradata Authorization Database Object,
        is used to control who can access an S3 object store.
        Refer to
        https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Setting-Up-Access/Controlling-Foreign-Table-Access-with-an-AUTHORIZATION-Object

    Note that ``s3_source_key`` and ``teradata_table`` are
    templated, so you can use variables in them if you wish.
    """

    template_fields: Sequence[str] = ("s3_source_key", "teradata_table")
    ui_color = "#e07c24"

    def __init__(
        self,
        *,
        s3_source_key: str,
        public_bucket: bool = False,
        teradata_table: str,
        aws_conn_id: str = "aws_default",
        teradata_conn_id: str = "teradata_default",
        teradata_authorization_name: str = "",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.s3_source_key = s3_source_key
        self.public_bucket = public_bucket
        self.teradata_table = teradata_table
        self.aws_conn_id = aws_conn_id
        self.teradata_conn_id = teradata_conn_id
        self.teradata_authorization_name = teradata_authorization_name

    def _build_credentials_clause(self, s3_hook: S3Hook) -> str:
        """Return the credentials clause embedded in the NOS ``CREATE TABLE`` statement.

        Resolution order for private buckets: a Teradata AUTHORIZATION object
        (preferred — keeps secrets out of the SQL text) if
        ``teradata_authorization_name`` is set, otherwise the AWS credentials
        resolved from ``aws_conn_id``. Public buckets use empty credentials.
        """
        if self.public_bucket:
            # Public buckets are read without authentication.
            return "ACCESS_ID= '' ACCESS_KEY= ''"
        if self.teradata_authorization_name:
            return f"AUTHORIZATION={self.teradata_authorization_name}"
        # NOTE(review): access key / secret (and session token, for temporary
        # STS credentials) are interpolated directly into the SQL statement;
        # prefer teradata_authorization_name where secret exposure is a concern.
        credentials = s3_hook.get_credentials()
        clause = f"ACCESS_ID= '{credentials.access_key}' ACCESS_KEY= '{credentials.secret_key}'"
        if credentials.token:
            clause += f" SESSION_TOKEN = '{credentials.token}'"
        return clause

    def execute(self, context: Context) -> None:
        """Create a permanent Teradata table populated directly from the S3 location.

        Builds a ``CREATE MULTISET TABLE ... WITH DATA`` statement against the
        NOS foreign-table syntax and runs it through ``TeradataHook``. Any
        failure is logged and re-raised so the task is marked failed.
        """
        self.log.info(
            "transferring data from %s to teradata table %s...", self.s3_source_key, self.teradata_table
        )
        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id)
        teradata_hook = TeradataHook(teradata_conn_id=self.teradata_conn_id)
        credentials_part = self._build_credentials_clause(s3_hook)
        sql = dedent(f"""
                    CREATE MULTISET TABLE {self.teradata_table} AS
                    (
                        SELECT * FROM (
                            LOCATION = '{self.s3_source_key}'
                            {credentials_part}
                        ) AS d
                    ) WITH DATA
                    """).rstrip()
        try:
            teradata_hook.run(sql, autocommit=True)
        except Exception as ex:
            self.log.error(str(ex))
            raise
        self.log.info("The transfer of data from S3 to Teradata was successful")