Первый вопрос, опубликованный в Stackoverflow, поэтому не стесняйтесь вносить предложения относительно того, какие теги использовать для моего поста и т. Д. Наконец, я хотел бы написать это обратно в API отдыха, но в настоящее время это не проблема. Для справки см. Внедрение моего кода ниже. < /P>
import argparse
import logging
import os
import apache_beam as beam
from apache_beam.transforms.userstate import BagStateSpec, ReadModifyWriteStateSpec, TimerSpec, on_timer
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.utils.timestamp import Timestamp
from apache_beam.utils.windowed_value import WindowedValue
from apache_beam.transforms.trigger import AfterCount, AccumulationMode, Repeatedly
from apache_beam.transforms.window import GlobalWindows, Duration, TimestampedValue
import typing
from apache_beam.typehints.decorators import with_input_types, with_output_types
import requests
from apache_beam.options.pipeline_options import PipelineOptions
from typing import Union
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Load environment variables
django_container_name = os.getenv("DJANGO_CONTAINER", "django")
django_port = os.getenv("DJANGO_PORT", "8000")
def run(argv=None):
# Parse command-line arguments
parser = argparse.ArgumentParser()
args, beam_args = parser.parse_known_args(argv)
# Define custom pipeline options
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
"--source_table_id",
required=True,
help="Table ID containing the raw data.",
type=str,
)
parser.add_argument(
"--source_table_column",
required=True,
help="Column in source table.",
type=str,
)
parser.add_argument(
"--destination_table_id",
required=True,
help="Destination table ID for the output.",
type=str,
)
parser.add_argument(
"--smoothing",
required=False,
help="Smoothing variable of the EMA.",
type=float,
default=2.0,
)
parser.add_argument(
"--range",
required=False,
help="Timerange of the EMA in minutes.",
type=int,
default=60,
)
# Parse Beam pipeline options into a PipelineOptions object
beam_options = PipelineOptions(beam_args)
args = beam_options.view_as(MyOptions)
# Function to fetch data from the API
def get_api_data(dummy_start):
# Construct the API URL
api_url = (
f"http://{django_container_name}:{django_port}"
f"/api/dynamic-table/{args.source_table_id}/?ordering=timestamp"
)
logging.debug("Now fetching from ", api_url)
# Make the initial API request
response = requests.get(api_url, timeout=10)
response = response.json()
results = response.get("results", [])
next_url = response.get("next")
# Fetch data from paginated API
while next_url:
logging.debug("Now fetching from ", next_url)
response = requests.get(next_url, timeout=10)
response = response.json()
results.extend(response.get("results", []))
next_url = response.get("next")
# Extract relevant data from the API response
results = [
{
"timestamp": result.get("timestamp"),
args.source_table_column: result.get(args.source_table_column),
}
for result in results
]
return results
def post_api_data(data, feature_name="feature_0"):
headers = {"Content-Type": "application/json"}
# Construct the API URL
api_url = (
f"http://{django_container_name}:{django_port}"
f"/api/dynamic-table/feature/{args.destination_table_id}/"
)
data[feature_name] = data.pop("ema")
logging.debug("Now posting to ", api_url)
# Make the API request
response = requests.post(api_url, json=data, headers=headers, timeout=10)
if response.status_code not in [200, 201]:
logger.error(f"Failed to save data: {response.status_code}, {response.json()}")
return
return
class TimestampedValueCoder(beam.coders.Coder):
def encode(self, value: TimestampedValue):
"""Encode TimestampedValue to bytes."""
timestamp = value.timestamp.to_rfc3339()
element = value.value
return f"{timestamp}:{element}".encode("utf-8")
def decode(self, encoded: bytes):
decoded = encoded.decode("utf-8")
timestamp_str, element = decoded.split(":", 1)
timestamp = Timestamp.from_rfc3339(timestamp_str)
return TimestampedValue(value=element, timestamp=timestamp)
def is_deterministic(self):
return True
beam.coders.registry.register_coder(TimestampedValue, TimestampedValueCoder)
class EMAStatefulDoFn(beam.DoFn):
PREVIOUS_EMA = BagStateSpec("previous_ema", TimestampedValueCoder())
def process(self, element,
previous_ema=beam.DoFn.StateParam(PREVIOUS_EMA),
timestamp=beam.DoFn.TimestampParam,):
# Get the previous EMA
previous_ema_value = previous_ema.read()
if not previous_ema_value:
previous_ema_value = element
# Calculate the EMA
alpha = 2 / (args.smoothing + 1)
ema = (element * alpha) + (previous_ema_value * (1 - alpha))
# Update the state
previous_ema.clear()
previous_ema.add(ema)
# Output the EMA
return [TimestampedValue(value=ema, timestamp=timestamp)]
# Define the Beam pipeline
with beam.Pipeline(options=beam_options) as p:
_ = (
p
| "Create" >> beam.Create(["Start"]) # Workaround to kickstart the pipeline
| "fetch API data" >> beam.FlatMap(get_api_data) # Fetch data from the API
| "Timestamp values" >> beam.Map(
lambda x: TimestampedValue(x[args.source_table_column],
Timestamp.from_rfc3339(x["timestamp"])))
| "Into windows" >> beam.WindowInto(GlobalWindows())
| "Calculate EMA" >> beam.ParDo(EMAStatefulDoFn())
| "Print EMA" >> beam.Map(logging.info)
)
if __name__ == "__main__":
# Set logging level
logging.getLogger().setLevel(logging.DEBUG)
run()
< /code>
Проблема, с которой я сталкиваюсь, заключается в том, что я не могу должным образом реализовать Statefuldofn для этого. Причина, по которой я пытаюсь использовать Statefuldofn в окне GlobalWindows, заключается в том, что я сохраняю доступ к любой ранее рассчитанной EMA, но, возможно, я не рассматриваю более подходящую реализацию SlidingWindows. Это преобразуется в временные рамки и передается в мой Dofn. Если я полностью удалю DOFN и просто войдет в систему выходной сигнал, он итеративно зарегистрирует полученное значение с входа API REST, предполагая, что «в Windows» на самом деле не является неисправной Ptransform.
Заранее!
Подробнее здесь: https://stackoverflow.com/questions/794 ... eam-but-ke
Создание экспоненциального скользящего среднего с использованием Statefuldofns в Beam Apache, но продолжайте сталкиватьс ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение