Я строю низкодовой инструмент генерации SQL, который принимает объекты фильтров с столбцом, оператором, значением и т. Д. И компилируется в SQL. В идеале пользователи смогут указать оконные функции, которые я пытаюсь создать с помощью подразделений. В случае использования будет раздвижный окно -запрос, который суммирует суммы транзакций в течение n дней и флаг транзакций, которые превышают порог. < /p>
from sqlalchemy import select, func, inspect, text, MetaData, Table
from sqlalchemy.orm import sessionmaker, aliased
def apply_aggregation(query, aggregate_operator:str, time_window:str, time_column:str, column_name:str, filter_table:Table):
"""
Apply aggregation to the query (e.g., SUM).
"""
agg_func = getattr(func, aggregate_operator.lower(), None)
if not agg_func:
raise ValueError(f"Unsupported agregate function: {aggregate_operator}")
# Get the target column for aggregation
target_col = filter_table.c.get(column_name)
if target_col is None or (time_column not in filter_table.c):
raise ValueError(f"Invalid column names: {column_name} or {time_column}")
# Convert the time window into an integer (days)
try:
time_window_days = int(time_window.split()[0]) # Assumes format "X days"
except ValueError:
raise ValueError(f"Invalid time window format: {time_window}")
# Define the window function with RANGE INTERVAL
rolling_agg = agg_func(target_col).over(
partition_by=text("1"), # No partitioning
order_by=filter_table.c[time_column],
#does not work
range_=(-time_window_days,0)
#also does not work
#range_=(text(f"INTERVAL '{time_window}' PRECEDING"), text("CURRENT ROW"))
).label("rolling_agg")
# Build the subquery with the window function
subquery = (
select(
filter_table.c.id, # Keep primary key
filter_table.c[time_column], # Keep time column for reference
rolling_agg
)
.subquery()
)
# Join the subquery back to the original query
query = query.add_columns(subquery.c.rolling_agg).select_from(subquery)
return query
< /code>
А вот как я его использую < /p>
metadata=MetaData()
te_table = Table('transaction_entries',metadata, autoload_with=engine)
query = select(te_table)
query = apply_aggregation(query,
filters["aggregate_operator"],
filters["time_window"],
filters["time_column"],
filters["column_name"],
te_table) #simple case where filter table is target table
print(query)
rs = session.execute(query)
< /code>
желаемый результат < /p>
SELECT transaction_entries.id, ...other transaction_entries fields, anon_1.rolling_agg
FROM (SELECT transaction_entries.id AS id, transaction_entries.created_at AS created_at, sum(transaction_entries.amount) OVER (PARTITION BY 1 ORDER BY transaction_entries.created_at RANGE INTERVAL :param_1 PRECEDING) AS rolling_agg
FROM transaction_entries) AS anon_1, transaction_entries
< /code>
Фактический результат < /p>
SELECT transaction_entries.id, ...other transaction_entries fields, anon_1.rolling_agg
FROM (SELECT transaction_entries.id AS id, transaction_entries.created_at AS created_at, sum(transaction_entries.amount) OVER (PARTITION BY 1 ORDER BY transaction_entries.created_at RANGE BETWEEN :param_1 PRECEDING AND CURRENT ROW) AS rolling_agg
FROM transaction_entries) AS anon_1, transaction_entries
< /code>
Когда я пытаюсь изменить range_ на интервал, я получаю ошибку featurenotsupported < /p>
FeatureNotSupported: RANGE with offset PRECEDING/FOLLOWING is not supported for column type timestamp without time zone and offset type integer
LINE 2: ...R BY transaction_entries.created_at RANGE BETWEEN 30 PRECEDI...
^
HINT: Cast the offset value to an appropriate type.
Как я могу привести к правильному типу, или есть лучший способ этого?
Я строю низкодовой инструмент генерации SQL, который принимает объекты фильтров с столбцом, оператором, значением и т. Д. И компилируется в SQL. В идеале пользователи смогут указать оконные функции, которые я пытаюсь создать с помощью подразделений. В случае использования будет раздвижный окно -запрос, который суммирует суммы транзакций в течение n дней и флаг транзакций, которые превышают порог. < /p> [code]from sqlalchemy import select, func, inspect, text, MetaData, Table from sqlalchemy.orm import sessionmaker, aliased
def apply_aggregation(query, aggregate_operator:str, time_window:str, time_column:str, column_name:str, filter_table:Table): """ Apply aggregation to the query (e.g., SUM). """ agg_func = getattr(func, aggregate_operator.lower(), None) if not agg_func: raise ValueError(f"Unsupported agregate function: {aggregate_operator}")
# Get the target column for aggregation target_col = filter_table.c.get(column_name) if target_col is None or (time_column not in filter_table.c): raise ValueError(f"Invalid column names: {column_name} or {time_column}")
# Convert the time window into an integer (days) try: time_window_days = int(time_window.split()[0]) # Assumes format "X days" except ValueError: raise ValueError(f"Invalid time window format: {time_window}")
# Define the window function with RANGE INTERVAL rolling_agg = agg_func(target_col).over( partition_by=text("1"), # No partitioning order_by=filter_table.c[time_column], #does not work range_=(-time_window_days,0) #also does not work #range_=(text(f"INTERVAL '{time_window}' PRECEDING"), text("CURRENT ROW"))
).label("rolling_agg")
# Build the subquery with the window function subquery = ( select( filter_table.c.id, # Keep primary key filter_table.c[time_column], # Keep time column for reference rolling_agg ) .subquery() )
# Join the subquery back to the original query query = query.add_columns(subquery.c.rolling_agg).select_from(subquery)
return query
< /code> А вот как я его использую < /p> metadata=MetaData() te_table = Table('transaction_entries',metadata, autoload_with=engine)
query = select(te_table) query = apply_aggregation(query, filters["aggregate_operator"], filters["time_window"], filters["time_column"], filters["column_name"], te_table) #simple case where filter table is target table print(query) rs = session.execute(query) < /code> желаемый результат < /p> SELECT transaction_entries.id, ...other transaction_entries fields, anon_1.rolling_agg FROM (SELECT transaction_entries.id AS id, transaction_entries.created_at AS created_at, sum(transaction_entries.amount) OVER (PARTITION BY 1 ORDER BY transaction_entries.created_at RANGE INTERVAL :param_1 PRECEDING) AS rolling_agg FROM transaction_entries) AS anon_1, transaction_entries < /code> Фактический результат < /p> SELECT transaction_entries.id, ...other transaction_entries fields, anon_1.rolling_agg FROM (SELECT transaction_entries.id AS id, transaction_entries.created_at AS created_at, sum(transaction_entries.amount) OVER (PARTITION BY 1 ORDER BY transaction_entries.created_at RANGE BETWEEN :param_1 PRECEDING AND CURRENT ROW) AS rolling_agg FROM transaction_entries) AS anon_1, transaction_entries < /code> Когда я пытаюсь изменить range_ на интервал, я получаю ошибку featurenotsupported < /p> FeatureNotSupported: RANGE with offset PRECEDING/FOLLOWING is not supported for column type timestamp without time zone and offset type integer LINE 2: ...R BY transaction_entries.created_at RANGE BETWEEN 30 PRECEDI... ^ HINT: Cast the offset value to an appropriate type. [/code] Как я могу привести к правильному типу, или есть лучший способ этого?
Я строю низкодовой инструмент генерации SQL, который принимает объекты фильтров с столбцом, оператором, значением и т. Д. И компилируется в SQL. В идеале пользователи смогут указать оконные функции, которые я пытаюсь создать с помощью подразделений....
В настоящее время я пытаюсь предварительно обработать данные из исследования, в котором видеодады были собраны у пчел. Запись и вместо увеличения вместо этого наблюдается внезапное снижение значений временных метров. Значения внезапно уменьшаются до...
В настоящее время я пытаюсь предварительно обработать данные из исследования, в котором видеодады были собраны у пчел. Запись и вместо увеличения вместо этого наблюдается внезапное снижение значений временных метров. Значения внезапно уменьшаются до...
Я сталкиваюсь с проблемой изменения источника данных с Sybase на PostgreSQL. Существует оператор вставки с несколькими переменными, чтобы зацикливаться на запросе.
EntityManager em = createEntityManager();
Query query =...
В настоящее время я работаю над проектом, в котором мне нужно сначала объединить два набора данных:
Первый набор данных содержит данные о погоде с 30-минутными интервалами. Второй набор данных содержит данные минутного уровня с фотоэлектрическим...