Я публикую здесь полный код, так как пробую его уже несколько дней, но результатов пока нет.
Это популярный код, который объясняется командой Apache Beam и доступно на YT.
Вот ссылка:
Я изменил код для чтения данных из SQS
import enum
import json
from typing import Optional, Iterable, List, NamedTuple
import apache_beam as beam
from apache_beam import PCollection, TimeDomain
from apache_beam import coders
from apache_beam.io.fileio import WriteToFiles
from apache_beam.io.textio import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec, BagStateSpec, TimerSpec, \
CombiningValueStateSpec, on_timer
from apache_beam.transforms.window import TimestampedValue
from apache_beam.utils.timestamp import Timestamp, Duration
from dateutil import parser
from apache_beam.io.textio import WriteToText
import boto3
from apache_beam import DoFn, ParDo, Map
import time
from datetime import datetime
SQS_QUEUE_URL = 'my_sqs_string'
class ReadFromSQS(DoFn):
def __init__(self, queue_url):
self.queue_url = queue_url
def process(self, element):
# Initialize AWS session with credentials
self.session = boto3.Session(
aws_access_key_id='access key',
aws_secret_access_key='secret',
region_name='us-east-1'
)
self.sqs_client = self.session.client('sqs')
# sqs_client = boto3.client('sqs', region_name='us-east-1') # Initialize the SQS client here
"""Continuously polls the SQS queue and yields messages."""
while True:
response = self.sqs_client.receive_message(
QueueUrl=self.queue_url,
MaxNumberOfMessages=10,
WaitTimeSeconds=20 # Long polling
)
messages = response.get('Messages', [])
if messages:
for message in messages:
# print('$$$$$$$$$$$$$$$ : ', message)
body = json.loads(message['Body'])
yield body # Yield each message body
# Delete the message from the queue after processing
self.sqs_client.delete_message(
QueueUrl=self.queue_url,
ReceiptHandle=message['ReceiptHandle']
)
else:
print("No messages received, waiting...")
time.sleep(5) # Sleep to avoid rapid polling if no messages are available
# ---------------------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------------------
# Great intro to Beam schemas in python: [youtube]zx4p-UNSmrA[/youtube]
# First we create a class that inherits from NamedTuple, this is our Schema
#
# To actually create an instance of TaxiPoint you can leverage dictionary unpacking
# Let's say you have a dictionary d = {"ride_id": asdf, ...,"passenger_count": 8}
# This dictionary's keys match the fields of TaxiPoint.
# In this case, you use dictionary unpacking '**' to make class construction easy.
# Dictionary unpacking is when passing a dictionary to a function,
# the key-value pairs can be unpacked into keyword arguments in a function call
# where the dictionary keys match the parameter names of the function.
# So the call to the constructor looks like ==> TaxiPoint(**d)
class TaxiPoint(NamedTuple):
ride_id: str
point_idx: int
latitude: float
longitude: float
timestamp: str
meter_reading: float
meter_increment: float
ride_status: str
passenger_count: int
class SessionReason(enum.Enum):
DROPOFF_SEEN = 1
GARBAGE_COLLECTION = 2
class TaxiSession(NamedTuple):
ride_id: str
duration: float
start_timestamp: str
end_timestamp: str
n_points: int
start_status: str
end_status: str
session_reason: SessionReason
# Second we let Beam know about our Schema by registering it
beam.coders.registry.register_coder(TaxiPoint, beam.coders.RowCoder)
beam.coders.registry.register_coder(TaxiSession, beam.coders.RowCoder)
# ---------------------------------------------------------------------------------------
# Parsing functions
# ---------------------------------------------------------------------------------------
def json_to_taxi_point(s: str) -> TaxiPoint:
d: dict = json.loads(s)
return TaxiPoint(**d)
def add_timestamp(p: TaxiPoint) -> TaxiPoint:
ts: float = parser.parse(p.timestamp).timestamp()
print('$$$$$$$$$$$$$$$$$ ', p.timestamp, ts, TimestampedValue(p, ts))
return TimestampedValue(p, ts)
def max_timestamp_combine(input_iterable):
print(f"Input iterable: {input_iterable}")
print(max(input_iterable, default=0))
return max(input_iterable, default=0)
# --------------------------------------------------------------------------------------
# TASK: Write a DoFn to find sessions using state & timers
# --------------------------------------------------------------------------------------
class FindSessions(beam.DoFn):
"""This DoFn applies state & timers to try to infer the session data from the received
points."""
# The state for the key
KEY_STATE = ReadModifyWriteStateSpec('state', coders.StrUtf8Coder())
# Elements bag of taxi ride events
TAXI_RIDE_EVENTS_BAG = BagStateSpec('taxi_ride_events_bag',
coders.registry.get_coder(TaxiPoint))
# Event time timer for Garbage Collection
GC_TIMER = TimerSpec('gc_timer', TimeDomain.WATERMARK)
# The maximum element timestamp seen so far.
MAX_TIMESTAMP = CombiningValueStateSpec('max_timestamp_seen', max_timestamp_combine)
def process(self, element: tuple[str, TaxiPoint],
element_timestamp=beam.DoFn.TimestampParam,
key_state=beam.DoFn.StateParam(KEY_STATE),
taxi_ride_events_bag=beam.DoFn.StateParam(TAXI_RIDE_EVENTS_BAG),
max_timestamp_seen=beam.DoFn.StateParam(MAX_TIMESTAMP),
gc_timer=beam.DoFn.TimerParam(GC_TIMER)
# timer = beam.DoFn.TimerParam(GC_TIMER)
) -> Iterable[TaxiSession]:
# TODO
# Update the state for every new message
key, point = element
# print(f"Processing element {point}")
if (key_state.read() is None) or (key_state.read() != key):
key_state.write(key)
taxi_ride_events_bag.add(point)
max_timestamp_seen.add(element_timestamp.micros)
print(f"Max timestamp seen: {max_timestamp_seen.read()}")
print(f"element_timestamp: {element_timestamp}")
# Check if end of session is seen, and emit the session data
if point.ride_status == 'dropoff':
print(f"Dropoff seen")
#Session has ended
session = self._calculate_session(key, taxi_ride_events_bag.read(), SessionReason.DROPOFF_SEEN)
yield session
# print(f"Session ended: {session}")
taxi_ride_events_bag.clear()
key_state.clear()
max_timestamp_seen.clear()
gc_timer.clear()
else:
print(f"Dropoff not seen")
# gc_timer.set(expiration_ts)
# Set the timer to expire 2 minutes after the current event timestamp
expiration_seconds = element_timestamp.micros / 1_000_000 + 120 # Adding 120 seconds (2 minutes)
expiration_ts = Timestamp(seconds=int(expiration_seconds))
print(f"Setting timer for {expiration_ts}, expiration_seconds: {expiration_seconds}")
# gc_timer.set(expiration_ts)
print('********** ', Timestamp.now() + Duration(seconds=60))
gc_timer.set(Timestamp.now() + Duration(seconds=100))
@on_timer(GC_TIMER)
def expiry_callback(
self,
key_state=beam.DoFn.StateParam(KEY_STATE),
taxi_ride_events_bag=beam.DoFn.StateParam(TAXI_RIDE_EVENTS_BAG),
max_timestamp_seen=beam.DoFn.StateParam(MAX_TIMESTAMP)) -> Iterable[TaxiSession]:
# The timer has been triggered, you need to emit a session with whatever data has been
# accumulated so far
print(f"----------------------------Timer triggered-----------------------------")
session = self._calculate_session(key_state.read(), taxi_ride_events_bag.read(), SessionReason.GARBAGE_COLLECTION)
yield session
taxi_ride_events_bag.clear()
key_state.clear()
max_timestamp_seen.clear()
@staticmethod
def _calculate_session(key: str,
taxi_ride_events_bag: Iterable[TaxiPoint],
session_reason: SessionReason) -> TaxiSession:
# TODO
# Since we emit sessions from two different methods, let's use the same function to emit
# sessions, so we do it with consistency.
points = list(taxi_ride_events_bag)
points.sort(key=lambda x: x.timestamp)
start_time = parser.parse(points[0].timestamp).timestamp()
end_time = parser.parse(points[-1].timestamp).timestamp()
print(f"Start time: {start_time}, End time: {end_time}")
s = TaxiSession(
ride_id=key,
duration=end_time - start_time,
start_timestamp=points[0].timestamp,
end_timestamp=points[-1].timestamp,
n_points=0,
start_status=points[0].ride_status,
end_status=points[-1].ride_status,
session_reason=session_reason
)
print(f"\nSession: {s}\n")
return s
@beam.ptransform_fn
def taxi_stats_transform(json_strs: PCollection[str]) -> PCollection[TaxiSession]:
points: PCollection[TaxiPoint] = json_strs | "parse json strings" >> beam.Map(
json_to_taxi_point)
tstamp: PCollection[TaxiPoint] = points | "timestamping" >> beam.Map(add_timestamp)
key: PCollection[tuple[str, TaxiPoint]] = tstamp | "key" >> beam.WithKeys(
lambda e: e.ride_id)
key | "Debug JSON Strings after timestamp" >> beam.Map(print)
sessions: PCollection[TaxiSession] = key | "stats" >> beam.ParDo(FindSessions())
sessions | "Debug JSON Strings" >> beam.Map(print)
output: PCollection[TaxiSession] = sessions | "cep" >> beam.Filter(
lambda s: s.duration > 1000)
print("----------------Output: ", output)
return output
# ------------------------------------------------------------------------------------
# Pipeline
# ------------------------------------------------------------------------------------
# def run(beam_options: Optional[PipelineOptions] = None):
# with beam.Pipeline(options=beam_options) as pipeline:
# rides: PCollection[str] = pipeline | "Read ndjson input" >> ReadFromText(
# file_pattern=beam_options.input_filename)
# calculations: PCollection[TaxiSession] = rides | "calculations" >> taxi_stats_transform()
# writeablecalculations: PCollection[str] = calculations | beam.Map(json.dumps)
# writeablecalculations | WriteToFiles(path=beam_options.output_filename)
# def run(beam_options: Optional[PipelineOptions] = None):
# with beam.Pipeline(options=beam_options) as pipeline:
# # Read input
# rides: PCollection[str] = pipeline | "Read ndjson input" >> ReadFromText(
# file_pattern=beam_options.input_filename)
# # Apply transformations
# calculations: PCollection[TaxiSession] = rides | "calculations" >> taxi_stats_transform()
# # Convert SessionReason enum to string for JSON serialization
# calculations = calculations | "Convert SessionReason to string" >> beam.Map(
# lambda session: session._replace(session_reason=session.session_reason.name)
# )
# # Convert to JSON strings
# writeablecalculations: PCollection[str] = calculations | beam.Map(json.dumps)
# # Debug: Output the JSON strings
# writeablecalculations | "Debug JSON Strings" >> beam.Map(print)
# # Write to output
# writeablecalculations | "Write to output" >> beam.io.WriteToText(
# file_path_prefix=beam_options.output_filename,
# file_name_suffix=".json"
# )
def run(beam_options: Optional[PipelineOptions] = None):
with beam.Pipeline(options=beam_options) as pipeline:
# Read input
rides: PCollection[str] = pipeline | 'ReadFromSQS' >> beam.Create([None])
rides: PCollection[str] = rides | 'ReadAccessLogs' >> ParDo(ReadFromSQS(SQS_QUEUE_URL))
rides | "Debug JSON Strings" >> beam.Map(print)
# Ensure the input is a JSON string before parsing
rides = rides | "Convert to JSON String" >> beam.Map(json.dumps)
# Apply transformations
calculations: PCollection[TaxiSession] = rides | "calculations" >> taxi_stats_transform()
# Convert SessionReason enum to string for JSON serialization
calculations = calculations | "Convert SessionReason to string" >> beam.Map(
lambda session: session._replace(session_reason=session.session_reason.name)
)
# Convert to JSON strings
writeablecalculations: PCollection[str] = calculations | beam.Map(json.dumps)
# Debug: Output the JSON strings
# writeablecalculations | "Debug JSON Strings" >> beam.Map(print)
# Write to output
writeablecalculations | "Write to output" >> beam.io.WriteToText(
file_path_prefix=beam_options.output_filename,
file_name_suffix=".json"
)
Я не получаю никаких ошибок, но таймер очистки не вызывается. Я вызываю его через 100 секунд, поэтому, как и ожидалось, он должен вызвать функцию очистки таймера через 100 секунд и очистить все состояние, но оно не вызывает его.
И в функции _calculate_session он должен напечатать сеанс, но я думаю, что код не доходит до этого момента и находится в зависшем режиме. ТАК что делать, чтобы очистить состояние и завершить весь процесс.
Пожалуйста, помогите мне запустить его.
Я не удалил комментарии на случай, если они вам понадобятся.
Я публикую здесь полный код, так как пробую его уже несколько дней, но результатов пока нет. Это популярный код, который объясняется командой Apache Beam и доступно на YT. Вот ссылка: [youtube]OpannyigCFg[/youtube] Я изменил код для чтения данных из SQS [code] import enum import json from typing import Optional, Iterable, List, NamedTuple
import apache_beam as beam from apache_beam import PCollection, TimeDomain from apache_beam import coders from apache_beam.io.fileio import WriteToFiles from apache_beam.io.textio import ReadFromText from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.transforms.userstate import ReadModifyWriteStateSpec, BagStateSpec, TimerSpec, \ CombiningValueStateSpec, on_timer from apache_beam.transforms.window import TimestampedValue from apache_beam.utils.timestamp import Timestamp, Duration from dateutil import parser from apache_beam.io.textio import WriteToText import boto3 from apache_beam import DoFn, ParDo, Map import time from datetime import datetime
# Initialize AWS session with credentials self.session = boto3.Session( aws_access_key_id='access key', aws_secret_access_key='secret', region_name='us-east-1' ) self.sqs_client = self.session.client('sqs') # sqs_client = boto3.client('sqs', region_name='us-east-1') # Initialize the SQS client here """Continuously polls the SQS queue and yields messages.""" while True: response = self.sqs_client.receive_message( QueueUrl=self.queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=20 # Long polling )
messages = response.get('Messages', [])
if messages: for message in messages: # print('$$$$$$$$$$$$$$$ : ', message) body = json.loads(message['Body']) yield body # Yield each message body
# Delete the message from the queue after processing self.sqs_client.delete_message( QueueUrl=self.queue_url, ReceiptHandle=message['ReceiptHandle'] ) else: print("No messages received, waiting...") time.sleep(5) # Sleep to avoid rapid polling if no messages are available
# Great intro to Beam schemas in python: [youtube]zx4p-UNSmrA[/youtube]
# First we create a class that inherits from NamedTuple, this is our Schema # # To actually create an instance of TaxiPoint you can leverage dictionary unpacking # Let's say you have a dictionary d = {"ride_id": asdf, ...,"passenger_count": 8} # This dictionary's keys match the fields of TaxiPoint. # In this case, you use dictionary unpacking '**' to make class construction easy. # Dictionary unpacking is when passing a dictionary to a function, # the key-value pairs can be unpacked into keyword arguments in a function call # where the dictionary keys match the parameter names of the function. # So the call to the constructor looks like ==> TaxiPoint(**d) class TaxiPoint(NamedTuple): ride_id: str point_idx: int latitude: float longitude: float timestamp: str meter_reading: float meter_increment: float ride_status: str passenger_count: int
class SessionReason(enum.Enum): DROPOFF_SEEN = 1 GARBAGE_COLLECTION = 2
class TaxiSession(NamedTuple): ride_id: str duration: float start_timestamp: str end_timestamp: str n_points: int start_status: str end_status: str session_reason: SessionReason
# Second we let Beam know about our Schema by registering it beam.coders.registry.register_coder(TaxiPoint, beam.coders.RowCoder) beam.coders.registry.register_coder(TaxiSession, beam.coders.RowCoder)
# -------------------------------------------------------------------------------------- # TASK: Write a DoFn to find sessions using state & timers # --------------------------------------------------------------------------------------
class FindSessions(beam.DoFn): """This DoFn applies state & timers to try to infer the session data from the received points."""
# The state for the key KEY_STATE = ReadModifyWriteStateSpec('state', coders.StrUtf8Coder())
# Elements bag of taxi ride events TAXI_RIDE_EVENTS_BAG = BagStateSpec('taxi_ride_events_bag', coders.registry.get_coder(TaxiPoint))
# Event time timer for Garbage Collection GC_TIMER = TimerSpec('gc_timer', TimeDomain.WATERMARK)
# The maximum element timestamp seen so far. MAX_TIMESTAMP = CombiningValueStateSpec('max_timestamp_seen', max_timestamp_combine)
def process(self, element: tuple[str, TaxiPoint], element_timestamp=beam.DoFn.TimestampParam, key_state=beam.DoFn.StateParam(KEY_STATE), taxi_ride_events_bag=beam.DoFn.StateParam(TAXI_RIDE_EVENTS_BAG), max_timestamp_seen=beam.DoFn.StateParam(MAX_TIMESTAMP), gc_timer=beam.DoFn.TimerParam(GC_TIMER) # timer = beam.DoFn.TimerParam(GC_TIMER) ) -> Iterable[TaxiSession]: # TODO # Update the state for every new message key, point = element # print(f"Processing element {point}") if (key_state.read() is None) or (key_state.read() != key): key_state.write(key)
taxi_ride_events_bag.add(point) max_timestamp_seen.add(element_timestamp.micros) print(f"Max timestamp seen: {max_timestamp_seen.read()}") print(f"element_timestamp: {element_timestamp}") # Check if end of session is seen, and emit the session data if point.ride_status == 'dropoff':
@staticmethod def _calculate_session(key: str, taxi_ride_events_bag: Iterable[TaxiPoint], session_reason: SessionReason) -> TaxiSession: # TODO # Since we emit sessions from two different methods, let's use the same function to emit # sessions, so we do it with consistency.
# Write to output writeablecalculations | "Write to output" >> beam.io.WriteToText( file_path_prefix=beam_options.output_filename, file_name_suffix=".json" ) [/code] Я не получаю никаких ошибок, но таймер очистки не вызывается. Я вызываю его через 100 секунд, поэтому, как и ожидалось, он должен вызвать функцию очистки таймера через 100 секунд и очистить все состояние, но оно не вызывает его. И в функции _calculate_session он должен напечатать сеанс, но я думаю, что код не доходит до этого момента и находится в зависшем режиме. ТАК что делать, чтобы очистить состояние и завершить весь процесс. Пожалуйста, помогите мне запустить его. Я не удалил комментарии на случай, если они вам понадобятся.
У меня есть этот фрагмент кода.
class UserAccessInfo(beam.DoFn):
# State to store the access data for a user during the day
ACCESS_STATE = BagStateSpec('access_state', StrUtf8Coder())
CLEAR_TIMER = TimerSpec('clear', TimeDomain.REAL_TIME)
У меня есть этот фрагмент кода.
class UserAccessInfo(beam.DoFn):
# State to store the access data for a user during the day
ACCESS_STATE = BagStateSpec('access_state', StrUtf8Coder())
CLEAR_TIMER = TimerSpec('clear', TimeDomain.REAL_TIME)
Истекает ли System.Timers.Timer в отдельном потоке, отличном от потока, который его создал?
Предположим, у меня есть класс с таймером, который срабатывает каждый strong>5 секунд. Когда таймер срабатывает, в методе elapsed изменяется некоторый...
У меня есть следующая простая функция:
export async function useTimeout() {
const sleep = () => new Promise((resolve) => setTimeout(resolve, 1000));
let c = 1;
while (true) {
c += 1;
if (c > 5) {
break;
}
// THIS LINE BREAKS THE TEST
// await new...
У меня есть код, в котором SetTimeout вызывается неоднократно. По сути, обратный вызов One Settimout устанавливает еще одну и так далее. Я хотел бы проверить задержки с помощью vi.usefaketimers () , чтобы код был выполнен мгновенно, но я все еще...