Code optimization: handle malformed byte counts and values in scientific notation

This commit is contained in:
zhanghongqing
2022-08-25 15:52:28 +08:00
parent 065db72593
commit 812fb82c95
7 changed files with 202 additions and 26 deletions
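
For orientation, a minimal sketch of the conversion behavior this commit relies on (the sample values are hypothetical, not taken from the repository): fastjson's TypeUtils.castToLong truncates any java.lang.Number to a long, so epoch-second or counter fields that arrive from the Kafka JSON as scientific-notation doubles still convert cleanly, which appears to be the motivation for replacing hutool's Convert.toLong in the hunks below. Note that castToLong returns null for absent values, so callers must still guard against missing fields.

import com.alibaba.fastjson.util.TypeUtils;

public class ScientificNotationDemo {
    public static void main(String[] args) {
        // Hypothetical sample inputs: the same epoch-second timestamp as a
        // scientific-notation double, as a plain long, and as a missing field.
        Object asDouble = 1.6614E9d;
        Object asLong = 1661400000L;
        Object missing = null;

        System.out.println(TypeUtils.castToLong(asDouble)); // 1661400000
        System.out.println(TypeUtils.castToLong(asLong));   // 1661400000
        System.out.println(TypeUtils.castToLong(missing));  // null -> must be guarded by the caller
    }
}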

View File

@@ -18,7 +18,12 @@ public enum LogMetadata {
* Log name, table name, fields
* */
CONNECTION_RECORD_LOG("connection_record_log", "connection_record_log_local", new String[]{"cap_ip", "recv_ip", "src_ip", "dst_ip", "src_port", "dst_port", "addr_type", "protocol", "fxo_id", "link_status", "dir_status", "total_cs_pkts", "total_sc_pkts", "total_cs_bytes", "total_sc_bytes", "log_gen_time", "aa", "wv", "yy", "user_mask", "conn_start_time", "app_class", "app_id", "http_host", "http_url", "http_cookie", "http_user_agent", "http_method", "http_accept", "http_accept_encoding", "http_referer", "http_rescode", "tls_sni", "tls_cert", "phone_num", "imei", "imsi"}),
CONNECTION_RECORD_LOG("connection_record_log", "connection_record_log_local", new String[]{
"cap_ip", "recv_ip", "src_ip", "dst_ip", "src_port", "dst_port", "addr_type", "protocol", "fxo_id", "link_status",
"dir_status", "total_cs_pkts", "total_sc_pkts", "total_cs_bytes", "total_sc_bytes", "log_gen_time", "aa", "wv", "yy",
"user_mask", "conn_start_time", "app_class", "app_id", "http_host", "http_url", "http_cookie", "http_user_agent",
"http_method", "http_accept", "http_accept_encoding", "http_referer", "http_rescode", "tls_sni", "tls_cert", "phone_num",
"imei", "imsi"}),
CONNECTION_RELATION_LOG("connection_relation_log", "connection_relation_log_local", new String[]{"start_time", "end_time", "src_ip", "dst_ip", "sessions", "packets", "bytes"}),
CONNECTION_SKETCH_RECORD_LOG("connection_sketch_record_log", "connection_sketch_record_log_local", new String[]{"sled_ip", "sketch_start_time", "sketch_duration", "src_ip", "dst_ip", "sketch_sessions", "sketch_packets", "sketch_bytes"}),
DNS_RECORD_LOG("dns_record_log", "dns_record_log_local", new String[]{"capture_time", "recv_ip", "src_ip", "dst_ip", "src_port", "dst_port", "addr_type", "dns_flag", "ttl", "protocol", "fxo_id", "req_type", "qname", "response", "dns_a", "dns_a_num", "dns_cname", "dns_cname_num", "dns_aaaa", "dns_aaaa_num", "dns_mx", "dns_mx_num", "dns_ns", "dns_ns_num"}),

View File

@@ -1,6 +1,7 @@
package com.zdjizhi.etl.connection;
import cn.hutool.core.convert.Convert;
import com.alibaba.fastjson.util.TypeUtils;
import com.zdjizhi.etl.LogService;
import com.zdjizhi.etl.dns.SketchTimeMapFunction;
import com.zdjizhi.utils.kafka.KafkaConsumer;
@@ -85,7 +86,7 @@ public class ConnLogService {
private static DataStream<Map<String, Object>> getSketchTransformStream(DataStream<Map<String, Object>> sketchSource) throws Exception {
DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
.withTimestampAssigner((event, timestamp) -> TypeUtils.castToLong(event.get("sketch_start_time")) * 1000))
.keyBy(new IpKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.process(new SketchProcessFunction());

View File

@@ -0,0 +1,162 @@
//package com.zdjizhi.etl.connection;
//
//import cn.hutool.core.convert.Convert;
//import cn.hutool.core.util.StrUtil;
//import com.zdjizhi.etl.LogService;
//import com.zdjizhi.etl.dns.SketchTimeMapFunction;
//import com.zdjizhi.utils.kafka.KafkaConsumer;
//import org.apache.flink.api.common.eventtime.WatermarkStrategy;
//import org.apache.flink.api.common.functions.MapFunction;
//import org.apache.flink.api.java.utils.ParameterTool;
//import org.apache.flink.streaming.api.datastream.DataStream;
//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
//import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
//import org.apache.flink.streaming.api.windowing.time.Time;
//import ru.ivi.opensource.flinkclickhousesink.ClickHouseSink;
//import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseClusterSettings;
//import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst;
//
//import java.time.Duration;
//import java.util.*;
//
//import static com.zdjizhi.common.FlowWriteConfig.*;
//
//
//public class ConnLogService2 {
//
// public static String convertToCsv(Map<String, Object> map) {
// List<Object> list = new ArrayList<>();
// list.add("(");
// for (Map.Entry<String, Object> m : map.entrySet()) {
// if (m.getValue() instanceof String) {
// list.add("'" + m.getValue() + "'");
// } else {
// list.add(m.getValue());
// }
// }
// list.add(")");
// String join = StrUtil.join(",", list);
// return join;
//
// }
//
// public static void connLogStream(StreamExecutionEnvironment env) throws Exception {
// //connection
// DataStream<Map<String, Object>> connSource = getLogSource(env, SOURCE_KAFKA_TOPIC_CONNECTION);
// //sketch
// DataStream<Map<String, Object>> sketchSource = getLogSource(env, SOURCE_KAFKA_TOPIC_SKETCH);
//
// // Write to the ClickHouse sink, batch processing
// LogService.getLogCKSink(connSource, SINK_CK_TABLE_CONNECTION);
//
// Map<String, String> globalParameters = new HashMap<>();
// // ClickHouse cluster properties
// globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, "http://bigdata-85:8123/");
// globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_USER, CK_USERNAME);
// globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_PASSWORD, CK_PIN);
//
// // sink common
// globalParameters.put(ClickHouseSinkConst.TIMEOUT_SEC, "1");
// globalParameters.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, "d:/");
// globalParameters.put(ClickHouseSinkConst.NUM_WRITERS, "2");
// globalParameters.put(ClickHouseSinkConst.NUM_RETRIES, "2");
// globalParameters.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, "2");
// globalParameters.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, "false");
//
// // set global parameters
// ParameterTool parameters = ParameterTool.fromMap(globalParameters);
// env.getConfig().setGlobalJobParameters(parameters);
//
// // Transform step
// DataStream<String> dataStream = sketchSource.map(new MapFunction<Map<String, Object>, String>() {
// @Override
// public String map(Map<String, Object> data) throws Exception {
// String s = convertToCsv(data);
// System.err.println(s);
// return convertToCsv(data);
// }
// });
//
//
// // create props for sink
// Properties props = new Properties();
// props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, CK_DATABASE + "." + SINK_CK_TABLE_SKETCH);
// props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, CK_BATCH);
// ClickHouseSink sink = new ClickHouseSink(props);
// dataStream.addSink(sink);
// dataStream.print();
//
//// LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH);
//
// //transform
// DataStream<Map<String, Object>> connTransformStream = getConnTransformStream(connSource);
//
// // Write to the ClickHouse connection relation table
// LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION);
//
// DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource);
//
// // Merge the connection and connection-sketch streams
// DataStream<Map<String, Object>> ip2ipGraph = getConnUnion(connTransformStream, sketchTransformStream);
//
// // Write to ArangoDB
// LogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP);
//
// }
//
// /**
// * Consume the raw connection log source from Kafka
// *
// * @param source
// * @return
// */
// private static DataStream<Map<String, Object>> getLogSource(StreamExecutionEnvironment env, String source) throws Exception {
//
// String timeFilter = SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? "conn_start_time" : "sketch_start_time";
//
// DataStream<Map<String, Object>> sourceStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source))
// .setParallelism(SOURCE_PARALLELISM)
// .filter(x -> Objects.nonNull(x) && Convert.toLong(x.get(timeFilter)) > 0)
// .setParallelism(SOURCE_PARALLELISM)
// .map(SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? new ConnTimeMapFunction() : new SketchTimeMapFunction())
// .setParallelism(SOURCE_PARALLELISM)
// .name(source)
// .setParallelism(SOURCE_PARALLELISM);
// return sourceStream;
// }
//
// private static DataStream<Map<String, Object>> getConnTransformStream(DataStream<Map<String, Object>> connSource) throws Exception {
// DataStream<Map<String, Object>> connTransformStream = connSource
// .assignTimestampsAndWatermarks(WatermarkStrategy
// .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
// .withTimestampAssigner((event, timestamp) -> {
// return Convert.toLong(event.get("conn_start_time")) * 1000;
// }))
// .setParallelism(TRANSFORM_PARALLELISM)
// .keyBy(new IpKeysSelector())
// .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
// .process(new ConnProcessFunction())
// .setParallelism(TRANSFORM_PARALLELISM);
// return connTransformStream;
// }
//
// private static DataStream<Map<String, Object>> getSketchTransformStream(DataStream<Map<String, Object>> sketchSource) throws Exception {
// DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
// .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
// .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
// .keyBy(new IpKeysSelector())
// .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
// .process(new SketchProcessFunction());
// return sketchTransformStream;
// }
//
// private static DataStream<Map<String, Object>> getConnUnion(DataStream<Map<String, Object>> connTransformStream, DataStream<Map<String, Object>> sketchTransformStream) throws Exception {
// DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream)
// .keyBy(new IpKeysSelector())
// .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
// .process(new Ip2IpGraphProcessFunction())
// .setParallelism(TRANSFORM_PARALLELISM);
// return ip2ipGraph;
// }
//
//}

View File

@@ -4,13 +4,14 @@ import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.DateUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.util.TypeUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.LinkedHashMap;
import java.util.HashMap;
import java.util.Map;
import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
@@ -28,7 +29,7 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec
try {
Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements);
if (values != null) {
Map<String, Object> result = new LinkedHashMap<>();
Map<String, Object> result = new HashMap<>();
result.put("start_time", values.f0);
result.put("end_time", values.f1);
result.put("src_ip", keys.f0);
@@ -45,18 +46,24 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec
}
private Tuple5<Long, Long, Long, Long, Long> connAggregate(Iterable<Map<String, Object>> elements) {
long sessions = 0;
long packets = 0;
long bytes = 0;
long sessions = 0L;
long packets = 0L;
long bytes = 0L;
long startTime = DateUtil.currentSeconds();
long endTime = DateUtil.currentSeconds();
try {
for (Map<String, Object> newSketchLog : elements) {
long connStartTimetime = Convert.toLong(newSketchLog.get("conn_start_time"));
if(connStartTimetime>0){
if (connStartTimetime > 0) {
sessions++;
packets = packets + Convert.toLong(newSketchLog.get("total_cs_pkts")) + Convert.toLong(newSketchLog.get("total_sc_pkts"));
bytes = bytes + Convert.toLong(newSketchLog.get("total_cs_bytes")) + Convert.toLong(newSketchLog.get("total_sc_bytes"));
Long totalCsPkts = TypeUtils.castToLong(newSketchLog.get("total_cs_pkts"));
Long totalScPkts = TypeUtils.castToLong(newSketchLog.get("total_sc_pkts"));
packets = packets + (totalCsPkts < Long.MAX_VALUE ? totalCsPkts : 0) + (totalScPkts < Long.MAX_VALUE ? totalScPkts : 0);
Long totalCsBytes = TypeUtils.castToLong(newSketchLog.get("total_cs_bytes"));
Long totalScBytes = TypeUtils.castToLong(newSketchLog.get("total_sc_bytes"));
bytes = bytes + (totalCsBytes < Long.MAX_VALUE ? totalCsBytes : 0) + (totalScBytes < Long.MAX_VALUE ? totalScBytes : 0);
startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
}
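
The two guarded sums above implement the pattern "value < Long.MAX_VALUE ? value : 0" per counter. A standalone sketch of the same idea, factored into a hypothetical helper that is not present in this repository (it additionally treats null and negative counters as zero, which the committed code does not):

import com.alibaba.fastjson.util.TypeUtils;

final class CounterSanitizer {

    private CounterSanitizer() {
    }

    // Hypothetical helper: cast a raw counter to long, treating absent,
    // negative, or saturated (Long.MAX_VALUE) values as 0 so that a single
    // malformed record cannot distort the window aggregate.
    static long sanitize(Object raw) {
        Long value = TypeUtils.castToLong(raw);
        if (value == null || value < 0 || value == Long.MAX_VALUE) {
            return 0L;
        }
        return value;
    }
}

With such a helper each accumulation collapses to one line, e.g. packets += CounterSanitizer.sanitize(newSketchLog.get("total_cs_pkts")) + CounterSanitizer.sanitize(newSketchLog.get("total_sc_pkts"));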

View File

@@ -4,13 +4,14 @@ import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.DateUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.util.TypeUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.LinkedHashMap;
import java.util.HashMap;
import java.util.Map;
import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
@@ -41,7 +42,7 @@ public class SketchProcessFunction extends ProcessWindowFunction<Map<String, Obj
Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements);
try {
if (values != null) {
Map<String, Object> result = new LinkedHashMap<>();
Map<String, Object> result = new HashMap<>();
result.put("start_time", values.f0);
result.put("end_time", values.f1);
result.put("src_ip", keys.f0);
@@ -59,20 +60,20 @@ public class SketchProcessFunction extends ProcessWindowFunction<Map<String, Obj
}
private Tuple5<Long, Long, Long, Long, Long> connAggregate(Iterable<Map<String, Object>> elements) {
long sessions = 0;
long packets = 0;
long bytes = 0;
long sessions = 0L;
long packets = 0L;
long bytes = 0L;
long startTime = DateUtil.currentSeconds();
long endTime = DateUtil.currentSeconds();
try {
for (Map<String, Object> newSketchLog : elements) {
long connStartTimetime = Convert.toLong(newSketchLog.get("sketch_start_time"));
if(connStartTimetime>0){
sessions += Convert.toLong(newSketchLog.get("sketch_sessions"));
packets += Convert.toLong(newSketchLog.get("sketch_packets"));
bytes += Convert.toLong(newSketchLog.get("sketch_bytes"));
startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
long connStartTime = Convert.toLong(newSketchLog.get("sketch_start_time"));
if (connStartTime > 0) {
Long sketchSessions = TypeUtils.castToLong(newSketchLog.get("sketch_sessions"));
Long sketchPackets = TypeUtils.castToLong(newSketchLog.get("sketch_packets"));
Long sketchBytes = TypeUtils.castToLong(newSketchLog.get("sketch_bytes"));
sessions += sketchSessions < Long.MAX_VALUE ? sketchSessions : 0;
packets += sketchPackets < Long.MAX_VALUE ? sketchPackets : 0;
bytes += sketchBytes < Long.MAX_VALUE ? sketchBytes : 0;
startTime = connStartTime < startTime ? connStartTime : startTime;
endTime = connStartTime > endTime ? connStartTime : endTime;
}
}
return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes);

View File

@@ -9,7 +9,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.LinkedHashMap;
import java.util.HashMap;
import java.util.Map;
import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
@@ -45,7 +45,7 @@ public class DnsRelationProcessFunction extends ProcessWindowFunction<Map<String
endTime = logStartTime > endTime ? logStartTime : endTime;
}
}
Map<String, Object> newDns = new LinkedHashMap<>();
Map<String, Object> newDns = new HashMap<>();
newDns.put("start_time", startTime);
newDns.put("end_time", endTime + LOG_AGGREGATE_DURATION);
newDns.put("record_type", keys.f0);

View File

@@ -65,7 +65,7 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>
try {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
log.info("开始写入ck数据 {}", data.size());
log.debug("开始写入ck数据 {}", data.size());
boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
@@ -86,7 +86,7 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>
connection.commit();
connection.setAutoCommit(autoCommit);
stopWatch.stop();
log.info("总共花费时间 {}", stopWatch.getTime());
log.debug("总共花费时间 {}", stopWatch.getTime());
} catch (Exception ex) {
log.error("ClickhouseSink插入报错", ex);
}