增加缓存过期机制解决问题TSG-17358

This commit is contained in:
wangkuan
2023-11-16 11:54:48 +08:00
parent b3858f2e30
commit bde8e37bec
4 changed files with 80 additions and 131 deletions

View File

@@ -27,3 +27,6 @@ tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
hbase.account.table.name=tsg_galaxy:relation_account_framedip hbase.account.table.name=tsg_galaxy:relation_account_framedip
hbase.scan.limit=100000 hbase.scan.limit=100000
cache.expire.seconds=86400
cache.max.size=10000000

View File

@@ -61,6 +61,10 @@ public class RadiusRelationshipConfig {
public static final int HBASE_SCAN_LIMIT = RadiusRelationshipConfigurations.getIntProperty(0, "hbase.scan.limit"); public static final int HBASE_SCAN_LIMIT = RadiusRelationshipConfigurations.getIntProperty(0, "hbase.scan.limit");
public static final int HBASE_RPC_TIMEOUT = RadiusRelationshipConfigurations.getIntProperty(0, "hbase.rpc.timeout"); public static final int HBASE_RPC_TIMEOUT = RadiusRelationshipConfigurations.getIntProperty(0, "hbase.rpc.timeout");
public static final int CACHE_EXPIRE_SECONDS = RadiusRelationshipConfigurations.getIntProperty(0, "cache.expire.seconds");
public static final int CACHE_MAX_SIZE = RadiusRelationshipConfigurations.getIntProperty(0, "cache.max.size");

View File

@@ -1,5 +1,6 @@
package com.zdjizhi.utils.hbasepackage; package com.zdjizhi.utils.hbasepackage;
import com.google.common.cache.CacheBuilder;
import com.zdjizhi.common.RadiusRelationshipConfig; import com.zdjizhi.common.RadiusRelationshipConfig;
import com.zdjizhi.pojo.Radius; import com.zdjizhi.pojo.Radius;
import org.apache.flink.api.java.tuple.Tuple7; import org.apache.flink.api.java.tuple.Tuple7;
@@ -15,20 +16,22 @@ import org.apache.log4j.Logger;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.Map; import java.util.concurrent.TimeUnit;
import java.util.concurrent.ConcurrentHashMap;
public class HbaseSinkAccount extends RichSinkFunction<Tuple7<String,String,String,String,Long,Integer,Integer>> implements Serializable, SinkFunction<Tuple7<String,String,String,String,Long,Integer,Integer>> { public class HbaseSinkAccount extends RichSinkFunction<Tuple7<String,String,String,String,Long,Integer,Integer>> implements Serializable, SinkFunction<Tuple7<String,String,String,String,Long,Integer,Integer>> {
private Logger log; private Logger log;
private String hbase_zookeeper_host; private String hbase_zookeeper_host;
public Map<String, Radius> AccountWithIpMap = new ConcurrentHashMap<>(80000); // public Map<String, Radius> AccountWithIpMap = new ConcurrentHashMap<>(80000);
public com.google.common.cache.Cache<String, Radius> AccountWithIpMap ;
private Connection connection; private Connection connection;
private Admin admin; private Admin admin;
public HbaseSinkAccount(String hbase_zookeeper_host) { public HbaseSinkAccount(String hbase_zookeeper_host) {
this.hbase_zookeeper_host = hbase_zookeeper_host; this.hbase_zookeeper_host = hbase_zookeeper_host;
AccountWithIpMap = CacheBuilder.newBuilder().expireAfterWrite(RadiusRelationshipConfig.CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS).initialCapacity(100000).maximumSize(RadiusRelationshipConfig.CACHE_MAX_SIZE).build();
} }
@Override @Override
@@ -98,8 +101,8 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple7<String,String,Stri
int acct_status_type = value.f5; int acct_status_type = value.f5;
int vsys_id = value.f6; int vsys_id = value.f6;
if (AccountWithIpMap.containsKey(key)) { Radius radius = AccountWithIpMap.getIfPresent(key);
Radius radius = AccountWithIpMap.get(key); if (radius!=null) {
if (radius.getLast_update_time() <= event_time) { if (radius.getLast_update_time() <= event_time) {
if(acct_status_type==radius.getAcct_status_type()) { if(acct_status_type==radius.getAcct_status_type()) {
@@ -109,84 +112,29 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple7<String,String,Stri
radius.setFramed_ip(framedIp); radius.setFramed_ip(framedIp);
radius.setAcct_status_type(acct_status_type); radius.setAcct_status_type(acct_status_type);
radius.setVsys_id(vsys_id); radius.setVsys_id(vsys_id);
Table table = null; radius.setAccount(account);
try { updateRelationMessage(key,radius);
table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME));
Put put = new Put(key.getBytes());
put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes());
put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes());
put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time));
put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time()));
put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type));
put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id));
table.put(put);
// IpWithAccountMap.put(radius.getFramed_ip(),radius);
AccountWithIpMap.put(key, radius);
} catch (Exception e) {
log.error(e.toString());
} finally {
table.close();
}
} }
} }
else{ else{
radius.setLast_update_time(event_time); radius.setLast_update_time(event_time);
radius.setFramed_ip(framedIp); radius.setFramed_ip(framedIp);
radius.setAcct_status_type(acct_status_type); radius.setAcct_status_type(acct_status_type);
radius.setVsys_id(vsys_id); radius.setVsys_id(vsys_id);
radius.setAccount(account);
Table table = null; updateRelationMessage(key,radius);
try {
table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME));
Put put = new Put(key.getBytes());
put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes());
put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes());
put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time));
put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time()));
put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type));
put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id));
table.put(put);
// IpWithAccountMap.put(radius.getFramed_ip(),radius);
AccountWithIpMap.put(key, radius);
} catch (Exception e) {
log.error(e.toString());
} finally {
table.close();
}
} }
} }
} else { } else {
Radius radius = new Radius(); radius = new Radius();
radius.setFramed_ip(framedIp); radius.setFramed_ip(framedIp);
radius.setLast_update_time(event_time); radius.setLast_update_time(event_time);
radius.setAccount(account); radius.setAccount(account);
radius.setFirst_found_time(event_time); radius.setFirst_found_time(event_time);
radius.setAcct_status_type(acct_status_type); radius.setAcct_status_type(acct_status_type);
radius.setVsys_id(vsys_id); radius.setVsys_id(vsys_id);
updateRelationMessage(key,radius);
Table table = null;
try {
table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME));
Put put = new Put(key.getBytes());
put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes());
put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes());
put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time));
put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(event_time));
put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type));
put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id));
table.put(put);
AccountWithIpMap.put(key, radius);
} catch (Exception e) {
log.error(e.toString());
} finally {
table.close();
}
} }
} }
@Override @Override
@@ -194,6 +142,28 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple7<String,String,Stri
super.close(); super.close();
} }
public void updateRelationMessage(String key, Radius radius) throws IOException {
try (Table table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME))) {
Put put = new Put(key.getBytes());
put.addColumn("radius".getBytes(), "framed_ip".getBytes(), radius.getFramed_ip().getBytes());
put.addColumn("radius".getBytes(), "account".getBytes(), radius.getAccount().getBytes());
put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(radius.getLast_update_time()));
put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time()));
put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(radius.getAcct_status_type()));
put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(radius.getVsys_id()));
table.put(put);
AccountWithIpMap.put(key, radius);
} catch (Exception e) {
log.error(e.toString());
}
}
/** /**
* 创建 hbase 表 * 创建 hbase 表

View File

@@ -1,5 +1,6 @@
package com.zdjizhi.utils.hbasepackage; package com.zdjizhi.utils.hbasepackage;
import com.google.common.cache.CacheBuilder;
import com.zdjizhi.common.RadiusRelationshipConfig; import com.zdjizhi.common.RadiusRelationshipConfig;
import com.zdjizhi.pojo.Radius; import com.zdjizhi.pojo.Radius;
import org.apache.flink.api.java.tuple.Tuple7; import org.apache.flink.api.java.tuple.Tuple7;
@@ -15,20 +16,21 @@ import org.apache.log4j.Logger;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.Map; import java.util.concurrent.TimeUnit;
import java.util.concurrent.ConcurrentHashMap;
public class HbaseSinkFramedip extends RichSinkFunction<Tuple7<String,String,String,String,Long,Integer,Integer>> implements Serializable, SinkFunction<Tuple7<String,String,String,String,Long,Integer,Integer>> { public class HbaseSinkFramedip extends RichSinkFunction<Tuple7<String,String,String,String,Long,Integer,Integer>> implements Serializable, SinkFunction<Tuple7<String,String,String,String,Long,Integer,Integer>> {
private Logger log; private Logger log;
private String hbase_zookeeper_host; private String hbase_zookeeper_host;
public Map<String, Radius> IpWithAccountMap = new ConcurrentHashMap<>(80000); public com.google.common.cache.Cache<String, Radius> IpWithAccountMap ;
private Connection connection; private Connection connection;
private Admin admin; private Admin admin;
public HbaseSinkFramedip(String hbase_zookeeper_host) { public HbaseSinkFramedip(String hbase_zookeeper_host) {
this.hbase_zookeeper_host = hbase_zookeeper_host; this.hbase_zookeeper_host = hbase_zookeeper_host;
IpWithAccountMap = CacheBuilder.newBuilder().expireAfterWrite(RadiusRelationshipConfig.CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS).initialCapacity(100000).maximumSize(RadiusRelationshipConfig.CACHE_MAX_SIZE).build();
} }
@Override @Override
@@ -99,8 +101,9 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple7<String,String,Str
int acct_status_type = value.f5; int acct_status_type = value.f5;
int vsys_id = value.f6; int vsys_id = value.f6;
if (IpWithAccountMap.containsKey(key)) {
Radius radius = IpWithAccountMap.get(key); Radius radius = IpWithAccountMap.getIfPresent(key);
if (radius!=null) {
if (radius.getLast_update_time() <= event_time) { if (radius.getLast_update_time() <= event_time) {
@@ -111,83 +114,32 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple7<String,String,Str
radius.setAccount(account); radius.setAccount(account);
radius.setAcct_status_type(acct_status_type); radius.setAcct_status_type(acct_status_type);
radius.setVsys_id(vsys_id); radius.setVsys_id(vsys_id);
radius.setFramed_ip(framedIp);
updateRelationMessage(key, radius);
Table table = null;
try {
table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_FRAMEDIP_TABLE_NAME));
Put put = new Put(key.getBytes());
put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes());
put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes());
put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time));
put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time()));
put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type));
put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id));
table.put(put);
IpWithAccountMap.put(key, radius);
} catch (Exception e) {
log.error(e.toString());
} finally {
table.close();
}
} }
} }
else{ else{
radius.setLast_update_time(event_time); radius.setLast_update_time(event_time);
radius.setFramed_ip(framedIp); radius.setAccount(account);
radius.setAcct_status_type(acct_status_type); radius.setAcct_status_type(acct_status_type);
radius.setVsys_id(vsys_id); radius.setVsys_id(vsys_id);
Table table = null; radius.setFramed_ip(framedIp);
try { updateRelationMessage(key, radius);
table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_FRAMEDIP_TABLE_NAME));
Put put = new Put(key.getBytes());
put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes());
put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes());
put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time));
put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time()));
put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type));
put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id));
table.put(put);
IpWithAccountMap.put(key, radius);
} catch (Exception e) {
log.error(e.toString());
} finally {
table.close();
}
} }
} }
} }
else { else {
Radius radius = new Radius(); radius = new Radius();
radius.setFramed_ip(framedIp); radius.setFramed_ip(framedIp);
radius.setLast_update_time(event_time); radius.setLast_update_time(event_time);
radius.setAccount(account); radius.setAccount(account);
radius.setFirst_found_time(event_time); radius.setFirst_found_time(event_time);
radius.setAcct_status_type(acct_status_type); radius.setAcct_status_type(acct_status_type);
radius.setVsys_id(vsys_id); radius.setVsys_id(vsys_id);
Table table = null; updateRelationMessage(key, radius);
try {
table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_FRAMEDIP_TABLE_NAME));
Put put = new Put(key.getBytes());
put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes());
put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes());
put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time));
put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(event_time));
put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type));
put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id));
table.put(put);
IpWithAccountMap.put(key, radius);
} catch (Exception e) {
log.error(e.toString());
} finally {
table.close();
}
} }
} }
@@ -197,6 +149,26 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple7<String,String,Str
super.close(); super.close();
} }
public void updateRelationMessage(String key, Radius radius) throws IOException {
try (Table table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_FRAMEDIP_TABLE_NAME))) {
Put put = new Put(key.getBytes());
put.addColumn("radius".getBytes(), "framed_ip".getBytes(), radius.getFramed_ip().getBytes());
put.addColumn("radius".getBytes(), "account".getBytes(), radius.getAccount().getBytes());
put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(radius.getLast_update_time()));
put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time()));
put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(radius.getAcct_status_type()));
put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(radius.getVsys_id()));
table.put(put);
IpWithAccountMap.put(key, radius);
} catch (Exception e) {
log.error(e.toString());
}
}
/** /**
* 创建 hbase 表 * 创建 hbase 表
*/ */