diff --git a/properties/action_definition.properties b/properties/action_definition.properties new file mode 100644 index 0000000..2a3f59e --- /dev/null +++ b/properties/action_definition.properties @@ -0,0 +1,35 @@ +none=0 +Monitor=1 +monitor=1 +Intercept=2 +intercept=2 +NoIntercept=3 +nointercept=3 +ActiveDefence=4 +activedefence=4 +WANNAT=8 +wannat=8 +Reject=16 +reject=16 +Deny=16 +deny=16 +Shaping=32 +shaping=32 +Manipulate=48 +manipulate=48 +ServiceChaining=64 +servicechaining=64 +Allow=96 +allow=96 +Bypass=96 +bypass=96 +Shunt=128 +shunt=128 +Statistics=129 +statistics=129 +redirect=48 +replace=48 +hijack=48 +insert=48 +edit_element=48 +run_script=48 \ No newline at end of file diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 125e092..c65dfcd 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,7 +1,7 @@ #--------------------------------地址配置------------------------------# #管理kafka地址 -#source.kafka.servers=192.168.44.12:9094 -source.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094 +source.kafka.servers=192.168.44.12:9094 +#source.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094 #百分点输出kafka地址 percent.sink.kafka.servers=192.168.44.12:9094 #文件源数据topic输出kafka地址 @@ -68,12 +68,12 @@ deal.file.statistics.time=60 #------------------------------------knowledge配置------------------------------------# knowledge.execution.minutes=600 -knowledge.base.uri=http://192.168.44.67:9999 +knowledge.base.uri=http://192.168.44.12:9999 knowledge.base.path=/v1/knowledge_base ip.user.defined.kd.id=004390bc-3135-4a6f-a492-3662ecb9e289 ip.builtin.kd.id=64af7077-eb9b-4b8f-80cf-2ceebc89bea9 asn.builtin.kd.id=f9f6bc91-2142-4673-8249-e097c00fe1ea -hos.url=http://192.168.44.67:9098/hos/traffic_file_bucket/ +hos.url=http://192.168.44.12:9098/hos/traffic_file_bucket/ diff --git a/src/main/java/com/zdjizhi/operator/map/TypeMapCompleted.java b/src/main/java/com/zdjizhi/operator/map/TypeMapCompleted.java index 9bbf95f..97f0d93 100644 --- a/src/main/java/com/zdjizhi/operator/map/TypeMapCompleted.java +++ b/src/main/java/com/zdjizhi/operator/map/TypeMapCompleted.java @@ -2,6 +2,7 @@ package com.zdjizhi.operator.map; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.tools.general.ConfigurationsUtils; @@ -12,6 +13,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; +import java.util.HashMap; import java.util.Properties; @@ -22,19 +24,35 @@ import java.util.Properties; public class TypeMapCompleted extends ProcessFunction { private static final Log logger = LogFactory.get(); private ConvertRecordToPERCENT convertRecordToPERCENT; - Properties Prop = new Properties(); + Properties prop = new Properties(); + Properties actionProp = new Properties(); + private HashMap actionMap = new HashMap(); + @Override public void open(Configuration parameters) throws Exception { super.open(parameters); try { if (FlowWriteConfig.NACOS_SCHEMA_DATA_ID.equals("security_event.json")) { - Prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("security_event_mapping_table.properties")); + prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("security_event_mapping_table.properties")); } else if (FlowWriteConfig.NACOS_SCHEMA_DATA_ID.equals("proxy_event.json")) { - Prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("proxy_event_mapping_table.properties")); + prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("proxy_event_mapping_table.properties")); } - convertRecordToPERCENT = new ConvertRecordToPERCENT(Prop); + convertRecordToPERCENT = new ConvertRecordToPERCENT(prop); logger.info(FlowWriteConfig.NACOS_SCHEMA_DATA_ID + "日志Schema加载成功"); + + try { + actionProp.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("action_definition.properties")); + for (String key : actionProp.stringPropertyNames()) { + final String action = actionProp.getProperty(key); + actionMap.put(key, Integer.valueOf(action)); + } + + logger.info(FlowWriteConfig.NACOS_SCHEMA_DATA_ID + "日志Schema加载成功"); + } catch (Exception e) { + logger.error(FlowWriteConfig.NACOS_SCHEMA_DATA_ID + "日志Schema加载失败,失败原因为:" + e); + } + } catch (Exception e) { logger.error(FlowWriteConfig.NACOS_SCHEMA_DATA_ID + "日志Schema加载失败,失败原因为:" + e); } @@ -58,6 +76,26 @@ public class TypeMapCompleted extends ProcessFunction { } if (jsonObject != null) { + + if (record.containsKey("security_rule_list")) { + jsonObject.put("common_policy_id", JSONArray.from(record.get("security_rule_list")).get(0)); + jsonObject.put("common_action", actionMap.get(record.get("security_action").toString().replace(" ", ""))); + } + + if (record.containsKey("monitor_rule_list")) { + jsonObject.put("common_policy_id", JSONArray.from(record.get("monitor_rule_list")).get(0)); + jsonObject.put("common_action", 1); + } + + if (record.containsKey("proxy_rule_list")) { + jsonObject.put("common_policy_id", JSONArray.from(record.get("proxy_rule_list")).get(0)); + jsonObject.put("common_action", actionMap.get(record.get("proxy_action").toString().replace(" ", ""))); + + if ((int) jsonObject.get("common_action") == 48) { + jsonObject.put("common_sub_action", record.get("proxy_action")); + } + } + jsonObject.put("common_ingestion_time", ctx.timestamp() / 1000); TransForm.transformLog(jsonObject); diff --git a/src/main/java/com/zdjizhi/tools/general/IpLookupUtils.java b/src/main/java/com/zdjizhi/tools/general/IpLookupUtils.java index 57db453..c3639e8 100644 --- a/src/main/java/com/zdjizhi/tools/general/IpLookupUtils.java +++ b/src/main/java/com/zdjizhi/tools/general/IpLookupUtils.java @@ -88,6 +88,7 @@ public class IpLookupUtils { String fileName = Joiner.on(LOCATION_SEPARATOR).useForNull("").join(ipBuiltinknowlegeBaseMeta.getName(), ipBuiltinknowlegeBaseMeta.getFormat()); knowledgeMetaCache.put(fileName, ipBuiltinknowlegeBaseMeta); } + final KnowlegeBaseMeta ipUserDefinedknowlegeBaseMeta = getKnowlegeBaseMeta(FlowWriteConfig.IP_USER_DEFINED_KD_ID); if (!currentSha256IpUserDefined.equals(ipUserDefinedknowlegeBaseMeta.getSha256())) { String fileName = Joiner.on(LOCATION_SEPARATOR).useForNull("").join(ipUserDefinedknowlegeBaseMeta.getName(), ipUserDefinedknowlegeBaseMeta.getFormat()); @@ -163,9 +164,7 @@ public class IpLookupUtils { * @return 过滤参数 */ private static String getFilterParameter() { - String expr = "[?(@.version=='latest')][?(@.name in ('ip_builtin','ip_user_defined','asn_builtin'))]"; - return expr; } @@ -205,8 +204,4 @@ public class IpLookupUtils { return knowlegeBaseMeta; } - public static void main(String[] args) { - final String countryLookup = IpLookupUtils.getIpLookup().asnLookup("10.64.10.7"); - System.out.println(countryLookup); - } } diff --git a/src/main/java/com/zdjizhi/tools/logtransformation/ConvertRecordToPERCENT.java b/src/main/java/com/zdjizhi/tools/logtransformation/ConvertRecordToPERCENT.java index e6dc4c9..84d695f 100644 --- a/src/main/java/com/zdjizhi/tools/logtransformation/ConvertRecordToPERCENT.java +++ b/src/main/java/com/zdjizhi/tools/logtransformation/ConvertRecordToPERCENT.java @@ -1,6 +1,5 @@ package com.zdjizhi.tools.logtransformation; -import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import java.util.*; @@ -27,30 +26,18 @@ public class ConvertRecordToPERCENT { } } - if (record.containsKey("security_rule_list")) { - percent.put("common_policy_id", (Integer) JSONArray.from(record.get("security_rule_list")).get(0)); - percent.put("common_action", fillingCommonAction((String) record.get("security_action"))); - } - - if (record.containsKey("monitor_rule_list")) { - percent.put("common_policy_id", (Integer) JSONArray.from(record.get("monitor_rule_list")).get(0)); - percent.put("common_action", 1); - } - - if (record.containsKey("proxy_rule_list")){ - percent.put("common_policy_id", (Integer) JSONArray.from(record.get("proxy_rule_list")).get(0)); - percent.put("common_action", fillingCommonAction((String) record.get("proxy_action"))); - } - //填充common_start_time、common_end_time - percent.put("common_start_time",record.get("start_timestamp_ms")); - percent.put("common_end_time",record.get("end_timestamp_ms")); + percent.put("common_start_time", (long) record.get("start_timestamp_ms") / 1000); + percent.put("common_end_time", (long) record.get("end_timestamp_ms") / 1000); + + //填充common_sessions + percent.put("common_sessions", 1); //填充common_internal_ip、common_external_ip、common_direction、common_stream_dir if (record.containsKey("flags")) { final int flags = (int) record.get("flags"); if (flags > 0) { - if ((8L & flags) == 8L && (16L & flags) != 16L) { + if ((8L & flags) == 8L && (16L & flags) != 16L) { percent.put("common_internal_ip", record.get("common_client_ip")); percent.put("common_external_ip", record.get("common_server_ip")); percent.put("common_direction", 69); @@ -70,56 +57,4 @@ public class ConvertRecordToPERCENT { } return percent; } - - - private int fillingCommonAction(String action) { - int number = 0; - switch (action) { - case "none": - number = 0; - break; - case "Monitor": - number = 1; - break; - case "Intercept": - number = 2; - break; - case "No Intercept": - number = 3; - break; - case "Active Defence": - number = 4; - break; - case "WAN NAT": - number = 8; - break; - case "Reject": - case "Deny": - number = 16; - break; - case "Shaping": - number = 32; - break; - case "Manipulate": - number = 48; - break; - case "Service Chaining": - number = 64; - break; - case "Allow": - case "Bypass": - number = 96; - break; - case "Shunt": - number = 128; - break; - case "Statistics": - number = 129; - break; - default: - number = 0; - } - return number; - } - } diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java index e473df6..95d2a49 100644 --- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java +++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java @@ -14,8 +14,6 @@ import com.zdjizhi.tools.connections.kafka.KafkaProducer; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; - - public class LogFlowWriteTopology { private static final Log logger = LogFactory.get();