Anonymous
Тайм-аут узлов libp2p при попытке подключиться друг к другу, когда они развернуты в отдельных экземплярах терминала
Сообщение
Anonymous » 08 фев 2025, 14:44
Я создал сервер реестра для хранения идентификаторов пиров (peer ID) и основных сведений о них, а также протокол на базе libp2p. У меня есть три агента, которые выходят в сеть после инициализации и развертывания через этот протокол. Однако, когда я запускаю их в отдельных экземплярах терминала, они не могут обнаружить друг друга, и попытки подключения завершаются тайм-аутом. Если же я запускаю их из одного сценария, они находят друг друга. Что может вызывать эту проблему?
Код: Выделить всё
import { gossipsub } from "@chainsafe/libp2p-gossipsub";
import { yamux } from "@chainsafe/libp2p-yamux";
import { identify } from '@libp2p/identify';
import { kadDHT } from '@libp2p/kad-dht';
import { noise } from "@libp2p/noise";
import { tcp } from "@libp2p/tcp";
import axios from "axios";
import { createLibp2p } from "libp2p";
import { getProtocolTools } from './tools.js';
/**
 * Request/response messaging between agents on top of libp2p pubsub.
 *
 * Each deployed agent gets its own libp2p node, is announced to an HTTP
 * registry, and listens on the pubsub topic `/agent/<peerId>`.
 *
 * Limitations visible in this class:
 * - `connectNodes()` only iterates over `this.nodes`, i.e. nodes created by
 *   this instance; nodes created by other processes are never dialed here.
 * - Pending responses are keyed by the sender's peer id, so only one request
 *   per sender node can be awaited at a time.
 *
 * NOTE(review): the registry payload carries no multiaddrs, so peers living in
 * other processes presumably cannot be dialed from registry data alone —
 * confirm against the registry server and the libp2p dial documentation.
 */
export default class AgentNetworkProtocol {
  /**
   * @param {object} [options]
   * @param {string} [options.registrarUrl='http://localhost:3000'] Base URL of the registry server.
   * @param {number} [options.responseTimeoutMs=30000] How long `sendMessage` waits for a reply.
   */
  constructor({ registrarUrl = 'http://localhost:3000', responseTimeoutMs = 30000 } = {}) {
    this.registrarUrl = registrarUrl;
    this.responseTimeoutMs = responseTimeoutMs;
    // peerId -> async handler invoked with the content of an incoming request.
    this.messageHandlers = new Map();
    // sender peerId -> resolver for the response that sender is waiting for.
    this.pendingResponses = new Map();
    // peerId -> libp2p node created by this instance.
    this.nodes = new Map();
  }

  /**
   * Builds the libp2p configuration shared by every node created later.
   * Must be called before `deployAgent()`.
   */
  async initialize() {
    this.baseConfig = {
      addresses: {
        listen: ['/ip4/0.0.0.0/tcp/0'],
      },
      transports: [tcp()],
      connectionEncryption: [noise()],
      streamMuxers: [yamux()],
      services: {
        identify: identify(),
        pubsub: gossipsub({
          emitSelf: true,
          allowPublishToZeroPeers: true,
          gossipIncoming: true,
          fallbackToFloodsub: true,
          floodPublish: true,
        }),
        dht: kadDHT({
          enabled: true,
          clientMode: false,
          pingTimeout: 5000,
          maxInboundStreams: 5000,
          maxOutboundStreams: 5000,
        }),
      },
    };
  }

  /**
   * Creates and starts a libp2p node listening on the loopback interface and
   * subscribes it to its own `/agent/<peerId>` topic.
   *
   * @returns {Promise<object>} The started libp2p node.
   */
  async createNode() {
    const nodeConfig = {
      ...this.baseConfig,
      addresses: {
        // Port 0 lets the OS assign a free port; a randomly guessed port
        // could already be in use.
        listen: ['/ip4/127.0.0.1/tcp/0'],
      },
    };
    const node = await createLibp2p(nodeConfig);
    await node.start();
    await new Promise((resolve) => setTimeout(resolve, 1000));

    const topic = `/agent/${node.peerId.toString()}`;
    // Attach the listener before subscribing so no message on the topic is missed.
    node.services.pubsub.addEventListener('message', (evt) => {
      if (evt.detail.topic === topic) {
        this.handleIncomingMessage(evt.detail);
      }
    });
    await node.services.pubsub.subscribe(topic);
    return node;
  }

  /**
   * Creates a node for the agent, registers it with the registry and connects
   * it to the other nodes owned by this instance.
   *
   * @param {object} agentInstance Object exposing `handleMessage(message)`.
   * @param {object} agentMetadata `{ name, description, capabilities, walletAddress? }`.
   * @returns {Promise<{peerId: string, agentMetadata: object}>}
   * @throws {Error} If `initialize()` was not called, metadata is incomplete,
   *   or registration/connection fails (the node is stopped and removed first).
   */
  async deployAgent(agentInstance, agentMetadata) {
    if (!this.baseConfig) {
      throw new Error('Protocol not initialized. Call initialize() first.');
    }
    const { name, description, capabilities, walletAddress } = agentMetadata;
    if (!name || !description || !capabilities) {
      throw new Error('Missing required agent metadata');
    }

    const node = await this.createNode();
    const peerId = node.peerId.toString();
    this.nodes.set(peerId, node);
    this.messageHandlers.set(peerId, async (message) => agentInstance.handleMessage(message));

    try {
      await this._registerAgent({ peerId, name, description, capabilities, walletAddress });
      console.log('Successfully registered agent:', name, 'with peerId:', peerId);
      await new Promise((resolve) => setTimeout(resolve, 1000));
      await this.connectNodes();
    } catch (error) {
      this.nodes.delete(peerId);
      this.messageHandlers.delete(peerId);
      try {
        await node.stop();
      } catch (stopError) {
        // Do not let a failing shutdown hide the error that caused the rollback.
        console.error('Error stopping node during rollback:', stopError);
      }
      throw error;
    }

    return { peerId, agentMetadata };
  }

  /**
   * Queries the registry for agents advertising a capability.
   *
   * @param {string} capability
   * @returns {Promise<*>} The registry's response body.
   * @throws {Error} If the HTTP request fails.
   */
  async findAgentsByCapability(capability) {
    try {
      console.log('Protocol searching for capability:', capability);
      // Encode the value so characters such as '&', '#', or spaces cannot
      // corrupt the query string.
      const response = await axios.get(
        `${this.registrarUrl}/lookup?capability=${encodeURIComponent(capability)}`
      );
      console.log('Protocol received response:', response.data);
      return response.data;
    } catch (error) {
      console.error('Protocol error finding agents:', error);
      throw new Error(`Failed to find agents: ${error.message}`, { cause: error });
    }
  }

  /**
   * Publishes a request on the target's topic and waits for the reply.
   *
   * The sender is the local node with the target's peer id when there is one,
   * otherwise the first local node.
   *
   * @param {string} targetPeerId
   * @param {*} message JSON-serializable request content.
   * @returns {Promise<*>} The `content` of the response message.
   * @throws {Error} If no node exists, publishing fails, or no reply arrives
   *   within `responseTimeoutMs`.
   */
  async sendMessage(targetPeerId, message) {
    console.log('\n=== Sending Message ===');
    console.log('Target PeerId:', targetPeerId);
    console.log('Message:', message);

    const nodes = Array.from(this.nodes.values());
    if (nodes.length === 0) {
      throw new Error('No nodes available to send message');
    }
    let senderNode = this.nodes.get(targetPeerId);
    if (!senderNode) {
      console.log('Using fallback sender node');
      senderNode = nodes[0];
    }
    const senderPeerId = senderNode.peerId.toString();

    let timeoutId;
    let resolver;
    // Drops the timer and our resolver; leaves a newer resolver registered by
    // a later send from the same sender untouched.
    const releasePending = () => {
      clearTimeout(timeoutId);
      if (this.pendingResponses.get(senderPeerId) === resolver) {
        this.pendingResponses.delete(senderPeerId);
      }
    };

    try {
      const topic = `/agent/${targetPeerId}`;
      console.log('Publishing to topic:', topic);

      const responsePromise = new Promise((resolve, reject) => {
        timeoutId = setTimeout(() => {
          releasePending();
          reject(new Error(`Response timeout waiting for agent ${targetPeerId}. The agent may be busy or not responding.`));
        }, this.responseTimeoutMs);
        console.log('Setting up response handler for:', senderPeerId);
        resolver = (response) => {
          console.log('Received response:', response);
          clearTimeout(timeoutId);
          resolve(response);
        };
        this.pendingResponses.set(senderPeerId, resolver);
      });

      if (!senderNode.services.pubsub.getTopics().includes(topic)) {
        await senderNode.services.pubsub.subscribe(topic);
        await new Promise((resolve) => setTimeout(resolve, 1000));
      }

      const messageData = JSON.stringify({
        to: targetPeerId,
        from: senderPeerId,
        content: message,
        timestamp: Date.now(),
      });
      await senderNode.services.pubsub.publish(topic, new TextEncoder().encode(messageData));
      console.log('Message published successfully');
      return await responsePromise;
    } catch (error) {
      // Without this, a failed publish would leave the timer running and it
      // would later reject a promise nobody awaits.
      releasePending();
      console.error('Error sending message:', error);
      throw new Error(`Failed to send message: ${error.message}`, { cause: error });
    }
  }

  /**
   * Handles a pubsub message: resolves a pending request when it is a
   * response, otherwise runs the registered handler and publishes its reply.
   * Errors are logged, never thrown.
   *
   * @param {{data: Uint8Array}} message Pubsub message whose `data` is UTF-8 JSON.
   */
  async handleIncomingMessage(message) {
    try {
      const data = JSON.parse(new TextDecoder().decode(message.data));
      console.log('\n=== Incoming Message ===');
      console.log('Message data:', data);
      console.log('Registered handlers:', Array.from(this.messageHandlers.keys()));
      console.log('Pending responses:', Array.from(this.pendingResponses.keys()));

      if (data.isResponse) {
        console.log('Processing response message');
        const resolver = this.pendingResponses.get(data.to);
        if (resolver) {
          console.log('Found resolver for response');
          resolver(data.content);
          this.pendingResponses.delete(data.to);
        } else {
          console.log('No resolver found for response');
        }
        return;
      }

      console.log('Processing new request');
      const handler = this.messageHandlers.get(data.to);
      if (!handler) {
        console.log('No handler found for message');
        return;
      }

      console.log('Found message handler, invoking...');
      try {
        const response = await handler(data.content);
        console.log('Handler response:', response);
        if (!response) {
          console.log('No response from handler');
          return;
        }
        const receivingNode = this.nodes.get(data.to);
        if (!receivingNode) {
          console.log('No receiving node found');
          return;
        }
        const responseData = this._buildResponse(data, response);
        console.log('Sending response:', responseData);
        await this._publishResponse(receivingNode, responseData);
        console.log('Response sent successfully');
      } catch (error) {
        console.error('Error processing message:', error);
        const receivingNode = this.nodes.get(data.to);
        if (receivingNode) {
          // Report the failure to the requester so it does not wait for the timeout.
          await this._publishResponse(
            receivingNode,
            this._buildResponse(data, { type: 'error', content: error.message })
          );
        }
      }
    } catch (error) {
      console.error('Error handling message:', error);
    }
  }

  /** Builds the response envelope for a request, swapping `to` and `from`. */
  _buildResponse(request, content) {
    return {
      to: request.from,
      from: request.to,
      content,
      timestamp: Date.now(),
      isResponse: true,
    };
  }

  /** Publishes a response envelope on the topic of its addressee. */
  async _publishResponse(node, responseData) {
    await node.services.pubsub.publish(
      `/agent/${responseData.to}`,
      new TextEncoder().encode(JSON.stringify(responseData))
    );
  }

  /**
   * Posts the agent's registration data to the registry.
   *
   * @param {object} registrationData
   * @returns {Promise<*>} The registry's response body.
   * @throws {Error} If the HTTP request fails.
   */
  async _registerAgent(registrationData) {
    try {
      const response = await axios.post(`${this.registrarUrl}/register`, registrationData);
      return response.data;
    } catch (error) {
      throw new Error(`Failed to register agent: ${error.message}`, { cause: error });
    }
  }

  /** Stops every node owned by this instance and forgets its handler. */
  async stop() {
    // Iterate over a snapshot because entries are removed along the way.
    for (const [peerId, node] of [...this.nodes]) {
      await node.stop();
      this.nodes.delete(peerId);
      this.messageHandlers.delete(peerId);
    }
  }

  /**
   * Subscribes each local node to the topics of the other local nodes and
   * dials them, trying each unordered pair once (up to three dial attempts).
   * Dial failures are logged, not thrown.
   */
  async connectNodes() {
    const maxAttempts = 3;
    const handledPairs = new Set();

    for (const [peerId, node] of this.nodes) {
      for (const [otherPeerId, otherNode] of this.nodes) {
        if (peerId === otherPeerId || handledPairs.has(`${peerId}-${otherPeerId}`)) {
          continue;
        }
        try {
          await node.services.pubsub.subscribe(`/agent/${otherPeerId}`);

          let connected = false;
          let lastError = null;
          for (let attempt = 1; attempt <= maxAttempts && !connected; attempt++) {
            try {
              await node.dial(otherNode.peerId);
              connected = true;
              console.log(`Successfully connected ${peerId} to ${otherPeerId}`);
            } catch (error) {
              lastError = error;
              if (attempt < maxAttempts) {
                await new Promise((resolve) => setTimeout(resolve, 1000));
              }
            }
          }
          if (!connected) {
            // Surface the failure instead of silently treating the pair as connected.
            console.error(
              `Failed to connect ${peerId} to ${otherPeerId} after ${maxAttempts} attempts:`,
              lastError?.message
            );
          }

          handledPairs.add(`${peerId}-${otherPeerId}`);
          handledPairs.add(`${otherPeerId}-${peerId}`);
        } catch (error) {
          console.error(`Failed to connect ${peerId} to ${otherPeerId}:`, error.message);
        }
      }
    }
  }

  /** Returns the tool definitions produced by `getProtocolTools` for this protocol. */
  getTools() {
    return getProtocolTools(this);
  }
}
Сервер реестра:
Код: Выделить всё
import bodyParser from 'body-parser';
import express from 'express';
// Minimal in-memory agent registry: POST /register stores an agent under its
// peerId, GET /lookup filters agents by capability and/or wallet address.
const app = express();
app.use(bodyParser.json());

// peerId -> { name, description, capabilities, walletAddress }.
// A Map keeps client-supplied keys such as "__proto__" as ordinary entries.
const agentsRegistry = new Map();

/**
 * Tells whether an agent satisfies every filter that was supplied.
 * With no filter at all nothing matches.
 *
 * @param {object} agent Stored registry entry.
 * @param {{capability?: string, walletAddress?: string}} filters
 * @returns {boolean}
 */
function agentMatches(agent, { capability, walletAddress }) {
  if (!capability && !walletAddress) {
    return false;
  }
  // `capabilities` is not validated on registration, so it may be missing or
  // lack `includes`; treat that as "no match" instead of throwing.
  if (capability && !agent.capabilities?.includes?.(capability)) {
    return false;
  }
  if (walletAddress && agent.walletAddress !== walletAddress) {
    return false;
  }
  return true;
}

app.post('/register', (req, res) => {
  const { peerId, name, description, capabilities, walletAddress } = req.body;
  if (!peerId) return res.status(400).send('Missing peerId');
  agentsRegistry.set(peerId, { name, description, capabilities, walletAddress });
  console.log('Registered agent:', { peerId, name, capabilities, walletAddress });
  console.log('Current registry:', Object.fromEntries(agentsRegistry));
  res.status(200).send({ message: 'Registered successfully' });
});

app.get('/lookup', (req, res) => {
  const { capability, walletAddress } = req.query;
  console.log('Looking up with params:', { capability, walletAddress });
  console.log('Current registry:', Object.fromEntries(agentsRegistry));
  const result = [...agentsRegistry]
    .filter(([, agent]) => agentMatches(agent, { capability, walletAddress }))
    .map(([peerId, agent]) => ({
      ...agent,
      peerId,
    }));
  console.log('Lookup result:', result);
  res.status(200).json(result);
});

const PORT = 3000;
app.listen(PORT, () => {
  console.log(`Registry server listening on port ${PORT}`);
});
Подробнее здесь:
https://stackoverflow.com/questions/794 ... n-separate
1739015098
Anonymous
Я создал сервер реестра для хранения идентификаторов сверстников и основных деталей, а также протокол Libp2p. У меня есть три агента, которые выходят в интернет после инициализации и развертывания через протокол. Однако, когда я запускаю их в отдельных экземплярах терминала, они не могут обнаружить друг друга и время. Если я запускаю их из одного сценария, они могут найти друг друга. Что может вызвать эту проблему? > [code]import { gossipsub } from "@chainsafe/libp2p-gossipsub"; import { yamux } from "@chainsafe/libp2p-yamux"; import { identify } from '@libp2p/identify'; import { kadDHT } from '@libp2p/kad-dht'; import { noise } from "@libp2p/noise"; import { tcp } from "@libp2p/tcp"; import axios from "axios"; import { createLibp2p } from "libp2p"; import { getProtocolTools } from './tools.js'; export default class AgentNetworkProtocol { constructor() { this.registrarUrl = 'http://localhost:3000'; this.messageHandlers = new Map(); this.pendingResponses = new Map(); this.nodes = new Map(); } async initialize() { this.baseConfig = { addresses: { listen: ['/ip4/0.0.0.0/tcp/0'] }, transports: [tcp()], connectionEncryption: [noise()], streamMuxers: [yamux()], services: { identify: identify(), pubsub: gossipsub({ emitSelf: true, allowPublishToZeroPeers: true, gossipIncoming: true, fallbackToFloodsub: true, floodPublish: true, }), dht: kadDHT({ enabled: true, clientMode: false, pingTimeout: 5000, maxInboundStreams: 5000, maxOutboundStreams: 5000, }) } }; } async createNode() { const port = Math.floor(Math.random() * (65535 - 1024) + 1024); const nodeConfig = { ...this.baseConfig, addresses: { listen: [`/ip4/127.0.0.1/tcp/${port}`] } }; const node = await createLibp2p(nodeConfig); await node.start(); await new Promise(resolve => setTimeout(resolve, 1000)); // Subscribe to messages for this node const topic = `/agent/${node.peerId.toString()}`; await node.services.pubsub.subscribe(topic); // Set up message handler node.services.pubsub.addEventListener('message', (evt) => { 
if (evt.detail.topic === topic) { this.handleIncomingMessage(evt.detail); } }); return node; } async deployAgent(agentInstance, agentMetadata) { if (!this.baseConfig) { throw new Error('Protocol not initialized. Call initialize() first.'); } const { name, description, capabilities, walletAddress } = agentMetadata; if (!name || !description || !capabilities) { throw new Error('Missing required agent metadata'); } const node = await this.createNode(); const peerId = node.peerId.toString(); this.nodes.set(peerId, node); this.messageHandlers.set(peerId, async (message) => { const response = await agentInstance.handleMessage(message); return response; }); try { await this._registerAgent({ peerId, name, description, capabilities, walletAddress }); console.log('Successfully registered agent:', name, 'with peerId:', peerId); await new Promise(resolve => setTimeout(resolve, 1000)); await this.connectNodes(); } catch (error) { await node.stop(); this.nodes.delete(peerId); this.messageHandlers.delete(peerId); throw error; } return { peerId, agentMetadata }; } async findAgentsByCapability(capability) { try { console.log('Protocol searching for capability:', capability); const response = await axios.get( `${this.registrarUrl}/lookup?capability=${capability}` ); console.log('Protocol received response:', response.data); return response.data; } catch (error) { console.error('Protocol error finding agents:', error); throw new Error(`Failed to find agents: ${error.message}`); } } async sendMessage(targetPeerId, message) { console.log('\n=== Sending Message ==='); console.log('Target PeerId:', targetPeerId); console.log('Message:', message); const nodes = Array.from(this.nodes.values()); if (nodes.length === 0) { throw new Error('No nodes available to send message'); } let senderNode = this.nodes.get(targetPeerId); if (!senderNode) { console.log('Using fallback sender node'); senderNode = nodes[0]; } try { const topic = `/agent/${targetPeerId}`; console.log('Publishing to topic:', 
topic); const responsePromise = new Promise((resolve, reject) => { const timeoutId = setTimeout(() => { this.pendingResponses.delete(senderNode.peerId.toString()); reject(new Error(`Response timeout waiting for agent ${targetPeerId}. The agent may be busy or not responding.`)); }, 30000); console.log('Setting up response handler for:', senderNode.peerId.toString()); this.pendingResponses.set(senderNode.peerId.toString(), (response) => { console.log('Received response:', response); clearTimeout(timeoutId); resolve(response); }); }); if (!senderNode.services.pubsub.getTopics().includes(topic)) { await senderNode.services.pubsub.subscribe(topic); await new Promise(resolve => setTimeout(resolve, 1000)); } const messageData = JSON.stringify({ to: targetPeerId, from: senderNode.peerId.toString(), content: message, timestamp: Date.now() }); await senderNode.services.pubsub.publish( topic, new TextEncoder().encode(messageData) ); console.log('Message published successfully'); return await responsePromise; } catch (error) { console.error('Error sending message:', error); throw new Error(`Failed to send message: ${error.message}`); } } async handleIncomingMessage(message) { try { const data = JSON.parse(new TextDecoder().decode(message.data)); console.log('\n=== Incoming Message ==='); console.log('Message data:', data); console.log('Registered handlers:', Array.from(this.messageHandlers.keys())); console.log('Pending responses:', Array.from(this.pendingResponses.keys())); if (data.isResponse) { console.log('Processing response message'); const resolver = this.pendingResponses.get(data.to); if (resolver) { console.log('Found resolver for response'); resolver(data.content); this.pendingResponses.delete(data.to); } else { console.log('No resolver found for response'); } return; } console.log('Processing new request'); const handler = this.messageHandlers.get(data.to); if (handler) { console.log('Found message handler, invoking...'); try { const response = await 
handler(data.content); console.log('Handler response:', response); if (!response) { console.log('No response from handler'); return; } const receivingNode = this.nodes.get(data.to); if (!receivingNode) { console.log('No receiving node found'); return; } const responseData = { to: data.from, from: data.to, content: response, timestamp: Date.now(), isResponse: true }; console.log('Sending response:', responseData); const responseTopic = `/agent/${data.from}`; await receivingNode.services.pubsub.publish( responseTopic, new TextEncoder().encode(JSON.stringify(responseData)) ); console.log('Response sent successfully'); } catch (error) { console.error('Error processing message:', error); const errorResponse = { to: data.from, from: data.to, content: { type: 'error', content: error.message }, timestamp: Date.now(), isResponse: true }; const receivingNode = this.nodes.get(data.to); if (receivingNode) { await receivingNode.services.pubsub.publish( `/agent/${data.from}`, new TextEncoder().encode(JSON.stringify(errorResponse)) ); } } } else { console.log('No handler found for message'); } } catch (error) { console.error('Error handling message:', error); } } async _registerAgent(registrationData) { try { const response = await axios.post( `${this.registrarUrl}/register`, registrationData ); return response.data; } catch (error) { throw new Error(`Failed to register agent: ${error.message}`); } } async stop() { for (const [peerId, node] of this.nodes) { await node.stop(); this.nodes.delete(peerId); this.messageHandlers.delete(peerId); } } async connectNodes() { const connectedPeers = new Set(); for (const [peerId, node] of this.nodes) { for (const [otherPeerId, otherNode] of this.nodes) { if (peerId !== otherPeerId && !connectedPeers.has(`${peerId}-${otherPeerId}`)) { try { const topic = `/agent/${otherPeerId}`; await node.services.pubsub.subscribe(topic); let connected = false; let attempts = 0; while (!connected && attempts < 3) { try { await node.dial(otherNode.peerId); 
connected = true; console.log(`Successfully connected ${peerId} to ${otherPeerId}`); } catch (error) { attempts++; await new Promise(resolve => setTimeout(resolve, 1000)); } } connectedPeers.add(`${peerId}-${otherPeerId}`); connectedPeers.add(`${otherPeerId}-${peerId}`); } catch (error) { console.error(`Failed to connect ${peerId} to ${otherPeerId}:`, error.message); } } } } } getTools() { return getProtocolTools(this); } }[/code] [b] Сервер реестра: [/b] [code]import bodyParser from 'body-parser'; import express from 'express'; const app = express(); app.use(bodyParser.json()); const agentsRegistry = {}; app.post('/register', (req, res) => { const { peerId, name, description, capabilities, walletAddress } = req.body; if (!peerId) return res.status(400).send('Missing peerId'); agentsRegistry[peerId] = { name, description, capabilities, walletAddress }; console.log('Registered agent:', { peerId, name, capabilities, walletAddress }); console.log('Current registry:', agentsRegistry); res.status(200).send({ message: 'Registered successfully' }); }); app.get('/lookup', (req, res) => { const { capability, walletAddress } = req.query; console.log('Looking up with params:', { capability, walletAddress }); console.log('Current registry:', agentsRegistry); const result = Object.entries(agentsRegistry) .filter(([peerId, agent]) => { if (capability && walletAddress) { return agent.capabilities.includes(capability) && agent.walletAddress === walletAddress; } else if (capability) { return agent.capabilities.includes(capability); } else if (walletAddress) { return agent.walletAddress === walletAddress; } return false; }) .map(([peerId, agent]) => ({ ...agent, peerId })); console.log('Lookup result:', result); res.status(200).json(result); }); const PORT = 3000; app.listen(PORT, () => { console.log(`Registry server listening on port ${PORT}`); });[/code] Подробнее здесь: 
[url]https://stackoverflow.com/questions/79423111/libp2p-nodes-timeout-when-trying-to-connect-with-each-other-deployed-in-separate[/url]