1. Add the app_match function.
2. Add the appId lookup URL to the configuration file.
3. Tidy up the configuration files and the README.
34 README.md
@@ -1,3 +1,33 @@
# log-stream-completion
# log-stream-completion-schema

Log-completion program: consumes topics whose names do not contain COMPLETED, completes the logs, and writes them back to the corresponding COMPLETED topics.
Dynamic log preprocessing program driven by the query gateway: it receives raw logs, cleans the data according to the corresponding schema definition,
and writes the results back to Kafka.

## Function list

* current_timestamp
> Gets the current timestamp; if the target field already holds a timestamp, it is not overwritten.

* snowflake_id
> Snowflake-ID function; returns a long value that is unique within the configured constraints.

* geo_ip_detail
> IP geolocation lookup; returns detailed location information for an IP: city, state/province, country.

* geo_asn
> ASN lookup; returns the ASN information for an IP.

* geo_ip_country
> IP geolocation lookup; returns the location of an IP, country only.

* set_value
> Assigns a fixed value to a field.

* get_value
> Reads a field's value and appends it to a new field.

* if
> IF-function implementation: parses the log and builds a ternary expression; if the operand is numeric, it is converted to a long before the result is returned.

* sub_domain
> Extracts the top-level (registered) domain.

* radius_match
> Resolves the RADIUS user for an IP, backed by data stored in HBase.

* app_match
> Looks up the APP name for an APP_ID.

* decode_of_base64
> Decodes Base64 using the given character encoding; if the encoding field is empty, the default encoding (UTF-8) is used.

* flattenSpec
> Parses JSON according to an expression.
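For orientation, the sketch below illustrates the documented behavior of two of the functions above: current_timestamp never overwrites an existing value, and get_value copies a field's value under a new name. It is illustrative only; the class, the method names, and the use of a plain Map as the log record are assumptions, not this project's actual API.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative only -- not the project's implementation. Shows the documented
 * semantics of two functions from the list above on a plain Map standing in for a log record.
 */
public class CompletionSemanticsDemo {

    /** current_timestamp: fill the field only if it is not already present. */
    static void currentTimestamp(Map<String, Object> log, String field) {
        log.putIfAbsent(field, System.currentTimeMillis());
    }

    /** get_value: read an existing field and append its value under a new field name. */
    static void getValue(Map<String, Object> log, String from, String appendTo) {
        Object value = log.get(from);
        if (value != null) {
            log.put(appendTo, value);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> log = new HashMap<>();
        log.put("recv_time", 1619160000000L);
        currentTimestamp(log, "recv_time");   // already set, so it is kept
        currentTimestamp(log, "proc_time");   // missing, so it is filled in
        getValue(log, "recv_time", "recv_time_copy");
        System.out.println(log);
    }
}
```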
64 pom.xml
@@ -2,9 +2,9 @@
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.ac.iie</groupId>
    <groupId>com.zdjizhi</groupId>
    <artifactId>log-stream-completion-schema</artifactId>
    <version>v3.21.03.25-eal4</version>
    <version>v3.21.04.23-appId</version>
    <packaging>jar</packaging>

    <name>log-stream-completion-schema</name>
@@ -18,19 +18,6 @@
            <url>http://192.168.40.125:8099/content/groups/public</url>
        </repository>

        <repository>
            <id>maven-ali</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
                <updatePolicy>always</updatePolicy>
                <checksumPolicy>fail</checksumPolicy>
            </snapshots>
        </repository>

    </repositories>

    <build>
@@ -41,6 +28,10 @@
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.2</version>
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
@@ -76,7 +67,7 @@
                        <goals>
                            <goal>strip-jar</goal>
                        </goals>
                        <phase>package</phase>
                        <phase>install</phase>
                    </execution>
                </executions>
            </plugin>
@@ -137,7 +128,7 @@
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <scope>provided</scope>
            <!--<scope>provided</scope>-->
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
@@ -232,21 +223,21 @@
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j-over-slf4j</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--<dependency>-->
            <!--<groupId>org.apache.hbase</groupId>-->
            <!--<artifactId>hbase-server</artifactId>-->
            <!--<version>${hbase.version}</version>-->
            <!--<exclusions>-->
                <!--<exclusion>-->
                    <!--<artifactId>slf4j-log4j12</artifactId>-->
                    <!--<groupId>org.slf4j</groupId>-->
                <!--</exclusion>-->
                <!--<exclusion>-->
                    <!--<artifactId>log4j-over-slf4j</artifactId>-->
                    <!--<groupId>org.slf4j</groupId>-->
                <!--</exclusion>-->
            <!--</exclusions>-->
        <!--</dependency>-->

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
@@ -336,14 +327,5 @@
            <version>5.5.2</version>
        </dependency>

        <!--<!– https://mvnrepository.com/artifact/io.github.zlika/reproducible-build-maven-plugin –>-->
        <!--<dependency>-->
            <!--<groupId>io.github.zlika</groupId>-->
            <!--<artifactId>reproducible-build-maven-plugin</artifactId>-->
            <!--<version>0.12</version>-->
        <!--</dependency>-->

    </dependencies>
</project>
@@ -14,4 +14,4 @@ batch.size=262144
buffer.memory=67108864

#maximum size of a single request sent to Kafka; default is 1048576
max.request.size=5242880
max.request.size=5242880
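For reference, these producer properties correspond one-to-one to standard Kafka producer configuration keys. The sketch below only shows that mapping; the broker address and serializers are illustrative and not taken from this project, which builds its producer elsewhere.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.44.12:9092"); // broker address is illustrative
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 262144);                      // batch.size
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864L);                // buffer.memory
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 5242880);               // raises the 1048576 default
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // producer.send(...) as usual; requests larger than max.request.size are rejected client-side
        }
    }
}
```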
@@ -12,6 +12,7 @@ zookeeper.servers=192.168.44.12:2181
#hbase zookeeper address, used to connect to HBase
hbase.zookeeper.servers=192.168.44.12:2181

#--------------------------------HTTP / geolocation databases------------------------------#
#geolocation database path
ip.library=D:\\K18-Phase2\\tsgSpace\\dat\\
#ip.library=/home/bigdata/topology/dat/
@@ -19,20 +20,22 @@ ip.library=D:\\K18-Phase2\\tsgSpace\\dat\\
#gateway schema location
schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/security_event_log

#topic name on the kafka broker
#kafka.topic=CONNECTION-RECORD-LOG
#gateway APP_ID lookup endpoint
app.id.http=http://192.168.44.67:9999/open-api/appDicList

#--------------------------------Kafka consumer group settings------------------------------#

#kafka topic to consume data from
kafka.topic=test

#output topic for completed data
results.output.topic=test-result

#consumer group of the input topic; stores the offsets consumed under this spout id (it can be named after the topology) and determines where to resume so the next read does not repeat data
group.id=connection-record-log-20200818-1-test

#client-id of the consumer reading from kafka
consumer.client.id=consumer-connection-record
#client-id of the producer writing back to kafka
producer.client.id=producer-connection-record

#producer compression mode: none or snappy
producer.kafka.compression.type=snappy
producer.kafka.compression.type=none

#producer acks
producer.ack=1
@@ -40,9 +43,11 @@ producer.ack=1
#latest/earliest: consume from the current position or from the beginning
auto.offset.reset=latest

#output topic
#results.output.topic=CONNECTION-RECORD-COMPLETED-LOG
results.output.topic=test-result
#client-id of the consumer reading from kafka
consumer.client.id=consumer-connection-record

#client-id of the producer writing back to kafka
producer.client.id=producer-connection-record

#--------------------------------Topology settings------------------------------#

@@ -58,21 +63,14 @@ completion.bolt.parallelism=6
#parallelism for writing to kafka: 10
kafka.bolt.parallelism=6

#ack setting: 1 enables acking, 0 disables it
topology.num.acks=0

#kafka batch size (number of records)
batch.insert.num=2000

#data center (UID)
data.center.id.num=15

#tick frequency
topology.tick.tuple.freq.secs=5

#hbase cache refresh interval
hbase.tick.tuple.freq.secs=60

#app_id cache refresh interval
app.tick.tuple.freq.secs=60

#--------------------------------Default values------------------------------#

@@ -82,6 +80,15 @@ topology.config.max.spout.pending=150000
#hbase table name
hbase.table.name=subscriber_info

#ack setting: 1 enables acking, 0 disables it
topology.num.acks=0

#kafka batch size (number of records)
batch.insert.num=2000

#tick frequency
topology.tick.tuple.freq.secs=5

#spout receive sleep time
topology.spout.sleep.time=1
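The consumer-side properties (group.id, the client-ids, auto.offset.reset) likewise map onto standard Kafka consumer settings. The topology actually reads through a Storm Kafka spout, so the plain-consumer sketch below is only meant to show what those keys control; the broker address and deserializers are assumptions, not values from this project.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.44.12:9092");  // illustrative broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "connection-record-log-20200818-1-test"); // group.id
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-connection-record");           // consumer.client.id
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");                       // auto.offset.reset
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));  // kafka.topic
            consumer.poll(Duration.ofMillis(100));                  // records then flow into the completion topology
        }
    }
}
```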
@@ -23,6 +23,7 @@ public class FlowWriteConfig {
    public static final Integer KAFKA_BOLT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "kafka.bolt.parallelism");
    public static final Integer TOPOLOGY_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "topology.tick.tuple.freq.secs");
    public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
    public static final Integer APP_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "app.tick.tuple.freq.secs");
    public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = FlowWriteConfigurations.getIntProperty(0, "topology.config.max.spout.pending");
    public static final Integer TOPOLOGY_NUM_ACKS = FlowWriteConfigurations.getIntProperty(0, "topology.num.acks");
    public static final Integer TOPOLOGY_SPOUT_SLEEP_TIME = FlowWriteConfigurations.getIntProperty(0, "topology.spout.sleep.time");
@@ -54,11 +55,11 @@ public class FlowWriteConfig {
    public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");
    public static final String CONSUMER_CLIENT_ID = FlowWriteConfigurations.getStringProperty(0, "consumer.client.id");
    public static final String PRODUCER_CLIENT_ID = FlowWriteConfigurations.getStringProperty(0, "producer.client.id");

    /**
     * http
     */
    public static final String SCHEMA_HTTP = FlowWriteConfigurations.getStringProperty(0, "schema.http");
    public static final String APP_ID_HTTP = FlowWriteConfigurations.getStringProperty(0, "app.id.http");

}
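FlowWriteConfigurations itself is not part of this diff. A helper of roughly the following shape would satisfy the calls above; this is a sketch under that assumption (including the properties file name), not the project's actual class.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * Hypothetical stand-in for FlowWriteConfigurations as used by FlowWriteConfig:
 * loads a properties file from the classpath and exposes typed getters with a default value.
 */
public final class FlowWriteConfigurationsSketch {

    private static final Properties PROPS = new Properties();

    static {
        try (InputStream in = FlowWriteConfigurationsSketch.class
                .getResourceAsStream("/flow_write_config.properties")) {   // file name is an assumption
            if (in != null) {
                PROPS.load(in);
            }
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /** Returns the property as an int, or the supplied default when it is missing or unparsable. */
    public static Integer getIntProperty(int defaultValue, String key) {
        String value = PROPS.getProperty(key);
        try {
            return value == null ? defaultValue : Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    /** Returns the property as a String, or null when missing (default argument kept for symmetry). */
    public static String getStringProperty(int defaultValue, String key) {
        return PROPS.getProperty(key);
    }
}
```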
@@ -4,6 +4,7 @@ import cn.hutool.core.thread.ThreadUtil;
import com.zdjizhi.common.FlowWriteConfig;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.KafkaProConfig;
import com.zdjizhi.utils.exception.StreamCompletionException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
125 src/main/java/com/zdjizhi/utils/app/AppUtils.java (new file)
@@ -0,0 +1,125 @@
package com.zdjizhi.utils.app;

import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.hbase.HBaseUtils;
import com.zdjizhi.utils.http.HttpClientUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * AppId utility class
 *
 * @author qidaijie
 */
public class AppUtils {
    private static final Log logger = LogFactory.get();
    private static Map<Integer, String> appIdMap = new ConcurrentHashMap<>(128);
    private static AppUtils appUtils;

    private static void getAppInstance() {
        appUtils = new AppUtils();
    }

    /**
     * Constructor: starts the periodic refresh.
     */
    private AppUtils() {
        //periodic refresh
        updateHabaseCache();
    }

    /**
     * Update the cached mapping.
     */
    private static void change() {
        if (appUtils == null) {
            getAppInstance();
        }
        timestampsFilter();
    }

    /**
     * Fetch the latest app-id/app-name mapping from the gateway.
     */
    private static void timestampsFilter() {
        try {
            long begin = System.currentTimeMillis();
            String schema = HttpClientUtil.requestByGetMethod(FlowWriteConfig.APP_ID_HTTP);
            if (StringUtil.isNotBlank(schema)) {
                String data = JSONObject.parseObject(schema).getString("data");
                JSONArray objects = JSONArray.parseArray(data);
                for (Object object : objects) {
                    JSONArray jsonArray = JSONArray.parseArray(object.toString());
                    int key = jsonArray.getInteger(0);
                    String value = jsonArray.getString(1);
                    if (appIdMap.containsKey(key)) {
                        if (!value.equals(appIdMap.get(key))) {
                            appIdMap.put(key, value);
                        }
                    } else {
                        appIdMap.put(key, value);
                    }
                }
                logger.warn("Updating the app-id mapping took " + (System.currentTimeMillis() - begin) + " ms");
                logger.warn("Pulled " + objects.size() + " entries from the app-id interface");
            }
        } catch (RuntimeException e) {
            logger.error("Update cache app-id failed, exception:" + e);
        }
    }

    /**
     * Refresh timer: updates the app-id cache at a fixed interval.
     */
    private void updateHabaseCache() {
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    change();
                } catch (RuntimeException e) {
                    logger.error("AppUtils update AppCache is error===>{" + e + "}<===");
                }
            }
        }, 1, FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS);
    }

    /**
     * Get the appName for an appId.
     *
     * @param appId app_id
     * @return appName
     */
    public static String getAppName(int appId) {
        if (appUtils == null) {
            getAppInstance();
        }
        return appIdMap.get(appId);
    }
}
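Judging from the parsing in timestampsFilter, the appDicList endpoint is assumed to return a JSON object whose data field is an array of [id, name] pairs. The sketch below walks through that shape with an illustrative payload and shows where getAppName fits in; it is not taken from the gateway's documentation.

```java
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

public class AppUtilsUsageSketch {
    public static void main(String[] args) {
        // Illustrative payload only; the real endpoint is FlowWriteConfig.APP_ID_HTTP (app.id.http).
        String response = "{\"data\":[[1001,\"WeChat\"],[1002,\"QQ\"]]}";

        // Same parsing steps as AppUtils.timestampsFilter():
        String data = JSONObject.parseObject(response).getString("data");
        JSONArray objects = JSONArray.parseArray(data);
        for (Object object : objects) {
            JSONArray pair = JSONArray.parseArray(object.toString());
            System.out.println(pair.getInteger(0) + " -> " + pair.getString(1));
        }

        // In the topology, a bolt would then simply call:
        // String appName = AppUtils.getAppName(1001);
    }
}
```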
@@ -149,6 +149,11 @@ public class TransFormUtils {
                    JsonParseUtil.setValue(object, appendToKeyName, TransFunction.radiusMatch(name.toString()));
                }
                break;
            case "app_match":
                if ((int) name != 0 && appendTo == null) {
                    JsonParseUtil.setValue(object, appendToKeyName, TransFunction.appMatch(Integer.parseInt(name.toString())));
                }
                break;
            case "decode_of_base64":
                if (name != null) {
                    JsonParseUtil.setValue(object, appendToKeyName, TransFunction.decodeBase64(name.toString(), TransFunction.isJsonValue(object, param)));
@@ -2,6 +2,7 @@ package com.zdjizhi.utils.general;

import cn.hutool.core.codec.Base64;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.app.AppUtils;
import com.zdjizhi.utils.hbase.HBaseUtils;
import com.zdjizhi.utils.json.JsonParseUtil;
import cn.hutool.log.Log;
@@ -12,6 +13,7 @@ import com.zdjizhi.utils.Encodes;
import com.zdjizhi.utils.FormatUtils;
import com.zdjizhi.utils.IpLookup;
import com.zdjizhi.utils.StringUtil;

import java.util.ArrayList;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -80,6 +82,20 @@ class TransFunction {
        return account;
    }

    /**
     * Completes appName from the appId-to-appName mapping held in the cache.
     *
     * @param appId id
     * @return appName
     */
    static String appMatch(int appId) {
        String appName = AppUtils.getAppName(appId);
        if (StringUtil.isBlank(appName)) {
            logger.warn("AppMap get appName is null, ID is :{}", appId);
        }
        return appName;
    }

    /**
     * Parses the top-level domain.
     *