Compare commits
1 Commits
FastJson2-
...
SketchHLL
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b7a9229aec |
9
pom.xml
9
pom.xml
@@ -6,7 +6,7 @@
|
|||||||
|
|
||||||
<groupId>com.zdjizhi</groupId>
|
<groupId>com.zdjizhi</groupId>
|
||||||
<artifactId>log-olap-analysis-schema</artifactId>
|
<artifactId>log-olap-analysis-schema</artifactId>
|
||||||
<version>230414-FastJson2</version>
|
<version>230317-DataSketches</version>
|
||||||
|
|
||||||
<name>log-olap-analysis-schema</name>
|
<name>log-olap-analysis-schema</name>
|
||||||
<url>http://www.example.com</url>
|
<url>http://www.example.com</url>
|
||||||
@@ -40,7 +40,6 @@
|
|||||||
<hbase.version>2.2.3</hbase.version>
|
<hbase.version>2.2.3</hbase.version>
|
||||||
<nacos.version>1.2.0</nacos.version>
|
<nacos.version>1.2.0</nacos.version>
|
||||||
<zdjz.tools.version>1.0.8</zdjz.tools.version>
|
<zdjz.tools.version>1.0.8</zdjz.tools.version>
|
||||||
<fastjson.version>2.0.26</fastjson.version>
|
|
||||||
<scope.type>provided</scope.type>
|
<scope.type>provided</scope.type>
|
||||||
<!--<scope.type>compile</scope.type>-->
|
<!--<scope.type>compile</scope.type>-->
|
||||||
</properties>
|
</properties>
|
||||||
@@ -239,12 +238,6 @@
|
|||||||
<version>3.2.0</version>
|
<version>3.2.0</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
|
||||||
<groupId>com.alibaba</groupId>
|
|
||||||
<artifactId>fastjson</artifactId>
|
|
||||||
<version>${fastjson.version}</version>
|
|
||||||
</dependency>
|
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</project>
|
</project>
|
||||||
|
|
||||||
|
|||||||
@@ -3,7 +3,7 @@
|
|||||||
session.timeout.ms=60000
|
session.timeout.ms=60000
|
||||||
|
|
||||||
#kafka source poll
|
#kafka source poll
|
||||||
max.poll.records=5000
|
max.poll.records=3000
|
||||||
|
|
||||||
#kafka source poll bytes
|
#kafka source poll bytes
|
||||||
max.partition.fetch.bytes=31457280
|
max.partition.fetch.bytes=31457280
|
||||||
|
|||||||
@@ -15,42 +15,41 @@ tools.library=D:\\workerspace\\dat
|
|||||||
nacos.server=192.168.44.12:8848
|
nacos.server=192.168.44.12:8848
|
||||||
|
|
||||||
#nacos namespace
|
#nacos namespace
|
||||||
nacos.schema.namespace=livecharts
|
nacos.schema.namespace=prod
|
||||||
|
|
||||||
#nacos data id
|
#nacos data id
|
||||||
nacos.data.id=liveChart_session_test.json
|
nacos.data.id=liveChart_session.json
|
||||||
|
|
||||||
#--------------------------------Kafka消费组信息------------------------------#
|
#--------------------------------Kafka消费组信息------------------------------#
|
||||||
|
|
||||||
#kafka 接收数据topic
|
#kafka 接收数据topic
|
||||||
source.kafka.topic=test
|
source.kafka.topic=SESSION-RECORD
|
||||||
|
|
||||||
#补全数据 输出 topic
|
#补全数据 输出 topic
|
||||||
sink.kafka.topic=TRAFFIC-PROTOCOL-TEST
|
sink.kafka.topic=test-result
|
||||||
|
|
||||||
#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
|
#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
|
||||||
group.id=livecharts-test-20230327-3
|
group.id=livecharts-test-20220816-1
|
||||||
|
|
||||||
#--------------------------------topology配置------------------------------#
|
#--------------------------------topology配置------------------------------#
|
||||||
|
|
||||||
#consumer 并行度
|
#consumer 并行度
|
||||||
source.parallelism=3
|
source.parallelism=1
|
||||||
|
|
||||||
#map函数并行度
|
#map函数并行度
|
||||||
parse.parallelism=3
|
parse.parallelism=1
|
||||||
|
|
||||||
#第一次窗口计算并行度
|
#第一次窗口计算并行度
|
||||||
first.window.parallelism=3
|
first.window.parallelism=1
|
||||||
|
|
||||||
#第二次窗口计算并行度
|
#第二次窗口计算并行度
|
||||||
second.window.parallelism=3
|
second.window.parallelism=1
|
||||||
|
|
||||||
#producer 并行度
|
#producer 并行度
|
||||||
sink.parallelism=3
|
sink.parallelism=1
|
||||||
|
|
||||||
#初次随机预聚合窗口时间
|
#初次随机预聚合窗口时间
|
||||||
first.count.window.time=5
|
first.count.window.time=5
|
||||||
|
|
||||||
#二次聚合窗口时间
|
#二次聚合窗口时间
|
||||||
second.count.window.time=15
|
second.count.window.time=15
|
||||||
|
|
||||||
|
|||||||
@@ -2,7 +2,6 @@ package com.zdjizhi.topology;
|
|||||||
|
|
||||||
import cn.hutool.log.Log;
|
import cn.hutool.log.Log;
|
||||||
import cn.hutool.log.LogFactory;
|
import cn.hutool.log.LogFactory;
|
||||||
import com.alibaba.fastjson2.JSONObject;
|
|
||||||
import com.zdjizhi.common.StreamAggregateConfig;
|
import com.zdjizhi.common.StreamAggregateConfig;
|
||||||
import com.zdjizhi.utils.functions.keyby.FirstKeyByFunction;
|
import com.zdjizhi.utils.functions.keyby.FirstKeyByFunction;
|
||||||
import com.zdjizhi.utils.functions.keyby.SecondKeyByFunction;
|
import com.zdjizhi.utils.functions.keyby.SecondKeyByFunction;
|
||||||
@@ -46,25 +45,25 @@ public class StreamAggregateTopology {
|
|||||||
.setParallelism(StreamAggregateConfig.SOURCE_PARALLELISM).name(StreamAggregateConfig.SOURCE_KAFKA_TOPIC);
|
.setParallelism(StreamAggregateConfig.SOURCE_PARALLELISM).name(StreamAggregateConfig.SOURCE_KAFKA_TOPIC);
|
||||||
|
|
||||||
//解析原始日志初步聚合计算,增加自定义key 缓解数据倾斜
|
//解析原始日志初步聚合计算,增加自定义key 缓解数据倾斜
|
||||||
SingleOutputStreamOperator<Tuple3<String, String, JSONObject>> parseDataMap = streamSource.map(new ParseMapFunction())
|
SingleOutputStreamOperator<Tuple3<String, String, Map<String, Object>>> parseDataMap = streamSource.map(new ParseMapFunction())
|
||||||
.name("ParseDataMap")
|
.name("ParseDataMap")
|
||||||
.setParallelism(StreamAggregateConfig.PARSE_PARALLELISM);
|
.setParallelism(StreamAggregateConfig.PARSE_PARALLELISM);
|
||||||
|
|
||||||
//初步聚合计算,增加自定义key 缓解数据倾斜
|
//初步聚合计算,增加自定义key 缓解数据倾斜
|
||||||
WindowedStream<Tuple3<String, String, JSONObject>, String, TimeWindow> firstWindow = parseDataMap.keyBy(new FirstKeyByFunction())
|
WindowedStream<Tuple3<String, String, Map<String, Object>>, String, TimeWindow> firstWindow = parseDataMap.keyBy(new FirstKeyByFunction())
|
||||||
.window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.FIRST_COUNT_WINDOW_TIME)));
|
.window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.FIRST_COUNT_WINDOW_TIME)));
|
||||||
|
|
||||||
//初次聚合计算窗口
|
//初次聚合计算窗口
|
||||||
SingleOutputStreamOperator<Tuple2<String, JSONObject>> metricCountWindow = firstWindow.process(new FirstCountWindowFunction())
|
SingleOutputStreamOperator<Tuple2<String, Map<String, Object>>> metricCountWindow = firstWindow.process(new FirstCountWindowFunction())
|
||||||
.name("FirstCountWindow")
|
.name("FirstCountWindow")
|
||||||
.setParallelism(StreamAggregateConfig.FIRST_WINDOW_PARALLELISM);
|
.setParallelism(StreamAggregateConfig.FIRST_WINDOW_PARALLELISM);
|
||||||
|
|
||||||
//二次聚合计算,使用业务的key 进行数据汇总
|
//二次聚合计算,使用业务的key 进行数据汇总
|
||||||
WindowedStream<Tuple2<String, JSONObject>, String, TimeWindow> secondWindow = metricCountWindow.keyBy(new SecondKeyByFunction())
|
WindowedStream<Tuple2<String, Map<String, Object>>, String, TimeWindow> secondWindow = metricCountWindow.keyBy(new SecondKeyByFunction())
|
||||||
.window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.SECOND_COUNT_WINDOW_TIME)));
|
.window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.SECOND_COUNT_WINDOW_TIME)));
|
||||||
|
|
||||||
//二次聚合计算窗口
|
//二次聚合计算窗口
|
||||||
SingleOutputStreamOperator<JSONObject> secondCountWindow = secondWindow.process(new SecondCountWindowFunction())
|
SingleOutputStreamOperator<Map<String, Object>> secondCountWindow = secondWindow.process(new SecondCountWindowFunction())
|
||||||
.name("SecondCountWindow").setParallelism(StreamAggregateConfig.SECOND_WINDOW_PARALLELISM);
|
.name("SecondCountWindow").setParallelism(StreamAggregateConfig.SECOND_WINDOW_PARALLELISM);
|
||||||
|
|
||||||
//拆解结果数据按protocol id循环输出
|
//拆解结果数据按protocol id循环输出
|
||||||
|
|||||||
@@ -0,0 +1,17 @@
|
|||||||
|
package com.zdjizhi.utils.functions.filter;
|
||||||
|
|
||||||
|
import com.zdjizhi.utils.StringUtil;
|
||||||
|
import org.apache.flink.api.common.functions.FilterFunction;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author qidaijie
|
||||||
|
* @Package com.zdjizhi.utils.functions
|
||||||
|
* @Description:
|
||||||
|
* @date 2021/5/2715:01
|
||||||
|
*/
|
||||||
|
public class FilterNullFunction implements FilterFunction<String> {
|
||||||
|
@Override
|
||||||
|
public boolean filter(String message) {
|
||||||
|
return StringUtil.isNotBlank(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,8 +1,10 @@
|
|||||||
package com.zdjizhi.utils.functions.keyby;
|
package com.zdjizhi.utils.functions.keyby;
|
||||||
|
|
||||||
import com.alibaba.fastjson2.JSONObject;
|
import cn.hutool.core.util.RandomUtil;
|
||||||
|
import com.zdjizhi.common.StreamAggregateConfig;
|
||||||
import org.apache.flink.api.java.functions.KeySelector;
|
import org.apache.flink.api.java.functions.KeySelector;
|
||||||
import org.apache.flink.api.java.tuple.Tuple3;
|
import org.apache.flink.api.java.tuple.Tuple3;
|
||||||
|
import org.apache.flink.api.java.tuple.Tuple4;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
@@ -12,10 +14,10 @@ import java.util.Map;
|
|||||||
* @Description:
|
* @Description:
|
||||||
* @date 2021/7/2112:13
|
* @date 2021/7/2112:13
|
||||||
*/
|
*/
|
||||||
public class FirstKeyByFunction implements KeySelector<Tuple3< String, String, JSONObject>, String> {
|
public class FirstKeyByFunction implements KeySelector<Tuple3< String, String, Map<String, Object>>, String> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getKey(Tuple3<String, String, JSONObject> value) throws Exception {
|
public String getKey(Tuple3<String, String, Map<String, Object>> value) throws Exception {
|
||||||
//以map拼接的key分组
|
//以map拼接的key分组
|
||||||
return value.f0;
|
return value.f0;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,10 +1,12 @@
|
|||||||
package com.zdjizhi.utils.functions.keyby;
|
package com.zdjizhi.utils.functions.keyby;
|
||||||
|
|
||||||
|
import cn.hutool.core.util.RandomUtil;
|
||||||
import com.alibaba.fastjson2.JSONObject;
|
import com.zdjizhi.common.StreamAggregateConfig;
|
||||||
import org.apache.flink.api.java.functions.KeySelector;
|
import org.apache.flink.api.java.functions.KeySelector;
|
||||||
import org.apache.flink.api.java.tuple.Tuple2;
|
import org.apache.flink.api.java.tuple.Tuple2;
|
||||||
|
import org.apache.flink.api.java.tuple.Tuple4;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author qidaijie
|
* @author qidaijie
|
||||||
@@ -12,10 +14,10 @@ import org.apache.flink.api.java.tuple.Tuple2;
|
|||||||
* @Description:
|
* @Description:
|
||||||
* @date 2021/7/2112:13
|
* @date 2021/7/2112:13
|
||||||
*/
|
*/
|
||||||
public class SecondKeyByFunction implements KeySelector<Tuple2<String, JSONObject>, String> {
|
public class SecondKeyByFunction implements KeySelector<Tuple2<String,Map<String, Object>>, String> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getKey(Tuple2<String, JSONObject> value) throws Exception {
|
public String getKey(Tuple2<String, Map<String, Object>> value) throws Exception {
|
||||||
//以map拼接的key分组
|
//以map拼接的key分组
|
||||||
return value.f0;
|
return value.f0;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,15 +2,16 @@ package com.zdjizhi.utils.functions.parse;
|
|||||||
|
|
||||||
import cn.hutool.log.Log;
|
import cn.hutool.log.Log;
|
||||||
import cn.hutool.log.LogFactory;
|
import cn.hutool.log.LogFactory;
|
||||||
import com.alibaba.fastjson2.JSON;
|
|
||||||
import com.alibaba.fastjson2.JSONObject;
|
|
||||||
import com.zdjizhi.common.StreamAggregateConfig;
|
import com.zdjizhi.common.StreamAggregateConfig;
|
||||||
|
import com.zdjizhi.utils.JsonMapper;
|
||||||
import com.zdjizhi.utils.StringUtil;
|
import com.zdjizhi.utils.StringUtil;
|
||||||
import com.zdjizhi.utils.general.ParseFunctions;
|
import com.zdjizhi.utils.general.ParseFunctions;
|
||||||
import com.zdjizhi.utils.meta.MetaDataParse;
|
import com.zdjizhi.utils.json.JsonParseUtil;
|
||||||
import org.apache.flink.api.common.functions.MapFunction;
|
import org.apache.flink.api.common.functions.MapFunction;
|
||||||
import org.apache.flink.api.java.tuple.Tuple3;
|
import org.apache.flink.api.java.tuple.Tuple3;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
@@ -21,19 +22,21 @@ import java.util.concurrent.ThreadLocalRandom;
|
|||||||
* @Description:
|
* @Description:
|
||||||
* @date 2021/5/2715:01
|
* @date 2021/5/2715:01
|
||||||
*/
|
*/
|
||||||
public class ParseMapFunction implements MapFunction<String, Tuple3<String, String, JSONObject>> {
|
public class ParseMapFunction implements MapFunction<String, Tuple3<String, String, Map<String, Object>>> {
|
||||||
private static final Log logger = LogFactory.get();
|
private static final Log logger = LogFactory.get();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public Tuple3<String, String, JSONObject> map(String message) {
|
public Tuple3<String, String, Map<String, Object>> map(String message) {
|
||||||
try {
|
try {
|
||||||
|
ArrayList<String[]> jobList = JsonParseUtil.getTransformsList();
|
||||||
|
HashMap<String, String> dimensionsMap = JsonParseUtil.getDimensionsMap();
|
||||||
if (StringUtil.isNotBlank(message)) {
|
if (StringUtil.isNotBlank(message)) {
|
||||||
JSONObject originalLog = JSON.parseObject(message);
|
Map<String, Object> originalLog = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
|
||||||
Map<String, Object> dimensionsObj = ParseFunctions.transDimensions(MetaDataParse.getDimensionsMap(), originalLog);
|
Map<String, Object> dimensionsObj = ParseFunctions.transDimensions(dimensionsMap, originalLog);
|
||||||
if (ParseFunctions.filterLogs(originalLog)) {
|
if (ParseFunctions.filterLogs(originalLog)) {
|
||||||
JSONObject metricsLog = ParseFunctions.getMetricsLog(originalLog);
|
Map<String, Object> metricsLog = ParseFunctions.getMetricsLog(originalLog);
|
||||||
for (String[] strings : MetaDataParse.getTransformsList()) {
|
for (String[] strings : jobList) {
|
||||||
//函数名称
|
//函数名称
|
||||||
String function = strings[0];
|
String function = strings[0];
|
||||||
//结果集字段key
|
//结果集字段key
|
||||||
@@ -43,7 +46,7 @@ public class ParseMapFunction implements MapFunction<String, Tuple3<String, Stri
|
|||||||
//额外的参数的值
|
//额外的参数的值
|
||||||
String parameters = strings[3];
|
String parameters = strings[3];
|
||||||
//原始日志字段对应的值
|
//原始日志字段对应的值
|
||||||
Object logsValue = originalLog.get(logsKeyName);
|
Object logsValue = JsonParseUtil.getValue(originalLog, logsKeyName);
|
||||||
|
|
||||||
switch (function) {
|
switch (function) {
|
||||||
case "combination":
|
case "combination":
|
||||||
@@ -61,8 +64,8 @@ public class ParseMapFunction implements MapFunction<String, Tuple3<String, Stri
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case "hierarchy":
|
case "hierarchy":
|
||||||
String key = dimensionsObj.get(resultKeyName) + "@" + ThreadLocalRandom.current().nextInt(StreamAggregateConfig.RANDOM_RANGE_NUM);
|
String key = JsonParseUtil.getString(dimensionsObj, resultKeyName) + "@" + ThreadLocalRandom.current().nextInt(StreamAggregateConfig.RANDOM_RANGE_NUM);
|
||||||
return new Tuple3<>(key, JSONObject.toJSONString(dimensionsObj), metricsLog);
|
return new Tuple3<>(key, JsonMapper.toJsonString(dimensionsObj), metricsLog);
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,23 +1,21 @@
|
|||||||
package com.zdjizhi.utils.functions.result;
|
package com.zdjizhi.utils.functions.result;
|
||||||
|
|
||||||
import cn.hutool.log.Log;
|
|
||||||
import cn.hutool.log.LogFactory;
|
|
||||||
import com.alibaba.fastjson2.JSONObject;
|
|
||||||
import com.zdjizhi.common.StreamAggregateConfig;
|
import com.zdjizhi.common.StreamAggregateConfig;
|
||||||
|
import com.zdjizhi.utils.JsonMapper;
|
||||||
import com.zdjizhi.utils.StringUtil;
|
import com.zdjizhi.utils.StringUtil;
|
||||||
import com.zdjizhi.utils.general.ParseFunctions;
|
import com.zdjizhi.utils.json.JsonParseUtil;
|
||||||
import org.apache.flink.api.common.functions.FlatMapFunction;
|
import org.apache.flink.api.common.functions.FlatMapFunction;
|
||||||
import org.apache.flink.util.Collector;
|
import org.apache.flink.util.Collector;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author qidaijie
|
* @author qidaijie
|
||||||
* @Package com.zdjizhi.utils.functions
|
* @Package com.zdjizhi.utils.functions
|
||||||
* @Description:
|
* @Description:
|
||||||
* @date 2021/7/2114:52
|
* @date 2021/7/2114:52
|
||||||
*/
|
*/
|
||||||
public class ResultFlatMapFunction implements FlatMapFunction<JSONObject, String> {
|
public class ResultFlatMapFunction implements FlatMapFunction<Map<String, Object>, String> {
|
||||||
private static final Log logger = LogFactory.get();
|
|
||||||
|
|
||||||
private static final String PROTOCOL_ID_KEY = "protocol_stack_id";
|
private static final String PROTOCOL_ID_KEY = "protocol_stack_id";
|
||||||
private static final String APP_NAME_KEY = "app_name";
|
private static final String APP_NAME_KEY = "app_name";
|
||||||
private static final String HLL_SKETCH_KEY = "client_ip_sketch";
|
private static final String HLL_SKETCH_KEY = "client_ip_sketch";
|
||||||
@@ -25,26 +23,28 @@ public class ResultFlatMapFunction implements FlatMapFunction<JSONObject, String
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public void flatMap(JSONObject jsonObject, Collector<String> out) throws Exception {
|
public void flatMap(Map<String, Object> jsonObject, Collector<String> out) throws Exception {
|
||||||
String protocol = jsonObject.getString(PROTOCOL_ID_KEY);
|
String protocol = JsonParseUtil.getString(jsonObject, PROTOCOL_ID_KEY);
|
||||||
if (jsonObject.containsKey(HLL_SKETCH_KEY)) {
|
if (jsonObject.containsKey(HLL_SKETCH_KEY)){
|
||||||
jsonObject.put(HLL_SKETCH_KEY, ParseFunctions.getHllSketch(jsonObject, HLL_SKETCH_KEY));
|
JsonParseUtil.setValue(jsonObject, HLL_SKETCH_KEY, JsonParseUtil.getHllSketch(jsonObject, HLL_SKETCH_KEY));
|
||||||
}
|
}
|
||||||
out.collect(jsonObject.toString());
|
out.collect(JsonMapper.toJsonString(jsonObject));
|
||||||
jsonObject.remove(APP_NAME_KEY);
|
jsonObject.remove(APP_NAME_KEY);
|
||||||
|
|
||||||
StringBuilder stringBuilder = new StringBuilder();
|
StringBuilder stringBuilder = new StringBuilder();
|
||||||
String[] protocolIds = protocol.split(StreamAggregateConfig.PROTOCOL_SPLITTER);
|
if (StringUtil.isNotBlank(protocol)) {
|
||||||
int protocolIdsNum = protocolIds.length;
|
String[] protocolIds = protocol.split(StreamAggregateConfig.PROTOCOL_SPLITTER);
|
||||||
for (int i = 0; i < protocolIdsNum - 1; i++) {
|
int protocolIdsNum = protocolIds.length;
|
||||||
if (StringUtil.isBlank(stringBuilder.toString())) {
|
for (int i = 0; i < protocolIdsNum - 1; i++) {
|
||||||
stringBuilder.append(protocolIds[i]);
|
if (StringUtil.isBlank(stringBuilder.toString())) {
|
||||||
jsonObject.put(PROTOCOL_ID_KEY, stringBuilder.toString());
|
stringBuilder.append(protocolIds[i]);
|
||||||
out.collect(jsonObject.toString());
|
jsonObject.put(PROTOCOL_ID_KEY, stringBuilder.toString());
|
||||||
} else {
|
out.collect(JsonMapper.toJsonString(jsonObject));
|
||||||
stringBuilder.append(".").append(protocolIds[i]);
|
} else {
|
||||||
jsonObject.put(PROTOCOL_ID_KEY, stringBuilder.toString());
|
stringBuilder.append(".").append(protocolIds[i]);
|
||||||
out.collect(jsonObject.toString());
|
jsonObject.put(PROTOCOL_ID_KEY, stringBuilder.toString());
|
||||||
|
out.collect(JsonMapper.toJsonString(jsonObject));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,19 +1,21 @@
|
|||||||
package com.zdjizhi.utils.functions.statistics;
|
package com.zdjizhi.utils.functions.statistics;
|
||||||
|
|
||||||
import cn.hutool.log.Log;
|
import com.zdjizhi.utils.JsonMapper;
|
||||||
import cn.hutool.log.LogFactory;
|
import com.zdjizhi.utils.StringUtil;
|
||||||
import com.alibaba.fastjson2.JSON;
|
|
||||||
import com.alibaba.fastjson2.JSONObject;
|
|
||||||
import com.zdjizhi.utils.general.MetricFunctions;
|
import com.zdjizhi.utils.general.MetricFunctions;
|
||||||
import com.zdjizhi.utils.meta.MetaDataParse;
|
import com.zdjizhi.utils.general.ParseFunctions;
|
||||||
|
import com.zdjizhi.utils.json.JsonParseUtil;
|
||||||
import org.apache.datasketches.hll.HllSketch;
|
import org.apache.datasketches.hll.HllSketch;
|
||||||
import org.apache.flink.api.java.tuple.Tuple2;
|
import org.apache.flink.api.java.tuple.Tuple2;
|
||||||
import org.apache.flink.api.java.tuple.Tuple3;
|
import org.apache.flink.api.java.tuple.Tuple3;
|
||||||
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
|
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
|
||||||
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
|
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
|
||||||
import org.apache.flink.util.Collector;
|
import org.apache.flink.util.Collector;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author qidaijie
|
* @author qidaijie
|
||||||
@@ -21,31 +23,32 @@ import java.util.HashMap;
|
|||||||
* @Description:
|
* @Description:
|
||||||
* @date 2021/7/2113:55
|
* @date 2021/7/2113:55
|
||||||
*/
|
*/
|
||||||
public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple3<String, String, JSONObject>, Tuple2<String, JSONObject>, String, TimeWindow> {
|
public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple3<String, String, Map<String, Object>>, Tuple2<String, Map<String, Object>>, String, TimeWindow> {
|
||||||
private static final Log logger = LogFactory.get();
|
private static final Logger logger = LoggerFactory.getLogger(FirstCountWindowFunction.class);
|
||||||
|
|
||||||
private HashMap<String, JSONObject> cacheMap = new HashMap<>(32);
|
private HashMap<String, Map<String, Object>> cacheMap = new HashMap<>(32);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public void process(String key, Context context, Iterable<Tuple3<String, String, JSONObject>> input, Collector<Tuple2<String, JSONObject>> output) {
|
public void process(String key, Context context, Iterable<Tuple3<String, String, Map<String, Object>>> input, Collector<Tuple2<String, Map<String, Object>>> output) {
|
||||||
try {
|
try {
|
||||||
HashMap<String, String[]> metricsMap = MetaDataParse.getMetricFunctionsMap();
|
HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricFunctionsMap();
|
||||||
for (Tuple3<String, String, JSONObject> tuple : input) {
|
for (Tuple3<String, String, Map<String, Object>> tuple : input) {
|
||||||
String dimensions = tuple.f1;
|
String dimensions = tuple.f1;
|
||||||
JSONObject metrics = tuple.f2;
|
Map<String, Object> metrics = tuple.f2;
|
||||||
JSONObject cacheMessage = cacheMap.getOrDefault(dimensions, JSON.parseObject(dimensions));
|
if (metrics.size() != 0) {
|
||||||
|
Map<String, Object> cacheMessage = cacheMap.getOrDefault(dimensions, (Map<String, Object>) JsonMapper.fromJsonString(dimensions, Map.class));
|
||||||
|
|
||||||
for (String resultKeyName : metricsMap.keySet()) {
|
for (String resultKeyName : metricsMap.keySet()) {
|
||||||
String[] functions = metricsMap.get(resultKeyName);
|
String[] functions = metricsMap.get(resultKeyName);
|
||||||
String function = functions[0];
|
String function = functions[0];
|
||||||
String fieldName = functions[1];
|
String fieldName = functions[1];
|
||||||
functionSet(function, cacheMessage, resultKeyName, metrics.get(fieldName));
|
functionSet(function, cacheMessage, resultKeyName, JsonParseUtil.getValue(metrics, fieldName));
|
||||||
|
}
|
||||||
|
cacheMap.put(dimensions, cacheMessage);
|
||||||
}
|
}
|
||||||
cacheMap.put(dimensions, cacheMessage);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (!cacheMap.isEmpty()) {
|
if (!cacheMap.isEmpty()) {
|
||||||
for (String dimensions : cacheMap.keySet()) {
|
for (String dimensions : cacheMap.keySet()) {
|
||||||
output.collect(new Tuple2<>(dimensions, cacheMap.get(dimensions)));
|
output.collect(new Tuple2<>(dimensions, cacheMap.get(dimensions)));
|
||||||
@@ -69,7 +72,7 @@ public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple3<Strin
|
|||||||
* @param resultKeyName 结果字段名称
|
* @param resultKeyName 结果字段名称
|
||||||
* @param fieldNameValue 新加值
|
* @param fieldNameValue 新加值
|
||||||
*/
|
*/
|
||||||
private static void functionSet(String function, JSONObject cacheMessage, String resultKeyName, Object fieldNameValue) {
|
private static void functionSet(String function, Map<String, Object> cacheMessage, String resultKeyName, Object fieldNameValue) {
|
||||||
switch (function) {
|
switch (function) {
|
||||||
case "sum":
|
case "sum":
|
||||||
cacheMessage.put(resultKeyName, MetricFunctions.longSum(cacheMessage.get(resultKeyName), fieldNameValue));
|
cacheMessage.put(resultKeyName, MetricFunctions.longSum(cacheMessage.get(resultKeyName), fieldNameValue));
|
||||||
|
|||||||
@@ -1,17 +1,16 @@
|
|||||||
package com.zdjizhi.utils.functions.statistics;
|
package com.zdjizhi.utils.functions.statistics;
|
||||||
|
|
||||||
import cn.hutool.log.Log;
|
import com.zdjizhi.utils.JsonMapper;
|
||||||
import cn.hutool.log.LogFactory;
|
import com.zdjizhi.utils.StringUtil;
|
||||||
import com.alibaba.fastjson2.JSON;
|
|
||||||
import com.alibaba.fastjson2.JSONObject;
|
|
||||||
import com.zdjizhi.utils.general.MetricFunctions;
|
import com.zdjizhi.utils.general.MetricFunctions;
|
||||||
import com.zdjizhi.utils.general.ParseFunctions;
|
import com.zdjizhi.utils.json.JsonParseUtil;
|
||||||
import com.zdjizhi.utils.meta.MetaDataParse;
|
|
||||||
import org.apache.datasketches.hll.HllSketch;
|
import org.apache.datasketches.hll.HllSketch;
|
||||||
import org.apache.flink.api.java.tuple.Tuple2;
|
import org.apache.flink.api.java.tuple.Tuple2;
|
||||||
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
|
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
|
||||||
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
|
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
|
||||||
import org.apache.flink.util.Collector;
|
import org.apache.flink.util.Collector;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@@ -22,26 +21,27 @@ import java.util.Map;
|
|||||||
* @Description:
|
* @Description:
|
||||||
* @date 2021/7/2113:55
|
* @date 2021/7/2113:55
|
||||||
*/
|
*/
|
||||||
public class SecondCountWindowFunction extends ProcessWindowFunction<Tuple2<String, JSONObject>, JSONObject, String, TimeWindow> {
|
public class SecondCountWindowFunction extends ProcessWindowFunction<Tuple2<String, Map<String, Object>>, Map<String, Object>, String, TimeWindow> {
|
||||||
private static final Log logger = LogFactory.get();
|
private static final Logger logger = LoggerFactory.getLogger(SecondCountWindowFunction.class);
|
||||||
|
|
||||||
private HashMap<String, JSONObject> cacheMap = new HashMap<>(32);
|
private HashMap<String, Map<String, Object>> cacheMap = new HashMap<>(32);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public void process(String key, Context context, Iterable<Tuple2<String, JSONObject>> input, Collector<JSONObject> output) {
|
public void process(String key, Context context, Iterable<Tuple2<String, Map<String, Object>>> input, Collector<Map<String, Object>> output) {
|
||||||
try {
|
try {
|
||||||
HashMap<String, String[]> metricsMap = MetaDataParse.getMetricFunctionsMap();
|
HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricFunctionsMap();
|
||||||
for (Tuple2<String, JSONObject> tuple : input) {
|
for (Tuple2<String, Map<String, Object>> tuple : input) {
|
||||||
String dimensions = tuple.f0;
|
String dimensions = tuple.f0;
|
||||||
Map<String, Object> message = tuple.f1;
|
Map<String, Object> message = tuple.f1;
|
||||||
if (message.size() != 0) {
|
if (message.size() != 0) {
|
||||||
JSONObject cacheMessage = cacheMap.getOrDefault(dimensions, JSON.parseObject(dimensions));
|
Map<String, Object> dimensionsObj = (Map<String, Object>) JsonMapper.fromJsonString(dimensions, Map.class);
|
||||||
|
|
||||||
|
Map<String, Object> cacheMessage = cacheMap.getOrDefault(dimensions, dimensionsObj);
|
||||||
for (String resultName : metricsMap.keySet()) {
|
for (String resultName : metricsMap.keySet()) {
|
||||||
String[] metrics = metricsMap.get(resultName);
|
String[] metrics = metricsMap.get(resultName);
|
||||||
String function = metrics[0];
|
String function = metrics[0];
|
||||||
functionSet(function, cacheMessage, resultName, message.get(resultName));
|
functionSet(function, cacheMessage, resultName, JsonParseUtil.getValue(message, resultName));
|
||||||
|
|
||||||
}
|
}
|
||||||
cacheMap.put(dimensions, cacheMessage);
|
cacheMap.put(dimensions, cacheMessage);
|
||||||
@@ -52,8 +52,8 @@ public class SecondCountWindowFunction extends ProcessWindowFunction<Tuple2<Stri
|
|||||||
Long endTime = context.window().getEnd() / 1000;
|
Long endTime = context.window().getEnd() / 1000;
|
||||||
|
|
||||||
for (String countKey : cacheMap.keySet()) {
|
for (String countKey : cacheMap.keySet()) {
|
||||||
JSONObject resultMap = cacheMap.get(countKey);
|
Map<String, Object> resultMap = cacheMap.get(countKey);
|
||||||
resultMap.put(MetaDataParse.getResultTimeKey(), endTime);
|
JsonParseUtil.setValue(resultMap, JsonParseUtil.getResultTimeKey(), endTime);
|
||||||
output.collect(resultMap);
|
output.collect(resultMap);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,9 +1,8 @@
|
|||||||
package com.zdjizhi.utils.general;
|
package com.zdjizhi.utils.general;
|
||||||
|
|
||||||
|
|
||||||
import cn.hutool.log.Log;
|
|
||||||
import cn.hutool.log.LogFactory;
|
|
||||||
import com.zdjizhi.utils.StringUtil;
|
import com.zdjizhi.utils.StringUtil;
|
||||||
|
import com.zdjizhi.utils.json.JsonTypeUtil;
|
||||||
import org.apache.datasketches.hll.HllSketch;
|
import org.apache.datasketches.hll.HllSketch;
|
||||||
import org.apache.datasketches.hll.Union;
|
import org.apache.datasketches.hll.Union;
|
||||||
|
|
||||||
@@ -15,8 +14,6 @@ import org.apache.datasketches.hll.Union;
|
|||||||
* @date 2021/7/2015:31
|
* @date 2021/7/2015:31
|
||||||
*/
|
*/
|
||||||
public class MetricFunctions {
|
public class MetricFunctions {
|
||||||
private static final Log logger = LogFactory.get();
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Long类型的数据求和
|
* Long类型的数据求和
|
||||||
@@ -26,8 +23,8 @@ public class MetricFunctions {
|
|||||||
* @return value1 + value2
|
* @return value1 + value2
|
||||||
*/
|
*/
|
||||||
public static Long longSum(Object value1, Object value2) {
|
public static Long longSum(Object value1, Object value2) {
|
||||||
Long res1 = checkLongValue(value1);
|
Long res1 = JsonTypeUtil.checkLongValue(value1);
|
||||||
Long res2 = checkLongValue(value2);
|
Long res2 = JsonTypeUtil.checkLongValue(value2);
|
||||||
|
|
||||||
return res1 + res2;
|
return res1 + res2;
|
||||||
}
|
}
|
||||||
@@ -39,7 +36,7 @@ public class MetricFunctions {
|
|||||||
* @return count+1
|
* @return count+1
|
||||||
*/
|
*/
|
||||||
public static Long count(Object count) {
|
public static Long count(Object count) {
|
||||||
return checkLongValue(count) + 1L;
|
return JsonTypeUtil.checkLongValue(count) + 1L;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -77,30 +74,4 @@ public class MetricFunctions {
|
|||||||
}
|
}
|
||||||
return HllSketch.heapify(union.getResult().toUpdatableByteArray());
|
return HllSketch.heapify(union.getResult().toUpdatableByteArray());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static long checkLongValue(Object value) {
|
|
||||||
if (value == null) {
|
|
||||||
return 0L;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (value instanceof Long) {
|
|
||||||
return ((Long) value);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (value instanceof Number) {
|
|
||||||
return ((Number) value).longValue();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (value instanceof String) {
|
|
||||||
String str = (String) value;
|
|
||||||
try {
|
|
||||||
return Long.parseLong(str);
|
|
||||||
} catch (NumberFormatException e) {
|
|
||||||
logger.error("Can not cast " + value.getClass() + "to Long,exception is:" + e.getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0L;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,15 +3,13 @@ package com.zdjizhi.utils.general;
|
|||||||
|
|
||||||
import cn.hutool.log.Log;
|
import cn.hutool.log.Log;
|
||||||
import cn.hutool.log.LogFactory;
|
import cn.hutool.log.LogFactory;
|
||||||
import com.alibaba.fastjson2.JSONObject;
|
|
||||||
import com.jayway.jsonpath.JsonPath;
|
import com.jayway.jsonpath.JsonPath;
|
||||||
import com.zdjizhi.common.StreamAggregateConfig;
|
import com.zdjizhi.common.StreamAggregateConfig;
|
||||||
|
import com.zdjizhi.utils.JsonMapper;
|
||||||
import com.zdjizhi.utils.StringUtil;
|
import com.zdjizhi.utils.StringUtil;
|
||||||
import com.zdjizhi.utils.meta.MetaDataParse;
|
import com.zdjizhi.utils.json.JsonParseUtil;
|
||||||
import org.apache.datasketches.hll.HllSketch;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Base64;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
@@ -31,15 +29,18 @@ public class ParseFunctions {
|
|||||||
* @param object 原始日志
|
* @param object 原始日志
|
||||||
* @return true or false
|
* @return true or false
|
||||||
*/
|
*/
|
||||||
public static boolean filterLogs(JSONObject object) {
|
public static boolean filterLogs(Map<String, Object> object) {
|
||||||
boolean available = false;
|
boolean available = false;
|
||||||
HashMap<String, String> filtersMap = MetaDataParse.getFiltersMap();
|
HashMap<String, String> filtersMap = JsonParseUtil.getFiltersMap();
|
||||||
for (String key : filtersMap.keySet()) {
|
for (String key : filtersMap.keySet()) {
|
||||||
if ("notempty".equals(key)) {
|
switch (key) {
|
||||||
String value = object.getString(filtersMap.get(key));
|
case "notempty":
|
||||||
if (StringUtil.isNotBlank(value)) {
|
String value = JsonParseUtil.getString(object, filtersMap.get(key));
|
||||||
available = true;
|
if (StringUtil.isNotBlank(value)) {
|
||||||
}
|
available = true;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
default:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return available;
|
return available;
|
||||||
@@ -52,12 +53,11 @@ public class ParseFunctions {
|
|||||||
* @param originalLog 原始日志
|
* @param originalLog 原始日志
|
||||||
* @return 结果维度集
|
* @return 结果维度集
|
||||||
*/
|
*/
|
||||||
public static Map<String, Object> transDimensions(Map<String, String> dimensions, JSONObject originalLog) {
|
public static Map<String, Object> transDimensions(Map<String, String> dimensions, Map<String, Object> originalLog) {
|
||||||
HashMap<String, Object> dimensionsObj = new HashMap<>(16);
|
HashMap<String, Object> dimensionsObj = new HashMap<>(16);
|
||||||
|
|
||||||
for (String key : dimensions.keySet()) {
|
for (String dimension : dimensions.keySet()) {
|
||||||
originalLog.get(dimensions.get(key));
|
dimensionsObj.put(dimension, JsonParseUtil.getValue(originalLog, dimensions.get(dimension)));
|
||||||
dimensionsObj.put(key, originalLog.get(dimensions.get(key)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return dimensionsObj;
|
return dimensionsObj;
|
||||||
@@ -67,17 +67,18 @@ public class ParseFunctions {
|
|||||||
* 根据原始日志字段,生成schema内指定的metrics指标json。
|
* 根据原始日志字段,生成schema内指定的metrics指标json。
|
||||||
*
|
*
|
||||||
* @param originalLog 原始日志json
|
* @param originalLog 原始日志json
|
||||||
* @return 统计metrics meta
|
* @return 统计metrics json
|
||||||
*/
|
*/
|
||||||
public static JSONObject getMetricsLog(JSONObject originalLog) {
|
public static Map<String, Object> getMetricsLog(Map<String, Object> originalLog) {
|
||||||
JSONObject metricsJson = new JSONObject();
|
Map<String, Object> metricsMap = new HashMap<>(16);
|
||||||
|
|
||||||
for (String logsKeyName : MetaDataParse.getMetricsFiledNameList()) {
|
for (String logsKeyName : JsonParseUtil.getMetricsFiledNameList()) {
|
||||||
if (originalLog.containsKey(logsKeyName)) {
|
if (originalLog.containsKey(logsKeyName)) {
|
||||||
metricsJson.put(logsKeyName, originalLog.get(logsKeyName));
|
metricsMap.put(logsKeyName, originalLog.get(logsKeyName));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return metricsJson;
|
|
||||||
|
return metricsMap;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -86,25 +87,26 @@ public class ParseFunctions {
|
|||||||
* 获取方法函数中 parameters 字段,结构 "parameters": "abc,/" ;abc为要拼接字段 /为拼接的分隔符
|
* 获取方法函数中 parameters 字段,结构 "parameters": "abc,/" ;abc为要拼接字段 /为拼接的分隔符
|
||||||
*
|
*
|
||||||
* @param parameters 参数集
|
* @param parameters 参数集
|
||||||
* @param originalLog 原始日志
|
* @param message 原始日志
|
||||||
* @param logsKeyName 原始日志列名
|
* @param logsKeyName 原始日志列名
|
||||||
*/
|
*/
|
||||||
public static void combinationUtils(Map<String, Object> dimensions, JSONObject originalLog, String parameters, String resultKeyName, String logsKeyName) {
|
public static void combinationUtils(Map<String, Object> dimensions, Map<String, Object> message, String parameters, String resultKeyName, String logsKeyName) {
|
||||||
String[] combinationPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
|
String[] combinationPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
|
||||||
String combinationFieldKey = combinationPars[0];
|
String combinationFieldKey = combinationPars[0];
|
||||||
String separator = combinationPars[1];
|
String separator = combinationPars[1];
|
||||||
Object combinationFieldValue = originalLog.get(combinationFieldKey);
|
Object combinationFieldValue = JsonParseUtil.getValue(message, combinationFieldKey);
|
||||||
if (combinationFieldValue != null) {
|
if (combinationFieldValue != null) {
|
||||||
Object logsFieldValue = originalLog.get(logsKeyName);
|
Object logsFieldValue = JsonParseUtil.getValue(message, logsKeyName);
|
||||||
String combinationResult = logsFieldValue + separator + combinationFieldValue;
|
String combinationResult = logsFieldValue + separator + combinationFieldValue;
|
||||||
dimensions.put(resultKeyName, combinationResult);
|
JsonParseUtil.setValue(dimensions, resultKeyName, combinationResult);
|
||||||
|
JsonParseUtil.setValue(message, logsKeyName, combinationResult);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 根据表达式解析json
|
* 根据表达式解析json
|
||||||
* <p>
|
* <p>
|
||||||
* //* @param message meta
|
* //* @param message json
|
||||||
*
|
*
|
||||||
* @param expr 解析表达式
|
* @param expr 解析表达式
|
||||||
* @return 解析结果
|
* @return 解析结果
|
||||||
@@ -133,7 +135,7 @@ public class ParseFunctions {
|
|||||||
@Deprecated
|
@Deprecated
|
||||||
private static Object isJsonValue(Map<String, Object> jsonMap, String param) {
|
private static Object isJsonValue(Map<String, Object> jsonMap, String param) {
|
||||||
if (param.contains(StreamAggregateConfig.IS_JSON_KEY_TAG)) {
|
if (param.contains(StreamAggregateConfig.IS_JSON_KEY_TAG)) {
|
||||||
return jsonMap.get(param.substring(2));
|
return JsonParseUtil.getValue(jsonMap, param.substring(2));
|
||||||
} else {
|
} else {
|
||||||
return param;
|
return param;
|
||||||
}
|
}
|
||||||
@@ -180,23 +182,4 @@ public class ParseFunctions {
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 获取HLLSketch内容
|
|
||||||
*
|
|
||||||
* @param jsonMap 原始日志
|
|
||||||
* @param key meta key名称
|
|
||||||
* @return HLLSketch数据数组
|
|
||||||
*/
|
|
||||||
public static String getHllSketch(JSONObject jsonMap, String key) {
|
|
||||||
try {
|
|
||||||
HllSketch hllSketchResult = (HllSketch) jsonMap.get(key);
|
|
||||||
if (hllSketchResult != null) {
|
|
||||||
return Base64.getEncoder().encodeToString(hllSketchResult.toUpdatableByteArray());
|
|
||||||
}
|
|
||||||
} catch (RuntimeException e) {
|
|
||||||
logger.error("HllSketch data conversion exception,data may be empty! exception:{}", e);
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
package com.zdjizhi.utils.meta;
|
package com.zdjizhi.utils.json;
|
||||||
|
|
||||||
import cn.hutool.log.Log;
|
import cn.hutool.log.Log;
|
||||||
import cn.hutool.log.LogFactory;
|
import cn.hutool.log.LogFactory;
|
||||||
@@ -10,18 +10,21 @@ import com.alibaba.nacos.api.exception.NacosException;
|
|||||||
import com.jayway.jsonpath.DocumentContext;
|
import com.jayway.jsonpath.DocumentContext;
|
||||||
import com.jayway.jsonpath.JsonPath;
|
import com.jayway.jsonpath.JsonPath;
|
||||||
import com.zdjizhi.common.StreamAggregateConfig;
|
import com.zdjizhi.common.StreamAggregateConfig;
|
||||||
|
import com.zdjizhi.utils.JsonMapper;
|
||||||
import com.zdjizhi.utils.StringUtil;
|
import com.zdjizhi.utils.StringUtil;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
|
import org.apache.datasketches.hll.HllSketch;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 使用FastJson解析json的工具类
|
* 使用FastJson解析json的工具类
|
||||||
*
|
*
|
||||||
* @author qidaijie
|
* @author qidaijie
|
||||||
*/
|
*/
|
||||||
public class MetaDataParse {
|
public class JsonParseUtil {
|
||||||
|
|
||||||
private static final Log logger = LogFactory.get();
|
private static final Log logger = LogFactory.get();
|
||||||
private static Properties propNacos = new Properties();
|
private static Properties propNacos = new Properties();
|
||||||
@@ -87,6 +90,89 @@ public class MetaDataParse {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取属性值的方法
|
||||||
|
*
|
||||||
|
* @param jsonMap 原始日志
|
||||||
|
* @param key josn key名称
|
||||||
|
* @return 属性的值
|
||||||
|
*/
|
||||||
|
public static Object getValue(Map<String, Object> jsonMap, String key) {
|
||||||
|
try {
|
||||||
|
return jsonMap.getOrDefault(key, null);
|
||||||
|
} catch (RuntimeException e) {
|
||||||
|
logger.error("Get the JSON value is abnormal,The key is :" + key + "error message is :" + e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取HLLSketch内容
|
||||||
|
*
|
||||||
|
* @param jsonMap 原始日志
|
||||||
|
* @param key json key名称
|
||||||
|
* @return HLLSketch数据数组
|
||||||
|
*/
|
||||||
|
public static byte[] getHllSketch(Map<String, Object> jsonMap, String key) {
|
||||||
|
try {
|
||||||
|
HllSketch hllSketchResult = (HllSketch) jsonMap.getOrDefault(key, null);
|
||||||
|
if (hllSketchResult != null) {
|
||||||
|
return hllSketchResult.toUpdatableByteArray();
|
||||||
|
}
|
||||||
|
} catch (RuntimeException e) {
|
||||||
|
logger.error("HllSketch data conversion exception,data may be empty! exception:{}", e);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* long 类型检验转换方法,若为空返回基础值
|
||||||
|
*
|
||||||
|
* @return Long value
|
||||||
|
*/
|
||||||
|
public static Long getLong(Map<String, Object> jsonMap, String property) {
|
||||||
|
Object value = jsonMap.getOrDefault(property, null);
|
||||||
|
Long longVal = TypeUtils.castToLong(value);
|
||||||
|
|
||||||
|
if (longVal == null) {
|
||||||
|
return 0L;
|
||||||
|
}
|
||||||
|
|
||||||
|
return longVal;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String getString(Map<String, Object> jsonMap, String property) {
|
||||||
|
Object value = jsonMap.getOrDefault(property, null);
|
||||||
|
if (value == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (value instanceof Map) {
|
||||||
|
return JsonMapper.toJsonString(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (value instanceof List) {
|
||||||
|
return JsonMapper.toJsonString(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
return value.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 更新属性值的方法
|
||||||
|
*
|
||||||
|
* @param jsonMap 原始日志json map
|
||||||
|
* @param property 更新的key
|
||||||
|
* @param value 更新的值
|
||||||
|
*/
|
||||||
|
public static void setValue(Map<String, Object> jsonMap, String property, Object value) {
|
||||||
|
try {
|
||||||
|
jsonMap.put(property, value);
|
||||||
|
} catch (RuntimeException e) {
|
||||||
|
logger.error("The JSON set value is abnormal,the error message is :", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象
|
* 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象
|
||||||
* 用于反射生成schema类型的对象的一个map集合
|
* 用于反射生成schema类型的对象的一个map集合
|
||||||
144
src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java
Normal file
144
src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java
Normal file
@@ -0,0 +1,144 @@
|
|||||||
|
package com.zdjizhi.utils.json;
|
||||||
|
|
||||||
|
|
||||||
|
import com.zdjizhi.utils.JsonMapper;
|
||||||
|
import com.zdjizhi.utils.exception.AnalysisException;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author qidaijie
|
||||||
|
* @Package PACKAGE_NAME
|
||||||
|
* @Description:
|
||||||
|
* @date 2021/7/1217:34
|
||||||
|
*/
|
||||||
|
public class JsonTypeUtil {
|
||||||
|
/**
|
||||||
|
* String 类型检验转换方法
|
||||||
|
*
|
||||||
|
* @param value json value
|
||||||
|
* @return String value
|
||||||
|
*/
|
||||||
|
public static String checkString(Object value) {
|
||||||
|
if (value == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (value instanceof Map) {
|
||||||
|
return JsonMapper.toJsonString(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (value instanceof List) {
|
||||||
|
return JsonMapper.toJsonString(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
return value.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* array 类型检验转换方法
|
||||||
|
*
|
||||||
|
* @param value json value
|
||||||
|
* @return List value
|
||||||
|
*/
|
||||||
|
private static Map checkObject(Object value) {
|
||||||
|
if (value == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (value instanceof Map) {
|
||||||
|
return (Map) value;
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new AnalysisException("can not cast to map, value : " + value);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* array 类型检验转换方法
|
||||||
|
*
|
||||||
|
* @param value json value
|
||||||
|
* @return List value
|
||||||
|
*/
|
||||||
|
private static List checkArray(Object value) {
|
||||||
|
if (value == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (value instanceof List) {
|
||||||
|
return (List) value;
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new AnalysisException("can not cast to List, value : " + value);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Long checkLong(Object value) {
|
||||||
|
if (value == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TypeUtils.castToLong(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* long 类型检验转换方法,若为空返回基础值
|
||||||
|
*
|
||||||
|
* @param value json value
|
||||||
|
* @return Long value
|
||||||
|
*/
|
||||||
|
public static long checkLongValue(Object value) {
|
||||||
|
|
||||||
|
Long longVal = TypeUtils.castToLong(value);
|
||||||
|
|
||||||
|
if (longVal == null) {
|
||||||
|
return 0L;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (longVal < 0L) {
|
||||||
|
return 0L;
|
||||||
|
}
|
||||||
|
|
||||||
|
return longVal;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Double 类型校验转换方法
|
||||||
|
*
|
||||||
|
* @param value json value
|
||||||
|
* @return Double value
|
||||||
|
*/
|
||||||
|
private static Double checkDouble(Object value) {
|
||||||
|
if (value == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TypeUtils.castToDouble(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static Integer checkInt(Object value) {
|
||||||
|
if (value == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TypeUtils.castToInt(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* int 类型检验转换方法,若为空返回基础值
|
||||||
|
*
|
||||||
|
* @param value json value
|
||||||
|
* @return int value
|
||||||
|
*/
|
||||||
|
private static int getIntValue(Object value) {
|
||||||
|
|
||||||
|
Integer intVal = TypeUtils.castToInt(value);
|
||||||
|
|
||||||
|
if (intVal == null) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return intVal;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
172
src/main/java/com/zdjizhi/utils/json/TypeUtils.java
Normal file
172
src/main/java/com/zdjizhi/utils/json/TypeUtils.java
Normal file
@@ -0,0 +1,172 @@
|
|||||||
|
package com.zdjizhi.utils.json;
|
||||||
|
|
||||||
|
import cn.hutool.log.Log;
|
||||||
|
import cn.hutool.log.LogFactory;
|
||||||
|
import com.zdjizhi.common.StreamAggregateConfig;
|
||||||
|
import com.zdjizhi.utils.StringUtil;
|
||||||
|
import com.zdjizhi.utils.exception.AnalysisException;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author qidaijie
|
||||||
|
* @Package PACKAGE_NAME
|
||||||
|
* @Description:
|
||||||
|
* @date 2021/7/1218:20
|
||||||
|
*/
|
||||||
|
public class TypeUtils {
|
||||||
|
private static final Log logger = LogFactory.get();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Integer 类型判断方法
|
||||||
|
*
|
||||||
|
* @param value json value
|
||||||
|
* @return Integer value or null
|
||||||
|
*/
|
||||||
|
public static Object castToIfFunction(Object value) {
|
||||||
|
if (value == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (value instanceof String) {
|
||||||
|
return value.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (value instanceof Integer) {
|
||||||
|
return ((Number) value).intValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (value instanceof Long) {
|
||||||
|
return ((Number) value).longValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (value instanceof Boolean) {
|
||||||
|
return (Boolean) value ? 1 : 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new AnalysisException("can not cast to int, value : " + value);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Integer 类型判断方法
|
||||||
|
*
|
||||||
|
* @param value json value
|
||||||
|
* @return Integer value or null
|
||||||
|
*/
|
||||||
|
static Integer castToInt(Object value) {
|
||||||
|
|
||||||
|
if (value == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (value instanceof Integer) {
|
||||||
|
return (Integer) value;
|
||||||
|
}
|
||||||
|
|
||||||
|
//此判断数值超范围不抛出异常,会截取成对应类型数值
|
||||||
|
// if (value instanceof Number) {
|
||||||
|
// return ((Number) value).intValue();
|
||||||
|
// }
|
||||||
|
|
||||||
|
if (value instanceof String) {
|
||||||
|
String strVal = (String) value;
|
||||||
|
if (StringUtil.isBlank(strVal)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
//将 10,20 类数据转换为10
|
||||||
|
if (strVal.contains(StreamAggregateConfig.FORMAT_SPLITTER)) {
|
||||||
|
strVal = strVal.split(StreamAggregateConfig.FORMAT_SPLITTER)[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
return Integer.parseInt(strVal);
|
||||||
|
} catch (NumberFormatException ex) {
|
||||||
|
logger.error("String change Integer Error,The error Str is:" + strVal);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (value instanceof Boolean) {
|
||||||
|
return (Boolean) value ? 1 : 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new AnalysisException("can not cast to int, value : " + value);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Double类型判断方法
|
||||||
|
*
|
||||||
|
* @param value json value
|
||||||
|
* @return double value or null
|
||||||
|
*/
|
||||||
|
static Double castToDouble(Object value) {
|
||||||
|
|
||||||
|
if (value instanceof Double) {
|
||||||
|
return (Double) value;
|
||||||
|
}
|
||||||
|
|
||||||
|
//此判断数值超范围不抛出异常,会截取成对应类型数值
|
||||||
|
// if (value instanceof Number) {
|
||||||
|
// return ((Number) value).doubleValue();
|
||||||
|
// }
|
||||||
|
|
||||||
|
if (value instanceof String) {
|
||||||
|
String strVal = (String) value;
|
||||||
|
|
||||||
|
if (StringUtil.isBlank(strVal)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
//将 10,20 类数据转换为10
|
||||||
|
if (strVal.contains(StreamAggregateConfig.FORMAT_SPLITTER)) {
|
||||||
|
strVal = strVal.split(StreamAggregateConfig.FORMAT_SPLITTER)[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
return Double.parseDouble(strVal);
|
||||||
|
} catch (NumberFormatException ex) {
|
||||||
|
logger.error("String change Double Error,The error Str is:" + strVal);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new AnalysisException("can not cast to double, value : " + value);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Long类型判断方法
|
||||||
|
*
|
||||||
|
* @param value json value
|
||||||
|
* @return (Long)value or null
|
||||||
|
*/
|
||||||
|
static Long castToLong(Object value) {
|
||||||
|
if (value == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 此判断数值超范围不抛出异常,会截取成对应类型数值
|
||||||
|
if (value instanceof Number) {
|
||||||
|
return ((Number) value).longValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (value instanceof String) {
|
||||||
|
String strVal = (String) value;
|
||||||
|
|
||||||
|
if (StringUtil.isBlank(strVal)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
//将 10,20 类数据转换为10
|
||||||
|
if (strVal.contains(StreamAggregateConfig.FORMAT_SPLITTER)) {
|
||||||
|
strVal = strVal.split(StreamAggregateConfig.FORMAT_SPLITTER)[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
return Long.parseLong(strVal);
|
||||||
|
} catch (NumberFormatException ex) {
|
||||||
|
logger.error("String change Long Error,The error Str is:" + strVal);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new AnalysisException("can not cast to long, value : " + value);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -22,7 +22,6 @@ public class KafkaConsumer {
|
|||||||
properties.put("session.timeout.ms", StreamAggregateConfig.SESSION_TIMEOUT_MS);
|
properties.put("session.timeout.ms", StreamAggregateConfig.SESSION_TIMEOUT_MS);
|
||||||
properties.put("max.poll.records", StreamAggregateConfig.MAX_POLL_RECORDS);
|
properties.put("max.poll.records", StreamAggregateConfig.MAX_POLL_RECORDS);
|
||||||
properties.put("max.partition.fetch.bytes", StreamAggregateConfig.MAX_PARTITION_FETCH_BYTES);
|
properties.put("max.partition.fetch.bytes", StreamAggregateConfig.MAX_PARTITION_FETCH_BYTES);
|
||||||
properties.put("partition.discovery.interval.ms", "10000");
|
|
||||||
|
|
||||||
CertUtils.chooseCert(StreamAggregateConfig.SOURCE_KAFKA_SERVERS, properties);
|
CertUtils.chooseCert(StreamAggregateConfig.SOURCE_KAFKA_SERVERS, properties);
|
||||||
|
|
||||||
|
|||||||
@@ -1,15 +1,16 @@
|
|||||||
package com.zdjizhi;
|
package com.zdjizhi;
|
||||||
|
|
||||||
import cn.hutool.json.JSONUtil;
|
import com.alibaba.fastjson.JSONObject;
|
||||||
import com.alibaba.fastjson2.*;
|
|
||||||
import com.zdjizhi.utils.JsonMapper;
|
|
||||||
import org.apache.datasketches.hll.HllSketch;
|
import org.apache.datasketches.hll.HllSketch;
|
||||||
import org.apache.datasketches.hll.Union;
|
import org.apache.datasketches.hll.Union;
|
||||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.Arrays;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Properties;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author qidaijie
|
* @author qidaijie
|
||||||
@@ -79,7 +80,6 @@ public class DatasketchesTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void HllSketchDruidTest() {
|
public void HllSketchDruidTest() {
|
||||||
HashMap<String, Object> dataMap = new HashMap<>();
|
|
||||||
|
|
||||||
HashSet<String> strings = new HashSet<>();
|
HashSet<String> strings = new HashSet<>();
|
||||||
|
|
||||||
@@ -135,39 +135,15 @@ public class DatasketchesTest {
|
|||||||
|
|
||||||
//CompactByte
|
//CompactByte
|
||||||
result.setIp_object(sketch_result2.toCompactByteArray());
|
result.setIp_object(sketch_result2.toCompactByteArray());
|
||||||
// System.out.println(result.toString());
|
System.out.println(result.toString());
|
||||||
//sendMessage(JsonMapper.toJsonString(result);
|
sendMessage(result);
|
||||||
|
|
||||||
|
|
||||||
//UpdatableByte
|
//UpdatableByte
|
||||||
result.setIp_object(sketch_result2.toUpdatableByteArray());
|
result.setIp_object(sketch_result2.toUpdatableByteArray());
|
||||||
// System.out.println(result.toString());
|
System.out.println(result.toString());
|
||||||
//sendMessage(JsonMapper.toJsonString(result);
|
sendMessage(result);
|
||||||
|
|
||||||
//Hashmap
|
|
||||||
dataMap.put("app_name", "TEST");
|
|
||||||
dataMap.put("protocol_stack_id", "HTTP");
|
|
||||||
dataMap.put("vsys_id", 1);
|
|
||||||
dataMap.put("stat_time", 1681370100);
|
|
||||||
dataMap.put("client_ip_sketch", sketch_result2.toUpdatableByteArray());
|
|
||||||
|
|
||||||
System.out.println("Jackson:" + JsonMapper.toJsonString(dataMap));
|
|
||||||
System.out.println("FastJson2:" + JSONObject.toJSONString(dataMap));
|
|
||||||
System.out.println("Hutool:" + JSONUtil.toJsonStr(dataMap) + "\n\n");
|
|
||||||
|
|
||||||
byte[] toJSONB = JSONB.toBytes(dataMap);
|
|
||||||
// sendMessage(toJSONB);
|
|
||||||
JSONObject jsonObject = JSONB.parseObject(toJSONB);
|
|
||||||
System.out.println("FastJson2 Byte(JSONB):" + jsonObject.toJSONString() + "\n\n");
|
|
||||||
|
|
||||||
|
|
||||||
dataMap.put("client_ip_sketch", Base64.getEncoder().encodeToString(sketch_result2.toUpdatableByteArray()));
|
|
||||||
System.out.println("FastJson2 Byte(Base64):" + JSONObject.toJSONString(dataMap));
|
|
||||||
System.out.println("Hutool Byte(Base64):" + JSONObject.toJSONString(dataMap));
|
|
||||||
System.out.println(JSONUtil.toJsonStr(dataMap));
|
|
||||||
|
|
||||||
|
|
||||||
// sendMessage(JSONObject.toJSONString(dataMap));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -181,7 +157,7 @@ public class DatasketchesTest {
|
|||||||
return v4_1 + "." + v4_2 + "." + v4_3 + "." + v4_4;
|
return v4_1 + "." + v4_2 + "." + v4_3 + "." + v4_4;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void sendMessage(Object message) {
|
private static void sendMessage(Result result) {
|
||||||
Properties props = new Properties();
|
Properties props = new Properties();
|
||||||
//kafka地址
|
//kafka地址
|
||||||
props.put("bootstrap.servers", "192.168.44.12:9092");
|
props.put("bootstrap.servers", "192.168.44.12:9092");
|
||||||
@@ -189,13 +165,11 @@ public class DatasketchesTest {
|
|||||||
props.put("retries", 0);
|
props.put("retries", 0);
|
||||||
props.put("linger.ms", 1);
|
props.put("linger.ms", 1);
|
||||||
props.put("buffer.memory", 67108864);
|
props.put("buffer.memory", 67108864);
|
||||||
// props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
||||||
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
|
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
||||||
// props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
|
||||||
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
|
|
||||||
KafkaProducer<String, Object> kafkaProducer = new KafkaProducer<String, Object>(props);
|
KafkaProducer<String, Object> kafkaProducer = new KafkaProducer<String, Object>(props);
|
||||||
|
|
||||||
kafkaProducer.send(new ProducerRecord<String, Object>("TRAFFIC-PROTOCOL-TEST", message));
|
kafkaProducer.send(new ProducerRecord<String, Object>("TRAFFIC-PROTOCOL-TEST", JSONObject.toJSONString(result)));
|
||||||
|
|
||||||
kafkaProducer.close();
|
kafkaProducer.close();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package com.zdjizhi;
|
|||||||
import com.jayway.jsonpath.JsonPath;
|
import com.jayway.jsonpath.JsonPath;
|
||||||
import com.zdjizhi.common.StreamAggregateConfig;
|
import com.zdjizhi.common.StreamAggregateConfig;
|
||||||
import com.zdjizhi.utils.StringUtil;
|
import com.zdjizhi.utils.StringUtil;
|
||||||
|
import com.zdjizhi.utils.json.JsonTypeUtil;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
@@ -67,4 +68,12 @@ public class FunctionTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void longSumTest() {
|
||||||
|
Long res1 = JsonTypeUtil.checkLongValue(123);
|
||||||
|
Long res2 = JsonTypeUtil.checkLongValue("123");
|
||||||
|
|
||||||
|
System.out.println(res1 + res2);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,145 +0,0 @@
|
|||||||
package com.zdjizhi;
|
|
||||||
|
|
||||||
import com.alibaba.fastjson.JSONArray;
|
|
||||||
import com.alibaba.fastjson.serializer.SerializerFeature;
|
|
||||||
import com.alibaba.fastjson2.JSON;
|
|
||||||
import com.alibaba.fastjson2.JSONObject;
|
|
||||||
import com.jayway.jsonpath.DocumentContext;
|
|
||||||
import com.jayway.jsonpath.JsonPath;
|
|
||||||
import net.sf.cglib.beans.BeanGenerator;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author qidaijie
|
|
||||||
* @Package com.zdjizhi
|
|
||||||
* @Description:
|
|
||||||
* @date 2023/4/1218:08
|
|
||||||
*/
|
|
||||||
public class JsonTest {
|
|
||||||
|
|
||||||
// /**
|
|
||||||
// * 在内存中加载反射类用的map
|
|
||||||
// */
|
|
||||||
// private static HashMap<String, Class> map = MetaDataParse.getMapFromHttp(FlowWriteConfig.SCHEMA_HTTP);
|
|
||||||
//
|
|
||||||
// /**
|
|
||||||
// * 反射成一个类
|
|
||||||
// */
|
|
||||||
// private static Object mapObject = MetaDataParse.generateObject(map);
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void fastJson2Test() {
|
|
||||||
// SerializerFeature.DisableCircularReferenceDetect
|
|
||||||
// SerializerFeature.WriteNullStringAsEmpty
|
|
||||||
// SerializerFeature.WriteNullNumberAsZero
|
|
||||||
HashMap<String, Class> classHashMap = new HashMap<>();
|
|
||||||
|
|
||||||
String message = "{\"common_schema_type\":\"HTTP\",\"common_sessions\":1,\"http_request_line\":\"GET sampleFile.html HTTP/1.1\",\"http_host\":\"www.texaslotto.com\",\"http_url\":\"www.texaslotto.com/sampleFile.html\",\"http_user_agent\":\"xPTS/2.0\",\"http_response_line\":\"HTTP/1.1 200 OK\",\"http_isn\":1953597368,\"http_proxy_flag\":0,\"http_version\":\"http1\",\"http_response_latency_ms\":1,\"http_session_duration_ms\":2,\"http_response_content_type\":\"text/html\",\"http_sequence\":80,\"common_protocol_label\":\"ETHERNET.IPv4.UDP.GTP.IPv4.TCP\",\"common_c2s_byte_diff\":17200,\"common_c2s_pkt_diff\":120,\"common_s2c_byte_diff\":16490,\"common_s2c_pkt_diff\":81,\"common_c2s_ipfrag_num\":0,\"common_s2c_ipfrag_num\":0,\"common_first_ttl\":64,\"common_c2s_tcp_unorder_num\":0,\"common_s2c_tcp_unorder_num\":0,\"common_c2s_tcp_lostlen\":0,\"common_s2c_tcp_lostlen\":0,\"common_c2s_pkt_retrans\":0,\"common_s2c_pkt_retrans\":0,\"common_c2s_byte_retrans\":0,\"common_s2c_byte_retrans\":0,\"common_flags\":24720,\"common_flags_identify_info\":\"{\\\"Server is Local\\\":1,\\\"Inbound\\\":201,\\\"C2S\\\":1,\\\"S2C\\\":2}\",\"common_direction\":73,\"common_app_full_path\":\"http\",\"common_app_label\":\"http\",\"common_tcp_client_isn\":1953597368,\"common_tcp_server_isn\":1950649408,\"common_server_ip\":\"192.50.199.25\",\"common_client_ip\":\"192.50.146.197\",\"common_server_port\":80,\"common_client_port\":22533,\"common_stream_dir\":3,\"common_address_type\":4,\"common_address_list\":\"IPv4_TCP<22533-80-192.50.146.197-192.50.199.25>|GTP<111001144-851056526>|IPv4_UDP<2152-2152-192.50.235.220-192.50.135.83>|MAC<000c299b2fa4-000c2915b4f4>\",\"common_start_time\":1680475247,\"common_end_time\":1680475247,\"common_con_duration_ms\":23,\"common_s2c_pkt_num\":81,\"common_s2c_byte_num\":16490,\"common_c2s_pkt_num\":120,\"common_c2s_byte_num\":17200,\"common_establish_latency_ms\":2,\"common_client_location\":\"日本.Unknown.Unknown\",\"common_server_location\":\"日本.Unknown.Unknown\",\"common_service_category\":[6223,6219,5093,5089],\"common_apn\":\"cmiott.owflr.mcto60g.com\",\"common_imsi\":\"460045157091460\",\"common_imei\":\"8626070583005833\",\"common_phone_number\":\"861440152028973\",\"common_tunnel_endpoint_a_desc\":\"test_50_gtp\",\"common_tunnel_endpoint_b_desc\":\"test_50_gtp\",\"common_tunnels\":[{\"tunnels_schema_type\":\"GTP\",\"gtp_a2b_teid\":111001144,\"gtp_b2a_teid\":851056526,\"gtp_endpoint_a_ip\":\"192.50.235.220\",\"gtp_endpoint_b_ip\":\"192.50.135.83\",\"gtp_endpoint_a_port\":2152,\"gtp_endpoint_b_port\":2152},{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\",\"c2s_source_mac\":\"00:0c:29:9b:2f:a4\",\"c2s_destination_mac\":\"00:0c:29:15:b4:f4\",\"s2c_source_mac\":\"00:0c:29:15:b4:f4\",\"s2c_destination_mac\":\"00:0c:29:9b:2f:a4\"}],\"common_stream_trace_id\":\"578829229323951427\",\"common_l4_protocol\":\"IPv4_TCP\",\"common_sled_ip\":\"192.168.40.161\",\"common_device_id\":\"unknown\",\"common_device_tag\":\"{\\\"tags\\\":[{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-7400\\\"},{\\\"tag\\\":\\\"data_center\\\",\\\"value\\\":\\\"center-xxg-7400\\\"}]}\",\"common_t_vsys_id\":1,\"common_policy_id\":0,\"common_service\":2,\"common_action\":0,\"common_vsys_id\":1}";
|
|
||||||
|
|
||||||
JSONObject json = JSON.parseObject(message);
|
|
||||||
Object mapObject = generateObject(classHashMap);
|
|
||||||
Object object = JSON.parseObject(message, mapObject.getClass());
|
|
||||||
|
|
||||||
|
|
||||||
System.out.println(json.get("common_schema_type"));
|
|
||||||
json.put("common_schema_type", "SSH");
|
|
||||||
|
|
||||||
|
|
||||||
System.out.println(json.toJSONString());
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
private static Class getClassName(String type) {
|
|
||||||
Class clazz;
|
|
||||||
|
|
||||||
switch (type) {
|
|
||||||
case "int":
|
|
||||||
clazz = Integer.class;
|
|
||||||
break;
|
|
||||||
case "string":
|
|
||||||
clazz = String.class;
|
|
||||||
break;
|
|
||||||
case "long":
|
|
||||||
clazz = long.class;
|
|
||||||
break;
|
|
||||||
case "array":
|
|
||||||
clazz = List.class;
|
|
||||||
break;
|
|
||||||
case "double":
|
|
||||||
clazz = double.class;
|
|
||||||
break;
|
|
||||||
case "float":
|
|
||||||
clazz = float.class;
|
|
||||||
break;
|
|
||||||
case "char":
|
|
||||||
clazz = char.class;
|
|
||||||
break;
|
|
||||||
case "byte":
|
|
||||||
clazz = byte.class;
|
|
||||||
break;
|
|
||||||
case "boolean":
|
|
||||||
clazz = boolean.class;
|
|
||||||
break;
|
|
||||||
case "short":
|
|
||||||
clazz = short.class;
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
clazz = String.class;
|
|
||||||
}
|
|
||||||
return clazz;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 根据反射生成对象的方法
|
|
||||||
*
|
|
||||||
* @param properties 反射类用的map
|
|
||||||
* @return 生成的Object类型的对象
|
|
||||||
*/
|
|
||||||
private static Object generateObject(Map properties) {
|
|
||||||
BeanGenerator generator = new BeanGenerator();
|
|
||||||
Set keySet = properties.keySet();
|
|
||||||
for (Object aKeySet : keySet) {
|
|
||||||
String key = (String) aKeySet;
|
|
||||||
generator.addProperty(key, (Class) properties.get(key));
|
|
||||||
}
|
|
||||||
return generator.create();
|
|
||||||
}
|
|
||||||
|
|
||||||
// /**
|
|
||||||
// * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象
|
|
||||||
// *
|
|
||||||
// * @param http 网关schema地址
|
|
||||||
// * @return 用于反射生成schema类型的对象的一个map集合
|
|
||||||
// */
|
|
||||||
// public static HashMap<String, Class> getMapFromHttp(String schema) {
|
|
||||||
// HashMap<String, Class> map = new HashMap<>(16);
|
|
||||||
//
|
|
||||||
// DocumentContext parse = JsonPath.parse(schema);
|
|
||||||
//
|
|
||||||
// //获取fields,并转化为数组,数组的每个元素都是一个name doc type
|
|
||||||
// com.alibaba.fastjson.JSONObject schemaJson = com.alibaba.fastjson.JSON.parseObject(data.toString());
|
|
||||||
// JSONArray fields = (JSONArray) schemaJson.get("fields");
|
|
||||||
//
|
|
||||||
// for (Object field : fields) {
|
|
||||||
// String filedStr = field.toString();
|
|
||||||
// if (checkKeepField(filedStr)) {
|
|
||||||
// String name = JsonPath.read(filedStr, "$.name").toString();
|
|
||||||
// String type = JsonPath.read(filedStr, "$.type").toString();
|
|
||||||
// if (type.contains("{")) {
|
|
||||||
// type = JsonPath.read(filedStr, "$.type.type").toString();
|
|
||||||
// }
|
|
||||||
// //组合用来生成实体类的map
|
|
||||||
// map.put(name, getClassName(type));
|
|
||||||
// } else {
|
|
||||||
// dropList.add(filedStr);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// return map;
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user