Я разрабатываю фильтровую книгу класс, который управляет вызовами фильтрами для абонентских процессов. Брокер получает функции, завернутые в объект фильтра через очередь сообщения. Тем не менее, я сталкиваюсь с ошибкой маринации при попытке отправить локально определенную функцию: < /p>
Код: Выделить всё
AttributeError: Can't get local object 'task..function'
< /code>
Ошибка возникает потому, что функция определяется внутри другой функции (task()
from multiprocessing import Queue, Manager, Process
from dataclasses import dataclass
from typing import Optional
import logging
import inspect
@dataclass
class Service:
id: Optional[int] = None
name: str = ""
port: int = 0
class Filter:
def __init__(self, filter_function: callable):
self.filter_function: callable = filter_function
self.subscribers: list[Service] = []
def __call__(self, *args, **kwds):
return self.filter_function(*args, **kwds)
class FilterBroker(Thread):
def __init__(self, queue: Queue) -> None:
super().__init__()
self.queue = queue
self.filters: dict[str, Filter] = {}
def add_filter(self, name: str, filter: Filter):
if len(inspect.signature(filter).parameters) != 2:
raise TypeError("Invalid Filter: must have exactly two parameters")
self.filters[name] = filter
def run(self):
class_name = self.__class__.__name__
logging.info(f"[{class_name}]: Process started")
while True:
try:
task = self.queue.get()
logging.debug(f"[{class_name}]: Task received: {task}")
if task is None:
break
if not isinstance(task, tuple) or not callable(task[0]) or not isinstance(task[1], Queue):
continue
response_queue, method, *args = task
response = method(self, *args)
except Exception:
response = None
finally:
response_queue.put_nowait(response)
@staticmethod
def ask(fb: 'FilterBroker', *task):
response_queue = Manager().Queue()
fb.queue.put((response_queue, *task))
print("I put in queue")
result = response_queue.get()
print("I got result")
response_queue.close()
return result
manager = Manager()
broker = FilterBroker(manager.Queue())
broker.start()
def task(broker):
def function(x):
return x > 0
f = Filter(function)
print(f(2))
FilterBroker.ask(broker, FilterBroker.add_filter, 'test', f)
logging.debug(f"Filter added")
process = Process(target=task, args=(broker,))
process.start()
process.join()
print("Process finished")
< /code>
полная ошибка Traceback < /h2>
Traceback (most recent call last):
File "/usr/lib64/python3.13/multiprocessing/process.py", line 313, in _bootstrap
self.run()
~~~~~~~~^^
File "/usr/lib64/python3.13/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/****/Scrivania/github/ctf_proxy/refactoring/test.py", line 22, in task
fb.ask(broker, fb.add_filter, 'test', f)
~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home//****/Scrivania/github/ctf_proxy/refactoring/proxy/multiprocess/FilterBroker.py", line 299, in ask
fb.queue.put((response_queue, *task))
~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^
File "", line 2, in put
File "/usr/lib64/python3.13/multiprocessing/managers.py", line 830, in _callmethod
conn.send((self._id, methodname, args, kwds))
~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.13/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
~~~~~~~~~~~~~~~~~~~~~^^^^^
File "/usr/lib64/python3.13/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
~~~~~~~~~~~~~~~~~~~~~~~^^^^^
AttributeError: Can't get local object 'task..function'
< /code>
Вопрос < /h2>
Как я могу изменить свой код для поддержки отправки локально определенных функций и лямбдов посредством многопроцессы? Я хочу, чтобы подписчики имели возможность регистрировать и извлекать функции пользовательских фильтров без необходимости определения их на уровне модуля.
Подробнее здесь: https://stackoverflow.com/questions/795 ... multiproce