Код: Выделить всё
base_url
[*]
Код: Выделить всё
headers
Код: Выделить всё
{
'Content-type' : 'application/json',
'api-key' : 'test1234'
}
Код: Выделить всё
params
Код: Выделить всё
{
'layerList' : 'test1,test2'
}
< /li>
< /ul>
Я использую следующий код для проверки на наличие SSE. Из -за тайм -аута с SSE мне пришлось установить тайм -аут клиента на нет , чтобы он работал на неопределенный срок. Это уже указывало на проблему ... < /p>
Код: Выделить всё
client = httpx.AsyncClient(
base_url='https://example.com/v2api',
headers={
'x-api-key' : 'test1234',
'Content-type' : 'application/json'
},
timeout=None)
try:
async with httpx_sse.aconnect_sse(client, 'GET', '/data/sse') as event_source:
events = [sse async for sse in event_source.aiter_sse()]
print(events)
except httpx.ReadTimeout as hrt:
print(hrt)
except Exception as ex:
print(ex)
Код: Выделить всё
httpx
Код: Выделить всё
from sseclient import SSEClient
events = SSEClient('https://example.com/v2api/data/sse', headers={
'Content-type' : 'application/json',
'x-api-key' : config['credentials']['api-key']
},
params={'layerList':'S1-test1-78bbda5d-929c-497e-b212-86a33dfbc8ff'})
for event in events:
print(event)
< /code>
обновление: < /strong> < /p>
Достаточно интересно, он работает с несинхрочным клиентом от httpx-sse
Подробнее здесь: https://stackoverflow.com/questions/783 ... -httpx-sse