公共字段增加 common_sub_action

This commit is contained in:
qidaijie
2019-12-03 14:00:44 +08:00
parent 57fbb78d11
commit 7aac9e03d5
16 changed files with 629 additions and 77 deletions

View File

@@ -41,6 +41,7 @@ public class PublicSessionRecordLog {
private String common_user_region;
private String common_client_ip;
private String common_device_id;
private String common_sub_action;
private String common_isp;
private String common_sled_ip;
private String common_client_location;
@@ -381,4 +382,12 @@ public class PublicSessionRecordLog {
public void setCommon_stream_trace_id(long common_stream_trace_id) {
this.common_stream_trace_id = common_stream_trace_id;
}
/**
 * Returns the common sub-action field of this log record.
 *
 * @return the current {@code common_sub_action} value (may be null if never set)
 */
public String getCommon_sub_action() {
    return this.common_sub_action;
}
/**
 * Sets the common sub-action field of this log record.
 *
 * @param common_sub_action the sub-action value to store (no validation is performed)
 */
public void setCommon_sub_action(String common_sub_action) {
this.common_sub_action = common_sub_action;
}
}

View File

@@ -8,10 +8,9 @@ import cn.ac.iie.utils.system.FlowWriteConfigurations;
*/
public class FlowWriteConfig {
public static final String LOG_STRING_SPLITTER = "\t";
public static final String SQL_STRING_SPLITTER = "#";
public static final int IPV4_TYPE = 1;
public static final int IPV6_TYPE = 2;
public static final String DOMAIN_SPLITTER = ".";
/**
* System
*/

View File

@@ -60,7 +60,7 @@ public class TransFormUtils {
radiusSessionRecordLog.setCommon_server_asn(ipLookup.asnLookup(serverIp, true));
return JSONObject.toJSONString(radiusSessionRecordLog);
} catch (Exception e) {
logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志解析过程出现异常");
logger.error("{} 日志解析过程出现异常" + FlowWriteConfig.KAFKA_TOPIC);
e.printStackTrace();
return "";
}

View File

@@ -1,7 +1,8 @@
package cn.ac.iie.utils.hbase;
import cn.ac.iie.common.FlowWriteConfig;
import com.zdjizhi.utils.StringUtil;
import cn.ac.iie.utils.system.IpUtils;
import io.netty.util.collection.IntObjectHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -14,6 +15,7 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* HBase 工具类
@@ -23,7 +25,7 @@ import java.util.Map;
public class HBaseUtils {
private final static Logger logger = Logger.getLogger(HBaseUtils.class);
private static Map<String, String> subIdMap = new HashMap<>(32);
private static Map<String, String> subIdMap = new ConcurrentHashMap<>(333334);
private static Connection connection;
private static Long time;
@@ -32,6 +34,9 @@ public class HBaseUtils {
Configuration configuration = HBaseConfiguration.create();
// 设置zookeeper节点
configuration.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS);
configuration.set("hbase.client.retries.number", "3");
configuration.set("hbase.bulkload.retries.number", "3");
configuration.set("zookeeper.recovery.retry", "3");
try {
connection = ConnectionFactory.createConnection(configuration);
time = System.currentTimeMillis();
@@ -47,7 +52,7 @@ public class HBaseUtils {
*/
public static void change() {
Long nowTime = System.currentTimeMillis();
timestampsFilter(time - 1500, nowTime + 500);
timestampsFilter(time - 1000, nowTime + 500);
}
/**
@@ -65,15 +70,23 @@ public class HBaseUtils {
table = connection.getTable(TableName.valueOf("sub:" + FlowWriteConfig.HBASE_TABLE_NAME));
scan2.setTimeRange(startTime, endTime);
scanner = table.getScanner(scan2);
logger.warn("读取HBase变量耗时" + (System.currentTimeMillis() - begin));
for (Result result : scanner) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
String key = Bytes.toString(CellUtil.cloneRow(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
if (subIdMap.containsKey(key)) {
if (!value.equals(subIdMap.get(key))) {
subIdMap.put(key, value);
}
} else {
subIdMap.put(key, value);
}
}
}
Long end = System.currentTimeMillis();
logger.warn("当前集合长度" + subIdMap.keySet().size());
logger.warn("更新缓存耗时:" + (System.currentTimeMillis() - begin));
logger.warn("更新缓存耗时:" + (end - begin) + "开始时间:" + begin + "结束时间:" + end);
time = endTime;
} catch (IOException e) {
e.printStackTrace();
@@ -114,18 +127,13 @@ public class HBaseUtils {
}
}
/**
* 获取 account
*
* @param ip client_ip
* @param clientIp client_ip
* @return account
*/
public static String getAccount(String ip) {
if (StringUtil.isNotBlank(ip)) {
return subIdMap.get(ip);
} else {
return "";
}
public static String getAccount(String clientIp) {
return subIdMap.get(clientIp);
}
}

View File

@@ -45,6 +45,22 @@ public class InfluxDbUtils {
}
}
/**
 * Records the number of lookup misses (i.e. keys that were absent from the
 * in-memory cache) as a point in the "BusinessMonitor" InfluxDB database.
 * Does nothing when {@code failure} is 0 so no connection is opened needlessly.
 *
 * @param failure number of failed matches since the last report
 */
public static void sendHBaseFailure(int failure) {
    if (failure == 0) {
        return;
    }
    InfluxDB client = InfluxDBFactory.connect(FlowWriteConfig.INFLUX_IP, FlowWriteConfig.INFLUX_USERNAME, FlowWriteConfig.INFLUX_PASSWORD);
    try {
        Point point1 = Point.measurement("sendHBaseFailure")
                .tag("topic", FlowWriteConfig.KAFKA_TOPIC)
                .field("failure", failure)
                .build();
        // Empty retention-policy string means the database default is used.
        client.write("BusinessMonitor", "", point1);
    } finally {
        // The original opened a fresh connection on every call and never
        // closed it, leaking sockets/threads under load.
        client.close();
    }
}
/**
* 获取本机IP
*

View File

@@ -0,0 +1,56 @@
package cn.ac.iie.utils.system;
/**
* IP工具类
*
* @author qidaijie
*/
public class IpUtils {
/**
* IPV4 正则
*/
private static final String IPV4 = "^((\\d|[1-9]\\d|1\\d\\d|2([0-4]\\d|5[0-5]))\\.){4}$";
/**
* IPV6正则
*/
private static final String IPV6 = "^(([\\da-fA-F]{1,4}):){8}$";
/**
* 判断IP类型 v4 or v6
*
* @param ip IP
* @return 1:v4 2:v6 3:abnormal
*/
public static int validIPAddress(String ip) {
return String.format("%s.", ip).matches(IPV4) ? 1 : String.format("%s:", ip).matches(IPV6) ? 2 : 3;
}
/**
* ip字符串转整数
* ip是.分割的整数字符串,按照r进制转十进制的规律,按权相加求和,这里的权是256.
*
* @param ip IP
* @return ip(int)
*/
public static int ipChangeInt(String ip) {
//分割ip
String[] ipSplit = ip.split("\\.");
int result = 0;
for (int i = 0; i < 4; i++) {
Integer ipSubInteger = Integer.parseInt(ipSplit[i]);
//正则验证不能为负数
if (ipSubInteger > 255) {
result = 0;
break;
}
result += (ipSubInteger << (24 - i * 8));
}
return result;
}
public static void main(String[] args) {
System.out.println(validIPAddress("192.254.254.254"));
System.out.println(ipChangeInt("254.254.254.254"));
}
}