Невозможно расколоть локальную функцию при отправке объектов Callible Filter через многопроцессорную очередьPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Невозможно расколоть локальную функцию при отправке объектов Callible Filter через многопроцессорную очередь

Сообщение Anonymous »

Описание задачи
Я разрабатываю фильтровую книгу класс, который управляет вызовами фильтрами для абонентских процессов. Брокер получает функции, завернутые в объект фильтра через очередь сообщения. Тем не менее, я сталкиваюсь с ошибкой маринации при попытке отправить локально определенную функцию: < /p>

Код: Выделить всё

AttributeError: Can't get local object 'task..function'
< /code>
Ошибка возникает потому, что функция определяется внутри другой функции (task()
) и поэтому не замайт, но мне нужно поддерживать отправку лямбды и локально определенные функции.from threading import Thread
from multiprocessing import Queue, Manager, Process
from dataclasses import dataclass
from typing import Optional
import logging
import inspect

@dataclass
class Service:
id: Optional[int] = None
name: str = ""
port: int = 0

class Filter:
def __init__(self, filter_function: callable):
self.filter_function: callable = filter_function
self.subscribers: list[Service] = []

def __call__(self, *args, **kwds):
return self.filter_function(*args, **kwds)

class FilterBroker(Thread):
def __init__(self, queue: Queue) -> None:
super().__init__()
self.queue = queue
self.filters: dict[str, Filter] = {}

def add_filter(self, name: str, filter: Filter):
if len(inspect.signature(filter).parameters) != 2:
raise TypeError("Invalid Filter: must have exactly two parameters")
self.filters[name] = filter

def run(self):
class_name = self.__class__.__name__
logging.info(f"[{class_name}]: Process started")
while True:
try:
task = self.queue.get()
logging.debug(f"[{class_name}]: Task received: {task}")
if task is None:
break
if not isinstance(task, tuple) or not callable(task[0]) or not isinstance(task[1], Queue):
continue
response_queue, method, *args = task
response = method(self, *args)
except Exception:
response = None
finally:
response_queue.put_nowait(response)

@staticmethod
def ask(fb: 'FilterBroker', *task):
response_queue = Manager().Queue()
fb.queue.put((response_queue, *task))
print("I put in queue")
result = response_queue.get()
print("I got result")
response_queue.close()
return result

manager = Manager()
broker = FilterBroker(manager.Queue())
broker.start()

def task(broker):
def function(x):
return x > 0

f = Filter(function)
print(f(2))
FilterBroker.ask(broker, FilterBroker.add_filter, 'test', f)
logging.debug(f"Filter added")

process = Process(target=task, args=(broker,))

process.start()
process.join()

print("Process finished")
< /code>
полная ошибка Traceback < /h2>
Traceback (most recent call last):
File "/usr/lib64/python3.13/multiprocessing/process.py", line 313, in _bootstrap
self.run()
~~~~~~~~^^
File "/usr/lib64/python3.13/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/****/Scrivania/github/ctf_proxy/refactoring/test.py", line 22, in task
fb.ask(broker, fb.add_filter, 'test', f)
~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home//****/Scrivania/github/ctf_proxy/refactoring/proxy/multiprocess/FilterBroker.py", line 299, in ask
fb.queue.put((response_queue, *task))
~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^
File "", line 2, in put
File "/usr/lib64/python3.13/multiprocessing/managers.py", line 830, in _callmethod
conn.send((self._id, methodname, args, kwds))
~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.13/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
~~~~~~~~~~~~~~~~~~~~~^^^^^
File "/usr/lib64/python3.13/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
~~~~~~~~~~~~~~~~~~~~~~~^^^^^
AttributeError: Can't get local object 'task..function'
< /code>
Вопрос < /h2>
Как я могу изменить свой код для поддержки отправки локально определенных функций и лямбдов посредством многопроцессы? Я хочу, чтобы подписчики имели возможность регистрировать и извлекать функции пользовательских фильтров без необходимости определения их на уровне модуля.


Подробнее здесь: https://stackoverflow.com/questions/795 ... multiproce
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»