diff --git a/pom.xml b/pom.xml index 221db19..aa4dbae 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.zdjizhi knowledge-log - 20220722 + 20220819 log-completion-schema http://www.example.com @@ -218,7 +218,7 @@ ru.yandex.clickhouse clickhouse-jdbc - 0.2.6 + 0.2.4 diff --git a/properties/default_config.properties b/properties/default_config.properties index 25a975d..0adb876 100644 --- a/properties/default_config.properties +++ b/properties/default_config.properties @@ -44,4 +44,4 @@ mail.default.charset=UTF-8 log.transform.type=1 #\u4E24\u4E2A\u8F93\u51FA\u4E4B\u95F4\u7684\u6700\u5927\u65F6\u95F4(\u5355\u4F4Dmilliseconds) -buffer.timeout=5000 +buffer.timeout=15000 diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 874188e..3f47eda 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -8,23 +8,23 @@ sink.kafka.servers= tools.library= #--------------------------------Kafka\u6D88\u8D39/\u751F\u4EA7\u914D\u7F6E------------------------------# #\u8BFB\u53D6topic,\u5B58\u50A8\u8BE5spout id\u7684\u6D88\u8D39offset\u4FE1\u606F\uFF0C\u53EF\u901A\u8FC7\u8BE5\u62D3\u6251\u547D\u540D;\u5177\u4F53\u5B58\u50A8offset\u7684\u4F4D\u7F6E\uFF0C\u786E\u5B9A\u4E0B\u6B21\u8BFB\u53D6\u4E0D\u91CD\u590D\u7684\u6570\u636E\uFF1B -group.id=KNOWLEDGE-GROUP3 +group.id=KNOWLEDGE-GROUP5 #--------------------------------topology\u914D\u7F6E------------------------------# #consumer \u5E76\u884C\u5EA6 source.parallelism=1 #\u8F6C\u6362\u51FD\u6570\u5E76\u884C\u5EA6 transform.parallelism=1 #kafka producer \u5E76\u884C\u5EA6 -sink.parallelism=1 +sink.parallelism=3 #--------------------------------\u4E1A\u52A1\u914D\u7F6E------------------------------# #1 connection\u65E5\u5FD7 \uFF0C2 dns\u65E5\u5FD7 -log.type=1 +log.type=2 #\u751F\u4EA7\u8005\u538B\u7F29\u6A21\u5F0F none or snappy producer.kafka.compression.type=none #kafka\u6570\u636E\u6E90topic -source.kafka.topic.connection=test12 +source.kafka.topic.connection=CONNECTION-RECORD-LOG source.kafka.topic.sketch=CONNECTION-SKETCH-RECORD-LOG source.kafka.topic.dns=DNS-RECORD-LOG #\u5199\u5165clickhouse\u672C\u5730\u8868 @@ -43,27 +43,27 @@ sink.arangodb.table.r.nx.domain2domain=R_NX_DOMAIN2DOMAIN #\u4F7F\u7528flink\u5165\u5E93\u539F\u59CB\u65E5\u5FD70\uFF1A\u5426\uFF0C1\uFF1A\u662F sink.ck.raw.log.insert.open=1 #clickhouse\u914D\u7F6E\uFF0C\u591A\u4E2A\u9017\u53F7\u8FDE\u63A5 ip1:8123,ip2:8123 -ck.hosts=192.168.44.12:8123 +ck.hosts=192.168.44.85:8123,192.168.44.86:8123,192.168.44.87:8123 ck.database=tsg_galaxy_v3 -ck.username=default +ck.username=tsg_insert ck.pin=galaxy2019 #\u8D85\u65F6\u65F6\u95F4\u5355\u4F4D\u6BEB\u79D2 ck.connection.timeout=10000 -ck.socket.timeout=300000 +ck.socket.timeout=600000 #clickhouse\u5165\u5E93\u6279\u91CF\u5355\u4F4D\u6761 -ck.batch=10 +ck.batch=100000 #clickhouse\u5165\u5E93\u524D\u79EF\u7D2F\u6279\u91CF\u65F6\u95F4\u5355\u4F4D\u6BEB\u79D2 -sink.ck.batch.delay.time=2000 +sink.ck.batch.delay.time=30000 #flink \u65E5\u5FD7\u5EF6\u8FDF\u8D85\u65F6\u65F6\u95F4 -flink.watermark.max.delay.time=50 +flink.watermark.max.delay.time=60 #ck relation\u7EDF\u8BA1\u65F6\u95F4\u95F4\u9694 \u5355\u4F4Ds -log.aggregate.duration=5 +log.aggregate.duration=30 #arangodb \u7EDF\u8BA1\u65F6\u95F4\u95F4\u9694 \u5355\u4F4Ds -log.aggregate.duration.graph=5 +log.aggregate.duration.graph=600 #arangoDB\u53C2\u6570\u914D\u7F6E -arangodb.host=192.168.44.12 +arangodb.host=192.168.44.83 arangodb.port=8529 arangodb.user=root arangodb.password=galaxy_2019 diff --git 
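/*
 * For reference, a standalone sketch (hypothetical class name; literal values copied from
 * service_flow_config.properties above purely for illustration) of how the reconfigured ck.*
 * settings are consumed by the balanced clickhouse-jdbc 0.2.x connection that the CKUtils
 * class added later in this patch builds.
 */
import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;

public class CkConnectionSketch {

    public static Connection open() throws SQLException {
        ClickHouseProperties props = new ClickHouseProperties();
        props.setDatabase("tsg_galaxy_v3");       // ck.database
        props.setUser("tsg_insert");              // ck.username
        props.setPassword("galaxy2019");          // ck.pin
        props.setConnectionTimeout(10_000);       // ck.connection.timeout, milliseconds
        props.setSocketTimeout(600_000);          // ck.socket.timeout, milliseconds

        // ck.hosts now lists every shard; the balanced data source spreads connections
        // across the hosts it can reach.
        BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource(
                "jdbc:clickhouse://192.168.44.85:8123,192.168.44.86:8123,192.168.44.87:8123", props);
        dataSource.actualize();                                   // probe hosts once up front
        dataSource.scheduleActualization(10, TimeUnit.SECONDS);   // keep the healthy-host list fresh
        return dataSource.getConnection();
    }
}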
a/src/main/java/com/zdjizhi/enums/LogMetadata.java b/src/main/java/com/zdjizhi/enums/LogMetadata.java index d3a7f49..2949085 100644 --- a/src/main/java/com/zdjizhi/enums/LogMetadata.java +++ b/src/main/java/com/zdjizhi/enums/LogMetadata.java @@ -1,6 +1,11 @@ package com.zdjizhi.enums; import cn.hutool.core.util.EnumUtil; +import cn.hutool.core.util.StrUtil; + +import java.util.Arrays; + +import static com.zdjizhi.common.FlowWriteConfig.CK_DATABASE; /** * @description: \ @@ -10,23 +15,27 @@ import cn.hutool.core.util.EnumUtil; public enum LogMetadata { /* - * 日志名称topic,表名 + * 日志名称,表名,字段 * */ - CONNECTION_RECORD_LOG("connection_record_log", "connection_record_log"), - CONNECTION_SKETCH_RECORD_LOG("connection_sketch_record_log", "connection_sketch_record_log"), - DNS_RECORD_LOG("dns_record_log", "dns_record_log"), + CONNECTION_RECORD_LOG("connection_record_log", "connection_record_log_local", new String[]{"cap_ip", "recv_ip", "src_ip", "dst_ip", "src_port", "dst_port", "addr_type", "protocol", "fxo_id", "link_status", "dir_status", "total_cs_pkts", "total_sc_pkts", "total_cs_bytes", "total_sc_bytes", "log_gen_time", "aa", "wv", "yy", "user_mask", "conn_start_time", "app_class", "app_id", "http_host", "http_url", "http_cookie", "http_user_agent", "http_method", "http_accept", "http_accept_encoding", "http_referer", "http_rescode", "tls_sni", "tls_cert", "phone_num", "imei", "imsi"}), + CONNECTION_RELATION_LOG("connection_relation_log", "connection_relation_log_local", new String[]{"start_time", "end_time", "src_ip", "dst_ip", "sessions", "packets", "bytes"}), + CONNECTION_SKETCH_RECORD_LOG("connection_sketch_record_log", "connection_sketch_record_log_local", new String[]{"sled_ip", "sketch_start_time", "sketch_duration", "src_ip", "dst_ip", "sketch_sessions", "sketch_packets", "sketch_bytes"}), + DNS_RECORD_LOG("dns_record_log", "dns_record_log_local", new String[]{"capture_time", "recv_ip", "src_ip", "dst_ip", "src_port", "dst_port", "addr_type", "dns_flag", "ttl", "protocol", "fxo_id", "req_type", "qname", "response", "dns_a", "dns_a_num", "dns_cname", "dns_cname_num", "dns_aaaa", "dns_aaaa_num", "dns_mx", "dns_mx_num", "dns_ns", "dns_ns_num"}), + DNS_RELATION_LOG("dns_relation_log", "dns_relation_log_local", new String[]{"start_time", "end_time", "record_type", "qname", "record", "sessions"}), ; private String source; private String sink; + private String[] fields; LogMetadata() { } - LogMetadata(String source, String sink) { + LogMetadata(String source, String sink, String[] fields) { this.source = source; this.sink = sink; + this.fields = fields; } public String getSource() { @@ -37,10 +46,31 @@ public enum LogMetadata { return sink; } + public String[] getFields() { + return fields; + } + public static String getLogSink(String source) { LogMetadata logMetadata = EnumUtil.fromString(LogMetadata.class, source); return logMetadata.getSink(); - } + public static String[] getLogFields(String tableName) { + LogMetadata[] values = LogMetadata.values(); + for (LogMetadata value : values) { + if (value.sink.equals(tableName)) { + return value.fields; + } + } + return null; + } + + public static String preparedSql(String tableName) { + String[] fields = LogMetadata.getLogFields(tableName); + String[] placeholders = new String[fields.length]; + Arrays.fill(placeholders, "?"); + + return StrUtil.concat(true, "INSERT INTO ", CK_DATABASE, ".", tableName, + "(", StrUtil.join(",", fields), ") VALUES (", StrUtil.join(",", placeholders), ")"); + } } diff --git 
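/*
 * An illustrative usage sketch of the new LogMetadata helpers (hypothetical caller, not taken
 * from this patch), assuming CK_DATABASE resolves to the configured tsg_galaxy_v3 database.
 */
import com.zdjizhi.enums.LogMetadata;

public class LogMetadataSketch {

    public static void main(String[] args) {
        // EnumUtil.fromString matches the enum constant name.
        String table = LogMetadata.getLogSink("CONNECTION_RELATION_LOG");  // connection_relation_log_local
        String[] fields = LogMetadata.getLogFields(table);                 // column order used for every insert
        String sql = LogMetadata.preparedSql(table);
        // sql is now:
        // INSERT INTO tsg_galaxy_v3.connection_relation_log_local(start_time,end_time,src_ip,dst_ip,
        //   sessions,packets,bytes) VALUES (?,?,?,?,?,?,?)
        System.out.println(sql + " / " + fields.length + " columns");

        // Caveat: getLogFields returns null for a table name that is not registered in the enum,
        // in which case preparedSql would throw a NullPointerException; only the sink names
        // declared above are valid arguments.
    }
}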
a/src/main/java/com/zdjizhi/etl/LogFormat.java b/src/main/java/com/zdjizhi/etl/LogFormat.java deleted file mode 100644 index c0edaa8..0000000 --- a/src/main/java/com/zdjizhi/etl/LogFormat.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.zdjizhi.etl; - -import com.zdjizhi.utils.json.TypeUtils; - -import java.util.Map; - -public class LogFormat { - - public static Map connTime(Map value) { - value.put("conn_start_time", TypeUtils.coverMSToS(value.get("conn_start_time"))); - return value; - } - - - public static Map sketchTime(Map value) { - value.put("sketch_start_time", TypeUtils.coverMSToS(value.get("sketch_start_time"))); - return value; - } -} diff --git a/src/main/java/com/zdjizhi/etl/LogService.java b/src/main/java/com/zdjizhi/etl/LogService.java new file mode 100644 index 0000000..56989b1 --- /dev/null +++ b/src/main/java/com/zdjizhi/etl/LogService.java @@ -0,0 +1,38 @@ +package com.zdjizhi.etl; + +import com.zdjizhi.etl.connection.ArangodbBatchIPWindow; +import com.zdjizhi.utils.arangodb.ArangoDBSink; +import com.zdjizhi.utils.ck.ClickhouseSink; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; + +import java.util.Map; + +import static com.zdjizhi.common.FlowWriteConfig.*; + +public interface LogService { + + public static void getLogCKSink(DataStream> sourceStream, String sink) throws Exception{ + + sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME))) + .trigger(new CountTriggerWithTimeout<>(sink, CK_BATCH, TimeCharacteristic.ProcessingTime)) + .apply(new CKBatchWindow()) + .addSink(new ClickhouseSink(sink)) + .setParallelism(SINK_PARALLELISM) + .name(sink) + .setParallelism(SINK_PARALLELISM); + + } + + public static void getLogArangoSink(DataStream> sourceStream, String sink) throws Exception{ + sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME))) + .trigger(new CountTriggerWithTimeout<>(sink, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime)) + .apply(new ArangodbBatchIPWindow()) + .addSink(new ArangoDBSink(sink)) + .setParallelism(SINK_PARALLELISM) + .name(sink) + .setParallelism(SINK_PARALLELISM); + } +} diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java new file mode 100644 index 0000000..f413cc3 --- /dev/null +++ b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java @@ -0,0 +1,104 @@ +package com.zdjizhi.etl.connection; + +import cn.hutool.core.convert.Convert; +import com.zdjizhi.etl.LogService; +import com.zdjizhi.etl.dns.SketchTimeMapFunction; +import com.zdjizhi.utils.kafka.KafkaConsumer; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; + +import java.time.Duration; +import java.util.Map; +import java.util.Objects; + +import static com.zdjizhi.common.FlowWriteConfig.*; + + +public class ConnLogService { + + public static void connLogStream(StreamExecutionEnvironment env) throws Exception{ + //connection + DataStream> connSource = 
ConnLogService.getLogSource(env, SOURCE_KAFKA_TOPIC_CONNECTION); + //sketch + DataStream> sketchSource = ConnLogService.getLogSource(env, SOURCE_KAFKA_TOPIC_SKETCH); + + //写入CKsink,批量处理 + LogService.getLogCKSink(connSource, SINK_CK_TABLE_CONNECTION); + + LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH); + + //transform + DataStream> connTransformStream = ConnLogService.getConnTransformStream(connSource); + + //写入ck通联relation表 + LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION); + + DataStream> sketchTransformStream = ConnLogService.getSketchTransformStream(sketchSource); + + //合并通联和通联sketch + DataStream> ip2ipGraph = ConnLogService.getConnUnion(connTransformStream, sketchTransformStream); + + //写入arangodb + LogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP); + + } + + /** + * 通联原始日志数据源消费kafka + * + * @param source + * @return + */ + private static DataStream> getLogSource(StreamExecutionEnvironment env, String source) throws Exception { + + String timeFilter = SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? "conn_start_time" : "sketch_start_time"; + + DataStream> sourceStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source)) + .setParallelism(SOURCE_PARALLELISM) + .filter(x -> Objects.nonNull(x) && Convert.toLong(x.get(timeFilter)) > 0) + .setParallelism(SOURCE_PARALLELISM) + .map(SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? new ConnTimeMapFunction() : new SketchTimeMapFunction()) + .setParallelism(SOURCE_PARALLELISM) + .name(source) + .setParallelism(SOURCE_PARALLELISM); + return sourceStream; + } + + private static DataStream> getConnTransformStream(DataStream> connSource) throws Exception { + DataStream> connTransformStream = connSource + .assignTimestampsAndWatermarks(WatermarkStrategy + .>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME)) + .withTimestampAssigner((event, timestamp) -> { + return Convert.toLong(event.get("conn_start_time")) * 1000; + })) + .setParallelism(TRANSFORM_PARALLELISM) + .keyBy(new IpKeysSelector()) + .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) + .process(new ConnProcessFunction()) + .setParallelism(TRANSFORM_PARALLELISM); + return connTransformStream; + } + + private static DataStream> getSketchTransformStream(DataStream> sketchSource) throws Exception { + DataStream> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy + .>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME)) + .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000)) + .keyBy(new IpKeysSelector()) + .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) + .process(new SketchProcessFunction()); + return sketchTransformStream; + } + + private static DataStream> getConnUnion(DataStream> connTransformStream, DataStream> sketchTransformStream) throws Exception { + DataStream> ip2ipGraph = connTransformStream.union(sketchTransformStream) + .keyBy(new IpKeysSelector()) + .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH))) + .process(new Ip2IpGraphProcessFunction()) + .setParallelism(TRANSFORM_PARALLELISM); + return ip2ipGraph; + } + +} diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java new file mode 100644 index 0000000..5e39df4 --- /dev/null +++ b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java @@ -0,0 +1,76 @@ +package com.zdjizhi.etl.dns; + 
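/*
 * A self-contained sketch (hypothetical sample data, simplified key selector) of the
 * aggregation pattern ConnLogService and DnsLogService share above: assign
 * bounded-out-of-orderness watermarks from a timestamp field in the log, key by the
 * relation fields, and aggregate per tumbling window.  Note that the services window on
 * processing time even though they assign event-time watermarks, so those watermarks only
 * take effect if an event-time window assigner is used instead — worth double-checking.
 */
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

public class RelationAggregationSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Map<String, Object> sample = new HashMap<>();
        sample.put("conn_start_time", 1660900000L);
        sample.put("src_ip", "10.0.0.1");
        sample.put("dst_ip", "10.0.0.2");
        sample.put("bytes", 1500L);

        DataStream<Map<String, Object>> source = env.fromElements(sample);

        source
            // timestamps come from the log itself, tolerating 60 s of out-of-order arrival
            .assignTimestampsAndWatermarks(WatermarkStrategy
                .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(60))
                .withTimestampAssigner((event, ts) ->
                        ((Number) event.get("conn_start_time")).longValue() * 1000))
            // one group per src_ip/dst_ip pair, standing in for IpKeysSelector
            .keyBy(event -> event.get("src_ip") + "|" + event.get("dst_ip"))
            .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
            .process(new ProcessWindowFunction<Map<String, Object>, Map<String, Object>, String, TimeWindow>() {
                @Override
                public void process(String key, Context ctx,
                                    Iterable<Map<String, Object>> elements,
                                    Collector<Map<String, Object>> out) {
                    long bytes = 0L;
                    long sessions = 0L;
                    for (Map<String, Object> e : elements) {
                        bytes += ((Number) e.get("bytes")).longValue();
                        sessions++;
                    }
                    Map<String, Object> relation = new HashMap<>();
                    relation.put("key", key);
                    relation.put("sessions", sessions);
                    relation.put("bytes", bytes);
                    out.collect(relation);
                }
            })
            .print();

        env.execute("relation-aggregation-sketch");
    }
}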
+import cn.hutool.core.convert.Convert; +import cn.hutool.core.util.ObjectUtil; +import com.zdjizhi.enums.DnsType; +import com.zdjizhi.etl.LogService; +import com.zdjizhi.utils.kafka.KafkaConsumer; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; + +import java.time.Duration; +import java.util.Map; +import java.util.Objects; + +import static com.zdjizhi.common.FlowWriteConfig.*; + +public class DnsLogService { + + public static void dnsLogStream(StreamExecutionEnvironment env) throws Exception{ + + DataStream> dnsSource = DnsLogService.getLogSource(env, SOURCE_KAFKA_TOPIC_DNS); + + //dns 原始日志 ck入库 + LogService.getLogCKSink(dnsSource, SINK_CK_TABLE_DNS); + + DataStream> dnsTransform = DnsLogService.getDnsTransformStream(dnsSource); + + //dns 拆分后relation日志 ck入库 + LogService.getLogCKSink(dnsTransform, SINK_CK_TABLE_RELATION_DNS); + + //arango 入库,按record_type分组入不同的表 + DataStream> dnsGraph = dnsTransform.filter(Objects::nonNull) + .keyBy(new DnsGraphKeysSelector()) + .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH))) + .process(new DnsGraphProcessFunction()) + .setParallelism(SINK_PARALLELISM); + + for (DnsType dnsEnum : DnsType.values()) { + DataStream> dnsRecordData = dnsGraph.filter(x -> Objects.nonNull(x) && ObjectUtil.equal(dnsEnum.getType(), x.get("record_type"))) + .setParallelism(SINK_PARALLELISM); + LogService.getLogArangoSink(dnsRecordData, dnsEnum.getSink()); + } + + } + + private static DataStream> getLogSource(StreamExecutionEnvironment env, String source) throws Exception{ + + DataStream> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(source)) + .setParallelism(SOURCE_PARALLELISM) + .filter(x -> Objects.nonNull(x) && Convert.toLong(x.get("capture_time")) > 0) + .setParallelism(SOURCE_PARALLELISM) + .map(new DnsMapFunction()) + .setParallelism(SOURCE_PARALLELISM) + .name(source); + return dnsSource; + } + + private static DataStream> getDnsTransformStream(DataStream> dnsSource) throws Exception{ + DataStream> dnsTransform = dnsSource.filter(x -> Objects.nonNull(x.get("response"))) + .assignTimestampsAndWatermarks(WatermarkStrategy + .>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME)) + .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000)) + .setParallelism(TRANSFORM_PARALLELISM) + .flatMap(new DnsSplitFlatMapFunction()) + .setParallelism(TRANSFORM_PARALLELISM) + .keyBy(new DnsGraphKeysSelector()) + .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) + .process(new DnsRelationProcessFunction()) + .setParallelism(TRANSFORM_PARALLELISM); + return dnsTransform; + } + +} diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java index ab577ce..f7b6b70 100644 --- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java +++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java @@ -1,30 +1,13 @@ package com.zdjizhi.topology; -import cn.hutool.core.convert.Convert; -import cn.hutool.core.util.ObjectUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.enums.DnsType; -import 
com.zdjizhi.etl.CKBatchWindow; -import com.zdjizhi.etl.CountTriggerWithTimeout; -import com.zdjizhi.etl.connection.*; -import com.zdjizhi.etl.dns.*; -import com.zdjizhi.utils.arangodb.ArangoDBSink; -import com.zdjizhi.utils.ck.ClickhouseSink; -import com.zdjizhi.utils.kafka.KafkaConsumer; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.datastream.DataStream; +import com.zdjizhi.etl.connection.ConnLogService; +import com.zdjizhi.etl.dns.DnsLogService; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; -import org.apache.flink.streaming.api.windowing.time.Time; -import java.time.Duration; -import java.util.Map; -import java.util.Objects; - -import static com.zdjizhi.common.FlowWriteConfig.*; +import static com.zdjizhi.common.FlowWriteConfig.BUFFER_TIMEOUT; +import static com.zdjizhi.common.FlowWriteConfig.LOG_TYPE; public class LogFlowWriteTopology { private static final Log logger = LogFactory.get(); @@ -32,154 +15,18 @@ public class LogFlowWriteTopology { public static void main(String[] args) { try { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - //两个输出之间的最大时间 (单位milliseconds) - env.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT); - + env.setBufferTimeout(BUFFER_TIMEOUT); //1 connection,2 dns - if (FlowWriteConfig.LOG_TYPE == 1) { - //connection - DataStream> connSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_CONNECTION)) - .setParallelism(SOURCE_PARALLELISM) - .filter(x->Objects.nonNull(x) && Convert.toLong(x.get("conn_start_time"))>0) - .map(new ConnTimeMapFunction()) - .setParallelism(SOURCE_PARALLELISM) - .name(SOURCE_KAFKA_TOPIC_CONNECTION); - - //sketch - DataStream> sketchSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_SKETCH)) - .filter(x->Objects.nonNull(x) && Convert.toLong(x.get("sketch_start_time"))>0) - .map(new SketchTimeMapFunction()) - .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM) - .name(SOURCE_KAFKA_TOPIC_SKETCH); - - //写入CKsink,批量处理 - if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) { - connSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME))) - .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_CONNECTION, CK_BATCH, TimeCharacteristic.ProcessingTime)) - .apply(new CKBatchWindow()) - .addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION)) - .setParallelism(FlowWriteConfig.SINK_PARALLELISM) - .name("CKSink"); - - sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME))) - .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_SKETCH, CK_BATCH, TimeCharacteristic.ProcessingTime)) - .apply(new CKBatchWindow()) - .addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)) - .setParallelism(FlowWriteConfig.SINK_PARALLELISM) - .name("CKSink"); - - } - - //transform - DataStream> connTransformStream = connSource - .assignTimestampsAndWatermarks(WatermarkStrategy - .>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME)) - .withTimestampAssigner((event, timestamp) -> {return Convert.toLong(event.get("conn_start_time")) * 1000;})) - .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM) - .keyBy(new IpKeysSelector()) - .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION))) - .process(new 
ConnProcessFunction()) - .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM) - .filter(x -> Objects.nonNull(x)) - .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); - - connTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME))) - .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_RELATION_CONNECTION, CK_BATCH, TimeCharacteristic.ProcessingTime)) - .apply(new CKBatchWindow()) - .addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)) - .setParallelism(FlowWriteConfig.SINK_PARALLELISM) - .name("CKSink"); - - DataStream> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy - .>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME)) - .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000)) - .keyBy(new IpKeysSelector()) - .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION))) - .process(new SketchProcessFunction()) - .filter(Objects::nonNull) - .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); - - //入Arangodb - DataStream> ip2ipGraph = connTransformStream.union(sketchTransformStream) - .keyBy(new IpKeysSelector()) - .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH))) - .process(new Ip2IpGraphProcessFunction()) - .setParallelism(TRANSFORM_PARALLELISM) - .filter(Objects::nonNull) - .setParallelism(TRANSFORM_PARALLELISM); - - //写入arangodb - ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME))) - .trigger(new CountTriggerWithTimeout<>(R_VISIT_IP2IP, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime)) - .apply(new ArangodbBatchIPWindow()) - .addSink(new ArangoDBSink(R_VISIT_IP2IP)) - .setParallelism(FlowWriteConfig.SINK_PARALLELISM).name(R_VISIT_IP2IP); - - } else if (FlowWriteConfig.LOG_TYPE == 2) { - DataStream> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_DNS)) - .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM) - .filter(x->Objects.nonNull(x) && Convert.toLong(x.get("capture_time"))>0) - .map(new DnsMapFunction()) - .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM) - .name(FlowWriteConfig.SOURCE_KAFKA_TOPIC_DNS); - - //dns 原始日志 ck入库 - if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) { - dnsSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME))) - .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_DNS, CK_BATCH, TimeCharacteristic.ProcessingTime)) - .apply(new CKBatchWindow()) - .addSink(new ClickhouseSink(SINK_CK_TABLE_DNS)) - .setParallelism(FlowWriteConfig.SINK_PARALLELISM) - .name("CKSink"); - } - - DataStream> dnsTransform = dnsSource.filter(x -> Objects.nonNull(x.get("response"))) - .assignTimestampsAndWatermarks(WatermarkStrategy - .>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME)) - .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000)) - .setParallelism(TRANSFORM_PARALLELISM) - .flatMap(new DnsSplitFlatMapFunction()) - .setParallelism(TRANSFORM_PARALLELISM) - .keyBy(new DnsGraphKeysSelector()) - .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) - .process(new DnsRelationProcessFunction()) - .setParallelism(TRANSFORM_PARALLELISM) - .filter(Objects::nonNull) - .setParallelism(TRANSFORM_PARALLELISM); - - //dns 拆分后relation日志 ck入库 - 
dnsTransform.filter(Objects::nonNull).windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME))) - .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_RELATION_DNS, CK_BATCH, TimeCharacteristic.ProcessingTime)) - .apply(new CKBatchWindow()) - .addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_DNS)) - .setParallelism(SINK_PARALLELISM) - .name("CKSink"); - - //arango 入库,按record_type分组入不同的表 - DataStream> dnsGraph = dnsTransform.filter(Objects::nonNull).keyBy(new DnsGraphKeysSelector()) - .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH))) - .process(new DnsGraphProcessFunction()) - .setParallelism(SINK_PARALLELISM); - - for (DnsType dnsEnum : DnsType.values()) { - dnsGraph.filter(x -> Objects.nonNull(x) && ObjectUtil.equal(dnsEnum.getType(), x.get("record_type"))) - .setParallelism(SINK_PARALLELISM) - .windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME))) - .trigger(new CountTriggerWithTimeout<>(dnsEnum.getSink(), ARANGODB_BATCH, TimeCharacteristic.ProcessingTime)) - .apply(new ArangodbBatchDnsWindow()) - .addSink(new ArangoDBSink(dnsEnum.getSink())) - .setParallelism(SINK_PARALLELISM) - .name("ArangodbSink"); - } - + if (LOG_TYPE == 1) { + ConnLogService.connLogStream(env); + } else if (LOG_TYPE == 2) { + DnsLogService.dnsLogStream(env); } - env.execute(args[0]); } catch (Exception e) { logger.error("This Flink task start ERROR! Exception information is : {}", e); } - } } diff --git a/src/main/java/com/zdjizhi/utils/ck/CKUtils.java b/src/main/java/com/zdjizhi/utils/ck/CKUtils.java new file mode 100644 index 0000000..7ff48d6 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/ck/CKUtils.java @@ -0,0 +1,54 @@ +package com.zdjizhi.utils.ck; + +import cn.hutool.core.io.IoUtil; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import ru.yandex.clickhouse.BalancedClickhouseDataSource; +import ru.yandex.clickhouse.settings.ClickHouseProperties; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.concurrent.TimeUnit; + +import static com.zdjizhi.common.FlowWriteConfig.*; + +public class CKUtils { + + private static final Log log = LogFactory.get(); + + private static Connection connection; + + public static Connection getConnection() { + + try { + ClickHouseProperties props = new ClickHouseProperties(); + props.setDatabase(CK_DATABASE); + props.setUser(CK_USERNAME); + props.setPassword(CK_PIN); + props.setConnectionTimeout(CK_CONNECTION_TIMEOUT); + props.setSocketTimeout(CK_SOCKET_TIMEOUT); + props.setMaxThreads(50); + + BalancedClickhouseDataSource blDataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://" + CK_HOSTS, props); + blDataSource.actualize(); + blDataSource.scheduleActualization(10, TimeUnit.SECONDS);//开启检测 + +// HikariConfig conf = new HikariConfig(); +// conf.setDataSource(blDataSource); +// conf.setMinimumIdle(1); +// conf.setMaximumPoolSize(20); +// +// HikariDataSource hkDs = new HikariDataSource(conf); + connection = blDataSource.getConnection(); + log.debug("get clickhouse connection success"); + } catch (SQLException e) { + log.error("clickhouse connection error ,{}", e); + } + return connection; + } + + public static void close(Connection connection) throws Exception { + IoUtil.close(connection); + } + +} diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java index d58190d..75a3d7d 100644 --- a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java +++ 
b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java @@ -1,26 +1,18 @@ package com.zdjizhi.utils.ck; -import cn.hutool.core.convert.Convert; import cn.hutool.core.io.IoUtil; -import cn.hutool.core.util.StrUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.zdjizhi.enums.LogMetadata; +import org.apache.commons.lang3.time.StopWatch; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import ru.yandex.clickhouse.BalancedClickhouseDataSource; -import ru.yandex.clickhouse.settings.ClickHouseProperties; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import static com.zdjizhi.common.FlowWriteConfig.*; public class ClickhouseSink extends RichSinkFunction>> { @@ -49,90 +41,49 @@ public class ClickhouseSink extends RichSinkFunction>> @Override public void open(Configuration parameters) throws Exception { - try { - ClickHouseProperties properties = new ClickHouseProperties(); - - properties.setDatabase(CK_DATABASE); - properties.setUser(CK_USERNAME); - properties.setPassword(CK_PIN); -// properties.setKeepAliveTimeout(5); - properties.setConnectionTimeout(CK_CONNECTION_TIMEOUT); - properties.setSocketTimeout(CK_SOCKET_TIMEOUT); - - BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://" + CK_HOSTS, properties); - dataSource.scheduleActualization(10, TimeUnit.SECONDS);//开启检测 - connection = dataSource.getConnection(); - log.debug("get clickhouse connection success"); - } catch (SQLException e) { - log.error("clickhouse connection error ,{}", e); - } - + connection = CKUtils.getConnection(); } @Override public void close() throws Exception { IoUtil.close(preparedStatement); - IoUtil.close(connection); + CKUtils.close(connection); } public void executeInsert(List> data, String tableName) { try { + StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + log.debug("开始写入ck数据 :{}", data.size()); + + boolean autoCommit = connection.getAutoCommit(); connection.setAutoCommit(false); - int count = 0; - for (Map map : data) { - List keys = new LinkedList<>(map.keySet()); - preparedStatement = connection.prepareStatement(preparedSql(keys, tableName)); - - List values = new LinkedList<>(map.values()); - for (int i = 1; i <= values.size(); i++) { - Object val = values.get(i - 1); - if (val instanceof Long) { - preparedStatement.setLong((i), Convert.toLong(val)); - } else if (val instanceof Integer) { - preparedStatement.setLong((i), Convert.toLong(val)); - } else if (val instanceof Boolean) { - preparedStatement.setInt((i), Boolean.valueOf(StrUtil.toString(val)) ? 
1 : 0); - } else { - preparedStatement.setString((i), StrUtil.toString(val)); - } - } - preparedStatement.addBatch(); - count++; - //1w提交一次 - if (count % CK_BATCH == 0) { - preparedStatement.executeBatch(); - connection.commit(); - preparedStatement.clearBatch(); - count = 0; - } - } - if (count > 0) { - preparedStatement.executeBatch(); - connection.commit(); - } - + batch(data, tableName); + preparedStatement.executeBatch(); + connection.commit(); + connection.setAutoCommit(autoCommit); + stopWatch.stop(); + log.debug("总共花费时间 {}", stopWatch.getTime()); } catch (Exception ex) { log.error("ClickhouseSink插入报错", ex); } } - - public static String preparedSql(List fields, String tableName) { - - String placeholders = fields.stream() - .filter(Objects::nonNull) - .map(f -> "?") - .collect(Collectors.joining(", ")); - String columns = fields.stream() - .filter(Objects::nonNull) - .collect(Collectors.joining(", ")); - String sql = StrUtil.concat(true, "INSERT INTO ", CK_DATABASE, ".", tableName, - "(", columns, ") VALUES (", placeholders, ")"); + private void batch(List> data, String tableName) throws SQLException { + String[] logFields = LogMetadata.getLogFields(tableName); + String sql = LogMetadata.preparedSql(tableName); log.debug(sql); - return sql; + preparedStatement = connection.prepareStatement(sql); + + for (Map map : data) { + for (int i = 0; i < logFields.length; i++) { + preparedStatement.setObject(i + 1, map.get(logFields[i])); + } + preparedStatement.addBatch(); + } + } - } \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java deleted file mode 100644 index de507ad..0000000 --- a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.zdjizhi.utils.functions; - -import com.zdjizhi.utils.StringUtil; -import org.apache.flink.api.common.functions.FilterFunction; - -/** - * @author qidaijie - * @Package com.zdjizhi.utils.functions - * @Description: - * @date 2021/5/2715:01 - */ -public class FilterNullFunction implements FilterFunction { - @Override - public boolean filter(String message) { - return StringUtil.isNotBlank(message); - } -} diff --git a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java deleted file mode 100644 index 810e4c8..0000000 --- a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.zdjizhi.utils.functions; - - -import com.zdjizhi.utils.general.TransFormMap; -import org.apache.flink.api.common.functions.MapFunction; - -import java.util.Map; - - -/** - * @author qidaijie - * @Package com.zdjizhi.utils.functions - * @Description: - * @date 2021/5/2715:01 - */ -public class MapCompletedFunction implements MapFunction, String> { - - @Override - @SuppressWarnings("unchecked") - public String map(Map logs) { - return TransFormMap.dealCommonMessage(logs); - } -} diff --git a/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java deleted file mode 100644 index ccef850..0000000 --- a/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.zdjizhi.utils.functions; - -import com.zdjizhi.utils.general.TransFormTypeMap; -import org.apache.flink.api.common.functions.MapFunction; - -import java.util.Map; - - -/** - 
* @author qidaijie - * @Package com.zdjizhi.utils.functions - * @Description: - * @date 2021/5/2715:01 - */ -public class TypeMapCompletedFunction implements MapFunction, String> { - - @Override - @SuppressWarnings("unchecked") - public String map(Map logs) { - - return TransFormTypeMap.dealCommonMessage(logs); - } -} diff --git a/src/main/java/com/zdjizhi/utils/general/CityHash.java b/src/main/java/com/zdjizhi/utils/general/CityHash.java deleted file mode 100644 index 5de4785..0000000 --- a/src/main/java/com/zdjizhi/utils/general/CityHash.java +++ /dev/null @@ -1,180 +0,0 @@ -package com.zdjizhi.utils.general; - - - - -/** - * CityHash64算法对logid进行散列计算 - * 版本规划暂不实现-TSG22.01 - * - * @author qidaijie - */ -@Deprecated -public class CityHash { - - private static final long k0 = 0xc3a5c85c97cb3127L; - private static final long k1 = 0xb492b66fbe98f273L; - private static final long k2 = 0x9ae16a3b2f90404fL; - private static final long k3 = 0xc949d7c7509e6557L; - private static final long k5 = 0x9ddfea08eb382d69L; - - private CityHash() {} - - public static long CityHash64(byte[] s, int index, int len) { - if (len <= 16 ) { - return HashLen0to16(s, index, len); - } else if (len > 16 && len <= 32) { - return HashLen17to32(s, index, len); - } else if (len > 32 && len <= 64) { - return HashLen33to64(s, index, len); - } else { - long x = Fetch64(s, index); - long y = Fetch64(s, index + len - 16) ^ k1; - long z = Fetch64(s, index + len - 56) ^ k0; - long[] v = WeakHashLen32WithSeeds(s, len - 64, len, y); - long[] w = WeakHashLen32WithSeeds(s, len - 32, len * k1, k0); - z += ShiftMix(v[1]) * k1; - x = Rotate(z + x, 39) * k1; - y = Rotate(y, 33) * k1; - - len = (len - 1) & ~63; - do { - x = Rotate(x + y + v[0] + Fetch64(s, index + 16), 37) * k1; - y = Rotate(y + v[1] + Fetch64(s, index + 48), 42) * k1; - x ^= w[1]; - y ^= v[0]; - z = Rotate(z ^ w[0], 33); - v = WeakHashLen32WithSeeds(s, index, v[1] * k1, x + w[0]); - w = WeakHashLen32WithSeeds(s, index + 32, z + w[1], y); - long t = z; - z = x; - x = t; - index += 64; - len -= 64; - } while (len != 0); - return HashLen16(HashLen16(v[0], w[0]) + ShiftMix(y) * k1 + z, - HashLen16(v[1], w[1]) + x); - } - } - - private static long HashLen0to16(byte[] s, int index, int len) { - if (len > 8) { - long a = Fetch64(s, index); - long b = Fetch64(s, index + len - 8); - return HashLen16(a, RotateByAtLeastOne(b + len, len)) ^ b; - } - if (len >= 4) { - long a = Fetch32(s, index); - return HashLen16(len + (a << 3), Fetch32(s, index + len - 4)); - } - if (len > 0) { - byte a = s[index]; - byte b = s[index + len >>> 1]; - byte c = s[index + len - 1]; - int y = (a) + (b << 8); - int z = len + (c << 2); - return ShiftMix(y * k2 ^ z * k3) * k2; - } - return k2; - } - - private static long HashLen17to32(byte[] s, int index, int len) { - long a = Fetch64(s, index) * k1; - long b = Fetch64(s, index + 8); - long c = Fetch64(s, index + len - 8) * k2; - long d = Fetch64(s, index + len - 16) * k0; - return HashLen16(Rotate(a - b, 43) + Rotate(c, 30) + d, - a + Rotate(b ^ k3, 20) - c + len); - } - - private static long HashLen33to64(byte[] s, int index, int len) { - long z = Fetch64(s, index + 24); - long a = Fetch64(s, index) + (len + Fetch64(s, index + len - 16)) * k0; - long b = Rotate(a + z, 52); - long c = Rotate(a, 37); - a += Fetch64(s, index + 8); - c += Rotate(a, 7); - a += Fetch64(s, index + 16); - long vf = a + z; - long vs = b + Rotate(a, 31) + c; - a = Fetch64(s, index + 16) + Fetch64(s, index + len - 32); - z = Fetch64(s, index + len - 8); - b = Rotate(a + z, 52); - 
c = Rotate(a, 37); - a += Fetch64(s, index + len - 24); - c += Rotate(a, 7); - a += Fetch64(s, index + len - 16); - long wf = a + z; - long ws = b + Rotate(a, 31) + c; - long r = ShiftMix((vf + ws) * k2 + (wf + vs) * k0); - return ShiftMix(r * k0 + vs) * k2; - } - - private static long Fetch64(byte[] p, int index) { - return toLongLE(p,index); - } - - private static long Fetch32(byte[] p, int index) { - return toIntLE(p,index); - } - private static long[] WeakHashLen32WithSeeds( - long w, long x, long y, long z, long a, long b) { - a += w; - b = Rotate(b + a + z, 21); - long c = a; - a += x; - a += y; - b += Rotate(a, 44); - return new long[]{a + z, b + c}; - } - - private static long[] WeakHashLen32WithSeeds(byte[] s, int index, long a, long b) { - return WeakHashLen32WithSeeds(Fetch64(s, index), - Fetch64(s, index + 8), - Fetch64(s, index + 16), - Fetch64(s, index + 24), - a, - b); - } - - private static long toLongLE(byte[] b, int i) { - return 0xffffffffffffffffL & (((long) b[i + 7] << 56) + ((long) (b[i + 6] & 255) << 48) + ((long) (b[i + 5] & 255) << 40) + ((long) (b[i + 4] & 255) << 32) + ((long) (b[i + 3] & 255) << 24) + ((b[i + 2] & 255) << 16) + ((b[i + 1] & 255) << 8) + ((b[i + 0] & 255) << 0)); - } - - private static long toIntLE(byte[] b, int i) { - return 0xffffffffL & (((b[i + 3] & 255) << 24) + ((b[i + 2] & 255) << 16) + ((b[i + 1] & 255) << 8) + ((b[i + 0] & 255) << 0)); - } - private static long RotateByAtLeastOne(long val, int shift) { - return (val >>> shift) | (val << (64 - shift)); - } - - private static long ShiftMix(long val) { - return val ^ (val >>> 47); - } - - private static long Uint128Low64(long[] x) { - return x[0]; - } - - private static long Rotate(long val, int shift) { - return shift == 0 ? val : (val >>> shift) | (val << (64 - shift)); - } - - private static long Uint128High64(long[] x) { - return x[1]; - } - - private static long Hash128to64(long[] x) { - long a = (Uint128Low64(x) ^ Uint128High64(x)) * k5; - a ^= (a >>> 47); - long b = (Uint128High64(x) ^ a) * k5; - b ^= (b >>> 47); - b *= k5; - return b; - } - - private static long HashLen16(long u, long v) { - return Hash128to64(new long[]{u,v}); - } - -} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java deleted file mode 100644 index 35312fc..0000000 --- a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java +++ /dev/null @@ -1,120 +0,0 @@ -package com.zdjizhi.utils.general; - - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.utils.json.JsonParseUtil; - -import java.util.Map; - - -/** - * 描述:转换或补全工具类 - * - * @author qidaijie - */ -public class TransFormMap { - private static final Log logger = LogFactory.get(); - - /** - * 解析日志,并补全 - * - * @param jsonMap kafka Topic消费原始日志并解析 - * @return 补全后的日志 - */ - @SuppressWarnings("unchecked") - public static String dealCommonMessage(Map jsonMap) { - try { - JsonParseUtil.dropJsonField(jsonMap); - for (String[] strings : JsonParseUtil.getJobList()) { - //用到的参数的值 - Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]); - //需要补全的字段的key - String appendToKeyName = strings[1]; - //需要补全的字段的值 - Object appendTo = JsonParseUtil.getValue(jsonMap, appendToKeyName); - //匹配操作函数的字段 - String function = strings[2]; - //额外的参数的值 - String param = strings[3]; - functionSet(function, jsonMap, appendToKeyName, appendTo, logValue, param); - } - return JsonMapper.toJsonString(jsonMap); - } catch 
(RuntimeException e) { - logger.error("TransForm logs failed,The exception is :" + e); - return null; - } - } - - - /** - * 根据schema描述对应字段进行操作的 函数集合 - * - * @param function 匹配操作函数的字段 - * @param jsonMap 原始日志解析map - * @param appendToKeyName 需要补全的字段的key - * @param appendTo 需要补全的字段的值 - * @param logValue 用到的参数的值 - * @param param 额外的参数的值 - */ - private static void functionSet(String function, Map jsonMap, String appendToKeyName, Object appendTo, Object logValue, String param) { - switch (function) { - case "current_timestamp": - if (!(appendTo instanceof Long)) { - JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime()); - } - break; -// case "snowflake_id": -// JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId()); -// break; - case "geo_ip_detail": - if (logValue != null && appendTo == null) { - JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(logValue.toString())); - } - break; - case "geo_asn": - if (logValue != null && appendTo == null) { - JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoAsn(logValue.toString())); - } - break; - case "geo_ip_country": - if (logValue != null && appendTo == null) { - JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpCountry(logValue.toString())); - } - break; - case "set_value": - if (param != null) { - JsonParseUtil.setValue(jsonMap, appendToKeyName, param); - } - break; - case "get_value": - if (logValue != null) { - JsonParseUtil.setValue(jsonMap, appendToKeyName, logValue); - } - break; - case "if": - if (param != null) { - JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.condition(jsonMap, param)); - } - break; - case "sub_domain": - if (appendTo == null && logValue != null) { - JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getTopDomain(logValue.toString())); - } - break; - case "decode_of_base64": - if (logValue != null) { - JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param))); - } - break; - case "flattenSpec": - if (logValue != null && param != null) { - JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param)); - } - break; - default: - } - } - -} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java deleted file mode 100644 index f13894e..0000000 --- a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java +++ /dev/null @@ -1,122 +0,0 @@ -package com.zdjizhi.utils.general; - - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.utils.json.JsonParseUtil; - -import java.util.Map; - - -/** - * 描述:转换或补全工具类 - * - * @author qidaijie - */ -public class TransFormTypeMap { - private static final Log logger = LogFactory.get(); - - /** - * 解析日志,并补全 - * - * @param message kafka Topic原始日志 - * @return 补全后的日志 - */ - @SuppressWarnings("unchecked") - public static String dealCommonMessage(Map message) { - try { - Map jsonMap = JsonParseUtil.typeTransform(message); - for (String[] strings : JsonParseUtil.getJobList()) { - //用到的参数的值 - Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]); - //需要补全的字段的key - String appendToKeyName = strings[1]; - //需要补全的字段的值 - Object appendToKeyValue = JsonParseUtil.getValue(jsonMap, appendToKeyName); - //匹配操作函数的字段 - String function = strings[2]; - //额外的参数的值 - String 
param = strings[3]; - functionSet(function, jsonMap, appendToKeyName, appendToKeyValue, logValue, param); - } - return JsonMapper.toJsonString(jsonMap); - } catch (RuntimeException e) { - logger.error("TransForm logs failed,The exception is :" + e); - return null; - } - } - - - /** - * 根据schema描述对应字段进行操作的 函数集合 - * - * @param function 匹配操作函数的字段 - * @param jsonMap 原始日志解析map - * @param appendToKeyName 需要补全的字段的key - * @param appendToKeyValue 需要补全的字段的值 - * @param logValue 用到的参数的值 - * @param param 额外的参数的值 - */ - private static void functionSet(String function, Map jsonMap, String appendToKeyName, Object appendToKeyValue, Object logValue, String param) { - switch (function) { - case "current_timestamp": - if (!(appendToKeyValue instanceof Long)) { - JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime()); - } - break; -// case "snowflake_id": -// JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId()); -// //版本规划暂不实现TSG-22.01 -//// JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getDecimalHash(SnowflakeId.generateId())); -// break; - case "geo_ip_detail": - if (logValue != null && appendToKeyValue == null) { - JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(logValue.toString())); - } - break; - case "geo_asn": - if (logValue != null && appendToKeyValue == null) { - JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoAsn(logValue.toString())); - } - break; - case "geo_ip_country": - if (logValue != null && appendToKeyValue == null) { - JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpCountry(logValue.toString())); - } - break; - case "set_value": - if (param != null) { - JsonParseUtil.setValue(jsonMap, appendToKeyName, param); - } - break; - case "get_value": - if (logValue != null) { - JsonParseUtil.setValue(jsonMap, appendToKeyName, logValue); - } - break; - case "if": - if (param != null) { - JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.condition(jsonMap, param)); - } - break; - case "sub_domain": - if (appendToKeyValue == null && logValue != null) { - JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getTopDomain(logValue.toString())); - } - break; - case "decode_of_base64": - if (logValue != null) { - JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param))); - } - break; - case "flattenSpec": - if (logValue != null && param != null) { - JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param)); - } - break; - default: - } - } - -} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/utils/general/TransFunction.java deleted file mode 100644 index 84fe5cc..0000000 --- a/src/main/java/com/zdjizhi/utils/general/TransFunction.java +++ /dev/null @@ -1,230 +0,0 @@ -package com.zdjizhi.utils.general; - -import cn.hutool.core.codec.Base64; -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.jayway.jsonpath.InvalidPathException; -import com.jayway.jsonpath.JsonPath; -import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.utils.FormatUtils; -import com.zdjizhi.utils.IpLookupV2; -import com.zdjizhi.utils.StringUtil; -import com.zdjizhi.utils.json.JsonParseUtil; - -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.Map; -import java.util.regex.Pattern; - -/** - * @author qidaijie - */ -class 
TransFunction { - private static final Log logger = LogFactory.get(); - - /** - * 校验数字正则 - */ - private static final Pattern PATTERN = Pattern.compile("[0-9]*"); - - /** - * IP定位库工具类 - */ - private static IpLookupV2 ipLookup = new IpLookupV2.Builder(false) - .loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_built_in.mmdb") - .loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_built_in.mmdb") - .loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_user_defined.mmdb") - .loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_user_defined.mmdb") - .loadAsnDataFile(FlowWriteConfig.TOOLS_LIBRARY + "asn_v4.mmdb") - .loadAsnDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "asn_v6.mmdb") - .build(); - - /** - * 生成当前时间戳的操作 - */ - static long getCurrentTime() { - - return System.currentTimeMillis() / 1000; - } - - /** - * CityHash64算法 - * 版本规划暂不实现-TSG22.01 - * - * @param data 原始数据 - * @return 散列结果 - */ - @Deprecated - static BigInteger getDecimalHash(long data) { - byte[] dataBytes = String.valueOf(data).getBytes(); - long hashValue = CityHash.CityHash64(dataBytes, 0, dataBytes.length); - String decimalValue = Long.toUnsignedString(hashValue, 10); - return new BigInteger(decimalValue); - } - - /** - * 根据clientIp获取location信息 - * - * @param ip client IP - * @return ip地址详细信息 - */ - static String getGeoIpDetail(String ip) { - try { - return ipLookup.cityLookupDetail(ip); - } catch (NullPointerException npe) { - logger.error("The MMDB file is not loaded or IP is null! " + npe); - return ""; - } catch (RuntimeException e) { - logger.error("Get clientIP location error! " + e); - return ""; - } - } - - /** - * 根据ip获取asn信息 - * - * @param ip client/server IP - * @return ASN - */ - static String getGeoAsn(String ip) { - try { - return ipLookup.asnLookup(ip); - } catch (NullPointerException npe) { - logger.error("The MMDB file is not loaded or IP is null! " + npe); - return ""; - } catch (RuntimeException e) { - logger.error("Get IP ASN error! " + e); - return ""; - } - } - - /** - * 根据ip获取country信息 - * - * @param ip server IP - * @return 国家 - */ - static String getGeoIpCountry(String ip) { - try { - return ipLookup.countryLookup(ip); - } catch (NullPointerException npe) { - logger.error("The MMDB file is not loaded or IP is null! " + npe); - return ""; - } catch (RuntimeException e) { - logger.error("Get ServerIP location error! 
" + e); - return ""; - } - } - - - /** - * radius借助HBase补齐 - * - * @param ip client IP - * @return account - */ - - - /** - * 解析顶级域名 - * - * @param domain 初始域名 - * @return 顶级域名 - */ - static String getTopDomain(String domain) { - try { - return FormatUtils.getTopPrivateDomain(domain); - } catch (StringIndexOutOfBoundsException outException) { - logger.error("Parse top-level domain exceptions, exception domain names:" + domain); - return ""; - } - } - - /** - * 根据编码解码base64 - * - * @param message base64 - * @param charset 编码 - * @return 解码字符串 - */ - static String decodeBase64(String message, Object charset) { - String result = ""; - try { - if (StringUtil.isNotBlank(message)) { - if (charset == null) { - result = Base64.decodeStr(message, FlowWriteConfig.MAIL_DEFAULT_CHARSET); - } else { - result = Base64.decodeStr(message, charset.toString()); - } - } - } catch (RuntimeException e) { - logger.error("Resolve Base64 exception, exception information:" + e); - } - return result; - } - - /** - * 根据表达式解析json - * - * @param message json - * @param expr 解析表达式 - * @return 解析结果 - */ - static String flattenSpec(String message, String expr) { - String flattenResult = ""; - try { - if (StringUtil.isNotBlank(expr)) { - ArrayList read = JsonPath.parse(message).read(expr); - if (read.size() >= 1) { - flattenResult = read.get(0); - } - } - } catch (ClassCastException | InvalidPathException | ArrayIndexOutOfBoundsException e) { - logger.error("The device label resolution exception or [expr] analytic expression error" + e); - } - return flattenResult; - } - - /** - * 判断是否为日志字段,是则返回对应value,否则返回原始字符串 - * - * @param jsonMap 内存实体类 - * @param param 字段名/普通字符串 - * @return JSON.Value or String - */ - static Object isJsonValue(Map jsonMap, String param) { - if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) { - return JsonParseUtil.getValue(jsonMap, param.substring(2)); - } else { - return param; - } - } - - /** - * IF函数实现,解析日志构建三目运算;包含判断是否为数字若为数字则转换为long类型返回结果。 - * - * @param jsonMap 内存实体类 - * @param ifParam 字段名/普通字符串 - * @return resultA or resultB or null - */ - static Object condition(Map jsonMap, String ifParam) { - Object result = null; - try { - String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER); - if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) { - String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER); - Object direction = isJsonValue(jsonMap, norms[0]); - Object resultA = isJsonValue(jsonMap, split[1]); - Object resultB = isJsonValue(jsonMap, split[2]); - if (direction instanceof Number) { - result = (Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB; - } else if (direction instanceof String) { - result = direction.equals(norms[1]) ? 
resultA : resultB; - } - } - } catch (RuntimeException e) { - logger.error("IF 函数执行异常,异常信息:" + e); - } - return result; - } -} diff --git a/src/test/java/com/zdjizhi/FunctionTest.java b/src/test/java/com/zdjizhi/FunctionTest.java deleted file mode 100644 index c667224..0000000 --- a/src/test/java/com/zdjizhi/FunctionTest.java +++ /dev/null @@ -1,52 +0,0 @@ -package com.zdjizhi; - -import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.utils.IpLookupV2; -import com.zdjizhi.utils.general.CityHash; -import org.junit.Test; - -import java.math.BigInteger; -import java.util.Calendar; - -/** - * @author qidaijie - * @Package com.zdjizhi - * @Description: - * @date 2021/11/611:38 - */ -public class FunctionTest { - - private static IpLookupV2 ipLookup = new IpLookupV2.Builder(false) - .loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4.mmdb") -// .loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_built_in.mmdb") -// .loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_user_defined.mmdb") -// .loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_user_defined.mmdb") - .loadAsnDataFile(FlowWriteConfig.TOOLS_LIBRARY + "asn_v4.mmdb") - .loadAsnDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "asn_v6.mmdb") - .build(); - - @Test - public void CityHashTest() { - - byte[] dataBytes = String.valueOf(613970406986188816L).getBytes(); - long hashValue = CityHash.CityHash64(dataBytes, 0, dataBytes.length); - String decimalValue = Long.toUnsignedString(hashValue, 10); - BigInteger result = new BigInteger(decimalValue); - System.out.println(result); - } - - @Test - public void ipLookupTest() { - String ip = "0.255.255.254"; - System.out.println(ipLookup.cityLookupDetail(ip)); - System.out.println(ipLookup.countryLookup(ip)); - } - - @Test - public void timestampTest(){ - Calendar cal = Calendar.getInstance(); - Long utcTime=cal.getTimeInMillis(); - System.out.println(utcTime); - System.out.println(System.currentTimeMillis()); - } -} diff --git a/src/test/java/com/zdjizhi/json/JsonTest.java b/src/test/java/com/zdjizhi/json/JsonTest.java index 597da40..e797b49 100644 --- a/src/test/java/com/zdjizhi/json/JsonTest.java +++ b/src/test/java/com/zdjizhi/json/JsonTest.java @@ -1,7 +1,6 @@ package com.zdjizhi.json; import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.utils.json.JsonParseUtil; import org.junit.Test; import java.util.Map;
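/*
 * A condensed sketch of the reworked ClickhouseSink write path above: one PreparedStatement per
 * window batch, values bound positionally from the LogMetadata field list, then a single
 * executeBatch/commit.  setObject delegates type coercion (Long/Integer/Boolean/String) to the
 * ClickHouse JDBC driver, which is what replaced the old per-type setLong/setInt/setString
 * branch.  Connection handling is assumed to come from the CKUtils class added in this patch,
 * and tableName must be one of the sinks registered in LogMetadata.
 */
import com.zdjizhi.enums.LogMetadata;
import com.zdjizhi.utils.ck.CKUtils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;
import java.util.Map;

public class BatchInsertSketch {

    public static void insert(List<Map<String, Object>> rows, String tableName) throws Exception {
        String[] fields = LogMetadata.getLogFields(tableName);   // column order for this table
        String sql = LogMetadata.preparedSql(tableName);         // INSERT INTO db.table(...) VALUES (?,...)

        Connection connection = CKUtils.getConnection();
        boolean autoCommit = connection.getAutoCommit();
        connection.setAutoCommit(false);
        try (PreparedStatement statement = connection.prepareStatement(sql)) {
            for (Map<String, Object> row : rows) {
                for (int i = 0; i < fields.length; i++) {
                    // bind positionally in the metadata-defined column order
                    statement.setObject(i + 1, row.get(fields[i]));
                }
                statement.addBatch();
            }
            statement.executeBatch();   // the CountTriggerWithTimeout already caps the batch at ck.batch rows
            connection.commit();
        } finally {
            connection.setAutoCommit(autoCommit);
            CKUtils.close(connection);
        }
    }
}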