Python использует асинхронную функцию в обратном вызове WSPython

Программы на Python
Ответить
Anonymous
 Python использует асинхронную функцию в обратном вызове WS

Сообщение Anonymous »

Я использую официальный коннектор API binance. Он состоял в основном из двух частей: 1. Подписка на WS для получения данных. 2 REST для отправки запросов. версия «все в синхронизации»
Сначала я отправляю два заказа, используя мою функцию модификации async для отправки двух заказов одновременно. Тогда я получу два order_updates. Внутри каждого из них я хочу запустить новый заказ (новый заказ имеет новое обновление, которое запускает другой заказ, снова и снова...).
Но поскольку мои первые два заказа были отправлены одновременно с использованием async , то я получу два обновления примерно одновременно. Но отправка заказа обычно занимает 10мс. Я хочу отправить заказ мгновенно, без блокировки. Например, первое обновление запускает заказ, через 1 мс было получено другое обновление (но первый заказ не был завершен). Я хочу отправить еще один заказ мгновенно, не дожидаясь завершения первого.
Ниже приведен мой код, но он не сработал. Первые два заказа были отправлены успешно, но когда обновление запускает другой заказ, новый заказ, похоже, никогда не отправляется (застревает в конечной точке отметки).

class Strategy(Utils):
def __init__(self, async_client):
super().__init__()
self.async_client = async_client

self.order_queue = asyncio.Queue()

self.loop = asyncio.get_event_loop()

def order_trigger_sync(self, msg):
"""Callback function for WebSocket to enqueue updates."""
try:
if self.loop.is_closed():
print("Event loop is closed. Cannot enqueue message.")
return
self.order_queue.put_nowait(msg)
except Exception as e:
print(f"Error enqueuing message: {e}")

async def order_worker(self, worker_id):
"""Asynchronous worker to process orders from the queue."""
while True:
msg = await self.order_queue.get()
print(f'dd{msg}')
try:
if "e" in msg and msg['e'] == 'ORDER_TRADE_UPDATE':
to_send = {}
for token in self.tokens:
mid = self.book_series[token].get_current_mid()
to_send[token] = self.gen_signal(token, mid)
orders = self.to_orders(to_send)
print(f"Worker {worker_id}: Sending orders: {orders}")
await self.async_client.process_requests(orders)
except Exception as e:
logging.error(f"Worker {worker_id}: Error processing order: {e}")
finally:
self.order_queue.task_done()

async def start_workers(self, num_workers=3):
"""Start multiple workers to process the order queue."""
for i in range(num_workers):
asyncio.create_task(self.order_worker(worker_id=i))

async def start(self):
await self.start_workers()
await asyncio.sleep(10)
to_send = dict()
for token in self.tokens:
mid = self.book_series[token].get_current_mid()
to_send[token] = self.gen_signal(token, mid)
orders = self.to_orders(to_send)

await self.async_client.process_requests(orders)

async def main():
async with AsyncClient(show_header=True) as client:
strategy = Strategy(async_client=client)

# Start WebSocket connection and register callbacks
connect = Connection(
book_callback=strategy.book_callback, # Depth book updates
trade_callback=strategy.order_trigger_sync, # Triggered on order updates
)
await strategy.start()

if __name__ == "__main__":
asyncio.run(main())


Моя функцияprocess_requests выглядит следующим образом:
async def process_requests(self, reqs):
tasks = []
for action, content in reqs.items():
if len(content) != 0:
for _ in content:
tasks.append(self.action_mapper[action](**_))
if len(tasks) != 0:
print('mark')
done, pending = await asyncio.wait(tasks)```


Подробнее здесь: https://stackoverflow.com/questions/793 ... s-callback
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»