radius下线的功能
This commit is contained in:
6
pom.xml
6
pom.xml
@@ -6,7 +6,11 @@
|
||||
|
||||
<groupId>com.zdjizhi</groupId>
|
||||
<artifactId>radius-relation</artifactId>
|
||||
<<<<<<< HEAD
|
||||
<version>22-04-01</version>
|
||||
=======
|
||||
<version>22-03-09</version>
|
||||
>>>>>>> c8041564036f715f1a9eacc8bf6deb513542b2c6
|
||||
|
||||
<name>radius-relation</name>
|
||||
<url>http://www.example.com</url>
|
||||
@@ -251,7 +255,7 @@
|
||||
<goal>shade</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<finalName>radius-relation-21-12-06</finalName>
|
||||
<finalName>radius-relation-22-04-01</finalName>
|
||||
<transformers combine.children="append">
|
||||
<!-- The service transformer is needed to merge META-INF/services files -->
|
||||
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
|
||||
|
||||
@@ -2,11 +2,11 @@
|
||||
|
||||
#管理kafka地址
|
||||
#input.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
|
||||
input.kafka.servers=192.168.44.85:9094
|
||||
input.kafka.servers=192.168.44.12:9094
|
||||
|
||||
#hbase zookeeper地址 用于连接HBase
|
||||
#hbase.zookeeper.servers=192.168.44.12
|
||||
hbase.zookeeper.servers=192.168.44.85:2181
|
||||
hbase.zookeeper.servers=192.168.44.12:2181
|
||||
|
||||
#--------------------------------Kafka消费组信息------------------------------#
|
||||
|
||||
|
||||
@@ -4,9 +4,10 @@ public class RadiusMassage {
|
||||
|
||||
private String radius_framed_ip;
|
||||
private String radius_account;
|
||||
private Long radius_event_timestamp;
|
||||
private long radius_event_timestamp;
|
||||
private int radius_acct_status_type;
|
||||
private int radius_packet_type;
|
||||
private long common_end_time;
|
||||
|
||||
public int getRadius_acct_status_type() {
|
||||
return radius_acct_status_type;
|
||||
@@ -40,13 +41,19 @@ public class RadiusMassage {
|
||||
this.radius_account = radius_account;
|
||||
}
|
||||
|
||||
public Long getRadius_event_timestamp() {
|
||||
public long getRadius_event_timestamp() {
|
||||
return radius_event_timestamp;
|
||||
}
|
||||
|
||||
public void setRadius_event_timestamp(Long radius_event_timestamp) {
|
||||
public void setRadius_event_timestamp(long radius_event_timestamp) {
|
||||
this.radius_event_timestamp = radius_event_timestamp;
|
||||
}
|
||||
|
||||
|
||||
public long getCommon_end_time() {
|
||||
return common_end_time;
|
||||
}
|
||||
|
||||
public void setCommon_end_time(long common_end_time) {
|
||||
this.common_end_time = common_end_time;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,7 +41,7 @@ public class RadiusRelation {
|
||||
|
||||
accountWithFrameip.addSink(new HbaseSinkAccount(RadiusRelationshipConfig.HBASE_ZOOKEEPER_SERVERS));
|
||||
try {
|
||||
environment.execute("RADIUS-RELATIONSHIP-HBASE-V2-t");
|
||||
environment.execute("RADIUS-RELATIONSHIP-HBASE-V2");
|
||||
} catch (Exception e) {
|
||||
logger.error("This Flink task start ERROR! Exception information is :" + e);
|
||||
}
|
||||
|
||||
@@ -33,12 +33,15 @@ public class ParseFunction implements MapFunction<String, Tuple6<String,String,S
|
||||
if (StringUtil.isNotBlank(message)) {
|
||||
radiusMassage = JSON.parseObject(message, RadiusMassage.class);
|
||||
|
||||
if(radiusMassage.getRadius_framed_ip()!=null && radiusMassage.getRadius_account()!=null && radiusMassage.getRadius_event_timestamp()!=null){
|
||||
if(radiusMassage.getRadius_framed_ip()!=null && radiusMassage.getRadius_account()!=null ){
|
||||
|
||||
if (RadiusRelationshipConfig.ACCOUNTING_REQUEST == radiusMassage.getRadius_packet_type()){
|
||||
String framedIp=radiusMassage.getRadius_framed_ip();
|
||||
String account=radiusMassage.getRadius_account();
|
||||
Long event_time = radiusMassage.getRadius_event_timestamp();
|
||||
long event_time = radiusMassage.getRadius_event_timestamp();
|
||||
if(event_time==0){
|
||||
event_time=radiusMassage.getCommon_end_time();
|
||||
}
|
||||
int status =radiusMassage.getRadius_acct_status_type();
|
||||
int onff_status = 1;
|
||||
if (status == 2) {
|
||||
|
||||
@@ -93,7 +93,7 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple6<String,String,Stri
|
||||
if (AccountWithIpMap.containsKey(key)) {
|
||||
Radius radius = AccountWithIpMap.get(key);
|
||||
|
||||
if (radius.getLast_update_time() < event_time) {
|
||||
if (radius.getLast_update_time() <= event_time) {
|
||||
if(acct_status_type==radius.getAcct_status_type()) {
|
||||
|
||||
if (!radius.getFramed_ip().equals(framedIp)) {
|
||||
|
||||
@@ -94,7 +94,7 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,Str
|
||||
if (IpWithAccountMap.containsKey(key)) {
|
||||
Radius radius = IpWithAccountMap.get(key);
|
||||
|
||||
if (radius.getLast_update_time() < event_time) {
|
||||
if (radius.getLast_update_time() <= event_time) {
|
||||
|
||||
if(acct_status_type==radius.getAcct_status_type()) {
|
||||
|
||||
@@ -164,6 +164,7 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,Str
|
||||
put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(event_time));
|
||||
put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type));
|
||||
table.put(put);
|
||||
|
||||
IpWithAccountMap.put(key, radius);
|
||||
} catch (Exception e) {
|
||||
log.error(e.toString());
|
||||
|
||||
Reference in New Issue
Block a user