Получение списка событий для модуля при его запускеPython

Программы на Python
Ответить
Anonymous
 Получение списка событий для модуля при его запуске

Сообщение Anonymous »

У меня есть интерфейс TypeScript, который использует серверную часть Python 3.11 для запуска образов Docker в кластер Kubernetes (с использованием kubernetes 29.3.0). В рамках этого процесса он вызывает API, чтобы получить статус образа при его запуске, чтобы он мог перенаправить пользователя к своей службе после его запуска.
В настоящее время код имеет довольно простое решение, которое смотрит на статус развертывания:
  • Если развертывание не существует, статуса нет.
  • Если развертывание существует и имеет 1 доступная_реплика (это то, что мы указываем), она работает
  • В противном случае она ожидает
и пользовательский интерфейс по сути принимает этот «статус» и помещает пользовательский интерфейс для просмотра пользователем.
95% случаев изображения запускаются в течение нескольких секунд, и проблем нет
Что я хочу, чтобы в пользовательском интерфейсе были более подробные/тонкие сообщения, особенно когда все идет гладко. Например, очень редко изображение еще не находится на узле и его необходимо извлечь — это требует времени или, возможно, на выбранном узле закончились ресурсы для запуска этого образа. Было бы неплохо проинформировать пользователя о том, что происходит.
Мне нужен код для получения последнего события для модуля:

Код: Выделить всё

from kubernetes import watch
from kubernetes.client import CoreV1Api

def get_client:
pseudo_code: return client.CoreV1Api() with config

# return the _last_ event for a pod
def watch_pod_events(name, namespace):
v1 = get_client()
w = watch.Watch()
message = None
for event in w.stream(v1.list_namespaced_event, namespace=namespace):
involved_obj = event['object'].involved_object
if involved_obj.name == name:
message = event['object'].message
return message
Если я распечатаю сообщения в цикле, я получу фактические данные, но они не распространяются обратно в стек.
Я подозреваю, что это связано с тем, что w.stream() является итерируемым, а не реальным списком, но я не уверен.
Я также пробовал:

Код: Выделить всё

my_events = list(filter(lambda e: e['object'].involved_object.name == name, w.stream(v1.list_namespaced_event, namespace=namespace)))

Код: Выделить всё

my_events =  [e for e in w.stream(v1.list_namespaced_event, namespace=namespace) if e['object'].involved_object.name == name]

Код: Выделить всё

my_events = v1.list_namespaced_event(namespace=namespace, field_selector=f"involvedObject.name={name}").items
но, похоже, ничего не работает.

Подробнее: https://stackoverflow.com/questions/799 ... -it-starts
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»