From 4a8718a458c70d7ea2c087c0248456ae1048f796 Mon Sep 17 00:00:00 2001 From: lee Date: Mon, 15 Jun 2020 15:08:22 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E7=94=A8=E5=AE=9A=E6=97=B6=E5=99=A8?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E4=BA=8C=E6=AC=A1=E8=81=9A=E5=90=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- properties/service_flow_config.properties | 8 +-- .../java/cn/ac/iie/origion/bean/KeyBean.java | 14 +--- .../cn/ac/iie/origion/bolt/AggregateBolt.java | 52 +++++++-------- .../cn/ac/iie/origion/bolt/MyWindowBolt.java | 26 ++++---- .../ac/iie/origion/bolt/NtcLogSendBolt.java | 66 ++++++++++--------- .../topology/LogFlowWriteTopology.java | 19 +++--- .../ac/iie/origion/utils/AggregateUtil.java | 1 - 8 files changed, 88 insertions(+), 100 deletions(-) diff --git a/pom.xml b/pom.xml index 82aca69..b318499 100644 --- a/pom.xml +++ b/pom.xml @@ -95,7 +95,7 @@ org.apache.storm storm-core 1.0.2 - provided + diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index b3eb718..aadcabf 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,9 +1,9 @@ #管理kafka地址 #bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092 -bootstrap.servers=192.168.40.127:9093 +bootstrap.servers=192.168.40.207:9092 #zookeeper 地址 -zookeeper.servers=192.168.40.127:2182/kafka-test +zookeeper.servers=192.168.40.207:2181 #zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181 #hbase zookeeper地址 @@ -21,11 +21,11 @@ kafka.compression.type=none #kafka broker下的topic名称 #kafka.topic=SECURITY-EVENT-LOG -kafka.topic=test528 +kafka.topic=test615 #kafka.topic=CONNECTION-RECORD-LOG #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; -group.id=lxk-200512 +group.id=lxk615 #输出topic results.output.topic=agg_test diff --git a/src/main/java/cn/ac/iie/origion/bean/KeyBean.java b/src/main/java/cn/ac/iie/origion/bean/KeyBean.java index 00e9e3c..6898558 100644 --- a/src/main/java/cn/ac/iie/origion/bean/KeyBean.java +++ b/src/main/java/cn/ac/iie/origion/bean/KeyBean.java @@ -8,7 +8,6 @@ package cn.ac.iie.origion.bean; **/ public class KeyBean { - private long common_recv_time; private int common_policy_id; private int common_action; private String common_sub_action; @@ -24,14 +23,6 @@ public class KeyBean { private String http_domain; private String ssl_sni; - public long getCommon_recv_time() { - return common_recv_time; - } - - public void setCommon_recv_time(long common_recv_time) { - this.common_recv_time = common_recv_time; - } - public int getCommon_policy_id() { return common_policy_id; } @@ -147,7 +138,7 @@ public class KeyBean { @Override public int hashCode() { - return ("" + getCommon_recv_time() + getCommon_policy_id() + getCommon_action() + getCommon_sub_action() + getCommon_client_ip() + getCommon_client_location() + getCommon_sled_ip() + getCommon_device_id() + getCommon_subscriber_id() + getCommon_server_ip() + getCommon_server_location() + getCommon_server_port() + getCommon_l4_protocol() + getHttp_domain() + getSsl_sni()).hashCode(); + return ("" + getCommon_policy_id() + getCommon_action() + getCommon_sub_action() + getCommon_client_ip() + getCommon_client_location() + getCommon_sled_ip() + getCommon_device_id() + getCommon_subscriber_id() + getCommon_server_ip() + getCommon_server_location() + getCommon_server_port() + getCommon_l4_protocol() + getHttp_domain() + getSsl_sni()).hashCode(); } @@ -155,8 +146,7 @@ public class KeyBean { public boolean equals(Object o) { if (o instanceof KeyBean) { KeyBean keyBean = (KeyBean) o; - return (this.getCommon_recv_time()==(keyBean.getCommon_recv_time()) && - this.getCommon_policy_id()==(keyBean.getCommon_policy_id()) && + return (this.getCommon_policy_id()==(keyBean.getCommon_policy_id()) && this.getCommon_action()==(keyBean.getCommon_action()) && this.getCommon_sub_action().equals(keyBean.getCommon_sub_action()) && this.getCommon_client_ip().equals(keyBean.getCommon_client_ip()) && diff --git a/src/main/java/cn/ac/iie/origion/bolt/AggregateBolt.java b/src/main/java/cn/ac/iie/origion/bolt/AggregateBolt.java index a656606..d7f5eb7 100644 --- a/src/main/java/cn/ac/iie/origion/bolt/AggregateBolt.java +++ b/src/main/java/cn/ac/iie/origion/bolt/AggregateBolt.java @@ -5,8 +5,8 @@ import cn.ac.iie.origion.bean.ValueBean; import cn.ac.iie.origion.utils.FlowWriteConfig; import cn.ac.iie.origion.utils.TupleUtils; -import com.alibaba.fastjson.JSON; import org.apache.log4j.Logger; +import org.apache.storm.Config; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -23,38 +23,37 @@ public class AggregateBolt extends BaseBasicBolt { private final static Logger logger = Logger.getLogger(AggregateBolt.class); private static final long serialVersionUID = 9006119186526123734L; - private static HashMap map = new HashMap(); - private static ValueBean valueBean = new ValueBean(); - private static ValueBean tupleValueBean = new ValueBean(); - private static String key = ""; - private static Integer value = 0; - private static String message = ""; + private HashMap map; + private String key = ""; + private ValueBean value; + private ValueBean mapValue; @Override public void prepare(Map stormConf, TopologyContext context) { - System.out.println("prepare方法执行了++++++++++++++++++++++++++"); - - logger.error("prepare方法执行了++++++++++++++++++++++++++"); + map = new HashMap<>(16); + key = ""; + value = new ValueBean(); + mapValue = new ValueBean(); } @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { - - System.out.println("执行了一次====================================" + value + "==========================" + System.currentTimeMillis()); try { if (TupleUtils.isTick(tuple)) { - System.out.println(this.map); - //批量发送到下一个bolt - basicOutputCollector.emit(new Values(JSON.toJSONString(this.map))); - - + //TODO 发送到kafka的 bolt + for (String key : map.keySet()) { + basicOutputCollector.emit(new Values(key,map.get(key))); + } + map.clear(); } else { - message = tuple.getString(0); //TODO 获取一条tuple数据的key和value - + key = tuple.getString(0); + value = (ValueBean) tuple.getValue(1); //TODO 两个count聚合后放入HashMap中,利用HashMap的去重功能实现value的覆盖 - this.map.put("192", addValueBean(value, 1)); + mapValue = map.getOrDefault(key, new ValueBean()); + mapValue = addValueBean(mapValue, value); + map.put(key, mapValue); } } catch (Exception e) { logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常"); @@ -65,14 +64,15 @@ public class AggregateBolt extends BaseBasicBolt { @Override public Map getComponentConfiguration() { Map conf = new HashMap(16); - conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 30); + conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, FlowWriteConfig.AGG_TIME); return conf; } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { - outputFieldsDeclarer.declare(new Fields("connLog")); +// outputFieldsDeclarer.declare(new Fields("map")); + outputFieldsDeclarer.declare(new Fields("key","value")); } /** @@ -90,16 +90,10 @@ public class AggregateBolt extends BaseBasicBolt { result.setCommon_c2s_byte_num(result.getCommon_c2s_byte_num() + value2.getCommon_c2s_byte_num()); result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + value2.getCommon_s2c_pkt_num()); result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + value2.getCommon_c2s_pkt_num()); - result.setCommon_sessions(result.getCommon_sessions() + 1L); + result.setCommon_sessions(result.getCommon_sessions() + value2.getCommon_sessions()); return result; } - public Integer addValueBean(Integer result, Integer value2) { - - return result + value2; - } - - } diff --git a/src/main/java/cn/ac/iie/origion/bolt/MyWindowBolt.java b/src/main/java/cn/ac/iie/origion/bolt/MyWindowBolt.java index 96151ff..4432255 100644 --- a/src/main/java/cn/ac/iie/origion/bolt/MyWindowBolt.java +++ b/src/main/java/cn/ac/iie/origion/bolt/MyWindowBolt.java @@ -12,7 +12,6 @@ import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.apache.storm.windowing.TupleWindow; -import java.util.ArrayList; import java.util.HashMap; import java.util.Map; @@ -25,11 +24,9 @@ import java.util.Map; public class MyWindowBolt extends BaseWindowedBolt { - private OutputCollector collector; - - private static ArrayList list; - private static HashMap map; - private static String message; + private static OutputCollector collector; + private HashMap map; + private String message; @Override @@ -37,13 +34,13 @@ public class MyWindowBolt extends BaseWindowedBolt { public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; map = new HashMap<>(16); - list = new ArrayList<>(); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("map")); + declarer.declare(new Fields("key","value")); +// declarer.declare(new Fields("map")); } @Override @@ -63,11 +60,11 @@ public class MyWindowBolt extends BaseWindowedBolt { message = tuple.getStringByField("source"); //TODO 获取Tuple中的value Bean - ValueBean valueBean = JSONObject.parseObject(message,ValueBean.class); + ValueBean valueBean = JSONObject.parseObject(message, ValueBean.class); //TODO 获取Tuple中的key String String key = JSONObject.toJSONString(JSONObject.parseObject(message, KeyBean.class)); //TODO 获取map中的value Bean - ValueBean mapValueBean = map.getOrDefault(key,new ValueBean()); + ValueBean mapValueBean = map.getOrDefault(key, new ValueBean()); //TODO 将tuple中的value和map中的value做累加 mapValueBean = addValueBean(mapValueBean, valueBean); @@ -75,7 +72,12 @@ public class MyWindowBolt extends BaseWindowedBolt { map.put(key, mapValueBean); } - collector.emit(new Values(map)); + //TODO 遍历map将 K V发送出去 + for (String key : map.keySet()) { + collector.emit(new Values(key,map.get(key))); + } +// collector.emit(new Values(map)); + } @SuppressWarnings("all") @@ -93,8 +95,6 @@ public class MyWindowBolt extends BaseWindowedBolt { result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + value2.getCommon_s2c_pkt_num()); result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + value2.getCommon_c2s_pkt_num()); result.setCommon_sessions(result.getCommon_sessions() + 1L); - - return result; } diff --git a/src/main/java/cn/ac/iie/origion/bolt/NtcLogSendBolt.java b/src/main/java/cn/ac/iie/origion/bolt/NtcLogSendBolt.java index 59d7eef..40a2070 100644 --- a/src/main/java/cn/ac/iie/origion/bolt/NtcLogSendBolt.java +++ b/src/main/java/cn/ac/iie/origion/bolt/NtcLogSendBolt.java @@ -26,58 +26,60 @@ public class NtcLogSendBolt extends BaseBasicBolt { private static Logger logger = Logger.getLogger(NtcLogSendBolt.class); private KafkaLogNtc kafkaLogNtc; - private static ConnectionRecordLog connectionRecordLog; + private JSONObject key; + private ValueBean valueBean; + private ConnectionRecordLog connectionRecordLog; @Override public void prepare(Map stormConf, TopologyContext context) { kafkaLogNtc = KafkaLogNtc.getInstance(); connectionRecordLog = new ConnectionRecordLog(); + key = new JSONObject(); + valueBean = new ValueBean(); } + @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { + + +// System.out.println(this.getClass() + " 获取的tuple的sessions " + (tuple.getValue(0)) + ((ValueBean)tuple.getValue(1)).getCommon_sessions()); try { - HashMap hashMap = (HashMap) tuple.getValue(0); - if (hashMap.size() != 0) { - for (String key : hashMap.keySet()) { - JSONObject keys = JSONObject.parseObject(key); + key = JSONObject.parseObject(tuple.getValue(0).toString()); - ValueBean valueBean = hashMap.get(key); - connectionRecordLog.setCommon_recv_time(System.currentTimeMillis()); - connectionRecordLog.setCommon_policy_id(Integer.parseInt(keys.getString("common_policy_id"))); - connectionRecordLog.setCommon_action(Integer.parseInt(keys.getString("common_action"))); - connectionRecordLog.setCommon_sub_action(keys.getString("common_sub_action")); - connectionRecordLog.setCommon_client_ip(keys.getString("common_client_ip")); - connectionRecordLog.setCommon_client_location(keys.getString("common_client_location")); - connectionRecordLog.setCommon_sled_ip(keys.getString("common_sled_ip")); - connectionRecordLog.setCommon_device_id(keys.getString("common_device_id")); - connectionRecordLog.setCommon_subscriber_id(keys.getString("common_subscriber_id")); - connectionRecordLog.setCommon_server_ip(keys.getString("common_server_ip")); - connectionRecordLog.setCommon_server_location(keys.getString("common_server_location")); - connectionRecordLog.setCommon_server_port(Integer.parseInt(keys.getString("common_server_port"))); - connectionRecordLog.setCommon_l4_protocol(keys.getString("common_l4_protocol")); - connectionRecordLog.setHttp_domain(keys.getString("http_domain")); - connectionRecordLog.setSsl_sni(keys.getString("ssl_sni")); + valueBean = (ValueBean) (tuple.getValue(1)); - //TODO 为Value赋值 + connectionRecordLog.setCommon_recv_time(System.currentTimeMillis()); + connectionRecordLog.setCommon_policy_id(Integer.parseInt(key.getString("common_policy_id"))); + connectionRecordLog.setCommon_action(Integer.parseInt(key.getString("common_action"))); + connectionRecordLog.setCommon_sub_action(key.getString("common_sub_action")); + connectionRecordLog.setCommon_client_ip(key.getString("common_client_ip")); + connectionRecordLog.setCommon_client_location(key.getString("common_client_location")); + connectionRecordLog.setCommon_sled_ip(key.getString("common_sled_ip")); + connectionRecordLog.setCommon_device_id(key.getString("common_device_id")); + connectionRecordLog.setCommon_subscriber_id(key.getString("common_subscriber_id")); + connectionRecordLog.setCommon_server_ip(key.getString("common_server_ip")); + connectionRecordLog.setCommon_server_location(key.getString("common_server_location")); + connectionRecordLog.setCommon_server_port(Integer.parseInt(key.getString("common_server_port"))); + connectionRecordLog.setCommon_l4_protocol(key.getString("common_l4_protocol")); + connectionRecordLog.setHttp_domain(key.getString("http_domain")); + connectionRecordLog.setSsl_sni(key.getString("ssl_sni")); - connectionRecordLog.setCommon_sessions(valueBean.getCommon_sessions()); - connectionRecordLog.setCommon_c2s_pkt_num(valueBean.getCommon_c2s_pkt_num()); - connectionRecordLog.setCommon_s2c_pkt_num(valueBean.getCommon_s2c_pkt_num()); - connectionRecordLog.setCommon_c2s_byte_num(valueBean.getCommon_c2s_byte_num()); - connectionRecordLog.setCommon_s2c_byte_num(valueBean.getCommon_s2c_byte_num()); + //TODO 为Value赋值 - kafkaLogNtc.sendMessage(JSONObject.toJSONString(connectionRecordLog)); + connectionRecordLog.setCommon_sessions(valueBean.getCommon_sessions()); + connectionRecordLog.setCommon_c2s_pkt_num(valueBean.getCommon_c2s_pkt_num()); + connectionRecordLog.setCommon_s2c_pkt_num(valueBean.getCommon_s2c_pkt_num()); + connectionRecordLog.setCommon_c2s_byte_num(valueBean.getCommon_c2s_byte_num()); + connectionRecordLog.setCommon_s2c_byte_num(valueBean.getCommon_s2c_byte_num()); - } - } - } catch (Exception e) { + kafkaLogNtc.sendMessage(JSONObject.toJSONString(connectionRecordLog)); + } catch (Exception e) { logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志发送Kafka过程出现异常"); e.printStackTrace(); } } - @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { diff --git a/src/main/java/cn/ac/iie/origion/topology/LogFlowWriteTopology.java b/src/main/java/cn/ac/iie/origion/topology/LogFlowWriteTopology.java index b197b6a..c8359b0 100644 --- a/src/main/java/cn/ac/iie/origion/topology/LogFlowWriteTopology.java +++ b/src/main/java/cn/ac/iie/origion/topology/LogFlowWriteTopology.java @@ -1,5 +1,6 @@ package cn.ac.iie.origion.topology; +import cn.ac.iie.origion.bolt.AggregateBolt; import cn.ac.iie.origion.bolt.MyWindowBolt; import cn.ac.iie.origion.bolt.NtcLogSendBolt; import cn.ac.iie.origion.spout.CustomizedKafkaSpout; @@ -11,6 +12,7 @@ import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.InvalidTopologyException; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.tuple.Fields; import java.util.concurrent.TimeUnit; @@ -41,13 +43,15 @@ public class LogFlowWriteTopology { conf.setDebug(false); conf.setMessageTimeoutSecs(60); conf.setMaxSpoutPending(150000); - conf.setNumAckers(3); + conf.setNumAckers(FlowWriteConfig.TOPOLOGY_WORKERS); +// conf.setTopologyWorkerMaxHeapSize(6144); + conf.put(Config.WORKER_CHILDOPTS, "-Xmx4G -Xms2G"); return conf; } private void runLocally() throws InterruptedException { topologyConfig.setMaxTaskParallelism(1); - StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600); + StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 6000); } private void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { @@ -59,15 +63,14 @@ public class LogFlowWriteTopology { private void buildTopology() { builder = new TopologyBuilder(); - builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), 3); + builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM); builder.setBolt("TEST-CONN", new MyWindowBolt() - .withWindow(new BaseWindowedBolt.Duration(30, TimeUnit.SECONDS), - new BaseWindowedBolt.Duration(30, TimeUnit.SECONDS)),FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) + .withWindow(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS), + new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS)), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) .localOrShuffleGrouping("LogFlowWriteSpout"); -// builder.setBolt("TEST-CONN", new AggregateBolt(),3).localOrShuffleGrouping("LogFlowWriteSpout"); -// builder.setBolt("KAKFA-CONN", new PrintBolt(),3).localOrShuffleGrouping("TEST-CONN"); - builder.setBolt("KAKFA-CONN", new NtcLogSendBolt(),FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("TEST-CONN"); + builder.setBolt("AGG-BOLT", new AggregateBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).fieldsGrouping("TEST-CONN", new Fields("key")); + builder.setBolt("KAKFA-CONN", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("AGG-BOLT"); } public static void main(String[] args) throws Exception { diff --git a/src/main/java/cn/ac/iie/origion/utils/AggregateUtil.java b/src/main/java/cn/ac/iie/origion/utils/AggregateUtil.java index a7047ef..e7d7a35 100644 --- a/src/main/java/cn/ac/iie/origion/utils/AggregateUtil.java +++ b/src/main/java/cn/ac/iie/origion/utils/AggregateUtil.java @@ -34,7 +34,6 @@ public class AggregateUtil { //TODO KEY - keyBean.setCommon_recv_time(0L); keyBean.setCommon_policy_id(Integer.parseInt(map.getOrDefault("common_policy_id","0").toString())); keyBean.setCommon_action(Integer.parseInt(map.getOrDefault("common_action","0").toString())); keyBean.setCommon_sub_action(map.getOrDefault("common_sub_action","").toString());