Невозможно подключиться к SFTP из Amazon Managed Apache Airflow, но можно подключиться из локальногоPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Невозможно подключиться к SFTP из Amazon Managed Apache Airflow, но можно подключиться из локального

Сообщение Anonymous »

Я пытаюсь подключиться от Amazon Managed Apache Airflow к SFTP-серверу, но возникает ошибка.

Код: Выделить всё

    [[2024-10-28, 07:20:13 UTC] {{standard_task_runner.py:88}} INFO - Job 174: Subtask sftp_task
[2024-10-28, 07:20:14 UTC] {{taskinstance.py:2480}} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='sftp_dag_with_hooks_connection' AIRFLOW_CTX_TASK_ID='sftp_task' AIRFLOW_CTX_EXECUTION_DATE='2024-10-28T07:20:08.185187+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-10-28T07:20:08.185187+00:00'
[2024-10-28, 07:20:14 UTC] {{ssh.py:300}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks
[2024-10-28, 07:20:14 UTC] {{transport.py:1909}} INFO - Connected (version 2.0, client OpenSSH_6.6.1p1)
[2024-10-28, 07:20:15 UTC] {{transport.py:1909}} INFO - Auth banner: b'Ubuntu 14.04.6 LTS\n'
[2024-10-28, 07:20:15 UTC] {{ssh.py:342}} INFO - Failed to connect. Sleeping before retry attempt 1
[2024-10-28, 07:20:20 UTC] {{transport.py:1909}} INFO - Connected (version 2.0, client OpenSSH_6.6.1p1)
[2024-10-28, 07:20:21 UTC] {{transport.py:1909}} INFO - Auth banner: b'Ubuntu 14.04.6 LTS\n'
[2024-10-28, 07:20:21 UTC] {{ssh.py:342}} INFO - Failed to connect.  Sleeping before retry attempt 2
[2024-10-28, 07:20:25 UTC] {{transport.py:1909}} INFO - Connected (version 2.0, client OpenSSH_6.6.1p1)
[2024-10-28, 07:20:26 UTC] {{transport.py:1909}} INFO - Auth banner: b'Ubuntu 14.04.6 LTS\n'
[2024-10-28, 07:20:26 UTC] {{taskinstance.py:2698}} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
result = execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 199, in execute
return_value = self.execute_callable()
^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 216, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/dags/pipelines/iqvia/aimxr/dag/dag_test_hooks.py", line 8, in sftp_task
with sftp_hook.get_conn() as sftp_client:
^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/sftp/hooks/sftp.py", line 120, in get_conn
self.conn = super().get_conn().open_sftp()
^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/ssh/hooks/ssh.py", line 346, in get_conn
for attempt in Retrying(
File "/usr/local/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 347, in __iter__
do = self.iter(retry_state=retry_state)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 325, in iter
raise retry_exc.reraise()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 158, in reraise
raise self.last_attempt.result()
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/ssh/hooks/ssh.py", line 353, in get_conn
client.connect(**connect_kwargs)
File "/usr/local/airflow/.local/lib/python3.11/site-packages/paramiko/client.py", line 485, in connect
self._auth(
File "/usr/local/airflow/.local/lib/python3.11/site-packages/paramiko/client.py", line 818, in _auth
raise saved_exception
File "/usr/local/airflow/.local/lib/python3.11/site-packages/paramiko/client.py", line 805, in _auth
self._transport.auth_password(username, password)
File "/usr/local/airflow/.local/lib/python3.11/site-packages/paramiko/transport.py", line 1603, in auth_password
return self.auth_handler.wait_for_response(my_event)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/paramiko/auth_handler.py", line 263, in wait_for_response
raise e
paramiko.ssh_exception.BadAuthenticationType: Bad authentication type; allowed types: ['publickey']
Однако тот же локальный скрипт Python может подключиться.
Я заметил, что paramiko неоднократно пытается подключиться к SFTP, даже если это происходит сбой, когда я пытаюсь подключиться локально (журналы вставлены ниже), и в конечном итоге это удается. Однако, похоже, этого не происходит, когда я запускаю тот же код через воздушный поток AWS Managed Apache.
Это скрипт Python, который у меня есть. Раньше я работал с SFTP, но впервые работаю с интеграцией воздушного потока AWS Managed Apache с SFTP.
Буду очень благодарен за любую помощь.

Код: Выделить всё

import os
import paramiko
import logging
import pytz
import re
from io import StringIO
import pandas as pd
logging.basicConfig(level=logging.DEBUG)
from dotenv import load_dotenv

load_dotenv()

pdt_timezone = pytz.timezone('America/Los_Angeles')
# hostname = os.getenv("hostname1")
# username = os.getenv("username1")
# port = int(os.getenv("port1"))
# keyfilepath = os.getenv("keyfilepath1")
# passphrase = os.getenv("passphrase1")

class SFTPtoS3loader:
def __init__(self, hostname, username, port, passphrase=None, password=None, keyfilepath=None):
self.hostname = hostname
self.username = username
self.port = port
self.passphrase = passphrase
self.password = password
self.keyfilepath = '''-----BEGIN OPENSSH PRIVATE KEY-----     xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx     -----END OPENSSH PRIVATE KEY-----'''

def connect_to_sftp(self):
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
key_file = re.sub(r' {5}', '\n', self.keyfilepath)
key_file = StringIO(key_file)
private_key = paramiko.RSAKey.from_private_key(key_file,
password=self.passphrase)
client.connect(hostname=self.hostname, username=self.username,
port=22,
passphrase=self.passphrase, pkey=private_key,
allow_agent=True,look_for_keys=True,
disabled_algorithms=dict(pubkeys=["rsa-sha2-512", "rsa-sha2-256"]))
sftp = client.open_sftp()
return sftp

def read_file_into_dataframe(self):
sftp = SFTPtoS3loader.connect_to_sftp(self)
file_list = sftp.listdir('from_dmd/aim_feed')
file_to_read = 'from_dmd/aim_feed/' + file_list[0]
with sftp.file(file_to_read, 'r') as remote_file:
file_content = remote_file.read().decode()  # Decode the file content to a string

# Use StringIO to read the file content into a DataFrame
file_buffer = StringIO(file_content)
df = pd.read_csv(file_buffer)
print(df.columns)
return df

if __name__ == '__main__':
# Instantiate the SFTPtoS3loader class with environment variables
loader = SFTPtoS3loader(
hostname=os.getenv('hostname1'),
username=os.getenv('username1'),
port=int(os.getenv('port1')),
passphrase=os.getenv('passphrase1'),
password=os.getenv('password1')
# keyfilepath=os.getenv('keyfilepath')
)

# Call the method to read files from SFTP
df = loader.read_file_into_dataframe()
print(df)

Это журналы успешного локального соединения SFTP.

Код: Выделить всё

DEBUG:paramiko.transport:client lang: 

Код: Выделить всё

DEBUG:paramiko.transport:server lang: 

Код: Выделить всё

DEBUG:paramiko.transport:kex follows: False

Код: Выделить всё

DEBUG:paramiko.transport:=== Key exchange agreements ===

Код: Выделить всё

DEBUG:paramiko.transport:Kex: [email protected]

Код: Выделить всё

DEBUG:paramiko.transport:HostKey: ssh-ed25519

Код: Выделить всё

DEBUG:paramiko.transport:Cipher: aes128-ctr

Код: Выделить всё

DEBUG:paramiko.transport:MAC: hmac-sha2-256

Код: Выделить всё

DEBUG:paramiko.transport:Compression: none

Код: Выделить всё

DEBUG:paramiko.transport:=== End of kex handshake ===

Код: Выделить всё

DEBUG:paramiko.transport:kex engine KexCurve25519 specified hash_algo 

Код: Выделить всё

DEBUG:paramiko.transport:Switch to new keys ...

Код: Выделить всё

DEBUG:paramiko.transport:Adding ssh-ed25519 host key for data.dmdconnects.com: b'1a87b0ce12c36b78bf21a957d53c7ae7'

Код: Выделить всё

DEBUG:paramiko.transport:Trying SSH key b'9dd379b45d4ef4b28b234f3d6ea9d81c'

Код: Выделить всё

DEBUG:paramiko.transport:userauth is OK

Код: Выделить всё

DEBUG:paramiko.transport:Finalizing pubkey algorithm for key of type 'ssh-rsa'

Код: Выделить всё

DEBUG:paramiko.transport:Our pubkey algorithm list: ['ssh-rsa']

Код: Выделить всё

DEBUG:paramiko.transport:Server-side algorithm list:  ['']

Код: Выделить всё

DEBUG:paramiko.transport:Agreed upon 'ssh-rsa' pubkey algorithm

Код: Выделить всё

DEBUG:paramiko.transport:Finalizing pubkey algorithm for key of type 'ssh-rsa'

Код: Выделить всё

DEBUG:paramiko.transport:Our pubkey algorithm list: ['ssh-rsa']

Код: Выделить всё

DEBUG:paramiko.transport:Server-side algorithm list: ['']

Код: Выделить всё

DEBUG:paramiko.transport:Agreed upon 'ssh-rsa' pubkey algorithm

Код: Выделить всё

INFO:paramiko.transport:Auth banner: b'Ubuntu 14.04.6 LTS\n'

Код: Выделить всё

INFO:paramiko.transport:Authentication (publickey) failed.

Код: Выделить всё

DEBUG:paramiko.transport:Trying SSH agent key b'bcd6af1f62fde3407ecd79d38ecf1493'

Код: Выделить всё

DEBUG:paramiko.transport:userauth is OK

Код: Выделить всё

DEBUG:paramiko.transport:Finalizing pubkey algorithm for key of type 'ssh-rsa'

Код: Выделить всё

DEBUG:paramiko.transport:Our pubkey algorithm list: ['ssh-rsa']

Код: Выделить всё

DEBUG:paramiko.transport:Server-side algorithm list: ['']

Код: Выделить всё

DEBUG:paramiko.transport:Agreed upon 'ssh-rsa' pubkey algorithm

Код: Выделить всё

DEBUG:paramiko.transport:Finalizing pubkey algorithm for key of type 'ssh-rsa'

Код: Выделить всё

DEBUG:paramiko.transport:Our pubkey algorithm list: ['ssh-rsa']

Код: Выделить всё

DEBUG:paramiko.transport:Server-side algorithm list: ['']

Код: Выделить всё

DEBUG:paramiko.transport:Agreed upon 'ssh-rsa' pubkey algorithm

Код: Выделить всё

INFO:paramiko.transport:Authentication (publickey) failed.

Код: Выделить всё

DEBUG:paramiko.transport:Trying SSH agent key b'6d0c453545e3225099ca99064119b28a'

Код: Выделить всё

DEBUG:paramiko.transport:userauth is OK

Код: Выделить всё

DEBUG:paramiko.transport:Finalizing pubkey algorithm for key of type 'ssh-rsa'

Код: Выделить всё

DEBUG:paramiko.transport:Our pubkey algorithm list: ['ssh-rsa']

Код: Выделить всё

DEBUG:paramiko.transport:Server-side algorithm list: ['']

Код: Выделить всё

DEBUG:paramiko.transport:Agreed upon 'ssh-rsa' pubkey algorithm

Код: Выделить всё

DEBUG:paramiko.transport:Finalizing pubkey algorithm for key of type 'ssh-rsa'

Код: Выделить всё

DEBUG:paramiko.transport:Our pubkey algorithm list: ['ssh-rsa']

Код: Выделить всё

DEBUG:paramiko.transport:Server-side algorithm list: ['']

Код: Выделить всё

DEBUG:paramiko.transport:Agreed upon 'ssh-rsa' pubkey algorithm

Код: Выделить всё

INFO:paramiko.transport:Authentication (publickey) successful!

Код: Выделить всё

DEBUG:paramiko.transport:[chan 0] Max packet in: 32768 bytes

Код: Выделить всё

DEBUG:paramiko.transport:[chan 0] Max packet out: 32768 bytes

Код: Выделить всё

DEBUG:paramiko.transport:Secsh channel 0 opened.

Код: Выделить всё

DEBUG:paramiko.transport:[chan 0] Sesch channel 0 request ok

Код: Выделить всё

INFO:paramiko.transport.sftp:[chan 0] Opened sftp connection (server version 3)
PS: При установке возникает та же проблема

Код: Выделить всё

apache-airflow-providers-sftp
и попробуйте подключиться через вкладку «Соединения» в Apache Airflow.

Подробнее здесь: https://stackoverflow.com/questions/791 ... to-connect
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»