Ошибка pyspark Преобразование серии сплавок Pandas в массив стрелка (строка)Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Ошибка pyspark Преобразование серии сплавок Pandas в массив стрелка (строка)

Сообщение Anonymous »

Меня попросили отладить какой -то код, который извлекает данные из API, преобразует его из JSON в Pandas, Pandas, чтобы исчез, а затем записывает в таблицу. Следующая ошибка возникает, когда она пытается преобразовать из Pandas в Spark:

Код: Выделить всё

Exception thrown when converting pandas.Series (float64) with name 'latitude' to Arrow Array (string).< /code>

это приводит к потере данных в таблице, на которую он пишет: API, как известно, содержит ~ 30 000 записей, но в таблице конечной работы только 11 000 записано. Я полагаю, что это связано с функцией process_and_write_page () 
сбой в середине страницы и перейти на следующий. Для анонимности детали API, но, надеюсь, это дает достаточно информации для диагностики проблемы. < /p>

Код: Выделить всё

import requests, json, time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType

import pandas as pd
from tqdm import tqdm

# Initialize Spark session with Arrow optimization and schema auto-merge enabled
spark = SparkSession.builder.appName("app") \
.config("spark.sql.execution.arrow.pyspark.enabled", "true") \
.config("spark.databricks.delta.schema.autoMerge.enabled", "true") \
.getOrCreate()

# API details
base_api_url = "https://an/api/url"
api_key = "api12345"

# Headers for the API request
headers = {
"subscriptionKey": api_key,
"userAgent": "aUserAgent"
}

# Create a session
session = requests.Session()
session.headers.update(headers)

# Function to fetch a page of IDs
def fetch_ids(page=1, per_page=100, get_total=False):
try:
response = session.get(f"{base_api_url}?page={page}&perPage={per_page}", timeout=30)
response.raise_for_status()
data = response.json()
if get_total:
return data['totalPages']
ids = [customer['id'] for customer in data['customers']]
next_page_uri = data.get('nextPageUri')
return ids, next_page_uri
except requests.exceptions.RequestException as e:
print(f"Failed to fetch data for page {page}: {e}")
return [], None

# Function to fetch detailed information for a single customer with retry
def fetch_details(id, retries=5):
for attempt in range(retries):
try:
response = session.get(f"{base_api_url}/{id}", timeout=30)
response.raise_for_status()
data = response.json()

basic_data = {
"id": data.get("id"),
"type": data.get("type"),
"name": data.get("name"),
"latitude": data.get("latitude"),
"longitude": data.get("longitude"),
"alsoKnownAs": data.get("alsoKnownAs")
}
return (basic_data)

except requests.exceptions.RequestException as e:
if hasattr(response, 'status_code') and response.status_code == 429:
time.sleep(1.33 ** attempt)
else:
print(f"Request exception for customer {id}: {e}")
break
return None

# Function to process and write a page of customer data
def process_and_write_page(ids):
basic_data_list = []
inspection_areas_list = []

with ThreadPoolExecutor(max_workers=10) as executor:
future_to_id = {executor.submit(fetch_location_details, id): id for id in ids}
for future in as_completed(future_to_id):
id = future_to_id[future]
try:
basic_data = future.result()
if basic_data:
basic_data_list.append(basic_data)
})

except Exception as e:
print(f"Exception fetching details for {id}:  {e}")

basic_schema = StructType([
StructField("id", StringType(), True),
StructField("type", StringType(), True),
StructField("name", StringType(), True),
StructField("latitude", StringType(), True),
StructField("longitude", StringType(), True),
StructField("alsoKnownAs", StringType(), True)
])

basic_df = pd.DataFrame(basic_data_list)

if not basic_df.empty:
# Problem seems to occur here, with createDataFrame()
basic_spark_df = spark.createDataFrame(basic_df, schema=basic_schema)
basic_spark_df.write.mode("append").option("mergeSchema", "true").format("delta").saveAsTable("api_test")

total_pages = fetch_ids(get_total=True)
display(total_pages)
# 292 pages - at 100 records per page, should return ~29200 reocrds (API contains 29126)
# (Only ~11,000 records in table)

# Fetch and process data page by page
for p in tqdm(range(1, total_pages+1), desc='Fetching pages', unit='page'):
ids, next_page_uri = fetch_ids(page=p)
if not ids or not next_page_uri:
display("Invalid ids or next_page_uri value")
break
# Directly process and write page data
process_and_write_page(ids)
Я попытался изменить элементы Latitude и долготы в basic_schema на floattype ; Однако это дает другую ошибку:

Код: Выделить всё

AnalysisException: [DELTA_FAILED_TO_MERGE_FIELDS] Failed to merge fields 'latitude' and 'latitude'


Подробнее здесь: https://stackoverflow.com/questions/794 ... ray-string
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • 3D Стрелка Стрелка
    Anonymous » » в форуме Python
    0 Ответы
    27 Просмотры
    Последнее сообщение Anonymous
  • 3D Стрелка Стрелка
    Anonymous » » в форуме Python
    0 Ответы
    28 Просмотры
    Последнее сообщение Anonymous
  • Почему группировка серии Pandas с использованием одной и той же серии не имеет смысла?
    Anonymous » » в форуме Python
    0 Ответы
    6 Просмотры
    Последнее сообщение Anonymous
  • Почему группировка серии Pandas с использованием одной и той же серии не имеет смысла?
    Anonymous » » в форуме Python
    0 Ответы
    6 Просмотры
    Последнее сообщение Anonymous
  • Вычитание серии панд из всех элементов другой серии панд с общим идентификатором
    Anonymous » » в форуме Python
    0 Ответы
    44 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»