У меня есть приложение Flask, в котором потребитель Kafka получает сообщения. Я хочу поместить эти сообщения в очередь, и фоновый поток переместит их в базу данных sqlite в зависимости от второстепенной логики. Глядя на Celery, кажется, что мне понадобится Redis в качестве брокера и я настрою все это в своем приложении. Просто мне кажется, что это много. Мне просто нужна задача kafka, чтобы отправить сообщение в список, а затем другой поток/гевент/мультипроцесс(?) прочитает его и вставит в базу данных
Код: Выделить всё
def kafka_read(consumer):
while True:
message = consumer.poll(0.1)
if message is not None:
if condition:
some_q_data_structure.push(message)
else:
some_other_q_data_structure.push(message)
def q_process1():
while True:
message = some_q_data_structure.pop
# logic
dbase.add(message)
def q_process2():
while True:
message = some_other_q_data_structure.pop
#some other logic
dbase.add(message)
Все это работает на Gunicorn с 1 рабочим. Я создаю события и переключаюсь между ними для выполнения различных задач. Я изучил сельдерей, и хотя я могу сделать это с помощью Celery, это кажется излишним, поскольку мне также придется подключить Redis. Я не очень хорошо знаком с многопроцессорностью/поточностью в Python, поэтому любые советы помогут.
Посмотрел здесь Celery и другие очереди задач -
https://www.fullstackpython.com/task-queues .html
Посмотрел
https://docs.python.org/3/library/multiprocessing.html
Подробнее здесь:
https://stackoverflow.com/questions/786 ... asks-celer