1:删除37与149对14:APP_SUBSCRIBE_ID表的支持
2:为日志查询的url字段做左匹配处理 3:修改测试程序打印表达式 4:加配置同步到集群库的定时任务
This commit is contained in:
103
src/main/java/com/nis/util/JedisClusterFactory.java
Normal file
103
src/main/java/com/nis/util/JedisClusterFactory.java
Normal file
@@ -0,0 +1,103 @@
|
||||
package com.nis.util;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
|
||||
import org.springframework.beans.factory.FactoryBean;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.core.io.Resource;
|
||||
|
||||
import redis.clients.jedis.HostAndPort;
|
||||
import redis.clients.jedis.JedisCluster;
|
||||
|
||||
public class JedisClusterFactory implements FactoryBean<JedisCluster>, InitializingBean {
|
||||
|
||||
private Resource addressConfig;
|
||||
private String addressKeyPrefix;
|
||||
|
||||
private JedisCluster jedisCluster;
|
||||
private Integer timeout;
|
||||
private Integer maxRedirections;
|
||||
private GenericObjectPoolConfig genericObjectPoolConfig;
|
||||
|
||||
private Pattern p = Pattern.compile("^.+[:]\\d{1,5}\\s*$");
|
||||
|
||||
@Override
|
||||
public JedisCluster getObject() throws Exception {
|
||||
return jedisCluster;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<? extends JedisCluster> getObjectType() {
|
||||
return (this.jedisCluster != null ? this.jedisCluster.getClass() : JedisCluster.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSingleton() {
|
||||
return true;
|
||||
}
|
||||
|
||||
private Set<HostAndPort> parseHostAndPort() throws Exception {
|
||||
try {
|
||||
Properties prop = new Properties();
|
||||
prop.load(this.addressConfig.getInputStream());
|
||||
|
||||
Set<HostAndPort> haps = new HashSet<HostAndPort>();
|
||||
for (Object key : prop.keySet()) {
|
||||
|
||||
if (!((String) key).startsWith(addressKeyPrefix)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
String val = (String) prop.get(key);
|
||||
|
||||
boolean isIpPort = p.matcher(val).matches();
|
||||
|
||||
if (!isIpPort) {
|
||||
throw new IllegalArgumentException("ip 或 port 不合法");
|
||||
}
|
||||
String[] ipAndPort = val.split(":");
|
||||
|
||||
HostAndPort hap = new HostAndPort(ipAndPort[0], Integer.parseInt(ipAndPort[1]));
|
||||
haps.add(hap);
|
||||
}
|
||||
|
||||
return haps;
|
||||
} catch (IllegalArgumentException ex) {
|
||||
throw ex;
|
||||
} catch (Exception ex) {
|
||||
throw new Exception("解析 jedis 配置文件失败", ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
Set<HostAndPort> haps = this.parseHostAndPort();
|
||||
|
||||
jedisCluster = new JedisCluster(haps, timeout, maxRedirections, genericObjectPoolConfig);
|
||||
|
||||
}
|
||||
|
||||
public void setAddressConfig(Resource addressConfig) {
|
||||
this.addressConfig = addressConfig;
|
||||
}
|
||||
|
||||
public void setTimeout(int timeout) {
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
public void setMaxRedirections(int maxRedirections) {
|
||||
this.maxRedirections = maxRedirections;
|
||||
}
|
||||
|
||||
public void setAddressKeyPrefix(String addressKeyPrefix) {
|
||||
this.addressKeyPrefix = addressKeyPrefix;
|
||||
}
|
||||
|
||||
public void setGenericObjectPoolConfig(GenericObjectPoolConfig genericObjectPoolConfig) {
|
||||
this.genericObjectPoolConfig = genericObjectPoolConfig;
|
||||
}
|
||||
}
|
||||
@@ -153,7 +153,8 @@ public class MaatTestController {
|
||||
configCompileList.add(getConfigCompile(ser));
|
||||
}
|
||||
String jsonString = JsonMapper.toJsonString(configCompileList);
|
||||
System.out.println(jsonString);
|
||||
FileUtils.addStrToFile("下发的json串是"+jsonString+"\n",
|
||||
Configurations.getStringProperty("maatTestLogPath", ""), true);
|
||||
// 保存测试配置
|
||||
configSourcesService.saveMaatConfig(thread, start, configCompileList, sb);
|
||||
for (ConfigCompile configCompile : configCompileList) {
|
||||
@@ -179,7 +180,8 @@ public class MaatTestController {
|
||||
configCompileList.add(getConfigCompile(service));
|
||||
}
|
||||
String jsonString = JsonMapper.toJsonString(configCompileList);
|
||||
System.out.println(jsonString);
|
||||
FileUtils.addStrToFile("下发的json串是"+jsonString+"\n",
|
||||
Configurations.getStringProperty("maatTestLogPath", ""), true);
|
||||
// 保存测试配置
|
||||
configSourcesService.saveMaatConfig(thread, start, configCompileList, sb);
|
||||
for (ConfigCompile configCompile : configCompileList) {
|
||||
|
||||
@@ -127,10 +127,15 @@ public class HiveSqlService {
|
||||
key = key.replace("search", "");
|
||||
key = key.substring(0, 1).toLowerCase() + key.substring(1);
|
||||
}
|
||||
if (!Constants.ISUSECLICKHOUSE) {//hive写法
|
||||
if (!Constants.ISUSECLICKHOUSE) {// hive写法
|
||||
if (typeName.equals("java.lang.String")) {
|
||||
whereSB.append(" and " + filedAndColumnMap.get(key) + "='"
|
||||
+ value.toString().trim() + "'");
|
||||
String field = filedAndColumnMap.get(key);
|
||||
if (field.equals("url")) {
|
||||
whereSB.append(
|
||||
" and " + field + " like '" + value.toString().trim() + "%'");
|
||||
} else {
|
||||
whereSB.append(" and " + field + "='" + value.toString().trim() + "'");
|
||||
}
|
||||
} else if (typeName.equals("java.lang.Integer") || typeName.equals("int")) {
|
||||
whereSB.append(
|
||||
" and " + filedAndColumnMap.get(key) + "=" + value.toString().trim());
|
||||
@@ -139,11 +144,16 @@ public class HiveSqlService {
|
||||
whereSB.append(" and " + filedAndColumnMap.get(key) + "="
|
||||
+ value.toString().trim() + "L");
|
||||
}
|
||||
} else {//clickhouse写法
|
||||
} else {// clickhouse写法
|
||||
String type = filedsType.get(key).trim();
|
||||
if (type.equals("java.lang.String")) {
|
||||
whereSB.append(" and " + filedAndColumnMap.get(key).toLowerCase() + "='"
|
||||
+ value.toString().trim() + "'");
|
||||
String field = filedAndColumnMap.get(key).toLowerCase();
|
||||
if (field.equals("url")) {
|
||||
whereSB.append(
|
||||
" and " + field + " like '" + value.toString().trim() + "%'");
|
||||
} else {
|
||||
whereSB.append(" and " + field + "='" + value.toString().trim() + "'");
|
||||
}
|
||||
} else if (type.equals("java.lang.Integer") || type.equals("int")
|
||||
|| type.equals("java.lang.Long") || type.equals("long")) {
|
||||
whereSB.append(" and " + filedAndColumnMap.get(key).toLowerCase() + "="
|
||||
@@ -170,8 +180,8 @@ public class HiveSqlService {
|
||||
}
|
||||
|
||||
if (whereSB.length() > 0) {
|
||||
int indexOf = whereSB.indexOf("and")+"and".length();
|
||||
sql.append(" where "+whereSB.substring(indexOf));
|
||||
int indexOf = whereSB.indexOf("and") + "and".length();
|
||||
sql.append(" where " + whereSB.substring(indexOf));
|
||||
}
|
||||
if (Constants.ISUSECLICKHOUSE) {
|
||||
// Integer startNum = (page.getPageNo() - 1) * page.getPageSize() + 1;
|
||||
|
||||
@@ -159,6 +159,13 @@ public class MaatTestServiceimpl {
|
||||
}
|
||||
}
|
||||
String val = JedisUtils.get(key, redisDb);
|
||||
FileUtils.addStrToFile("\t\t入库的key=" + key + "\n", Configurations.getStringProperty("maatTestLogPath", ""),
|
||||
true);
|
||||
FileUtils.addStrToFile("\t\t入库的val=" + val, Configurations.getStringProperty("maatTestLogPath", ""),
|
||||
true);
|
||||
FileUtils.addStrToFile("\t\t表达式" + maatXmlExpr.getValueExpression() + "\n",
|
||||
Configurations.getStringProperty("maatTestLogPath", ""), true);
|
||||
|
||||
valArr = val.split("\\t");
|
||||
break;
|
||||
}
|
||||
|
||||
173
src/main/java/com/nis/web/task/SyncRedisToCluster.java
Normal file
173
src/main/java/com/nis/web/task/SyncRedisToCluster.java
Normal file
@@ -0,0 +1,173 @@
|
||||
package com.nis.web.task;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.PropertySource;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.nis.util.Configurations;
|
||||
import com.nis.util.ExceptionUtil;
|
||||
import com.nis.util.JedisUtils;
|
||||
import com.nis.web.service.restful.ConfigJedisServiceimpl;
|
||||
|
||||
import redis.clients.jedis.Jedis;
|
||||
import redis.clients.jedis.JedisCluster;
|
||||
import redis.clients.jedis.JedisPool;
|
||||
import redis.clients.jedis.Tuple;
|
||||
import redis.clients.util.JedisClusterCRC16;
|
||||
|
||||
@Component
|
||||
@PropertySource(value = { "classpath:nis.properties", "classpath:jdbc.properties" })
|
||||
public class SyncRedisToCluster {
|
||||
private static Logger logger = LoggerFactory.getLogger(ConfigJedisServiceimpl.class);
|
||||
private static int redisStatisticsRealDBIndex = Configurations.getIntProperty("redisStatisticsRealDBIndex", 14);
|
||||
|
||||
@Autowired
|
||||
private JedisCluster jedisCluster;
|
||||
|
||||
// @Scheduled(cron = "0/3 * * * * ?")
|
||||
@Scheduled(cron = "${syncRedisToClusterCron}")
|
||||
public void syncRedisToCluster() {
|
||||
try {
|
||||
// keys("EFFECTIVE_RULE*");
|
||||
// keys("OBSOLETE_RULE*");
|
||||
String clusterMaatVersionStr = jedisCluster.get("MAAT_VERSION");
|
||||
String redisMaatVersionStr = JedisUtils.get("MAAT_VERSION", redisStatisticsRealDBIndex);
|
||||
if (clusterMaatVersionStr != null && !clusterMaatVersionStr.trim().equals("")) {
|
||||
Integer clusterMaatVersion = Integer.valueOf(clusterMaatVersionStr);
|
||||
Integer redisMaatVersion = Integer.valueOf(redisMaatVersionStr);
|
||||
if (redisMaatVersion < clusterMaatVersion) {// 如果主从库比集群库的版本号小则下发全量
|
||||
logger.info("redis集群中的MAAT_VERSION为大于配置库中的MAAT_VERSION,开始执行全量同步");
|
||||
syncAllData(redisMaatVersionStr);
|
||||
} else if (redisMaatVersion > clusterMaatVersion) {// 获取增量的数据
|
||||
logger.info("redis集群中的MAAT_VERSION为小于配置库中的MAAT_VERSION,开始执行增量同步,score是" + clusterMaatVersion + "-"
|
||||
+ redisMaatVersion);
|
||||
syncData(clusterMaatVersion.doubleValue(), redisMaatVersion.doubleValue(), redisMaatVersionStr);
|
||||
} else {
|
||||
logger.info("redis集群中的MAAT_VERSION与配置库中的MAAT_VERSION相等,暂不执行配置同步操作");
|
||||
}
|
||||
} else {
|
||||
logger.info("redis集群中的MAAT_VERSION为null,开始执行全量同步");
|
||||
syncAllData(redisMaatVersionStr);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("同步配置库配置到3A-redisCluster失败,失败原因:" + ExceptionUtil.getExceptionMsg(e));
|
||||
}
|
||||
}
|
||||
|
||||
public void deleteRedisKeyStartWith(String redisKeyStartWith) {
|
||||
try {
|
||||
Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
|
||||
for (Map.Entry<String, JedisPool> entry : clusterNodes.entrySet()) {
|
||||
Jedis jedis = entry.getValue().getResource();
|
||||
// 判断非从节点(因为若主从复制,从节点会跟随主节点的变化而变化)
|
||||
if (!jedis.info("replication").contains("role:slave")) {
|
||||
Set<String> keys = jedis.keys(redisKeyStartWith + "*");
|
||||
if (keys.size() > 0) {
|
||||
Map<Integer, List<String>> map = new HashMap<>();
|
||||
for (String key : keys) {
|
||||
// cluster模式执行多key操作的时候,这些key必须在同一个slot上,不然会报:JedisDataException:
|
||||
// CROSSSLOT Keys in request don't hash to the same slot
|
||||
int slot = JedisClusterCRC16.getSlot(key);
|
||||
// 按slot将key分组,相同slot的key一起提交
|
||||
if (map.containsKey(slot)) {
|
||||
map.get(slot).add(key);
|
||||
} else {
|
||||
ArrayList<String> list = new ArrayList<String>();
|
||||
list.add(key);
|
||||
map.put(slot, list);
|
||||
}
|
||||
}
|
||||
for (Map.Entry<Integer, List<String>> integerListEntry : map.entrySet()) {
|
||||
jedis.del(integerListEntry.getValue()
|
||||
.toArray(new String[integerListEntry.getValue().size()]));
|
||||
// jedisCluster.del(integerListEntry.getValue()
|
||||
// .toArray(new String[integerListEntry.getValue().size()]));
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
}
|
||||
}
|
||||
|
||||
public TreeSet<String> keys(String pattern) {
|
||||
TreeSet<String> keys = new TreeSet<>();
|
||||
// 获取所有的节点
|
||||
Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
|
||||
// 遍历节点 获取所有符合条件的KEY
|
||||
for (String k : clusterNodes.keySet()) {
|
||||
logger.info("Getting keys from: {}", k);
|
||||
JedisPool jp = clusterNodes.get(k);
|
||||
Jedis connection = jp.getResource();
|
||||
try {
|
||||
keys.addAll(connection.keys(pattern));
|
||||
} catch (Exception e) {
|
||||
logger.error("从{}获取{}失败,失败原因{}", k, pattern, e);
|
||||
} finally {
|
||||
connection.close();// 用完一定要close这个链接!!!
|
||||
}
|
||||
}
|
||||
// for (String string : keys) {
|
||||
// System.out.println(string);
|
||||
// }
|
||||
logger.debug("Keys gotten!");
|
||||
return keys;
|
||||
}
|
||||
|
||||
public void syncAllData(String version) {
|
||||
// 先清空集群库配置数据
|
||||
if (jedisCluster.exists("MAAT_UPDATE_STATUS")) {
|
||||
jedisCluster.del("MAAT_UPDATE_STATUS");
|
||||
logger.info("删除MAAT_UPDATE_STATUS成功");
|
||||
}
|
||||
if (jedisCluster.exists("MAAT_VERSION")) {
|
||||
jedisCluster.del("MAAT_VERSION");
|
||||
logger.info("删除MAAT_VERSION成功");
|
||||
}
|
||||
deleteRedisKeyStartWith("EFFECTIVE_RULE*");
|
||||
logger.info("删除EFFECTIVE_RULE*成功");
|
||||
deleteRedisKeyStartWith("OBSOLETE_RULE*");
|
||||
logger.info("删除OBSOLETE_RULE*成功");
|
||||
syncData(null, null, version);
|
||||
}
|
||||
|
||||
public void syncData(Double min, Double max, String verion) {
|
||||
Jedis resource = JedisUtils.getResource(redisStatisticsRealDBIndex);
|
||||
Set<Tuple> zrangeByScoreWithScores = null;
|
||||
if (min == null && max == null) {
|
||||
zrangeByScoreWithScores = resource.zrangeByScoreWithScores("MAAT_UPDATE_STATUS", "-inf", "+inf");// 获取所有的maat_update_status
|
||||
} else {
|
||||
zrangeByScoreWithScores = resource.zrangeByScoreWithScores("MAAT_UPDATE_STATUS", min, max);// 获取增量的数据
|
||||
}
|
||||
for (Tuple tuple : zrangeByScoreWithScores) {
|
||||
String key = tuple.getElement();
|
||||
String zset = key;
|
||||
double score = tuple.getScore();
|
||||
if (key.contains("ADD")) {
|
||||
key = key.replace("ADD,", "EFFECTIVE_RULE:");
|
||||
} else if (key.contains("DEL")) {
|
||||
key = key.replace("DEL,", "OBSOLETE_RULE:");
|
||||
}
|
||||
String val = JedisUtils.get(key, redisStatisticsRealDBIndex);
|
||||
if (val != null && !val.trim().equals("")) {
|
||||
jedisCluster.set(key, val);
|
||||
}
|
||||
jedisCluster.zadd("MAAT_UPDATE_STATUS", score, zset);
|
||||
// jedisCluster.incr("MAAT_VERSION");
|
||||
}
|
||||
jedisCluster.set("MAAT_VERSION", verion);
|
||||
logger.info("向redis集群同步数据成功");
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user