diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index cf73cf0..a0b7627 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -80,5 +80,26 @@ ck.pin=galaxy2019 #connection_record_log flink.watermark.max.orderness=10 -#s -log.aggregate.duration=30 \ No newline at end of file +#统计时间间隔 单位s +log.aggregate.duration=30 +log.aggregate.duration.graph=30 + +#arangoDB参数配置 +arangoDB.host=192.168.40.182 +#arangoDB.host=192.168.40.224 +arangoDB.port=8529 +arangoDB.user=upsert +arangoDB.password=ceiec2018 +arangoDB.DB.name=ip-learning-test +#arangoDB.DB.name=tsg_galaxy_v3 +arangoDB.batch=100000 +arangoDB.ttl=3600 + +arangoDB.read.limit= +update.arango.batch=10000 + +thread.pool.number=10 +thread.await.termination.time=10 + +sink.batch.time.out=5 +sink.batch=10000 \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java index a84ebae..34674f4 100644 --- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java +++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java @@ -2,6 +2,7 @@ package com.zdjizhi.common; import com.zdjizhi.utils.system.FlowWriteConfigurations; +import org.apache.flink.configuration.ConfigUtils; import org.jasypt.encryption.pbe.StandardPBEStringEncryptor; /** @@ -108,24 +109,26 @@ public class FlowWriteConfig { /** * common config */ - public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"source.kafka.servers"); - public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"sink.kafka.servers"); - public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0,"zookeeper.servers"); - public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0,"tools.library"); - public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0,"hbase.zookeeper.servers"); + public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.servers"); + public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.servers"); + public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers"); + public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library"); + public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers"); /* - * ck - * */ - public static final String CK_HOSTS = FlowWriteConfigurations.getStringProperty(0,"ck.hosts"); - public static final String CK_USERNAME = FlowWriteConfigurations.getStringProperty(0,"ck.username"); - public static final String CK_PIN = FlowWriteConfigurations.getStringProperty(0,"ck.pin"); - public static final String CK_DATABASE = FlowWriteConfigurations.getStringProperty(0,"ck.database"); + * ck + * */ + public static final String CK_HOSTS = FlowWriteConfigurations.getStringProperty(0, "ck.hosts"); + public static final String CK_USERNAME = FlowWriteConfigurations.getStringProperty(0, "ck.username"); + public static final String CK_PIN = FlowWriteConfigurations.getStringProperty(0, "ck.pin"); + public static final String CK_DATABASE = FlowWriteConfigurations.getStringProperty(0, "ck.database"); - public static final int FLINK_WATERMARK_MAX_ORDERNESS = 
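
All of the properties added above except arangoDB.read.limit are consumed as integers (window sizes in seconds, the ArangoDB port, TTL and batch sizes, thread-pool and sink-batch settings). A start-up check along the following lines can fail fast on blank or malformed values; this is an illustrative sketch rather than part of the patch, and it assumes the file is available on the classpath as service_flow_config.properties, while the real lookup goes through FlowWriteConfigurations:

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;

    /** Hypothetical start-up check; the key list mirrors the properties added in this change. */
    final class ConfigSanityCheck {

        static void verify() throws IOException {
            Properties props = new Properties();
            try (InputStream in = ConfigSanityCheck.class.getClassLoader()
                    .getResourceAsStream("service_flow_config.properties")) {
                if (in == null) {
                    throw new IllegalStateException("service_flow_config.properties not on classpath");
                }
                props.load(in);
            }
            String[] intKeys = {
                    "log.aggregate.duration", "log.aggregate.duration.graph",
                    "arangoDB.port", "arangoDB.batch", "arangoDB.ttl",
                    "update.arango.batch", "thread.pool.number",
                    "thread.await.termination.time", "sink.batch.time.out", "sink.batch"
            };
            for (String key : intKeys) {
                String raw = props.getProperty(key, "").trim();
                if (raw.isEmpty()) {
                    throw new IllegalStateException("missing or blank property: " + key);
                }
                Integer.parseInt(raw); // NumberFormatException surfaces bad values at start-up
            }
        }
    }
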
FlowWriteConfigurations.getIntProperty(0,"flink.watermark.max.orderness"); - public static final int LOG_AGGREGATE_DURATION = FlowWriteConfigurations.getIntProperty(0,"log.aggregate.duration"); - public static final String SOURCE_KAFKA_TOPIC_DNS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.dns");; + public static final int FLINK_WATERMARK_MAX_ORDERNESS = FlowWriteConfigurations.getIntProperty(0, "flink.watermark.max.orderness"); + public static final int LOG_AGGREGATE_DURATION = FlowWriteConfigurations.getIntProperty(0, "log.aggregate.duration"); + public static final int LOG_AGGREGATE_DURATION_GRAPH = FlowWriteConfigurations.getIntProperty(0, "log.aggregate.duration.graph"); + public static final String SOURCE_KAFKA_TOPIC_DNS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.dns"); + ; public static final String SOURCE_KAFKA_TOPIC_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.connection"); public static final String SOURCE_KAFKA_TOPIC_SKETCH = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.sketch"); @@ -135,4 +138,20 @@ public class FlowWriteConfig { public static final String SINK_CK_TABLE_DNS = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.dns"); public static final String SINK_CK_TABLE_RELATION_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.connection"); public static final String SINK_CK_TABLE_RELATION_DNS = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.dns"); + + + public static final String ARANGODB_HOST = FlowWriteConfigurations.getStringProperty(0, "arangoDB.host"); + public static final Integer ARANGODB_PORT = FlowWriteConfigurations.getIntProperty(0, "arangoDB.port"); + public static final String ARANGODB_USER = FlowWriteConfigurations.getStringProperty(0, "arangoDB.user"); + public static final String ARANGODB_PASSWORD = FlowWriteConfigurations.getStringProperty(0, "arangoDB.password"); + public static final String ARANGODB_DB_NAME = FlowWriteConfigurations.getStringProperty(0, "arangoDB.DB.name"); + public static final Integer ARANGODB_TTL = FlowWriteConfigurations.getIntProperty(0, "arangoDB.ttl"); + public static final Integer ARANGODB_BATCH = FlowWriteConfigurations.getIntProperty(0, "arangoDB.batch"); + + public static final Integer UPDATE_ARANGO_BATCH = FlowWriteConfigurations.getIntProperty(0, "update.arango.batch"); + public static final String ARANGODB_READ_LIMIT = FlowWriteConfigurations.getStringProperty(0, "arangoDB.read.limit"); + public static final Integer THREAD_POOL_NUMBER = FlowWriteConfigurations.getIntProperty(0, "thread.pool.number"); + public static final Integer THREAD_AWAIT_TERMINATION_TIME = FlowWriteConfigurations.getIntProperty(0, "thread.await.termination.time"); + public static final Integer SINK_BATCH_TIME_OUT = FlowWriteConfigurations.getIntProperty(0, "sink.batch.time.out"); + public static final Integer SINK_BATCH = FlowWriteConfigurations.getIntProperty(0, "sink.batch"); } \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/IpKeysSelector.java b/src/main/java/com/zdjizhi/common/IpKeysSelector.java new file mode 100644 index 0000000..9528a1b --- /dev/null +++ b/src/main/java/com/zdjizhi/common/IpKeysSelector.java @@ -0,0 +1,23 @@ +package com.zdjizhi.common; + + +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; + +import java.util.Map; + +/** + * @description: + * @author: zhq + * @create: 2022-07-05 + **/ 
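
FlowWriteConfig.ARANGODB_READ_LIMIT is backed by arangoDB.read.limit, which ships blank in the properties file, so any consumer has to treat it as optional. A small helper of this shape would make that explicit; the LIMIT placement in AQL is an assumption, since the patch itself never dereferences the constant:

    import java.util.OptionalInt;

    /** Illustrative helper; FlowWriteConfig.ARANGODB_READ_LIMIT is a String and may be blank. */
    final class ReadLimit {

        /** Returns the configured limit, or empty when arangoDB.read.limit is unset or blank. */
        static OptionalInt fromConfig(String rawLimit) {
            if (rawLimit == null || rawLimit.trim().isEmpty()) {
                return OptionalInt.empty();
            }
            return OptionalInt.of(Integer.parseInt(rawLimit.trim()));
        }

        /** Appends a LIMIT clause to an AQL query only when a limit is configured. */
        static String applyTo(String aql, OptionalInt limit) {
            return limit.isPresent() ? aql + " LIMIT " + limit.getAsInt() : aql;
        }
    }

A caller such as executorQuery in the ArangoDBConnect class added further down could then pass ReadLimit.applyTo(query, ReadLimit.fromConfig(FlowWriteConfig.ARANGODB_READ_LIMIT)).
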
+public class IpKeysSelector implements KeySelector, Tuple2> { + + @Override + public Tuple2 getKey(Map log) throws Exception { + + return Tuple2.of( + String.valueOf(log.get("src_ip")), + String.valueOf(log.get("dst_ip"))); + } +} diff --git a/src/main/java/com/zdjizhi/common/TopMetricProcessV2.java b/src/main/java/com/zdjizhi/common/TopMetricProcessV2.java new file mode 100644 index 0000000..46d308d --- /dev/null +++ b/src/main/java/com/zdjizhi/common/TopMetricProcessV2.java @@ -0,0 +1,65 @@ +package com.zdjizhi.common; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; + +import java.util.Comparator; +import java.util.Iterator; +import java.util.Map; +import java.util.TreeSet; + +public class TopMetricProcessV2 extends ProcessFunction, Collector>> { + + + private ValueState currentTimer; + private ListState> itemState; + + @Override + public void open(Configuration parameters) throws Exception { + currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<>("_timer", Types.LONG)); + ListStateDescriptor> itemViewStateDesc = new ListStateDescriptor("_state", Map.class); + itemState = getRuntimeContext().getListState(itemViewStateDesc); + } + + @Override + public void processElement(Map value, Context context, Collector>> collector) throws Exception { + //判断定时器是否为空,为空则创建新的定时器 + Long curTimeStamp = currentTimer.value(); + if (curTimeStamp == null || curTimeStamp == 0) { + long onTimer = context.timestamp() + FlowWriteConfig.SINK_BATCH_TIME_OUT * 1000; + context.timerService().registerEventTimeTimer(onTimer); + currentTimer.update(onTimer); + } + itemState.add(value); + } + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector>> out) throws Exception { + super.onTimer(timestamp, ctx, out); + + Iterator> iterator = itemState.get().iterator(); + if(iterator.hasNext()){ + out.collect((Collector>) iterator.next()); + } +// if (baseLogs.size() > FlowWriteConfig.SINK_BATCH) { +// Map last = baseLogs.last(); +// if (Double.compare(map.get(orderBy).doubleValue(), last.get(orderBy).doubleValue()) > 0) { +// baseLogs.pollLast(); +// baseLogs.add(map); +// } +// } else { +// baseLogs.add(map); +// } +// } + currentTimer.clear(); + itemState.clear(); + + + } +} diff --git a/src/main/java/com/zdjizhi/etl/ConnProcessFunction.java b/src/main/java/com/zdjizhi/etl/ConnProcessFunction.java index fa2b5bb..6ed9eef 100644 --- a/src/main/java/com/zdjizhi/etl/ConnProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/ConnProcessFunction.java @@ -3,7 +3,6 @@ package com.zdjizhi.etl; import cn.hutool.core.convert.Convert; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple5; -import org.apache.flink.api.java.tuple.Tuple6; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; diff --git a/src/main/java/com/zdjizhi/etl/DnsFlatMapFunction.java b/src/main/java/com/zdjizhi/etl/DnsFlatMapFunction.java index db69974..5e852e0 100644 --- a/src/main/java/com/zdjizhi/etl/DnsFlatMapFunction.java +++ 
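
The generic type arguments of the two classes above appear to have been lost in rendering (KeySelector, Tuple2, ValueState, ListState and the Collector parameters all show empty brackets). For IpKeysSelector the intent is recoverable from the keyBy call sites in the topology: key each Map record by its (src_ip, dst_ip) pair. A compilable form, with the type arguments restored, presumably looks like this:

    package com.zdjizhi.common;

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;

    import java.util.Map;

    /** Keys a log record by its (src_ip, dst_ip) pair; type arguments restored for readability. */
    public class IpKeysSelector implements KeySelector<Map<String, Object>, Tuple2<String, String>> {

        @Override
        public Tuple2<String, String> getKey(Map<String, Object> log) throws Exception {
            return Tuple2.of(
                    String.valueOf(log.get("src_ip")),
                    String.valueOf(log.get("dst_ip")));
        }
    }

TopMetricProcessV2 would follow the same Map<String, Object> element type; it appears to buffer records in ListState and flush them on a timer, and a simpler time-window batching sketch that serves the same purpose for the ArangoDB sink appears after the topology changes below.
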
b/src/main/java/com/zdjizhi/etl/DnsFlatMapFunction.java @@ -5,8 +5,6 @@ import cn.hutool.core.util.StrUtil; import com.zdjizhi.enums.DnsType; import com.zdjizhi.pojo.DbLogEntity; import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java b/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java index 46d0814..c9bc596 100644 --- a/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java @@ -9,7 +9,6 @@ import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; diff --git a/src/main/java/com/zdjizhi/etl/Ip2IpGraphProcessFunction.java b/src/main/java/com/zdjizhi/etl/Ip2IpGraphProcessFunction.java new file mode 100644 index 0000000..0181b2d --- /dev/null +++ b/src/main/java/com/zdjizhi/etl/Ip2IpGraphProcessFunction.java @@ -0,0 +1,45 @@ +package com.zdjizhi.etl; + +import cn.hutool.core.convert.Convert; +import cn.hutool.core.date.DateUtil; +import com.arangodb.entity.BaseDocument; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + + +/** + * 对ip去重 + */ +public class Ip2IpGraphProcessFunction extends ProcessWindowFunction, BaseDocument, Tuple2, TimeWindow> { + + private static final Logger logger = LoggerFactory.getLogger(Ip2IpGraphProcessFunction.class); + + @Override + public void process(Tuple2 keys, Context context, Iterable> elements, Collector out) { + + try { + long lastFoundTime = DateUtil.currentSeconds(); + for (Map log : elements) { + long connStartTimetime = Convert.toLong(log.get("conn_start_time")); + lastFoundTime = connStartTimetime > lastFoundTime ? 
connStartTimetime : lastFoundTime; + } + BaseDocument baseDocument = new BaseDocument(); + baseDocument.setKey(String.join("-", keys.f0, keys.f1)); + baseDocument.addAttribute("src_ip", keys.f0); + baseDocument.addAttribute("dst_ip", keys.f1); + baseDocument.addAttribute("last_found_time", lastFoundTime); + out.collect(baseDocument); + logger.debug("获取中间聚合结果:{}", baseDocument.toString()); + + } catch (Exception e) { + logger.error("获取中间聚合结果失败,middleResult: {}", e); + } + } + +} diff --git a/src/main/java/com/zdjizhi/etl/SketchProcessFunction.java b/src/main/java/com/zdjizhi/etl/SketchProcessFunction.java index 98a2fe5..09317fe 100644 --- a/src/main/java/com/zdjizhi/etl/SketchProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/SketchProcessFunction.java @@ -1,10 +1,8 @@ package com.zdjizhi.etl; import cn.hutool.core.convert.Convert; -import com.zdjizhi.enums.LogMetadata; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple5; -import org.apache.flink.api.java.tuple.Tuple6; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java index cfbc18b..b372f9e 100644 --- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java +++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java @@ -3,16 +3,20 @@ package com.zdjizhi.topology; import cn.hutool.core.convert.Convert; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.arangodb.entity.BaseDocument; import com.zdjizhi.common.DnsKeysSelector; import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.common.KeysSelector; +import com.zdjizhi.common.IpKeysSelector; import com.zdjizhi.etl.ConnProcessFunction; +import com.zdjizhi.etl.Ip2IpGraphProcessFunction; import com.zdjizhi.etl.DnsProcessFunction; import com.zdjizhi.etl.SketchProcessFunction; +import com.zdjizhi.utils.arangodb.ArangoDBSink; import com.zdjizhi.utils.ck.ClickhouseSink; import com.zdjizhi.utils.kafka.KafkaConsumer; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; @@ -33,17 +37,17 @@ public class LogFlowWriteTopology { //两个输出之间的最大时间 (单位milliseconds) env.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT); - //1connection,2dns + //1 connection,2 dns if (FlowWriteConfig.LOG_NEED_COMPLETE == 1) { //connection DataStream> connSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_CONNECTION)) - .filter(x -> Objects.nonNull(x)) + .filter(Objects::nonNull) .setParallelism(SOURCE_PARALLELISM) .name(SOURCE_KAFKA_TOPIC_CONNECTION); //sketch DataStream> sketchSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_SKETCH)) - .filter(x -> Objects.nonNull(x)) + .filter(Objects::nonNull) .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM) .name(SOURCE_KAFKA_TOPIC_SKETCH); @@ -52,7 +56,7 @@ public class LogFlowWriteTopology { .assignTimestampsAndWatermarks(WatermarkStrategy 
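
Ip2IpGraphProcessFunction above has the same rendering problem with its type arguments, and seeding lastFoundTime with DateUtil.currentSeconds() means the per-window maximum is taken against the wall clock rather than against the records. A sketch with the generics restored and the accumulator seeded from the data (the latter is an assumption about the intended semantics):

    package com.zdjizhi.etl;

    import cn.hutool.core.convert.Convert;
    import com.arangodb.entity.BaseDocument;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    import java.util.Map;

    /** Collapses one (src_ip, dst_ip) key per window into a single graph document. */
    public class Ip2IpGraphProcessFunction
            extends ProcessWindowFunction<Map<String, Object>, BaseDocument, Tuple2<String, String>, TimeWindow> {

        @Override
        public void process(Tuple2<String, String> keys, Context context,
                            Iterable<Map<String, Object>> elements, Collector<BaseDocument> out) {
            long lastFoundTime = 0L;
            for (Map<String, Object> log : elements) {
                // keep the newest conn_start_time seen in this window
                long connStartTime = Convert.toLong(log.get("conn_start_time"), 0L);
                lastFoundTime = Math.max(lastFoundTime, connStartTime);
            }
            BaseDocument doc = new BaseDocument();
            doc.setKey(String.join("-", keys.f0, keys.f1));
            doc.addAttribute("src_ip", keys.f0);
            doc.addAttribute("dst_ip", keys.f1);
            doc.addAttribute("last_found_time", lastFoundTime);
            out.collect(doc);
        }
    }
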
.>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS)) .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("conn_start_time")) * 1000)) - .keyBy(new KeysSelector()) + .keyBy(new IpKeysSelector()) .window(TumblingEventTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION))) .process(new ConnProcessFunction()) .filter(x -> Objects.nonNull(x)) @@ -61,22 +65,34 @@ public class LogFlowWriteTopology { DataStream> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy .>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS)) .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000)) - .keyBy(new KeysSelector()) + .keyBy(new IpKeysSelector()) .window(TumblingEventTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION))) .process(new SketchProcessFunction()) .filter(x -> Objects.nonNull(x)) .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); - //写入CKsink + //入Arangodb + SingleOutputStreamOperator ip2ipGraph = connTransformStream.union(sketchTransformStream) + .keyBy(new IpKeysSelector()) + .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH))) + .process(new Ip2IpGraphProcessFunction()) + .filter(x -> Objects.nonNull(x)) + .setParallelism(TRANSFORM_PARALLELISM); + + //写入CKsink,批量处理 connSource.addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION)).name("CKSink"); sketchSource.addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)).name("CKSink"); connTransformStream.addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink"); sketchTransformStream.addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink"); +// ip2ipGraph.addSink(new ArangoDBSink("R_VISIT_IP2IP")); + + + } else if (FlowWriteConfig.LOG_NEED_COMPLETE == 2) { DataStream> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_DNS)) - .filter(x -> Objects.nonNull(x)) + .filter(Objects::nonNull) .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM) .name(FlowWriteConfig.SOURCE_KAFKA_TOPIC_DNS); @@ -90,14 +106,14 @@ public class LogFlowWriteTopology { .setParallelism(TRANSFORM_PARALLELISM); //过滤空数据不发送到Kafka内 - dnsSource.filter(x -> Objects.nonNull(x)) + dnsSource.filter(Objects::nonNull) .setParallelism(FlowWriteConfig.SINK_PARALLELISM) .name("FilterOriginalData") .addSink(new ClickhouseSink(SINK_CK_TABLE_DNS)) .setParallelism(FlowWriteConfig.SINK_PARALLELISM) .name("CKSink"); - dnsTransform.filter(x -> Objects.nonNull(x)) + dnsTransform.filter(Objects::nonNull) .setParallelism(FlowWriteConfig.SINK_PARALLELISM) .name("FilterOriginalData") .addSink(new ClickhouseSink(SINK_CK_TABLE_DNS)) diff --git a/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBConnect.java b/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBConnect.java new file mode 100644 index 0000000..d2306e1 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBConnect.java @@ -0,0 +1,95 @@ +package com.zdjizhi.utils.arangodb; + +import com.arangodb.ArangoCollection; +import com.arangodb.ArangoCursor; +import com.arangodb.ArangoDB; +import com.arangodb.ArangoDatabase; +import com.arangodb.entity.DocumentCreateEntity; +import com.arangodb.entity.ErrorEntity; +import com.arangodb.entity.MultiDocumentEntity; +import com.arangodb.model.AqlQueryOptions; +import com.arangodb.model.DocumentCreateOptions; +import com.arangodb.util.MapBuilder; +import com.zdjizhi.common.FlowWriteConfig; +import 
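
As rendered, the commented-out ip2ipGraph.addSink(new ArangoDBSink("R_VISIT_IP2IP")) would not type-check: ip2ipGraph carries single BaseDocument records, while ArangoDBSink (added below) consumes List<BaseDocument>. One possible wiring is to batch documents over a short processing-time window before the sink. This fragment belongs inside LogFlowWriteTopology, additionally needs TumblingProcessingTimeWindows, ProcessAllWindowFunction, TimeWindow, java.util.ArrayList and java.util.List imports, and is a sketch rather than the patch author's intent:

    // batch BaseDocument records into lists so they match ArangoDBSink's input type
    SingleOutputStreamOperator<List<BaseDocument>> ip2ipBatches = ip2ipGraph
            .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.SINK_BATCH_TIME_OUT)))
            .process(new ProcessAllWindowFunction<BaseDocument, List<BaseDocument>, TimeWindow>() {
                @Override
                public void process(Context context, Iterable<BaseDocument> docs,
                                    Collector<List<BaseDocument>> out) {
                    List<BaseDocument> batch = new ArrayList<>();
                    docs.forEach(batch::add);
                    if (!batch.isEmpty()) {
                        out.collect(batch);
                    }
                }
            })
            .name("Ip2IpGraphBatcher");

    ip2ipBatches.addSink(new ArangoDBSink("R_VISIT_IP2IP")).name("ArangoDBSink");
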
org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class ArangoDBConnect { + private static final Logger LOG = LoggerFactory.getLogger(ArangoDBConnect.class); + private static ArangoDB arangoDB = null; + private static ArangoDBConnect conn = null; + static { + getArangoDB(); + } + + private static void getArangoDB(){ + arangoDB = new ArangoDB.Builder() + .maxConnections(FlowWriteConfig.THREAD_POOL_NUMBER) + .host(FlowWriteConfig.ARANGODB_HOST, FlowWriteConfig.ARANGODB_PORT) + .user(FlowWriteConfig.ARANGODB_USER) + .password(FlowWriteConfig.ARANGODB_PASSWORD) + .build(); + } + + public static synchronized ArangoDBConnect getInstance(){ + if (null == conn){ + conn = new ArangoDBConnect(); + } + return conn; + } + + private ArangoDatabase getDatabase(){ + return arangoDB.db(FlowWriteConfig.ARANGODB_DB_NAME); + } + + public void clean(){ + try { + if (arangoDB != null){ + arangoDB.shutdown(); + } + }catch (Exception e){ + LOG.error(e.getMessage()); + } + } + + public ArangoCursor executorQuery(String query,Class type){ + ArangoDatabase database = getDatabase(); + Map bindVars = new MapBuilder().get(); + AqlQueryOptions options = new AqlQueryOptions() + .ttl(FlowWriteConfig.ARANGODB_TTL); + try { + return database.query(query, bindVars, options, type); + }catch (Exception e){ + LOG.error(e.getMessage()); + return null; + }finally { + bindVars.clear(); + } + } + + public void overwrite(List docOverwrite, String collectionName){ + ArangoDatabase database = getDatabase(); + try { + ArangoCollection collection = database.collection(collectionName); + if (!docOverwrite.isEmpty()){ + DocumentCreateOptions documentCreateOptions = new DocumentCreateOptions(); + documentCreateOptions.overwrite(true); + documentCreateOptions.silent(true); + MultiDocumentEntity> documentCreateEntityMultiDocumentEntity = collection.insertDocuments(docOverwrite, documentCreateOptions); + Collection errors = documentCreateEntityMultiDocumentEntity.getErrors(); + for (ErrorEntity errorEntity:errors){ + LOG.error("写入arangoDB异常:"+errorEntity.getErrorMessage()); + } + } + }catch (Exception e){ + LOG.error("更新失败:"+e.toString()); + }finally { + docOverwrite.clear(); + } + } + +} diff --git a/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBSink.java b/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBSink.java new file mode 100644 index 0000000..a14ce16 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBSink.java @@ -0,0 +1,47 @@ +package com.zdjizhi.utils.arangodb; + +import com.arangodb.entity.BaseDocument; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; + +import java.util.List; + +/** + * @description: + * @author: zhq + * @create: 2022-07-07 + **/ +public class ArangoDBSink extends RichSinkFunction> { + + private static ArangoDBConnect arangoDBConnect; + private String collection; + + @Override + public void invoke(List baseDocuments, Context context) throws Exception { + arangoDBConnect.overwrite(baseDocuments, getCollection()); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + arangoDBConnect = ArangoDBConnect.getInstance(); + } + + @Override + public void close() throws Exception { + super.close(); + arangoDBConnect.clean(); + } + + public ArangoDBSink(String collection) { + this.collection = collection; + } + + public String getCollection() { + return collection; + } + + public void 
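
ArangoDBConnect above wraps the driver behind a shared ArangoDB instance, with overwrite-style inserts and TTL-bounded AQL cursors. A short usage sketch follows; the collection name matches the one referenced in the topology, but the key, attributes and query text are only examples:

    import com.arangodb.ArangoCursor;
    import com.arangodb.entity.BaseDocument;
    import com.zdjizhi.utils.arangodb.ArangoDBConnect;

    import java.util.ArrayList;
    import java.util.List;

    /** Illustrative caller; document contents and AQL text are not taken from this patch. */
    final class ArangoDBConnectExample {
        public static void main(String[] args) {
            ArangoDBConnect conn = ArangoDBConnect.getInstance();

            // upsert-style write: overwrite(true) replaces documents that share a _key
            List<BaseDocument> docs = new ArrayList<>();
            BaseDocument doc = new BaseDocument();
            doc.setKey("10.0.0.1-10.0.0.2");
            doc.addAttribute("src_ip", "10.0.0.1");
            doc.addAttribute("dst_ip", "10.0.0.2");
            docs.add(doc);
            conn.overwrite(docs, "R_VISIT_IP2IP");

            // read back with the configured TTL applied to the cursor
            ArangoCursor<BaseDocument> cursor =
                    conn.executorQuery("FOR d IN R_VISIT_IP2IP LIMIT 10 RETURN d", BaseDocument.class);
            while (cursor != null && cursor.hasNext()) {
                System.out.println(cursor.next().getKey());
            }
            conn.clean();
        }
    }
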
setCollection(String collection) { + this.collection = collection; + } +} diff --git a/src/main/java/com/zdjizhi/utils/ck/CKSink.java b/src/main/java/com/zdjizhi/utils/ck/CKSink.java new file mode 100644 index 0000000..99a579a --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/ck/CKSink.java @@ -0,0 +1,165 @@ +package com.zdjizhi.utils.ck; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import ru.yandex.clickhouse.ClickHouseConnection; +import ru.yandex.clickhouse.ClickHouseDataSource; +import ru.yandex.clickhouse.settings.ClickHouseProperties; + +import java.sql.PreparedStatement; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +public class CKSink extends RichSinkFunction> { + + private static final Log log = LogFactory.get(); + + private static int count = 1; + private static ClickHouseConnection connection = null; + private static PreparedStatement preparedStatement = null; + + static String database = "default"; + static String address = "jdbc:clickhouse://192.168.45.102:8123/"+database; + static String username = "default"; + static String password = "galaxy2019"; + static String fieldStr = "id,name,age"; + static String tableName = "user_table"; + + private String insertSql; + + //创建连接对象和会话 + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + try { + connection = getConn(); + log.info("get clickhouse connection success !"); + String insertSql = preparedSql(fieldStr, tableName); + connection.setAutoCommit(false); + preparedStatement = connection.prepareStatement(insertSql); + } catch (Exception e) { + log.error("clickhouse初始化连接报错:", e); + } + } + +// @Override +// public void close() throws Exception { +// super.close(); +// //关闭连接和释放资源 +// if (connection != null) { +// connection.close(); +// } +// if (preparedStatement != null) { +// preparedStatement.close(); +// } +// } + + //使用Batch批量写入,关闭自动提交 + @Override + public void invoke(Map data, Context context) { + log.info(" invoke methed "); + + try { + + LinkedList values = new LinkedList<>(data.values()); + for (int i = 1; i <= values.size(); i++) { + Object val = values.get(i - 1); + if (val instanceof Long) { + preparedStatement.setLong((i), Long.valueOf(StrUtil.toString(val))); + } else if (val instanceof Integer) { + preparedStatement.setLong((i), Long.valueOf(StrUtil.toString(val))); + } else if (val instanceof Boolean) { + preparedStatement.setBoolean((i), Boolean.valueOf(StrUtil.toString(val))); + } else { + preparedStatement.setString((i), StrUtil.toString(val)); + } + } + + preparedStatement.addBatch(); + count = count + 1; + try { +// if (count >= 50000) { +// preparedStatement.executeBatch(); +// connection.commit(); +// preparedStatement.clearBatch(); +// count = 1; +// } + + //1w提交一次 +// if (count % 10000 == 0) { +// preparedStatement.executeBatch(); +// connection.commit(); +// preparedStatement.clearBatch(); +// } + preparedStatement.executeBatch(); + connection.commit(); + + } catch (Exception ee) { + log.error("数据插入click house 报错:", ee); + } + } catch (Exception ex) { + log.error("ClickhouseSink插入报错====", ex); + } + } + + public static ClickHouseConnection getConn() { + + int socketTimeout = 600000; + ClickHouseProperties properties = new ClickHouseProperties(); + properties.setUser(username); + properties.setPassword(password); + 
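
CKSink's invoke() above adds every row to the JDBC batch and then immediately calls executeBatch() and commit(), so each record is still written individually; the size-based flush is left commented out. A minimal flush-by-count variant is sketched here, using the sink.batch value this patch adds (FlowWriteConfig.SINK_BATCH), which CKSink does not yet read. Pending rows would still need a final flush in close(), and a time-based flush driven by sink.batch.time.out would need its own timer; neither is shown:

    // Sketch only: 'pending' would be an instance field of the sink, and the
    // unconditional executeBatch()/commit() in invoke() would be replaced by this call.
    private int pending = 0;

    private void addAndMaybeFlush(java.sql.PreparedStatement stmt, java.sql.Connection conn)
            throws java.sql.SQLException {
        stmt.addBatch();
        pending++;
        if (pending >= FlowWriteConfig.SINK_BATCH) {
            stmt.executeBatch();   // send the accumulated rows in one round trip
            conn.commit();
            stmt.clearBatch();
            pending = 0;
        }
    }
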
properties.setDatabase(database); + properties.setSocketTimeout(socketTimeout); + ClickHouseDataSource clickHouseDataSource = new ClickHouseDataSource(address, properties); + ClickHouseConnection conn = null; + try { + conn = clickHouseDataSource.getConnection(); + return conn; + } catch (Exception e) { + log.error(e.getMessage()); + e.printStackTrace(); + } + return null; + } + + public static Map getField() { + + return null; + } + + + public String preparedSql(String fieldStr, String tableName) { + List fields = StrUtil.split(fieldStr, ","); + return getInsertSql(fields, tableName); + } + + public String getInsertSql(List fileds, String tableName) { + String sql = ""; + String sqlStr1 = "INSERT INTO `" + database + "`." + tableName + " ("; + String sqlStr2 = ") VALUES ("; + String sqlStr3 = ")"; + String sqlKey = ""; + String sqlValue = ""; + for (String key : fileds) { + sqlKey += key + ","; + sqlValue += "?,"; + } + sqlKey = sqlKey.substring(0, sqlKey.length() - 1); + sqlValue = sqlValue.substring(0, sqlValue.length() - 1); + sql = StrUtil.concat(true, sqlStr1, sqlKey, sqlStr2, sqlValue, sqlStr3); + +// String placeholders = Arrays.stream(fieldNames) +// .map(f -> "?") +// .collect(Collectors.joining(", ")); +// return "INSERT INTO " + quoteIdentifier(tableName) + +// "(" + columns + ")" + " VALUES (" + placeholders + ")"; + + + log.info(sql); + return sql; + } +} diff --git a/src/main/java/com/zdjizhi/utils/ck/CKSinkFlatMap.java b/src/main/java/com/zdjizhi/utils/ck/CKSinkFlatMap.java index e4d7a8c..55c99c4 100644 --- a/src/main/java/com/zdjizhi/utils/ck/CKSinkFlatMap.java +++ b/src/main/java/com/zdjizhi/utils/ck/CKSinkFlatMap.java @@ -6,21 +6,14 @@ import cn.hutool.log.LogFactory; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import ru.yandex.clickhouse.ClickHouseConnection; import ru.yandex.clickhouse.ClickHouseDataSource; import ru.yandex.clickhouse.settings.ClickHouseProperties; import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; - -import static ru.yandex.clickhouse.ClickHouseUtil.quoteIdentifier; public class CKSinkFlatMap extends RichFlatMapFunction, String> { diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickHouseJDBCDialect.java b/src/main/java/com/zdjizhi/utils/ck/ClickHouseJDBCDialect.java new file mode 100644 index 0000000..963eef1 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/ck/ClickHouseJDBCDialect.java @@ -0,0 +1,41 @@ +/* +package com.zdjizhi.utils.ck; + +import java.util.Optional; + +*/ +/** + * clickhouse方言 + *//* + +public class ClickHouseJDBCDialect implements JDBCDialect { + + private static final long serialVersionUID = 1L; + + @Override + public boolean canHandle(String url) { + return url.startsWith("jdbc:clickhouse:"); + } + + @Override + public Optional defaultDriverName() { + return Optional.of("ru.yandex.clickhouse.ClickHouseDriver"); + } + + @Override + public String quoteIdentifier(String identifier) { + return "`" + identifier + "`"; + } + + @Override + public Optional getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) { + return Optional.of(getInsertIntoStatement(tableName, fieldNames)); + } + + @Override + public String getUpdateStatement(String tableName, String[] fieldNames, String[] 
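
The string assembly in preparedSql/getInsertSql above builds "INSERT INTO `db`.table (cols) VALUES (?,...)" by hand; the commented-out fragment beneath it hints at the stream-based equivalent, which yields the same SQL:

    import java.util.List;
    import java.util.stream.Collectors;

    /** Equivalent INSERT builder; the back-tick quoting of the database mirrors getInsertSql above. */
    final class InsertSqlBuilder {
        static String build(String database, String tableName, List<String> fields) {
            String columns = String.join(",", fields);
            String placeholders = fields.stream().map(f -> "?").collect(Collectors.joining(","));
            return "INSERT INTO `" + database + "`." + tableName
                    + " (" + columns + ") VALUES (" + placeholders + ")";
        }
    }

For example, InsertSqlBuilder.build("default", "user_table", Arrays.asList("id", "name", "age")) produces INSERT INTO `default`.user_table (id,name,age) VALUES (?,?,?), the same text the loop-based version generates.
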
conditionFields) { + return null; + } + +} +*/ diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java index 0407544..80a1d0c 100644 --- a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java +++ b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java @@ -27,15 +27,7 @@ public class ClickhouseSink extends RichSinkFunction> { public String sink; static { - try { - Class.forName("ru.yandex.clickhouse.ClickHouseDriver"); - connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN); -// BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node01:8123,node02:8123,node03:8123/default", props); -// connection = dataSource.getConnection(); - log.info("get clickhouse connection success"); - } catch (ClassNotFoundException | SQLException e) { - log.error("clickhouse connection error ,{}", e); - } + } public ClickhouseSink(String sink) { @@ -57,7 +49,15 @@ public class ClickhouseSink extends RichSinkFunction> { @Override public void open(Configuration parameters) throws Exception { - + try { + Class.forName("ru.yandex.clickhouse.ClickHouseDriver"); + connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN); +// BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node01:8123,node02:8123,node03:8123/default", props); +// connection = dataSource.getConnection(); + log.info("get clickhouse connection success"); + } catch (ClassNotFoundException | SQLException e) { + log.error("clickhouse connection error ,{}", e); + } } @Override diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java new file mode 100644 index 0000000..5fc1894 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java @@ -0,0 +1,56 @@ +/* +package com.zdjizhi.utils.ck; + +import org.apache.flink.api.java.utils.ParameterTool; +import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseClusterSettings; +import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +*/ +/** + * @description: + * @author: zhq + * @create: 2022-06-29 + **//* + +public class ClickhouseUtil { + + + public static ParameterTool getGlobalPro() { + Map sinkPro = new HashMap<>(); + //sink Properties + sinkPro.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, "sc.chproxy.bigdata.services.org:10000"); + + // ClickHouse 本地写账号 + sinkPro.put(ClickHouseClusterSettings.CLICKHOUSE_USER, "default"); + sinkPro.put(ClickHouseClusterSettings.CLICKHOUSE_PASSWORD, "galaxy2019"); + // sink common + sinkPro.put(ClickHouseSinkConst.TIMEOUT_SEC, "10"); + sinkPro.put(ClickHouseSinkConst.NUM_WRITERS, "10"); + sinkPro.put(ClickHouseSinkConst.NUM_RETRIES, "3"); + sinkPro.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, "1000000"); + sinkPro.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, "false"); + sinkPro.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, "d:/");//本地运行会在项目内生成名字为"d:"的文件夹,以存放运行失败明细记录 + + // env - sinkPro + ParameterTool parameters = ParameterTool.fromMap(sinkPro); + + + return parameters; + + } + + public static Properties getCKPro() { + // ClickHouseSink - sinkPro + Properties props = new Properties(); + props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, "database_1564.ch_zjk_test_local"); + 
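
Moving the driver registration and DriverManager.getConnection call from the static initializer into open() ties connection set-up to the operator lifecycle, which is the usual pattern for a RichSinkFunction. The connection field looks like it is still static (it was previously assigned from a static initializer), so parallel subtasks in the same JVM would share and re-create one connection; a per-instance variant, shown only as a sketch using java.sql.Connection and DriverManager, would be:

    // Per-instance variant (illustrative): each sink subtask owns its connection
    // and releases it in close(). Field and constant names mirror ClickhouseSink.
    private transient Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        connection = DriverManager.getConnection(
                "jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
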
props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, "10000"); + return props; + } + + +} +*/ diff --git a/src/main/java/com/zdjizhi/utils/ck/JDBCDialect.java b/src/main/java/com/zdjizhi/utils/ck/JDBCDialect.java new file mode 100644 index 0000000..9cd448a --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/ck/JDBCDialect.java @@ -0,0 +1,39 @@ +/* +package com.zdjizhi.utils.ck; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Optional; +import java.util.stream.Collectors; + +import static ru.yandex.clickhouse.ClickHouseUtil.quoteIdentifier; + +*/ +/** + * Handle the SQL dialect of jdbc driver. + *//* + +public interface JDBCDialect extends Serializable { + default Optional getUpsertStatement( + String tableName, String[] fieldNames, String[] uniqueKeyFields) { + return Optional.empty(); + } + default String getInsertIntoStatement(String tableName, String[] fieldNames) { + String columns = Arrays.stream(fieldNames) + .map(this::quoteIdentifier) + .collect(Collectors.joining(", ")); + String placeholders = Arrays.stream(fieldNames) + .map(f -> "?") + .collect(Collectors.joining(", ")); + return "INSERT INTO " + quoteIdentifier(tableName) + + "(" + columns + ")" + " VALUES (" + placeholders + ")"; + } + + default String getDeleteStatement(String tableName, String[] conditionFields) { + String conditionClause = Arrays.stream(conditionFields) + .map(f -> quoteIdentifier(f) + "=?") + .collect(Collectors.joining(" AND ")); + return "DELETE FROM " + quoteIdentifier(tableName) + " WHERE " + conditionClause; + } +} +*/ diff --git a/src/main/java/com/zdjizhi/utils/ck/JDBCDialects.java b/src/main/java/com/zdjizhi/utils/ck/JDBCDialects.java new file mode 100644 index 0000000..9c951e4 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/ck/JDBCDialects.java @@ -0,0 +1,14 @@ +/* +package com.zdjizhi.utils.ck; + +import java.util.Arrays; +import java.util.List; + +public final class JDBCDialects { + + private static final List DIALECTS = Arrays.asList( +// new DerbyDialect(), +// new MySQLDialect(), +// new PostgresDialect() + ); +}*/ diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java index 4b8c8f0..97a53da 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java @@ -1,17 +1,8 @@ package com.zdjizhi.utils.kafka; import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.pojo.DbLogEntity; import org.apache.flink.api.common.serialization.SimpleStringSchema; -import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.connector.kafka.source.KafkaSource; -import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; -import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; - import java.util.Map; import java.util.Properties; diff --git a/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java b/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java index 7b18ab5..8f3fccf 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java +++ 
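
The parked JDBCDialect interface above keeps a generic INSERT builder; if it is revived together with ClickHouseJDBCDialect's back-tick quoteIdentifier, its default methods produce statements of this shape (worked by hand from the code as written):

    // getInsertIntoStatement("user_table", new String[]{"id", "name", "age"})
    //   -> INSERT INTO `user_table`(`id`, `name`, `age`) VALUES (?, ?, ?)
    // getDeleteStatement("user_table", new String[]{"id"})
    //   -> DELETE FROM `user_table` WHERE `id`=?
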
b/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java
@@ -3,8 +3,6 @@ package com.zdjizhi.utils.kafka;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
 import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.enums.LogMetadata;
-import com.zdjizhi.pojo.DbLogEntity;
 import com.zdjizhi.utils.JsonMapper;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
diff --git a/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java
index d793628..7275e06 100644
--- a/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java
+++ b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java
@@ -1,14 +1,8 @@ package com.zdjizhi.utils.system;
-import com.alibaba.nacos.api.NacosFactory;
-import com.alibaba.nacos.api.PropertyKeyConst;
-import com.alibaba.nacos.api.config.ConfigService;
-import com.alibaba.nacos.api.exception.NacosException;
-import com.zdjizhi.common.FlowWriteConfig;
 import com.zdjizhi.utils.StringUtil;
 import java.io.IOException;
-import java.io.StringReader;
 import java.util.Locale;
 import java.util.Properties;
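
With the Nacos imports removed here, FlowWriteConfigurations apparently falls back to the bundled properties file alone. Its implementation is not shown in this diff, so the following stand-in only mirrors the call shape used throughout FlowWriteConfig (getStringProperty(0, key) / getIntProperty(0, key)); the meaning of the leading int argument is an assumption, treated here as an ignored source id:

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;

    /** Minimal stand-in for FlowWriteConfigurations; not the class shipped in this repository. */
    final class FlowWriteConfigurationsSketch {
        private static final Properties PROPS = new Properties();

        static {
            try (InputStream in = FlowWriteConfigurationsSketch.class.getClassLoader()
                    .getResourceAsStream("service_flow_config.properties")) {
                if (in != null) {
                    PROPS.load(in);
                }
            } catch (IOException e) {
                throw new ExceptionInInitializerError(e);
            }
        }

        static String getStringProperty(int source, String key) {
            return PROPS.getProperty(key, "");
        }

        static int getIntProperty(int source, String key) {
            return Integer.parseInt(getStringProperty(source, key).trim());
        }
    }
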