Меня попросили отладить какой -то код, который извлекает данные из API, преобразует его из JSON в Pandas, Pandas, чтобы исчез, а затем записывает в таблицу. Следующая ошибка возникает, когда она пытается преобразовать из Pandas в Spark:
Exception thrown when converting pandas.Series (float64) with name 'latitude' to Arrow Array (string).< /code>
это приводит к потере данных в таблице, на которую он пишет: API, как известно, содержит ~ 30 000 записей, но в таблице конечной работы только 11 000 записано. Я полагаю, что это связано с функцией process_and_write_page ()
сбой в середине страницы и перейти на следующий. Для анонимности детали API, но, надеюсь, это дает достаточно информации для диагностики проблемы. < /p>
import requests, json, time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType
import pandas as pd
from tqdm import tqdm
# Initialize Spark session with Arrow optimization and schema auto-merge enabled
spark = SparkSession.builder.appName("app") \
.config("spark.sql.execution.arrow.pyspark.enabled", "true") \
.config("spark.databricks.delta.schema.autoMerge.enabled", "true") \
.getOrCreate()
# API details
base_api_url = "https://an/api/url"
api_key = "api12345"
# Headers for the API request
headers = {
"subscriptionKey": api_key,
"userAgent": "aUserAgent"
}
# Create a session
session = requests.Session()
session.headers.update(headers)
# Function to fetch a page of IDs
def fetch_ids(page=1, per_page=100, get_total=False):
try:
response = session.get(f"{base_api_url}?page={page}&perPage={per_page}", timeout=30)
response.raise_for_status()
data = response.json()
if get_total:
return data['totalPages']
ids = [customer['id'] for customer in data['customers']]
next_page_uri = data.get('nextPageUri')
return ids, next_page_uri
except requests.exceptions.RequestException as e:
print(f"Failed to fetch data for page {page}: {e}")
return [], None
# Function to fetch detailed information for a single customer with retry
def fetch_details(id, retries=5):
for attempt in range(retries):
try:
response = session.get(f"{base_api_url}/{id}", timeout=30)
response.raise_for_status()
data = response.json()
basic_data = {
"id": data.get("id"),
"type": data.get("type"),
"name": data.get("name"),
"latitude": data.get("latitude"),
"longitude": data.get("longitude"),
"alsoKnownAs": data.get("alsoKnownAs")
}
return (basic_data)
except requests.exceptions.RequestException as e:
if hasattr(response, 'status_code') and response.status_code == 429:
time.sleep(1.33 ** attempt)
else:
print(f"Request exception for customer {id}: {e}")
break
return None
# Function to process and write a page of customer data
def process_and_write_page(ids):
basic_data_list = []
inspection_areas_list = []
with ThreadPoolExecutor(max_workers=10) as executor:
future_to_id = {executor.submit(fetch_location_details, id): id for id in ids}
for future in as_completed(future_to_id):
id = future_to_id[future]
try:
basic_data = future.result()
if basic_data:
basic_data_list.append(basic_data)
})
except Exception as e:
print(f"Exception fetching details for {id}: {e}")
basic_schema = StructType([
StructField("id", StringType(), True),
StructField("type", StringType(), True),
StructField("name", StringType(), True),
StructField("latitude", StringType(), True),
StructField("longitude", StringType(), True),
StructField("alsoKnownAs", StringType(), True)
])
basic_df = pd.DataFrame(basic_data_list)
if not basic_df.empty:
# Problem seems to occur here, with createDataFrame()
basic_spark_df = spark.createDataFrame(basic_df, schema=basic_schema)
basic_spark_df.write.mode("append").option("mergeSchema", "true").format("delta").saveAsTable("api_test")
total_pages = fetch_ids(get_total=True)
display(total_pages)
# 292 pages - at 100 records per page, should return ~29200 reocrds (API contains 29126)
# (Only ~11,000 records in table)
# Fetch and process data page by page
for p in tqdm(range(1, total_pages+1), desc='Fetching pages', unit='page'):
ids, next_page_uri = fetch_ids(page=p)
if not ids or not next_page_uri:
display("Invalid ids or next_page_uri value")
break
# Directly process and write page data
process_and_write_page(ids)
Я попытался изменить элементы Latitude и долготы в basic_schema на floattype ; Однако это дает другую ошибку:
Меня попросили отладить какой -то код, который извлекает данные из API, преобразует его из JSON в Pandas, Pandas, чтобы исчез, а затем записывает в таблицу. Следующая ошибка возникает, когда она пытается преобразовать из Pandas в Spark:
[code]Exception thrown when converting pandas.Series (float64) with name 'latitude' to Arrow Array (string).< /code>
это приводит к потере данных в таблице, на которую он пишет: API, как известно, содержит ~ 30 000 записей, но в таблице конечной работы только 11 000 записано. Я полагаю, что это связано с функцией process_and_write_page () [/code] сбой в середине страницы и перейти на следующий. Для анонимности детали API, но, надеюсь, это дает достаточно информации для диагностики проблемы. < /p> [code]import requests, json, time from concurrent.futures import ThreadPoolExecutor, as_completed from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, FloatType
# API details base_api_url = "https://an/api/url" api_key = "api12345"
# Headers for the API request headers = { "subscriptionKey": api_key, "userAgent": "aUserAgent" }
# Create a session session = requests.Session() session.headers.update(headers)
# Function to fetch a page of IDs def fetch_ids(page=1, per_page=100, get_total=False): try: response = session.get(f"{base_api_url}?page={page}&perPage={per_page}", timeout=30) response.raise_for_status() data = response.json() if get_total: return data['totalPages'] ids = [customer['id'] for customer in data['customers']] next_page_uri = data.get('nextPageUri') return ids, next_page_uri except requests.exceptions.RequestException as e: print(f"Failed to fetch data for page {page}: {e}") return [], None
# Function to fetch detailed information for a single customer with retry def fetch_details(id, retries=5): for attempt in range(retries): try: response = session.get(f"{base_api_url}/{id}", timeout=30) response.raise_for_status() data = response.json()
except requests.exceptions.RequestException as e: if hasattr(response, 'status_code') and response.status_code == 429: time.sleep(1.33 ** attempt) else: print(f"Request exception for customer {id}: {e}") break return None
# Function to process and write a page of customer data def process_and_write_page(ids): basic_data_list = [] inspection_areas_list = []
with ThreadPoolExecutor(max_workers=10) as executor: future_to_id = {executor.submit(fetch_location_details, id): id for id in ids} for future in as_completed(future_to_id): id = future_to_id[future] try: basic_data = future.result() if basic_data: basic_data_list.append(basic_data) })
except Exception as e: print(f"Exception fetching details for {id}: {e}")
if not basic_df.empty: # Problem seems to occur here, with createDataFrame() basic_spark_df = spark.createDataFrame(basic_df, schema=basic_schema) basic_spark_df.write.mode("append").option("mergeSchema", "true").format("delta").saveAsTable("api_test")
total_pages = fetch_ids(get_total=True) display(total_pages) # 292 pages - at 100 records per page, should return ~29200 reocrds (API contains 29126) # (Only ~11,000 records in table)
# Fetch and process data page by page for p in tqdm(range(1, total_pages+1), desc='Fetching pages', unit='page'): ids, next_page_uri = fetch_ids(page=p) if not ids or not next_page_uri: display("Invalid ids or next_page_uri value") break # Directly process and write page data process_and_write_page(ids) [/code] Я попытался изменить элементы Latitude и долготы в basic_schema на floattype ; Однако это дает другую ошибку:
[code]AnalysisException: [DELTA_FAILED_TO_MERGE_FIELDS] Failed to merge fields 'latitude' and 'latitude'[/code]
Я пытаюсь построить стрелку внутри сферы и иметь следующий код
u, v = np.mgrid
x = np.cos(u)*np.sin(v)
y = np.sin(u)*np.sin(v)
z = np.cos(v)
fig = pl.figure(figsize=(20,15))
ax = fig.gca(projection='3d')
ax.quiver(0, 0, 0,blocx , blocy , blocz...
Я пытаюсь построить стрелку внутри сферы и иметь следующий код
u, v = np.mgrid
x = np.cos(u)*np.sin(v)
y = np.sin(u)*np.sin(v)
z = np.cos(v)
fig = pl.figure(figsize=(20,15))
ax = fig.gca(projection='3d')
ax.quiver(0, 0, 0,blocx , blocy , blocz...
В примере кода ниже я группирую серию Pandas, используя ту же серию, но с измененным индексом.
Группы в конце не имеют смысла. Не существует предупреждения или ошибки.
Не могли бы вы помочь мне понять, что происходит? Модифицированный индекс явно...
В примере кода ниже я группирую серию Pandas, используя ту же серию, но с измененным индексом.
Группы в конце не имеют смысла. Не существует предупреждения или ошибки.
Не могли бы вы помочь мне понять, что происходит? Модифицированный индекс явно...