diff --git a/pom.xml b/pom.xml
index c6f2d52..315fcf0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -261,6 +261,11 @@
             <artifactId>flink-table-planner-blink_2.12</artifactId>
             <version>${flink.version}</version>
+        <dependency>
+            <groupId>com.arangodb</groupId>
+            <artifactId>arangodb-java-driver</artifactId>
+            <version>6.6.3</version>
+        </dependency>
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index a0b7627..2308097 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -65,32 +65,40 @@
 source.kafka.topic.connection=connection_record_log
 source.kafka.topic.sketch=connection_sketch_record_log
 source.kafka.topic.dns=dns_record_log
 
-sink.ck.table.connection=connection_record_log
-sink.ck.table.sketch=connection_sketch_record_log
-sink.ck.table.dns=dns_record_log
-sink.ck.table.relation.connection=connection_relation_log
-sink.ck.table.relation.dns=dns_relation_log
+sink.ck.table.connection=connection_record_log_local
+sink.ck.table.sketch=connection_sketch_record_log_local
+sink.ck.table.dns=dns_record_log_local
+sink.ck.table.relation.connection=connection_relation_log_local
+sink.ck.table.relation.dns=dns_relation_log_local
+
+sink.arango.table.r.visit.ip2ip=R_VISIT_IP2IP
+sink.arango.table.r.cname.domain2domain=R_CNAME_DOMAIN2DOMAIN
+sink.arango.table.r.mx.domain2domain=R_MX_DOMAIN2DOMAIN
+sink.arango.table.r.resolve.domain2ip=R_RESOLVE_DOMAIN2IP
+sink.arango.table.r.nx.domain2domain=R_NX_DOMAIN2DOMAIN
 
 #clickhouse sink
 ck.hosts=192.168.45.102:8123
-ck.database=default
+ck.database=tsg_galaxy_v3
 ck.username=default
 ck.pin=galaxy2019
+ck.connection.timeout=100000
+ck.socket.timeout=1000000
 
 #connection_record_log
-flink.watermark.max.orderness=10
+flink.watermark.max.orderness=100000
 
 #aggregation interval, in seconds
-log.aggregate.duration=30
-log.aggregate.duration.graph=30
+log.aggregate.duration=5
+log.aggregate.duration.graph=5
 
 #arangoDB settings
-arangoDB.host=192.168.40.182
+arangoDB.host=192.168.45.102
 #arangoDB.host=192.168.40.224
 arangoDB.port=8529
 arangoDB.user=upsert
-arangoDB.password=ceiec2018
-arangoDB.DB.name=ip-learning-test
+arangoDB.password=galaxy2018
+arangoDB.DB.name=knowledge
 #arangoDB.DB.name=tsg_galaxy_v3
 arangoDB.batch=100000
 arangoDB.ttl=3600
@@ -101,5 +109,5 @@
 update.arango.batch=10000
 thread.pool.number=10
 thread.await.termination.time=10
-sink.batch.time.out=5
+sink.batch.time.out=1
 sink.batch=10000
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/ArangoDelayProcess.java b/src/main/java/com/zdjizhi/common/ArangoDelayProcess.java
new file mode 100644
index 0000000..d39e6c5
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/ArangoDelayProcess.java
@@ -0,0 +1,65 @@
+package com.zdjizhi.common;
+
+import com.arangodb.entity.BaseDocument;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+import java.util.Spliterator;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
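+/**
+ * Buffers incoming documents in keyed list state; the first element of a key
+ * registers an event-time timer SINK_BATCH_TIME_OUT seconds ahead, and when
+ * the timer fires the whole buffer is emitted downstream as one List and the
+ * state is cleared, so the ArangoDB sink receives batches instead of single
+ * documents.
+ */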
+public class ArangoDelayProcess extends ProcessFunction<BaseDocument, List<BaseDocument>> {
+
+    private ValueState<Long> currentTimer;
+    private ListState<BaseDocument> itemState;
+    private String stateName;
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<>(getStateName() + "_timer", Types.LONG));
+        ListStateDescriptor<BaseDocument> itemViewStateDesc = new ListStateDescriptor(getStateName() + "_state", BaseDocument.class);
+        itemState = getRuntimeContext().getListState(itemViewStateDesc);
+    }
+
+    @Override
+    public void processElement(BaseDocument value, Context context, Collector<List<BaseDocument>> collector) throws Exception {
+        // register a new timer for this key if none is pending
+        Long curTimeStamp = currentTimer.value();
+        if (curTimeStamp == null || curTimeStamp == 0) {
+            long onTimer = context.timestamp() + FlowWriteConfig.SINK_BATCH_TIME_OUT * 1000;
+            context.timerService().registerEventTimeTimer(onTimer);
+            currentTimer.update(onTimer);
+        }
+        itemState.add(value);
+    }
+
+    @Override
+    public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<BaseDocument>> out) throws Exception {
+        Spliterator<BaseDocument> spliterator = itemState.get().spliterator();
+        List<BaseDocument> collect = StreamSupport.stream(spliterator, false)
+                .collect(Collectors.toList());
+        out.collect(collect);
+        currentTimer.clear();
+        itemState.clear();
+    }
+
+    public ArangoDelayProcess(String stateName) {
+        this.stateName = stateName;
+    }
+
+    public String getStateName() {
+        return stateName;
+    }
+
+    public void setStateName(String stateName) {
+        this.stateName = stateName;
+    }
+}
diff --git a/src/main/java/com/zdjizhi/common/CKDelayProcess.java b/src/main/java/com/zdjizhi/common/CKDelayProcess.java
new file mode 100644
index 0000000..35ec90e
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/CKDelayProcess.java
@@ -0,0 +1,65 @@
+package com.zdjizhi.common;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Spliterator;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
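+/**
+ * ClickHouse counterpart of ArangoDelayProcess: collects row maps per key in
+ * list state and flushes them as a single List when the per-key event-time
+ * timer (SINK_BATCH_TIME_OUT seconds after the first element) fires.
+ */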
+public class CKDelayProcess extends ProcessFunction<Map<String, Object>, List<Map<String, Object>>> {
+
+    private ValueState<Long> currentTimer;
+    private ListState<Map<String, Object>> itemState;
+    private String stateName;
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<>(getStateName() + "_timer", Types.LONG));
+        ListStateDescriptor<Map<String, Object>> itemViewStateDesc = new ListStateDescriptor(getStateName() + "_state", Map.class);
+        itemState = getRuntimeContext().getListState(itemViewStateDesc);
+    }
+
+    @Override
+    public void processElement(Map<String, Object> value, Context context, Collector<List<Map<String, Object>>> collector) throws Exception {
+        // register a new timer for this key if none is pending
+        Long curTimeStamp = currentTimer.value();
+        if (curTimeStamp == null || curTimeStamp == 0) {
+            long onTimer = context.timestamp() + FlowWriteConfig.SINK_BATCH_TIME_OUT * 1000;
+            context.timerService().registerEventTimeTimer(onTimer);
+            currentTimer.update(onTimer);
+        }
+        itemState.add(value);
+    }
+
+    @Override
+    public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<Map<String, Object>>> out) throws Exception {
+        Spliterator<Map<String, Object>> spliterator = itemState.get().spliterator();
+        List<Map<String, Object>> collect = StreamSupport.stream(spliterator, false)
+                .collect(Collectors.toList());
+        out.collect(collect);
+        currentTimer.clear();
+        itemState.clear();
+    }
+
+    public CKDelayProcess(String stateName) {
+        this.stateName = stateName;
+    }
+
+    public String getStateName() {
+        return stateName;
+    }
+
+    public void setStateName(String stateName) {
+        this.stateName = stateName;
+    }
+}
diff --git a/src/main/java/com/zdjizhi/common/ConnKeysSelector.java b/src/main/java/com/zdjizhi/common/ConnKeysSelector.java
new file mode 100644
index 0000000..6012626
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/ConnKeysSelector.java
@@ -0,0 +1,21 @@
+package com.zdjizhi.common;
+
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.Map;
+
+/**
+ * @description:
+ * @author: zhq
+ * @create: 2022-07-05
+ **/
+@Deprecated
+public class ConnKeysSelector implements KeySelector<Map<String, Object>, String> {
+
+    @Override
+    public String getKey(Map<String, Object> log) throws Exception {
+        return String.valueOf(log.get("conn_start_time"));
+    }
+}
diff --git a/src/main/java/com/zdjizhi/common/DnsGraphKeysSelector.java b/src/main/java/com/zdjizhi/common/DnsGraphKeysSelector.java
new file mode 100644
index 0000000..5aa08c5
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/DnsGraphKeysSelector.java
@@ -0,0 +1,23 @@
+package com.zdjizhi.common;
+
+import cn.hutool.core.util.StrUtil;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import java.util.Map;
+
+/**
+ * @description:
+ * @author: zhq
+ * @create: 2022-07-05
+ **/
+public class DnsGraphKeysSelector implements KeySelector<Map<String, Object>, Tuple3<String, String, String>> {
+
+    @Override
+    public Tuple3<String, String, String> getKey(Map<String, Object> log) throws Exception {
+
+        return Tuple3.of(StrUtil.toString(log.get("record_type")),
+                StrUtil.toString(log.get("qname")),
+                StrUtil.toString(log.get("record")));
+    }
+}
diff --git a/src/main/java/com/zdjizhi/common/DnsKeysSelector.java b/src/main/java/com/zdjizhi/common/DnsKeysSelector.java
deleted file mode 100644
index 101597c..0000000
--- a/src/main/java/com/zdjizhi/common/DnsKeysSelector.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.zdjizhi.common;
-
-import org.apache.flink.api.java.functions.KeySelector;
-
-import java.util.Map;
-
-/**
- * @description:
- * @author: zhq
- * @create: 2022-07-05
- **/
-public class DnsKeysSelector implements KeySelector<Map<String, Object>, String> {
-
-    @Override
-    public String getKey(Map<String, Object> log) throws Exception {
-
-        return String.valueOf(log.get("dns_qname"));
-    }
-}
diff --git a/src/main/java/com/zdjizhi/common/DnsTimeKeysSelector.java b/src/main/java/com/zdjizhi/common/DnsTimeKeysSelector.java
new file mode 100644
index 0000000..54af74c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/DnsTimeKeysSelector.java
@@ -0,0 +1,19 @@
+package com.zdjizhi.common;
+
+
+import org.apache.flink.api.java.functions.KeySelector;
+
+import java.util.Map;
+
+/**
+ * @description:
+ * @author: zhq
+ * @create: 2022-07-05
+ **/
+public class DnsTimeKeysSelector implements KeySelector<Map<String, Object>, String> {
+
+    @Override
+    public String getKey(Map<String, Object> log) throws Exception {
+        return String.valueOf(log.get("capture_time"));
+    }
+}
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
index 34674f4..57f09a0 100644
--- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -2,7 +2,6 @@
 package com.zdjizhi.common;
 
 import com.zdjizhi.utils.system.FlowWriteConfigurations;
-import org.apache.flink.configuration.ConfigUtils;
 import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
 
 /**
@@ -123,6 +122,8 @@ public class FlowWriteConfig {
     public static final String CK_USERNAME = FlowWriteConfigurations.getStringProperty(0, "ck.username");
     public static final String CK_PIN = FlowWriteConfigurations.getStringProperty(0, "ck.pin");
     public static final String CK_DATABASE = FlowWriteConfigurations.getStringProperty(0, "ck.database");
+    public static final int CK_CONNECTION_TIMEOUT = FlowWriteConfigurations.getIntProperty(0, "ck.connection.timeout");
+    public static final int CK_SOCKET_TIMEOUT = FlowWriteConfigurations.getIntProperty(0, "ck.socket.timeout");
 
     public static final int FLINK_WATERMARK_MAX_ORDERNESS = FlowWriteConfigurations.getIntProperty(0, "flink.watermark.max.orderness");
     public static final int LOG_AGGREGATE_DURATION = FlowWriteConfigurations.getIntProperty(0, "log.aggregate.duration");
@@ -139,6 +140,14 @@ public class FlowWriteConfig {
     public static final String SINK_CK_TABLE_RELATION_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.connection");
     public static final String SINK_CK_TABLE_RELATION_DNS = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.dns");
 
+    public static final String R_VISIT_IP2IP = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.visit.ip2ip");
+    public static final String R_CNAME_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.cname.domain2domain");
+    public static final String R_MX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.mx.domain2domain");
+    public static final String R_RESOLVE_DOMAIN2IP = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.resolve.domain2ip");
+    public static final String R_NX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.nx.domain2domain");
+
+
     public static final String ARANGODB_HOST = FlowWriteConfigurations.getStringProperty(0, "arangoDB.host");
     public static final Integer ARANGODB_PORT = FlowWriteConfigurations.getIntProperty(0, "arangoDB.port");
diff --git a/src/main/java/com/zdjizhi/common/KeysSelector.java b/src/main/java/com/zdjizhi/common/KeysSelector.java
index a4d616c..bfa2083 100644
--- a/src/main/java/com/zdjizhi/common/KeysSelector.java
+++ b/src/main/java/com/zdjizhi/common/KeysSelector.java
@@ -11,6 +11,7 @@ import java.util.Map;
  * @author: zhq
  * @create: 2022-07-05
  **/
+@Deprecated
 public class KeysSelector implements KeySelector<Map<String, Object>, Tuple2<String, String>> {
 
     @Override
diff --git a/src/main/java/com/zdjizhi/common/ListWindow.java b/src/main/java/com/zdjizhi/common/ListWindow.java
new file mode 100644
index 0000000..c5f6161
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/ListWindow.java
@@ -0,0 +1,12 @@
+package com.zdjizhi.common;
+
+/**
+ * @description:
+ * @author: zhq
+ * @create: 2022-07-10
+ **/
+public class ListWindow {
+
+
+
+}
diff --git a/src/main/java/com/zdjizhi/common/SketchKeysSelector.java b/src/main/java/com/zdjizhi/common/SketchKeysSelector.java
new file mode 100644
index 0000000..373a557
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/SketchKeysSelector.java
@@ -0,0 +1,19 @@
+package com.zdjizhi.common;
+
+
+import org.apache.flink.api.java.functions.KeySelector;
+
+import java.util.Map;
+
+/**
+ * @description:
+ * @author: zhq
+ * @create: 2022-07-05
+ **/
+public class SketchKeysSelector implements KeySelector<Map<String, Object>, String> {
+
+    @Override
+    public String getKey(Map<String, Object> log) throws Exception {
+        return String.valueOf(log.get("sketch_start_time"));
+    }
+}
diff --git a/src/main/java/com/zdjizhi/common/StartTimeKeysSelector.java b/src/main/java/com/zdjizhi/common/StartTimeKeysSelector.java
new file mode 100644
index 0000000..37ad5d5
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/StartTimeKeysSelector.java
@@ -0,0 +1,19 @@
+package com.zdjizhi.common;
+
+
+import org.apache.flink.api.java.functions.KeySelector;
+
+import java.util.Map;
+
+/**
+ * @description:
+ * @author: zhq
+ * @create: 2022-07-05
+ **/
+public class StartTimeKeysSelector implements KeySelector<Map<String, Object>, String> {
+
+    @Override
+    public String getKey(Map<String, Object> log) throws Exception {
+        return String.valueOf(log.get("start_time"));
+    }
+}
diff --git a/src/main/java/com/zdjizhi/common/TopMetricProcessV2.java b/src/main/java/com/zdjizhi/common/TopMetricProcessV2.java
deleted file mode 100644
index 46d308d..0000000
--- a/src/main/java/com/zdjizhi/common/TopMetricProcessV2.java
+++ /dev/null
@@ -1,65 +0,0 @@
-package com.zdjizhi.common;
-
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.util.Collector;
-
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeSet;
-
-public class TopMetricProcessV2 extends ProcessFunction<Map<String, Object>, Collector<Map<String, Object>>> {
-
-
-    private ValueState<Long> currentTimer;
-    private ListState<Map<String, Object>> itemState;
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<>("_timer", Types.LONG));
-        ListStateDescriptor<Map<String, Object>> itemViewStateDesc = new ListStateDescriptor("_state", Map.class);
-        itemState = getRuntimeContext().getListState(itemViewStateDesc);
-    }
-
-    @Override
-    public void processElement(Map<String, Object> value, Context context, Collector<Collector<Map<String, Object>>> collector) throws Exception {
-        // register a new timer if none is pending
-        Long curTimeStamp = currentTimer.value();
-        if (curTimeStamp == null || curTimeStamp == 0) {
-            long onTimer = context.timestamp() + FlowWriteConfig.SINK_BATCH_TIME_OUT * 1000;
-            context.timerService().registerEventTimeTimer(onTimer);
-            currentTimer.update(onTimer);
-        }
-        itemState.add(value);
-    }
-
-    @Override
-    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Collector<Map<String, Object>>> out) throws Exception {
-        super.onTimer(timestamp, ctx, out);
-
-        Iterator<Map<String, Object>> iterator = itemState.get().iterator();
-        if (iterator.hasNext()) {
-            out.collect((Collector<Map<String, Object>>) iterator.next());
-        }
-//        if (baseLogs.size() > FlowWriteConfig.SINK_BATCH) {
-//            Map last = baseLogs.last();
-//            if (Double.compare(map.get(orderBy).doubleValue(), last.get(orderBy).doubleValue()) > 0) {
-//                baseLogs.pollLast();
-//                baseLogs.add(map);
-//            }
-//        } else {
-//            baseLogs.add(map);
-//        }
-//    }
-        currentTimer.clear();
-        itemState.clear();
-
-
-    }
-}
diff --git a/src/main/java/com/zdjizhi/enums/DnsType.java b/src/main/java/com/zdjizhi/enums/DnsType.java
index bc5805e..2d8c985 100644
--- a/src/main/java/com/zdjizhi/enums/DnsType.java
+++ b/src/main/java/com/zdjizhi/enums/DnsType.java
@@ -1,16 +1,42 @@
 package com.zdjizhi.enums;
 
+import static com.zdjizhi.common.FlowWriteConfig.*;
+
 /**
- * @description:
- * @author: zhq
- * @create: 2022-07-06
+ * @author zhq
+ * @description
+ * @create 2022-07-08
  **/
 public enum DnsType {
+    // DNS record type, its wire code, and the target sink collection
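+    // The codes keep their leading space so that they compare equal to the
+    // raw res_type values (see DnsMapFunction); e.g. an A answer (" 0x0001")
+    // ends up in the R_RESOLVE_DOMAIN2IP collection.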
" 0x000f", R_MX_DOMAIN2DOMAIN), + NS("ns", " 0x0002", R_NX_DOMAIN2DOMAIN); - /* - *dns 类型 - * */ + private String type; + private String code; + private String sink; - a, aaaa, cname, mx, ns; + DnsType() { + } + DnsType(String type, String code, String table) { + this.type = type; + this.code = code; + this.sink = table; + } + + public String getType() { + return type; + } + + public String getCode() { + return code; + } + + public String getSink() { + return sink; + } } diff --git a/src/main/java/com/zdjizhi/etl/DnsFlatMapFunction.java b/src/main/java/com/zdjizhi/etl/DnsFlatMapFunction.java deleted file mode 100644 index 5e852e0..0000000 --- a/src/main/java/com/zdjizhi/etl/DnsFlatMapFunction.java +++ /dev/null @@ -1,118 +0,0 @@ -package com.zdjizhi.etl; - -import cn.hutool.core.convert.Convert; -import cn.hutool.core.util.StrUtil; -import com.zdjizhi.enums.DnsType; -import com.zdjizhi.pojo.DbLogEntity; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.util.Collector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION; - - -/** - * @author 94976 - */ -public class DnsFlatMapFunction implements FlatMapFunction { - - private static final Logger logger = LoggerFactory.getLogger(DnsFlatMapFunction.class); - - public void process(Iterable elements, Collector> out) { - List middleResult = getMiddleResult(elements); - try { - if (middleResult != null) { - out.collect(middleResult); - logger.debug("获取中间聚合结果:{}", middleResult.toString()); - } - } catch (Exception e) { - logger.error("获取中间聚合结果失败,middleResult: {}\n{}", middleResult.toString(), e); - } - } - - /** - * 拆分dns_record - * 五种:a/aaaa/cname/mx/ns - * @param elements - * @return - */ - private List getMiddleResult(Iterable elements) { - long startTime = System.currentTimeMillis() / 1000; - long endTime = System.currentTimeMillis() / 1000; - String tableName = ""; - String dnsQname = ""; - try { - Map distinctA = new HashMap<>(); - Map distinctAAAA = new HashMap<>(); - Map distinctCname = new HashMap<>(); - Map distinctNs = new HashMap<>(); - Map distinctMx = new HashMap<>(); - for (DbLogEntity log : elements) { - tableName = log.getTableName(); - List dnsA = splitDns(log, "dns_a"); - List dnsAAAA = splitDns(log, "dns_aaaa"); - List dnsCname = splitDns(log, "dns_cname"); - List dnsNs = splitDns(log, "dns_ns"); - List dnsMx = splitDns(log, "dns_mx"); - - dnsA.forEach(x -> distinctA.merge(x, 1L, Long::sum)); - dnsAAAA.forEach(x -> distinctAAAA.merge(x, 1L, Long::sum)); - dnsCname.forEach(x -> distinctCname.merge(x, 1L, Long::sum)); - dnsNs.forEach(x -> distinctNs.merge(x, 1L, Long::sum)); - dnsMx.forEach(x -> distinctMx.merge(x, 1L, Long::sum)); - - long connStartTimetime = Convert.toLong(log.getData().get("capure_time_s")); - startTime = connStartTimetime < startTime ? connStartTimetime : startTime; - endTime = connStartTimetime > endTime ? 
-                dnsQname = StrUtil.toString(log.getData().get("dns_qname"));
-            }
-            DbLogEntity dbLogEntity = new DbLogEntity();
-            dbLogEntity.setTableName(tableName);
-            List<DbLogEntity> result = new ArrayList<>();
-            result.addAll(getNewDns(startTime, endTime, dnsQname, distinctA, DnsType.a.toString(), dbLogEntity));
-            result.addAll(getNewDns(startTime, endTime, dnsQname, distinctAAAA, DnsType.aaaa.toString(), dbLogEntity));
-            result.addAll(getNewDns(startTime, endTime, dnsQname, distinctCname, DnsType.cname.toString(), dbLogEntity));
-            result.addAll(getNewDns(startTime, endTime, dnsQname, distinctNs, DnsType.ns.toString(), dbLogEntity));
-            result.addAll(getNewDns(startTime, endTime, dnsQname, distinctMx, DnsType.mx.toString(), dbLogEntity));
-            return result;
-
-        } catch (Exception e) {
-            logger.error("failed to aggregate intermediate result set {}", e);
-        }
-        return null;
-    }
-
-
-    private static List<String> splitDns(DbLogEntity dbLogEntity, String key) {
-
-        return StrUtil.split(StrUtil.toString(dbLogEntity.getData().get(key)), ",");
-    }
-
-    private List<DbLogEntity> getNewDns(long startTime, long endTime, String dnsQname, Map<String, Long> distinctMap, String type, DbLogEntity dbLogEntity) {
-        List<DbLogEntity> newList = new ArrayList<>();
-        for (Map.Entry<String, Long> dns : distinctMap.entrySet()) {
-            Map<String, Object> newDns = new HashMap<>();
-            newDns.put("start_time", startTime);
-            newDns.put("end_time", endTime + LOG_AGGREGATE_DURATION);
-            newDns.put("record_type", type);
-            newDns.put("qname", dnsQname);
-            newDns.put("record", dns.getKey());
-            newDns.put("sessions", dns.getValue());
-            dbLogEntity.setData(newDns);
-            newList.add(dbLogEntity);
-        }
-        return newList;
-    }
-
-    @Override
-    public void flatMap(DbLogEntity dbLogEntity, Collector<DbLogEntity> collector) throws Exception {
-
-
-    }
-}
diff --git a/src/main/java/com/zdjizhi/etl/DnsGraphMapFunction.java b/src/main/java/com/zdjizhi/etl/DnsGraphMapFunction.java
new file mode 100644
index 0000000..bc9a73d
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/DnsGraphMapFunction.java
@@ -0,0 +1,35 @@
+package com.zdjizhi.etl;
+
+import cn.hutool.core.util.StrUtil;
+import com.arangodb.entity.BaseDocument;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+
+/**
+ * Deduplication: turns an aggregated edge row into the ArangoDB document,
+ * keyed by "qname-record".
+ */
+public class DnsGraphMapFunction extends RichMapFunction<Map<String, Object>, BaseDocument> {
+
+    private static final Logger logger = LoggerFactory.getLogger(DnsGraphMapFunction.class);
+
+    @Override
+    public BaseDocument map(Map<String, Object> map) throws Exception {
+        try {
+            BaseDocument baseDocument = new BaseDocument();
+            baseDocument.setKey(String.join("-", StrUtil.toString(map.get("qname")), StrUtil.toString(map.get("record"))));
+            baseDocument.addAttribute("qname", map.get("qname"));
+            baseDocument.addAttribute("record", map.get("record"));
+            baseDocument.addAttribute("last_found_time", map.get("last_found_time"));
+            return baseDocument;
+        } catch (Exception e) {
+            logger.error("dns record type conversion error: {}", e);
+        }
+        return null;
+    }
+}
diff --git a/src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java b/src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java
new file mode 100644
index 0000000..c438a14
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java
@@ -0,0 +1,43 @@
+package com.zdjizhi.etl;
+
+import cn.hutool.core.convert.Convert;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+
+/**
+ * Deduplication: collapses each (record_type, qname, record) group in the
+ * graph window to a single row, keeping the latest start_time as
+ * last_found_time.
+ */
+public class DnsGraphProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, Tuple3<String, String, String>, TimeWindow> {
+
+    private static final Logger logger = LoggerFactory.getLogger(DnsGraphProcessFunction.class);
+
+    @Override
+    public void process(Tuple3<String, String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
+
+        try {
+            long tmpTime = 0L;
+            for (Map<String, Object> log : elements) {
+                long startTime = Convert.toLong(log.get("start_time"));
+                tmpTime = startTime > tmpTime ? startTime : tmpTime;
+            }
+            Map<String, Object> newLog = new LinkedHashMap<>();
+            newLog.put("record_type", keys.f0);
+            newLog.put("qname", keys.f1);
+            newLog.put("record", keys.f2);
+            newLog.put("last_found_time", tmpTime);
+            out.collect(newLog);
+            logger.debug("window aggregation result: {}", newLog.toString());
+        } catch (Exception e) {
+            logger.error("failed to aggregate window result: {}", e);
+        }
+    }
+
+}
diff --git a/src/main/java/com/zdjizhi/etl/DnsMapFunction.java b/src/main/java/com/zdjizhi/etl/DnsMapFunction.java
new file mode 100644
index 0000000..d5926fe
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/DnsMapFunction.java
@@ -0,0 +1,79 @@
+package com.zdjizhi.etl;
+
+import cn.hutool.json.JSONArray;
+import cn.hutool.json.JSONUtil;
+import com.google.common.base.Joiner;
+import com.zdjizhi.enums.DnsType;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * @author zhq
+ * desc: splits the dns response field of a raw log into per-type columns
+ */
+public class DnsMapFunction implements MapFunction<Map<String, Object>, Map<String, Object>> {
+
+    private static final Logger logger = LoggerFactory.getLogger(DnsMapFunction.class);
+
+    @Override
+    public Map<String, Object> map(Map<String, Object> rawLog) throws Exception {
+        try {
+            Object response = rawLog.get("response");
+            JSONArray responseArray = JSONUtil.parseArray(response);
+            String dnsA = null;
+            int dnsANum = 0;
+            String dnsAAAA = null;
+            int dnsAAAANum = 0;
+            String dnsCNAME = null;
+            int dnsCNAMENum = 0;
+            String dnsNs = null;
+            int dnsNsNum = 0;
+            String dnsMx = null;
+            int dnsMxNum = 0;
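+            // Walk the answers: bodies of the same type are comma-joined and
+            // counted, e.g. two A answers "1.2.3.4" and "5.6.7.8" end up as
+            // dns_a="1.2.3.4,5.6.7.8" with dns_a_num=2.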
+            for (Object res : responseArray) {
+                Map<String, String> resMap = (Map<String, String>) res;
+                String type = resMap.get("res_type");
+                String body = resMap.get("res_body");
+                if (DnsType.A.getCode().equals(type)) {
+                    dnsA = Joiner.on(",").skipNulls().join(dnsA, body);
+                    dnsANum++;
+                } else if (DnsType.AAAA.getCode().equals(type)) {
+                    dnsAAAA = Joiner.on(",").skipNulls().join(dnsAAAA, body);
+                    dnsAAAANum++;
+                } else if (DnsType.CNAME.getCode().equals(type)) {
+                    dnsCNAME = Joiner.on(",").skipNulls().join(dnsCNAME, body);
+                    dnsCNAMENum++;
+                } else if (DnsType.NS.getCode().equals(type)) {
+                    dnsNs = Joiner.on(",").skipNulls().join(dnsNs, body);
+                    dnsNsNum++;
+                } else if (DnsType.MX.getCode().equals(type)) {
+                    dnsMx = Joiner.on(",").skipNulls().join(dnsMx, body);
+                    dnsMxNum++;
+                }
+            }
+            rawLog.put("dns_a", dnsA);
+            rawLog.put("dns_a_num", dnsANum);
+
+            rawLog.put("dns_aaaa", dnsAAAA);
+            rawLog.put("dns_aaaa_num", dnsAAAANum);
+
+            rawLog.put("dns_cname", dnsCNAME);
+            rawLog.put("dns_cname_num", dnsCNAMENum);
+
+            rawLog.put("dns_ns", dnsNs);
+            rawLog.put("dns_ns_num", dnsNsNum);
+
+            rawLog.put("dns_mx", dnsMx);
+            rawLog.put("dns_mx_num", dnsMxNum);
+        } catch (Exception e) {
+            logger.error("failed to split dns response from raw log {}", e.getMessage());
+        }
+
+        return rawLog;
+    }
+
+}
diff --git a/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java b/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java
index c9bc596..e101afe 100644
--- a/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java
@@ -19,20 +19,11 @@ import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
 /**
  * @author 94976
  */
+@Deprecated
 public class DnsProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, String, TimeWindow> {
 
     private static final Logger logger = LoggerFactory.getLogger(DnsProcessFunction.class);
 
-    @Override
-    public void process(String keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
-
-        try {
-            getMiddleResult(out, elements);
-        } catch (Exception e) {
-            logger.error("failed to get intermediate aggregation result, middleResult: {}", e);
-        }
-    }
-
     /**
      * split dns_record
     * five types: a/aaaa/cname/mx/ns
@@ -40,49 +31,51 @@ public class DnsProcessFunction extends ProcessWindowFunction
-    private void getMiddleResult(Collector<Map<String, Object>> out, Iterable<Map<String, Object>> elements) {
-        long startTime = System.currentTimeMillis() / 1000;
-        long endTime = System.currentTimeMillis() / 1000;
-        String dnsQname = "";
+    @Override
+    public void process(String keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
+
         try {
-            Map<String, Long> distinctA = new HashMap<>();
-            Map<String, Long> distinctAAAA = new HashMap<>();
-            Map<String, Long> distinctCname = new HashMap<>();
-            Map<String, Long> distinctNs = new HashMap<>();
-            Map<String, Long> distinctMx = new HashMap<>();
-            for (Map<String, Object> log : elements) {
-                List<String> dnsA = splitDns(log, "dns_a");
-                List<String> dnsAAAA = splitDns(log, "dns_aaaa");
-                List<String> dnsCname = splitDns(log, "dns_cname");
-                List<String> dnsNs = splitDns(log, "dns_ns");
-                List<String> dnsMx = splitDns(log, "dns_mx");
+            long startTime = System.currentTimeMillis() / 1000;
+            long endTime = System.currentTimeMillis() / 1000;
+            try {
+                Map<String, Long> distinctA = new HashMap<>();
+                Map<String, Long> distinctAAAA = new HashMap<>();
+                Map<String, Long> distinctCname = new HashMap<>();
+                Map<String, Long> distinctNs = new HashMap<>();
+                Map<String, Long> distinctMx = new HashMap<>();
+                for (Map<String, Object> log : elements) {
+                    List<String> dnsA = splitDns(log, "dns_a");
+                    List<String> dnsAAAA = splitDns(log, "dns_aaaa");
+                    List<String> dnsCname = splitDns(log, "dns_cname");
+                    List<String> dnsNs = splitDns(log, "dns_ns");
+                    List<String> dnsMx = splitDns(log, "dns_mx");
 
-                dnsA.forEach(x -> distinctA.merge(x, 1L, Long::sum));
-                dnsAAAA.forEach(x -> distinctAAAA.merge(x, 1L, Long::sum));
-                dnsCname.forEach(x -> distinctCname.merge(x, 1L, Long::sum));
-                dnsNs.forEach(x -> distinctNs.merge(x, 1L, Long::sum));
-                dnsMx.forEach(x -> distinctMx.merge(x, 1L, Long::sum));
+                    dnsA.forEach(x -> distinctA.merge(x, 1L, Long::sum));
+                    dnsAAAA.forEach(x -> distinctAAAA.merge(x, 1L, Long::sum));
+                    dnsCname.forEach(x -> distinctCname.merge(x, 1L, Long::sum));
+                    dnsNs.forEach(x -> distinctNs.merge(x, 1L, Long::sum));
+                    dnsMx.forEach(x -> distinctMx.merge(x, 1L, Long::sum));
 
-                long connStartTimetime = Convert.toLong(log.get("capure_time_s"));
-                startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
-                endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
-                dnsQname = StrUtil.toString(log.get("dns_qname"));
+                    long connStartTimetime = Convert.toLong(log.get("capture_time"));
+                    startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
+                    endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
+                }
+                getNewDns(startTime, endTime, keys, distinctA, DnsType.A.getType(), out);
+                getNewDns(startTime, endTime, keys, distinctAAAA, DnsType.AAAA.getType(), out);
+                getNewDns(startTime, endTime, keys, distinctCname, DnsType.CNAME.getType(), out);
+                getNewDns(startTime, endTime, keys, distinctNs, DnsType.NS.getType(), out);
+                getNewDns(startTime, endTime, keys, distinctMx, DnsType.MX.getType(), out);
+
+            } catch (Exception e) {
+                logger.error("failed to aggregate intermediate result set {}", e);
             }
-            getNewDns(startTime, endTime, dnsQname, distinctA, DnsType.a.toString(), out);
-            getNewDns(startTime, endTime, dnsQname, distinctAAAA, DnsType.aaaa.toString(), out);
-            getNewDns(startTime, endTime, dnsQname, distinctCname, DnsType.cname.toString(), out);
-            getNewDns(startTime, endTime, dnsQname, distinctNs, DnsType.ns.toString(), out);
-            getNewDns(startTime, endTime, dnsQname, distinctMx, DnsType.mx.toString(), out);
         } catch (Exception e) {
-            logger.error("failed to aggregate intermediate result set {}", e);
+            logger.error("failed to get intermediate aggregation result, middleResult: {}", e);
         }
     }
 
-
     private static List<String> splitDns(Map<String, Object> log, String key) {
-
-        return StrUtil.split(StrUtil.toString(log.get(key)), ",");
+        return StrUtil.split(StrUtil.toString(log.get(key)), StrUtil.COMMA);
     }
 
     private void getNewDns(long startTime, long endTime, String dnsQname, Map<String, Long> distinctMap, String type, Collector<Map<String, Object>> out) {
diff --git a/src/main/java/com/zdjizhi/etl/DnsRelationProcessFunction.java b/src/main/java/com/zdjizhi/etl/DnsRelationProcessFunction.java
new file mode 100644
index 0000000..104d2d7
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/DnsRelationProcessFunction.java
@@ -0,0 +1,58 @@
+package com.zdjizhi.etl;
+
+import cn.hutool.core.convert.Convert;
+import cn.hutool.core.date.DateUtil;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
+
+
+/**
+ * @author 94976
+ */
+public class DnsRelationProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, Tuple3<String, String, String>, TimeWindow> {
+
+    private static final Logger logger = LoggerFactory.getLogger(DnsRelationProcessFunction.class);
+
+    /**
+     * aggregates the split dns_record rows
+     * five types: a/aaaa/cname/mx/ns
+     *
+     * @param elements
+     * @return
+     */
+    @Override
+    public void process(Tuple3<String, String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
+
+        try {
+            long sessions = 0L;
+            long startTime = DateUtil.currentSeconds();
+            long endTime = DateUtil.currentSeconds();
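+            // Every element is one split DNS answer; the key
+            // (record_type, qname, record) is constant within the window, so
+            // counting elements yields the session count for that edge.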
+            for (Map<String, Object> log : elements) {
+                sessions++;
+                long logStartTime = Convert.toLong(log.get("start_time"));
+                startTime = logStartTime < startTime ? logStartTime : startTime;
+                endTime = logStartTime > endTime ? logStartTime : endTime;
+            }
+            Map<String, Object> newDns = new LinkedHashMap<>();
+            newDns.put("start_time", startTime);
+            newDns.put("end_time", endTime + LOG_AGGREGATE_DURATION);
+            newDns.put("record_type", keys.f0);
+            newDns.put("qname", keys.f1);
+            newDns.put("record", keys.f2);
+            newDns.put("sessions", sessions);
+            out.collect(newDns);
+        } catch (Exception e) {
+            logger.error("dns relation log aggregation failed: {}", e);
+        }
+    }
+}
diff --git a/src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java b/src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java
new file mode 100644
index 0000000..725978e
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java
@@ -0,0 +1,72 @@
+package com.zdjizhi.etl;
+
+import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.core.util.StrUtil;
+import com.zdjizhi.enums.DnsType;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * @author zhq
+ */
+public class DnsSplitFlatMapFunction extends RichFlatMapFunction<Map<String, Object>, Map<String, Object>> {
+
+    private static final Logger logger = LoggerFactory.getLogger(DnsSplitFlatMapFunction.class);
+
+    /**
+     * splits dns_record
+     * five types: a/aaaa/cname/mx/ns
+     *
+     * @return
+     */
+
+    @Override
+    public void flatMap(Map<String, Object> log, Collector<Map<String, Object>> out) {
+
+        try {
+            List<String> dnsA = splitDns(log, "dns_a");
+            List<String> dnsAAAA = splitDns(log, "dns_aaaa");
+            List<String> dnsCname = splitDns(log, "dns_cname");
+            List<String> dnsNs = splitDns(log, "dns_ns");
+            List<String> dnsMx = splitDns(log, "dns_mx");
+            String startTime = StrUtil.toString(log.get("capture_time"));
+            Object qname = log.get("qname");
+
+            getNewDns(qname, startTime, DnsType.A.getType(), dnsA, out);
+            getNewDns(qname, startTime, DnsType.AAAA.getType(), dnsAAAA, out);
+            getNewDns(qname, startTime, DnsType.CNAME.getType(), dnsCname, out);
+            getNewDns(qname, startTime, DnsType.NS.getType(), dnsNs, out);
+            getNewDns(qname, startTime, DnsType.MX.getType(), dnsMx, out);
+
+        } catch (Exception e) {
+            logger.error("failed to split raw dns log: {}", e);
+        }
+
+    }
+
+    private void getNewDns(Object qname, String startTime, String type, List<String> dnsList, Collector<Map<String, Object>> out) throws Exception {
+        if (ObjectUtil.isNotEmpty(dnsList)) {
+            for (String record : dnsList) {
+                Map<String, Object> newDns = new LinkedHashMap<>();
+                newDns.put("start_time", startTime);
+                newDns.put("record_type", type);
+                newDns.put("qname", qname);
+                newDns.put("record", record);
+                out.collect(newDns);
+            }
+        }
+    }
+
+    private static List<String> splitDns(Map<String, Object> log, String key) {
+
+        return StrUtil.split(StrUtil.toString(log.get(key)), StrUtil.COMMA);
+    }
+
+}
diff --git a/src/main/java/com/zdjizhi/pojo/DbLogEntity.java b/src/main/java/com/zdjizhi/pojo/DbLogEntity.java
deleted file mode 100644
index b89f1db..0000000
--- a/src/main/java/com/zdjizhi/pojo/DbLogEntity.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package com.zdjizhi.pojo;
-
-import java.io.Serializable;
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * @description:
- * @author: zhq
- * @create: 2022-07-05
- **/
-public class DbLogEntity implements Serializable {
-
-    private String tableName;
-    private Map<String, Object> data;
-
-    public String getTableName() {
-        return tableName;
-    }
-
-    public void setTableName(String tableName) {
-        this.tableName = tableName;
-    }
-
-    public Map<String, Object> getData() {
-        return data;
-    }
-
-    public void setData(Map<String, Object> data) {
-        this.data = data;
-    }
-
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        DbLogEntity that = (DbLogEntity) o;
-        return Objects.equals(tableName, that.tableName) &&
-                Objects.equals(data, that.data);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(tableName, data);
-    }
-}
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index b372f9e..f28d79a 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -1,17 +1,15 @@
 package com.zdjizhi.topology;
 
 import cn.hutool.core.convert.Convert;
+import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
 import com.arangodb.entity.BaseDocument;
-import com.zdjizhi.common.DnsKeysSelector;
-import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.common.IpKeysSelector;
-import com.zdjizhi.etl.ConnProcessFunction;
-import com.zdjizhi.etl.Ip2IpGraphProcessFunction;
-import com.zdjizhi.etl.DnsProcessFunction;
-import com.zdjizhi.etl.SketchProcessFunction;
+import com.zdjizhi.common.*;
+import com.zdjizhi.enums.DnsType;
+import com.zdjizhi.etl.*;
 import com.zdjizhi.utils.arangodb.ArangoDBSink;
+import com.zdjizhi.utils.ck.ClickhouseSingleSink;
 import com.zdjizhi.utils.ck.ClickhouseSink;
 import com.zdjizhi.utils.kafka.KafkaConsumer;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -61,6 +59,7 @@ public class LogFlowWriteTopology {
                 .process(new ConnProcessFunction())
                 .filter(x -> Objects.nonNull(x))
                 .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+        connTransformStream.print();
 
         DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
                 .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS)))
                 .keyBy(new IpKeysSelector())
                 .window(TumblingEventTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
                 .process(new SketchProcessFunction())
-                .filter(x -> Objects.nonNull(x))
+                .filter(Objects::nonNull)
                 .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
 
         //write into ArangoDB
         DataStream<BaseDocument> ip2ipGraph = sketchTransformStream
                 .keyBy(new IpKeysSelector())
                 .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
                 .process(new Ip2IpGraphProcessFunction())
-                .filter(x -> Objects.nonNull(x))
+                .filter(Objects::nonNull)
                 .setParallelism(TRANSFORM_PARALLELISM);
 
         //write to ClickHouse sinks in batches
-        connSource.addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION)).name("CKSink");
-        sketchSource.addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)).name("CKSink");
-        connTransformStream.addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
-        sketchTransformStream.addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
+        connSource.addSink(new ClickhouseSingleSink(SINK_CK_TABLE_CONNECTION)).name("CKSink");
+        connTransformStream.print();
+        connTransformStream.addSink(new ClickhouseSingleSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
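+        // The keyed CKDelayProcess stages below batch rows per key and emit
+        // one List per timer firing, so each ClickhouseSink invocation writes
+        // a whole batch instead of a single row.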
+        sketchSource.keyBy(new SketchKeysSelector()).process(new CKDelayProcess(SINK_CK_TABLE_SKETCH)).addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)).name("CKSink");
+        connTransformStream.keyBy(new StartTimeKeysSelector()).process(new CKDelayProcess(SINK_CK_TABLE_RELATION_CONNECTION)).addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
+        sketchTransformStream.keyBy(new StartTimeKeysSelector()).process(new CKDelayProcess(SINK_CK_TABLE_RELATION_CONNECTION)).addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
 
-//        ip2ipGraph.addSink(new ArangoDBSink("R_VISIT_IP2IP"));
+        ip2ipGraph.keyBy("key").process(new ArangoDelayProcess(R_VISIT_IP2IP)).addSink(new ArangoDBSink(R_VISIT_IP2IP)).name(R_VISIT_IP2IP);
 
@@ -93,39 +94,57 @@ public class LogFlowWriteTopology {
 
         DataStream<Map<String, Object>> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_DNS))
                 .filter(Objects::nonNull)
+                .map(new DnsMapFunction())
                 .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
                 .name(FlowWriteConfig.SOURCE_KAFKA_TOPIC_DNS);
 
         DataStream<Map<String, Object>> dnsTransform = dnsSource.assignTimestampsAndWatermarks(WatermarkStrategy
                 .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_ORDERNESS))
-                .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capure_time_s")) * 1000))
-                .keyBy(new DnsKeysSelector())
+                .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
+                .flatMap(new DnsSplitFlatMapFunction())
+                .keyBy(new DnsGraphKeysSelector())
                 .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
-                .process(new DnsProcessFunction())
-                .filter(x -> Objects.nonNull(x))
+                .process(new DnsRelationProcessFunction())
+                .filter(Objects::nonNull)
                 .setParallelism(TRANSFORM_PARALLELISM);
 
-        //filter out empty data so it is not sent to Kafka
+        //raw dns logs into ClickHouse
         dnsSource.filter(Objects::nonNull)
                 .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
-                .name("FilterOriginalData")
+                .keyBy(new DnsTimeKeysSelector())
+                .process(new CKDelayProcess(SINK_CK_TABLE_DNS))
                 .addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
                 .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
                 .name("CKSink");
 
-        dnsTransform.filter(Objects::nonNull)
-                .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
-                .name("FilterOriginalData")
+        //split dns relation logs into ClickHouse
+        dnsTransform.keyBy(new StartTimeKeysSelector()).process(new CKDelayProcess(SINK_CK_TABLE_DNS))
                 .addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
                 .setParallelism(SINK_PARALLELISM)
                 .name("CKSink");
-    }
 
+        //ArangoDB: one collection per record_type
+        DataStream<Map<String, Object>> dnsGraph = dnsTransform.keyBy(new DnsGraphKeysSelector())
+                .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
+                .process(new DnsGraphProcessFunction())
+                .setParallelism(SINK_PARALLELISM)
+                .filter(Objects::nonNull);
+
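+        // Fan out by DNS record type: each type is filtered from dnsGraph,
+        // batched by ArangoDelayProcess and written to its own collection
+        // (e.g. cname edges go to R_CNAME_DOMAIN2DOMAIN).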
+        for (DnsType dnsEnum : DnsType.values()) {
+            dnsGraph.filter(x -> ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
+                    .map(new DnsGraphMapFunction())
+                    .keyBy("key")
+                    .process(new ArangoDelayProcess(dnsEnum.getSink()))
+                    .addSink(new ArangoDBSink(dnsEnum.getSink()))
+                    .setParallelism(SINK_PARALLELISM)
+                    .name("ArangodbSink");
+        }
+
+    }
 
             env.execute(args[0]);
         } catch (Exception e) {
             logger.error("This Flink task start ERROR! Exception information is : {}", e);
-            e.printStackTrace();
         }
     }
diff --git a/src/main/java/com/zdjizhi/utils/ck/CKSink.java b/src/main/java/com/zdjizhi/utils/ck/CKSink.java
deleted file mode 100644
index 99a579a..0000000
--- a/src/main/java/com/zdjizhi/utils/ck/CKSink.java
+++ /dev/null
@@ -1,165 +0,0 @@
-package com.zdjizhi.utils.ck;
-
-import cn.hutool.core.util.StrUtil;
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import ru.yandex.clickhouse.ClickHouseConnection;
-import ru.yandex.clickhouse.ClickHouseDataSource;
-import ru.yandex.clickhouse.settings.ClickHouseProperties;
-
-import java.sql.PreparedStatement;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-public class CKSink extends RichSinkFunction<Map<String, Object>> {
-
-    private static final Log log = LogFactory.get();
-
-    private static int count = 1;
-    private static ClickHouseConnection connection = null;
-    private static PreparedStatement preparedStatement = null;
-
-    static String database = "default";
-    static String address = "jdbc:clickhouse://192.168.45.102:8123/" + database;
-    static String username = "default";
-    static String password = "galaxy2019";
-    static String fieldStr = "id,name,age";
-    static String tableName = "user_table";
-
-    private String insertSql;
-
-    // create the connection and session
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
-        try {
-            connection = getConn();
-            log.info("get clickhouse connection success !");
-            String insertSql = preparedSql(fieldStr, tableName);
-            connection.setAutoCommit(false);
-            preparedStatement = connection.prepareStatement(insertSql);
-        } catch (Exception e) {
-            log.error("clickhouse connection init error:", e);
-        }
-    }
-
-//    @Override
-//    public void close() throws Exception {
-//        super.close();
-//        // close the connection and release resources
-//        if (connection != null) {
-//            connection.close();
-//        }
-//        if (preparedStatement != null) {
-//            preparedStatement.close();
-//        }
-//    }
-
-    // batch writes with auto-commit disabled
-    @Override
-    public void invoke(Map<String, Object> data, Context context) {
-        log.info(" invoke methed ");
-
-        try {
-
-            LinkedList<Object> values = new LinkedList<>(data.values());
-            for (int i = 1; i <= values.size(); i++) {
-                Object val = values.get(i - 1);
-                if (val instanceof Long) {
-                    preparedStatement.setLong((i), Long.valueOf(StrUtil.toString(val)));
-                } else if (val instanceof Integer) {
-                    preparedStatement.setLong((i), Long.valueOf(StrUtil.toString(val)));
-                } else if (val instanceof Boolean) {
-                    preparedStatement.setBoolean((i), Boolean.valueOf(StrUtil.toString(val)));
-                } else {
-                    preparedStatement.setString((i), StrUtil.toString(val));
-                }
-            }
-
-            preparedStatement.addBatch();
-            count = count + 1;
-            try {
-//                if (count >= 50000) {
-//                    preparedStatement.executeBatch();
-//                    connection.commit();
-//                    preparedStatement.clearBatch();
-//                    count = 1;
-//                }
-
-                // commit every 10k rows
-//                if (count % 10000 == 0) {
-//                    preparedStatement.executeBatch();
-//                    connection.commit();
-//                    preparedStatement.clearBatch();
-//                }
-                preparedStatement.executeBatch();
-                connection.commit();
-
-            } catch (Exception ee) {
-                log.error("clickhouse insert error:", ee);
-            }
-        } catch (Exception ex) {
-            log.error("ClickhouseSink insert error ====", ex);
-        }
-    }
-
-    public static ClickHouseConnection getConn() {
-
-        int socketTimeout = 600000;
-        ClickHouseProperties properties = new ClickHouseProperties();
-        properties.setUser(username);
-        properties.setPassword(password);
-        properties.setDatabase(database);
-        properties.setSocketTimeout(socketTimeout);
-        ClickHouseDataSource clickHouseDataSource = new ClickHouseDataSource(address, properties);
-        ClickHouseConnection conn = null;
-        try {
-            conn = clickHouseDataSource.getConnection();
-            return conn;
-        } catch (Exception e) {
-            log.error(e.getMessage());
-            e.printStackTrace();
-        }
-        return null;
-    }
-
-    public static Map<String, Object> getField() {
-
-        return null;
-    }
-
-
-    public String preparedSql(String fieldStr, String tableName) {
-        List<String> fields = StrUtil.split(fieldStr, ",");
-        return getInsertSql(fields, tableName);
-    }
-
-    public String getInsertSql(List<String> fileds, String tableName) {
-        String sql = "";
-        String sqlStr1 = "INSERT INTO `" + database + "`." + tableName + " (";
-        String sqlStr2 = ") VALUES (";
-        String sqlStr3 = ")";
-        String sqlKey = "";
-        String sqlValue = "";
-        for (String key : fileds) {
-            sqlKey += key + ",";
-            sqlValue += "?,";
-        }
-        sqlKey = sqlKey.substring(0, sqlKey.length() - 1);
-        sqlValue = sqlValue.substring(0, sqlValue.length() - 1);
-        sql = StrUtil.concat(true, sqlStr1, sqlKey, sqlStr2, sqlValue, sqlStr3);
-
-//        String placeholders = Arrays.stream(fieldNames)
-//                .map(f -> "?")
-//                .collect(Collectors.joining(", "));
-//        return "INSERT INTO " + quoteIdentifier(tableName) +
-//                "(" + columns + ")" + " VALUES (" + placeholders + ")";
-
-
-        log.info(sql);
-        return sql;
-    }
-}
diff --git a/src/main/java/com/zdjizhi/utils/ck/CKSinkFlatMap.java b/src/main/java/com/zdjizhi/utils/ck/CKSinkFlatMap.java
deleted file mode 100644
index 55c99c4..0000000
--- a/src/main/java/com/zdjizhi/utils/ck/CKSinkFlatMap.java
+++ /dev/null
@@ -1,151 +0,0 @@
-package com.zdjizhi.utils.ck;
-
-import cn.hutool.core.util.StrUtil;
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Collector;
-import ru.yandex.clickhouse.ClickHouseConnection;
-import ru.yandex.clickhouse.ClickHouseDataSource;
-import ru.yandex.clickhouse.settings.ClickHouseProperties;
-
-import java.sql.PreparedStatement;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-public class CKSinkFlatMap extends RichFlatMapFunction<Map<String, Object>, String> {
-
-    private static final Log log = LogFactory.get();
-
-    private static int count = 1;
-    private static ClickHouseConnection connection = null;
-    private static PreparedStatement preparedStatement = null;
-
-    static String address = "jdbc:clickhouse://192.168.45.102:8123";
-    static String database = "default";
-    static String username = "default";
-    static String password = "galaxy2019";
-    static String fieldStr = "id,name,age";
-    static String tableName = "user_table";
-
-    private String insertSql;
-
-    // create the connection and session
-    @Override
-    public void open(Configuration parameters) {
-        try {
-            connection = getConn();
-            log.info("get clickhouse connection success !");
-        } catch (Exception e) {
-            log.error("clickhouse connection init error:", e);
-        }
-    }
-
-    // batch writes with auto-commit disabled
-    @Override
-    public void flatMap(Map<String, Object> data, Collector<String> collector) {
-
-        try {
-            String insertSql = preparedSql(fieldStr, tableName);
-            connection.setAutoCommit(false);
-            preparedStatement = connection.prepareStatement(insertSql);
-
-            LinkedList<Object> values = new LinkedList<>(data.values());
-            for (int i = 1; i <= values.size(); i++) {
-                Object val = values.get(i - 1);
-                if (val instanceof Long) {
-                    preparedStatement.setLong((i), Long.valueOf(StrUtil.toString(val)));
-                } else if (val instanceof Integer) {
-                    preparedStatement.setLong((i), Long.valueOf(StrUtil.toString(val)));
-                } else if (val instanceof Boolean) {
-                    preparedStatement.setBoolean((i), Boolean.valueOf(StrUtil.toString(val)));
-                } else {
-                    preparedStatement.setString((i), StrUtil.toString(val));
-                }
-            }
-
-            preparedStatement.addBatch();
-            count = count + 1;
-            try {
-//                if (count >= 50000) {
-//                    preparedStatement.executeBatch();
-//                    connection.commit();
-//                    preparedStatement.clearBatch();
-//                    count = 1;
-//                }
-
-                // commit every 10k rows
-                if (count % 10000 == 0) {
-                    preparedStatement.executeBatch();
-                    connection.commit();
-                    preparedStatement.clearBatch();
-                }
-                preparedStatement.executeBatch();
-                connection.commit();
-
-            } catch (Exception ee) {
-                log.error("clickhouse insert error:", ee);
-            }
-        } catch (Exception ex) {
-            log.error("ClickhouseSink insert error ====", ex);
-        }
-    }
-
-    public static ClickHouseConnection getConn() {
-
-        int socketTimeout = 600000;
-        ClickHouseProperties properties = new ClickHouseProperties();
-        properties.setUser(username);
-        properties.setPassword(password);
-        properties.setDatabase(database);
-        properties.setSocketTimeout(socketTimeout);
-        ClickHouseDataSource clickHouseDataSource = new ClickHouseDataSource(address, properties);
-        ClickHouseConnection conn = null;
-        try {
-            conn = clickHouseDataSource.getConnection();
-            return conn;
-        } catch (Exception e) {
-            log.error(e.getMessage());
-            e.printStackTrace();
-        }
-        return null;
-    }
-
-    public static Map<String, Object> getField() {
-
-        return null;
-    }
-
-
-    public String preparedSql(String fieldStr, String tableName) {
-        List<String> fields = StrUtil.split(fieldStr, ",");
-        return getInsertSql(fields, database + "." + tableName);
-    }
-
-    public String getInsertSql(List<String> fileds, String tableName) {
-        String sql = "";
-        String sqlStr1 = "INSERT INTO " + tableName + " (";
-        String sqlStr2 = ") VALUES (";
-        String sqlStr3 = ")";
-        String sqlKey = "";
-        String sqlValue = "";
-        for (String key : fileds) {
-            sqlKey += key + ",";
-            sqlValue += "?,";
-        }
-        sqlKey = sqlKey.substring(0, sqlKey.length() - 1);
-        sqlValue = sqlValue.substring(0, sqlValue.length() - 1);
-        sql = StrUtil.concat(true, sqlStr1, sqlKey, sqlStr2, sqlValue, sqlStr3);
-
-//        String placeholders = Arrays.stream(fieldNames)
-//                .map(f -> "?")
-//                .collect(Collectors.joining(", "));
-//        return "INSERT INTO " + quoteIdentifier(tableName) +
-//                "(" + columns + ")" + " VALUES (" + placeholders + ")";
-
-        log.info(sql);
-        return sql;
-    }
-}
diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickHouseJDBCDialect.java b/src/main/java/com/zdjizhi/utils/ck/ClickHouseJDBCDialect.java
deleted file mode 100644
index 963eef1..0000000
--- a/src/main/java/com/zdjizhi/utils/ck/ClickHouseJDBCDialect.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
-package com.zdjizhi.utils.ck;
-
-import java.util.Optional;
-
-*/
-/**
- * ClickHouse dialect
- *//*
-
-public class ClickHouseJDBCDialect implements JDBCDialect {
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public boolean canHandle(String url) {
-        return url.startsWith("jdbc:clickhouse:");
-    }
-
-    @Override
-    public Optional<String> defaultDriverName() {
-        return Optional.of("ru.yandex.clickhouse.ClickHouseDriver");
-    }
-
-    @Override
-    public String quoteIdentifier(String identifier) {
-        return "`" + identifier + "`";
-    }
-
-    @Override
-    public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
-        return Optional.of(getInsertIntoStatement(tableName, fieldNames));
-    }
-
-    @Override
-    public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
-        return null;
-    }
-
-}
-*/
diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSingleSink.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSingleSink.java
new file mode 100644
index 0000000..dc5cbe7
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSingleSink.java
@@ -0,0 +1,124 @@
+package com.zdjizhi.utils.ck;
+
+import cn.hutool.core.io.IoUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static com.zdjizhi.common.FlowWriteConfig.*;
+
+public class ClickhouseSingleSink extends RichSinkFunction<Map<String, Object>> {
+
+    private static final Log log = LogFactory.get();
+
+    private Connection connection;
+    private PreparedStatement preparedStatement;
+    public String sink;
+
+
+    public ClickhouseSingleSink(String sink) {
+        this.sink = sink;
+    }
+
+    public String getSink() {
+        return sink;
+    }
+
+    public void setSink(String sink) {
+        this.sink = sink;
+    }
+
+    @Override
+    public void invoke(Map<String, Object> logs, Context context) throws Exception {
+        executeInsert(logs, getSink());
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        try {
+            Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
+            connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
+//            BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node01:8123,node02:8123,node03:8123/default", props);
+//            connection = dataSource.getConnection();
+
+            log.info("get clickhouse connection success");
+        } catch (ClassNotFoundException | SQLException e) {
+            log.error("clickhouse connection error ,{}", e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        IoUtil.close(preparedStatement);
+        IoUtil.close(connection);
+    }
+
+    public void executeInsert(Map<String, Object> data, String tableName) {
+
+        try {
+            List<String> keys = new LinkedList<>(data.keySet());
+            connection.setAutoCommit(false);
+            preparedStatement = connection.prepareStatement(preparedSql(keys, tableName));
+            int count = 0;
+            List<Object> values = new LinkedList<>(data.values());
+            for (int i = 1; i <= values.size(); i++) {
+                Object val = values.get(i - 1);
+                if (val instanceof Long) {
+                    preparedStatement.setLong((i), Long.valueOf(StrUtil.toString(val)));
+                } else if (val instanceof Integer) {
+                    preparedStatement.setInt((i), Integer.valueOf(StrUtil.toString(val)));
+                } else if (val instanceof Boolean) {
+                    preparedStatement.setBoolean((i), Boolean.valueOf(StrUtil.toString(val)));
+                } else {
+                    preparedStatement.setString((i), StrUtil.toString(val));
+                }
+            }
+            preparedStatement.addBatch();
+            count++;
+            // commit once per full batch
+            if (count % SINK_BATCH == 0) {
+                preparedStatement.executeBatch();
+                connection.commit();
+                preparedStatement.clearBatch();
+                count = 0;
+            }
+            if (count > 0) {
+                preparedStatement.executeBatch();
+                connection.commit();
+            }
+
+        } catch (Exception ex) {
+            log.error("ClickhouseSink insert error", ex);
+        }
+    }
+
+
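+    // Builds a parameterized insert from the row's key set, e.g. for fields
+    // [id, name]: INSERT INTO tsg_galaxy_v3.<table>(id, name) VALUES (?, ?)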
String columns = fields.stream() + .filter(Objects::nonNull) + .collect(Collectors.joining(", ")); + String sql = StrUtil.concat(true, "INSERT INTO ", CK_DATABASE, ".", tableName, + "(", columns, ") VALUES (", placeholders, ")"); + log.debug(sql); + return sql; + } + + +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java index 80a1d0c..98afe4c 100644 --- a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java +++ b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java @@ -1,5 +1,6 @@ package com.zdjizhi.utils.ck; +import cn.hutool.core.io.IoUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; @@ -18,7 +19,7 @@ import java.util.stream.Collectors; import static com.zdjizhi.common.FlowWriteConfig.*; -public class ClickhouseSink extends RichSinkFunction> { +public class ClickhouseSink extends RichSinkFunction>> { private static final Log log = LogFactory.get(); @@ -27,7 +28,15 @@ public class ClickhouseSink extends RichSinkFunction> { public String sink; static { - + try { + Class.forName("ru.yandex.clickhouse.ClickHouseDriver"); + connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN); +// BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node01:8123,node02:8123,node03:8123/default", props); +// connection = dataSource.getConnection(); + log.info("get clickhouse connection success"); + } catch (ClassNotFoundException | SQLException e) { + log.error("clickhouse connection error ,{}", e); + } } public ClickhouseSink(String sink) { @@ -43,21 +52,13 @@ public class ClickhouseSink extends RichSinkFunction> { } @Override - public void invoke(Map log, Context context) throws Exception { - executeInsert(log, getSink()); + public void invoke(List> logs, Context context) throws Exception { + executeInsert(logs, getSink()); } @Override public void open(Configuration parameters) throws Exception { - try { - Class.forName("ru.yandex.clickhouse.ClickHouseDriver"); - connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN); -// BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node01:8123,node02:8123,node03:8123/default", props); -// connection = dataSource.getConnection(); - log.info("get clickhouse connection success"); - } catch (ClassNotFoundException | SQLException e) { - log.error("clickhouse connection error ,{}", e); - } + } @Override @@ -70,50 +71,52 @@ public class ClickhouseSink extends RichSinkFunction> { } } - public void executeInsert(Map data, String tableName) { + public void executeInsert(List> data, String tableName) { try { - int count = 1; - List keys = new LinkedList<>(data.keySet()); - + List keys = new LinkedList<>(data.get(0).keySet()); connection.setAutoCommit(false); preparedStatement = connection.prepareStatement(preparedSql(keys, tableName)); - List values = new LinkedList<>(data.values()); - for (int i = 1; i <= values.size(); i++) { - Object val = values.get(i - 1); - if (val instanceof Long) { - preparedStatement.setLong((i), Long.valueOf(StrUtil.toString(val))); - } else if (val instanceof Integer) { - preparedStatement.setInt((i), Integer.valueOf(StrUtil.toString(val))); - } else if (val instanceof Boolean) { - preparedStatement.setBoolean((i), Boolean.valueOf(StrUtil.toString(val))); - } else { - 
preparedStatement.setString((i), StrUtil.toString(val)); + int count = 0; + for (Map map : data) { + List values = new LinkedList<>(map.values()); + for (int i = 1; i <= values.size(); i++) { + Object val = values.get(i - 1); + if (val instanceof Long) { + preparedStatement.setLong((i), Long.valueOf(StrUtil.toString(val))); + } else if (val instanceof Integer) { + preparedStatement.setInt((i), Integer.valueOf(StrUtil.toString(val))); + } else if (val instanceof Boolean) { + preparedStatement.setBoolean((i), Boolean.valueOf(StrUtil.toString(val))); + } else { + preparedStatement.setString((i), StrUtil.toString(val)); + } } - } - - preparedStatement.addBatch(); - count = count + 1; - try { + preparedStatement.addBatch(); + count++; //1w提交一次 - if (count % 10000 == 0) { + if (count % SINK_BATCH == 0) { preparedStatement.executeBatch(); connection.commit(); preparedStatement.clearBatch(); - count = 1; + count = 0; } + } + if (count > 0) { preparedStatement.executeBatch(); connection.commit(); - } catch (Exception ee) { - log.error("数据插入clickhouse 报错:", ee); } + } catch (Exception ex) { log.error("ClickhouseSink插入报错", ex); + } finally { + IoUtil.close(preparedStatement); + IoUtil.close(connection); } } - public String preparedSql(List fields, String tableName) { + public static String preparedSql(List fields, String tableName) { String placeholders = fields.stream() .filter(Objects::nonNull) @@ -124,7 +127,7 @@ public class ClickhouseSink extends RichSinkFunction> { .collect(Collectors.joining(", ")); String sql = StrUtil.concat(true, "INSERT INTO ", CK_DATABASE, ".", tableName, "(", columns, ") VALUES (", placeholders, ")"); - log.info(sql); + log.debug(sql); return sql; } diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java index 5fc1894..9ebdeb5 100644 --- a/src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java +++ b/src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java @@ -1,56 +1,53 @@ -/* package com.zdjizhi.utils.ck; -import org.apache.flink.api.java.utils.ParameterTool; -import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseClusterSettings; -import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst; +import cn.hutool.core.io.IoUtil; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import ru.yandex.clickhouse.BalancedClickhouseDataSource; +import ru.yandex.clickhouse.settings.ClickHouseProperties; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +import static com.zdjizhi.common.FlowWriteConfig.*; -*/ /** * @description: * @author: zhq - * @create: 2022-06-29 - **//* - + * @create: 2022-07-10 + **/ public class ClickhouseUtil { + private static final Log log = LogFactory.get(); - public static ParameterTool getGlobalPro() { - Map sinkPro = new HashMap<>(); - //sink Properties - sinkPro.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, "sc.chproxy.bigdata.services.org:10000"); - - // ClickHouse 本地写账号 - sinkPro.put(ClickHouseClusterSettings.CLICKHOUSE_USER, "default"); - sinkPro.put(ClickHouseClusterSettings.CLICKHOUSE_PASSWORD, "galaxy2019"); - // sink common - sinkPro.put(ClickHouseSinkConst.TIMEOUT_SEC, "10"); - sinkPro.put(ClickHouseSinkConst.NUM_WRITERS, "10"); - sinkPro.put(ClickHouseSinkConst.NUM_RETRIES, "3"); - sinkPro.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, "1000000"); - sinkPro.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, 
"false"); - sinkPro.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, "d:/");//本地运行会在项目内生成名字为"d:"的文件夹,以存放运行失败明细记录 - - // env - sinkPro - ParameterTool parameters = ParameterTool.fromMap(sinkPro); + private static Connection connection; - return parameters; + public static Connection getConnection() { + try { + Class.forName("ru.yandex.clickhouse.ClickHouseDriver"); +// connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN); + ClickHouseProperties properties = new ClickHouseProperties(); + properties.setDatabase(CK_DATABASE); + properties.setUser(CK_USERNAME); + properties.setPassword(CK_PIN); + properties.setConnectionTimeout(CK_CONNECTION_TIMEOUT); + properties.setSocketTimeout(CK_SOCKET_TIMEOUT); + BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://" + CK_HOSTS, properties); + connection = dataSource.getConnection(); + + log.info("get clickhouse connection success"); + return connection; + } catch (ClassNotFoundException | SQLException e) { + log.error("clickhouse connection error ,{}", e); + } + return null; } - public static Properties getCKPro() { - // ClickHouseSink - sinkPro - Properties props = new Properties(); - props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, "database_1564.ch_zjk_test_local"); - props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, "10000"); - return props; + public static void close() { + IoUtil.close(connection); } - } -*/ diff --git a/src/main/java/com/zdjizhi/utils/ck/JDBCDialect.java b/src/main/java/com/zdjizhi/utils/ck/JDBCDialect.java deleted file mode 100644 index 9cd448a..0000000 --- a/src/main/java/com/zdjizhi/utils/ck/JDBCDialect.java +++ /dev/null @@ -1,39 +0,0 @@ -/* -package com.zdjizhi.utils.ck; - -import java.io.Serializable; -import java.util.Arrays; -import java.util.Optional; -import java.util.stream.Collectors; - -import static ru.yandex.clickhouse.ClickHouseUtil.quoteIdentifier; - -*/ -/** - * Handle the SQL dialect of jdbc driver. - *//* - -public interface JDBCDialect extends Serializable { - default Optional getUpsertStatement( - String tableName, String[] fieldNames, String[] uniqueKeyFields) { - return Optional.empty(); - } - default String getInsertIntoStatement(String tableName, String[] fieldNames) { - String columns = Arrays.stream(fieldNames) - .map(this::quoteIdentifier) - .collect(Collectors.joining(", ")); - String placeholders = Arrays.stream(fieldNames) - .map(f -> "?") - .collect(Collectors.joining(", ")); - return "INSERT INTO " + quoteIdentifier(tableName) + - "(" + columns + ")" + " VALUES (" + placeholders + ")"; - } - - default String getDeleteStatement(String tableName, String[] conditionFields) { - String conditionClause = Arrays.stream(conditionFields) - .map(f -> quoteIdentifier(f) + "=?") - .collect(Collectors.joining(" AND ")); - return "DELETE FROM " + quoteIdentifier(tableName) + " WHERE " + conditionClause; - } -} -*/ diff --git a/src/main/java/com/zdjizhi/utils/ck/JDBCDialects.java b/src/main/java/com/zdjizhi/utils/ck/JDBCDialects.java deleted file mode 100644 index 9c951e4..0000000 --- a/src/main/java/com/zdjizhi/utils/ck/JDBCDialects.java +++ /dev/null @@ -1,14 +0,0 @@ -/* -package com.zdjizhi.utils.ck; - -import java.util.Arrays; -import java.util.List; - -public final class JDBCDialects { - - private static final List DIALECTS = Arrays.asList( -// new DerbyDialect(), -// new MySQLDialect(), -// new PostgresDialect() - ); -}*/