集成Nacos动态获取schema功能

This commit is contained in:
qidaijie
2022-04-19 14:16:38 +08:00
parent 2b16bd7fce
commit 3df5d8c51e
15 changed files with 674 additions and 376 deletions

18
pom.xml
View File

@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>log-stream-voip-relation</artifactId>
<version>220316-encryption</version>
<version>220418-Nacos</version>
<name>log-stream-voip-relation</name>
<url>http://www.example.com</url>
@@ -38,6 +38,8 @@
<hadoop.version>2.7.1</hadoop.version>
<kafka.version>1.0.0</kafka.version>
<hbase.version>2.2.3</hbase.version>
<nacos.version>1.2.0</nacos.version>
<zdjz.tools.version>1.0.8</zdjz.tools.version>
<scope.type>provided</scope.type>
<!--<scope.type>compile</scope.type>-->
</properties>
@@ -113,16 +115,11 @@
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.70</version>
</dependency>
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
<version>1.0.8</version>
<version>${zdjz.tools.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
@@ -236,6 +233,13 @@
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba.nacos/nacos-client -->
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>${nacos.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.jasypt/jasypt -->
<dependency>
<groupId>org.jasypt</groupId>

View File

@@ -27,14 +27,29 @@ buffer.memory=134217728
#这个参数决定了每次发送给Kafka服务器请求的最大大小,默认1048576
#10M
max.request.size=10485760
#生产者压缩模式 none or snappy
producer.kafka.compression.type=none
#生产者ack
producer.ack=1
#====================kafka default====================#
#kafka SASL验证用户名-加密
kafka.user=nsyGpHKGFA4KW0zro9MDdw==
#kafka SASL及SSL验证密码-加密
kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ
#====================nacos default====================#
#nacos username
nacos.username=nacos
#nacos password
nacos.pin=nacos
#nacos group
nacos.group=Galaxy
#====================Topology Default====================#
#check ip is Inner network;0 off, 1 on.
check.inner.network=1
check.inner.network=0

View File

@@ -10,11 +10,17 @@ sink.kafka.servers=192.168.44.12:9094
#定位库地址
tools.library=D:\\workerspace\\dat\\
#网关的schema位置
schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/voip_record
#--------------------------------nacos配置------------------------------#
#nacos 地址
nacos.server=192.168.44.12:8848
#nacos namespace
nacos.schema.namespace=flink
#nacos data id
nacos.data.id=voip_record.json
#--------------------------------Kafka消费组信息------------------------------#
#kafka 接收数据topic
source.kafka.topic=test
@@ -24,22 +30,15 @@ sink.kafka.topic=test-result
#读取topic,存储该spout id的消费offset信息可通过该拓扑命名;具体存储offset的位置确定下次读取不重复的数据
group.id=mytest-1
#生产者压缩模式 none or snappy
producer.kafka.compression.type=none
#生产者ack
producer.ack=1
#--------------------------------topology配置------------------------------#
#map函数并行度
window.parallelism=1
#voip日志对准窗口时间 seconds
voip.calibration.window.time=30
voip.calibration.window.time=60
#单向流对准窗口时间 seconds
one.sided.window.time=5
one.sided.window.time=10
#voip二次对准时间 seconds
sec.combine.sr.cache.secs=180
sec.combine.sr.cache.secs=120

View File

@@ -23,6 +23,17 @@ public class VoipRelationConfig {
public static final String VISIBILITY = "disabled";
public static final String FORMAT_SPLITTER = ",";
/**
* Nacos
*/
public static final String NACOS_SERVER = VoipRelationConfigurations.getStringProperty(0, "nacos.server");
public static final String NACOS_SCHEMA_NAMESPACE = VoipRelationConfigurations.getStringProperty(0, "nacos.schema.namespace");
public static final String NACOS_DATA_ID = VoipRelationConfigurations.getStringProperty(0, "nacos.data.id");
public static final String NACOS_PIN = VoipRelationConfigurations.getStringProperty(1, "nacos.pin");
public static final String NACOS_GROUP = VoipRelationConfigurations.getStringProperty(1, "nacos.group");
public static final String NACOS_USERNAME = VoipRelationConfigurations.getStringProperty(1, "nacos.username");
/**
* System
*/
@@ -38,8 +49,8 @@ public class VoipRelationConfig {
public static final String SOURCE_KAFKA_TOPIC = VoipRelationConfigurations.getStringProperty(0, "source.kafka.topic");
public static final String SINK_KAFKA_TOPIC = VoipRelationConfigurations.getStringProperty(0, "sink.kafka.topic");
public static final String GROUP_ID = VoipRelationConfigurations.getStringProperty(0, "group.id");
public static final String PRODUCER_ACK = VoipRelationConfigurations.getStringProperty(0, "producer.ack");
public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = VoipRelationConfigurations.getStringProperty(0, "producer.kafka.compression.type");
public static final String PRODUCER_ACK = VoipRelationConfigurations.getStringProperty(1, "producer.ack");
public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = VoipRelationConfigurations.getStringProperty(1, "producer.kafka.compression.type");
public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(VoipRelationConfigurations.getStringProperty(1, "kafka.user"));
public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(VoipRelationConfigurations.getStringProperty(1, "kafka.pin"));

View File

@@ -2,11 +2,11 @@ package com.zdjizhi.utils.functions;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.common.JsonProConfig;
import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.utils.IPUtil;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.ip.IPUtils;
import com.zdjizhi.utils.json.JsonParseUtil;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
@@ -27,31 +27,32 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
/**
* key-sip_call_id;value为sip的具体数据---存放的是SIP未关联的数据(单向流)
*/
private static HashMap<String, String> sipOriHmList = new HashMap<>(16);
private static HashMap<String, String> sipOriHmList = new HashMap<>(32);
/**
* key-rtp拼接的四元组;value为rtp的具体数据---存放的是RTP未关联的数据(单向流)
*/
private static HashMap<String, String> rtpOriHmList = new HashMap<>(16);
private static HashMap<String, String> rtpOriHmList = new HashMap<>(32);
@Override
@SuppressWarnings("unchecked")
public void process(Context context, Iterable<String> inputs, Collector<Tuple3<String, String, String>> out) throws Exception {
public void process(Context context, Iterable<String> inputs, Collector<Tuple3<String, String, String>> out) {
for (String input : inputs) {
if (StringUtil.isNotBlank(input)) {
JSONObject object = JSONObject.parseObject(input);
String commonSchemaType = object.getString(JsonProConfig.SCHEMA_TYPE);
String sipCallId = object.getString(JsonProConfig.SIP_CALL_ID);
try {
Map<String, Object> object = JsonParseUtil.typeTransform((Map<String, Object>) JsonMapper.fromJsonString(input, Map.class));
String commonSchemaType = JsonParseUtil.getString(object, JsonProConfig.SCHEMA_TYPE);
String sipCallId = JsonParseUtil.getString(object, JsonProConfig.SIP_CALL_ID);
//1c2s2s2c3double
int commonStreamDir = object.getInteger(JsonProConfig.STREAM_DIR);
int commonStreamDir = JsonParseUtil.getInteger(object, JsonProConfig.STREAM_DIR);
/*
* 针对SIP日志进行处理
*/
if (JsonProConfig.SIP_MARK.equals(commonSchemaType) && StringUtil.isNotBlank(sipCallId)) {
if (checkSipCompleteness(object)) {
if (relationUtils.checkSipCompleteness(object)) {
if (commonStreamDir != JsonProConfig.DOUBLE) {
putKeyAndMsg(input, sipCallId, sipOriHmList, "SIP", out);
} else {
@@ -66,11 +67,12 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
* 针对RTP日志进行处理
*/
if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) {
String clientIP = JsonParseUtil.getString(object, JsonProConfig.CLIENT_IP);
int clientPort = JsonParseUtil.getInteger(object, JsonProConfig.CLIENT_PORT);
String ServerIP = JsonParseUtil.getString(object, JsonProConfig.SERVER_IP);
int ServerPort = JsonParseUtil.getInteger(object, JsonProConfig.SERVER_PORT);
String rtpIpPort4Key = getFourKey(object.getString(JsonProConfig.CLIENT_IP),
object.getInteger(JsonProConfig.CLIENT_PORT),
object.getString(JsonProConfig.SERVER_IP),
object.getInteger(JsonProConfig.SERVER_PORT));
String rtpIpPort4Key = getFourKey(clientIP, clientPort, ServerIP, ServerPort);
if (commonStreamDir != JsonProConfig.DOUBLE) {
//对rtp单向流进行关联
@@ -81,13 +83,16 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
out.collect(new Tuple3<>(rtpIpPort4Key, "rtp-two", input));
}
}
} catch (RuntimeException e) {
logger.error("parsing JSON or Unidirectional data flow fusion has exception! error is :" + e);
}
}
}
/*
* 定时发送SIP或RTP未关联上数据
* 定时发送SIP未关联上数据
*/
if (sipOriHmList.size() > 0) {
HashMap<String, String> tmpSipOriHmList = new HashMap<String, String>(sipOriHmList);
HashMap<String, String> tmpSipOriHmList = new HashMap<>(sipOriHmList);
sipOriHmList.clear();
for (String sipKey : tmpSipOriHmList.keySet()) {
String sipSingleMsg = tmpSipOriHmList.get(sipKey);
@@ -95,9 +100,11 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
out.collect(new Tuple3<>(sipKey, "sip-single", sipSingleMsg));
}
}
/*
* 定时发送RTP未关联上数据
*/
if (rtpOriHmList.size() > 0) {
HashMap<String, String> tmpRtpOriHmList = new HashMap<String, String>(rtpOriHmList);
HashMap<String, String> tmpRtpOriHmList = new HashMap<>(rtpOriHmList);
rtpOriHmList.clear();
for (String rtpKey : tmpRtpOriHmList.keySet()) {
String rtpSingleMsg = tmpRtpOriHmList.get(rtpKey);
@@ -110,41 +117,40 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
/**
* 存放key并关联拼接对应Key
*/
@SuppressWarnings("unchecked")
private static void putKeyAndMsg(String message, String hmStrKey, HashMap<String, String> hashMapStr, String protocolType, Collector<Tuple3<String, String, String>> out) {
//和上次存入的数据关联
if (hashMapStr.containsKey(hmStrKey)) {
JSONObject jsonCombinObject = new JSONObject();
HashMap<String, Object> jsonCommonMap = new HashMap<>(32);
String[] strArr = new String[2];
String firstMsg = hashMapStr.remove(hmStrKey);
JSONObject firstSipOrRtpLog = JSONObject.parseObject(firstMsg);
JSONObject secendSipOrRtpLog = JSONObject.parseObject(message);
Map<String, Object> firstSipOrRtpLog = (Map<String, Object>) JsonMapper.fromJsonString(firstMsg, Map.class);
Map<String, Object> secondSipOrRtpLog = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
//1c2s2s2c3double,1表示firstMsg为请求侧(c2s),合并时以它为准
if (firstSipOrRtpLog.getInteger(JsonProConfig.STREAM_DIR) == 1) {
if (JsonParseUtil.getInteger(firstSipOrRtpLog, JsonProConfig.STREAM_DIR) == 1) {
strArr[0] = message;
strArr[1] = firstMsg;
} else {
strArr[0] = firstMsg;
strArr[1] = message;
}
jsonCombinObject.putAll(JSONObject.parseObject(strArr[0]));
jsonCombinObject.putAll(JSONObject.parseObject(strArr[1]));
String sipTwoMsg = jsonCombinObject.toString();
jsonCommonMap.putAll((Map<String, Object>) JsonMapper.fromJsonString(strArr[0], Map.class));
jsonCommonMap.putAll((Map<String, Object>) JsonMapper.fromJsonString(strArr[1], Map.class));
String sipTwoMsg = jsonCommonMap.toString();
JSONObject sipOrRtpCombin = JSONObject.parseObject(sipTwoMsg);
accumulateMsg(firstSipOrRtpLog, secendSipOrRtpLog, sipOrRtpCombin);
Map<String, Object> sipOrRtpCombin = (Map<String, Object>) JsonMapper.fromJsonString(sipTwoMsg, Map.class);
accumulateMsg(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin);
sipOrRtpCombin.put(JsonProConfig.STREAM_DIR, JsonProConfig.DOUBLE);
if (JsonProConfig.SIP_MARK.equals(protocolType)) {
//手动关联SIP后区分内外网IP再下发
separateInnerIp(sipOrRtpCombin, out);
} else if (JsonProConfig.RTP_MARK.equals(protocolType)) {
//手动关联RTP后按四元组下发
sipOrRtpCombin.put(JsonProConfig.RTP_PCAP_PATH, setRtpPath(firstSipOrRtpLog, secendSipOrRtpLog));
out.collect(new Tuple3<>(hmStrKey, "rtp-two", JSONObject.toJSONString(sipOrRtpCombin)));
sipOrRtpCombin.put(JsonProConfig.RTP_PCAP_PATH, setRtpPacpPath(firstSipOrRtpLog, secondSipOrRtpLog));
out.collect(new Tuple3<>(hmStrKey, "rtp-two", JsonMapper.toJsonString(sipOrRtpCombin)));
}
} else {
hashMapStr.put(hmStrKey, message);
@@ -154,16 +160,15 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
/**
* 区分SIP的内外网IP,此时已经关联完成包含四元组,但未区分内外网IP
*/
private static void separateInnerIp(JSONObject object, Collector<Tuple3<String, String, String>> out) {
private static void separateInnerIp(Map<String, Object> object, Collector<Tuple3<String, String, String>> out) {
String sipOriginatorIp = object.getString(JsonProConfig.SIP_ORIGINATOR_IP);
String sipResponderIp = object.getString(JsonProConfig.SIP_RESPONDER_IP);
int sipOriginatorPort = object.getInteger(JsonProConfig.SIP_ORIGINATOR_PORT);
int sipResponderPort = object.getInteger(JsonProConfig.SIP_RESPONDER_PORT);
String sipOriginatorIp = JsonParseUtil.getString(object, JsonProConfig.SIP_ORIGINATOR_IP);
int sipOriginatorPort = JsonParseUtil.getInteger(object, JsonProConfig.SIP_ORIGINATOR_PORT);
String sipResponderIp = JsonParseUtil.getString(object, JsonProConfig.SIP_RESPONDER_IP);
int sipResponderPort = JsonParseUtil.getInteger(object, JsonProConfig.SIP_RESPONDER_PORT);
if (IPUtils.isInnerIp(sipOriginatorIp)
|| IPUtils.isInnerIp(sipResponderIp)) {
/**
if (relationUtils.isInnerIp(sipOriginatorIp) || relationUtils.isInnerIp(sipResponderIp)) {
/*
* 按from-ip_from-port_to-ip_to-port
*/
String sipInnerEmitKey = sipOriginatorIp + VoipRelationConfig.CORRELATION_STR
@@ -171,7 +176,7 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
+ sipResponderIp + VoipRelationConfig.CORRELATION_STR
+ sipResponderPort;
//包含内网IP的SIP关联后数据
out.collect(new Tuple3<>(sipInnerEmitKey, "sip-in", JSONObject.toJSONString(object)));
out.collect(new Tuple3<>(sipInnerEmitKey, "sip-in", JsonMapper.toJsonString(object)));
} else {
String sipIpPort4Key = getFourKey(sipOriginatorIp,
sipOriginatorPort,
@@ -179,7 +184,7 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
sipResponderPort);
//按照四元组的Key发送到下一个bolt
out.collect(new Tuple3<>(sipIpPort4Key, "sip-two", JSONObject.toJSONString(object)));
out.collect(new Tuple3<>(sipIpPort4Key, "sip-two", JsonMapper.toJsonString(object)));
}
}
@@ -216,7 +221,7 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
break;
//common_client_port = commonServerPort,开始按照IP比较
case 0:
ipPort4Key = compareIp(commonClientIp, commonServerIp, commonClientPort, commonServerPort);
ipPort4Key = compareQuadruple(commonClientIp, commonServerIp, commonClientPort, commonServerPort);
break;
//port端口值异常
case -2:
@@ -231,93 +236,83 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
}
/**
* 比较IP,并作key的拼接
* 比较四元组,拼接后作为key使用
*
* @param commonClientIp
* @param commonServerIp
* @param commonClientPort
* @param commonServerPort
* @return
* @param clientIp 客户端IP
* @param serverIp 服务端IP
* @param clientPort 客户端端口
* @param serverPort 服务端端口
* @return 比较后拼接的四元组key 或 异常后返回空值
*/
private static String compareIp(String commonClientIp, String commonServerIp, int commonClientPort, int commonServerPort) {
long clientIpNum = IPUtils.ipToLong(commonClientIp);
long serverIpNum = IPUtils.ipToLong(commonServerIp);
private static String compareQuadruple(String clientIp, String serverIp, int clientPort, int serverPort) {
long clientIpNum = IPUtil.getIpHostDesimal(clientIp);
long serverIpNum = IPUtil.getIpHostDesimal(serverIp);
int compareIpResult = compareNum(clientIpNum, serverIpNum);
switch (compareIpResult) {
//clientIpNum > serverIpNum
case 1:
return commonServerIp + VoipRelationConfig.CORRELATION_STR
+ commonServerPort + VoipRelationConfig.CORRELATION_STR
+ commonClientIp + VoipRelationConfig.CORRELATION_STR
+ commonClientPort;
return serverIp + VoipRelationConfig.CORRELATION_STR
+ serverPort + VoipRelationConfig.CORRELATION_STR
+ clientIp + VoipRelationConfig.CORRELATION_STR
+ clientPort;
//clientIpNum < serverIpNum
case -1:
return commonClientIp + VoipRelationConfig.CORRELATION_STR
+ commonClientPort + VoipRelationConfig.CORRELATION_STR
+ commonServerIp + VoipRelationConfig.CORRELATION_STR
+ commonServerPort;
return clientIp + VoipRelationConfig.CORRELATION_STR
+ clientPort + VoipRelationConfig.CORRELATION_STR
+ serverIp + VoipRelationConfig.CORRELATION_STR
+ serverPort;
//clientIpNum = serverIpNum,说明两个IP值一样即IP异常
case 0:
//IP值异常
case -2:
default:
logger.error("compareNum is error," +
"common_client_ip:" + commonClientIp + "," +
"commonServerIp:" + commonServerIp + "," +
"commonClientPort:" + commonClientPort + "," +
"commonServerPort:" + commonServerPort);
"common_client_ip:" + clientIp + "," +
"commonServerIp:" + serverIp + "," +
"commonClientPort:" + clientPort + "," +
"commonServerPort:" + serverPort);
return "";
}
}
/**
* 计算相关字节信息,主要是累加
* 对SIP单向流日志的指标数据进行聚合。
*
* @param firstSipOrRtpLog
* @param secendSipOrRtpLog
* @param sipOrRtpCombin
* @param firstSipOrRtpLog 第一条单向流日志
* @param secondSipOrRtpLog 第二条单向流日志
* @param sipOrRtpCombin SIP双向流日志集合
*/
private static void accumulateMsg(JSONObject firstSipOrRtpLog, JSONObject secendSipOrRtpLog, JSONObject sipOrRtpCombin) {
private static void accumulateMsg(Map<String, Object> firstSipOrRtpLog, Map<String, Object> secondSipOrRtpLog, Map<String, Object> sipOrRtpCombin) {
//common_sessions
sipOrRtpCombin.put(JsonProConfig.SESSIONS, (firstSipOrRtpLog.getLongValue(JsonProConfig.SESSIONS) + secendSipOrRtpLog.getLongValue(JsonProConfig.SESSIONS)));
relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.SESSIONS);
//common_c2s_pkt_num
sipOrRtpCombin.put(JsonProConfig.C2S_PKT_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.C2S_PKT_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.C2S_PKT_NUM)));
relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_PKT_NUM);
//common_s2c_pkt_num
sipOrRtpCombin.put(JsonProConfig.S2C_PKT_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.S2C_PKT_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.S2C_PKT_NUM)));
relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_PKT_NUM);
//common_c2s_byte_num
sipOrRtpCombin.put(JsonProConfig.C2S_BYTE_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.C2S_BYTE_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.C2S_BYTE_NUM)));
relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_BYTE_NUM);
//common_s2c_byte_num
sipOrRtpCombin.put(JsonProConfig.S2C_BYTE_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.S2C_BYTE_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.S2C_BYTE_NUM)));
relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_BYTE_NUM);
//common_c2s_ipfrag_num
sipOrRtpCombin.put(JsonProConfig.C2S_IPFRAG_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.C2S_IPFRAG_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.C2S_IPFRAG_NUM)));
relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_IPFRAG_NUM);
//common_s2c_ipfrag_num
sipOrRtpCombin.put(JsonProConfig.S2C_IPFRAG_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.S2C_IPFRAG_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.S2C_IPFRAG_NUM)));
relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_IPFRAG_NUM);
//common_c2s_tcp_lostlen
sipOrRtpCombin.put(JsonProConfig.C2S_TCP_LOSTLEN, (firstSipOrRtpLog.getLongValue(JsonProConfig.C2S_TCP_LOSTLEN) + secendSipOrRtpLog.getLongValue(JsonProConfig.C2S_TCP_LOSTLEN)));
relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_TCP_LOSTLEN);
//common_s2c_tcp_lostlen
sipOrRtpCombin.put(JsonProConfig.S2C_TCP_LOSTLEN, (firstSipOrRtpLog.getLongValue(JsonProConfig.S2C_TCP_LOSTLEN) + secendSipOrRtpLog.getLongValue(JsonProConfig.S2C_TCP_LOSTLEN)));
relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_TCP_LOSTLEN);
//common_c2s_tcp_unorder_num
sipOrRtpCombin.put(JsonProConfig.C2S_TCP_UNORDER_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.C2S_TCP_UNORDER_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.C2S_TCP_UNORDER_NUM)));
relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_TCP_UNORDER_NUM);
//common_s2c_tcp_unorder_num
sipOrRtpCombin.put(JsonProConfig.S2C_TCP_UNORDER_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.S2C_TCP_UNORDER_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.S2C_TCP_UNORDER_NUM)));
relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_TCP_UNORDER_NUM);
}
/**
* int类型
* 比较数字大小,左边>右边-返回:1,左边<右边-返回:-1,左边=右边-返回:0
*
* @param numOne
* @param numTwo
* @param numOne 数值1
* @param numTwo 数值2
*/
private static int compareNum(int numOne, int numTwo) {
if (numOne > 0 && numTwo > 0) {
@@ -331,8 +326,8 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
* long类型
* 比较数字大小,左边>右边-返回:1,左边<右边-返回:-1,左边=右边-返回:0
*
* @param numOne
* @param numTwo
* @param numOne 数值1
* @param numTwo 数值2
*/
private static int compareNum(long numOne, long numTwo) {
if (numOne > 0 && numTwo > 0) {
@@ -349,28 +344,21 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
* @param secendSipOrRtpLog 第二个单向流日志
* @return 文件路径
*/
private static String setRtpPath(Map<String, Object> firstSipOrRtpLog, Map<String, Object> secendSipOrRtpLog) {
private static String setRtpPacpPath(Map<String, Object> firstSipOrRtpLog, Map<String, Object> secendSipOrRtpLog) {
String firstRtpPcapPath = JsonParseUtil.getString(firstSipOrRtpLog, JsonProConfig.RTP_PCAP_PATH);
String secendRtpPcapPath = JsonParseUtil.getString(secendSipOrRtpLog, JsonProConfig.RTP_PCAP_PATH);
String firstPcapPath = JsonParseUtil.getString(firstSipOrRtpLog, JsonProConfig.RTP_PCAP_PATH);
String secondPcapPath = JsonParseUtil.getString(secendSipOrRtpLog, JsonProConfig.RTP_PCAP_PATH);
if (StringUtil.isNotBlank(firstRtpPcapPath) && StringUtil.isNotBlank(secendRtpPcapPath)) {
if (firstRtpPcapPath.equals(secendRtpPcapPath)) {
return firstRtpPcapPath;
if (StringUtil.isNotBlank(firstPcapPath) && StringUtil.isNotBlank(secondPcapPath)) {
if (firstPcapPath.equals(secondPcapPath)) {
return firstPcapPath;
} else {
return firstRtpPcapPath + ";" + secendRtpPcapPath;
return firstPcapPath + ";" + secondPcapPath;
}
} else if (StringUtil.isNotBlank(firstRtpPcapPath)) {
return firstRtpPcapPath;
} else if (StringUtil.isNotBlank(firstPcapPath)) {
return firstPcapPath;
} else {
return secendRtpPcapPath;
return secondPcapPath;
}
}
private static boolean checkSipCompleteness(JSONObject object) {
return object.containsKey(JsonProConfig.SIP_ORIGINATOR_IP) &&
object.containsKey(JsonProConfig.SIP_ORIGINATOR_PORT) &&
object.containsKey(JsonProConfig.SIP_RESPONDER_IP) &&
object.containsKey(JsonProConfig.SIP_RESPONDER_PORT);
}
}

View File

@@ -2,9 +2,9 @@ package com.zdjizhi.utils.functions;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.common.JsonProConfig;
import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.json.JsonParseUtil;
import org.apache.flink.api.java.tuple.Tuple3;
@@ -12,10 +12,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFuncti
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.*;
/**
* @author qidaijie
@@ -26,16 +23,6 @@ import java.util.List;
public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple3<String, String, String>, String, TimeWindow> {
private static final Log logger = LogFactory.get();
/**
* 实体类反射map
*/
private static HashMap<String, Class> classMap = JsonParseUtil.getMapFromHttp(VoipRelationConfig.SCHEMA_HTTP);
/**
* 反射成一个类
*/
private static Object voipObject = JsonParseUtil.generateObject(classMap);
/**
* 关联用HashMap
* key---四元组
@@ -96,6 +83,7 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple
*
* @param combineHmList
*/
@SuppressWarnings("unchecked")
private void tickCombineHmList(HashMap<String, LinkedList<String>> combineHmList, Collector<String> output) {
if (combineHmList.size() > 0) {
long nowTime = System.currentTimeMillis() / 1000;
@@ -109,12 +97,11 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple
//包含SIP和RTP
int listSize = tempList.size();
if (listSize > 1) {
List<String> sipBeanArr = new ArrayList<>();
List<String> rtpBeanArr = new ArrayList<>();
for (String message : tempList) {
Object tempSipOrRtpLog = JSONObject.parseObject(message, voipObject.getClass());
Map<String, Object> tempSipOrRtpLog = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
String schemaType = JsonParseUtil.getString(tempSipOrRtpLog, JsonProConfig.SCHEMA_TYPE);
if (JsonProConfig.SIP_MARK.equals(schemaType)) {
sipBeanArr.add(message);
@@ -127,25 +114,23 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple
if (rtpSize == 1 && sipSize >= 1) {
for (String sipMessage : sipBeanArr) {
Object rtpLog = JSONObject.parseObject(rtpBeanArr.get(0), voipObject.getClass());
Object voipLog = JSONObject.parseObject(sipMessage, voipObject.getClass());
accumulateVoipMsg(voipLog, rtpLog);
JsonParseUtil.setValue(voipLog, JsonProConfig.SCHEMA_TYPE, "VoIP");
JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voipLog));
Map<String, Object> rtpLog = (Map<String, Object>) JsonMapper.fromJsonString(rtpBeanArr.get(0), Map.class);
Map<String, Object> voIpLog = (Map<String, Object>) JsonMapper.fromJsonString(sipMessage, Map.class);
accumulateVoipMsg(voIpLog, rtpLog);
JsonParseUtil.setValue(voIpLog, JsonProConfig.SCHEMA_TYPE, "VoIP");
JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voIpLog));
//四元组,voip,关联后的数据
output.collect(mergeJson(voipLog, rtpLog));
// basicOutputCollector.emit(new Values(fourStrKey, "voip", mergeJson(voipLog, rtpLog)));
output.collect(mergeJson(voIpLog, rtpLog));
}
} else if (sipSize == 1 && rtpSize >= 1) {
for (String rtpMessage : rtpBeanArr) {
Object rtpLog = JSONObject.parseObject(rtpMessage, voipObject.getClass());
Object voipLog = JSONObject.parseObject(sipBeanArr.get(0), voipObject.getClass());
accumulateVoipMsg(voipLog, rtpLog);
JsonParseUtil.setValue(voipLog, JsonProConfig.SCHEMA_TYPE, "VoIP");
JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voipLog));
Map<String, Object> rtpLog = (Map<String, Object>) JsonMapper.fromJsonString(rtpMessage, Map.class);
Map<String, Object> voIpLog = (Map<String, Object>) JsonMapper.fromJsonString(sipBeanArr.get(0), Map.class);
accumulateVoipMsg(voIpLog, rtpLog);
JsonParseUtil.setValue(voIpLog, JsonProConfig.SCHEMA_TYPE, "VoIP");
JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voIpLog));
//四元组,voip,关联后的数据
output.collect(mergeJson(voipLog, rtpLog));
// basicOutputCollector.emit(new Values(fourStrKey, "voip", mergeJson(voipLog, rtpLog)));
output.collect(mergeJson(voIpLog, rtpLog));
}
} else {
logger.warn("RTP-listSize is [" + rtpBeanArr.size() + "];SIP-listSize is [" + sipBeanArr.size() + "] in this tempCombineSRHmList! Not logical");
@@ -155,15 +140,14 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple
} else {
String msg = tempList.get(0);
Object voipLog = JSONObject.parseObject(msg, voipObject.getClass());
long commonEndTime = JsonParseUtil.getLong(voipLog, JsonProConfig.END_TIME);
Map<String, Object> voIpLog = (Map<String, Object>) JsonMapper.fromJsonString(msg, Map.class);
long commonEndTime = JsonParseUtil.getLong(voIpLog, JsonProConfig.END_TIME);
long intervalTime = nowTime - commonEndTime;
logger.error("VoIP日志时间差值记录" + intervalTime);
if (intervalTime <= VoipRelationConfig.SEC_COMBINE_SR_CACHE_SECS) {
putKeyAndMsg(msg, fourStrKey, secCombineSRHmList);
} else {
sendDirectlyOneElement(msg, voipLog, output);
sendDirectlyOneElement(msg, voIpLog, output);
}
}
}
@@ -173,83 +157,44 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple
/**
* 累加关联后的字节类参数值
*
* @param voipLog
* @param rtpLog
* @param voIpLog 融合后voip日志
* @param rtpLog RTP日志
*/
private void accumulateVoipMsg(Object voipLog, Object rtpLog) {
Long sumCommonSessions = JsonParseUtil.getLong(voipLog, JsonProConfig.SESSIONS)
+ JsonParseUtil.getLong(rtpLog, JsonProConfig.SESSIONS);
Long sumCommonC2sPktNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_PKT_NUM)
+ JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_PKT_NUM);
Long sumCommonS2cPktNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_PKT_NUM)
+ JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_PKT_NUM);
Long sumCommonC2sByteNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_BYTE_NUM)
+ JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_BYTE_NUM);
Long sumCommonS2cByteNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_BYTE_NUM)
+ JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_BYTE_NUM);
Long sumCommonC2sIpfragNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_IPFRAG_NUM)
+ JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_IPFRAG_NUM);
Long sumCommonS2cIpfragNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_IPFRAG_NUM)
+ JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_IPFRAG_NUM);
Long sumCommonC2sTcpLostlen = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_TCP_LOSTLEN)
+ JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_TCP_LOSTLEN);
Long sumCommonS2cTcpLostlen = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_TCP_LOSTLEN)
+ JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_TCP_LOSTLEN);
Long sumCommonC2sTcpUnorderNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_TCP_UNORDER_NUM)
+ JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_TCP_UNORDER_NUM);
Long sumCommonS2cTcpUnorderNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_TCP_UNORDER_NUM)
+ JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_TCP_UNORDER_NUM);
private void accumulateVoipMsg(Map<String, Object> voIpLog, Map<String, Object> rtpLog) {
//common_sessions
JsonParseUtil.setValue(voipLog, JsonProConfig.SESSIONS, sumCommonSessions);
relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.SESSIONS);
//common_c2s_pkt_num
JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_PKT_NUM, sumCommonC2sPktNum);
relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_PKT_NUM);
//common_s2c_pkt_num
JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_PKT_NUM, sumCommonS2cPktNum);
relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_PKT_NUM);
//common_c2s_byte_num
JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_BYTE_NUM, sumCommonC2sByteNum);
relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_BYTE_NUM);
//common_s2c_byte_num
JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_BYTE_NUM, sumCommonS2cByteNum);
relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_BYTE_NUM);
//common_c2s_ipfrag_num
JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_IPFRAG_NUM, sumCommonC2sIpfragNum);
relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_IPFRAG_NUM);
//common_s2c_ipfrag_num
JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_IPFRAG_NUM, sumCommonS2cIpfragNum);
relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_IPFRAG_NUM);
//common_c2s_tcp_lostlen
JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_TCP_LOSTLEN, sumCommonC2sTcpLostlen);
relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_TCP_LOSTLEN);
//common_s2c_tcp_lostlen
JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_TCP_LOSTLEN, sumCommonS2cTcpLostlen);
relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_TCP_LOSTLEN);
//common_c2s_tcp_unorder_num
JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_TCP_UNORDER_NUM, sumCommonC2sTcpUnorderNum);
relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_TCP_UNORDER_NUM);
//common_s2c_tcp_unorder_num
JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_TCP_UNORDER_NUM, sumCommonS2cTcpUnorderNum);
relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_TCP_UNORDER_NUM);
}
/**
* 定时处理中List元素数仅为1的情况
*/
private void sendDirectlyOneElement(String msg, Object voipLog, Collector<String> output) {
private void sendDirectlyOneElement(String msg, Map<String, Object> voIpLog, Collector<String> output) {
//四元组,sip(一定为双侧)/rtp(可能为单侧也可能为双侧,看单向流字段信息),拿出来的原始数据
String commonSchemaType = JsonParseUtil.getString(voipLog, JsonProConfig.SCHEMA_TYPE);
String commonSchemaType = JsonParseUtil.getString(voIpLog, JsonProConfig.SCHEMA_TYPE);
if (JsonProConfig.SIP_MARK.equals(commonSchemaType)) {
output.collect(msg);
} else if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) {
int commonStreamDir = JsonParseUtil.getInteger(voipLog, JsonProConfig.STREAM_DIR);
int commonStreamDir = JsonParseUtil.getInteger(voIpLog, JsonProConfig.STREAM_DIR);
if (commonStreamDir != JsonProConfig.DOUBLE) {
output.collect(msg);
} else {
@@ -278,14 +223,14 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple
* 判断RTP主叫方向-测试
*
* @param rtpLog RTP原始日志
* @param voipLog 融合后VOIP日志
* @param voIpLog 融合后VOIP日志
* @return 方向 0未知 1c2s 2s2c
*/
private static int judgeDirection(Object rtpLog, Object voipLog) {
private static int judgeDirection(Map<String, Object> rtpLog, Map<String, Object> voIpLog) {
String ip = JsonParseUtil.getString(rtpLog, JsonProConfig.CLIENT_IP);
String sipOriginatorIp = JsonParseUtil.getString(voipLog, JsonProConfig.SIP_ORIGINATOR_IP);
String sipResponderIp = JsonParseUtil.getString(voipLog, JsonProConfig.SIP_RESPONDER_IP);
String sipOriginatorIp = JsonParseUtil.getString(voIpLog, JsonProConfig.SIP_ORIGINATOR_IP);
String sipResponderIp = JsonParseUtil.getString(voIpLog, JsonProConfig.SIP_RESPONDER_IP);
if (StringUtil.isNotBlank(ip) && ip.equals(sipOriginatorIp)) {
return 1;
@@ -308,19 +253,19 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple
}
/**
*
* 发送VOIP日志到Kafka
*/
private static String mergeJson(Object voipLog, Object rtpLog) {
private static String mergeJson(Map<String, Object> voIpLog, Map<String, Object> rtpLog) {
int rtpPayloadTypeC2s = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S);
int rtpPayloadTypeS2c = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C);
String rtpPcapPath = JsonParseUtil.getString(rtpLog, JsonProConfig.RTP_PCAP_PATH);
JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S, rtpPayloadTypeC2s);
JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C, rtpPayloadTypeS2c);
JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PCAP_PATH, rtpPcapPath);
JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S, rtpPayloadTypeC2s);
JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C, rtpPayloadTypeS2c);
JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_PCAP_PATH, rtpPcapPath);
return JSONObject.toJSONString(voipLog);
return JsonMapper.toJsonString(voIpLog);
}
}

View File

@@ -0,0 +1,81 @@
package com.zdjizhi.utils.functions;
import com.zdjizhi.common.JsonProConfig;
import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.utils.IPUtil;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.json.JsonParseUtil;
import java.util.Map;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.functions
* @Description:
* @date 2022/4/1911:30
*/
class relationUtils {
/**
* 将A日志内的某指标数据与B日志进行累加计算,并写入A日志中
*
* @param firstLog A日志
* @param secondLog B日志
* @param key 指标 json key
*/
static void metricSum(Map<String, Object> firstLog, Map<String, Object> secondLog, String key) {
Long firstMetric = JsonParseUtil.getLong(firstLog, key);
Long secondMetric = JsonParseUtil.getLong(secondLog, key);
Long sum = firstMetric + secondMetric;
JsonParseUtil.setValue(firstLog, key, sum);
}
/**
* 将A日志内的某指标数据与B日志进行累加计算,并写入C日志中
*
* @param firstLog A日志
* @param secondLog B日志
* @param otherLog C日志
* @param key 指标 json key
*/
static void metricSumSetOtherLog(Map<String, Object> firstLog, Map<String, Object> secondLog, Map<String, Object> otherLog, String key) {
Long firstMetric = JsonParseUtil.getLong(firstLog, key);
Long secondMetric = JsonParseUtil.getLong(secondLog, key);
Long sum = firstMetric + secondMetric;
JsonParseUtil.setValue(otherLog, key, sum);
}
/**
* 校验sip日志必须包含协商四元组否则将原样输出不处理
*
* @param sipLog SIP日志
* @return true or false
*/
static boolean checkSipCompleteness(Map<String, Object> sipLog) {
return sipLog.containsKey(JsonProConfig.SIP_ORIGINATOR_IP) &&
sipLog.containsKey(JsonProConfig.SIP_ORIGINATOR_PORT) &&
sipLog.containsKey(JsonProConfig.SIP_RESPONDER_IP) &&
sipLog.containsKey(JsonProConfig.SIP_RESPONDER_PORT);
}
/**
* 是否为内网IP
*
* @param ip ip Address
* @return true or false
*/
static boolean isInnerIp(String ip) {
if (VoipRelationConfig.CHECK_INNER_NETWORK == 1) {
//为空或者为特定IP时也算作内网IP
return StringUtil.isBlank(ip) || IPUtil.internalIp(ip);
}
return false;
}
}

View File

@@ -1,101 +0,0 @@
package com.zdjizhi.utils.ip;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.utils.IPUtil;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.exception.VoipRelationException;
/**
* IP转换工具类
*
* @author Colbert
* @date 2021/03/16
*/
public class IPUtils {
private static final Log logger = LogFactory.get();
private static final long A_BEGIN = ipToLong("10.0.0.0");
private static final long A_END = ipToLong("10.255.255.255");
private static final long B_BEGIN = ipToLong("172.16.0.0");
private static final long B_END = ipToLong("172.31.255.255");
private static final long C_BEGIN = ipToLong("192.168.0.0");
private static final long C_END = ipToLong("192.168.255.255");
/**
* 将127.0.0.1形式的IP地址转换成十进制整数
*
* @param strIp
* @return
*/
public static long ipToLong(String strIp) {
try {
if (StringUtil.isBlank(strIp)) {
logger.error("IPUtils.ipToLong input IP is null!!!");
return 0L;
}
long[] ip = new long[4];
int position1 = strIp.indexOf(".");
int position2 = strIp.indexOf(".", position1 + 1);
int position3 = strIp.indexOf(".", position2 + 1);
ip[0] = Long.parseLong(strIp.substring(0, position1));
ip[1] = Long.parseLong(strIp.substring(position1 + 1, position2));
ip[2] = Long.parseLong(strIp.substring(position2 + 1, position3));
ip[3] = Long.parseLong(strIp.substring(position3 + 1));
return (ip[0] << 24) + (ip[1] << 16) + (ip[2] << 8) + ip[3];
} catch (VoipRelationException e) {
logger.error("IPUtils.ipToLong input IP is:" + strIp + ",convert IP to Long is error:" + e.getMessage());
return 0L;
}
}
/**
* 将十进制整数形式转换成127.0.0.1形式的ip地址
*
* @param longIp
* @return
*/
public static String longToIp(long longIp) {
StringBuffer sb = new StringBuffer("");
sb.append(String.valueOf((longIp >>> 24)));
sb.append(".");
sb.append(String.valueOf((longIp & 0x00FFFFFF) >>> 16));
sb.append(".");
sb.append(String.valueOf((longIp & 0x0000FFFF) >>> 8));
sb.append(".");
sb.append(String.valueOf((longIp & 0x000000FF)));
return sb.toString();
}
/**
* 是否为内网IP
*
* @param ipAddress
* @return
*/
public static boolean isInnerIp(String ipAddress) {
if (VoipRelationConfig.CHECK_INNER_NETWORK == 1) {
if (StringUtil.isBlank(ipAddress) || IPUtil.internalIp(ipAddress)) {
//为空或者为特定IP时也算作内网IP
return true;
}
long ipNum = ipToLong(ipAddress);
return isInner(ipNum, A_BEGIN, A_END) ||
isInner(ipNum, B_BEGIN, B_END) ||
isInner(ipNum, C_BEGIN, C_END);
} else {
return false;
}
}
private static boolean isInner(long userIp, long begin, long end) {
return (userIp >= begin) && (userIp <= end);
}
}

View File

@@ -6,15 +6,21 @@ import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.jayway.jsonpath.DocumentContext;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.http.HttpClientUtil;
import com.zdjizhi.utils.StringUtil;
import net.sf.cglib.beans.BeanGenerator;
import net.sf.cglib.beans.BeanMap;
import java.util.*;
import java.util.concurrent.Executor;
/**
* 使用FastJson解析json的工具类
@@ -24,6 +30,50 @@ import java.util.*;
public class JsonParseUtil {
private static final Log logger = LogFactory.get();
private static Properties propNacos = new Properties();
/**
* 获取需要删除字段的列表
*/
private static ArrayList<String> dropList = new ArrayList<>();
/**
* 在内存中加载反射类用的map
*/
private static HashMap<String, Class> jsonFieldsMap;
static {
propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, VoipRelationConfig.NACOS_SERVER);
propNacos.setProperty(PropertyKeyConst.NAMESPACE, VoipRelationConfig.NACOS_SCHEMA_NAMESPACE);
propNacos.setProperty(PropertyKeyConst.USERNAME, VoipRelationConfig.NACOS_USERNAME);
propNacos.setProperty(PropertyKeyConst.PASSWORD, VoipRelationConfig.NACOS_PIN);
try {
ConfigService configService = NacosFactory.createConfigService(propNacos);
String dataId = VoipRelationConfig.NACOS_DATA_ID;
String group = VoipRelationConfig.NACOS_GROUP;
String schema = configService.getConfig(dataId, group, 5000);
if (StringUtil.isNotBlank(schema)) {
jsonFieldsMap = getFieldsFromSchema(schema);
}
configService.addListener(dataId, group, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configMsg) {
if (StringUtil.isNotBlank(configMsg)) {
jsonFieldsMap = getFieldsFromSchema(configMsg);
}
}
});
} catch (NacosException e) {
logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage());
}
}
/**
* 模式匹配,给定一个类型字符串返回一个类类型
@@ -72,6 +122,44 @@ public class JsonParseUtil {
return clazz;
}
/**
* 类型转换
*
* @param jsonMap 原始日志map
*/
public static Map<String, Object> typeTransform(Map<String, Object> jsonMap) throws RuntimeException {
JsonParseUtil.dropJsonField(jsonMap);
HashMap<String, Object> tmpMap = new HashMap<>(192);
for (String key : jsonMap.keySet()) {
if (jsonFieldsMap.containsKey(key)) {
String simpleName = jsonFieldsMap.get(key).getSimpleName();
switch (simpleName) {
case "String":
tmpMap.put(key, JsonTypeUtil.checkString(jsonMap.get(key)));
break;
case "Integer":
tmpMap.put(key, JsonTypeUtil.getIntValue(jsonMap.get(key)));
break;
case "long":
tmpMap.put(key, JsonTypeUtil.checkLongValue(jsonMap.get(key)));
break;
case "List":
tmpMap.put(key, JsonTypeUtil.checkArray(jsonMap.get(key)));
break;
case "Map":
tmpMap.put(key, JsonTypeUtil.checkObject(jsonMap.get(key)));
break;
case "double":
tmpMap.put(key, JsonTypeUtil.checkDouble(jsonMap.get(key)));
break;
default:
tmpMap.put(key, JsonTypeUtil.checkString(jsonMap.get(key)));
}
}
}
return tmpMap;
}
/**
* 获取属性值的方法
@@ -155,7 +243,7 @@ public class JsonParseUtil {
if (intVal == null) {
return 0;
}
return intVal.intValue();
return intVal;
}
/**
@@ -170,7 +258,7 @@ public class JsonParseUtil {
if (intVal == null) {
return 0;
}
return intVal.intValue();
return intVal;
}
public static String getString(Map<String, Object> jsonMap, String property) {
@@ -263,18 +351,16 @@ public class JsonParseUtil {
/**
* 通过获取String类型的网关schema链接来获取map用于生成一个Object类型的对象
* <p>
* // * @param http 网关schema地址
*
* @param http 网关schema地址
* @return 用于反射生成schema类型的对象的一个map集合
*/
public static HashMap<String, Class> getMapFromHttp(String http) {
HashMap<String, Class> map = new HashMap<>();
String schema = HttpClientUtil.requestByGetMethod(http);
Object data = JSON.parseObject(schema).get("data");
private static HashMap<String, Class> getFieldsFromSchema(String schema) {
HashMap<String, Class> map = new HashMap<>(16);
//获取fields并转化为数组数组的每个元素都是一个name doc type
JSONObject schemaJson = JSON.parseObject(data.toString());
JSONObject schemaJson = JSON.parseObject(schema);
JSONArray fields = (JSONArray) schemaJson.get("fields");
for (Object field : fields) {
@@ -282,13 +368,19 @@ public class JsonParseUtil {
if (checkKeepField(filedStr)) {
String name = JsonPath.read(filedStr, "$.name").toString();
String type = JsonPath.read(filedStr, "$.type").toString();
if (type.contains("{")) {
type = JsonPath.read(filedStr, "$.type.type").toString();
}
//组合用来生成实体类的map
map.put(name, getClassName(type));
} else {
dropList.add(filedStr);
}
}
return map;
}
/**
* 判断字段是否需要保留
*
@@ -310,4 +402,14 @@ public class JsonParseUtil {
return isKeepField;
}
/**
* 删除schema内指定的无效字段jackson
*
* @param jsonMap
*/
private static void dropJsonField(Map<String, Object> jsonMap) {
for (String field : dropList) {
jsonMap.remove(field);
}
}
}

View File

@@ -14,7 +14,7 @@ import java.util.Map;
* @Description:
* @date 2021/7/1217:34
*/
public class JsonTypeUtils {
class JsonTypeUtil {
private static final Log logger = LogFactory.get();
/**
* String 类型检验转换方法
@@ -22,7 +22,7 @@ public class JsonTypeUtils {
* @param value json value
* @return String value
*/
public static String checkString(Object value) {
static String checkString(Object value) {
if (value == null) {
return null;
}
@@ -44,7 +44,7 @@ public class JsonTypeUtils {
* @param value json value
* @return List value
*/
private static Map checkObject(Object value) {
static Map checkObject(Object value) {
if (value == null) {
return null;
}
@@ -62,7 +62,7 @@ public class JsonTypeUtils {
* @param value json value
* @return List value
*/
private static List checkArray(Object value) {
static List checkArray(Object value) {
if (value == null) {
return null;
}
@@ -88,7 +88,7 @@ public class JsonTypeUtils {
* @param value json value
* @return Long value
*/
public static long checkLongValue(Object value) {
static long checkLongValue(Object value) {
Long longVal = TypeUtils.castToLong(value);
@@ -105,7 +105,7 @@ public class JsonTypeUtils {
* @param value json value
* @return Double value
*/
private static Double checkDouble(Object value) {
static Double checkDouble(Object value) {
if (value == null) {
return null;
}
@@ -129,7 +129,7 @@ public class JsonTypeUtils {
* @param value json value
* @return int value
*/
private static int getIntValue(Object value) {
static int getIntValue(Object value) {
Integer intVal = TypeUtils.castToInt(value);

View File

@@ -40,7 +40,7 @@ public class KafkaProducer {
createProducerConfig(), Optional.empty());
//启用此选项将使生产者仅记录失败日志而不是捕获和重新抛出它们
kafkaProducer.setLogFailuresOnly(false);
kafkaProducer.setLogFailuresOnly(true);
//写入kafka的消息携带时间戳
// kafkaProducer.setWriteTimestampToKafka(true);

View File

@@ -0,0 +1,35 @@
package com.zdjizhi;
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
import org.junit.Test;
/**
* @author qidaijie
* @Package com.zdjizhi
* @Description:
* @date 2022/3/1610:55
*/
public class EncryptorTest {
@Test
public void passwordTest(){
StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
// 配置加密解密的密码/salt值
encryptor.setPassword("galaxy");
// 对"raw_password"进行加密S5kR+Y7CI8k7MaecZpde25yK8NKUnd6p
String pin = "galaxy2019";
String encPin = encryptor.encrypt(pin);
String user = "admin";
String encUser = encryptor.encrypt(user);
System.out.println(encPin);
System.out.println(encUser);
// 再进行解密raw_password
String rawPwd = encryptor.decrypt("6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ");
String rawUser = encryptor.decrypt("nsyGpHKGFA4KW0zro9MDdw==");
System.out.println("The username is: "+rawPwd);
System.out.println("The pin is: "+rawUser);
}
}

View File

@@ -0,0 +1,40 @@
package com.zdjizhi;
import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.utils.IpLookupV2;
import org.junit.Test;
import java.util.Calendar;
/**
* @author qidaijie
* @Package com.zdjizhi
* @Description:
* @date 2021/11/611:38
*/
public class FunctionTest {
private static IpLookupV2 ipLookup = new IpLookupV2.Builder(false)
.loadDataFileV4(VoipRelationConfig.TOOLS_LIBRARY + "ip_v4_built_in.mmdb")
.loadDataFileV6(VoipRelationConfig.TOOLS_LIBRARY + "ip_v6_built_in.mmdb")
.loadDataFilePrivateV4(VoipRelationConfig.TOOLS_LIBRARY + "ip_v4_user_defined.mmdb")
.loadDataFilePrivateV6(VoipRelationConfig.TOOLS_LIBRARY + "ip_v6_user_defined.mmdb")
.loadAsnDataFile(VoipRelationConfig.TOOLS_LIBRARY + "asn_v4.mmdb")
.loadAsnDataFileV6(VoipRelationConfig.TOOLS_LIBRARY + "asn_v6.mmdb")
.build();
@Test
public void ipLookupTest() {
String ip = "61.144.36.144";
System.out.println(ipLookup.cityLookupDetail(ip));
System.out.println(ipLookup.countryLookup(ip));
}
@Test
public void timestampTest(){
Calendar cal = Calendar.getInstance();
Long utcTime=cal.getTimeInMillis();
System.out.println(utcTime);
System.out.println(System.currentTimeMillis());
}
}

View File

@@ -0,0 +1,79 @@
package com.zdjizhi.json;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.utils.StringUtil;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executor;
/**
* @author qidaijie
* @Package com.zdjizhi.json
* @Description:
* @date 2022/3/2410:22
*/
public class JsonPathTest {
private static final Log logger = LogFactory.get();
private static Properties propNacos = new Properties();
/**
* 获取需要删除字段的列表
*/
private static ArrayList<String> dropList = new ArrayList<>();
/**
* 在内存中加载反射类用的map
*/
private static HashMap<String, Class> map;
/**
* 获取任务列表
* list的每个元素是一个四元字符串数组 (有format标识的字段补全的字段用到的功能函数用到的参数),例如:
* (mail_subject mail_subject decode_of_base64 mail_subject_charset)
*/
private static ArrayList<String[]> jobList;
private static String schema;
static {
propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, VoipRelationConfig.NACOS_SERVER);
propNacos.setProperty(PropertyKeyConst.NAMESPACE, VoipRelationConfig.NACOS_SCHEMA_NAMESPACE);
propNacos.setProperty(PropertyKeyConst.USERNAME, VoipRelationConfig.NACOS_USERNAME);
propNacos.setProperty(PropertyKeyConst.PASSWORD, VoipRelationConfig.NACOS_PIN);
try {
ConfigService configService = NacosFactory.createConfigService(propNacos);
String dataId = VoipRelationConfig.NACOS_DATA_ID;
String group = VoipRelationConfig.NACOS_GROUP;
String config = configService.getConfig(dataId, group, 5000);
if (StringUtil.isNotBlank(config)) {
schema = config;
}
} catch (NacosException e) {
logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage());
}
}
@Test
public void parseSchemaGetFields() {
DocumentContext parse = JsonPath.parse(schema);
List<Object> fields = parse.read("$.fields[*]");
for (Object field : fields) {
String name = JsonPath.read(field, "$.name").toString();
String type = JsonPath.read(field, "$.type").toString();
}
}
}

View File

@@ -0,0 +1,100 @@
package com.zdjizhi.nacos;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import org.junit.Test;
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.concurrent.Executor;
/**
* @author qidaijie
* @Package com.zdjizhi
* @Description:
* @date 2022/3/1016:58
*/
public class NacosTest {
/**
* <dependency>
* <groupId>com.alibaba.nacos</groupId>
* <artifactId>nacos-client</artifactId>
* <version>1.2.0</version>
* </dependency>
*/
private static Properties properties = new Properties();
/**
* config data id = config name
*/
private static final String DATA_ID = "test";
/**
* config group
*/
private static final String GROUP = "Galaxy";
private void getProperties() {
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848");
properties.setProperty(PropertyKeyConst.NAMESPACE, "flink");
properties.setProperty(PropertyKeyConst.USERNAME, "nacos");
properties.setProperty(PropertyKeyConst.PASSWORD, "nacos");
}
@Test
public void GetConfigurationTest() {
try {
getProperties();
ConfigService configService = NacosFactory.createConfigService(properties);
String content = configService.getConfig(DATA_ID, GROUP, 5000);
Properties nacosConfigMap = new Properties();
nacosConfigMap.load(new StringReader(content));
System.out.println(nacosConfigMap.getProperty("source.kafka.servers"));
} catch (NacosException | IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Test
public void ListenerConfigurationTest() {
getProperties();
try {
//first get config
ConfigService configService = NacosFactory.createConfigService(properties);
String config = configService.getConfig(DATA_ID, GROUP, 5000);
System.out.println(config);
//start listenner
configService.addListener(DATA_ID, GROUP, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configMsg) {
System.out.println(configMsg);
}
});
} catch (NacosException e) {
e.printStackTrace();
}
//keep running,change nacos config,print new config
while (true) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}