1、修复dc数据库连接没有释放的bug
2、socket server 添加连接client限制,防止同时多个client连接导致dc程序不能正常运行的问题
This commit is contained in:
@@ -206,6 +206,12 @@ public class Common {
|
||||
private static final Semaphore missionSemaphore = new Semaphore(Constants.MISSION_RELEASE_SEMAPHORE_MAX,true); //任务通信线程最大并发数
|
||||
private static final Semaphore monitorSemaphore = new Semaphore(Constants.DETEC_RELEASE_SEMAPHORE_MAX,true); //主动监测线程最大并发数
|
||||
private static final Semaphore changeSemaphore = new Semaphore(Constants.CHANGE_RELEASE_SEMAPHORE_MAX,true); //主动监测线程最大并发数
|
||||
/**
|
||||
* socket server 并发连接 信号量
|
||||
*/
|
||||
private static final Semaphore SERVER_ACCEPT_SEMAPHORE = new Semaphore(Constants.SERVER_ACCEPT_SEMAPHORE_MAX,true); //
|
||||
|
||||
|
||||
|
||||
// #---任务结果数据集合
|
||||
private static final LinkedList<MissionResult2> missionResult2List1 = new LinkedList<MissionResult2>();
|
||||
@@ -308,6 +314,7 @@ public class Common {
|
||||
logger.debug("缓存中任务并发剩余许可:"+(missionSemaphore.availablePermits()));
|
||||
logger.debug("缓存中监测并发剩余许可:"+(monitorSemaphore.availablePermits()));
|
||||
logger.debug("缓存中变更并发剩余许可:"+(changeSemaphore.availablePermits()));
|
||||
logger.debug("缓存中server并发剩余许可:"+(SERVER_ACCEPT_SEMAPHORE.availablePermits()));
|
||||
|
||||
// logger.debug("基本线程池用量:"+(service.));
|
||||
// logger.debug("定时线程池用量:"+(scheduled));
|
||||
@@ -1016,6 +1023,30 @@ public class Common {
|
||||
// }
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取server 信号
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
public static void acquireServerSemaphore() throws InterruptedException {
|
||||
if(Constants.SERVER_ACCEPT_SEMAPHORE_FLAG){//开启client 连接数限制
|
||||
SERVER_ACCEPT_SEMAPHORE.acquire();
|
||||
logger.debug("Server Semaphore 已申请 剩余可用许可:> "+SERVER_ACCEPT_SEMAPHORE.availablePermits());
|
||||
}else{
|
||||
logger.debug("禁用 Server Semaphore ");
|
||||
}
|
||||
}
|
||||
/**
|
||||
* 释放 server 信号
|
||||
*/
|
||||
public static void releaseServerSemaphore() {
|
||||
if(Constants.SERVER_ACCEPT_SEMAPHORE_FLAG){//开启client 连接数限制
|
||||
SERVER_ACCEPT_SEMAPHORE.release();
|
||||
logger.debug("Server Semaphore 已释放 当前可用许可:> "+SERVER_ACCEPT_SEMAPHORE.availablePermits());
|
||||
}else{
|
||||
logger.debug("禁用 Server Semaphore ");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 将任务结果存放到任务结果集合中
|
||||
|
||||
@@ -479,6 +479,14 @@ public class Constants {
|
||||
* druid 连接池 获取连接最大等待时间
|
||||
*/
|
||||
public static final Long DRUID_MAXWAIT_MILLIS;
|
||||
/**
|
||||
* server 同时可以并发接收多少 client 请求连接
|
||||
*/
|
||||
public static final int SERVER_ACCEPT_SEMAPHORE_MAX;
|
||||
/**
|
||||
* 是否开启 client 请求连接限制,默认 true
|
||||
*/
|
||||
public static final boolean SERVER_ACCEPT_SEMAPHORE_FLAG;
|
||||
static {
|
||||
|
||||
// InetAddress inetAddress = null;
|
||||
@@ -847,6 +855,14 @@ public class Constants {
|
||||
* 默认值:10s
|
||||
*/
|
||||
DRUID_MAXWAIT_MILLIS = Long.valueOf(Config.getString("druid.maxwait.millis", "10000"));
|
||||
/*
|
||||
* socket server 可以并发接收 client 连接数
|
||||
*/
|
||||
SERVER_ACCEPT_SEMAPHORE_MAX = Config.getInteger("server.accept.semaphore.max", 10);
|
||||
/*
|
||||
* 是否开启 client 连接数限制
|
||||
*/
|
||||
SERVER_ACCEPT_SEMAPHORE_FLAG = Config.getBoolan("server.accept.semaphore.flag", true);
|
||||
}
|
||||
|
||||
//文件传输 临时文件命名后缀
|
||||
|
||||
@@ -348,6 +348,11 @@ public class ChangePluginScriptFile implements Runnable {
|
||||
|
||||
} catch (SQLException e) {
|
||||
logger.error("Delete EventRecordLibrary", e);
|
||||
} finally {
|
||||
if (dao != null) {
|
||||
dao.close();
|
||||
dao = null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -75,7 +75,7 @@ public class NMSWebBPDownload extends SocketUtils implements Callable<Object>{
|
||||
|
||||
} catch (Exception e) {
|
||||
|
||||
logger.warn("Target communication:>"+ip+" create failure",e);
|
||||
logger.warn("Target communication:>"+ip+":"+port+" create failure",e);
|
||||
close();
|
||||
return -2;
|
||||
|
||||
|
||||
@@ -66,19 +66,23 @@ public class SSLServerManager implements Callable<Object>{
|
||||
Thread.currentThread().setName("Receiving communication");
|
||||
try {
|
||||
while (true) {
|
||||
Common.acquireServerSemaphore();//阻塞 获取 server 信号量
|
||||
Socket socket = sslServer.accept();
|
||||
String address = socket.getInetAddress().getHostAddress();
|
||||
logger.debug(String.format("client addr :%s", address));
|
||||
if(Common.SERVER_UN_UPGRADE_FLAG){ //当SOCKET_FLAG为true时,允许建立通讯,否则放弃通讯,用于NMSServer升级功能
|
||||
logger.debug("来自:"+socket.getInetAddress().getHostAddress());
|
||||
Common.service.submit(new SSLServer(socket));
|
||||
}else{ //关闭 放弃的通讯
|
||||
logger.debug("Server升级 抛弃通讯:"+socket.getInetAddress().getHostAddress());
|
||||
logger.debug(String.format("Server升级 抛弃通讯:%s", address));
|
||||
Common.releaseServerSemaphore();
|
||||
socket.close();
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.error("Local communication port monitor thread exception termination"+e.getMessage());
|
||||
Common.addErrorInfo(Constants.ERROR_CODE_SOCKET_SERVER_RUNTIME,Common.getLocalIp(), Calendar.getInstance().getTime(), Constants.ERROR_INFO_SATAE_ERROR,"");
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
logger.error("socket server error",e);
|
||||
}finally{
|
||||
try {
|
||||
sslServer.close();
|
||||
|
||||
@@ -24,7 +24,7 @@ public abstract class SSLSocketCallable extends SocketUtils implements Callable<
|
||||
private static Logger logger = Logger.getLogger(SSLSocketCallable.class);
|
||||
//ServerSocket 通讯服务
|
||||
protected SSLServerSocket ss = null;
|
||||
|
||||
private boolean clientFlag = false;// 是否dc server 接收的client
|
||||
/**
|
||||
* 通讯创建
|
||||
* @param ip 目标主机IP
|
||||
@@ -41,6 +41,7 @@ public abstract class SSLSocketCallable extends SocketUtils implements Callable<
|
||||
*/
|
||||
public SSLSocketCallable(Socket client) {
|
||||
super(client);
|
||||
clientFlag = true;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -51,39 +52,39 @@ public abstract class SSLSocketCallable extends SocketUtils implements Callable<
|
||||
public Object call(){
|
||||
Thread.currentThread().setName("Create communication"+ip+":"+port);
|
||||
Object obj = null; //返回对象
|
||||
|
||||
try {
|
||||
try {
|
||||
//- 校验 是否创建新通讯连接
|
||||
if(socket==null){ //客户端通讯创建
|
||||
createClientSocket();
|
||||
}else{ //服务端通讯创建
|
||||
createServerSocket();
|
||||
}
|
||||
|
||||
} catch (SocketException e) {
|
||||
logger.error("Create failed:"+e.getMessage(),e);
|
||||
Common.addErrorInfo(Constants.ERROR_CODE_CREATE_SOCKET,ip, Calendar.getInstance().getTime(), Constants.ERROR_INFO_SATAE_ERROR,"");
|
||||
close();
|
||||
return obj;
|
||||
} catch (Exception e) {
|
||||
close();
|
||||
return obj;
|
||||
}
|
||||
|
||||
//- 校验 是否创建新通讯连接
|
||||
if(socket==null){ //客户端通讯创建
|
||||
createClientSocket();
|
||||
}else{ //服务端通讯创建
|
||||
createServerSocket();
|
||||
//-- 自定义通讯操作
|
||||
try {
|
||||
obj = toDo();
|
||||
}catch (Exception e) {
|
||||
logger.error("Communicate contents exception"+e.getMessage(),e);
|
||||
Common.addErrorInfo(Constants.ERROR_CODE_SOCKET_RUNTIME,ip, Calendar.getInstance().getTime(), Constants.ERROR_INFO_SATAE_ERROR,"");
|
||||
} finally {
|
||||
close();
|
||||
}
|
||||
|
||||
} catch (SocketException e) {
|
||||
logger.error("Create failed:"+e.getMessage(),e);
|
||||
Common.addErrorInfo(Constants.ERROR_CODE_CREATE_SOCKET,ip, Calendar.getInstance().getTime(), Constants.ERROR_INFO_SATAE_ERROR,"");
|
||||
close();
|
||||
return obj;
|
||||
} catch (IOException e) {
|
||||
close();
|
||||
return obj;
|
||||
}catch (Exception e) {
|
||||
close();
|
||||
return obj;
|
||||
}
|
||||
|
||||
//-- 自定义通讯操作
|
||||
try {
|
||||
obj = toDo();
|
||||
}catch (Exception e) {
|
||||
logger.error("Communicate contents exception"+e.getMessage(),e);
|
||||
Common.addErrorInfo(Constants.ERROR_CODE_SOCKET_RUNTIME,ip, Calendar.getInstance().getTime(), Constants.ERROR_INFO_SATAE_ERROR,"");
|
||||
} finally {
|
||||
close();
|
||||
if(clientFlag){
|
||||
Common.releaseServerSemaphore();//释放server信号量
|
||||
}
|
||||
}
|
||||
|
||||
return obj;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user