Серверные события (SSE) с использованием python httpx-ssePython

Программы на Python
Anonymous
 Серверные события (SSE) с использованием python httpx-sse

Сообщение Anonymous »

Недавно я переместил свой клиент API из запросов на httpx . Между тем, бэкэнд добавил поддержку SSE, которой я хотел бы воспользоваться. >)-это пакет httpx-sse . Тем не менее, я не могу заставить его работать. /> [*]

Код: Выделить всё

base_url
- как я заполните для этого поста, я буду использовать https://example.com/v2apiобразно[code]/data/sse[/code] - Путь, в котором можно получить SSE из

[*] - словарь в форме

Код: Выделить всё

{
'Content-type' : 'application/json',
'api-key' : 'test1234'
}
где Api-key требуется для любой операции отдыха
- словарь в форме

Код: Выделить всё

{
'layerList' : 'test1,test2'
}
, где LayerList представляет собой строку запятых идентификаторов структур данных, которые я хотел бы «исследовать» для SSE (в моем случае это уровни карты).
< /li>
< /ul>
Я использую следующий код для проверки на наличие SSE. Из -за тайм -аута с SSE мне пришлось установить тайм -аут клиента на нет , чтобы он работал на неопределенный срок. Это уже указывало на проблему ... < /p>

Код: Выделить всё

client = httpx.AsyncClient(
base_url='https://example.com/v2api',
headers={
'x-api-key' : 'test1234',
'Content-type' : 'application/json'
},
timeout=None)

try:
async with httpx_sse.aconnect_sse(client, 'GET', '/data/sse') as event_source:
events = [sse async for sse in event_source.aiter_sse()]
print(events)
except httpx.ReadTimeout as hrt:
print(hrt)
except Exception as ex:
print(ex)
Примечание: Клиент работает с другими конечными точками того же API, которые не являются SSE. Получите любой вывод в консоли. Я также попытался создать нового клиента в операторе с и вместо установки base_url ( автоматически объединяет любой URL с помощью base_url экземпляра клиента) я просто использовал полный (в случае, если httpx-sse работает по-другому) вместе с заголовками и параметры снова установлены в этом самой операторе. Результат был таким же.

Код: Выделить всё

from sseclient import SSEClient

events = SSEClient('https://example.com/v2api/data/sse', headers={
'Content-type' : 'application/json',
'x-api-key' : config['credentials']['api-key']
},
params={'layerList':'S1-test1-78bbda5d-929c-497e-b212-86a33dfbc8ff'})
for event in events:
print(event)
< /code>

 обновление: < /strong> < /p>
Достаточно интересно, он работает с несинхрочным клиентом от httpx-sse 
.

Подробнее здесь: https://stackoverflow.com/questions/783 ... -httpx-sse

Вернуться в «Python»