增加 set 函数。
This commit is contained in:
2
pom.xml
2
pom.xml
@@ -4,7 +4,7 @@
|
|||||||
|
|
||||||
<groupId>cn.ac.iie</groupId>
|
<groupId>cn.ac.iie</groupId>
|
||||||
<artifactId>log-stream-completion-schema</artifactId>
|
<artifactId>log-stream-completion-schema</artifactId>
|
||||||
<version>v3.20.12.12-ack-ratelimit-con</version>
|
<version>v3.21.01.02-ack-ratelimit-set</version>
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
|
|
||||||
<name>log-stream-completion-schema</name>
|
<name>log-stream-completion-schema</name>
|
||||||
|
|||||||
@@ -7,20 +7,21 @@ input.kafka.servers=192.168.44.12:9092
|
|||||||
output.kafka.servers=192.168.44.12:9092
|
output.kafka.servers=192.168.44.12:9092
|
||||||
|
|
||||||
#zookeeper 地址 用于配置log_id
|
#zookeeper 地址 用于配置log_id
|
||||||
zookeeper.servers=192.168.40.4412:2181
|
zookeeper.servers=192.168.44.12:2181
|
||||||
|
|
||||||
#hbase zookeeper地址 用于连接HBase
|
#hbase zookeeper地址 用于连接HBase
|
||||||
hbase.zookeeper.servers=192.168.44.12:2181
|
hbase.zookeeper.servers=192.168.44.12:2181
|
||||||
|
|
||||||
#定位库地址
|
#定位库地址
|
||||||
ip.library=/home/bigdata/topology/dat/
|
ip.library=D:\\K18-Phase2\\tsgSpace\\dat\\
|
||||||
|
#ip.library=/home/bigdata/topology/dat/
|
||||||
|
|
||||||
#网关的schema位置
|
#网关的schema位置
|
||||||
schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/connection_record_log
|
schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/security_event_log
|
||||||
|
|
||||||
#kafka broker下的topic名称
|
#kafka broker下的topic名称
|
||||||
#kafka.topic=CONNECTION-RECORD-LOG
|
#kafka.topic=CONNECTION-RECORD-LOG
|
||||||
kafka.topic=CONNECTION-RECORD-LOG
|
kafka.topic=test
|
||||||
|
|
||||||
#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
|
#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
|
||||||
group.id=connection-record-log-20200818-1-test
|
group.id=connection-record-log-20200818-1-test
|
||||||
@@ -41,7 +42,7 @@ auto.offset.reset=latest
|
|||||||
|
|
||||||
#输出topic
|
#输出topic
|
||||||
#results.output.topic=CONNECTION-RECORD-COMPLETED-LOG
|
#results.output.topic=CONNECTION-RECORD-COMPLETED-LOG
|
||||||
results.output.topic=CONNECTION-RECORD-COMPLETED-LOG
|
results.output.topic=test-result
|
||||||
|
|
||||||
#--------------------------------topology配置------------------------------#
|
#--------------------------------topology配置------------------------------#
|
||||||
|
|
||||||
|
|||||||
@@ -38,31 +38,7 @@ public class CustomizedKafkaSpout extends BaseRichSpout {
|
|||||||
props.put("max.poll.records", 3000);
|
props.put("max.poll.records", 3000);
|
||||||
props.put("max.partition.fetch.bytes", 31457280);
|
props.put("max.partition.fetch.bytes", 31457280);
|
||||||
props.put("auto.offset.reset", FlowWriteConfig.AUTO_OFFSET_RESET);
|
props.put("auto.offset.reset", FlowWriteConfig.AUTO_OFFSET_RESET);
|
||||||
// switch (FlowWriteConfig.KAFKA_TOPIC) {
|
|
||||||
// case "PROXY-EVENT-LOG":
|
|
||||||
// props.put("client.id", "proxy");
|
|
||||||
// break;
|
|
||||||
// case "RADIUS-RECORD-LOG":
|
|
||||||
// props.put("client.id", "radius");
|
|
||||||
// break;
|
|
||||||
// case "CONNECTION-RECORD-LOG":
|
|
||||||
// props.put("client.id", "connection");
|
|
||||||
// break;
|
|
||||||
// case "SECURITY-EVENT-LOG":
|
|
||||||
// props.put("client.id", "security");
|
|
||||||
// break;
|
|
||||||
// case "CONNECTION-SKETCH":
|
|
||||||
// props.put("client.id", "sketch");
|
|
||||||
// break;
|
|
||||||
// case "ACTIVE-DEFENCE-EVENT-LOG":
|
|
||||||
// props.put("client.id", "active");
|
|
||||||
// break;
|
|
||||||
// case "SYS-PACKET-CAPTURE-LOG":
|
|
||||||
// props.put("client.id", "packet");
|
|
||||||
// break;
|
|
||||||
//
|
|
||||||
// default:
|
|
||||||
// }
|
|
||||||
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
||||||
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
||||||
|
|
||||||
|
|||||||
@@ -128,6 +128,11 @@ public class TransFormUtils {
|
|||||||
JsonParseUtil.setValue(object, appendToKeyName, getGeoIpCountry(ipLookup, name.toString()));
|
JsonParseUtil.setValue(object, appendToKeyName, getGeoIpCountry(ipLookup, name.toString()));
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
case "set_value":
|
||||||
|
if (name != null && param != null) {
|
||||||
|
JsonParseUtil.setValue(object, appendToKeyName, setValue(param));
|
||||||
|
}
|
||||||
|
break;
|
||||||
case "get_value":
|
case "get_value":
|
||||||
if (name != null) {
|
if (name != null) {
|
||||||
JsonParseUtil.setValue(object, appendToKeyName, name);
|
JsonParseUtil.setValue(object, appendToKeyName, name);
|
||||||
|
|||||||
@@ -189,4 +189,24 @@ class TransFunction {
|
|||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 设置固定值函数 若为数字则转为long返回
|
||||||
|
* @param param 默认值
|
||||||
|
* @return 返回数字或字符串
|
||||||
|
*/
|
||||||
|
static Object setValue(String param) {
|
||||||
|
try {
|
||||||
|
Matcher isNum = pattern.matcher(param);
|
||||||
|
if (isNum.matches()) {
|
||||||
|
return Long.parseLong(param);
|
||||||
|
} else {
|
||||||
|
return param;
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("SetValue 函数异常,异常信息:" + e);
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user