Я работаю с двумя рабочими процессами, каждый из которых использует машину n2d-standard-64. Приток сообщений от Kafka довольно высок. Я предполагаю, что сообщения не обрабатываются конвейером с такой скоростью. Из-за этого многие сообщения (необработанные сообщения) остаются в памяти дольше, а использование рабочей памяти продолжает увеличиваться, что в конечном итоге приводит к ошибке OOM.
Я попробовал несколько разных вещей, чтобы контролировать использование памяти:< /p>
- ограничить количество процессов SDK до 1
- ограничить количество потоков жгута до 1
- увеличить размер вычислений (использовать машины Highmem)
- ограничить приток kafka, настроив потребительские параметры kafka (max.poll.records, fetch.max.bytes, max.partition.fetch.bytes , fetch.min.bytes, poll.timeout.ms)
А также можно ли выполнять смещения фиксации вручную после записи сообщения в Postgres в конвейере лучей?
Код: Выделить всё
with beam.Pipeline(options=options) as pipeline:
kafka_config = {
'bootstrap.servers': ','.join(KAFKA_BOOTSTRAP_SERVERS),
'group.id': 'my_group_id',
'auto.offset.reset': 'earliest',
'enable.auto.commit': 'true',
}
_ = (
pipeline
| 'Kafka Read'
>> ReadFromKafka(
consumer_config=kafka_config,
topics=[KAFKA_TOPIC],
)
| 'filter' >> beam.Map(apply_filter)
| 'Write'
>> beam.ParDo(
PostgresWriter())
)

Подробнее здесь: https://stackoverflow.com/questions/789 ... ataflow-wi