Initial version of window-based pre-aggregation using the original (plain Storm, non-Trident) form

lee committed 2020-06-10 17:58:31 +08:00
parent cb4ee7544e
commit cbf8c0ca39
24 changed files with 581 additions and 446 deletions
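At a glance, this commit replaces the Trident sliding-window aggregation with a plain Storm topology built around a 30-second windowed bolt. Below is a minimal sketch of the new wiring, assuming the component names and classes introduced in the diff; the parallelism hints of 3 are illustrative only (the committed topology takes its bolt parallelism from FlowWriteConfig).

import java.util.concurrent.TimeUnit;

import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;

import cn.ac.iie.origion.bolt.MyWindowBolt;
import cn.ac.iie.origion.bolt.NtcLogSendBolt;
import cn.ac.iie.origion.spout.CustomizedKafkaSpout;

public class WiringSketch {
    public static TopologyBuilder build() {
        TopologyBuilder builder = new TopologyBuilder();
        // Kafka source: emits each raw JSON log line on the "source" field.
        builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), 3);
        // 30-second tumbling window: MyWindowBolt pre-aggregates the tuples
        // of each window into a HashMap<key JSON, ValueBean>.
        builder.setBolt("TEST-CONN",
                new MyWindowBolt().withWindow(
                        new BaseWindowedBolt.Duration(30, TimeUnit.SECONDS),
                        new BaseWindowedBolt.Duration(30, TimeUnit.SECONDS)),
                3)
                .localOrShuffleGrouping("LogFlowWriteSpout");
        // Sink: NtcLogSendBolt flattens each map entry into a ConnectionRecordLog and writes it to Kafka.
        builder.setBolt("KAKFA-CONN", new NtcLogSendBolt(), 3)
                .localOrShuffleGrouping("TEST-CONN");
        return builder;
    }
}

MyWindowBolt emits one HashMap per window, and NtcLogSendBolt turns each map entry into a ConnectionRecordLog JSON message for Kafka, as shown in the files below.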

View File

@@ -34,7 +34,7 @@
<configuration>
<transformers>
<transformer>
<mainClass>cn.ac.iie.trident.aggregate.topology.LogFlowWriteTopology</mainClass>
<mainClass>cn.ac.iie.origion.topology.LogFlowWriteTopology</mainClass>
</transformer>
<transformer>
<resource>META-INF/spring.handlers</resource>

View File

@@ -38,7 +38,7 @@
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>cn.ac.iie.trident.aggregate.topology.LogFlowWriteTopology</mainClass>
<mainClass>cn.ac.iie.origion.topology.LogFlowWriteTopology</mainClass>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">

View File

@@ -1,4 +1,4 @@
package cn.ac.iie.trident.aggregate.bean;
package cn.ac.iie.origion.bean;
/**
* @ClassNameConnectionRecordLog

View File

@@ -1,4 +1,4 @@
package cn.ac.iie.trident.aggregate.bean;
package cn.ac.iie.origion.bean;
/**
* @ClassNameKeyBean

View File

@@ -1,4 +1,4 @@
package cn.ac.iie.trident.aggregate.bean;
package cn.ac.iie.origion.bean;
import java.io.Serializable;

View File

@@ -0,0 +1,105 @@
package cn.ac.iie.origion.bolt;
import cn.ac.iie.origion.bean.ValueBean;
import cn.ac.iie.origion.utils.FlowWriteConfig;
import cn.ac.iie.origion.utils.TupleUtils;
import com.alibaba.fastjson.JSON;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;
public class AggregateBolt extends BaseBasicBolt {
private final static Logger logger = Logger.getLogger(AggregateBolt.class);
private static final long serialVersionUID = 9006119186526123734L;
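// NOTE: these static fields are shared by every executor of this bolt in the same worker JVM and are mutated without synchronization.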
private static HashMap<String, Integer> map = new HashMap<>();
private static ValueBean valueBean = new ValueBean();
private static ValueBean tupleValueBean = new ValueBean();
private static String key = "";
private static Integer value = 0;
private static String message = "";
@Override
public void prepare(Map stormConf, TopologyContext context) {
System.out.println("prepare方法执行了++++++++++++++++++++++++++");
logger.error("prepare方法执行了++++++++++++++++++++++++++");
}
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
System.out.println("执行了一次====================================" + value + "==========================" + System.currentTimeMillis());
try {
if (TupleUtils.isTick(tuple)) {
System.out.println(this.map);
//Emit the whole aggregation map to the next bolt in one batch
basicOutputCollector.emit(new Values(JSON.toJSONString(this.map)));
} else {
message = tuple.getString(0);
//TODO Extract the key and value from the incoming tuple
//TODO After merging the two counts, put the result back into the HashMap; the map's key de-duplication overwrites the old value
this.map.put("192", addValueBean(value, 1));
}
} catch (Exception e) {
logger.error(FlowWriteConfig.KAFKA_TOPIC + " exception while receiving/parsing the message");
e.printStackTrace();
}
}
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>(16);
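// Ask Storm to send this bolt a system tick tuple every 30 seconds; execute() flushes the aggregation map on each tick (see TupleUtils.isTick).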
conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 30);
return conf;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("connLog"));
}
/**
 * Adds the corresponding fields of two ValueBean instances.
 *
 * @param result the accumulator bean, updated in place
 * @param value2 the bean whose counters are added to the accumulator
 * @return the updated accumulator bean
*/
@SuppressWarnings("all")
public ValueBean addValueBean(ValueBean result, ValueBean value2) {
result.setCommon_s2c_byte_num(result.getCommon_s2c_byte_num() + value2.getCommon_s2c_byte_num());
result.setCommon_c2s_byte_num(result.getCommon_c2s_byte_num() + value2.getCommon_c2s_byte_num());
result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + value2.getCommon_s2c_pkt_num());
result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + value2.getCommon_c2s_pkt_num());
result.setCommon_sessions(result.getCommon_sessions() + 1L);
return result;
}
public Integer addValueBean(Integer result, Integer value2) {
return result + value2;
}
}

View File

@@ -0,0 +1,102 @@
package cn.ac.iie.origion.bolt;
import cn.ac.iie.origion.bean.KeyBean;
import cn.ac.iie.origion.bean.ValueBean;
import com.alibaba.fastjson.JSONObject;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
/**
* @ClassNameMyWindowBolt
* @Author lixkvip@126.com
* @Date2020/6/9 14:45
* @Version V1.0
**/
public class MyWindowBolt extends BaseWindowedBolt {
private OutputCollector collector;
private static ArrayList<Integer> list;
private static HashMap<String, ValueBean> map;
private static String message;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
map = new HashMap<>(16);
list = new ArrayList<>();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("map"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return super.getComponentConfiguration();
}
@Override
public void execute(TupleWindow inputWindow) {
//TODO Clear the data aggregated in the previous window
map.clear();
//TODO Iterate over the tuples of this window
for (Tuple tuple : inputWindow.getNew()) {
//TODO Read the raw tuple payload as a String
message = tuple.getStringByField("source");
//TODO Parse the value part of the tuple into a ValueBean
ValueBean valueBean = JSONObject.parseObject(message, ValueBean.class);
//TODO Parse the key part of the tuple and re-serialize it as the key String
String key = JSONObject.toJSONString(JSONObject.parseObject(message, KeyBean.class));
//TODO Look up the value already accumulated in the map for this key
ValueBean mapValueBean = map.getOrDefault(key, new ValueBean());
//TODO Add the tuple's value onto the accumulated value
mapValueBean = addValueBean(mapValueBean, valueBean);
//TODO Put the accumulated result back into the map
map.put(key, mapValueBean);
}
collector.emit(new Values(map));
}
/**
 * Adds the corresponding fields of two ValueBean instances.
 *
 * @param result the accumulator bean, updated in place
 * @param value2 the bean whose counters are added to the accumulator
 * @return the updated accumulator bean
 */
@SuppressWarnings("all")
public ValueBean addValueBean(ValueBean result, ValueBean value2) {
result.setCommon_s2c_byte_num(result.getCommon_s2c_byte_num() + value2.getCommon_s2c_byte_num());
result.setCommon_c2s_byte_num(result.getCommon_c2s_byte_num() + value2.getCommon_c2s_byte_num());
result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + value2.getCommon_s2c_pkt_num());
result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + value2.getCommon_c2s_pkt_num());
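// Count one session per merged tuple; the incoming bean's own session counter is not summed.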
result.setCommon_sessions(result.getCommon_sessions() + 1L);
return result;
}
}

View File

@@ -0,0 +1,85 @@
package cn.ac.iie.origion.bolt;
import cn.ac.iie.origion.bean.ConnectionRecordLog;
import cn.ac.iie.origion.bean.ValueBean;
import cn.ac.iie.origion.utils.FlowWriteConfig;
import cn.ac.iie.origion.utils.KafkaLogNtc;
import com.alibaba.fastjson.JSONObject;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
/**
 * Bolt that sends the aggregated results to Kafka.
*/
public class NtcLogSendBolt extends BaseBasicBolt {
private static final long serialVersionUID = 3237813470939823159L;
private static Logger logger = Logger.getLogger(NtcLogSendBolt.class);
private KafkaLogNtc kafkaLogNtc;
private static ConnectionRecordLog connectionRecordLog;
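// NOTE: a single static ConnectionRecordLog instance is reused and mutated for every record that is sent.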
@Override
public void prepare(Map stormConf, TopologyContext context) {
kafkaLogNtc = KafkaLogNtc.getInstance();
connectionRecordLog = new ConnectionRecordLog();
}
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
try {
HashMap<String,ValueBean> hashMap = (HashMap) tuple.getValue(0);
if (hashMap.size() != 0) {
for (String key : hashMap.keySet()) {
JSONObject keys = JSONObject.parseObject(key);
ValueBean valueBean = hashMap.get(key);
connectionRecordLog.setCommon_recv_time(System.currentTimeMillis());
connectionRecordLog.setCommon_policy_id(Integer.parseInt(keys.getString("common_policy_id")));
connectionRecordLog.setCommon_action(Integer.parseInt(keys.getString("common_action")));
connectionRecordLog.setCommon_sub_action(keys.getString("common_sub_action"));
connectionRecordLog.setCommon_client_ip(keys.getString("common_client_ip"));
connectionRecordLog.setCommon_client_location(keys.getString("common_client_location"));
connectionRecordLog.setCommon_sled_ip(keys.getString("common_sled_ip"));
connectionRecordLog.setCommon_device_id(keys.getString("common_device_id"));
connectionRecordLog.setCommon_subscriber_id(keys.getString("common_subscriber_id"));
connectionRecordLog.setCommon_server_ip(keys.getString("common_server_ip"));
connectionRecordLog.setCommon_server_location(keys.getString("common_server_location"));
connectionRecordLog.setCommon_server_port(Integer.parseInt(keys.getString("common_server_port")));
connectionRecordLog.setCommon_l4_protocol(keys.getString("common_l4_protocol"));
connectionRecordLog.setHttp_domain(keys.getString("http_domain"));
connectionRecordLog.setSsl_sni(keys.getString("ssl_sni"));
//TODO Populate the value fields
connectionRecordLog.setCommon_sessions(valueBean.getCommon_sessions());
connectionRecordLog.setCommon_c2s_pkt_num(valueBean.getCommon_c2s_pkt_num());
connectionRecordLog.setCommon_s2c_pkt_num(valueBean.getCommon_s2c_pkt_num());
connectionRecordLog.setCommon_c2s_byte_num(valueBean.getCommon_c2s_byte_num());
connectionRecordLog.setCommon_s2c_byte_num(valueBean.getCommon_s2c_byte_num());
kafkaLogNtc.sendMessage(JSONObject.toJSONString(connectionRecordLog));
}
}
} catch (Exception e) {
logger.error(FlowWriteConfig.KAFKA_TOPIC + " exception while sending the log to Kafka");
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}

View File

@@ -0,0 +1,56 @@
package cn.ac.iie.origion.bolt;
import cn.ac.iie.origion.bean.ValueBean;
import cn.ac.iie.origion.utils.FlowWriteConfig;
import com.alibaba.fastjson.JSONObject;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
/**
* @ClassNamePrintBolt
* @Author lixkvip@126.com
* @Date2020/6/9 13:53
* @Version V1.0
**/
public class PrintBolt extends BaseBasicBolt {
private static final long serialVersionUID = -3663610927224396615L;
private static Logger logger = Logger.getLogger(PrintBolt.class);
@Override
public void prepare(Map stormConf, TopologyContext context) {
}
@SuppressWarnings("all")
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
try {
HashMap<String,ValueBean> hashMap = (HashMap) tuple.getValue(0);
if (hashMap.size() != 0) {
for (String key : hashMap.keySet()) {
System.out.println(key);
System.out.println(JSONObject.toJSONString(hashMap.get(key)));
}
}
} catch (Exception e) {
logger.error(FlowWriteConfig.KAFKA_TOPIC + " exception while printing the aggregated result");
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}

View File

@@ -0,0 +1,81 @@
package cn.ac.iie.origion.spout;
import cn.ac.iie.origion.utils.FlowWriteConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
/**
 * Kafka spout: polls raw log messages and emits each record value on the "source" field.
*
* @author Administrator
*/
public class CustomizedKafkaSpout extends BaseRichSpout {
private static final long serialVersionUID = -3363788553406229592L;
private KafkaConsumer<String, String> consumer;
private SpoutOutputCollector collector = null;
private TopologyContext context = null;
private final static Logger logger = Logger.getLogger(CustomizedKafkaSpout.class);
private static Properties createConsumerConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", FlowWriteConfig.BOOTSTRAP_SERVERS);
props.put("group.id", FlowWriteConfig.GROUP_ID);
props.put("session.timeout.ms", "60000");
props.put("max.poll.records", 3000);
props.put("max.partition.fetch.bytes", 31457280);
props.put("auto.offset.reset", FlowWriteConfig.AUTO_OFFSET_RESET);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return props;
}
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
this.context = context;
Properties prop = createConsumerConfig();
this.consumer = new KafkaConsumer<>(prop);
this.consumer.subscribe(Collections.singletonList(FlowWriteConfig.KAFKA_TOPIC));
}
@Override
public void close() {
consumer.close();
}
@Override
public void nextTuple() {
try {
// Block for up to 10 seconds waiting for records from Kafka
ConsumerRecords<String, String> records = consumer.poll(10000L);
Thread.sleep(FlowWriteConfig.TOPOLOGY_SPOUT_SLEEP_TIME);
for (ConsumerRecord<String, String> record : records) {
this.collector.emit(new Values(record.value()));
}
} catch (Exception e) {
logger.error("KafkaSpout发送消息出现异常!", e);
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("source"));
}
}

View File

@@ -0,0 +1,95 @@
package cn.ac.iie.origion.topology;
import cn.ac.iie.origion.bolt.MyWindowBolt;
import cn.ac.iie.origion.bolt.NtcLogSendBolt;
import cn.ac.iie.origion.spout.CustomizedKafkaSpout;
import cn.ac.iie.origion.utils.FlowWriteConfig;
import org.apache.log4j.Logger;
import org.apache.storm.Config;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
import java.util.concurrent.TimeUnit;
/**
 * Main class of the Storm topology.
*
* @author Administrator
*/
public class LogFlowWriteTopology {
private static Logger logger = Logger.getLogger(LogFlowWriteTopology.class);
private final String topologyName;
private final Config topologyConfig;
private TopologyBuilder builder;
private LogFlowWriteTopology() {
this(LogFlowWriteTopology.class.getSimpleName());
}
private LogFlowWriteTopology(String topologyName) {
this.topologyName = topologyName;
topologyConfig = createTopologConfig();
}
private Config createTopologConfig() {
Config conf = new Config();
conf.setDebug(false);
conf.setMessageTimeoutSecs(60);
conf.setMaxSpoutPending(150000);
conf.setNumAckers(3);
return conf;
}
private void runLocally() throws InterruptedException {
topologyConfig.setMaxTaskParallelism(1);
StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600);
}
private void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
topologyConfig.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS);
//Setting this too high causes many problems, such as heartbeat-thread starvation and a sharp drop in throughput
topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8);
StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig);
}
private void buildTopology() {
builder = new TopologyBuilder();
builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), 3);
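// Window length and sliding interval are both 30 seconds, i.e. a tumbling window: each tuple is aggregated in exactly one window.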
builder.setBolt("TEST-CONN", new MyWindowBolt()
.withWindow(new BaseWindowedBolt.Duration(30, TimeUnit.SECONDS),
new BaseWindowedBolt.Duration(30, TimeUnit.SECONDS)),FlowWriteConfig.DATACENTER_BOLT_PARALLELISM)
.localOrShuffleGrouping("LogFlowWriteSpout");
// builder.setBolt("TEST-CONN", new AggregateBolt(),3).localOrShuffleGrouping("LogFlowWriteSpout");
// builder.setBolt("KAKFA-CONN", new PrintBolt(),3).localOrShuffleGrouping("TEST-CONN");
builder.setBolt("KAKFA-CONN", new NtcLogSendBolt(),FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("TEST-CONN");
}
public static void main(String[] args) throws Exception {
LogFlowWriteTopology csst = null;
boolean runLocally = true;
String parameter = "remote";
int size = 2;
if (args.length >= size && parameter.equalsIgnoreCase(args[1])) {
runLocally = false;
csst = new LogFlowWriteTopology(args[0]);
} else {
csst = new LogFlowWriteTopology();
}
csst.buildTopology();
if (runLocally) {
logger.info("执行本地模式...");
csst.runLocally();
} else {
logger.info("执行远程部署模式...");
csst.runRemotely();
}
}
}

View File

@@ -1,4 +1,4 @@
package cn.ac.iie.trident.aggregate.topology;
package cn.ac.iie.origion.topology;
import org.apache.storm.Config;

View File

@@ -1,36 +1,35 @@
package cn.ac.iie.trident.aggregate;
package cn.ac.iie.origion.utils;
import cn.ac.iie.origion.bean.KeyBean;
import cn.ac.iie.origion.bean.ValueBean;
import com.alibaba.fastjson.JSONObject;
import cn.ac.iie.trident.aggregate.bean.KeyBean;
import cn.ac.iie.trident.aggregate.bean.ValueBean;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
 * Parses a tuple into
* @ClassNameToJson
* @ClassNameAggregateUtil
* @Author lixkvip@126.com
* @Date2020/5/29 16:05
* @Date2020/6/9 11:44
* @Version V1.0
**/
public class ParseJson2KV extends BaseFunction {
public class AggregateUtil {
private static ValueBean valueBean = new ValueBean();
private static KeyBean keyBean = new KeyBean();
/**
 * Receives a JSON string and parses it into beans.
* @param message
* @return
*/
public static String aggregate(String message){
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
//TODO Read the tuple input and parse it into a map
Map map = JSONObject.parseObject(tuple.getStringByField("str"));
Map map = JSONObject.parseObject(message);
//TODO KEY
@@ -61,7 +60,7 @@ public class ParseJson2KV extends BaseFunction {
valueBean.setCommon_sessions(Long.parseLong(map.getOrDefault("common_sessions",0).toString()));
collector.emit(new Values(JSONObject.toJSONString(keyBean), JSONObject.toJSONString(valueBean)));
return message;
}
}

View File

@@ -1,4 +1,4 @@
package cn.ac.iie.trident.aggregate.utils;
package cn.ac.iie.origion.utils;
/**

View File

@@ -1,4 +1,4 @@
package cn.ac.iie.trident.aggregate.utils;
package cn.ac.iie.origion.utils;
import java.util.Properties;

View File

@@ -1,4 +1,4 @@
package cn.ac.iie.trident.aggregate.utils;
package cn.ac.iie.origion.utils;
import org.apache.kafka.clients.producer.*;
@@ -14,6 +14,7 @@ import java.util.Properties;
*/
public class KafkaLogNtc {
private static Logger logger = Logger.getLogger(KafkaLogNtc.class);
/**
@@ -40,15 +41,15 @@ public class KafkaLogNtc {
public void sendMessage(String message) {
final int[] errorSum = {0};
kafkaProducer.send(new ProducerRecord<>(FlowWriteConfig.RESULTS_OUTPUT_TOPIC, message), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
logger.error("写入" + FlowWriteConfig.RESULTS_OUTPUT_TOPIC + "出现异常", exception);
errorSum[0]++;
}
kafkaProducer.send(new ProducerRecord<>(FlowWriteConfig.RESULTS_OUTPUT_TOPIC, message), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
logger.error("写入" + FlowWriteConfig.RESULTS_OUTPUT_TOPIC + "出现异常", exception);
errorSum[0]++;
}
});
}
});
kafkaProducer.flush();
logger.debug("Log sent to National Center successfully!!!!!");
@@ -71,5 +72,4 @@ public class KafkaLogNtc {
kafkaProducer = new KafkaProducer<>(properties);
}
}

View File

@@ -0,0 +1,23 @@
package cn.ac.iie.origion.utils;
import org.apache.storm.Constants;
import org.apache.storm.tuple.Tuple;
/**
 * Utility for detecting tuples emitted by the system (tick tuples).
*
* @author Administrator
*/
public final class TupleUtils {
/**
 * Determines whether the tuple is a tick tuple automatically emitted by the system.
 *
 * @param tuple the tuple to check
 * @return true if it is a system tick tuple, false otherwise
*/
public static boolean isTick(Tuple tuple) {
return tuple != null
&& Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
&& Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
}
}

View File

@@ -1,98 +0,0 @@
package cn.ac.iie.trident.aggregate;
import com.alibaba.fastjson.JSON;
import cn.ac.iie.trident.aggregate.bean.ValueBean;
import org.apache.storm.tuple.Values;
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import java.util.HashMap;
/**
* @ClassNameAggCount
* @Author lixkvip@126.com
* @Date2020/6/1 10:48
* @Version V1.0
**/
public class AggCount extends BaseAggregator<AggCount.State> {
static class State {
HashMap<String,ValueBean> map = new HashMap();
ValueBean valueBean = new ValueBean();
ValueBean tupleValueBean = new ValueBean();
String key = "";
}
@Override
public AggCount.State init(Object batchId, TridentCollector collector) {
return new State();
}
/**
 * Aggregates a single tuple into the batch state.
* @param state
* @param tuple
* @param collector
*/
@Override
public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {
//TODO Extract the key and value from the incoming tuple
state.key = tuple.getStringByField("key");
state.tupleValueBean = JSON.parseObject(tuple.getStringByField("value"), ValueBean.class);
//TODO Look up the value accumulated for this key in the HashMap, defaulting to a fresh ValueBean if absent
state.valueBean = state.map.getOrDefault(state.key, new ValueBean());
//TODO Merge the two values
state.valueBean = addValueBean(state.valueBean, state.tupleValueBean);
//TODO Put the merged result back into the HashMap; the key overwrite de-duplicates entries
state.map.put(state.key, state.valueBean);
}
/**
 * Completes a batch of tuples by emitting the aggregated map.
* @param state
* @param collector
*/
@Override
public void complete(State state, TridentCollector collector) {
collector.emit(new Values(JSON.toJSONString(state.map)));
}
/**
 * Adds the corresponding fields of two ValueBean instances.
* @param result
* @param value2
* @return
*/
public ValueBean addValueBean(ValueBean result, ValueBean value2){
result.setCommon_s2c_byte_num(result.getCommon_s2c_byte_num() + value2.getCommon_s2c_byte_num());
result.setCommon_c2s_byte_num(result.getCommon_c2s_byte_num() + value2.getCommon_c2s_byte_num());
result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + value2.getCommon_s2c_pkt_num());
result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + value2.getCommon_c2s_pkt_num());
result.setCommon_sessions(result.getCommon_sessions() + 1L);
return result;
}
}

View File

@@ -1,58 +0,0 @@
package cn.ac.iie.trident.aggregate;
import cn.ac.iie.trident.aggregate.bolt.KafkaBolt;
import cn.ac.iie.trident.aggregate.spout.TridentKafkaSpout;
import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
import org.apache.storm.tuple.Fields;
import java.util.concurrent.TimeUnit;
/**
* @ClassNameWcTopo
* @Author lixkvip@126.com
* @Date2020/5/29 10:38
* @Version V1.0
**/
public class AggregateTopology {
public static void main(String[] args) {
//TODO Create a topology
TridentTopology topology = new TridentTopology();
//TODO Bind the spout to the topology
OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance();
topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout)
.name("one")
.parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM) //spout parallelism
.name("two")
.each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value"))
.parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) //parallelism of the data-processing stage
.name("three")
.slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map"))
.name("four")
.each(new Fields("map"), new KafkaBolt(), new Fields())
.name("five")
.parallelismHint(FlowWriteConfig.KAFKA_BOLT_PARALLELISM) //parallelism of the Kafka writer stage
.name("six");
Config config = new Config();
// config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
config.setDebug(false);
config.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS); //number of workers
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("trident-wordcount", config, topology.build());
// StormSubmitter.submitTopology("kafka2storm_opaqueTrident_topology", config,topology.build());
}
}

View File

@@ -1,75 +0,0 @@
package cn.ac.iie.trident.aggregate.bolt;
import cn.ac.iie.trident.aggregate.bean.ConnectionRecordLog;
import cn.ac.iie.trident.aggregate.bean.ValueBean;
import cn.ac.iie.trident.aggregate.utils.KafkaLogNtc;
import com.alibaba.fastjson.JSONObject;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
/**
* @ClassNameKafkaBolt
* @Author lixkvip@126.com
* @Date2020/6/3 16:50
* @Version V1.0
**/
public class KafkaBolt extends BaseFunction {
private static final long serialVersionUID = -2107081139682355171L;
private static KafkaLogNtc kafkaLogNtc;
private static ConnectionRecordLog connectionRecordLog = new ConnectionRecordLog();
private static JSONObject jsonObject;
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
if (kafkaLogNtc == null) {
kafkaLogNtc = KafkaLogNtc.getInstance();
}
//TODO Parse into a JSON object so it can be iterated over later
jsonObject = JSONObject.parseObject(tuple.getStringByField("map"));
for (String key : jsonObject.keySet()) {
//TODO Populate the key fields
JSONObject keys = JSONObject.parseObject(key);
connectionRecordLog.setCommon_recv_time(System.currentTimeMillis());
connectionRecordLog.setCommon_policy_id(Integer.parseInt(keys.getString("common_policy_id")));
connectionRecordLog.setCommon_action(Integer.parseInt(keys.getString("common_action")));
connectionRecordLog.setCommon_sub_action(keys.getString("common_sub_action"));
connectionRecordLog.setCommon_client_ip(keys.getString("common_client_ip"));
connectionRecordLog.setCommon_client_location(keys.getString("common_client_location"));
connectionRecordLog.setCommon_sled_ip(keys.getString("common_sled_ip"));
connectionRecordLog.setCommon_device_id(keys.getString("common_device_id"));
connectionRecordLog.setCommon_subscriber_id(keys.getString("common_subscriber_id"));
connectionRecordLog.setCommon_server_ip(keys.getString("common_server_ip"));
connectionRecordLog.setCommon_server_location(keys.getString("common_server_location"));
connectionRecordLog.setCommon_server_port(Integer.parseInt(keys.getString("common_server_port")));
connectionRecordLog.setCommon_l4_protocol(keys.getString("common_l4_protocol"));
connectionRecordLog.setHttp_domain(keys.getString("http_domain"));
connectionRecordLog.setSsl_sni(keys.getString("ssl_sni"));
//TODO Populate the value fields
ValueBean valueBean = JSONObject.parseObject(jsonObject.get(key).toString(), ValueBean.class);
connectionRecordLog.setCommon_sessions(valueBean.getCommon_sessions());
connectionRecordLog.setCommon_c2s_pkt_num(valueBean.getCommon_c2s_pkt_num());
connectionRecordLog.setCommon_s2c_pkt_num(valueBean.getCommon_s2c_pkt_num());
connectionRecordLog.setCommon_c2s_byte_num(valueBean.getCommon_c2s_byte_num());
connectionRecordLog.setCommon_s2c_byte_num(valueBean.getCommon_s2c_byte_num());
kafkaLogNtc.sendMessage(JSONObject.toJSONString(connectionRecordLog));
}
}
}

View File

@@ -1,40 +0,0 @@
package cn.ac.iie.trident.aggregate.spout;
import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
import org.apache.storm.kafka.trident.TridentKafkaConfig;
import org.apache.storm.spout.SchemeAsMultiScheme;
/**
* @ClassNameKafkaSpout
* @Author lixkvip@126.com
* @Date2020/6/4 11:55
* @Version V1.0
**/
public class TridentKafkaSpout {
/**
 * Singleton opaque-transactional Trident Kafka spout instance.
*/
private static OpaqueTridentKafkaSpout opaqueTridentKafkaSpout;
public static OpaqueTridentKafkaSpout getInstance() {
if (opaqueTridentKafkaSpout == null) {
BrokerHosts zkHosts = new ZkHosts(FlowWriteConfig.ZOOKEEPER_SERVERS);
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, FlowWriteConfig.KAFKA_TOPIC);
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaConfig.startOffsetTime = -1L;
kafkaConfig.socketTimeoutMs=60000;
//Opaque transactional spout
opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);
}
return opaqueTridentKafkaSpout;
}
}

View File

@@ -1,111 +0,0 @@
package cn.ac.iie.trident.aggregate.topology;
import cn.ac.iie.trident.aggregate.AggCount;
import cn.ac.iie.trident.aggregate.ParseJson2KV;
import cn.ac.iie.trident.aggregate.bolt.KafkaBolt;
import cn.ac.iie.trident.aggregate.spout.TridentKafkaSpout;
import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig;
import org.apache.log4j.Logger;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
import org.apache.storm.tuple.Fields;
import java.util.concurrent.TimeUnit;
/**
 * Main class of the Storm topology.
*
* @author Administrator
*/
public class LogFlowWriteTopology {
private static Logger logger = Logger.getLogger(LogFlowWriteTopology.class);
private TopologyBuilder builder;
private static TridentTopology tridentTopology;
private static Config createTopologConfig() {
Config conf = new Config();
conf.setDebug(false);
conf.setMessageTimeoutSecs(60);
conf.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS);
return conf;
}
private static StormTopology buildTopology() {
OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance();
tridentTopology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout)
.parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM)
.each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value"))
.parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM)
.slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map"))
.each(new Fields("map"), new KafkaBolt(), new Fields());
return tridentTopology.build();
}
public static void main(String[] args) throws Exception {
Config conf = new Config();
conf.setDebug(false);
conf.setMessageTimeoutSecs(60);
conf.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS);
//TODO Create a topology
TridentTopology topology = new TridentTopology();
//TODO Bind the spout to the topology
OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance();
/* topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout)
.parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM)//6
.each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value"))
.parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM)//9
.slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map"))
.each(new Fields("map"), new KafkaBolt(), new Fields());*/
topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout)
.name("one")
.parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM) //spout parallelism
.name("two")
.each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value"))
.parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) //parallelism of the data-processing stage
.name("three")
.slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map"))
.name("four")
.each(new Fields("map"), new KafkaBolt(), new Fields())
.name("five")
.parallelismHint(FlowWriteConfig.KAFKA_BOLT_PARALLELISM) //parallelism of the Kafka writer stage
.name("six");
if(args.length == 0){ //run in local mode
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("trident-function", conf, topology.build());
Thread.sleep(100000);
cluster.shutdown();
}else{ //run in cluster mode
StormSubmitter.submitTopology(args[0], conf, topology.build());
}
}
}

View File

@@ -1,23 +0,0 @@
#Log4j
log4j.rootLogger=warn,console,file
# Console appender settings
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=info
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
# File appender settings
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.Threshold=error
log4j.appender.file.encoding=UTF-8
log4j.appender.file.Append=true
# Use a relative path and verify it in testing; output goes under the application directory
log4j.appender.file.file=storm-topology.log
log4j.appender.file.DatePattern='.'yyyy-MM-dd
log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
# MyBatis configuration: com.nis.web.dao is the package containing the MyBatis mapper interfaces
log4j.logger.com.nis.web.dao=debug
# BoneCP data source configuration
log4j.category.com.jolbox=debug,console

View File

@@ -1,12 +1,6 @@
package com.wp;
import cn.ac.iie.trident.aggregate.bean.ValueBean;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.trident.TridentTopology;