diff --git a/src/main/java/com/nis/util/JedisClusterFactory.java b/src/main/java/com/nis/util/JedisClusterFactory.java new file mode 100644 index 0000000..856ea6e --- /dev/null +++ b/src/main/java/com/nis/util/JedisClusterFactory.java @@ -0,0 +1,103 @@ +package com.nis.util; + +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; +import java.util.regex.Pattern; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.springframework.beans.factory.FactoryBean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.core.io.Resource; + +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.JedisCluster; + +public class JedisClusterFactory implements FactoryBean, InitializingBean { + + private Resource addressConfig; + private String addressKeyPrefix; + + private JedisCluster jedisCluster; + private Integer timeout; + private Integer maxRedirections; + private GenericObjectPoolConfig genericObjectPoolConfig; + + private Pattern p = Pattern.compile("^.+[:]\\d{1,5}\\s*$"); + + @Override + public JedisCluster getObject() throws Exception { + return jedisCluster; + } + + @Override + public Class getObjectType() { + return (this.jedisCluster != null ? this.jedisCluster.getClass() : JedisCluster.class); + } + + @Override + public boolean isSingleton() { + return true; + } + + private Set parseHostAndPort() throws Exception { + try { + Properties prop = new Properties(); + prop.load(this.addressConfig.getInputStream()); + + Set haps = new HashSet(); + for (Object key : prop.keySet()) { + + if (!((String) key).startsWith(addressKeyPrefix)) { + continue; + } + + String val = (String) prop.get(key); + + boolean isIpPort = p.matcher(val).matches(); + + if (!isIpPort) { + throw new IllegalArgumentException("ip 或 port 不合法"); + } + String[] ipAndPort = val.split(":"); + + HostAndPort hap = new HostAndPort(ipAndPort[0], Integer.parseInt(ipAndPort[1])); + haps.add(hap); + } + + return haps; + } catch (IllegalArgumentException ex) { + throw ex; + } catch (Exception ex) { + throw new Exception("解析 jedis 配置文件失败", ex); + } + } + + @Override + public void afterPropertiesSet() throws Exception { + Set haps = this.parseHostAndPort(); + + jedisCluster = new JedisCluster(haps, timeout, maxRedirections, genericObjectPoolConfig); + + } + + public void setAddressConfig(Resource addressConfig) { + this.addressConfig = addressConfig; + } + + public void setTimeout(int timeout) { + this.timeout = timeout; + } + + public void setMaxRedirections(int maxRedirections) { + this.maxRedirections = maxRedirections; + } + + public void setAddressKeyPrefix(String addressKeyPrefix) { + this.addressKeyPrefix = addressKeyPrefix; + } + + public void setGenericObjectPoolConfig(GenericObjectPoolConfig genericObjectPoolConfig) { + this.genericObjectPoolConfig = genericObjectPoolConfig; + } +} diff --git a/src/main/java/com/nis/web/controller/restful/MaatTestController.java b/src/main/java/com/nis/web/controller/restful/MaatTestController.java index 19ca65d..759b7ce 100644 --- a/src/main/java/com/nis/web/controller/restful/MaatTestController.java +++ b/src/main/java/com/nis/web/controller/restful/MaatTestController.java @@ -153,7 +153,8 @@ public class MaatTestController { configCompileList.add(getConfigCompile(ser)); } String jsonString = JsonMapper.toJsonString(configCompileList); - System.out.println(jsonString); + FileUtils.addStrToFile("下发的json串是"+jsonString+"\n", + Configurations.getStringProperty("maatTestLogPath", ""), true); // 保存测试配置 configSourcesService.saveMaatConfig(thread, start, configCompileList, sb); for (ConfigCompile configCompile : configCompileList) { @@ -179,7 +180,8 @@ public class MaatTestController { configCompileList.add(getConfigCompile(service)); } String jsonString = JsonMapper.toJsonString(configCompileList); - System.out.println(jsonString); + FileUtils.addStrToFile("下发的json串是"+jsonString+"\n", + Configurations.getStringProperty("maatTestLogPath", ""), true); // 保存测试配置 configSourcesService.saveMaatConfig(thread, start, configCompileList, sb); for (ConfigCompile configCompile : configCompileList) { diff --git a/src/main/java/com/nis/web/service/HiveSqlService.java b/src/main/java/com/nis/web/service/HiveSqlService.java index f57d24d..0dad3ef 100644 --- a/src/main/java/com/nis/web/service/HiveSqlService.java +++ b/src/main/java/com/nis/web/service/HiveSqlService.java @@ -127,10 +127,15 @@ public class HiveSqlService { key = key.replace("search", ""); key = key.substring(0, 1).toLowerCase() + key.substring(1); } - if (!Constants.ISUSECLICKHOUSE) {//hive写法 + if (!Constants.ISUSECLICKHOUSE) {// hive写法 if (typeName.equals("java.lang.String")) { - whereSB.append(" and " + filedAndColumnMap.get(key) + "='" - + value.toString().trim() + "'"); + String field = filedAndColumnMap.get(key); + if (field.equals("url")) { + whereSB.append( + " and " + field + " like '" + value.toString().trim() + "%'"); + } else { + whereSB.append(" and " + field + "='" + value.toString().trim() + "'"); + } } else if (typeName.equals("java.lang.Integer") || typeName.equals("int")) { whereSB.append( " and " + filedAndColumnMap.get(key) + "=" + value.toString().trim()); @@ -139,11 +144,16 @@ public class HiveSqlService { whereSB.append(" and " + filedAndColumnMap.get(key) + "=" + value.toString().trim() + "L"); } - } else {//clickhouse写法 + } else {// clickhouse写法 String type = filedsType.get(key).trim(); if (type.equals("java.lang.String")) { - whereSB.append(" and " + filedAndColumnMap.get(key).toLowerCase() + "='" - + value.toString().trim() + "'"); + String field = filedAndColumnMap.get(key).toLowerCase(); + if (field.equals("url")) { + whereSB.append( + " and " + field + " like '" + value.toString().trim() + "%'"); + } else { + whereSB.append(" and " + field + "='" + value.toString().trim() + "'"); + } } else if (type.equals("java.lang.Integer") || type.equals("int") || type.equals("java.lang.Long") || type.equals("long")) { whereSB.append(" and " + filedAndColumnMap.get(key).toLowerCase() + "=" @@ -170,8 +180,8 @@ public class HiveSqlService { } if (whereSB.length() > 0) { - int indexOf = whereSB.indexOf("and")+"and".length(); - sql.append(" where "+whereSB.substring(indexOf)); + int indexOf = whereSB.indexOf("and") + "and".length(); + sql.append(" where " + whereSB.substring(indexOf)); } if (Constants.ISUSECLICKHOUSE) { // Integer startNum = (page.getPageNo() - 1) * page.getPageSize() + 1; diff --git a/src/main/java/com/nis/web/service/restful/MaatTestServiceimpl.java b/src/main/java/com/nis/web/service/restful/MaatTestServiceimpl.java index 9dc4a32..f5ea6d8 100644 --- a/src/main/java/com/nis/web/service/restful/MaatTestServiceimpl.java +++ b/src/main/java/com/nis/web/service/restful/MaatTestServiceimpl.java @@ -159,6 +159,13 @@ public class MaatTestServiceimpl { } } String val = JedisUtils.get(key, redisDb); + FileUtils.addStrToFile("\t\t入库的key=" + key + "\n", Configurations.getStringProperty("maatTestLogPath", ""), + true); + FileUtils.addStrToFile("\t\t入库的val=" + val, Configurations.getStringProperty("maatTestLogPath", ""), + true); + FileUtils.addStrToFile("\t\t表达式" + maatXmlExpr.getValueExpression() + "\n", + Configurations.getStringProperty("maatTestLogPath", ""), true); + valArr = val.split("\\t"); break; } diff --git a/src/main/java/com/nis/web/task/SyncRedisToCluster.java b/src/main/java/com/nis/web/task/SyncRedisToCluster.java new file mode 100644 index 0000000..b90a4b4 --- /dev/null +++ b/src/main/java/com/nis/web/task/SyncRedisToCluster.java @@ -0,0 +1,173 @@ +package com.nis.web.task; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.PropertySource; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import com.nis.util.Configurations; +import com.nis.util.ExceptionUtil; +import com.nis.util.JedisUtils; +import com.nis.web.service.restful.ConfigJedisServiceimpl; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.Tuple; +import redis.clients.util.JedisClusterCRC16; + +@Component +@PropertySource(value = { "classpath:nis.properties", "classpath:jdbc.properties" }) +public class SyncRedisToCluster { + private static Logger logger = LoggerFactory.getLogger(ConfigJedisServiceimpl.class); + private static int redisStatisticsRealDBIndex = Configurations.getIntProperty("redisStatisticsRealDBIndex", 14); + + @Autowired + private JedisCluster jedisCluster; + + // @Scheduled(cron = "0/3 * * * * ?") + @Scheduled(cron = "${syncRedisToClusterCron}") + public void syncRedisToCluster() { + try { +// keys("EFFECTIVE_RULE*"); +// keys("OBSOLETE_RULE*"); + String clusterMaatVersionStr = jedisCluster.get("MAAT_VERSION"); + String redisMaatVersionStr = JedisUtils.get("MAAT_VERSION", redisStatisticsRealDBIndex); + if (clusterMaatVersionStr != null && !clusterMaatVersionStr.trim().equals("")) { + Integer clusterMaatVersion = Integer.valueOf(clusterMaatVersionStr); + Integer redisMaatVersion = Integer.valueOf(redisMaatVersionStr); + if (redisMaatVersion < clusterMaatVersion) {// 如果主从库比集群库的版本号小则下发全量 + logger.info("redis集群中的MAAT_VERSION为大于配置库中的MAAT_VERSION,开始执行全量同步"); + syncAllData(redisMaatVersionStr); + } else if (redisMaatVersion > clusterMaatVersion) {// 获取增量的数据 + logger.info("redis集群中的MAAT_VERSION为小于配置库中的MAAT_VERSION,开始执行增量同步,score是" + clusterMaatVersion + "-" + + redisMaatVersion); + syncData(clusterMaatVersion.doubleValue(), redisMaatVersion.doubleValue(), redisMaatVersionStr); + } else { + logger.info("redis集群中的MAAT_VERSION与配置库中的MAAT_VERSION相等,暂不执行配置同步操作"); + } + } else { + logger.info("redis集群中的MAAT_VERSION为null,开始执行全量同步"); + syncAllData(redisMaatVersionStr); + } + } catch (Exception e) { + logger.error("同步配置库配置到3A-redisCluster失败,失败原因:" + ExceptionUtil.getExceptionMsg(e)); + } + } + + public void deleteRedisKeyStartWith(String redisKeyStartWith) { + try { + Map clusterNodes = jedisCluster.getClusterNodes(); + for (Map.Entry entry : clusterNodes.entrySet()) { + Jedis jedis = entry.getValue().getResource(); + // 判断非从节点(因为若主从复制,从节点会跟随主节点的变化而变化) + if (!jedis.info("replication").contains("role:slave")) { + Set keys = jedis.keys(redisKeyStartWith + "*"); + if (keys.size() > 0) { + Map> map = new HashMap<>(); + for (String key : keys) { + // cluster模式执行多key操作的时候,这些key必须在同一个slot上,不然会报:JedisDataException: + // CROSSSLOT Keys in request don't hash to the same slot + int slot = JedisClusterCRC16.getSlot(key); + // 按slot将key分组,相同slot的key一起提交 + if (map.containsKey(slot)) { + map.get(slot).add(key); + } else { + ArrayList list = new ArrayList(); + list.add(key); + map.put(slot, list); + } + } + for (Map.Entry> integerListEntry : map.entrySet()) { + jedis.del(integerListEntry.getValue() + .toArray(new String[integerListEntry.getValue().size()])); + // jedisCluster.del(integerListEntry.getValue() + // .toArray(new String[integerListEntry.getValue().size()])); + + } + } + } + } + } finally { + } + } + + public TreeSet keys(String pattern) { + TreeSet keys = new TreeSet<>(); + // 获取所有的节点 + Map clusterNodes = jedisCluster.getClusterNodes(); + // 遍历节点 获取所有符合条件的KEY + for (String k : clusterNodes.keySet()) { + logger.info("Getting keys from: {}", k); + JedisPool jp = clusterNodes.get(k); + Jedis connection = jp.getResource(); + try { + keys.addAll(connection.keys(pattern)); + } catch (Exception e) { + logger.error("从{}获取{}失败,失败原因{}", k, pattern, e); + } finally { + connection.close();// 用完一定要close这个链接!!! + } + } +// for (String string : keys) { +// System.out.println(string); +// } + logger.debug("Keys gotten!"); + return keys; + } + + public void syncAllData(String version) { + // 先清空集群库配置数据 + if (jedisCluster.exists("MAAT_UPDATE_STATUS")) { + jedisCluster.del("MAAT_UPDATE_STATUS"); + logger.info("删除MAAT_UPDATE_STATUS成功"); + } + if (jedisCluster.exists("MAAT_VERSION")) { + jedisCluster.del("MAAT_VERSION"); + logger.info("删除MAAT_VERSION成功"); + } + deleteRedisKeyStartWith("EFFECTIVE_RULE*"); + logger.info("删除EFFECTIVE_RULE*成功"); + deleteRedisKeyStartWith("OBSOLETE_RULE*"); + logger.info("删除OBSOLETE_RULE*成功"); + syncData(null, null, version); + } + + public void syncData(Double min, Double max, String verion) { + Jedis resource = JedisUtils.getResource(redisStatisticsRealDBIndex); + Set zrangeByScoreWithScores = null; + if (min == null && max == null) { + zrangeByScoreWithScores = resource.zrangeByScoreWithScores("MAAT_UPDATE_STATUS", "-inf", "+inf");// 获取所有的maat_update_status + } else { + zrangeByScoreWithScores = resource.zrangeByScoreWithScores("MAAT_UPDATE_STATUS", min, max);// 获取增量的数据 + } + for (Tuple tuple : zrangeByScoreWithScores) { + String key = tuple.getElement(); + String zset = key; + double score = tuple.getScore(); + if (key.contains("ADD")) { + key = key.replace("ADD,", "EFFECTIVE_RULE:"); + } else if (key.contains("DEL")) { + key = key.replace("DEL,", "OBSOLETE_RULE:"); + } + String val = JedisUtils.get(key, redisStatisticsRealDBIndex); + if (val != null && !val.trim().equals("")) { + jedisCluster.set(key, val); + } + jedisCluster.zadd("MAAT_UPDATE_STATUS", score, zset); + // jedisCluster.incr("MAAT_VERSION"); + } + jedisCluster.set("MAAT_VERSION", verion); + logger.info("向redis集群同步数据成功"); + } + +} diff --git a/src/main/resources/applicationConfig-rule.properties b/src/main/resources/applicationConfig-rule.properties index 10903f2..6ae95c2 100644 --- a/src/main/resources/applicationConfig-rule.properties +++ b/src/main/resources/applicationConfig-rule.properties @@ -66,7 +66,7 @@ service=1:128;2:128;16:16;17:16;18:16;19:16;20:16;21:16;22:16;23:16;24:16;26:16; #0x24 隧道行为阻断 36=10:APP_COMPILE;11:APP_GROUP;12:NTC_UNIVERSAL_IP;13:NTC_UNIVERSAL_PROTO_TYPE;14:APP_POLICY,APP_SUBSCRIBE_ID;18:NTC_IP_RANGE #0x25 ASN IP阻断 -37=10:NTC_COMPILE;11:NTC_GROUP;12:NTC_ASN_IP;14:APP_SUBSCRIBE_ID;18:NTC_IP_RANGE +37=10:NTC_COMPILE;11:NTC_GROUP;12:NTC_ASN_IP;18:NTC_IP_RANGE #0x80 IP地址监测 128=10:NTC_COMPILE;11:NTC_GROUP;12:NTC_IP;14:NTC_SUBSCRIBE_ID;18:NTC_IP_RANGE @@ -107,7 +107,7 @@ service=1:128;2:128;16:16;17:16;18:16;19:16;20:16;21:16;22:16;23:16;24:16;26:16; #0x94 隧道行为监测 148=10:APP_COMPILE;11:APP_GROUP;12:NTC_UNIVERSAL_IP;13:NTC_UNIVERSAL_PROTO_TYPE;14:APP_POLICY,APP_SUBSCRIBE_ID;18:NTC_IP_RANGE #0x95 ASN IP监测 -149=10:NTC_COMPILE;11:NTC_GROUP;12:NTC_ASN_IP;14:APP_SUBSCRIBE_ID;18:NTC_IP_RANGE +149=10:NTC_COMPILE;11:NTC_GROUP;12:NTC_ASN_IP;18:NTC_IP_RANGE #0xA0 通联关系监测 monit 无配置 #0xA1 邮件泛收 monit 无配置 diff --git a/src/main/resources/applicationContext-redis.xml b/src/main/resources/applicationContext-redis.xml index 8d1d7c5..ccdb4de 100644 --- a/src/main/resources/applicationContext-redis.xml +++ b/src/main/resources/applicationContext-redis.xml @@ -25,431 +25,25 @@ - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - - - + + + classpath:jdbc.properties + + + + + + diff --git a/src/main/resources/jdbc.properties b/src/main/resources/jdbc.properties index 22b72c4..2939097 100644 --- a/src/main/resources/jdbc.properties +++ b/src/main/resources/jdbc.properties @@ -136,10 +136,23 @@ redis.host=10.0.6.249 #redis.host=192.168.10.215 #亦庄演示环境 #redis.host=10.3.34.1 -redis.port=6379 +redis.port=6381 redis.pass= redis.maxIdle=5 redis.maxTotal=250 redis.maxWaitMillis=100000 redis.testOnBorrow=true redis.testOnReturn=true +#客户端超时时间单位是毫秒 +redis.timeout=100000 +#用于 redis.clients.jedis.JedisCluster.JedisCluster(Set, int, int, GenericObjectPoolConfig) 第三个参数 maxRedirections +#默认值是5 +#一般当此值设置过大时,容易报:Too many Cluster redirections +redis.maxRedirects=3 + + +cluster1.host.port=192.168.10.205:7031 +cluster2.host.port=192.168.10.205:7032 +cluster3.host.port=192.168.10.205:7033 +cluster4.host.port=192.168.10.205:7034 +cluster5.host.port=192.168.10.205:7035 \ No newline at end of file diff --git a/src/main/resources/nis.properties b/src/main/resources/nis.properties index 8628ba4..94f9fae 100644 --- a/src/main/resources/nis.properties +++ b/src/main/resources/nis.properties @@ -206,4 +206,7 @@ redisStatisticsRealDBIndex=14 #maat测试程序输出日志的文件目录 maatTestLogPath=c:/maat/mmat.log #样例文件存放目录,{tableType}和{fileName}会替换成具体内容 -mmSampleDstPath=/home/mesasoft/{tableType}/full/{fileName} \ No newline at end of file +mmSampleDstPath=/home/mesasoft/{tableType}/full/{fileName} + +##定时将redis主从库的实时统计数据同步到redis集群中 +syncRedisToClusterCron=0/10 * * * * ? \ No newline at end of file diff --git a/src/main/resources/spring-mvc.xml b/src/main/resources/spring-mvc.xml index 23db0af..b8ec78b 100644 --- a/src/main/resources/spring-mvc.xml +++ b/src/main/resources/spring-mvc.xml @@ -19,7 +19,9 @@ http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/cache - http://www.springframework.org/schema/cache/spring-cache.xsd"> + http://www.springframework.org/schema/cache/spring-cache.xsd + http://www.springframework.org/schema/task + http://www.springframework.org/schema/task/spring-task.xsd"> @@ -44,6 +46,14 @@ + + + + + + + @@ -190,7 +200,7 @@ - +