Coverage for providers/src/airflow/providers/teradata/transfers/s3_to_teradata.py: 78%

44 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2024-12-27 08:27 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20from collections.abc import Sequence 

21from textwrap import dedent 

22from typing import TYPE_CHECKING 

23 

24from airflow.models import BaseOperator 

25 

26try: 

27 from airflow.providers.amazon.aws.hooks.s3 import S3Hook 

28except ModuleNotFoundError as e: 

29 from airflow.exceptions import AirflowOptionalProviderFeatureException 

30 

31 raise AirflowOptionalProviderFeatureException(e) 

32from airflow.providers.teradata.hooks.teradata import TeradataHook 

33 

34if TYPE_CHECKING: 

35 from airflow.utils.context import Context 

36 

37 

class S3ToTeradataOperator(BaseOperator):
    """
    Loads CSV, JSON and Parquet format data from Amazon S3 to Teradata.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:S3ToTeradataOperator`

    :param s3_source_key: The URI format specifying the location of the S3 bucket.(templated)
        The URI format is /s3/YOUR-BUCKET.s3.amazonaws.com/YOUR-BUCKET-NAME.
        Refer to
        https://docs.teradata.com/search/documents?query=native+object+store&sort=last_update&virtual-field=title_only&content-lang=en-US
    :param public_bucket: Specifies whether the provided S3 bucket is public. If the bucket is public,
        it means that anyone can access the objects within it via a URL without requiring authentication.
        If the bucket is private and authentication is not provided, the operator will throw an exception.
    :param teradata_table: The name of the teradata table to which the data is transferred.(templated)
    :param aws_conn_id: The Airflow AWS connection used for AWS credentials.
    :param teradata_conn_id: The connection ID used to connect to Teradata
        :ref:`Teradata connection <howto/connection:Teradata>`.
    :param teradata_authorization_name: The name of Teradata Authorization Database Object,
        is used to control who can access an S3 object store.
        Refer to
        https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Setting-Up-Access/Controlling-Foreign-Table-Access-with-an-AUTHORIZATION-Object

    Note that ``s3_source_key`` and ``teradata_table`` are
    templated, so you can use variables in them if you wish.
    """

    template_fields: Sequence[str] = ("s3_source_key", "teradata_table")
    ui_color = "#e07c24"

    def __init__(
        self,
        *,
        s3_source_key: str,
        public_bucket: bool = False,
        teradata_table: str,
        aws_conn_id: str = "aws_default",
        teradata_conn_id: str = "teradata_default",
        teradata_authorization_name: str = "",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.s3_source_key = s3_source_key
        self.public_bucket = public_bucket
        self.teradata_table = teradata_table
        self.aws_conn_id = aws_conn_id
        self.teradata_conn_id = teradata_conn_id
        self.teradata_authorization_name = teradata_authorization_name

    def _build_credentials_clause(self, s3_hook: S3Hook) -> str:
        """
        Build the credentials fragment embedded in the Native Object Store query.

        Resolution order:
        1. Public bucket -> empty ACCESS_ID / ACCESS_KEY (no authentication needed).
        2. A Teradata AUTHORIZATION object, when one was configured.
        3. Explicit AWS credentials from the S3 hook, with an optional session token.
        """
        if self.public_bucket:
            return "ACCESS_ID= '' ACCESS_KEY= ''"
        if self.teradata_authorization_name:
            # Access control is delegated to a database-side AUTHORIZATION object.
            return f"AUTHORIZATION={self.teradata_authorization_name}"
        credentials = s3_hook.get_credentials()
        clause = f"ACCESS_ID= '{credentials.access_key}' ACCESS_KEY= '{credentials.secret_key}'"
        if credentials.token:
            # Temporary (STS) credentials additionally require the session token.
            clause = clause + f" SESSION_TOKEN = '{credentials.token}'"
        return clause

    def execute(self, context: Context) -> None:
        """
        Create a permanent Teradata table populated with the data read from S3.

        Runs a ``CREATE MULTISET TABLE ... AS (SELECT * FROM (LOCATION = ...))``
        statement against Teradata; any failure is logged and re-raised so the
        task is marked failed.
        """
        self.log.info(
            "transferring data from %s to teradata table %s...", self.s3_source_key, self.teradata_table
        )

        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id)
        teradata_hook = TeradataHook(teradata_conn_id=self.teradata_conn_id)
        credentials_part = self._build_credentials_clause(s3_hook)
        sql = dedent(f"""
                    CREATE MULTISET TABLE {self.teradata_table} AS
                    (
                        SELECT * FROM (
                            LOCATION = '{self.s3_source_key}'
                            {credentials_part}
                        ) AS d
                    ) WITH DATA
                    """).rstrip()
        try:
            teradata_hook.run(sql, True)
        except Exception as ex:
            self.log.error(str(ex))
            raise
        self.log.info("The transfer of data from S3 to Teradata was successful")