使用定时器实现二次聚合

This commit is contained in:
lee
2020-06-15 15:08:22 +08:00
parent cbf8c0ca39
commit 4a8718a458
8 changed files with 88 additions and 100 deletions

View File

@@ -95,7 +95,7 @@
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.2</version>
<scope>provided</scope>
<!--<scope>provided</scope>-->
</dependency>
<dependency>

View File

@@ -1,9 +1,9 @@
#管理kafka地址
#bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092
bootstrap.servers=192.168.40.127:9093
bootstrap.servers=192.168.40.207:9092
#zookeeper 地址
zookeeper.servers=192.168.40.127:2182/kafka-test
zookeeper.servers=192.168.40.207:2181
#zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181
#hbase zookeeper地址
@@ -21,11 +21,11 @@ kafka.compression.type=none
#kafka broker下的topic名称
#kafka.topic=SECURITY-EVENT-LOG
kafka.topic=test528
kafka.topic=test615
#kafka.topic=CONNECTION-RECORD-LOG
#读取topic,存储该spout id的消费offset信息可通过该拓扑命名;具体存储offset的位置确定下次读取不重复的数据
group.id=lxk-200512
group.id=lxk615
#输出topic
results.output.topic=agg_test

View File

@@ -8,7 +8,6 @@ package cn.ac.iie.origion.bean;
**/
public class KeyBean {
private long common_recv_time;
private int common_policy_id;
private int common_action;
private String common_sub_action;
@@ -24,14 +23,6 @@ public class KeyBean {
private String http_domain;
private String ssl_sni;
public long getCommon_recv_time() {
return common_recv_time;
}
public void setCommon_recv_time(long common_recv_time) {
this.common_recv_time = common_recv_time;
}
public int getCommon_policy_id() {
return common_policy_id;
}
@@ -147,7 +138,7 @@ public class KeyBean {
@Override
public int hashCode() {
return ("" + getCommon_recv_time() + getCommon_policy_id() + getCommon_action() + getCommon_sub_action() + getCommon_client_ip() + getCommon_client_location() + getCommon_sled_ip() + getCommon_device_id() + getCommon_subscriber_id() + getCommon_server_ip() + getCommon_server_location() + getCommon_server_port() + getCommon_l4_protocol() + getHttp_domain() + getSsl_sni()).hashCode();
return ("" + getCommon_policy_id() + getCommon_action() + getCommon_sub_action() + getCommon_client_ip() + getCommon_client_location() + getCommon_sled_ip() + getCommon_device_id() + getCommon_subscriber_id() + getCommon_server_ip() + getCommon_server_location() + getCommon_server_port() + getCommon_l4_protocol() + getHttp_domain() + getSsl_sni()).hashCode();
}
@@ -155,8 +146,7 @@ public class KeyBean {
public boolean equals(Object o) {
if (o instanceof KeyBean) {
KeyBean keyBean = (KeyBean) o;
return (this.getCommon_recv_time()==(keyBean.getCommon_recv_time()) &&
this.getCommon_policy_id()==(keyBean.getCommon_policy_id()) &&
return (this.getCommon_policy_id()==(keyBean.getCommon_policy_id()) &&
this.getCommon_action()==(keyBean.getCommon_action()) &&
this.getCommon_sub_action().equals(keyBean.getCommon_sub_action()) &&
this.getCommon_client_ip().equals(keyBean.getCommon_client_ip()) &&

View File

@@ -5,8 +5,8 @@ import cn.ac.iie.origion.bean.ValueBean;
import cn.ac.iie.origion.utils.FlowWriteConfig;
import cn.ac.iie.origion.utils.TupleUtils;
import com.alibaba.fastjson.JSON;
import org.apache.log4j.Logger;
import org.apache.storm.Config;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -23,38 +23,37 @@ public class AggregateBolt extends BaseBasicBolt {
private final static Logger logger = Logger.getLogger(AggregateBolt.class);
private static final long serialVersionUID = 9006119186526123734L;
private static HashMap<String, Integer> map = new HashMap();
private static ValueBean valueBean = new ValueBean();
private static ValueBean tupleValueBean = new ValueBean();
private static String key = "";
private static Integer value = 0;
private static String message = "";
private HashMap<String, ValueBean> map;
private String key = "";
private ValueBean value;
private ValueBean mapValue;
@Override
public void prepare(Map stormConf, TopologyContext context) {
System.out.println("prepare方法执行了++++++++++++++++++++++++++");
logger.error("prepare方法执行了++++++++++++++++++++++++++");
map = new HashMap<>(16);
key = "";
value = new ValueBean();
mapValue = new ValueBean();
}
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
System.out.println("执行了一次====================================" + value + "==========================" + System.currentTimeMillis());
try {
if (TupleUtils.isTick(tuple)) {
System.out.println(this.map);
//批量发送到下一个bolt
basicOutputCollector.emit(new Values(JSON.toJSONString(this.map)));
//TODO 发送到kafka的 bolt
for (String key : map.keySet()) {
basicOutputCollector.emit(new Values(key,map.get(key)));
}
map.clear();
} else {
message = tuple.getString(0);
//TODO 获取一条tuple数据的key和value
key = tuple.getString(0);
value = (ValueBean) tuple.getValue(1);
//TODO 两个count聚合后放入HashMap中利用HashMap的去重功能实现value的覆盖
this.map.put("192", addValueBean(value, 1));
mapValue = map.getOrDefault(key, new ValueBean());
mapValue = addValueBean(mapValue, value);
map.put(key, mapValue);
}
} catch (Exception e) {
logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常");
@@ -65,14 +64,15 @@ public class AggregateBolt extends BaseBasicBolt {
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>(16);
conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 30);
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, FlowWriteConfig.AGG_TIME);
return conf;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("connLog"));
// outputFieldsDeclarer.declare(new Fields("map"));
outputFieldsDeclarer.declare(new Fields("key","value"));
}
/**
@@ -90,16 +90,10 @@ public class AggregateBolt extends BaseBasicBolt {
result.setCommon_c2s_byte_num(result.getCommon_c2s_byte_num() + value2.getCommon_c2s_byte_num());
result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + value2.getCommon_s2c_pkt_num());
result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + value2.getCommon_c2s_pkt_num());
result.setCommon_sessions(result.getCommon_sessions() + 1L);
result.setCommon_sessions(result.getCommon_sessions() + value2.getCommon_sessions());
return result;
}
public Integer addValueBean(Integer result, Integer value2) {
return result + value2;
}
}

View File

@@ -12,7 +12,6 @@ import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
@@ -25,11 +24,9 @@ import java.util.Map;
public class MyWindowBolt extends BaseWindowedBolt {
private OutputCollector collector;
private static ArrayList<Integer> list;
private static HashMap<String, ValueBean> map;
private static String message;
private static OutputCollector collector;
private HashMap<String, ValueBean> map;
private String message;
@Override
@@ -37,13 +34,13 @@ public class MyWindowBolt extends BaseWindowedBolt {
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
map = new HashMap<>(16);
list = new ArrayList<>();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("map"));
declarer.declare(new Fields("key","value"));
// declarer.declare(new Fields("map"));
}
@Override
@@ -63,11 +60,11 @@ public class MyWindowBolt extends BaseWindowedBolt {
message = tuple.getStringByField("source");
//TODO 获取Tuple中的value Bean
ValueBean valueBean = JSONObject.parseObject(message,ValueBean.class);
ValueBean valueBean = JSONObject.parseObject(message, ValueBean.class);
//TODO 获取Tuple中的key String
String key = JSONObject.toJSONString(JSONObject.parseObject(message, KeyBean.class));
//TODO 获取map中的value Bean
ValueBean mapValueBean = map.getOrDefault(key,new ValueBean());
ValueBean mapValueBean = map.getOrDefault(key, new ValueBean());
//TODO 将tuple中的value和map中的value做累加
mapValueBean = addValueBean(mapValueBean, valueBean);
@@ -75,7 +72,12 @@ public class MyWindowBolt extends BaseWindowedBolt {
map.put(key, mapValueBean);
}
collector.emit(new Values(map));
//TODO 遍历map将 K V发送出去
for (String key : map.keySet()) {
collector.emit(new Values(key,map.get(key)));
}
// collector.emit(new Values(map));
}
@SuppressWarnings("all")
@@ -93,8 +95,6 @@ public class MyWindowBolt extends BaseWindowedBolt {
result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + value2.getCommon_s2c_pkt_num());
result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + value2.getCommon_c2s_pkt_num());
result.setCommon_sessions(result.getCommon_sessions() + 1L);
return result;
}

View File

@@ -26,58 +26,60 @@ public class NtcLogSendBolt extends BaseBasicBolt {
private static Logger logger = Logger.getLogger(NtcLogSendBolt.class);
private KafkaLogNtc kafkaLogNtc;
private static ConnectionRecordLog connectionRecordLog;
private JSONObject key;
private ValueBean valueBean;
private ConnectionRecordLog connectionRecordLog;
@Override
public void prepare(Map stormConf, TopologyContext context) {
kafkaLogNtc = KafkaLogNtc.getInstance();
connectionRecordLog = new ConnectionRecordLog();
key = new JSONObject();
valueBean = new ValueBean();
}
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
// System.out.println(this.getClass() + " 获取的tuple的sessions " + (tuple.getValue(0)) + ((ValueBean)tuple.getValue(1)).getCommon_sessions());
try {
HashMap<String,ValueBean> hashMap = (HashMap) tuple.getValue(0);
if (hashMap.size() != 0) {
for (String key : hashMap.keySet()) {
JSONObject keys = JSONObject.parseObject(key);
key = JSONObject.parseObject(tuple.getValue(0).toString());
ValueBean valueBean = hashMap.get(key);
connectionRecordLog.setCommon_recv_time(System.currentTimeMillis());
connectionRecordLog.setCommon_policy_id(Integer.parseInt(keys.getString("common_policy_id")));
connectionRecordLog.setCommon_action(Integer.parseInt(keys.getString("common_action")));
connectionRecordLog.setCommon_sub_action(keys.getString("common_sub_action"));
connectionRecordLog.setCommon_client_ip(keys.getString("common_client_ip"));
connectionRecordLog.setCommon_client_location(keys.getString("common_client_location"));
connectionRecordLog.setCommon_sled_ip(keys.getString("common_sled_ip"));
connectionRecordLog.setCommon_device_id(keys.getString("common_device_id"));
connectionRecordLog.setCommon_subscriber_id(keys.getString("common_subscriber_id"));
connectionRecordLog.setCommon_server_ip(keys.getString("common_server_ip"));
connectionRecordLog.setCommon_server_location(keys.getString("common_server_location"));
connectionRecordLog.setCommon_server_port(Integer.parseInt(keys.getString("common_server_port")));
connectionRecordLog.setCommon_l4_protocol(keys.getString("common_l4_protocol"));
connectionRecordLog.setHttp_domain(keys.getString("http_domain"));
connectionRecordLog.setSsl_sni(keys.getString("ssl_sni"));
valueBean = (ValueBean) (tuple.getValue(1));
//TODO 为Value赋值
connectionRecordLog.setCommon_recv_time(System.currentTimeMillis());
connectionRecordLog.setCommon_policy_id(Integer.parseInt(key.getString("common_policy_id")));
connectionRecordLog.setCommon_action(Integer.parseInt(key.getString("common_action")));
connectionRecordLog.setCommon_sub_action(key.getString("common_sub_action"));
connectionRecordLog.setCommon_client_ip(key.getString("common_client_ip"));
connectionRecordLog.setCommon_client_location(key.getString("common_client_location"));
connectionRecordLog.setCommon_sled_ip(key.getString("common_sled_ip"));
connectionRecordLog.setCommon_device_id(key.getString("common_device_id"));
connectionRecordLog.setCommon_subscriber_id(key.getString("common_subscriber_id"));
connectionRecordLog.setCommon_server_ip(key.getString("common_server_ip"));
connectionRecordLog.setCommon_server_location(key.getString("common_server_location"));
connectionRecordLog.setCommon_server_port(Integer.parseInt(key.getString("common_server_port")));
connectionRecordLog.setCommon_l4_protocol(key.getString("common_l4_protocol"));
connectionRecordLog.setHttp_domain(key.getString("http_domain"));
connectionRecordLog.setSsl_sni(key.getString("ssl_sni"));
connectionRecordLog.setCommon_sessions(valueBean.getCommon_sessions());
connectionRecordLog.setCommon_c2s_pkt_num(valueBean.getCommon_c2s_pkt_num());
connectionRecordLog.setCommon_s2c_pkt_num(valueBean.getCommon_s2c_pkt_num());
connectionRecordLog.setCommon_c2s_byte_num(valueBean.getCommon_c2s_byte_num());
connectionRecordLog.setCommon_s2c_byte_num(valueBean.getCommon_s2c_byte_num());
//TODO 为Value赋值
kafkaLogNtc.sendMessage(JSONObject.toJSONString(connectionRecordLog));
connectionRecordLog.setCommon_sessions(valueBean.getCommon_sessions());
connectionRecordLog.setCommon_c2s_pkt_num(valueBean.getCommon_c2s_pkt_num());
connectionRecordLog.setCommon_s2c_pkt_num(valueBean.getCommon_s2c_pkt_num());
connectionRecordLog.setCommon_c2s_byte_num(valueBean.getCommon_c2s_byte_num());
connectionRecordLog.setCommon_s2c_byte_num(valueBean.getCommon_s2c_byte_num());
}
}
} catch (Exception e) {
kafkaLogNtc.sendMessage(JSONObject.toJSONString(connectionRecordLog));
} catch (Exception e) {
logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志发送Kafka过程出现异常");
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {

View File

@@ -1,5 +1,6 @@
package cn.ac.iie.origion.topology;
import cn.ac.iie.origion.bolt.AggregateBolt;
import cn.ac.iie.origion.bolt.MyWindowBolt;
import cn.ac.iie.origion.bolt.NtcLogSendBolt;
import cn.ac.iie.origion.spout.CustomizedKafkaSpout;
@@ -11,6 +12,7 @@ import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import java.util.concurrent.TimeUnit;
@@ -41,13 +43,15 @@ public class LogFlowWriteTopology {
conf.setDebug(false);
conf.setMessageTimeoutSecs(60);
conf.setMaxSpoutPending(150000);
conf.setNumAckers(3);
conf.setNumAckers(FlowWriteConfig.TOPOLOGY_WORKERS);
// conf.setTopologyWorkerMaxHeapSize(6144);
conf.put(Config.WORKER_CHILDOPTS, "-Xmx4G -Xms2G");
return conf;
}
private void runLocally() throws InterruptedException {
topologyConfig.setMaxTaskParallelism(1);
StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600);
StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 6000);
}
private void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
@@ -59,15 +63,14 @@ public class LogFlowWriteTopology {
private void buildTopology() {
builder = new TopologyBuilder();
builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), 3);
builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM);
builder.setBolt("TEST-CONN", new MyWindowBolt()
.withWindow(new BaseWindowedBolt.Duration(30, TimeUnit.SECONDS),
new BaseWindowedBolt.Duration(30, TimeUnit.SECONDS)),FlowWriteConfig.DATACENTER_BOLT_PARALLELISM)
.withWindow(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS),
new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS)), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM)
.localOrShuffleGrouping("LogFlowWriteSpout");
// builder.setBolt("TEST-CONN", new AggregateBolt(),3).localOrShuffleGrouping("LogFlowWriteSpout");
// builder.setBolt("KAKFA-CONN", new PrintBolt(),3).localOrShuffleGrouping("TEST-CONN");
builder.setBolt("KAKFA-CONN", new NtcLogSendBolt(),FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("TEST-CONN");
builder.setBolt("AGG-BOLT", new AggregateBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).fieldsGrouping("TEST-CONN", new Fields("key"));
builder.setBolt("KAKFA-CONN", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("AGG-BOLT");
}
public static void main(String[] args) throws Exception {

View File

@@ -34,7 +34,6 @@ public class AggregateUtil {
//TODO KEY
keyBean.setCommon_recv_time(0L);
keyBean.setCommon_policy_id(Integer.parseInt(map.getOrDefault("common_policy_id","0").toString()));
keyBean.setCommon_action(Integer.parseInt(map.getOrDefault("common_action","0").toString()));
keyBean.setCommon_sub_action(map.getOrDefault("common_sub_action","").toString());