diff --git a/pom.xml b/pom.xml
index df7d7ab..83ac80e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,6 +144,11 @@
             <artifactId>fastjson</artifactId>
             <version>1.2.59</version>
         </dependency>
+        <dependency>
+            <groupId>cglib</groupId>
+            <artifactId>cglib-nodep</artifactId>
+            <version>3.2.4</version>
+        </dependency>
         <dependency>
             <groupId>com.zdjizhi</groupId>
@@ -262,12 +267,11 @@
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>25.1-jre</version>
-        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <version>5.3.2</version>
+            <scope>compile</scope>
+        </dependency>
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index c307cc7..1e1bf00 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,6 +1,6 @@
 # Kafka broker addresses
-#bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092
-bootstrap.servers=192.168.40.186:9092
+bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092
+#bootstrap.servers=192.168.40.186:9092
 
 # ZooKeeper addresses
 zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181
@@ -16,33 +16,34 @@
 hbase.table.name=subscriber_info
 
 auto.offset.reset=latest
 
 # Topic name on the Kafka broker
-kafka.topic=CONNECTION-RECORD-LOG
+kafka.topic=SECURITY-EVENT-LOG
 
 # Consumer group for the input topic; the spout stores its consumed offsets under this id (usually named after the topology) so that restarts do not re-read data
-group.id=security-policy-191114
+group.id=security-policy-191216
 
 # Output topic
-results.output.topic=CONNECTION-RECORD-COMPLETED-LOG
+results.output.topic=SECURITY-EVENT-COMPLETED-LOG
 
 # Storm topology workers
-topology.workers=3
+topology.workers=1
 
 # Spout parallelism; recommended to equal the number of Kafka partitions
-spout.parallelism=6
+spout.parallelism=3
 
 # Parallelism of the completion bolt (a multiple of the worker count)
-datacenter.bolt.parallelism=12
+datacenter.bolt.parallelism=1
 
 # Parallelism for writing to Kafka
-kafka.bolt.parallelism=12
+kafka.bolt.parallelism=3
 
 # IP geolocation database path
-ip.library=/home/ceiec/topology/dat/
+#ip.library=/home/ceiec/topology/dat/
 #ip.library=D:\\workerSpace\\K18-Phase2\\3.0.2019115\\log-stream-completion\\
-
+ip.library=D:\\dat\\
 # Kafka batch size
 batch.insert.num=2000
-
+# Location of the gateway schema
+schema.http=http://192.168.40.224:9999/metadata/schema/v1/fields/security_event_log
 # Data center (UID)
 data.center.id.num=15
diff --git a/src/main/java/cn/ac/iie/bolt/security/SecurityCompletionBolt.java b/src/main/java/cn/ac/iie/bolt/security/SecurityCompletionBolt.java
index ca52c03..76bc8af 100644
--- a/src/main/java/cn/ac/iie/bolt/security/SecurityCompletionBolt.java
+++ b/src/main/java/cn/ac/iie/bolt/security/SecurityCompletionBolt.java
@@ -17,6 +17,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static cn.ac.iie.utils.general.TransFormUtils.getSecurityMessage;
+import static cn.ac.iie.utils.general.schema.TransFormUtils.dealCommonMessage;
 
 /**
  * Completion of connection-relationship logs
@@ -42,7 +43,7 @@ public class SecurityCompletionBolt extends BaseBasicBolt {
             } else {
                 String message = tuple.getString(0);
                 if (StringUtil.isNotBlank(message)) {
-                    basicOutputCollector.emit(new Values(getSecurityMessage(message)));
+                    basicOutputCollector.emit(new Values(dealCommonMessage(message)));
                 }
             }
         } catch (Exception e) {
diff --git a/src/main/java/cn/ac/iie/common/FlowWriteConfig.java b/src/main/java/cn/ac/iie/common/FlowWriteConfig.java
index f16e0da..17f03da 100644
--- a/src/main/java/cn/ac/iie/common/FlowWriteConfig.java
+++ b/src/main/java/cn/ac/iie/common/FlowWriteConfig.java
@@ -49,6 +49,9 @@ public class FlowWriteConfig {
 
     public static final String IP_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "ip.library");
 
-
+    /**
+     * Gateway schema HTTP address
+     */
+    public static final String SCHEMA_HTTP = FlowWriteConfigurations.getStringProperty(0, "schema.http");
 }
\ No newline at end of file
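The new `schema.http` property points the topology at the gateway's field-schema endpoint. The change itself does not show the payload; the sketch below is only a guess at its shape, inferred from how `JsonParseUtil.getMapFromhttp` and `getJobListFromHttp` (further down in this diff) walk the response. The field names (`client_ip`, `log_id`) and the function/appendTo values are illustrative, not taken from the real endpoint.

```java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

// Illustrative only: an assumed response shape for schema.http.
public class SchemaShapeDemo {
    public static void main(String[] args) {
        String sample = "{\"data\":{\"fields\":["
                + "{\"name\":\"client_ip\",\"type\":\"string\","
                + "\"doc\":{\"format\":{\"functions\":\"geo_ip_detail,geo_asn\",\"appendTo\":\"ip_detail,ip_asn\"}}},"
                + "{\"name\":\"log_id\",\"type\":\"long\",\"doc\":{\"format\":{\"functions\":\"snowflake_id\"}}}"
                + "]}}";

        // Mirror the navigation done in JsonParseUtil: data -> fields[] -> {name, type, doc}
        JSONObject data = JSON.parseObject(sample).getJSONObject("data");
        JSONArray fields = data.getJSONArray("fields");
        for (Object field : fields) {
            JSONObject f = (JSONObject) field;
            System.out.println(f.getString("name") + " -> " + f.getString("type"));
        }
    }
}
```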
diff --git a/src/main/java/cn/ac/iie/utils/general/schema/TransFormUtils.java b/src/main/java/cn/ac/iie/utils/general/schema/TransFormUtils.java
new file mode 100644
index 0000000..07f8f01
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/general/schema/TransFormUtils.java
@@ -0,0 +1,227 @@
+package cn.ac.iie.utils.general.schema;
+
+
+import cn.ac.iie.common.FlowWriteConfig;
+import cn.ac.iie.utils.hbase.HBaseUtils;
+import cn.ac.iie.utils.json.JsonParseUtil;
+import cn.ac.iie.utils.system.SnowflakeId;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.google.common.net.InternetDomainName;
+import com.zdjizhi.utils.IpLookup;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.log4j.Logger;
+
+import java.util.*;
+import java.util.regex.Pattern;
+
+
+/**
+ * Description: utility class for transforming and completing log records
+ *
+ * @author qidaijie
+ * @create 2018-08-13 15:11
+ */
+public class TransFormUtils {
+    private static Logger logger = Logger.getLogger(TransFormUtils.class);
+    private final static Set<String> PUBLIC_SUFFIX_SET = new HashSet<>(
+            Arrays.asList("com|org|net|gov|edu|co|tv|mobi|info|asia|xxx|onion|cc|cn|com.cn|edu.cn|gov.cn|net.cn|org.cn|jp|kr|tw|com.hk|hk|com.hk|org.hk|se|com.se|org.se"
+                    .split("\\|")));
+    private static Pattern IP_PATTERN = Pattern.compile("(\\d{1,3}\\.){3}(\\d{1,3})");
+    private static IpLookup ipLookup = new IpLookup.Builder(false)
+            .loadDataFileV4(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb")
+            .loadDataFileV6(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb")
+            .loadAsnDataFileV4(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb")
+            .loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb")
+            .build();
+
+    // Map used to build the reflective bean, loaded once into memory
+    private static HashMap<String, Class> map = JsonParseUtil.getMapFromhttp(FlowWriteConfig.SCHEMA_HTTP);
+    // Prototype bean generated by reflection
+    private static Object mapObject = JsonParseUtil.generateObject(map);
+    // Completion job list
+    private static ArrayList<String[]> jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);
+
+
+    /**
+     * Parses a log record and completes it
+     * (fills in the domain, the subscriber_id, and so on).
+     *
+     * @param message raw security log
+     * @return the completed log
+     *
+     * Supported functions:
+     * current_timestamp
+     * snowflake_id
+     * geo_ip_detail
+     * geo_asn
+     * radius_match
+     * geo_ip_country
+     * sub_domain
+     */
+    public static String dealCommonMessage(String message) {
+
+        Object object = JSONObject.parseObject(message, mapObject.getClass());
+//        System.out.println("before completion ===> " + JSON.toJSONString(object));
+        try {
+            for (String[] strings : jobList) {
+
+                if (strings[2].equals("current_timestamp")) {
+                    JsonParseUtil.setValue(object, strings[1], getCurrentTime());
+                } else if (strings[2].equals("snowflake_id")) {
+                    JsonParseUtil.setValue(object, strings[1], getSnowflakeId());
+                } else if (strings[2].equals("geo_ip_detail")) {
+                    JsonParseUtil.setValue(object, strings[1], getGeoIpDetail(JsonParseUtil.getValue(object, strings[0]).toString()));
+                } else if (strings[2].equals("geo_asn")) {
+                    JsonParseUtil.setValue(object, strings[1], getGeoAsn(JsonParseUtil.getValue(object, strings[0]).toString()));
+                } else if (strings[2].equals("radius_match")) {
+                    JsonParseUtil.setValue(object, strings[1], HBaseUtils.getAccount(JsonParseUtil.getValue(object, strings[0]).toString()));
+//                    JsonParseUtil.setValue(object, strings[1], "aaaaaaaaa");
+                } else if (strings[2].equals("geo_ip_country")) {
+                    JsonParseUtil.setValue(object, strings[1], getGeoIpCountry(JsonParseUtil.getValue(object, strings[0]).toString()));
+                } else if (strings[0].equals("http_host") && strings[2].equals("sub_domain")) {
+                    JsonParseUtil.setValue(object, strings[1], getTopDomain(null, JsonParseUtil.getValue(object, strings[0]).toString()));
+                } else if (strings[0].equals("ssl_sni") && strings[2].equals("sub_domain")) {
+                    if (StringUtil.isBlank(JsonParseUtil.getValue(object, strings[1]).toString())) {
+                        JsonParseUtil.setValue(object, strings[1], getTopDomain(JsonParseUtil.getValue(object, strings[0]).toString(), null));
+                    }
+                }
+            }
+
+            return JSONObject.toJSONString(object);
+//            System.out.println("after completion ===> " + JSON.toJSONString(object));
+        } catch (Exception e) {
+            logger.error(FlowWriteConfig.KAFKA_TOPIC + " log parsing failed");
+            e.printStackTrace();
+            return "";
+        }
+    }
+
+
+    /**
+     * Resolves the domain from the host when present, otherwise from the SNI.
+     *
+     * @param sni  sni
+     * @param host host
+     * @return top-level domain
+     */
+    private static String getTopDomain(String sni, String host) {
+        if (StringUtil.isNotBlank(host)) {
+            return getDomainName(host);
+        } else if (StringUtil.isNotBlank(sni)) {
+            return getDomainName(sni);
+        } else {
+            return "";
+        }
+    }
+
+
+    /**
+     * Extracts the top private domain from a host.
+     *
+     * @param host website host
+     * @return top-level domain
+     */
+    private static String getDomainName(String host) {
+        String domain = "";
+        try {
+            domain = InternetDomainName.from(host).topPrivateDomain().toString();
+        } catch (Exception e) {
+            logger.error("failed to resolve the top-level domain from the host: " + e.getMessage());
+        }
+        return domain;
+    }
+
+    /**
+     * Returns the current timestamp in seconds.
+     */
+    private static long getCurrentTime() {
+        return (System.currentTimeMillis() / 1000);
+    }
+
+    /**
+     * Generates an id with the snowflake scheme.
+     *
+     * @return snowflake id
+     */
+    private static long getSnowflakeId() {
+        return SnowflakeId.generateId();
+    }
+
+    /**
+     * Looks up location details for a client IP.
+     *
+     * @param ip client ip
+     * @return location detail
+     */
+    private static String getGeoIpDetail(String ip) {
+        return ipLookup.cityLookupDetail(ip);
+    }
+
+    /**
+     * Looks up ASN information for an IP.
+     *
+     * @param ip ip
+     * @return asn info
+     */
+    private static String getGeoAsn(String ip) {
+        return ipLookup.asnLookup(ip, true);
+    }
+
+    /**
+     * Looks up the country for an IP.
+     *
+     * @param ip ip
+     * @return country
+     */
+    private static String getGeoIpCountry(String ip) {
+        return ipLookup.countryLookup(ip);
+    }
+
+    /**
+     * Completes the RADIUS account via HBase.
+     *
+     * @param ip ip
+     * @return account
+     */
+    private static String radiusMatch(String ip) {
+        return HBaseUtils.getAccount(ip);
+    }
+
+    /**
+     * switch-based dispatch over the supported functions:
+     * current_timestamp, snowflake_id, geo_ip_detail, geo_asn,
+     * radius_match, geo_ip_country, sub_domain
+     *
+     * @param func function name
+     */
+    //TODO not usable as-is because the functions do not share a single return type
+    /* private static String switchFunc(String func) {
+        switch (func) {
+            case "current_timestamp":
+                return String.valueOf(getCurrentTime());
+            case "snowflake_id":
+            case "geo_ip_detail":
+            case "geo_asn":
+            case "radius_match":
+            case "geo_ip_country":
+            case "sub_domain":
+        }
+        return func;
+    }*/
+}
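A minimal sketch of what `getDomainName()` above is expected to return, assuming Guava's `InternetDomainName` is still on the classpath (the pom change removes the explicit guava dependency, so it is presumed to arrive transitively). The host values below are made-up examples, not data from this change.

```java
import com.google.common.net.InternetDomainName;

public class DomainDemo {
    public static void main(String[] args) {
        System.out.println(topPrivateDomainOrEmpty("www.example.co.uk"));   // example.co.uk
        System.out.println(topPrivateDomainOrEmpty("api.cdn.example.com")); // example.com
        System.out.println(topPrivateDomainOrEmpty("192.168.40.186"));      // "" (IP literals are rejected)
    }

    private static String topPrivateDomainOrEmpty(String host) {
        try {
            return InternetDomainName.from(host).topPrivateDomain().toString();
        } catch (Exception e) {
            // IP literals and unrecognised suffixes end up here, as in getDomainName()
            return "";
        }
    }
}
```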
diff --git a/src/main/java/cn/ac/iie/utils/hbase/HBaseUtils.java b/src/main/java/cn/ac/iie/utils/hbase/HBaseUtils.java
index dfcf3b8..f386003 100644
--- a/src/main/java/cn/ac/iie/utils/hbase/HBaseUtils.java
+++ b/src/main/java/cn/ac/iie/utils/hbase/HBaseUtils.java
@@ -17,6 +17,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
 
+
 /**
  * HBase utility class
  *
diff --git a/src/main/java/cn/ac/iie/utils/http/HttpClientUtil.java b/src/main/java/cn/ac/iie/utils/http/HttpClientUtil.java
new file mode 100644
index 0000000..2aa8885
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/http/HttpClientUtil.java
@@ -0,0 +1,51 @@
+package cn.ac.iie.utils.http;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ * Utility class for fetching the gateway schema.
+ */
+public class HttpClientUtil {
+    public static String requestByGetMethod(String s) {
+        CloseableHttpClient httpClient = HttpClients.createDefault();
+        StringBuilder entityStringBuilder = null;
+        try {
+            HttpGet get = new HttpGet(s);
+            CloseableHttpResponse httpResponse = null;
+            httpResponse = httpClient.execute(get);
+            try {
+                HttpEntity entity = httpResponse.getEntity();
+                entityStringBuilder = new StringBuilder();
+                if (null != entity) {
+                    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024);
+                    String line = null;
+                    while ((line = bufferedReader.readLine()) != null) {
+                        entityStringBuilder.append(line);
+                    }
+                }
+            } finally {
+                httpResponse.close();
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            try {
+                if (httpClient != null) {
+                    httpClient.close();
+                }
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+        return entityStringBuilder.toString();
+    }
+
+}
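Not part of this change: an equivalent GET written with try-with-resources and `EntityUtils` from the same Apache HttpClient library. It reads the body in one call and avoids the `NullPointerException` that `requestByGetMethod` can hit when the request fails before the `StringBuilder` is created.

```java
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.io.IOException;

public class HttpClientSketch {
    // Fetch a URL and return its body as UTF-8 text; resources are closed automatically.
    public static String get(String url) throws IOException {
        try (CloseableHttpClient client = HttpClients.createDefault();
             CloseableHttpResponse response = client.execute(new HttpGet(url))) {
            return response.getEntity() != null
                    ? EntityUtils.toString(response.getEntity(), "UTF-8")
                    : "";
        }
    }
}
```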
diff --git a/src/main/java/cn/ac/iie/utils/json/JsonParseUtil.java b/src/main/java/cn/ac/iie/utils/json/JsonParseUtil.java
new file mode 100644
index 0000000..ec8b7f5
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/json/JsonParseUtil.java
@@ -0,0 +1,180 @@
+package cn.ac.iie.utils.json;
+
+import cn.ac.iie.utils.http.HttpClientUtil;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import net.sf.cglib.beans.BeanGenerator;
+import net.sf.cglib.beans.BeanMap;
+
+import java.util.*;
+
+/**
+ * Utility class for parsing JSON with fastjson.
+ */
+public class JsonParseUtil {
+
+    /**
+     * Maps a type name from the schema to the corresponding class.
+     *
+     * @param type type name
+     * @return the matching class
+     */
+    public static Class getClassName(String type) {
+        Class clazz = int.class;
+
+        switch (type) {
+            case "int":
+                clazz = int.class;
+                break;
+            case "double":
+                clazz = double.class;
+                break;
+            case "float":
+                clazz = float.class;
+                break;
+            case "long":
+                clazz = long.class;
+                break;
+            case "char":
+                clazz = char.class;
+                break;
+            case "byte":
+                clazz = byte.class;
+                break;
+            case "boolean":
+                clazz = boolean.class;
+                break;
+            case "short":
+                clazz = short.class;
+                break;
+            default:
+                clazz = String.class;
+        }
+        return clazz;
+    }
+
+    /**
+     * Generates a bean through reflection.
+     *
+     * @param properties property-name-to-class map
+     * @return the generated Object
+     */
+    public static Object generateObject(Map properties) {
+        BeanGenerator generator = new BeanGenerator();
+        Set keySet = properties.keySet();
+        for (Iterator i = keySet.iterator(); i.hasNext(); ) {
+            String key = (String) i.next();
+            generator.addProperty(key, (Class) properties.get(key));
+        }
+        return generator.create();
+    }
+
+    /**
+     * Reads a property value from a generated bean.
+     *
+     * @param obj      bean
+     * @param property property name
+     * @return the property value
+     */
+    public static Object getValue(Object obj, String property) {
+        BeanMap beanMap = BeanMap.create(obj);
+        return beanMap.get(property);
+    }
+
+    /**
+     * Updates a property value on a generated bean.
+     *
+     * @param obj      bean
+     * @param property property name
+     * @param value    new value
+     */
+    public static void setValue(Object obj, String property, Object value) {
+        BeanMap beanMap = BeanMap.create(obj);
+        beanMap.put(property, value);
+    }
+
+    /**
+     * Fetches the gateway schema from the given URL and builds the map used to
+     * generate an Object of the schema type.
+     *
+     * @param http schema URL
+     * @return map used to generate the schema bean by reflection
+     */
+    public static HashMap<String, Class> getMapFromhttp(String http) {
+        HashMap<String, Class> map = new HashMap<>();
+
+        String schema = HttpClientUtil.requestByGetMethod(http);
+        Object data = JSON.parseObject(schema).get("data");
+
+        // fields is an array whose elements each carry a name, doc and type
+        JSONObject schemaJson = JSON.parseObject(data.toString());
+        JSONArray fields = (JSONArray) schemaJson.get("fields");
+
+        for (Object field : fields) {
+            String name = JSON.parseObject(field.toString()).get("name").toString();
+            String type = JSON.parseObject(field.toString()).get("type").toString();
+
+            // combine into the map used to generate the entity class
+            map.put(name, getClassName(type));
+        }
+        return map;
+    }
+
+
+    /**
+     * Fetches the schema from the given URL, parses it and returns the job list
+     * (source field, target field, function).
+     *
+     * @param http schema URL
+     * @return job list
+     */
+    public static ArrayList<String[]> getJobListFromHttp(String http) {
+        ArrayList<String[]> list = new ArrayList<>();
+
+        String schema = HttpClientUtil.requestByGetMethod(http);
+        // parse data
+        Object data = JSON.parseObject(schema).get("data");
+
+        // fields is an array whose elements each carry a name, doc and type
+        JSONObject schemaJson = JSON.parseObject(data.toString());
+        JSONArray fields = (JSONArray) schemaJson.get("fields");
+
+        for (Object field : fields) {
+            Object doc = JSON.parseObject(field.toString()).get("doc");
+            String name = JSON.parseObject(field.toString()).get("name").toString();
+
+            if (doc != null) {
+                Object format = JSON.parseObject(doc.toString()).get("format");
+
+                if (format != null) {
+                    Object functions = JSON.parseObject(format.toString()).get("functions");
+                    Object appendTo = JSON.parseObject(format.toString()).get("appendTo");
+
+                    if (appendTo != null) {
+                        String[] functionArray = functions.toString().split(",");
+                        String[] appendToArray = appendTo.toString().split(",");
+
+                        for (int i = 0; i < functionArray.length; i++) {
+                            list.add(new String[]{name, appendToArray[i], functionArray[i]});
+                        }
+                    } else {
+                        list.add(new String[]{name, name, functions.toString()});
+                    }
+                }
+            }
+        }
+        return list;
+    }
+
+}
\ No newline at end of file
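A minimal, self-contained sketch of the cglib `BeanGenerator`/`BeanMap` technique that `JsonParseUtil` wraps (it relies on the `cglib-nodep` dependency added in the pom). The field names here are made up for illustration and are not taken from the real schema.

```java
import com.alibaba.fastjson.JSON;
import net.sf.cglib.beans.BeanGenerator;
import net.sf.cglib.beans.BeanMap;

import java.util.HashMap;
import java.util.Map;

public class DynamicBeanDemo {
    public static void main(String[] args) {
        Map<String, Class> schema = new HashMap<>();
        schema.put("client_ip", String.class);
        schema.put("log_time", long.class);

        // Generate a bean class with one property per schema entry
        BeanGenerator generator = new BeanGenerator();
        for (Map.Entry<String, Class> entry : schema.entrySet()) {
            generator.addProperty(entry.getKey(), entry.getValue());
        }
        Object bean = generator.create();

        // Read and write properties by name through BeanMap
        BeanMap beanMap = BeanMap.create(bean);
        beanMap.put("client_ip", "10.0.0.1");
        beanMap.put("log_time", System.currentTimeMillis() / 1000);

        // fastjson serialises the generated bean through its getters
        System.out.println(JSON.toJSONString(bean));
    }
}
```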
diff --git a/src/test/java/cn/ac/iie/test/DomainUtils.java b/src/test/java/cn/ac/iie/test/DomainUtils.java
index 3a0a427..a693047 100644
--- a/src/test/java/cn/ac/iie/test/DomainUtils.java
+++ b/src/test/java/cn/ac/iie/test/DomainUtils.java
@@ -25,7 +25,7 @@ public class DomainUtils {
 //            System.out.println(InternetDomainName.from(s));
 //            System.out.println(InternetDomainName.from(s).topPrivateDomain());
 //        } else {
-            System.out.println(InternetDomainName.from(host).topDomainUnderRegistrySuffix());
+//            System.out.println(InternetDomainName.from(host).topDomainUnderRegistrySuffix());
             System.out.println(InternetDomainName.from(host).topPrivateDomain());
 //        }
 
diff --git a/src/test/java/cn/ac/iie/test/UtilTest.java b/src/test/java/cn/ac/iie/test/UtilTest.java
new file mode 100644
index 0000000..c03b75d
--- /dev/null
+++ b/src/test/java/cn/ac/iie/test/UtilTest.java
@@ -0,0 +1,27 @@
+package cn.ac.iie.test;
+
+import cn.ac.iie.common.FlowWriteConfig;
+import cn.ac.iie.utils.json.JsonParseUtil;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+public class UtilTest {
+
+    private static HashMap<String, Class> map = JsonParseUtil.getMapFromhttp(FlowWriteConfig.SCHEMA_HTTP);
+    // Prototype bean generated by reflection
+    private static Object mapObject = JsonParseUtil.generateObject(map);
+    // Completion job list
+    private static ArrayList<String[]> jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);
+
+    public static void main(String[] args) {
+        for (String[] strings : jobList) {
+            System.out.println(strings[0]);
+            System.out.println(strings[1]);
+            System.out.println(strings[2]);
+        }
+    }
+}
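Since `junit-jupiter-api` is now on the classpath, the manual `main`-based checks above could also be expressed as real tests. A minimal sketch, limited to `JsonParseUtil.getClassName` because it needs neither the schema endpoint nor HBase; note that actually running it would additionally require `junit-jupiter-engine`, which this change does not add.

```java
package cn.ac.iie.test;

import cn.ac.iie.utils.json.JsonParseUtil;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class JsonParseUtilTest {

    @Test
    public void mapsPrimitiveTypeNames() {
        assertEquals(int.class, JsonParseUtil.getClassName("int"));
        assertEquals(long.class, JsonParseUtil.getClassName("long"));
        assertEquals(boolean.class, JsonParseUtil.getClassName("boolean"));
    }

    @Test
    public void fallsBackToStringForUnknownTypes() {
        assertEquals(String.class, JsonParseUtil.getClassName("string"));
        assertEquals(String.class, JsonParseUtil.getClassName("anything-else"));
    }
}
```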