diff --git a/pom.xml b/pom.xml
index 1b8d17f..43aad85 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,10 +5,10 @@
4.0.0
com.zdjizhi
- radius-relationship-hbase
- 210908-security
+ radius-relation
+ 21-12-06
- radius-relationship-hbase
+ radius-relation
http://www.example.com
@@ -38,78 +38,10 @@
2.7.1
1.0.0
2.2.3
-
- compile
+ provided
+
-
-
-
-
- org.apache.maven.plugins
- maven-shade-plugin
- 2.4.2
-
-
- package
-
- shade
-
-
-
-
- com.zdjizhi.topology.RadiusRelationshipTopology
-
-
-
-
-
-
-
-
- io.github.zlika
- reproducible-build-maven-plugin
- 0.2
-
-
-
- strip-jar
-
- package
-
-
-
-
-
- org.apache.maven.plugins
- maven-compiler-plugin
- 2.3.2
-
- 1.8
- 1.8
-
-
-
-
-
- properties
-
- **/*.properties
- **/*.xml
-
- false
-
-
-
- src\main\java
-
- log4j.properties
-
- false
-
-
-
@@ -135,15 +67,6 @@
1.2.70
-
-
-
-
-
-
-
-
-
org.apache.flink
@@ -174,7 +97,6 @@
org.apache.flink
flink-connector-kafka_2.12
${flink.version}
- ${scope.type}
@@ -199,6 +121,22 @@
log4j-over-slf4j
org.slf4j
+
+ commons-io
+ commons-io
+
+
+ commons-lang3
+ org.apache.commons
+
+
+ netty
+ io.netty
+
+
+ netty-all
+ io.netty
+
@@ -283,5 +221,67 @@
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.8.0
+
+ 1.8
+ 1.8
+
+ false
+
+
+ -Xpkginfo:always
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+ shade
+ package
+
+ shade
+
+
+ radius-relation-21-12-06
+
+
+
+
+ com.zdjizhi.topology.RadiusRelation
+
+
+
+
+
+
+
+
+
+ properties
+
+ **/*.properties
+ **/*.xml
+
+ false
+
+
+
+ src\main\java
+
+ log4j.properties
+
+ false
+
+
+
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 62215cf..15aca81 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,10 +1,12 @@
#--------------------------------地址配置------------------------------#
#管理kafka地址
-input.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
+#input.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
+input.kafka.servers=192.168.44.85:9094
#hbase zookeeper地址 用于连接HBase
-hbase.zookeeper.servers=192.168.44.12:2181
+#hbase.zookeeper.servers=192.168.44.12
+hbase.zookeeper.servers=192.168.44.85:2181
#--------------------------------Kafka消费组信息------------------------------#
@@ -12,17 +14,13 @@ hbase.zookeeper.servers=192.168.44.12:2181
input.kafka.topic=RADIUS-RECORD
#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
-group.id=radius-flink-202110270887999888997874
+group.id=radius-flink-202112068
#--------------------------------topology配置------------------------------#
-
-#hbase 更新时间,如填写0则不更新缓存
-hbase.tick.tuple.freq.secs=60
-
-#hbase table name
-hbase.table.name=sub:subscriber_info
+#ip-account对应关系表
+hbase.framedip.table.name=tsg_galaxy:relation_framedip_account
#定位库地址
tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
-
+#account-ip对应关系表
hbase.account.table.name=tsg_galaxy:relation_account_framedip
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java b/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java
index 48b6eb8..265d9d6 100644
--- a/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java
+++ b/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java
@@ -6,11 +6,17 @@ import com.zdjizhi.utils.system.RadiusRelationshipConfigurations;
/**
* @author Administrator
*/
-public class RadiusRelationshipConfig {
+public class RadiusRelationshipConfig {
+
+
/**
* 4- Accounting-Request(账户授权)
*/
+
+
public static final int ACCOUNTING_REQUEST = 4;
+
+
/**
* radius_packet_type
*/
@@ -19,6 +25,9 @@ public class RadiusRelationshipConfig {
* 1、开始计费
*/
public static final int START_BILLING = 1;
+
+ public static final int UPDATE_BILLING = 3;
+
/**
* radius_acct_status_type
*/
@@ -27,8 +36,7 @@ public class RadiusRelationshipConfig {
/**
* System
*/
- public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = RadiusRelationshipConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
- public static final String HBASE_TABLE_NAME = RadiusRelationshipConfigurations.getStringProperty(0, "hbase.table.name");
+ public static final String HBASE_FRAMEDIP_TABLE_NAME = RadiusRelationshipConfigurations.getStringProperty(0, "hbase.framedip.table.name");
public static final String HBASE_ACCOUNT_TABLE_NAME = RadiusRelationshipConfigurations.getStringProperty(0, "hbase.account.table.name");
@@ -37,6 +45,9 @@ public class RadiusRelationshipConfig {
*/
public static final String INPUT_KAFKA_SERVERS = RadiusRelationshipConfigurations.getStringProperty(0, "input.kafka.servers");
public static final String HBASE_ZOOKEEPER_SERVERS = RadiusRelationshipConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
+ //public static final String HBASE_ZOOKEEPER_PORT = RadiusRelationshipConfigurations.getStringProperty(0, "hbase.zookeeper.port");
+
+
public static final String GROUP_ID = RadiusRelationshipConfigurations.getStringProperty(0, "group.id");
public static final String INPUT_KAFKA_TOPIC = RadiusRelationshipConfigurations.getStringProperty(0, "input.kafka.topic");
diff --git a/src/main/java/com/zdjizhi/topology/RadiusRelation.java b/src/main/java/com/zdjizhi/topology/RadiusRelation.java
new file mode 100644
index 0000000..352e920
--- /dev/null
+++ b/src/main/java/com/zdjizhi/topology/RadiusRelation.java
@@ -0,0 +1,52 @@
+package com.zdjizhi.topology;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.RadiusRelationshipConfig;
+import com.zdjizhi.utils.functions.FilterNullFunction;
+import com.zdjizhi.utils.functions.ParseFunction;
+import com.zdjizhi.utils.hbasepackage.HbaseSinkAccount;
+import com.zdjizhi.utils.hbasepackage.HbaseSinkFramedip;
+import com.zdjizhi.utils.kafka.Consumer;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.topology
+ * @Description:
+ * @date 2021/5/2016:42
+ */
+public class RadiusRelation {
+ private static final Log logger = LogFactory.get();
+
+
+ public static void main(String[] args) {
+ final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStreamSource streamSource = environment.addSource(Consumer.getKafkaConsumer());
+
+ DataStream> getObject = streamSource.map(new ParseFunction()).name("ParseJson");
+
+ DataStream> filterOriginalData = getObject.filter(new FilterNullFunction()).name("FilterOriginalData");
+
+ KeyedStream, String> FrameipWithaccount = filterOriginalData.keyBy(value -> value.f0);
+
+ KeyedStream, String> accountWithFrameip = filterOriginalData.keyBy(value -> value.f1);
+
+ FrameipWithaccount.addSink(new HbaseSinkFramedip(RadiusRelationshipConfig.HBASE_ZOOKEEPER_SERVERS));
+
+ accountWithFrameip.addSink(new HbaseSinkAccount(RadiusRelationshipConfig.HBASE_ZOOKEEPER_SERVERS));
+ try {
+ environment.execute("RADIUS-RELATIONSHIP-HBASE-V2-t");
+ } catch (Exception e) {
+ logger.error("This Flink task start ERROR! Exception information is :" + e);
+ }
+
+ }
+
+
+}
diff --git a/src/main/java/com/zdjizhi/topology/RadiusRelationshipTopology.java b/src/main/java/com/zdjizhi/topology/RadiusRelationshipTopology.java
deleted file mode 100644
index b69e6fa..0000000
--- a/src/main/java/com/zdjizhi/topology/RadiusRelationshipTopology.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package com.zdjizhi.topology;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.zdjizhi.utils.functions.*;
-
-import com.zdjizhi.utils.kafka.Consumer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.topology
- * @Description:
- * @date 2021/5/2016:42
- */
-public class RadiusRelationshipTopology {
- private static final Log logger = LogFactory.get();
-
-
- public static void main(String[] args) {
- final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
-
-// environment.enableCheckpointing(5000);
-
- DataStreamSource streamSource = environment.addSource(Consumer.getKafkaConsumer());
-
- DataStream filterOriginalData = streamSource.filter(new FilterNullFunction()).name("FilterOriginalData");
-
- DataStream> getObject = filterOriginalData.map(new ParseFunction()).name("ParseJson");
-
- DataStream> getRadiusAccount = getObject.map(new GetAccountMapFunction()).name("GetRadiusAccount");
-
- KeyedStream, String> tuple2StringKeyedStream = getRadiusAccount.keyBy(value -> value.f0);
-
- KeyedStream, String> accountWithFrameip = getObject.keyBy(value -> value.f1);
-
- tuple2StringKeyedStream.process(new TimerFunction()).name("UpdateHBase");
-
- accountWithFrameip.process(new TimerFunctionAccountWithFramedIp()).name("UpdateAccountHBase");
-
- try {
- environment.execute("RADIUS-RELATIONSHIP-HBASE");
- } catch (Exception e) {
- logger.error("This Flink task start ERROR! Exception information is :" + e);
- }
-
- }
-
-
-}
diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
deleted file mode 100644
index 27e433b..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package com.zdjizhi.utils.functions;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson.JSONException;
-import com.alibaba.fastjson.JSONObject;
-import com.zdjizhi.common.RadiusRelationshipConfig;
-import com.zdjizhi.utils.StringUtil;
-import org.apache.flink.api.common.functions.FilterFunction;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2021/5/2715:01
- */
-public class FilterNullFunction implements FilterFunction {
- private static final Log logger = LogFactory.get();
-
- @Override
- public boolean filter(String message) {
- boolean isFilter = false;
- try {
- if (StringUtil.isNotBlank(message)) {
- JSONObject jsonObject = JSONObject.parseObject(message);
- if (jsonObject.containsKey(RadiusRelationshipConfig.PACKET_TYPE) && jsonObject.containsKey(RadiusRelationshipConfig.STATUS_TYPE)) {
- if (RadiusRelationshipConfig.ACCOUNTING_REQUEST == jsonObject.getInteger(RadiusRelationshipConfig.PACKET_TYPE)
- && RadiusRelationshipConfig.START_BILLING == jsonObject.getInteger(RadiusRelationshipConfig.STATUS_TYPE)) {
- isFilter = true;
- }
- }
- }
- } catch (JSONException jse) {
- logger.error("数据转换JSON格式异常,原始日志为:" + message);
- } catch (RuntimeException re) {
- logger.error("Radius日志条件过滤异常,异常信息为:" + re);
- }
- return isFilter;
- }
-}
diff --git a/src/main/java/com/zdjizhi/utils/functions/GetAccountMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/GetAccountMapFunction.java
deleted file mode 100644
index e0ff4d2..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/GetAccountMapFunction.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package com.zdjizhi.utils.functions;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import static com.zdjizhi.utils.hbase.HBaseUtils.dataValidation;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2021/5/2715:01
- */
-public class GetAccountMapFunction implements MapFunction, Tuple2> {
- private static final Log logger = LogFactory.get();
-
-
- @Override
- public Tuple2 map(Tuple2 stringStringTuple2) throws Exception {
- try {
- String framedIp = stringStringTuple2.f0;
- String account = stringStringTuple2.f1;
- boolean validation = dataValidation(framedIp, account);
- if (validation) {
- return Tuple2.of(framedIp, account);
- } else {
- return Tuple2.of("", "");
- }
- } catch (RuntimeException e) {
- logger.error("解析Radius数据获取用户信息异常,异常信息:" + e);
- }
- return Tuple2.of("", ""); }
-}
diff --git a/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java
index 78d1a1c..cf9515f 100644
--- a/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java
@@ -2,9 +2,16 @@ package com.zdjizhi.utils.functions;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONException;
+import com.zdjizhi.common.RadiusRelationshipConfig;
+import com.zdjizhi.pojo.RadiusMassage;
+import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple6;
+
+import static cn.hutool.crypto.SecureUtil.md5;
+
/**
* @author qidaijie
@@ -13,22 +20,43 @@ import org.apache.flink.api.java.tuple.Tuple2;
* @date 2021/5/2715:01
*/
-public class ParseFunction implements MapFunction> {
+public class ParseFunction implements MapFunction> {
private static final Log logger = LogFactory.get();
@Override
- public Tuple2 map(String logs) {
- try {
- JSONObject jsonObject = JSONObject.parseObject(logs);
- String framedIp = jsonObject.getString("radius_framed_ip");
- String account = jsonObject.getString("radius_account");
+ public Tuple6 map(String message) {
- return Tuple2.of(framedIp, account);
- } catch (RuntimeException e) {
- logger.error("解析Radius数据获取用户信息异常,异常信息:" + e);
- }
- return Tuple2.of("", "");
+ RadiusMassage radiusMassage = new RadiusMassage();
+ try {
+ if (StringUtil.isNotBlank(message)) {
+ radiusMassage = JSON.parseObject(message, RadiusMassage.class);
+
+ if(radiusMassage.getRadius_framed_ip()!=null && radiusMassage.getRadius_account()!=null && radiusMassage.getRadius_event_timestamp()!=null){
+
+ if (RadiusRelationshipConfig.ACCOUNTING_REQUEST == radiusMassage.getRadius_packet_type()){
+ String framedIp=radiusMassage.getRadius_framed_ip();
+ String account=radiusMassage.getRadius_account();
+ Long event_time = radiusMassage.getRadius_event_timestamp();
+ int status =radiusMassage.getRadius_acct_status_type();
+ int onff_status = 1;
+ if (status == 2) {
+ onff_status = 2;
+ }
+ String key_framedIp = md5(framedIp);
+ String key_account = md5(account);
+ return Tuple6.of(key_framedIp, key_account, framedIp, account, event_time, onff_status);
+
+ }
+ }
+ }
+ } catch (JSONException jse) {
+ logger.error("数据转换JSON格式异常,原始日志为:" + message);
+ } catch (RuntimeException re) {
+ logger.error("Radius日志条件过滤异常,异常信息为:" + re);
+ }
+
+ return Tuple6.of("","","","",0L,0);
}
}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/functions/TimerFunction.java b/src/main/java/com/zdjizhi/utils/functions/TimerFunction.java
deleted file mode 100644
index 8a5d6b6..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/TimerFunction.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package com.zdjizhi.utils.functions;
-
-
-import com.zdjizhi.common.RadiusRelationshipConfig;
-import com.zdjizhi.utils.StringUtil;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.hbase.client.Put;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static com.zdjizhi.utils.hbase.HBaseUtils.insertData;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2021/6/2316:59
- */
-public class TimerFunction extends KeyedProcessFunction, Object> {
- private static final Logger logger = LoggerFactory.getLogger(TimerFunction.class);
-
- private static List putList = new ArrayList<>();
- private static boolean first = true;
-
-
- @Override
- public void onTimer(long timestamp, OnTimerContext ctx, Collector