Я вроде новичок в Фаусте, и мне нужно мое приложение для обработки сообщений Kakfa с использованием различных групп потребителей. Мне нужно иметь две разные группы потребителей в одной и той же программе, но, поскольку группа потребителей для каждой темы Kafka совпадает с идентификатором приложения Faust, мне нужны два приложения, работающие по одной и той же программе. Моя проблема в том, что я не знаю, как инициализировать два приложения в командной строке. Это не работает, например: faust -a test_faust: app1, app2 krabiler
Это пример того, что я пытаюсь сделать: < /p>
## Script test_faust.py
import faust
< /code>
app1 = faust.App('consumer_group1', broker='kafka://localhost:9092', value_serializer='json', consumer_auto_offset_reset='earliest')
app2 = faust.App('consumer_group2', broker='kafka://localhost:9092', value_serializer='json', consumer_auto_offset_reset='earliest')
< /code>
topic1 = app1.topic('topic1', value_type=str)
topic2 = app2.topic('topic2', value_type=str)
< /code>
@app1.agent(topic1)
async def process1(stream):
async for value in stream:
print(f'App1: {value}')
< /code>
@app2.agent(topic2)
async def process2(stream):
async for value in stream:
print(f'App2: {value}')
< /code>
Any ideas how I can initialize both apps with the same command?Or alternatively, using the same app, how can i create a different consumer groups for each topic? Is there an option to do something like
topic1 = app.topic('topic1', value_type=str, consumer_id='consumer_group1')
< /code>
topic2 = app.topic('topic2', value_type=str, consumer_id='consumer_group2')
< /code>
Or any way to start the two apps in the script via the command line? I'm trying
faust -A test_faust:app1,app2 worker
Я вроде новичок в Фаусте, и мне нужно мое приложение для обработки сообщений Kakfa с использованием различных групп потребителей. Мне нужно иметь две разные группы потребителей в одной и той же программе, но, поскольку группа потребителей для каждой темы Kafka совпадает с идентификатором приложения Faust, мне нужны два приложения, работающие по одной и той же программе. Моя проблема в том, что я не знаю, как инициализировать два приложения в командной строке. Это не работает, например: faust -a test_faust: app1, app2 krabiler Это пример того, что я пытаюсь сделать: < /p> [code]## Script test_faust.py import faust < /code> app1 = faust.App('consumer_group1', broker='kafka://localhost:9092', value_serializer='json', consumer_auto_offset_reset='earliest') app2 = faust.App('consumer_group2', broker='kafka://localhost:9092', value_serializer='json', consumer_auto_offset_reset='earliest') < /code> topic1 = app1.topic('topic1', value_type=str) topic2 = app2.topic('topic2', value_type=str) < /code> @app1.agent(topic1) async def process1(stream): async for value in stream: print(f'App1: {value}') < /code> @app2.agent(topic2) async def process2(stream): async for value in stream: print(f'App2: {value}') < /code> Any ideas how I can initialize both apps with the same command?Or alternatively, using the same app, how can i create a different consumer groups for each topic? Is there an option to do something like topic1 = app.topic('topic1', value_type=str, consumer_id='consumer_group1') < /code> topic2 = app.topic('topic2', value_type=str, consumer_id='consumer_group2') < /code> Or any way to start the two apps in the script via the command line? I'm trying faust -A test_faust:app1,app2 worker[/code] Но дает ошибку [code]ValueError: Component 'app1,app2' of 'test_faust:app1,app2' is not a valid identifier[/code] Спасибо
Я вроде новичок в Фаусте, и мне нужно мое приложение для обработки сообщений Kakfa с использованием различных групп потребителей. Мне нужно иметь две разные группы потребителей в одной и той же программе, но, поскольку группа потребителей для каждой...
В настоящее время у меня есть два экземпляра одного и того же приложения, работающие в двух средах (назовем их экземпляром 1 и экземпляром 2). Эти экземпляры потребляют сообщения от Kafka (разных брокеров Kafka). В журналах обоих экземпляров я вижу...
Концепция разделов и (потребительских) групп в Kafka была введена для реализации параллелизма. Я работаю с Кафкой через Python. У меня есть некая тема, имеющая (скажем) 2 раздела. Это означает, что если я создам группу потребителей с двумя...
Мне нужно получать сообщения из темы в кластере Kafka с 10 разделами, где сообщения равномерно распределяются по ним. У меня есть приложение ASP.NET с 10 фоновыми службами, где каждая служба запускает потребителя (Confluent.Kafka 2.6.1), чтобы я мог...
У меня есть приложение Spring-Boot с несколькими потребителями из одной и той же группы потребителей, использующими @KafkaListener из Spring-Kafka. После расследования я обнаружил, что на каждого потребителя создается 4 потока:...