// Code: Select all
// Load environment variables via dotenv before anything reads process.env.
require("dotenv").config();
// Port the master process listens on (passed to server.listen below).
const port = 8181;
const cluster = require("cluster");
const io_redis = require("socket.io-redis");
// Number of workers to fork: one per entry reported by os.cpus().
const num_processes = require("os").cpus().length;
import * as net from "net";
import cors from "cors";
import * as http from "http";
import router from "./routes";
import { PeerServer } from "peer";
import express from "express";
import * as socket from "socket.io";
import * as farmhash from "farmhash";
import cookieParser from "cookie-parser";
import { socketMain } from "./socket.io/socketMain";
import { inititalizeMongoDb } from "./database/mongoInstance";
import { isAuthSocket } from "./middlewares/isAuthSocket.middleware";
import { deleteOldMessageCron } from './services/deletemessagecron';
import { createClient, RedisClientType } from "redis";
// Create the Redis client using a URL.
// NOTE(review): the URL is an empty string here — presumably a redacted
// placeholder; confirm the intended value (the Socket.IO adapter below reads
// process.env.REDIS_URL).
const redisClient: RedisClientType = createClient({ url: '' });

// node-redis reports connection problems through "error" events; with no
// listener attached the error is thrown and can take the whole process down.
redisClient.on("error", (err: unknown) => {
  console.error("Redis client error:", err);
});

// connect() returns a promise: handle a rejection explicitly instead of
// leaving it floating as an unhandled rejection.
redisClient.connect().catch((err: unknown) => {
  console.error("Redis client failed to connect:", err);
});
/**
 * Process entry point.
 *
 * Master process: forks `num_processes` workers, re-forks any worker that
 * exits, and runs a raw TCP server on `port` that hands every incoming
 * connection to the worker selected by hashing the client's remote address,
 * so a given address always lands on the same worker (sticky sessions).
 *
 * Worker process: builds the Express app and the Socket.IO server on an
 * OS-assigned localhost port, attaches the Redis adapter, waits for
 * `inititalizeMongoDb()`, starts `deleteOldMessageCron()` and then accepts
 * the connections forwarded by the master over IPC.
 *
 * NOTE(review): `cluster.isMaster` is kept as written; newer Node.js releases
 * expose `cluster.isPrimary` — confirm the target Node.js version before
 * switching.
 */
(async () => {
  // IPC message name used by the master when handing a connection to a worker.
  const stickyMessage = "sticky-session:connection";
  // Origins shared by the HTTP CORS middleware and the Socket.IO CORS config.
  const allowedOrigins = ["file://", "http://localhost", "http://localhost:3000"];

  if (cluster.isMaster) {
    const workers: any[] = [];

    // Fork the worker for slot `i` and re-fork it whenever it exits.
    const spawn = (i: number): void => {
      workers[i] = cluster.fork();
      workers[i].on("exit", () => {
        console.log("respawning worker", i);
        spawn(i);
      });
    };
    for (let i = 0; i < num_processes; i++) {
      spawn(i);
    }

    // Map an address to a worker slot in the range [0, len).
    const worker_index = (ip: string, len: number): number =>
      farmhash.fingerprint32(ip) % len;

    // pauseOnConnect keeps the socket paused so no data is read in the master
    // before the connection reaches its worker.
    const server: net.Server = net.createServer(
      { pauseOnConnect: true },
      (connection: net.Socket) => {
        // remoteAddress is undefined once the client has already gone away;
        // hash a fixed fallback instead of handing undefined to farmhash.
        const address = connection.remoteAddress ?? "";
        const worker = workers[worker_index(address, num_processes)];
        worker.send(stickyMessage, connection);
      }
    );
    server.listen(port);
    console.log(`Master listening on port ${port}`);
  } else {
    const app = express();
    app.use(express.json({ limit: '50mb' }));
    app.use(express.urlencoded({ limit: '50mb', extended: true }));
    app.use(cookieParser());
    app.use(
      cors({
        origin: allowedOrigins,
        credentials: true,
      })
    );
    app.use("/", router);

    // Port 0 on localhost: the worker is not exposed directly, it only
    // receives the connections the master forwards to it.
    const server: http.Server = app.listen(0, "localhost");
    console.log("Worker listening...");

    const io = new socket.Server(server, {
      cors: {
        origin: allowedOrigins,
        credentials: true,
      },
      pingTimeout: 120000,
      pingInterval: 25000,
    });
    io.adapter(io_redis({
      url: process.env.REDIS_URL,
      // Retry delay grows by 50 ms per attempt and is capped at 2 s.
      retryStrategy: (times: any) => Math.min(times * 50, 2000),
    }));

    await inititalizeMongoDb();
    deleteOldMessageCron();

    io.use(isAuthSocket);
    io.on("error", (err: any) => {
      console.log("Socket.io Error:", err);
    });
    io.on("connection", (socket: socket.Socket) => {
      console.log("connected to socket server", socket.id);
      socketMain(io, socket, redisClient);
      console.log(`connected to worker: ${cluster.worker.id}`);
    });

    // NOTE(review): this listener is attached only after MongoDB has been
    // initialised, so connections the master forwards earlier appear to go
    // unhandled — confirm whether they need to be buffered.
    process.on("message", (message: unknown, connection: unknown) => {
      if (message !== stickyMessage) {
        return;
      }
      // The handle is missing when the client closed the connection while it
      // was being transferred; emitting "connection" without a socket would
      // crash the HTTP server.
      if (!(connection instanceof net.Socket)) {
        return;
      }
      server.emit("connection", connection);
      // The master accepted the socket paused; start reading it here.
      connection.resume();
    });
  }
})().catch((err: unknown) => {
  // Without this handler a failed MongoDB initialisation surfaces only as an
  // unhandled promise rejection. Exit explicitly so the master's "exit"
  // listener can re-fork the worker.
  console.error("Fatal error during startup:", err);
  process.exit(1);
});
// (forum text) My socketMain file, in which all events are handled:
// Code: Select all
import { callOtherUser } from "./handlers/callOtherUser.handler";
import { disconnectVideoCall } from "./handlers/disconnectCall.handler";
import { getTotalUsers } from "./handlers/getTotalUsers.handler";
import { handleActiveSession } from "./handlers/handleActiveSession";
import { initialSocketConfig } from "./handlers/initialVerification";
import { iTextMessage } from "./handlers/iTextMessage.handler";
import { joinVideoRoom } from "./handlers/joinVideoRoom.handler";
import { rejectVideoCall } from "./handlers/rejectCall.handler";
import { socketDisconnect } from "./handlers/socketDisconnect.handler";
import { updateGroupInfo } from "./handlers/updateGroupInfo.handler";
import { updateOthersChats } from "./handlers/updateOthersChats.handler";
import { updateUserProfile } from "./handlers/updateUserProfile.handler";
import { userOnCall } from "./handlers/userOnCall.handler";
import { MarkAsReadMessage } from './handlers/MarkAsReadMessage.handler';
import { NickName } from './handlers/NickName.handler';
import { switchActiveChat } from "./handlers/switchActiveChat.handler";
import { RedisClientType } from "redis";
import { blockUser } from './blockUser.handler';
/**
 * Wires up all per-connection behaviour for a newly connected socket.
 *
 * Resolves the user through `initialSocketConfig`, records the session via
 * `handleActiveSession`, emits "signInSuccess" to the client, broadcasts
 * "updateTotalUsers" and "online" to the other clients, and registers every
 * event listener, including the heartbeat watchdog and disconnect cleanup.
 *
 * Any error thrown during this setup is logged and swallowed, so the returned
 * promise does not reject for those failures.
 *
 * @param io Socket.IO server instance passed through to the handlers.
 * @param socket The connected client socket.
 * @param redisClient Redis client passed to the session and disconnect handlers.
 */
export const socketMain = async (io: any, socket: any, redisClient: RedisClientType) => {
  try {
    // Armed by the first "heartbeat" event; undefined until then.
    let heartbeatTimeout: NodeJS.Timeout | undefined;

    const { _id, db, userPayload } = await initialSocketConfig(io, socket);

    // Registers a listener whose synchronous throw or async rejection is
    // logged instead of escaping as an uncaught error, matching the
    // try/catch already used for the "disconnect" listener.
    const on = (event: string, handler: (...args: any[]) => unknown): void => {
      socket.on(event, async (...args: any[]) => {
        try {
          await handler(...args);
        } catch (err) {
          console.error(`Socket handler "${event}" failed:`, err);
        }
      });
    };

    // Deliberately not awaited so listener registration is not delayed, but
    // the promise still needs a rejection handler: the surrounding try/catch
    // cannot see a failure that happens after this call returns.
    handleActiveSession(io, socket, _id, userPayload?.displayName, redisClient).catch(
      (err: unknown) => {
        console.error("handleActiveSession failed:", err);
      }
    );

    socket.emit("signInSuccess", {
      objectId: _id,
      displayName: userPayload?.displayName,
      email: userPayload?.email,
      avatar: userPayload?.avatar,
      createdOn: userPayload?.createdOn,
      about: userPayload?.about,
      lastSeen: userPayload?.lastSeen,
    });

    // Announce the user to every other connected client.
    // NOTE(review): objectId is read from userPayload?._id here but from _id
    // in "signInSuccess" — confirm both carry the same value.
    socket.broadcast.emit("updateTotalUsers", {
      objectId: userPayload?._id,
      displayName: userPayload?.displayName,
      avatar: userPayload?.avatar,
      createdOn: userPayload?.createdOn,
      about: userPayload?.about,
      lastSeen: userPayload?.lastSeen,
    });
    socket.broadcast.emit("online", _id);

    // --- Video call events ---
    on("callOtherUser", (payload: any) => callOtherUser(io, _id, db, payload));
    on("join-vc-room", (roomId: string, peerUserId: string) =>
      joinVideoRoom(socket, roomId, peerUserId)
    );
    // The event name is spelled "diconnect-from-call" on purpose: it is part
    // of the wire protocol and must match whatever the client emits.
    on("diconnect-from-call", (roomId: string, peerUserId: string) =>
      disconnectVideoCall(socket, roomId, peerUserId)
    );
    on("reject-call", (roomId: string) => rejectVideoCall(socket, roomId));
    on("user-on-call", (roomId: string) => userOnCall(socket, roomId));

    // --- User, group and chat events ---
    on("getTotalUsers", () => getTotalUsers(db, socket, _id));
    on("updateUserProfile", (payload: any) => updateUserProfile(socket, _id, payload, db));
    on("updateGroupInfo", (payload: any) => updateGroupInfo(io, _id, payload, db));
    on("updateOthersChats", (payload: any) => {
      console.log("updateOthersChats", payload);
      return updateOthersChats(db, io, _id, payload);
    });
    on("iTextMessage", (payload: any) => iTextMessage(io, socket, payload, db, _id));
    on("markAsReadMessage", (payload: any) => MarkAsReadMessage(io, socket, payload, db, _id));
    on("setNickname", (payload: any) => NickName(io, payload, db, _id));
    on("switchActiveChat", (payload: any) => switchActiveChat(io, socket, _id, db, payload));
    on("user-blocked", (payload: any) => blockUser(io, socket, _id, db, payload));

    // --- Heartbeat watchdog ---
    socket.on('heartbeat', () => {
      console.log('Client heartbeat:', socket.id);
      // Respond immediately to client
      socket.emit('heartbeat_received');
      // Restart the countdown: drop the previous timer, if any
      if (heartbeatTimeout) {
        clearTimeout(heartbeatTimeout);
      }
      // Disconnect the client if no further heartbeat arrives in time
      heartbeatTimeout = setTimeout(() => {
        console.log('Client heartbeat timeout:', socket.id);
        socket.disconnect(true);
      }, 90000); // 90 seconds (allowing for network delays)
    });

    socket.on("disconnect", async (reason: any) => {
      // Stop the watchdog so it cannot fire for a socket that is already gone.
      if (heartbeatTimeout) {
        clearTimeout(heartbeatTimeout);
      }
      console.log("DISCONNECT REASON:", reason);
      console.log("Socket Namespace:", socket.nsp.name);
      console.log("Socket ID:", socket.id);
      try {
        await socketDisconnect(socket, _id, db, redisClient);
      } catch (err) {
        console.error("Error during socket disconnect:", err);
      }
    });
  } catch (err) {
    console.log("MAIN SOCKET ERR", err);
  }
};
// (forum text) The handleActiveSession file, in which user connections are
// stored or removed (the original caption appears to refer to Redis):
// Code: Select all
import * as socket from "socket.io";
import {
getActiveUserByObjectId,
removeActiveUserByObjectId,
addToActiveUsers,
getActiveUsers
} from "../../utils/activeUsers";
import { RedisClientType } from "redis";
/**
 * Records the session for a freshly connected socket and evicts the user's
 * previous socket, if one is still registered.
 *
 * The session is written to Redis under `user:<_id>` and to the active-user
 * list from `utils/activeUsers`. When an entry already exists for `_id`, the
 * socket it points to (if this server instance still has it) receives
 * "multipleSession" and is force-disconnected before the entry is replaced.
 *
 * NOTE(review): the previous session is looked up only through
 * `getActiveUserByObjectId` and `io.sockets.sockets`; if those are local to
 * one process, a previous session held by another cluster worker would not be
 * found here — confirm.
 * NOTE(review): `disconnect(true)` on the old socket triggers its
 * "disconnect" listener for the same `_id`; verify that cleanup cannot remove
 * the session stored here.
 *
 * @param io Socket.IO server used to find the previous socket.
 * @param socket The newly connected socket.
 * @param _id Identifier of the user owning the session.
 * @param Name Display name stored alongside the session.
 * @param redisClient Connected Redis client used to persist the session.
 */
export const handleActiveSession = async (
  io: socket.Server,
  socket: socket.Socket,
  _id: string,
  Name: string,
  redisClient: RedisClientType
) => {
  const userSession = {
    socketId: socket.id,
    objectId: _id,
    Name: Name
  };
  await redisClient.set(`user:${_id}`, JSON.stringify(userSession));

  // Look the previous session up once and reuse the result.
  const previousSession = getActiveUserByObjectId(_id);
  if (!previousSession) {
    console.log("New session!");
    addToActiveUsers(userSession);
    console.log("Active Users: ", getActiveUsers());
    return;
  }

  console.log("Prev Disconnected, New session!");
  const prevSocketId = previousSession.socketId;
  // The entry may point to a socket this server no longer holds, so the
  // lookup result has to be checked before it is used.
  const prevSocket = prevSocketId ? io.sockets.sockets.get(prevSocketId) : undefined;
  if (prevSocket) {
    console.log(prevSocketId + "multipleSession disconnected");
    prevSocket.emit("multipleSession");
    prevSocket.disconnect(true);
  }

  // Replace the stale entry with the new session.
  removeActiveUserByObjectId(_id);
  addToActiveUsers(userSession);
};
// (forum text) More details here: https://stackoverflow.com/questions/793 ... ime-but-wh