diff --git a/src/main/java/cn/ac/iie/origion/bean/ConnectionRecordLog.java b/src/main/java/cn/ac/iie/origion/bean/ConnectionRecordLog.java deleted file mode 100644 index 06b2135..0000000 --- a/src/main/java/cn/ac/iie/origion/bean/ConnectionRecordLog.java +++ /dev/null @@ -1,195 +0,0 @@ -package cn.ac.iie.origion.bean; - -/** - * @ClassNameConnectionRecordLog - * @Author lixkvip@126.com - * @Date2020/5/28 13:44 - * @Version V1.0 - **/ -public class ConnectionRecordLog { - - //key - private long common_recv_time; - private int common_policy_id; - private int common_action; - private String common_sub_action; - private String common_client_ip; - private String common_client_location; - private String common_sled_ip; - private String common_device_id; - private String common_subscriber_id; - private String common_server_ip; - private String common_server_location; - private int common_server_port; - private String common_l4_protocol; - private String http_domain; - private String ssl_sni; - - //value - private long common_sessions; - private long common_c2s_pkt_num; - private long common_s2c_pkt_num; - private long common_c2s_byte_num; - private long common_s2c_byte_num; - - - public long getCommon_recv_time() { - return common_recv_time; - } - - public void setCommon_recv_time(long common_recv_time) { - this.common_recv_time = common_recv_time; - } - - public int getCommon_policy_id() { - return common_policy_id; - } - - public void setCommon_policy_id(int common_policy_id) { - this.common_policy_id = common_policy_id; - } - - public int getCommon_action() { - return common_action; - } - - public void setCommon_action(int common_action) { - this.common_action = common_action; - } - - public String getCommon_sub_action() { - return common_sub_action; - } - - public void setCommon_sub_action(String common_sub_action) { - this.common_sub_action = common_sub_action; - } - - public String getCommon_client_ip() { - return common_client_ip; - } - - public void setCommon_client_ip(String common_client_ip) { - this.common_client_ip = common_client_ip; - } - - public String getCommon_client_location() { - return common_client_location; - } - - public void setCommon_client_location(String common_client_location) { - this.common_client_location = common_client_location; - } - - public String getCommon_sled_ip() { - return common_sled_ip; - } - - public void setCommon_sled_ip(String common_sled_ip) { - this.common_sled_ip = common_sled_ip; - } - - public String getCommon_device_id() { - return common_device_id; - } - - public void setCommon_device_id(String common_device_id) { - this.common_device_id = common_device_id; - } - - public String getCommon_subscriber_id() { - return common_subscriber_id; - } - - public void setCommon_subscriber_id(String common_subscriber_id) { - this.common_subscriber_id = common_subscriber_id; - } - - public String getCommon_server_ip() { - return common_server_ip; - } - - public void setCommon_server_ip(String common_server_ip) { - this.common_server_ip = common_server_ip; - } - - public String getCommon_server_location() { - return common_server_location; - } - - public void setCommon_server_location(String common_server_location) { - this.common_server_location = common_server_location; - } - - public int getCommon_server_port() { - return common_server_port; - } - - public void setCommon_server_port(int common_server_port) { - this.common_server_port = common_server_port; - } - - public String getCommon_l4_protocol() { - return common_l4_protocol; - } - - public void setCommon_l4_protocol(String common_l4_protocol) { - this.common_l4_protocol = common_l4_protocol; - } - - public String getHttp_domain() { - return http_domain; - } - - public void setHttp_domain(String http_domain) { - this.http_domain = http_domain; - } - - public String getSsl_sni() { - return ssl_sni; - } - - public void setSsl_sni(String ssl_sni) { - this.ssl_sni = ssl_sni; - } - - public long getCommon_sessions() { - return common_sessions; - } - - public void setCommon_sessions(long common_sessions) { - this.common_sessions = common_sessions; - } - - public long getCommon_c2s_pkt_num() { - return common_c2s_pkt_num; - } - - public void setCommon_c2s_pkt_num(long common_c2s_pkt_num) { - this.common_c2s_pkt_num = common_c2s_pkt_num; - } - - public long getCommon_s2c_pkt_num() { - return common_s2c_pkt_num; - } - - public void setCommon_s2c_pkt_num(long common_s2c_pkt_num) { - this.common_s2c_pkt_num = common_s2c_pkt_num; - } - - public long getCommon_c2s_byte_num() { - return common_c2s_byte_num; - } - - public void setCommon_c2s_byte_num(long common_c2s_byte_num) { - this.common_c2s_byte_num = common_c2s_byte_num; - } - - public long getCommon_s2c_byte_num() { - return common_s2c_byte_num; - } - - public void setCommon_s2c_byte_num(long common_s2c_byte_num) { - this.common_s2c_byte_num = common_s2c_byte_num; - } -} diff --git a/src/main/java/cn/ac/iie/origion/bean/KeyBean.java b/src/main/java/cn/ac/iie/origion/bean/KeyBean.java deleted file mode 100644 index 6898558..0000000 --- a/src/main/java/cn/ac/iie/origion/bean/KeyBean.java +++ /dev/null @@ -1,166 +0,0 @@ -package cn.ac.iie.origion.bean; - -/** - * @ClassNameKeyBean - * @Author lixkvip@126.com - * @Date2020/6/3 18:52 - * @Version V1.0 - **/ -public class KeyBean { - - private int common_policy_id; - private int common_action; - private String common_sub_action; - private String common_client_ip; - private String common_client_location; - private String common_sled_ip; - private String common_device_id; - private String common_subscriber_id; - private String common_server_ip; - private String common_server_location; - private int common_server_port; - private String common_l4_protocol; - private String http_domain; - private String ssl_sni; - - public int getCommon_policy_id() { - return common_policy_id; - } - - public void setCommon_policy_id(int common_policy_id) { - this.common_policy_id = common_policy_id; - } - - public int getCommon_action() { - return common_action; - } - - public void setCommon_action(int common_action) { - this.common_action = common_action; - } - - public String getCommon_sub_action() { - return common_sub_action; - } - - public void setCommon_sub_action(String common_sub_action) { - this.common_sub_action = common_sub_action; - } - - public String getCommon_client_ip() { - return common_client_ip; - } - - public void setCommon_client_ip(String common_client_ip) { - this.common_client_ip = common_client_ip; - } - - public String getCommon_client_location() { - return common_client_location; - } - - public void setCommon_client_location(String common_client_location) { - this.common_client_location = common_client_location; - } - - public String getCommon_sled_ip() { - return common_sled_ip; - } - - public void setCommon_sled_ip(String common_sled_ip) { - this.common_sled_ip = common_sled_ip; - } - - public String getCommon_device_id() { - return common_device_id; - } - - public void setCommon_device_id(String common_device_id) { - this.common_device_id = common_device_id; - } - - public String getCommon_subscriber_id() { - return common_subscriber_id; - } - - public void setCommon_subscriber_id(String common_subscriber_id) { - this.common_subscriber_id = common_subscriber_id; - } - - public String getCommon_server_ip() { - return common_server_ip; - } - - public void setCommon_server_ip(String common_server_ip) { - this.common_server_ip = common_server_ip; - } - - public String getCommon_server_location() { - return common_server_location; - } - - public void setCommon_server_location(String common_server_location) { - this.common_server_location = common_server_location; - } - - public int getCommon_server_port() { - return common_server_port; - } - - public void setCommon_server_port(int common_server_port) { - this.common_server_port = common_server_port; - } - - public String getCommon_l4_protocol() { - return common_l4_protocol; - } - - public void setCommon_l4_protocol(String common_l4_protocol) { - this.common_l4_protocol = common_l4_protocol; - } - - public String getHttp_domain() { - return http_domain; - } - - public void setHttp_domain(String http_domain) { - this.http_domain = http_domain; - } - - public String getSsl_sni() { - return ssl_sni; - } - - public void setSsl_sni(String ssl_sni) { - this.ssl_sni = ssl_sni; - } - - @Override - public int hashCode() { - - return ("" + getCommon_policy_id() + getCommon_action() + getCommon_sub_action() + getCommon_client_ip() + getCommon_client_location() + getCommon_sled_ip() + getCommon_device_id() + getCommon_subscriber_id() + getCommon_server_ip() + getCommon_server_location() + getCommon_server_port() + getCommon_l4_protocol() + getHttp_domain() + getSsl_sni()).hashCode(); - - } - - @Override - public boolean equals(Object o) { - if (o instanceof KeyBean) { - KeyBean keyBean = (KeyBean) o; - return (this.getCommon_policy_id()==(keyBean.getCommon_policy_id()) && - this.getCommon_action()==(keyBean.getCommon_action()) && - this.getCommon_sub_action().equals(keyBean.getCommon_sub_action()) && - this.getCommon_client_ip().equals(keyBean.getCommon_client_ip()) && - this.getCommon_client_location().equals(keyBean.getCommon_client_location()) && - this.getCommon_sled_ip().equals(keyBean.getCommon_sled_ip()) && - this.getCommon_device_id().equals(keyBean.getCommon_device_id()) && - this.getCommon_subscriber_id().equals(keyBean.getCommon_subscriber_id()) && - this.getCommon_server_ip().equals(keyBean.getCommon_server_ip()) && - this.getCommon_server_location().equals(keyBean.getCommon_server_location()) && - this.getCommon_server_port()==(keyBean.getCommon_server_port()) && - this.getCommon_l4_protocol().equals(keyBean.getCommon_l4_protocol()) && - this.getHttp_domain().equals(keyBean.getHttp_domain()) && - this.getSsl_sni().equals(keyBean.getSsl_sni())); - } - return false; - } -} diff --git a/src/main/java/cn/ac/iie/origion/bean/ValueBean.java b/src/main/java/cn/ac/iie/origion/bean/ValueBean.java deleted file mode 100644 index cfe2f37..0000000 --- a/src/main/java/cn/ac/iie/origion/bean/ValueBean.java +++ /dev/null @@ -1,59 +0,0 @@ -package cn.ac.iie.origion.bean; - -import java.io.Serializable; - -/** - * @ClassNameValueBean - * @Author lixkvip@126.com - * @Date2020/6/2 14:05 - * @Version V1.0 - **/ -public class ValueBean implements Serializable { - - private long common_sessions; - private long common_c2s_pkt_num; - private long common_s2c_pkt_num; - private long common_c2s_byte_num; - private long common_s2c_byte_num; - - - public long getCommon_sessions() { - return common_sessions; - } - - public void setCommon_sessions(long common_sessions) { - this.common_sessions = common_sessions; - } - - public long getCommon_c2s_pkt_num() { - return common_c2s_pkt_num; - } - - public void setCommon_c2s_pkt_num(long common_c2s_pkt_num) { - this.common_c2s_pkt_num = common_c2s_pkt_num; - } - - public long getCommon_s2c_pkt_num() { - return common_s2c_pkt_num; - } - - public void setCommon_s2c_pkt_num(long common_s2c_pkt_num) { - this.common_s2c_pkt_num = common_s2c_pkt_num; - } - - public long getCommon_c2s_byte_num() { - return common_c2s_byte_num; - } - - public void setCommon_c2s_byte_num(long common_c2s_byte_num) { - this.common_c2s_byte_num = common_c2s_byte_num; - } - - public long getCommon_s2c_byte_num() { - return common_s2c_byte_num; - } - - public void setCommon_s2c_byte_num(long common_s2c_byte_num) { - this.common_s2c_byte_num = common_s2c_byte_num; - } -} diff --git a/src/main/java/cn/ac/iie/origion/bolt/AggregateBolt.java b/src/main/java/cn/ac/iie/origion/bolt/AggregateBolt.java deleted file mode 100644 index d7f5eb7..0000000 --- a/src/main/java/cn/ac/iie/origion/bolt/AggregateBolt.java +++ /dev/null @@ -1,99 +0,0 @@ -package cn.ac.iie.origion.bolt; - - -import cn.ac.iie.origion.bean.ValueBean; -import cn.ac.iie.origion.utils.FlowWriteConfig; -import cn.ac.iie.origion.utils.TupleUtils; - -import org.apache.log4j.Logger; -import org.apache.storm.Config; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.BasicOutputCollector; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseBasicBolt; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.Values; - -import java.util.HashMap; -import java.util.Map; - - -public class AggregateBolt extends BaseBasicBolt { - private final static Logger logger = Logger.getLogger(AggregateBolt.class); - private static final long serialVersionUID = 9006119186526123734L; - - private HashMap map; - private String key = ""; - private ValueBean value; - private ValueBean mapValue; - - - @Override - public void prepare(Map stormConf, TopologyContext context) { - map = new HashMap<>(16); - key = ""; - value = new ValueBean(); - mapValue = new ValueBean(); - } - - @Override - public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { - try { - if (TupleUtils.isTick(tuple)) { - //TODO 发送到kafka的 bolt - for (String key : map.keySet()) { - basicOutputCollector.emit(new Values(key,map.get(key))); - } - map.clear(); - } else { - //TODO 获取一条tuple数据的key和value - key = tuple.getString(0); - value = (ValueBean) tuple.getValue(1); - //TODO 两个count聚合后放入HashMap中,利用HashMap的去重功能实现value的覆盖 - mapValue = map.getOrDefault(key, new ValueBean()); - mapValue = addValueBean(mapValue, value); - map.put(key, mapValue); - } - } catch (Exception e) { - logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常"); - e.printStackTrace(); - } - } - - @Override - public Map getComponentConfiguration() { - Map conf = new HashMap(16); - conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, FlowWriteConfig.AGG_TIME); - return conf; - } - - - @Override - public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { -// outputFieldsDeclarer.declare(new Fields("map")); - outputFieldsDeclarer.declare(new Fields("key","value")); - } - - /** - * 将两个ValueBean中的相应的属性相加 - * - * @param result - * @param value2 - * @return - */ - - @SuppressWarnings("all") - public ValueBean addValueBean(ValueBean result, ValueBean value2) { - - result.setCommon_s2c_byte_num(result.getCommon_s2c_byte_num() + value2.getCommon_s2c_byte_num()); - result.setCommon_c2s_byte_num(result.getCommon_c2s_byte_num() + value2.getCommon_c2s_byte_num()); - result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + value2.getCommon_s2c_pkt_num()); - result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + value2.getCommon_c2s_pkt_num()); - result.setCommon_sessions(result.getCommon_sessions() + value2.getCommon_sessions()); - - - return result; - } - -} diff --git a/src/main/java/cn/ac/iie/origion/bolt/MyWindowBolt.java b/src/main/java/cn/ac/iie/origion/bolt/MyWindowBolt.java deleted file mode 100644 index 4432255..0000000 --- a/src/main/java/cn/ac/iie/origion/bolt/MyWindowBolt.java +++ /dev/null @@ -1,102 +0,0 @@ -package cn.ac.iie.origion.bolt; - -import cn.ac.iie.origion.bean.KeyBean; -import cn.ac.iie.origion.bean.ValueBean; -import com.alibaba.fastjson.JSONObject; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseWindowedBolt; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.Values; -import org.apache.storm.windowing.TupleWindow; - -import java.util.HashMap; -import java.util.Map; - -/** - * @ClassNameMyWindowBolt - * @Author lixkvip@126.com - * @Date2020/6/9 14:45 - * @Version V1.0 - **/ -public class MyWindowBolt extends BaseWindowedBolt { - - - private static OutputCollector collector; - private HashMap map; - private String message; - - - @Override - - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - this.collector = collector; - map = new HashMap<>(16); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - - declarer.declare(new Fields("key","value")); -// declarer.declare(new Fields("map")); - } - - @Override - public Map getComponentConfiguration() { - return super.getComponentConfiguration(); - } - - @Override - public void execute(TupleWindow inputWindow) { - //TODO 清空上一个窗口聚合的数据 - map.clear(); - - //TODO 遍历一个窗口的数据 - for (Tuple tuple : inputWindow.getNew()) { - - //TODO 将一个Tuple解析成String - message = tuple.getStringByField("source"); - - //TODO 获取Tuple中的value Bean - ValueBean valueBean = JSONObject.parseObject(message, ValueBean.class); - //TODO 获取Tuple中的key String - String key = JSONObject.toJSONString(JSONObject.parseObject(message, KeyBean.class)); - //TODO 获取map中的value Bean - ValueBean mapValueBean = map.getOrDefault(key, new ValueBean()); - //TODO 将tuple中的value和map中的value做累加 - mapValueBean = addValueBean(mapValueBean, valueBean); - - //TODO 将累加后的结果放到map中 - map.put(key, mapValueBean); - } - - //TODO 遍历map将 K V发送出去 - for (String key : map.keySet()) { - collector.emit(new Values(key,map.get(key))); - } -// collector.emit(new Values(map)); - - } - - @SuppressWarnings("all") - /** - * 将两个ValueBean中的相应的属性相加 - * @param result - * @param value2 - * @return - */ - - public ValueBean addValueBean(ValueBean result, ValueBean value2) { - - result.setCommon_s2c_byte_num(result.getCommon_s2c_byte_num() + value2.getCommon_s2c_byte_num()); - result.setCommon_c2s_byte_num(result.getCommon_c2s_byte_num() + value2.getCommon_c2s_byte_num()); - result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + value2.getCommon_s2c_pkt_num()); - result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + value2.getCommon_c2s_pkt_num()); - result.setCommon_sessions(result.getCommon_sessions() + 1L); - return result; - } - - -} diff --git a/src/main/java/cn/ac/iie/origion/bolt/NtcLogSendBolt.java b/src/main/java/cn/ac/iie/origion/bolt/NtcLogSendBolt.java deleted file mode 100644 index 40a2070..0000000 --- a/src/main/java/cn/ac/iie/origion/bolt/NtcLogSendBolt.java +++ /dev/null @@ -1,87 +0,0 @@ -package cn.ac.iie.origion.bolt; - - -import cn.ac.iie.origion.bean.ConnectionRecordLog; -import cn.ac.iie.origion.bean.ValueBean; -import cn.ac.iie.origion.utils.FlowWriteConfig; -import cn.ac.iie.origion.utils.KafkaLogNtc; -import com.alibaba.fastjson.JSONObject; -import org.apache.log4j.Logger; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.BasicOutputCollector; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseBasicBolt; -import org.apache.storm.tuple.Tuple; - -import java.util.HashMap; -import java.util.Map; - -/** - * 发送到kafka的bolt - */ -public class NtcLogSendBolt extends BaseBasicBolt { - - - private static final long serialVersionUID = 3237813470939823159L; - private static Logger logger = Logger.getLogger(NtcLogSendBolt.class); - private KafkaLogNtc kafkaLogNtc; - - private JSONObject key; - private ValueBean valueBean; - private ConnectionRecordLog connectionRecordLog; - - - @Override - public void prepare(Map stormConf, TopologyContext context) { - kafkaLogNtc = KafkaLogNtc.getInstance(); - connectionRecordLog = new ConnectionRecordLog(); - key = new JSONObject(); - valueBean = new ValueBean(); - } - - @Override - public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { - - -// System.out.println(this.getClass() + " 获取的tuple的sessions " + (tuple.getValue(0)) + ((ValueBean)tuple.getValue(1)).getCommon_sessions()); - try { - key = JSONObject.parseObject(tuple.getValue(0).toString()); - - valueBean = (ValueBean) (tuple.getValue(1)); - - connectionRecordLog.setCommon_recv_time(System.currentTimeMillis()); - connectionRecordLog.setCommon_policy_id(Integer.parseInt(key.getString("common_policy_id"))); - connectionRecordLog.setCommon_action(Integer.parseInt(key.getString("common_action"))); - connectionRecordLog.setCommon_sub_action(key.getString("common_sub_action")); - connectionRecordLog.setCommon_client_ip(key.getString("common_client_ip")); - connectionRecordLog.setCommon_client_location(key.getString("common_client_location")); - connectionRecordLog.setCommon_sled_ip(key.getString("common_sled_ip")); - connectionRecordLog.setCommon_device_id(key.getString("common_device_id")); - connectionRecordLog.setCommon_subscriber_id(key.getString("common_subscriber_id")); - connectionRecordLog.setCommon_server_ip(key.getString("common_server_ip")); - connectionRecordLog.setCommon_server_location(key.getString("common_server_location")); - connectionRecordLog.setCommon_server_port(Integer.parseInt(key.getString("common_server_port"))); - connectionRecordLog.setCommon_l4_protocol(key.getString("common_l4_protocol")); - connectionRecordLog.setHttp_domain(key.getString("http_domain")); - connectionRecordLog.setSsl_sni(key.getString("ssl_sni")); - - //TODO 为Value赋值 - - connectionRecordLog.setCommon_sessions(valueBean.getCommon_sessions()); - connectionRecordLog.setCommon_c2s_pkt_num(valueBean.getCommon_c2s_pkt_num()); - connectionRecordLog.setCommon_s2c_pkt_num(valueBean.getCommon_s2c_pkt_num()); - connectionRecordLog.setCommon_c2s_byte_num(valueBean.getCommon_c2s_byte_num()); - connectionRecordLog.setCommon_s2c_byte_num(valueBean.getCommon_s2c_byte_num()); - - kafkaLogNtc.sendMessage(JSONObject.toJSONString(connectionRecordLog)); - } catch (Exception e) { - logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志发送Kafka过程出现异常"); - e.printStackTrace(); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - - } -} diff --git a/src/main/java/cn/ac/iie/origion/bolt/PrintBolt.java b/src/main/java/cn/ac/iie/origion/bolt/PrintBolt.java deleted file mode 100644 index 4215d0e..0000000 --- a/src/main/java/cn/ac/iie/origion/bolt/PrintBolt.java +++ /dev/null @@ -1,56 +0,0 @@ -package cn.ac.iie.origion.bolt; - -import cn.ac.iie.origion.bean.ValueBean; - -import cn.ac.iie.origion.utils.FlowWriteConfig; -import com.alibaba.fastjson.JSONObject; -import org.apache.log4j.Logger; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.BasicOutputCollector; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseBasicBolt; -import org.apache.storm.tuple.Tuple; - -import java.util.HashMap; -import java.util.Map; - -/** - * @ClassNamePrintBolt - * @Author lixkvip@126.com - * @Date2020/6/9 13:53 - * @Version V1.0 - **/ -public class PrintBolt extends BaseBasicBolt { - - private static final long serialVersionUID = -3663610927224396615L; - private static Logger logger = Logger.getLogger(NtcLogSendBolt.class); - - - @Override - public void prepare(Map stormConf, TopologyContext context) { - } - - @SuppressWarnings("all") - @Override - public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { - try { - HashMap hashMap = (HashMap) tuple.getValue(0); - - if (hashMap.size() != 0) { - for (String key : hashMap.keySet()) { - System.out.println(key); - System.out.println(JSONObject.toJSONString(hashMap.get(key))); - } - - } - } catch (Exception e) { - logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志发送Kafka过程出现异常"); - e.printStackTrace(); - } - } - - - @Override - public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { - } -} diff --git a/src/main/java/cn/ac/iie/origion/utils/AggregateUtil.java b/src/main/java/cn/ac/iie/origion/utils/AggregateUtil.java deleted file mode 100644 index e7d7a35..0000000 --- a/src/main/java/cn/ac/iie/origion/utils/AggregateUtil.java +++ /dev/null @@ -1,65 +0,0 @@ -package cn.ac.iie.origion.utils; - - -import cn.ac.iie.origion.bean.KeyBean; -import cn.ac.iie.origion.bean.ValueBean; -import com.alibaba.fastjson.JSONObject; - -import java.util.Map; - -/** - * @ClassNameAggregateUtil - * @Author lixkvip@126.com - * @Date2020/6/9 11:44 - * @Version V1.0 - **/ -public class AggregateUtil { - - - private static ValueBean valueBean = new ValueBean(); - private static KeyBean keyBean = new KeyBean(); - - /** - * 接收到的是一个json字符串,该方法将其解析成 Bean - * @param message - * @return - */ - public static String aggregate(String message){ - - - - //TODO 获取tuple输入内容,解析成map - Map map = JSONObject.parseObject(message); - - - //TODO KEY - - keyBean.setCommon_policy_id(Integer.parseInt(map.getOrDefault("common_policy_id","0").toString())); - keyBean.setCommon_action(Integer.parseInt(map.getOrDefault("common_action","0").toString())); - keyBean.setCommon_sub_action(map.getOrDefault("common_sub_action","").toString()); - keyBean.setCommon_client_ip(map.getOrDefault("common_client_ip","").toString()); - keyBean.setCommon_client_location(map.getOrDefault("common_client_location","").toString()); - keyBean.setCommon_sled_ip(map.getOrDefault("common_sled_ip","").toString()); - keyBean.setCommon_device_id(map.getOrDefault("common_device_id","").toString()); - keyBean.setCommon_subscriber_id(map.getOrDefault("common_subscriber_id","").toString()); - keyBean.setCommon_server_ip(map.getOrDefault("common_server_ip","").toString()); - keyBean.setCommon_server_location(map.getOrDefault("common_server_location","").toString()); - keyBean.setCommon_server_port(Integer.parseInt(map.getOrDefault("common_server_port","0" ).toString())); - keyBean.setCommon_l4_protocol(map.getOrDefault("common_l4_protocol","").toString()); - keyBean.setHttp_domain(map.getOrDefault("http_domain","").toString()); - keyBean.setSsl_sni(map.getOrDefault("ssl_sni","").toString()); - - //TODO VALUE - - - valueBean.setCommon_c2s_pkt_num(Long.parseLong(map.getOrDefault("common_c2s_pkt_num",0).toString())); - valueBean.setCommon_s2c_pkt_num(Long.parseLong(map.getOrDefault("common_s2c_pkt_num",0).toString())); - valueBean.setCommon_c2s_byte_num(Long.parseLong(map.getOrDefault("common_c2s_byte_num",0).toString())); - valueBean.setCommon_s2c_byte_num(Long.parseLong(map.getOrDefault("common_s2c_byte_num",0).toString())); - valueBean.setCommon_sessions(Long.parseLong(map.getOrDefault("common_sessions",0).toString())); - - - - return message; - } -} diff --git a/src/main/java/cn/ac/iie/origion/utils/FlowWriteConfig.java b/src/main/java/cn/ac/iie/origion/utils/FlowWriteConfig.java deleted file mode 100644 index 16a069c..0000000 --- a/src/main/java/cn/ac/iie/origion/utils/FlowWriteConfig.java +++ /dev/null @@ -1,48 +0,0 @@ -package cn.ac.iie.origion.utils; - - -/** - * @author Administrator - */ -public class FlowWriteConfig { - - public static final int IPV4_TYPE = 1; - public static final int IPV6_TYPE = 2; - public static final String FORMAT_SPLITTER = ","; - /** - * System - */ - public static final Integer SPOUT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "spout.parallelism"); - public static final Integer DATACENTER_BOLT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "datacenter.bolt.parallelism"); - public static final Integer TOPOLOGY_WORKERS = FlowWriteConfigurations.getIntProperty(0, "topology.workers"); - public static final Integer KAFKA_BOLT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "kafka.bolt.parallelism"); - public static final Integer TOPOLOGY_NUM_ACKS = FlowWriteConfigurations.getIntProperty(0, "topology.num.acks"); - public static final Integer TOPOLOGY_SPOUT_SLEEP_TIME = FlowWriteConfigurations.getIntProperty(0, "topology.spout.sleep.time"); - public static final Integer BATCH_INSERT_NUM = FlowWriteConfigurations.getIntProperty(0, "batch.insert.num"); - public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num"); - public static final Integer MAX_FAILURE_NUM = FlowWriteConfigurations.getIntProperty(0, "max.failure.num"); - public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(0, "mail.default.charset"); - - - public static final Integer AGG_TIME = FlowWriteConfigurations.getIntProperty(0, "agg.time"); - - - /** - * kafka - */ - public static final String BOOTSTRAP_SERVERS = FlowWriteConfigurations.getStringProperty(0, "bootstrap.servers"); - public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers"); - public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers"); - public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(0, "hbase.table.name"); - public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id"); - public static final String RESULTS_OUTPUT_TOPIC = FlowWriteConfigurations.getStringProperty(0, "results.output.topic"); - public static final String KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "kafka.topic"); - public static final String AUTO_OFFSET_RESET = FlowWriteConfigurations.getStringProperty(0, "auto.offset.reset"); - public static final String KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "kafka.compression.type"); - - /** - * http - */ - public static final String SCHEMA_HTTP = FlowWriteConfigurations.getStringProperty(0, "schema.http"); - -} \ No newline at end of file diff --git a/src/main/java/cn/ac/iie/origion/utils/TupleUtils.java b/src/main/java/cn/ac/iie/origion/utils/TupleUtils.java deleted file mode 100644 index 6c0d6bc..0000000 --- a/src/main/java/cn/ac/iie/origion/utils/TupleUtils.java +++ /dev/null @@ -1,23 +0,0 @@ -package cn.ac.iie.origion.utils; - -import org.apache.storm.Constants; -import org.apache.storm.tuple.Tuple; - -/** - * 用于检测是否是系统发送的tuple - * - * @author Administrator - */ -public final class TupleUtils { - /** - * 判断是否系统自动发送的Tuple - * - * @param tuple 元组 - * @return true or false - */ - public static boolean isTick(Tuple tuple) { - return tuple != null - && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent()) - && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId()); - } -} diff --git a/src/main/java/cn/ac/iie/storm/bolt/AggregateBolt.java b/src/main/java/cn/ac/iie/storm/bolt/AggregateBolt.java new file mode 100644 index 0000000..0c88a7f --- /dev/null +++ b/src/main/java/cn/ac/iie/storm/bolt/AggregateBolt.java @@ -0,0 +1,172 @@ +package cn.ac.iie.storm.bolt; + +import cn.ac.iie.storm.utils.combine.AggregateUtils; +import cn.ac.iie.storm.utils.file.StreamAggregateConfig; +import cn.ac.iie.storm.utils.http.HttpClientUtil; +import com.alibaba.fastjson.JSONObject; +import org.apache.log4j.Logger; +import org.apache.storm.Config; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.TupleUtils; + +import java.util.HashMap; +import java.util.Map; + +/** + * @ClassNameAggregateBolt + * @Author lixkvip@126.com + * @Date2020/6/24 13:39 + * @Version V1.0 + **/ +public class AggregateBolt extends BaseBasicBolt { + private final static Logger logger = Logger.getLogger(AggregateBolt.class); + private static final long serialVersionUID = -7666031217706448622L; + private HashMap metricsMap; + private HashMap actionMap; + private HashMap cacheMap; + private static String timestamp; + + /** + * 只会在程序初始化的时候执行一次 + * + * @param stormConf + * @param context + */ + @Override + public void prepare(Map stormConf, TopologyContext context) { +// timestampValue = System.currentTimeMillis() / 1000; + String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); + timestamp = AggregateUtils.getTimeMetric(schema); + cacheMap = new HashMap<>(32); + + // TODO 获取action标签的内容 + actionMap = AggregateUtils.getActionMap(schema); + metricsMap = AggregateUtils.getMetircsMap(schema); + + } + + @Override + public void execute(Tuple input, BasicOutputCollector collector) { + try { + if (TupleUtils.isTick(input)) { + long timestampValue = System.currentTimeMillis() / 1000; + for (String s : cacheMap.keySet()) { + JSONObject result = JSONObject.parseObject(s); + result.put(timestamp, timestampValue); + result.putAll(cacheMap.get(s)); + collector.emit(new Values(result.toString())); + } + cacheMap.clear(); + + } else { + String label = input.getStringByField("label"); + //action中某个协议的所有function,如果没有就默认 + String[] metrics = actionMap.getOrDefault(label, actionMap.get("Default")); + + String dimensions = input.getStringByField("dimensions"); + String message = input.getStringByField("message"); + + //一条数据 + JSONObject event = JSONObject.parseObject(message); + //数据中的key值 (protocol,device_id,isp) + //map中对应的数据,可能为空,为空就默认创建一个对象 + JSONObject cacheMessage = cacheMap.getOrDefault(dimensions, new JSONObject()); + //TODO 接下来遍历所有的函数,去metrics的Map中去找对应的方法,并执行 + for (String metric : metrics) { + String name = metricsMap.get(metric).getString("name"); + //可能为空 + String fieldName = metricsMap.get(name).getString("fieldName"); + String nameValue = cacheMessage.getString(name); + //map中的字段值 + nameValue = (nameValue == null) ? "0" : nameValue; + + String fieldNameValue = event.getString(fieldName); + //数据中的字段值 + fieldNameValue = (fieldNameValue == null) ? "0" : fieldNameValue; + + //TODO 每次新增函数,需要改动此处代码 + functionSet(name, cacheMessage, nameValue, fieldNameValue); + } + cacheMap.put(dimensions, cacheMessage); + + } + } catch (Exception e) { + logger.error("计算节点异常,异常信息:" + e); + } + + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("message")); + } + + @Override + public Map getComponentConfiguration() { + Map conf = new HashMap(16); + conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, StreamAggregateConfig.AGG_TIME); + return conf; + } + + /** + * 根据schema描述对应字段进行操作的 函数集合 + * + * @param name 函数名称 + * @param cacheMessage 结果集 + * @param nameValue 当前值 + * @param fieldNameValue 新加值 + */ + private static void functionSet(String name, JSONObject cacheMessage, String nameValue, String fieldNameValue) { + switch (name) { + case "sessions": + cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue))); + break; + case "c2s_byte_num": + cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue))); + break; + case "s2c_byte_num": + cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue))); + break; + case "c2s_pkt_num": + cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue))); + break; + case "s2c_pkt_num": + cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue))); + break; + case "c2s_ipfrag_num": + cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue))); + break; + case "s2c_ipfrag_num": + cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue))); + break; + case "s2c_tcp_lostlen": + cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue))); + break; + case "c2s_tcp_lostlen": + cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue))); + break; + case "c2s_tcp_unorder_num": + cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue))); + break; + case "s2c_tcp_unorder_num": + cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue))); + break; + + case "unique_sip_num": + //TODO + //cacheMessage.put(name, AggregateUtils.) + break; + case "unique_cip_num": + //TODO + break; + default: + break; + } + } +} diff --git a/src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java b/src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java new file mode 100644 index 0000000..367d62b --- /dev/null +++ b/src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java @@ -0,0 +1,168 @@ +package cn.ac.iie.storm.bolt; + + +import cn.ac.iie.storm.utils.combine.AggregateUtils; +import cn.ac.iie.storm.utils.file.StreamAggregateConfig; +import cn.ac.iie.storm.utils.http.HttpClientUtil; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.utils.StringUtil; +import org.apache.log4j.Logger; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.TupleUtils; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static cn.ac.iie.storm.utils.combine.AggregateUtils.transDimensions; +import static cn.ac.iie.storm.utils.combine.AggregateUtils.updateAppRelation; + +/** + * @ClassNameMyWindowBolt + * @Author lixkvip@126.com + * @Date2020/6/9 14:45 + * @Version V1.0 + **/ +public class ParseKvBolt extends BaseBasicBolt { + private final static Logger logger = Logger.getLogger(ParseKvBolt.class); + private static final long serialVersionUID = -999382396035310355L; + private JSONArray transforms; + private JSONArray dimensions; + private static HashMap appMap = new HashMap<>(32); + + + /** + * 此方法只在程序启动的时候执行一次,用来初始化 + * + * @param stormConf Map + * @param context TopologyContext + */ + @Override + public void prepare(Map stormConf, TopologyContext context) { + String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); + JSONObject jsonObject = JSONObject.parseObject(schema); + String data = JSONObject.parseObject(jsonObject.getString("data")).getString("data"); + //TODO 解析 schema + transforms = JSONObject.parseArray(JSONObject.parseObject(data).getString("transforms")); + + //TODO 获取dimensions + dimensions = JSONObject.parseArray(JSONObject.parseObject(data).getString("dimensions")); + updateAppRelation(appMap); + + } + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + try { + if (TupleUtils.isTick(tuple)) { + updateAppRelation(appMap); + } else { + //TODO 解析tuple的 message + JSONObject message = JSONObject.parseObject(tuple.getStringByField("source")); + + //TODO 新建一个dimensions的Json对象 + JSONObject dimensionsObj = transDimensions(dimensions, message); + + for (Object transform : transforms) { + JSONObject transformObj = JSONObject.parseObject(transform.toString()); + String function = transformObj.getString("function"); + String name = transformObj.getString("name"); + String fieldName = transformObj.getString("fieldName"); + String parameters = transformObj.getString("parameters"); + + switch (function) { + case "alignment": + if (StringUtil.isNotBlank(parameters)) { + if (message.containsKey(fieldName)) { + alignmentUtils(parameters, message, name, fieldName); + } + } + break; + case "combination": + if (StringUtil.isNotBlank(parameters)) { + combinationUtils(parameters, message, name, fieldName, dimensionsObj); + } + break; + case "hierarchy": + String hierarchyValue = message.getString(fieldName); + //TODO 执行拆分代码 + if (StringUtil.isNotBlank(hierarchyValue) && StringUtil.isNotBlank(parameters)) { + String[] hierarchyPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER); + String[] dimensionsArr = hierarchyValue.split(hierarchyPars[0]); + //TODO 递归拼接tuple并发送出去 + AggregateUtils.reSend(1, dimensionsArr, "", collector, message, dimensionsObj, name); + } + break; + default: + //数据原样输出 + collector.emit(new Values(null, null, message.toString())); + break; + } + } + } + } catch (Exception e) { + logger.error("上层解析原始日志/拼接计算日志发送异常,异常信息:" + e); + e.printStackTrace(); + } + + } + + @Override + public Map getComponentConfiguration() { + Map conf = new HashMap<>(16); + conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, StreamAggregateConfig.UPDATE_APP_ID_TIME); + return conf; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + declarer.declare(new Fields("label", "dimensions", "message")); + } + + /** + * alignment ID替换操作 + * + * @param parameters 参数集 + * @param message 原始日志 + * @param name 结果日志列名 + * @param fieldName 原始日志列名 + */ + private static void alignmentUtils(String parameters, JSONObject message, String name, String fieldName) { + String[] alignmentPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER); + String data = message.getString(fieldName); + System.out.println("alignmentPars=" + Arrays.toString(alignmentPars) + "data=" + data); + int subscript = Integer.parseInt(alignmentPars[0]); + String[] fieldSplit = data.split(alignmentPars[1]); + Long appID = Long.valueOf(fieldSplit[subscript]); + int length = fieldSplit[subscript].length(); + StringBuilder sb = new StringBuilder(data); + message.put(name, sb.replace(0, length, appMap.get(appID))); + } + + /** + * combination 拼接操作 + * + * @param parameters 参数集 + * @param message 原始日志 + * @param name 结果日志列名 + * @param fieldName 原始日志列名 + * @param dimensionsObj 结果集 + */ + private static void combinationUtils(String parameters, JSONObject message, String name, String fieldName, JSONObject dimensionsObj) { + String[] combinationPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER); + String parameter0Value = message.getString(combinationPars[0]); + if (StringUtil.isNotBlank(parameter0Value)) { + String combinationValue = parameter0Value + combinationPars[1] + message.getString(fieldName); + message.put(fieldName, combinationValue); + dimensionsObj.put(name, combinationValue); + } + } +} diff --git a/src/main/java/cn/ac/iie/storm/bolt/ResultSendBolt.java b/src/main/java/cn/ac/iie/storm/bolt/ResultSendBolt.java new file mode 100644 index 0000000..36e57a6 --- /dev/null +++ b/src/main/java/cn/ac/iie/storm/bolt/ResultSendBolt.java @@ -0,0 +1,44 @@ +package cn.ac.iie.storm.bolt; + + +import cn.ac.iie.storm.utils.common.LogSendKafka; +import cn.ac.iie.storm.utils.file.StreamAggregateConfig; +import org.apache.log4j.Logger; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Tuple; + +import java.util.Map; + +/** + * 发送到kafka的bolt + * + * @author qidaijie + */ +public class ResultSendBolt extends BaseBasicBolt { + + private static final long serialVersionUID = 3237813470939823159L; + private static Logger logger = Logger.getLogger(ResultSendBolt.class); + private LogSendKafka logSendKafka; + + @Override + public void prepare(Map stormConf, TopologyContext context) { + logSendKafka = LogSendKafka.getInstance(); + } + + @Override + public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { + try { + logSendKafka.sendMessage(tuple.getStringByField("message")); + } catch (Exception e) { + logger.error(StreamAggregateConfig.RESULTS_OUTPUT_TOPIC + "日志发送Kafka过程出现异常,异常信息:" + e); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } +} diff --git a/src/main/java/cn/ac/iie/storm/bolt/change/FilterBolt.java b/src/main/java/cn/ac/iie/storm/bolt/change/FilterBolt.java new file mode 100644 index 0000000..6093a94 --- /dev/null +++ b/src/main/java/cn/ac/iie/storm/bolt/change/FilterBolt.java @@ -0,0 +1,68 @@ +package cn.ac.iie.storm.bolt.change; + +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +/** + * @ClassNameFilterBolt + * @Author lixkvip@126.com + * @Date2020/7/1 12:02 + * @Version V1.0 + **/ +public class FilterBolt extends BaseBasicBolt { + @Override + public void execute(Tuple input, BasicOutputCollector collector) { + JSONObject source = JSONObject.parseObject(input.getStringByField("source")); + String schema = ""; + + String data = JSONObject.parseObject(schema).getString("data"); + + String filters = JSONObject.parseObject(data).getString("filters"); + + boolean flag = true; + String type = JSONObject.parseObject(filters).getString("type"); + JSONArray fieldsArr = JSONObject.parseArray(JSONObject.parseObject(filters).getString("fields")); + if ("and".equals(type)) { + for (int i = 0; i < fieldsArr.size(); i++) { + + JSONObject field = JSONObject.parseObject(fieldsArr.get(i).toString()); + String name = field.getString("fieldName"); + String fieldType = field.getString("type"); + Object values = field.get("values"); + + Object nameValue = source.get(name); + + System.out.println("nameValue ========" +nameValue); + + if ("not".equals(fieldType)) { + + if (nameValue == values) { + //满足过滤条件 + flag = false; + } + + } else if ("in".equals(fieldType)) { + if (!values.toString().contains(nameValue.toString())) { + //满足过滤条件 + flag = false; + } + } + }} + + + + collector.emit(new Values(source)); + + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("filter")); + } +} diff --git a/src/main/java/cn/ac/iie/storm/bolt/print/PrintBolt.java b/src/main/java/cn/ac/iie/storm/bolt/print/PrintBolt.java new file mode 100644 index 0000000..e251f21 --- /dev/null +++ b/src/main/java/cn/ac/iie/storm/bolt/print/PrintBolt.java @@ -0,0 +1,40 @@ +package cn.ac.iie.storm.bolt.print; + +import org.apache.log4j.Logger; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Tuple; + +/** + * @ClassNamePrintBolt + * @Author lixkvip@126.com + * @Date2020/6/28 15:34 + * @Version V1.0 + **/ +public class PrintBolt extends BaseBasicBolt { + private final static Logger logger = Logger.getLogger(PrintBolt.class); + private static long a; + private long b; + public static long c; + + @Override + public void execute(Tuple input, BasicOutputCollector collector) { + logger.error("==================================一批数据========================="); + + a= System.currentTimeMillis(); + b= System.currentTimeMillis(); + c= System.currentTimeMillis(); + + + logger.error(Thread.currentThread() + "private static long a======:" + a); + logger.error(Thread.currentThread() + "private long b======:" + b); + logger.error(Thread.currentThread() + "public static long c======:" + c); + + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } +} diff --git a/src/main/java/cn/ac/iie/origion/spout/CustomizedKafkaSpout.java b/src/main/java/cn/ac/iie/storm/spout/CustomizedKafkaSpout.java similarity index 83% rename from src/main/java/cn/ac/iie/origion/spout/CustomizedKafkaSpout.java rename to src/main/java/cn/ac/iie/storm/spout/CustomizedKafkaSpout.java index 501f2f6..fd6dfba 100644 --- a/src/main/java/cn/ac/iie/origion/spout/CustomizedKafkaSpout.java +++ b/src/main/java/cn/ac/iie/storm/spout/CustomizedKafkaSpout.java @@ -1,7 +1,6 @@ -package cn.ac.iie.origion.spout; +package cn.ac.iie.storm.spout; - -import cn.ac.iie.origion.utils.FlowWriteConfig; +import cn.ac.iie.storm.utils.file.StreamAggregateConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -32,12 +31,12 @@ public class CustomizedKafkaSpout extends BaseRichSpout { private static Properties createConsumerConfig() { Properties props = new Properties(); - props.put("bootstrap.servers", FlowWriteConfig.BOOTSTRAP_SERVERS); - props.put("group.id", FlowWriteConfig.GROUP_ID); + props.put("bootstrap.servers", StreamAggregateConfig.BOOTSTRAP_SERVERS); + props.put("group.id", StreamAggregateConfig.GROUP_ID); props.put("session.timeout.ms", "60000"); props.put("max.poll.records", 3000); props.put("max.partition.fetch.bytes", 31457280); - props.put("auto.offset.reset", FlowWriteConfig.AUTO_OFFSET_RESET); + props.put("auto.offset.reset", StreamAggregateConfig.AUTO_OFFSET_RESET); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); return props; @@ -50,7 +49,7 @@ public class CustomizedKafkaSpout extends BaseRichSpout { this.context = context; Properties prop = createConsumerConfig(); this.consumer = new KafkaConsumer<>(prop); - this.consumer.subscribe(Collections.singletonList(FlowWriteConfig.KAFKA_TOPIC)); + this.consumer.subscribe(Collections.singletonList(StreamAggregateConfig.KAFKA_TOPIC)); } @Override @@ -63,7 +62,7 @@ public class CustomizedKafkaSpout extends BaseRichSpout { try { // TODO Auto-generated method stub ConsumerRecords records = consumer.poll(10000L); - Thread.sleep(FlowWriteConfig.TOPOLOGY_SPOUT_SLEEP_TIME); + Thread.sleep(StreamAggregateConfig.TOPOLOGY_SPOUT_SLEEP_TIME); for (ConsumerRecord record : records) { this.collector.emit(new Values(record.value())); } diff --git a/src/main/java/cn/ac/iie/origion/topology/StormRunner.java b/src/main/java/cn/ac/iie/storm/topology/StormRunner.java similarity index 96% rename from src/main/java/cn/ac/iie/origion/topology/StormRunner.java rename to src/main/java/cn/ac/iie/storm/topology/StormRunner.java index f4ecbcd..6e77c66 100644 --- a/src/main/java/cn/ac/iie/origion/topology/StormRunner.java +++ b/src/main/java/cn/ac/iie/storm/topology/StormRunner.java @@ -1,4 +1,4 @@ -package cn.ac.iie.origion.topology; +package cn.ac.iie.storm.topology; import org.apache.storm.Config; diff --git a/src/main/java/cn/ac/iie/origion/topology/LogFlowWriteTopology.java b/src/main/java/cn/ac/iie/storm/topology/StreamAggregateTopology.java similarity index 50% rename from src/main/java/cn/ac/iie/origion/topology/LogFlowWriteTopology.java rename to src/main/java/cn/ac/iie/storm/topology/StreamAggregateTopology.java index c8359b0..ed38bce 100644 --- a/src/main/java/cn/ac/iie/origion/topology/LogFlowWriteTopology.java +++ b/src/main/java/cn/ac/iie/storm/topology/StreamAggregateTopology.java @@ -1,51 +1,52 @@ -package cn.ac.iie.origion.topology; +package cn.ac.iie.storm.topology; -import cn.ac.iie.origion.bolt.AggregateBolt; -import cn.ac.iie.origion.bolt.MyWindowBolt; -import cn.ac.iie.origion.bolt.NtcLogSendBolt; -import cn.ac.iie.origion.spout.CustomizedKafkaSpout; -import cn.ac.iie.origion.utils.FlowWriteConfig; +import cn.ac.iie.storm.bolt.AggregateBolt; +import cn.ac.iie.storm.bolt.ResultSendBolt; +import cn.ac.iie.storm.bolt.ParseKvBolt; +import cn.ac.iie.storm.spout.CustomizedKafkaSpout; +import cn.ac.iie.storm.utils.file.StreamAggregateConfig; import org.apache.log4j.Logger; import org.apache.storm.Config; import org.apache.storm.generated.AlreadyAliveException; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.InvalidTopologyException; import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.topology.base.BaseWindowedBolt; import org.apache.storm.tuple.Fields; -import java.util.concurrent.TimeUnit; - - /** - * Storm程序主类 - * - * @author Administrator - */ + * @ClassNameFlowAggregateTopo + * @Author lixkvip@126.com + * @Date2020/6/24 10:14 + * @Version V1.0 + **/ +public class StreamAggregateTopology { -public class LogFlowWriteTopology { - private static Logger logger = Logger.getLogger(LogFlowWriteTopology.class); + + private static Logger logger = Logger.getLogger(StreamAggregateTopology.class); private final String topologyName; private final Config topologyConfig; private TopologyBuilder builder; - private LogFlowWriteTopology() { - this(LogFlowWriteTopology.class.getSimpleName()); + private StreamAggregateTopology() { + this(StreamAggregateTopology.class.getSimpleName()); } - private LogFlowWriteTopology(String topologyName) { + private StreamAggregateTopology(String topologyName) { this.topologyName = topologyName; topologyConfig = createTopologConfig(); } + /** + * 测试配置 + * conf.setTopologyWorkerMaxHeapSize(6144); + * conf.put(Config.WORKER_CHILDOPTS, "-Xmx4G -Xms2G"); + */ private Config createTopologConfig() { Config conf = new Config(); conf.setDebug(false); conf.setMessageTimeoutSecs(60); - conf.setMaxSpoutPending(150000); - conf.setNumAckers(FlowWriteConfig.TOPOLOGY_WORKERS); -// conf.setTopologyWorkerMaxHeapSize(6144); - conf.put(Config.WORKER_CHILDOPTS, "-Xmx4G -Xms2G"); + conf.setMaxSpoutPending(StreamAggregateConfig.SPOUT_PARALLELISM); + conf.setNumAckers(StreamAggregateConfig.TOPOLOGY_NUM_ACKS); return conf; } @@ -55,7 +56,7 @@ public class LogFlowWriteTopology { } private void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { - topologyConfig.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS); + topologyConfig.setNumWorkers(StreamAggregateConfig.TOPOLOGY_WORKERS); //设置过高会导致很多问题,如心跳线程饿死、吞吐量大幅下跌 topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8); StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig); @@ -63,26 +64,30 @@ public class LogFlowWriteTopology { private void buildTopology() { builder = new TopologyBuilder(); - builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM); - builder.setBolt("TEST-CONN", new MyWindowBolt() - .withWindow(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS), - new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS)), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) - .localOrShuffleGrouping("LogFlowWriteSpout"); + builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), StreamAggregateConfig.SPOUT_PARALLELISM); + + builder.setBolt("ParseKvBolt", new ParseKvBolt(), StreamAggregateConfig.DATACENTER_BOLT_PARALLELISM) + .localOrShuffleGrouping("CustomizedKafkaSpout"); + + builder.setBolt("AggregateBolt", new AggregateBolt(), StreamAggregateConfig.DATACENTER_BOLT_PARALLELISM) + .fieldsGrouping("ParseKvBolt", new Fields("dimensions")); + + builder.setBolt("ResultSendBolt", new ResultSendBolt(), StreamAggregateConfig.KAFKA_BOLT_PARALLELISM) + .localOrShuffleGrouping("AggregateBolt"); +// builder.setBolt("PrintBolt", new PrintBolt(), 3).localOrShuffleGrouping("LogFlowWriteSpout"); - builder.setBolt("AGG-BOLT", new AggregateBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).fieldsGrouping("TEST-CONN", new Fields("key")); - builder.setBolt("KAKFA-CONN", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("AGG-BOLT"); } public static void main(String[] args) throws Exception { - LogFlowWriteTopology csst = null; + StreamAggregateTopology csst = null; boolean runLocally = true; String parameter = "remote"; int size = 2; if (args.length >= size && parameter.equalsIgnoreCase(args[1])) { runLocally = false; - csst = new LogFlowWriteTopology(args[0]); + csst = new StreamAggregateTopology(args[0]); } else { - csst = new LogFlowWriteTopology(); + csst = new StreamAggregateTopology(); } csst.buildTopology(); diff --git a/src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java b/src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java new file mode 100644 index 0000000..fec3592 --- /dev/null +++ b/src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java @@ -0,0 +1,204 @@ +package cn.ac.iie.storm.utils.combine; + +import cn.ac.iie.storm.utils.file.StreamAggregateConfig; +import cn.ac.iie.storm.utils.http.HttpClientUtil; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import org.apache.log4j.Logger; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.tuple.Values; + + +import java.util.HashMap; + +/** + * @ClassNameAggregateUtils + * @Author lixkvip@126.com + * @Date2020/6/23 14:04 + * @Version V1.0 + **/ +public class AggregateUtils { + private final static Logger logger = Logger.getLogger(AggregateUtils.class); + + /** + * Long类型的数据求和 + * + * @param value1 第一个值 + * @param value2 第二个值 + * @return value1 + value2 + */ + public static Long longSum(Long value1, Long value2) { + + return value1 + value2; + } + + /** + * 计算Count + * + * @param count 当前count值 + * @return count+1 + */ + public static Long count(Long count) { + + count++; + return count; + } + + + /** + * 返回指标列的Map集合 + * + * @param schema 动态获取的schema + * @return 指标列集合 + * (c2s_byte_len, { "function" : "c2s_byte_sum", "name" : "c2s_byte_len", "fieldName" : "common_c2s_byte_num" }) + */ + public static HashMap getMetircsMap(String schema) { + HashMap metricsMap = new HashMap<>(16); + + JSONObject jsonObject = JSONObject.parseObject(schema); + String data = jsonObject.getString("data"); + JSONArray metrics = JSONObject.parseArray(JSONObject.parseObject(JSONObject.parseObject(data).getString("data")).getString("metrics")); + + for (Object metric : metrics) { + JSONObject json = JSONObject.parseObject(metric.toString()); + String name = json.getString("name"); + metricsMap.put(name, json); + } + + return metricsMap; + } + + + /** + * 递归发送tuple + * + * @param headIndex ssss + * @param splitArr + * @param initStr + * @param collector + * @param message + * @param dimesionsObj + * @param name + */ + public static void reSend(int headIndex, String[] splitArr, String initStr, BasicOutputCollector collector, JSONObject message, JSONObject dimesionsObj, String name) { + +// //递归拼接字符串 +// if (splitArr.length == headIndex - 1) { +// //什么也不做 +// } else { +// //递归的核心代码 +// if ("".equals(initStr)) { +// initStr = splitArr[splitArr.length - headIndex]; +// } else { +// initStr = initStr + "/" + splitArr[splitArr.length - headIndex]; +// } +// dimesionsObj.put(name, initStr); +// +// collector.emit(new Values(splitArr[splitArr.length - headIndex], dimesionsObj.toString(), message.toString())); +// reSend(headIndex + 1, splitArr, initStr, collector, message, dimesionsObj, name); +// } + + //递归拼接字符串 + if (splitArr.length != headIndex - 1) { + //递归的核心代码 + if ("".equals(initStr)) { + initStr = splitArr[splitArr.length - headIndex]; + } else { + initStr = initStr + "/" + splitArr[splitArr.length - headIndex]; + } + dimesionsObj.put(name, initStr); + + collector.emit(new Values(splitArr[splitArr.length - headIndex], dimesionsObj.toString(), message.toString())); + reSend(headIndex + 1, splitArr, initStr, collector, message, dimesionsObj, name); + } + } + + + /** + * 获取action模块的Map集合 + * + * @param schema 动态获取的schema + * @return (HTTP,metrics数组) + */ + public static HashMap getActionMap(String schema) { + JSONObject jsonObject = JSONObject.parseObject(schema); + String data = jsonObject.getString("data"); + JSONArray actions = JSONObject.parseArray(JSONObject.parseObject(JSONObject.parseObject(data).getString("data")).getString("action")); + HashMap map = new HashMap<>(16); + + for (Object action : actions) { + JSONObject json = JSONObject.parseObject(action.toString()); + + String label = json.getString("label"); + String[] metrics = json.getString("metrics").split(","); + + map.put(label, metrics); + } + return map; + } + + + /** + * 获取时间列的集合 + * + * @param schema 动态获取的schema + * @return 时间列 + */ + public static String getTimeMetric(String schema) { + JSONObject jsonObject = JSONObject.parseObject(schema); + String data = jsonObject.getString("data"); + + return JSONObject.parseObject(JSONObject.parseObject(JSONObject.parseObject(data) + .getString("data")) + .getString("timestamp")) + .getString("name"); + } + + /** + * 更新缓存中的对应关系map + * + * @param hashMap 当前缓存对应关系map + */ + public static void updateAppRelation(HashMap hashMap) { + try { + Long begin = System.currentTimeMillis(); + String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.APP_ID_HTTP); + String data = JSONObject.parseObject(schema).getString("data"); + JSONArray objects = JSONArray.parseArray(data); + for (Object object : objects) { + JSONArray jsonArray = JSONArray.parseArray(object.toString()); + Long key = jsonArray.getLong(0); + String value = jsonArray.getString(1); + if (hashMap.containsKey(key)) { + if (!value.equals(hashMap.get(key))) { + hashMap.put(key, value); + } + } else { + hashMap.put(key, value); + } + + } + System.out.println((System.currentTimeMillis() - begin)); + logger.warn("更新缓存中的对应的APP关系,拉取接口数据长度:[" + objects.size()); + } catch (Exception e) { + logger.error("更新缓存APP-ID失败,异常:" + e); + } + } + + /** + * 解析 dimensions 字段集 + * + * @param dimensions 维度集 + * @param message 原始日志 + * @return 结果维度集 + */ + public static JSONObject transDimensions(JSONArray dimensions, JSONObject message) { + JSONObject dimensionsObj = new JSONObject(); + for (Object dimension : dimensions) { + String fieldName = JSONObject.parseObject(dimension.toString()).getString("fieldName"); + String name = JSONObject.parseObject(dimension.toString()).getString("name"); + dimensionsObj.put(name, message.get(fieldName)); + } + return dimensionsObj; + } +} diff --git a/src/main/java/cn/ac/iie/origion/utils/KafkaLogNtc.java b/src/main/java/cn/ac/iie/storm/utils/common/LogSendKafka.java similarity index 65% rename from src/main/java/cn/ac/iie/origion/utils/KafkaLogNtc.java rename to src/main/java/cn/ac/iie/storm/utils/common/LogSendKafka.java index 89ea53a..378fea6 100644 --- a/src/main/java/cn/ac/iie/origion/utils/KafkaLogNtc.java +++ b/src/main/java/cn/ac/iie/storm/utils/common/LogSendKafka.java @@ -1,6 +1,7 @@ -package cn.ac.iie.origion.utils; +package cn.ac.iie.storm.utils.common; +import cn.ac.iie.storm.utils.file.StreamAggregateConfig; import org.apache.kafka.clients.producer.*; import org.apache.log4j.Logger; @@ -13,9 +14,9 @@ import java.util.Properties; * @create 2018-08-13 15:11 */ -public class KafkaLogNtc { +public class LogSendKafka { - private static Logger logger = Logger.getLogger(KafkaLogNtc.class); + private static Logger logger = Logger.getLogger(LogSendKafka.class); /** * kafka生产者,用于向kafka中发送消息 @@ -25,27 +26,27 @@ public class KafkaLogNtc { /** * kafka生产者适配器(单例),用来代理kafka生产者发送消息 */ - private static KafkaLogNtc kafkaLogNtc; + private static LogSendKafka logSendKafka; - private KafkaLogNtc() { + private LogSendKafka() { initKafkaProducer(); } - public static KafkaLogNtc getInstance() { - if (kafkaLogNtc == null) { - kafkaLogNtc = new KafkaLogNtc(); + public static LogSendKafka getInstance() { + if (logSendKafka == null) { + logSendKafka = new LogSendKafka(); } - return kafkaLogNtc; + return logSendKafka; } public void sendMessage(String message) { final int[] errorSum = {0}; - kafkaProducer.send(new ProducerRecord<>(FlowWriteConfig.RESULTS_OUTPUT_TOPIC, message), new Callback() { + kafkaProducer.send(new ProducerRecord<>(StreamAggregateConfig.RESULTS_OUTPUT_TOPIC, message), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { - logger.error("写入" + FlowWriteConfig.RESULTS_OUTPUT_TOPIC + "出现异常", exception); + logger.error("写入" + StreamAggregateConfig.RESULTS_OUTPUT_TOPIC + "出现异常", exception); errorSum[0]++; } } @@ -60,7 +61,7 @@ public class KafkaLogNtc { */ private void initKafkaProducer() { Properties properties = new Properties(); - properties.put("bootstrap.servers", FlowWriteConfig.BOOTSTRAP_SERVERS); + properties.put("bootstrap.servers", StreamAggregateConfig.RESULTS_BOOTSTRAP_SERVERS); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("acks", "1"); @@ -68,7 +69,7 @@ public class KafkaLogNtc { properties.put("request.timeout.ms", 30000); properties.put("batch.size", 262144); properties.put("buffer.memory", 33554432); - properties.put("compression.type", FlowWriteConfig.KAFKA_COMPRESSION_TYPE); + properties.put("compression.type", StreamAggregateConfig.KAFKA_COMPRESSION_TYPE); kafkaProducer = new KafkaProducer<>(properties); } diff --git a/src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfig.java b/src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfig.java new file mode 100644 index 0000000..a768ce1 --- /dev/null +++ b/src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfig.java @@ -0,0 +1,45 @@ +package cn.ac.iie.storm.utils.file; + + +/** + * @author Administrator + */ +public class StreamAggregateConfig { + + public static final String FORMAT_SPLITTER = ","; + /** + * System + */ + public static final Integer SPOUT_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "spout.parallelism"); + public static final Integer DATACENTER_BOLT_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "datacenter.bolt.parallelism"); + public static final Integer TOPOLOGY_WORKERS = StreamAggregateConfigurations.getIntProperty(0, "topology.workers"); + public static final Integer KAFKA_BOLT_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "kafka.bolt.parallelism"); + public static final Integer TOPOLOGY_NUM_ACKS = StreamAggregateConfigurations.getIntProperty(0, "topology.num.acks"); + public static final Integer TOPOLOGY_SPOUT_SLEEP_TIME = StreamAggregateConfigurations.getIntProperty(0, "topology.spout.sleep.time"); + public static final Integer BATCH_INSERT_NUM = StreamAggregateConfigurations.getIntProperty(0, "batch.insert.num"); + public static final Integer DATA_CENTER_ID_NUM = StreamAggregateConfigurations.getIntProperty(0, "data.center.id.num"); + public static final Integer MAX_FAILURE_NUM = StreamAggregateConfigurations.getIntProperty(0, "max.failure.num"); + + + public static final Integer AGG_TIME = StreamAggregateConfigurations.getIntProperty(0, "agg.time"); + public static final Integer UPDATE_APP_ID_TIME = StreamAggregateConfigurations.getIntProperty(0, "update.app.id.time"); + + + /** + * kafka + */ + public static final String BOOTSTRAP_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "bootstrap.servers"); + public static final String RESULTS_BOOTSTRAP_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "results.bootstrap.servers"); + public static final String GROUP_ID = StreamAggregateConfigurations.getStringProperty(0, "group.id"); + public static final String RESULTS_OUTPUT_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "results.output.topic"); + public static final String KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "kafka.topic"); + public static final String AUTO_OFFSET_RESET = StreamAggregateConfigurations.getStringProperty(0, "auto.offset.reset"); + public static final String KAFKA_COMPRESSION_TYPE = StreamAggregateConfigurations.getStringProperty(0, "kafka.compression.type"); + + /** + * http + */ + public static final String SCHEMA_HTTP = StreamAggregateConfigurations.getStringProperty(0, "schema.http"); + public static final String APP_ID_HTTP = StreamAggregateConfigurations.getStringProperty(0, "app.id.http"); + +} \ No newline at end of file diff --git a/src/main/java/cn/ac/iie/origion/utils/FlowWriteConfigurations.java b/src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfigurations.java similarity index 87% rename from src/main/java/cn/ac/iie/origion/utils/FlowWriteConfigurations.java rename to src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfigurations.java index 70c9a6d..03f67c0 100644 --- a/src/main/java/cn/ac/iie/origion/utils/FlowWriteConfigurations.java +++ b/src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfigurations.java @@ -1,4 +1,4 @@ -package cn.ac.iie.origion.utils; +package cn.ac.iie.storm.utils.file; import java.util.Properties; @@ -7,7 +7,7 @@ import java.util.Properties; * @author Administrator */ -public final class FlowWriteConfigurations { +public final class StreamAggregateConfigurations { // private static Properties propCommon = new Properties(); private static Properties propService = new Properties(); @@ -56,7 +56,7 @@ public final class FlowWriteConfigurations { static { try { - propService.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties")); + propService.load(StreamAggregateConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties")); } catch (Exception e) { // propCommon = null; propService = null; diff --git a/src/main/java/cn/ac/iie/storm/utils/http/HttpClientUtil.java b/src/main/java/cn/ac/iie/storm/utils/http/HttpClientUtil.java new file mode 100644 index 0000000..2e00efd --- /dev/null +++ b/src/main/java/cn/ac/iie/storm/utils/http/HttpClientUtil.java @@ -0,0 +1,55 @@ +package cn.ac.iie.storm.utils.http; + +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; + +/** + * 获取网关schema的工具类 + * + * @author qidaijie + */ +public class HttpClientUtil { + + /** + * 请求网关获取schema + * @param http 网关url + * @return schema + */ + public static String requestByGetMethod(String http) { + CloseableHttpClient httpClient = HttpClients.createDefault(); + StringBuilder entityStringBuilder = null; + try { + HttpGet get = new HttpGet(http); + try (CloseableHttpResponse httpResponse = httpClient.execute(get)) { + HttpEntity entity = httpResponse.getEntity(); + entityStringBuilder = new StringBuilder(); + if (null != entity) { + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024); + String line; + while ((line = bufferedReader.readLine()) != null) { + entityStringBuilder.append(line); + } + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + try { + if (httpClient != null) { + httpClient.close(); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + return entityStringBuilder.toString(); + } + +} diff --git a/src/main/main.iml b/src/main/main.iml new file mode 100644 index 0000000..908ad4f --- /dev/null +++ b/src/main/main.iml @@ -0,0 +1,11 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/src/test/java/com/wp/AppIdTest.java b/src/test/java/com/wp/AppIdTest.java new file mode 100644 index 0000000..39c9348 --- /dev/null +++ b/src/test/java/com/wp/AppIdTest.java @@ -0,0 +1,54 @@ +package com.wp; + +import cn.ac.iie.storm.utils.file.StreamAggregateConfig; +import cn.ac.iie.storm.utils.http.HttpClientUtil; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; + +/** + * @author qidaijie + * @Package com.wp + * @Description: + * @date 2020/9/2215:09 + */ +public class AppIdTest { + + @Test + public void appTest() { + //http://192.168.44.12:9999/open-api/appDicList + String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.APP_ID_HTTP); + JSONObject jsonObject = JSONObject.parseObject(schema); + String data = jsonObject.getString("data"); + HashMap map = new HashMap<>(16); + JSONArray objects = JSONArray.parseArray(data); + for (Object object : objects) { + JSONArray jsonArray = JSONArray.parseArray(object.toString()); + map.put(jsonArray.getLong(0), jsonArray.getString(1)); +// System.out.println(object); + } + System.out.println(map.toString()); + + System.out.println(map.size()); + } + + @Test + public void changeApp() { + String a = "QQ"; + String[] alignmentPars = "0,/".split(StreamAggregateConfig.FORMAT_SPLITTER); + String data = "100/HTTP"; + int subscript = Integer.parseInt(alignmentPars[0]); + String[] fieldSplit = data.split(alignmentPars[1]); + Long appID = Long.valueOf(fieldSplit[subscript]); + int length = fieldSplit[subscript].length(); + StringBuilder sb = new StringBuilder(data); + + System.out.println(sb.replace(0, length, a)); + + + } +} diff --git a/src/test/java/com/wp/AppTest.java b/src/test/java/com/wp/AppTest.java deleted file mode 100644 index 71ab064..0000000 --- a/src/test/java/com/wp/AppTest.java +++ /dev/null @@ -1,51 +0,0 @@ -package com.wp; - - -import org.apache.storm.Config; -import org.apache.storm.LocalCluster; -import org.apache.storm.trident.TridentTopology; -import org.apache.storm.trident.operation.builtin.Debug; -import org.apache.storm.trident.testing.FixedBatchSpout; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; - -/** - * Unit test for simple App. - */ -public class AppTest{ - - - @org.junit.Test - public void test(){ - - Config conf = new Config(); -// conf.setDebug(false); - conf.setMessageTimeoutSecs(60); - conf.setNumWorkers(1); - - FixedBatchSpout spout = new FixedBatchSpout(new Fields("user", "score"), 3, - new Values("nickt1", 4), - new Values("nickt2", 7), - new Values("nickt3", 8), - new Values("nickt4", 9), - new Values("nickt5", 7), - new Values("nickt6", 11), - new Values("nickt7", 5) - ); - spout.setCycle(true); - TridentTopology topology = new TridentTopology(); - topology.newStream("spout1", spout) - .batchGlobal() - .each(new Fields("user"),new Debug("print:")) - .parallelismHint(5); - - LocalCluster cluster = new LocalCluster(); - - cluster.submitTopology("trident-function", conf, topology.build()); - - } - -} - - - diff --git a/src/test/java/com/wp/FilterBolt.java b/src/test/java/com/wp/FilterBolt.java new file mode 100644 index 0000000..a3a9ee7 --- /dev/null +++ b/src/test/java/com/wp/FilterBolt.java @@ -0,0 +1,133 @@ +package com.wp; + +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; + +/** + * @ClassNameFilterBolt + * @Author lixkvip@126.com + * @Date2020/7/1 14:53 + * @Version V1.0 + **/ +public class FilterBolt { + @SuppressWarnings("all") + public static void main(String[] args) { + JSONObject source = new JSONObject(); + + + String schema = "{\n" + + " \"task\": \"Application-Protocol-Distribution\",\n" + + " \"in\": \"CONNECTION-SKETCH-COMPLETED\",\n" + + " \"out\": \"TRAFFIC-PROTOCOL-STAT-LOG\",\n" + + " \"data\": {\n" + + " \"timestamp\": {\n" + + " \"name\": \"stat_time\"\n" + + " },\n" + + " \"dimensions\": [\n" + + " {\n" + + " \"name\": \"protocol_id\",\n" + + " \"fieldName\": \"common_protocol_label\",\n" + + " \"type\": \"String\"\n" + + " },\n" + + " {\n" + + " \"name\": \"device_id\",\n" + + " \"fieldName\": \"common_device_id\",\n" + + " \"type\": \"String\"\n" + + " },\n" + + " {\n" + + " \"name\": \"isp\",\n" + + " \"fieldName\": \"common_isp\",\n" + + " \"type\": \"String\"\n" + + " }\n" + + " ],\n" + + " \"metrics\": [\n" + + " { \"function\" : \"sessions_count\", \"name\" : \"sessions\"},\n" + + " { \"function\" : \"c2s_byte_sum\", \"name\" : \"c2s_byte_len\", \"fieldName\" : \"common_c2s_byte_num\" },\n" + + " { \"function\" : \"s2c_byte_sum\", \"name\" : \"s2c_byte_len\", \"fieldName\" : \"common_s2c_byte_num\" },\n" + + " { \"function\" : \"c2s_pkt_sum\", \"name\" : \"c2s_pkt_num\", \"fieldName\" : \"common_c2s_pkt_num\" },\n" + + " { \"function\" : \"s2c_pkt_sum\", \"name\" : \"s2c_pkt_num\", \"fieldName\" : \"common_s2c_pkt_num\" },\n" + + " { \"function\" : \"sip_disCount\", \"name\" : \"unique_sip_num\", \"fieldName\" : \"common_server_ip\" },\n" + + " { \"function\" : \"cip_disCount\", \"name\" : \"unique_cip_num\", \"fieldName\" : \"common_client_ip\" }\n" + + " ],\n" + + " \"filters\": {\n" + + " \"type\": \"and\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"fieldName\": \"common_device_id\",\n" + + " \"type\": \"not\",\n" + + " \"values\": null\n" + + " },\n" + + " {\n" + + " \"fieldName\": \"common_protocol_label\",\n" + + " \"type\": \"not\",\n" + + " \"values\": null\n" + + " },\n" + + " {\n" + + " \"fieldName\": \"common_isp\",\n" + + " \"type\": \"not\",\n" + + " \"values\": null\n" + + " }\n" + + " ]\n" + + " },\n" + + " \"transforms\":[\n" + + " {\"function\":\"combination\",\"name\":\"protocol_id\",\"fieldName\":\"common_protocol_label\",\"parameters\": \"common_app_label,/\"},\n" + + " {\"function\":\"hierarchy\",\"name\":\"protocol_id\",\"fieldName\":\"common_protocol_label\",\"parameters\": \"/\"}\n" + + " ],\n" + + " \"action\":[\n" + + " {\"label\": \"Default\", \"metrics\": \"sessions,c2s_byte_len,s2c_byte_len,c2s_pkt_num,s2c_pkt_num\"},\n" + + " {\"label\": \"HTTP\", \"metrics\": \"sessions,c2s_byte_len,s2c_byte_len,c2s_pkt_num,s2c_pkt_num,unique_sip_num,unique_cip_num\"}\n" + + " ],\n" + + " \"granularity\":{\n" + + " \"type\": \"period\",\n" + + " \"period\": \"5M\"\n" + + " }\n" + + " }\n" + + "}"; + + + source.put("common_protocol_label", "HTTP"); + source.put("common_isp", "Unicom"); + source.put("common_device_id", "1"); + String data = JSONObject.parseObject(schema).getString("data"); + + String filters = JSONObject.parseObject(data).getString("filters"); + + boolean flag = true; + String type = JSONObject.parseObject(filters).getString("type"); + JSONArray fieldsArr = JSONObject.parseArray(JSONObject.parseObject(filters).getString("fields")); + if ("and".equals(type)) { + for (int i = 0; i < fieldsArr.size(); i++) { + + JSONObject field = JSONObject.parseObject(fieldsArr.get(i).toString()); + String name = field.getString("fieldName"); + String fieldType = field.getString("type"); + Object values = field.get("values"); + + Object nameValue = source.get(name); + + + if ("not".equals(fieldType)) { + + if (nameValue == values) { + //满足过滤条件 + flag = false; + } + + } else if ("in".equals(fieldType)) { + if (!values.toString().contains(nameValue.toString())) { + //满足过滤条件 + flag = false; + } + } + } + + if (flag){ + System.out.println("输出到下个Bolt"); + }else { + + System.out.println("此条消息被过滤掉"); + } + + } + } +} diff --git a/src/test/java/com/wp/SchemaTest.java b/src/test/java/com/wp/SchemaTest.java new file mode 100644 index 0000000..f275592 --- /dev/null +++ b/src/test/java/com/wp/SchemaTest.java @@ -0,0 +1,43 @@ +package com.wp; + +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.utils.StringUtil; + +/** + * @ClassNameSchemaTest + * @Author lixkvip@126.com + * @Date2020/6/28 10:11 + * @Version V1.0 + **/ +public class SchemaTest { + + static String str = ""; + + public static void main(String[] args) { + + + String str1 = null; + String str2 = " "; + + + System.out.println(StringUtil.isNotBlank(str1)); + System.out.println(StringUtil.isNotEmpty(str1)); + + System.out.println(StringUtil.isNotBlank(str2)); + System.out.println(StringUtil.isNotEmpty(str2)); + + } + + public static void reAdd(int m, String[] split, String str) { + + //递归拼接字符串 + if (0 == m) { + } else { + //递归的核心代码 + str = str + split[m - 1] + "/"; + reAdd(m - 1, split, str); + + } + + } +} diff --git a/src/test/test.iml b/src/test/test.iml new file mode 100644 index 0000000..5ebc6f4 --- /dev/null +++ b/src/test/test.iml @@ -0,0 +1,12 @@ + + + + + + + + + + + + \ No newline at end of file