diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..68048cd --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +src/main/java/com/zdjizhi/utils/kafka/test.java diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..8d78c2f --- /dev/null +++ b/pom.xml @@ -0,0 +1,287 @@ + + + + 4.0.0 + + com.zdjizhi + relationship-gtpc-user + 22-08-15 + + relationship-gtpc-user + http://www.example.com + + + + + nexus + Team Nexus Repository + http://192.168.40.125:8099/content/groups/public + + + + maven-ali + http://maven.aliyun.com/nexus/content/groups/public/ + + + + + + fail + + + + + + UTF-8 + 1.13.1 + 2.7.1 + 1.0.0 + 2.2.3 + provided + + + + + + + + com.zdjizhi + galaxy + 1.0.6 + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + com.alibaba + fastjson + 1.2.70 + + + + + org.apache.flink + flink-core + ${flink.version} + ${scope.type} + + + + + + org.apache.flink + flink-streaming-java_2.12 + ${flink.version} + ${scope.type} + + + + + org.apache.flink + flink-clients_2.12 + ${flink.version} + ${scope.type} + + + + + org.apache.flink + flink-connector-kafka_2.12 + ${flink.version} + + + + + org.apache.flink + flink-java + ${flink.version} + ${scope.type} + + + + + org.apache.hbase + hbase-client + ${hbase.version} + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + commons-io + commons-io + + + commons-lang3 + org.apache.commons + + + netty + io.netty + + + netty-all + io.netty + + + + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + cglib + cglib-nodep + 3.2.4 + + + + org.junit.jupiter + junit-jupiter-api + 5.3.2 + compile + + + + com.jayway.jsonpath + json-path + 2.4.0 + + + + cn.hutool + hutool-all + 5.5.2 + + + + junit + junit + 4.12 + test + + + + + org.slf4j + slf4j-api + 1.7.21 + + + + + org.slf4j + slf4j-log4j12 + 1.7.21 + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.0 + + 1.8 + 1.8 + + false + + + -Xpkginfo:always + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade + package + + shade + + + relationship-gtpc-user-22-08-15 + + + + + com.zdjizhi.topology.GtpRelation + + + + + + + + + + properties + + **/*.properties + **/*.xml + + false + + + + src\main\java + + log4j.properties + + false + + + + + diff --git a/properties/default_config.properties b/properties/default_config.properties new file mode 100644 index 0000000..c1832ce --- /dev/null +++ b/properties/default_config.properties @@ -0,0 +1,16 @@ +#====================Kafka KafkaConsumer====================# +#kafka source connection timeout +session.timeout.ms=60000 + +#kafka source poll +max.poll.records=3000 + +#kafka source poll bytes +max.partition.fetch.bytes=31457280 +#====================kafka default====================# + +#kafka SASL验证用户名 +kafka.user=admin + +#kafka SASL及SSL验证密码 +kafka.pin=galaxy2019 \ No newline at end of file diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties new file mode 100644 index 0000000..a6976ba --- /dev/null +++ b/properties/service_flow_config.properties @@ -0,0 +1,32 @@ +#--------------------------------鍦板潃閰嶇疆------------------------------# + +#绠$悊kafka鍦板潃 +#input.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094 +input.kafka.servers=192.168.44.12:9094 + +#hbase zookeeper鍦板潃 鐢ㄤ簬杩炴帴HBase +#hbase.zookeeper.servers=192.168.44.12 +hbase.zookeeper.servers=192.168.44.12:2181 + +hbase.scan.limit=0 + +cache.expire.seconds=86400 +cache.max.size=10000000 +cache.update.seconds=3600 + +#--------------------------------Kafka娑堣垂缁勪俊鎭------------------------------# + +#kafka 鎺ユ敹鏁版嵁topic +input.kafka.topic=GTPC-RECORD-COMPLETED + +#璇诲彇topic,瀛樺偍璇pout id鐨勬秷璐筼ffset淇℃伅锛屽彲閫氳繃璇ユ嫇鎵戝懡鍚;鍏蜂綋瀛樺偍offset鐨勪綅缃紝纭畾涓嬫璇诲彇涓嶉噸澶嶇殑鏁版嵁锛 +group.id=test3 + +#--------------------------------topology閰嶇疆------------------------------# +#ip-account瀵瑰簲鍏崇郴琛 +relation.user.teid.table.name=tsg_galaxy:relation_user_teid + +#瀹氫綅搴撳湴鍧 +tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ +#account-ip瀵瑰簲鍏崇郴琛 +gtpc.knowledge.base.table.name=tsg_galaxy:gtpc_knowledge_base \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/GtpConfig.java b/src/main/java/com/zdjizhi/common/GtpConfig.java new file mode 100644 index 0000000..ee377b9 --- /dev/null +++ b/src/main/java/com/zdjizhi/common/GtpConfig.java @@ -0,0 +1,45 @@ +package com.zdjizhi.common; + + +import com.zdjizhi.utils.system.GtpConfigurations; + +/** + * @author Administrator + */ +public class GtpConfig { + + public static final int HBASE_SCAN_LIMIT = GtpConfigurations.getIntProperty(0, "hbase.scan.limit"); + + + /** + * System + */ + public static final String RELATION_USER_TEID_TABLE_NAME = GtpConfigurations.getStringProperty(0, "relation.user.teid.table.name"); + + public static final String GTPC_KNOWLEDGE_BASE_TABLE_NAME = GtpConfigurations.getStringProperty(0, "gtpc.knowledge.base.table.name"); + + /** + * kafka + */ + public static final String INPUT_KAFKA_SERVERS = GtpConfigurations.getStringProperty(0, "input.kafka.servers"); + public static final String HBASE_ZOOKEEPER_SERVERS = GtpConfigurations.getStringProperty(0, "hbase.zookeeper.servers"); + public static final String SESSION_TIMEOUT_MS = GtpConfigurations.getStringProperty(1, "session.timeout.ms"); + public static final String MAX_POLL_RECORDS = GtpConfigurations.getStringProperty(1, "max.poll.records"); + public static final String MAX_PARTITION_FETCH_BYTES = GtpConfigurations.getStringProperty(1, "max.partition.fetch.bytes"); + + + + public static final String GROUP_ID = GtpConfigurations.getStringProperty(0, "group.id"); + public static final String INPUT_KAFKA_TOPIC = GtpConfigurations.getStringProperty(0, "input.kafka.topic"); + + public static final String TOOLS_LIBRARY = GtpConfigurations.getStringProperty(0, "tools.library"); + public static final String KAFKA_USER = GtpConfigurations.getStringProperty(1, "kafka.user"); + public static final String KAFKA_PIN = GtpConfigurations.getStringProperty(1, "kafka.pin"); + + + public static final int CACHE_EXPIRE_SECONDS = GtpConfigurations.getIntProperty(0, "cache.expire.seconds"); + public static final int CACHE_MAX_SIZE = GtpConfigurations.getIntProperty(0, "cache.max.size"); + + + public static final int CACHE_UPDATE_SECONDS = GtpConfigurations.getIntProperty(0, "cache.update.seconds"); +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/pojo/Entity.java b/src/main/java/com/zdjizhi/pojo/Entity.java new file mode 100644 index 0000000..aac3316 --- /dev/null +++ b/src/main/java/com/zdjizhi/pojo/Entity.java @@ -0,0 +1,107 @@ +package com.zdjizhi.pojo; + +public class Entity { + + private String Hashkey; + private int ifError; + + private String gtp_apn; + private String gtp_imei; + private String gtp_imsi; + private String gtp_phone_number; + private Long gtp_uplink_teid; + private Long gtp_downlink_teid ; + private String gtp_msg_type; + private Long common_recv_time; + private Long gtp_teid; + + + public String getHashkey() { + return Hashkey; + } + + public void setHashkey(String hashkey) { + Hashkey = hashkey; + } + + public int getIfError() { + return ifError; + } + + public void setIfError(int ifError) { + this.ifError = ifError; + } + + public String getGtp_apn() { + return gtp_apn; + } + + public void setGtp_apn(String gtp_apn) { + this.gtp_apn = gtp_apn; + } + + public String getGtp_imei() { + return gtp_imei; + } + + public void setGtp_imei(String gtp_imei) { + this.gtp_imei = gtp_imei; + } + + public String getGtp_imsi() { + return gtp_imsi; + } + + public void setGtp_imsi(String gtp_imsi) { + this.gtp_imsi = gtp_imsi; + } + + public String getGtp_phone_number() { + return gtp_phone_number; + } + + public void setGtp_phone_number(String gtp_phone_number) { + this.gtp_phone_number = gtp_phone_number; + } + + public Long getGtp_uplink_teid() { + return gtp_uplink_teid; + } + + public void setGtp_uplink_teid(Long gtp_uplink_teid) { + this.gtp_uplink_teid = gtp_uplink_teid; + } + + public Long getGtp_downlink_teid() { + return gtp_downlink_teid; + } + + public void setGtp_downlink_teid(Long gtp_downlink_teid) { + this.gtp_downlink_teid = gtp_downlink_teid; + } + + public String getGtp_msg_type() { + return gtp_msg_type; + } + + public void setGtp_msg_type(String gtp_msg_type) { + this.gtp_msg_type = gtp_msg_type; + } + + public Long getCommon_recv_time() { + return common_recv_time; + } + + public void setCommon_recv_time(Long common_recv_time) { + this.common_recv_time = common_recv_time; + } + + + public Long getGtp_teid() { + return gtp_teid; + } + + public void setGtp_teid(Long gtp_teid) { + this.gtp_teid = gtp_teid; + } +} diff --git a/src/main/java/com/zdjizhi/pojo/Gtp.java b/src/main/java/com/zdjizhi/pojo/Gtp.java new file mode 100644 index 0000000..f71c922 --- /dev/null +++ b/src/main/java/com/zdjizhi/pojo/Gtp.java @@ -0,0 +1,69 @@ +package com.zdjizhi.pojo; + +public class Gtp { + + private String gtp_apn; + private String gtp_imei; + private String gtp_imsi; + private String gtp_phone_number; + private Long gtp_teid; + private Integer msg_type; + private Long last_update_time; + + + public Integer getMsg_type() { + return msg_type; + } + + public void setMsg_type(Integer msg_type) { + this.msg_type = msg_type; + } + + public String getGtp_apn() { + return gtp_apn; + } + + public void setGtp_apn(String gtp_apn) { + this.gtp_apn = gtp_apn; + } + + public String getGtp_imei() { + return gtp_imei; + } + + public void setGtp_imei(String gtp_imei) { + this.gtp_imei = gtp_imei; + } + + public String getGtp_imsi() { + return gtp_imsi; + } + + public void setGtp_imsi(String gtp_imsi) { + this.gtp_imsi = gtp_imsi; + } + + public String getGtp_phone_number() { + return gtp_phone_number; + } + + public void setGtp_phone_number(String gtp_phone_number) { + this.gtp_phone_number = gtp_phone_number; + } + + public Long getGtp_teid() { + return gtp_teid; + } + + public void setGtp_teid(Long gtp_teid) { + this.gtp_teid = gtp_teid; + } + + public Long getLast_update_time() { + return last_update_time; + } + + public void setLast_update_time(Long last_update_time) { + this.last_update_time = last_update_time; + } +} diff --git a/src/main/java/com/zdjizhi/topology/GtpRelation.java b/src/main/java/com/zdjizhi/topology/GtpRelation.java new file mode 100644 index 0000000..056029d --- /dev/null +++ b/src/main/java/com/zdjizhi/topology/GtpRelation.java @@ -0,0 +1,58 @@ +package com.zdjizhi.topology; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.GtpConfig; +import com.zdjizhi.pojo.Entity; +import com.zdjizhi.utils.functions.FilterNullFunction; +import com.zdjizhi.utils.functions.ParseFunction; +import com.zdjizhi.utils.hbasepackage.HbaseSink; +import com.zdjizhi.utils.kafka.Consumer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +/** + * @author qidaijie + * @Package com.zdjizhi.topology + * @Description: + * @date 2021/5/2016:42 + */ +public class GtpRelation { + private static final Log logger = LogFactory.get(); + + + public static void main(String[] args) { + final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStreamSource streamSource = environment.addSource(Consumer.getKafkaConsumer()); + + DataStream getObject = streamSource.map(new ParseFunction()).name("ParseJson"); + + DataStream filterOriginalData = getObject.filter(new FilterNullFunction()).name("FilterOriginalData"); + + KeyedStream> GtprelationTuple = filterOriginalData.keyBy(new oneKeySelector()); + + + GtprelationTuple.addSink(new HbaseSink(GtpConfig.HBASE_ZOOKEEPER_SERVERS)); + + try { + environment.execute("RELATIONSHIP-GTPC-USER"); + } catch (Exception e) { + logger.error("This Flink task start ERROR! Exception information is :" + e); + } + + } + + + public static class oneKeySelector implements KeySelector> { + + @Override + public Tuple1 getKey(Entity entity) throws Exception { + return new Tuple1<>(entity.getHashkey()); + } + } +} diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java new file mode 100644 index 0000000..46e3ad0 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java @@ -0,0 +1,14 @@ +package com.zdjizhi.utils.functions; + +import com.zdjizhi.pojo.Entity; +import org.apache.flink.api.common.functions.FilterFunction; + +public class FilterNullFunction implements FilterFunction { + + @Override + public boolean filter(Entity entity) { + + return entity.getIfError()!=1; + } +} + diff --git a/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java new file mode 100644 index 0000000..6751f5b --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java @@ -0,0 +1,100 @@ +package com.zdjizhi.utils.functions; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONException; +import com.zdjizhi.pojo.Entity; +import com.zdjizhi.utils.StringUtil; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.flink.api.common.functions.MapFunction; + + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/5/2715:01 + */ + +public class ParseFunction implements MapFunction { + private static final Log logger = LogFactory.get(); + + + @Override + public Entity map(String message) { + + + Entity entity = new Entity(); + + try { + if (StringUtil.isNotBlank(message)) { + entity = JSON.parseObject(message, Entity.class); + if(entity.getGtp_apn()==null){ + + entity.setGtp_apn(""); + } + if(entity.getGtp_phone_number()==null){ + + entity.setGtp_phone_number(""); + } + if(entity.getGtp_imei()==null){ + + entity.setGtp_imei(""); + } + if(entity.getGtp_imsi()==null){ + + entity.setGtp_imsi(""); + } + + + if(!"".equals(entity.getGtp_imei())|| !"".equals(entity.getGtp_imsi())|| !"".equals(entity.getGtp_phone_number())) { + + + String md5Str = DigestUtils.md5Hex(entity.getGtp_imei() + entity.getGtp_imsi() + entity.getGtp_phone_number()); + entity.setHashkey(md5Str); + + if(entity.getGtp_uplink_teid()==null || entity.getGtp_uplink_teid()==0){ + + if(entity.getGtp_downlink_teid()==null || entity.getGtp_downlink_teid()==0){ + + entity.setIfError(1); + logger.info("teid涓虹┖" + message); + + } + else{ + + entity.setGtp_teid(entity.getGtp_downlink_teid()); + } + }else{ + + entity.setGtp_teid(entity.getGtp_uplink_teid()); + + } + + } + else { + entity.setHashkey(""); + entity.setIfError(1); + logger.info("涓夊厓缁勪负绌" + message); + + } + + + + }else{ + + entity.setIfError(1); + logger.error("鏁版嵁杞崲JSON鏍煎紡寮傚父,鍘熷鏃ュ織涓:" + message); + } + } catch (JSONException jse) { + entity.setIfError(1); + logger.error("鏁版嵁杞崲JSON鏍煎紡寮傚父,鍘熷鏃ュ織涓:" + message); + } catch (RuntimeException re) { + entity.setIfError(1); + logger.error("GTP鏃ュ織鏉′欢杩囨护寮傚父,寮傚父淇℃伅涓猴細" + re); + } + + return entity; + } +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java new file mode 100644 index 0000000..dd545c3 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java @@ -0,0 +1,397 @@ +package com.zdjizhi.utils.hbasepackage; + + +import com.google.common.cache.CacheBuilder; +import com.zdjizhi.common.GtpConfig; +import com.zdjizhi.pojo.Entity; +import com.zdjizhi.pojo.Gtp; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.concurrent.TimeUnit; + +public class HbaseSink extends RichSinkFunction implements Serializable, SinkFunction { + private Logger log; + + private String hbase_zookeeper_host; + // public Map gtpConcurrentHashMap = new ConcurrentHashMap<>(80000); + //public Cache gtpConcurrentHashMap = CacheUtil.newLRUCache(4, DateUnit.SECOND.getMillis() * 60); + public com.google.common.cache.Cache gtpConcurrentHashMap ; + + private Connection connection; + private Admin admin; + + public HbaseSink(String hbase_zookeeper_host) { + this.hbase_zookeeper_host = hbase_zookeeper_host; + gtpConcurrentHashMap=CacheBuilder.newBuilder().expireAfterWrite(GtpConfig.CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS).initialCapacity(100000).maximumSize(GtpConfig.CACHE_MAX_SIZE).build(); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + log = Logger.getLogger(HbaseSink.class); + + org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create(); + configuration.set("hbase.zookeeper.quorum", hbase_zookeeper_host); + + connection = ConnectionFactory.createConnection(configuration); + admin = connection.getAdmin(); + + try { + + Table table = connection.getTable(TableName.valueOf(GtpConfig.RELATION_USER_TEID_TABLE_NAME)); + Scan scan = new Scan(); + scan.addColumn("gtp".getBytes(), "teid".getBytes()); + scan.addColumn("gtp".getBytes(), "apn".getBytes()); + scan.addColumn("gtp".getBytes(), "phone_number".getBytes()); + scan.addColumn("gtp".getBytes(), "imsi".getBytes()); + scan.addColumn("gtp".getBytes(), "imei".getBytes()); + scan.addColumn("gtp".getBytes(), "last_update_time".getBytes()); + scan.addColumn("gtp".getBytes(), "msg_type".getBytes()); + + if (GtpConfig.HBASE_SCAN_LIMIT != 0) { + scan.setLimit(GtpConfig.HBASE_SCAN_LIMIT); + } + + ResultScanner scanner = table.getScanner(scan); + for (Result result : scanner) { + if (result.containsColumn("gtp".getBytes(), "teid".getBytes()) && result.containsColumn("gtp".getBytes(), "msg_type".getBytes()) && result.containsColumn("gtp".getBytes(), "apn".getBytes()) && result.containsColumn("gtp".getBytes(), "last_update_time".getBytes()) && result.containsColumn("gtp".getBytes(), "imei".getBytes()) && result.containsColumn("gtp".getBytes(), "phone_number".getBytes()) && result.containsColumn("gtp".getBytes(), "imsi".getBytes())) { + + Gtp gtp = new Gtp(); + String key = Bytes.toString(result.getRow()); + Long teid = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("teid")))); + int msg_type = Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("msg_type")))); + + String apn = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("apn")))); + String phone_number = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("phone_number")))); + String imsi = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("imsi")))); + String imei = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("imei")))); + Long last_update_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("last_update_time")))); + + gtp.setLast_update_time(last_update_time); + gtp.setGtp_teid(teid); + gtp.setGtp_apn(apn); + gtp.setGtp_phone_number(phone_number); + gtp.setGtp_imsi(imsi); + gtp.setGtp_imei(imei); + gtp.setMsg_type(msg_type); + gtpConcurrentHashMap.put(key, gtp); + } + } + scanner.close(); + } catch (IOException ioe) { + log.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<==="); + } catch (RuntimeException e) { + log.error("HBaseUtils getAll() is Exception===>{" + e + "}<==="); + } + } + + public void invoke(Entity entity, Context context) throws Exception { + // 鎸 project:table 褰掔撼 + + Gtp gtp = gtpConcurrentHashMap.getIfPresent(entity.getHashkey()); + if (gtp!=null) { + //Gtp gtp = gtpConcurrentHashMap.getIfPresent(entity.getHashkey()); + + + + if (gtp.getLast_update_time() <= entity.getCommon_recv_time()) { + + + if("delete".equals(entity.getGtp_msg_type())){ + + if(gtp.getGtp_teid().equals(entity.getGtp_teid())){ + + + + gtp.setMsg_type(2); + ArrayList rows = new ArrayList<>(); + // ArrayList delrows = getDelRows(gtp); + gtp.setLast_update_time(entity.getCommon_recv_time()); + gtp.setGtp_teid(entity.getGtp_teid()); + gtp.setGtp_apn(entity.getGtp_apn()); + gtp.setGtp_phone_number(entity.getGtp_phone_number()); + gtp.setGtp_imsi(entity.getGtp_imsi()); + gtp.setGtp_imei(entity.getGtp_imei()); + ArrayList updaterows = getupdateRows(gtp); + rows.addAll(updaterows); + updateKnowledgeMessage(rows); + updateRelationMessage(entity.getHashkey(), gtp); + + } + else{ + + + gtp.setMsg_type(2); + ArrayList rows = new ArrayList<>(); + // ArrayList delrows = getDelRows(gtp); + gtp.setLast_update_time(entity.getCommon_recv_time()); + gtp.setGtp_teid(entity.getGtp_teid()); + gtp.setGtp_apn(entity.getGtp_apn()); + gtp.setGtp_phone_number(entity.getGtp_phone_number()); + gtp.setGtp_imsi(entity.getGtp_imsi()); + gtp.setGtp_imei(entity.getGtp_imei()); + ArrayList updaterows = getupdateRows(gtp); + // rows.addAll(delrows); + rows.addAll(updaterows); + updateKnowledgeMessage(rows); + // updateRelationMessage(entity.getHashkey(), gtp); + + } + + + + } + else{ + + if(!gtp.getGtp_teid().equals(entity.getGtp_teid())){ + + + gtp.setMsg_type(1); + ArrayList rows = new ArrayList<>(); + // ArrayList delrows = getDelRows(gtp); + gtp.setLast_update_time(entity.getCommon_recv_time()); + gtp.setGtp_teid(entity.getGtp_teid()); + gtp.setGtp_apn(entity.getGtp_apn()); + gtp.setGtp_phone_number(entity.getGtp_phone_number()); + gtp.setGtp_imsi(entity.getGtp_imsi()); + gtp.setGtp_imei(entity.getGtp_imei()); + ArrayList updaterows = getupdateRows(gtp); + // rows.addAll(delrows); + rows.addAll(updaterows); + updateKnowledgeMessage(rows); + updateRelationMessage(entity.getHashkey(), gtp); + + } + else { + if(entity.getCommon_recv_time()-gtp.getLast_update_time()>GtpConfig.CACHE_UPDATE_SECONDS){ + + + gtp.setMsg_type(1); + ArrayList rows = new ArrayList<>(); + // ArrayList delrows = getDelRows(gtp); + gtp.setLast_update_time(entity.getCommon_recv_time()); + gtp.setGtp_teid(entity.getGtp_teid()); + gtp.setGtp_apn(entity.getGtp_apn()); + gtp.setGtp_phone_number(entity.getGtp_phone_number()); + gtp.setGtp_imsi(entity.getGtp_imsi()); + gtp.setGtp_imei(entity.getGtp_imei()); + ArrayList updaterows = getupdateRows(gtp); + // rows.addAll(delrows); + rows.addAll(updaterows); + updateKnowledgeMessage(rows); + updateRelationMessage(entity.getHashkey(), gtp); + } + } + + } + + + + }else{ + + if ("delete".equals(entity.getGtp_msg_type())) { + + + gtp.setMsg_type(2); + ArrayList rows = new ArrayList<>(); + // ArrayList delrows = getDelRows(gtp); + gtp.setLast_update_time(entity.getCommon_recv_time()); + gtp.setGtp_teid(entity.getGtp_teid()); + gtp.setGtp_apn(entity.getGtp_apn()); + gtp.setGtp_phone_number(entity.getGtp_phone_number()); + gtp.setGtp_imsi(entity.getGtp_imsi()); + gtp.setGtp_imei(entity.getGtp_imei()); + ArrayList updaterows = getupdateRows(gtp); + // rows.addAll(delrows); + rows.addAll(updaterows); + updateKnowledgeMessage(rows); + } + + } + } else { + + + Gtp gtpobj = new Gtp(); + gtpobj.setLast_update_time(entity.getCommon_recv_time()); + gtpobj.setGtp_teid(entity.getGtp_teid()); + gtpobj.setGtp_apn(entity.getGtp_apn()); + gtpobj.setGtp_phone_number(entity.getGtp_phone_number()); + gtpobj.setGtp_imsi(entity.getGtp_imsi()); + gtpobj.setGtp_imei(entity.getGtp_imei()); + if(!"delete".equals(entity.getGtp_msg_type())) { + gtpobj.setMsg_type(1); + } + else{ + gtpobj.setMsg_type(2); + + } + ArrayList rows = new ArrayList<>(); + ArrayList updaterows = getupdateRows(gtpobj); + rows.addAll(updaterows); + updateRelationMessage(entity.getHashkey(), gtpobj); + updateKnowledgeMessage(rows); + + } + } + + @Override + public void close() throws Exception { + super.close(); + } + + + public void updateRelationMessage(String key, Gtp gtp) throws IOException { + + Table table = null; + try { + table = connection.getTable(TableName.valueOf(GtpConfig.RELATION_USER_TEID_TABLE_NAME)); + Put put = new Put(key.getBytes()); + put.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid())); + put.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn())); + put.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number())); + put.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi())); + put.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei())); + put.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time())); + put.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type())); + + table.put(put); + gtpConcurrentHashMap.put(key, gtp); + } catch (Exception e) { + log.error(e.toString()); + } finally { + table.close(); + } + + + } + + public void updateKnowledgeMessage(ArrayList rows) throws IOException { + + Table tableR = null; + try { + + tableR = connection.getTable(TableName.valueOf(GtpConfig.GTPC_KNOWLEDGE_BASE_TABLE_NAME)); + Object[] results = new Object[rows.size()]; + tableR.batch(rows, results); + } catch (Exception e) { + log.error(e.toString()); + } finally { + tableR.close(); + } + + + } + + + public ArrayList getDelRows(Gtp entity) { + + + ArrayList delrows = new ArrayList<>(); + + if (!"".equals(entity.getGtp_apn())) { + String oldapnkey = "3" + new StringBuffer(entity.getGtp_apn()).reverse().toString() + "|" + entity.getGtp_teid(); + Delete del_apnkey = new Delete(Bytes.toBytes(oldapnkey)); + delrows.add(del_apnkey); + } + if (!"".equals(entity.getGtp_phone_number())) { + + String oldpnkey = "2" + new StringBuffer(entity.getGtp_phone_number()).reverse().toString() + "|" + entity.getGtp_teid(); + Delete del_pnkey = new Delete(Bytes.toBytes(oldpnkey)); + delrows.add(del_pnkey); + } + + if (!"".equals(entity.getGtp_imsi())) { + + String oldimsikey = "1" + entity.getGtp_imsi() + "|" + entity.getGtp_teid(); + Delete del_imsikey = new Delete(Bytes.toBytes(oldimsikey)); + delrows.add(del_imsikey); + } + if (!"".equals(entity.getGtp_imei())) { + + String oldimeikey = "0" + entity.getGtp_imei() + "|" + entity.getGtp_teid(); + Delete del_imeikey = new Delete(Bytes.toBytes(oldimeikey)); + delrows.add(del_imeikey); + } + + return delrows; + } + + + public ArrayList getupdateRows(Gtp gtp) { + + + ArrayList updaterows = new ArrayList<>(); + + if (!"".equals(gtp.getGtp_apn())) { + String apnkey = "3" + new StringBuffer(gtp.getGtp_apn()).reverse().toString() + "|" + gtp.getGtp_teid(); + Put putApn = new Put(apnkey.getBytes()); + putApn.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid())); + putApn.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn())); + putApn.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number())); + putApn.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi())); + putApn.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei())); + putApn.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time())); + putApn.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type())); + + updaterows.add(putApn); + + } + if (!"".equals(gtp.getGtp_phone_number())) { + String pnkey = "2" + new StringBuffer(gtp.getGtp_phone_number()).reverse().toString() + "|" + gtp.getGtp_teid(); + Put putPn = new Put(pnkey.getBytes()); + putPn.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid())); + putPn.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn())); + putPn.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number())); + putPn.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi())); + putPn.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei())); + putPn.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time())); + putPn.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type())); + + updaterows.add(putPn); + + } + if (!"".equals(gtp.getGtp_imsi())) { + + String imsikey = "1" + gtp.getGtp_imsi() + "|" + gtp.getGtp_teid(); + Put putImsi = new Put(imsikey.getBytes()); + putImsi.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid())); + putImsi.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn())); + putImsi.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number())); + putImsi.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi())); + putImsi.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei())); + putImsi.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time())); + putImsi.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type())); + + updaterows.add(putImsi); + + } + if (!"".equals(gtp.getGtp_imei())) { + String imeikey = "0" + gtp.getGtp_imei() + "|" + gtp.getGtp_teid(); + Put putImei = new Put(imeikey.getBytes()); + putImei.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid())); + putImei.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn())); + putImei.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number())); + putImei.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi())); + putImei.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei())); + putImei.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time())); + putImei.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type())); + + updaterows.add(putImei); + + } + + + return updaterows; + } +} diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java new file mode 100644 index 0000000..3ac901d --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java @@ -0,0 +1,48 @@ +package com.zdjizhi.utils.kafka; + +import com.zdjizhi.common.GtpConfig; +import org.apache.kafka.common.config.SslConfigs; + +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2021/9/610:37 + */ +class CertUtils { + /** + * Kafka SASL璁よ瘉绔彛 + */ + private static final String SASL_PORT = "9094"; + + /** + * Kafka SSL璁よ瘉绔彛 + */ + private static final String SSL_PORT = "9095"; + + /** + * 鏍规嵁杩炴帴淇℃伅绔彛鍒ゆ柇璁よ瘉鏂瑰紡銆 + * + * @param servers kafka 杩炴帴淇℃伅 + * @param properties kafka 杩炴帴閰嶇疆淇℃伅 + */ + static void chooseCert(String servers, Properties properties) { + if (servers.contains(SASL_PORT)) { + properties.put("security.protocol", "SASL_PLAINTEXT"); + properties.put("sasl.mechanism", "PLAIN"); + properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + + GtpConfig.KAFKA_USER + " password=" + GtpConfig.KAFKA_PIN + ";"); + } else if (servers.contains(SSL_PORT)) { + properties.put("security.protocol", "SSL"); + properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); + properties.put("ssl.keystore.location", GtpConfig.TOOLS_LIBRARY + "keystore.jks"); + properties.put("ssl.keystore.password", GtpConfig.KAFKA_PIN); + properties.put("ssl.truststore.location", GtpConfig.TOOLS_LIBRARY + "truststore.jks"); + properties.put("ssl.truststore.password", GtpConfig.KAFKA_PIN); + properties.put("ssl.key.password", GtpConfig.KAFKA_PIN); + } + + } +} diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java new file mode 100644 index 0000000..04a895d --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java @@ -0,0 +1,41 @@ +package com.zdjizhi.utils.kafka; + +import com.zdjizhi.common.GtpConfig; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; + +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2021/6/813:54 + */ +public class Consumer { + private static Properties createConsumerConfig() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", GtpConfig.INPUT_KAFKA_SERVERS); + properties.put("group.id", GtpConfig.GROUP_ID); + properties.put("session.timeout.ms", GtpConfig.SESSION_TIMEOUT_MS); + properties.put("max.poll.records", GtpConfig.MAX_POLL_RECORDS); + properties.put("max.partition.fetch.bytes", GtpConfig.MAX_PARTITION_FETCH_BYTES); + properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + + + CertUtils.chooseCert(GtpConfig.INPUT_KAFKA_SERVERS, properties); + + return properties; + } + + public static FlinkKafkaConsumer getKafkaConsumer() { + FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(GtpConfig.INPUT_KAFKA_TOPIC, + new SimpleStringSchema(), createConsumerConfig()); + + kafkaConsumer.setCommitOffsetsOnCheckpoints(true); + kafkaConsumer.setStartFromGroupOffsets(); + + return kafkaConsumer; + } +} diff --git a/src/main/java/com/zdjizhi/utils/system/GtpConfigurations.java b/src/main/java/com/zdjizhi/utils/system/GtpConfigurations.java new file mode 100644 index 0000000..c3c7576 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/system/GtpConfigurations.java @@ -0,0 +1,70 @@ +package com.zdjizhi.utils.system; + +import com.zdjizhi.utils.StringUtil; + +import java.io.IOException; +import java.util.Locale; +import java.util.Properties; + + +/** + * @author Administrator + */ + +public final class GtpConfigurations { + + private static Properties propDefault = new Properties(); + private static Properties propService = new Properties(); + + + public static String getStringProperty(Integer type, String key) { + if (type == 0) { + return propService.getProperty(key); + } else if (type == 1) { + return propDefault.getProperty(key); + } else { + return null; + } + + } + + public static Integer getIntProperty(Integer type, String key) { + if (type == 0) { + return Integer.parseInt(propService.getProperty(key)); + } else if (type == 1) { + return Integer.parseInt(propDefault.getProperty(key)); + } else { + return null; + } + } + + public static Long getLongProperty(Integer type, String key) { + if (type == 0) { + return Long.parseLong(propService.getProperty(key)); + } else if (type == 1) { + return Long.parseLong(propDefault.getProperty(key)); + } else { + return null; + } + } + + public static Boolean getBooleanProperty(Integer type, String key) { + if (type == 0) { + return StringUtil.equals(propService.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true"); + } else if (type == 1) { + return StringUtil.equals(propDefault.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true"); + } else { + return null; + } + } + + static { + try { + propService.load(GtpConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties")); + propDefault.load(GtpConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties")); + } catch (IOException | RuntimeException e) { + propDefault = null; + propService = null; + } + } +} diff --git a/src/main/log4j.properties b/src/main/log4j.properties new file mode 100644 index 0000000..96f758c --- /dev/null +++ b/src/main/log4j.properties @@ -0,0 +1,25 @@ +#Log4j +log4j.rootLogger=console,file +# 鎺у埗鍙版棩蹇楄缃 +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.Threshold=ERROR +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n + +# 鏂囦欢鏃ュ織璁剧疆 +log4j.appender.file=org.apache.log4j.DailyRollingFileAppender +log4j.appender.file.Threshold=info +log4j.appender.file.encoding=UTF-8 +log4j.appender.file.Append=true +#璺緞璇风敤鐩稿璺緞锛屽仛濂界浉鍏虫祴璇曡緭鍑哄埌搴旂敤鐩笅 +log4j.appender.file.file=${nis.root}/log/galaxy-name.log +log4j.appender.file.DatePattern='.'yyyy-MM-dd +log4j.appender.file.layout=org.apache.log4j.PatternLayout +#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n +log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n +##MyBatis 閰嶇疆锛宑om.nis.web.dao鏄痬ybatis鎺ュ彛鎵鍦ㄥ寘 +#log4j.logger.com.nis.web.dao=debug +##bonecp鏁版嵁婧愰厤缃 +#log4j.category.com.jolbox=debug,console + + diff --git a/src/test/java/com/zdjizhi/FunctionTest.java b/src/test/java/com/zdjizhi/FunctionTest.java new file mode 100644 index 0000000..511fac6 --- /dev/null +++ b/src/test/java/com/zdjizhi/FunctionTest.java @@ -0,0 +1,34 @@ +package com.zdjizhi; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author qidaijie + * @Package com.zdjizhi + * @Description: + * @date 2021/6/2314:32 + */ +public class FunctionTest { + + @Test + public void jsonTest() { + Map methodCount = new HashMap<>(16); + methodCount.put("A",20L); + methodCount.put("B",20L); + methodCount.put("C",20L); + String jsonString = JSON.toJSONString(methodCount); + System.out.println(jsonString); + JSONObject jsonObject = JSONObject.parseObject(jsonString); + Map hmCount = (Map) jsonObject; + System.out.println(hmCount.toString()); + + + } + + +}