Высокопроизводительный веб-сокет на Node JS – задержка пропускной способностиJavascript

Форум по Javascript
Ответить
Anonymous
 Высокопроизводительный веб-сокет на Node JS – задержка пропускной способности

Сообщение Anonymous »

У меня есть веб-сокет, который отправляет мне много сообщений.
Мне нужно было создать Node JS-клиент, который будет:
  • получать сообщение
  • добавить некоторые свойства
  • опубликовать его в определенной теме Kafka
сервис работает в docker-контейнере.
количество сообщений меняется в течение дня, но имеет очень большой пик в 14:30, около 30 000 сообщений в секунду, в течение примерно 2 минут, а затем исчезает.
Мое текущее решение состоит из родительского процесса, который подключается к веб-сокету, получает сообщения и отправляет их по мере их поступления (через IPC) дочернему процессу.
дочерний процесс добавляет необходимые свойства и отправляет их через производителя Kafka.
проблема: в пиковое время данные публикуются в Kafka через 6–10 секунд после поступления сообщения (предположительно) в вебсокет, что недопустимо.Я пробовал: логику одного/множества процессов, настройки производителя с высокой пропускной способностью Kafka, библиотеку npm fast-json-stringify, но мне все равно не удалось избежать этой задержки.
контейнер работает в Google облачная виртуальная машина с 4 ядрами ЦП и 16 ГБ оперативной памяти.
как избежать этой задержки?
server.js код:
import websocket from "websocket";

import path from "path";
import { fileURLToPath } from "url";
import cp from "child_process";
import { resolve } from "path";
const __filename = fileURLToPath(import.meta.url); // get the resolved path to the file
const __dirname = path.dirname(__filename); // get the name of the directory
const worker = cp.fork(resolve(__dirname, "worker.js"));

const url = "myurl";

let ws;

function createSocket() {
ws = new websocket.w3cwebsocket(url);

ws.onerror = (err) => console.log(`${prefix}error in socket`, err);
ws.onclose = (msg) => {
const { code } = msg;
console.log(`${prefix}Connection closed | reason: "${JSON.stringify(code)};"`);
setTimeout(function () {
createSocket();
}, 1000);
};
ws.onmessage = (msg) => {
worker.send(msg);
}
}

worker.js код:
import kafka from "./kafka.js";

async function run() {
process.on("message", async (input) => {
try {
const msg = {
src: "i",
sym: input.sym,
p: input.p,
s: input.s,
t: input.t,
vt: input.tv,
ot: Date.now(),
};
kafka.producer.send({
topic: "i",
messages: [{ key: msg.sym, value: JSON.stringify(msg) }],
});
} catch (err) {
console.log(`service general error`);
}
});
}

run().catch((e) => console.error(`[worker/producer] ${e.message}`, e));


kafka.js код:
import dotenv from "dotenv";
import { KafkaJS } from "@confluentinc/kafka-javascript";

dotenv.config();

const Kafka = KafkaJS.Kafka;
const kafakStr = "ws";

const kafka = new Kafka({
kafkaJS: {
clientId: kafakStr,
brokers: [process.env.KAFKA],
},
});

const kafkaObj = {
producer: kafka.producer({
"batch.size": 100000,
"linger.ms": 50,
"compression.type": "lz4",
kafkaJS: { acks: 1 },
}),
consumer: kafka.consumer({ kafkaJS: { groupId: kafakStr } }),
};

export default kafkaObj;



Подробнее здесь: https://stackoverflow.com/questions/793 ... throughput
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Javascript»