Coverage for providers/src/airflow/providers/teradata/transfers/azure_blob_to_teradata.py: 80%
41 statements
« prev ^ index » next coverage.py v7.6.10, created at 2024-12-27 08:27 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2024-12-27 08:27 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20from collections.abc import Sequence
21from textwrap import dedent
22from typing import TYPE_CHECKING
24from airflow.models import BaseOperator
26try:
27 from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
28except ModuleNotFoundError as e:
29 from airflow.exceptions import AirflowOptionalProviderFeatureException
31 raise AirflowOptionalProviderFeatureException(e)
33from airflow.providers.teradata.hooks.teradata import TeradataHook
35if TYPE_CHECKING:
36 from airflow.utils.context import Context
class AzureBlobStorageToTeradataOperator(BaseOperator):
    """
    Loads CSV, JSON and Parquet format data from Azure Blob Storage to Teradata.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:AzureBlobStorageToTeradataOperator`

    :param blob_source_key: The URI format specifying the location of the Azure blob object store. (templated)
        The URI format is `/az/YOUR-STORAGE-ACCOUNT.blob.core.windows.net/YOUR-CONTAINER/YOUR-BLOB-LOCATION`.
        Refer to
        https://docs.teradata.com/search/documents?query=native+object+store&sort=last_update&virtual-field=title_only&content-lang=en-US
    :param public_bucket: Specifies whether the provided blob container is public. If the blob container is public,
        it means that anyone can access the objects within it via a URL without requiring authentication.
        If the bucket is private and authentication is not provided, the operator will throw an exception.
    :param azure_conn_id: The Airflow WASB connection used for azure blob credentials.
    :param teradata_table: The name of the teradata table to which the data is transferred. (templated)
    :param teradata_conn_id: The connection ID used to connect to Teradata
        :ref:`Teradata connection <howto/connection:Teradata>`
    :param teradata_authorization_name: The name of Teradata Authorization Database Object,
        is used to control who can access an Azure Blob object store.
        Refer to
        https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Setting-Up-Access/Controlling-Foreign-Table-Access-with-an-AUTHORIZATION-Object

    Note that ``blob_source_key`` and ``teradata_table`` are
    templated, so you can use variables in them if you wish.
    """

    template_fields: Sequence[str] = ("blob_source_key", "teradata_table")
    ui_color = "#e07c24"

    def __init__(
        self,
        *,
        blob_source_key: str,
        public_bucket: bool = False,
        azure_conn_id: str = "azure_default",
        teradata_table: str,
        teradata_conn_id: str = "teradata_default",
        teradata_authorization_name: str = "",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.blob_source_key = blob_source_key
        self.public_bucket = public_bucket
        self.azure_conn_id = azure_conn_id
        self.teradata_table = teradata_table
        self.teradata_conn_id = teradata_conn_id
        self.teradata_authorization_name = teradata_authorization_name

    @staticmethod
    def _escape_sql_literal(value: object) -> str:
        """Return *value* as text that is safe to place between single quotes in SQL."""
        # A doubled apostrophe is the standard SQL escape for an apostrophe inside a
        # quoted literal; without it, an embedded quote would end the literal early.
        return str(value).replace("'", "''")

    def _credentials_clause(self) -> str:
        """
        Build the credentials fragment placed after ``LOCATION`` in the generated SQL.

        :return: empty ``ACCESS_ID``/``ACCESS_KEY`` values when ``public_bucket`` is set,
            an ``AUTHORIZATION=`` clause when ``teradata_authorization_name`` is set,
            otherwise ``ACCESS_ID``/``ACCESS_KEY`` filled from the ``azure_conn_id`` connection.
        :raises ValueError: if the container is private, no authorization object is named
            and the Azure connection has no login or no password.
        """
        if self.public_bucket:
            return "ACCESS_ID= '' ACCESS_KEY= ''"

        # Accessing data directly from the Azure Blob Storage and creating permanent table inside the
        # database
        if self.teradata_authorization_name:
            return f"AUTHORIZATION={self.teradata_authorization_name}"

        # Obtaining the Azure client ID and Azure secret in order to access a specified Blob container
        azure_hook = WasbHook(wasb_conn_id=self.azure_conn_id)
        conn = azure_hook.get_connection(self.azure_conn_id)
        access_id = conn.login
        access_secret = conn.password
        if access_id is None or access_secret is None:
            # Formatting None into the statement would send the literal text 'None' as a
            # credential; fail here with a clear message instead.
            raise ValueError(
                f"Azure connection '{self.azure_conn_id}' does not provide both a login and a password. "
                "Set them on the connection, pass teradata_authorization_name, "
                "or set public_bucket=True for a public container."
            )
        return (
            f"ACCESS_ID= '{self._escape_sql_literal(access_id)}' "
            f"ACCESS_KEY= '{self._escape_sql_literal(access_secret)}'"
        )

    def execute(self, context: Context) -> None:
        """
        Create ``teradata_table`` from the data found at ``blob_source_key``.

        Builds a ``CREATE MULTISET TABLE ... AS (SELECT * FROM (LOCATION = ...)) WITH DATA``
        statement and runs it through :class:`TeradataHook`.

        :param context: Airflow task context (not used by this method).
        :raises ValueError: if credentials are required but missing from the Azure connection.
        :raises Exception: any error raised by ``TeradataHook.run`` is logged and re-raised.
        """
        self.log.info(
            "transferring data from %s to teradata table %s...", self.blob_source_key, self.teradata_table
        )
        teradata_hook = TeradataHook(teradata_conn_id=self.teradata_conn_id)
        credentials_part = self._credentials_clause()
        location = self._escape_sql_literal(self.blob_source_key)
        sql = dedent(f"""
                    CREATE MULTISET TABLE {self.teradata_table} AS
                    (
                        SELECT * FROM (
                            LOCATION = '{location}'
                            {credentials_part}
                        ) AS d
                    ) WITH DATA
                    """).rstrip()
        try:
            # Second positional argument is passed through unchanged from the original call.
            teradata_hook.run(sql, True)
        except Exception as ex:
            self.log.error(str(ex))
            raise
        self.log.info("The transfer of data from Azure Blob to Teradata was successful")