Состояние и таймеры не работают должным образом в Apache BeamPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Состояние и таймеры не работают должным образом в Apache Beam

Сообщение Anonymous »

У меня есть этот фрагмент кода.

Код: Выделить всё

class UserAccessInfo(beam.DoFn):
# State to store the access data for a user during the day
ACCESS_STATE = BagStateSpec('access_state', StrUtf8Coder())
CLEAR_TIMER = TimerSpec('clear', TimeDomain.REAL_TIME)

def process(self, element, timestamp=beam.DoFn.TimestampParam, access_state=beam.DoFn.StateParam(ACCESS_STATE), clear_timer=beam.DoFn.TimerParam(CLEAR_TIMER)):
user_key, log = element
print('Current log: ', log)
user_id = log['actor']['user']['uuid']
location_id = log['location']['uuid']
access_date = log['date']
event_time = datetime.fromisoformat(log['time'][:-1])  # Parsing time

# Retrieve the current access state
current_state = list(access_state.read())
if current_state:
# We have existing state, load entry and exit info
entry_info = json.loads(current_state[0])
exit_info = json.loads(current_state[-1])
else:
# No previous data, initialize state
entry_info = None
exit_info = None

# Update entry and exit based on the event time
if entry_info is None or event_time < entry_info['event_time']:
# Update entry time and door if it's earlier
entry_info = {
'user_uuid': user_id,
'location_uuid': location_id,
'access_date': access_date,
'event_time': log['time'],
'door': log['door']['name'],
'type': 'entry'
}

if exit_info is None or event_time > exit_info['event_time']:
# Update exit time and door if it's later
exit_info = {
'user_uuid': user_id,
'location_uuid': location_id,
'access_date': access_date,
'event_time': log['time'],
'door': log['door']['name'],
'type': 'exit'
}

print('\nEntry info: ', entry_info)
print('\nExit info: ', exit_info)
# Store the updated state (entry first, exit last)
access_state.clear()
access_state.add(json.dumps(entry_info))
access_state.add(json.dumps(exit_info))

print('Current state: ', list(access_state.read()))

# Set the timer to fire at the end of the day (e.g., midnight)
try:
current_time = timestamp.to_utc_datetime()
except OverflowError:
current_time = datetime.utcnow().replace(tzinfo=pytz.UTC)  # Ensure it's timezone-aware

clear_time = current_time + timedelta(seconds=60)

print('\nClear time: ', clear_time)
# Set the timer for 60 seconds from the current processing time
clear_timer.set(clear_time)

# Yield current access info to give real-time results
yield {
'user_uuid': entry_info['user_uuid'],
'location_uuid': entry_info['location_uuid'],
'access_date': entry_info['access_date'],
'entry_time': entry_info['event_time'],
'entry_door': entry_info['door'],
'exit_time': exit_info['event_time'],
'exit_door': exit_info['door']
}

@on_timer(CLEAR_TIMER)
def clear_state(self, access_state=beam.DoFn.StateParam(ACCESS_STATE)):
# Clear the state at the end of the day

print('Clearing state...')
access_state.clear()
print('\nState cleared!\n')

Как вы можете видеть, я установил таймер на очистку состояния через 60 секунд, и код не выдает никаких ошибок, но через 60 секунд он должен запустить функциюclear_state, но это не делает этого, из-за чего мое состояние не очищается.
Можете ли вы помочь мне, как очистить состояние?

Подробнее здесь: https://stackoverflow.com/questions/790 ... pache-beam
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Состояние и таймеры не работают должным образом в Apache Beam
    Anonymous » » в форуме Python
    0 Ответы
    14 Просмотры
    Последнее сообщение Anonymous
  • Не активировать таймеры очистки состояния в Apache Beam
    Anonymous » » в форуме Python
    0 Ответы
    11 Просмотры
    Последнее сообщение Anonymous
  • Общее состояние Apache Beam Parallel
    Anonymous » » в форуме Apache
    0 Ответы
    18 Просмотры
    Последнее сообщение Anonymous
  • Таймеры C# истекают в отдельном потоке?
    Anonymous » » в форуме C#
    0 Ответы
    14 Просмотры
    Последнее сообщение Anonymous
  • Узел: тестируйте таймеры промывки при использовании t.mock.timers.tick?
    Anonymous » » в форуме Javascript
    0 Ответы
    3 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»