Код: Выделить всё
[[2024-10-28, 07:20:13 UTC] {{standard_task_runner.py:88}} INFO - Job 174: Subtask sftp_task
[2024-10-28, 07:20:14 UTC] {{taskinstance.py:2480}} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='sftp_dag_with_hooks_connection' AIRFLOW_CTX_TASK_ID='sftp_task' AIRFLOW_CTX_EXECUTION_DATE='2024-10-28T07:20:08.185187+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-10-28T07:20:08.185187+00:00'
[2024-10-28, 07:20:14 UTC] {{ssh.py:300}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks
[2024-10-28, 07:20:14 UTC] {{transport.py:1909}} INFO - Connected (version 2.0, client OpenSSH_6.6.1p1)
[2024-10-28, 07:20:15 UTC] {{transport.py:1909}} INFO - Auth banner: b'Ubuntu 14.04.6 LTS\n'
[2024-10-28, 07:20:15 UTC] {{ssh.py:342}} INFO - Failed to connect. Sleeping before retry attempt 1
[2024-10-28, 07:20:20 UTC] {{transport.py:1909}} INFO - Connected (version 2.0, client OpenSSH_6.6.1p1)
[2024-10-28, 07:20:21 UTC] {{transport.py:1909}} INFO - Auth banner: b'Ubuntu 14.04.6 LTS\n'
[2024-10-28, 07:20:21 UTC] {{ssh.py:342}} INFO - Failed to connect. Sleeping before retry attempt 2
[2024-10-28, 07:20:25 UTC] {{transport.py:1909}} INFO - Connected (version 2.0, client OpenSSH_6.6.1p1)
[2024-10-28, 07:20:26 UTC] {{transport.py:1909}} INFO - Auth banner: b'Ubuntu 14.04.6 LTS\n'
[2024-10-28, 07:20:26 UTC] {{taskinstance.py:2698}} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
result = execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 199, in execute
return_value = self.execute_callable()
^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 216, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/dags/pipelines/iqvia/aimxr/dag/dag_test_hooks.py", line 8, in sftp_task
with sftp_hook.get_conn() as sftp_client:
^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/sftp/hooks/sftp.py", line 120, in get_conn
self.conn = super().get_conn().open_sftp()
^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/ssh/hooks/ssh.py", line 346, in get_conn
for attempt in Retrying(
File "/usr/local/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 347, in __iter__
do = self.iter(retry_state=retry_state)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 325, in iter
raise retry_exc.reraise()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 158, in reraise
raise self.last_attempt.result()
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/ssh/hooks/ssh.py", line 353, in get_conn
client.connect(**connect_kwargs)
File "/usr/local/airflow/.local/lib/python3.11/site-packages/paramiko/client.py", line 485, in connect
self._auth(
File "/usr/local/airflow/.local/lib/python3.11/site-packages/paramiko/client.py", line 818, in _auth
raise saved_exception
File "/usr/local/airflow/.local/lib/python3.11/site-packages/paramiko/client.py", line 805, in _auth
self._transport.auth_password(username, password)
File "/usr/local/airflow/.local/lib/python3.11/site-packages/paramiko/transport.py", line 1603, in auth_password
return self.auth_handler.wait_for_response(my_event)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/paramiko/auth_handler.py", line 263, in wait_for_response
raise e
paramiko.ssh_exception.BadAuthenticationType: Bad authentication type; allowed types: ['publickey']
Я заметил, что paramiko неоднократно пытается подключиться к SFTP, даже если это происходит сбой, когда я пытаюсь подключиться локально (журналы вставлены ниже), и в конечном итоге это удается. Однако, похоже, этого не происходит, когда я запускаю тот же код через воздушный поток AWS Managed Apache.
Это скрипт Python, который у меня есть. Раньше я работал с SFTP, но впервые работаю с интеграцией воздушного потока AWS Managed Apache с SFTP.
Буду очень благодарен за любую помощь.
Код: Выделить всё
import os
import paramiko
import logging
import pytz
import re
from io import StringIO
import pandas as pd
logging.basicConfig(level=logging.DEBUG)
from dotenv import load_dotenv
load_dotenv()
pdt_timezone = pytz.timezone('America/Los_Angeles')
# hostname = os.getenv("hostname1")
# username = os.getenv("username1")
# port = int(os.getenv("port1"))
# keyfilepath = os.getenv("keyfilepath1")
# passphrase = os.getenv("passphrase1")
class SFTPtoS3loader:
def __init__(self, hostname, username, port, passphrase=None, password=None, keyfilepath=None):
self.hostname = hostname
self.username = username
self.port = port
self.passphrase = passphrase
self.password = password
self.keyfilepath = '''-----BEGIN OPENSSH PRIVATE KEY----- xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx -----END OPENSSH PRIVATE KEY-----'''
def connect_to_sftp(self):
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
key_file = re.sub(r' {5}', '\n', self.keyfilepath)
key_file = StringIO(key_file)
private_key = paramiko.RSAKey.from_private_key(key_file,
password=self.passphrase)
client.connect(hostname=self.hostname, username=self.username,
port=22,
passphrase=self.passphrase, pkey=private_key,
allow_agent=True,look_for_keys=True,
disabled_algorithms=dict(pubkeys=["rsa-sha2-512", "rsa-sha2-256"]))
sftp = client.open_sftp()
return sftp
def read_file_into_dataframe(self):
sftp = SFTPtoS3loader.connect_to_sftp(self)
file_list = sftp.listdir('from_dmd/aim_feed')
file_to_read = 'from_dmd/aim_feed/' + file_list[0]
with sftp.file(file_to_read, 'r') as remote_file:
file_content = remote_file.read().decode() # Decode the file content to a string
# Use StringIO to read the file content into a DataFrame
file_buffer = StringIO(file_content)
df = pd.read_csv(file_buffer)
print(df.columns)
return df
if __name__ == '__main__':
# Instantiate the SFTPtoS3loader class with environment variables
loader = SFTPtoS3loader(
hostname=os.getenv('hostname1'),
username=os.getenv('username1'),
port=int(os.getenv('port1')),
passphrase=os.getenv('passphrase1'),
password=os.getenv('password1')
# keyfilepath=os.getenv('keyfilepath')
)
# Call the method to read files from SFTP
df = loader.read_file_into_dataframe()
print(df)
Код: Выделить всё
DEBUG:paramiko.transport:client lang:
Код: Выделить всё
DEBUG:paramiko.transport:server lang:
Код: Выделить всё
DEBUG:paramiko.transport:kex follows: False
Код: Выделить всё
DEBUG:paramiko.transport:=== Key exchange agreements ===
Код: Выделить всё
DEBUG:paramiko.transport:Kex: [email protected]
Код: Выделить всё
DEBUG:paramiko.transport:HostKey: ssh-ed25519
Код: Выделить всё
DEBUG:paramiko.transport:Cipher: aes128-ctr
Код: Выделить всё
DEBUG:paramiko.transport:MAC: hmac-sha2-256
Код: Выделить всё
DEBUG:paramiko.transport:Compression: none
Код: Выделить всё
DEBUG:paramiko.transport:=== End of kex handshake ===
Код: Выделить всё
DEBUG:paramiko.transport:kex engine KexCurve25519 specified hash_algo
Код: Выделить всё
DEBUG:paramiko.transport:Switch to new keys ...
Код: Выделить всё
DEBUG:paramiko.transport:Adding ssh-ed25519 host key for data.dmdconnects.com: b'1a87b0ce12c36b78bf21a957d53c7ae7'
Код: Выделить всё
DEBUG:paramiko.transport:Trying SSH key b'9dd379b45d4ef4b28b234f3d6ea9d81c'
Код: Выделить всё
DEBUG:paramiko.transport:userauth is OK
Код: Выделить всё
DEBUG:paramiko.transport:Finalizing pubkey algorithm for key of type 'ssh-rsa'
Код: Выделить всё
DEBUG:paramiko.transport:Our pubkey algorithm list: ['ssh-rsa']
Код: Выделить всё
DEBUG:paramiko.transport:Server-side algorithm list: ['']
Код: Выделить всё
DEBUG:paramiko.transport:Agreed upon 'ssh-rsa' pubkey algorithm
Код: Выделить всё
DEBUG:paramiko.transport:Finalizing pubkey algorithm for key of type 'ssh-rsa'
Код: Выделить всё
DEBUG:paramiko.transport:Our pubkey algorithm list: ['ssh-rsa']
Код: Выделить всё
DEBUG:paramiko.transport:Server-side algorithm list: ['']
Код: Выделить всё
DEBUG:paramiko.transport:Agreed upon 'ssh-rsa' pubkey algorithm
Код: Выделить всё
INFO:paramiko.transport:Auth banner: b'Ubuntu 14.04.6 LTS\n'
Код: Выделить всё
INFO:paramiko.transport:Authentication (publickey) failed.
Код: Выделить всё
DEBUG:paramiko.transport:Trying SSH agent key b'bcd6af1f62fde3407ecd79d38ecf1493'
Код: Выделить всё
DEBUG:paramiko.transport:userauth is OK
Код: Выделить всё
DEBUG:paramiko.transport:Finalizing pubkey algorithm for key of type 'ssh-rsa'
Код: Выделить всё
DEBUG:paramiko.transport:Our pubkey algorithm list: ['ssh-rsa']
Код: Выделить всё
DEBUG:paramiko.transport:Server-side algorithm list: ['']
Код: Выделить всё
DEBUG:paramiko.transport:Agreed upon 'ssh-rsa' pubkey algorithm
Код: Выделить всё
DEBUG:paramiko.transport:Finalizing pubkey algorithm for key of type 'ssh-rsa'
Код: Выделить всё
DEBUG:paramiko.transport:Our pubkey algorithm list: ['ssh-rsa']
Код: Выделить всё
DEBUG:paramiko.transport:Server-side algorithm list: ['']
Код: Выделить всё
DEBUG:paramiko.transport:Agreed upon 'ssh-rsa' pubkey algorithm
Код: Выделить всё
INFO:paramiko.transport:Authentication (publickey) failed.
Код: Выделить всё
DEBUG:paramiko.transport:Trying SSH agent key b'6d0c453545e3225099ca99064119b28a'
Код: Выделить всё
DEBUG:paramiko.transport:userauth is OK
Код: Выделить всё
DEBUG:paramiko.transport:Finalizing pubkey algorithm for key of type 'ssh-rsa'
Код: Выделить всё
DEBUG:paramiko.transport:Our pubkey algorithm list: ['ssh-rsa']
Код: Выделить всё
DEBUG:paramiko.transport:Server-side algorithm list: ['']
Код: Выделить всё
DEBUG:paramiko.transport:Agreed upon 'ssh-rsa' pubkey algorithm
Код: Выделить всё
DEBUG:paramiko.transport:Finalizing pubkey algorithm for key of type 'ssh-rsa'
Код: Выделить всё
DEBUG:paramiko.transport:Our pubkey algorithm list: ['ssh-rsa']
Код: Выделить всё
DEBUG:paramiko.transport:Server-side algorithm list: ['']
Код: Выделить всё
DEBUG:paramiko.transport:Agreed upon 'ssh-rsa' pubkey algorithm
Код: Выделить всё
INFO:paramiko.transport:Authentication (publickey) successful!
Код: Выделить всё
DEBUG:paramiko.transport:[chan 0] Max packet in: 32768 bytes
Код: Выделить всё
DEBUG:paramiko.transport:[chan 0] Max packet out: 32768 bytes
Код: Выделить всё
DEBUG:paramiko.transport:Secsh channel 0 opened.
Код: Выделить всё
DEBUG:paramiko.transport:[chan 0] Sesch channel 0 request ok
Код: Выделить всё
INFO:paramiko.transport.sftp:[chan 0] Opened sftp connection (server version 3)
Код: Выделить всё
apache-airflow-providers-sftp
Подробнее здесь: https://stackoverflow.com/questions/791 ... to-connect