上下行teid

This commit is contained in:
LAPTOP-CUUVN8AS\wk
2022-09-19 13:50:22 +08:00
parent bea6a964df
commit 67db966e10
7 changed files with 196 additions and 162 deletions

View File

@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>relationship-gtpc-user</artifactId>
<version>22-08-15</version>
<version>22-08-19</version>
<name>relationship-gtpc-user</name>
<url>http://www.example.com</url>
@@ -251,7 +251,7 @@
<goal>shade</goal>
</goals>
<configuration>
<finalName>relationship-gtpc-user-22-08-15</finalName>
<finalName>relationship-gtpc-user-22-08-19</finalName>
<transformers combine.children="append">
<!-- The service transformer is needed to merge META-INF/services files -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>

View File

@@ -1,13 +1,14 @@
#--------------------------------地址配置------------------------------#
#管理kafka地址
#input.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
input.kafka.servers=192.168.44.12:9094
input.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
#input.kafka.servers=192.168.44.12:9094
#input.kafka.servers=192.168.40.151:9094
#hbase zookeeper地址 用于连接HBase
#hbase.zookeeper.servers=192.168.44.12
hbase.zookeeper.servers=192.168.44.12:2181
hbase.zookeeper.servers=192.168.44.11
#hbase.zookeeper.servers=192.168.40.151:2181
#if.hbase.scan=1
hbase.scan.limit=0
cache.expire.seconds=86400

View File

@@ -42,4 +42,5 @@ public class GtpConfig {
public static final int CACHE_UPDATE_SECONDS = GtpConfigurations.getIntProperty(0, "cache.update.seconds");
}

View File

@@ -9,11 +9,11 @@ public class Entity {
private String gtp_imei;
private String gtp_imsi;
private String gtp_phone_number;
private Long gtp_uplink_teid;
private Long gtp_downlink_teid ;
private long gtp_uplink_teid;
private long gtp_downlink_teid ;
private String gtp_msg_type;
private Long common_recv_time;
private Long gtp_teid;
private long common_recv_time;
private String gtp_uuid;
public String getHashkey() {
@@ -64,19 +64,19 @@ public class Entity {
this.gtp_phone_number = gtp_phone_number;
}
public Long getGtp_uplink_teid() {
public long getGtp_uplink_teid() {
return gtp_uplink_teid;
}
public void setGtp_uplink_teid(Long gtp_uplink_teid) {
public void setGtp_uplink_teid(long gtp_uplink_teid) {
this.gtp_uplink_teid = gtp_uplink_teid;
}
public Long getGtp_downlink_teid() {
public long getGtp_downlink_teid() {
return gtp_downlink_teid;
}
public void setGtp_downlink_teid(Long gtp_downlink_teid) {
public void setGtp_downlink_teid(long gtp_downlink_teid) {
this.gtp_downlink_teid = gtp_downlink_teid;
}
@@ -88,20 +88,19 @@ public class Entity {
this.gtp_msg_type = gtp_msg_type;
}
public Long getCommon_recv_time() {
public long getCommon_recv_time() {
return common_recv_time;
}
public void setCommon_recv_time(Long common_recv_time) {
public void setCommon_recv_time(long common_recv_time) {
this.common_recv_time = common_recv_time;
}
public Long getGtp_teid() {
return gtp_teid;
public String getGtp_uuid() {
return gtp_uuid;
}
public void setGtp_teid(Long gtp_teid) {
this.gtp_teid = gtp_teid;
public void setGtp_uuid(String gtp_uuid) {
this.gtp_uuid = gtp_uuid;
}
}

View File

@@ -6,10 +6,11 @@ public class Gtp {
private String gtp_imei;
private String gtp_imsi;
private String gtp_phone_number;
private Long gtp_teid;
private Integer msg_type;
private Long last_update_time;
private long last_update_time;
private long gtp_uplink_teid;
private long gtp_downlink_teid ;
private String gtp_uuid;
public Integer getMsg_type() {
return msg_type;
@@ -51,19 +52,35 @@ public class Gtp {
this.gtp_phone_number = gtp_phone_number;
}
public Long getGtp_teid() {
return gtp_teid;
public long getGtp_uplink_teid() {
return gtp_uplink_teid;
}
public void setGtp_teid(Long gtp_teid) {
this.gtp_teid = gtp_teid;
public void setGtp_uplink_teid(long gtp_uplink_teid) {
this.gtp_uplink_teid = gtp_uplink_teid;
}
public Long getLast_update_time() {
public long getGtp_downlink_teid() {
return gtp_downlink_teid;
}
public void setGtp_downlink_teid(long gtp_downlink_teid) {
this.gtp_downlink_teid = gtp_downlink_teid;
}
public String getGtp_uuid() {
return gtp_uuid;
}
public void setGtp_uuid(String gtp_uuid) {
this.gtp_uuid = gtp_uuid;
}
public long getLast_update_time() {
return last_update_time;
}
public void setLast_update_time(Long last_update_time) {
public void setLast_update_time(long last_update_time) {
this.last_update_time = last_update_time;
}
}

View File

@@ -28,72 +28,70 @@ public class ParseFunction implements MapFunction<String, Entity> {
Entity entity = new Entity();
try {
if (StringUtil.isNotBlank(message)) {
entity = JSON.parseObject(message, Entity.class);
if(entity.getGtp_apn()==null){
if (StringUtil.isNotBlank(message)) {
entity = JSON.parseObject(message, Entity.class);
if(entity.getGtp_apn()==null){
entity.setGtp_apn("");
}
if(entity.getGtp_phone_number()==null){
entity.setGtp_apn("");
}
if(entity.getGtp_phone_number()==null){
entity.setGtp_phone_number("");
}
if(entity.getGtp_imei()==null){
entity.setGtp_phone_number("");
}
if(entity.getGtp_imei()==null){
entity.setGtp_imei("");
}
if(entity.getGtp_imsi()==null){
entity.setGtp_imei("");
}
if(entity.getGtp_imsi()==null){
entity.setGtp_imsi("");
}
if(!"".equals(entity.getGtp_imei())|| !"".equals(entity.getGtp_imsi())|| !"".equals(entity.getGtp_phone_number())) {
String md5Str = DigestUtils.md5Hex(entity.getGtp_imei() + entity.getGtp_imsi() + entity.getGtp_phone_number());
entity.setHashkey(md5Str);
if(entity.getGtp_uplink_teid()==null || entity.getGtp_uplink_teid()==0){
if(entity.getGtp_downlink_teid()==null || entity.getGtp_downlink_teid()==0){
entity.setIfError(1);
logger.info("teid为空" + message);
}
else{
entity.setGtp_teid(entity.getGtp_downlink_teid());
}
}else{
entity.setGtp_teid(entity.getGtp_uplink_teid());
}
}
else {
entity.setHashkey("");
entity.setIfError(1);
logger.info("三元组为空" + message);
}
}else{
entity.setGtp_imsi("");
}
/* if(entity.getCommon_recv_time()<0 || entity.getCommon_recv_time()>9999999999L){
entity.setIfError(1);
logger.error("数据转换JSON格式异常,原始日志为:" + message);
logger.info("时间戳不合法 " + entity.getCommon_recv_time());
}*/
if(!"".equals(entity.getGtp_imei())|| !"".equals(entity.getGtp_imsi())|| !"".equals(entity.getGtp_phone_number())) {
String md5Str = DigestUtils.md5Hex(entity.getGtp_imei() + entity.getGtp_imsi() + entity.getGtp_phone_number());
entity.setHashkey(md5Str);
if( entity.getGtp_uplink_teid()==0 && entity.getGtp_downlink_teid()==0){
entity.setIfError(1);
logger.info("teid为空" + message);
}else{
String gtp_uuid = DigestUtils.md5Hex(entity.getGtp_uplink_teid() +"|"+ entity.getGtp_downlink_teid() );
entity.setGtp_uuid(gtp_uuid);
}
}
} catch (JSONException jse) {
else {
entity.setHashkey("");
entity.setIfError(1);
logger.info("三元组为空" + message);
}
}else{
entity.setIfError(1);
logger.error("数据转换JSON格式异常,原始日志为:" + message);
}
} catch (JSONException jse) {
entity.setIfError(1);
logger.error("数据转换JSON格式异常,原始日志为:" + message);
} catch (RuntimeException re) {
} catch (RuntimeException re) {
entity.setIfError(1);
logger.error("GTP日志条件过滤异常,异常信息为:" + re);
}
}
return entity;
}

View File

@@ -5,6 +5,7 @@ import com.google.common.cache.CacheBuilder;
import com.zdjizhi.common.GtpConfig;
import com.zdjizhi.pojo.Entity;
import com.zdjizhi.pojo.Gtp;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -41,59 +42,66 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
super.open(parameters);
log = Logger.getLogger(HbaseSink.class);
org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", hbase_zookeeper_host);
connection = ConnectionFactory.createConnection(configuration);
admin = connection.getAdmin();
org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", hbase_zookeeper_host);
try {
connection = ConnectionFactory.createConnection(configuration);
admin = connection.getAdmin();
Table table = connection.getTable(TableName.valueOf(GtpConfig.RELATION_USER_TEID_TABLE_NAME));
Scan scan = new Scan();
scan.addColumn("gtp".getBytes(), "teid".getBytes());
scan.addColumn("gtp".getBytes(), "apn".getBytes());
scan.addColumn("gtp".getBytes(), "phone_number".getBytes());
scan.addColumn("gtp".getBytes(), "imsi".getBytes());
scan.addColumn("gtp".getBytes(), "imei".getBytes());
scan.addColumn("gtp".getBytes(), "last_update_time".getBytes());
scan.addColumn("gtp".getBytes(), "msg_type".getBytes());
try {
if (GtpConfig.HBASE_SCAN_LIMIT != 0) {
scan.setLimit(GtpConfig.HBASE_SCAN_LIMIT);
}
Table table = connection.getTable(TableName.valueOf(GtpConfig.RELATION_USER_TEID_TABLE_NAME));
Scan scan = new Scan();
scan.addColumn("gtp".getBytes(), "uplink_teid".getBytes());
scan.addColumn("gtp".getBytes(), "downlink_teid".getBytes());
scan.addColumn("gtp".getBytes(), "apn".getBytes());
scan.addColumn("gtp".getBytes(), "phone_number".getBytes());
scan.addColumn("gtp".getBytes(), "imsi".getBytes());
scan.addColumn("gtp".getBytes(), "imei".getBytes());
scan.addColumn("gtp".getBytes(), "last_update_time".getBytes());
scan.addColumn("gtp".getBytes(), "msg_type".getBytes());
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
if (result.containsColumn("gtp".getBytes(), "teid".getBytes()) && result.containsColumn("gtp".getBytes(), "msg_type".getBytes()) && result.containsColumn("gtp".getBytes(), "apn".getBytes()) && result.containsColumn("gtp".getBytes(), "last_update_time".getBytes()) && result.containsColumn("gtp".getBytes(), "imei".getBytes()) && result.containsColumn("gtp".getBytes(), "phone_number".getBytes()) && result.containsColumn("gtp".getBytes(), "imsi".getBytes())) {
Gtp gtp = new Gtp();
String key = Bytes.toString(result.getRow());
Long teid = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("teid"))));
int msg_type = Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("msg_type"))));
String apn = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("apn"))));
String phone_number = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("phone_number"))));
String imsi = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("imsi"))));
String imei = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("imei"))));
Long last_update_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("last_update_time"))));
gtp.setLast_update_time(last_update_time);
gtp.setGtp_teid(teid);
gtp.setGtp_apn(apn);
gtp.setGtp_phone_number(phone_number);
gtp.setGtp_imsi(imsi);
gtp.setGtp_imei(imei);
gtp.setMsg_type(msg_type);
gtpConcurrentHashMap.put(key, gtp);
if (GtpConfig.HBASE_SCAN_LIMIT != 0) {
scan.setLimit(GtpConfig.HBASE_SCAN_LIMIT);
}
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
if (result.containsColumn("gtp".getBytes(), "uplink_teid".getBytes()) && result.containsColumn("gtp".getBytes(), "downlink_teid".getBytes()) && result.containsColumn("gtp".getBytes(), "msg_type".getBytes()) && result.containsColumn("gtp".getBytes(), "apn".getBytes()) && result.containsColumn("gtp".getBytes(), "last_update_time".getBytes()) && result.containsColumn("gtp".getBytes(), "imei".getBytes()) && result.containsColumn("gtp".getBytes(), "phone_number".getBytes()) && result.containsColumn("gtp".getBytes(), "imsi".getBytes())) {
Gtp gtp = new Gtp();
String key = Bytes.toString(result.getRow());
Long uplink_teid = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("uplink_teid"))));
Long downlink_teid = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("downlink_teid"))));
int msg_type = Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("msg_type"))));
String apn = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("apn"))));
String phone_number = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("phone_number"))));
String imsi = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("imsi"))));
String imei = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("imei"))));
Long last_update_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("last_update_time"))));
gtp.setLast_update_time(last_update_time);
gtp.setGtp_uplink_teid(uplink_teid);
gtp.setGtp_downlink_teid(downlink_teid);
gtp.setGtp_apn(apn);
gtp.setGtp_phone_number(phone_number);
gtp.setGtp_imsi(imsi);
gtp.setGtp_imei(imei);
gtp.setMsg_type(msg_type);
gtp.setGtp_uuid(DigestUtils.md5Hex(gtp.getGtp_uplink_teid() + "|" + gtp.getGtp_downlink_teid()));
gtpConcurrentHashMap.put(key, gtp);
}
}
scanner.close();
} catch (IOException ioe) {
log.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<===");
} catch (RuntimeException e) {
log.error("HBaseUtils getAll() is Exception===>{" + e + "}<===");
}
scanner.close();
} catch (IOException ioe) {
log.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<===");
} catch (RuntimeException e) {
log.error("HBaseUtils getAll() is Exception===>{" + e + "}<===");
}
}
public void invoke(Entity entity, Context context) throws Exception {
@@ -101,24 +109,18 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
Gtp gtp = gtpConcurrentHashMap.getIfPresent(entity.getHashkey());
if (gtp!=null) {
//Gtp gtp = gtpConcurrentHashMap.getIfPresent(entity.getHashkey());
if (gtp.getLast_update_time() <= entity.getCommon_recv_time()) {
if("delete".equals(entity.getGtp_msg_type())){
if(gtp.getGtp_teid().equals(entity.getGtp_teid())){
if(gtp.getGtp_uuid().equals(entity.getGtp_uuid())){
gtp.setMsg_type(2);
ArrayList<Row> rows = new ArrayList<>();
// ArrayList<Row> delrows = getDelRows(gtp);
gtp.setLast_update_time(entity.getCommon_recv_time());
gtp.setGtp_teid(entity.getGtp_teid());
gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid());
gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid());
gtp.setGtp_uuid(DigestUtils.md5Hex(gtp.getGtp_uplink_teid() +"|"+ gtp.getGtp_downlink_teid() ));
gtp.setGtp_apn(entity.getGtp_apn());
gtp.setGtp_phone_number(entity.getGtp_phone_number());
gtp.setGtp_imsi(entity.getGtp_imsi());
@@ -136,7 +138,9 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
ArrayList<Row> rows = new ArrayList<>();
// ArrayList<Row> delrows = getDelRows(gtp);
gtp.setLast_update_time(entity.getCommon_recv_time());
gtp.setGtp_teid(entity.getGtp_teid());
gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid());
gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid());
gtp.setGtp_uuid(DigestUtils.md5Hex(gtp.getGtp_uplink_teid() +"|"+ gtp.getGtp_downlink_teid() ));
gtp.setGtp_apn(entity.getGtp_apn());
gtp.setGtp_phone_number(entity.getGtp_phone_number());
gtp.setGtp_imsi(entity.getGtp_imsi());
@@ -154,14 +158,17 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
}
else{
if(!gtp.getGtp_teid().equals(entity.getGtp_teid())){
if(!gtp.getGtp_uuid().equals(entity.getGtp_uuid())){
gtp.setMsg_type(1);
ArrayList<Row> rows = new ArrayList<>();
// ArrayList<Row> delrows = getDelRows(gtp);
gtp.setLast_update_time(entity.getCommon_recv_time());
gtp.setGtp_teid(entity.getGtp_teid());
gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid());
gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid());
gtp.setGtp_uuid(DigestUtils.md5Hex(gtp.getGtp_uplink_teid() +"|"+ gtp.getGtp_downlink_teid() ));
gtp.setGtp_apn(entity.getGtp_apn());
gtp.setGtp_phone_number(entity.getGtp_phone_number());
gtp.setGtp_imsi(entity.getGtp_imsi());
@@ -181,7 +188,9 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
ArrayList<Row> rows = new ArrayList<>();
// ArrayList<Row> delrows = getDelRows(gtp);
gtp.setLast_update_time(entity.getCommon_recv_time());
gtp.setGtp_teid(entity.getGtp_teid());
gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid());
gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid());
gtp.setGtp_uuid(DigestUtils.md5Hex(gtp.getGtp_uplink_teid() +"|"+ gtp.getGtp_downlink_teid() ));
gtp.setGtp_apn(entity.getGtp_apn());
gtp.setGtp_phone_number(entity.getGtp_phone_number());
gtp.setGtp_imsi(entity.getGtp_imsi());
@@ -207,7 +216,9 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
ArrayList<Row> rows = new ArrayList<>();
// ArrayList<Row> delrows = getDelRows(gtp);
gtp.setLast_update_time(entity.getCommon_recv_time());
gtp.setGtp_teid(entity.getGtp_teid());
gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid());
gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid());
gtp.setGtp_uuid(DigestUtils.md5Hex(gtp.getGtp_uplink_teid() +"|"+ gtp.getGtp_downlink_teid() ));
gtp.setGtp_apn(entity.getGtp_apn());
gtp.setGtp_phone_number(entity.getGtp_phone_number());
gtp.setGtp_imsi(entity.getGtp_imsi());
@@ -224,7 +235,9 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
Gtp gtpobj = new Gtp();
gtpobj.setLast_update_time(entity.getCommon_recv_time());
gtpobj.setGtp_teid(entity.getGtp_teid());
gtpobj.setGtp_uplink_teid(entity.getGtp_uplink_teid());
gtpobj.setGtp_downlink_teid(entity.getGtp_downlink_teid());
gtpobj.setGtp_uuid(DigestUtils.md5Hex(gtpobj.getGtp_uplink_teid() +"|"+ gtpobj.getGtp_downlink_teid() ));
gtpobj.setGtp_apn(entity.getGtp_apn());
gtpobj.setGtp_phone_number(entity.getGtp_phone_number());
gtpobj.setGtp_imsi(entity.getGtp_imsi());
@@ -257,7 +270,8 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
try {
table = connection.getTable(TableName.valueOf(GtpConfig.RELATION_USER_TEID_TABLE_NAME));
Put put = new Put(key.getBytes());
put.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid()));
put.addColumn("gtp".getBytes(), "uplink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_uplink_teid()));
put.addColumn("gtp".getBytes(), "downlink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_downlink_teid()));
put.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn()));
put.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number()));
put.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi()));
@@ -300,26 +314,26 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
ArrayList<Row> delrows = new ArrayList<>();
if (!"".equals(entity.getGtp_apn())) {
String oldapnkey = "3" + new StringBuffer(entity.getGtp_apn()).reverse().toString() + "|" + entity.getGtp_teid();
String oldapnkey = "3" + new StringBuffer(entity.getGtp_apn()).reverse().toString() + "|" + entity.getGtp_uuid();
Delete del_apnkey = new Delete(Bytes.toBytes(oldapnkey));
delrows.add(del_apnkey);
}
if (!"".equals(entity.getGtp_phone_number())) {
String oldpnkey = "2" + new StringBuffer(entity.getGtp_phone_number()).reverse().toString() + "|" + entity.getGtp_teid();
String oldpnkey = "2" + new StringBuffer(entity.getGtp_phone_number()).reverse().toString() + "|" + entity.getGtp_uuid();
Delete del_pnkey = new Delete(Bytes.toBytes(oldpnkey));
delrows.add(del_pnkey);
}
if (!"".equals(entity.getGtp_imsi())) {
String oldimsikey = "1" + entity.getGtp_imsi() + "|" + entity.getGtp_teid();
String oldimsikey = "1" + entity.getGtp_imsi() + "|" + entity.getGtp_uuid();
Delete del_imsikey = new Delete(Bytes.toBytes(oldimsikey));
delrows.add(del_imsikey);
}
if (!"".equals(entity.getGtp_imei())) {
String oldimeikey = "0" + entity.getGtp_imei() + "|" + entity.getGtp_teid();
String oldimeikey = "0" + entity.getGtp_imei() + "|" + entity.getGtp_uuid();
Delete del_imeikey = new Delete(Bytes.toBytes(oldimeikey));
delrows.add(del_imeikey);
}
@@ -334,9 +348,10 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
ArrayList<Row> updaterows = new ArrayList<>();
if (!"".equals(gtp.getGtp_apn())) {
String apnkey = "3" + new StringBuffer(gtp.getGtp_apn()).reverse().toString() + "|" + gtp.getGtp_teid();
String apnkey = "3" + new StringBuffer(gtp.getGtp_apn()).reverse().toString() + "|" + gtp.getGtp_uuid();
Put putApn = new Put(apnkey.getBytes());
putApn.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid()));
putApn.addColumn("gtp".getBytes(), "uplink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_uplink_teid()));
putApn.addColumn("gtp".getBytes(), "downlink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_downlink_teid()));
putApn.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn()));
putApn.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number()));
putApn.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi()));
@@ -348,9 +363,10 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
}
if (!"".equals(gtp.getGtp_phone_number())) {
String pnkey = "2" + new StringBuffer(gtp.getGtp_phone_number()).reverse().toString() + "|" + gtp.getGtp_teid();
String pnkey = "2" + new StringBuffer(gtp.getGtp_phone_number()).reverse().toString() + "|" + gtp.getGtp_uuid();
Put putPn = new Put(pnkey.getBytes());
putPn.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid()));
putPn.addColumn("gtp".getBytes(), "uplink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_uplink_teid()));
putPn.addColumn("gtp".getBytes(), "downlink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_downlink_teid()));
putPn.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn()));
putPn.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number()));
putPn.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi()));
@@ -363,9 +379,10 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
}
if (!"".equals(gtp.getGtp_imsi())) {
String imsikey = "1" + gtp.getGtp_imsi() + "|" + gtp.getGtp_teid();
String imsikey = "1" + gtp.getGtp_imsi() + "|" + gtp.getGtp_uuid();
Put putImsi = new Put(imsikey.getBytes());
putImsi.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid()));
putImsi.addColumn("gtp".getBytes(), "uplink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_uplink_teid()));
putImsi.addColumn("gtp".getBytes(), "downlink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_downlink_teid()));
putImsi.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn()));
putImsi.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number()));
putImsi.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi()));
@@ -377,9 +394,10 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
}
if (!"".equals(gtp.getGtp_imei())) {
String imeikey = "0" + gtp.getGtp_imei() + "|" + gtp.getGtp_teid();
String imeikey = "0" + gtp.getGtp_imei() + "|" + gtp.getGtp_uuid();
Put putImei = new Put(imeikey.getBytes());
putImei.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid()));
putImei.addColumn("gtp".getBytes(), "uplink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_uplink_teid()));
putImei.addColumn("gtp".getBytes(), "downlink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_downlink_teid()));
putImei.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn()));
putImei.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number()));
putImei.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi()));