diff --git a/README.md b/README.md
index 4989a69..5bd02be 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,33 @@
-# log-stream-completion
+# log-stream-completion-schema
-Log completion program: consumes from a topic without COMPLETED, completes the records, and writes them back to the corresponding COMPLETED topic.
+A dynamic log preprocessing program driven by the query gateway: it consumes
+raw logs, cleans the data according to the matching schema definition, and
+writes the results back to Kafka.
+
+
+## Function list
+* current_timestamp
+> Returns the current timestamp; if the target field already holds a timestamp, it is not overwritten.
+* snowflake_id
+> Snowflake-ID function; returns a long value that is unique within the configured constraints.
+* geo_ip_detail
+> IP geolocation: returns detailed location information for an IP (city, state/province, country).
+* geo_asn
+> ASN lookup: returns the ASN information for an IP.
+* geo_ip_country
+> IP geolocation: returns location information for an IP, country only.
+* set_value
+> Assigns a fixed value to a field.
+* get_value
+> Reads a field's value and appends it to a new field.
+* if
+> IF-function implementation: parses the log to build a ternary expression; if the result is numeric it is converted to a long before being returned.
+* sub_domain
+> Extracts the top-level domain.
+* radius_match
+> Resolves the RADIUS subscriber for an IP, backed by data stored in HBase.
+* app_match
+> Resolves the APP name for an APP_ID.
+* decode_of_base64
+> Decodes Base64 using the given charset field; if that field is empty, the default charset (UTF-8) is used.
+* flattenSpec
+> Parses JSON according to an expression.
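+
+As an illustration only (the field names below are hypothetical; the real
+field set comes from the schema served by the query gateway), a record passing
+through `current_timestamp`, `geo_ip_country` and `app_match` would be
+enriched roughly like this:
+
+```json
+{"src_ip": "1.2.3.4", "app_id": 1001}
+```
+
+becomes
+
+```json
+{"src_ip": "1.2.3.4", "app_id": 1001, "src_country": "CN",
+ "app_name": "SomeApp", "recv_time": 1619142000000}
+```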
diff --git a/pom.xml b/pom.xml
index 2fff176..dc8725c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2,9 +2,9 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
-    <groupId>cn.ac.iie</groupId>
+    <groupId>com.zdjizhi</groupId>
    <artifactId>log-stream-completion-schema</artifactId>
-    <version>v3.21.03.25-eal4</version>
+    <version>v3.21.04.23-appId</version>
    <packaging>jar</packaging>
    <name>log-stream-completion-schema</name>
@@ -18,19 +18,6 @@
            <url>http://192.168.40.125:8099/content/groups/public</url>
-        <repository>
-            <id>maven-ali</id>
-            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
-            <releases>
-                <enabled>true</enabled>
-            </releases>
-            <snapshots>
-                <enabled>true</enabled>
-                <updatePolicy>always</updatePolicy>
-                <checksumPolicy>fail</checksumPolicy>
-            </snapshots>
-        </repository>
@@ -41,6 +28,10 @@
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.2</version>
+
+ false
+
+
                        <phase>package</phase>
@@ -76,7 +67,7 @@
                        <id>strip-jar</id>
-                        <phase>package</phase>
+                        <phase>install</phase>
@@ -137,7 +128,7 @@
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
-            <scope>provided</scope>
+
                    <artifactId>slf4j-log4j12</artifactId>
@@ -232,21 +223,21 @@
-        <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-server</artifactId>
-            <version>${hbase.version}</version>
-            <exclusions>
-                <exclusion>
-                    <artifactId>slf4j-log4j12</artifactId>
-                    <groupId>org.slf4j</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>log4j-over-slf4j</artifactId>
-                    <groupId>org.slf4j</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -336,14 +327,5 @@
            <version>5.5.2</version>
-
-
-
-
-
-
-
-
-
diff --git a/properties/kafka_config.properties b/properties/kafka_config.properties
index 10ddd4f..9cfa949 100644
--- a/properties/kafka_config.properties
+++ b/properties/kafka_config.properties
@@ -14,4 +14,4 @@ batch.size=262144
buffer.memory=67108864
#maximum size of a single request sent to Kafka; default 1048576
-max.request.size=5242880
+max.request.size=5242880
\ No newline at end of file
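
Note on the unchanged value: max.request.size (5 MiB here, versus Kafka's
1 MiB default referenced in the comment) caps the size of a single produce
request; the broker's message.max.bytes must be at least as large for such
requests to be accepted, which this configuration assumes.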
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 42106bf..edcb30e 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -12,6 +12,7 @@ zookeeper.servers=192.168.44.12:2181
#hbase zookeeper address, used to connect to HBase
hbase.zookeeper.servers=192.168.44.12:2181
+#--------------------------------HTTP / geolocation libraries------------------------------#
#geolocation library path
ip.library=D:\\K18-Phase2\\tsgSpace\\dat\\
#ip.library=/home/bigdata/topology/dat/
@@ -19,20 +20,22 @@ ip.library=D:\\K18-Phase2\\tsgSpace\\dat\\
#schema location on the gateway
schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/security_event_log
-#topic name under the kafka broker
-#kafka.topic=CONNECTION-RECORD-LOG
+#gateway endpoint for fetching the APP_ID list
+app.id.http=http://192.168.44.67:9999/open-api/appDicList
+
+#--------------------------------Kafka consumer group settings------------------------------#
+
+#kafka input topic
kafka.topic=test
+#output topic for completed data
+results.output.topic=test-result
+
#reads the topic; stores the offsets consumed under this spout id (can be named after the topology); the stored offset position ensures the next read does not fetch duplicate data;
group.id=connection-record-log-20200818-1-test
-#client-id of the consumer receiving from kafka
-consumer.client.id=consumer-connection-record
-#client-id of the producer writing back to kafka
-producer.client.id=producer-connection-record
-
#producer compression mode: none or snappy
-producer.kafka.compression.type=snappy
+producer.kafka.compression.type=none
#producer acks
producer.ack=1
@@ -40,9 +43,11 @@ producer.ack=1
#latest/earliest: consume from the current offset or from the beginning
auto.offset.reset=latest
-#output topic
-#results.output.topic=CONNECTION-RECORD-COMPLETED-LOG
-results.output.topic=test-result
+#client-id of the consumer receiving from kafka
+consumer.client.id=consumer-connection-record
+
+#client-id of the producer writing back to kafka
+producer.client.id=producer-connection-record
#--------------------------------topology settings------------------------------#
@@ -58,21 +63,14 @@ completion.bolt.parallelism=6
#parallelism for writing to kafka
kafka.bolt.parallelism=6
-#ack setting: 1 enables acks, 0 disables acks
-topology.num.acks=0
-
-#number of records per kafka batch
-batch.insert.num=2000
-
#data center id (UID)
data.center.id.num=15
-#tick clock frequency
-topology.tick.tuple.freq.secs=5
-
#hbase cache refresh interval
hbase.tick.tuple.freq.secs=60
+#app_id cache refresh interval
+app.tick.tuple.freq.secs=60
#--------------------------------default value settings------------------------------#
@@ -82,6 +80,15 @@ topology.config.max.spout.pending=150000
#hbase table name
hbase.table.name=subscriber_info
+#ack setting: 1 enables acks, 0 disables acks
+topology.num.acks=0
+
+#number of records per kafka batch
+batch.insert.num=2000
+
+#tick clock frequency
+topology.tick.tuple.freq.secs=5
+
#spout receive sleep time
topology.spout.sleep.time=1
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
index 0502aef..36f2ea4 100644
--- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -23,6 +23,7 @@ public class FlowWriteConfig {
public static final Integer KAFKA_BOLT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "kafka.bolt.parallelism");
public static final Integer TOPOLOGY_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "topology.tick.tuple.freq.secs");
public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
+ public static final Integer APP_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "app.tick.tuple.freq.secs");
public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = FlowWriteConfigurations.getIntProperty(0, "topology.config.max.spout.pending");
public static final Integer TOPOLOGY_NUM_ACKS = FlowWriteConfigurations.getIntProperty(0, "topology.num.acks");
public static final Integer TOPOLOGY_SPOUT_SLEEP_TIME = FlowWriteConfigurations.getIntProperty(0, "topology.spout.sleep.time");
@@ -54,11 +55,11 @@ public class FlowWriteConfig {
public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");
public static final String CONSUMER_CLIENT_ID = FlowWriteConfigurations.getStringProperty(0, "consumer.client.id");
public static final String PRODUCER_CLIENT_ID = FlowWriteConfigurations.getStringProperty(0, "producer.client.id");
-
-
/**
* http
*/
public static final String SCHEMA_HTTP = FlowWriteConfigurations.getStringProperty(0, "schema.http");
+ public static final String APP_ID_HTTP = FlowWriteConfigurations.getStringProperty(0, "app.id.http");
+
}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/spout/CustomizedKafkaSpout.java b/src/main/java/com/zdjizhi/spout/CustomizedKafkaSpout.java
index ecc6932..150e02c 100644
--- a/src/main/java/com/zdjizhi/spout/CustomizedKafkaSpout.java
+++ b/src/main/java/com/zdjizhi/spout/CustomizedKafkaSpout.java
@@ -4,6 +4,7 @@ import cn.hutool.core.thread.ThreadUtil;
import com.zdjizhi.common.FlowWriteConfig;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.KafkaProConfig;
import com.zdjizhi.utils.exception.StreamCompletionException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
diff --git a/src/main/java/com/zdjizhi/utils/app/AppUtils.java b/src/main/java/com/zdjizhi/utils/app/AppUtils.java
new file mode 100644
index 0000000..08890a6
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/app/AppUtils.java
@@ -0,0 +1,125 @@
+package com.zdjizhi.utils.app;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.http.HttpClientUtil;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * App-id lookup utility: caches the app_id -> app name mapping pulled from the gateway.
+ *
+ * @author qidaijie
+ */
+
+public class AppUtils {
+ private static final Log logger = LogFactory.get();
+    private static Map<Integer, String> appIdMap = new ConcurrentHashMap<>(128);
+ private static AppUtils appUtils;
+
+    // synchronized so concurrent callers cannot start two refresh schedulers
+    private static synchronized void getAppInstance() {
+        if (appUtils == null) {
+            appUtils = new AppUtils();
+        }
+    }
+
+
+    /**
+     * Private constructor: starts the scheduled refresh of the app-id cache.
+     */
+    private AppUtils() {
+        // schedule the periodic refresh
+        updateAppCache();
+    }
+
+    /**
+     * Refresh the cached mapping, creating the singleton on first use.
+     */
+    private static void change() {
+        if (appUtils == null) {
+            getAppInstance();
+        }
+        pullAppIdMapping();
+    }
+
+
+    /**
+     * Pull the latest app-id/app-name pairs from the gateway and merge them
+     * into the cache.
+     */
+    private static void pullAppIdMapping() {
+        try {
+            long begin = System.currentTimeMillis();
+            String response = HttpClientUtil.requestByGetMethod(FlowWriteConfig.APP_ID_HTTP);
+            if (StringUtil.isNotBlank(response)) {
+                String data = JSONObject.parseObject(response).getString("data");
+                JSONArray objects = JSONArray.parseArray(data);
+                for (Object object : objects) {
+                    // each element is a pair: [appId, appName]
+                    JSONArray jsonArray = JSONArray.parseArray(object.toString());
+                    int key = jsonArray.getInteger(0);
+                    String value = jsonArray.getString(1);
+                    // write only when the entry is new or has changed
+                    if (value != null && !value.equals(appIdMap.get(key))) {
+                        appIdMap.put(key, value);
+                    }
+                }
+                logger.warn("Updating the app-id mapping took " + (System.currentTimeMillis() - begin) + " ms");
+                logger.warn("Pulled " + objects.size() + " app-id entries from the interface");
+            }
+        } catch (RuntimeException e) {
+            logger.error("Update cache app-id failed, exception:" + e);
+        }
+    }
+
+
+    /**
+     * Refresh timer: re-pulls the app-id mapping at a fixed interval.
+     */
+    private void updateAppCache() {
+        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
+        executorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    change();
+                } catch (RuntimeException e) {
+                    logger.error("AppUtils failed to update the app cache: " + e);
+                }
+            }
+        }, 1, FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS);
+    }
+
+
+    /**
+     * Get the appName for an app_id.
+     *
+     * @param appId app_id
+     * @return appName, or null if the id is unknown or the cache has not been
+     *         populated yet (the first refresh runs one second after startup)
+     */
+    public static String getAppName(int appId) {
+        if (appUtils == null) {
+            getAppInstance();
+        }
+        return appIdMap.get(appId);
+    }
+
+}
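
A minimal usage sketch for the new cache (the id and name are invented; as the
code above shows, `getAppName` can return null until the scheduler's first
pull from `app.id.http` has completed):

```java
import com.zdjizhi.utils.app.AppUtils;

public class AppUtilsDemo {
    public static void main(String[] args) throws InterruptedException {
        String first = AppUtils.getAppName(1001);   // likely null: first refresh runs after 1 s
        Thread.sleep(2000);                         // give the first pull time to finish
        String second = AppUtils.getAppName(1001);  // cached name, or null if 1001 is unknown
        System.out.println(first + " -> " + second);
        // Note: the refresh executor is non-daemon, so the process must be killed to exit.
    }
}
```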
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java b/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java
index c2911f5..a5e26fb 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java
@@ -149,6 +149,11 @@ public class TransFormUtils {
JsonParseUtil.setValue(object, appendToKeyName, TransFunction.radiusMatch(name.toString()));
}
break;
+ case "app_match":
+ if ((int) name != 0 && appendTo == null) {
+ JsonParseUtil.setValue(object, appendToKeyName, TransFunction.appMatch(Integer.parseInt(name.toString())));
+ }
+ break;
case "decode_of_base64":
if (name != null) {
JsonParseUtil.setValue(object, appendToKeyName, TransFunction.decodeBase64(name.toString(), TransFunction.isJsonValue(object, param)));
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
index 6eb839f..9d8a355 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFunction.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
@@ -2,6 +2,7 @@ package com.zdjizhi.utils.general;
import cn.hutool.core.codec.Base64;
import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.app.AppUtils;
import com.zdjizhi.utils.hbase.HBaseUtils;
import com.zdjizhi.utils.json.JsonParseUtil;
import cn.hutool.log.Log;
@@ -12,6 +13,7 @@ import com.zdjizhi.utils.Encodes;
import com.zdjizhi.utils.FormatUtils;
import com.zdjizhi.utils.IpLookup;
import com.zdjizhi.utils.StringUtil;
+
import java.util.ArrayList;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -80,6 +82,20 @@ class TransFunction {
return account;
}
+ /**
+     * Complete the appName from the appId mapping held in the cache.
+ *
+ * @param appId id
+ * @return appName
+ */
+ static String appMatch(int appId) {
+ String appName = AppUtils.getAppName(appId);
+ if (StringUtil.isBlank(appName)) {
+ logger.warn("AppMap get appName is null, ID is :{}", appId);
+ }
+ return appName;
+ }
+
/**
 * Parse the top-level domain
*
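
End to end, the `app_match` path reads the source field in TransFormUtils,
skips id 0, and calls `TransFunction.appMatch`, which resolves against the
cache AppUtils refreshes from the gateway. A hedged sketch of the resulting
behavior (ids and names are invented; `TransFunction` is package-private, so
the probe is assumed to live in the same package):

```java
package com.zdjizhi.utils.general;

public class AppMatchProbe {
    public static void main(String[] args) {
        // Assuming the gateway dictionary contains [1001, "SomeApp"]:
        // appMatch(1001) returns "SomeApp"; an unknown id logs a warning
        // and returns null. Id 0 never reaches appMatch (filtered upstream).
        System.out.println(TransFunction.appMatch(1001));
    }
}
```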