У меня есть рабочий код, время выполнения которого нужно значительно улучшить, и я совершенно растерялся. По сути, я получу zip-папки, содержащие десятки тысяч файлов JSON, каждый из которых содержит примерно 1000 сообщений JSON. В каждом из этих файлов имеется около 15 различных типов объектов json, и некоторые из этих объектов содержат внутри себя списки словарей, в то время как другие довольно просты. Мне нужно прочитать все данные, проанализировать объекты и извлечь соответствующую информацию, а затем передать эти проанализированные данные обратно и вставить их в другую программу, используя API для стороннего программного обеспечения (своего рода оболочка вокруг проприетарной реализации). SQL).
Итак, у меня есть код, который делает все это. Проблема в том, что каждый раз выполнение занимает около 4–5 часов, а мне нужно приблизиться к 30 минутам.
Мой текущий код сильно зависит от asyncio. Я использую это, чтобы добиться некоторого параллелизма, особенно при чтении файлов json. Я также начал профилировать свой код и до сих пор перешел на использование orjson для чтения данных из каждого файла и переписал каждую из моих функций синтаксического анализатора на cython, чтобы получить некоторые улучшения и с этой стороны. Однако я использую очереди asyncio для передачи данных туда и обратно, и мой профилировщик показывает, что много времени тратится только на вызовыqueue.get иqueue.put. Я также изучил msgspec, чтобы улучшить чтение данных json, и хотя это было быстрее, оно становилось медленнее, когда мне приходилось отправлять объекты msgspec.Struct в мой код Cython и использовать их вместо просто словаря.
Поэтому я просто надеялся на некоторую общую помощь о том, как улучшить этот процесс. Я читал о многопроцессорности как с multiprocessing.pools, так и с concurrent.futures, но оба они оказались медленнее, чем моя текущая реализация. Я подумал, может быть, мне нужно изменить способ передачи данных через очереди, чтобы я передавал полные данные JSON для каждого файла вместо каждого отдельного сообщения (около 1000 документов каждое), но это не помогло.
Я прочитал так много вопросов/ответов SO, но кажется, что у многих людей очень однородные данные json (а не 15 разных типов сообщений). Я изучал пакетную обработку, но не до конца понимаю, как это меняет ситуацию. Именно это я и делал, используя concurrent.futures, но, опять же, на самом деле это заняло больше времени.
В целом я хотел бы продолжать это как очереди, потому что в будущем я хотел бы запустить этот же процесс для потоковой передачи данных, чтобы эта часть просто заменила чтение json, и вместо этого каждое сообщение, полученное через поток, помещалось в очередь, и все остальное работало то же самое.
Некоторые псевдокоды приведены ниже.
main.py
import asyncio
from glob import glob
import orjson
from parser_dispatcher import ParserDispatcher
from sql_dispatcher import SqlDispatcher
async def load_json(file_path, queue):
async with aiofiles.open(file_path, mode="rb") as f:
data = await f.read()
json_data = await asyncio.to_thread(orjson.loads(data))
for msg in json_data:
await queue.put(msg)
async def load_all_json_files(base_path, queue):
file_list = glob(f"{base_path}/*.json")
tasks = [load_json(file_path, queue) for file_path in file_list]
await asyncio.gather(*tasks)
await queue.put(None) # to end the processing
def main()
base_path = "\path\to\json\folder"
paser_queue = asyncio.queue()
sql_queue = asyncio.queue()
parser_dispatch = ParserDispatcher()
sql_dispatch = SqlDispatcher()
load_task = load_all_json_files(base_path, parser_queue)
parser_task = parser_dispatch.process_queue(parser_queue, sql_queue)
sql_task = sql_dispatch.process_queue(sql_queue)
await asyncio.gather(load_task, parser_task, sqlr_task)
if __name__ -- "__main__":
asyncio.run(main))
parser_dispatcher.py
import asyncio
import message_parsers as mp
class ParserDispatcher:
def __init__(self):
self.parsers = {
("1", "2", "3"): mp.parser1,
.... etc
} # this is a dictionary where keys are tuples and values are the parser functions
def dispatch(self, msg):
parser_key = tuple(msg.get("type"), msg.get("source"), msg.get("channel"))
parser = self.parsers.get(parser_key)
if parser:
new_msg = parser(msg)
else:
new_msg = []
return new_msg
async def process_queue(self, parser_queue, sql_queue):
while True:
msg = await process_queue.get()
if msg is None:
await sql_put.put(None)
process_queue.task_done()
parsed_messages = self.dispatch(msg)
for parsed_message in parsed_messages:
await sql_queue.put(parsed_message)
sql_dispatcher.py
import asycnio
import proprietarySqlLibrary as sql
class SqlDispatcher:
def __init__(self):
# do all the connections to the DB in here
async def process_queue(self, sql_queue):
while True:
msg = await sql_queue.get()
# then go through and add this data to the DB
# this part is also relatively slow but I'm focusing on the first half for now
# since I don't have control over the DB stuff
Подробнее здесь: https://stackoverflow.com/questions/793 ... -in-python
Эффективный синтаксический анализ и обработка миллионов json-объектов в Python ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Эффективный синтаксический анализ и обработка миллионов json-объектов в Python
Anonymous » » в форуме Python - 0 Ответы
- 7 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Эффективный синтаксический анализ и обработка миллионов json-объектов в Python
Anonymous » » в форуме Python - 0 Ответы
- 6 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Синтаксический анализ частичных фрагментов json с помощью simdjson/rapidjson
Anonymous » » в форуме C++ - 0 Ответы
- 10 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Синтаксический анализ частичных фрагментов json с помощью simdjson/rapidjson
Anonymous » » в форуме C++ - 0 Ответы
- 14 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Синтаксический анализ Python XML с помощью ElementTree не дает интересующего результата
Anonymous » » в форуме Python - 0 Ответы
- 12 Просмотры
-
Последнее сообщение Anonymous
-