Чтение из публикации/подписки и запись в Firestore (собственный) с использованием потоковой передачи Beam Dataflow (pythPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Чтение из публикации/подписки и запись в Firestore (собственный) с использованием потоковой передачи Beam Dataflow (pyth

Сообщение Anonymous »


Я читаю из публикации/подписки, используя скользящие окна Dataflow, применяю некоторые преобразования перед созданием сущностей и записываю в собственный формат Firestore. Я вижу, что луч не поддерживает встроенную библиотеку ввода-вывода Firestore, поэтому я проверил некоторый собственный [код Python] [https://www.the-swamp.info/blog/uploadi ... -dataflow/] в для записи в Firestore. Вот основные важные шаги:

1. Создать скользящее окно публикации/подписки:

с beam.Pipeline(options=options) как p: # Чтение данных из Pub/Sub входные_данные = ( п | «Читать из Pub/Sub» >> луч.io.ReadFromPubSub(подписка=known_args.input_subscription) | «Декодировать JSON» >> Beam.Map(lambda x: json.loads(x)) ) оконные_данные = ( входные данные | «Применить окно» >> луч.WindowInto( луч.окно.SlidingWindows( размер=60*60*5, период=60*15 ) # скользящее окно на 5 часов, срабатывает каждые 15 минут ) ) 2- создать объекты:

def create_content(element, window=DoFn.WindowParam): logging.info(f" элемент: {element}") Product_id = элемент [0] window_start = window.start.to_utc_datetime() window_end = window.end.to_utc_datetime() # установить срок действия на один час expiry_timestamp = datetime.utcnow() + timedelta(часы=5) # создать сущность сущность = { "expiry_timestamp": expiry_timestamp, «product_id»: Product_id, «список_длины»: len(элемент[1]), "window_start": window_start, "window_end": window_end, } вернуть объект 3- Запись в класс Firestore:

класс FirestoreUpdateDoFn(beam.DoFn): MAX_DOCUMENTS = 200 def __init__(я, проект, коллекция): self._project = проект self._collection = коллекция защита start_bundle (сам): из google.cloud импортировать Firestore self._mutations = [] self.db = firestore.Client(проект=self._project) self.batch = self.db.batch() защита Finish_bundle (сам): если self._mutations: self._flush_batch() logging.info("фиксация пакета элементов на Finish_bundle") logging.info("закрытие базы данных") self.db.close() процесс def(self, element, *args, **kwargs): self._mutations.append(элемент) logging.info(f"прочитанный элемент — {element}") если len(self._mutations) > self.MAX_DOCUMENTS: self._flush_batch() защита _flush_batch(self): logging.info(f"длина пакета равна {len(self._mutations)}") для мутации в self._mutations: logging.info(f"мутация — это {мутация}") Product_id = мутация["product_id"] ключ = hashlib.sha1( json.dumps(product_id, sort_keys=True).encode("utf-8") ).hexdigest() ref = self.db.collection(self._collection).document(ключ) self.batch.set(ссылка, мутация) logging.info("собираюсь зафиксировать пакет") г = self.batch.commit() logging.info("фиксация пакета элементов") журналирование.debug(r) self._mutations = [] 4- запустить все это вместе:

windowed_data | «Создание объектов» >> луч.Карта( create_content ) | «Записать в Firestore» >> луч.ParDo( FirestoreUpdateDoFn (project_id, Collection_id) ) Я использовал пакеты FirestoreUpdateDoFn для пакетной вставки в Firestore, что, как я ожидал, улучшит производительность за счет сокращения переключения между потоком данных и хранилищем данных. Однако я заметил, что запись в Firestore по-прежнему очень медленная. Кроме того, примерно через несколько минут после запуска задания поток данных зависает, и данные перестают записываться в базу данных. Более того, отставание публикации/подписки увеличивается бесконечно, что означает, что сообщения не подтверждаются (если я не ошибаюсь). Я также заметил, что пакеты очень маленькие, что в основном приводит к небольшим партиям (в основном размер равен 1, см. снимок экрана).

В настоящее время я застрял, так как задание ненадежно (оно становится очень медленным и перестает писать), и я не уверен, есть ли способ передавать события в Firestore с помощью луча Python. какие-нибудь подсказки?


Изображение

Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»