Optimize code: use windowAll for batched data writes

This commit is contained in:
zhanghongqing
2022-07-12 19:24:53 +08:00
parent c1b70a6da0
commit 06042db9b1
35 changed files with 593 additions and 1027 deletions
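At a glance, the change replaces the keyed, timer-based batching (ArangoDelayProcess / CKDelayProcess built on ValueState timers) with non-keyed windowAll batching: elements are collected by an AllWindowFunction over a tumbling processing-time window and handed to the sink as a List. Below is a minimal, self-contained sketch of that pattern, assuming only Flink's streaming API; the in-memory source, the 5-second window, and print() are placeholders for the project's Kafka sources, SINK_BATCH_TIME_OUT, and ClickhouseSink/ArangoDBSink.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WindowAllBatchSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Placeholder source; the real job reads Map records from Kafka.
        Map<String, Object> sample = new HashMap<>();
        sample.put("src_ip", "10.0.0.1");
        sample.put("dst_ip", "10.0.0.2");
        DataStream<Map<String, Object>> source = env.fromElements(sample);
        // windowAll batching: everything arriving within one tumbling
        // processing-time window is collected into a single List, so the
        // sink receives one batch per window instead of one element at a time.
        source
            .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5))) // SINK_BATCH_TIME_OUT in the project
            .apply(new AllWindowFunction<Map<String, Object>, List<Map<String, Object>>, TimeWindow>() {
                @Override
                public void apply(TimeWindow window, Iterable<Map<String, Object>> values,
                                  Collector<List<Map<String, Object>>> out) {
                    List<Map<String, Object>> batch = new ArrayList<>();
                    values.forEach(batch::add);
                    out.collect(batch);
                }
            })
            .print(); // stands in for ClickhouseSink / ArangoDBSink
        env.execute("windowAll batch sketch");
    }
}

The anonymous AllWindowFunction above mirrors what the new CKWindow class in this commit does.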

View File

@@ -1,65 +0,0 @@
package com.zdjizhi.common;
import com.arangodb.entity.BaseDocument;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
public class ArangoDelayProcess extends ProcessFunction<BaseDocument, List<BaseDocument>> {
private ValueState<Long> currentTimer;
private ListState<BaseDocument> itemState;
private String stateName;
@Override
public void open(Configuration parameters) throws Exception {
currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<>(getStateName() + "_timer", Types.LONG));
ListStateDescriptor<BaseDocument> itemViewStateDesc = new ListStateDescriptor(getStateName() + "_state", Map.class);
itemState = getRuntimeContext().getListState(itemViewStateDesc);
}
@Override
public void processElement(BaseDocument value, Context context, Collector<List<BaseDocument>> collector) throws Exception {
//If no timer has been registered yet, register a new one
Long curTimeStamp = currentTimer.value();
if (curTimeStamp == null || curTimeStamp == 0) {
long onTimer = context.timestamp() + FlowWriteConfig.SINK_BATCH_TIME_OUT * 1000;
context.timerService().registerEventTimeTimer(onTimer);
currentTimer.update(onTimer);
}
itemState.add(value);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<BaseDocument>> out) throws Exception {
Spliterator<BaseDocument> spliterator = itemState.get().spliterator();
List<BaseDocument> collect = StreamSupport.stream(spliterator, false)
.collect(Collectors.toList());
out.collect(collect);
currentTimer.clear();
itemState.clear();
}
public ArangoDelayProcess(String stateName) {
this.stateName = stateName;
}
public String getStateName() {
return stateName;
}
public void setStateName(String stateName) {
this.stateName = stateName;
}
}

View File

@@ -0,0 +1,36 @@
package com.zdjizhi.common;
import cn.hutool.core.util.StrUtil;
import com.arangodb.entity.BaseEdgeDocument;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class ArangodbDnsWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {
@Override
public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> out) throws Exception {
Iterator<Map<String, Object>> iterator = iterable.iterator();
List<BaseEdgeDocument> batchLog = new ArrayList<>();
while (iterator.hasNext()) {
Map<String, Object> next = iterator.next();
String qname = StrUtil.toString(next.get("qname"));
String record = StrUtil.toString(next.get("record"));
BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument();
baseEdgeDocument.setKey(String.join("-", qname, record));
baseEdgeDocument.setFrom("qname/" + qname);
baseEdgeDocument.setTo("record/" + record);
baseEdgeDocument.addAttribute("qname", qname);
baseEdgeDocument.addAttribute("record", record);
baseEdgeDocument.addAttribute("last_found_time", next.get("last_found_time"));
batchLog.add(baseEdgeDocument);
}
out.collect(batchLog);
}
}

View File

@@ -0,0 +1,36 @@
package com.zdjizhi.common;
import cn.hutool.core.util.StrUtil;
import com.arangodb.entity.BaseEdgeDocument;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class ArangodbIPWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {
@Override
public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> collector) throws Exception {
Iterator<Map<String, Object>> iterator = iterable.iterator();
List<BaseEdgeDocument> batchLog = new ArrayList<>();
while (iterator.hasNext()) {
Map<String, Object> next = iterator.next();
String srcIp = StrUtil.toString(next.get("src_ip"));
String dstIp = StrUtil.toString(next.get("dst_ip"));
BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument();
baseEdgeDocument.setKey(String.join("-", srcIp, dstIp));
baseEdgeDocument.setFrom("src_ip/" + srcIp);
baseEdgeDocument.setTo("dst_ip/" + dstIp);
baseEdgeDocument.addAttribute("src_ip", srcIp);
baseEdgeDocument.addAttribute("dst_ip", dstIp);
baseEdgeDocument.addAttribute("last_found_time", next.get("last_found_time"));
batchLog.add(baseEdgeDocument);
}
collector.collect(batchLog);
}
}

View File

@@ -1,65 +0,0 @@
package com.zdjizhi.common;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
public class CKDelayProcess extends ProcessFunction<Map<String, Object>, List<Map<String, Object>>> {
private ValueState<Long> currentTimer;
private ListState<Map<String, Object>> itemState;
private String stateName;
@Override
public void open(Configuration parameters) throws Exception {
currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<>(getStateName() + "_timer", Types.LONG));
ListStateDescriptor<Map<String, Object>> itemViewStateDesc = new ListStateDescriptor(getStateName() + "_state", Map.class);
itemState = getRuntimeContext().getListState(itemViewStateDesc);
}
@Override
public void processElement(Map<String, Object> value, Context context, Collector<List<Map<String, Object>>> collector) throws Exception {
//If no timer has been registered yet, register a new one
Long curTimeStamp = currentTimer.value();
if (curTimeStamp == null || curTimeStamp == 0) {
long onTimer = context.timestamp() + FlowWriteConfig.SINK_BATCH_TIME_OUT * 1000;
context.timerService().registerEventTimeTimer(onTimer);
currentTimer.update(onTimer);
}
itemState.add(value);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<Map<String, Object>>> out) throws Exception {
Spliterator<Map<String, Object>> spliterator = itemState.get().spliterator();
List<Map<String, Object>> collect = StreamSupport.stream(spliterator, false)
.collect(Collectors.toList());
out.collect(collect);
currentTimer.clear();
itemState.clear();
}
public CKDelayProcess(String stateName) {
this.stateName = stateName;
}
public String getStateName() {
return stateName;
}
public void setStateName(String stateName) {
this.stateName = stateName;
}
}

View File

@@ -0,0 +1,24 @@
package com.zdjizhi.common;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class CKWindow implements AllWindowFunction<Map<String, Object>, List<Map<String, Object>>, TimeWindow> {
@Override
public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<Map<String, Object>>> out) throws Exception {
Iterator<Map<String, Object>> iterator = iterable.iterator();
List<Map<String, Object>> batchLog = new ArrayList<>();
while (iterator.hasNext()) {
Map<String, Object> next = iterator.next();
batchLog.add(next);
}
out.collect(batchLog);
}
}

View File

@@ -1,21 +0,0 @@
package com.zdjizhi.common;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.Map;
/**
* @description:
* @author: zhq
* @create: 2022-07-05
**/
@Deprecated
public class ConnKeysSelector implements KeySelector<Map<String, Object>, String> {
@Override
public String getKey(Map<String,Object> log) throws Exception {
return String.valueOf(log.get("conn_start_time"));
}
}

View File

@@ -1,19 +0,0 @@
package com.zdjizhi.common;
import org.apache.flink.api.java.functions.KeySelector;
import java.util.Map;
/**
* @description:
* @author: zhq
* @create: 2022-07-05
**/
public class DnsTimeKeysSelector implements KeySelector<Map<String, Object>, String> {
@Override
public String getKey(Map<String,Object> log) throws Exception {
return String.valueOf(log.get("capture_time"));
}
}

View File

@@ -15,7 +15,6 @@ public class IpKeysSelector implements KeySelector<Map<String, Object>, Tuple2<S
@Override
public Tuple2<String, String> getKey(Map<String,Object> log) throws Exception {
return Tuple2.of(
String.valueOf(log.get("src_ip")),
String.valueOf(log.get("dst_ip")));

View File

@@ -1,24 +0,0 @@
package com.zdjizhi.common;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.Map;
/**
* @description:
* @author: zhq
* @create: 2022-07-05
**/
@Deprecated
public class KeysSelector implements KeySelector<Map<String, Object>, Tuple2<String, String>> {
@Override
public Tuple2<String, String> getKey(Map<String,Object> log) throws Exception {
return Tuple2.of(
String.valueOf(log.get("src_ip")),
String.valueOf(log.get("dst_ip")));
}
}

View File

@@ -1,12 +0,0 @@
package com.zdjizhi.common;
/**
* @description:
* @author: zhq
* @create: 2022-07-10
**/
public class ListWindow {
}

View File

@@ -1,19 +0,0 @@
package com.zdjizhi.common;
import org.apache.flink.api.java.functions.KeySelector;
import java.util.Map;
/**
* @description:
* @author: zhq
* @create: 2022-07-05
**/
public class SketchKeysSelector implements KeySelector<Map<String, Object>, String> {
@Override
public String getKey(Map<String,Object> log) throws Exception {
return String.valueOf(log.get("sketch_start_time"));
}
}

View File

@@ -1,19 +0,0 @@
package com.zdjizhi.common;
import org.apache.flink.api.java.functions.KeySelector;
import java.util.Map;
/**
* @description:
* @author: zhq
* @create: 2022-07-05
**/
public class StartTimeKeysSelector implements KeySelector<Map<String, Object>, String> {
@Override
public String getKey(Map<String,Object> log) throws Exception {
return String.valueOf(log.get("start_time"));
}
}

View File

@@ -9,11 +9,11 @@ import static com.zdjizhi.common.FlowWriteConfig.*;
**/
public enum DnsType {
//DNS record type, type code, and target sink table
A("a", " 0x0001", R_RESOLVE_DOMAIN2IP),
AAAA("aaaa", " 0x001c", R_RESOLVE_DOMAIN2IP),
CNAME("cname", " 0x0005", R_CNAME_DOMAIN2DOMAIN),
MX("mx", " 0x000f", R_MX_DOMAIN2DOMAIN),
NS("ns", " 0x0002", R_NX_DOMAIN2DOMAIN);
A("a", "0x0001", R_RESOLVE_DOMAIN2IP),
AAAA("aaaa", "0x001c", R_RESOLVE_DOMAIN2IP),
CNAME("cname", "0x0005", R_CNAME_DOMAIN2DOMAIN),
MX("mx", "0x000f", R_MX_DOMAIN2DOMAIN),
NS("ns", "0x0002", R_NX_DOMAIN2DOMAIN);
private String type;
private String code;

View File

@@ -1,13 +1,13 @@
package com.zdjizhi.etl;
import cn.hutool.core.convert.Convert;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -20,7 +20,7 @@ import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
*/
public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, Tuple2<String, String>, TimeWindow> {
private static final Logger logger = LoggerFactory.getLogger(ConnProcessFunction.class);
private static final Log logger = LogFactory.get();
@Override
public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {

View File

@@ -1,35 +0,0 @@
package com.zdjizhi.etl;
import cn.hutool.core.util.StrUtil;
import com.arangodb.entity.BaseDocument;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
* Deduplication
*/
public class DnsGraphMapFunction extends RichMapFunction<Map<String, Object>, BaseDocument> {
private static final Logger logger = LoggerFactory.getLogger(DnsGraphMapFunction.class);
@Override
public BaseDocument map(Map<String, Object> map) throws Exception {
try {
BaseDocument baseDocument = new BaseDocument();
baseDocument.setKey(String.join("-", StrUtil.toString(map.get("qname")), StrUtil.toString(map.get("record"))));
baseDocument.addAttribute("qname", map.get("qname"));
baseDocument.addAttribute("record", map.get("record"));
baseDocument.addAttribute("last_found_time", map.get("start_time"));
return baseDocument;
} catch (Exception e) {
logger.error("dns record type 类型转换错误: {}", e);
}
return null;
}
}

View File

@@ -1,12 +1,12 @@
package com.zdjizhi.etl;
import cn.hutool.core.convert.Convert;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -17,15 +17,15 @@ import java.util.Map;
*/
public class DnsGraphProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, Tuple3<String, String, String>, TimeWindow> {
private static final Logger logger = LoggerFactory.getLogger(DnsGraphProcessFunction.class);
private static final Log logger = LogFactory.get();
@Override
public void process(Tuple3<String, String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
try {
long tmpTime = 0L;
Long tmpTime = 0L;
for (Map<String, Object> log : elements) {
long startTime = Convert.toLong(log.get("capure_time"));
Long startTime = Convert.toLong(log.get("start_time"));
tmpTime = startTime > tmpTime ? startTime : tmpTime;
}
Map newLog = new LinkedHashMap<>();

View File

@@ -1,12 +1,13 @@
package com.zdjizhi.etl;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.google.common.base.Joiner;
import com.zdjizhi.enums.DnsType;
import org.apache.flink.api.common.functions.MapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
@@ -16,7 +17,7 @@ import java.util.Map;
*/
public class DnsMapFunction implements MapFunction<Map<String, Object>, Map<String, Object>> {
private static final Logger logger = LoggerFactory.getLogger(DnsMapFunction.class);
private static final Log logger = LogFactory.get();
@Override
public Map<String, Object> map(Map<String, Object> rawLog) throws Exception {
@@ -34,9 +35,9 @@ public class DnsMapFunction implements MapFunction<Map<String, Object>, Map<Stri
String dnsMx = null;
int dnsMxNum = 0;
for (Object res : responseArray) {
Map<String, String> resMap = (Map<String, String>) res;
String type = resMap.get("res_type");
String body = resMap.get("res_body");
Map<String, Object> resMap = (Map<String, Object>) res;
String type = StrUtil.toString(resMap.get("res_type"));
String body = StrUtil.toString(resMap.get("res_body"));
if (DnsType.A.getCode().equals(type)) {
dnsA = Joiner.on(",").skipNulls().join(dnsA, body);
dnsANum++;
@@ -54,6 +55,7 @@ public class DnsMapFunction implements MapFunction<Map<String, Object>, Map<Stri
dnsMxNum++;
}
}
//Records of the same type are merged by concatenation and the per-type counter is incremented
rawLog.put("dns_a", dnsA);
rawLog.put("dns_a_num", dnsANum);

View File

@@ -1,93 +0,0 @@
package com.zdjizhi.etl;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.StrUtil;
import com.zdjizhi.enums.DnsType;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
/**
* @author 94976
*/
@Deprecated
public class DnsProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, String, TimeWindow> {
private static final Logger logger = LoggerFactory.getLogger(DnsProcessFunction.class);
/**
* Split dns_record
* into five types: a/aaaa/cname/mx/ns
*
* @param elements
* @return
*/
@Override
public void process(String keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
try {
long startTime = System.currentTimeMillis() / 1000;
long endTime = System.currentTimeMillis() / 1000;
try {
Map<String, Long> distinctA = new HashMap<>();
Map<String, Long> distinctAAAA = new HashMap<>();
Map<String, Long> distinctCname = new HashMap<>();
Map<String, Long> distinctNs = new HashMap<>();
Map<String, Long> distinctMx = new HashMap<>();
for (Map<String, Object> log : elements) {
List<String> dnsA = splitDns(log, "dns_a");
List<String> dnsAAAA = splitDns(log, "dns_aaaa");
List<String> dnsCname = splitDns(log, "dns_cname");
List<String> dnsNs = splitDns(log, "dns_ns");
List<String> dnsMx = splitDns(log, "dns_mx");
dnsA.forEach(x -> distinctA.merge(x, 1L, Long::sum));
dnsAAAA.forEach(x -> distinctAAAA.merge(x, 1L, Long::sum));
dnsCname.forEach(x -> distinctCname.merge(x, 1L, Long::sum));
dnsNs.forEach(x -> distinctNs.merge(x, 1L, Long::sum));
dnsMx.forEach(x -> distinctMx.merge(x, 1L, Long::sum));
long connStartTimetime = Convert.toLong(log.get("capture_time"));
startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
}
getNewDns(startTime, endTime, keys, distinctA, DnsType.A.getType(), out);
getNewDns(startTime, endTime, keys, distinctAAAA, DnsType.AAAA.getType(), out);
getNewDns(startTime, endTime, keys, distinctCname, DnsType.CNAME.getType(), out);
getNewDns(startTime, endTime, keys, distinctNs, DnsType.NS.getType(), out);
getNewDns(startTime, endTime, keys, distinctMx, DnsType.MX.getType(), out);
} catch (Exception e) {
logger.error("聚合中间结果集失败 {}", e);
}
} catch (Exception e) {
logger.error("获取中间聚合结果失败,middleResult: {}", e);
}
}
private static List<String> splitDns(Map<String, Object> log, String key) {
return StrUtil.split(StrUtil.toString(log.get(key)), StrUtil.COMMA);
}
private void getNewDns(long startTime, long endTime, String dnsQname, Map<String, Long> distinctMap, String type, Collector<Map<String, Object>> out) {
for (Map.Entry<String, Long> dns : distinctMap.entrySet()) {
Map<String, Object> newDns = new HashMap<>();
newDns.put("start_time", startTime);
newDns.put("end_time", endTime + LOG_AGGREGATE_DURATION);
newDns.put("record_type", type);
newDns.put("qname", dnsQname);
newDns.put("record", dns.getKey());
newDns.put("sessions", dns.getValue());
out.collect(newDns);
}
}
}

View File

@@ -2,12 +2,12 @@ package com.zdjizhi.etl;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.DateUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -20,7 +20,7 @@ import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
*/
public class DnsRelationProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, Tuple3<String, String, String>, TimeWindow> {
private static final Logger logger = LoggerFactory.getLogger(DnsRelationProcessFunction.class);
private static final Log logger = LogFactory.get();
/**
* Split dns_record

View File

@@ -2,15 +2,17 @@ package com.zdjizhi.etl;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.google.common.base.Splitter;
import com.zdjizhi.enums.DnsType;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
@@ -18,7 +20,7 @@ import java.util.Map;
*/
public class DnsSplitFlatMapFunction extends RichFlatMapFunction<Map<String, Object>, Map<String, Object>> {
private static final Logger logger = LoggerFactory.getLogger(DnsSplitFlatMapFunction.class);
private static final Log logger = LogFactory.get();
/**
* Split dns_record
@@ -65,8 +67,10 @@ public class DnsSplitFlatMapFunction extends RichFlatMapFunction<Map<String, Obj
}
private static List<String> splitDns(Map<String, Object> log, String key) {
return StrUtil.split(StrUtil.toString(log.get(key)), StrUtil.COMMA);
if (Objects.isNull(log.get(key))) {
return null;
}
return Splitter.on(StrUtil.COMMA).trimResults().omitEmptyStrings().splitToList(StrUtil.toString(log.get(key)));
}
}
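The new splitDns above swaps StrUtil.split for an explicit null check plus Guava's Splitter, which also trims whitespace and drops empty tokens. A standalone sketch of just that splitting behavior (the input string is invented):

import com.google.common.base.Splitter;
import java.util.List;

public class SplitterSketch {
    public static void main(String[] args) {
        String dnsA = "1.2.3.4, 5.6.7.8,,  ";
        // trimResults() strips surrounding whitespace; omitEmptyStrings()
        // drops the empty tokens left by trailing or duplicate commas.
        List<String> records = Splitter.on(",")
                .trimResults()
                .omitEmptyStrings()
                .splitToList(dnsA);
        System.out.println(records); // [1.2.3.4, 5.6.7.8]
    }
}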

View File

@@ -2,40 +2,39 @@ package com.zdjizhi.etl;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.DateUtil;
import com.arangodb.entity.BaseDocument;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
/**
* Deduplicate IP pairs
*/
public class Ip2IpGraphProcessFunction extends ProcessWindowFunction<Map<String, Object>, BaseDocument, Tuple2<String, String>, TimeWindow> {
public class Ip2IpGraphProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, Tuple2<String, String>, TimeWindow> {
private static final Logger logger = LoggerFactory.getLogger(Ip2IpGraphProcessFunction.class);
private static final Log logger = LogFactory.get();
@Override
public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<BaseDocument> out) {
public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
try {
long lastFoundTime = DateUtil.currentSeconds();
for (Map<String, Object> log : elements) {
long connStartTimetime = Convert.toLong(log.get("conn_start_time"));
long connStartTimetime = Convert.toLong(log.get("start_time"));
lastFoundTime = connStartTimetime > lastFoundTime ? connStartTimetime : lastFoundTime;
}
BaseDocument baseDocument = new BaseDocument();
baseDocument.setKey(String.join("-", keys.f0, keys.f1));
baseDocument.addAttribute("src_ip", keys.f0);
baseDocument.addAttribute("dst_ip", keys.f1);
baseDocument.addAttribute("last_found_time", lastFoundTime);
out.collect(baseDocument);
logger.debug("获取中间聚合结果:{}", baseDocument.toString());
Map<String, Object> newLog = new HashMap<>();
newLog.put("src_ip", keys.f0);
newLog.put("dst_ip", keys.f1);
newLog.put("last_found_time", lastFoundTime);
out.collect(newLog);
logger.debug("获取中间聚合结果:{}", newLog.toString());
} catch (Exception e) {
logger.error("获取中间聚合结果失败,middleResult: {}", e);

View File

@@ -1,13 +1,13 @@
package com.zdjizhi.etl;
import cn.hutool.core.convert.Convert;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -20,7 +20,7 @@ import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
*/
public class SketchProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, Tuple2<String, String>, TimeWindow> {
private static final Logger logger = LoggerFactory.getLogger(SketchProcessFunction.class);
private static final Log logger = LogFactory.get();
@Override
public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {

View File

@@ -4,19 +4,16 @@ import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.arangodb.entity.BaseDocument;
import com.zdjizhi.common.*;
import com.zdjizhi.enums.DnsType;
import com.zdjizhi.etl.*;
import com.zdjizhi.utils.arangodb.ArangoDBSink;
import com.zdjizhi.utils.ck.ClickhouseSingleSink;
import com.zdjizhi.utils.ck.ClickhouseSink;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
@@ -55,40 +52,36 @@ public class LogFlowWriteTopology {
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("conn_start_time")) * 1000))
.keyBy(new IpKeysSelector())
.window(TumblingEventTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
.window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
.process(new ConnProcessFunction())
.filter(x -> Objects.nonNull(x))
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
connTransformStream.print();
DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
.keyBy(new IpKeysSelector())
.window(TumblingEventTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
.window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
.process(new SketchProcessFunction())
.filter(Objects::nonNull)
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
//Write to ArangoDB
SingleOutputStreamOperator<BaseDocument> ip2ipGraph = connTransformStream.union(sketchTransformStream)
DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream)
.keyBy(new IpKeysSelector())
.window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
.process(new Ip2IpGraphProcessFunction())
.filter(Objects::nonNull)
// .filter(Objects::nonNull)
.setParallelism(TRANSFORM_PARALLELISM);
//Write to the ClickHouse sink in batches
connSource.addSink(new ClickhouseSingleSink(SINK_CK_TABLE_CONNECTION)).name("CKSink");
connTransformStream.print();
connTransformStream.addSink(new ClickhouseSingleSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
sketchSource.keyBy(new SketchKeysSelector()).process(new CKDelayProcess(SINK_CK_TABLE_SKETCH)).addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)).name("CKSink");
connTransformStream.keyBy(new StartTimeKeysSelector()).process(new CKDelayProcess(SINK_CK_TABLE_RELATION_CONNECTION)).addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
sketchTransformStream.keyBy(new StartTimeKeysSelector()).process(new CKDelayProcess(SINK_CK_TABLE_RELATION_CONNECTION)).addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
ip2ipGraph.keyBy("key").process(new ArangoDelayProcess(R_VISIT_IP2IP)).addSink(new ArangoDBSink(R_VISIT_IP2IP)).name(R_VISIT_IP2IP);
connSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION)).name("CKSink");
sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)).name("CKSink");
sketchTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
//Write to ArangoDB
ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new ArangodbIPWindow()).addSink(new ArangoDBSink(R_VISIT_IP2IP)).name(R_VISIT_IP2IP);
} else if (FlowWriteConfig.LOG_NEED_COMPLETE == 2) {
@@ -103,7 +96,7 @@ public class LogFlowWriteTopology {
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
.flatMap(new DnsSplitFlatMapFunction())
.keyBy(new DnsGraphKeysSelector())
.window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.process(new DnsRelationProcessFunction())
.filter(Objects::nonNull)
.setParallelism(TRANSFORM_PARALLELISM);
@@ -111,30 +104,27 @@ public class LogFlowWriteTopology {
//Raw DNS logs into ClickHouse
dnsSource.filter(Objects::nonNull)
.setParallelism(FlowWriteConfig.SINK_PARALLELISM)
.keyBy(new DnsTimeKeysSelector())
.process(new CKDelayProcess(SINK_CK_TABLE_DNS))
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow())
.addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
.setParallelism(FlowWriteConfig.SINK_PARALLELISM)
.name("CKSink");
//Split DNS relation logs into ClickHouse
dnsTransform.keyBy(new StartTimeKeysSelector()).process(new CKDelayProcess(SINK_CK_TABLE_DNS))
.addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
dnsTransform.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow())
.addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_DNS))
.setParallelism(SINK_PARALLELISM)
.name("CKSink");
//Write to ArangoDB, routed to a different collection per record_type
DataStream<Map<String, Object>> dnsGraph = dnsTransform.keyBy(new DnsGraphKeysSelector())
.window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
.process(new DnsGraphProcessFunction())
.setParallelism(SINK_PARALLELISM)
.filter(Objects::nonNull);
for (DnsType dnsEnum : DnsType.values()) {
dnsGraph.filter(x -> ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
.keyBy(new StartTimeKeysSelector())
.map(new DnsGraphMapFunction())
.process(new ArangoDelayProcess(dnsEnum.getSink()))
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new ArangodbDnsWindow())
.addSink(new ArangoDBSink(dnsEnum.getSink()))
.setParallelism(SINK_PARALLELISM)
.name("ArangodbSink");

View File

@@ -1,5 +1,7 @@
package com.zdjizhi.utils.arangodb;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.arangodb.ArangoCollection;
import com.arangodb.ArangoCursor;
import com.arangodb.ArangoDB;
@@ -11,22 +13,21 @@ import com.arangodb.model.AqlQueryOptions;
import com.arangodb.model.DocumentCreateOptions;
import com.arangodb.util.MapBuilder;
import com.zdjizhi.common.FlowWriteConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.List;
import java.util.Map;
public class ArangoDBConnect {
private static final Logger LOG = LoggerFactory.getLogger(ArangoDBConnect.class);
private static final Log logger = LogFactory.get();
private static ArangoDB arangoDB = null;
private static ArangoDBConnect conn = null;
static {
getArangoDB();
}
private static void getArangoDB(){
private static void getArangoDB() {
arangoDB = new ArangoDB.Builder()
.maxConnections(FlowWriteConfig.THREAD_POOL_NUMBER)
.host(FlowWriteConfig.ARANGODB_HOST, FlowWriteConfig.ARANGODB_PORT)
@@ -35,59 +36,59 @@ public class ArangoDBConnect {
.build();
}
public static synchronized ArangoDBConnect getInstance(){
if (null == conn){
public static synchronized ArangoDBConnect getInstance() {
if (null == conn) {
conn = new ArangoDBConnect();
}
return conn;
}
private ArangoDatabase getDatabase(){
private ArangoDatabase getDatabase() {
return arangoDB.db(FlowWriteConfig.ARANGODB_DB_NAME);
}
public void clean(){
public void clean() {
try {
if (arangoDB != null){
if (arangoDB != null) {
arangoDB.shutdown();
}
}catch (Exception e){
LOG.error(e.getMessage());
} catch (Exception e) {
logger.error(e.getMessage());
}
}
public <T> ArangoCursor<T> executorQuery(String query,Class<T> type){
public <T> ArangoCursor<T> executorQuery(String query, Class<T> type) {
ArangoDatabase database = getDatabase();
Map<String, Object> bindVars = new MapBuilder().get();
AqlQueryOptions options = new AqlQueryOptions()
.ttl(FlowWriteConfig.ARANGODB_TTL);
try {
return database.query(query, bindVars, options, type);
}catch (Exception e){
LOG.error(e.getMessage());
} catch (Exception e) {
logger.error(e.getMessage());
return null;
}finally {
} finally {
bindVars.clear();
}
}
public <T> void overwrite(List<T> docOverwrite, String collectionName){
public <T> void overwrite(List<T> docOverwrite, String collectionName) {
ArangoDatabase database = getDatabase();
try {
ArangoCollection collection = database.collection(collectionName);
if (!docOverwrite.isEmpty()){
if (!docOverwrite.isEmpty()) {
DocumentCreateOptions documentCreateOptions = new DocumentCreateOptions();
documentCreateOptions.overwrite(true);
documentCreateOptions.silent(true);
MultiDocumentEntity<DocumentCreateEntity<T>> documentCreateEntityMultiDocumentEntity = collection.insertDocuments(docOverwrite, documentCreateOptions);
Collection<ErrorEntity> errors = documentCreateEntityMultiDocumentEntity.getErrors();
for (ErrorEntity errorEntity:errors){
LOG.error("写入arangoDB异常"+errorEntity.getErrorMessage());
for (ErrorEntity errorEntity : errors) {
logger.error("写入arangoDB异常{}", errorEntity.getErrorMessage());
}
}
}catch (Exception e){
LOG.error("更新失败:"+e.toString());
}finally {
} catch (Exception e) {
logger.error("更新失败:" + e.toString());
} finally {
docOverwrite.clear();
}
}

View File

@@ -1,6 +1,6 @@
package com.zdjizhi.utils.arangodb;
import com.arangodb.entity.BaseDocument;
import com.arangodb.entity.BaseEdgeDocument;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -11,14 +11,15 @@ import java.util.List;
* @author: zhq
* @create: 2022-07-07
**/
public class ArangoDBSink extends RichSinkFunction<List<BaseDocument>> {
public class ArangoDBSink extends RichSinkFunction<List<BaseEdgeDocument>> {
private static ArangoDBConnect arangoDBConnect;
private String collection;
@Override
public void invoke(List<BaseDocument> baseDocuments, Context context) throws Exception {
arangoDBConnect.overwrite(baseDocuments, getCollection());
public void invoke(List<BaseEdgeDocument> BaseEdgeDocuments, Context context) throws Exception {
arangoDBConnect.overwrite(BaseEdgeDocuments, getCollection());
}
@Override
@@ -44,4 +45,5 @@ public class ArangoDBSink extends RichSinkFunction<List<BaseDocument>> {
public void setCollection(String collection) {
this.collection = collection;
}
}

View File

@@ -0,0 +1,335 @@
/*
package com.zdjizhi.utils.ck;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseDriver;
import ru.yandex.clickhouse.ClickhouseJdbcUrlParser;
import ru.yandex.clickhouse.settings.ClickHouseProperties;
import javax.sql.DataSource;
import java.io.PrintWriter;
import java.net.URISyntaxException;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
*/
/**
* A DataSource implementation with load balancing across ClickHouse nodes
*//*
public class BalancedClickhouseDataSource implements DataSource {
private static final Logger log = LoggerFactory.getLogger(BalancedClickhouseDataSource.class);
private static final Pattern URL_TEMPLATE = Pattern.compile("jdbc:clickhouse://([a-zA-Z0-9_:,.-]+)(/[a-zA-Z0-9_]+([?][a-zA-Z0-9_]+[=][a-zA-Z0-9_]+([&][a-zA-Z0-9_]+[=][a-zA-Z0-9_]+)*)?)?");
private PrintWriter printWriter;
private int loginTimeoutSeconds;
//per-thread random number generator
private final ThreadLocal<Random> randomThreadLocal;
//all configured URLs
private final List<String> allUrls;
//currently enabled (reachable) URLs
private volatile List<String> enabledUrls;
private final ClickHouseProperties properties;
private final ClickHouseDriver driver;
public BalancedClickhouseDataSource(String url) {
this(splitUrl(url), getFromUrl(url));
}
public BalancedClickhouseDataSource(String url, Properties properties) {
this(splitUrl(url), new ClickHouseProperties(properties));
}
public BalancedClickhouseDataSource(String url, ClickHouseProperties properties) {
this(splitUrl(url), properties.merge(getFromUrlWithoutDefault(url)));
}
private BalancedClickhouseDataSource(List<String> urls) {
this(urls, new ClickHouseProperties());
}
private BalancedClickhouseDataSource(List<String> urls, Properties info) {
this(urls, new ClickHouseProperties(info));
}
private BalancedClickhouseDataSource(List<String> urls, ClickHouseProperties properties) {
this.loginTimeoutSeconds = 0;
this.randomThreadLocal = new ThreadLocal();
this.driver = new ClickHouseDriver();
if (urls.isEmpty()) {
throw new IllegalArgumentException("Incorrect ClickHouse jdbc url list. It must be not empty");
} else {
try {
//parse connection properties from the first URL
ClickHouseProperties localProperties = ClickhouseJdbcUrlParser.parse((String)urls.get(0), properties.asProperties());
localProperties.setHost((String)null);
localProperties.setPort(-1);
this.properties = localProperties;
} catch (URISyntaxException var8) {
throw new IllegalArgumentException(var8);
}
List<String> allUrls = new ArrayList(urls.size());
Iterator var4 = urls.iterator();
while(var4.hasNext()) {
String url = (String)var4.next();
try {
//if the URL is valid
if (this.driver.acceptsURL(url)) {
//add it to the full URL list
allUrls.add(url);
} else {
log.error("that url is has not correct format: {}", url);
}
} catch (SQLException var7) {
throw new IllegalArgumentException("error while checking url: " + url, var7);
}
}
if (allUrls.isEmpty()) {
throw new IllegalArgumentException("there are no correct urls");
} else {
//all URLs
this.allUrls = Collections.unmodifiableList(allUrls);
//enabled URLs
this.enabledUrls = this.allUrls;
}
}
}
*/
/**
* Split a comma-separated JDBC URL into one URL per host
* @param url
* @return
*//*
static List<String> splitUrl(String url) {
//validate the URL format
Matcher m = URL_TEMPLATE.matcher(url);
if (!m.matches()) {
throw new IllegalArgumentException("Incorrect url");
} else {
String database = m.group(2);
if (database == null) {
database = "";
}
//split the host list
String[] hosts = m.group(1).split(",");
List<String> result = new ArrayList(hosts.length);
String[] var5 = hosts;
int var6 = hosts.length;
//build one JDBC URL per host
for(int var7 = 0; var7 < var6; ++var7) {
String host = var5[var7];
result.add("jdbc:clickhouse://" + host + database);
}
return result;
}
}
*/
/**
* Ping a URL to check whether it is reachable
* @param url
* @return
*//*
private boolean ping(String url) {
try {
//run a trivial query to test that the URL is reachable
this.driver.connect(url, this.properties).createStatement().execute("SELECT 1");
return true;
} catch (Exception var3) {
return false;
}
}
*/
/**
* Ping every URL and keep only the reachable ones
* @return
*//*
public synchronized int actualize() {
//build a fresh list of enabled URLs
List<String> enabledUrls = new ArrayList(this.allUrls.size());
Iterator var2 = this.allUrls.iterator();
while(var2.hasNext()) {
String url = (String)var2.next();
log.debug("Pinging disabled url: {}", url);
if (this.ping(url)) {
log.debug("Url is alive now: {}", url);
//only URLs that respond to the ping are kept
enabledUrls.add(url);
} else {
log.debug("Url is dead now: {}", url);
}
}
//replace the enabled-URL list
this.enabledUrls = Collections.unmodifiableList(enabledUrls);
return enabledUrls.size();
}
*/
/**
* Return a randomly chosen enabled URL
* @return
* @throws java.sql.SQLException
*//*
private String getAnyUrl() throws SQLException {
//snapshot of the enabled URLs
List<String> localEnabledUrls = this.enabledUrls;
if (localEnabledUrls.isEmpty()) {
throw new SQLException("Unable to get connection: there are no enabled urls");
} else {
Random random = (Random)this.randomThreadLocal.get();
if (random == null) {
this.randomThreadLocal.set(new Random());
//create a Random for this thread
random = (Random)this.randomThreadLocal.get();
}
int index = random.nextInt(localEnabledUrls.size());
//pick a random enabled URL and return it
return (String)localEnabledUrls.get(index);
}
}
public ClickHouseConnection getConnection() throws SQLException {
return this.driver.connect(this.getAnyUrl(), this.properties);
}
public ClickHouseConnection getConnection(String username, String password) throws SQLException {
return this.driver.connect(this.getAnyUrl(), this.properties.withCredentials(username, password));
}
public <T> T unwrap(Class<T> iface) throws SQLException {
if (iface.isAssignableFrom(this.getClass())) {
return iface.cast(this);
} else {
throw new SQLException("Cannot unwrap to " + iface.getName());
}
}
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return iface.isAssignableFrom(this.getClass());
}
public PrintWriter getLogWriter() throws SQLException {
return this.printWriter;
}
public void setLogWriter(PrintWriter printWriter) throws SQLException {
this.printWriter = printWriter;
}
public void setLoginTimeout(int seconds) throws SQLException {
this.loginTimeoutSeconds = seconds;
}
public int getLoginTimeout() throws SQLException {
return this.loginTimeoutSeconds;
}
public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
throw new SQLFeatureNotSupportedException();
}
*/
/**
* Periodically clean up stale connections
* @param rate
* @param timeUnit
* @return
*//*
public BalancedClickhouseDataSource withConnectionsCleaning(int rate, TimeUnit timeUnit) {
this.driver.scheduleConnectionsCleaning(rate, timeUnit);
return this;
}
*/
/**
* Periodically re-check the URLs via a scheduled task so the enabled-URL list stays current
* @param delay
* @param timeUnit
* @return
*//*
public BalancedClickhouseDataSource scheduleActualization(int delay, TimeUnit timeUnit) {
ScheduledConnectionCleaner.INSTANCE.scheduleWithFixedDelay(new Runnable() {
public void run() {
try {
BalancedClickhouseDataSource.this.actualize();
} catch (Exception var2) {
BalancedClickhouseDataSource.log.error("Unable to actualize urls", var2);
}
}
}, 0L, (long)delay, timeUnit);
return this;
}
public List<String> getAllClickhouseUrls() {
return this.allUrls;
}
public List<String> getEnabledClickHouseUrls() {
return this.enabledUrls;
}
*/
/**
* Return the set of disabled URLs,
* computed as the difference between all URLs and the enabled URLs
*
* @return
*//*
public List<String> getDisabledUrls() {
List<String> enabledUrls = this.enabledUrls;
if (!this.hasDisabledUrls()) {
return Collections.emptyList();
} else {
List<String> disabledUrls = new ArrayList(this.allUrls);
disabledUrls.removeAll(enabledUrls);
return disabledUrls;
}
}
public boolean hasDisabledUrls() {
return this.allUrls.size() != this.enabledUrls.size();
}
public ClickHouseProperties getProperties() {
return this.properties;
}
private static ClickHouseProperties getFromUrl(String url) {
return new ClickHouseProperties(getFromUrlWithoutDefault(url));
}
private static Properties getFromUrlWithoutDefault(String url) {
if (StringUtils.isBlank(url)) {
return new Properties();
} else {
int index = url.indexOf("?");
return index == -1 ? new Properties() : ClickhouseJdbcUrlParser.parseUriQueryPart(url.substring(index + 1), new Properties());
}
}
}*/

View File

@@ -1,124 +0,0 @@
package com.zdjizhi.utils.ck;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import static com.zdjizhi.common.FlowWriteConfig.*;
public class ClickhouseSingleSink extends RichSinkFunction<Map<String, Object>> {
private static final Log log = LogFactory.get();
private Connection connection;
private PreparedStatement preparedStatement;
public String sink;
public ClickhouseSingleSink(String sink) {
this.sink = sink;
}
public String getSink() {
return sink;
}
public void setSink(String sink) {
this.sink = sink;
}
@Override
public void invoke(Map<String, Object> logs, Context context) throws Exception {
executeInsert(logs, getSink());
}
@Override
public void open(Configuration parameters) throws Exception {
try {
Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
// BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node01:8123,node02:8123,node03:8123/default", props);
// connection = dataSource.getConnection();
log.info("get clickhouse connection success");
} catch (ClassNotFoundException | SQLException e) {
log.error("clickhouse connection error ,{}", e);
}
}
@Override
public void close() throws Exception {
IoUtil.close(preparedStatement);
IoUtil.close(connection);
}
public void executeInsert(Map<String, Object> data, String tableName) {
try {
List<String> keys = new LinkedList<>(data.keySet());
connection.setAutoCommit(false);
preparedStatement = connection.prepareStatement(preparedSql(keys, tableName));
int count = 0;
List<Object> values = new LinkedList<>(data.values());
for (int i = 1; i <= values.size(); i++) {
Object val = values.get(i - 1);
if (val instanceof Long) {
preparedStatement.setLong((i), Long.valueOf(StrUtil.toString(val)));
} else if (val instanceof Integer) {
preparedStatement.setInt((i), Integer.valueOf(StrUtil.toString(val)));
} else if (val instanceof Boolean) {
preparedStatement.setBoolean((i), Boolean.valueOf(StrUtil.toString(val)));
} else {
preparedStatement.setString((i), StrUtil.toString(val));
}
}
preparedStatement.addBatch();
count++;
//commit once every 10,000 (SINK_BATCH) records
if (count % SINK_BATCH == 0) {
preparedStatement.executeBatch();
connection.commit();
preparedStatement.clearBatch();
count = 0;
}
if (count > 0) {
preparedStatement.executeBatch();
connection.commit();
}
} catch (Exception ex) {
log.error("ClickhouseSink插入报错", ex);
}
}
public static String preparedSql(List<String> fields, String tableName) {
String placeholders = fields.stream()
.filter(Objects::nonNull)
.map(f -> "?")
.collect(Collectors.joining(", "));
String columns = fields.stream()
.filter(Objects::nonNull)
.collect(Collectors.joining(", "));
String sql = StrUtil.concat(true, "INSERT INTO ", CK_DATABASE, ".", tableName,
"(", columns, ") VALUES (", placeholders, ")");
log.debug(sql);
return sql;
}
}

View File

@@ -6,15 +6,17 @@ import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.zdjizhi.common.FlowWriteConfig.*;
@@ -23,21 +25,10 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>
private static final Log log = LogFactory.get();
private static Connection connection;
private static PreparedStatement preparedStatement;
private Connection connection;
private PreparedStatement preparedStatement;
public String sink;
static {
try {
Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
// BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node01:8123,node02:8123,node03:8123/default", props);
// connection = dataSource.getConnection();
log.info("get clickhouse connection success");
} catch (ClassNotFoundException | SQLException e) {
log.error("clickhouse connection error ,{}", e);
}
}
public ClickhouseSink(String sink) {
this.sink = sink;
@@ -59,16 +50,32 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>
@Override
public void open(Configuration parameters) throws Exception {
try {
// Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
// connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
ClickHouseProperties properties = new ClickHouseProperties();
properties.setDatabase(CK_DATABASE);
properties.setUser(CK_USERNAME);
properties.setPassword(CK_PIN);
// properties.setKeepAliveTimeout(5);
properties.setConnectionTimeout(CK_CONNECTION_TIMEOUT);
properties.setSocketTimeout(CK_SOCKET_TIMEOUT);
BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://" + CK_HOSTS, properties);
dataSource.scheduleActualization(10, TimeUnit.SECONDS);//enable periodic URL health checks
connection = dataSource.getConnection();
log.info("get clickhouse connection success");
} catch (SQLException e) {
log.error("clickhouse connection error ,{}", e);
}
}
@Override
public void close() throws Exception {
if (null != connection) {
connection.close();
}
if (null != preparedStatement) {
preparedStatement.close();
}
IoUtil.close(preparedStatement);
IoUtil.close(connection);
}
public void executeInsert(List<Map<String, Object>> data, String tableName) {
@@ -109,9 +116,6 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>
} catch (Exception ex) {
log.error("ClickhouseSink插入报错", ex);
} finally {
IoUtil.close(preparedStatement);
IoUtil.close(connection);
}
}
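For reference, the connection setup the reworked open() relies on can be exercised on its own. A minimal, hedged sketch follows; the host list, database, credentials, and timeout values are placeholders standing in for CK_HOSTS, CK_DATABASE, CK_USERNAME, CK_PIN, CK_CONNECTION_TIMEOUT, and CK_SOCKET_TIMEOUT.

import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;
import java.sql.Connection;
import java.util.concurrent.TimeUnit;

public class ClickhouseConnectionSketch {
    public static void main(String[] args) throws Exception {
        ClickHouseProperties properties = new ClickHouseProperties();
        properties.setDatabase("default");       // CK_DATABASE in the project
        properties.setUser("default");           // CK_USERNAME
        properties.setPassword("");              // CK_PIN
        properties.setConnectionTimeout(10_000); // CK_CONNECTION_TIMEOUT
        properties.setSocketTimeout(30_000);     // CK_SOCKET_TIMEOUT
        // Comma-separated host list; the data source load-balances across the hosts.
        BalancedClickhouseDataSource dataSource =
                new BalancedClickhouseDataSource("jdbc:clickhouse://node01:8123,node02:8123", properties);
        // Re-ping the hosts every 10 seconds so dead URLs are dropped from rotation.
        dataSource.scheduleActualization(10, TimeUnit.SECONDS);
        try (Connection connection = dataSource.getConnection()) {
            System.out.println("connected: " + !connection.isClosed());
        }
    }
}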

View File

@@ -7,7 +7,6 @@ import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import static com.zdjizhi.common.FlowWriteConfig.*;
@@ -26,7 +25,7 @@ public class ClickhouseUtil {
public static Connection getConnection() {
try {
Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
// Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
// connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
ClickHouseProperties properties = new ClickHouseProperties();
properties.setDatabase(CK_DATABASE);
@@ -40,7 +39,7 @@ public class ClickhouseUtil {
log.info("get clickhouse connection success");
return connection;
} catch (ClassNotFoundException | SQLException e) {
} catch (SQLException e) {
log.error("clickhouse connection error ,{}", e);
}
return null;

View File

@@ -5,17 +5,11 @@ import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.StringUtil;
import java.util.*;
import java.util.concurrent.Executor;
/**
@@ -44,40 +38,6 @@ public class JsonParseUtil {
*/
private static ArrayList<String[]> jobList;
static {
propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, FlowWriteConfig.NACOS_SERVER);
propNacos.setProperty(PropertyKeyConst.NAMESPACE, FlowWriteConfig.NACOS_SCHEMA_NAMESPACE);
propNacos.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME);
propNacos.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN);
try {
ConfigService configService = NacosFactory.createConfigService(propNacos);
String dataId = FlowWriteConfig.NACOS_DATA_ID;
String group = FlowWriteConfig.NACOS_GROUP;
String schema = configService.getConfig(dataId, group, 5000);
if (StringUtil.isNotBlank(schema)) {
jsonFieldsMap = getFieldsFromSchema(schema);
jobList = getJobListFromHttp(schema);
}
configService.addListener(dataId, group, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configMsg) {
if (StringUtil.isNotBlank(configMsg)) {
clearCache();
jsonFieldsMap = getFieldsFromSchema(configMsg);
jobList = getJobListFromHttp(configMsg);
}
}
});
} catch (NacosException e) {
logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage());
}
}
/**
* 模式匹配,给定一个类型字符串返回一个类类型
*