Anonymous
Клиент Socket.io через некоторое время перестаёт получать события, но отправка событий с клиента продолжает работать
Сообщение
Anonymous » 20 янв 2025, 16:34
Я работаю над приложением для чата, в котором сообщения от клиента к серверу передаются через различные события сокетов. Работает это так: клиент отправляет запрос на подключение к сокету с JWT. Я проверяю, был ли пользователь подключён ранее; если да, то прежний сеанс просто удаляется, создаётся новый, и информация о нём сохраняется в Redis. Вот код файла моего сервера:
Код: Выделить всё
require("dotenv").config();
const port = 8181;
const cluster = require("cluster");
const io_redis = require("socket.io-redis");
const num_processes = require("os").cpus().length;
import * as net from "net";
import cors from "cors";
import * as http from "http";
import router from "./routes";
import { PeerServer } from "peer";
import express from "express";
import * as socket from "socket.io";
import * as farmhash from "farmhash";
import cookieParser from "cookie-parser";
import { socketMain } from "./socket.io/socketMain";
import { inititalizeMongoDb } from "./database/mongoInstance";
import { isAuthSocket } from "./middlewares/isAuthSocket.middleware";
import { deleteOldMessageCron } from './services/deletemessagecron';
import { createClient, RedisClientType } from "redis";
// Create the Redis client using a URL.
// NOTE(review): the URL is an empty string here (presumably redacted for the
// post) — confirm the real connection string is supplied before deploying.
const redisClient: RedisClientType = createClient({ url: '' });
// node-redis expects at least one "error" listener; without it a connection
// error is thrown as an uncaught exception and takes the process down.
redisClient.on("error", (err: unknown) => {
  console.error("Redis client error:", err);
});
// connect() returns a promise; handle its rejection explicitly so a failed
// initial connection is logged instead of becoming an unhandled rejection.
redisClient.connect().catch((err: unknown) => {
  console.error("Redis client failed to connect:", err);
});
/**
 * Process entry point for the clustered chat server.
 *
 * Master process: forks one worker per CPU core (`num_processes`), respawns a
 * worker when it exits, and listens on `port` with `pauseOnConnect`. Every
 * incoming TCP connection is hashed by remote address and handed to the worker
 * in that hash slot, so the same address always reaches the same worker.
 *
 * Worker process: runs the Express app plus a Socket.IO server on an ephemeral
 * localhost port and receives its real client connections from the master via
 * IPC ("sticky-session:connection").
 */
(async () => {
  if (cluster.isMaster) {
    // Slot i always holds the currently live worker for hash bucket i.
    const workers: any[] = [];

    const spawn = (i: number) => {
      workers[i] = cluster.fork();
      workers[i].on("exit", () => {
        console.log("respawning worker", i);
        spawn(i);
      });
    };

    for (let i = 0; i < num_processes; i++) {
      spawn(i);
    }

    const worker_index = (ip: string, len: number) => {
      return farmhash.fingerprint32(ip) % len;
    };

    const server: net.Server = net.createServer(
      { pauseOnConnect: true },
      (connection: net.Socket) => {
        const remoteAddress = connection.remoteAddress;
        if (remoteAddress === undefined) {
          // The peer is already gone (no address available). Hashing
          // `undefined` would throw in the master, so drop the connection.
          connection.destroy();
          return;
        }
        const worker = workers[worker_index(remoteAddress, num_processes)];
        worker.send("sticky-session:connection", connection);
      }
    );
    server.listen(port);
    console.log(`Master listening on port ${port}`);
    return;
  }

  // ---- Worker process ----
  const corsOptions = {
    origin: ["file://", "http://localhost", "http://localhost:3000"],
    credentials: true,
  };

  const app = express();
  app.use(express.json({ limit: '50mb' }));
  app.use(express.urlencoded({ limit: '50mb', extended: true }));
  app.use(cookieParser());
  app.use(cors(corsOptions));
  app.use("/", router);

  // Port 0: the worker is not reached directly; connections arrive from the
  // master through the IPC handler below.
  const server: http.Server = app.listen(0, "localhost");
  console.log("Worker listening...");

  const io = new socket.Server(server, {
    cors: corsOptions,
    pingTimeout: 120000,
    pingInterval: 25000,
  });
  io.adapter(io_redis({
    url: process.env.REDIS_URL,
    retryStrategy: (times: any) => {
      const delay = Math.min(times * 50, 2000);
      return delay;
    }
  }));

  // The IPC listener must be registered before the first `await`: a handle
  // that arrives while no "message" listener exists is never emitted on the
  // HTTP server and never resumed, so that client would hang. Connections that
  // arrive before initialisation is finished are parked and replayed below.
  let ready = false;
  const pending: net.Socket[] = [];
  const accept = (connection: net.Socket) => {
    server.emit("connection", connection);
    // The master accepted the socket with pauseOnConnect, so resume it here.
    connection.resume();
  };
  process.on("message", (message: unknown, connection: unknown) => {
    if (message !== "sticky-session:connection") {
      return;
    }
    // The handle can be missing if the socket was closed while in transit.
    if (!(connection instanceof net.Socket)) {
      return;
    }
    if (ready) {
      accept(connection);
    } else {
      pending.push(connection);
    }
  });

  await inititalizeMongoDb();
  // NOTE(review): this runs in every worker, so the cron presumably fires once
  // per worker — confirm that is intended.
  deleteOldMessageCron();

  io.use(isAuthSocket);
  io.on("error", (err: any) => {
    console.log("Socket.io Error:", err);
  });
  io.on("connection", (socket: socket.Socket) => {
    console.log("connected to socket server", socket.id);
    socketMain(io, socket, redisClient);
    console.log(`connected to worker: ${cluster.worker.id}`);
  });

  // Everything is wired up: serve the connections that arrived during init.
  ready = true;
  for (const queued of pending.splice(0)) {
    accept(queued);
  }
})().catch((err: unknown) => {
  // A failed startup (e.g. MongoDB initialisation rejecting) is reported
  // explicitly instead of surfacing as an unhandled promise rejection.
  console.error("Fatal error during startup:", err);
  process.exit(1);
});
Мой файл socketMain, в котором обрабатываются все события:
Код: Выделить всё
import { callOtherUser } from "./handlers/callOtherUser.handler";
import { disconnectVideoCall } from "./handlers/disconnectCall.handler";
import { getTotalUsers } from "./handlers/getTotalUsers.handler";
import { handleActiveSession } from "./handlers/handleActiveSession";
import { initialSocketConfig } from "./handlers/initialVerification";
import { iTextMessage } from "./handlers/iTextMessage.handler";
import { joinVideoRoom } from "./handlers/joinVideoRoom.handler";
import { rejectVideoCall } from "./handlers/rejectCall.handler";
import { socketDisconnect } from "./handlers/socketDisconnect.handler";
import { updateGroupInfo } from "./handlers/updateGroupInfo.handler";
import { updateOthersChats } from "./handlers/updateOthersChats.handler";
import { updateUserProfile } from "./handlers/updateUserProfile.handler";
import { userOnCall } from "./handlers/userOnCall.handler";
import { MarkAsReadMessage } from './handlers/MarkAsReadMessage.handler';
import { NickName } from './handlers/NickName.handler';
import { switchActiveChat } from "./handlers/switchActiveChat.handler";
import { RedisClientType } from "redis";
import { blockUser } from './blockUser.handler';
/**
 * Wires up everything for one newly connected socket: resolves the user via
 * `initialSocketConfig`, records the active session, announces the user to the
 * other clients and registers all per-socket event handlers.
 *
 * Any error thrown during setup is logged ("MAIN SOCKET ERR") and swallowed, so
 * the returned promise does not reject.
 *
 * NOTE(review): the listeners are attached only after `initialSocketConfig`
 * resolves; events a client emits before that point presumably go unhandled —
 * confirm the client waits for "signInSuccess" before emitting.
 *
 * @param io          Socket.IO server instance, passed through to the handlers.
 * @param socket      The connected client socket.
 * @param redisClient Redis client used for session bookkeeping.
 */
export const socketMain = async (io: any, socket: any, redisClient: RedisClientType) => {
  try {
    // Stays undefined until the first "heartbeat" event arrives.
    let heartbeatTimeout: NodeJS.Timeout | undefined;

    const { _id, db, userPayload } = await initialSocketConfig(io, socket);

    // Registers an event handler whose failures (sync throw or rejected
    // promise) are logged. Without this, a rejected async handler becomes an
    // unhandled rejection, which can terminate the worker and drop every
    // socket connected to it.
    const on = (event: string, handler: (...args: any[]) => unknown) => {
      socket.on(event, async (...args: any[]) => {
        try {
          await handler(...args);
        } catch (err) {
          console.error(`Socket handler "${event}" failed:`, err);
        }
      });
    };

    // Still fire-and-forget (setup does not wait for Redis), but a failure is
    // now logged instead of surfacing as an unhandled rejection.
    handleActiveSession(io, socket, _id, userPayload?.displayName, redisClient).catch(
      (err: unknown) => {
        console.error("handleActiveSession failed:", err);
      }
    );

    socket.emit("signInSuccess", {
      objectId: _id,
      displayName: userPayload?.displayName,
      email: userPayload?.email,
      avatar: userPayload?.avatar,
      createdOn: userPayload?.createdOn,
      about: userPayload?.about,
      lastSeen: userPayload?.lastSeen,
    });

    on("callOtherUser", (payload: any) => callOtherUser(io, _id, db, payload));
    on("join-vc-room", (roomId: string, peerUserId: string) =>
      joinVideoRoom(socket, roomId, peerUserId)
    );
    // NOTE(review): "diconnect-from-call" looks like a typo, but the event name
    // is part of the client/server contract, so it is kept unchanged.
    on("diconnect-from-call", (roomId: string, peerUserId: string) =>
      disconnectVideoCall(socket, roomId, peerUserId)
    );
    on("reject-call", (roomId: string) => rejectVideoCall(socket, roomId));
    on("user-on-call", (roomId: string) => userOnCall(socket, roomId));
    on("getTotalUsers", () => getTotalUsers(db, socket, _id));

    // NOTE(review): this uses userPayload?._id while "signInSuccess" uses _id —
    // confirm both refer to the same identifier.
    socket.broadcast.emit("updateTotalUsers", {
      objectId: userPayload?._id,
      displayName: userPayload?.displayName,
      avatar: userPayload?.avatar,
      createdOn: userPayload?.createdOn,
      about: userPayload?.about,
      lastSeen: userPayload?.lastSeen,
    });

    on("updateUserProfile", (payload: any) => updateUserProfile(socket, _id, payload, db));
    on("updateGroupInfo", (payload: any) => updateGroupInfo(io, _id, payload, db));
    on("updateOthersChats", (payload: any) => {
      console.log("updateOthersChats", payload)
      return updateOthersChats(db, io, _id, payload);
    });
    on("iTextMessage", (payload: any) => iTextMessage(io, socket, payload, db, _id));
    on("markAsReadMessage", (payload: any) => MarkAsReadMessage(io, socket, payload, db, _id));
    on("setNickname", (payload: any) => NickName(io, payload, db, _id));

    socket.broadcast.emit("online", _id);

    on("switchActiveChat", (payload: any) => switchActiveChat(io, socket, _id, db, payload));
    on("user-blocked", (payload: any) => blockUser(io, socket, _id, db, payload));

    socket.on('heartbeat', () => {
      console.log('Client heartbeat:', socket.id);
      // Respond immediately to client
      socket.emit('heartbeat_received');
      // Re-arm the watchdog: drop the previous timer, then start a new one.
      if (heartbeatTimeout) {
        clearTimeout(heartbeatTimeout);
      }
      // If no further heartbeat arrives within the window, force a disconnect.
      heartbeatTimeout = setTimeout(() => {
        console.log('Client heartbeat timeout:', socket.id);
        socket.disconnect(true);
      }, 90000); // 90 seconds (allowing for network delays)
    });

    socket.on("disconnect", async (reason: any) => {
      // Stop the watchdog so it cannot fire for a socket that is already gone.
      if (heartbeatTimeout) {
        clearTimeout(heartbeatTimeout);
      }
      console.log("DISCONNECT REASON:", reason);
      console.log("Socket Namespace:", socket.nsp.name);
      console.log("Socket ID:", socket.id);
      try {
        await socketDisconnect(socket, _id, db, redisClient);
      } catch (err) {
        console.error("Error during socket disconnect:", err);
      }
    });
  } catch (err) {
    console.log("MAIN SOCKET ERR", err);
  }
};
Файл handleActiveSession, в котором пользовательские подключения сохраняются в Redis или удаляются из него:
Код: Выделить всё
import * as socket from "socket.io";
import {
getActiveUserByObjectId,
removeActiveUserByObjectId,
addToActiveUsers,
getActiveUsers
} from "../../utils/activeUsers";
import { RedisClientType } from "redis";
/**
 * Records `socket` as the single active session of user `_id`.
 *
 * The session ({ socketId, objectId, Name }) is written to Redis under
 * `user:<_id>` and to the active-users list. If the list already holds a
 * session for this user, the socket of that previous session — when it is still
 * connected to this Socket.IO server instance — is sent "multipleSession" and
 * force-disconnected before the entry is replaced.
 *
 * NOTE(review): `io.sockets.sockets` only contains sockets of this server
 * instance; with several worker processes a previous session held by another
 * worker is presumably not found here — confirm against utils/activeUsers.
 * NOTE(review): disconnecting the previous socket triggers its "disconnect"
 * handler; if that cleanup removes state keyed by user id rather than socket
 * id, it could also erase the session stored here — verify socketDisconnect.
 *
 * @param io          Socket.IO server used to look up the previous socket.
 * @param socket      The newly connected socket.
 * @param _id         Identifier of the user owning the session.
 * @param Name        Display name stored with the session.
 * @param redisClient Connected Redis client.
 * @throws Rejects if the Redis write fails.
 */
export const handleActiveSession = async (
  io: socket.Server,
  socket: socket.Socket,
  _id: string,
  Name: string,
  redisClient: RedisClientType
): Promise<void> => {
  const userKey = `user:${_id}`;
  const userSession = {
    socketId: socket.id,
    objectId: _id,
    Name: Name
  };
  await redisClient.set(userKey, JSON.stringify(userSession));

  // Look the previous session up once; the result cannot change below because
  // there is no further await.
  const previousSession = getActiveUserByObjectId(_id);
  if (!previousSession) {
    console.log("New session!");
    addToActiveUsers(userSession);
    console.log("Active Users: ", getActiveUsers());
    return;
  }

  console.log("Prev Disconnected, New session!");
  const prevSocketId = previousSession.socketId;
  // Resolve the previous socket a single time; both the id and the socket may
  // be missing (e.g. the old connection is already closed).
  const prevSocket = prevSocketId ? io.sockets.sockets.get(prevSocketId) : undefined;
  if (prevSocket) {
    console.log(prevSocketId + "multipleSession disconnected");
    prevSocket.emit("multipleSession");
    prevSocket.disconnect(true);
  }
  removeActiveUserByObjectId(_id);
  addToActiveUsers(userSession);
};
Подробнее здесь:
https://stackoverflow.com/questions/79371494/socket-io-client-not-able-to-receive-events-stop-working-after-some-time-but-wh
1737380069
Anonymous
Я работаю над приложением для чата. где я использую различные события сокетов для отправки сообщения от клиента к серверу через сокеты. Вот как это работает, запрос на отправку клиента с помощью jwt на сокете. Я проверяю, подключен ли он раньше или нет, если он просто удаляет этот сеанс, создает новый сеанс и сохраняет его информацию в Redies. Вот такой код файла моего сервера [code]require("dotenv").config(); const port = 8181; const cluster = require("cluster"); const io_redis = require("socket.io-redis"); const num_processes = require("os").cpus().length; import * as net from "net"; import cors from "cors"; import * as http from "http"; import router from "./routes"; import { PeerServer } from "peer"; import express from "express"; import * as socket from "socket.io"; import * as farmhash from "farmhash"; import cookieParser from "cookie-parser"; import { socketMain } from "./socket.io/socketMain"; import { inititalizeMongoDb } from "./database/mongoInstance"; import { isAuthSocket } from "./middlewares/isAuthSocket.middleware"; import { deleteOldMessageCron } from './services/deletemessagecron'; import { createClient, RedisClientType } from "redis"; // Create the Redis client using a URL const redisClient: RedisClientType = createClient({ url: '' }); redisClient.connect(); (async () => { if (cluster.isMaster) { const workers: any = []; const spawn = (i: number) => { workers[i] = cluster.fork(); workers[i].on("exit", () => { console.log("respawning worker", i); spawn(i); }); }; for (var i = 0; i < num_processes; i++) { spawn(i); } const worker_index = (ip: string, len: number) => { return farmhash.fingerprint32(ip) % len; }; const server: net.Server = net.createServer( { pauseOnConnect: true }, (connection: net.Socket) => { const worker = workers[worker_index(connection.remoteAddress, num_processes)]; worker.send("sticky-session:connection", connection); } ); server.listen(port); console.log(`Master listening on port ${port}`); } else { let app = express(); 
app.use(express.json({ limit: '50mb' })); app.use(express.urlencoded({ limit: '50mb', extended: true })); app.use(cookieParser()); app.use( cors({ origin: [ "file://", "http://localhost", "http://localhost:3000"], credentials: true, }) ); app.use("/", router); const server: http.Server = app.listen(0, "localhost"); console.log("Worker listening..."); const io = new socket.Server(server, { cors: { origin: [ "file://", "http://localhost", "http://localhost:3000"], credentials: true, }, pingTimeout: 120000, pingInterval: 25000, }); io.adapter(io_redis({ url: process.env.REDIS_URL, retryStrategy: (times: any) => { const delay = Math.min(times * 50, 2000); return delay; } })); await inititalizeMongoDb(); deleteOldMessageCron(); io.use(isAuthSocket); io.on("error", (err: any) => { console.log("Socket.io Error:", err); }); io.on("connection", (socket: socket.Socket) => { console.log("connected to socket server", socket.id); socketMain(io, socket, redisClient); console.log(`connected to worker: ${cluster.worker.id}`); }); process.on("message", (message, connection) => { if (message !== "sticky-session:connection") { return; } server.emit("connection", connection); //@ts-ignore connection.resume(); }); } })();[/code] мой файл сокетаMain, в котором обрабатываются все события [code]import { callOtherUser } from "./handlers/callOtherUser.handler"; import { disconnectVideoCall } from "./handlers/disconnectCall.handler"; import { getTotalUsers } from "./handlers/getTotalUsers.handler"; import { handleActiveSession } from "./handlers/handleActiveSession"; import { initialSocketConfig } from "./handlers/initialVerification"; import { iTextMessage } from "./handlers/iTextMessage.handler"; import { joinVideoRoom } from "./handlers/joinVideoRoom.handler"; import { rejectVideoCall } from "./handlers/rejectCall.handler"; import { socketDisconnect } from "./handlers/socketDisconnect.handler"; import { updateGroupInfo } from "./handlers/updateGroupInfo.handler"; import { 
updateOthersChats } from "./handlers/updateOthersChats.handler"; import { updateUserProfile } from "./handlers/updateUserProfile.handler"; import { userOnCall } from "./handlers/userOnCall.handler"; import { MarkAsReadMessage } from './handlers/MarkAsReadMessage.handler'; import { NickName } from './handlers/NickName.handler'; import { switchActiveChat } from "./handlers/switchActiveChat.handler"; import { RedisClientType } from "redis"; import { blockUser } from './blockUser.handler'; export const socketMain = async (io: any, socket: any, redisClient: RedisClientType) => { try { let heartbeatTimeout: NodeJS.Timeout; const { _id, db, userPayload } = await initialSocketConfig( io, socket ); handleActiveSession(io, socket, _id, userPayload?.displayName, redisClient); socket.emit("signInSuccess", { objectId: _id, displayName: userPayload?.displayName, email: userPayload?.email, avatar: userPayload?.avatar, createdOn: userPayload?.createdOn, about: userPayload?.about, lastSeen: userPayload?.lastSeen, }); socket.on("callOtherUser", (payload: any) => callOtherUser(io, _id, db, payload) ); socket.on("join-vc-room", (roomId: string, peerUserId: string) => joinVideoRoom(socket, roomId, peerUserId) ); socket.on("diconnect-from-call", (roomId: string, peerUserId: string) => disconnectVideoCall(socket, roomId, peerUserId) ); socket.on("reject-call", (roomId: string) => rejectVideoCall(socket, roomId) ); socket.on("user-on-call", (roomId: string) => userOnCall(socket, roomId)); socket.on("getTotalUsers", () => getTotalUsers(db, socket, _id)); socket.broadcast.emit("updateTotalUsers", { objectId: userPayload?._id, displayName: userPayload?.displayName, avatar: userPayload?.avatar, createdOn: userPayload?.createdOn, about: userPayload?.about, lastSeen: userPayload?.lastSeen, }); socket.on( "updateUserProfile", async (payload: any) => await updateUserProfile(socket, _id, payload, db) ); socket.on("updateGroupInfo", async (payload: any) => updateGroupInfo(io, _id, payload, db) ); 
socket.on("updateOthersChats", (payload: any) => { console.log("updateOthersChats", payload) updateOthersChats(db, io, _id, payload) } ); socket.on("iTextMessage", async (payload: any) => iTextMessage(io, socket, payload, db, _id) ); socket.on("markAsReadMessage", async (payload: any) => MarkAsReadMessage(io, socket, payload, db, _id) ); socket.on("setNickname", async (payload: any) => NickName(io, payload, db, _id) ); socket.broadcast.emit("online", _id); socket.on("switchActiveChat", async (payload: any) => switchActiveChat(io, socket, _id, db, payload) ); socket.on("user-blocked", async (payload: any) => { blockUser(io, socket, _id, db, payload) }) socket.on('heartbeat', (data: any) => { console.log('Client heartbeat:', socket.id); // Respond immediately to client socket.emit('heartbeat_received'); // Clear existing timeout if (heartbeatTimeout) { clearTimeout(heartbeatTimeout); } // Set new timeout for missed heartbeat heartbeatTimeout = setTimeout(() => { console.log('Client heartbeat timeout:', socket.id); socket.disconnect(true); }, 90000); // 90 seconds (allowing for network delays) }); socket.on("disconnect", async (reason: any) => { if (heartbeatTimeout) { clearTimeout(heartbeatTimeout); } console.log("DISCONNECT REASON:", reason); console.log("Socket Namespace:", socket.nsp.name); console.log("Socket ID:", socket.id); try { await socketDisconnect(socket, _id, db, redisClient); } catch (err) { console.error("Error during socket disconnect:", err); } }); } catch (err) { console.log("MAIN SOCKET ERR", err); } };[/code] Файл handleActiveSession, в котором пользовательские подключения сохраняются или удаляются из повторных операций [code]import * as socket from "socket.io"; import { getActiveUserByObjectId, removeActiveUserByObjectId, addToActiveUsers, getActiveUsers } from "../../utils/activeUsers"; import { RedisClientType } from "redis"; export const handleActiveSession = async ( io: socket.Server, socket: socket.Socket, _id: string, Name: string, 
redisClient: RedisClientType ) => { const userKey = `user:${_id}`; const userSession = { socketId: socket.id, objectId: _id, Name: Name }; await redisClient.set(userKey, JSON.stringify(userSession)); if (!getActiveUserByObjectId(_id)) { console.log("New session!"); addToActiveUsers(userSession); console.log("Active Users: ", getActiveUsers()); } else { console.log("Prev Disconnected, New session!"); const prevSocketId = getActiveUserByObjectId(_id)?.socketId; if (io.sockets.sockets.get(prevSocketId)) { console.log(prevSocketId + "multipleSession disconnected"); io.sockets.sockets.get(prevSocketId).emit("multipleSession"); io.sockets.sockets.get(prevSocketId).disconnect(true); } removeActiveUserByObjectId(_id); addToActiveUsers(userSession); } };[/code] Подробнее здесь: [url]https://stackoverflow.com/questions/79371494/socket-io-client-not-able-to-receive-events-stop-working-after-some-time-but-wh[/url]