4 Commits

Author SHA1 Message Date
wangchengcheng
c3ad8140e8 1.适配百分点OSS V3接口(GAL-384)。
2.将FileMeta的属性sourceList改为source_list。
2023-08-07 18:09:09 +08:00
wangchengcheng
8c94028363 补充pom.xml 2022-09-27 10:47:30 +08:00
wangchengcheng
d6226fef5c 根据04版补全程序更新P19双写程序。 2022-06-17 16:54:38 +08:00
wangchengcheng
935dcfa702 根据04版补全程序更新P19双写程序。 2022-06-17 16:53:16 +08:00
36 changed files with 3908 additions and 0 deletions

301
pom.xml Normal file
View File

@@ -0,0 +1,301 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zdjizhi</groupId>
<artifactId>log-stream-doublewrite</artifactId>
<version>22.04-v3</version>
<name>log-stream-doublewrite</name>
<url>http://www.example.com</url>
<repositories>
<repository>
<id>nexus</id>
<name>Team Nexus Repository</name>
<url>http://192.168.40.153:8099/content/groups/public</url>
</repository>
<repository>
<id>maven-ali</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<!--<enabled>true</enabled>-->
</releases>
<snapshots>
<!--<enabled>true</enabled>-->
<checksumPolicy>fail</checksumPolicy>
</snapshots>
</repository>
</repositories>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.13.1</flink.version>
<hadoop.version>2.7.1</hadoop.version>
<kafka.version>1.0.0</kafka.version>
<hbase.version>2.2.3</hbase.version>
<nacos.version>1.2.0</nacos.version>
<zdjz.tools.version>1.0.8</zdjz.tools.version>
<scope.type>provided</scope.type>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.2</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.zdjizhi.topology.LogFlowWriteTopology</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>io.github.zlika</groupId>
<artifactId>reproducible-build-maven-plugin</artifactId>
<version>0.2</version>
<executions>
<execution>
<goals>
<goal>strip-jar</goal>
</goals>
<phase>package</phase>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
<resources>
<resource>
<directory>properties</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
<resource>
<directory>src\main\java</directory>
<includes>
<include>log4j.properties</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
</build>
<dependencies>
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
<version>${zdjz.tools.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.70</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>${scope.type}</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
<scope>${scope.type}</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
<scope>${scope.type}</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
<!--<scope>${scope.type}</scope>-->
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>${scope.type}</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>cglib</groupId>
<artifactId>cglib-nodep</artifactId>
<version>3.2.4</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.3.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.17</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba.nacos/nacos-client -->
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>${nacos.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.jasypt/jasypt -->
<dependency>
<groupId>org.jasypt</groupId>
<artifactId>jasypt</artifactId>
<version>1.9.3</version>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,65 @@
#====================Kafka KafkaConsumer====================#
#kafka source connection timeout
session.timeout.ms=60000
#kafka source poll
max.poll.records=5000
#kafka source poll bytes
max.partition.fetch.bytes=31457280
#====================Kafka KafkaProducer====================#
#producer重试的次数设置
retries=0
#他的含义就是说一个Batch被创建之后最多过多久不管这个Batch有没有写满都必须发送出去了
linger.ms=10
#如果在超时之前未收到响应,客户端将在必要时重新发送请求
request.timeout.ms=30000
#producer都是按照batch进行发送的,批次大小,默认:16384
batch.size=262144
#Producer端用于缓存消息的缓冲区大小
#128M
buffer.memory=134217728
#这个参数决定了每次发送给Kafka服务器请求的最大大小,默认1048576
#10M
max.request.size=10485760
#====================kafka default====================#
#kafka SASL验证用户名-加密
kafka.user=nsyGpHKGFA4KW0zro9MDdw==
#kafka SASL及SSL验证密码-加密
kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ
#生产者ack
producer.ack=1
#====================nacos default====================#
#nacos username
nacos.username=nacos
#nacos password
nacos.pin=nacos
#nacos group
nacos.group=Galaxy
#====================Topology Default====================#
#hbase table name
hbase.table.name=tsg_galaxy:relation_framedip_account
#邮件默认编码
mail.default.charset=UTF-8
#0不做任何校验1弱类型校验
log.transform.type=1
#两个输出之间的最大时间(单位milliseconds)
buffer.timeout=5000
#====================临时配置-待删除====================#
#网关APP_ID 获取接口
app.id.http=http://192.168.44.67:9999/open-api/appDicList
#app_id 更新时间如填写0则不更新缓存
app.tick.tuple.freq.secs=0

View File

@@ -0,0 +1,5 @@
txt
html
eml
jpg
png

View File

@@ -0,0 +1,77 @@
#--------------------------------地址配置------------------------------#
#管理kafka地址
source.kafka.servers=192.168.44.12:9094
#百分点输出kafka地址
percent.sink.kafka.servers=192.168.44.12:9094
#文件源数据topic输出kafka地址
file.data.sink.kafka.servers=192.168.44.12:9094
#zookeeper 地址 用于配置log_id
zookeeper.servers=192.168.44.12:2181
#hbase zookeeper地址 用于连接HBase
hbase.zookeeper.servers=192.168.44.12:2181
#--------------------------------HTTP/定位库------------------------------#
#定位库地址
tools.library=C:\\workspace\\dat\\
#--------------------------------nacos配置------------------------------#
#nacos 地址
nacos.server=192.168.44.12:8848
#nacos namespace
nacos.schema.namespace=P19
#nacos topology_common_config.properties namespace
nacos.common.namespace=P19
#nacos data id
nacos.data.id=security_event.json
#------------------------------------OOS配置------------------------------------#
#oos地址
oos.servers=10.3.45.100:8057
#--------------------------------Kafka消费/生产配置------------------------------#
#kafka 接收数据topic
source.kafka.topic=test
#百分点对应的topic
percent.kafka.topic=PERCENT-RECORD
#文件源数据topic
file.data.kafka.topic=test-file-data
#读取topic,存储该spout id的消费offset信息可通过该拓扑命名;具体存储offset的位置确定下次读取不重复的数据
group.id=flinktest-1
#--------------------------------topology配置------------------------------#
#consumer 并行度
source.parallelism=1
#转换函数并行度
transform.parallelism=1
#percent producer 并行度
percent.sink.parallelism=1
#filedata producer 并行度
file.data.sink.parallelism=1
#数据中心,取值范围(0-31)
data.center.id.num=0
#hbase 更新时间如填写0则不更新缓存
hbase.tick.tuple.freq.secs=180
#--------------------------------默认值配置------------------------------#
#0不需要补全原样输出日志1需要补全
log.need.complete=1
#生产者压缩模式 none or snappy
producer.kafka.compression.type=none

View File

@@ -0,0 +1,51 @@
package com.zdjizhi.bean;
import com.alibaba.fastjson.JSONArray;
public class FileMeta {
private long common_log_id;
protected int common_recv_time;
private String common_schema_type;
private JSONArray source_list;
private int processing_time;
public long getCommon_log_id() {
return common_log_id;
}
public void setCommon_log_id(long common_log_id) {
this.common_log_id = common_log_id;
}
public int getCommon_recv_time() {
return common_recv_time;
}
public void setCommon_recv_time(int common_recv_time) {
this.common_recv_time = common_recv_time;
}
public String getCommon_schema_type() {
return common_schema_type;
}
public void setCommon_schema_type(String common_schema_type) {
this.common_schema_type = common_schema_type;
}
public JSONArray getSource_list() {
return source_list;
}
public void setSource_list(JSONArray source_list) {
this.source_list = source_list;
}
public int getProcessing_time() {
return processing_time;
}
public void setProcessing_time(int processing_time) {
this.processing_time = processing_time;
}
}

View File

@@ -0,0 +1,22 @@
package com.zdjizhi.bean;
public class SourceList {
private String destination_oss_path;
private String source_oss_path;
public String getDestination_oss_path() {
return destination_oss_path;
}
public void setDestination_oss_path(String destination_oss_path) {
this.destination_oss_path = destination_oss_path;
}
public String getSource_oss_path() {
return source_oss_path;
}
public void setSource_oss_path(String source_oss_path) {
this.source_oss_path = source_oss_path;
}
}

View File

@@ -0,0 +1,134 @@
package com.zdjizhi.common;
import com.zdjizhi.utils.system.FlowWriteConfigurations;
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
/**
* @author Administrator
*/
public class FlowWriteConfig {
private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
static {
encryptor.setPassword("galaxy");
}
public static final int IF_PARAM_LENGTH = 3;
/**
* 有此标识的字段为失效字段,不计入最终日志字段
*/
public static final String VISIBILITY = "disabled";
/**
* 默认的切分符号
*/
public static final String FORMAT_SPLITTER = ",";
/**
* 标识字段为日志字段还是schema指定字段
*/
public static final String IS_JSON_KEY_TAG = "$.";
/**
* if函数连接分隔符
*/
public static final String IF_CONDITION_SPLITTER = "=";
/**
* 默认的字符串解析编码
*/
public static final String ENCODING = "UTF8";
/**
* Nacos
*/
public static final String NACOS_SERVER = FlowWriteConfigurations.getStringProperty(0, "nacos.server");
public static final String NACOS_SCHEMA_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.schema.namespace");
public static final String NACOS_COMMON_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.common.namespace");
public static final String NACOS_DATA_ID = FlowWriteConfigurations.getStringProperty(0, "nacos.data.id");
public static final String NACOS_PIN = FlowWriteConfigurations.getStringProperty(1, "nacos.pin");
public static final String NACOS_GROUP = FlowWriteConfigurations.getStringProperty(1, "nacos.group");
public static final String NACOS_USERNAME = FlowWriteConfigurations.getStringProperty(1, "nacos.username");
/**
* System config
*/
public static final Integer SOURCE_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "source.parallelism");
public static final Integer PERCENT_SINK_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "percent.sink.parallelism");
public static final Integer FILE_DATA_SINK_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "file.data.sink.parallelism");
public static final Integer TRANSFORM_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "transform.parallelism");
public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num");
public static final Integer LOG_NEED_COMPLETE = FlowWriteConfigurations.getIntProperty(0, "log.need.complete");
public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(1, "mail.default.charset");
public static final Integer LOG_TRANSFORM_TYPE = FlowWriteConfigurations.getIntProperty(1, "log.transform.type");
public static final Integer BUFFER_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "buffer.timeout");
/**
* HBase
*/
public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name");
/**
* kafka common
*/
public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.user"));
public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.pin"));
public static final String PERCENT_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "percent.kafka.topic");
/**
* kafka source config
*/
public static final String SOURCE_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic");
public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id");
public static final String SESSION_TIMEOUT_MS = FlowWriteConfigurations.getStringProperty(1, "session.timeout.ms");
public static final String MAX_POLL_RECORDS = FlowWriteConfigurations.getStringProperty(1, "max.poll.records");
public static final String MAX_PARTITION_FETCH_BYTES = FlowWriteConfigurations.getStringProperty(1, "max.partition.fetch.bytes");
/**
* kafka sink config
*/
public static final String FILE_DATA_SINK_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "file.data.kafka.topic");
public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(1, "producer.ack");
public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");
public static final String OUTPUT_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "output.kafka.topic");
/**
* connection kafka
*/
public static final String RETRIES = FlowWriteConfigurations.getStringProperty(1, "retries");
public static final String LINGER_MS = FlowWriteConfigurations.getStringProperty(1, "linger.ms");
public static final Integer REQUEST_TIMEOUT_MS = FlowWriteConfigurations.getIntProperty(1, "request.timeout.ms");
public static final Integer BATCH_SIZE = FlowWriteConfigurations.getIntProperty(1, "batch.size");
public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory");
public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size");
/**
* http
*/
public static final String APP_ID_HTTP = FlowWriteConfigurations.getStringProperty(1, "app.id.http");
public static final Integer APP_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(1, "app.tick.tuple.freq.secs");
/**
* common config
*/
/**
* public static final String SOURCE_KAFKA_SERVERS = NacosConfig.getStringProperty("source.kafka.servers");
* public static final String SINK_KAFKA_SERVERS = NacosConfig.getStringProperty("sink.kafka.servers");
* public static final String ZOOKEEPER_SERVERS = NacosConfig.getStringProperty("zookeeper.servers");
* public static final String TOOLS_LIBRARY = NacosConfig.getStringProperty("tools.library");
* public static final String HBASE_ZOOKEEPER_SERVERS = NacosConfig.getStringProperty("hbase.zookeeper.servers");
*/
public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"source.kafka.servers");
public static final String PERCENT_SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"percent.sink.kafka.servers");
public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0,"zookeeper.servers");
public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0,"tools.library");
public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0,"hbase.zookeeper.servers");
public static final String FILE_DATA_SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"file.data.sink.kafka.servers");
/**
* oos
*/
public static final String OOS_SERVERS = FlowWriteConfigurations.getStringProperty(0, "oos.servers");
}

View File

@@ -0,0 +1,107 @@
package com.zdjizhi.common;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.system.FlowWriteConfigurations;
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
/**
* @author qidaijie
* @Package com.zdjizhi.common
* @Description:
* @date 2022/3/189:36
*/
@Deprecated
public class NacosConfig {
private static final Log logger = LogFactory.get();
private static Properties propCommon = new Properties();
private static Properties propNacos = new Properties();
private static NacosConfig nacosConfig;
private static void getInstance() {
nacosConfig = new NacosConfig();
}
/**
* 构造函数-新
*/
private NacosConfig() {
//获取连接
getConnection();
}
/**
* 初始化Nacos配置列表
*/
private static void getConnection() {
try {
propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, FlowWriteConfig.NACOS_SERVER);
propNacos.setProperty(PropertyKeyConst.NAMESPACE, FlowWriteConfig.NACOS_COMMON_NAMESPACE);
propNacos.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME);
propNacos.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN);
ConfigService configService = NacosFactory.createConfigService(propNacos);
String commonConfig = configService.getConfig("etl_connection_config.properties", FlowWriteConfig.NACOS_GROUP, 5000);
if (StringUtil.isNotBlank(commonConfig)) {
propCommon.load(new StringReader(commonConfig));
}
} catch (NacosException | IOException e) {
logger.error("Get topology run configuration error,The exception message is " + e.getMessage());
}
}
/**
* 获取String类型配置
*
* @param key config key
* @return value
*/
public static String getStringProperty(String key) {
if (nacosConfig == null) {
getInstance();
}
return propCommon.getProperty(key);
}
/**
* 获取Integer类型配置
*
* @param key config key
* @return value
*/
public static Integer getIntegerProperty(String key) {
if (nacosConfig == null) {
getInstance();
}
return Integer.parseInt(propCommon.getProperty(key));
}
/**
* 获取Long类型配置
*
* @param key config key
* @return value
*/
public static Long getLongProperty(String key) {
if (nacosConfig == null) {
getInstance();
}
return Long.parseLong(propCommon.getProperty(key));
}
}

View File

@@ -0,0 +1,74 @@
package com.zdjizhi.topology;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.functions.DealFileProcessFunction;
import com.zdjizhi.utils.functions.FilterNullFunction;
import com.zdjizhi.utils.functions.MapCompletedFunction;
import com.zdjizhi.utils.functions.TypeMapCompletedFunction;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import com.zdjizhi.utils.kafka.KafkaProducer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Map;
/**
* @author 王成成
* @Package com.zdjizhi.topology
* @Description:
* @date 2022.06.01
*/
public class LogFlowWriteTopology {
private static final Log logger = LogFactory.get();
public static void main(String[] args) {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
//两个输出之间的最大时间 (单位milliseconds)
environment.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT);
if (FlowWriteConfig.LOG_NEED_COMPLETE == 1) {
SingleOutputStreamOperator<Map<String, Object>> streamSource = environment.addSource(KafkaConsumer.myDeserializationConsumer())
.setParallelism(FlowWriteConfig.SOURCE_PARALLELISM).name(FlowWriteConfig.SOURCE_KAFKA_TOPIC);
DataStream<Map<String, Object>> cleaningLog;
switch (FlowWriteConfig.LOG_TRANSFORM_TYPE) {
case 0:
//对原始日志进行处理补全转换等,不对日志字段类型做校验。
cleaningLog = streamSource.map(new MapCompletedFunction()).name("MapCompletedFunction")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
break;
case 1:
//对原始日志进行处理补全转换等对日志字段类型做若校验可根据schema进行强转。
cleaningLog = streamSource.map(new TypeMapCompletedFunction()).name("TypeMapCompletedFunction")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
break;
default:
//对原始日志进行处理补全转换等,不对日志字段类型做校验。
cleaningLog = streamSource.map(new MapCompletedFunction()).name("MapCompletedFunction")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
}
//处理带有非结构化日志的数据
SingleOutputStreamOperator<String> process = cleaningLog.process(new DealFileProcessFunction());
SingleOutputStreamOperator<String> resultFileMetaData = process.getSideOutput(DealFileProcessFunction.metaToKafa).filter(new FilterNullFunction()).name("FilterAbnormalTrafficFileMetaData").setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
SingleOutputStreamOperator<String> result = process.filter(new FilterNullFunction()).name("FilterAbnormalData").setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
//文件元数据发送至TRAFFIC-FILE-METADATA
resultFileMetaData.addSink(KafkaProducer.getTrafficFileMetaKafkaProducer()).name("toTrafficFileMeta")
.setParallelism(FlowWriteConfig.FILE_DATA_SINK_PARALLELISM);
//补全后的数据发送给百分点的kafka
result.addSink(KafkaProducer.getPercentKafkaProducer()).name("toPercentKafka")
.setParallelism(FlowWriteConfig.PERCENT_SINK_PARALLELISM);
}
try {
environment.execute(args[0]);
} catch (Exception e) {
logger.error("This Flink task start ERROR! Exception information is :" + e);
e.printStackTrace();
}
}
}

View File

@@ -0,0 +1,124 @@
package com.zdjizhi.utils.app;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.http.HttpClientUtil;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* AppId 工具类
*
* @author qidaijie
*/
@Deprecated
public class AppUtils {
private static final Log logger = LogFactory.get();
private static Map<Integer, String> appIdMap = new ConcurrentHashMap<>(128);
private static AppUtils appUtils;
private static void getAppInstance() {
appUtils = new AppUtils();
}
/**
* 构造函数-新
*/
private AppUtils() {
//定时更新
updateAppIdCache();
}
/**
* 更新变量
*/
private static void change() {
if (appUtils == null) {
getAppInstance();
}
timestampsFilter();
}
/**
* 获取变更内容
*/
private static void timestampsFilter() {
try {
Long begin = System.currentTimeMillis();
String schema = HttpClientUtil.requestByGetMethod(FlowWriteConfig.APP_ID_HTTP);
if (StringUtil.isNotBlank(schema)) {
String data = JSONObject.parseObject(schema).getString("data");
JSONArray objects = JSONArray.parseArray(data);
for (Object object : objects) {
JSONArray jsonArray = JSONArray.parseArray(object.toString());
int key = jsonArray.getInteger(0);
String value = jsonArray.getString(1);
if (appIdMap.containsKey(key)) {
if (!value.equals(appIdMap.get(key))) {
appIdMap.put(key, value);
}
} else {
appIdMap.put(key, value);
}
}
logger.warn("Updating the correspondence takes time:" + (begin - System.currentTimeMillis()));
logger.warn("Pull the length of the interface data:[" + objects.size() + "]");
}
} catch (RuntimeException e) {
logger.error("Update cache app-id failed, exception" + e);
}
}
/**
* 验证定时器,每隔一段时间验证一次-验证获取新的Cookie
*/
private void updateAppIdCache() {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
if (FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS != 0) {
change();
}
} catch (RuntimeException e) {
logger.error("AppUtils update AppCache is error===>{" + e + "}<===");
}
}
}, 1, FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS);
}
/**
* 获取 appName
*
* @param appId app_id
* @return account
*/
public static String getAppName(int appId) {
if (appUtils == null) {
getAppInstance();
}
if (appIdMap.containsKey(appId)) {
return appIdMap.get(appId);
} else {
logger.warn("AppMap get appName is null, ID is :" + appId);
return "";
}
}
}

View File

@@ -0,0 +1,18 @@
package com.zdjizhi.utils.exception;
/**
* @author qidaijie
* @Package com.zdjizhi.storm.utils.execption
* @Description:
* @date 2021/3/259:42
*/
public class FlowWriteException extends RuntimeException {
public FlowWriteException() {
}
public FlowWriteException(String message) {
super(message);
}
}

View File

@@ -0,0 +1,112 @@
package com.zdjizhi.utils.functions;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.bean.FileMeta;
import com.zdjizhi.bean.SourceList;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.general.FileEdit;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.Map;
/**
* @author wangchengcheng
* @Package com.zdjizhi.utils.functions
* @Description:
* @date 2021/10/14
*/
public class DealFileProcessFunction extends ProcessFunction<Map<String, Object>, String> {
private static final Log logger = LogFactory.get();
private String rpUrlValue;
private String rqUrlValue;
private String emailUrlValue;
private long cfgId = 0; //= common_policy_id;
private String sIp = null; // = common_client_ip;
private int sPort = 0;// = common_client_port;
private String dIp = null;//= common_server_ip;
private int dPort = 0;// = common_server_port;
private long foundTime = 0;// = common_recv_time;
private String account = null;
private String domain = null;
private String schemaType = null;
//初始化侧输流的标记
public static OutputTag<String> metaToKafa = new OutputTag<String>("metaToKafka") {
};
@SuppressWarnings("unchecked")
@Override
public void processElement(Map<String, Object> message, Context context, Collector<String> collector) throws Exception {
try {
if (message.size() > 0) {
rpUrlValue = (String) message.get("http_response_body");
rqUrlValue = (String) message.get("http_request_body");
emailUrlValue = (String) message.get("mail_eml_file");
if (StringUtil.isNotBlank(rpUrlValue) || StringUtil.isNotBlank(rqUrlValue) || StringUtil.isNotBlank(emailUrlValue)) {
cfgId = (long) message.getOrDefault("common_policy_id",0L);
sIp = (String) message.get("common_client_ip");
sPort = (int) message.get("common_client_port");
dIp = (String) message.get("common_server_ip");
dPort = (int) message.get("common_server_port");
foundTime = (long) message.get("common_recv_time");
schemaType = (String) message.get("common_schema_type");
domain = (String)message.getOrDefault("http_domain","");
account = (String)message.getOrDefault("common_subscribe_id","");
FileMeta fileMeta = new FileMeta();
JSONArray jsonarray = new JSONArray();
if (StringUtil.isNotBlank(rqUrlValue)) {
System.out.println(rqUrlValue);
String fileId = FileEdit.getFileId(rqUrlValue,"_1");
message.put("http_request_body", FileEdit.getFileDownloadUrl(fileId));
SourceList request = new SourceList();
request.setSource_oss_path(rqUrlValue);
request.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rqUrlValue, schemaType, fileId));
jsonarray.add(request);
}
if (StringUtil.isNotBlank(rpUrlValue)) {
String fileId = FileEdit.getFileId(rpUrlValue,"_2");
message.put("http_response_body", FileEdit.getFileDownloadUrl(fileId));
SourceList response = new SourceList();
response.setSource_oss_path(rpUrlValue);
response.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rpUrlValue, schemaType, fileId));
jsonarray.add(response);
}
if (StringUtil.isNotBlank(emailUrlValue)) {
String fileId = FileEdit.getFileId(emailUrlValue,"_9");
message.put("mail_eml_file", FileEdit.getFileDownloadUrl(fileId));
SourceList emailFile = new SourceList();
emailFile.setSource_oss_path(emailUrlValue);
emailFile.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, emailUrlValue, schemaType, fileId));
jsonarray.add(emailFile);
}
fileMeta.setSource_list(jsonarray);
fileMeta.setCommon_log_id((long) message.get("common_log_id"));
fileMeta.setCommon_recv_time(Integer.parseInt(message.get("common_recv_time").toString()));
fileMeta.setCommon_schema_type((String) message.get("common_schema_type"));
fileMeta.setProcessing_time((int) (System.currentTimeMillis() / 1000));
context.output(metaToKafa, JSONObject.toJSONString(fileMeta));
}
collector.collect(JsonMapper.toJsonString(message));
}
} catch (RuntimeException e) {
logger.error("处理带有非结构结构化字段的日志出错:" + e + "\n" + message);
}
}
}

View File

@@ -0,0 +1,17 @@
package com.zdjizhi.utils.functions;
import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.common.functions.FilterFunction;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.functions
* @Description:
* @date 2021/5/2715:01
*/
public class FilterNullFunction implements FilterFunction<String> {
@Override
public boolean filter(String message) {
return StringUtil.isNotBlank(message);
}
}

View File

@@ -0,0 +1,23 @@
package com.zdjizhi.utils.functions;
import com.zdjizhi.utils.general.TransFormMap;
import org.apache.flink.api.common.functions.MapFunction;
import java.util.Map;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.functions
* @Description:
* @date 2021/5/2715:01
*/
public class MapCompletedFunction implements MapFunction<Map<String, Object>, Map<String, Object>> {
@Override
@SuppressWarnings("unchecked")
public Map<String, Object> map(Map<String, Object> logs) {
return TransFormMap.dealCommonMessage(logs);
}
}

View File

@@ -0,0 +1,23 @@
package com.zdjizhi.utils.functions;
import com.zdjizhi.utils.general.TransFormTypeMap;
import org.apache.flink.api.common.functions.MapFunction;
import java.util.Map;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.functions
* @Description:
* @date 2021/5/2715:01
*/
public class TypeMapCompletedFunction implements MapFunction<Map<String, Object>, Map<String, Object>> {
@Override
@SuppressWarnings("unchecked")
public Map<String, Object> map(Map<String, Object> logs) {
return TransFormTypeMap.dealCommonMessage(logs);
}
}

View File

@@ -0,0 +1,180 @@
package com.zdjizhi.utils.general;
/**
* CityHash64算法对logid进行散列计算
* 版本规划暂不实现-TSG22.01
*
* @author qidaijie
*/
@Deprecated
public class CityHash {
private static final long k0 = 0xc3a5c85c97cb3127L;
private static final long k1 = 0xb492b66fbe98f273L;
private static final long k2 = 0x9ae16a3b2f90404fL;
private static final long k3 = 0xc949d7c7509e6557L;
private static final long k5 = 0x9ddfea08eb382d69L;
private CityHash() {}
public static long CityHash64(byte[] s, int index, int len) {
if (len <= 16 ) {
return HashLen0to16(s, index, len);
} else if (len > 16 && len <= 32) {
return HashLen17to32(s, index, len);
} else if (len > 32 && len <= 64) {
return HashLen33to64(s, index, len);
} else {
long x = Fetch64(s, index);
long y = Fetch64(s, index + len - 16) ^ k1;
long z = Fetch64(s, index + len - 56) ^ k0;
long[] v = WeakHashLen32WithSeeds(s, len - 64, len, y);
long[] w = WeakHashLen32WithSeeds(s, len - 32, len * k1, k0);
z += ShiftMix(v[1]) * k1;
x = Rotate(z + x, 39) * k1;
y = Rotate(y, 33) * k1;
len = (len - 1) & ~63;
do {
x = Rotate(x + y + v[0] + Fetch64(s, index + 16), 37) * k1;
y = Rotate(y + v[1] + Fetch64(s, index + 48), 42) * k1;
x ^= w[1];
y ^= v[0];
z = Rotate(z ^ w[0], 33);
v = WeakHashLen32WithSeeds(s, index, v[1] * k1, x + w[0]);
w = WeakHashLen32WithSeeds(s, index + 32, z + w[1], y);
long t = z;
z = x;
x = t;
index += 64;
len -= 64;
} while (len != 0);
return HashLen16(HashLen16(v[0], w[0]) + ShiftMix(y) * k1 + z,
HashLen16(v[1], w[1]) + x);
}
}
private static long HashLen0to16(byte[] s, int index, int len) {
if (len > 8) {
long a = Fetch64(s, index);
long b = Fetch64(s, index + len - 8);
return HashLen16(a, RotateByAtLeastOne(b + len, len)) ^ b;
}
if (len >= 4) {
long a = Fetch32(s, index);
return HashLen16(len + (a << 3), Fetch32(s, index + len - 4));
}
if (len > 0) {
byte a = s[index];
byte b = s[index + len >>> 1];
byte c = s[index + len - 1];
int y = (a) + (b << 8);
int z = len + (c << 2);
return ShiftMix(y * k2 ^ z * k3) * k2;
}
return k2;
}
private static long HashLen17to32(byte[] s, int index, int len) {
long a = Fetch64(s, index) * k1;
long b = Fetch64(s, index + 8);
long c = Fetch64(s, index + len - 8) * k2;
long d = Fetch64(s, index + len - 16) * k0;
return HashLen16(Rotate(a - b, 43) + Rotate(c, 30) + d,
a + Rotate(b ^ k3, 20) - c + len);
}
private static long HashLen33to64(byte[] s, int index, int len) {
long z = Fetch64(s, index + 24);
long a = Fetch64(s, index) + (len + Fetch64(s, index + len - 16)) * k0;
long b = Rotate(a + z, 52);
long c = Rotate(a, 37);
a += Fetch64(s, index + 8);
c += Rotate(a, 7);
a += Fetch64(s, index + 16);
long vf = a + z;
long vs = b + Rotate(a, 31) + c;
a = Fetch64(s, index + 16) + Fetch64(s, index + len - 32);
z = Fetch64(s, index + len - 8);
b = Rotate(a + z, 52);
c = Rotate(a, 37);
a += Fetch64(s, index + len - 24);
c += Rotate(a, 7);
a += Fetch64(s, index + len - 16);
long wf = a + z;
long ws = b + Rotate(a, 31) + c;
long r = ShiftMix((vf + ws) * k2 + (wf + vs) * k0);
return ShiftMix(r * k0 + vs) * k2;
}
private static long Fetch64(byte[] p, int index) {
return toLongLE(p,index);
}
private static long Fetch32(byte[] p, int index) {
return toIntLE(p,index);
}
private static long[] WeakHashLen32WithSeeds(
long w, long x, long y, long z, long a, long b) {
a += w;
b = Rotate(b + a + z, 21);
long c = a;
a += x;
a += y;
b += Rotate(a, 44);
return new long[]{a + z, b + c};
}
private static long[] WeakHashLen32WithSeeds(byte[] s, int index, long a, long b) {
return WeakHashLen32WithSeeds(Fetch64(s, index),
Fetch64(s, index + 8),
Fetch64(s, index + 16),
Fetch64(s, index + 24),
a,
b);
}
private static long toLongLE(byte[] b, int i) {
return 0xffffffffffffffffL & (((long) b[i + 7] << 56) + ((long) (b[i + 6] & 255) << 48) + ((long) (b[i + 5] & 255) << 40) + ((long) (b[i + 4] & 255) << 32) + ((long) (b[i + 3] & 255) << 24) + ((b[i + 2] & 255) << 16) + ((b[i + 1] & 255) << 8) + ((b[i + 0] & 255) << 0));
}
private static long toIntLE(byte[] b, int i) {
return 0xffffffffL & (((b[i + 3] & 255) << 24) + ((b[i + 2] & 255) << 16) + ((b[i + 1] & 255) << 8) + ((b[i + 0] & 255) << 0));
}
private static long RotateByAtLeastOne(long val, int shift) {
return (val >>> shift) | (val << (64 - shift));
}
private static long ShiftMix(long val) {
return val ^ (val >>> 47);
}
private static long Uint128Low64(long[] x) {
return x[0];
}
private static long Rotate(long val, int shift) {
return shift == 0 ? val : (val >>> shift) | (val << (64 - shift));
}
private static long Uint128High64(long[] x) {
return x[1];
}
private static long Hash128to64(long[] x) {
long a = (Uint128Low64(x) ^ Uint128High64(x)) * k5;
a ^= (a >>> 47);
long b = (Uint128High64(x) ^ a) * k5;
b ^= (b >>> 47);
b *= k5;
return b;
}
private static long HashLen16(long u, long v) {
return Hash128to64(new long[]{u,v});
}
}

View File

@@ -0,0 +1,49 @@
package com.zdjizhi.utils.general;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.ordinary.MD5Utils;
import static com.zdjizhi.utils.system.FlowWriteConfigurations.judgeFileType;
/**
* 文件字段操作工具
*/
public class FileEdit {
public static String getFileUploadUrl(long cfgId,String sIp,int sPort,String dIp,int dPort,long foundTime,String account,String domain, String urlValue,String schemaType,String fileId){
String fileType = null;
if (judgeFileType(getFileType(urlValue))){
fileType = getFileType(urlValue);
}else {
if (schemaType.equals("HTTP")){
fileType = "html";
}
if (schemaType.equals("MAIL")){
fileType = "eml";
}
}
return "http://"+ FlowWriteConfig.OOS_SERVERS+"/v3/upload?cfg_id="+cfgId+"&file_id="+fileId+"&file_type="+fileType+"&found_time="+foundTime+"&s_ip="+sIp+"&s_port="+sPort+"&d_ip="+dIp+"&d_port="+dPort+"&domain="+domain+"&account="+account;
}
public static String getFileDownloadUrl(String fileId){
return "http://"+ FlowWriteConfig.OOS_SERVERS+"/v3/download?file_id="+fileId;
}
public static String getFileType(String url){
String[] split = url.split("\\.");
return split[split.length-1];
}
public static String getFileId(String url,String fileSuffix) throws Exception {
String[] arr = url.split("/");
String filename = arr[arr.length-1].substring(0,arr[arr.length-1].lastIndexOf("_"));
String prefix = MD5Utils.md5Encode(filename);
// String suffix = arr[arr.length-1].substring(arr[arr.length-1].lastIndexOf("_"),arr[arr.length-1].lastIndexOf("."));
return prefix+fileSuffix;
}
}

View File

@@ -0,0 +1,213 @@
package com.zdjizhi.utils.general;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.zookeeper.DistributedLock;
import com.zdjizhi.utils.zookeeper.ZookeeperUtils;
/**
* 雪花算法
*
* @author qidaijie
*/
public class SnowflakeId {
private static final Log logger = LogFactory.get();
/**
* 共64位 第一位为符号位 默认0
* 时间戳 39位(17 year), centerId:(关联每个环境或任务数) :6位(0-63),
* workerId(关联进程):7(0-127) ,序列号11位(2047/ms)
*
* 序列号 /ms = (-1L ^ (-1L << 11))
* 最大使用年 = (1L << 39) / (1000L * 60 * 60 * 24 * 365)
*/
/**
* 开始时间截 (2020-11-14 00:00:00) max 17years
*/
private final long twepoch = 1605283200000L;
/**
* 机器id所占的位数
*/
private final long workerIdBits = 8L;
/**
* 数据标识id所占的位数
*/
private final long dataCenterIdBits = 5L;
/**
* 支持的最大机器id结果是63 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数)
* M << n = M * 2^n
*/
private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
/**
* 支持的最大数据标识id结果是31
*/
private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits);
/**
* 序列在id中占的位数
*/
private final long sequenceBits = 11L;
/**
* 机器ID向左移12位
*/
private final long workerIdShift = sequenceBits;
/**
* 数据标识id向左移17位(14+6)
*/
private final long dataCenterIdShift = sequenceBits + workerIdBits;
/**
* 时间截向左移22位(4+6+14)
*/
private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;
/**
* 生成序列的掩码这里为2047
*/
private final long sequenceMask = -1L ^ (-1L << sequenceBits);
/**
* 工作机器ID(0~255)
*/
private long workerId;
/**
* 数据中心ID(0~31)
*/
private long dataCenterId;
/**
* 毫秒内序列(0~2047)
*/
private long sequence = 0L;
/**
* 上次生成ID的时间截
*/
private long lastTimestamp = -1L;
/**
* 设置允许时间回拨的最大限制10s
*/
private static final long rollBackTime = 10000L;
private static SnowflakeId idWorker;
private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils();
static {
idWorker = new SnowflakeId(FlowWriteConfig.ZOOKEEPER_SERVERS, FlowWriteConfig.DATA_CENTER_ID_NUM);
}
//==============================Constructors=====================================
/**
* 构造函数
*/
private SnowflakeId(String zookeeperIp, long dataCenterIdNum) {
DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
try {
lock.lock();
int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + "worker" + dataCenterIdNum, zookeeperIp);
if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) {
throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
}
if (dataCenterIdNum > maxDataCenterId || dataCenterIdNum < 0) {
throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than ", maxDataCenterId));
}
this.workerId = tmpWorkerId;
this.dataCenterId = dataCenterIdNum;
} catch (RuntimeException e) {
logger.error("This is not usual error!!!===>>>" + e + "<<<===");
}finally {
lock.unlock();
}
}
// ==============================Methods==========================================
/**
* 获得下一个ID (该方法是线程安全的)
*
* @return SnowflakeId
*/
private synchronized long nextId() {
long timestamp = timeGen();
//设置一个允许回拨限制时间系统时间回拨范围在rollBackTime内可以等待校准
if (lastTimestamp - timestamp > 0 && lastTimestamp - timestamp < rollBackTime) {
timestamp = tilNextMillis(lastTimestamp);
}
//如果当前时间小于上一次ID生成的时间戳说明系统时钟回退过这个时候应当抛出异常
if (timestamp < lastTimestamp) {
throw new RuntimeException(
String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
}
//如果是同一时间生成的,则进行毫秒内序列
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
//毫秒内序列溢出
if (sequence == 0) {
//阻塞到下一个毫秒,获得新的时间戳
timestamp = tilNextMillis(lastTimestamp);
}
}
//时间戳改变,毫秒内序列重置
else {
sequence = 0L;
}
//上次生成ID的时间截
lastTimestamp = timestamp;
//移位并通过或运算拼到一起组成64位的ID
return ((timestamp - twepoch) << timestampLeftShift)
| (dataCenterId << dataCenterIdShift)
| (workerId << workerIdShift)
| sequence;
}
/**
* 阻塞到下一个毫秒,直到获得新的时间戳
*
* @param lastTimestamp 上次生成ID的时间截
* @return 当前时间戳
*/
protected long tilNextMillis(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}
/**
* 返回以毫秒为单位的当前时间
*
* @return 当前时间(毫秒)
*/
protected long timeGen() {
return System.currentTimeMillis();
}
/**
* 静态工具类
*
* @return
*/
public static Long generateId() {
return idWorker.nextId();
}
}

View File

@@ -0,0 +1,130 @@
package com.zdjizhi.utils.general;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.json.JsonParseUtil;
import java.util.Map;
/**
* 描述:转换或补全工具类
*
* @author qidaijie
*/
public class TransFormMap {
private static final Log logger = LogFactory.get();
/**
* 解析日志,并补全
*
* @param jsonMap kafka Topic消费原始日志并解析
* @return 补全后的日志
*/
@SuppressWarnings("unchecked")
public static Map<String, Object> dealCommonMessage(Map<String, Object> jsonMap) {
try {
JsonParseUtil.dropJsonField(jsonMap);
for (String[] strings : JsonParseUtil.getJobList()) {
//用到的参数的值
Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
//需要补全的字段的key
String appendToKeyName = strings[1];
//需要补全的字段的值
Object appendTo = JsonParseUtil.getValue(jsonMap, appendToKeyName);
//匹配操作函数的字段
String function = strings[2];
//额外的参数的值
String param = strings[3];
functionSet(function, jsonMap, appendToKeyName, appendTo, logValue, param);
}
return jsonMap;
} catch (RuntimeException e) {
logger.error("TransForm logs failed,The exception is :" + e);
return null;
}
}
/**
* 根据schema描述对应字段进行操作的 函数集合
*
* @param function 匹配操作函数的字段
* @param jsonMap 原始日志解析map
* @param appendToKeyName 需要补全的字段的key
* @param appendTo 需要补全的字段的值
* @param logValue 用到的参数的值
* @param param 额外的参数的值
*/
private static void functionSet(String function, Map<String, Object> jsonMap, String appendToKeyName, Object appendTo, Object logValue, String param) {
switch (function) {
case "current_timestamp":
if (!(appendTo instanceof Long)) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime());
}
break;
case "snowflake_id":
JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId());
break;
case "geo_ip_detail":
if (logValue != null && appendTo == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(logValue.toString()));
}
break;
case "geo_asn":
if (logValue != null && appendTo == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoAsn(logValue.toString()));
}
break;
case "geo_ip_country":
if (logValue != null && appendTo == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpCountry(logValue.toString()));
}
break;
case "set_value":
if (param != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, param);
}
break;
case "get_value":
if (logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, logValue);
}
break;
case "if":
if (param != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.condition(jsonMap, param));
}
break;
case "sub_domain":
if (appendTo == null && logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getTopDomain(logValue.toString()));
}
break;
case "radius_match":
if (logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.radiusMatch(logValue.toString()));
}
break;
case "decode_of_base64":
if (logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param)));
}
break;
case "flattenSpec":
if (logValue != null && param != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param));
}
break;
case "app_match":
if (logValue != null && appendTo == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString()));
}
break;
default:
}
}
}

View File

@@ -0,0 +1,132 @@
package com.zdjizhi.utils.general;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.json.JsonParseUtil;
import java.util.Map;
/**
* 描述:转换或补全工具类
*
* @author qidaijie
*/
public class TransFormTypeMap {
private static final Log logger = LogFactory.get();
/**
* 解析日志,并补全
*
* @param message kafka Topic原始日志
* @return 补全后的日志
*/
@SuppressWarnings("unchecked")
public static Map<String, Object> dealCommonMessage(Map<String, Object> message) {
try {
Map<String, Object> jsonMap = JsonParseUtil.typeTransform(message);
for (String[] strings : JsonParseUtil.getJobList()) {
//用到的参数的值
Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
//需要补全的字段的key
String appendToKeyName = strings[1];
//需要补全的字段的值
Object appendToKeyValue = JsonParseUtil.getValue(jsonMap, appendToKeyName);
//匹配操作函数的字段
String function = strings[2];
//额外的参数的值
String param = strings[3];
functionSet(function, jsonMap, appendToKeyName, appendToKeyValue, logValue, param);
}
return jsonMap;
} catch (RuntimeException e) {
logger.error("TransForm logs failed,The exception is :" + e);
return null;
}
}
/**
* 根据schema描述对应字段进行操作的 函数集合
*
* @param function 匹配操作函数的字段
* @param jsonMap 原始日志解析map
* @param appendToKeyName 需要补全的字段的key
* @param appendToKeyValue 需要补全的字段的值
* @param logValue 用到的参数的值
* @param param 额外的参数的值
*/
private static void functionSet(String function, Map<String, Object> jsonMap, String appendToKeyName, Object appendToKeyValue, Object logValue, String param) {
switch (function) {
case "current_timestamp":
if (!(appendToKeyValue instanceof Long)) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime());
}
break;
case "snowflake_id":
JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId());
//版本规划暂不实现TSG-22.01
// JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getDecimalHash(SnowflakeId.generateId()));
break;
case "geo_ip_detail":
if (logValue != null && appendToKeyValue == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(logValue.toString()));
}
break;
case "geo_asn":
if (logValue != null && appendToKeyValue == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoAsn(logValue.toString()));
}
break;
case "geo_ip_country":
if (logValue != null && appendToKeyValue == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpCountry(logValue.toString()));
}
break;
case "set_value":
if (param != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, param);
}
break;
case "get_value":
if (logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, logValue);
}
break;
case "if":
if (param != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.condition(jsonMap, param));
}
break;
case "sub_domain":
if (appendToKeyValue == null && logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getTopDomain(logValue.toString()));
}
break;
case "radius_match":
if (logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.radiusMatch(logValue.toString()));
}
break;
case "decode_of_base64":
if (logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param)));
}
break;
case "flattenSpec":
if (logValue != null && param != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param));
}
break;
case "app_match":
if (logValue != null && appendToKeyValue == null) {
// JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString()));
}
break;
default:
}
}
}

View File

@@ -0,0 +1,297 @@
package com.zdjizhi.utils.general;
import cn.hutool.core.codec.Base64;
import cn.hutool.core.util.StrUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.jayway.jsonpath.InvalidPathException;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.FormatUtils;
import com.zdjizhi.utils.IpLookupV2;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.app.AppUtils;
import com.zdjizhi.utils.hbase.HBaseUtils;
import com.zdjizhi.utils.json.JsonParseUtil;
import com.zdjizhi.utils.json.TypeUtils;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* @author qidaijie
*/
class TransFunction {
private static final Log logger = LogFactory.get();
/**
* 校验数字正则
*/
private static final Pattern PATTERN = Pattern.compile("[0-9]*");
/**
* IP定位库工具类
*/
private static IpLookupV2 ipLookup = new IpLookupV2.Builder(false)
.loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_built_in.mmdb")
.loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_built_in.mmdb")
.loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_user_defined.mmdb")
.loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_user_defined.mmdb")
.loadAsnDataFile(FlowWriteConfig.TOOLS_LIBRARY + "asn_v4.mmdb")
.loadAsnDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "asn_v6.mmdb")
.build();
/**
* 生成当前时间戳的操作
*/
static long getCurrentTime() {
return System.currentTimeMillis() / 1000;
}
/**
* CityHash64算法
* 版本规划暂不实现-TSG22.01
*
* @param data 原始数据
* @return 散列结果
*/
@Deprecated
static BigInteger getDecimalHash(long data) {
byte[] dataBytes = String.valueOf(data).getBytes();
long hashValue = CityHash.CityHash64(dataBytes, 0, dataBytes.length);
String decimalValue = Long.toUnsignedString(hashValue, 10);
BigInteger result = new BigInteger(decimalValue);
return result;
}
/**
* 根据clientIp获取location信息
*
* @param ip client IP
* @return ip地址详细信息
*/
static String getGeoIpDetail(String ip) {
return ipLookup.cityLookupDetail(ip);
}
/**
* 根据ip获取asn信息
*
* @param ip client/server IP
* @return ASN
*/
static String getGeoAsn(String ip) {
return ipLookup.asnLookup(ip);
}
/**
* 根据ip获取country信息
*
* @param ip server IP
* @return 国家
*/
static String getGeoIpCountry(String ip) {
return ipLookup.countryLookup(ip);
}
/**
* radius借助HBase补齐
*
* @param ip client IP
* @return account
*/
static String radiusMatch(String ip) {
return HBaseUtils.getAccount(ip.trim());
}
/**
* appId与缓存中对应关系补全appName
*
* @param appIds app id 列表
* @return appName
*/
@Deprecated
static String appMatch(String appIds) {
try {
String appId = StrUtil.split(appIds, FlowWriteConfig.FORMAT_SPLITTER, true, true).get(0);
return AppUtils.getAppName(Integer.parseInt(appId));
} catch (NumberFormatException | ClassCastException exception) {
logger.error("APP ID列表分割转换异常异常APP ID列表:" + appIds);
return "";
}
}
/**
* 解析顶级域名
*
* @param domain 初始域名
* @return 顶级域名
*/
static String getTopDomain(String domain) {
try {
return FormatUtils.getTopPrivateDomain(domain);
} catch (StringIndexOutOfBoundsException outException) {
logger.error("解析顶级域名异常,异常域名:" + domain);
return "";
}
}
/**
* 根据编码解码base64
*
* @param message base64
* @param charset 编码
* @return 解码字符串
*/
static String decodeBase64(String message, Object charset) {
String result = "";
try {
if (StringUtil.isNotBlank(message)) {
if (charset == null) {
result = Base64.decodeStr(message, FlowWriteConfig.MAIL_DEFAULT_CHARSET);
} else {
result = Base64.decodeStr(message, charset.toString());
}
}
} catch (RuntimeException rune) {
logger.error("解析 Base64 异常,异常信息:" + rune);
}
return result;
}
/**
* 根据表达式解析json
*
* @param message json
* @param expr 解析表达式
* @return 解析结果
*/
static String flattenSpec(String message, String expr) {
String flattenResult = "";
try {
if (StringUtil.isNotBlank(expr)) {
ArrayList<String> read = JsonPath.parse(message).read(expr);
if (read.size() >= 1) {
flattenResult = read.get(0);
}
}
} catch (ClassCastException | InvalidPathException | ArrayIndexOutOfBoundsException e) {
logger.error("设备标签解析异常,[ " + expr + " ]解析表达式错误" + e);
}
return flattenResult;
}
/**
* 判断是否为日志字段,是则返回对应value否则返回原始字符串
*
* @param object 内存实体类
* @param param 字段名/普通字符串
* @return JSON.Value or String
*/
static Object isJsonValue(Object object, String param) {
if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) {
return JsonParseUtil.getValue(object, param.substring(2));
} else {
return param;
}
}
/**
* 判断是否为日志字段,是则返回对应value否则返回原始字符串
*
* @param jsonMap 内存实体类
* @param param 字段名/普通字符串
* @return JSON.Value or String
*/
static Object isJsonValue(Map<String, Object> jsonMap, String param) {
if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) {
return JsonParseUtil.getValue(jsonMap, param.substring(2));
} else {
return param;
}
}
/**
* IF函数实现解析日志构建三目运算;包含判断是否为数字若为数字则转换为long类型返回结果。
*
* @param object 内存实体类
* @param ifParam 字段名/普通字符串
* @return resultA or resultB or null
*/
static Object condition(Object object, String ifParam) {
Object result = null;
try {
String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
Object direction = isJsonValue(object, norms[0]);
Object resultA = isJsonValue(object, split[1]);
Object resultB = isJsonValue(object, split[2]);
if (direction instanceof Number) {
result = TypeUtils.castToIfFunction((Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB);
} else if (direction instanceof String) {
result = TypeUtils.castToIfFunction(direction.equals(norms[1]) ? resultA : resultB);
}
}
} catch (RuntimeException e) {
logger.error("IF 函数执行异常,异常信息:" + e);
}
return result;
}
/**
* IF函数实现解析日志构建三目运算;包含判断是否为数字若为数字则转换为long类型返回结果。
*
* @param jsonMap 内存实体类
* @param ifParam 字段名/普通字符串
* @return resultA or resultB or null
*/
static Object condition(Map<String, Object> jsonMap, String ifParam) {
Object result = null;
try {
String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
Object direction = isJsonValue(jsonMap, norms[0]);
Object resultA = isJsonValue(jsonMap, split[1]);
Object resultB = isJsonValue(jsonMap, split[2]);
if (direction instanceof Number) {
result = (Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB;
} else if (direction instanceof String) {
result = direction.equals(norms[1]) ? resultA : resultB;
}
}
} catch (RuntimeException e) {
logger.error("IF 函数执行异常,异常信息:" + e);
}
return result;
}
/**
* 设置固定值函数 若为数字则转为long返回
*
* @param param 默认值
* @return 返回数字或字符串
*/
static Object setValue(String param) {
try {
Matcher isNum = PATTERN.matcher(param);
if (isNum.matches()) {
return Long.parseLong(param);
} else {
return param;
}
} catch (RuntimeException e) {
logger.error("SetValue 函数异常,异常信息:" + e);
}
return null;
}
}

View File

@@ -0,0 +1,208 @@
package com.zdjizhi.utils.hbase;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* HBase 工具类
*
* @author qidaijie
*/
public class HBaseUtils {
private static final Log logger = LogFactory.get();
private static Map<String, String> subIdMap = new ConcurrentHashMap<>(16);
private static Connection connection;
private static Long time;
private static HBaseUtils hBaseUtils;
private static void getInstance() {
hBaseUtils = new HBaseUtils();
}
/**
* 构造函数-新
*/
private HBaseUtils() {
//获取连接
getConnection();
//拉取所有
getAll();
//定时更新
updateCache();
}
private static void getConnection() {
try {
// 管理Hbase的配置信息
Configuration configuration = HBaseConfiguration.create();
// 设置zookeeper节点
configuration.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS);
configuration.set("hbase.client.retries.number", "3");
configuration.set("hbase.bulkload.retries.number", "3");
configuration.set("zookeeper.recovery.retry", "3");
connection = ConnectionFactory.createConnection(configuration);
time = System.currentTimeMillis();
logger.warn("HBaseUtils get HBase connection,now to getAll().");
} catch (IOException ioe) {
logger.error("HBaseUtils getHbaseConn() IOException===>{" + ioe + "}<===");
} catch (RuntimeException e) {
logger.error("HBaseUtils getHbaseConn() Exception===>{" + e + "}<===");
}
}
/**
* 更新变量
*/
private static void change() {
if (hBaseUtils == null) {
getInstance();
}
long nowTime = System.currentTimeMillis();
timestampsFilter(time - 1000, nowTime + 500);
}
/**
* 获取变更内容
*
* @param startTime 开始时间
* @param endTime 结束时间
*/
private static void timestampsFilter(Long startTime, Long endTime) {
Long begin = System.currentTimeMillis();
Table table = null;
ResultScanner scanner = null;
Scan scan2 = new Scan();
try {
table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_TABLE_NAME));
scan2.setTimeRange(startTime, endTime);
scanner = table.getScanner(scan2);
for (Result result : scanner) {
int acctStatusType = getAcctStatusType(result);
String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))).trim();
String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account"))).trim();
if (acctStatusType == 1) {
if (subIdMap.containsKey(framedIp)) {
boolean same = account.equals(subIdMap.get(framedIp));
if (!same) {
subIdMap.put(framedIp, account);
}
} else {
subIdMap.put(framedIp, account);
}
} else if (acctStatusType == 2) {
subIdMap.remove(framedIp);
}
}
Long end = System.currentTimeMillis();
logger.warn("HBaseUtils Now subIdMap.keySet().size() is: " + subIdMap.keySet().size());
logger.warn("HBaseUtils Update cache timeConsuming is: " + (end - begin) + ",BeginTime: " + startTime + ",EndTime: " + endTime);
time = endTime;
} catch (IOException ioe) {
logger.error("HBaseUtils timestampsFilter is IOException===>{" + ioe + "}<===");
} catch (RuntimeException e) {
logger.error("HBaseUtils timestampsFilter is Exception===>{" + e + "}<===");
} finally {
if (scanner != null) {
scanner.close();
}
if (table != null) {
try {
table.close();
} catch (IOException e) {
logger.error("HBase Table Close ERROR! Exception message is:" + e);
}
}
}
}
/**
* 获取所有的 key value
*/
private static void getAll() {
long begin = System.currentTimeMillis();
try {
Table table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_TABLE_NAME));
Scan scan2 = new Scan();
ResultScanner scanner = table.getScanner(scan2);
for (Result result : scanner) {
int acctStatusType = getAcctStatusType(result);
String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip")));
String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account")));
if (acctStatusType == 1) {
subIdMap.put(framedIp, account);
}
}
logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size(): " + subIdMap.size());
logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size() timeConsuming is: " + (System.currentTimeMillis() - begin));
scanner.close();
} catch (IOException ioe) {
logger.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<===");
} catch (RuntimeException e) {
logger.error("HBaseUtils getAll() is Exception===>{" + e + "}<===");
}
}
/**
* 验证定时器,每隔一段时间验证一次-验证获取新的Cookie
*/
private void updateCache() {
// ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
// new BasicThreadFactory.Builder().namingPattern("hbase-change-pool-%d").daemon(true).build());
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) {
change();
}
} catch (RuntimeException e) {
logger.error("HBaseUtils update hbaseCache is error===>{" + e + "}<===");
}
}
}, 1, FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS);
}
/**
* 获取 account
*
* @param clientIp client_ip
* @return account
*/
public static String getAccount(String clientIp) {
if (hBaseUtils == null) {
getInstance();
}
return subIdMap.get(clientIp);
}
private static int getAcctStatusType(Result result) {
boolean hasType = result.containsColumn(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"));
if (hasType) {
return Bytes.toInt(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")));
} else {
return 1;
}
}
}

View File

@@ -0,0 +1,77 @@
package com.zdjizhi.utils.http;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
* 获取网关schema的工具类
*
* @author qidaijie
*/
public class HttpClientUtil {
private static final Log logger = LogFactory.get();
/**
* 请求网关获取schema
*
* @param http 网关url
* @return schema
*/
public static String requestByGetMethod(String http) {
CloseableHttpClient httpClient = HttpClients.createDefault();
StringBuilder entityStringBuilder;
HttpGet get = new HttpGet(http);
BufferedReader bufferedReader = null;
CloseableHttpResponse httpResponse = null;
try {
httpResponse = httpClient.execute(get);
HttpEntity entity = httpResponse.getEntity();
entityStringBuilder = new StringBuilder();
if (null != entity) {
bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024);
int intC;
while ((intC = bufferedReader.read()) != -1) {
char c = (char) intC;
if (c == '\n') {
break;
}
entityStringBuilder.append(c);
}
return entityStringBuilder.toString();
}
} catch (IOException e) {
logger.error("Get Schema from Query engine ERROR! Exception message is:" + e);
} finally {
if (httpClient != null) {
try {
httpClient.close();
} catch (IOException e) {
logger.error("Close HTTP Client ERROR! Exception messgae is:" + e);
}
}
if (httpResponse != null) {
try {
httpResponse.close();
} catch (IOException e) {
logger.error("Close httpResponse ERROR! Exception messgae is:" + e);
}
}
if (bufferedReader != null) {
IOUtils.closeQuietly(bufferedReader);
}
}
return "";
}
}

View File

@@ -0,0 +1,372 @@
package com.zdjizhi.utils.json;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.StringUtil;
import net.sf.cglib.beans.BeanMap;
import java.util.*;
import java.util.concurrent.Executor;
/**
* 使用FastJson解析json的工具类
*
* @author qidaijie
*/
public class JsonParseUtil {
private static final Log logger = LogFactory.get();
private static Properties propNacos = new Properties();
/**
* 获取需要删除字段的列表
*/
private static ArrayList<String> dropList = new ArrayList<>();
/**
* 在内存中加载反射类用的map
*/
private static HashMap<String, Class> jsonFieldsMap;
/**
* 获取任务列表
* list的每个元素是一个四元字符串数组 (有format标识的字段补全的字段用到的功能函数用到的参数),例如:
* (mail_subject mail_subject decode_of_base64 mail_subject_charset)
*/
private static ArrayList<String[]> jobList;
static {
propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, FlowWriteConfig.NACOS_SERVER);
propNacos.setProperty(PropertyKeyConst.NAMESPACE, FlowWriteConfig.NACOS_SCHEMA_NAMESPACE);
propNacos.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME);
propNacos.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN);
try {
ConfigService configService = NacosFactory.createConfigService(propNacos);
String dataId = FlowWriteConfig.NACOS_DATA_ID;
String group = FlowWriteConfig.NACOS_GROUP;
String schema = configService.getConfig(dataId, group, 5000);
if (StringUtil.isNotBlank(schema)) {
jsonFieldsMap = getMapFromHttp(schema);
jobList = getJobListFromHttp(schema);
}
configService.addListener(dataId, group, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configMsg) {
if (StringUtil.isNotBlank(configMsg)) {
clearCache();
jsonFieldsMap = getMapFromHttp(configMsg);
jobList = getJobListFromHttp(configMsg);
}
}
});
} catch (NacosException e) {
logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage());
}
}
/**
* 模式匹配,给定一个类型字符串返回一个类类型
*
* @param type 类型
* @return 类类型
*/
private static Class getClassName(String type) {
Class clazz;
switch (type) {
case "int":
clazz = Integer.class;
break;
case "string":
clazz = String.class;
break;
case "long":
clazz = long.class;
break;
case "array":
clazz = List.class;
break;
case "double":
clazz = double.class;
break;
case "float":
clazz = float.class;
break;
case "char":
clazz = char.class;
break;
case "byte":
clazz = byte.class;
break;
case "boolean":
clazz = boolean.class;
break;
case "short":
clazz = short.class;
break;
default:
clazz = String.class;
}
return clazz;
}
/**
* 获取属性值的方法
*
* @param obj 对象
* @param property key
* @return 属性的值
*/
public static Object getValue(Object obj, String property) {
try {
BeanMap beanMap = BeanMap.create(obj);
return beanMap.get(property);
} catch (RuntimeException e) {
logger.error("获取json-value异常异常key" + property + "异常信息为:" + e);
return null;
}
}
/**
* 获取属性值的方法
*
* @param jsonMap 原始日志
* @param property key
* @return 属性的值
*/
public static Object getValue(Map<String, Object> jsonMap, String property) {
try {
return jsonMap.getOrDefault(property, null);
} catch (RuntimeException e) {
logger.error("获取json-value异常异常key" + property + "异常信息为:" + e);
return null;
}
}
/**
* 更新属性值的方法
*
* @param jsonMap 原始日志json map
* @param property 更新的key
* @param value 更新的值
*/
public static void setValue(Map<String, Object> jsonMap, String property, Object value) {
try {
jsonMap.put(property, value);
} catch (RuntimeException e) {
logger.error("赋予实体类错误类型数据", e);
}
}
/**
* 更新属性值的方法
*
* @param obj 对象
* @param property 更新的key
* @param value 更新的值
*/
public static void setValue(Object obj, String property, Object value) {
try {
BeanMap beanMap = BeanMap.create(obj);
beanMap.put(property, value);
} catch (ClassCastException e) {
logger.error("赋予实体类错误类型数据", e);
}
}
/**
* 类型转换
*
* @param jsonMap 原始日志map
*/
public static Map<String, Object> typeTransform(Map<String, Object> jsonMap) throws RuntimeException {
JsonParseUtil.dropJsonField(jsonMap);
HashMap<String, Object> tmpMap = new HashMap<>(192);
for (String key : jsonMap.keySet()) {
if (jsonFieldsMap.containsKey(key)) {
String simpleName = jsonFieldsMap.get(key).getSimpleName();
switch (simpleName) {
case "String":
tmpMap.put(key, JsonTypeUtil.checkString(jsonMap.get(key)));
break;
case "Integer":
tmpMap.put(key, JsonTypeUtil.getIntValue(jsonMap.get(key)));
break;
case "long":
tmpMap.put(key, JsonTypeUtil.checkLongValue(jsonMap.get(key)));
break;
case "List":
tmpMap.put(key, JsonTypeUtil.checkArray(jsonMap.get(key)));
break;
case "Map":
tmpMap.put(key, JsonTypeUtil.checkObject(jsonMap.get(key)));
break;
case "double":
tmpMap.put(key, JsonTypeUtil.checkDouble(jsonMap.get(key)));
break;
default:
tmpMap.put(key, JsonTypeUtil.checkString(jsonMap.get(key)));
}
}
}
return tmpMap;
}
public static ArrayList<String[]> getJobList() {
return jobList;
}
/**
* 通过获取String类型的网关schema链接来获取map用于生成一个Object类型的对象
* <p>
* // * @param http 网关schema地址
*
* @return 用于反射生成schema类型的对象的一个map集合
*/
private static HashMap<String, Class> getMapFromHttp(String schema) {
HashMap<String, Class> map = new HashMap<>(16);
//获取fields并转化为数组数组的每个元素都是一个name doc type
JSONObject schemaJson = JSON.parseObject(schema);
JSONArray fields = (JSONArray) schemaJson.get("fields");
for (Object field : fields) {
String filedStr = field.toString();
if (checkKeepField(filedStr)) {
String name = JsonPath.read(filedStr, "$.name").toString();
String type = JsonPath.read(filedStr, "$.type").toString();
if (type.contains("{")) {
type = JsonPath.read(filedStr, "$.type.type").toString();
}
//组合用来生成实体类的map
map.put(name, getClassName(type));
} else {
dropList.add(filedStr);
}
}
return map;
}
/**
* 判断字段是否需要保留
*
* @param message 单个field-json
* @return true or false
*/
private static boolean checkKeepField(String message) {
boolean isKeepField = true;
boolean isHiveDoc = JSON.parseObject(message).containsKey("doc");
if (isHiveDoc) {
boolean isHiveVi = JsonPath.read(message, "$.doc").toString().contains("visibility");
if (isHiveVi) {
String visibility = JsonPath.read(message, "$.doc.visibility").toString();
if (FlowWriteConfig.VISIBILITY.equals(visibility)) {
isKeepField = false;
}
}
}
return isKeepField;
}
/**
* 删除schema内指定的无效字段jackson
*
* @param jsonMap
*/
public static void dropJsonField(Map<String, Object> jsonMap) {
for (String field : dropList) {
jsonMap.remove(field);
}
}
/**
* 解析schema解析之后返回一个任务列表 (useList toList funcList paramlist)
*
* @param schema 日志schema
* @return 任务列表
*/
private static ArrayList<String[]> getJobListFromHttp(String schema) {
ArrayList<String[]> list = new ArrayList<>();
//获取fields并转化为数组数组的每个元素都是一个name doc type
JSONObject schemaJson = JSON.parseObject(schema);
JSONArray fields = (JSONArray) schemaJson.get("fields");
for (Object field : fields) {
if (JSON.parseObject(field.toString()).containsKey("doc")) {
Object doc = JSON.parseObject(field.toString()).get("doc");
if (JSON.parseObject(doc.toString()).containsKey("format")) {
String name = JSON.parseObject(field.toString()).get("name").toString();
Object format = JSON.parseObject(doc.toString()).get("format");
JSONObject formatObject = JSON.parseObject(format.toString());
String functions = formatObject.get("functions").toString();
String appendTo = null;
String params = null;
if (formatObject.containsKey("appendTo")) {
appendTo = formatObject.get("appendTo").toString();
}
if (formatObject.containsKey("param")) {
params = formatObject.get("param").toString();
}
if (StringUtil.isNotBlank(appendTo) && StringUtil.isBlank(params)) {
String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER);
String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER);
for (int i = 0; i < functionArray.length; i++) {
list.add(new String[]{name, appendToArray[i], functionArray[i], null});
}
} else if (StringUtil.isNotBlank(appendTo) && StringUtil.isNotBlank(params)) {
String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER);
String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER);
String[] paramArray = params.split(FlowWriteConfig.FORMAT_SPLITTER);
for (int i = 0; i < functionArray.length; i++) {
list.add(new String[]{name, appendToArray[i], functionArray[i], paramArray[i]});
}
} else {
list.add(new String[]{name, name, functions, params});
}
}
}
}
return list;
}
/**
* 在配置变动时,清空缓存重新获取
*/
private static void clearCache() {
jobList.clear();
jsonFieldsMap.clear();
dropList.clear();
}
}

View File

@@ -0,0 +1,129 @@
package com.zdjizhi.utils.json;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.exception.FlowWriteException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author qidaijie
* @Package PACKAGE_NAME
* @Description:
* @date 2021/7/1217:34
*/
public class JsonTypeUtil {
/**
* 类型转换
*
* @param jsonMap 原始日志map
*/
/**
* String 类型检验转换方法
*
* @param value json value
* @return String value
*/
static String checkString(Object value) {
if (value == null) {
return null;
}
if (value instanceof Map) {
return JsonMapper.toJsonString(value);
}
if (value instanceof List) {
return JsonMapper.toJsonString(value);
}
return value.toString();
}
/**
* array 类型检验转换方法
*
* @param value json value
* @return List value
*/
static Map checkObject(Object value) {
if (value == null) {
return null;
}
if (value instanceof Map) {
return (Map) value;
}
throw new FlowWriteException("can not cast to map, value : " + value);
}
/**
* array 类型检验转换方法
*
* @param value json value
* @return List value
*/
static List checkArray(Object value) {
if (value == null) {
return null;
}
if (value instanceof List) {
return (List) value;
}
throw new FlowWriteException("can not cast to List, value : " + value);
}
/**
* long 类型检验转换方法,若为空返回基础值
*
* @param value json value
* @return Long value
*/
static long checkLongValue(Object value) {
Long longVal = TypeUtils.castToLong(value);
if (longVal == null) {
return 0L;
}
return longVal;
}
/**
* Double 类型校验转换方法
*
* @param value json value
* @return Double value
*/
static Double checkDouble(Object value) {
if (value == null) {
return null;
}
return TypeUtils.castToDouble(value);
}
/**
* int 类型检验转换方法,若为空返回基础值
*
* @param value json value
* @return int value
*/
static int getIntValue(Object value) {
Integer intVal = TypeUtils.castToInt(value);
if (intVal == null) {
return 0;
}
return intVal;
}
}

View File

@@ -0,0 +1,171 @@
package com.zdjizhi.utils.json;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.exception.FlowWriteException;
/**
* @author qidaijie
* @Package PACKAGE_NAME
* @Description:
* @date 2021/7/1218:20
*/
public class TypeUtils {
private static final Log logger = LogFactory.get();
/**
* Integer 类型判断方法
*
* @param value json value
* @return Integer value or null
*/
public static Object castToIfFunction(Object value) {
if (value == null) {
return null;
}
if (value instanceof String) {
return value.toString();
}
if (value instanceof Integer) {
return ((Number) value).intValue();
}
if (value instanceof Long) {
return ((Number) value).longValue();
}
// if (value instanceof Map) {
// return (Map) value;
// }
//
// if (value instanceof List) {
// return Collections.singletonList(value.toString());
// }
if (value instanceof Boolean) {
return (Boolean) value ? 1 : 0;
}
throw new FlowWriteException("can not cast to int, value : " + value);
}
/**
* Integer 类型判断方法
*
* @param value json value
* @return Integer value or null
*/
static Integer castToInt(Object value) {
if (value == null) {
return null;
}
if (value instanceof Integer) {
return (Integer) value;
}
if (value instanceof Number) {
return ((Number) value).intValue();
}
if (value instanceof String) {
String strVal = (String) value;
if (StringUtil.isBlank(strVal)) {
return null;
}
//将 10,20 类数据转换为10
if (strVal.contains(FlowWriteConfig.FORMAT_SPLITTER)) {
strVal = strVal.split(FlowWriteConfig.FORMAT_SPLITTER)[0];
}
try {
return Integer.parseInt(strVal);
} catch (NumberFormatException ex) {
logger.error("String change Integer Error,The error Str is:" + strVal);
}
}
if (value instanceof Boolean) {
return (Boolean) value ? 1 : 0;
}
throw new FlowWriteException("can not cast to int, value : " + value);
}
/**
* Double类型判断方法
*
* @param value json value
* @return double value or null
*/
static Double castToDouble(Object value) {
if (value instanceof Number) {
return ((Number) value).doubleValue();
}
if (value instanceof String) {
String strVal = (String) value;
if (StringUtil.isBlank(strVal)) {
return null;
}
//将 10,20 类数据转换为10
if (strVal.contains(FlowWriteConfig.FORMAT_SPLITTER)) {
strVal = strVal.split(FlowWriteConfig.FORMAT_SPLITTER)[0];
}
try {
return Double.parseDouble(strVal);
} catch (NumberFormatException ex) {
logger.error("String change Double Error,The error Str is:" + strVal);
}
}
throw new FlowWriteException("can not cast to double, value : " + value);
}
/**
* Long类型判断方法
*
* @param value json value
* @return (Long)value or null
*/
static Long castToLong(Object value) {
if (value == null) {
return null;
}
if (value instanceof Number) {
return ((Number) value).longValue();
}
if (value instanceof String) {
String strVal = (String) value;
if (StringUtil.isBlank(strVal)) {
return null;
}
//将 10,20 类数据转换为10
if (strVal.contains(FlowWriteConfig.FORMAT_SPLITTER)) {
strVal = strVal.split(FlowWriteConfig.FORMAT_SPLITTER)[0];
}
try {
return Long.parseLong(strVal);
} catch (NumberFormatException ex) {
logger.error("String change Long Error,The error Str is:" + strVal);
}
}
throw new FlowWriteException("can not cast to long, value : " + value);
}
}

View File

@@ -0,0 +1,48 @@
package com.zdjizhi.utils.kafka;
import com.zdjizhi.common.FlowWriteConfig;
import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.kafka
* @Description:
* @date 2021/9/610:37
*/
class CertUtils {
/**
* Kafka SASL认证端口
*/
private static final String SASL_PORT = "9094";
/**
* Kafka SSL认证端口
*/
private static final String SSL_PORT = "9095";
/**
* 根据连接信息端口判断认证方式。
*
* @param servers kafka 连接信息
* @param properties kafka 连接配置信息
*/
static void chooseCert(String servers, Properties properties) {
if (servers.contains(SASL_PORT)) {
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ FlowWriteConfig.KAFKA_SASL_JAAS_USER + " password=" + FlowWriteConfig.KAFKA_SASL_JAAS_PIN + ";");
} else if (servers.contains(SSL_PORT)) {
properties.put("security.protocol", "SSL");
properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
properties.put("ssl.keystore.location", FlowWriteConfig.TOOLS_LIBRARY + "keystore.jks");
properties.put("ssl.keystore.password", FlowWriteConfig.KAFKA_SASL_JAAS_PIN);
properties.put("ssl.truststore.location", FlowWriteConfig.TOOLS_LIBRARY + "truststore.jks");
properties.put("ssl.truststore.password", FlowWriteConfig.KAFKA_SASL_JAAS_PIN);
properties.put("ssl.key.password", FlowWriteConfig.KAFKA_SASL_JAAS_PIN);
}
}
}

View File

@@ -0,0 +1,74 @@
package com.zdjizhi.utils.kafka;
import com.zdjizhi.common.FlowWriteConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import java.util.Map;
import java.util.Properties;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.kafka
* @Description:
* @date 2021/6/813:54
*/
public class KafkaConsumer {
private static Properties createConsumerConfig() {
Properties properties = new Properties();
properties.put("bootstrap.servers", FlowWriteConfig.SOURCE_KAFKA_SERVERS);
properties.put("group.id", FlowWriteConfig.GROUP_ID);
properties.put("session.timeout.ms", FlowWriteConfig.SESSION_TIMEOUT_MS);
properties.put("max.poll.records", FlowWriteConfig.MAX_POLL_RECORDS);
properties.put("max.partition.fetch.bytes", FlowWriteConfig.MAX_PARTITION_FETCH_BYTES);
properties.put("partition.discovery.interval.ms", "10000");
CertUtils.chooseCert(FlowWriteConfig.SOURCE_KAFKA_SERVERS, properties);
return properties;
}
/**
* 用户序列化kafka数据增加 kafka Timestamp内容。
*
* @return kafka logs -> map
*/
@SuppressWarnings("unchecked")
public static FlinkKafkaConsumer<Map<String, Object>> myDeserializationConsumer() {
FlinkKafkaConsumer<Map<String, Object>> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC,
new TimestampDeserializationSchema(), createConsumerConfig());
//随着checkpoint提交将offset提交到kafka
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
//从消费组当前的offset开始消费
kafkaConsumer.setStartFromGroupOffsets();
return kafkaConsumer;
}
/**
* 官方序列化kafka数据
*
* @return kafka logs
*/
public static FlinkKafkaConsumer<String> flinkConsumer() {
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC,
new SimpleStringSchema(), createConsumerConfig());
//随着checkpoint提交将offset提交到kafka
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
//从消费组当前的offset开始消费
kafkaConsumer.setStartFromGroupOffsets();
return kafkaConsumer;
}
}

View File

@@ -0,0 +1,82 @@
package com.zdjizhi.utils.kafka;
import com.zdjizhi.common.FlowWriteConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Optional;
import java.util.Properties;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.kafka
* @Description:
* @date 2021/6/814:04
*/
public class KafkaProducer {
private static Properties createPercentProducerConfig() {
Properties properties = new Properties();
properties.put("bootstrap.servers", FlowWriteConfig.PERCENT_SINK_KAFKA_SERVERS);
properties.put("acks", FlowWriteConfig.PRODUCER_ACK);
properties.put("retries", FlowWriteConfig.RETRIES);
properties.put("linger.ms", FlowWriteConfig.LINGER_MS);
properties.put("request.timeout.ms", FlowWriteConfig.REQUEST_TIMEOUT_MS);
properties.put("batch.size", FlowWriteConfig.BATCH_SIZE);
properties.put("buffer.memory", FlowWriteConfig.BUFFER_MEMORY);
properties.put("max.request.size", FlowWriteConfig.MAX_REQUEST_SIZE);
properties.put("compression.type", FlowWriteConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
CertUtils.chooseCert(FlowWriteConfig.PERCENT_SINK_KAFKA_SERVERS, properties);
return properties;
}
private static Properties createTrafficFileMetaProducerConfig() {
Properties properties = new Properties();
properties.put("bootstrap.servers", FlowWriteConfig.FILE_DATA_SINK_KAFKA_SERVERS);
properties.put("acks", FlowWriteConfig.PRODUCER_ACK);
properties.put("retries", FlowWriteConfig.RETRIES);
properties.put("linger.ms", FlowWriteConfig.LINGER_MS);
properties.put("request.timeout.ms", FlowWriteConfig.REQUEST_TIMEOUT_MS);
properties.put("batch.size", FlowWriteConfig.BATCH_SIZE);
properties.put("buffer.memory", FlowWriteConfig.BUFFER_MEMORY);
properties.put("max.request.size", FlowWriteConfig.MAX_REQUEST_SIZE);
properties.put("compression.type", FlowWriteConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
CertUtils.chooseCert(FlowWriteConfig.FILE_DATA_SINK_KAFKA_SERVERS, properties);
return properties;
}
public static FlinkKafkaProducer<String> getPercentKafkaProducer() {
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
FlowWriteConfig.PERCENT_KAFKA_TOPIC,
new SimpleStringSchema(),
createPercentProducerConfig(), Optional.empty());
kafkaProducer.setLogFailuresOnly(false);
// kafkaProducer.setWriteTimestampToKafka(true);
return kafkaProducer;
}
public static FlinkKafkaProducer<String> getTrafficFileMetaKafkaProducer() {
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
FlowWriteConfig.FILE_DATA_SINK_KAFKA_TOPIC,
new SimpleStringSchema(),
createTrafficFileMetaProducerConfig(), Optional.empty());
kafkaProducer.setLogFailuresOnly(false);
// kafkaProducer.setWriteTimestampToKafka(true);
return kafkaProducer;
}
}

View File

@@ -0,0 +1,48 @@
package com.zdjizhi.utils.kafka;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.JsonMapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.Map;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.kafka
* @Description:
* @date 2022/3/89:42
*/
public class TimestampDeserializationSchema implements KafkaDeserializationSchema {
private static final Log logger = LogFactory.get();
@Override
public TypeInformation getProducedType() {
return TypeInformation.of(Map.class);
}
@Override
public boolean isEndOfStream(Object nextElement) {
return false;
}
@Override
@SuppressWarnings("unchecked")
public Map<String, Object> deserialize(ConsumerRecord record) throws Exception {
if (record != null) {
try {
long timestamp = record.timestamp() / 1000;
String value = new String((byte[]) record.value(), FlowWriteConfig.ENCODING);
Map<String, Object> json = (Map<String, Object>) JsonMapper.fromJsonString(value, Map.class);
json.put("common_ingestion_time", timestamp);
return json;
} catch (RuntimeException e) {
logger.error("KafkaConsumer Deserialize failed,The exception is : " + e.getMessage());
}
}
return null;
}
}

View File

@@ -0,0 +1,64 @@
package com.zdjizhi.utils.ordinary;
import org.apache.log4j.Logger;
import java.security.MessageDigest;
/**
* 描述:转换MD5工具类
*
* @author Administrator
* @create 2018-08-13 15:11
*/
public class MD5Utils {
private static Logger logger = Logger.getLogger(MD5Utils.class);
public static String md5Encode(String msg) throws Exception {
try {
byte[] msgBytes = msg.getBytes("utf-8");
/*
* 声明使用Md5算法,获得MessaDigest对象
*/
MessageDigest md5 = MessageDigest.getInstance("MD5");
/*
* 使用指定的字节更新摘要
*/
md5.update(msgBytes);
/*
* 完成哈希计算,获得密文
*/
byte[] digest = md5.digest();
/*
* 以上两行代码等同于
* byte[] digest = md5.digest(msgBytes);
*/
return byteArr2hexString(digest);
} catch (Exception e) {
logger.error("Error in conversion MD5! " + msg);
// e.printStackTrace();
return "";
}
}
/**
* 将byte数组转化为16进制字符串形式
*
* @param bys 字节数组
* @return 字符串
*/
public static String byteArr2hexString(byte[] bys) {
StringBuffer hexVal = new StringBuffer();
int val = 0;
for (byte by : bys) {
//将byte转化为int 如果byte是一个负数就必须要和16进制的0xff做一次与运算
val = ((int) by) & 0xff;
if (val < 16) {
hexVal.append("0");
}
hexVal.append(Integer.toHexString(val));
}
return hexVal.toString();
}
}

View File

@@ -0,0 +1,84 @@
package com.zdjizhi.utils.system;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.StringUtil;
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
/**
* @author Administrator
*/
public final class FlowWriteConfigurations {
private static Properties propKafka = new Properties();
private static Properties propService = new Properties();
private static Properties propfiletype = new Properties();
private static Map<String, String> fileTypeMap;
public static boolean judgeFileType(String filetype){
return fileTypeMap.containsKey(filetype);
}
public static String getStringProperty(Integer type, String key) {
if (type == 0) {
return propService.getProperty(key);
} else if (type == 1) {
return propKafka.getProperty(key);
} else {
return null;
}
}
public static Integer getIntProperty(Integer type, String key) {
if (type == 0) {
return Integer.parseInt(propService.getProperty(key));
} else if (type == 1) {
return Integer.parseInt(propKafka.getProperty(key));
} else {
return null;
}
}
public static Long getLongProperty(Integer type, String key) {
if (type == 0) {
return Long.parseLong(propService.getProperty(key));
} else if (type == 1) {
return Long.parseLong(propKafka.getProperty(key));
} else {
return null;
}
}
public static Boolean getBooleanProperty(Integer type, String key) {
if (type == 0) {
return StringUtil.equals(propService.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
} else if (type == 1) {
return StringUtil.equals(propKafka.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
} else {
return null;
}
}
static {
try {
propService.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
propKafka.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties"));
propfiletype.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("file_type.properties"));
fileTypeMap = new HashMap<String, String>((Map) propfiletype);
} catch (IOException | RuntimeException e) {
propKafka = null;
propService = null;
}
}
}

View File

@@ -0,0 +1,190 @@
package com.zdjizhi.utils.zookeeper;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* @author qidaijie
*/
public class DistributedLock implements Lock, Watcher {
private static final Log logger = LogFactory.get();
private ZooKeeper zk = null;
/**
* 根节点
*/
private final String ROOT_LOCK = "/locks";
/**
* 竞争的资源
*/
private String lockName;
/**
* 等待的前一个锁
*/
private String waitLock;
/**
* 当前锁
*/
private String currentLock;
/**
* 计数器
*/
private CountDownLatch countDownLatch;
private int sessionTimeout = 2000;
private List<Exception> exceptionList = new ArrayList<Exception>();
/**
* 配置分布式锁
*
* @param config 连接的url
* @param lockName 竞争资源
*/
public DistributedLock(String config, String lockName) {
this.lockName = lockName;
try {
// 连接zookeeper
zk = new ZooKeeper(config, sessionTimeout, this);
Stat stat = zk.exists(ROOT_LOCK, false);
if (stat == null) {
// 如果根节点不存在,则创建根节点
zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (IOException | InterruptedException | KeeperException e) {
logger.error("Node already exists!");
}
}
// 节点监视器
@Override
public void process(WatchedEvent event) {
if (this.countDownLatch != null) {
this.countDownLatch.countDown();
}
}
@Override
public void lock() {
if (exceptionList.size() > 0) {
throw new LockException(exceptionList.get(0));
}
try {
if (this.tryLock()) {
logger.info(Thread.currentThread().getName() + " " + lockName + "获得了锁");
} else {
// 等待锁
waitForLock(waitLock, sessionTimeout);
}
} catch (InterruptedException | KeeperException e) {
logger.error("获取锁异常" + e);
}
}
@Override
public boolean tryLock() {
try {
String splitStr = "_lock_";
if (lockName.contains(splitStr)) {
throw new LockException("锁名有误");
}
// 创建临时有序节点
currentLock = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 取所有子节点
List<String> subNodes = zk.getChildren(ROOT_LOCK, false);
// 取出所有lockName的锁
List<String> lockObjects = new ArrayList<String>();
for (String node : subNodes) {
String tmpNode = node.split(splitStr)[0];
if (tmpNode.equals(lockName)) {
lockObjects.add(node);
}
}
Collections.sort(lockObjects);
// 若当前节点为最小节点,则获取锁成功
if (currentLock.equals(ROOT_LOCK + "/" + lockObjects.get(0))) {
return true;
}
// 若不是最小节点,则找到自己的前一个节点
String prevNode = currentLock.substring(currentLock.lastIndexOf("/") + 1);
waitLock = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1);
} catch (InterruptedException | KeeperException e) {
logger.error("获取锁过程异常" + e);
}
return false;
}
@Override
public boolean tryLock(long timeout, TimeUnit unit) {
try {
if (this.tryLock()) {
return true;
}
return waitForLock(waitLock, timeout);
} catch (KeeperException | InterruptedException | RuntimeException e) {
logger.error("判断是否锁定异常" + e);
}
return false;
}
// 等待锁
private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException {
Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true);
if (stat != null) {
this.countDownLatch = new CountDownLatch(1);
// 计数等待若等到前一个节点消失则precess中进行countDown停止等待获取锁
this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
this.countDownLatch = null;
}
return true;
}
@Override
public void unlock() {
try {
zk.delete(currentLock, -1);
currentLock = null;
zk.close();
} catch (InterruptedException | KeeperException e) {
logger.error("关闭锁异常" + e);
}
}
@Override
public Condition newCondition() {
return null;
}
@Override
public void lockInterruptibly() throws InterruptedException {
this.lock();
}
public class LockException extends RuntimeException {
private static final long serialVersionUID = 1L;
public LockException(String e) {
super(e);
}
public LockException(Exception e) {
super(e);
}
}
}

View File

@@ -0,0 +1,140 @@
package com.zdjizhi.utils.zookeeper;
import cn.hutool.core.util.StrUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @author qidaijie
* @Package cn.ac.iie.utils.zookeeper
* @Description:
* @date 2020/11/1411:28
*/
public class ZookeeperUtils implements Watcher {
private static final Log logger = LogFactory.get();
private static final int ID_MAX = 255;
private ZooKeeper zookeeper;
private static final int SESSION_TIME_OUT = 20000;
private CountDownLatch countDownLatch = new CountDownLatch(1);
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
}
/**
* 修改节点信息
*
* @param path 节点路径
*/
public int modifyNode(String path, String zookeeperIp) {
createNode(path, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, zookeeperIp);
int workerId = 0;
try {
connectZookeeper(zookeeperIp);
Stat stat = zookeeper.exists(path, true);
workerId = Integer.parseInt(getNodeDate(path));
if (workerId > ID_MAX) {
workerId = 0;
zookeeper.setData(path, "1".getBytes(), stat.getVersion());
} else {
String result = String.valueOf(workerId + 1);
if (stat != null) {
zookeeper.setData(path, result.getBytes(), stat.getVersion());
} else {
logger.error("Node does not exist!,Can't modify");
}
}
} catch (KeeperException | InterruptedException e) {
logger.error("modify error Can't modify," + e);
} finally {
closeConn();
}
logger.warn("workerID is" + workerId);
return workerId;
}
/**
* 连接zookeeper
*
* @param host 地址
*/
public void connectZookeeper(String host) {
try {
zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this);
countDownLatch.await();
} catch (IOException | InterruptedException e) {
logger.error("Connection to the Zookeeper Exception! message:" + e);
}
}
/**
* 关闭连接
*/
public void closeConn() {
try {
if (zookeeper != null) {
zookeeper.close();
}
} catch (InterruptedException e) {
logger.error("Close the Zookeeper connection Exception! message:" + e);
}
}
/**
* 获取节点内容
*
* @param path 节点路径
* @return 内容/异常null
*/
public String getNodeDate(String path) {
String result = null;
Stat stat = new Stat();
try {
byte[] resByte = zookeeper.getData(path, true, stat);
result = StrUtil.str(resByte, "UTF-8");
} catch (KeeperException | InterruptedException e) {
logger.error("Get node information exception" + e);
}
return result;
}
/**
* @param path 节点创建的路径
* @param date 节点所存储的数据的byte[]
* @param acls 控制权限策略
*/
public void createNode(String path, byte[] date, List<ACL> acls, String zookeeperIp) {
try {
connectZookeeper(zookeeperIp);
Stat exists = zookeeper.exists(path, true);
if (exists == null) {
Stat existsSnowflakeld = zookeeper.exists("/Snowflake", true);
if (existsSnowflakeld == null) {
zookeeper.create("/Snowflake", null, acls, CreateMode.PERSISTENT);
}
zookeeper.create(path, date, acls, CreateMode.PERSISTENT);
} else {
logger.warn("Node already exists ! Don't need to create");
}
} catch (KeeperException | InterruptedException e) {
logger.error(e);
} finally {
closeConn();
}
}
}

25
src/main/log4j.properties Normal file
View File

@@ -0,0 +1,25 @@
#Log4j
log4j.rootLogger=info,console,file
# 控制台日志设置
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=info
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
# 文件日志设置
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.Threshold=info
log4j.appender.file.encoding=UTF-8
log4j.appender.file.Append=true
#路径请用相对路径,做好相关测试输出到应用目下
log4j.appender.file.file=${nis.root}/log/galaxy-name.log
log4j.appender.file.DatePattern='.'yyyy-MM-dd
log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
#MyBatis 配置com.nis.web.dao是mybatis接口所在包
log4j.logger.com.nis.web.dao=debug
#bonecp数据源配置
log4j.category.com.jolbox=debug,console

42
src/main/logback.xml Normal file
View File

@@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- 格式化输出:%date表示日期%thread表示线程名%-5level级别从左显示5个字符宽度 %msg日志消息%n是换行符-->
<property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" />
<!-- 定义日志存储的路径,不要配置相对路径 -->
<property name="LOG_FILE_PATH" value="E:/logs/demo.%d{yyyy-MM-dd}.%i.log" />
<!-- 控制台输出日志 -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<!-- 按照上面配置的LOG_PATTERN来打印日志 -->
<pattern>${LOG_PATTERN}</pattern>
</encoder>
</appender>
<!--每天生成一个日志文件保存30天的日志文件。rollingFile是用来切分文件的 -->
<appender name="FILE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_FILE_PATH}</fileNamePattern>
<!-- keep 15 days' worth of history -->
<maxHistory>30</maxHistory>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<!-- 日志文件的最大大小 -->
<maxFileSize>20MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
</rollingPolicy>
<encoder>
<pattern>${LOG_PATTERN}</pattern>
</encoder>
</appender>
<!-- project default level项目输出的日志级别 -->
<logger name="com.example.demo" level="DEBUG" />
<!-- 日志输出级别 常用的日志级别按照从高到低依次为ERROR、WARN、INFO、DEBUG。 -->
<root level="INFO">
<appender-ref ref="CONSOLE" />
<appender-ref ref="FILE" /><!--对应appender name="FILE"。 -->
</root>
</configuration>