Вот моя упрощенная версия кода:
Код: Выделить всё
import pandas as pd
import threading
processing_urls_lock = threading.Lock()
processing_urls = {}
def process_row(row, das, evaluator, ntp_server_checker):
tenant_id = row._1
dst_ip = row.dst_ip
url = row.url_or_ip
dst_country = row.dst_country
with processing_urls_lock:
if url in processing_urls:
event = processing_urls[url]
event.wait()
return None
else:
event = threading.Event()
processing_urls[url] = event
try:
new_row = {}
# Calling the API
return new_row
finally:
with processing_urls_lock:
event.set()
processing_urls.pop(url, None)
def phase_1(df_query_res: pd.DataFrame, app_token: str) -> pd.DataFrame:
data = []
evaluator = DestinationRiskEvaluator()
ntp_server_checker = NTPServerChecker()
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
future_to_row = {
executor.submit(process_row, row, app_token, evaluator, ntp_server_checker): row for
row in df_query_res.itertuples(index=True, name='Row')}
for future in concurrent.futures.as_completed(future_to_row):
result = future.result()
if result:
data.append(result)
df = pd.DataFrame(data)
print(f"Total rows processed: {len(data)}")
return df
p>
Примечание: я могу убедиться, что моя машина может использовать такое большое количество потоков (много ядер ЦП).
Я уверен, что вызовы API — мое узкое место
Что особенного в моем коде, так это то, что я не хочу просто выполнять его параллельно, но пусть одни и те же две строки из первых 100 строк имеют одинаковый URL-адрес, тогда я не хочу делать еще один вызов API, а хочу дождитесь завершения первого потока, а затем прочитайте из кеша (строка, в которой я написал # Вызов API, сначала проверяет, находятся ли данные в кеше, прежде чем вызывать API)
Подробнее здесь: https://stackoverflow.com/questions/790 ... -in-python
Мобильная версия