Многопроцессорность Python работает вечно, и в диспетчере задач нет активностиPython

Программы на Python
Ответить
Anonymous
 Многопроцессорность Python работает вечно, и в диспетчере задач нет активности

Сообщение Anonymous »

Я использую нечеткое сопоставление, чтобы сопоставить набор данных из 5000 строк с набором данных из 2 миллионов строк. Я использую ExtractOne(), чтобы получить то, что мне нужно, но это медленно. Я хочу использовать многопроцессорность, чтобы сделать это. У меня есть несколько наборов данных, которые необходимы подпроцессам, и я думаю, что должен передать их в вызов многопроцессной функции; Я не уверен в этом, но я все равно это сделал. Я новичок в многопроцессорности и пробую все, что нахожу в Google, но ничего не работает.
Этот вызов multiprocess() является частью гораздо более крупного сценария, который я пишу в Spyder. Причина, по которой я делаю этот отдельный read_parquet, заключается в том, что я прочитал, что multiprocess() не может видеть переменные, которые у меня есть в моей среде Spyder, поэтому я должен принять их снова.
Размер материала и фрагмента 1 предназначен только для тестирования; Я удалю все это, как только код начнет работать с меньшими наборами данных.
Основные проблемы на данный момент:
  • Код работает в Spyder, но продолжает работать, и я не вижу активности в диспетчере задач. Я вижу, что запускаются новые процессы, но загрузка их процессора остается равной 0, а объем используемой памяти не меняется. Я смотрел это около 30 минут.
  • Я почти уверен, что мой код где-то неверен. Я использовал различные стратегии, чтобы заставить что-то работать, но все они потерпели неудачу. Я пробовал использовать асинхронный запуск и очереди для сбора результатов. Все безрезультатно.
Любая помощь приветствуется.
Обновление 17 февраля 2026 г.: использовал Copilot, чтобы получить несколько советов по кодированию; все еще не работает; файлы журналов не созданы; не вижу увеличения использования процессора в диспетчере задач, как я ожидал. Я вижу, что процессор открыт и память используется, но процессор неактивен.
import pyarrow.parquet as pq

from rapidfuzz import process, fuzz
from rapidfuzz.utils import default_process
import pandas as pd
import duckdb
import os
import numpy as np
import multiprocessing as mp
import logging, os, traceback

def fuzzy_match_mike(security_4c, df_hold_choices): # add back in result_queue as an arg once verified

logging.basicConfig(
filename = f'worker_{os.getpid()}.log',
level = logging.DEBUG,
format = '%(asctime)s %(levelname)s %(messages)s'
)
try:

best_match = []
best_match2 = []
best_match3 = []

score = []
score2 = []
score3 = []

index = []
index2 = []
index3 = []

result_store_prime = []

for i in range(security_4c.shape[0]):

print(f"This is the df for this child process: {security_4c}")

df_temp = df_hold_choices.copy(deep=True)

bm, sc, idx = process.extractOne(
security_4c['4c_primary_name2'].tolist(),
df_temp,
scorer=fuzz.WRatio,
score_cutoff = 80,
# since we preprocessed the data, no need to have rapidfuzz process the records
processor=None
)
df_temp[idx] = np.nan

bm2, sc2, idx2 = process.extractOne(
security_4c['4c_primary_name2'].tolist(),
df_temp,
scorer=fuzz.WRatio,
score_cutoff = 80,
processor=None
)
df_temp[idx2] = np.nan

bm3, sc3, idx3 = process.extractOne(
security_4c['4c_primary_name2'].tolist(),
df_temp,
scorer=fuzz.WRatio,
score_cutoff = 80,
processor=None
)

best_match.append(bm)
score.append(sc)
index.append(idx)

best_match2.append(bm2)
score2.append(sc2)
index2.append(idx2)

best_match3.append(bm3)
score3.append(sc3)
index3.append(idx3)

result = [i,best_match,
score,
index,

best_match2,
score2,
index2,

best_match3,
score3,
index3]

result_store_prime.append(result)

except Exception:
logging.exception("Unhandled exception in fuzzy_match_mike")
traceback.print_exc()
raise

return result_store_prime

# ________________________END PRIMARY FUNCTION DECLARATION_____________________

security_4c['4c_primary_name'] = security_4c['4c_primary_name'].astype(str).fillna("")
security_4c['4c_primary_name2'] = [default_process(i) for i in security_4c['4c_primary_name'].tolist()]

df_hold['sba_data_primary_name'] = df_hold['sba_data_primary_name'].astype(str).fillna("")
df_hold['sba_data_primary_name2'] = [default_process(i) for i in df_hold['sba_data_primary_name'].tolist()]

df_hold_choices = df_hold['sba_data_primary_name2']

stuff = security_4c.iloc[12:16,]

os.chdir(r"\\sharedDrive\current_projects\ALR\Securities")

df_hold.to_parquet("df_hold.parquet", engine = 'pyarrow', compression = 'snappy')
security_4c.to_parquet("security_4c.parquet", engine = 'pyarrow', compression = 'snappy')

if __name__=='__main__':
# with statement allows pools to close and join on their own
os.chdir(r"\\sharedDrive\current_projects\ALR\Securities")

df_hold = pd.read_parquet("df_hold.parquet",
thrift_string_size_limit = 1000000000,
thrift_container_size_limit = 1000000000)

security_4c = pd.read_parquet("security_4c.parquet",
thrift_string_size_limit = 1000000000,
thrift_container_size_limit = 1000000000)

df_hold_choices = df_hold['sba_data_primary_name2']

with mp.Pool(processes = mp.cpu_count()) as pool:

stuff = security_4c.iloc[12:16,]

chunk_size = 1

# Starmap needs to be passed a tuple; each of which needs to be a
# standalone packet that the function accepts. Remember, the function
# inputs are (4c_data, sba data to compare to)
list_of_dfs = [stuff[i:i+chunk_size] for i in range(0, len(stuff), chunk_size)] #Update to all of 4c once working
list_of_dfs = [(df_un, df_hold_choices) for df_un in list_of_dfs]

results = pool.starmap(fuzzy_match_mike, list_of_dfs)

final_result = [item for sublist in results for item in sublist]



Подробнее здесь: https://stackoverflow.com/questions/798 ... sk-manager
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»