Я использую confluent-kafka для обработки сообщений из темы Kafka.
Вот шаблон кода:
from multiprocessing import Process, Manager
def handle_records(records):
for record in records:
# extracted all details for each record. Hidden here.
process = Process(
target=process_msg,
args=(
partition,
offset,
headers_dict,
msg_key,
msg,
shared_dict,
),
)
p.start()
# I am joining this properly
def foo(shared_dict, partition_num):
consumer = create_consumer()
consumer.assign([TopicPartition(topic, partition_num)]
while True:
# here I am consuming 2 messages each based on some calculation.
records = consumer.consume(num_messages=2, timeout=1)
if not records:
continue
handle_records(records)
def main(shared_dict):
total_partitions = 8 # Calculated using kafka method. Directly written here
total_processes = []
for partition_num in range(total_partitions):
p = Process(target=foo, args=(shared_dict, partition_num), deamon=False)
p.start()
total_processes.append(p)
for p in total_processes:
p.join()
if __name__ == "__main__":
manager = Manager()
shared_dict = manager.dict()
# Added some key-value pairs to shared_dict
main(shared_dict)
Здесь я просто привел код шаблона. Ошибка, с которой я столкнулся, заключается в том, что я создаю 8 процессов из-за 8 разделов. И каждый процесс порождает 2 процесса в зависимости от количества использованных им записей. Мой код работал нормально первые 30/40 минут. Позже я сталкиваюсь с ошибкой Broken Pipe, когда запускаю новый процесс в handle_records.
Примечание. В методе handle_records я «присоединяюсь» к процессам, которые были созданы после обработки. мое сообщение. И снова я создам новый процесс, когда получу/потреблю новые записи из темы. Это итеративный процесс. Позже мне удалось сделать это для некоторых записей.
Если мой потребитель не получит никаких сообщений в течение часа, я закрою потребителя и присоединюсь к породившим процессам. в основной функции.
2024-07-17 10:34:41, 480, Process-2, handle_records, Exception raised in handle_records: [Errno 32] Broken pipe
Traceback (most recent call last) :
File "kafka_main.py", line 2380, in handle_records
process.start ()
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/process.py", line 121, in start
self. popen = self. Popen (self)
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/context.py", line 224, in _Popen
return default_context.get_context () . Process ._ Popen (process_obj)
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/context.py", line 284, in Popen
return Popen (process_obj)
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 32, in init
super (). _init_(process_obj)
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/popen_fork.py", line 16, in init
util. flush_std_streams ()
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/util.py", line 439, in _flush_std_streams
sys.stderr. flush ()
BrokenPipeError: [Errno 32] Broken pipe
Подробнее здесь: https://stackoverflow.com/questions/787 ... ew-process
Почему я сталкиваюсь с [Errno 32] Сломанная труба, когда запускаю новый процесс? ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Почему я сталкиваюсь с [Errno 32] Сломанная труба, когда запускаю новый процесс?
Anonymous » » в форуме Python - 0 Ответы
- 23 Просмотры
-
Последнее сообщение Anonymous
-
-
-
«BrokenPipeError: [Errno 32] Сломанная труба» при отправке сценариев Python на петле
Anonymous » » в форуме Python - 0 Ответы
- 11 Просмотры
-
Последнее сообщение Anonymous
-