State cleanup timers not firing in Apache Beam (Python)

I'm posting the full code here because I've been trying to get it working for several days with no results.
It's a well-known example that is explained by the Apache Beam team and is available on YouTube.
Here is the link:
I modified the code to read the data from SQS.

Code:

import enum
import json
from typing import Optional, Iterable, List, NamedTuple

import apache_beam as beam
from apache_beam import PCollection, TimeDomain
from apache_beam import coders
from apache_beam.io.fileio import WriteToFiles
from apache_beam.io.textio import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec, BagStateSpec, TimerSpec, \
    CombiningValueStateSpec, on_timer
from apache_beam.transforms.window import TimestampedValue
from apache_beam.utils.timestamp import Timestamp, Duration
from dateutil import parser
from apache_beam.io.textio import WriteToText
import boto3
from apache_beam import DoFn, ParDo, Map
import time
from datetime import datetime

SQS_QUEUE_URL = 'my_sqs_string'

class ReadFromSQS(DoFn):

    def __init__(self, queue_url):
        self.queue_url = queue_url

    def process(self, element):

        # Initialize AWS session with credentials
        self.session = boto3.Session(
            aws_access_key_id='access key',
            aws_secret_access_key='secret',
            region_name='us-east-1'
        )
        self.sqs_client = self.session.client('sqs')
        # sqs_client = boto3.client('sqs', region_name='us-east-1')  # Initialize the SQS client here
        """Continuously polls the SQS queue and yields messages."""
        while True:
            response = self.sqs_client.receive_message(
                QueueUrl=self.queue_url,
                MaxNumberOfMessages=10,
                WaitTimeSeconds=20  # Long polling
            )

            messages = response.get('Messages', [])

            if messages:
                for message in messages:
                    # print('$$$$$$$$$$$$$$$ : ', message)
                    body = json.loads(message['Body'])
                    yield body  # Yield each message body

                    # Delete the message from the queue after processing
                    self.sqs_client.delete_message(
                        QueueUrl=self.queue_url,
                        ReceiptHandle=message['ReceiptHandle']
                    )
            else:
                print("No messages received, waiting...")
                time.sleep(5)  # Sleep to avoid rapid polling if no messages are available

# ---------------------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------------------

# Great intro to Beam schemas in python: https://www.youtube.com/watch?v=zx4p-UNSmrA

# First we create a class that inherits from NamedTuple, this is our Schema
#
# To actually create an instance of TaxiPoint you can leverage dictionary unpacking
# Let's say you have a dictionary d = {"ride_id": asdf, ...,"passenger_count": 8}
# This dictionary's keys match the fields of TaxiPoint.
# In this case, you use dictionary unpacking '**' to make class construction easy.
# Dictionary unpacking is when passing a dictionary to a function,
# the key-value pairs can be unpacked into keyword arguments in a function call
# where the dictionary keys match the parameter names of the function.
# So the call to the constructor looks like ==> TaxiPoint(**d)
class TaxiPoint(NamedTuple):
    ride_id: str
    point_idx: int
    latitude: float
    longitude: float
    timestamp: str
    meter_reading: float
    meter_increment: float
    ride_status: str
    passenger_count: int
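
# Illustration only (the values below are made up): dictionary unpacking means a parsed
# JSON dict whose keys match the fields above can build a TaxiPoint directly, e.g.
#   d = {"ride_id": "r1", "point_idx": 1, "latitude": 40.7, "longitude": -74.0,
#        "timestamp": "2020-01-01T00:00:00Z", "meter_reading": 5.0,
#        "meter_increment": 0.05, "ride_status": "enroute", "passenger_count": 2}
#   TaxiPoint(**d)  # equivalent to TaxiPoint(ride_id="r1", point_idx=1, ...)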

class SessionReason(enum.Enum):
    DROPOFF_SEEN = 1
    GARBAGE_COLLECTION = 2

class TaxiSession(NamedTuple):
    ride_id: str
    duration: float
    start_timestamp: str
    end_timestamp: str
    n_points: int
    start_status: str
    end_status: str
    session_reason: SessionReason

# Second we let Beam know about our Schema by registering it
beam.coders.registry.register_coder(TaxiPoint, beam.coders.RowCoder)
beam.coders.registry.register_coder(TaxiSession, beam.coders.RowCoder)

# ---------------------------------------------------------------------------------------
# Parsing functions
# ---------------------------------------------------------------------------------------

def json_to_taxi_point(s: str) -> TaxiPoint:
    d: dict = json.loads(s)
    return TaxiPoint(**d)

def add_timestamp(p: TaxiPoint) -> TimestampedValue:
    ts: float = parser.parse(p.timestamp).timestamp()
    print('$$$$$$$$$$$$$$$$$ ', p.timestamp, ts, TimestampedValue(p, ts))
    return TimestampedValue(p, ts)

def max_timestamp_combine(input_iterable):
    print(f"Input iterable: {input_iterable}")
    print(max(input_iterable, default=0))
    return max(input_iterable, default=0)

# --------------------------------------------------------------------------------------
# TASK: Write a DoFn to find sessions using state & timers
# --------------------------------------------------------------------------------------

class FindSessions(beam.DoFn):
    """This DoFn applies state & timers to try to infer the session data from the received
    points."""

    # The state for the key
    KEY_STATE = ReadModifyWriteStateSpec('state', coders.StrUtf8Coder())

    # Elements bag of taxi ride events
    TAXI_RIDE_EVENTS_BAG = BagStateSpec('taxi_ride_events_bag',
                                        coders.registry.get_coder(TaxiPoint))

    # Event time timer for Garbage Collection
    GC_TIMER = TimerSpec('gc_timer', TimeDomain.WATERMARK)

    # The maximum element timestamp seen so far.
    MAX_TIMESTAMP = CombiningValueStateSpec('max_timestamp_seen', max_timestamp_combine)

    def process(self, element: tuple[str, TaxiPoint],
                element_timestamp=beam.DoFn.TimestampParam,
                key_state=beam.DoFn.StateParam(KEY_STATE),
                taxi_ride_events_bag=beam.DoFn.StateParam(TAXI_RIDE_EVENTS_BAG),
                max_timestamp_seen=beam.DoFn.StateParam(MAX_TIMESTAMP),
                gc_timer=beam.DoFn.TimerParam(GC_TIMER)
                # timer = beam.DoFn.TimerParam(GC_TIMER)
                ) -> Iterable[TaxiSession]:
        # TODO
        # Update the state for every new message
        key, point = element
        # print(f"Processing element {point}")
        if (key_state.read() is None) or (key_state.read() != key):
            key_state.write(key)

        taxi_ride_events_bag.add(point)
        max_timestamp_seen.add(element_timestamp.micros)
        print(f"Max timestamp seen: {max_timestamp_seen.read()}")
        print(f"element_timestamp: {element_timestamp}")
        # Check if end of session is seen, and emit the session data
        if point.ride_status == 'dropoff':

            print("Dropoff seen")
            # Session has ended
            session = self._calculate_session(key, taxi_ride_events_bag.read(), SessionReason.DROPOFF_SEEN)
            yield session
            # print(f"Session ended: {session}")
            taxi_ride_events_bag.clear()
            key_state.clear()
            max_timestamp_seen.clear()
            gc_timer.clear()
        else:

            print("Dropoff not seen")

            # gc_timer.set(expiration_ts)
            # Set the timer to expire 2 minutes after the current event timestamp
            expiration_seconds = element_timestamp.micros / 1_000_000 + 120  # Adding 120 seconds (2 minutes)
            expiration_ts = Timestamp(seconds=int(expiration_seconds))
            print(f"Setting timer for {expiration_ts}, expiration_seconds: {expiration_seconds}")
            # gc_timer.set(expiration_ts)
            print('********** ', Timestamp.now() + Duration(seconds=60))
            gc_timer.set(Timestamp.now() + Duration(seconds=100))

    @on_timer(GC_TIMER)
    def expiry_callback(
            self,
            key_state=beam.DoFn.StateParam(KEY_STATE),
            taxi_ride_events_bag=beam.DoFn.StateParam(TAXI_RIDE_EVENTS_BAG),
            max_timestamp_seen=beam.DoFn.StateParam(MAX_TIMESTAMP)) -> Iterable[TaxiSession]:

        # The timer has been triggered, you need to emit a session with whatever data has been
        # accumulated so far

        print("----------------------------Timer triggered-----------------------------")
        session = self._calculate_session(key_state.read(), taxi_ride_events_bag.read(), SessionReason.GARBAGE_COLLECTION)
        yield session
        taxi_ride_events_bag.clear()
        key_state.clear()
        max_timestamp_seen.clear()

    @staticmethod
    def _calculate_session(key: str,
                           taxi_ride_events_bag: Iterable[TaxiPoint],
                           session_reason: SessionReason) -> TaxiSession:
        # TODO
        # Since we emit sessions from two different methods, let's use the same function to emit
        # sessions, so we do it with consistency.

        points = list(taxi_ride_events_bag)
        points.sort(key=lambda x: x.timestamp)

        start_time = parser.parse(points[0].timestamp).timestamp()
        end_time = parser.parse(points[-1].timestamp).timestamp()

        print(f"Start time: {start_time}, End time: {end_time}")

        s = TaxiSession(
            ride_id=key,
            duration=end_time - start_time,
            start_timestamp=points[0].timestamp,
            end_timestamp=points[-1].timestamp,
            n_points=0,
            start_status=points[0].ride_status,
            end_status=points[-1].ride_status,
            session_reason=session_reason
        )

        print(f"\nSession: {s}\n")

        return s

@beam.ptransform_fn
def taxi_stats_transform(json_strs: PCollection[str]) -> PCollection[TaxiSession]:
    points: PCollection[TaxiPoint] = json_strs | "parse json strings" >> beam.Map(
        json_to_taxi_point)
    tstamp: PCollection[TaxiPoint] = points | "timestamping" >> beam.Map(add_timestamp)

    key: PCollection[tuple[str, TaxiPoint]] = tstamp | "key" >> beam.WithKeys(
        lambda e: e.ride_id)

    key | "Debug JSON Strings after timestamp" >> beam.Map(print)
    sessions: PCollection[TaxiSession] = key | "stats" >> beam.ParDo(FindSessions())
    sessions | "Debug JSON Strings" >> beam.Map(print)
    output: PCollection[TaxiSession] = sessions | "cep" >> beam.Filter(
        lambda s: s.duration > 1000)

    print("----------------Output: ", output)
    return output

# ------------------------------------------------------------------------------------
# Pipeline
# ------------------------------------------------------------------------------------

# def run(beam_options: Optional[PipelineOptions] = None):
#     with beam.Pipeline(options=beam_options) as pipeline:
#         rides: PCollection[str] = pipeline | "Read ndjson input" >> ReadFromText(
#             file_pattern=beam_options.input_filename)
#         calculations: PCollection[TaxiSession] = rides | "calculations" >> taxi_stats_transform()
#         writeablecalculations: PCollection[str] = calculations | beam.Map(json.dumps)
#         writeablecalculations | WriteToFiles(path=beam_options.output_filename)

# def run(beam_options: Optional[PipelineOptions] = None):
#     with beam.Pipeline(options=beam_options) as pipeline:
#         # Read input
#         rides: PCollection[str] = pipeline | "Read ndjson input" >> ReadFromText(
#             file_pattern=beam_options.input_filename)

#         # Apply transformations
#         calculations: PCollection[TaxiSession] = rides | "calculations" >> taxi_stats_transform()
#         # Convert SessionReason enum to string for JSON serialization
#         calculations = calculations | "Convert SessionReason to string" >> beam.Map(
#             lambda session: session._replace(session_reason=session.session_reason.name)
#         )
#         # Convert to JSON strings
#         writeablecalculations: PCollection[str] = calculations | beam.Map(json.dumps)

#         # Debug: Output the JSON strings
#         writeablecalculations | "Debug JSON Strings" >> beam.Map(print)

#         # Write to output
#         writeablecalculations | "Write to output" >> beam.io.WriteToText(
#             file_path_prefix=beam_options.output_filename,
#             file_name_suffix=".json"
#         )
def run(beam_options: Optional[PipelineOptions] = None):
    with beam.Pipeline(options=beam_options) as pipeline:
        # Read input
        rides: PCollection[str] = pipeline | 'ReadFromSQS' >> beam.Create([None])

        rides: PCollection[str] = rides | 'ReadAccessLogs' >> ParDo(ReadFromSQS(SQS_QUEUE_URL))

        rides | "Debug JSON Strings" >> beam.Map(print)
        # Ensure the input is a JSON string before parsing
        rides = rides | "Convert to JSON String" >> beam.Map(json.dumps)
        # Apply transformations
        calculations: PCollection[TaxiSession] = rides | "calculations" >> taxi_stats_transform()
        # Convert SessionReason enum to string for JSON serialization
        calculations = calculations | "Convert SessionReason to string" >> beam.Map(
            lambda session: session._replace(session_reason=session.session_reason.name)
        )
        # Convert to JSON strings
        writeablecalculations: PCollection[str] = calculations | beam.Map(json.dumps)

        # Debug: Output the JSON strings
        # writeablecalculations | "Debug JSON Strings" >> beam.Map(print)

        # Write to output
        writeablecalculations | "Write to output" >> beam.io.WriteToText(
            file_path_prefix=beam_options.output_filename,
            file_name_suffix=".json"
        )
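
This is roughly how I launch it (not part of the tutorial; the options class and defaults below are my own, just to show where beam_options.input_filename / output_filename come from):

Code:

# Appended at the bottom of the same file; PipelineOptions is already imported above.
class TaxiOptions(PipelineOptions):
    """Custom options exposing the file arguments referenced in run()."""

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument('--input_filename', default='taxi_points.ndjson')
        parser.add_argument('--output_filename', default='taxi_sessions')


if __name__ == '__main__':
    # DirectRunner by default; streaming=True because the SQS reader loops forever.
    run(beam_options=TaxiOptions(streaming=True))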
I don't get any errors, but the cleanup timer never fires. I set it for 100 seconds out, so I would expect the timer callback to be invoked about 100 seconds later and clear all the state, but it is never called.
The _calculate_session function should also print the session, but I don't think the code ever gets that far; it just seems to hang. So what should I do to clear the state and finish the whole process?
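
For comparison, here is a stripped-down, self-contained sketch of the same event-time timer pattern (my own toy example, not from the tutorial: a tiny bounded keyed source, one bag state, one WATERMARK timer). This is the behaviour I expect from the full pipeline above, since on a bounded input the watermark eventually passes the timer timestamp and the @on_timer callback should fire:

Code:

import apache_beam as beam
from apache_beam import TimeDomain, coders
from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer
from apache_beam.transforms.window import TimestampedValue
from apache_beam.utils.timestamp import Duration


class GcDemo(beam.DoFn):
    # One bag of raw strings per key, plus one event-time (watermark) timer.
    BAG = BagStateSpec('bag', coders.StrUtf8Coder())
    GC_TIMER = TimerSpec('gc', TimeDomain.WATERMARK)

    def process(self, element,
                ts=beam.DoFn.TimestampParam,
                bag=beam.DoFn.StateParam(BAG),
                gc_timer=beam.DoFn.TimerParam(GC_TIMER)):
        key, value = element
        bag.add(value)
        # Ask for the callback 2 minutes past this element's event timestamp.
        gc_timer.set(ts + Duration(seconds=120))

    @on_timer(GC_TIMER)
    def flush(self, bag=beam.DoFn.StateParam(BAG)):
        # Runs once the watermark passes the timer timestamp; on a bounded
        # source that happens when the input is exhausted.
        yield list(bag.read())
        bag.clear()


with beam.Pipeline() as p:
    (p
     | beam.Create([('ride-1', 'pickup'), ('ride-1', 'enroute')])
     | beam.Map(lambda kv: TimestampedValue(kv, 0))  # event time = epoch 0
     | beam.ParDo(GcDemo())
     | beam.Map(print))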
Please help me get it running.
I have left the comments in, in case you need them.

More details here: https://stackoverflow.com/questions/791 ... pache-beam