动态获取schema代码更新

This commit is contained in:
李玺康
2019-12-17 14:31:30 +08:00
parent a7e25af875
commit 98a4a66a5f
10 changed files with 515 additions and 20 deletions

14
pom.xml
View File

@@ -144,6 +144,11 @@
<artifactId>fastjson</artifactId>
<version>1.2.59</version>
</dependency>
<dependency>
<groupId>cglib</groupId>
<artifactId>cglib-nodep</artifactId>
<version>3.2.4</version>
</dependency>
<dependency>
<groupId>com.zdjizhi</groupId>
@@ -262,12 +267,11 @@
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>25.1-jre</version>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.3.2</version>
<scope>compile</scope>
</dependency>
</dependencies>

View File

@@ -1,6 +1,6 @@
#管理kafka地址
#bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092
bootstrap.servers=192.168.40.186:9092
bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092
#bootstrap.servers=192.168.40.186:9092
#zookeeper 地址
zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181
@@ -16,33 +16,34 @@ hbase.table.name=subscriber_info
auto.offset.reset=latest
#kafka broker下的topic名称
kafka.topic=CONNECTION-RECORD-LOG
kafka.topic=SECURITY-EVENT-LOG
#读取topic,存储该spout id的消费offset信息可通过该拓扑命名;具体存储offset的位置确定下次读取不重复的数据
group.id=security-policy-191114
group.id=security-policy-191216
#输出topic
results.output.topic=CONNECTION-RECORD-COMPLETED-LOG
results.output.topic=SECURITY-EVENT-COMPLETED-LOG
#storm topology workers
topology.workers=3
topology.workers=1
#spout并行度 建议与kafka分区数相同
spout.parallelism=6
spout.parallelism=3
#处理补全操作的bolt并行度-worker的倍数
datacenter.bolt.parallelism=12
datacenter.bolt.parallelism=1
#写入kafka的并行度10
kafka.bolt.parallelism=12
kafka.bolt.parallelism=3
#定位库地址
ip.library=/home/ceiec/topology/dat/
#ip.library=/home/ceiec/topology/dat/
#ip.library=D:\\workerSpace\\K18-Phase2\\3.0.2019115\\log-stream-completion\\
ip.library=D:\\dat\\
#kafka批量条数
batch.insert.num=2000
#网关的schema位置
schema.http=http://192.168.40.224:9999/metadata/schema/v1/fields/security_event_log
#数据中心UID
data.center.id.num=15

View File

@@ -17,6 +17,7 @@ import java.util.HashMap;
import java.util.Map;
import static cn.ac.iie.utils.general.TransFormUtils.getSecurityMessage;
import static cn.ac.iie.utils.general.schema.TransFormUtils.dealCommonMessage;
/**
* 通联关系日志补全
@@ -42,7 +43,7 @@ public class SecurityCompletionBolt extends BaseBasicBolt {
} else {
String message = tuple.getString(0);
if (StringUtil.isNotBlank(message)) {
basicOutputCollector.emit(new Values(getSecurityMessage(message)));
basicOutputCollector.emit(new Values(dealCommonMessage(message)));
}
}
} catch (Exception e) {

View File

@@ -49,6 +49,9 @@ public class FlowWriteConfig {
public static final String IP_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "ip.library");
/**
* http
*/
public static final String SCHEMA_HTTP = FlowWriteConfigurations.getStringProperty(0, "schema.http");
}

View File

@@ -0,0 +1,227 @@
package cn.ac.iie.utils.general.schema;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.hbase.HBaseUtils;
import cn.ac.iie.utils.json.JsonParseUtil;
import cn.ac.iie.utils.system.SnowflakeId;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.net.InternetDomainName;
import com.zdjizhi.utils.IpLookup;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import java.util.*;
import java.util.regex.Pattern;
/**
* 描述:转换或补全工具类
*
* @author qidaijie
* @create 2018-08-13 15:11
*/
public class TransFormUtils {
private static Logger logger = Logger.getLogger(TransFormUtils.class);
private final static Set<String> PUBLIC_SUFFIX_SET = new HashSet<String>(
Arrays.asList("com|org|net|gov|edu|co|tv|mobi|info|asia|xxx|onion|cc|cn|com.cn|edu.cn|gov.cn|net.cn|org.cn|jp|kr|tw|com.hk|hk|com.hk|org.hk|se|com.se|org.se"
.split("\\|")));
private static Pattern IP_PATTERN = Pattern.compile("(\\d{1,3}\\.){3}(\\d{1,3})");
private static IpLookup ipLookup = new IpLookup.Builder(false)
.loadDataFileV4(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb")
.loadDataFileV6(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb")
.loadAsnDataFileV4(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb")
.loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb")
.build();
//在内存中加载反射类用的map
private static HashMap<String, Class> map = JsonParseUtil.getMapFromhttp(FlowWriteConfig.SCHEMA_HTTP);
//反射成一个类
private static Object mapObject = JsonParseUtil.generateObject(map);
//获取任务列表
private static ArrayList<String[]> jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);
/**
* 解析日志,并补全
* 补domain,补subscriber_id
*
* @param message Security原始日志
* @return 补全后的日志
* <p>
* current_timestamp
* snowflake_id
* geo_ip_detail
* geo_asn
* radius_match
* geo_ip_country
* geo_asn
* sub_domain
* sub_domain
*/
public static String dealCommonMessage(String message) {
Object object = JSONObject.parseObject(message, mapObject.getClass());
// System.out.println("补全之前 ===》 "+JSON.toJSONString(object));
try {
for (String[] strings : jobList) {
if (strings[2].equals("current_timestamp")) {
JsonParseUtil.setValue(object, strings[1], getCurrentTime());
} else if (strings[2].equals("snowflake_id")) {
JsonParseUtil.setValue(object, strings[1], getSnowflakeId());
} else if (strings[2].equals("geo_ip_detail")) {
JsonParseUtil.setValue(object, strings[1], getGeoIpDetail(JsonParseUtil.getValue(object, strings[0]).toString()));
} else if (strings[2].equals("geo_asn")) {
JsonParseUtil.setValue(object, strings[1], getGeoIpDetail(JsonParseUtil.getValue(object, strings[0]).toString()));
} else if (strings[2].equals("radius_match")) {
JsonParseUtil.setValue(object,strings[1],HBaseUtils.getAccount(JsonParseUtil.getValue(object,strings[0]).toString()));
// JsonParseUtil.setValue(object, strings[1], "aaaaaaaaa");
} else if (strings[2].equals("geo_ip_country")) {
JsonParseUtil.setValue(object, strings[1], getGeoIpCountry(JsonParseUtil.getValue(object, strings[0]).toString()));
} else if (strings[0].equals("http_host") && strings[2].equals("sub_domain")) {
JsonParseUtil.setValue(object,strings[1],getTopDomain(null,JsonParseUtil.getValue(object,strings[0]).toString()));
} else if (strings[0].equals("ssl_sni") && strings[2].equals("sub_domain")) {
if (StringUtil.isBlank(JsonParseUtil.getValue(object, strings[1]).toString())) {
JsonParseUtil.setValue(object,strings[1],getTopDomain(JsonParseUtil.getValue(object,strings[0]).toString(),null));
}
}
}
return JSONObject.toJSONString(object);
// System.out.println("补全之后 ===》 "+JSON.toJSONString(object));
} catch (Exception e) {
logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志解析过程出现异常");
e.printStackTrace();
return "";
}
}
/**
* 有sni通过sni获取域名有host根据host获取域名
*
* @param sni sni
* @param host host
* @return 顶级域名
*/
private static String getTopDomain(String sni, String host) {
if (StringUtil.isNotBlank(host)) {
return getDomainName(host);
} else if (StringUtil.isNotBlank(sni)) {
return getDomainName(sni);
} else {
return "";
}
}
/**
* 根据url截取顶级域名
*
* @param host 网站url
* @return 顶级域名
*/
private static String getDomainName(String host) {
String domain = "";
try {
domain = InternetDomainName.from(host).topPrivateDomain().toString();
} catch (Exception e) {
logger.error("host解析顶级域名异常: " + e.getMessage());
}
return domain;
}
/**
* 生成当前时间戳的操作
*/
private static long getCurrentTime() {
return (System.currentTimeMillis() / 1000);
}
/**
* 雪花模型生成id
*
* @return
*/
private static long getSnowflakeId() {
return SnowflakeId.generateId();
}
/**
* 根据clientIp获取location信息
*
* @param ip
* @return
*/
private static String getGeoIpDetail(String ip) {
return ipLookup.cityLookupDetail(ip);
}
/**
* 根据ip获取asn信息
*
* @param ip
* @return
*/
private static String getGeoAsn(String ip) {
return ipLookup.asnLookup(ip, true);
}
/**
* 根据ip获取country信息
*
* @param ip
* @return
*/
private static String getGeoIpCountry(String ip) {
return ipLookup.countryLookup(ip);
}
/**
* radius借助hbase补齐
*
* @param ip
* @return
*/
private static String radiusMatch(String ip) {
return HBaseUtils.getAccount(ip);
}
/**
* switch 匹配合适的方法
* current_timestamp
* snowflake_id
* geo_ip_detail
* geo_asn
* radius_match
* geo_ip_country
* geo_asn
* sub_domain
* sub_domain
* @param func
*/
//TODO 行不通的原因是无法统一一个确定的返回值类型
/* private static String switchFunc(String func){
switch (func){
case "current_timestamp":
return String.valueOf(getCurrentTime());
case "snowflake_id":
case "geo_ip_detail":
case "geo_asn":
case "radius_match":
case "geo_ip_country":
case "sub_domain":
}
return func;
}*/
}

View File

@@ -17,6 +17,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
/**
* HBase 工具类
*

View File

@@ -0,0 +1,51 @@
package cn.ac.iie.utils.http;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
* 获取网关schema的工具类
*/
/**
 * Utility for fetching the gateway schema over HTTP.
 */
public class HttpClientUtil {

    /**
     * Executes an HTTP GET and returns the response body as a single string
     * (lines concatenated, decoded as UTF-8).
     *
     * <p>BUG FIX: the previous version dereferenced a possibly-null
     * StringBuilder when the request failed before the body was read,
     * turning any connection error into a NullPointerException. It also
     * never closed the BufferedReader. Resources are now managed with
     * try-with-resources and failures yield an empty string.
     *
     * @param url the URL to GET
     * @return the response body, or {@code ""} when the request fails or
     *         the response has no entity
     */
    public static String requestByGetMethod(String url) {
        StringBuilder body = new StringBuilder();
        try (CloseableHttpClient httpClient = HttpClients.createDefault();
             CloseableHttpResponse response = httpClient.execute(new HttpGet(url))) {
            HttpEntity entity = response.getEntity();
            if (entity != null) {
                try (BufferedReader reader = new BufferedReader(
                        new InputStreamReader(entity.getContent(), "UTF-8"), 8 * 1024)) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        body.append(line);
                    }
                }
            }
        } catch (Exception e) {
            // TODO(review): route through a logger instead of stderr.
            e.printStackTrace();
        }
        return body.toString();
    }
}

View File

@@ -0,0 +1,180 @@
package cn.ac.iie.utils.json;
import cn.ac.iie.utils.http.HttpClientUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import net.sf.cglib.beans.BeanGenerator;
import net.sf.cglib.beans.BeanMap;
import java.util.*;
/**
* 使用fastjson解析json的工具类
*/
/**
 * fastjson/cglib helpers for turning the gateway schema into a dynamic
 * bean class and an enrichment job list.
 */
public class JsonParseUtil {

    /**
     * Maps a schema type name to the corresponding Java class.
     * Unknown type names fall back to {@code String.class}.
     *
     * @param type schema type name ("int", "long", "boolean", ...)
     * @return the matching primitive class, or String for anything else
     */
    public static Class getClassName(String type) {
        switch (type) {
            case "int":
                return int.class;
            case "double":
                return double.class;
            case "float":
                return float.class;
            case "long":
                return long.class;
            case "char":
                return char.class;
            case "byte":
                return byte.class;
            case "boolean":
                return boolean.class;
            case "short":
                return short.class;
            default:
                return String.class;
        }
    }

    /**
     * Generates a cglib bean whose properties are the given
     * name -> Class entries.
     *
     * @param properties property name to property class map
     * @return a new instance of the generated bean class
     */
    public static Object generateObject(Map properties) {
        BeanGenerator generator = new BeanGenerator();
        for (Object key : properties.keySet()) {
            String name = (String) key;
            generator.addProperty(name, (Class) properties.get(name));
        }
        return generator.create();
    }

    /**
     * Reads a property from a cglib-generated bean.
     *
     * @param obj      bean instance
     * @param property property name
     * @return the property value, or null if absent
     */
    public static Object getValue(Object obj, String property) {
        return BeanMap.create(obj).get(property);
    }

    /**
     * Writes a property on a cglib-generated bean.
     *
     * @param obj      bean instance
     * @param property property name
     * @param value    new value
     */
    public static void setValue(Object obj, String property, Object value) {
        BeanMap.create(obj).put(property, value);
    }

    /**
     * Fetches the gateway schema and returns a field-name -> field-class
     * map suitable for {@link #generateObject(Map)}.
     *
     * @param http schema endpoint URL
     * @return map used to reflectively build the schema bean
     */
    public static HashMap<String, Class> getMapFromhttp(String http) {
        HashMap<String, Class> map = new HashMap<>();
        for (Object field : fetchFields(http)) {
            // Parse each field entry once (previously parsed twice per field).
            JSONObject fieldJson = JSON.parseObject(field.toString());
            map.put(fieldJson.getString("name"), getClassName(fieldJson.getString("type")));
        }
        return map;
    }

    /**
     * Fetches the gateway schema and extracts the enrichment job list.
     * Each job is a String[3]: {sourceField, targetField, functionName}.
     *
     * <p>Robustness fixes: a field whose {@code doc.format} lacks
     * {@code functions} no longer throws a NullPointerException, and
     * mismatched {@code functions}/{@code appendTo} list lengths no longer
     * throw ArrayIndexOutOfBoundsException (extra entries are ignored).
     *
     * @param http schema endpoint URL
     * @return the enrichment job list
     */
    public static ArrayList<String[]> getJobListFromHttp(String http) {
        ArrayList<String[]> list = new ArrayList<>();
        for (Object field : fetchFields(http)) {
            JSONObject fieldJson = JSON.parseObject(field.toString());
            String name = fieldJson.getString("name");
            Object doc = fieldJson.get("doc");
            if (doc == null) {
                continue;
            }
            Object format = JSON.parseObject(doc.toString()).get("format");
            if (format == null) {
                continue;
            }
            JSONObject formatJson = JSON.parseObject(format.toString());
            Object functions = formatJson.get("functions");
            if (functions == null) {
                // Guard: format without functions previously caused an NPE
                // when appendTo was present.
                continue;
            }
            Object appendTo = formatJson.get("appendTo");
            if (appendTo != null) {
                String[] functionArray = functions.toString().split(",");
                String[] appendToArray = appendTo.toString().split(",");
                int pairs = Math.min(functionArray.length, appendToArray.length);
                for (int i = 0; i < pairs; i++) {
                    list.add(new String[]{name, appendToArray[i], functionArray[i]});
                }
            } else {
                // No explicit target: the function writes back to its source field.
                list.add(new String[]{name, name, functions.toString()});
            }
        }
        return list;
    }

    /**
     * Fetches the schema from the gateway and returns its "fields" array.
     * Shared by {@link #getMapFromhttp(String)} and
     * {@link #getJobListFromHttp(String)} (previously duplicated).
     *
     * @param http schema endpoint URL
     * @return the JSON array of field descriptors
     */
    private static JSONArray fetchFields(String http) {
        String schema = HttpClientUtil.requestByGetMethod(http);
        Object data = JSON.parseObject(schema).get("data");
        JSONObject schemaJson = JSON.parseObject(data.toString());
        return (JSONArray) schemaJson.get("fields");
    }
}

View File

@@ -25,7 +25,7 @@ public class DomainUtils {
// System.out.println(InternetDomainName.from(s));
// System.out.println(InternetDomainName.from(s).topPrivateDomain());
// }else {
System.out.println(InternetDomainName.from(host).topDomainUnderRegistrySuffix());
// System.out.println(InternetDomainName.from(host).topDomainUnderRegistrySuffix());
System.out.println(InternetDomainName.from(host).topPrivateDomain());
// }

View File

@@ -0,0 +1,27 @@
package cn.ac.iie.test;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.json.JsonParseUtil;
import java.util.ArrayList;
import java.util.HashMap;
/**
 * Manual smoke test: fetches the gateway schema at class load and prints
 * every enrichment job triple (sourceField, targetField, functionName).
 */
public class UtilTest {

    // Field-name -> field-class map fetched from the gateway schema.
    private static HashMap<String, Class> map =
            JsonParseUtil.getMapFromhttp(FlowWriteConfig.SCHEMA_HTTP);
    // Bean template reflectively generated from the schema map.
    private static Object mapObject = JsonParseUtil.generateObject(map);
    // Enrichment job list parsed from the schema.
    private static ArrayList<String[]> jobList =
            JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);

    public static void main(String[] args) {
        // Print the three components of each job on separate lines.
        for (String[] job : jobList) {
            for (int part = 0; part < 3; part++) {
                System.out.println(job[part]);
            }
        }
    }
}