ASN导入全量,删除asnip逻辑修改为基于asn no的批量删除。

每次删除定量的asn ip cfg遍历,基于本批asn no,asn ip数量大于maat_send_size即删除并入库最新asn ip信息
This commit is contained in:
duandongmei
2019-01-30 11:07:22 +06:00
parent 64e3c13513
commit b95e597b57
4 changed files with 258 additions and 40 deletions

View File

@@ -958,7 +958,7 @@ public class BaseController {
* @throws InterruptedException
* @throws ExecutionException
*/
public BlockingQueue<BaseIpCfg> checkIpCfgMulity(StringBuffer _msg,FunctionServiceDict serviceDict, FunctionRegionDict regionDict,List<Map<Long,AsnGroupInfo>> asnNos,List<Map<String,ConfigGroupInfo>> asnOrgList, BlockingQueue<? extends Object> list) throws ServiceException, InterruptedException, ExecutionException{
public BlockingQueue<BaseIpCfg> checkIpCfgMulity(StringBuffer _msg,FunctionServiceDict serviceDict, FunctionRegionDict regionDict,List<Map<Long,AsnGroupInfo>> asnNos,List<Map<String,ConfigGroupInfo>> asnOrgList,Map<String,List<IpPortCfg>> asnIpMap, BlockingQueue<? extends Object> list) throws ServiceException, InterruptedException, ExecutionException{
logger.warn("start checkIpCfgMulity ,size "+list.size());
long start=System.currentTimeMillis();
BlockingQueue<BaseIpCfg> queue=new ArrayBlockingQueue<>(list.size());
@@ -970,6 +970,66 @@ public class BaseController {
CheckIpFormatThread t=new CheckIpFormatThread(serviceDict,regionDict, props, list, queue);
t.setAsnNoMaps(asnNos);
t.setAsnOrgList(asnOrgList);
t.setAsnIpMap(asnIpMap);
futures.add(service.submit(t));
}
service.shutdown();
while(!service.isTerminated()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
boolean valideteError=false;
for(Future<String> future:futures) {
String msg = future.get();
if(StringUtils.isNotBlank(msg)) {
if(msg.equals("validate_error")) {
valideteError=true;
}else if(msg.endsWith("validate_error")) {
_msg.append(msg.substring(0, msg.length()-1-"validate_error".length()));
valideteError=true;
}else {
_msg.append(msg);
}
//throw new ServiceException(msg);
}
}
if(valideteError) {
_msg.append(props.getProperty("validate_error", "Unexpected error occurred while validating"));
}
long end=System.currentTimeMillis();
logger.warn("checkIpCfgMulity finish,cost:"+(end-start)+",size:"+queue.size());
return queue;
}
/**
* 多线程验证
* @param serviceDict
* @param regionDict
* @param asnNos
* @param list
* @return
* @throws ServiceException
* @throws InterruptedException
* @throws ExecutionException
*/
public BlockingQueue<BaseIpCfg> checkAsnIpCfgMulity(StringBuffer _msg,FunctionServiceDict serviceDict, FunctionRegionDict regionDict,List<Map<Long,AsnGroupInfo>> asnNos,List<Map<String,ConfigGroupInfo>> asnOrgList,Map<String,List<IpPortCfg>> asnIpMap, BlockingQueue<? extends Object> list) throws ServiceException, InterruptedException, ExecutionException{
logger.warn("start checkIpCfgMulity ,size "+list.size());
long start=System.currentTimeMillis();
BlockingQueue<BaseIpCfg> queue=new ArrayBlockingQueue<>(list.size());
ExecutorService service=Executors.newFixedThreadPool(Constants.MULITY_THREAD_SIZE);
List<Future<String>> futures=new ArrayList<>();
Properties props=this.getMsgProp();
for(int i=0;i<Constants.MULITY_THREAD_SIZE;i++) {
CheckIpFormatThread t=new CheckIpFormatThread(serviceDict,regionDict, props, list, queue);
t.setAsnNoMaps(asnNos);
t.setAsnOrgList(asnOrgList);
t.setAsnIpMap(asnIpMap);
futures.add(service.submit(t));
}
service.shutdown();
@@ -1410,6 +1470,8 @@ public class BaseController {
*/
public void _import(HttpServletRequest request, HttpServletResponse response, RedirectAttributes redirectAttributes,
MultipartFile[] files, Integer serviceDictId, String regionDictIds, Integer requestId, String attribute, String classify) {
int isNullIndex=0;
int ipIndex=0;
logger.warn("import start...");
long start=System.currentTimeMillis();
ImportBigExcel ei=null;
@@ -1440,6 +1502,7 @@ public class BaseController {
Map<String,ConfigGroupInfo> oldOrgMap=Maps.newConcurrentMap();
asnOrgList.add(newOrgMap); //【0】 新的组织信息
asnOrgList.add(oldOrgMap);//【1】 旧的组织信息
Map<String,List<IpPortCfg>> asnIpMaps=Maps.newConcurrentMap();
FunctionRegionDict appRegion = null;
FunctionRegionDict appFeatureRegion = null;
if(serviceDict!=null) {
@@ -1476,27 +1539,27 @@ public class BaseController {
if (serviceDict!=null&&serviceDict.getAction().equals(64)) {
BlockingQueue<IpRateLimitTemplate> list = ei.getDataList(IpRateLimitTemplate.class
);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict,null,null, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict,null,null,null, list);
} else if(serviceDict!=null&&serviceDict.getAction().equals(32)) {
BlockingQueue<IpAllNotDoLogTemplate> list = ei.getDataList(IpAllNotDoLogTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null,null, list);
} else {
BlockingQueue<IpAllTemplate> list = ei.getDataList(IpAllTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null,null, list);
}
} else if (regionDict.getFunctionId().equals(7)&&serviceDict!=null&&serviceDict.getAction().intValue()==16) {
BlockingQueue<DnsIpTemplate> list = ei.getDataList(DnsIpTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null,null, list);
}else if(regionDict.getFunctionId().equals(401)) {
BlockingQueue<DnsFakeIpTemplate> list = ei.getDataList(DnsFakeIpTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null,null, list);
}else if (regionDict.getFunctionId().equals(212)) {
BlockingQueue<IpPayloadTemplate> list = ei.getDataList(IpPayloadTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null,null, list);
} else if (regionDict.getFunctionId().equals(510)
&& "p2p_ip".equals(regionDict.getConfigServiceType())) { // P2p IP
BlockingQueue<P2pIpTemplate> list = ei.getDataList(P2pIpTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null,null, list);
} else if (regionDict.getFunctionId().equals(600)) {// ASN IP
ei.setHasImportLimit(false);
//从数据库中读取
@@ -1506,44 +1569,44 @@ public class BaseController {
//初始化所有数据库中的组织信息
Map<String,ConfigGroupInfo> oldOrgMapData=configGroupInfoService.getConfigGroupInfo(asnOrgList,4);
BlockingQueue<AsnIpTemplate> list = ei.getDataList(AsnIpTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, asnNoMaps,asnOrgList, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, asnNoMaps,asnOrgList,asnIpMaps, list);
}else if (regionDict.getFunctionId().equals(301)) {// DDOS IP
BlockingQueue<DdosIpTemplate> list = ei.getDataList(DdosIpTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null,null, list);
}else if(regionDict.getFunctionId().equals(207)){// HTTP(s)阻断
BlockingQueue<HttpsRejectIpTemplate> list = ei.getDataList(HttpsRejectIpTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null,null, list);
}else if(regionDict.getFunctionId().equals(208)){// HTTP(s)重定向
BlockingQueue<HttpsRedirectIpTemplate> list = ei.getDataList(HttpsRedirectIpTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null,null, list);
}else if(regionDict.getFunctionId().equals(209)){// HTTP(s)替换
BlockingQueue<HttpsReplaceIpTemplate> list = ei.getDataList(HttpsReplaceIpTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null,null, list);
}else if(regionDict.getFunctionId().equals(211)){// HTTP(s)白名单
BlockingQueue<IpAllNotDoLogTemplate> list = ei.getDataList(IpAllNotDoLogTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null,null, list);
}else if(regionDict.getFunctionId().equals(200)) {// 拦截策略
if(serviceDict.getAction().equals(64)) {
BlockingQueue<IpRateLimitTemplate> list = ei.getDataList(IpRateLimitTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict,null,null, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict,null,null,null, list);
}else {
BlockingQueue<IpAllNotDoLogTemplate> list = ei.getDataList(IpAllNotDoLogTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict,null,null, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict,null,null,null, list);
}
}else if(regionDict.getFunctionId().equals(3)) { // IP白名单
BlockingQueue<IpWhitelistTemplate> list = ei.getDataList(IpWhitelistTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict,null,null, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict,null,null,null, list);
}else if(regionDict.getFunctionId().equals(214)) { // IpSpoofing
BlockingQueue<IpSpoofingTemplate> list = ei.getDataList(IpSpoofingTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict,null,null, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict,null,null,null, list);
}else if(regionDict.getFunctionId().equals(405) || regionDict.getFunctionId().equals(563) || regionDict.getFunctionId().equals(565)
|| regionDict.getFunctionId().equals(566)) { // APP Feature IP
BlockingQueue<IpAllNotDoLogTemplate> list = ei.getDataList(IpAllNotDoLogTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict,null,null, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict,null,null,null, list);
}else {
BlockingQueue<IpAllTemplate> list = ei.getDataList(IpAllTemplate.class );
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null, list);
ipPortCfgs=this.checkIpCfgMulity(errTip,serviceDict, regionDict, null,null,null, list);
}
} else if (regionDict.getRegionType().equals(2)) {// 字符串类
if (regionDict.getFunctionId().equals(510)
@@ -1653,10 +1716,148 @@ public class BaseController {
}
if (regionDict.getRegionType().equals(1)) {// IP
if(regionDict.getFunctionId().intValue()==600) {//ans ip
//是否全量下发所有的asn info信息
List<SysDataDictionaryItem> isImportAll=DictUtils.getDictList("IS_ASN_IP_IMPORT_ALL");
//处理config_group_info和asn_group_info
asnIpCfgService.processGroup(serviceDict,asnNoMaps,asnOrgList,isSend,isImportAll.get(0).getItemCode());
if(!asnIpMaps.isEmpty()){
int index=0;
List<AsnIpCfg> asnIpCfgs=Lists.newArrayList();
String asnIds="";
List<Integer> regionIds=Lists.newArrayList();
try {
regionIds = ConfigServiceUtil.getId(3,ipPortCfgs.size());
} catch (Exception e1) {
e1.printStackTrace();
logger.info("获取域ID出错");
throw new MaatConvertException("<spring:message code=\"request_service_failed\"/>:"+e1.getMessage());
}
for (Entry<String, List<IpPortCfg>> e:asnIpMaps.entrySet()) {
for (BaseIpCfg cfg : e.getValue()) {
if(cfg != null){
AsnIpCfg asnIpCfg=new AsnIpCfg();
BeanUtils.copyProperties(cfg, asnIpCfg,new String[] {"cfgId"});
asnIpCfg.setTableName(AsnIpCfg.getTablename());
asnIpCfg.setAction(serviceDict==null?0:serviceDict.getAction());
asnIpCfg.setCfgRegionCode(regionDict.getConfigRegionCode());
asnIpCfg.setCfgType(regionDict.getConfigRegionValue());
asnIpCfg.setCreateTime(date);
asnIpCfg.setCreatorId(UserUtils.getUser().getId());
asnIpCfg.setDoLog(2);
asnIpCfg.setFunctionId(regionDict.getFunctionId());
if(isSend.equals("1")) {
asnIpCfg.setIsAudit(Constants.AUDIT_YES);
asnIpCfg.setIsValid(Constants.VALID_YES);
asnIpCfg.setAuditorId(UserUtils.getUser().getId());
asnIpCfg.setAuditTime(date);
}else {
asnIpCfg.setIsAudit(Constants.AUDIT_NOT_YET);
asnIpCfg.setIsValid(Constants.VALID_NO);
}
asnIpCfg.setIsAreaEffective(0);
asnIpCfg.setLable("0");
asnIpCfg.setRequestId(StringUtil.isEmpty(requestId) ? 0 : requestId);
asnIpCfg.setAttribute(attribute);
asnIpCfg.setClassify(classify);
asnIpCfg.setServiceId(serviceDict==null?0:serviceDict.getServiceId());
asnIpCfg.setRegionId(regionIds.get(index));
//设置group id,compileId
asnIpCfg.setAsnIpGroup(asnNoMaps.get(2).get(Long.parseLong(asnIpCfg.getUserRegion1())).getGroupId());
asnIpCfg.setCompileId(asnNoMaps.get(2).get(Long.parseLong(asnIpCfg.getUserRegion1())).getCompileId());
if(isSend.equals("1")) {
/*//已下发
if(asnNoMaps.get(3).keySet().contains(Long.valueOf(asnIpCfg.getUserRegion1()))){
//groupId已经在了
if(oldAsnIpMap.keySet().contains(Long.valueOf(asnIpCfg.getAsnIpGroup()))){
oldAsnIpMap.get(Long.valueOf(asnIpCfg.getAsnIpGroup())).add(asnIpCfg);
oldAsnIpIndex++;
}else{
List list=new ArrayList<>();
list.add(asnIpCfg);
oldAsnIpMap.put(Long.valueOf(asnIpCfg.getAsnIpGroup()), list);
oldAsnIpIndex++;
}
}else{//未下发
//groupId已经在了
if(newAsnIpMap.keySet().contains(Long.valueOf(asnIpCfg.getAsnIpGroup()))){
newAsnIpMap.get(Long.valueOf(asnIpCfg.getAsnIpGroup())).add(asnIpCfg);
newAsnIpIndex++;
}else{
List list=new ArrayList<>();
list.add(asnIpCfg);
newAsnIpMap.put(Long.valueOf(asnIpCfg.getAsnIpGroup()), list);
newAsnIpIndex++;
}
}
*//**********************新的asn ip达到最大量后发送一次***********************//*
//未下发过的asnGroup
if(newAsnIpIndex==Constants.MAAT_JSON_SEND_SIZE){
asnIpCfgService.auditAsnIp(newAsnIpMap,asnNoMaps,"maat",newAsnIpIndex);
newAsnIpMap.clear();
newAsnIpIndex=0;
}
*//**********************新的asn ip达到最大量后发送一次***********************//*
//已下发过的asnGroup
if(oldAsnIpIndex==Constants.MAAT_JSON_SEND_SIZE){
asnIpCfgService.auditAsnIp(oldAsnIpMap,asnNoMaps,"common",oldAsnIpIndex);
oldAsnIpMap.clear();
oldAsnIpIndex=0;
}*/
}else{
asnIpCfgs.add(asnIpCfg);
}
index++;
if(!((","+asnIds).indexOf(","+e.getKey()+",")>-1)){
asnIds+=e.getKey()+",";
}
ipIndex++;
}else{
isNullIndex++;
}
}
//
if(!StringUtil.isEmpty(asnIpCfgs) && asnIpCfgs.size()>Constants.MAAT_JSON_SEND_SIZE ){
if(!StringUtil.isEmpty(asnIds)){
asnIds=asnIds.substring(0,asnIds.length()-1);
}
asnIpCfgService.saveAsnIp(asnIpCfgs,asnIds);
asnIds="";
asnIpCfgs.clear();
}
/*if(isSend.equals("1")) {
//未下发过的asnGroup
if(!newAsnIpMap.isEmpty()){
asnIpCfgService.auditAsnIp(newAsnIpMap,asnNoMaps,"maat",newAsnIpIndex);
newAsnIpMap.clear();
newAsnIpIndex=0;
}
//已下发过的asnGroup
if(!oldAsnIpMap.isEmpty()){
asnIpCfgService.auditAsnIp(oldAsnIpMap,asnNoMaps,"common",oldAsnIpIndex);
oldAsnIpMap.clear();
oldAsnIpIndex=0;
}
}
//仅仅需要保存的数
if(!StringUtil.isEmpty(asnIpCfgs)){
asnIpCfgService.saveAsnIp(asnIpCfgs);
asnIpCfgs.clear();
}
_ipPortCfgs.clear();
logger.error("ASN NO:"+e.getKey()+" :value"+e.getValue().size());*/
}
if(!StringUtil.isEmpty(asnIpCfgs) ){
if(!StringUtil.isEmpty(asnIds)){
asnIds=asnIds.substring(0,asnIds.length()-1);
}
asnIpCfgService.saveAsnIp(asnIpCfgs,asnIds);
asnIpCfgs.clear();
asnIds="";
}
}
//是否全量下发所有的asn info信息
/*
//每次批量之后需要更新已下发的组
if(isImportAll.get(0).getItemCode().equals("1")){
asnNoMaps.get(3).clear();
@@ -1738,14 +1939,14 @@ public class BaseController {
newAsnIpIndex++;
}
}
/**********************新的asn ip达到最大量后发送一次***********************/
*//**********************新的asn ip达到最大量后发送一次***********************//*
//未下发过的asnGroup
if(newAsnIpIndex==Constants.MAAT_JSON_SEND_SIZE){
asnIpCfgService.auditAsnIp(newAsnIpMap,asnNoMaps,"maat",newAsnIpIndex);
newAsnIpMap.clear();
newAsnIpIndex=0;
}
/**********************新的asn ip达到最大量后发送一次***********************/
*//**********************新的asn ip达到最大量后发送一次***********************//*
//已下发过的asnGroup
if(oldAsnIpIndex==Constants.MAAT_JSON_SEND_SIZE){
asnIpCfgService.auditAsnIp(oldAsnIpMap,asnNoMaps,"common",oldAsnIpIndex);
@@ -1784,7 +1985,7 @@ public class BaseController {
asnIpCfgs.clear();
}
_ipPortCfgs.clear();
}
}*/
}else if(regionDict.getFunctionId().intValue()==214) { // IpSpoofing
List<BaseIpCfg> _ipPortCfgs=Lists.newArrayList(Constants.MAAT_JSON_SEND_SIZE);
while(!ipPortCfgs.isEmpty()) {