diff --git a/dependency-reduced-pom.xml b/dependency-reduced-pom.xml index cc9ded2..2c51830 100644 --- a/dependency-reduced-pom.xml +++ b/dependency-reduced-pom.xml @@ -34,7 +34,7 @@ - cn.ac.iie.trident.aggregate.topology.LogFlowWriteTopology + cn.ac.iie.origion.topology.LogFlowWriteTopology META-INF/spring.handlers diff --git a/pom.xml b/pom.xml index 8b12d9e..82aca69 100644 --- a/pom.xml +++ b/pom.xml @@ -38,7 +38,7 @@ - cn.ac.iie.trident.aggregate.topology.LogFlowWriteTopology + cn.ac.iie.origion.topology.LogFlowWriteTopology diff --git a/src/main/java/cn/ac/iie/trident/aggregate/bean/ConnectionRecordLog.java b/src/main/java/cn/ac/iie/origion/bean/ConnectionRecordLog.java similarity index 99% rename from src/main/java/cn/ac/iie/trident/aggregate/bean/ConnectionRecordLog.java rename to src/main/java/cn/ac/iie/origion/bean/ConnectionRecordLog.java index 54fa4f3..06b2135 100644 --- a/src/main/java/cn/ac/iie/trident/aggregate/bean/ConnectionRecordLog.java +++ b/src/main/java/cn/ac/iie/origion/bean/ConnectionRecordLog.java @@ -1,4 +1,4 @@ -package cn.ac.iie.trident.aggregate.bean; +package cn.ac.iie.origion.bean; /** * @ClassNameConnectionRecordLog diff --git a/src/main/java/cn/ac/iie/trident/aggregate/bean/KeyBean.java b/src/main/java/cn/ac/iie/origion/bean/KeyBean.java similarity index 99% rename from src/main/java/cn/ac/iie/trident/aggregate/bean/KeyBean.java rename to src/main/java/cn/ac/iie/origion/bean/KeyBean.java index 5515c48..00e9e3c 100644 --- a/src/main/java/cn/ac/iie/trident/aggregate/bean/KeyBean.java +++ b/src/main/java/cn/ac/iie/origion/bean/KeyBean.java @@ -1,4 +1,4 @@ -package cn.ac.iie.trident.aggregate.bean; +package cn.ac.iie.origion.bean; /** * @ClassNameKeyBean diff --git a/src/main/java/cn/ac/iie/trident/aggregate/bean/ValueBean.java b/src/main/java/cn/ac/iie/origion/bean/ValueBean.java similarity index 97% rename from src/main/java/cn/ac/iie/trident/aggregate/bean/ValueBean.java rename to src/main/java/cn/ac/iie/origion/bean/ValueBean.java index 8ddcc38..cfe2f37 100644 --- a/src/main/java/cn/ac/iie/trident/aggregate/bean/ValueBean.java +++ b/src/main/java/cn/ac/iie/origion/bean/ValueBean.java @@ -1,4 +1,4 @@ -package cn.ac.iie.trident.aggregate.bean; +package cn.ac.iie.origion.bean; import java.io.Serializable; diff --git a/src/main/java/cn/ac/iie/origion/bolt/AggregateBolt.java b/src/main/java/cn/ac/iie/origion/bolt/AggregateBolt.java new file mode 100644 index 0000000..a656606 --- /dev/null +++ b/src/main/java/cn/ac/iie/origion/bolt/AggregateBolt.java @@ -0,0 +1,105 @@ +package cn.ac.iie.origion.bolt; + + +import cn.ac.iie.origion.bean.ValueBean; +import cn.ac.iie.origion.utils.FlowWriteConfig; +import cn.ac.iie.origion.utils.TupleUtils; + +import com.alibaba.fastjson.JSON; +import org.apache.log4j.Logger; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.util.HashMap; +import java.util.Map; + + +public class AggregateBolt extends BaseBasicBolt { + private final static Logger logger = Logger.getLogger(AggregateBolt.class); + private static final long serialVersionUID = 9006119186526123734L; + + private static HashMap map = new HashMap(); + private static ValueBean valueBean = new ValueBean(); + private static ValueBean tupleValueBean = new ValueBean(); + private static String key = ""; + private static Integer value = 0; + private static String message = ""; + + + @Override + public void prepare(Map stormConf, TopologyContext context) { + System.out.println("prepare方法执行了++++++++++++++++++++++++++"); + + logger.error("prepare方法执行了++++++++++++++++++++++++++"); + } + + @Override + public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { + + System.out.println("执行了一次====================================" + value + "==========================" + System.currentTimeMillis()); + try { + if (TupleUtils.isTick(tuple)) { + System.out.println(this.map); + //批量发送到下一个bolt + basicOutputCollector.emit(new Values(JSON.toJSONString(this.map))); + + + } else { + message = tuple.getString(0); + //TODO 获取一条tuple数据的key和value + + //TODO 两个count聚合后放入HashMap中,利用HashMap的去重功能实现value的覆盖 + this.map.put("192", addValueBean(value, 1)); + } + } catch (Exception e) { + logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常"); + e.printStackTrace(); + } + } + + @Override + public Map getComponentConfiguration() { + Map conf = new HashMap(16); + conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 30); + return conf; + } + + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(new Fields("connLog")); + } + + /** + * 将两个ValueBean中的相应的属性相加 + * + * @param result + * @param value2 + * @return + */ + + @SuppressWarnings("all") + public ValueBean addValueBean(ValueBean result, ValueBean value2) { + + result.setCommon_s2c_byte_num(result.getCommon_s2c_byte_num() + value2.getCommon_s2c_byte_num()); + result.setCommon_c2s_byte_num(result.getCommon_c2s_byte_num() + value2.getCommon_c2s_byte_num()); + result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + value2.getCommon_s2c_pkt_num()); + result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + value2.getCommon_c2s_pkt_num()); + result.setCommon_sessions(result.getCommon_sessions() + 1L); + + + return result; + } + + public Integer addValueBean(Integer result, Integer value2) { + + return result + value2; + } + + +} diff --git a/src/main/java/cn/ac/iie/origion/bolt/MyWindowBolt.java b/src/main/java/cn/ac/iie/origion/bolt/MyWindowBolt.java new file mode 100644 index 0000000..96151ff --- /dev/null +++ b/src/main/java/cn/ac/iie/origion/bolt/MyWindowBolt.java @@ -0,0 +1,102 @@ +package cn.ac.iie.origion.bolt; + +import cn.ac.iie.origion.bean.KeyBean; +import cn.ac.iie.origion.bean.ValueBean; +import com.alibaba.fastjson.JSONObject; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.windowing.TupleWindow; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +/** + * @ClassNameMyWindowBolt + * @Author lixkvip@126.com + * @Date2020/6/9 14:45 + * @Version V1.0 + **/ +public class MyWindowBolt extends BaseWindowedBolt { + + + private OutputCollector collector; + + private static ArrayList list; + private static HashMap map; + private static String message; + + + @Override + + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + map = new HashMap<>(16); + list = new ArrayList<>(); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + declarer.declare(new Fields("map")); + } + + @Override + public Map getComponentConfiguration() { + return super.getComponentConfiguration(); + } + + @Override + public void execute(TupleWindow inputWindow) { + //TODO 清空上一个窗口聚合的数据 + map.clear(); + + //TODO 遍历一个窗口的数据 + for (Tuple tuple : inputWindow.getNew()) { + + //TODO 将一个Tuple解析成String + message = tuple.getStringByField("source"); + + //TODO 获取Tuple中的value Bean + ValueBean valueBean = JSONObject.parseObject(message,ValueBean.class); + //TODO 获取Tuple中的key String + String key = JSONObject.toJSONString(JSONObject.parseObject(message, KeyBean.class)); + //TODO 获取map中的value Bean + ValueBean mapValueBean = map.getOrDefault(key,new ValueBean()); + //TODO 将tuple中的value和map中的value做累加 + mapValueBean = addValueBean(mapValueBean, valueBean); + + //TODO 将累加后的结果放到map中 + map.put(key, mapValueBean); + } + + collector.emit(new Values(map)); + } + + @SuppressWarnings("all") + /** + * 将两个ValueBean中的相应的属性相加 + * @param result + * @param value2 + * @return + */ + + public ValueBean addValueBean(ValueBean result, ValueBean value2) { + + result.setCommon_s2c_byte_num(result.getCommon_s2c_byte_num() + value2.getCommon_s2c_byte_num()); + result.setCommon_c2s_byte_num(result.getCommon_c2s_byte_num() + value2.getCommon_c2s_byte_num()); + result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + value2.getCommon_s2c_pkt_num()); + result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + value2.getCommon_c2s_pkt_num()); + result.setCommon_sessions(result.getCommon_sessions() + 1L); + + + return result; + } + + +} diff --git a/src/main/java/cn/ac/iie/origion/bolt/NtcLogSendBolt.java b/src/main/java/cn/ac/iie/origion/bolt/NtcLogSendBolt.java new file mode 100644 index 0000000..59d7eef --- /dev/null +++ b/src/main/java/cn/ac/iie/origion/bolt/NtcLogSendBolt.java @@ -0,0 +1,85 @@ +package cn.ac.iie.origion.bolt; + + +import cn.ac.iie.origion.bean.ConnectionRecordLog; +import cn.ac.iie.origion.bean.ValueBean; +import cn.ac.iie.origion.utils.FlowWriteConfig; +import cn.ac.iie.origion.utils.KafkaLogNtc; +import com.alibaba.fastjson.JSONObject; +import org.apache.log4j.Logger; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Tuple; + +import java.util.HashMap; +import java.util.Map; + +/** + * 发送到kafka的bolt + */ +public class NtcLogSendBolt extends BaseBasicBolt { + + + private static final long serialVersionUID = 3237813470939823159L; + private static Logger logger = Logger.getLogger(NtcLogSendBolt.class); + private KafkaLogNtc kafkaLogNtc; + + private static ConnectionRecordLog connectionRecordLog; + + + @Override + public void prepare(Map stormConf, TopologyContext context) { + kafkaLogNtc = KafkaLogNtc.getInstance(); + connectionRecordLog = new ConnectionRecordLog(); + } + @Override + public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { + try { + HashMap hashMap = (HashMap) tuple.getValue(0); + if (hashMap.size() != 0) { + for (String key : hashMap.keySet()) { + JSONObject keys = JSONObject.parseObject(key); + + ValueBean valueBean = hashMap.get(key); + connectionRecordLog.setCommon_recv_time(System.currentTimeMillis()); + connectionRecordLog.setCommon_policy_id(Integer.parseInt(keys.getString("common_policy_id"))); + connectionRecordLog.setCommon_action(Integer.parseInt(keys.getString("common_action"))); + connectionRecordLog.setCommon_sub_action(keys.getString("common_sub_action")); + connectionRecordLog.setCommon_client_ip(keys.getString("common_client_ip")); + connectionRecordLog.setCommon_client_location(keys.getString("common_client_location")); + connectionRecordLog.setCommon_sled_ip(keys.getString("common_sled_ip")); + connectionRecordLog.setCommon_device_id(keys.getString("common_device_id")); + connectionRecordLog.setCommon_subscriber_id(keys.getString("common_subscriber_id")); + connectionRecordLog.setCommon_server_ip(keys.getString("common_server_ip")); + connectionRecordLog.setCommon_server_location(keys.getString("common_server_location")); + connectionRecordLog.setCommon_server_port(Integer.parseInt(keys.getString("common_server_port"))); + connectionRecordLog.setCommon_l4_protocol(keys.getString("common_l4_protocol")); + connectionRecordLog.setHttp_domain(keys.getString("http_domain")); + connectionRecordLog.setSsl_sni(keys.getString("ssl_sni")); + + //TODO 为Value赋值 + + connectionRecordLog.setCommon_sessions(valueBean.getCommon_sessions()); + connectionRecordLog.setCommon_c2s_pkt_num(valueBean.getCommon_c2s_pkt_num()); + connectionRecordLog.setCommon_s2c_pkt_num(valueBean.getCommon_s2c_pkt_num()); + connectionRecordLog.setCommon_c2s_byte_num(valueBean.getCommon_c2s_byte_num()); + connectionRecordLog.setCommon_s2c_byte_num(valueBean.getCommon_s2c_byte_num()); + + kafkaLogNtc.sendMessage(JSONObject.toJSONString(connectionRecordLog)); + + } + } + } catch (Exception e) { + logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志发送Kafka过程出现异常"); + e.printStackTrace(); + } + } + + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } +} diff --git a/src/main/java/cn/ac/iie/origion/bolt/PrintBolt.java b/src/main/java/cn/ac/iie/origion/bolt/PrintBolt.java new file mode 100644 index 0000000..4215d0e --- /dev/null +++ b/src/main/java/cn/ac/iie/origion/bolt/PrintBolt.java @@ -0,0 +1,56 @@ +package cn.ac.iie.origion.bolt; + +import cn.ac.iie.origion.bean.ValueBean; + +import cn.ac.iie.origion.utils.FlowWriteConfig; +import com.alibaba.fastjson.JSONObject; +import org.apache.log4j.Logger; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Tuple; + +import java.util.HashMap; +import java.util.Map; + +/** + * @ClassNamePrintBolt + * @Author lixkvip@126.com + * @Date2020/6/9 13:53 + * @Version V1.0 + **/ +public class PrintBolt extends BaseBasicBolt { + + private static final long serialVersionUID = -3663610927224396615L; + private static Logger logger = Logger.getLogger(NtcLogSendBolt.class); + + + @Override + public void prepare(Map stormConf, TopologyContext context) { + } + + @SuppressWarnings("all") + @Override + public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { + try { + HashMap hashMap = (HashMap) tuple.getValue(0); + + if (hashMap.size() != 0) { + for (String key : hashMap.keySet()) { + System.out.println(key); + System.out.println(JSONObject.toJSONString(hashMap.get(key))); + } + + } + } catch (Exception e) { + logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志发送Kafka过程出现异常"); + e.printStackTrace(); + } + } + + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + } +} diff --git a/src/main/java/cn/ac/iie/origion/spout/CustomizedKafkaSpout.java b/src/main/java/cn/ac/iie/origion/spout/CustomizedKafkaSpout.java new file mode 100644 index 0000000..501f2f6 --- /dev/null +++ b/src/main/java/cn/ac/iie/origion/spout/CustomizedKafkaSpout.java @@ -0,0 +1,81 @@ +package cn.ac.iie.origion.spout; + + +import cn.ac.iie.origion.utils.FlowWriteConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.log4j.Logger; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; + +import java.util.Collections; +import java.util.Map; +import java.util.Properties; + +/** + * kafkaSpout + * + * @author Administrator + */ +public class CustomizedKafkaSpout extends BaseRichSpout { + private static final long serialVersionUID = -3363788553406229592L; + private KafkaConsumer consumer; + private SpoutOutputCollector collector = null; + private TopologyContext context = null; + private final static Logger logger = Logger.getLogger(CustomizedKafkaSpout.class); + + + private static Properties createConsumerConfig() { + Properties props = new Properties(); + props.put("bootstrap.servers", FlowWriteConfig.BOOTSTRAP_SERVERS); + props.put("group.id", FlowWriteConfig.GROUP_ID); + props.put("session.timeout.ms", "60000"); + props.put("max.poll.records", 3000); + props.put("max.partition.fetch.bytes", 31457280); + props.put("auto.offset.reset", FlowWriteConfig.AUTO_OFFSET_RESET); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + return props; + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + // TODO Auto-generated method stub + this.collector = collector; + this.context = context; + Properties prop = createConsumerConfig(); + this.consumer = new KafkaConsumer<>(prop); + this.consumer.subscribe(Collections.singletonList(FlowWriteConfig.KAFKA_TOPIC)); + } + + @Override + public void close() { + consumer.close(); + } + + @Override + public void nextTuple() { + try { + // TODO Auto-generated method stub + ConsumerRecords records = consumer.poll(10000L); + Thread.sleep(FlowWriteConfig.TOPOLOGY_SPOUT_SLEEP_TIME); + for (ConsumerRecord record : records) { + this.collector.emit(new Values(record.value())); + } + } catch (Exception e) { + logger.error("KafkaSpout发送消息出现异常!", e); + e.printStackTrace(); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + // TODO Auto-generated method stub + declarer.declare(new Fields("source")); + } +} diff --git a/src/main/java/cn/ac/iie/origion/topology/LogFlowWriteTopology.java b/src/main/java/cn/ac/iie/origion/topology/LogFlowWriteTopology.java new file mode 100644 index 0000000..b197b6a --- /dev/null +++ b/src/main/java/cn/ac/iie/origion/topology/LogFlowWriteTopology.java @@ -0,0 +1,95 @@ +package cn.ac.iie.origion.topology; + +import cn.ac.iie.origion.bolt.MyWindowBolt; +import cn.ac.iie.origion.bolt.NtcLogSendBolt; +import cn.ac.iie.origion.spout.CustomizedKafkaSpout; +import cn.ac.iie.origion.utils.FlowWriteConfig; +import org.apache.log4j.Logger; +import org.apache.storm.Config; +import org.apache.storm.generated.AlreadyAliveException; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseWindowedBolt; + +import java.util.concurrent.TimeUnit; + + +/** + * Storm程序主类 + * + * @author Administrator + */ + +public class LogFlowWriteTopology { + private static Logger logger = Logger.getLogger(LogFlowWriteTopology.class); + private final String topologyName; + private final Config topologyConfig; + private TopologyBuilder builder; + + private LogFlowWriteTopology() { + this(LogFlowWriteTopology.class.getSimpleName()); + } + + private LogFlowWriteTopology(String topologyName) { + this.topologyName = topologyName; + topologyConfig = createTopologConfig(); + } + + private Config createTopologConfig() { + Config conf = new Config(); + conf.setDebug(false); + conf.setMessageTimeoutSecs(60); + conf.setMaxSpoutPending(150000); + conf.setNumAckers(3); + return conf; + } + + private void runLocally() throws InterruptedException { + topologyConfig.setMaxTaskParallelism(1); + StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600); + } + + private void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { + topologyConfig.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS); + //设置过高会导致很多问题,如心跳线程饿死、吞吐量大幅下跌 + topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8); + StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig); + } + + private void buildTopology() { + builder = new TopologyBuilder(); + builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), 3); + builder.setBolt("TEST-CONN", new MyWindowBolt() + .withWindow(new BaseWindowedBolt.Duration(30, TimeUnit.SECONDS), + new BaseWindowedBolt.Duration(30, TimeUnit.SECONDS)),FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) + .localOrShuffleGrouping("LogFlowWriteSpout"); + +// builder.setBolt("TEST-CONN", new AggregateBolt(),3).localOrShuffleGrouping("LogFlowWriteSpout"); +// builder.setBolt("KAKFA-CONN", new PrintBolt(),3).localOrShuffleGrouping("TEST-CONN"); + builder.setBolt("KAKFA-CONN", new NtcLogSendBolt(),FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("TEST-CONN"); + } + + public static void main(String[] args) throws Exception { + LogFlowWriteTopology csst = null; + boolean runLocally = true; + String parameter = "remote"; + int size = 2; + if (args.length >= size && parameter.equalsIgnoreCase(args[1])) { + runLocally = false; + csst = new LogFlowWriteTopology(args[0]); + } else { + csst = new LogFlowWriteTopology(); + } + + csst.buildTopology(); + + if (runLocally) { + logger.info("执行本地模式..."); + csst.runLocally(); + } else { + logger.info("执行远程部署模式..."); + csst.runRemotely(); + } + } +} diff --git a/src/main/java/cn/ac/iie/trident/aggregate/topology/StormRunner.java b/src/main/java/cn/ac/iie/origion/topology/StormRunner.java similarity index 96% rename from src/main/java/cn/ac/iie/trident/aggregate/topology/StormRunner.java rename to src/main/java/cn/ac/iie/origion/topology/StormRunner.java index 708f77c..f4ecbcd 100644 --- a/src/main/java/cn/ac/iie/trident/aggregate/topology/StormRunner.java +++ b/src/main/java/cn/ac/iie/origion/topology/StormRunner.java @@ -1,4 +1,4 @@ -package cn.ac.iie.trident.aggregate.topology; +package cn.ac.iie.origion.topology; import org.apache.storm.Config; diff --git a/src/main/java/cn/ac/iie/trident/aggregate/ParseJson2KV.java b/src/main/java/cn/ac/iie/origion/utils/AggregateUtil.java similarity index 75% rename from src/main/java/cn/ac/iie/trident/aggregate/ParseJson2KV.java rename to src/main/java/cn/ac/iie/origion/utils/AggregateUtil.java index 2a8191c..a7047ef 100644 --- a/src/main/java/cn/ac/iie/trident/aggregate/ParseJson2KV.java +++ b/src/main/java/cn/ac/iie/origion/utils/AggregateUtil.java @@ -1,36 +1,35 @@ -package cn.ac.iie.trident.aggregate; +package cn.ac.iie.origion.utils; + +import cn.ac.iie.origion.bean.KeyBean; +import cn.ac.iie.origion.bean.ValueBean; import com.alibaba.fastjson.JSONObject; -import cn.ac.iie.trident.aggregate.bean.KeyBean; -import cn.ac.iie.trident.aggregate.bean.ValueBean; -import org.apache.storm.trident.operation.BaseFunction; -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.tuple.TridentTuple; -import org.apache.storm.tuple.Values; import java.util.Map; /** - * 把一个tuple解析成 - * @ClassNameToJson + * @ClassNameAggregateUtil * @Author lixkvip@126.com - * @Date2020/5/29 16:05 + * @Date2020/6/9 11:44 * @Version V1.0 **/ -public class ParseJson2KV extends BaseFunction { +public class AggregateUtil { private static ValueBean valueBean = new ValueBean(); private static KeyBean keyBean = new KeyBean(); + /** + * 接收到的是一个json字符串,该方法将其解析成 Bean + * @param message + * @return + */ + public static String aggregate(String message){ - @Override - public void execute(TridentTuple tuple, TridentCollector collector) { - //TODO 获取tuple输入内容,解析成map - Map map = JSONObject.parseObject(tuple.getStringByField("str")); + Map map = JSONObject.parseObject(message); //TODO KEY @@ -61,7 +60,7 @@ public class ParseJson2KV extends BaseFunction { valueBean.setCommon_sessions(Long.parseLong(map.getOrDefault("common_sessions",0).toString())); - collector.emit(new Values(JSONObject.toJSONString(keyBean), JSONObject.toJSONString(valueBean))); + return message; } } diff --git a/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java b/src/main/java/cn/ac/iie/origion/utils/FlowWriteConfig.java similarity index 98% rename from src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java rename to src/main/java/cn/ac/iie/origion/utils/FlowWriteConfig.java index 3905580..16a069c 100644 --- a/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java +++ b/src/main/java/cn/ac/iie/origion/utils/FlowWriteConfig.java @@ -1,4 +1,4 @@ -package cn.ac.iie.trident.aggregate.utils; +package cn.ac.iie.origion.utils; /** diff --git a/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfigurations.java b/src/main/java/cn/ac/iie/origion/utils/FlowWriteConfigurations.java similarity index 97% rename from src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfigurations.java rename to src/main/java/cn/ac/iie/origion/utils/FlowWriteConfigurations.java index bf24f34..70c9a6d 100644 --- a/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfigurations.java +++ b/src/main/java/cn/ac/iie/origion/utils/FlowWriteConfigurations.java @@ -1,4 +1,4 @@ -package cn.ac.iie.trident.aggregate.utils; +package cn.ac.iie.origion.utils; import java.util.Properties; diff --git a/src/main/java/cn/ac/iie/trident/aggregate/utils/KafkaLogNtc.java b/src/main/java/cn/ac/iie/origion/utils/KafkaLogNtc.java similarity index 78% rename from src/main/java/cn/ac/iie/trident/aggregate/utils/KafkaLogNtc.java rename to src/main/java/cn/ac/iie/origion/utils/KafkaLogNtc.java index 22918be..89ea53a 100644 --- a/src/main/java/cn/ac/iie/trident/aggregate/utils/KafkaLogNtc.java +++ b/src/main/java/cn/ac/iie/origion/utils/KafkaLogNtc.java @@ -1,4 +1,4 @@ -package cn.ac.iie.trident.aggregate.utils; +package cn.ac.iie.origion.utils; import org.apache.kafka.clients.producer.*; @@ -14,6 +14,7 @@ import java.util.Properties; */ public class KafkaLogNtc { + private static Logger logger = Logger.getLogger(KafkaLogNtc.class); /** @@ -40,15 +41,15 @@ public class KafkaLogNtc { public void sendMessage(String message) { final int[] errorSum = {0}; - kafkaProducer.send(new ProducerRecord<>(FlowWriteConfig.RESULTS_OUTPUT_TOPIC, message), new Callback() { - @Override - public void onCompletion(RecordMetadata metadata, Exception exception) { - if (exception != null) { - logger.error("写入" + FlowWriteConfig.RESULTS_OUTPUT_TOPIC + "出现异常", exception); - errorSum[0]++; - } + kafkaProducer.send(new ProducerRecord<>(FlowWriteConfig.RESULTS_OUTPUT_TOPIC, message), new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception != null) { + logger.error("写入" + FlowWriteConfig.RESULTS_OUTPUT_TOPIC + "出现异常", exception); + errorSum[0]++; } - }); + } + }); kafkaProducer.flush(); logger.debug("Log sent to National Center successfully!!!!!"); @@ -71,5 +72,4 @@ public class KafkaLogNtc { kafkaProducer = new KafkaProducer<>(properties); } - } diff --git a/src/main/java/cn/ac/iie/origion/utils/TupleUtils.java b/src/main/java/cn/ac/iie/origion/utils/TupleUtils.java new file mode 100644 index 0000000..6c0d6bc --- /dev/null +++ b/src/main/java/cn/ac/iie/origion/utils/TupleUtils.java @@ -0,0 +1,23 @@ +package cn.ac.iie.origion.utils; + +import org.apache.storm.Constants; +import org.apache.storm.tuple.Tuple; + +/** + * 用于检测是否是系统发送的tuple + * + * @author Administrator + */ +public final class TupleUtils { + /** + * 判断是否系统自动发送的Tuple + * + * @param tuple 元组 + * @return true or false + */ + public static boolean isTick(Tuple tuple) { + return tuple != null + && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent()) + && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId()); + } +} diff --git a/src/main/java/cn/ac/iie/trident/aggregate/AggCount.java b/src/main/java/cn/ac/iie/trident/aggregate/AggCount.java deleted file mode 100644 index 5ac557b..0000000 --- a/src/main/java/cn/ac/iie/trident/aggregate/AggCount.java +++ /dev/null @@ -1,98 +0,0 @@ -package cn.ac.iie.trident.aggregate; - - -import com.alibaba.fastjson.JSON; -import cn.ac.iie.trident.aggregate.bean.ValueBean; -import org.apache.storm.tuple.Values; -import org.apache.storm.trident.operation.BaseAggregator; -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.tuple.TridentTuple; - - -import java.util.HashMap; - - -/** - * @ClassNameAggCount - * @Author lixkvip@126.com - * @Date2020/6/1 10:48 - * @Version V1.0 - **/ -public class AggCount extends BaseAggregator { - - - static class State { - HashMap map = new HashMap(); - ValueBean valueBean = new ValueBean(); - ValueBean tupleValueBean = new ValueBean(); - String key = ""; - } - - @Override - public AggCount.State init(Object batchId, TridentCollector collector) { - - return new State(); - } - - /** - * 聚合一个tuple - * @param state - * @param tuple - * @param collector - */ - @Override - public void aggregate(State state, TridentTuple tuple, TridentCollector collector) { - - - //TODO 获取一条tuple数据的key和value - state.key = tuple.getStringByField("key"); - state.tupleValueBean = JSON.parseObject(tuple.getStringByField("value"),ValueBean.class); - - - //TODO 获取HashMap中的key对应的value,如果没有就默认为null - state.valueBean = state.map.getOrDefault(state.key, new ValueBean()); - - - //TODO 聚合两个value - state.valueBean = addValueBean(state.valueBean,state.tupleValueBean); - - //TODO 两个count聚合后放入HashMap中,利用HashMap的去重功能实现value的覆盖 - state.map.put(state.key, state.valueBean); - - - - } - - /** - * 处理一批tuple - * @param state - * @param collector - */ - @Override - public void complete(State state, TridentCollector collector) { - - collector.emit(new Values(JSON.toJSONString(state.map))); - - - } - - /** - * 将两个ValueBean中的相应的属性相加 - * @param result - * @param value2 - * @return - */ - - public ValueBean addValueBean(ValueBean result, ValueBean value2){ - - result.setCommon_s2c_byte_num(result.getCommon_s2c_byte_num() + value2.getCommon_s2c_byte_num()); - result.setCommon_c2s_byte_num(result.getCommon_c2s_byte_num() + value2.getCommon_c2s_byte_num()); - result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + value2.getCommon_s2c_pkt_num()); - result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + value2.getCommon_c2s_pkt_num()); - result.setCommon_sessions(result.getCommon_sessions() + 1L); - - - return result; - } - -} diff --git a/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java b/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java deleted file mode 100644 index f63884d..0000000 --- a/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java +++ /dev/null @@ -1,58 +0,0 @@ -package cn.ac.iie.trident.aggregate; - -import cn.ac.iie.trident.aggregate.bolt.KafkaBolt; -import cn.ac.iie.trident.aggregate.spout.TridentKafkaSpout; -import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig; -import org.apache.storm.Config; -import org.apache.storm.LocalCluster; -import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout; -import org.apache.storm.topology.base.BaseWindowedBolt; -import org.apache.storm.trident.TridentTopology; -import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory; -import org.apache.storm.tuple.Fields; - -import java.util.concurrent.TimeUnit; - - -/** - * @ClassNameWcTopo - * @Author lixkvip@126.com - * @Date2020/5/29 10:38 - * @Version V1.0 - **/ -public class AggregateTopology { - - - - - public static void main(String[] args) { - //TODO 创建一个topo任务 - TridentTopology topology = new TridentTopology(); - //TODO 为Topo绑定Spout - OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance(); - - topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout) - .name("one") - .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM) //spout的并行度 - .name("two") - .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value")) - .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) //处理数据的并行度 - .name("three") - .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map")) - .name("four") - .each(new Fields("map"), new KafkaBolt(), new Fields()) - .name("five") - .parallelismHint(FlowWriteConfig.KAFKA_BOLT_PARALLELISM) //写到kafka的并行度 - .name("six"); - - Config config = new Config(); -// config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10); - config.setDebug(false); - config.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS); //worker的数量 - LocalCluster cluster = new LocalCluster(); - - - cluster.submitTopology("trident-wordcount", config, topology.build()); -// StormSubmitter.submitTopology("kafka2storm_opaqueTrident_topology", config,topology.build()); - } -} diff --git a/src/main/java/cn/ac/iie/trident/aggregate/bolt/KafkaBolt.java b/src/main/java/cn/ac/iie/trident/aggregate/bolt/KafkaBolt.java deleted file mode 100644 index 3c70f01..0000000 --- a/src/main/java/cn/ac/iie/trident/aggregate/bolt/KafkaBolt.java +++ /dev/null @@ -1,75 +0,0 @@ -package cn.ac.iie.trident.aggregate.bolt; - -import cn.ac.iie.trident.aggregate.bean.ConnectionRecordLog; -import cn.ac.iie.trident.aggregate.bean.ValueBean; -import cn.ac.iie.trident.aggregate.utils.KafkaLogNtc; -import com.alibaba.fastjson.JSONObject; -import org.apache.storm.trident.operation.BaseFunction; -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.tuple.TridentTuple; - -/** - * @ClassNameKafkaBolt - * @Author lixkvip@126.com - * @Date2020/6/3 16:50 - * @Version V1.0 - **/ -public class KafkaBolt extends BaseFunction { - - - private static final long serialVersionUID = -2107081139682355171L; - private static KafkaLogNtc kafkaLogNtc; - private static ConnectionRecordLog connectionRecordLog = new ConnectionRecordLog(); - private static JSONObject jsonObject; - - @Override - public void execute(TridentTuple tuple, TridentCollector collector) { - - if (kafkaLogNtc == null) { - kafkaLogNtc = KafkaLogNtc.getInstance(); - } - - //TODO 解析成json对象,方便以后的遍历 - jsonObject = JSONObject.parseObject(tuple.getStringByField("map")); - - for (String key : jsonObject.keySet()) { - - //TODO 为Key赋值 - - JSONObject keys = JSONObject.parseObject(key); - - connectionRecordLog.setCommon_recv_time(System.currentTimeMillis()); - connectionRecordLog.setCommon_policy_id(Integer.parseInt(keys.getString("common_policy_id"))); - connectionRecordLog.setCommon_action(Integer.parseInt(keys.getString("common_action"))); - connectionRecordLog.setCommon_sub_action(keys.getString("common_sub_action")); - connectionRecordLog.setCommon_client_ip(keys.getString("common_client_ip")); - connectionRecordLog.setCommon_client_location(keys.getString("common_client_location")); - connectionRecordLog.setCommon_sled_ip(keys.getString("common_sled_ip")); - connectionRecordLog.setCommon_device_id(keys.getString("common_device_id")); - connectionRecordLog.setCommon_subscriber_id(keys.getString("common_subscriber_id")); - connectionRecordLog.setCommon_server_ip(keys.getString("common_server_ip")); - connectionRecordLog.setCommon_server_location(keys.getString("common_server_location")); - connectionRecordLog.setCommon_server_port(Integer.parseInt(keys.getString("common_server_port"))); - connectionRecordLog.setCommon_l4_protocol(keys.getString("common_l4_protocol")); - connectionRecordLog.setHttp_domain(keys.getString("http_domain")); - connectionRecordLog.setSsl_sni(keys.getString("ssl_sni")); - - - - //TODO 为Value赋值 - - ValueBean valueBean = JSONObject.parseObject(jsonObject.get(key).toString(), ValueBean.class); - connectionRecordLog.setCommon_sessions(valueBean.getCommon_sessions()); - connectionRecordLog.setCommon_c2s_pkt_num(valueBean.getCommon_c2s_pkt_num()); - connectionRecordLog.setCommon_s2c_pkt_num(valueBean.getCommon_s2c_pkt_num()); - connectionRecordLog.setCommon_c2s_byte_num(valueBean.getCommon_c2s_byte_num()); - connectionRecordLog.setCommon_s2c_byte_num(valueBean.getCommon_s2c_byte_num()); - - kafkaLogNtc.sendMessage(JSONObject.toJSONString(connectionRecordLog)); - - - } - - - } -} diff --git a/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java b/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java deleted file mode 100644 index 2583b5b..0000000 --- a/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java +++ /dev/null @@ -1,40 +0,0 @@ -package cn.ac.iie.trident.aggregate.spout; - -import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig; -import org.apache.storm.kafka.BrokerHosts; -import org.apache.storm.kafka.StringScheme; -import org.apache.storm.kafka.ZkHosts; -import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout; -import org.apache.storm.kafka.trident.TridentKafkaConfig; -import org.apache.storm.spout.SchemeAsMultiScheme; - -/** - * @ClassNameKafkaSpout - * @Author lixkvip@126.com - * @Date2020/6/4 11:55 - * @Version V1.0 - **/ -public class TridentKafkaSpout { - - - /** - * kafka生产者适配器(单例),用来代理kafka生产者发送消息 - */ - private static OpaqueTridentKafkaSpout opaqueTridentKafkaSpout; - - public static OpaqueTridentKafkaSpout getInstance() { - if (opaqueTridentKafkaSpout == null) { - - BrokerHosts zkHosts = new ZkHosts(FlowWriteConfig.ZOOKEEPER_SERVERS); - TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, FlowWriteConfig.KAFKA_TOPIC); - kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); - kafkaConfig.startOffsetTime = -1L; - kafkaConfig.socketTimeoutMs=60000; - - //不透明事务型Spout - opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig); - } - return opaqueTridentKafkaSpout; - } - -} diff --git a/src/main/java/cn/ac/iie/trident/aggregate/topology/LogFlowWriteTopology.java b/src/main/java/cn/ac/iie/trident/aggregate/topology/LogFlowWriteTopology.java deleted file mode 100644 index d981fe1..0000000 --- a/src/main/java/cn/ac/iie/trident/aggregate/topology/LogFlowWriteTopology.java +++ /dev/null @@ -1,111 +0,0 @@ -package cn.ac.iie.trident.aggregate.topology; - - -import cn.ac.iie.trident.aggregate.AggCount; -import cn.ac.iie.trident.aggregate.ParseJson2KV; -import cn.ac.iie.trident.aggregate.bolt.KafkaBolt; -import cn.ac.iie.trident.aggregate.spout.TridentKafkaSpout; -import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig; -import org.apache.log4j.Logger; -import org.apache.storm.Config; -import org.apache.storm.LocalCluster; -import org.apache.storm.StormSubmitter; -import org.apache.storm.generated.AlreadyAliveException; -import org.apache.storm.generated.AuthorizationException; -import org.apache.storm.generated.InvalidTopologyException; -import org.apache.storm.generated.StormTopology; -import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.topology.base.BaseWindowedBolt; -import org.apache.storm.trident.TridentTopology; -import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory; -import org.apache.storm.tuple.Fields; - -import java.util.concurrent.TimeUnit; - -/** - * Storm程序主类 - * - * @author Administrator - */ - -public class LogFlowWriteTopology { - private static Logger logger = Logger.getLogger(LogFlowWriteTopology.class); - - private TopologyBuilder builder; - private static TridentTopology tridentTopology; - - - - - - private static Config createTopologConfig() { - Config conf = new Config(); - conf.setDebug(false); - conf.setMessageTimeoutSecs(60); - conf.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS); - return conf; - } - - - private static StormTopology buildTopology() { - - OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance(); - - tridentTopology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout) - .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM) - .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value")) - .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) - .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map")) - .each(new Fields("map"), new KafkaBolt(), new Fields()); - return tridentTopology.build(); - } - - public static void main(String[] args) throws Exception { - - - Config conf = new Config(); - conf.setDebug(false); - conf.setMessageTimeoutSecs(60); - conf.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS); - - //TODO 创建一个topo任务 - TridentTopology topology = new TridentTopology(); - //TODO 为Topo绑定Spout - OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance(); - - /* topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout) - .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM)//6 - .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value")) - .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM)//9 - .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map")) - .each(new Fields("map"), new KafkaBolt(), new Fields());*/ - - - topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout) - .name("one") - .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM) //spout的并行度 - .name("two") - .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value")) - .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) //处理数据的并行度 - .name("three") - .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map")) - .name("four") - .each(new Fields("map"), new KafkaBolt(), new Fields()) - .name("five") - .parallelismHint(FlowWriteConfig.KAFKA_BOLT_PARALLELISM) //写到kafka的并行度 - .name("six"); - - if(args.length == 0){//本地模式运行 - - LocalCluster cluster = new LocalCluster(); - - cluster.submitTopology("trident-function", conf, topology.build()); - Thread.sleep(100000); - cluster.shutdown(); - }else{//集群模式运行 - StormSubmitter.submitTopology(args[0], conf, topology.build()); - } - - } -} diff --git a/src/main/java/cn/ac/iie/trident/log4j.properties b/src/main/java/cn/ac/iie/trident/log4j.properties deleted file mode 100644 index 8d4ada4..0000000 --- a/src/main/java/cn/ac/iie/trident/log4j.properties +++ /dev/null @@ -1,23 +0,0 @@ -#Log4j -log4j.rootLogger=warn,console,file -# 控制台日志设置 -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.Threshold=info -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n - -# 文件日志设置 -log4j.appender.file=org.apache.log4j.DailyRollingFileAppender -log4j.appender.file.Threshold=error -log4j.appender.file.encoding=UTF-8 -log4j.appender.file.Append=true -#路径请用相对路径,做好相关测试输出到应用目下 -log4j.appender.file.file=storm-topology.log -log4j.appender.file.DatePattern='.'yyyy-MM-dd -log4j.appender.file.layout=org.apache.log4j.PatternLayout -#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n -log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n -#MyBatis 配置,com.nis.web.dao是mybatis接口所在包 -log4j.logger.com.nis.web.dao=debug -#bonecp数据源配置 -log4j.category.com.jolbox=debug,console \ No newline at end of file diff --git a/src/test/java/com/wp/AppTest.java b/src/test/java/com/wp/AppTest.java index 77c826f..71ab064 100644 --- a/src/test/java/com/wp/AppTest.java +++ b/src/test/java/com/wp/AppTest.java @@ -1,12 +1,6 @@ package com.wp; -import cn.ac.iie.trident.aggregate.bean.ValueBean; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig; -import junit.framework.Test; -import junit.framework.TestCase; -import junit.framework.TestSuite; + import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.trident.TridentTopology;