Add account-framedip relationship
@@ -1,7 +1,7 @@
 #--------------------------------Address configuration------------------------------#
 
 #Management Kafka address
-input.kafka.servers=192.168.44.12:9092
+input.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
 
 #hbase zookeeper address, used to connect to HBase
 hbase.zookeeper.servers=192.168.44.12:2181
@@ -12,16 +12,17 @@ hbase.zookeeper.servers=192.168.44.12:2181
 input.kafka.topic=RADIUS-RECORD
 
 #Consumer group for reading the topic; offsets are stored under this spout id (it can be named after the topology); the stored offset position determines where the next read starts so no data is read twice
-group.id=radius-flink-20210623
+group.id=radius-flink-202110270887999888997874
 
 #--------------------------------Topology configuration------------------------------#
 
 #HBase refresh interval; set to 0 to disable cache updates
-hbase.tick.tuple.freq.secs=180
+hbase.tick.tuple.freq.secs=60
 
 #hbase table name
-hbase.table.name=subscriber_info
+hbase.table.name=sub:subscriber_info
 
 #Geolocation library path
 tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
 
+hbase.account.table.name=tsg_galaxy:relation_account_framedip
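The new hbase.account.table.name property points at a namespaced table, tsg_galaxy:relation_account_framedip, which the commit assumes already exists. For reference, a minimal one-off setup sketch with the HBase 2.x Admin API might look like the following; the column family name "cf" is an assumption, since the commit does not show the table schema:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class CreateAccountTable {
    public static void main(String[] args) throws Exception {
        // ZooKeeper address taken from the config file above.
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.44.12");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            // Create the tsg_galaxy namespace if it does not exist yet.
            try {
                admin.getNamespaceDescriptor("tsg_galaxy");
            } catch (NamespaceNotFoundException e) {
                admin.createNamespace(NamespaceDescriptor.create("tsg_galaxy").build());
            }
            // Column family "cf" is assumed; the commit does not show it.
            TableName table = TableName.valueOf("tsg_galaxy:relation_account_framedip");
            if (!admin.tableExists(table)) {
                admin.createTable(TableDescriptorBuilder.newBuilder(table)
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf"))
                        .build());
            }
        }
    }
}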
@@ -30,6 +30,7 @@ public class RadiusRelationshipConfig {
     public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = RadiusRelationshipConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
     public static final String HBASE_TABLE_NAME = RadiusRelationshipConfigurations.getStringProperty(0, "hbase.table.name");
+    public static final String HBASE_ACCOUNT_TABLE_NAME = RadiusRelationshipConfigurations.getStringProperty(0, "hbase.account.table.name");
 
     /**
      * kafka
@@ -2,9 +2,8 @@ package com.zdjizhi.topology;
 
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
-import com.zdjizhi.utils.functions.FilterNullFunction;
-import com.zdjizhi.utils.functions.GetAccountMapFunction;
-import com.zdjizhi.utils.functions.TimerFunction;
+import com.zdjizhi.utils.functions.*;
+
 import com.zdjizhi.utils.kafka.Consumer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -31,12 +30,18 @@ public class RadiusRelationshipTopology {
 
         DataStream<String> filterOriginalData = streamSource.filter(new FilterNullFunction()).name("FilterOriginalData");
 
-        DataStream<Tuple2<String, String>> getRadiusAccount = filterOriginalData.map(new GetAccountMapFunction()).name("GetRadiusAccount");
+        DataStream<Tuple2<String, String>> getObject = filterOriginalData.map(new ParseFunction()).name("ParseJson");
+
+        DataStream<Tuple2<String, String>> getRadiusAccount = getObject.map(new GetAccountMapFunction()).name("GetRadiusAccount");
 
         KeyedStream<Tuple2<String, String>, String> tuple2StringKeyedStream = getRadiusAccount.keyBy(value -> value.f0);
 
+        KeyedStream<Tuple2<String, String>, String> accountWithFrameip = getObject.keyBy(value -> value.f1);
+
         tuple2StringKeyedStream.process(new TimerFunction()).name("UpdateHBase");
 
+        accountWithFrameip.process(new TimerFunctionAccountWithFramedIp()).name("UpdateAccountHBase");
+
         try {
             environment.execute("RADIUS-RELATIONSHIP-HBASE");
         } catch (Exception e) {
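The new ParseFunction referenced above is not shown in this commit view. Based on the JSON parsing that this commit removes from GetAccountMapFunction (field names radius_framed_ip and radius_account, fastjson, Tuple2 of framedIp/account), a plausible sketch is below; everything not taken from the removed code is an assumption:

package com.zdjizhi.utils.functions;

import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

/**
 * Parses each raw Radius JSON record once and emits (framedIp, account),
 * so downstream operators no longer have to re-parse the JSON string.
 */
public class ParseFunction implements MapFunction<String, Tuple2<String, String>> {
    private static final Log logger = LogFactory.get();

    @Override
    public Tuple2<String, String> map(String logs) {
        try {
            JSONObject jsonObject = JSONObject.parseObject(logs);
            String framedIp = jsonObject.getString("radius_framed_ip");
            String account = jsonObject.getString("radius_account");
            return Tuple2.of(framedIp, account);
        } catch (RuntimeException e) {
            logger.error("Failed to parse Radius record: " + e);
        }
        // Empty tuple signals a bad record; downstream validation filters it out.
        return Tuple2.of("", "");
    }
}

Parsing once and fanning the result out to both keyed streams (by framed IP and by account) is the point of this refactor: the old pipeline parsed the JSON inside GetAccountMapFunction, which would have forced a second parse for the account-framedip path.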
@@ -2,7 +2,6 @@ package com.zdjizhi.utils.functions;
 
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson.JSONObject;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -14,16 +13,15 @@ import static com.zdjizhi.utils.hbase.HBaseUtils.dataValidation;
  * @Description:
  * @date 2021/5/27 15:01
  */
-public class GetAccountMapFunction implements MapFunction<String, Tuple2<String, String>> {
+public class GetAccountMapFunction implements MapFunction<Tuple2<String, String>, Tuple2<String, String>> {
     private static final Log logger = LogFactory.get();
 
     @Override
-    public Tuple2<String, String> map(String logs) {
+    public Tuple2<String, String> map(Tuple2<String, String> stringStringTuple2) throws Exception {
         try {
-            JSONObject jsonObject = JSONObject.parseObject(logs);
-            String framedIp = jsonObject.getString("radius_framed_ip");
-            String account = jsonObject.getString("radius_account");
+            String framedIp = stringStringTuple2.f0;
+            String account = stringStringTuple2.f1;
             boolean validation = dataValidation(framedIp, account);
             if (validation) {
                 return Tuple2.of(framedIp, account);
@@ -33,6 +31,5 @@ public class GetAccountMapFunction implements MapFunction<String, Tuple2<String,
         } catch (RuntimeException e) {
             logger.error("Exception while parsing Radius data for subscriber info: " + e);
         }
-        return Tuple2.of("", "");
-    }
+        return Tuple2.of("", ""); }
 }
@@ -31,7 +31,7 @@ public class TimerFunction extends KeyedProcessFunction<String, Tuple2<String, S
     @Override
     public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
         if (putList.size() != 0) {
-            insertData(putList);
+            insertData(putList, RadiusRelationshipConfig.HBASE_TABLE_NAME);
             putList.clear();
         }
         ctx.timerService().registerProcessingTimeTimer(timestamp + (RadiusRelationshipConfig.HBASE_TICK_TUPLE_FREQ_SECS * 1000));
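TimerFunctionAccountWithFramedIp, wired into the topology above, is also not shown in this commit view. Given that TimerFunction batches Puts and flushes them on a processing-time timer, a plausible sketch follows. The row/column layout (row key = account, one "cf:framed_ip" cell) and the config class's package path are assumptions, not confirmed by the commit:

package com.zdjizhi.utils.functions;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.ArrayList;
import java.util.List;

import com.zdjizhi.common.RadiusRelationshipConfig; // package path assumed

import static com.zdjizhi.utils.hbase.HBaseUtils.insertData;

/**
 * Mirrors TimerFunction, but keys by account and flushes into the new
 * account-framedip table instead of the subscriber table.
 */
public class TimerFunctionAccountWithFramedIp
        extends KeyedProcessFunction<String, Tuple2<String, String>, Object> {

    private final List<Put> putList = new ArrayList<>();

    @Override
    public void processElement(Tuple2<String, String> value, Context ctx, Collector<Object> out) {
        // Row key = account (f1), cell value = framed IP (f0); layout assumed.
        Put put = new Put(Bytes.toBytes(value.f1));
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("framed_ip"), Bytes.toBytes(value.f0));
        putList.add(put);
        // Align the flush timer to the configured tick so repeated
        // registrations within one tick collapse to a single timer.
        long interval = RadiusRelationshipConfig.HBASE_TICK_TUPLE_FREQ_SECS * 1000L;
        long nextTick = (ctx.timerService().currentProcessingTime() / interval + 1) * interval;
        ctx.timerService().registerProcessingTimeTimer(nextTick);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
        if (putList.size() != 0) {
            insertData(putList, RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME);
            putList.clear();
        }
    }
}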
@@ -76,10 +76,10 @@ public class HBaseUtils {
      *
      * @param putList puts list
      */
-    public static void insertData(List<Put> putList) {
+    public static void insertData(List<Put> putList, String tablename) {
         Table table = null;
         try {
-            table = connection.getTable(TableName.valueOf("sub:" + RadiusRelationshipConfig.HBASE_TABLE_NAME));
+            table = connection.getTable(TableName.valueOf(tablename));
             table.put(putList);
             logger.warn("Update HBase data SUCCESS! Update size :" + putList.size());
             putList.clear();
@@ -97,14 +97,32 @@ public class HBaseUtils {
 
     }
 
+    public static void insertData(Put put, String tablename) {
+        Table table = null;
+        try {
+            table = connection.getTable(TableName.valueOf(tablename));
+            table.put(put);
+            // logger.warn("Update HBase data SUCCESS! Update size :" + putList.size());
+        } catch (IOException e) {
+            logger.error("Update HBase data ERROR! Message :" + e);
+        } finally {
+            try {
+                if (table != null) {
+                    table.close();
+                }
+            } catch (IOException e) {
+                logger.error("Close HBase.table ERROR! Message:" + e);
+            }
+        }
+    }
+
     /**
      * Fetch all key-value pairs
      */
     private static void getAll() {
         long begin = System.currentTimeMillis();
         try {
-            Table table = connection.getTable(TableName.valueOf("sub:" + RadiusRelationshipConfig.HBASE_TABLE_NAME));
+            Table table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_TABLE_NAME));
             Scan scan2 = new Scan();
             ResultScanner scanner = table.getScanner(scan2);
             for (Result result : scanner) {
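With the table name now a parameter, the hard-coded "sub:" prefix is gone (the namespace moved into the hbase.table.name value itself) and both timer functions can share one utility. A small usage sketch of the two overloads; the row layout and the config class's package path are assumptions:

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import com.zdjizhi.common.RadiusRelationshipConfig; // package path assumed
import com.zdjizhi.utils.hbase.HBaseUtils;

public class InsertDataUsage {
    public static void main(String[] args) {
        List<Put> putList = new ArrayList<>();
        Put put = new Put(Bytes.toBytes("user01"));
        // Column family/qualifier are illustrative; the commit does not show them.
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("framed_ip"), Bytes.toBytes("10.0.0.1"));
        putList.add(put);

        // Batch flush into the subscriber table (as TimerFunction now does):
        HBaseUtils.insertData(putList, RadiusRelationshipConfig.HBASE_TABLE_NAME);

        // Single-row flush into the new account-framedip table:
        HBaseUtils.insertData(put, RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME);
    }
}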
@@ -23,6 +23,7 @@ public class Consumer {
         properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 
+        CertUtils.chooseCert(RadiusRelationshipConfig.KAFKA_SOURCE_PROTOCOL, properties);
 
         return properties;
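CertUtils is not shown in this commit either; judging by the call site, it maps the configured source protocol onto Kafka client security settings before the consumer is built. A purely hypothetical sketch; the class body and property values are assumptions, though the property keys themselves are standard Kafka client configs:

package com.zdjizhi.utils.kafka;

import java.util.Properties;

/**
 * Hypothetical sketch of CertUtils: translate the configured Kafka source
 * protocol into standard client security settings. The real implementation
 * is not part of this commit view.
 */
public class CertUtils {
    public static void chooseCert(String protocol, Properties properties) {
        if ("SSL".equalsIgnoreCase(protocol)) {
            properties.put("security.protocol", "SSL");
            // Store location/password would come from configuration in practice.
            properties.put("ssl.truststore.location", "/path/to/truststore.jks");
            properties.put("ssl.truststore.password", "changeit");
        } else {
            properties.put("security.protocol", "PLAINTEXT");
        }
    }
}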