Я использую библиотеку Python concurrent.futures с ThreadPoolExecutor и ProcessPoolExecutor. Я хочу реализовать механизм отмены всех запущенных или невыполненных задач в случае сбоя какой-либо из задач. В частности, я хочу:
Отменить все фьючерсы (как запущенные, так и невыполненные) в случае сбоя задачи.
Поднять ошибка, которая привела к сбою первой задачи, если эта ошибка молча игнорируется; в противном случае позвольте Python поднять его естественным путем.
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import partial
copy_func = partial(copy_from, table_name=table_name, column_string=column_string)
with ProcessPoolExecutor(max_workers=cores_to_use) as executor:
futures = {executor.submit(copy_func, file_path): file_path for file_path in file_path_list}
for f in as_completed(futures):
try:
f.result()
except Exception as e:
executor.shutdown(wait=False) # Attempt to stop the executor
for future in futures:
future.cancel() # Cancel all futures
raise e # Raise the exception
Вопросы:
Правильно ли это обрабатывать отмену задач в ThreadPoolExecutor и ProcessPoolExecutor?
Есть ли более эффективные подходы для достижения этой функциональности?
Как я могу гарантировать, что возникающее исключение не будет игнорироваться молча?
Как я могу гарантировать, что возникшее исключение не будет игнорироваться молча?
li>
Как освободить все ресурсы, используемые concurrent.futures после исключения?
Я использую библиотеку Python concurrent.futures с ThreadPoolExecutor и ProcessPoolExecutor. Я хочу реализовать механизм отмены всех запущенных или невыполненных задач в случае сбоя какой-либо из задач. В частности, я хочу: [list] [*]Отменить все фьючерсы (как запущенные, так и невыполненные) в случае сбоя задачи. [*]Поднять ошибка, которая привела к сбою первой задачи, если эта ошибка молча игнорируется; в противном случае позвольте Python поднять его естественным путем. [/list] Вот подход, который я пробовал: [code]from concurrent.futures import ProcessPoolExecutor, as_completed from functools import partial
copy_func = partial(copy_from, table_name=table_name, column_string=column_string) with ProcessPoolExecutor(max_workers=cores_to_use) as executor: futures = {executor.submit(copy_func, file_path): file_path for file_path in file_path_list} for f in as_completed(futures): try: f.result() except Exception as e: executor.shutdown(wait=False) # Attempt to stop the executor for future in futures: future.cancel() # Cancel all futures raise e # Raise the exception [/code] Вопросы: [list] [*]Правильно ли это обрабатывать отмену задач в ThreadPoolExecutor и ProcessPoolExecutor? [*]Есть ли более эффективные подходы для достижения этой функциональности? [*]Как я могу гарантировать, что возникающее исключение не будет игнорироваться молча? [*]Как я могу гарантировать, что возникшее исключение не будет игнорироваться молча? [*] li> Как освободить все ресурсы, используемые concurrent.futures после исключения? [/list] Спасибо !
Я использую библиотеку Python concurrent.futures с ThreadPoolExecutor и ProcessPoolExecutor. Я хочу реализовать механизм отмены всех запущенных или невыполненных задач в случае сбоя какой-либо из задач. В частности, я хочу:
Я использую библиотеку Python concurrent.futures с ThreadPoolExecutor и ProcessPoolExecutor. Я хочу реализовать механизм отмены всех запущенных или невыполненных задач в случае сбоя какой-либо из задач. В частности, я хочу:
У меня есть следующий фрагмент кода, иллюстрирующий мою проблему:
Каждый поток вычисляет локальные значения, а затем обновляет массив результатов. Предположим, что это обновлять (result += mask ) — очень медленная операция, как ее распараллелить,...
Я разрабатываю программу для вставки данных на уровне тиков для более чем 300 акций, полученных в формате словаря веб-потока от API брокера. У меня успешно работает версия программы без concurrent.futures, но она пропускает некоторые тики из-за...
Мой код выглядит следующим образом:
with concurrent.futures.ThreadPoolExecutor() as executor:
length = len(self.seq)
futures = * length
results = []
for i in range(length):
f,args,kwargs = self.seq
future = executor.submit(f, *args, **kwargs)...