Код: Выделить всё
class BaseInputs(BaseModel):
"""Base inputs model."""
app: str
run_id: str
score: int
tool: str
asset: str
class GA4CallbackHandler(BaseCallbackHandler):
def on_chain_start(self, serialized, inputs, **kwargs):
if not self.has_run:
print(track_feedback("123456.7654321", 'app', 'tool', 'asset', 'standard', "user", "0").status_code)
print("feedback_sent")
class FeedbackChain(Chain):
def __init__(self) -> None:
self.runnable = (
{
"app" : itemgetter("app"),
"run_id" : itemgetter("run_id"),
"score" : itemgetter("score"),
"tool": itemgetter("tool"),
"asset": itemgetter("asset")
}
| RunnableLambda(self.send_feedback)
).with_config(run_name="feedback",callbacks=[GA4CallbackHandler()]).with_types(input_type=BaseInputs)
Как я могу заставить это выполняться только один раз в начале?< /p>
Я пытался использовать флаг в классе обработчика, но тогда он выполняется только один раз, а последующие выполнения никогда не выполняются.
Я также пытался прикрепить обратный вызов к RunnableLambda внутри цепочки self.runnable , это сработало, но выглядит грязно. Я бы хотел, чтобы он был привязан ко всей цепочке.
Подробнее здесь: https://stackoverflow.com/questions/786 ... iple-times