Создание экспоненциального скользящего среднего с использованием Statefuldofns в Beam Apache, но продолжайте сталкиватьсPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Создание экспоненциального скользящего среднего с использованием Statefuldofns в Beam Apache, но продолжайте сталкиватьс

Сообщение Anonymous »

Первый вопрос, опубликованный в Stackoverflow, поэтому не стесняйтесь вносить предложения относительно того, какие теги использовать для моего поста и т. Д. Наконец, я хотел бы написать это обратно в API отдыха, но в настоящее время это не проблема. Для справки см. Внедрение моего кода ниже. < /P>
import argparse
import logging
import os

import apache_beam as beam
from apache_beam.transforms.userstate import BagStateSpec, ReadModifyWriteStateSpec, TimerSpec, on_timer
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.utils.timestamp import Timestamp
from apache_beam.utils.windowed_value import WindowedValue
from apache_beam.transforms.trigger import AfterCount, AccumulationMode, Repeatedly
from apache_beam.transforms.window import GlobalWindows, Duration, TimestampedValue

import typing
from apache_beam.typehints.decorators import with_input_types, with_output_types
import requests
from apache_beam.options.pipeline_options import PipelineOptions
from typing import Union

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables
django_container_name = os.getenv("DJANGO_CONTAINER", "django")
django_port = os.getenv("DJANGO_PORT", "8000")

def run(argv=None):
# Parse command-line arguments
parser = argparse.ArgumentParser()
args, beam_args = parser.parse_known_args(argv)

# Define custom pipeline options
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
"--source_table_id",
required=True,
help="Table ID containing the raw data.",
type=str,
)
parser.add_argument(
"--source_table_column",
required=True,
help="Column in source table.",
type=str,
)
parser.add_argument(
"--destination_table_id",
required=True,
help="Destination table ID for the output.",
type=str,
)
parser.add_argument(
"--smoothing",
required=False,
help="Smoothing variable of the EMA.",
type=float,
default=2.0,
)
parser.add_argument(
"--range",
required=False,
help="Timerange of the EMA in minutes.",
type=int,
default=60,
)

# Parse Beam pipeline options into a PipelineOptions object
beam_options = PipelineOptions(beam_args)
args = beam_options.view_as(MyOptions)

# Function to fetch data from the API
def get_api_data(dummy_start):

# Construct the API URL
api_url = (
f"http://{django_container_name}:{django_port}"
f"/api/dynamic-table/{args.source_table_id}/?ordering=timestamp"
)

logging.debug("Now fetching from ", api_url)

# Make the initial API request
response = requests.get(api_url, timeout=10)
response = response.json()
results = response.get("results", [])
next_url = response.get("next")

# Fetch data from paginated API
while next_url:
logging.debug("Now fetching from ", next_url)
response = requests.get(next_url, timeout=10)
response = response.json()
results.extend(response.get("results", []))
next_url = response.get("next")

# Extract relevant data from the API response
results = [
{
"timestamp": result.get("timestamp"),
args.source_table_column: result.get(args.source_table_column),
}
for result in results
]

return results

def post_api_data(data, feature_name="feature_0"):
headers = {"Content-Type": "application/json"}
# Construct the API URL
api_url = (
f"http://{django_container_name}:{django_port}"
f"/api/dynamic-table/feature/{args.destination_table_id}/"
)

data[feature_name] = data.pop("ema")
logging.debug("Now posting to ", api_url)

# Make the API request
response = requests.post(api_url, json=data, headers=headers, timeout=10)

if response.status_code not in [200, 201]:
logger.error(f"Failed to save data: {response.status_code}, {response.json()}")
return

return

class TimestampedValueCoder(beam.coders.Coder):
def encode(self, value: TimestampedValue):
"""Encode TimestampedValue to bytes."""
timestamp = value.timestamp.to_rfc3339()
element = value.value
return f"{timestamp}:{element}".encode("utf-8")

def decode(self, encoded: bytes):
decoded = encoded.decode("utf-8")
timestamp_str, element = decoded.split(":", 1)
timestamp = Timestamp.from_rfc3339(timestamp_str)
return TimestampedValue(value=element, timestamp=timestamp)

def is_deterministic(self):
return True

beam.coders.registry.register_coder(TimestampedValue, TimestampedValueCoder)

class EMAStatefulDoFn(beam.DoFn):

PREVIOUS_EMA = BagStateSpec("previous_ema", TimestampedValueCoder())

def process(self, element,
previous_ema=beam.DoFn.StateParam(PREVIOUS_EMA),
timestamp=beam.DoFn.TimestampParam,):
# Get the previous EMA
previous_ema_value = previous_ema.read()
if not previous_ema_value:
previous_ema_value = element

# Calculate the EMA
alpha = 2 / (args.smoothing + 1)
ema = (element * alpha) + (previous_ema_value * (1 - alpha))

# Update the state
previous_ema.clear()
previous_ema.add(ema)

# Output the EMA
return [TimestampedValue(value=ema, timestamp=timestamp)]

# Define the Beam pipeline
with beam.Pipeline(options=beam_options) as p:

_ = (
p
| "Create" >> beam.Create(["Start"]) # Workaround to kickstart the pipeline
| "fetch API data" >> beam.FlatMap(get_api_data) # Fetch data from the API
| "Timestamp values" >> beam.Map(
lambda x: TimestampedValue(x[args.source_table_column],
Timestamp.from_rfc3339(x["timestamp"])))
| "Into windows" >> beam.WindowInto(GlobalWindows())
| "Calculate EMA" >> beam.ParDo(EMAStatefulDoFn())
| "Print EMA" >> beam.Map(logging.info)

)

if __name__ == "__main__":
# Set logging level
logging.getLogger().setLevel(logging.DEBUG)
run()

< /code>
Проблема, с которой я сталкиваюсь, заключается в том, что я не могу должным образом реализовать Statefuldofn для этого. Причина, по которой я пытаюсь использовать Statefuldofn в окне GlobalWindows, заключается в том, что я сохраняю доступ к любой ранее рассчитанной EMA, но, возможно, я не рассматриваю более подходящую реализацию SlidingWindows. Это преобразуется в временные рамки и передается в мой Dofn. Если я полностью удалю DOFN и просто войдет в систему выходной сигнал, он итеративно зарегистрирует полученное значение с входа API REST, предполагая, что «в Windows» на самом деле не является неисправной Ptransform.
Заранее!

Подробнее здесь: https://stackoverflow.com/questions/794 ... eam-but-ke
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»