Code optimization: extract common methods, optimize ClickhouseSink writes

zhanghongqing
2022-08-19 17:34:37 +08:00
parent ba64fe8187
commit b30f82f588
20 changed files with 365 additions and 1047 deletions

View File

@@ -6,7 +6,7 @@
     <groupId>com.zdjizhi</groupId>
     <artifactId>knowledge-log</artifactId>
-    <version>20220722</version>
+    <version>20220819</version>
     <name>log-completion-schema</name>
     <url>http://www.example.com</url>
@@ -218,7 +218,7 @@
     <dependency>
       <groupId>ru.yandex.clickhouse</groupId>
       <artifactId>clickhouse-jdbc</artifactId>
-      <version>0.2.6</version>
+      <version>0.2.4</version>
     </dependency>
     <!-- <dependency>
       <groupId>org.apache.flink</groupId>
@@ -230,6 +230,11 @@
     <dependency>
       <artifactId>arangodb-java-driver</artifactId>
       <version>6.6.3</version>
     </dependency>
+    <!-- <dependency>
+      <groupId>com.zaxxer</groupId>
+      <artifactId>HikariCP</artifactId>
+      <version>3.2.0</version>
+    </dependency>-->
   </dependencies>
 </project>

View File

@@ -44,4 +44,4 @@ mail.default.charset=UTF-8
 log.transform.type=1
 # Maximum time between two outputs (unit: milliseconds)
-buffer.timeout=5000
+buffer.timeout=15000

View File

@@ -8,23 +8,23 @@ sink.kafka.servers=
 tools.library=
 #-------------------------------- Kafka consumer/producer configuration ------------------------------#
 # Consumer group for the topics read; the consumed offsets are stored under this id (which can be named after the topology); the stored offset position ensures the next read does not re-consume data;
-group.id=KNOWLEDGE-GROUP3
+group.id=KNOWLEDGE-GROUP5
 #-------------------------------- topology configuration ------------------------------#
 # consumer parallelism
 source.parallelism=1
 # transform function parallelism
 transform.parallelism=1
 # kafka producer parallelism
-sink.parallelism=1
+sink.parallelism=3
 #-------------------------------- business configuration ------------------------------#
 # 1 connection logs, 2 dns logs
-log.type=1
+log.type=2
 # producer compression mode: none or snappy
 producer.kafka.compression.type=none
 # kafka source topics
-source.kafka.topic.connection=test12
+source.kafka.topic.connection=CONNECTION-RECORD-LOG
 source.kafka.topic.sketch=CONNECTION-SKETCH-RECORD-LOG
 source.kafka.topic.dns=DNS-RECORD-LOG
 # clickhouse local tables to write to
@@ -43,27 +43,27 @@ sink.arangodb.table.r.nx.domain2domain=R_NX_DOMAIN2DOMAIN
 # use flink to insert raw logs; 0: no, 1: yes
 sink.ck.raw.log.insert.open=1
 # clickhouse configuration; multiple hosts joined by commas: ip1:8123,ip2:8123
-ck.hosts=192.168.44.12:8123
+ck.hosts=192.168.44.85:8123,192.168.44.86:8123,192.168.44.87:8123
 ck.database=tsg_galaxy_v3
-ck.username=default
+ck.username=tsg_insert
 ck.pin=galaxy2019
 # timeouts in milliseconds
 ck.connection.timeout=10000
-ck.socket.timeout=300000
+ck.socket.timeout=600000
 # clickhouse insert batch size, in rows
-ck.batch=10
+ck.batch=100000
 # time to accumulate a batch before a clickhouse insert, in milliseconds
-sink.ck.batch.delay.time=2000
+sink.ck.batch.delay.time=30000
 # flink log delay (watermark) timeout
-flink.watermark.max.delay.time=50
+flink.watermark.max.delay.time=60
 # ck relation aggregation interval, in seconds
-log.aggregate.duration=5
+log.aggregate.duration=30
 # arangodb aggregation interval, in seconds
-log.aggregate.duration.graph=5
+log.aggregate.duration.graph=600
 # arangoDB connection parameters
-arangodb.host=192.168.44.12
+arangodb.host=192.168.44.83
 arangodb.port=8529
 arangodb.user=root
 arangodb.password=galaxy_2019
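
Read together with the trigger wiring in LogService further down, these values mean a ClickHouse flush happens on whichever comes first: ck.batch=100000 buffered rows (the CountTriggerWithTimeout count limit) or the sink.ck.batch.delay.time=30000 ms processing-time window.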

View File

@@ -1,6 +1,11 @@
 package com.zdjizhi.enums;
 import cn.hutool.core.util.EnumUtil;
+import cn.hutool.core.util.StrUtil;
+import java.util.Arrays;
+import static com.zdjizhi.common.FlowWriteConfig.CK_DATABASE;
 /**
  * @description: \
@@ -10,23 +15,27 @@ import cn.hutool.core.util.EnumUtil;
 public enum LogMetadata {
     /*
-     * log name (topic), table name
+     * log name, table name, fields
      * */
-    CONNECTION_RECORD_LOG("connection_record_log", "connection_record_log"),
-    CONNECTION_SKETCH_RECORD_LOG("connection_sketch_record_log", "connection_sketch_record_log"),
-    DNS_RECORD_LOG("dns_record_log", "dns_record_log"),
+    CONNECTION_RECORD_LOG("connection_record_log", "connection_record_log_local", new String[]{"cap_ip", "recv_ip", "src_ip", "dst_ip", "src_port", "dst_port", "addr_type", "protocol", "fxo_id", "link_status", "dir_status", "total_cs_pkts", "total_sc_pkts", "total_cs_bytes", "total_sc_bytes", "log_gen_time", "aa", "wv", "yy", "user_mask", "conn_start_time", "app_class", "app_id", "http_host", "http_url", "http_cookie", "http_user_agent", "http_method", "http_accept", "http_accept_encoding", "http_referer", "http_rescode", "tls_sni", "tls_cert", "phone_num", "imei", "imsi"}),
+    CONNECTION_RELATION_LOG("connection_relation_log", "connection_relation_log_local", new String[]{"start_time", "end_time", "src_ip", "dst_ip", "sessions", "packets", "bytes"}),
+    CONNECTION_SKETCH_RECORD_LOG("connection_sketch_record_log", "connection_sketch_record_log_local", new String[]{"sled_ip", "sketch_start_time", "sketch_duration", "src_ip", "dst_ip", "sketch_sessions", "sketch_packets", "sketch_bytes"}),
+    DNS_RECORD_LOG("dns_record_log", "dns_record_log_local", new String[]{"capture_time", "recv_ip", "src_ip", "dst_ip", "src_port", "dst_port", "addr_type", "dns_flag", "ttl", "protocol", "fxo_id", "req_type", "qname", "response", "dns_a", "dns_a_num", "dns_cname", "dns_cname_num", "dns_aaaa", "dns_aaaa_num", "dns_mx", "dns_mx_num", "dns_ns", "dns_ns_num"}),
+    DNS_RELATION_LOG("dns_relation_log", "dns_relation_log_local", new String[]{"start_time", "end_time", "record_type", "qname", "record", "sessions"}),
     ;
     private String source;
     private String sink;
+    private String[] fields;
     LogMetadata() {
     }
-    LogMetadata(String source, String sink) {
+    LogMetadata(String source, String sink, String[] fields) {
         this.source = source;
         this.sink = sink;
+        this.fields = fields;
     }
     public String getSource() {
@@ -37,10 +46,31 @@ public enum LogMetadata {
         return sink;
     }
+    public String[] getFields() {
+        return fields;
+    }
     public static String getLogSink(String source) {
         LogMetadata logMetadata = EnumUtil.fromString(LogMetadata.class, source);
         return logMetadata.getSink();
     }
+    public static String[] getLogFields(String tableName) {
+        LogMetadata[] values = LogMetadata.values();
+        for (LogMetadata value : values) {
+            if (value.sink.equals(tableName)) {
+                return value.fields;
+            }
+        }
+        return null;
+    }
+    public static String preparedSql(String tableName) {
+        String[] fields = LogMetadata.getLogFields(tableName);
+        String[] placeholders = new String[fields.length];
+        Arrays.fill(placeholders, "?");
+        return StrUtil.concat(true, "INSERT INTO ", CK_DATABASE, ".", tableName,
+                "(", StrUtil.join(",", fields), ") VALUES (", StrUtil.join(",", placeholders), ")");
+    }
 }
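
For illustration (not part of the commit), the statement preparedSql builds for the smallest table above, with ck.database=tsg_galaxy_v3 from the config:

    String sql = LogMetadata.preparedSql("connection_relation_log_local");
    // => INSERT INTO tsg_galaxy_v3.connection_relation_log_local(start_time,end_time,src_ip,dst_ip,sessions,packets,bytes) VALUES (?,?,?,?,?,?,?)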

View File

@@ -1,19 +0,0 @@
package com.zdjizhi.etl;
import com.zdjizhi.utils.json.TypeUtils;
import java.util.Map;
public class LogFormat {
public static Map<String, Object> connTime(Map<String, Object> value) {
value.put("conn_start_time", TypeUtils.coverMSToS(value.get("conn_start_time")));
return value;
}
public static Map<String, Object> sketchTime(Map<String, Object> value) {
value.put("sketch_start_time", TypeUtils.coverMSToS(value.get("sketch_start_time")));
return value;
}
}

View File

@@ -0,0 +1,38 @@
package com.zdjizhi.etl;
import com.zdjizhi.etl.connection.ArangodbBatchIPWindow;
import com.zdjizhi.utils.arangodb.ArangoDBSink;
import com.zdjizhi.utils.ck.ClickhouseSink;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.Map;
import static com.zdjizhi.common.FlowWriteConfig.*;
public interface LogService {
public static void getLogCKSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception{
sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
.trigger(new CountTriggerWithTimeout<>(sink, CK_BATCH, TimeCharacteristic.ProcessingTime))
.apply(new CKBatchWindow())
.addSink(new ClickhouseSink(sink))
.setParallelism(SINK_PARALLELISM)
.name(sink)
.setParallelism(SINK_PARALLELISM);
}
public static void getLogArangoSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception{
sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
.trigger(new CountTriggerWithTimeout<>(sink, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime))
.apply(new ArangodbBatchIPWindow())
.addSink(new ArangoDBSink(sink))
.setParallelism(SINK_PARALLELISM)
.name(sink)
.setParallelism(SINK_PARALLELISM);
}
}
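
CountTriggerWithTimeout is used throughout this commit, but its source is not part of the diff. For orientation, a minimal count-or-timeout trigger matching the call sites above (constructor arguments inferred from them; the committed class may differ) could look like this:

    import org.apache.flink.api.common.state.ReducingState;
    import org.apache.flink.api.common.state.ReducingStateDescriptor;
    import org.apache.flink.api.common.typeutils.base.LongSerializer;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    // Sketch only: FIRE_AND_PURGE when maxCount elements have arrived, or when
    // the window's processing-time timer expires, whichever happens first.
    public class CountTriggerWithTimeout<T> extends Trigger<T, TimeWindow> {
        private final String name;     // sink/table name; assumed to be used for logging
        private final long maxCount;
        private final ReducingStateDescriptor<Long> countDesc =
                new ReducingStateDescriptor<>("count", (a, b) -> a + b, LongSerializer.INSTANCE);

        public CountTriggerWithTimeout(String name, long maxCount, TimeCharacteristic timeType) {
            this.name = name;
            this.maxCount = maxCount;
        }

        @Override
        public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
            ctx.registerProcessingTimeTimer(window.maxTimestamp()); // timeout at window end
            ReducingState<Long> count = ctx.getPartitionedState(countDesc);
            count.add(1L);
            if (count.get() >= maxCount) {
                count.clear();
                return TriggerResult.FIRE_AND_PURGE;                // count limit reached first
            }
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.FIRE_AND_PURGE;                    // timeout reached first
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
            ctx.getPartitionedState(countDesc).clear();
            ctx.deleteProcessingTimeTimer(window.maxTimestamp());
        }
    }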

View File

@@ -0,0 +1,104 @@
package com.zdjizhi.etl.connection;
import cn.hutool.core.convert.Convert;
import com.zdjizhi.etl.LogService;
import com.zdjizhi.etl.dns.SketchTimeMapFunction;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import static com.zdjizhi.common.FlowWriteConfig.*;
public class ConnLogService {
public static void connLogStream(StreamExecutionEnvironment env) throws Exception{
//connection
DataStream<Map<String, Object>> connSource = ConnLogService.getLogSource(env, SOURCE_KAFKA_TOPIC_CONNECTION);
//sketch
DataStream<Map<String, Object>> sketchSource = ConnLogService.getLogSource(env, SOURCE_KAFKA_TOPIC_SKETCH);
// write to the CK sink, batched
LogService.getLogCKSink(connSource, SINK_CK_TABLE_CONNECTION);
LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH);
//transform
DataStream<Map<String, Object>> connTransformStream = ConnLogService.getConnTransformStream(connSource);
// write to the CK connection relation table
LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION);
DataStream<Map<String, Object>> sketchTransformStream = ConnLogService.getSketchTransformStream(sketchSource);
// union the connection and connection-sketch streams
DataStream<Map<String, Object>> ip2ipGraph = ConnLogService.getConnUnion(connTransformStream, sketchTransformStream);
// write to arangodb
LogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP);
}
/**
* Consume the raw connection log source from kafka
*
* @param source
* @return
*/
private static DataStream<Map<String, Object>> getLogSource(StreamExecutionEnvironment env, String source) throws Exception {
String timeFilter = SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? "conn_start_time" : "sketch_start_time";
DataStream<Map<String, Object>> sourceStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source))
.setParallelism(SOURCE_PARALLELISM)
.filter(x -> Objects.nonNull(x) && Convert.toLong(x.get(timeFilter)) > 0)
.setParallelism(SOURCE_PARALLELISM)
.map(SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? new ConnTimeMapFunction() : new SketchTimeMapFunction())
.setParallelism(SOURCE_PARALLELISM)
.name(source)
.setParallelism(SOURCE_PARALLELISM);
return sourceStream;
}
private static DataStream<Map<String, Object>> getConnTransformStream(DataStream<Map<String, Object>> connSource) throws Exception {
DataStream<Map<String, Object>> connTransformStream = connSource
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> {
return Convert.toLong(event.get("conn_start_time")) * 1000;
}))
.setParallelism(TRANSFORM_PARALLELISM)
.keyBy(new IpKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.process(new ConnProcessFunction())
.setParallelism(TRANSFORM_PARALLELISM);
return connTransformStream;
}
private static DataStream<Map<String, Object>> getSketchTransformStream(DataStream<Map<String, Object>> sketchSource) throws Exception {
DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
.keyBy(new IpKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.process(new SketchProcessFunction());
return sketchTransformStream;
}
private static DataStream<Map<String, Object>> getConnUnion(DataStream<Map<String, Object>> connTransformStream, DataStream<Map<String, Object>> sketchTransformStream) throws Exception {
DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream)
.keyBy(new IpKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
.process(new Ip2IpGraphProcessFunction())
.setParallelism(TRANSFORM_PARALLELISM);
return ip2ipGraph;
}
}
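
IpKeysSelector is likewise referenced but not shown in this commit. Since the downstream relation/graph aggregation is per IP pair, it is presumably a KeySelector over the src_ip/dst_ip fields; a guessed sketch, not the committed code:

    import org.apache.flink.api.java.functions.KeySelector;
    import java.util.Map;

    // Assumed behavior: key each record by its (src_ip, dst_ip) pair.
    public class IpKeysSelector implements KeySelector<Map<String, Object>, String> {
        @Override
        public String getKey(Map<String, Object> value) {
            return value.get("src_ip") + "|" + value.get("dst_ip");
        }
    }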

View File

@@ -0,0 +1,76 @@
package com.zdjizhi.etl.dns;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.ObjectUtil;
import com.zdjizhi.enums.DnsType;
import com.zdjizhi.etl.LogService;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import static com.zdjizhi.common.FlowWriteConfig.*;
public class DnsLogService {
public static void dnsLogStream(StreamExecutionEnvironment env) throws Exception{
DataStream<Map<String, Object>> dnsSource = DnsLogService.getLogSource(env, SOURCE_KAFKA_TOPIC_DNS);
// dns raw logs into CK
LogService.getLogCKSink(dnsSource, SINK_CK_TABLE_DNS);
DataStream<Map<String, Object>> dnsTransform = DnsLogService.getDnsTransformStream(dnsSource);
// dns relation logs (post-split) into CK
LogService.getLogCKSink(dnsTransform, SINK_CK_TABLE_RELATION_DNS);
// arango insert, grouped by record_type into different tables
DataStream<Map<String, Object>> dnsGraph = dnsTransform.filter(Objects::nonNull)
.keyBy(new DnsGraphKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
.process(new DnsGraphProcessFunction())
.setParallelism(SINK_PARALLELISM);
for (DnsType dnsEnum : DnsType.values()) {
DataStream<Map<String, Object>> dnsRecordData = dnsGraph.filter(x -> Objects.nonNull(x) && ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
.setParallelism(SINK_PARALLELISM);
LogService.getLogArangoSink(dnsRecordData, dnsEnum.getSink());
}
}
private static DataStream<Map<String, Object>> getLogSource(StreamExecutionEnvironment env, String source) throws Exception{
DataStream<Map<String, Object>> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(source))
.setParallelism(SOURCE_PARALLELISM)
.filter(x -> Objects.nonNull(x) && Convert.toLong(x.get("capture_time")) > 0)
.setParallelism(SOURCE_PARALLELISM)
.map(new DnsMapFunction())
.setParallelism(SOURCE_PARALLELISM)
.name(source);
return dnsSource;
}
private static DataStream<Map<String, Object>> getDnsTransformStream(DataStream<Map<String, Object>> dnsSource) throws Exception{
DataStream<Map<String, Object>> dnsTransform = dnsSource.filter(x -> Objects.nonNull(x.get("response")))
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
.setParallelism(TRANSFORM_PARALLELISM)
.flatMap(new DnsSplitFlatMapFunction())
.setParallelism(TRANSFORM_PARALLELISM)
.keyBy(new DnsGraphKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.process(new DnsRelationProcessFunction())
.setParallelism(TRANSFORM_PARALLELISM);
return dnsTransform;
}
}
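
DnsType is also outside this diff; from the loop above it must expose getType() (matched against the record_type field) and getSink() (the target ArangoDB collection). An illustrative shape only, with made-up constant names and values:

    // Hypothetical sketch of the DnsType enum; real type/sink values are unknown.
    public enum DnsType {
        A("A", "R_DNS_DOMAIN2IP"),
        CNAME("CNAME", "R_DNS_DOMAIN2DOMAIN");

        private final String type;  // compared with the record_type field
        private final String sink;  // ArangoDB edge collection to write to

        DnsType(String type, String sink) {
            this.type = type;
            this.sink = sink;
        }
        public String getType() { return type; }
        public String getSink() { return sink; }
    }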

View File

@@ -1,30 +1,13 @@
 package com.zdjizhi.topology;
-import cn.hutool.core.convert.Convert;
-import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.enums.DnsType;
-import com.zdjizhi.etl.CKBatchWindow;
-import com.zdjizhi.etl.CountTriggerWithTimeout;
-import com.zdjizhi.etl.connection.*;
-import com.zdjizhi.etl.dns.*;
-import com.zdjizhi.utils.arangodb.ArangoDBSink;
-import com.zdjizhi.utils.ck.ClickhouseSink;
-import com.zdjizhi.utils.kafka.KafkaConsumer;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
+import com.zdjizhi.etl.connection.ConnLogService;
+import com.zdjizhi.etl.dns.DnsLogService;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import java.time.Duration;
-import java.util.Map;
-import java.util.Objects;
-import static com.zdjizhi.common.FlowWriteConfig.*;
+import static com.zdjizhi.common.FlowWriteConfig.BUFFER_TIMEOUT;
+import static com.zdjizhi.common.FlowWriteConfig.LOG_TYPE;
 public class LogFlowWriteTopology {
     private static final Log logger = LogFactory.get();
@@ -32,154 +15,18 @@ public class LogFlowWriteTopology {
     public static void main(String[] args) {
         try {
             final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
             // maximum time between two outputs (unit: milliseconds)
-            env.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT);
+            env.setBufferTimeout(BUFFER_TIMEOUT);
             // 1 connection, 2 dns
-            if (FlowWriteConfig.LOG_TYPE == 1) {
-                // connection
-                DataStream<Map<String, Object>> connSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_CONNECTION))
-                        .setParallelism(SOURCE_PARALLELISM)
-                        .filter(x -> Objects.nonNull(x) && Convert.toLong(x.get("conn_start_time")) > 0)
-                        .map(new ConnTimeMapFunction())
-                        .setParallelism(SOURCE_PARALLELISM)
-                        .name(SOURCE_KAFKA_TOPIC_CONNECTION);
-                // sketch
-                DataStream<Map<String, Object>> sketchSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_SKETCH))
-                        .filter(x -> Objects.nonNull(x) && Convert.toLong(x.get("sketch_start_time")) > 0)
-                        .map(new SketchTimeMapFunction())
-                        .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
-                        .name(SOURCE_KAFKA_TOPIC_SKETCH);
-                // write to the CK sink, batched
-                if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) {
-                    connSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
-                            .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_CONNECTION, CK_BATCH, TimeCharacteristic.ProcessingTime))
-                            .apply(new CKBatchWindow())
-                            .addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION))
-                            .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
-                            .name("CKSink");
-                    sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
-                            .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_SKETCH, CK_BATCH, TimeCharacteristic.ProcessingTime))
-                            .apply(new CKBatchWindow())
-                            .addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH))
-                            .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
-                            .name("CKSink");
-                }
-                // transform
-                DataStream<Map<String, Object>> connTransformStream = connSource
-                        .assignTimestampsAndWatermarks(WatermarkStrategy
-                                .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME))
-                                .withTimestampAssigner((event, timestamp) -> {return Convert.toLong(event.get("conn_start_time")) * 1000;}))
-                        .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM)
-                        .keyBy(new IpKeysSelector())
-                        .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
-                        .process(new ConnProcessFunction())
-                        .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM)
-                        .filter(x -> Objects.nonNull(x))
-                        .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
-                connTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
-                        .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_RELATION_CONNECTION, CK_BATCH, TimeCharacteristic.ProcessingTime))
-                        .apply(new CKBatchWindow())
-                        .addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION))
-                        .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
-                        .name("CKSink");
-                DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
-                        .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME))
-                        .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
-                        .keyBy(new IpKeysSelector())
-                        .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
-                        .process(new SketchProcessFunction())
-                        .filter(Objects::nonNull)
-                        .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
-                // into ArangoDB
-                DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream)
-                        .keyBy(new IpKeysSelector())
-                        .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
-                        .process(new Ip2IpGraphProcessFunction())
-                        .setParallelism(TRANSFORM_PARALLELISM)
-                        .filter(Objects::nonNull)
-                        .setParallelism(TRANSFORM_PARALLELISM);
-                // write to arangodb
-                ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
-                        .trigger(new CountTriggerWithTimeout<>(R_VISIT_IP2IP, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime))
-                        .apply(new ArangodbBatchIPWindow())
-                        .addSink(new ArangoDBSink(R_VISIT_IP2IP))
-                        .setParallelism(FlowWriteConfig.SINK_PARALLELISM).name(R_VISIT_IP2IP);
-            } else if (FlowWriteConfig.LOG_TYPE == 2) {
-                DataStream<Map<String, Object>> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_DNS))
-                        .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
-                        .filter(x -> Objects.nonNull(x) && Convert.toLong(x.get("capture_time")) > 0)
-                        .map(new DnsMapFunction())
-                        .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
-                        .name(FlowWriteConfig.SOURCE_KAFKA_TOPIC_DNS);
-                // dns raw logs into CK
-                if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) {
-                    dnsSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
-                            .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_DNS, CK_BATCH, TimeCharacteristic.ProcessingTime))
-                            .apply(new CKBatchWindow())
-                            .addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
-                            .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
-                            .name("CKSink");
-                }
-                DataStream<Map<String, Object>> dnsTransform = dnsSource.filter(x -> Objects.nonNull(x.get("response")))
-                        .assignTimestampsAndWatermarks(WatermarkStrategy
-                                .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
-                                .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
-                        .setParallelism(TRANSFORM_PARALLELISM)
-                        .flatMap(new DnsSplitFlatMapFunction())
-                        .setParallelism(TRANSFORM_PARALLELISM)
-                        .keyBy(new DnsGraphKeysSelector())
-                        .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
-                        .process(new DnsRelationProcessFunction())
-                        .setParallelism(TRANSFORM_PARALLELISM)
-                        .filter(Objects::nonNull)
-                        .setParallelism(TRANSFORM_PARALLELISM);
-                // dns relation logs (post-split) into CK
-                dnsTransform.filter(Objects::nonNull).windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
-                        .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_RELATION_DNS, CK_BATCH, TimeCharacteristic.ProcessingTime))
-                        .apply(new CKBatchWindow())
-                        .addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_DNS))
-                        .setParallelism(SINK_PARALLELISM)
-                        .name("CKSink");
-                // arango insert, grouped by record_type into different tables
-                DataStream<Map<String, Object>> dnsGraph = dnsTransform.filter(Objects::nonNull).keyBy(new DnsGraphKeysSelector())
-                        .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
-                        .process(new DnsGraphProcessFunction())
-                        .setParallelism(SINK_PARALLELISM);
-                for (DnsType dnsEnum : DnsType.values()) {
-                    dnsGraph.filter(x -> Objects.nonNull(x) && ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
-                            .setParallelism(SINK_PARALLELISM)
-                            .windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
-                            .trigger(new CountTriggerWithTimeout<>(dnsEnum.getSink(), ARANGODB_BATCH, TimeCharacteristic.ProcessingTime))
-                            .apply(new ArangodbBatchDnsWindow())
-                            .addSink(new ArangoDBSink(dnsEnum.getSink()))
-                            .setParallelism(SINK_PARALLELISM)
-                            .name("ArangodbSink");
-                }
+            if (LOG_TYPE == 1) {
+                ConnLogService.connLogStream(env);
+            } else if (LOG_TYPE == 2) {
+                DnsLogService.dnsLogStream(env);
             }
             env.execute(args[0]);
         } catch (Exception e) {
             logger.error("This Flink task start ERROR! Exception information is : {}", e);
         }
     }
 }

View File

@@ -0,0 +1,54 @@
package com.zdjizhi.utils.ck;
import cn.hutool.core.io.IoUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
import static com.zdjizhi.common.FlowWriteConfig.*;
public class CKUtils {
private static final Log log = LogFactory.get();
private static Connection connection;
public static Connection getConnection() {
try {
ClickHouseProperties props = new ClickHouseProperties();
props.setDatabase(CK_DATABASE);
props.setUser(CK_USERNAME);
props.setPassword(CK_PIN);
props.setConnectionTimeout(CK_CONNECTION_TIMEOUT);
props.setSocketTimeout(CK_SOCKET_TIMEOUT);
props.setMaxThreads(50);
BalancedClickhouseDataSource blDataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://" + CK_HOSTS, props);
blDataSource.actualize();
blDataSource.scheduleActualization(10, TimeUnit.SECONDS); // enable periodic host availability checks
// HikariConfig conf = new HikariConfig();
// conf.setDataSource(blDataSource);
// conf.setMinimumIdle(1);
// conf.setMaximumPoolSize(20);
//
// HikariDataSource hkDs = new HikariDataSource(conf);
connection = blDataSource.getConnection();
log.debug("get clickhouse connection success");
} catch (SQLException e) {
log.error("clickhouse connection error ,{}", e);
}
return connection;
}
public static void close(Connection connection) throws Exception {
IoUtil.close(connection);
}
}
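
CKUtils is consumed by ClickhouseSink below (getConnection() in open(), close() in close()). A minimal standalone usage sketch, assuming a context that may throw Exception:

    import java.sql.Connection;
    import java.sql.PreparedStatement;

    public static void ping() throws Exception {
        Connection conn = CKUtils.getConnection();
        try (PreparedStatement ps = conn.prepareStatement("SELECT 1")) {
            ps.execute();   // round-trip to verify the balanced datasource works
        } finally {
            CKUtils.close(conn);
        }
    }

Note that connection is a static field written on every getConnection() call, so concurrent callers share the most recently created handle; with one call per sink open() this is benign in practice, but it is not a pool.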

View File

@@ -1,26 +1,18 @@
 package com.zdjizhi.utils.ck;
-import cn.hutool.core.convert.Convert;
 import cn.hutool.core.io.IoUtil;
-import cn.hutool.core.util.StrUtil;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
+import com.zdjizhi.enums.LogMetadata;
+import org.apache.commons.lang3.time.StopWatch;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import ru.yandex.clickhouse.BalancedClickhouseDataSource;
-import ru.yandex.clickhouse.settings.ClickHouseProperties;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import static com.zdjizhi.common.FlowWriteConfig.*;
 public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>> {
@@ -49,90 +41,49 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>
     @Override
     public void open(Configuration parameters) throws Exception {
-        try {
-            ClickHouseProperties properties = new ClickHouseProperties();
-            properties.setDatabase(CK_DATABASE);
-            properties.setUser(CK_USERNAME);
-            properties.setPassword(CK_PIN);
-//            properties.setKeepAliveTimeout(5);
-            properties.setConnectionTimeout(CK_CONNECTION_TIMEOUT);
-            properties.setSocketTimeout(CK_SOCKET_TIMEOUT);
-            BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://" + CK_HOSTS, properties);
-            dataSource.scheduleActualization(10, TimeUnit.SECONDS); // enable periodic host availability checks
-            connection = dataSource.getConnection();
-            log.debug("get clickhouse connection success");
-        } catch (SQLException e) {
-            log.error("clickhouse connection error ,{}", e);
-        }
+        connection = CKUtils.getConnection();
     }
     @Override
     public void close() throws Exception {
         IoUtil.close(preparedStatement);
-        IoUtil.close(connection);
+        CKUtils.close(connection);
     }
     public void executeInsert(List<Map<String, Object>> data, String tableName) {
         try {
+            StopWatch stopWatch = new StopWatch();
+            stopWatch.start();
+            log.debug("开始写入ck数据 {}", data.size());
+            boolean autoCommit = connection.getAutoCommit();
             connection.setAutoCommit(false);
-            int count = 0;
-            for (Map<String, Object> map : data) {
-                List<String> keys = new LinkedList<>(map.keySet());
-                preparedStatement = connection.prepareStatement(preparedSql(keys, tableName));
-                List<Object> values = new LinkedList<>(map.values());
-                for (int i = 1; i <= values.size(); i++) {
-                    Object val = values.get(i - 1);
-                    if (val instanceof Long) {
-                        preparedStatement.setLong((i), Convert.toLong(val));
-                    } else if (val instanceof Integer) {
-                        preparedStatement.setLong((i), Convert.toLong(val));
-                    } else if (val instanceof Boolean) {
-                        preparedStatement.setInt((i), Boolean.valueOf(StrUtil.toString(val)) ? 1 : 0);
-                    } else {
-                        preparedStatement.setString((i), StrUtil.toString(val));
-                    }
-                }
-                preparedStatement.addBatch();
-                count++;
-                // commit every 10k rows
-                if (count % CK_BATCH == 0) {
-                    preparedStatement.executeBatch();
-                    connection.commit();
-                    preparedStatement.clearBatch();
-                    count = 0;
-                }
-            }
-            if (count > 0) {
-                preparedStatement.executeBatch();
-                connection.commit();
-            }
+            batch(data, tableName);
+            preparedStatement.executeBatch();
+            connection.commit();
+            connection.setAutoCommit(autoCommit);
+            stopWatch.stop();
+            log.debug("总共花费时间 {}", stopWatch.getTime());
         } catch (Exception ex) {
             log.error("ClickhouseSink插入报错", ex);
         }
     }
-    public static String preparedSql(List<String> fields, String tableName) {
-        String placeholders = fields.stream()
-                .filter(Objects::nonNull)
-                .map(f -> "?")
-                .collect(Collectors.joining(", "));
-        String columns = fields.stream()
-                .filter(Objects::nonNull)
-                .collect(Collectors.joining(", "));
-        String sql = StrUtil.concat(true, "INSERT INTO ", CK_DATABASE, ".", tableName,
-                "(", columns, ") VALUES (", placeholders, ")");
-        log.debug(sql);
-        return sql;
-    }
+    private void batch(List<Map<String, Object>> data, String tableName) throws SQLException {
+        String[] logFields = LogMetadata.getLogFields(tableName);
+        String sql = LogMetadata.preparedSql(tableName);
+        log.debug(sql);
+        preparedStatement = connection.prepareStatement(sql);
+        for (Map<String, Object> map : data) {
+            for (int i = 0; i < logFields.length; i++) {
+                preparedStatement.setObject(i + 1, map.get(logFields[i]));
+            }
+            preparedStatement.addBatch();
+        }
+    }
 }
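
Net effect of the sink rewrite: the old executeInsert prepared a fresh statement per row from each map's own key set and committed every CK_BATCH rows inside the sink, while the new path prepares one statement per window from the fixed LogMetadata field list and issues a single executeBatch/commit for the already-batched window, so column order no longer depends on map iteration order.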

View File

@@ -1,17 +0,0 @@
package com.zdjizhi.utils.functions;
import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.common.functions.FilterFunction;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.functions
* @Description:
* @date 2021/5/2715:01
*/
public class FilterNullFunction implements FilterFunction<String> {
@Override
public boolean filter(String message) {
return StringUtil.isNotBlank(message);
}
}

View File

@@ -1,23 +0,0 @@
package com.zdjizhi.utils.functions;
import com.zdjizhi.utils.general.TransFormMap;
import org.apache.flink.api.common.functions.MapFunction;
import java.util.Map;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.functions
* @Description:
* @date 2021/5/2715:01
*/
public class MapCompletedFunction implements MapFunction<Map<String, Object>, String> {
@Override
@SuppressWarnings("unchecked")
public String map(Map<String, Object> logs) {
return TransFormMap.dealCommonMessage(logs);
}
}

View File

@@ -1,23 +0,0 @@
package com.zdjizhi.utils.functions;
import com.zdjizhi.utils.general.TransFormTypeMap;
import org.apache.flink.api.common.functions.MapFunction;
import java.util.Map;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.functions
* @Description:
* @date 2021/5/2715:01
*/
public class TypeMapCompletedFunction implements MapFunction<Map<String, Object>, String> {
@Override
@SuppressWarnings("unchecked")
public String map(Map<String, Object> logs) {
return TransFormTypeMap.dealCommonMessage(logs);
}
}

View File

@@ -1,180 +0,0 @@
package com.zdjizhi.utils.general;
/**
* CityHash64 hash of the logid
* Not implemented in this release plan - TSG22.01
*
* @author qidaijie
*/
@Deprecated
public class CityHash {
private static final long k0 = 0xc3a5c85c97cb3127L;
private static final long k1 = 0xb492b66fbe98f273L;
private static final long k2 = 0x9ae16a3b2f90404fL;
private static final long k3 = 0xc949d7c7509e6557L;
private static final long k5 = 0x9ddfea08eb382d69L;
private CityHash() {}
public static long CityHash64(byte[] s, int index, int len) {
if (len <= 16 ) {
return HashLen0to16(s, index, len);
} else if (len > 16 && len <= 32) {
return HashLen17to32(s, index, len);
} else if (len > 32 && len <= 64) {
return HashLen33to64(s, index, len);
} else {
long x = Fetch64(s, index);
long y = Fetch64(s, index + len - 16) ^ k1;
long z = Fetch64(s, index + len - 56) ^ k0;
long[] v = WeakHashLen32WithSeeds(s, len - 64, len, y);
long[] w = WeakHashLen32WithSeeds(s, len - 32, len * k1, k0);
z += ShiftMix(v[1]) * k1;
x = Rotate(z + x, 39) * k1;
y = Rotate(y, 33) * k1;
len = (len - 1) & ~63;
do {
x = Rotate(x + y + v[0] + Fetch64(s, index + 16), 37) * k1;
y = Rotate(y + v[1] + Fetch64(s, index + 48), 42) * k1;
x ^= w[1];
y ^= v[0];
z = Rotate(z ^ w[0], 33);
v = WeakHashLen32WithSeeds(s, index, v[1] * k1, x + w[0]);
w = WeakHashLen32WithSeeds(s, index + 32, z + w[1], y);
long t = z;
z = x;
x = t;
index += 64;
len -= 64;
} while (len != 0);
return HashLen16(HashLen16(v[0], w[0]) + ShiftMix(y) * k1 + z,
HashLen16(v[1], w[1]) + x);
}
}
private static long HashLen0to16(byte[] s, int index, int len) {
if (len > 8) {
long a = Fetch64(s, index);
long b = Fetch64(s, index + len - 8);
return HashLen16(a, RotateByAtLeastOne(b + len, len)) ^ b;
}
if (len >= 4) {
long a = Fetch32(s, index);
return HashLen16(len + (a << 3), Fetch32(s, index + len - 4));
}
if (len > 0) {
byte a = s[index];
byte b = s[index + len >>> 1];
byte c = s[index + len - 1];
int y = (a) + (b << 8);
int z = len + (c << 2);
return ShiftMix(y * k2 ^ z * k3) * k2;
}
return k2;
}
private static long HashLen17to32(byte[] s, int index, int len) {
long a = Fetch64(s, index) * k1;
long b = Fetch64(s, index + 8);
long c = Fetch64(s, index + len - 8) * k2;
long d = Fetch64(s, index + len - 16) * k0;
return HashLen16(Rotate(a - b, 43) + Rotate(c, 30) + d,
a + Rotate(b ^ k3, 20) - c + len);
}
private static long HashLen33to64(byte[] s, int index, int len) {
long z = Fetch64(s, index + 24);
long a = Fetch64(s, index) + (len + Fetch64(s, index + len - 16)) * k0;
long b = Rotate(a + z, 52);
long c = Rotate(a, 37);
a += Fetch64(s, index + 8);
c += Rotate(a, 7);
a += Fetch64(s, index + 16);
long vf = a + z;
long vs = b + Rotate(a, 31) + c;
a = Fetch64(s, index + 16) + Fetch64(s, index + len - 32);
z = Fetch64(s, index + len - 8);
b = Rotate(a + z, 52);
c = Rotate(a, 37);
a += Fetch64(s, index + len - 24);
c += Rotate(a, 7);
a += Fetch64(s, index + len - 16);
long wf = a + z;
long ws = b + Rotate(a, 31) + c;
long r = ShiftMix((vf + ws) * k2 + (wf + vs) * k0);
return ShiftMix(r * k0 + vs) * k2;
}
private static long Fetch64(byte[] p, int index) {
return toLongLE(p,index);
}
private static long Fetch32(byte[] p, int index) {
return toIntLE(p,index);
}
private static long[] WeakHashLen32WithSeeds(
long w, long x, long y, long z, long a, long b) {
a += w;
b = Rotate(b + a + z, 21);
long c = a;
a += x;
a += y;
b += Rotate(a, 44);
return new long[]{a + z, b + c};
}
private static long[] WeakHashLen32WithSeeds(byte[] s, int index, long a, long b) {
return WeakHashLen32WithSeeds(Fetch64(s, index),
Fetch64(s, index + 8),
Fetch64(s, index + 16),
Fetch64(s, index + 24),
a,
b);
}
private static long toLongLE(byte[] b, int i) {
return 0xffffffffffffffffL & (((long) b[i + 7] << 56) + ((long) (b[i + 6] & 255) << 48) + ((long) (b[i + 5] & 255) << 40) + ((long) (b[i + 4] & 255) << 32) + ((long) (b[i + 3] & 255) << 24) + ((b[i + 2] & 255) << 16) + ((b[i + 1] & 255) << 8) + ((b[i + 0] & 255) << 0));
}
private static long toIntLE(byte[] b, int i) {
return 0xffffffffL & (((b[i + 3] & 255) << 24) + ((b[i + 2] & 255) << 16) + ((b[i + 1] & 255) << 8) + ((b[i + 0] & 255) << 0));
}
private static long RotateByAtLeastOne(long val, int shift) {
return (val >>> shift) | (val << (64 - shift));
}
private static long ShiftMix(long val) {
return val ^ (val >>> 47);
}
private static long Uint128Low64(long[] x) {
return x[0];
}
private static long Rotate(long val, int shift) {
return shift == 0 ? val : (val >>> shift) | (val << (64 - shift));
}
private static long Uint128High64(long[] x) {
return x[1];
}
private static long Hash128to64(long[] x) {
long a = (Uint128Low64(x) ^ Uint128High64(x)) * k5;
a ^= (a >>> 47);
long b = (Uint128High64(x) ^ a) * k5;
b ^= (b >>> 47);
b *= k5;
return b;
}
private static long HashLen16(long u, long v) {
return Hash128to64(new long[]{u,v});
}
}

View File

@@ -1,120 +0,0 @@
package com.zdjizhi.utils.general;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.json.JsonParseUtil;
import java.util.Map;
/**
* Description: utility class for transforming or completing logs
*
* @author qidaijie
*/
public class TransFormMap {
private static final Log logger = LogFactory.get();
/**
* Parse the log and complete missing fields
*
* @param jsonMap raw log consumed from the kafka topic, parsed into a map
* @return the completed log
*/
@SuppressWarnings("unchecked")
public static String dealCommonMessage(Map<String, Object> jsonMap) {
try {
JsonParseUtil.dropJsonField(jsonMap);
for (String[] strings : JsonParseUtil.getJobList()) {
// value of the referenced log field
Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
// key of the field to complete
String appendToKeyName = strings[1];
// current value of the field to complete
Object appendTo = JsonParseUtil.getValue(jsonMap, appendToKeyName);
// field that selects the operation function
String function = strings[2];
// value of the extra parameter
String param = strings[3];
functionSet(function, jsonMap, appendToKeyName, appendTo, logValue, param);
}
return JsonMapper.toJsonString(jsonMap);
} catch (RuntimeException e) {
logger.error("TransForm logs failed,The exception is :" + e);
return null;
}
}
/**
* Set of functions that operate on fields according to the schema description
*
* @param function field that selects the operation function
* @param jsonMap parsed raw log map
* @param appendToKeyName key of the field to complete
* @param appendTo current value of the field to complete
* @param logValue value of the referenced log field
* @param param value of the extra parameter
*/
private static void functionSet(String function, Map<String, Object> jsonMap, String appendToKeyName, Object appendTo, Object logValue, String param) {
switch (function) {
case "current_timestamp":
if (!(appendTo instanceof Long)) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime());
}
break;
// case "snowflake_id":
// JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId());
// break;
case "geo_ip_detail":
if (logValue != null && appendTo == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(logValue.toString()));
}
break;
case "geo_asn":
if (logValue != null && appendTo == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoAsn(logValue.toString()));
}
break;
case "geo_ip_country":
if (logValue != null && appendTo == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpCountry(logValue.toString()));
}
break;
case "set_value":
if (param != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, param);
}
break;
case "get_value":
if (logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, logValue);
}
break;
case "if":
if (param != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.condition(jsonMap, param));
}
break;
case "sub_domain":
if (appendTo == null && logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getTopDomain(logValue.toString()));
}
break;
case "decode_of_base64":
if (logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param)));
}
break;
case "flattenSpec":
if (logValue != null && param != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param));
}
break;
default:
}
}
}

View File

@@ -1,122 +0,0 @@
package com.zdjizhi.utils.general;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.json.JsonParseUtil;
import java.util.Map;
/**
* Description: utility class for transforming or completing logs
*
* @author qidaijie
*/
public class TransFormTypeMap {
private static final Log logger = LogFactory.get();
/**
* Parse the log and complete missing fields
*
* @param message raw log from the kafka topic
* @return the completed log
*/
@SuppressWarnings("unchecked")
public static String dealCommonMessage(Map<String, Object> message) {
try {
Map<String, Object> jsonMap = JsonParseUtil.typeTransform(message);
for (String[] strings : JsonParseUtil.getJobList()) {
// value of the referenced log field
Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
// key of the field to complete
String appendToKeyName = strings[1];
// current value of the field to complete
Object appendToKeyValue = JsonParseUtil.getValue(jsonMap, appendToKeyName);
// field that selects the operation function
String function = strings[2];
// value of the extra parameter
String param = strings[3];
functionSet(function, jsonMap, appendToKeyName, appendToKeyValue, logValue, param);
}
return JsonMapper.toJsonString(jsonMap);
} catch (RuntimeException e) {
logger.error("TransForm logs failed,The exception is :" + e);
return null;
}
}
/**
* Set of functions that operate on fields according to the schema description
*
* @param function field that selects the operation function
* @param jsonMap parsed raw log map
* @param appendToKeyName key of the field to complete
* @param appendToKeyValue current value of the field to complete
* @param logValue value of the referenced log field
* @param param value of the extra parameter
*/
private static void functionSet(String function, Map<String, Object> jsonMap, String appendToKeyName, Object appendToKeyValue, Object logValue, String param) {
switch (function) {
case "current_timestamp":
if (!(appendToKeyValue instanceof Long)) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime());
}
break;
// case "snowflake_id":
// JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId());
// // not implemented in this release plan TSG-22.01
//// JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getDecimalHash(SnowflakeId.generateId()));
// break;
case "geo_ip_detail":
if (logValue != null && appendToKeyValue == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(logValue.toString()));
}
break;
case "geo_asn":
if (logValue != null && appendToKeyValue == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoAsn(logValue.toString()));
}
break;
case "geo_ip_country":
if (logValue != null && appendToKeyValue == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpCountry(logValue.toString()));
}
break;
case "set_value":
if (param != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, param);
}
break;
case "get_value":
if (logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, logValue);
}
break;
case "if":
if (param != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.condition(jsonMap, param));
}
break;
case "sub_domain":
if (appendToKeyValue == null && logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getTopDomain(logValue.toString()));
}
break;
case "decode_of_base64":
if (logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param)));
}
break;
case "flattenSpec":
if (logValue != null && param != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param));
}
break;
default:
}
}
}

View File

@@ -1,230 +0,0 @@
package com.zdjizhi.utils.general;
import cn.hutool.core.codec.Base64;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.jayway.jsonpath.InvalidPathException;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.FormatUtils;
import com.zdjizhi.utils.IpLookupV2;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.json.JsonParseUtil;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Map;
import java.util.regex.Pattern;
/**
* @author qidaijie
*/
class TransFunction {
private static final Log logger = LogFactory.get();
/**
* Regex for validating numbers
*/
private static final Pattern PATTERN = Pattern.compile("[0-9]*");
/**
* IP geolocation library helper
*/
private static IpLookupV2 ipLookup = new IpLookupV2.Builder(false)
.loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_built_in.mmdb")
.loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_built_in.mmdb")
.loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_user_defined.mmdb")
.loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_user_defined.mmdb")
.loadAsnDataFile(FlowWriteConfig.TOOLS_LIBRARY + "asn_v4.mmdb")
.loadAsnDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "asn_v6.mmdb")
.build();
/**
* Generate the current timestamp
*/
static long getCurrentTime() {
return System.currentTimeMillis() / 1000;
}
/**
* CityHash64 algorithm
* Not implemented in this release plan - TSG22.01
*
* @param data raw data
* @return hash result
*/
@Deprecated
static BigInteger getDecimalHash(long data) {
byte[] dataBytes = String.valueOf(data).getBytes();
long hashValue = CityHash.CityHash64(dataBytes, 0, dataBytes.length);
String decimalValue = Long.toUnsignedString(hashValue, 10);
return new BigInteger(decimalValue);
}
/**
* Get location information from the client IP
*
* @param ip client IP
* @return detailed IP address information
*/
static String getGeoIpDetail(String ip) {
try {
return ipLookup.cityLookupDetail(ip);
} catch (NullPointerException npe) {
logger.error("The MMDB file is not loaded or IP is null! " + npe);
return "";
} catch (RuntimeException e) {
logger.error("Get clientIP location error! " + e);
return "";
}
}
/**
* Get ASN information from the IP
*
* @param ip client/server IP
* @return ASN
*/
static String getGeoAsn(String ip) {
try {
return ipLookup.asnLookup(ip);
} catch (NullPointerException npe) {
logger.error("The MMDB file is not loaded or IP is null! " + npe);
return "";
} catch (RuntimeException e) {
logger.error("Get IP ASN error! " + e);
return "";
}
}
/**
* Get country information from the IP
*
* @param ip server IP
* @return country
*/
static String getGeoIpCountry(String ip) {
try {
return ipLookup.countryLookup(ip);
} catch (NullPointerException npe) {
logger.error("The MMDB file is not loaded or IP is null! " + npe);
return "";
} catch (RuntimeException e) {
logger.error("Get ServerIP location error! " + e);
return "";
}
}
/**
* radius completion via HBase
*
* @param ip client IP
* @return account
*/
/**
* Parse the top-level domain
*
* @param domain original domain
* @return top-level domain
*/
static String getTopDomain(String domain) {
try {
return FormatUtils.getTopPrivateDomain(domain);
} catch (StringIndexOutOfBoundsException outException) {
logger.error("Parse top-level domain exceptions, exception domain names:" + domain);
return "";
}
}
/**
* Decode base64 with the given charset
*
* @param message base64
* @param charset charset
* @return decoded string
*/
static String decodeBase64(String message, Object charset) {
String result = "";
try {
if (StringUtil.isNotBlank(message)) {
if (charset == null) {
result = Base64.decodeStr(message, FlowWriteConfig.MAIL_DEFAULT_CHARSET);
} else {
result = Base64.decodeStr(message, charset.toString());
}
}
} catch (RuntimeException e) {
logger.error("Resolve Base64 exception, exception information:" + e);
}
return result;
}
/**
* Parse json with the given expression
*
* @param message json
* @param expr parse expression
* @return parse result
*/
static String flattenSpec(String message, String expr) {
String flattenResult = "";
try {
if (StringUtil.isNotBlank(expr)) {
ArrayList<String> read = JsonPath.parse(message).read(expr);
if (read.size() >= 1) {
flattenResult = read.get(0);
}
}
} catch (ClassCastException | InvalidPathException | ArrayIndexOutOfBoundsException e) {
logger.error("The device label resolution exception or [expr] analytic expression error" + e);
}
return flattenResult;
}
/**
* Determine whether param names a log field; if so return the corresponding value, otherwise return the original string
*
* @param jsonMap in-memory log map
* @param param field name / plain string
* @return JSON.Value or String
*/
static Object isJsonValue(Map<String, Object> jsonMap, String param) {
if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) {
return JsonParseUtil.getValue(jsonMap, param.substring(2));
} else {
return param;
}
}
/**
* IF function: parses the log and evaluates a ternary expression; if the operand is numeric it is converted to long before the result is returned.
*
* @param jsonMap in-memory log map
* @param ifParam field name / plain string
* @return resultA or resultB or null
*/
static Object condition(Map<String, Object> jsonMap, String ifParam) {
Object result = null;
try {
String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
Object direction = isJsonValue(jsonMap, norms[0]);
Object resultA = isJsonValue(jsonMap, split[1]);
Object resultB = isJsonValue(jsonMap, split[2]);
if (direction instanceof Number) {
result = (Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB;
} else if (direction instanceof String) {
result = direction.equals(norms[1]) ? resultA : resultB;
}
}
} catch (RuntimeException e) {
logger.error("IF 函数执行异常,异常信息:" + e);
}
return result;
}
}

View File

@@ -1,52 +0,0 @@
package com.zdjizhi;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.IpLookupV2;
import com.zdjizhi.utils.general.CityHash;
import org.junit.Test;
import java.math.BigInteger;
import java.util.Calendar;
/**
* @author qidaijie
* @Package com.zdjizhi
* @Description:
* @date 2021/11/611:38
*/
public class FunctionTest {
private static IpLookupV2 ipLookup = new IpLookupV2.Builder(false)
.loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4.mmdb")
// .loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_built_in.mmdb")
// .loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_user_defined.mmdb")
// .loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_user_defined.mmdb")
.loadAsnDataFile(FlowWriteConfig.TOOLS_LIBRARY + "asn_v4.mmdb")
.loadAsnDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "asn_v6.mmdb")
.build();
@Test
public void CityHashTest() {
byte[] dataBytes = String.valueOf(613970406986188816L).getBytes();
long hashValue = CityHash.CityHash64(dataBytes, 0, dataBytes.length);
String decimalValue = Long.toUnsignedString(hashValue, 10);
BigInteger result = new BigInteger(decimalValue);
System.out.println(result);
}
@Test
public void ipLookupTest() {
String ip = "0.255.255.254";
System.out.println(ipLookup.cityLookupDetail(ip));
System.out.println(ipLookup.countryLookup(ip));
}
@Test
public void timestampTest(){
Calendar cal = Calendar.getInstance();
Long utcTime=cal.getTimeInMillis();
System.out.println(utcTime);
System.out.println(System.currentTimeMillis());
}
}

View File

@@ -1,7 +1,6 @@
 package com.zdjizhi.json;
 import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.json.JsonParseUtil;
 import org.junit.Test;
 import java.util.Map;