Как добиться неявной передачи контекста между потоками (или процессами)Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как добиться неявной передачи контекста между потоками (или процессами)

Сообщение Anonymous »

Я пересмотрел вопрос, чтобы лучше понять, как этого добиться.

Я стремлюсь разработать более интуитивно понятную кодовую базу, которая скрывает больше деталей, позволяя пользователям сосредоточиться на бизнес-логике. Первоначально я создал класс Context, которым можно управлять с помощью блока Python with. Этот класс включает метод класса Context.instance, позволяющий напрямую получать экземпляр контекста внутри блока with, тем самым устраняя необходимость непрерывной передачи значений. В настоящее время в однопроцессном однопоточном коде все работает отлично.
Однако при использовании многопоточности неожиданно работает и внутреннее соединение. Это соответствует ожиданиям Python. При внешнем присоединении возникает ошибка, которая, хотя и ожидается Python, не соответствует моей намеченной цели, поскольку во многих случаях мы не ждем завершения выполнения потоков.
Естественно, многопроцессорность создает проблемы. Предположим, я инициализирую контекст в основном потоке и ввожу контекст с помощью блока with. Внутри процесса я не могу получить доступ к контексту. Это соответствует предполагаемому поведению Python. Однако я хочу его изменить.
Поэтому я переработал класс Worker, реализовав метод экземпляра inject(self, *contexts) для внедрения контекстов. Этот метод управляется Worker для __enter__ и __exit__. Теперь мой код работает так, как задумано.
Тем не менее, я считаю это решение неэлегантным. Мне нужен способ неявного внедрения контекста в потоки и процессы, чтобы гарантировать правильную работу экземпляра во всех потоках.
Ниже приведен полный минимальный пример, который я предоставил, с весь код готов к непосредственному запуску:
import contextlib
import multiprocessing
import threading
from abc import ABC
from typing import List
from typing import Optional
from typing import TypeVar

T = TypeVar('T')
ContextType = TypeVar('ContextType', bound='AbstractContext')
print_ = print
print = lambda *args, **kwargs: print_(f"{multiprocessing.current_process().name}:{threading.current_thread().name}>>>",
*args, **kwargs)

class AbstractContext(ABC):
_global_stack = []
_lock = threading.Lock()

@classmethod
def instance(cls: type[T]) -> Optional[T]:
with cls._lock:
if not cls._global_stack:
return None
return next((ctx for ctx in reversed(cls._global_stack) if isinstance(ctx, cls)), None)

@classmethod
def stack(cls: type[T]) -> List[T]:
with cls._lock:
return [ctx for ctx in cls._global_stack if isinstance(ctx, cls)]

def __enter__(self):
with self._lock:
print("Entering context", self)
self._global_stack.append(self)
return self

def __exit__(self, exc_type, exc_value, traceback):
with self._lock:
print("Exiting context", self)
self._global_stack.pop()
return False

class AbstractWorker:

def __init__(self):
self._injected_contexts = []
self._stack = contextlib.ExitStack()

def inject(self, *contexts: ContextType):
self._injected_contexts.extend(contexts)
return self

def __enter__(self):
self._stack.__enter__()
for context in self._injected_contexts:
self._stack.enter_context(context)
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self._stack.__exit__(exc_type, exc_val, exc_tb)
return False

class Context(AbstractContext):
def __init__(self, value: str):
self.value = value

class Context2(AbstractContext):
def __init__(self, value: str):
self.value = value

class Worker(AbstractWorker):
def start(self):
with self:
self.run()

def run(self):
print(f"{multiprocessing.current_process()}, {threading.current_thread()} is running")
try:
assert Context.instance() and Context.instance().value == "test", Context.instance()
assert Context2.instance() and Context2.instance().value == "test2", Context2.instance()
except AssertionError as e:
print("AssertionError:", f"{e}".replace('\n', ' '))
raise Exception(f"{multiprocessing.current_process()}, {threading.current_thread()}")
print(f"{multiprocessing.current_process()}, {threading.current_thread()} is exiting")

def test():
print()
with Context("test"):
with Context2("test2"):
thread = threading.Thread(target=Worker().start, name="test")
thread.start()
thread.join() # if thread.join() in there, everything is fine
# I want to use multiprocessing.Process, as threading.Thread.
with Context("test"):
with Context2("test2"):
process = multiprocessing.Process(
target=Worker()
# .inject(Context.instance(), Context2.instance()) # if inject in there, will enter twice and exit twice
.start, name="test")
process.start() # I want that: if process.join() in there, everything is fine
process.join() # will raise AssertionError
process = multiprocessing.Process(
target=Worker()
.inject(Context("test"), Context2("test2"))
.start, name="test-2")
process.start()
process.join()

Ниже представлена ​​исходная версия проблемы, которую я сохранил для справки.

Семантика проектирования контекстного менеджера
with WorkerContext():
worker = Worker()
thread = Thread()
thread.bind(worker)
thread.start()

# Worker class' run method
class Worker:
def run(self):
# Should this code be inside WorkerContext?

В моем WorkerContext, который основан на классе AbstractContext, метод экземпляра() позволяет получить текущий экземпляр контекста даже после вызывается __exit__() контекстного менеджера. Я хочу, чтобы worker.run() мог получить доступ к экземпляру контекста через WorkerContext.instance() во время его выполнения.
Однако в соответствии с контекстом Python семантики менеджера, после вызова __exit__() считается, что менеджер контекста завершил работу, и, как правило, экземпляр больше не является действительным. Мой вопрос: должен ли я намеренно нарушать семантику контекстного менеджера Python, чтобы экземпляр контекста оставался доступным после __exit__()?
Был бы этот дизайн разумным или он мог бы ввести потенциальные проблемы?
class AbstractContext(ABC):
"""
Base class for context managers.

Notes:
TODO: Support cross-process usage.
TODO: Support cross-machine usage.
TODO: Support cross-cluster usage.
"""
_global_stack = []
_lock = threading.Lock()

@classmethod
def instance(cls: type[T]) -> Optional[T]:
with cls._lock:
if not cls._global_stack:
return None
return next((ctx for ctx in reversed(cls._global_stack) if isinstance(ctx, cls)), None)

@classmethod
def stack(cls: type[T]) -> List[T]:
with cls._lock:
return [ctx for ctx in cls._global_stack if isinstance(ctx, cls)]

Или более дружелюбный способ описать проблему: является ли ее область действия динамической или статической, хотя это описание может быть немного неправильным, я считаю, что оно вполне отражает суть проблемы.< /p>
Когда я запускаю поток в WorkContext и выполняю его, я, естественно, думаю, что он выполняется в WorkContext, который является статическим.
Но на самом деле , он просто запускает поток, а затем завершает его, при этом внутренняя работа потока на самом деле не выполняется здесь, как мы могли бы себе представить. Я действительно не знаю, как здесь спроектировать его поведение.
Примечание: я не ищу конкретного решения, а скорее хочу понять, что представляет собой элегантный подход.

Я прикрепил контент и теперь это стало так:
worker = Worker()
thread = Thread()
thread.bind(worker)
worker.inject(WorkerContext())
thread.start()

class AbstractWorker:

def __init__(self):
self._injected_contexts = []
self._stack = contextlib.ExitStack()

def inject(self, *contexts: Context): # TODO: add ContextType
self._injected_contexts.extend(contexts)

def __enter__(self):
self._stack.__enter__()
for context in self._injected_contexts:
self._stack.enter_context(context)
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self._stack.__exit__(exc_type, exc_val, exc_tb)
return False


Подробнее здесь: https://stackoverflow.com/questions/790 ... -processes
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»