no message
This commit is contained in:
166
pom.xml
166
pom.xml
@@ -5,10 +5,10 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>com.zdjizhi</groupId>
|
||||
<artifactId>radius-relationship-hbase</artifactId>
|
||||
<version>210908-security</version>
|
||||
<artifactId>radius-relation</artifactId>
|
||||
<version>21-12-06</version>
|
||||
|
||||
<name>radius-relationship-hbase</name>
|
||||
<name>radius-relation</name>
|
||||
<url>http://www.example.com</url>
|
||||
|
||||
|
||||
@@ -38,78 +38,10 @@
|
||||
<hadoop.version>2.7.1</hadoop.version>
|
||||
<kafka.version>1.0.0</kafka.version>
|
||||
<hbase.version>2.2.3</hbase.version>
|
||||
<!--<scope.type>provided</scope.type>-->
|
||||
<scope.type>compile</scope.type>
|
||||
<scope.type>provided</scope.type>
|
||||
<!--<scope.type>compile</scope.type>-->
|
||||
</properties>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<version>2.4.2</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>shade</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<transformers>
|
||||
<transformer
|
||||
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
|
||||
<mainClass>com.zdjizhi.topology.RadiusRelationshipTopology</mainClass>
|
||||
</transformer>
|
||||
</transformers>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>io.github.zlika</groupId>
|
||||
<artifactId>reproducible-build-maven-plugin</artifactId>
|
||||
<version>0.2</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>strip-jar</goal>
|
||||
</goals>
|
||||
<phase>package</phase>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>2.3.2</version>
|
||||
<configuration>
|
||||
<source>1.8</source>
|
||||
<target>1.8</target>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
<resources>
|
||||
<resource>
|
||||
<directory>properties</directory>
|
||||
<includes>
|
||||
<include>**/*.properties</include>
|
||||
<include>**/*.xml</include>
|
||||
</includes>
|
||||
<filtering>false</filtering>
|
||||
</resource>
|
||||
|
||||
<resource>
|
||||
<directory>src\main\java</directory>
|
||||
<includes>
|
||||
<include>log4j.properties</include>
|
||||
</includes>
|
||||
<filtering>false</filtering>
|
||||
</resource>
|
||||
</resources>
|
||||
</build>
|
||||
|
||||
<dependencies>
|
||||
|
||||
@@ -135,15 +67,6 @@
|
||||
<version>1.2.70</version>
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table -->
|
||||
<!--<dependency>-->
|
||||
<!--<groupId>org.apache.flink</groupId>-->
|
||||
<!--<artifactId>flink-table</artifactId>-->
|
||||
<!--<version>${flink.version}</version>-->
|
||||
<!--<type>pom</type>-->
|
||||
<!--<scope>${scope.type}</scope>-->
|
||||
<!--</dependency>-->
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
@@ -174,7 +97,6 @@
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-connector-kafka_2.12</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<scope>${scope.type}</scope>
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
|
||||
@@ -199,6 +121,22 @@
|
||||
<artifactId>log4j-over-slf4j</artifactId>
|
||||
<groupId>org.slf4j</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>commons-io</artifactId>
|
||||
<groupId>commons-io</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>netty</artifactId>
|
||||
<groupId>io.netty</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>netty-all</artifactId>
|
||||
<groupId>io.netty</groupId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
@@ -283,5 +221,67 @@
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>3.8.0</version>
|
||||
<configuration>
|
||||
<source>1.8</source>
|
||||
<target>1.8</target>
|
||||
<!-- The semantics of this option are reversed, see MCOMPILER-209. -->
|
||||
<useIncrementalCompilation>false</useIncrementalCompilation>
|
||||
<compilerArgs>
|
||||
<!-- Prevents recompilation due to missing package-info.class, see MCOMPILER-205 -->
|
||||
<arg>-Xpkginfo:always</arg>
|
||||
</compilerArgs>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>shade</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>shade</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<finalName>radius-relation-21-12-06</finalName>
|
||||
<transformers combine.children="append">
|
||||
<!-- The service transformer is needed to merge META-INF/services files -->
|
||||
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
|
||||
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
|
||||
<mainClass>com.zdjizhi.topology.RadiusRelation</mainClass>
|
||||
</transformer>
|
||||
</transformers>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
<resources>
|
||||
<resource>
|
||||
<directory>properties</directory>
|
||||
<includes>
|
||||
<include>**/*.properties</include>
|
||||
<include>**/*.xml</include>
|
||||
</includes>
|
||||
<filtering>false</filtering>
|
||||
</resource>
|
||||
|
||||
<resource>
|
||||
<directory>src\main\java</directory>
|
||||
<includes>
|
||||
<include>log4j.properties</include>
|
||||
</includes>
|
||||
<filtering>false</filtering>
|
||||
</resource>
|
||||
</resources>
|
||||
</build>
|
||||
</project>
|
||||
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
#--------------------------------地址配置------------------------------#
|
||||
|
||||
#管理kafka地址
|
||||
input.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
|
||||
#input.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
|
||||
input.kafka.servers=192.168.44.85:9094
|
||||
|
||||
#hbase zookeeper地址 用于连接HBase
|
||||
hbase.zookeeper.servers=192.168.44.12:2181
|
||||
#hbase.zookeeper.servers=192.168.44.12
|
||||
hbase.zookeeper.servers=192.168.44.85:2181
|
||||
|
||||
#--------------------------------Kafka消费组信息------------------------------#
|
||||
|
||||
@@ -12,17 +14,13 @@ hbase.zookeeper.servers=192.168.44.12:2181
|
||||
input.kafka.topic=RADIUS-RECORD
|
||||
|
||||
#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
|
||||
group.id=radius-flink-202110270887999888997874
|
||||
group.id=radius-flink-202112068
|
||||
|
||||
#--------------------------------topology配置------------------------------#
|
||||
|
||||
#hbase 更新时间,如填写0则不更新缓存
|
||||
hbase.tick.tuple.freq.secs=60
|
||||
|
||||
#hbase table name
|
||||
hbase.table.name=sub:subscriber_info
|
||||
#ip-account对应关系表
|
||||
hbase.framedip.table.name=tsg_galaxy:relation_framedip_account
|
||||
|
||||
#定位库地址
|
||||
tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
|
||||
|
||||
#account-ip对应关系表
|
||||
hbase.account.table.name=tsg_galaxy:relation_account_framedip
|
||||
@@ -6,11 +6,17 @@ import com.zdjizhi.utils.system.RadiusRelationshipConfigurations;
|
||||
/**
|
||||
* @author Administrator
|
||||
*/
|
||||
public class RadiusRelationshipConfig {
|
||||
public class RadiusRelationshipConfig {
|
||||
|
||||
|
||||
/**
|
||||
* 4- Accounting-Request(账户授权)
|
||||
*/
|
||||
|
||||
|
||||
public static final int ACCOUNTING_REQUEST = 4;
|
||||
|
||||
|
||||
/**
|
||||
* radius_packet_type
|
||||
*/
|
||||
@@ -19,6 +25,9 @@ public class RadiusRelationshipConfig {
|
||||
* 1、开始计费
|
||||
*/
|
||||
public static final int START_BILLING = 1;
|
||||
|
||||
public static final int UPDATE_BILLING = 3;
|
||||
|
||||
/**
|
||||
* radius_acct_status_type
|
||||
*/
|
||||
@@ -27,8 +36,7 @@ public class RadiusRelationshipConfig {
|
||||
/**
|
||||
* System
|
||||
*/
|
||||
public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = RadiusRelationshipConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
|
||||
public static final String HBASE_TABLE_NAME = RadiusRelationshipConfigurations.getStringProperty(0, "hbase.table.name");
|
||||
public static final String HBASE_FRAMEDIP_TABLE_NAME = RadiusRelationshipConfigurations.getStringProperty(0, "hbase.framedip.table.name");
|
||||
|
||||
public static final String HBASE_ACCOUNT_TABLE_NAME = RadiusRelationshipConfigurations.getStringProperty(0, "hbase.account.table.name");
|
||||
|
||||
@@ -37,6 +45,9 @@ public class RadiusRelationshipConfig {
|
||||
*/
|
||||
public static final String INPUT_KAFKA_SERVERS = RadiusRelationshipConfigurations.getStringProperty(0, "input.kafka.servers");
|
||||
public static final String HBASE_ZOOKEEPER_SERVERS = RadiusRelationshipConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
|
||||
//public static final String HBASE_ZOOKEEPER_PORT = RadiusRelationshipConfigurations.getStringProperty(0, "hbase.zookeeper.port");
|
||||
|
||||
|
||||
public static final String GROUP_ID = RadiusRelationshipConfigurations.getStringProperty(0, "group.id");
|
||||
public static final String INPUT_KAFKA_TOPIC = RadiusRelationshipConfigurations.getStringProperty(0, "input.kafka.topic");
|
||||
|
||||
|
||||
52
src/main/java/com/zdjizhi/topology/RadiusRelation.java
Normal file
52
src/main/java/com/zdjizhi/topology/RadiusRelation.java
Normal file
@@ -0,0 +1,52 @@
|
||||
package com.zdjizhi.topology;
|
||||
|
||||
import cn.hutool.log.Log;
|
||||
import cn.hutool.log.LogFactory;
|
||||
import com.zdjizhi.common.RadiusRelationshipConfig;
|
||||
import com.zdjizhi.utils.functions.FilterNullFunction;
|
||||
import com.zdjizhi.utils.functions.ParseFunction;
|
||||
import com.zdjizhi.utils.hbasepackage.HbaseSinkAccount;
|
||||
import com.zdjizhi.utils.hbasepackage.HbaseSinkFramedip;
|
||||
import com.zdjizhi.utils.kafka.Consumer;
|
||||
import org.apache.flink.api.java.tuple.Tuple6;
|
||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||
import org.apache.flink.streaming.api.datastream.DataStreamSource;
|
||||
import org.apache.flink.streaming.api.datastream.KeyedStream;
|
||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
|
||||
/**
|
||||
* @author qidaijie
|
||||
* @Package com.zdjizhi.topology
|
||||
* @Description:
|
||||
* @date 2021/5/2016:42
|
||||
*/
|
||||
public class RadiusRelation {
|
||||
private static final Log logger = LogFactory.get();
|
||||
|
||||
|
||||
public static void main(String[] args) {
|
||||
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
|
||||
|
||||
DataStreamSource<String> streamSource = environment.addSource(Consumer.getKafkaConsumer());
|
||||
|
||||
DataStream<Tuple6<String,String,String,String,Long,Integer>> getObject = streamSource.map(new ParseFunction()).name("ParseJson");
|
||||
|
||||
DataStream<Tuple6<String,String,String,String,Long,Integer>> filterOriginalData = getObject.filter(new FilterNullFunction()).name("FilterOriginalData");
|
||||
|
||||
KeyedStream<Tuple6<String,String,String,String,Long,Integer>, String> FrameipWithaccount = filterOriginalData.keyBy(value -> value.f0);
|
||||
|
||||
KeyedStream<Tuple6<String,String,String,String,Long,Integer>, String> accountWithFrameip = filterOriginalData.keyBy(value -> value.f1);
|
||||
|
||||
FrameipWithaccount.addSink(new HbaseSinkFramedip(RadiusRelationshipConfig.HBASE_ZOOKEEPER_SERVERS));
|
||||
|
||||
accountWithFrameip.addSink(new HbaseSinkAccount(RadiusRelationshipConfig.HBASE_ZOOKEEPER_SERVERS));
|
||||
try {
|
||||
environment.execute("RADIUS-RELATIONSHIP-HBASE-V2-t");
|
||||
} catch (Exception e) {
|
||||
logger.error("This Flink task start ERROR! Exception information is :" + e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -1,54 +0,0 @@
|
||||
package com.zdjizhi.topology;
|
||||
|
||||
import cn.hutool.log.Log;
|
||||
import cn.hutool.log.LogFactory;
|
||||
import com.zdjizhi.utils.functions.*;
|
||||
|
||||
import com.zdjizhi.utils.kafka.Consumer;
|
||||
import org.apache.flink.api.java.tuple.Tuple2;
|
||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||
import org.apache.flink.streaming.api.datastream.DataStreamSource;
|
||||
import org.apache.flink.streaming.api.datastream.KeyedStream;
|
||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
|
||||
/**
|
||||
* @author qidaijie
|
||||
* @Package com.zdjizhi.topology
|
||||
* @Description:
|
||||
* @date 2021/5/2016:42
|
||||
*/
|
||||
public class RadiusRelationshipTopology {
|
||||
private static final Log logger = LogFactory.get();
|
||||
|
||||
|
||||
public static void main(String[] args) {
|
||||
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
|
||||
|
||||
// environment.enableCheckpointing(5000);
|
||||
|
||||
DataStreamSource<String> streamSource = environment.addSource(Consumer.getKafkaConsumer());
|
||||
|
||||
DataStream<String> filterOriginalData = streamSource.filter(new FilterNullFunction()).name("FilterOriginalData");
|
||||
|
||||
DataStream<Tuple2<String, String>> getObject = filterOriginalData.map(new ParseFunction()).name("ParseJson");
|
||||
|
||||
DataStream<Tuple2<String, String>> getRadiusAccount = getObject.map(new GetAccountMapFunction()).name("GetRadiusAccount");
|
||||
|
||||
KeyedStream<Tuple2<String, String>, String> tuple2StringKeyedStream = getRadiusAccount.keyBy(value -> value.f0);
|
||||
|
||||
KeyedStream<Tuple2<String, String>, String> accountWithFrameip = getObject.keyBy(value -> value.f1);
|
||||
|
||||
tuple2StringKeyedStream.process(new TimerFunction()).name("UpdateHBase");
|
||||
|
||||
accountWithFrameip.process(new TimerFunctionAccountWithFramedIp()).name("UpdateAccountHBase");
|
||||
|
||||
try {
|
||||
environment.execute("RADIUS-RELATIONSHIP-HBASE");
|
||||
} catch (Exception e) {
|
||||
logger.error("This Flink task start ERROR! Exception information is :" + e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -1,40 +0,0 @@
|
||||
package com.zdjizhi.utils.functions;
|
||||
|
||||
import cn.hutool.log.Log;
|
||||
import cn.hutool.log.LogFactory;
|
||||
import com.alibaba.fastjson.JSONException;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.zdjizhi.common.RadiusRelationshipConfig;
|
||||
import com.zdjizhi.utils.StringUtil;
|
||||
import org.apache.flink.api.common.functions.FilterFunction;
|
||||
|
||||
/**
|
||||
* @author qidaijie
|
||||
* @Package com.zdjizhi.utils.functions
|
||||
* @Description:
|
||||
* @date 2021/5/2715:01
|
||||
*/
|
||||
public class FilterNullFunction implements FilterFunction<String> {
|
||||
private static final Log logger = LogFactory.get();
|
||||
|
||||
@Override
|
||||
public boolean filter(String message) {
|
||||
boolean isFilter = false;
|
||||
try {
|
||||
if (StringUtil.isNotBlank(message)) {
|
||||
JSONObject jsonObject = JSONObject.parseObject(message);
|
||||
if (jsonObject.containsKey(RadiusRelationshipConfig.PACKET_TYPE) && jsonObject.containsKey(RadiusRelationshipConfig.STATUS_TYPE)) {
|
||||
if (RadiusRelationshipConfig.ACCOUNTING_REQUEST == jsonObject.getInteger(RadiusRelationshipConfig.PACKET_TYPE)
|
||||
&& RadiusRelationshipConfig.START_BILLING == jsonObject.getInteger(RadiusRelationshipConfig.STATUS_TYPE)) {
|
||||
isFilter = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (JSONException jse) {
|
||||
logger.error("数据转换JSON格式异常,原始日志为:" + message);
|
||||
} catch (RuntimeException re) {
|
||||
logger.error("Radius日志条件过滤异常,异常信息为:" + re);
|
||||
}
|
||||
return isFilter;
|
||||
}
|
||||
}
|
||||
@@ -1,35 +0,0 @@
|
||||
package com.zdjizhi.utils.functions;
|
||||
|
||||
import cn.hutool.log.Log;
|
||||
import cn.hutool.log.LogFactory;
|
||||
import org.apache.flink.api.common.functions.MapFunction;
|
||||
import org.apache.flink.api.java.tuple.Tuple2;
|
||||
|
||||
import static com.zdjizhi.utils.hbase.HBaseUtils.dataValidation;
|
||||
|
||||
/**
|
||||
* @author qidaijie
|
||||
* @Package com.zdjizhi.utils.functions
|
||||
* @Description:
|
||||
* @date 2021/5/2715:01
|
||||
*/
|
||||
public class GetAccountMapFunction implements MapFunction<Tuple2<String, String>, Tuple2<String, String>> {
|
||||
private static final Log logger = LogFactory.get();
|
||||
|
||||
|
||||
@Override
|
||||
public Tuple2<String, String> map(Tuple2<String, String> stringStringTuple2) throws Exception {
|
||||
try {
|
||||
String framedIp = stringStringTuple2.f0;
|
||||
String account = stringStringTuple2.f1;
|
||||
boolean validation = dataValidation(framedIp, account);
|
||||
if (validation) {
|
||||
return Tuple2.of(framedIp, account);
|
||||
} else {
|
||||
return Tuple2.of("", "");
|
||||
}
|
||||
} catch (RuntimeException e) {
|
||||
logger.error("解析Radius数据获取用户信息异常,异常信息:" + e);
|
||||
}
|
||||
return Tuple2.of("", ""); }
|
||||
}
|
||||
@@ -2,9 +2,16 @@ package com.zdjizhi.utils.functions;
|
||||
|
||||
import cn.hutool.log.Log;
|
||||
import cn.hutool.log.LogFactory;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONException;
|
||||
import com.zdjizhi.common.RadiusRelationshipConfig;
|
||||
import com.zdjizhi.pojo.RadiusMassage;
|
||||
import com.zdjizhi.utils.StringUtil;
|
||||
import org.apache.flink.api.common.functions.MapFunction;
|
||||
import org.apache.flink.api.java.tuple.Tuple2;
|
||||
import org.apache.flink.api.java.tuple.Tuple6;
|
||||
|
||||
import static cn.hutool.crypto.SecureUtil.md5;
|
||||
|
||||
|
||||
/**
|
||||
* @author qidaijie
|
||||
@@ -13,22 +20,43 @@ import org.apache.flink.api.java.tuple.Tuple2;
|
||||
* @date 2021/5/2715:01
|
||||
*/
|
||||
|
||||
public class ParseFunction implements MapFunction<String, Tuple2<String, String>> {
|
||||
public class ParseFunction implements MapFunction<String, Tuple6<String,String,String, String,Long, Integer>> {
|
||||
private static final Log logger = LogFactory.get();
|
||||
|
||||
|
||||
@Override
|
||||
public Tuple2<String, String> map(String logs) {
|
||||
try {
|
||||
JSONObject jsonObject = JSONObject.parseObject(logs);
|
||||
String framedIp = jsonObject.getString("radius_framed_ip");
|
||||
String account = jsonObject.getString("radius_account");
|
||||
public Tuple6<String,String ,String, String,Long,Integer> map(String message) {
|
||||
|
||||
return Tuple2.of(framedIp, account);
|
||||
|
||||
} catch (RuntimeException e) {
|
||||
logger.error("解析Radius数据获取用户信息异常,异常信息:" + e);
|
||||
}
|
||||
return Tuple2.of("", "");
|
||||
RadiusMassage radiusMassage = new RadiusMassage();
|
||||
try {
|
||||
if (StringUtil.isNotBlank(message)) {
|
||||
radiusMassage = JSON.parseObject(message, RadiusMassage.class);
|
||||
|
||||
if(radiusMassage.getRadius_framed_ip()!=null && radiusMassage.getRadius_account()!=null && radiusMassage.getRadius_event_timestamp()!=null){
|
||||
|
||||
if (RadiusRelationshipConfig.ACCOUNTING_REQUEST == radiusMassage.getRadius_packet_type()){
|
||||
String framedIp=radiusMassage.getRadius_framed_ip();
|
||||
String account=radiusMassage.getRadius_account();
|
||||
Long event_time = radiusMassage.getRadius_event_timestamp();
|
||||
int status =radiusMassage.getRadius_acct_status_type();
|
||||
int onff_status = 1;
|
||||
if (status == 2) {
|
||||
onff_status = 2;
|
||||
}
|
||||
String key_framedIp = md5(framedIp);
|
||||
String key_account = md5(account);
|
||||
return Tuple6.of(key_framedIp, key_account, framedIp, account, event_time, onff_status);
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (JSONException jse) {
|
||||
logger.error("数据转换JSON格式异常,原始日志为:" + message);
|
||||
} catch (RuntimeException re) {
|
||||
logger.error("Radius日志条件过滤异常,异常信息为:" + re);
|
||||
}
|
||||
|
||||
return Tuple6.of("","","","",0L,0);
|
||||
}
|
||||
}
|
||||
@@ -1,56 +0,0 @@
|
||||
package com.zdjizhi.utils.functions;
|
||||
|
||||
|
||||
import com.zdjizhi.common.RadiusRelationshipConfig;
|
||||
import com.zdjizhi.utils.StringUtil;
|
||||
import org.apache.flink.api.java.tuple.Tuple2;
|
||||
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
|
||||
import org.apache.flink.util.Collector;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static com.zdjizhi.utils.hbase.HBaseUtils.insertData;
|
||||
|
||||
/**
|
||||
* @author qidaijie
|
||||
* @Package com.zdjizhi.utils.functions
|
||||
* @Description:
|
||||
* @date 2021/6/2316:59
|
||||
*/
|
||||
public class TimerFunction extends KeyedProcessFunction<String, Tuple2<String, String>, Object> {
|
||||
private static final Logger logger = LoggerFactory.getLogger(TimerFunction.class);
|
||||
|
||||
private static List<Put> putList = new ArrayList<>();
|
||||
private static boolean first = true;
|
||||
|
||||
|
||||
@Override
|
||||
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
|
||||
if (putList.size() != 0) {
|
||||
insertData(putList,RadiusRelationshipConfig.HBASE_TABLE_NAME);
|
||||
putList.clear();
|
||||
}
|
||||
ctx.timerService().registerProcessingTimeTimer(timestamp + (RadiusRelationshipConfig.HBASE_TICK_TUPLE_FREQ_SECS * 1000));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processElement(Tuple2<String, String> value, Context ctx, Collector<Object> out) throws Exception {
|
||||
//仅在该算子接收到第一个数据时,注册一个定时器
|
||||
if (first) {
|
||||
first = false;
|
||||
Long time = System.currentTimeMillis();
|
||||
ctx.timerService().registerProcessingTimeTimer(time + (RadiusRelationshipConfig.HBASE_TICK_TUPLE_FREQ_SECS * 1000));
|
||||
}
|
||||
String framedIp = value.f0;
|
||||
String account = value.f1;
|
||||
if (StringUtil.isNotBlank(framedIp) && StringUtil.isNotBlank(account)) {
|
||||
Put put = new Put(value.f0.getBytes());
|
||||
put.addColumn("subscriber_id".getBytes(), "account".getBytes(), value.f1.getBytes());
|
||||
putList.add(put);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,75 +0,0 @@
|
||||
package com.zdjizhi.utils.functions;
|
||||
|
||||
|
||||
import com.zdjizhi.common.RadiusRelationshipConfig;
|
||||
import com.zdjizhi.utils.StringUtil;
|
||||
import org.apache.flink.api.java.tuple.Tuple2;
|
||||
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
|
||||
import org.apache.flink.util.Collector;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static com.zdjizhi.utils.hbase.HBaseUtils.insertData;
|
||||
|
||||
/**
|
||||
* @author qidaijie
|
||||
* @Package com.zdjizhi.utils.functions
|
||||
* @Description:
|
||||
* @date 2021/6/2316:59
|
||||
*/
|
||||
public class TimerFunctionAccountWithFramedIp extends KeyedProcessFunction<String, Tuple2<String, String>, Object> {
|
||||
private static final Logger logger = LoggerFactory.getLogger(TimerFunctionAccountWithFramedIp.class);
|
||||
private static Map<String,String> map = new HashMap<String,String>();
|
||||
private static List<Put> putList = new ArrayList<>();
|
||||
private static boolean first = true;
|
||||
|
||||
|
||||
@Override
|
||||
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
|
||||
|
||||
for(Map.Entry<String,String> entry: map.entrySet() ){
|
||||
|
||||
Put put = new Put(entry.getKey().getBytes());
|
||||
put.addColumn("radius".getBytes(), "framed_ip".getBytes(), entry.getValue().getBytes());
|
||||
put.addColumn("radius".getBytes(), "last_found_time".getBytes(), Bytes.toBytes(timestamp/1000));
|
||||
|
||||
|
||||
if(putList.size()<100000){
|
||||
putList.add(put);
|
||||
}
|
||||
else{
|
||||
insertData(putList,RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME);
|
||||
putList.clear();
|
||||
}
|
||||
}
|
||||
if(putList.size()>0) {
|
||||
insertData(putList, RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME);
|
||||
putList.clear();
|
||||
}
|
||||
map.clear();
|
||||
ctx.timerService().registerProcessingTimeTimer(timestamp + (RadiusRelationshipConfig.HBASE_TICK_TUPLE_FREQ_SECS * 1000));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processElement(Tuple2<String, String> value, Context ctx, Collector<Object> out) throws Exception {
|
||||
//仅在该算子接收到第一个数据时,注册一个定时器
|
||||
if (first) {
|
||||
first = false;
|
||||
Long time = System.currentTimeMillis();
|
||||
ctx.timerService().registerProcessingTimeTimer(time + (RadiusRelationshipConfig.HBASE_TICK_TUPLE_FREQ_SECS * 1000));
|
||||
}
|
||||
String account = value.f1;
|
||||
|
||||
String framedIp = value.f0;
|
||||
if (StringUtil.isNotBlank(framedIp) && StringUtil.isNotBlank(account)) {
|
||||
map.put(account,framedIp);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,169 +0,0 @@
|
||||
package com.zdjizhi.utils.hbase;
|
||||
|
||||
import cn.hutool.log.Log;
|
||||
import cn.hutool.log.LogFactory;
|
||||
import com.zdjizhi.common.RadiusRelationshipConfig;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.*;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* HBase 工具类
|
||||
*
|
||||
* @author qidaijie
|
||||
*/
|
||||
|
||||
public class HBaseUtils {
|
||||
private static final Log logger = LogFactory.get();
|
||||
private static Map<String, String> subIdMap = new ConcurrentHashMap<>(83334);
|
||||
private static Connection connection;
|
||||
|
||||
private static HBaseUtils hBaseUtils;
|
||||
|
||||
private static void getInstance() {
|
||||
hBaseUtils = new HBaseUtils();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 构造函数-新
|
||||
*/
|
||||
private HBaseUtils() {
|
||||
//获取连接
|
||||
getConnection();
|
||||
//拉取所有
|
||||
getAll();
|
||||
}
|
||||
|
||||
private static void getConnection() {
|
||||
try {
|
||||
// 管理Hbase的配置信息
|
||||
Configuration configuration = HBaseConfiguration.create();
|
||||
// 设置zookeeper节点
|
||||
configuration.set("hbase.zookeeper.quorum", RadiusRelationshipConfig.HBASE_ZOOKEEPER_SERVERS);
|
||||
configuration.set("hbase.client.retries.number", "3");
|
||||
configuration.set("hbase.bulkload.retries.number", "3");
|
||||
configuration.set("zookeeper.recovery.retry", "3");
|
||||
connection = ConnectionFactory.createConnection(configuration);
|
||||
logger.warn("HBaseUtils get HBase connection,now to getAll().");
|
||||
} catch (IOException ioe) {
|
||||
logger.error("HBaseUtils getHbaseConn() IOException===>{" + ioe + "}<===");
|
||||
} catch (RuntimeException e) {
|
||||
logger.error("HBaseUtils getHbaseConn() Exception===>{" + e + "}<===");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新变量
|
||||
*/
|
||||
private static void change() {
|
||||
if (hBaseUtils == null) {
|
||||
getInstance();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 写入数据到HBase
|
||||
*
|
||||
* @param putList puts list
|
||||
*/
|
||||
public static void insertData(List<Put> putList,String tablename) {
|
||||
Table table = null;
|
||||
try {
|
||||
table = connection.getTable(TableName.valueOf(tablename));
|
||||
table.put(putList);
|
||||
logger.warn("Update HBase data SUCCESS! Update size :" + putList.size());
|
||||
putList.clear();
|
||||
} catch (IOException e) {
|
||||
logger.error("Update HBase data ERROR! Message :" + e);
|
||||
} finally {
|
||||
try {
|
||||
if (table != null) {
|
||||
table.close();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.error("Close HBase.table ERROR! Message:" + e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static void insertData(Put put,String tablename) {
|
||||
Table table = null;
|
||||
try {
|
||||
table = connection.getTable(TableName.valueOf(tablename));
|
||||
table.put(put);
|
||||
// logger.warn("Update HBase data SUCCESS! Update size :" + putList.size());
|
||||
} catch (IOException e) {
|
||||
logger.error("Update HBase data ERROR! Message :" + e);
|
||||
} finally {
|
||||
try {
|
||||
if (table != null) {
|
||||
table.close();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.error("Close HBase.table ERROR! Message:" + e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
/**
|
||||
* 获取所有的 key value
|
||||
*/
|
||||
private static void getAll() {
|
||||
long begin = System.currentTimeMillis();
|
||||
try {
|
||||
Table table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_TABLE_NAME));
|
||||
Scan scan2 = new Scan();
|
||||
ResultScanner scanner = table.getScanner(scan2);
|
||||
for (Result result : scanner) {
|
||||
Cell[] cells = result.rawCells();
|
||||
for (Cell cell : cells) {
|
||||
subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
|
||||
}
|
||||
}
|
||||
logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size(): " + subIdMap.size());
|
||||
logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size() timeConsuming is: " + (System.currentTimeMillis() - begin));
|
||||
scanner.close();
|
||||
} catch (IOException ioe) {
|
||||
logger.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<===");
|
||||
} catch (RuntimeException e) {
|
||||
logger.error("HBaseUtils getAll() is Exception===>{" + e + "}<===");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 验证数据并与内存中的对比
|
||||
*
|
||||
* @param ip framed_ip
|
||||
* @param account account
|
||||
*/
|
||||
public static boolean dataValidation(String ip, String account) {
|
||||
if (subIdMap.size() == 0) {
|
||||
if (hBaseUtils == null) {
|
||||
getInstance();
|
||||
}
|
||||
}
|
||||
boolean checkResult = false;
|
||||
if (subIdMap.containsKey(ip)) {
|
||||
if (!subIdMap.get(ip).equals(account)) {
|
||||
checkResult = true;
|
||||
subIdMap.put(ip, account);
|
||||
}
|
||||
} else {
|
||||
checkResult = true;
|
||||
subIdMap.put(ip, account);
|
||||
}
|
||||
return checkResult;
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user