ASN IP导入(全量+数据入库)开发

This commit is contained in:
duandongmei
2019-01-23 19:28:13 +06:00
parent 6885d467cb
commit a31769f3cb
2 changed files with 217 additions and 94 deletions

View File

@@ -120,6 +120,7 @@ import com.nis.domain.maat.MaatCfg.NumBoundaryCfg;
import com.nis.domain.maat.MaatCfg.StringCfg;
import com.nis.domain.maat.ToMaatBean;
import com.nis.domain.report.NtcPzReport;
import com.nis.domain.specific.ConfigGroupInfo;
import com.nis.domain.specific.SpecificServiceCfg;
import com.nis.exceptions.MaatConvertException;
//import com.nis.util.AsnCacheUtils;
@@ -957,16 +958,18 @@ public class BaseController {
* @throws InterruptedException
* @throws ExecutionException
*/
public BlockingQueue<BaseIpCfg> checkIpCfgMulity(StringBuffer _msg,FunctionServiceDict serviceDict, FunctionRegionDict regionDict,List<Map<Long,AsnGroupInfo>> asnNos,Map<Long,AsnGroupInfo> asnGroupInfos, BlockingQueue<? extends Object> list) throws ServiceException, InterruptedException, ExecutionException{
public BlockingQueue<BaseIpCfg> checkIpCfgMulity(StringBuffer _msg,FunctionServiceDict serviceDict, FunctionRegionDict regionDict,List<Map<Long,AsnGroupInfo>> asnNos,List<Map<String,ConfigGroupInfo>> asnOrgList, BlockingQueue<? extends Object> list) throws ServiceException, InterruptedException, ExecutionException{
logger.warn("start checkIpCfgMulity ,size "+list.size());
long start=System.currentTimeMillis();
BlockingQueue<BaseIpCfg> queue=new ArrayBlockingQueue<>(list.size());
ExecutorService service=Executors.newFixedThreadPool(Constants.MULITY_THREAD_SIZE);
List<Future<String>> futures=new ArrayList<>();
Properties props=this.getMsgProp();
for(int i=0;i<Constants.MULITY_THREAD_SIZE;i++) {
CheckIpFormatThread t=new CheckIpFormatThread(serviceDict,regionDict, this.getMsgProp(), list, queue);
CheckIpFormatThread t=new CheckIpFormatThread(serviceDict,regionDict, props, list, queue);
t.setAsnNoMaps(asnNos);
t.setAsnGroupInfos(asnGroupInfos);
t.setAsnOrgList(asnOrgList);
futures.add(service.submit(t));
}
service.shutdown();
@@ -978,13 +981,26 @@ public class BaseController {
e.printStackTrace();
}
}
boolean valideteError=false;
for(Future<String> future:futures) {
String msg = future.get();
if(StringUtils.isNotBlank(msg)) {
if(msg.equals("validate_error")) {
valideteError=true;
}else if(msg.endsWith("validate_error")) {
_msg.append(msg.substring(0, msg.length()-1-"validate_error".length()));
valideteError=true;
}else {
_msg.append(msg);
}
//throw new ServiceException(msg);
_msg.append(msg);
}
}
if(valideteError) {
_msg.append(props.getProperty("validate_error", "Unexpected error occurred while validating"));
}
long end=System.currentTimeMillis();
logger.warn("checkIpCfgMulity finish,cost:"+(end-start)+",size:"+queue.size());
return queue;
@@ -1005,8 +1021,10 @@ public class BaseController {
BlockingQueue<BaseStringCfg<?>> queue=new ArrayBlockingQueue<>(list.size());
ExecutorService service=Executors.newFixedThreadPool(Constants.MULITY_THREAD_SIZE);
List<Future<String>> futures=new ArrayList<>();
Properties props=this.getMsgProp();
for(int i=0;i<Constants.MULITY_THREAD_SIZE;i++) {
CheckStringFormatThread t=new CheckStringFormatThread(serviceDict,regionDict, this.getMsgProp(), list, queue);
CheckStringFormatThread t=new CheckStringFormatThread(serviceDict,regionDict, props, list, queue);
futures.add(service.submit(t));
}
service.shutdown();
@@ -1018,13 +1036,26 @@ public class BaseController {
e.printStackTrace();
}
}
boolean valideteError=false;
for(Future<String> future:futures) {
String msg = future.get();
if(StringUtils.isNotBlank(msg)) {
if(msg.equals("validate_error")) {
valideteError=true;
}else if(msg.endsWith("validate_error")) {
_msg.append(msg.substring(0, msg.length()-1-"validate_error".length()));
valideteError=true;
}else {
_msg.append(msg);
}
//throw new ServiceException(msg);
_msg.append(msg);
}
}
if(valideteError) {
_msg.append(props.getProperty("validate_error", "Unexpected error occurred while validating"));
}
long end=System.currentTimeMillis();
logger.warn("checkStringCfgMulity finish,cost:"+(end-start)+",size:"+queue.size());
return queue;
@@ -1045,8 +1076,10 @@ public class BaseController {
BlockingQueue<ComplexkeywordCfg> queue=new ArrayBlockingQueue<>(list.size());
ExecutorService service=Executors.newFixedThreadPool(Constants.MULITY_THREAD_SIZE);
List<Future<String>> futures=new ArrayList<>();
Properties props=this.getMsgProp();
for(int i=0;i<Constants.MULITY_THREAD_SIZE;i++) {
CheckComplexStringFormatThread t=new CheckComplexStringFormatThread(serviceDict,regionDict, this.getMsgProp(), list, queue);
CheckComplexStringFormatThread t=new CheckComplexStringFormatThread(serviceDict,regionDict, props, list, queue);
futures.add(service.submit(t));
}
service.shutdown();
@@ -1058,13 +1091,26 @@ public class BaseController {
e.printStackTrace();
}
}
boolean valideteError=false;
for(Future<String> future:futures) {
String msg = future.get();
if(StringUtils.isNotBlank(msg)) {
if(msg.equals("validate_error")) {
valideteError=true;
}else if(msg.endsWith("validate_error")) {
_msg.append(msg.substring(0, msg.length()-1-"validate_error".length()));
valideteError=true;
}else {
_msg.append(msg);
}
//throw new ServiceException(msg);
_msg.append(msg);
}
}
if(valideteError) {
_msg.append(props.getProperty("validate_error", "Unexpected error occurred while validating"));
}
long end=System.currentTimeMillis();
logger.warn("checkComplexStringCfgMulity finish,cost:"+(end-start)+",size:"+queue.size());
return queue;
@@ -1085,8 +1131,10 @@ public class BaseController {
BlockingQueue<DnsResStrategy> queue=new ArrayBlockingQueue<>(list.size());
ExecutorService service=Executors.newFixedThreadPool(Constants.MULITY_THREAD_SIZE);
List<Future<String>> futures=new ArrayList<>();
Properties props=this.getMsgProp();
for(int i=0;i<Constants.MULITY_THREAD_SIZE;i++) {
CheckDnsResStrategyFormatThread t=new CheckDnsResStrategyFormatThread(serviceDict,regionDict, this.getMsgProp(), list, queue);
CheckDnsResStrategyFormatThread t=new CheckDnsResStrategyFormatThread(serviceDict,regionDict, props, list, queue);
futures.add(service.submit(t));
}
service.shutdown();
@@ -1098,13 +1146,26 @@ public class BaseController {
e.printStackTrace();
}
}
boolean valideteError=false;
for(Future<String> future:futures) {
String msg = future.get();
if(StringUtils.isNotBlank(msg)) {
_msg.append(msg);
if(msg.equals("validate_error")) {
valideteError=true;
}else if(msg.endsWith("validate_error")) {
_msg.append(msg.substring(0, msg.length()-1-"validate_error".length()));
valideteError=true;
}else {
_msg.append(msg);
}
//throw new ServiceException(msg);
}
}
if(valideteError) {
_msg.append(props.getProperty("validate_error", "Unexpected error occurred while validating"));
}
long end=System.currentTimeMillis();
logger.warn("checkDnsResStrategyCfgMulity finish,cost:"+(end-start)+",size:"+queue.size());
return queue;
@@ -1127,8 +1188,10 @@ public class BaseController {
BlockingQueue<AppComplexFeatureCfg> queue=new ArrayBlockingQueue<>(list.size());
ExecutorService service=Executors.newFixedThreadPool(Constants.MULITY_THREAD_SIZE);
List<Future<String>> futures=new ArrayList<>();
Properties props=this.getMsgProp();
for(int i=0;i<Constants.MULITY_THREAD_SIZE;i++) {
CheckAppFeatureComplexStringFormatThread t = new CheckAppFeatureComplexStringFormatThread(serviceDict,regionDict, this.getMsgProp(), list, queue);
CheckAppFeatureComplexStringFormatThread t=new CheckAppFeatureComplexStringFormatThread(serviceDict,regionDict, props, list, queue);
futures.add(service.submit(t));
}
service.shutdown();
@@ -1140,13 +1203,26 @@ public class BaseController {
e.printStackTrace();
}
}
boolean valideteError=false;
for(Future<String> future:futures) {
String msg = future.get();
if(StringUtils.isNotBlank(msg)) {
_msg.append(msg);
if(msg.equals("validate_error")) {
valideteError=true;
}else if(msg.endsWith("validate_error")) {
_msg.append(msg.substring(0, msg.length()-1-"validate_error".length()));
valideteError=true;
}else {
_msg.append(msg);
}
//throw new ServiceException(msg);
}
}
if(valideteError) {
_msg.append(props.getProperty("validate_error", "Unexpected error occurred while validating"));
}
long end=System.currentTimeMillis();
logger.warn("checkAppComplexFeatureStringCfgMulity finish,cost:"+(end-start)+",size:"+queue.size());
return queue;
@@ -1168,10 +1244,11 @@ public class BaseController {
BlockingQueue<AppTopicDomainCfg> queue=new ArrayBlockingQueue<>(list.size());
ExecutorService service=Executors.newFixedThreadPool(Constants.MULITY_THREAD_SIZE);
List<Future<String>> futures=new ArrayList<>();
Properties props=this.getMsgProp();
for(int i=0;i<Constants.MULITY_THREAD_SIZE;i++) {
CheckTopicWebsiteFormatThread t=new CheckTopicWebsiteFormatThread(serviceDict,regionDict, this.getMsgProp(), list, queue);
CheckTopicWebsiteFormatThread t=new CheckTopicWebsiteFormatThread(serviceDict,regionDict, props, list, queue);
futures.add(service.submit(t));
}
service.shutdown();
while(!service.isTerminated()) {
@@ -1182,13 +1259,26 @@ public class BaseController {
e.printStackTrace();
}
}
boolean valideteError=false;
for(Future<String> future:futures) {
String msg = future.get();
if(StringUtils.isNotBlank(msg)) {
if(msg.equals("validate_error")) {
valideteError=true;
}else if(msg.endsWith("validate_error")) {
_msg.append(msg.substring(0, msg.length()-1-"validate_error".length()));
valideteError=true;
}else {
_msg.append(msg);
}
//throw new ServiceException(msg);
_msg.append(msg);
}
}
if(valideteError) {
_msg.append(props.getProperty("validate_error", "Unexpected error occurred while validating"));
}
long end=System.currentTimeMillis();
logger.warn("checkTopicWebsiteCfgMulity finish,cost:"+(end-start)+",size:"+queue.size());
return queue;
@@ -1338,9 +1428,18 @@ public class BaseController {
List<Map<Long,AsnGroupInfo>> asnNoMaps=Lists.newArrayList();
Map<Long,AsnGroupInfo> newAsnNoMap=Maps.newConcurrentMap();
Map<Long,AsnGroupInfo> OldAsnNoMap=Maps.newConcurrentMap();
Map<Long,AsnGroupInfo> asnGroupInfos=null;
asnNoMaps.add(newAsnNoMap);
asnNoMaps.add(OldAsnNoMap);
Map<Long,AsnGroupInfo> allAsnNoMap=Maps.newConcurrentMap();
Map<Long,AsnGroupInfo> auditedAsnNoMap=Maps.newConcurrentMap();
Map<Long,String> orgGroupInfoMap=Maps.newConcurrentMap();
asnNoMaps.add(newAsnNoMap); //【0】 新的asn group info
asnNoMaps.add(OldAsnNoMap); //【1】 旧的asn group info
asnNoMaps.add(allAsnNoMap); //【2】 所有的asn group info
asnNoMaps.add(auditedAsnNoMap);//【3】 所有已审核通过的asn group info
List<Map<String,ConfigGroupInfo>> asnOrgList=Lists.newArrayList(); //组织信息
Map<String,ConfigGroupInfo> newOrgMap=Maps.newConcurrentMap();
Map<String,ConfigGroupInfo> oldOrgMap=Maps.newConcurrentMap();
asnOrgList.add(newOrgMap); //【0】 新的组织信息
asnOrgList.add(oldOrgMap);//【1】 旧的组织信息
FunctionRegionDict appRegion = null;
FunctionRegionDict appFeatureRegion = null;
if(serviceDict!=null) {
@@ -1377,72 +1476,73 @@ public class BaseController {
if (serviceDict!=null&&serviceDict.getAction().equals(64)) {
BlockingQueue<IpRateLimitTemplate> list = ei.getDataList(IpRateLimitTemplate.class
);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict,null,asnGroupInfos, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict,null,null, list);
} else if(serviceDict!=null&&serviceDict.getAction().equals(32)) {
BlockingQueue<IpAllNotDoLogTemplate> list = ei.getDataList(IpAllNotDoLogTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,asnGroupInfos, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null, list);
} else {
BlockingQueue<IpAllTemplate> list = ei.getDataList(IpAllTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,asnGroupInfos, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null, list);
}
} else if (regionDict.getFunctionId().equals(7)&&serviceDict!=null&&serviceDict.getAction().intValue()==16) {
BlockingQueue<DnsIpTemplate> list = ei.getDataList(DnsIpTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,asnGroupInfos, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null, list);
}else if(regionDict.getFunctionId().equals(401)) {
BlockingQueue<DnsFakeIpTemplate> list = ei.getDataList(DnsFakeIpTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,asnGroupInfos, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null, list);
}else if (regionDict.getFunctionId().equals(212)) {
BlockingQueue<IpPayloadTemplate> list = ei.getDataList(IpPayloadTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,asnGroupInfos, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null, list);
} else if (regionDict.getFunctionId().equals(510)
&& "p2p_ip".equals(regionDict.getConfigServiceType())) { // P2p IP
BlockingQueue<P2pIpTemplate> list = ei.getDataList(P2pIpTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,asnGroupInfos, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null, list);
} else if (regionDict.getFunctionId().equals(600)) {// ASN IP
ei.setHasImportLimit(false);
//加载asn缓存
//AsnCacheUtils.init(true);
//从数据库中读取
asnGroupInfos=asnGroupInfoService.getGroupList();
//OldAsnNoMap.putAll(asnGroupInfoService.getGroupList());
List<AsnGroupInfo> asnGroupInfoList=asnGroupInfoService.findAsnGroupInfos();
//初始化所有数据库中的asn信息
asnGroupInfoService.getGroupList(asnGroupInfoList,asnNoMaps);
//初始化所有数据库中的组织信息
Map<String,ConfigGroupInfo> oldOrgMapData=configGroupInfoService.getConfigGroupInfo(asnOrgList,4);
BlockingQueue<AsnIpTemplate> list = ei.getDataList(AsnIpTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, asnNoMaps,asnGroupInfos, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, asnNoMaps,asnOrgList, list);
}else if (regionDict.getFunctionId().equals(301)) {// DDOS IP
BlockingQueue<DdosIpTemplate> list = ei.getDataList(DdosIpTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,asnGroupInfos, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null, list);
}else if(regionDict.getFunctionId().equals(207)){// HTTP(s)阻断
BlockingQueue<HttpsRejectIpTemplate> list = ei.getDataList(HttpsRejectIpTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,asnGroupInfos, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null, list);
}else if(regionDict.getFunctionId().equals(208)){// HTTP(s)重定向
BlockingQueue<HttpsRedirectIpTemplate> list = ei.getDataList(HttpsRedirectIpTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,asnGroupInfos, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null, list);
}else if(regionDict.getFunctionId().equals(209)){// HTTP(s)替换
BlockingQueue<HttpsReplaceIpTemplate> list = ei.getDataList(HttpsReplaceIpTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,asnGroupInfos, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null, list);
}else if(regionDict.getFunctionId().equals(211)){// HTTP(s)白名单
BlockingQueue<IpAllNotDoLogTemplate> list = ei.getDataList(IpAllNotDoLogTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,asnGroupInfos, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null, list);
}else if(regionDict.getFunctionId().equals(200)) {// 拦截策略
if(serviceDict.getAction().equals(64)) {
BlockingQueue<IpRateLimitTemplate> list = ei.getDataList(IpRateLimitTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict,null,asnGroupInfos, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict,null,null, list);
}else {
BlockingQueue<IpAllNotDoLogTemplate> list = ei.getDataList(IpAllNotDoLogTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict,null,asnGroupInfos, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict,null,null, list);
}
}else if(regionDict.getFunctionId().equals(3)) { // IP白名单
BlockingQueue<IpWhitelistTemplate> list = ei.getDataList(IpWhitelistTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict,null,asnGroupInfos, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict,null,null, list);
}else if(regionDict.getFunctionId().equals(214)) { // IpSpoofing
BlockingQueue<IpSpoofingTemplate> list = ei.getDataList(IpSpoofingTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict,null,asnGroupInfos, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict,null,null, list);
}else if(regionDict.getFunctionId().equals(405)) { // APP IP
BlockingQueue<IpAllNotDoLogTemplate> list = ei.getDataList(IpAllNotDoLogTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict,null,asnGroupInfos, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict,null,null, list);
}else {
BlockingQueue<IpAllTemplate> list = ei.getDataList(IpAllTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,asnGroupInfos, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null, list);
}
} else if (regionDict.getRegionType().equals(2)) {// 字符串类
if (regionDict.getFunctionId().equals(510)
@@ -1549,25 +1649,18 @@ public class BaseController {
}
if (regionDict.getRegionType().equals(1)) {// IP
if(regionDict.getFunctionId().intValue()==600) {//ans ip
//是否全量下发所有的asn info信息
List<SysDataDictionaryItem> isImportAll=DictUtils.getDictList("IS_ASN_IP_IMPORT_ALL");
//处理组,新的组会在这里保存
if(!asnNoMaps.get(0).isEmpty()) {
asnIpCfgService.processGroup(asnNoMaps.get(0));
}
//全量下发删除asnNo对应的已有的IP
if(isImportAll.get(0).getItemCode().equals("1")) {
logger.warn("Delete and send ip reuse regions start");
long _start=System.currentTimeMillis();
deleteIps(asnNoMaps.get(1));
long _end=System.currentTimeMillis();
logger.warn("Delete and send ip reuse regions end,cost:"+(_end-_start));
}
List<AsnIpCfg> asnIpCfgs=Lists.newArrayList(Constants.MAAT_JSON_SEND_SIZE);
//处理config_group_info和asn_group_info
asnIpCfgService.processGroup(serviceDict,asnNoMaps,asnOrgList,isSend,isImportAll.get(0).getItemCode());
//每次批量之后需要更新已下发的组
asnNoMaps.get(3).clear();
//处理asn ip
List<BaseIpCfg> _ipPortCfgs=Lists.newArrayList(Constants.MAAT_JSON_SEND_SIZE);
while(!ipPortCfgs.isEmpty()) {
ipPortCfgs.drainTo(_ipPortCfgs, Constants.MAAT_JSON_SEND_SIZE);
List<Integer> regionIds=Lists.newArrayList();
asnIpCfgService.processAsnIp(serviceDict,regionDict,_ipPortCfgs,asnNoMaps,isSend,isImportAll.get(0).getItemCode(),requestId, attribute, classify);
/*List<Integer> regionIds=Lists.newArrayList();
try {
regionIds = ConfigServiceUtil.getId(3,_ipPortCfgs.size());
} catch (Exception e) {
@@ -1626,11 +1719,9 @@ public class BaseController {
asnIpCfgs.add(_cfg);
ind++;
}
ipCfgService.saveAndSend(regionDict, serviceDict, specificServiceCfg, asnIpCfgs, cfgIndexInfos, appPolicyCfgs,appFeatureIndexs,asnNoMaps,isSend.equals("1"));
cfgIndexInfos.clear();
appPolicyCfgs.clear();
ipCfgService.saveAndSend(regionDict, serviceDict, specificServiceCfg, asnIpCfgs, cfgIndexInfos, appPolicyCfgs,appFeatureIndexs,asnNoMaps,isSend.equals("1"));*/
_ipPortCfgs.clear();
asnIpCfgs.clear();
/*asnIpCfgs.clear();*/
}
}else if(regionDict.getFunctionId().intValue()==214) { // IpSpoofing
List<BaseIpCfg> _ipPortCfgs=Lists.newArrayList(Constants.MAAT_JSON_SEND_SIZE);