Я запускаю программу с портом 61000 и числом потоков как 2.
Я также запустил 3 клиента для подключения к нему.
На стороне клиента я увидел, что все 3 из них подключены однако журналы приемника показали, что подключены только два из них.
netstat -an|grep 61000|grep -i ESTABLISHED
указано всего 6 подключений, поскольку клиент и сервер выполняются на одном компьютере.
Мои вопросы:
- Почему журнал клиента в третий раз показывает, что он может подключиться к программе на 61000, в то время как я использую резерв из 2. Также Executors.newFixedThreadPool(numThreads); разрешает только 2 клиенты должны быть подключены.
- Хотя server.accept происходит в MyWriter.java и в журналах нет указаний на то, что третий клиент может подключиться, почему netstat показывает это как установленное соединение?
MyReceiver.java:
package com.vikas;
import java.net.ServerSocket;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class MyReceiver{
protected int serverPort = -1;
protected int numThreads = -1;
protected boolean isStopped = false;
protected Thread runningThread = null;
protected ExecutorService threadPool = null;
protected static Logger logger = LogManager.getLogger(MyReceiver.class);
protected static ServerSocket serverSocket = null;
protected static Map mapConnections = new ConcurrentHashMap();
public MyReceiver(int port){
this.serverPort = port;
}
public void run(int numThreads){
this.threadPool = Executors.newFixedThreadPool(numThreads);
try {
logger.info("Starting server on port " + this.serverPort);
MyReceiver.serverSocket = new ServerSocket(this.serverPort, numThreads);
} catch (IOException e) {
//throw new RuntimeException("Cannot open port " + this.serverPort, e);
logger.error("Cannot open port " + this.serverPort, e);
}
while(!isStopped()){
this.threadPool.execute(new MyWriter());
}
if (MyReceiver.mapConnections.isEmpty()) {
this.threadPool.shutdown();
//System.out.println("Server Stopped after shutdown.") ;
logger.info("Server Stopped after shutdown.");
}
}
public synchronized boolean isStopped() {
return this.isStopped;
}
public synchronized void stop() {
this.isStopped = true;
try {
MyReceiver.serverSocket.close();
} catch (IOException e) {
//throw new RuntimeException("Error closing server", e);
logger.error("Error closing server", e);
}
}
public static void main(String[] args) {
if(args.length != 2){
System.out.println("Number of input arguments is not equal to 4.");
System.out.println("Usage: java -cp YOUR_CLASSPATH -Dlog4j.configurationFile=/path/to/log4j2.xml com.vikas.MyReceiver
");
System.out.println("java -cp \"$CLASSPATH:./MyReceiver.jar:./log4j-api-2.6.2.jar:./log4j-core-2.6.2.jar\" -Dlog4j.configurationFile=log4j2.xml com.vikas.MyReceiver 61000 2");
}
int port = Integer.parseInt(args[0].trim());
int numThreads = Integer.parseInt(args[1].trim());
final MyReceiver myConnection = new MyReceiver(port, topic, brokers);
myConnection.run(numThreads);
/*Thread t = new Thread(myConnection);
t.start();*/
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
//e.printStackTrace();
logger.error("Something went wrong", e);
}
//System.out.println("Stopping Server");
Runtime.getRuntime().addShutdownHook(new Thread()
{
@Override
public void run()
{
logger.info("SocketServer - Receive SIGINT!!!");
logger.info("Stopping Server");
if (!myConnection.isStopped()) {
myConnection.stop();
}
logger.info("Server stopped successfully");
try
{
Thread.sleep(1000);
}
catch (Exception e) {}
}
});
//myConnection.stop();
}
}
MyWriter.java:
package com.vikas;
import java.io.InputStreamReader;
import java.io.BufferedReader;
import java.io.IOException;
import java.net.Socket;
import java.util.Properties;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class MyWriter implements Runnable {
protected String topic = null;
protected String brokers = null;
protected static Logger logger = LogManager.getLogger(MyWriter.class);
public MyWriter () {
}
public void run() {
while (!MyReceiver.serverSocket.isClosed()) {
Socket server = null;
try {
server = MyReceiver.serverSocket.accept();
//System.out.println("Just connected to " + server.getRemoteSocketAddress());
logger.info("Just connected to " + server.getRemoteSocketAddress());
MyReceiver.mapConnections.put(server.getRemoteSocketAddress().toString().trim(), "");
//change for prod deployment //change implemented
String key = null;
String message = null;
char ch;
StringBuilder msg = new StringBuilder();
int value = 0;
try {
BufferedReader in = new BufferedReader(new InputStreamReader(server.getInputStream()));
while ((value = in.read()) != -1) {
ch = (char)value;
if (ch == 0x0a) {
//msg.append(ch);
//System.out.println(msg);
message = msg.toString().trim();
//code change as part of testing in prod
if (message.length() != 0) {
//do something
msg.setLength(0);
}
else{
logger.error("Blank String received");
msg.setLength(0);
}
}
else{
msg.append(ch);
}
}
logger.info("Closing connection for client :" + server.getRemoteSocketAddress());
//System.out.println("Closing connection for client :" + this.getClientSocket().getRemoteSocketAddress());
server.close();
MyReceiver.mapConnections.remove(server.getRemoteSocketAddress());
} catch (IOException e) {
//report exception somewhere.
//e.printStackTrace();
logger.error("Something went wrong!!", e);
}
finally {
producer.close();
}
} catch (IOException e) {
if (MyReceiver.serverSocket.isClosed()) {
//System.out.println("Server was found to be Stopped.");
logger.error("Server was found to be Stopped.");
logger.error("Error accepting client connection", e);
break;
}
}
}
}
}
Подробнее здесь: https://stackoverflow.com/questions/390 ... ket-server
Мобильная версия