From 88ffda19a25fa2b39e70bc9f1b5910c22b1724ef Mon Sep 17 00:00:00 2001
From: qidaijie
Date: Fri, 23 Apr 2021 18:06:37 +0800
Subject: [PATCH] 1: Add the app_match function. 2: Add the appId lookup URL
 to the config file. 3: Tidy up the config files and the README.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 README.md                                     |  34 ++++-
 pom.xml                                       |  64 ++++-----
 properties/kafka_config.properties            |   2 +-
 properties/service_flow_config.properties     |  47 ++++---
 .../com/zdjizhi/common/FlowWriteConfig.java   |   5 +-
 .../zdjizhi/spout/CustomizedKafkaSpout.java   |   1 +
 .../java/com/zdjizhi/utils/app/AppUtils.java  | 125 ++++++++++++++++++
 .../zdjizhi/utils/general/TransFormUtils.java |   5 +
 .../zdjizhi/utils/general/TransFunction.java  |  16 +++
 9 files changed, 233 insertions(+), 66 deletions(-)
 create mode 100644 src/main/java/com/zdjizhi/utils/app/AppUtils.java

diff --git a/README.md b/README.md
index 4989a69..5bd02be 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,33 @@
-# log-stream-completion
+# log-stream-completion-schema

-Log completion program: consumes topics without COMPLETED, completes the logs, and writes them back to the corresponding COMPLETED topics
+A dynamic log pre-processing program built on the query gateway: it receives raw logs,
+cleans them according to the corresponding schema definition, and writes the results
+back to Kafka.
+
+
+## Function list
+* current_timestamp
+> Returns the current timestamp; if the target field already holds a timestamp, it is not overwritten
+* snowflake_id
+> Snowflake ID function; returns a long value that is unique within the configured constraints
+* geo_ip_detail
+> IP geolocation lookup; returns detailed location information for an IP: city, state/province, country
+* geo_asn
+> ASN lookup; returns the ASN information for an IP
+* geo_ip_country
+> IP geolocation lookup; returns the country only
+* set_value
+> Assigns a fixed value to a field
+* get_value
+> Reads a field's value and appends it to a new field
+* if
+> IF-function implementation; parses the log to build a ternary expression. If the value is numeric, it is converted to a long before being returned
+* sub_domain
+> Extracts the top-level domain
+* radius_match
+> Resolves the RADIUS subscriber for an IP; backed by HBase storage
+* app_match
+> Resolves the APP name for an APP_ID
+* decode_of_base64
+> Decodes base64 with the given character encoding; if the encoding field is empty, falls back to the default (UTF-8)
+* flattenSpec
+> Parses JSON according to a path expression
diff --git a/pom.xml b/pom.xml
index 2fff176..dc8725c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2,9 +2,9 @@
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>

-    <groupId>cn.ac.iie</groupId>
+    <groupId>com.zdjizhi</groupId>
     <artifactId>log-stream-completion-schema</artifactId>
-    <version>v3.21.03.25-eal4</version>
+    <version>v3.21.04.23-appId</version>
     <packaging>jar</packaging>

     <name>log-stream-completion-schema</name>
@@ -18,19 +18,6 @@
             <url>http://192.168.40.125:8099/content/groups/public</url>
         </repository>
-        <repository>
-            <id>maven-ali</id>
-            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
-            <releases>
-                <enabled>true</enabled>
-            </releases>
-            <snapshots>
-                <enabled>true</enabled>
-                <updatePolicy>always</updatePolicy>
-                <checksumPolicy>fail</checksumPolicy>
-            </snapshots>
-        </repository>
     </repositories>
@@ -41,6 +28,10 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
                 <version>2.4.2</version>
+                <configuration>
+                    <createDependencyReducedPom>false</createDependencyReducedPom>
+                </configuration>
+
                 <executions>
                     <execution>
                         <phase>package</phase>
@@ -76,7 +67,7 @@
                     <execution>
                         <id>strip-jar</id>
-                        <phase>package</phase>
+                        <phase>install</phase>
@@ -137,7 +128,7 @@
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-core</artifactId>
             <version>${storm.version}</version>
-            <scope>provided</scope>
+
             <exclusions>
                 <exclusion>
                     <artifactId>slf4j-log4j12</artifactId>
                     <groupId>org.slf4j</groupId>
@@ -232,21 +223,21 @@
-        <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-server</artifactId>
-            <version>${hbase.version}</version>
-            <exclusions>
-                <exclusion>
-                    <artifactId>slf4j-log4j12</artifactId>
-                    <groupId>org.slf4j</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>log4j-over-slf4j</artifactId>
-                    <groupId>org.slf4j</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -336,14 +327,5 @@
             <version>5.5.2</version>
         </dependency>
-
-
-
-
-
-
-
-
-
diff --git a/properties/kafka_config.properties b/properties/kafka_config.properties
index 10ddd4f..9cfa949 100644
--- a/properties/kafka_config.properties
+++ b/properties/kafka_config.properties
@@ -14,4 +14,4 @@ batch.size=262144
 buffer.memory=67108864

 #Maximum size of a single request sent to Kafka; default 1048576
-max.request.size=5242880
+max.request.size=5242880
\ No newline at end of file
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 42106bf..edcb30e 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -12,6 +12,7 @@
 zookeeper.servers=192.168.44.12:2181
 #hbase zookeeper address, used to connect to HBase
 hbase.zookeeper.servers=192.168.44.12:2181
+#--------------------------------HTTP / geolocation libraries------------------------------#
 #Geolocation library path
 ip.library=D:\\K18-Phase2\\tsgSpace\\dat\\
 #ip.library=/home/bigdata/topology/dat/
@@ -19,20 +20,22 @@ ip.library=D:\\K18-Phase2\\tsgSpace\\dat\\

 #Schema endpoint on the gateway
 schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/security_event_log

-#Topic name on the kafka broker
-#kafka.topic=CONNECTION-RECORD-LOG
+#Gateway endpoint for fetching the APP_ID dictionary
+app.id.http=http://192.168.44.67:9999/open-api/appDicList
+
+#--------------------------------Kafka consumer group settings------------------------------#
+
+#Kafka topic the raw data is consumed from
 kafka.topic=test

+#Output topic for the completed data
+results.output.topic=test-result
+
 #Consumer group of the input topic; stores this spout's consumer offsets (name it after the topology) so the next read does not consume duplicate data
 group.id=connection-record-log-20200818-1-test

-#client-id of the consumer reading from kafka
-consumer.client.id=consumer-connection-record
-#client-id of the producer writing back to kafka
-producer.client.id=producer-connection-record
-
 #Producer compression mode: none or snappy
-producer.kafka.compression.type=snappy
+producer.kafka.compression.type=none

 #Producer ack
 producer.ack=1
@@ -40,9 +43,11 @@ producer.ack=1

 #latest/earliest: consume from the current offset or from the beginning
 auto.offset.reset=latest

-#Output topic
-#results.output.topic=CONNECTION-RECORD-COMPLETED-LOG
-results.output.topic=test-result
+#client-id of the consumer reading from kafka
+consumer.client.id=consumer-connection-record
+
+#client-id of the producer writing back to kafka
+producer.client.id=producer-connection-record

 #--------------------------------topology settings------------------------------#
@@ -58,21 +63,14 @@
 completion.bolt.parallelism=6

 #Parallelism for writing to kafka
 kafka.bolt.parallelism=6

-#Ack setting: 1 enables acks, 0 disables them
-topology.num.acks=0
-
-#Number of records per kafka batch
-batch.insert.num=2000
-
 #Data center ID (for the snowflake UID)
 data.center.id.num=15

-#Tick tuple frequency
-topology.tick.tuple.freq.secs=5
-
 #hbase cache refresh interval
 hbase.tick.tuple.freq.secs=60

+#app_id cache refresh interval
+app.tick.tuple.freq.secs=60

 #--------------------------------default values------------------------------#
@@ -82,6 +80,15 @@ topology.config.max.spout.pending=150000

 #hbase table name
 hbase.table.name=subscriber_info

+#Ack setting: 1 enables acks, 0 disables them
+topology.num.acks=0
+
+#Number of records per kafka batch
+batch.insert.num=2000
+
+#Tick tuple frequency
+topology.tick.tuple.freq.secs=5
+
 #Spout receive sleep time
 topology.spout.sleep.time=1
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
index 0502aef..36f2ea4 100644
--- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -23,6 +23,7 @@ public class FlowWriteConfig {
     public static final Integer KAFKA_BOLT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "kafka.bolt.parallelism");
     public static final Integer TOPOLOGY_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "topology.tick.tuple.freq.secs");
     public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
+    public static final Integer APP_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "app.tick.tuple.freq.secs");
     public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = FlowWriteConfigurations.getIntProperty(0, "topology.config.max.spout.pending");
     public static final Integer TOPOLOGY_NUM_ACKS = FlowWriteConfigurations.getIntProperty(0, "topology.num.acks");
     public static final Integer TOPOLOGY_SPOUT_SLEEP_TIME = FlowWriteConfigurations.getIntProperty(0, "topology.spout.sleep.time");
@@ -54,11 +55,11 @@ public class FlowWriteConfig {
     public static final String PRODUCER_KAFKA_COMPRESSION_TYPE =
             FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");
     public static final String CONSUMER_CLIENT_ID = FlowWriteConfigurations.getStringProperty(0, "consumer.client.id");
     public static final String PRODUCER_CLIENT_ID = FlowWriteConfigurations.getStringProperty(0, "producer.client.id");
-
-
     /**
      * http
      */
     public static final String SCHEMA_HTTP = FlowWriteConfigurations.getStringProperty(0, "schema.http");
+    public static final String APP_ID_HTTP = FlowWriteConfigurations.getStringProperty(0, "app.id.http");
+
 }
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/spout/CustomizedKafkaSpout.java b/src/main/java/com/zdjizhi/spout/CustomizedKafkaSpout.java
index ecc6932..150e02c 100644
--- a/src/main/java/com/zdjizhi/spout/CustomizedKafkaSpout.java
+++ b/src/main/java/com/zdjizhi/spout/CustomizedKafkaSpout.java
@@ -4,6 +4,7 @@ import cn.hutool.core.thread.ThreadUtil;
 import com.zdjizhi.common.FlowWriteConfig;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.KafkaProConfig;
 import com.zdjizhi.utils.exception.StreamCompletionException;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
diff --git a/src/main/java/com/zdjizhi/utils/app/AppUtils.java b/src/main/java/com/zdjizhi/utils/app/AppUtils.java
new file mode 100644
index 0000000..08890a6
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/app/AppUtils.java
@@ -0,0 +1,125 @@
+package com.zdjizhi.utils.app;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.http.HttpClientUtil;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * AppId utility class: caches the appId -> appName mapping pulled from the gateway.
+ *
+ * @author qidaijie
+ */
+public class AppUtils {
+    private static final Log logger = LogFactory.get();
+    private static Map<Integer, String> appIdMap = new ConcurrentHashMap<>(128);
+    private static AppUtils appUtils;
+
+    private static void getAppInstance() {
+        appUtils = new AppUtils();
+    }
+
+    /**
+     * Constructor: starts the periodic cache refresh.
+     */
+    private AppUtils() {
+        updateAppCache();
+    }
+
+    /**
+     * Create the singleton on first use, then refresh the cache.
+     */
+    private static void change() {
+        if (appUtils == null) {
+            getAppInstance();
+        }
+        timestampsFilter();
+    }
+
+    /**
+     * Pull the appId dictionary from the gateway and apply any changes to the cache.
+     */
+    private static void timestampsFilter() {
+        try {
+            long begin = System.currentTimeMillis();
+            String schema = HttpClientUtil.requestByGetMethod(FlowWriteConfig.APP_ID_HTTP);
+            if (StringUtil.isNotBlank(schema)) {
+                String data = JSONObject.parseObject(schema).getString("data");
+                JSONArray objects = JSONArray.parseArray(data);
+                for (Object object : objects) {
+                    JSONArray jsonArray = JSONArray.parseArray(object.toString());
+                    int key = jsonArray.getInteger(0);
+                    String value = jsonArray.getString(1);
+                    if (appIdMap.containsKey(key)) {
+                        if (!value.equals(appIdMap.get(key))) {
+                            appIdMap.put(key, value);
+                        }
+                    } else {
+                        appIdMap.put(key, value);
+                    }
+                }
+                logger.warn("Updating the appId mapping took:" + (System.currentTimeMillis() - begin) + "ms");
+                logger.warn("Pulled appId dictionary entries:[" + objects.size() + "]");
+            }
+        } catch (RuntimeException e) {
+            logger.error("Updating the app-id cache failed, exception:" + e);
+        }
+    }
+
+    /**
+     * Refresh timer: re-pulls the appId dictionary at a fixed interval.
+     */
+    private void updateAppCache() {
+        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
+        executorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    change();
+                } catch (RuntimeException e) {
+                    logger.error("AppUtils failed to update the app cache ===>{" + e + "}<===");
+                }
+            }
+        }, 1, FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Look up the appName for an appId.
+     *
+     * @param appId app_id
+     * @return the appName, or null if the id is not cached
+     */
+    public static String getAppName(int appId) {
+        if (appUtils == null) {
+            getAppInstance();
+        }
+        return appIdMap.get(appId);
+    }
+}
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java b/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java
index c2911f5..a5e26fb 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java
@@ -149,6 +149,11 @@ public class TransFormUtils {
                     JsonParseUtil.setValue(object, appendToKeyName, TransFunction.radiusMatch(name.toString()));
                 }
                 break;
+            case "app_match":
+                if (name != null && (int) name != 0 && appendTo == null) {
+                    JsonParseUtil.setValue(object, appendToKeyName, TransFunction.appMatch(Integer.parseInt(name.toString())));
+                }
+                break;
             case "decode_of_base64":
                 if (name != null) {
                     JsonParseUtil.setValue(object, appendToKeyName, TransFunction.decodeBase64(name.toString(), TransFunction.isJsonValue(object, param)));
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
index 6eb839f..9d8a355 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFunction.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
@@ -2,6 +2,7 @@ package com.zdjizhi.utils.general;

 import cn.hutool.core.codec.Base64;
 import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.app.AppUtils;
 import com.zdjizhi.utils.hbase.HBaseUtils;
 import com.zdjizhi.utils.json.JsonParseUtil;
 import cn.hutool.log.Log;
@@ -12,6 +13,7 @@ import com.zdjizhi.utils.Encodes;
 import com.zdjizhi.utils.FormatUtils;
 import com.zdjizhi.utils.IpLookup;
 import com.zdjizhi.utils.StringUtil;
+
 import java.util.ArrayList;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -80,6 +82,20 @@ class TransFunction {
         return account;
     }

+    /**
+     * Complete the appName from the cached appId mapping.
+     *
+     * @param appId id
+     * @return appName
+     */
+    static String appMatch(int appId) {
+        String appName = AppUtils.getAppName(appId);
+        if (StringUtil.isBlank(appName)) {
+            logger.warn("App name not found in cache, ID is: {}", appId);
+        }
+        return appName;
+    }
+
     /**
      * Extract the top-level domain.
      *
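
Note: below is a minimal, self-contained sketch (not part of the patch) of what app_match does end to end. The payload shape is an assumption inferred from AppUtils.timestampsFilter() above: the gateway's appDicList endpoint is assumed to return a JSON object whose "data" field is a stringified array of [appId, appName] pairs; the real response may differ. The class name and the sample ids/names are hypothetical.

    import com.alibaba.fastjson.JSONArray;
    import com.alibaba.fastjson.JSONObject;

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class AppDicListSketch {
        public static void main(String[] args) {
            // Hypothetical appDicList response; "data" holds a stringified
            // array of [appId, appName] pairs.
            String response = "{\"data\":\"[[1001,\\\"WeChat\\\"],[1002,\\\"QQ\\\"]]\"}";

            // Same parsing steps as AppUtils.timestampsFilter().
            Map<Integer, String> appIdMap = new ConcurrentHashMap<>(128);
            String data = JSONObject.parseObject(response).getString("data");
            for (Object pair : JSONArray.parseArray(data)) {
                JSONArray entry = JSONArray.parseArray(pair.toString());
                appIdMap.put(entry.getInteger(0), entry.getString(1));
            }

            // app_match then reduces to a cache lookup, as in TransFunction.appMatch().
            System.out.println(appIdMap.get(1001)); // WeChat
            System.out.println(appIdMap.get(9999)); // null -> appMatch logs a warning
        }
    }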