diff --git a/pom.xml b/pom.xml index 6a42ada..a3294cf 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ cn.ac.iie log-stream-completion-schema - v3.20.12.12-ack-ratelimit-con + v3.21.01.02-ack-ratelimit-set jar log-stream-completion-schema diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index e76a47a..6143acf 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -7,20 +7,21 @@ input.kafka.servers=192.168.44.12:9092 output.kafka.servers=192.168.44.12:9092 #zookeeper 地址 用于配置log_id -zookeeper.servers=192.168.40.4412:2181 +zookeeper.servers=192.168.44.12:2181 #hbase zookeeper地址 用于连接HBase hbase.zookeeper.servers=192.168.44.12:2181 #定位库地址 -ip.library=/home/bigdata/topology/dat/ +ip.library=D:\\K18-Phase2\\tsgSpace\\dat\\ +#ip.library=/home/bigdata/topology/dat/ #网关的schema位置 -schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/connection_record_log +schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/security_event_log #kafka broker下的topic名称 #kafka.topic=CONNECTION-RECORD-LOG -kafka.topic=CONNECTION-RECORD-LOG +kafka.topic=test #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; group.id=connection-record-log-20200818-1-test @@ -41,7 +42,7 @@ auto.offset.reset=latest #输出topic #results.output.topic=CONNECTION-RECORD-COMPLETED-LOG -results.output.topic=CONNECTION-RECORD-COMPLETED-LOG +results.output.topic=test-result #--------------------------------topology配置------------------------------# diff --git a/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java index 9333955..d8a9946 100644 --- a/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java +++ b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java @@ -38,31 +38,7 @@ public class CustomizedKafkaSpout extends BaseRichSpout { props.put("max.poll.records", 3000); props.put("max.partition.fetch.bytes", 31457280); props.put("auto.offset.reset", FlowWriteConfig.AUTO_OFFSET_RESET); -// switch (FlowWriteConfig.KAFKA_TOPIC) { -// case "PROXY-EVENT-LOG": -// props.put("client.id", "proxy"); -// break; -// case "RADIUS-RECORD-LOG": -// props.put("client.id", "radius"); -// break; -// case "CONNECTION-RECORD-LOG": -// props.put("client.id", "connection"); -// break; -// case "SECURITY-EVENT-LOG": -// props.put("client.id", "security"); -// break; -// case "CONNECTION-SKETCH": -// props.put("client.id", "sketch"); -// break; -// case "ACTIVE-DEFENCE-EVENT-LOG": -// props.put("client.id", "active"); -// break; -// case "SYS-PACKET-CAPTURE-LOG": -// props.put("client.id", "packet"); -// break; -// -// default: -// } + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); diff --git a/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java b/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java index 45346f3..6b04ce4 100644 --- a/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java +++ b/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java @@ -128,6 +128,11 @@ public class TransFormUtils { JsonParseUtil.setValue(object, appendToKeyName, getGeoIpCountry(ipLookup, name.toString())); } break; + case "set_value": + if (name != null && param != null) { + JsonParseUtil.setValue(object, appendToKeyName, setValue(param)); + } + break; case "get_value": if (name != null) { JsonParseUtil.setValue(object, appendToKeyName, name); diff --git a/src/main/java/cn/ac/iie/utils/general/TransFunction.java b/src/main/java/cn/ac/iie/utils/general/TransFunction.java index 51a5692..47f3292 100644 --- a/src/main/java/cn/ac/iie/utils/general/TransFunction.java +++ b/src/main/java/cn/ac/iie/utils/general/TransFunction.java @@ -189,4 +189,24 @@ class TransFunction { } return null; } + + /** + * 设置固定值函数 若为数字则转为long返回 + * @param param 默认值 + * @return 返回数字或字符串 + */ + static Object setValue(String param) { + try { + Matcher isNum = pattern.matcher(param); + if (isNum.matches()) { + return Long.parseLong(param); + } else { + return param; + } + } catch (Exception e) { + logger.error("SetValue 函数异常,异常信息:" + e); + e.printStackTrace(); + } + return null; + } }