Ресурс 429 исчерпан (например, квота проверки): как обрабатывать ошибки 429, несмотря на соблюдение ограничений скоростиPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Ресурс 429 исчерпан (например, квота проверки): как обрабатывать ошибки 429, несмотря на соблюдение ограничений скорости

Сообщение Anonymous »

Я работаю с API (Gemini 1.5 Pro), который имеет ограничение скорости в 1000 вызовов в минуту на один ключ API (по крайней мере, там так указано). Мне нужно как можно быстрее обработать 14 000 записей из файла Excel.
Чтобы справиться с ограничениями скорости, я:
Использую несколько ключей API ( 5 ключей хранятся в файле TXT).
Распределяйте вызовы равномерно по клавишам, гарантируя, что ни один ключ не превысит лимит в 1000 вызовов в минуту.
Добавьте задержку time.sleep(0,1) между вызовами для каждого ключа, чтобы оставаться в рамках ограничение скорости.
Однако даже при наличии всего одного ключа API и правильно настроенного time.sleep я все равно сталкиваюсь с ошибками 429: Ресурс исчерпан.
Зачем мне нажимать на ограничения скорости? даже с учетом этих мер предосторожности? Существует ли недокументированное ограничение скорости или я что-то упускаю в расчете ограничений скорости?
Будем признательны за любые советы или предложения по отладке.
import os
import pandas as pd
import google.generativeai as genai
import threading
import queue
import time
import math

# --------------------------------------------------------------------------
# KONFIGURATION
# --------------------------------------------------------------------------

# Pfad zu den API-Keys
api_keys_path = os.path.join(os.path.dirname(__file__), '..', 'api_keys_multiplex.txt')

with open(api_keys_path, 'r') as file:
api_keys = [k.strip() for k in file if k.strip()]

if not api_keys:
raise ValueError("Keine API-Keys in api_keys_multiplex.txt gefunden.")

# Ziel: ~1000 Requests/Minute => ca. 16.67 Requests/Sek => alle ~0.06s ein Token
TOKENS_PER_MINUTE = 1000
TOKEN_INTERVAL = 60.0 / TOKENS_PER_MINUTE # ~0.06 Sek pro Token

# Anzahl Worker-Threads pro Key (je mehr, desto besser wird die Wartezeit überbrückt)
WORKERS_PER_KEY = 5

# System Prompt für Gemini
system_instructions = """
You are an AI assistant tasked with analyzing Bombora Intent Topics and determining whether they are relevant to the business focus of the company described in the user prompt.

### Input:
You will receive the following columns for each topic:
- **Theme**: The overarching theme (e.g., BioTech, Software).
- **Category**: The subcategory under the theme.
- **Topic Name**: The specific name of the topic.
- **Description**: A description of the topic's meaning.

### Task:
Your task is to:
1. Analyze the topic based on the information provided in the columns.
2. Strictly determine whether the topic aligns with the company's purpose as described in the user prompt.

### Notes:
- Be very strict. Only mark topics as relevant if they have a direct and clear connection to the company's purpose.
- Respond with "Yes" or "No" only, without any explanations.
"""

generation_config = {
"temperature": 0.3,
"top_p": 0.9,
"top_k": 40,
"max_output_tokens": 5,
"response_mime_type": "text/plain",
}

current_dir = os.path.dirname(__file__)
excel_files = [f for f in os.listdir(current_dir) if f.endswith('.xlsx')]
if len(excel_files) != 1:
raise ValueError("Es muss genau eine Excel-Datei im aktuellen Verzeichnis vorhanden sein!")

file_name = excel_files[0]
output_file_name = "filtered_output.xlsx"

theme_col = "Theme"
category_col = "Category"
topic_name_col = "Topic Name"
description_col = "Description"

data_df = pd.read_excel(os.path.join(current_dir, file_name))

# Queue mit allen Zeilen
task_queue = queue.Queue()
for index, row in data_df.iterrows():
theme = row[theme_col] if pd.notna(row[theme_col]) else ""
category = row[category_col] if pd.notna(row[category_col]) else ""
topic_name = row[topic_name_col] if pd.notna(row[topic_name_col]) else ""
description = row[description_col] if pd.notna(row[description_col]) else ""
task_queue.put((index, theme, category, topic_name, description))

results = {}
results_lock = threading.Lock()

# Token Bucket Mechanismus pro Key
class TokenBucket:
def __init__(self, tokens_per_minute=TOKENS_PER_MINUTE):
self.rate = tokens_per_minute / 60.0 # tokens per second
self.capacity = tokens_per_minute
self.tokens = tokens_per_minute
self.last_refill = time.time()
self.lock = threading.Lock()

def get_token(self):
# Versuche ein Token zu bekommen, blockiere wenn none verfügbar
while True:
with self.lock:
now = time.time()
# Tokens nachfüllen
elapsed = now - self.last_refill
# Anzahl tokens, die man seit letztem refill hinzufügen kann
new_tokens = elapsed * self.rate
if new_tokens > 0:
self.tokens = min(self.capacity, self.tokens + new_tokens)
self.last_refill = now

if self.tokens >= 1:
self.tokens -= 1
return
time.sleep(0.1) # kurz warten und erneut versuchen

def worker(api_key, task_queue, results, results_lock, token_bucket):
# Jede Worker-Thread hat seine eigene Session mit diesem Key
genai.configure(api_key=api_key)
model = genai.GenerativeModel(
model_name="gemini-1.5-pro-002",
system_instruction=system_instructions,
generation_config=generation_config,
)
chat_session = model.start_chat()

while True:
try:
index, theme, category, topic_name, description = task_queue.get_nowait()
except queue.Empty:
break

user_prompt = f"""
We are a company focused on process optimization using digital solutions and artificial intelligence. Our goal is to streamline business operations and improve efficiency through advanced technologies.

Topic Details:
- **Theme**: {theme}
- **Category**: {category}
- **Topic Name**: {topic_name}
- **Description**: {description}

Does this topic directly align with our company's purpose?
"""

try:
# Bevor wir den Request senden, holen wir uns ein Token
token_bucket.get_token()

response = chat_session.send_message(user_prompt)
relevance = response.text.strip()

with results_lock:
results[index] = relevance

print(f"Zeile {index} klassifiziert: {relevance}")

except Exception as e:
with results_lock:
results[index] = f"Error: {e}"
print(f"Zeile {index} Fehler: {e}")

finally:
task_queue.task_done()

# Für jeden Key einen eigenen TokenBucket erstellen
token_buckets = [TokenBucket(tokens_per_minute=TOKENS_PER_MINUTE) for _ in api_keys]

threads = []
# Für jeden Schlüssel mehrere Worker starten
for key, bucket in zip(api_keys, token_buckets):
for _ in range(WORKERS_PER_KEY):
t = threading.Thread(target=worker, args=(key, task_queue, results, results_lock, bucket))
t.start()
threads.append(t)

# Warten bis alles abgearbeitet ist
for t in threads:
t.join()

relevances = [results for i in range(len(data_df))]
data_df["Relevance"] = relevances
filtered_df = data_df[data_df["Relevance"] == "Yes"]
filtered_df.to_excel(os.path.join(current_dir, output_file_name), index=False)

print(f"Prozess abgeschlossen. Gefilterte Datei gespeichert: {output_file_name}")


Подробнее здесь: https://stackoverflow.com/questions/792 ... errors-des
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»