class UserAccessInfo(beam.DoFn):
# State to store the access data for a user during the day
ACCESS_STATE = BagStateSpec('access_state', StrUtf8Coder())
CLEAR_TIMER = TimerSpec('clear', TimeDomain.REAL_TIME)
def process(self, element, timestamp=beam.DoFn.TimestampParam, access_state=beam.DoFn.StateParam(ACCESS_STATE), clear_timer=beam.DoFn.TimerParam(CLEAR_TIMER)):
user_key, log = element
print('Current log: ', log)
user_id = log['actor']['user']['uuid']
location_id = log['location']['uuid']
access_date = log['date']
event_time = datetime.fromisoformat(log['time'][:-1]) # Parsing time
# Retrieve the current access state
current_state = list(access_state.read())
if current_state:
# We have existing state, load entry and exit info
entry_info = json.loads(current_state[0])
exit_info = json.loads(current_state[-1])
else:
# No previous data, initialize state
entry_info = None
exit_info = None
# Update entry and exit based on the event time
if entry_info is None or event_time < entry_info['event_time']:
# Update entry time and door if it's earlier
entry_info = {
'user_uuid': user_id,
'location_uuid': location_id,
'access_date': access_date,
'event_time': log['time'],
'door': log['door']['name'],
'type': 'entry'
}
if exit_info is None or event_time > exit_info['event_time']:
# Update exit time and door if it's later
exit_info = {
'user_uuid': user_id,
'location_uuid': location_id,
'access_date': access_date,
'event_time': log['time'],
'door': log['door']['name'],
'type': 'exit'
}
print('\nEntry info: ', entry_info)
print('\nExit info: ', exit_info)
# Store the updated state (entry first, exit last)
access_state.clear()
access_state.add(json.dumps(entry_info))
access_state.add(json.dumps(exit_info))
print('Current state: ', list(access_state.read()))
# Set the timer to fire at the end of the day (e.g., midnight)
try:
current_time = timestamp.to_utc_datetime()
except OverflowError:
current_time = datetime.utcnow().replace(tzinfo=pytz.UTC) # Ensure it's timezone-aware
clear_time = current_time + timedelta(seconds=60)
print('\nClear time: ', clear_time)
# Set the timer for 60 seconds from the current processing time
clear_timer.set(clear_time)
# Yield current access info to give real-time results
yield {
'user_uuid': entry_info['user_uuid'],
'location_uuid': entry_info['location_uuid'],
'access_date': entry_info['access_date'],
'entry_time': entry_info['event_time'],
'entry_door': entry_info['door'],
'exit_time': exit_info['event_time'],
'exit_door': exit_info['door']
}
@on_timer(CLEAR_TIMER)
def clear_state(self, access_state=beam.DoFn.StateParam(ACCESS_STATE)):
# Clear the state at the end of the day
print('Clearing state...')
access_state.clear()
print('\nState cleared!\n')
Как вы можете видеть, я установил таймер на очистку состояния через 60 секунд, и код не выдает никаких ошибок, но через 60 секунд он должен запустить функциюclear_state, но это не делает этого, из-за чего мое состояние не очищается.
Можете ли вы помочь мне, как очистить состояние?
У меня есть этот фрагмент кода. [code]class UserAccessInfo(beam.DoFn): # State to store the access data for a user during the day ACCESS_STATE = BagStateSpec('access_state', StrUtf8Coder()) CLEAR_TIMER = TimerSpec('clear', TimeDomain.REAL_TIME)
# Retrieve the current access state current_state = list(access_state.read()) if current_state: # We have existing state, load entry and exit info entry_info = json.loads(current_state[0]) exit_info = json.loads(current_state[-1]) else: # No previous data, initialize state entry_info = None exit_info = None
# Update entry and exit based on the event time if entry_info is None or event_time < entry_info['event_time']: # Update entry time and door if it's earlier entry_info = { 'user_uuid': user_id, 'location_uuid': location_id, 'access_date': access_date, 'event_time': log['time'], 'door': log['door']['name'], 'type': 'entry' }
if exit_info is None or event_time > exit_info['event_time']: # Update exit time and door if it's later exit_info = { 'user_uuid': user_id, 'location_uuid': location_id, 'access_date': access_date, 'event_time': log['time'], 'door': log['door']['name'], 'type': 'exit' }
print('\nEntry info: ', entry_info) print('\nExit info: ', exit_info) # Store the updated state (entry first, exit last) access_state.clear() access_state.add(json.dumps(entry_info)) access_state.add(json.dumps(exit_info))
# Set the timer to fire at the end of the day (e.g., midnight) try: current_time = timestamp.to_utc_datetime() except OverflowError: current_time = datetime.utcnow().replace(tzinfo=pytz.UTC) # Ensure it's timezone-aware
clear_time = current_time + timedelta(seconds=60)
print('\nClear time: ', clear_time) # Set the timer for 60 seconds from the current processing time clear_timer.set(clear_time)
# Yield current access info to give real-time results yield { 'user_uuid': entry_info['user_uuid'], 'location_uuid': entry_info['location_uuid'], 'access_date': entry_info['access_date'], 'entry_time': entry_info['event_time'], 'entry_door': entry_info['door'], 'exit_time': exit_info['event_time'], 'exit_door': exit_info['door'] }
@on_timer(CLEAR_TIMER) def clear_state(self, access_state=beam.DoFn.StateParam(ACCESS_STATE)): # Clear the state at the end of the day
[/code] Как вы можете видеть, я установил таймер на очистку состояния через 60 секунд, и код не выдает никаких ошибок, но через 60 секунд он должен запустить функциюclear_state, но это не делает этого, из-за чего мое состояние не очищается. Можете ли вы помочь мне, как очистить состояние?
У меня есть этот фрагмент кода.
class UserAccessInfo(beam.DoFn):
# State to store the access data for a user during the day
ACCESS_STATE = BagStateSpec('access_state', StrUtf8Coder())
CLEAR_TIMER = TimerSpec('clear', TimeDomain.REAL_TIME)
Я публикую здесь полный код, так как пробую его уже несколько дней, но результатов пока нет.
Это популярный код, который объясняется командой Apache Beam и доступно на YT.
Вот ссылка: OpannyigCFg
Я изменил код для чтения данных из SQS
Истекает ли System.Timers.Timer в отдельном потоке, отличном от потока, который его создал?
Предположим, у меня есть класс с таймером, который срабатывает каждый strong>5 секунд. Когда таймер срабатывает, в методе elapsed изменяется некоторый...
У меня есть следующая простая функция:
export async function useTimeout() {
const sleep = () => new Promise((resolve) => setTimeout(resolve, 1000));
let c = 1;
while (true) {
c += 1;
if (c > 5) {
break;
}
// THIS LINE BREAKS THE TEST
// await new...