20.11-rc1代码提交

This commit is contained in:
李玺康
2020-10-23 14:42:51 +08:00
parent 4a8718a458
commit bc8cb5c68c
29 changed files with 1113 additions and 1010 deletions

View File

@@ -1,195 +0,0 @@
package cn.ac.iie.origion.bean;
/**
* @ClassNameConnectionRecordLog
* @Author lixkvip@126.com
* @Date2020/5/28 13:44
* @Version V1.0
**/
public class ConnectionRecordLog {
//key
private long common_recv_time;
private int common_policy_id;
private int common_action;
private String common_sub_action;
private String common_client_ip;
private String common_client_location;
private String common_sled_ip;
private String common_device_id;
private String common_subscriber_id;
private String common_server_ip;
private String common_server_location;
private int common_server_port;
private String common_l4_protocol;
private String http_domain;
private String ssl_sni;
//value
private long common_sessions;
private long common_c2s_pkt_num;
private long common_s2c_pkt_num;
private long common_c2s_byte_num;
private long common_s2c_byte_num;
public long getCommon_recv_time() {
return common_recv_time;
}
public void setCommon_recv_time(long common_recv_time) {
this.common_recv_time = common_recv_time;
}
public int getCommon_policy_id() {
return common_policy_id;
}
public void setCommon_policy_id(int common_policy_id) {
this.common_policy_id = common_policy_id;
}
public int getCommon_action() {
return common_action;
}
public void setCommon_action(int common_action) {
this.common_action = common_action;
}
public String getCommon_sub_action() {
return common_sub_action;
}
public void setCommon_sub_action(String common_sub_action) {
this.common_sub_action = common_sub_action;
}
public String getCommon_client_ip() {
return common_client_ip;
}
public void setCommon_client_ip(String common_client_ip) {
this.common_client_ip = common_client_ip;
}
public String getCommon_client_location() {
return common_client_location;
}
public void setCommon_client_location(String common_client_location) {
this.common_client_location = common_client_location;
}
public String getCommon_sled_ip() {
return common_sled_ip;
}
public void setCommon_sled_ip(String common_sled_ip) {
this.common_sled_ip = common_sled_ip;
}
public String getCommon_device_id() {
return common_device_id;
}
public void setCommon_device_id(String common_device_id) {
this.common_device_id = common_device_id;
}
public String getCommon_subscriber_id() {
return common_subscriber_id;
}
public void setCommon_subscriber_id(String common_subscriber_id) {
this.common_subscriber_id = common_subscriber_id;
}
public String getCommon_server_ip() {
return common_server_ip;
}
public void setCommon_server_ip(String common_server_ip) {
this.common_server_ip = common_server_ip;
}
public String getCommon_server_location() {
return common_server_location;
}
public void setCommon_server_location(String common_server_location) {
this.common_server_location = common_server_location;
}
public int getCommon_server_port() {
return common_server_port;
}
public void setCommon_server_port(int common_server_port) {
this.common_server_port = common_server_port;
}
public String getCommon_l4_protocol() {
return common_l4_protocol;
}
public void setCommon_l4_protocol(String common_l4_protocol) {
this.common_l4_protocol = common_l4_protocol;
}
public String getHttp_domain() {
return http_domain;
}
public void setHttp_domain(String http_domain) {
this.http_domain = http_domain;
}
public String getSsl_sni() {
return ssl_sni;
}
public void setSsl_sni(String ssl_sni) {
this.ssl_sni = ssl_sni;
}
public long getCommon_sessions() {
return common_sessions;
}
public void setCommon_sessions(long common_sessions) {
this.common_sessions = common_sessions;
}
public long getCommon_c2s_pkt_num() {
return common_c2s_pkt_num;
}
public void setCommon_c2s_pkt_num(long common_c2s_pkt_num) {
this.common_c2s_pkt_num = common_c2s_pkt_num;
}
public long getCommon_s2c_pkt_num() {
return common_s2c_pkt_num;
}
public void setCommon_s2c_pkt_num(long common_s2c_pkt_num) {
this.common_s2c_pkt_num = common_s2c_pkt_num;
}
public long getCommon_c2s_byte_num() {
return common_c2s_byte_num;
}
public void setCommon_c2s_byte_num(long common_c2s_byte_num) {
this.common_c2s_byte_num = common_c2s_byte_num;
}
public long getCommon_s2c_byte_num() {
return common_s2c_byte_num;
}
public void setCommon_s2c_byte_num(long common_s2c_byte_num) {
this.common_s2c_byte_num = common_s2c_byte_num;
}
}

View File

@@ -1,166 +0,0 @@
package cn.ac.iie.origion.bean;
/**
* @ClassNameKeyBean
* @Author lixkvip@126.com
* @Date2020/6/3 18:52
* @Version V1.0
**/
public class KeyBean {
private int common_policy_id;
private int common_action;
private String common_sub_action;
private String common_client_ip;
private String common_client_location;
private String common_sled_ip;
private String common_device_id;
private String common_subscriber_id;
private String common_server_ip;
private String common_server_location;
private int common_server_port;
private String common_l4_protocol;
private String http_domain;
private String ssl_sni;
public int getCommon_policy_id() {
return common_policy_id;
}
public void setCommon_policy_id(int common_policy_id) {
this.common_policy_id = common_policy_id;
}
public int getCommon_action() {
return common_action;
}
public void setCommon_action(int common_action) {
this.common_action = common_action;
}
public String getCommon_sub_action() {
return common_sub_action;
}
public void setCommon_sub_action(String common_sub_action) {
this.common_sub_action = common_sub_action;
}
public String getCommon_client_ip() {
return common_client_ip;
}
public void setCommon_client_ip(String common_client_ip) {
this.common_client_ip = common_client_ip;
}
public String getCommon_client_location() {
return common_client_location;
}
public void setCommon_client_location(String common_client_location) {
this.common_client_location = common_client_location;
}
public String getCommon_sled_ip() {
return common_sled_ip;
}
public void setCommon_sled_ip(String common_sled_ip) {
this.common_sled_ip = common_sled_ip;
}
public String getCommon_device_id() {
return common_device_id;
}
public void setCommon_device_id(String common_device_id) {
this.common_device_id = common_device_id;
}
public String getCommon_subscriber_id() {
return common_subscriber_id;
}
public void setCommon_subscriber_id(String common_subscriber_id) {
this.common_subscriber_id = common_subscriber_id;
}
public String getCommon_server_ip() {
return common_server_ip;
}
public void setCommon_server_ip(String common_server_ip) {
this.common_server_ip = common_server_ip;
}
public String getCommon_server_location() {
return common_server_location;
}
public void setCommon_server_location(String common_server_location) {
this.common_server_location = common_server_location;
}
public int getCommon_server_port() {
return common_server_port;
}
public void setCommon_server_port(int common_server_port) {
this.common_server_port = common_server_port;
}
public String getCommon_l4_protocol() {
return common_l4_protocol;
}
public void setCommon_l4_protocol(String common_l4_protocol) {
this.common_l4_protocol = common_l4_protocol;
}
public String getHttp_domain() {
return http_domain;
}
public void setHttp_domain(String http_domain) {
this.http_domain = http_domain;
}
public String getSsl_sni() {
return ssl_sni;
}
public void setSsl_sni(String ssl_sni) {
this.ssl_sni = ssl_sni;
}
@Override
public int hashCode() {
return ("" + getCommon_policy_id() + getCommon_action() + getCommon_sub_action() + getCommon_client_ip() + getCommon_client_location() + getCommon_sled_ip() + getCommon_device_id() + getCommon_subscriber_id() + getCommon_server_ip() + getCommon_server_location() + getCommon_server_port() + getCommon_l4_protocol() + getHttp_domain() + getSsl_sni()).hashCode();
}
@Override
public boolean equals(Object o) {
if (o instanceof KeyBean) {
KeyBean keyBean = (KeyBean) o;
return (this.getCommon_policy_id()==(keyBean.getCommon_policy_id()) &&
this.getCommon_action()==(keyBean.getCommon_action()) &&
this.getCommon_sub_action().equals(keyBean.getCommon_sub_action()) &&
this.getCommon_client_ip().equals(keyBean.getCommon_client_ip()) &&
this.getCommon_client_location().equals(keyBean.getCommon_client_location()) &&
this.getCommon_sled_ip().equals(keyBean.getCommon_sled_ip()) &&
this.getCommon_device_id().equals(keyBean.getCommon_device_id()) &&
this.getCommon_subscriber_id().equals(keyBean.getCommon_subscriber_id()) &&
this.getCommon_server_ip().equals(keyBean.getCommon_server_ip()) &&
this.getCommon_server_location().equals(keyBean.getCommon_server_location()) &&
this.getCommon_server_port()==(keyBean.getCommon_server_port()) &&
this.getCommon_l4_protocol().equals(keyBean.getCommon_l4_protocol()) &&
this.getHttp_domain().equals(keyBean.getHttp_domain()) &&
this.getSsl_sni().equals(keyBean.getSsl_sni()));
}
return false;
}
}

View File

@@ -1,59 +0,0 @@
package cn.ac.iie.origion.bean;
import java.io.Serializable;
/**
* @ClassNameValueBean
* @Author lixkvip@126.com
* @Date2020/6/2 14:05
* @Version V1.0
**/
public class ValueBean implements Serializable {
private long common_sessions;
private long common_c2s_pkt_num;
private long common_s2c_pkt_num;
private long common_c2s_byte_num;
private long common_s2c_byte_num;
public long getCommon_sessions() {
return common_sessions;
}
public void setCommon_sessions(long common_sessions) {
this.common_sessions = common_sessions;
}
public long getCommon_c2s_pkt_num() {
return common_c2s_pkt_num;
}
public void setCommon_c2s_pkt_num(long common_c2s_pkt_num) {
this.common_c2s_pkt_num = common_c2s_pkt_num;
}
public long getCommon_s2c_pkt_num() {
return common_s2c_pkt_num;
}
public void setCommon_s2c_pkt_num(long common_s2c_pkt_num) {
this.common_s2c_pkt_num = common_s2c_pkt_num;
}
public long getCommon_c2s_byte_num() {
return common_c2s_byte_num;
}
public void setCommon_c2s_byte_num(long common_c2s_byte_num) {
this.common_c2s_byte_num = common_c2s_byte_num;
}
public long getCommon_s2c_byte_num() {
return common_s2c_byte_num;
}
public void setCommon_s2c_byte_num(long common_s2c_byte_num) {
this.common_s2c_byte_num = common_s2c_byte_num;
}
}

View File

@@ -1,99 +0,0 @@
package cn.ac.iie.origion.bolt;
import cn.ac.iie.origion.bean.ValueBean;
import cn.ac.iie.origion.utils.FlowWriteConfig;
import cn.ac.iie.origion.utils.TupleUtils;
import org.apache.log4j.Logger;
import org.apache.storm.Config;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;
public class AggregateBolt extends BaseBasicBolt {
private final static Logger logger = Logger.getLogger(AggregateBolt.class);
private static final long serialVersionUID = 9006119186526123734L;
private HashMap<String, ValueBean> map;
private String key = "";
private ValueBean value;
private ValueBean mapValue;
@Override
public void prepare(Map stormConf, TopologyContext context) {
map = new HashMap<>(16);
key = "";
value = new ValueBean();
mapValue = new ValueBean();
}
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
try {
if (TupleUtils.isTick(tuple)) {
//TODO 发送到kafka的 bolt
for (String key : map.keySet()) {
basicOutputCollector.emit(new Values(key,map.get(key)));
}
map.clear();
} else {
//TODO 获取一条tuple数据的key和value
key = tuple.getString(0);
value = (ValueBean) tuple.getValue(1);
//TODO 两个count聚合后放入HashMap中利用HashMap的去重功能实现value的覆盖
mapValue = map.getOrDefault(key, new ValueBean());
mapValue = addValueBean(mapValue, value);
map.put(key, mapValue);
}
} catch (Exception e) {
logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常");
e.printStackTrace();
}
}
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>(16);
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, FlowWriteConfig.AGG_TIME);
return conf;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
// outputFieldsDeclarer.declare(new Fields("map"));
outputFieldsDeclarer.declare(new Fields("key","value"));
}
/**
* 将两个ValueBean中的相应的属性相加
*
* @param result
* @param value2
* @return
*/
@SuppressWarnings("all")
public ValueBean addValueBean(ValueBean result, ValueBean value2) {
result.setCommon_s2c_byte_num(result.getCommon_s2c_byte_num() + value2.getCommon_s2c_byte_num());
result.setCommon_c2s_byte_num(result.getCommon_c2s_byte_num() + value2.getCommon_c2s_byte_num());
result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + value2.getCommon_s2c_pkt_num());
result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + value2.getCommon_c2s_pkt_num());
result.setCommon_sessions(result.getCommon_sessions() + value2.getCommon_sessions());
return result;
}
}

View File

@@ -1,102 +0,0 @@
package cn.ac.iie.origion.bolt;
import cn.ac.iie.origion.bean.KeyBean;
import cn.ac.iie.origion.bean.ValueBean;
import com.alibaba.fastjson.JSONObject;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;
import java.util.HashMap;
import java.util.Map;
/**
* @ClassNameMyWindowBolt
* @Author lixkvip@126.com
* @Date2020/6/9 14:45
* @Version V1.0
**/
public class MyWindowBolt extends BaseWindowedBolt {
private static OutputCollector collector;
private HashMap<String, ValueBean> map;
private String message;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
map = new HashMap<>(16);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("key","value"));
// declarer.declare(new Fields("map"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return super.getComponentConfiguration();
}
@Override
public void execute(TupleWindow inputWindow) {
//TODO 清空上一个窗口聚合的数据
map.clear();
//TODO 遍历一个窗口的数据
for (Tuple tuple : inputWindow.getNew()) {
//TODO 将一个Tuple解析成String
message = tuple.getStringByField("source");
//TODO 获取Tuple中的value Bean
ValueBean valueBean = JSONObject.parseObject(message, ValueBean.class);
//TODO 获取Tuple中的key String
String key = JSONObject.toJSONString(JSONObject.parseObject(message, KeyBean.class));
//TODO 获取map中的value Bean
ValueBean mapValueBean = map.getOrDefault(key, new ValueBean());
//TODO 将tuple中的value和map中的value做累加
mapValueBean = addValueBean(mapValueBean, valueBean);
//TODO 将累加后的结果放到map中
map.put(key, mapValueBean);
}
//TODO 遍历map将 K V发送出去
for (String key : map.keySet()) {
collector.emit(new Values(key,map.get(key)));
}
// collector.emit(new Values(map));
}
@SuppressWarnings("all")
/**
* 将两个ValueBean中的相应的属性相加
* @param result
* @param value2
* @return
*/
public ValueBean addValueBean(ValueBean result, ValueBean value2) {
result.setCommon_s2c_byte_num(result.getCommon_s2c_byte_num() + value2.getCommon_s2c_byte_num());
result.setCommon_c2s_byte_num(result.getCommon_c2s_byte_num() + value2.getCommon_c2s_byte_num());
result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + value2.getCommon_s2c_pkt_num());
result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + value2.getCommon_c2s_pkt_num());
result.setCommon_sessions(result.getCommon_sessions() + 1L);
return result;
}
}

View File

@@ -1,87 +0,0 @@
package cn.ac.iie.origion.bolt;
import cn.ac.iie.origion.bean.ConnectionRecordLog;
import cn.ac.iie.origion.bean.ValueBean;
import cn.ac.iie.origion.utils.FlowWriteConfig;
import cn.ac.iie.origion.utils.KafkaLogNtc;
import com.alibaba.fastjson.JSONObject;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
/**
* 发送到kafka的bolt
*/
public class NtcLogSendBolt extends BaseBasicBolt {
private static final long serialVersionUID = 3237813470939823159L;
private static Logger logger = Logger.getLogger(NtcLogSendBolt.class);
private KafkaLogNtc kafkaLogNtc;
private JSONObject key;
private ValueBean valueBean;
private ConnectionRecordLog connectionRecordLog;
@Override
public void prepare(Map stormConf, TopologyContext context) {
kafkaLogNtc = KafkaLogNtc.getInstance();
connectionRecordLog = new ConnectionRecordLog();
key = new JSONObject();
valueBean = new ValueBean();
}
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
// System.out.println(this.getClass() + " 获取的tuple的sessions " + (tuple.getValue(0)) + ((ValueBean)tuple.getValue(1)).getCommon_sessions());
try {
key = JSONObject.parseObject(tuple.getValue(0).toString());
valueBean = (ValueBean) (tuple.getValue(1));
connectionRecordLog.setCommon_recv_time(System.currentTimeMillis());
connectionRecordLog.setCommon_policy_id(Integer.parseInt(key.getString("common_policy_id")));
connectionRecordLog.setCommon_action(Integer.parseInt(key.getString("common_action")));
connectionRecordLog.setCommon_sub_action(key.getString("common_sub_action"));
connectionRecordLog.setCommon_client_ip(key.getString("common_client_ip"));
connectionRecordLog.setCommon_client_location(key.getString("common_client_location"));
connectionRecordLog.setCommon_sled_ip(key.getString("common_sled_ip"));
connectionRecordLog.setCommon_device_id(key.getString("common_device_id"));
connectionRecordLog.setCommon_subscriber_id(key.getString("common_subscriber_id"));
connectionRecordLog.setCommon_server_ip(key.getString("common_server_ip"));
connectionRecordLog.setCommon_server_location(key.getString("common_server_location"));
connectionRecordLog.setCommon_server_port(Integer.parseInt(key.getString("common_server_port")));
connectionRecordLog.setCommon_l4_protocol(key.getString("common_l4_protocol"));
connectionRecordLog.setHttp_domain(key.getString("http_domain"));
connectionRecordLog.setSsl_sni(key.getString("ssl_sni"));
//TODO 为Value赋值
connectionRecordLog.setCommon_sessions(valueBean.getCommon_sessions());
connectionRecordLog.setCommon_c2s_pkt_num(valueBean.getCommon_c2s_pkt_num());
connectionRecordLog.setCommon_s2c_pkt_num(valueBean.getCommon_s2c_pkt_num());
connectionRecordLog.setCommon_c2s_byte_num(valueBean.getCommon_c2s_byte_num());
connectionRecordLog.setCommon_s2c_byte_num(valueBean.getCommon_s2c_byte_num());
kafkaLogNtc.sendMessage(JSONObject.toJSONString(connectionRecordLog));
} catch (Exception e) {
logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志发送Kafka过程出现异常");
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}

View File

@@ -1,56 +0,0 @@
package cn.ac.iie.origion.bolt;
import cn.ac.iie.origion.bean.ValueBean;
import cn.ac.iie.origion.utils.FlowWriteConfig;
import com.alibaba.fastjson.JSONObject;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
/**
* @ClassNamePrintBolt
* @Author lixkvip@126.com
* @Date2020/6/9 13:53
* @Version V1.0
**/
public class PrintBolt extends BaseBasicBolt {
private static final long serialVersionUID = -3663610927224396615L;
private static Logger logger = Logger.getLogger(NtcLogSendBolt.class);
@Override
public void prepare(Map stormConf, TopologyContext context) {
}
@SuppressWarnings("all")
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
try {
HashMap<String,ValueBean> hashMap = (HashMap) tuple.getValue(0);
if (hashMap.size() != 0) {
for (String key : hashMap.keySet()) {
System.out.println(key);
System.out.println(JSONObject.toJSONString(hashMap.get(key)));
}
}
} catch (Exception e) {
logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志发送Kafka过程出现异常");
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}

View File

@@ -1,65 +0,0 @@
package cn.ac.iie.origion.utils;
import cn.ac.iie.origion.bean.KeyBean;
import cn.ac.iie.origion.bean.ValueBean;
import com.alibaba.fastjson.JSONObject;
import java.util.Map;
/**
* @ClassNameAggregateUtil
* @Author lixkvip@126.com
* @Date2020/6/9 11:44
* @Version V1.0
**/
public class AggregateUtil {
private static ValueBean valueBean = new ValueBean();
private static KeyBean keyBean = new KeyBean();
/**
* 接收到的是一个json字符串该方法将其解析成 Bean
* @param message
* @return
*/
public static String aggregate(String message){
//TODO 获取tuple输入内容,解析成map
Map map = JSONObject.parseObject(message);
//TODO KEY
keyBean.setCommon_policy_id(Integer.parseInt(map.getOrDefault("common_policy_id","0").toString()));
keyBean.setCommon_action(Integer.parseInt(map.getOrDefault("common_action","0").toString()));
keyBean.setCommon_sub_action(map.getOrDefault("common_sub_action","").toString());
keyBean.setCommon_client_ip(map.getOrDefault("common_client_ip","").toString());
keyBean.setCommon_client_location(map.getOrDefault("common_client_location","").toString());
keyBean.setCommon_sled_ip(map.getOrDefault("common_sled_ip","").toString());
keyBean.setCommon_device_id(map.getOrDefault("common_device_id","").toString());
keyBean.setCommon_subscriber_id(map.getOrDefault("common_subscriber_id","").toString());
keyBean.setCommon_server_ip(map.getOrDefault("common_server_ip","").toString());
keyBean.setCommon_server_location(map.getOrDefault("common_server_location","").toString());
keyBean.setCommon_server_port(Integer.parseInt(map.getOrDefault("common_server_port","0" ).toString()));
keyBean.setCommon_l4_protocol(map.getOrDefault("common_l4_protocol","").toString());
keyBean.setHttp_domain(map.getOrDefault("http_domain","").toString());
keyBean.setSsl_sni(map.getOrDefault("ssl_sni","").toString());
//TODO VALUE
valueBean.setCommon_c2s_pkt_num(Long.parseLong(map.getOrDefault("common_c2s_pkt_num",0).toString()));
valueBean.setCommon_s2c_pkt_num(Long.parseLong(map.getOrDefault("common_s2c_pkt_num",0).toString()));
valueBean.setCommon_c2s_byte_num(Long.parseLong(map.getOrDefault("common_c2s_byte_num",0).toString()));
valueBean.setCommon_s2c_byte_num(Long.parseLong(map.getOrDefault("common_s2c_byte_num",0).toString()));
valueBean.setCommon_sessions(Long.parseLong(map.getOrDefault("common_sessions",0).toString()));
return message;
}
}

View File

@@ -1,48 +0,0 @@
package cn.ac.iie.origion.utils;
/**
* @author Administrator
*/
public class FlowWriteConfig {
public static final int IPV4_TYPE = 1;
public static final int IPV6_TYPE = 2;
public static final String FORMAT_SPLITTER = ",";
/**
* System
*/
public static final Integer SPOUT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "spout.parallelism");
public static final Integer DATACENTER_BOLT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "datacenter.bolt.parallelism");
public static final Integer TOPOLOGY_WORKERS = FlowWriteConfigurations.getIntProperty(0, "topology.workers");
public static final Integer KAFKA_BOLT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "kafka.bolt.parallelism");
public static final Integer TOPOLOGY_NUM_ACKS = FlowWriteConfigurations.getIntProperty(0, "topology.num.acks");
public static final Integer TOPOLOGY_SPOUT_SLEEP_TIME = FlowWriteConfigurations.getIntProperty(0, "topology.spout.sleep.time");
public static final Integer BATCH_INSERT_NUM = FlowWriteConfigurations.getIntProperty(0, "batch.insert.num");
public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num");
public static final Integer MAX_FAILURE_NUM = FlowWriteConfigurations.getIntProperty(0, "max.failure.num");
public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(0, "mail.default.charset");
public static final Integer AGG_TIME = FlowWriteConfigurations.getIntProperty(0, "agg.time");
/**
* kafka
*/
public static final String BOOTSTRAP_SERVERS = FlowWriteConfigurations.getStringProperty(0, "bootstrap.servers");
public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers");
public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(0, "hbase.table.name");
public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id");
public static final String RESULTS_OUTPUT_TOPIC = FlowWriteConfigurations.getStringProperty(0, "results.output.topic");
public static final String KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "kafka.topic");
public static final String AUTO_OFFSET_RESET = FlowWriteConfigurations.getStringProperty(0, "auto.offset.reset");
public static final String KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "kafka.compression.type");
/**
* http
*/
public static final String SCHEMA_HTTP = FlowWriteConfigurations.getStringProperty(0, "schema.http");
}

View File

@@ -1,23 +0,0 @@
package cn.ac.iie.origion.utils;
import org.apache.storm.Constants;
import org.apache.storm.tuple.Tuple;
/**
* 用于检测是否是系统发送的tuple
*
* @author Administrator
*/
public final class TupleUtils {
/**
* 判断是否系统自动发送的Tuple
*
* @param tuple 元组
* @return true or false
*/
public static boolean isTick(Tuple tuple) {
return tuple != null
&& Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
&& Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
}
}

View File

@@ -0,0 +1,172 @@
package cn.ac.iie.storm.bolt;
import cn.ac.iie.storm.utils.combine.AggregateUtils;
import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
import cn.ac.iie.storm.utils.http.HttpClientUtil;
import com.alibaba.fastjson.JSONObject;
import org.apache.log4j.Logger;
import org.apache.storm.Config;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.TupleUtils;
import java.util.HashMap;
import java.util.Map;
/**
* @ClassNameAggregateBolt
* @Author lixkvip@126.com
* @Date2020/6/24 13:39
* @Version V1.0
**/
public class AggregateBolt extends BaseBasicBolt {
private final static Logger logger = Logger.getLogger(AggregateBolt.class);
private static final long serialVersionUID = -7666031217706448622L;
private HashMap<String, JSONObject> metricsMap;
private HashMap<String, String[]> actionMap;
private HashMap<String, JSONObject> cacheMap;
private static String timestamp;
/**
* 只会在程序初始化的时候执行一次
*
* @param stormConf
* @param context
*/
@Override
public void prepare(Map stormConf, TopologyContext context) {
// timestampValue = System.currentTimeMillis() / 1000;
String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP);
timestamp = AggregateUtils.getTimeMetric(schema);
cacheMap = new HashMap<>(32);
// TODO 获取action标签的内容
actionMap = AggregateUtils.getActionMap(schema);
metricsMap = AggregateUtils.getMetircsMap(schema);
}
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
try {
if (TupleUtils.isTick(input)) {
long timestampValue = System.currentTimeMillis() / 1000;
for (String s : cacheMap.keySet()) {
JSONObject result = JSONObject.parseObject(s);
result.put(timestamp, timestampValue);
result.putAll(cacheMap.get(s));
collector.emit(new Values(result.toString()));
}
cacheMap.clear();
} else {
String label = input.getStringByField("label");
//action中某个协议的所有function,如果没有就默认
String[] metrics = actionMap.getOrDefault(label, actionMap.get("Default"));
String dimensions = input.getStringByField("dimensions");
String message = input.getStringByField("message");
//一条数据
JSONObject event = JSONObject.parseObject(message);
//数据中的key值 (protocol,device_id,isp)
//map中对应的数据可能为空,为空就默认创建一个对象
JSONObject cacheMessage = cacheMap.getOrDefault(dimensions, new JSONObject());
//TODO 接下来遍历所有的函数去metrics的Map中去找对应的方法并执行
for (String metric : metrics) {
String name = metricsMap.get(metric).getString("name");
//可能为空
String fieldName = metricsMap.get(name).getString("fieldName");
String nameValue = cacheMessage.getString(name);
//map中的字段值
nameValue = (nameValue == null) ? "0" : nameValue;
String fieldNameValue = event.getString(fieldName);
//数据中的字段值
fieldNameValue = (fieldNameValue == null) ? "0" : fieldNameValue;
//TODO 每次新增函数,需要改动此处代码
functionSet(name, cacheMessage, nameValue, fieldNameValue);
}
cacheMap.put(dimensions, cacheMessage);
}
} catch (Exception e) {
logger.error("计算节点异常,异常信息:" + e);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("message"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>(16);
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, StreamAggregateConfig.AGG_TIME);
return conf;
}
/**
* 根据schema描述对应字段进行操作的 函数集合
*
* @param name 函数名称
* @param cacheMessage 结果集
* @param nameValue 当前值
* @param fieldNameValue 新加值
*/
private static void functionSet(String name, JSONObject cacheMessage, String nameValue, String fieldNameValue) {
switch (name) {
case "sessions":
cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
break;
case "c2s_byte_num":
cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
break;
case "s2c_byte_num":
cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
break;
case "c2s_pkt_num":
cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
break;
case "s2c_pkt_num":
cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
break;
case "c2s_ipfrag_num":
cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
break;
case "s2c_ipfrag_num":
cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
break;
case "s2c_tcp_lostlen":
cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
break;
case "c2s_tcp_lostlen":
cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
break;
case "c2s_tcp_unorder_num":
cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
break;
case "s2c_tcp_unorder_num":
cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
break;
case "unique_sip_num":
//TODO
//cacheMessage.put(name, AggregateUtils.)
break;
case "unique_cip_num":
//TODO
break;
default:
break;
}
}
}

View File

@@ -0,0 +1,168 @@
package cn.ac.iie.storm.bolt;
import cn.ac.iie.storm.utils.combine.AggregateUtils;
import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
import cn.ac.iie.storm.utils.http.HttpClientUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.TupleUtils;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import static cn.ac.iie.storm.utils.combine.AggregateUtils.transDimensions;
import static cn.ac.iie.storm.utils.combine.AggregateUtils.updateAppRelation;
/**
* @ClassNameMyWindowBolt
* @Author lixkvip@126.com
* @Date2020/6/9 14:45
* @Version V1.0
**/
public class ParseKvBolt extends BaseBasicBolt {
private final static Logger logger = Logger.getLogger(ParseKvBolt.class);
private static final long serialVersionUID = -999382396035310355L;
private JSONArray transforms;
private JSONArray dimensions;
private static HashMap<Long, String> appMap = new HashMap<>(32);
/**
* 此方法只在程序启动的时候执行一次,用来初始化
*
* @param stormConf Map
* @param context TopologyContext
*/
@Override
public void prepare(Map stormConf, TopologyContext context) {
String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP);
JSONObject jsonObject = JSONObject.parseObject(schema);
String data = JSONObject.parseObject(jsonObject.getString("data")).getString("data");
//TODO 解析 schema
transforms = JSONObject.parseArray(JSONObject.parseObject(data).getString("transforms"));
//TODO 获取dimensions
dimensions = JSONObject.parseArray(JSONObject.parseObject(data).getString("dimensions"));
updateAppRelation(appMap);
}
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
try {
if (TupleUtils.isTick(tuple)) {
updateAppRelation(appMap);
} else {
//TODO 解析tuple的 message
JSONObject message = JSONObject.parseObject(tuple.getStringByField("source"));
//TODO 新建一个dimensions的Json对象
JSONObject dimensionsObj = transDimensions(dimensions, message);
for (Object transform : transforms) {
JSONObject transformObj = JSONObject.parseObject(transform.toString());
String function = transformObj.getString("function");
String name = transformObj.getString("name");
String fieldName = transformObj.getString("fieldName");
String parameters = transformObj.getString("parameters");
switch (function) {
case "alignment":
if (StringUtil.isNotBlank(parameters)) {
if (message.containsKey(fieldName)) {
alignmentUtils(parameters, message, name, fieldName);
}
}
break;
case "combination":
if (StringUtil.isNotBlank(parameters)) {
combinationUtils(parameters, message, name, fieldName, dimensionsObj);
}
break;
case "hierarchy":
String hierarchyValue = message.getString(fieldName);
//TODO 执行拆分代码
if (StringUtil.isNotBlank(hierarchyValue) && StringUtil.isNotBlank(parameters)) {
String[] hierarchyPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
String[] dimensionsArr = hierarchyValue.split(hierarchyPars[0]);
//TODO 递归拼接tuple并发送出去
AggregateUtils.reSend(1, dimensionsArr, "", collector, message, dimensionsObj, name);
}
break;
default:
//数据原样输出
collector.emit(new Values(null, null, message.toString()));
break;
}
}
}
} catch (Exception e) {
logger.error("上层解析原始日志/拼接计算日志发送异常,异常信息:" + e);
e.printStackTrace();
}
}
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<>(16);
conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, StreamAggregateConfig.UPDATE_APP_ID_TIME);
return conf;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("label", "dimensions", "message"));
}
/**
* alignment ID替换操作
*
* @param parameters 参数集
* @param message 原始日志
* @param name 结果日志列名
* @param fieldName 原始日志列名
*/
private static void alignmentUtils(String parameters, JSONObject message, String name, String fieldName) {
String[] alignmentPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
String data = message.getString(fieldName);
System.out.println("alignmentPars=" + Arrays.toString(alignmentPars) + "data=" + data);
int subscript = Integer.parseInt(alignmentPars[0]);
String[] fieldSplit = data.split(alignmentPars[1]);
Long appID = Long.valueOf(fieldSplit[subscript]);
int length = fieldSplit[subscript].length();
StringBuilder sb = new StringBuilder(data);
message.put(name, sb.replace(0, length, appMap.get(appID)));
}
/**
* combination 拼接操作
*
* @param parameters 参数集
* @param message 原始日志
* @param name 结果日志列名
* @param fieldName 原始日志列名
* @param dimensionsObj 结果集
*/
private static void combinationUtils(String parameters, JSONObject message, String name, String fieldName, JSONObject dimensionsObj) {
String[] combinationPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
String parameter0Value = message.getString(combinationPars[0]);
if (StringUtil.isNotBlank(parameter0Value)) {
String combinationValue = parameter0Value + combinationPars[1] + message.getString(fieldName);
message.put(fieldName, combinationValue);
dimensionsObj.put(name, combinationValue);
}
}
}

View File

@@ -0,0 +1,44 @@
package cn.ac.iie.storm.bolt;
import cn.ac.iie.storm.utils.common.LogSendKafka;
import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import java.util.Map;
/**
* 发送到kafka的bolt
*
* @author qidaijie
*/
public class ResultSendBolt extends BaseBasicBolt {
private static final long serialVersionUID = 3237813470939823159L;
private static Logger logger = Logger.getLogger(ResultSendBolt.class);
private LogSendKafka logSendKafka;
@Override
public void prepare(Map stormConf, TopologyContext context) {
logSendKafka = LogSendKafka.getInstance();
}
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
try {
logSendKafka.sendMessage(tuple.getStringByField("message"));
} catch (Exception e) {
logger.error(StreamAggregateConfig.RESULTS_OUTPUT_TOPIC + "日志发送Kafka过程出现异常,异常信息:" + e);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}

View File

@@ -0,0 +1,68 @@
package cn.ac.iie.storm.bolt.change;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
* @ClassNameFilterBolt
* @Author lixkvip@126.com
* @Date2020/7/1 12:02
* @Version V1.0
**/
public class FilterBolt extends BaseBasicBolt {
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
JSONObject source = JSONObject.parseObject(input.getStringByField("source"));
String schema = "";
String data = JSONObject.parseObject(schema).getString("data");
String filters = JSONObject.parseObject(data).getString("filters");
boolean flag = true;
String type = JSONObject.parseObject(filters).getString("type");
JSONArray fieldsArr = JSONObject.parseArray(JSONObject.parseObject(filters).getString("fields"));
if ("and".equals(type)) {
for (int i = 0; i < fieldsArr.size(); i++) {
JSONObject field = JSONObject.parseObject(fieldsArr.get(i).toString());
String name = field.getString("fieldName");
String fieldType = field.getString("type");
Object values = field.get("values");
Object nameValue = source.get(name);
System.out.println("nameValue ========" +nameValue);
if ("not".equals(fieldType)) {
if (nameValue == values) {
//满足过滤条件
flag = false;
}
} else if ("in".equals(fieldType)) {
if (!values.toString().contains(nameValue.toString())) {
//满足过滤条件
flag = false;
}
}
}}
collector.emit(new Values(source));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("filter"));
}
}

View File

@@ -0,0 +1,40 @@
package cn.ac.iie.storm.bolt.print;
import org.apache.log4j.Logger;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
/**
* @ClassNamePrintBolt
* @Author lixkvip@126.com
* @Date2020/6/28 15:34
* @Version V1.0
**/
public class PrintBolt extends BaseBasicBolt {
private final static Logger logger = Logger.getLogger(PrintBolt.class);
private static long a;
private long b;
public static long c;
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
logger.error("==================================一批数据=========================");
a= System.currentTimeMillis();
b= System.currentTimeMillis();
c= System.currentTimeMillis();
logger.error(Thread.currentThread() + "private static long a======:" + a);
logger.error(Thread.currentThread() + "private long b======:" + b);
logger.error(Thread.currentThread() + "public static long c======:" + c);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}

View File

@@ -1,7 +1,6 @@
package cn.ac.iie.origion.spout;
package cn.ac.iie.storm.spout;
import cn.ac.iie.origion.utils.FlowWriteConfig;
import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -32,12 +31,12 @@ public class CustomizedKafkaSpout extends BaseRichSpout {
private static Properties createConsumerConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", FlowWriteConfig.BOOTSTRAP_SERVERS);
props.put("group.id", FlowWriteConfig.GROUP_ID);
props.put("bootstrap.servers", StreamAggregateConfig.BOOTSTRAP_SERVERS);
props.put("group.id", StreamAggregateConfig.GROUP_ID);
props.put("session.timeout.ms", "60000");
props.put("max.poll.records", 3000);
props.put("max.partition.fetch.bytes", 31457280);
props.put("auto.offset.reset", FlowWriteConfig.AUTO_OFFSET_RESET);
props.put("auto.offset.reset", StreamAggregateConfig.AUTO_OFFSET_RESET);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return props;
@@ -50,7 +49,7 @@ public class CustomizedKafkaSpout extends BaseRichSpout {
this.context = context;
Properties prop = createConsumerConfig();
this.consumer = new KafkaConsumer<>(prop);
this.consumer.subscribe(Collections.singletonList(FlowWriteConfig.KAFKA_TOPIC));
this.consumer.subscribe(Collections.singletonList(StreamAggregateConfig.KAFKA_TOPIC));
}
@Override
@@ -63,7 +62,7 @@ public class CustomizedKafkaSpout extends BaseRichSpout {
try {
// TODO Auto-generated method stub
ConsumerRecords<String, String> records = consumer.poll(10000L);
Thread.sleep(FlowWriteConfig.TOPOLOGY_SPOUT_SLEEP_TIME);
Thread.sleep(StreamAggregateConfig.TOPOLOGY_SPOUT_SLEEP_TIME);
for (ConsumerRecord<String, String> record : records) {
this.collector.emit(new Values(record.value()));
}

View File

@@ -1,4 +1,4 @@
package cn.ac.iie.origion.topology;
package cn.ac.iie.storm.topology;
import org.apache.storm.Config;

View File

@@ -1,51 +1,52 @@
package cn.ac.iie.origion.topology;
package cn.ac.iie.storm.topology;
import cn.ac.iie.origion.bolt.AggregateBolt;
import cn.ac.iie.origion.bolt.MyWindowBolt;
import cn.ac.iie.origion.bolt.NtcLogSendBolt;
import cn.ac.iie.origion.spout.CustomizedKafkaSpout;
import cn.ac.iie.origion.utils.FlowWriteConfig;
import cn.ac.iie.storm.bolt.AggregateBolt;
import cn.ac.iie.storm.bolt.ResultSendBolt;
import cn.ac.iie.storm.bolt.ParseKvBolt;
import cn.ac.iie.storm.spout.CustomizedKafkaSpout;
import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
import org.apache.log4j.Logger;
import org.apache.storm.Config;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import java.util.concurrent.TimeUnit;
/**
* Storm程序主类
*
* @author Administrator
*/
* @ClassNameFlowAggregateTopo
* @Author lixkvip@126.com
* @Date2020/6/24 10:14
* @Version V1.0
**/
public class StreamAggregateTopology {
public class LogFlowWriteTopology {
private static Logger logger = Logger.getLogger(LogFlowWriteTopology.class);
private static Logger logger = Logger.getLogger(StreamAggregateTopology.class);
private final String topologyName;
private final Config topologyConfig;
private TopologyBuilder builder;
private LogFlowWriteTopology() {
this(LogFlowWriteTopology.class.getSimpleName());
private StreamAggregateTopology() {
this(StreamAggregateTopology.class.getSimpleName());
}
private LogFlowWriteTopology(String topologyName) {
private StreamAggregateTopology(String topologyName) {
this.topologyName = topologyName;
topologyConfig = createTopologConfig();
}
/**
* 测试配置
* conf.setTopologyWorkerMaxHeapSize(6144);
* conf.put(Config.WORKER_CHILDOPTS, "-Xmx4G -Xms2G");
*/
private Config createTopologConfig() {
Config conf = new Config();
conf.setDebug(false);
conf.setMessageTimeoutSecs(60);
conf.setMaxSpoutPending(150000);
conf.setNumAckers(FlowWriteConfig.TOPOLOGY_WORKERS);
// conf.setTopologyWorkerMaxHeapSize(6144);
conf.put(Config.WORKER_CHILDOPTS, "-Xmx4G -Xms2G");
conf.setMaxSpoutPending(StreamAggregateConfig.SPOUT_PARALLELISM);
conf.setNumAckers(StreamAggregateConfig.TOPOLOGY_NUM_ACKS);
return conf;
}
@@ -55,7 +56,7 @@ public class LogFlowWriteTopology {
}
private void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
topologyConfig.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS);
topologyConfig.setNumWorkers(StreamAggregateConfig.TOPOLOGY_WORKERS);
//设置过高会导致很多问题如心跳线程饿死吞吐量大幅下跌
topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8);
StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig);
@@ -63,26 +64,30 @@ public class LogFlowWriteTopology {
private void buildTopology() {
builder = new TopologyBuilder();
builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM);
builder.setBolt("TEST-CONN", new MyWindowBolt()
.withWindow(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS),
new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS)), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM)
.localOrShuffleGrouping("LogFlowWriteSpout");
builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), StreamAggregateConfig.SPOUT_PARALLELISM);
builder.setBolt("ParseKvBolt", new ParseKvBolt(), StreamAggregateConfig.DATACENTER_BOLT_PARALLELISM)
.localOrShuffleGrouping("CustomizedKafkaSpout");
builder.setBolt("AggregateBolt", new AggregateBolt(), StreamAggregateConfig.DATACENTER_BOLT_PARALLELISM)
.fieldsGrouping("ParseKvBolt", new Fields("dimensions"));
builder.setBolt("ResultSendBolt", new ResultSendBolt(), StreamAggregateConfig.KAFKA_BOLT_PARALLELISM)
.localOrShuffleGrouping("AggregateBolt");
// builder.setBolt("PrintBolt", new PrintBolt(), 3).localOrShuffleGrouping("LogFlowWriteSpout");
builder.setBolt("AGG-BOLT", new AggregateBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).fieldsGrouping("TEST-CONN", new Fields("key"));
builder.setBolt("KAKFA-CONN", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("AGG-BOLT");
}
public static void main(String[] args) throws Exception {
LogFlowWriteTopology csst = null;
StreamAggregateTopology csst = null;
boolean runLocally = true;
String parameter = "remote";
int size = 2;
if (args.length >= size && parameter.equalsIgnoreCase(args[1])) {
runLocally = false;
csst = new LogFlowWriteTopology(args[0]);
csst = new StreamAggregateTopology(args[0]);
} else {
csst = new LogFlowWriteTopology();
csst = new StreamAggregateTopology();
}
csst.buildTopology();

View File

@@ -0,0 +1,204 @@
package cn.ac.iie.storm.utils.combine;
import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
import cn.ac.iie.storm.utils.http.HttpClientUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.log4j.Logger;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.tuple.Values;
import java.util.HashMap;
/**
* @ClassNameAggregateUtils
* @Author lixkvip@126.com
* @Date2020/6/23 14:04
* @Version V1.0
**/
public class AggregateUtils {
private final static Logger logger = Logger.getLogger(AggregateUtils.class);
/**
* Long类型的数据求和
*
* @param value1 第一个值
* @param value2 第二个值
* @return value1 + value2
*/
public static Long longSum(Long value1, Long value2) {
return value1 + value2;
}
/**
* 计算Count
*
* @param count 当前count值
* @return count+1
*/
public static Long count(Long count) {
count++;
return count;
}
/**
* 返回指标列的Map集合
*
* @param schema 动态获取的schema
* @return 指标列集合
* c2s_byte_len, { "function" : "c2s_byte_sum", "name" : "c2s_byte_len", "fieldName" : "common_c2s_byte_num" }
*/
public static HashMap<String, JSONObject> getMetircsMap(String schema) {
HashMap<String, JSONObject> metricsMap = new HashMap<>(16);
JSONObject jsonObject = JSONObject.parseObject(schema);
String data = jsonObject.getString("data");
JSONArray metrics = JSONObject.parseArray(JSONObject.parseObject(JSONObject.parseObject(data).getString("data")).getString("metrics"));
for (Object metric : metrics) {
JSONObject json = JSONObject.parseObject(metric.toString());
String name = json.getString("name");
metricsMap.put(name, json);
}
return metricsMap;
}
/**
* 递归发送tuple
*
* @param headIndex ssss
* @param splitArr
* @param initStr
* @param collector
* @param message
* @param dimesionsObj
* @param name
*/
public static void reSend(int headIndex, String[] splitArr, String initStr, BasicOutputCollector collector, JSONObject message, JSONObject dimesionsObj, String name) {
// //递归拼接字符串
// if (splitArr.length == headIndex - 1) {
// //什么也不做
// } else {
// //递归的核心代码
// if ("".equals(initStr)) {
// initStr = splitArr[splitArr.length - headIndex];
// } else {
// initStr = initStr + "/" + splitArr[splitArr.length - headIndex];
// }
// dimesionsObj.put(name, initStr);
//
// collector.emit(new Values(splitArr[splitArr.length - headIndex], dimesionsObj.toString(), message.toString()));
// reSend(headIndex + 1, splitArr, initStr, collector, message, dimesionsObj, name);
// }
//递归拼接字符串
if (splitArr.length != headIndex - 1) {
//递归的核心代码
if ("".equals(initStr)) {
initStr = splitArr[splitArr.length - headIndex];
} else {
initStr = initStr + "/" + splitArr[splitArr.length - headIndex];
}
dimesionsObj.put(name, initStr);
collector.emit(new Values(splitArr[splitArr.length - headIndex], dimesionsObj.toString(), message.toString()));
reSend(headIndex + 1, splitArr, initStr, collector, message, dimesionsObj, name);
}
}
/**
* 获取action模块的Map集合
*
* @param schema 动态获取的schema
* @return HTTPmetrics数组
*/
public static HashMap<String, String[]> getActionMap(String schema) {
JSONObject jsonObject = JSONObject.parseObject(schema);
String data = jsonObject.getString("data");
JSONArray actions = JSONObject.parseArray(JSONObject.parseObject(JSONObject.parseObject(data).getString("data")).getString("action"));
HashMap<String, String[]> map = new HashMap<>(16);
for (Object action : actions) {
JSONObject json = JSONObject.parseObject(action.toString());
String label = json.getString("label");
String[] metrics = json.getString("metrics").split(",");
map.put(label, metrics);
}
return map;
}
/**
* 获取时间列的集合
*
* @param schema 动态获取的schema
* @return 时间列
*/
public static String getTimeMetric(String schema) {
JSONObject jsonObject = JSONObject.parseObject(schema);
String data = jsonObject.getString("data");
return JSONObject.parseObject(JSONObject.parseObject(JSONObject.parseObject(data)
.getString("data"))
.getString("timestamp"))
.getString("name");
}
/**
* 更新缓存中的对应关系map
*
* @param hashMap 当前缓存对应关系map
*/
public static void updateAppRelation(HashMap<Long, String> hashMap) {
try {
Long begin = System.currentTimeMillis();
String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.APP_ID_HTTP);
String data = JSONObject.parseObject(schema).getString("data");
JSONArray objects = JSONArray.parseArray(data);
for (Object object : objects) {
JSONArray jsonArray = JSONArray.parseArray(object.toString());
Long key = jsonArray.getLong(0);
String value = jsonArray.getString(1);
if (hashMap.containsKey(key)) {
if (!value.equals(hashMap.get(key))) {
hashMap.put(key, value);
}
} else {
hashMap.put(key, value);
}
}
System.out.println((System.currentTimeMillis() - begin));
logger.warn("更新缓存中的对应的APP关系,拉取接口数据长度:[" + objects.size());
} catch (Exception e) {
logger.error("更新缓存APP-ID失败,异常:" + e);
}
}
/**
* 解析 dimensions 字段集
*
* @param dimensions 维度集
* @param message 原始日志
* @return 结果维度集
*/
public static JSONObject transDimensions(JSONArray dimensions, JSONObject message) {
JSONObject dimensionsObj = new JSONObject();
for (Object dimension : dimensions) {
String fieldName = JSONObject.parseObject(dimension.toString()).getString("fieldName");
String name = JSONObject.parseObject(dimension.toString()).getString("name");
dimensionsObj.put(name, message.get(fieldName));
}
return dimensionsObj;
}
}

View File

@@ -1,6 +1,7 @@
package cn.ac.iie.origion.utils;
package cn.ac.iie.storm.utils.common;
import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
import org.apache.kafka.clients.producer.*;
import org.apache.log4j.Logger;
@@ -13,9 +14,9 @@ import java.util.Properties;
* @create 2018-08-13 15:11
*/
public class KafkaLogNtc {
public class LogSendKafka {
private static Logger logger = Logger.getLogger(KafkaLogNtc.class);
private static Logger logger = Logger.getLogger(LogSendKafka.class);
/**
* kafka生产者用于向kafka中发送消息
@@ -25,27 +26,27 @@ public class KafkaLogNtc {
/**
* kafka生产者适配器单例用来代理kafka生产者发送消息
*/
private static KafkaLogNtc kafkaLogNtc;
private static LogSendKafka logSendKafka;
private KafkaLogNtc() {
private LogSendKafka() {
initKafkaProducer();
}
public static KafkaLogNtc getInstance() {
if (kafkaLogNtc == null) {
kafkaLogNtc = new KafkaLogNtc();
public static LogSendKafka getInstance() {
if (logSendKafka == null) {
logSendKafka = new LogSendKafka();
}
return kafkaLogNtc;
return logSendKafka;
}
public void sendMessage(String message) {
final int[] errorSum = {0};
kafkaProducer.send(new ProducerRecord<>(FlowWriteConfig.RESULTS_OUTPUT_TOPIC, message), new Callback() {
kafkaProducer.send(new ProducerRecord<>(StreamAggregateConfig.RESULTS_OUTPUT_TOPIC, message), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
logger.error("写入" + FlowWriteConfig.RESULTS_OUTPUT_TOPIC + "出现异常", exception);
logger.error("写入" + StreamAggregateConfig.RESULTS_OUTPUT_TOPIC + "出现异常", exception);
errorSum[0]++;
}
}
@@ -60,7 +61,7 @@ public class KafkaLogNtc {
*/
private void initKafkaProducer() {
Properties properties = new Properties();
properties.put("bootstrap.servers", FlowWriteConfig.BOOTSTRAP_SERVERS);
properties.put("bootstrap.servers", StreamAggregateConfig.RESULTS_BOOTSTRAP_SERVERS);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("acks", "1");
@@ -68,7 +69,7 @@ public class KafkaLogNtc {
properties.put("request.timeout.ms", 30000);
properties.put("batch.size", 262144);
properties.put("buffer.memory", 33554432);
properties.put("compression.type", FlowWriteConfig.KAFKA_COMPRESSION_TYPE);
properties.put("compression.type", StreamAggregateConfig.KAFKA_COMPRESSION_TYPE);
kafkaProducer = new KafkaProducer<>(properties);
}

View File

@@ -0,0 +1,45 @@
package cn.ac.iie.storm.utils.file;
/**
* @author Administrator
*/
public class StreamAggregateConfig {
public static final String FORMAT_SPLITTER = ",";
/**
* System
*/
public static final Integer SPOUT_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "spout.parallelism");
public static final Integer DATACENTER_BOLT_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "datacenter.bolt.parallelism");
public static final Integer TOPOLOGY_WORKERS = StreamAggregateConfigurations.getIntProperty(0, "topology.workers");
public static final Integer KAFKA_BOLT_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "kafka.bolt.parallelism");
public static final Integer TOPOLOGY_NUM_ACKS = StreamAggregateConfigurations.getIntProperty(0, "topology.num.acks");
public static final Integer TOPOLOGY_SPOUT_SLEEP_TIME = StreamAggregateConfigurations.getIntProperty(0, "topology.spout.sleep.time");
public static final Integer BATCH_INSERT_NUM = StreamAggregateConfigurations.getIntProperty(0, "batch.insert.num");
public static final Integer DATA_CENTER_ID_NUM = StreamAggregateConfigurations.getIntProperty(0, "data.center.id.num");
public static final Integer MAX_FAILURE_NUM = StreamAggregateConfigurations.getIntProperty(0, "max.failure.num");
public static final Integer AGG_TIME = StreamAggregateConfigurations.getIntProperty(0, "agg.time");
public static final Integer UPDATE_APP_ID_TIME = StreamAggregateConfigurations.getIntProperty(0, "update.app.id.time");
/**
* kafka
*/
public static final String BOOTSTRAP_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "bootstrap.servers");
public static final String RESULTS_BOOTSTRAP_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "results.bootstrap.servers");
public static final String GROUP_ID = StreamAggregateConfigurations.getStringProperty(0, "group.id");
public static final String RESULTS_OUTPUT_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "results.output.topic");
public static final String KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "kafka.topic");
public static final String AUTO_OFFSET_RESET = StreamAggregateConfigurations.getStringProperty(0, "auto.offset.reset");
public static final String KAFKA_COMPRESSION_TYPE = StreamAggregateConfigurations.getStringProperty(0, "kafka.compression.type");
/**
* http
*/
public static final String SCHEMA_HTTP = StreamAggregateConfigurations.getStringProperty(0, "schema.http");
public static final String APP_ID_HTTP = StreamAggregateConfigurations.getStringProperty(0, "app.id.http");
}

View File

@@ -1,4 +1,4 @@
package cn.ac.iie.origion.utils;
package cn.ac.iie.storm.utils.file;
import java.util.Properties;
@@ -7,7 +7,7 @@ import java.util.Properties;
* @author Administrator
*/
public final class FlowWriteConfigurations {
public final class StreamAggregateConfigurations {
// private static Properties propCommon = new Properties();
private static Properties propService = new Properties();
@@ -56,7 +56,7 @@ public final class FlowWriteConfigurations {
static {
try {
propService.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
propService.load(StreamAggregateConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
} catch (Exception e) {
// propCommon = null;
propService = null;

View File

@@ -0,0 +1,55 @@
package cn.ac.iie.storm.utils.http;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
* 获取网关schema的工具类
*
* @author qidaijie
*/
public class HttpClientUtil {
/**
* 请求网关获取schema
* @param http 网关url
* @return schema
*/
public static String requestByGetMethod(String http) {
CloseableHttpClient httpClient = HttpClients.createDefault();
StringBuilder entityStringBuilder = null;
try {
HttpGet get = new HttpGet(http);
try (CloseableHttpResponse httpResponse = httpClient.execute(get)) {
HttpEntity entity = httpResponse.getEntity();
entityStringBuilder = new StringBuilder();
if (null != entity) {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024);
String line;
while ((line = bufferedReader.readLine()) != null) {
entityStringBuilder.append(line);
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (httpClient != null) {
httpClient.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
return entityStringBuilder.toString();
}
}

11
src/main/main.iml Normal file
View File

@@ -0,0 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/java" isTestSource="false" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

View File

@@ -0,0 +1,54 @@
package com.wp;
import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
import cn.ac.iie.storm.utils.http.HttpClientUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
/**
* @author qidaijie
* @Package com.wp
* @Description:
* @date 2020/9/2215:09
*/
public class AppIdTest {
@Test
public void appTest() {
//http://192.168.44.12:9999/open-api/appDicList
String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.APP_ID_HTTP);
JSONObject jsonObject = JSONObject.parseObject(schema);
String data = jsonObject.getString("data");
HashMap<Long, String> map = new HashMap<>(16);
JSONArray objects = JSONArray.parseArray(data);
for (Object object : objects) {
JSONArray jsonArray = JSONArray.parseArray(object.toString());
map.put(jsonArray.getLong(0), jsonArray.getString(1));
// System.out.println(object);
}
System.out.println(map.toString());
System.out.println(map.size());
}
@Test
public void changeApp() {
String a = "QQ";
String[] alignmentPars = "0,/".split(StreamAggregateConfig.FORMAT_SPLITTER);
String data = "100/HTTP";
int subscript = Integer.parseInt(alignmentPars[0]);
String[] fieldSplit = data.split(alignmentPars[1]);
Long appID = Long.valueOf(fieldSplit[subscript]);
int length = fieldSplit[subscript].length();
StringBuilder sb = new StringBuilder(data);
System.out.println(sb.replace(0, length, a));
}
}

View File

@@ -1,51 +0,0 @@
package com.wp;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.builtin.Debug;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
/**
* Unit test for simple App.
*/
public class AppTest{
@org.junit.Test
public void test(){
Config conf = new Config();
// conf.setDebug(false);
conf.setMessageTimeoutSecs(60);
conf.setNumWorkers(1);
FixedBatchSpout spout = new FixedBatchSpout(new Fields("user", "score"), 3,
new Values("nickt1", 4),
new Values("nickt2", 7),
new Values("nickt3", 8),
new Values("nickt4", 9),
new Values("nickt5", 7),
new Values("nickt6", 11),
new Values("nickt7", 5)
);
spout.setCycle(true);
TridentTopology topology = new TridentTopology();
topology.newStream("spout1", spout)
.batchGlobal()
.each(new Fields("user"),new Debug("print:"))
.parallelismHint(5);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("trident-function", conf, topology.build());
}
}

View File

@@ -0,0 +1,133 @@
package com.wp;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
/**
* @ClassNameFilterBolt
* @Author lixkvip@126.com
* @Date2020/7/1 14:53
* @Version V1.0
**/
public class FilterBolt {
@SuppressWarnings("all")
public static void main(String[] args) {
JSONObject source = new JSONObject();
String schema = "{\n" +
" \"task\": \"Application-Protocol-Distribution\",\n" +
" \"in\": \"CONNECTION-SKETCH-COMPLETED\",\n" +
" \"out\": \"TRAFFIC-PROTOCOL-STAT-LOG\",\n" +
" \"data\": {\n" +
" \"timestamp\": {\n" +
" \"name\": \"stat_time\"\n" +
" },\n" +
" \"dimensions\": [\n" +
" {\n" +
" \"name\": \"protocol_id\",\n" +
" \"fieldName\": \"common_protocol_label\",\n" +
" \"type\": \"String\"\n" +
" },\n" +
" {\n" +
" \"name\": \"device_id\",\n" +
" \"fieldName\": \"common_device_id\",\n" +
" \"type\": \"String\"\n" +
" },\n" +
" {\n" +
" \"name\": \"isp\",\n" +
" \"fieldName\": \"common_isp\",\n" +
" \"type\": \"String\"\n" +
" }\n" +
" ],\n" +
" \"metrics\": [\n" +
" { \"function\" : \"sessions_count\", \"name\" : \"sessions\"},\n" +
" { \"function\" : \"c2s_byte_sum\", \"name\" : \"c2s_byte_len\", \"fieldName\" : \"common_c2s_byte_num\" },\n" +
" { \"function\" : \"s2c_byte_sum\", \"name\" : \"s2c_byte_len\", \"fieldName\" : \"common_s2c_byte_num\" },\n" +
" { \"function\" : \"c2s_pkt_sum\", \"name\" : \"c2s_pkt_num\", \"fieldName\" : \"common_c2s_pkt_num\" },\n" +
" { \"function\" : \"s2c_pkt_sum\", \"name\" : \"s2c_pkt_num\", \"fieldName\" : \"common_s2c_pkt_num\" },\n" +
" { \"function\" : \"sip_disCount\", \"name\" : \"unique_sip_num\", \"fieldName\" : \"common_server_ip\" },\n" +
" { \"function\" : \"cip_disCount\", \"name\" : \"unique_cip_num\", \"fieldName\" : \"common_client_ip\" }\n" +
" ],\n" +
" \"filters\": {\n" +
" \"type\": \"and\",\n" +
" \"fields\": [\n" +
" {\n" +
" \"fieldName\": \"common_device_id\",\n" +
" \"type\": \"not\",\n" +
" \"values\": null\n" +
" },\n" +
" {\n" +
" \"fieldName\": \"common_protocol_label\",\n" +
" \"type\": \"not\",\n" +
" \"values\": null\n" +
" },\n" +
" {\n" +
" \"fieldName\": \"common_isp\",\n" +
" \"type\": \"not\",\n" +
" \"values\": null\n" +
" }\n" +
" ]\n" +
" },\n" +
" \"transforms\":[\n" +
" {\"function\":\"combination\",\"name\":\"protocol_id\",\"fieldName\":\"common_protocol_label\",\"parameters\": \"common_app_label,/\"},\n" +
" {\"function\":\"hierarchy\",\"name\":\"protocol_id\",\"fieldName\":\"common_protocol_label\",\"parameters\": \"/\"}\n" +
" ],\n" +
" \"action\":[\n" +
" {\"label\": \"Default\", \"metrics\": \"sessions,c2s_byte_len,s2c_byte_len,c2s_pkt_num,s2c_pkt_num\"},\n" +
" {\"label\": \"HTTP\", \"metrics\": \"sessions,c2s_byte_len,s2c_byte_len,c2s_pkt_num,s2c_pkt_num,unique_sip_num,unique_cip_num\"}\n" +
" ],\n" +
" \"granularity\":{\n" +
" \"type\": \"period\",\n" +
" \"period\": \"5M\"\n" +
" }\n" +
" }\n" +
"}";
source.put("common_protocol_label", "HTTP");
source.put("common_isp", "Unicom");
source.put("common_device_id", "1");
String data = JSONObject.parseObject(schema).getString("data");
String filters = JSONObject.parseObject(data).getString("filters");
boolean flag = true;
String type = JSONObject.parseObject(filters).getString("type");
JSONArray fieldsArr = JSONObject.parseArray(JSONObject.parseObject(filters).getString("fields"));
if ("and".equals(type)) {
for (int i = 0; i < fieldsArr.size(); i++) {
JSONObject field = JSONObject.parseObject(fieldsArr.get(i).toString());
String name = field.getString("fieldName");
String fieldType = field.getString("type");
Object values = field.get("values");
Object nameValue = source.get(name);
if ("not".equals(fieldType)) {
if (nameValue == values) {
//满足过滤条件
flag = false;
}
} else if ("in".equals(fieldType)) {
if (!values.toString().contains(nameValue.toString())) {
//满足过滤条件
flag = false;
}
}
}
if (flag){
System.out.println("输出到下个Bolt");
}else {
System.out.println("此条消息被过滤掉");
}
}
}
}

View File

@@ -0,0 +1,43 @@
package com.wp;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.utils.StringUtil;
/**
* @ClassNameSchemaTest
* @Author lixkvip@126.com
* @Date2020/6/28 10:11
* @Version V1.0
**/
public class SchemaTest {
static String str = "";
public static void main(String[] args) {
String str1 = null;
String str2 = " ";
System.out.println(StringUtil.isNotBlank(str1));
System.out.println(StringUtil.isNotEmpty(str1));
System.out.println(StringUtil.isNotBlank(str2));
System.out.println(StringUtil.isNotEmpty(str2));
}
public static void reAdd(int m, String[] split, String str) {
//递归拼接字符串
if (0 == m) {
} else {
//递归的核心代码
str = str + split[m - 1] + "/";
reAdd(m - 1, split, str);
}
}
}

12
src/test/test.iml Normal file
View File

@@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/java" isTestSource="true" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="module" module-name="main" />
</component>
</module>