Я использую официальный коннектор API binance. Он состоял в основном из двух частей: 1. Подписка на WS для получения данных. 2 REST для отправки запросов. версия «все в синхронизации»
Сначала я отправляю два заказа, используя мою функцию модификации async для отправки двух заказов одновременно. Тогда я получу два order_updates. Внутри каждого из них я хочу запустить новый заказ (новый заказ имеет новое обновление, которое запускает другой заказ, снова и снова...).
Но поскольку мои первые два заказа были отправлены одновременно с использованием async , то я получу два обновления примерно одновременно. Но отправка заказа обычно занимает 10мс. Я хочу отправить заказ мгновенно, без блокировки. Например, первое обновление запускает заказ, через 1 мс было получено другое обновление (но первый заказ не был завершен). Я хочу отправить еще один заказ мгновенно, не дожидаясь завершения первого.
Ниже приведен мой код, но он не сработал. Первые два заказа были отправлены успешно, но когда обновление запускает другой заказ, новый заказ, похоже, никогда не отправляется (застревает в конечной точке отметки).
class Strategy(Utils):
def __init__(self, async_client):
super().__init__()
self.async_client = async_client
self.order_queue = asyncio.Queue()
self.loop = asyncio.get_event_loop()
def order_trigger_sync(self, msg):
"""Callback function for WebSocket to enqueue updates."""
try:
if self.loop.is_closed():
print("Event loop is closed. Cannot enqueue message.")
return
self.order_queue.put_nowait(msg)
except Exception as e:
print(f"Error enqueuing message: {e}")
async def order_worker(self, worker_id):
"""Asynchronous worker to process orders from the queue."""
while True:
msg = await self.order_queue.get()
print(f'dd{msg}')
try:
if "e" in msg and msg['e'] == 'ORDER_TRADE_UPDATE':
to_send = {}
for token in self.tokens:
mid = self.book_series[token].get_current_mid()
to_send[token] = self.gen_signal(token, mid)
orders = self.to_orders(to_send)
print(f"Worker {worker_id}: Sending orders: {orders}")
await self.async_client.process_requests(orders)
except Exception as e:
logging.error(f"Worker {worker_id}: Error processing order: {e}")
finally:
self.order_queue.task_done()
async def start_workers(self, num_workers=3):
"""Start multiple workers to process the order queue."""
for i in range(num_workers):
asyncio.create_task(self.order_worker(worker_id=i))
async def start(self):
await self.start_workers()
await asyncio.sleep(10)
to_send = dict()
for token in self.tokens:
mid = self.book_series[token].get_current_mid()
to_send[token] = self.gen_signal(token, mid)
orders = self.to_orders(to_send)
await self.async_client.process_requests(orders)
async def main():
async with AsyncClient(show_header=True) as client:
strategy = Strategy(async_client=client)
# Start WebSocket connection and register callbacks
connect = Connection(
book_callback=strategy.book_callback, # Depth book updates
trade_callback=strategy.order_trigger_sync, # Triggered on order updates
)
await strategy.start()
if __name__ == "__main__":
asyncio.run(main())
Моя функцияprocess_requests выглядит следующим образом:
async def process_requests(self, reqs):
tasks = []
for action, content in reqs.items():
if len(content) != 0:
for _ in content:
tasks.append(self.action_mapper[action](**_))
if len(tasks) != 0:
print('mark')
done, pending = await asyncio.wait(tasks)```
Подробнее здесь: https://stackoverflow.com/questions/793 ... s-callback
Python использует асинхронную функцию в обратном вызове WS ⇐ Python
Программы на Python
-
Anonymous
1736918727
Anonymous
Я использую официальный коннектор API binance. Он состоял в основном из двух частей: 1. Подписка на WS для получения данных. 2 REST для отправки запросов. версия «все в синхронизации»
Сначала я отправляю два заказа, используя мою функцию модификации async для отправки двух заказов одновременно. Тогда я получу два order_updates. Внутри каждого из них я хочу запустить новый заказ (новый заказ имеет новое обновление, которое запускает другой заказ, снова и снова...).
Но поскольку мои первые два заказа были отправлены одновременно с использованием async , то я получу два обновления примерно одновременно. Но отправка заказа обычно занимает 10мс. Я хочу отправить заказ мгновенно, без блокировки. Например, первое обновление запускает заказ, через 1 мс было получено другое обновление (но первый заказ не был завершен). Я хочу отправить еще один заказ мгновенно, не дожидаясь завершения первого.
Ниже приведен мой код, но он не сработал. Первые два заказа были отправлены успешно, но когда обновление запускает другой заказ, новый заказ, похоже, никогда не отправляется (застревает в конечной точке отметки).
class Strategy(Utils):
def __init__(self, async_client):
super().__init__()
self.async_client = async_client
self.order_queue = asyncio.Queue()
self.loop = asyncio.get_event_loop()
def order_trigger_sync(self, msg):
"""Callback function for WebSocket to enqueue updates."""
try:
if self.loop.is_closed():
print("Event loop is closed. Cannot enqueue message.")
return
self.order_queue.put_nowait(msg)
except Exception as e:
print(f"Error enqueuing message: {e}")
async def order_worker(self, worker_id):
"""Asynchronous worker to process orders from the queue."""
while True:
msg = await self.order_queue.get()
print(f'dd{msg}')
try:
if "e" in msg and msg['e'] == 'ORDER_TRADE_UPDATE':
to_send = {}
for token in self.tokens:
mid = self.book_series[token].get_current_mid()
to_send[token] = self.gen_signal(token, mid)
orders = self.to_orders(to_send)
print(f"Worker {worker_id}: Sending orders: {orders}")
await self.async_client.process_requests(orders)
except Exception as e:
logging.error(f"Worker {worker_id}: Error processing order: {e}")
finally:
self.order_queue.task_done()
async def start_workers(self, num_workers=3):
"""Start multiple workers to process the order queue."""
for i in range(num_workers):
asyncio.create_task(self.order_worker(worker_id=i))
async def start(self):
await self.start_workers()
await asyncio.sleep(10)
to_send = dict()
for token in self.tokens:
mid = self.book_series[token].get_current_mid()
to_send[token] = self.gen_signal(token, mid)
orders = self.to_orders(to_send)
await self.async_client.process_requests(orders)
async def main():
async with AsyncClient(show_header=True) as client:
strategy = Strategy(async_client=client)
# Start WebSocket connection and register callbacks
connect = Connection(
book_callback=strategy.book_callback, # Depth book updates
trade_callback=strategy.order_trigger_sync, # Triggered on order updates
)
await strategy.start()
if __name__ == "__main__":
asyncio.run(main())
Моя функцияprocess_requests выглядит следующим образом:
async def process_requests(self, reqs):
tasks = []
for action, content in reqs.items():
if len(content) != 0:
for _ in content:
tasks.append(self.action_mapper[action](**_))
if len(tasks) != 0:
print('mark')
done, pending = await asyncio.wait(tasks)```
Подробнее здесь: [url]https://stackoverflow.com/questions/79357143/python-using-a-async-function-in-a-ws-callback[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия