Я пытаюсь понять жизненный цикл DoFn более подробно.
Я добавил этот счетчик в
init моего DoFn:
Код: Выделить всё
> self.counter = Metrics.counter(self.__class__, 'counts')
И соответственно увеличивать внутри соответствующего процесса()
Для простоты цель состоит в том, чтобы подсчитать общее количество элементов, прочитанных по всему потоку, т. е. сохранить метрику во всех вызовах DoFn. Я вижу, что при увеличении водяного знака/времени обработки значение счетчика сбрасывается. Журнал для справки:
Код: Выделить всё
> pardo start
> bundle started
> incremented counter.
> bundle started
> incremented counter.
> counter value: 1
Я ожидаю, что этот счетчик будет равен 2. Я могу увидеть правильное увеличение счетчика, если отправлю поток из нескольких элементов без искусственного разрыва во времени, увеличиваясь, как ожидалось. Почему происходит сброс при продвижении водяного знака? Разве метрики не должны использоваться для агрегированной статистики?
Поток отправляется через TestStream, при этом один набор элементов отправляется со значением 0, продвигается водяной знак и отправляется другой набор.
Спасибо и дайте мне знать, если я смогу где-нибудь уточнить.
Подробнее здесь:
https://stackoverflow.com/questions/793 ... ss-bundles