Я использую нечеткое сопоставление, чтобы сопоставить набор данных из 5000 строк с набором данных из 2 миллионов строк. Я использую ExtractOne(), чтобы получить то, что мне нужно, но это медленно. Я хочу использовать многопроцессорность, чтобы сделать это. У меня есть несколько наборов данных, которые необходимы подпроцессам, и я думаю, что должен передать их в вызов многопроцессной функции; Я не уверен в этом, но я все равно это сделал. Я новичок в многопроцессорности и пробую все, что нахожу в Google, но ничего не работает.
Этот вызов multiprocess() является частью гораздо более крупного сценария, который я пишу в Spyder. Причина, по которой я делаю этот отдельный read_parquet, заключается в том, что я прочитал, что multiprocess() не может видеть переменные, которые у меня есть в моей среде Spyder, поэтому я должен принять их снова.
Размер материала и фрагмента 1 предназначен только для тестирования; Я удалю все это, как только код начнет работать с меньшими наборами данных.
Основные проблемы на данный момент:
Код работает в Spyder, но продолжает работать, и я не вижу активности в диспетчере задач. Я вижу, что запускаются новые процессы, но загрузка их процессора остается равной 0, а объем используемой памяти не меняется. Я смотрел это около 30 минут.
Я почти уверен, что мой код где-то неверен. Я использовал различные стратегии, чтобы заставить что-то работать, но все они потерпели неудачу. Я пробовал использовать асинхронный запуск и очереди для сбора результатов. Все безрезультатно.
Любая помощь приветствуется.
Обновление 17 февраля 2026 г.: использовал Copilot, чтобы получить несколько советов по кодированию; все еще не работает; файлы журналов не созданы; не вижу увеличения использования процессора в диспетчере задач, как я ожидал. Я вижу, что процессор открыт и память используется, но процессор неактивен.
import pyarrow.parquet as pq
from rapidfuzz import process, fuzz
from rapidfuzz.utils import default_process
import pandas as pd
import duckdb
import os
import numpy as np
import multiprocessing as mp
import logging, os, traceback
def fuzzy_match_mike(security_4c, df_hold_choices): # add back in result_queue as an arg once verified
print(f"This is the df for this child process: {security_4c}")
df_temp = df_hold_choices.copy(deep=True)
bm, sc, idx = process.extractOne(
security_4c['4c_primary_name2'].tolist(),
df_temp,
scorer=fuzz.WRatio,
score_cutoff = 80,
# since we preprocessed the data, no need to have rapidfuzz process the records
processor=None
)
df_temp[idx] = np.nan
except Exception:
logging.exception("Unhandled exception in fuzzy_match_mike")
traceback.print_exc()
raise
return result_store_prime
# ________________________END PRIMARY FUNCTION DECLARATION_____________________
security_4c['4c_primary_name'] = security_4c['4c_primary_name'].astype(str).fillna("")
security_4c['4c_primary_name2'] = [default_process(i) for i in security_4c['4c_primary_name'].tolist()]
df_hold['sba_data_primary_name'] = df_hold['sba_data_primary_name'].astype(str).fillna("")
df_hold['sba_data_primary_name2'] = [default_process(i) for i in df_hold['sba_data_primary_name'].tolist()]
# Starmap needs to be passed a tuple; each of which needs to be a
# standalone packet that the function accepts. Remember, the function
# inputs are (4c_data, sba data to compare to)
list_of_dfs = [stuff[i:i+chunk_size] for i in range(0, len(stuff), chunk_size)] #Update to all of 4c once working
list_of_dfs = [(df_un, df_hold_choices) for df_un in list_of_dfs]
Я использую нечеткое сопоставление, чтобы сопоставить набор данных из 5000 строк с набором данных из 2 миллионов строк. Я использую ExtractOne(), чтобы получить то, что мне нужно, но это медленно. Я хочу использовать многопроцессорность, чтобы сделать это. У меня есть несколько наборов данных, которые необходимы подпроцессам, и я думаю, что должен передать их в вызов многопроцессной функции; Я не уверен в этом, но я все равно это сделал. Я новичок в многопроцессорности и пробую все, что нахожу в Google, но ничего не работает. Этот вызов multiprocess() является частью гораздо более крупного сценария, который я пишу в Spyder. Причина, по которой я делаю этот отдельный read_parquet, заключается в том, что я прочитал, что multiprocess() не может видеть переменные, которые у меня есть в моей среде Spyder, поэтому я должен принять их снова. Размер материала и фрагмента 1 предназначен только для тестирования; Я удалю все это, как только код начнет работать с меньшими наборами данных. Основные проблемы на данный момент: [list] [*]Код работает в Spyder, но продолжает работать, и я не вижу активности в диспетчере задач. Я вижу, что запускаются новые процессы, но загрузка их процессора остается равной 0, а объем используемой памяти не меняется. Я смотрел это около 30 минут.
[*]Я почти уверен, что мой код где-то неверен. Я использовал различные стратегии, чтобы заставить что-то работать, но все они потерпели неудачу. Я пробовал использовать асинхронный запуск и очереди для сбора результатов. Все безрезультатно.
[/list] Любая помощь приветствуется. Обновление 17 февраля 2026 г.: использовал Copilot, чтобы получить несколько советов по кодированию; все еще не работает; файлы журналов не созданы; не вижу увеличения использования процессора в диспетчере задач, как я ожидал. Я вижу, что процессор открыт и память используется, но процессор неактивен. import pyarrow.parquet as pq
from rapidfuzz import process, fuzz from rapidfuzz.utils import default_process import pandas as pd import duckdb import os import numpy as np import multiprocessing as mp import logging, os, traceback
def fuzzy_match_mike(security_4c, df_hold_choices): # add back in result_queue as an arg once verified
print(f"This is the df for this child process: {security_4c}")
df_temp = df_hold_choices.copy(deep=True)
bm, sc, idx = process.extractOne( security_4c['4c_primary_name2'].tolist()[i], df_temp, scorer=fuzz.WRatio, score_cutoff = 80, # since we preprocessed the data, no need to have rapidfuzz process the records processor=None ) df_temp[idx] = np.nan
except Exception: logging.exception("Unhandled exception in fuzzy_match_mike") traceback.print_exc() raise
return result_store_prime
# ________________________END PRIMARY FUNCTION DECLARATION_____________________
security_4c['4c_primary_name'] = security_4c['4c_primary_name'].astype(str).fillna("") security_4c['4c_primary_name2'] = [default_process(i) for i in security_4c['4c_primary_name'].tolist()]
df_hold['sba_data_primary_name'] = df_hold['sba_data_primary_name'].astype(str).fillna("") df_hold['sba_data_primary_name2'] = [default_process(i) for i in df_hold['sba_data_primary_name'].tolist()]
# Starmap needs to be passed a tuple; each of which needs to be a # standalone packet that the function accepts. Remember, the function # inputs are (4c_data, sba data to compare to) list_of_dfs = [stuff[i:i+chunk_size] for i in range(0, len(stuff), chunk_size)] #Update to all of 4c once working list_of_dfs = [(df_un, df_hold_choices) for df_un in list_of_dfs]