Мне нужно было создать Node JS-клиент, который будет:
- получать сообщение
- добавить некоторые свойства
- опубликовать его в определенной теме Kafka
количество сообщений меняется в течение дня, но имеет очень большой пик в 14:30, около 30 000 сообщений в секунду, в течение примерно 2 минут, а затем исчезает.
Мое текущее решение состоит из родительского процесса, который подключается к веб-сокету, получает сообщения и отправляет их по мере их поступления (через IPC) дочернему процессу.
дочерний процесс добавляет необходимые свойства и отправляет их через производителя Kafka.
проблема: в пиковое время данные публикуются в Kafka через 6–10 секунд после поступления сообщения (предположительно) в вебсокет, что недопустимо.Я пробовал: логику одного/множества процессов, настройки производителя с высокой пропускной способностью Kafka, библиотеку npm fast-json-stringify, но мне все равно не удалось избежать этой задержки.
контейнер работает в Google облачная виртуальная машина с 4 ядрами ЦП и 16 ГБ оперативной памяти.
как избежать этой задержки?
server.js код:
import websocket from "websocket";
import path from "path";
import { fileURLToPath } from "url";
import cp from "child_process";
import { resolve } from "path";
const __filename = fileURLToPath(import.meta.url); // get the resolved path to the file
const __dirname = path.dirname(__filename); // get the name of the directory
const worker = cp.fork(resolve(__dirname, "worker.js"));
const url = "myurl";
let ws;
function createSocket() {
ws = new websocket.w3cwebsocket(url);
ws.onerror = (err) => console.log(`${prefix}error in socket`, err);
ws.onclose = (msg) => {
const { code } = msg;
console.log(`${prefix}Connection closed | reason: "${JSON.stringify(code)};"`);
setTimeout(function () {
createSocket();
}, 1000);
};
ws.onmessage = (msg) => {
worker.send(msg);
}
}
worker.js код:
import kafka from "./kafka.js";
async function run() {
process.on("message", async (input) => {
try {
const msg = {
src: "i",
sym: input.sym,
p: input.p,
s: input.s,
t: input.t,
vt: input.tv,
ot: Date.now(),
};
kafka.producer.send({
topic: "i",
messages: [{ key: msg.sym, value: JSON.stringify(msg) }],
});
} catch (err) {
console.log(`service general error`);
}
});
}
run().catch((e) => console.error(`[worker/producer] ${e.message}`, e));
kafka.js код:
import dotenv from "dotenv";
import { KafkaJS } from "@confluentinc/kafka-javascript";
dotenv.config();
const Kafka = KafkaJS.Kafka;
const kafakStr = "ws";
const kafka = new Kafka({
kafkaJS: {
clientId: kafakStr,
brokers: [process.env.KAFKA],
},
});
const kafkaObj = {
producer: kafka.producer({
"batch.size": 100000,
"linger.ms": 50,
"compression.type": "lz4",
kafkaJS: { acks: 1 },
}),
consumer: kafka.consumer({ kafkaJS: { groupId: kafakStr } }),
};
export default kafkaObj;
Подробнее здесь: https://stackoverflow.com/questions/793 ... throughput
Мобильная версия