Почему я сталкиваюсь с [Errno 32] Сломанная труба, когда запускаю новый процесс?Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Почему я сталкиваюсь с [Errno 32] Сломанная труба, когда запускаю новый процесс?

Сообщение Anonymous »

Я использую confluent-kafka для обработки сообщений из темы Kafka.
Вот шаблон кода:
from multiprocessing import Process, Manager

def handle_records(records):
for record in records:
# extracted all details for each record. Hidden here.
process = Process(
target=process_msg,
args=(
partition,
offset,
headers_dict,
msg_key,
msg,
shared_dict,
),
)
p.start()
# I am joining this properly

def foo(shared_dict, partition_num):
consumer = create_consumer()
consumer.assign([TopicPartition(topic, partition_num)]

while True:
# here I am consuming 2 messages each based on some calculation.

records = consumer.consume(num_messages=2, timeout=1)
if not records:
continue
handle_records(records)

def main(shared_dict):
total_partitions = 8 # Calculated using kafka method. Directly written here
total_processes = []

for partition_num in range(total_partitions):
p = Process(target=foo, args=(shared_dict, partition_num), deamon=False)
p.start()
total_processes.append(p)

for p in total_processes:
p.join()

if __name__ == "__main__":
manager = Manager()
shared_dict = manager.dict()
# Added some key-value pairs to shared_dict
main(shared_dict)

Здесь я просто привел код шаблона. Ошибка, с которой я столкнулся, заключается в том, что я создаю 8 процессов из-за 8 разделов. И каждый процесс порождает 2 процесса в зависимости от количества использованных им записей. Мой код работал нормально первые 30/40 минут. Позже я сталкиваюсь с ошибкой Broken Pipe, когда запускаю новый процесс в handle_records.
Примечание. В методе handle_records я «присоединяюсь» к процессам, которые были созданы после обработки. мое сообщение. И снова я создам новый процесс, когда получу/потреблю новые записи из темы. Это итеративный процесс. Позже мне удалось сделать это для некоторых записей.
Если мой потребитель не получит никаких сообщений в течение часа, я закрою потребителя и присоединюсь к породившим процессам. в основной функции.
2024-07-17 10:34:41, 480, Process-2, handle_records, Exception raised in handle_records: [Errno 32] Broken pipe
Traceback (most recent call last) :
File "kafka_main.py", line 2380, in handle_records
process.start ()
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/process.py", line 121, in start
self. popen = self. Popen (self)
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/context.py", line 224, in _Popen
return default_context.get_context () . Process ._ Popen (process_obj)
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/context.py", line 284, in Popen
return Popen (process_obj)
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 32, in init
super (). _init_(process_obj)
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/popen_fork.py", line 16, in init
util. flush_std_streams ()
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/util.py", line 439, in _flush_std_streams
sys.stderr. flush ()
BrokenPipeError: [Errno 32] Broken pipe


Подробнее здесь: https://stackoverflow.com/questions/787 ... ew-process
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Почему я сталкиваюсь с [Errno 32] Сломанная труба, когда запускаю новый процесс?
    Anonymous » » в форуме Python
    0 Ответы
    23 Просмотры
    Последнее сообщение Anonymous
  • Маним — BrokenPipeError: [Errno 32] Сломанная труба
    Anonymous » » в форуме Python
    0 Ответы
    21 Просмотры
    Последнее сообщение Anonymous
  • Сокет Python errno 32, сломанная труба
    Anonymous » » в форуме Python
    0 Ответы
    22 Просмотры
    Последнее сообщение Anonymous
  • FWRITE () отправка 40 байтов не удалась с Errno = 32 сломанная труба
    Anonymous » » в форуме Php
    0 Ответы
    16 Просмотры
    Последнее сообщение Anonymous
  • «BrokenPipeError: [Errno 32] Сломанная труба» при отправке сценариев Python на петле
    Anonymous » » в форуме Python
    0 Ответы
    11 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»