Проблема нехватки памяти при запуске конвейера потоковой передачи лучей с использованием потока данных gcp с Kafka в качPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Проблема нехватки памяти при запуске конвейера потоковой передачи лучей с использованием потока данных gcp с Kafka в кач

Сообщение Anonymous »

Я пытаюсь настроить конвейер потоковой передачи данных в Python, где источником является kafka, а приемником — таблица postgres (см. код моего конвейера ниже). Тема Kafka имеет несколько разделов с несколькими брокерами. Когда я запускаю этот конвейер, я наблюдаю, что объем рабочей памяти постоянно увеличивается.
Я работаю с двумя рабочими процессами, каждый из которых использует машину n2d-standard-64. Приток сообщений от Kafka довольно высок. Я предполагаю, что сообщения не обрабатываются конвейером с такой скоростью. Из-за этого многие сообщения (необработанные сообщения) остаются в памяти дольше, а использование рабочей памяти продолжает увеличиваться, что в конечном итоге приводит к ошибке OOM.
Я попробовал несколько разных вещей, чтобы контролировать использование памяти:< /p>
  • ограничить количество процессов SDK до 1
  • ограничить количество потоков жгута до 1
  • увеличить размер вычислений (использовать машины Highmem)
  • ограничить приток kafka, настроив потребительские параметры kafka (max.poll.records, fetch.max.bytes, max.partition.fetch.bytes , fetch.min.bytes, poll.timeout.ms)
Ни одно из вышеперечисленных решений не помогло контролировать рост рабочей памяти. Может ли кто-нибудь предложить решение для управления притоком Kafka в нижележащем конвейере потока данных?
А также можно ли выполнять смещения фиксации вручную после записи сообщения в Postgres в конвейере лучей?

Код: Выделить всё

with beam.Pipeline(options=options) as pipeline:

kafka_config = {
'bootstrap.servers': ','.join(KAFKA_BOOTSTRAP_SERVERS),
'group.id': 'my_group_id',
'auto.offset.reset': 'earliest',
'enable.auto.commit': 'true',
}

_ = (
pipeline
| 'Kafka Read'
>> ReadFromKafka(
consumer_config=kafka_config,
topics=[KAFKA_TOPIC],
)
| 'filter' >> beam.Map(apply_filter)
| 'Write'
>> beam.ParDo(
PostgresWriter())
)
Изображение


Подробнее здесь: https://stackoverflow.com/questions/789 ... ataflow-wi
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»