Сложное использование потоков в PythonPython

Программы на Python
Ответить
Anonymous
 Сложное использование потоков в Python

Сообщение Anonymous »

Я написал скрипт Python, который считывает фрейм данных построчно и анализирует каждую из них. Каждая строка содержит URL-адрес, который затем отправляется во внешний API, способный обрабатывать тысячи запросов в секунду. Скрипт сохраняет ответы API локально, чтобы избежать повторного запроса того же URL-адреса, что помогает сократить расходы.
Вот моя упрощенная версия кода:

Код: Выделить всё

import pandas as pd
import threading

processing_urls_lock = threading.Lock()
processing_urls = {}

def process_row(row, das, evaluator, ntp_server_checker):
tenant_id = row._1
dst_ip = row.dst_ip
url = row.url_or_ip
dst_country = row.dst_country

with processing_urls_lock:
if url in processing_urls:
event = processing_urls[url]
event.wait()
return None
else:
event = threading.Event()
processing_urls[url] = event

try:
new_row = {}
# Calling the API
return new_row
finally:
with processing_urls_lock:
event.set()
processing_urls.pop(url, None)

def phase_1(df_query_res: pd.DataFrame, app_token: str) -> pd.DataFrame:
data = []
evaluator = DestinationRiskEvaluator()
ntp_server_checker = NTPServerChecker()

with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
future_to_row = {
executor.submit(process_row, row, app_token, evaluator, ntp_server_checker): row for
row in df_query_res.itertuples(index=True, name='Row')}

for future in concurrent.futures.as_completed(future_to_row):
result = future.result()
if result:
data.append(result)

df = pd.DataFrame(data)
print(f"Total rows processed: {len(data)}")
return df

До использования потоков мой сценарий занимал 5 часов, но сейчас он занимает столько же, поэтому я предполагаю, что с моей реализацией что-то не так, но я не знаю, что это.
p>
Примечание: я могу убедиться, что моя машина может использовать такое большое количество потоков (много ядер ЦП).
Я уверен, что вызовы API — мое узкое место
Что особенного в моем коде, так это то, что я не хочу просто выполнять его параллельно, но пусть одни и те же две строки из первых 100 строк имеют одинаковый URL-адрес, тогда я не хочу делать еще один вызов API, а хочу дождитесь завершения первого потока, а затем прочитайте из кеша (строка, в которой я написал # Вызов API, сначала проверяет, находятся ли данные в кеше, прежде чем вызывать API)

Подробнее здесь: https://stackoverflow.com/questions/790 ... -in-python
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»