From e0737f8a4282ddfdc05f25cd6ab5b7624c328e9f Mon Sep 17 00:00:00 2001 From: fangshunjian Date: Wed, 21 Nov 2018 13:26:37 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E4=BF=AE=E5=A4=8Ddc=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BA=93=E8=BF=9E=E6=8E=A5=E6=B2=A1=E6=9C=89=E9=87=8A?= =?UTF-8?q?=E6=94=BE=E7=9A=84bug=202=E3=80=81socket=20server=20=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=E8=BF=9E=E6=8E=A5client=E9=99=90=E5=88=B6=EF=BC=8C?= =?UTF-8?q?=E9=98=B2=E6=AD=A2=E5=90=8C=E6=97=B6=E5=A4=9A=E4=B8=AAclient?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E5=AF=BC=E8=87=B4dc=E7=A8=8B=E5=BA=8F?= =?UTF-8?q?=E4=B8=8D=E8=83=BD=E6=AD=A3=E5=B8=B8=E8=BF=90=E8=A1=8C=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/com/nms/server/common/Common.java | 31 ++++++++++ src/com/nms/server/common/Constants.java | 16 +++++ .../thread/change/ChangePluginScriptFile.java | 5 ++ .../thread/socket/NMSWebBPDownload.java | 2 +- .../thread/socket/SSLServerManager.java | 10 +++- .../server/util/socket/SSLSocketCallable.java | 59 ++++++++++--------- 6 files changed, 90 insertions(+), 33 deletions(-) diff --git a/src/com/nms/server/common/Common.java b/src/com/nms/server/common/Common.java index 868c655..4984c85 100644 --- a/src/com/nms/server/common/Common.java +++ b/src/com/nms/server/common/Common.java @@ -206,6 +206,12 @@ public class Common { private static final Semaphore missionSemaphore = new Semaphore(Constants.MISSION_RELEASE_SEMAPHORE_MAX,true); //任务通信线程最大并发数 private static final Semaphore monitorSemaphore = new Semaphore(Constants.DETEC_RELEASE_SEMAPHORE_MAX,true); //主动监测线程最大并发数 private static final Semaphore changeSemaphore = new Semaphore(Constants.CHANGE_RELEASE_SEMAPHORE_MAX,true); //主动监测线程最大并发数 + /** + * socket server 并发连接 信号量 + */ + private static final Semaphore SERVER_ACCEPT_SEMAPHORE = new Semaphore(Constants.SERVER_ACCEPT_SEMAPHORE_MAX,true); // + + // #---任务结果数据集合 private static final LinkedList missionResult2List1 = new LinkedList(); @@ -308,6 +314,7 @@ public class Common { logger.debug("缓存中任务并发剩余许可:"+(missionSemaphore.availablePermits())); logger.debug("缓存中监测并发剩余许可:"+(monitorSemaphore.availablePermits())); logger.debug("缓存中变更并发剩余许可:"+(changeSemaphore.availablePermits())); + logger.debug("缓存中server并发剩余许可:"+(SERVER_ACCEPT_SEMAPHORE.availablePermits())); // logger.debug("基本线程池用量:"+(service.)); // logger.debug("定时线程池用量:"+(scheduled)); @@ -1016,6 +1023,30 @@ public class Common { // } } + /** + * 获取server 信号 + * @throws InterruptedException + */ + public static void acquireServerSemaphore() throws InterruptedException { + if(Constants.SERVER_ACCEPT_SEMAPHORE_FLAG){//开启client 连接数限制 + SERVER_ACCEPT_SEMAPHORE.acquire(); + logger.debug("Server Semaphore 已申请 剩余可用许可:> "+SERVER_ACCEPT_SEMAPHORE.availablePermits()); + }else{ + logger.debug("禁用 Server Semaphore "); + } + } + /** + * 释放 server 信号 + */ + public static void releaseServerSemaphore() { + if(Constants.SERVER_ACCEPT_SEMAPHORE_FLAG){//开启client 连接数限制 + SERVER_ACCEPT_SEMAPHORE.release(); + logger.debug("Server Semaphore 已释放 当前可用许可:> "+SERVER_ACCEPT_SEMAPHORE.availablePermits()); + }else{ + logger.debug("禁用 Server Semaphore "); + } + } + /** * 将任务结果存放到任务结果集合中 diff --git a/src/com/nms/server/common/Constants.java b/src/com/nms/server/common/Constants.java index d79cf0a..5fdd159 100644 --- a/src/com/nms/server/common/Constants.java +++ b/src/com/nms/server/common/Constants.java @@ -479,6 +479,14 @@ public class Constants { * druid 连接池 获取连接最大等待时间 */ public static final Long DRUID_MAXWAIT_MILLIS; + /** + * server 同时可以并发接收多少 client 请求连接 + */ + public static final int SERVER_ACCEPT_SEMAPHORE_MAX; + /** + * 是否开启 client 请求连接限制,默认 true + */ + public static final boolean SERVER_ACCEPT_SEMAPHORE_FLAG; static { // InetAddress inetAddress = null; @@ -847,6 +855,14 @@ public class Constants { * 默认值:10s */ DRUID_MAXWAIT_MILLIS = Long.valueOf(Config.getString("druid.maxwait.millis", "10000")); + /* + * socket server 可以并发接收 client 连接数 + */ + SERVER_ACCEPT_SEMAPHORE_MAX = Config.getInteger("server.accept.semaphore.max", 10); + /* + * 是否开启 client 连接数限制 + */ + SERVER_ACCEPT_SEMAPHORE_FLAG = Config.getBoolan("server.accept.semaphore.flag", true); } //文件传输 临时文件命名后缀 diff --git a/src/com/nms/server/thread/change/ChangePluginScriptFile.java b/src/com/nms/server/thread/change/ChangePluginScriptFile.java index ad9317a..428569c 100644 --- a/src/com/nms/server/thread/change/ChangePluginScriptFile.java +++ b/src/com/nms/server/thread/change/ChangePluginScriptFile.java @@ -348,6 +348,11 @@ public class ChangePluginScriptFile implements Runnable { } catch (SQLException e) { logger.error("Delete EventRecordLibrary", e); + } finally { + if (dao != null) { + dao.close(); + dao = null; + } } diff --git a/src/com/nms/server/thread/socket/NMSWebBPDownload.java b/src/com/nms/server/thread/socket/NMSWebBPDownload.java index 2d4d155..698fe3b 100644 --- a/src/com/nms/server/thread/socket/NMSWebBPDownload.java +++ b/src/com/nms/server/thread/socket/NMSWebBPDownload.java @@ -75,7 +75,7 @@ public class NMSWebBPDownload extends SocketUtils implements Callable{ } catch (Exception e) { - logger.warn("Target communication:>"+ip+" create failure",e); + logger.warn("Target communication:>"+ip+":"+port+" create failure",e); close(); return -2; diff --git a/src/com/nms/server/thread/socket/SSLServerManager.java b/src/com/nms/server/thread/socket/SSLServerManager.java index 8230f32..ad606b9 100644 --- a/src/com/nms/server/thread/socket/SSLServerManager.java +++ b/src/com/nms/server/thread/socket/SSLServerManager.java @@ -66,19 +66,23 @@ public class SSLServerManager implements Callable{ Thread.currentThread().setName("Receiving communication"); try { while (true) { + Common.acquireServerSemaphore();//阻塞 获取 server 信号量 Socket socket = sslServer.accept(); + String address = socket.getInetAddress().getHostAddress(); + logger.debug(String.format("client addr :%s", address)); if(Common.SERVER_UN_UPGRADE_FLAG){ //当SOCKET_FLAG为true时,允许建立通讯,否则放弃通讯,用于NMSServer升级功能 - logger.debug("来自:"+socket.getInetAddress().getHostAddress()); Common.service.submit(new SSLServer(socket)); }else{ //关闭 放弃的通讯 - logger.debug("Server升级 抛弃通讯:"+socket.getInetAddress().getHostAddress()); + logger.debug(String.format("Server升级 抛弃通讯:%s", address)); + Common.releaseServerSemaphore(); socket.close(); } } } catch (IOException e) { logger.error("Local communication port monitor thread exception termination"+e.getMessage()); Common.addErrorInfo(Constants.ERROR_CODE_SOCKET_SERVER_RUNTIME,Common.getLocalIp(), Calendar.getInstance().getTime(), Constants.ERROR_INFO_SATAE_ERROR,""); - + } catch (InterruptedException e) { + logger.error("socket server error",e); }finally{ try { sslServer.close(); diff --git a/src/com/nms/server/util/socket/SSLSocketCallable.java b/src/com/nms/server/util/socket/SSLSocketCallable.java index ef72a34..d7c05b3 100644 --- a/src/com/nms/server/util/socket/SSLSocketCallable.java +++ b/src/com/nms/server/util/socket/SSLSocketCallable.java @@ -24,7 +24,7 @@ public abstract class SSLSocketCallable extends SocketUtils implements Callable< private static Logger logger = Logger.getLogger(SSLSocketCallable.class); //ServerSocket 通讯服务 protected SSLServerSocket ss = null; - + private boolean clientFlag = false;// 是否dc server 接收的client /** * 通讯创建 * @param ip 目标主机IP @@ -41,6 +41,7 @@ public abstract class SSLSocketCallable extends SocketUtils implements Callable< */ public SSLSocketCallable(Socket client) { super(client); + clientFlag = true; } /** @@ -51,39 +52,39 @@ public abstract class SSLSocketCallable extends SocketUtils implements Callable< public Object call(){ Thread.currentThread().setName("Create communication"+ip+":"+port); Object obj = null; //返回对象 - try { + try { + //- 校验 是否创建新通讯连接 + if(socket==null){ //客户端通讯创建 + createClientSocket(); + }else{ //服务端通讯创建 + createServerSocket(); + } + + } catch (SocketException e) { + logger.error("Create failed:"+e.getMessage(),e); + Common.addErrorInfo(Constants.ERROR_CODE_CREATE_SOCKET,ip, Calendar.getInstance().getTime(), Constants.ERROR_INFO_SATAE_ERROR,""); + close(); + return obj; + } catch (Exception e) { + close(); + return obj; + } - //- 校验 是否创建新通讯连接 - if(socket==null){ //客户端通讯创建 - createClientSocket(); - }else{ //服务端通讯创建 - createServerSocket(); + //-- 自定义通讯操作 + try { + obj = toDo(); + }catch (Exception e) { + logger.error("Communicate contents exception"+e.getMessage(),e); + Common.addErrorInfo(Constants.ERROR_CODE_SOCKET_RUNTIME,ip, Calendar.getInstance().getTime(), Constants.ERROR_INFO_SATAE_ERROR,""); + } finally { + close(); } - - } catch (SocketException e) { - logger.error("Create failed:"+e.getMessage(),e); - Common.addErrorInfo(Constants.ERROR_CODE_CREATE_SOCKET,ip, Calendar.getInstance().getTime(), Constants.ERROR_INFO_SATAE_ERROR,""); - close(); - return obj; - } catch (IOException e) { - close(); - return obj; - }catch (Exception e) { - close(); - return obj; - } - - //-- 自定义通讯操作 - try { - obj = toDo(); - }catch (Exception e) { - logger.error("Communicate contents exception"+e.getMessage(),e); - Common.addErrorInfo(Constants.ERROR_CODE_SOCKET_RUNTIME,ip, Calendar.getInstance().getTime(), Constants.ERROR_INFO_SATAE_ERROR,""); } finally { - close(); + if(clientFlag){ + Common.releaseServerSemaphore();//释放server信号量 + } } - return obj; }