Coverage for providers/src/airflow/providers/teradata/transfers/azure_blob_to_teradata.py: 80%

41 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2024-12-27 08:27 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20from collections.abc import Sequence 

21from textwrap import dedent 

22from typing import TYPE_CHECKING 

23 

24from airflow.models import BaseOperator 

25 

26try: 

27 from airflow.providers.microsoft.azure.hooks.wasb import WasbHook 

28except ModuleNotFoundError as e: 

29 from airflow.exceptions import AirflowOptionalProviderFeatureException 

30 

31 raise AirflowOptionalProviderFeatureException(e) 

32 

33from airflow.providers.teradata.hooks.teradata import TeradataHook 

34 

35if TYPE_CHECKING: 

36 from airflow.utils.context import Context 

37 

38 

class AzureBlobStorageToTeradataOperator(BaseOperator):
    """
    Loads CSV, JSON and Parquet format data from Azure Blob Storage to Teradata.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:AzureBlobStorageToTeradataOperator`

    :param blob_source_key: The URI specifying the location of the Azure Blob object store. (templated)
        The URI format is `/az/YOUR-STORAGE-ACCOUNT.blob.core.windows.net/YOUR-CONTAINER/YOUR-BLOB-LOCATION`.
        Refer to
        https://docs.teradata.com/search/documents?query=native+object+store&sort=last_update&virtual-field=title_only&content-lang=en-US
    :param public_bucket: Specifies whether the provided blob container is public. If the blob container is public,
        anyone can access the objects within it via a URL without requiring authentication, and empty
        credentials are sent to Teradata.
        If the container is private and authentication is not provided, the operator will throw an exception.
    :param azure_conn_id: The Airflow WASB connection used for Azure Blob credentials. Its login is sent as
        ``ACCESS_ID`` and its password as ``ACCESS_KEY``. It is only read when ``public_bucket`` is False
        and ``teradata_authorization_name`` is empty.
    :param teradata_table: The name of the Teradata table to which the data is transferred. (templated)
        The value is inserted into the ``CREATE TABLE`` statement verbatim as an identifier, so it must
        come from a trusted source.
    :param teradata_conn_id: The connection ID used to connect to Teradata
        :ref:`Teradata connection <howto/connection:Teradata>`
    :param teradata_authorization_name: The name of the Teradata AUTHORIZATION database object used to
        control who can access the Azure Blob object store. When set (and ``public_bucket`` is False), it is
        used instead of the credentials stored in ``azure_conn_id``. The name is inserted verbatim.
        Refer to
        https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Setting-Up-Access/Controlling-Foreign-Table-Access-with-an-AUTHORIZATION-Object

    Note that ``blob_source_key`` and ``teradata_table`` are
    templated, so you can use variables in them if you wish.
    """

    template_fields: Sequence[str] = ("blob_source_key", "teradata_table")
    ui_color = "#e07c24"

    def __init__(
        self,
        *,
        blob_source_key: str,
        public_bucket: bool = False,
        azure_conn_id: str = "azure_default",
        teradata_table: str,
        teradata_conn_id: str = "teradata_default",
        teradata_authorization_name: str = "",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.blob_source_key = blob_source_key
        self.public_bucket = public_bucket
        self.azure_conn_id = azure_conn_id
        self.teradata_table = teradata_table
        self.teradata_conn_id = teradata_conn_id
        self.teradata_authorization_name = teradata_authorization_name

    @staticmethod
    def _quote_literal(value: str | None) -> str:
        """
        Render ``value`` as a single-quoted SQL string literal.

        Embedded single quotes are doubled (standard SQL escaping), so a quote inside a blob URI or a
        credential cannot terminate the literal early. ``None`` is rendered as an empty literal ``''``.
        """
        text = "" if value is None else str(value)
        return "'" + text.replace("'", "''") + "'"

    def execute(self, context: Context) -> None:
        """
        Create ``teradata_table`` from the blob at ``blob_source_key`` with ``CREATE MULTISET TABLE ... AS``.

        Any exception raised while running the statement is logged and re-raised.
        """
        self.log.info(
            "transferring data from %s to teradata table %s...", self.blob_source_key, self.teradata_table
        )
        teradata_hook = TeradataHook(teradata_conn_id=self.teradata_conn_id)
        # Public containers need no authentication, so empty credentials are sent.
        credentials_part = "ACCESS_ID= '' ACCESS_KEY= ''"
        if not self.public_bucket:
            # Accessing data directly from the Azure Blob Storage and creating permanent table inside the
            # database
            if self.teradata_authorization_name:
                # Credentials live in a Teradata AUTHORIZATION object. Its name is an identifier, not a
                # string literal, so it is inserted as-is.
                credentials_part = f"AUTHORIZATION={self.teradata_authorization_name}"
            else:
                # Obtaining the Azure client ID and Azure secret in order to access a specified Blob container
                azure_hook = WasbHook(wasb_conn_id=self.azure_conn_id)
                conn = azure_hook.get_connection(self.azure_conn_id)
                # Escape the credentials. An unset login/password becomes '' rather than the string 'None'.
                access_id = self._quote_literal(conn.login)
                access_secret = self._quote_literal(conn.password)
                credentials_part = f"ACCESS_ID= {access_id} ACCESS_KEY= {access_secret}"
        location = self._quote_literal(self.blob_source_key)
        # The statement embeds credentials; avoid logging it.
        sql = dedent(f"""
                    CREATE MULTISET TABLE {self.teradata_table} AS
                    (
                        SELECT * FROM (
                            LOCATION = {location}
                            {credentials_part}
                        ) AS d
                    ) WITH DATA
                    """).rstrip()
        try:
            # The second positional argument of DbApiHook.run is ``autocommit``.
            teradata_hook.run(sql, True)
        except Exception as ex:
            self.log.error(str(ex))
            raise
        self.log.info("The transfer of data from Azure Blob to Teradata was successful")