У меня возникает ошибка при создании кадра данных при запуске приведенного ниже кода на листе Snowflake Python, когда я использую Schema=["ENTITY_ID", "SRC_ID", "REF_ID"],
Я получаю сообщение об ошибке:
Traceback (most recent call last):
Worksheet, line 130, in main
File "snowflake/snowpark/session.py", line 2590, in create_dataframe
new_schema = reduce(
File "snowflake/snowpark/session.py", line 2592, in
(infer_schema(row, names) for row in data),
File "snowflake/snowpark/_internal/type_utils.py", line 465, in infer_schema
raise TypeError(f"Unable to infer the type of the field {k}.") from e
TypeError: Unable to infer the type of the field ENTITY_ID.
С другой стороны, когда я пытаюсь использовать схему:
df_schema = StructType([
StructField("ENTITY_ID", IntegerType()),
StructField("SRC_ID", IntegerType()),
StructField("REF_ID", IntegerType())
])
Я получаю следующую ошибку без номеров строк:
100357 (P0000): Python Interpreter Error: TypeError: Object of type DataFrame is not serializable
В моей локальной сети, когда я пытаюсь запустить код с параметрами сеанса, я могу запустить код с df_schema=["ENTITY_ID", "SRC_ID", "REF_ID" ] в операторе create_dataframe(output_table, Schema=df_schema).
Попросите совета по устранению этой ошибки.
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col, sql_expr, lit, lower,
when_matched, when_not_matched, StoredProcedure, coalesce, concat
from snowflake.snowpark.types import
ArrayType,FloatType,VariantType, StructType, StructField,
StringType, IntegerType
import json
from copy import copy
import ast
import concurrent.futures
import logging
import configparser
import os
import sys
from time import time
from datetime import datetime
import json
def main(session: snowpark.Session):
# Your code goes here, inside the "main" handler.
start_time = time()
start_timestamp = datetime.now()
print(f"Start Timestamp : {start_timestamp}"
# import the tables to snowflake data frame format
df_ER_XREF_MATCH = session.table(CROSS_REF_TBL_NM)
df_ER_XREF_subset = df_ER_XREF_MATCH.select([col("ID"), col("SRC_ID"),
col("REF_ID"), col("RULE_ID"), col("ENTITY_ID")]).filter(
col('src_id') == '356761')
df_ER_XREF_subset_matched = df_ER_XREF_subset.filter(col("RULE_ID") != 0)
df_ER_XREF_subset_unmatched = df_ER_XREF_subset.filter(col("RULE_ID") == 0)
df_ER_XREF_subset_unmatched = df_ER_XREF_subset_unmatched.with_column("ENTITY_ID",
concat(lit("ERMDM"), sql_expr(
f"{XREF_ENTITY_SEQ_NM}.NEXTVAL").cast(
"string")))
connected_ids_data = df_ER_XREF_subset_matched.collect()
connected_ids_data = [(row['ID'], row['SRC_ID'], row['REF_ID']) for row in
connected_ids_data]
graph = {}
for _, src_id, ref_id in connected_ids_data:
if src_id not in graph:
graph[src_id] = []
if ref_id not in graph:
graph[ref_id] = []
graph[src_id].append(ref_id)
graph[ref_id].append(src_id) # Ensure bidirectional connection
# Function to perform DFS and find all nodes in the same component
def dfs(node, visited, component):
stack = [node]
while stack:
current = stack.pop()
if current not in visited:
visited.add(current)
component.add(current)
stack.extend(set(graph[current]) - visited)
# Initialize variables to store connected components
visited = set()
components = []
# Find all connected components
for node in graph:
if node not in visited:
component = set()
dfs(node, visited, component)
components.append(component)
# Generate output table
output_table = []
visited_edges = set()
for component in components:
common_id = concat(lit("ERMDM"),sql_expr(f{XREF_ENTITY_SEQ_NM}.NEXTVAL").cast("string"))
for node in component:
for neighbor in graph[node]:
if neighbor in component and (node, neighbor) not in visited_edges:
output_table.append((common_id, node, neighbor))
output_table.append((common_id, neighbor, node))
visited_edges.add((node, neighbor))
visited_edges.add((neighbor, node))
df_schema = StructType([StructField("ENTITY_ID", IntegerType()),
StructField("SRC_ID", IntegerType()),
StructField("REF_ID", IntegerType())])
matched_output_df = session.create_dataframe(output_table, schema=df_schema)
Подробнее здесь: https://stackoverflow.com/questions/790 ... n-but-does
Почему запуск Snowflake Python работает при работе через локальный сеанс, но не работает на листе снежинки во время созд ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Ошибка ноутбуков снежинки: «Объект результата запроса снежинки не подписан
Anonymous » » в форуме Python - 0 Ответы
- 5 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Соединение Snowflake sqlalchemy со Snowflake выдает ошибку разрешения в Python
Anonymous » » в форуме Python - 0 Ответы
- 32 Просмотры
-
Последнее сообщение Anonymous
-