Python `multiprocessing.Manager().Queue()` засоряетсяPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Python `multiprocessing.Manager().Queue()` засоряется

Сообщение Anonymous »

У меня проблемы с multiprocessing.Manager().Queue()
Короче: некоторые процессы засоряются и остаются< /em> забит, в то время как другие процессы имеют одинаковое время выполнения и всегда согласованы. Данные не теряются и не повреждаются в очереди, они в конечном итоге появятся на другом конце.
Ниже приведен фрагмент соответствующего кода, но предоставить полный код невозможно. поскольку я подключаюсь к API, который требует имена пользователей, пароли и т. д. Мой код в основном создает 16 соединений веб-сокетов для 16 различных акций для 16 потоков цен в реальном времени и отображает их все на 1 рисунке с 16 подграфиками, используя matplotlib.animation.FuncAnimation(). Некоторые акции никогда не будут задерживаться и всегда будут отображаться при получении ответа веб-сокета, в то время как некоторые другие акции будут всегда задерживаться с момента получения ответа веб-сокета и построения графика. То есть он будет пинговать сервер, но процесс застрянет в очереди.put() и очереди.get(). Я вставил time.sleep(10) после ответа API, чтобы уменьшить частоту прохождения данных по каналу, но проблема все равно возникает.
Я знаю, что это очередь. проблема, поскольку я напечатал ответ веб-сокета сразу после того, как он получил ответ от API (перед put()) и напечатал ответ после того, как он прошел через канал (после get()). Глядя на изображение, я знаю, что это проблема с очередью. «Входящая пара» печатается после ответа API и перед очередью.put(), «выходящая пара» печатается непосредственно после очереди.get(). Время «входа пары» правильное, в то время как «выход пары» из HK50 отстает примерно на 7 минут, но «вход пары» происходит примерно вовремя (около 3 секунд, что нормально. Но пара J225). out" не задерживается. Поэтому у меня нет задержки с ответами веб-сокета.
[img]https://i.sstatic. net/iVFsWKzj.png[/img]

Вы также можете увидеть это на рисунке ниже. Некоторые графики отстают от других, если посмотреть на момент текущей цены. Например. HK50 отстает от большинства других, хотя на самом деле из веб-сокета есть обновления цен.
[img]https://i.sstatic. net/lQFjCKr9.png[/img]

Я не ожидаю четкого решения, но я новичок в многопроцессорной обработке, поэтому, возможно, я допустил очень простую ошибку.
Краткое описание моего кода:
В __name__ == "__main__":
  • Остальные API используются для получения основной информации и создания словарей для хранения данных.
  • manager = multiprocessing.Manager()Создается , и его метод Queue() используется в списке очередью = [manager.Queue() for _ in range(len(epics))] (длина равна # из акции).
  • Процессы порождаются многопроцессорным циклом for.Process(target = get_live_prices, args=(queues[idx],...), для каждой акции и добавляется к списку, называемому процессами внутри цикла.
  • Цикл по спискам процессов и Process.start() для каждого процесса.
    Спецификации графика создаются с помощью списка линий из осей, который берется из plt.subplots().
  • Затем создается график анимации, который принимает список очередей с помощью anim = FuncAnimation(fig, update_plot, Fargs=(queues,...)
В get_live_prices():
  • Примите очередь как на входе, и соединение будет создано с помощью try: внутри цикл while, поэтому в случае разрыва соединения ответ об ошибке сохранит цикл и создаст новое соединение.
  • Создайте процесс threading.Thread() которая вызывает функцию keep_connection_alive, поскольку соединение закроется (длинная история о том, почему, но единственный способ, которым соединение не закрывается для некоторых тикеров, - это то, что я должен постоянно пинговать его).
  • вызовите функцию get_websocket_responses(), которая будет работать в цикле и получать ответы от соединения вечно
В get_websocket_responses():
  • Постоянный цикл сбора данных о цене и временной метки из ответа веб-сокета до тех пор, пока не будет плохого ответа или не будет закрыто соединение. Последнее вызовет исключение в get_live_prices(), которое приведет к выходу из функции.
  • Цена, временная метка и название акции помещаются в канал с помощью очередь.put() (queue.get() находится в функции update_plot().
В update_plot():
  • update_plot() принимает список очередей в качестве входных данных, и эта функция вызывается функцией FuncAnimation()
  • for цикл по списку очередей, подграфики (axs) и строки.
  • Сразу проверьте, не пуста ли очередь. Если очередь пуста, эта очередь (следовательно, этот запас) пропускается.
  • Если очередь не пуста, получите ответ API из канала с очередью.() и нарисуйте обновленный график с помощью line.set_data().
  • Для списка установите флаг Did_we_update = True< /code>
    (После приведенного выше цикла for)
  • После цикла for выполните цикл Did_we_update и обновите время последнего ответа (то, что мы см. на графике).
  • Верните строки и таймеры, которые

import http.client
import json
from websocket import create_connection
from websocket._exceptions import WebSocketConnectionClosedException
from datetime import datetime, timedelta,timezone
import credentials
import multiprocessing
from matplotlib.animation import FuncAnimation
import matplotlib.pyplot as plt
import numpy as np
import pytz
import time
from matplotlib.ticker import MaxNLocator
import matplotlib.dates as mdates
import threading

def update_plot(frame, queues, lines, axs, timer_texts, prev_time):

did_we_update = [False] * len(queues)

"""
The for-loop below loops for each plot. At each loop, the multiprocess queue is checked using the queue.empty() method to see if there was a tick from the API.
If there is not a tick, the subplot will not be updated.

"""
for i, (queue, line, ax)in enumerate(zip(queues, lines, axs)):

if not queue.empty():

new_data, epic, data_dict, pair = queue.get()
old_xdata, old_ydata = line.get_data()

if new_data[1] == None:
print(f"ISSUE WITH NONE FOR EPIC: {epic}")
print(new_data[1])
continue
else:
new_price = new_data[1]
perc_diff = 100 * (new_data[1] - data_dict[epic]['price_4PM_NY']) / data_dict[epic]['price_4PM_NY']

new_xdata = np.append(old_xdata,new_data[0])
new_ydata = np.append(old_ydata,new_price)

ax.set_xlim(new_xdata[0], new_data[0] + timedelta(minutes=30))
city_tz= pytz.timezone('some_timezone')

ax.xaxis.set_major_formatter(mdates.DateFormatter('%d %b %H:%M', tz=city_tz))
ax.tick_params(axis='x', rotation=15, labelsize=8)
ax.xaxis.set_major_locator(MaxNLocator(6))

if min(new_ydata)

Подробнее здесь: https://stackoverflow.com/questions/784 ... ng-clogged
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Проблемы при использовании python queue.queue () с несколькими потоками
    Anonymous » » в форуме Python
    0 Ответы
    23 Просмотры
    Последнее сообщение Anonymous
  • Проблемы при использовании python queue.queue () с несколькими потоками
    Anonymous » » в форуме Python
    0 Ответы
    19 Просмотры
    Последнее сообщение Anonymous
  • Python `multiprocessing.Queue` зависающий процесс
    Anonymous » » в форуме Python
    0 Ответы
    27 Просмотры
    Последнее сообщение Anonymous
  • Python Multiprocessing Queue Pool работает в функциональном коде, но не в ООП
    Anonymous » » в форуме Python
    0 Ответы
    14 Просмотры
    Последнее сообщение Anonymous
  • Невозможно передать multiprocessing.Manager для обработки при использовании forkserver: невозможно выбрать объект «weakr
    Anonymous » » в форуме Python
    0 Ответы
    6 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»