Какой асинхронный подход лучше подходит для декодирования сообщения Avro, полученного из темы Kafka, с точки зрения лучшей производительности и меньшей задержки?
Я использовал параллельные фьючерсы с Avro библиотеки, и я все еще вижу такое же время выполнения по сравнению с тем, которое было выполнено без использования параллельных фьючерсов.
from avro.io import BinaryDecoder, DatumReader
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
class DoIt():
def some_method():
with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
msg_cnt_type_futures = [executor.submit(CatalogExporter.decode_avro, avro, self.schema_url) for avro in avro_list]
@staticmethod
def decode_avro(payload_tuple, schema_url):
# print("Process ID:", os.getpid())
msg_id, current_msg_offset, payload = payload_tuple
magic, schema_id = struct.unpack('>bi', payload[:5])
register_client = CachedSchemaRegistryClient(url=schema_url)
# Get Schema registry
# Avro value format
if magic == MAGIC_BYTES:
schema = register_client.get_by_id(schema_id)
reader = DatumReader(schema)
output = BinaryDecoder(io.BytesIO(payload[5:]))
decoded = reader.read(output)
return msg_id, current_msg_offset, decoded, schema.name
# no magic bytes, something is wrong
else:
raise ValueError
Подробнее здесь: https://stackoverflow.com/questions/788 ... -in-python
Avro: декодирование с использованием многопроцессорности в Python ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Что мне нужно установить для Avro-tools.jar, чтобы работать с Snappy-Compresed Avro-файлами?
Anonymous » » в форуме JAVA - 0 Ответы
- 28 Просмотры
-
Последнее сообщение Anonymous
-