commit 868364da6b1282c1c1c0dcaf2670a1b3d75a8bae Author: lee Date: Wed May 27 19:45:31 2020 +0800 OLAP预聚合代码初始版本 diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..11afe9c --- /dev/null +++ b/pom.xml @@ -0,0 +1,292 @@ + + 4.0.0 + + cn.ac.iie + log-stream-aggregation + 0.0.1-SNAPSHOT + jar + + log-stream-aggregation + http://maven.apache.org + + + + nexus + Team Nexus Repository + http://192.168.40.125:8099/content/groups/public + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 2.4.2 + + + package + + shade + + + + + cn.ac.iie.topology.LogFlowWriteTopology + + + META-INF/spring.handlers + + + META-INF/spring.schemas + + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 2.3.2 + + 1.8 + 1.8 + + + + + + properties + + **/*.properties + + false + + + properties + + log4j.properties + + false + + + + + + UTF-8 + 1.0.0 + 1.0.2 + 2.2.1 + 2.7.1 + + + + + + org.apache.kafka + kafka_2.11 + 1.0.0 + + + slf4j-log4j12 + org.slf4j + + + + + + org.apache.storm + storm-core + ${storm.version} + provided + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + org.influxdb + influxdb-java + 2.1 + + + + org.apache.storm + storm-kafka + ${storm.version} + + + + junit + junit + 4.12 + test + + + + com.alibaba + fastjson + 1.2.59 + + + + cglib + cglib-nodep + 3.2.4 + + + + com.zdjizhi + galaxy + 1.0.3 + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + + org.apache.zookeeper + zookeeper + 3.4.9 + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + + org.apache.hbase + hbase-client + ${hbase.version} + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + + org.apache.hbase + hbase-server + ${hbase.version} + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + + org.apache.hadoop + hadoop-hdfs + ${hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + org.junit.jupiter + junit-jupiter-api + 5.3.2 + compile + + + + + org.apache.httpcomponents + httpclient + 4.5.2 + + + + org.apache.httpcomponents + httpcore + 4.4.1 + + + + + diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties new file mode 100644 index 0000000..8072d1c --- /dev/null +++ b/properties/service_flow_config.properties @@ -0,0 +1,87 @@ +#管理kafka地址 +#bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092 +bootstrap.servers=192.168.40.127:9093 + +#zookeeper 地址 +zookeeper.servers=192.168.40.127:2182 +#zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181 + +#hbase zookeeper地址 +#hbase.zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181 +hbase.zookeeper.servers=192.168.40.203:2181 + +#hbase tablename +hbase.table.name=subscriber_info + +#latest/earliest +auto.offset.reset=latest + +#压缩模式 none or snappy +kafka.compression.type=none + +#kafka broker下的topic名称 +#kafka.topic=SECURITY-EVENT-LOG +kafka.topic=CONNECTION-RECORD-LOG + +#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; +group.id=lxk-200512 + +#输出topic +results.output.topic=CONNECTION-RECORD-COMPLETED-LOG +#results.output.topic=SECURITY-EVENT-COMPLETED-LOG + +#storm topology workers +topology.workers=1 + +#spout并行度 建议与kafka分区数相同 +spout.parallelism=1 + +#处理补全操作的bolt并行度-worker的倍数 +datacenter.bolt.parallelism=1 + +#写入kafka的并行度10 +kafka.bolt.parallelism=1 + +#定位库地址 +#ip.library=/home/ceiec/topology/dat/ +#ip.library=D:\\workerSpace\\K18-Phase2\\3.0.2019115\\log-stream-completion\\ +ip.library=/dat/ + +#kafka批量条数 +batch.insert.num=2000 + +#网关的schema位置 +schema.http=http://192.168.40.224:9999/metadata/schema/v1/fields/connection_record_log + +#数据中心(UID) +data.center.id.num=15 + +#tick时钟频率 +topology.tick.tuple.freq.secs=5 + +#hbase 更新时间 +hbase.tick.tuple.freq.secs=60 + +#当bolt性能受限时,限制spout接收速度,理论看ack开启才有效 +topology.config.max.spout.pending=150000 + +#ack设置 1启动ack 0不启动ack +topology.num.acks=0 + +#spout接收睡眠时间 +topology.spout.sleep.time=1 + +#允许发送kafka最大失败数 +max.failure.num=20 + +#邮件默认编码 +mail.default.charset=UTF-8 + +#influx地址 +influx.ip=http://192.168.40.151:8086 + +#influx用户名 +influx.username=admin + +#influx密码 +influx.password=admin \ No newline at end of file diff --git a/src/java/cn/ac/iie/bean/KeyTuple.java b/src/java/cn/ac/iie/bean/KeyTuple.java new file mode 100644 index 0000000..8dc6bab --- /dev/null +++ b/src/java/cn/ac/iie/bean/KeyTuple.java @@ -0,0 +1,138 @@ +package cn.ac.iie.bean; + +/** + * @ClassNameKeyTuple + * @Author lixkvip@126.com + * @Date2020/5/27 16:18 + * @Version V1.0 + **/ +public class KeyTuple { + + private int common_policy_id; + private int common_action; + private String common_sub_action; + private String common_client_ip; + private String common_client_location; + private String common_sled_ip; + private String common_device_id; + private String common_subscriber_id; + private String common_server_ip; + private String common_server_location; + private int common_server_port; + private String common_l4_protocol; + private String http_domain; + private String ssl_sni; + + + public int getCommon_policy_id() { + return common_policy_id; + } + + public void setCommon_policy_id(int common_policy_id) { + this.common_policy_id = common_policy_id; + } + + public int getCommon_action() { + return common_action; + } + + public void setCommon_action(int common_action) { + this.common_action = common_action; + } + + public String getCommon_sub_action() { + return common_sub_action; + } + + public void setCommon_sub_action(String common_sub_action) { + this.common_sub_action = common_sub_action; + } + + public String getCommon_client_ip() { + return common_client_ip; + } + + public void setCommon_client_ip(String common_client_ip) { + this.common_client_ip = common_client_ip; + } + + public String getCommon_client_location() { + return common_client_location; + } + + public void setCommon_client_location(String common_client_location) { + this.common_client_location = common_client_location; + } + + public String getCommon_sled_ip() { + return common_sled_ip; + } + + public void setCommon_sled_ip(String common_sled_ip) { + this.common_sled_ip = common_sled_ip; + } + + public String getCommon_device_id() { + return common_device_id; + } + + public void setCommon_device_id(String common_device_id) { + this.common_device_id = common_device_id; + } + + public String getCommon_subscriber_id() { + return common_subscriber_id; + } + + public void setCommon_subscriber_id(String common_subscriber_id) { + this.common_subscriber_id = common_subscriber_id; + } + + public String getCommon_server_ip() { + return common_server_ip; + } + + public void setCommon_server_ip(String common_server_ip) { + this.common_server_ip = common_server_ip; + } + + public String getCommon_server_location() { + return common_server_location; + } + + public void setCommon_server_location(String common_server_location) { + this.common_server_location = common_server_location; + } + + public int getCommon_server_port() { + return common_server_port; + } + + public void setCommon_server_port(int common_server_port) { + this.common_server_port = common_server_port; + } + + public String getCommon_l4_protocol() { + return common_l4_protocol; + } + + public void setCommon_l4_protocol(String common_l4_protocol) { + this.common_l4_protocol = common_l4_protocol; + } + + public String getHttp_domain() { + return http_domain; + } + + public void setHttp_domain(String http_domain) { + this.http_domain = http_domain; + } + + public String getSsl_sni() { + return ssl_sni; + } + + public void setSsl_sni(String ssl_sni) { + this.ssl_sni = ssl_sni; + } +} diff --git a/src/java/cn/ac/iie/bean/ValueTuple.java b/src/java/cn/ac/iie/bean/ValueTuple.java new file mode 100644 index 0000000..4c12f52 --- /dev/null +++ b/src/java/cn/ac/iie/bean/ValueTuple.java @@ -0,0 +1,67 @@ +package cn.ac.iie.bean; + +/** + * @ClassNameValueTuple + * @Author lixkvip@126.com + * @Date2020/5/27 16:18 + * @Version V1.0 + **/ +public class ValueTuple { + + private int common_sessions; + private int common_c2s_pkt_num; + private int common_s2c_pkt_num; + private int common_c2s_byte_num; + private int common_s2c_byte_num; + + public int getCommon_sessions() { + return common_sessions; + } + + public void setCommon_sessions(int common_sessions) { + this.common_sessions = common_sessions; + } + + public int getCommon_c2s_pkt_num() { + return common_c2s_pkt_num; + } + + public void setCommon_c2s_pkt_num(int common_c2s_pkt_num) { + this.common_c2s_pkt_num = common_c2s_pkt_num; + } + + public int getCommon_s2c_pkt_num() { + return common_s2c_pkt_num; + } + + public void setCommon_s2c_pkt_num(int common_s2c_pkt_num) { + this.common_s2c_pkt_num = common_s2c_pkt_num; + } + + public int getCommon_c2s_byte_num() { + return common_c2s_byte_num; + } + + public void setCommon_c2s_byte_num(int common_c2s_byte_num) { + this.common_c2s_byte_num = common_c2s_byte_num; + } + + public int getCommon_s2c_byte_num() { + return common_s2c_byte_num; + } + + public void setCommon_s2c_byte_num(int common_s2c_byte_num) { + this.common_s2c_byte_num = common_s2c_byte_num; + } + + @Override + public String toString() { + return "ValueTuple{" + + "common_sessions=" + common_sessions + + ", common_c2s_pkt_num=" + common_c2s_pkt_num + + ", common_s2c_pkt_num=" + common_s2c_pkt_num + + ", common_c2s_byte_num=" + common_c2s_byte_num + + ", common_s2c_byte_num=" + common_s2c_byte_num + + '}'; + } +} diff --git a/src/java/cn/ac/iie/bolt/CompletionBolt.java b/src/java/cn/ac/iie/bolt/CompletionBolt.java new file mode 100644 index 0000000..e34ab88 --- /dev/null +++ b/src/java/cn/ac/iie/bolt/CompletionBolt.java @@ -0,0 +1,54 @@ +package cn.ac.iie.bolt; + +import cn.ac.iie.common.FlowWriteConfig; +import cn.ac.iie.utils.system.TupleUtils; +import com.zdjizhi.utils.StringUtil; +import org.apache.log4j.Logger; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.util.HashMap; +import java.util.Map; + +public class CompletionBolt extends BaseBasicBolt { + private final static Logger logger = Logger.getLogger(CompletionBolt.class); + private static final long serialVersionUID = 9006119186526123734L; + + @Override + public void prepare(Map stormConf, TopologyContext context) { + } + + @SuppressWarnings("Duplicates") + @Override + public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { + try { + String message = tuple.getString(0); + if (StringUtil.isNotBlank(message)) { + basicOutputCollector.emit(new Values(message)); + } + } catch (Exception e) { + logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常"); + e.printStackTrace(); + } + } + + @Override + public Map getComponentConfiguration() { + Map conf = new HashMap(16); + conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, + FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS); + return conf; + } + + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(new Fields("connLog")); + } + +} diff --git a/src/java/cn/ac/iie/bolt/NtcLogSendBolt.java b/src/java/cn/ac/iie/bolt/NtcLogSendBolt.java new file mode 100644 index 0000000..fa8fab3 --- /dev/null +++ b/src/java/cn/ac/iie/bolt/NtcLogSendBolt.java @@ -0,0 +1,71 @@ +package cn.ac.iie.bolt; + +import cn.ac.iie.common.FlowWriteConfig; +import cn.ac.iie.utils.kafka.KafkaLogNtc; +import cn.ac.iie.utils.system.TupleUtils; +import com.zdjizhi.utils.StringUtil; +import org.apache.log4j.Logger; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Tuple; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * @author qidaijie + * @date 2018/8/14 + */ +public class NtcLogSendBolt extends BaseBasicBolt { + private static final long serialVersionUID = -3663610927224396615L; + private static Logger logger = Logger.getLogger(NtcLogSendBolt.class); + private List list; + private KafkaLogNtc kafkaLogNtc; + + + @Override + public void prepare(Map stormConf, TopologyContext context) { + list = new LinkedList<>(); + kafkaLogNtc = KafkaLogNtc.getInstance(); + } + + @Override + public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { + try { + if (TupleUtils.isTick(tuple)) { + if (list.size() != 0) { + kafkaLogNtc.sendMessage(list); + list.clear(); + } + } else { + String message = tuple.getString(0); + if (StringUtil.isNotBlank(message)) { + list.add(message); + } + if (list.size() == FlowWriteConfig.BATCH_INSERT_NUM) { + kafkaLogNtc.sendMessage(list); + list.clear(); + } + } + } catch (Exception e) { + logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志发送Kafka过程出现异常"); + e.printStackTrace(); + } + } + + @Override + public Map getComponentConfiguration() { + Map conf = new HashMap<>(16); + conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, FlowWriteConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS); + return conf; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + } + +} diff --git a/src/java/cn/ac/iie/bolt/radius/RadiusCompletionBolt.java b/src/java/cn/ac/iie/bolt/radius/RadiusCompletionBolt.java new file mode 100644 index 0000000..7dbe16c --- /dev/null +++ b/src/java/cn/ac/iie/bolt/radius/RadiusCompletionBolt.java @@ -0,0 +1,51 @@ +package cn.ac.iie.bolt.radius; + +import cn.ac.iie.common.FlowWriteConfig; +import com.zdjizhi.utils.StringUtil; +import org.apache.log4j.Logger; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.util.Map; + + +/** + * 通联关系日志补全 + * + * @author qidaijie + */ +public class RadiusCompletionBolt extends BaseBasicBolt { + + private final static Logger logger = Logger.getLogger(RadiusCompletionBolt.class); + private static final long serialVersionUID = -3657802387129063952L; + + @Override + public void prepare(Map stormConf, TopologyContext context) { + + } + @SuppressWarnings("Duplicates") + @Override + public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { + try { + String message = tuple.getString(0); + if (StringUtil.isNotBlank(message)) { + basicOutputCollector.emit(new Values(message)); + } + } catch (Exception e) { + logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常"); + e.printStackTrace(); + } + } + + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(new Fields("connLog")); + } + +} diff --git a/src/java/cn/ac/iie/common/FlowWriteConfig.java b/src/java/cn/ac/iie/common/FlowWriteConfig.java new file mode 100644 index 0000000..f049547 --- /dev/null +++ b/src/java/cn/ac/iie/common/FlowWriteConfig.java @@ -0,0 +1,51 @@ +package cn.ac.iie.common; + + +import cn.ac.iie.utils.system.FlowWriteConfigurations; + +/** + * @author Administrator + */ +public class FlowWriteConfig { + + public static final int IPV4_TYPE = 1; + public static final int IPV6_TYPE = 2; + public static final String FORMAT_SPLITTER = ","; + /** + * System + */ + public static final Integer SPOUT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "spout.parallelism"); + public static final Integer DATACENTER_BOLT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "datacenter.bolt.parallelism"); + public static final Integer TOPOLOGY_WORKERS = FlowWriteConfigurations.getIntProperty(0, "topology.workers"); + public static final Integer KAFKA_BOLT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "kafka.bolt.parallelism"); + public static final Integer TOPOLOGY_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "topology.tick.tuple.freq.secs"); + public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs"); + public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = FlowWriteConfigurations.getIntProperty(0, "topology.config.max.spout.pending"); + public static final Integer TOPOLOGY_NUM_ACKS = FlowWriteConfigurations.getIntProperty(0, "topology.num.acks"); + public static final Integer TOPOLOGY_SPOUT_SLEEP_TIME = FlowWriteConfigurations.getIntProperty(0, "topology.spout.sleep.time"); + public static final Integer BATCH_INSERT_NUM = FlowWriteConfigurations.getIntProperty(0, "batch.insert.num"); + public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num"); + public static final Integer MAX_FAILURE_NUM = FlowWriteConfigurations.getIntProperty(0, "max.failure.num"); + public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(0, "mail.default.charset"); + + /** + * kafka + */ + public static final String BOOTSTRAP_SERVERS = FlowWriteConfigurations.getStringProperty(0, "bootstrap.servers"); + public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers"); + public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers"); + public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(0, "hbase.table.name"); + public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id"); + public static final String RESULTS_OUTPUT_TOPIC = FlowWriteConfigurations.getStringProperty(0, "results.output.topic"); + public static final String KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "kafka.topic"); + public static final String AUTO_OFFSET_RESET = FlowWriteConfigurations.getStringProperty(0, "auto.offset.reset"); + public static final String KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "kafka.compression.type"); + + public static final String IP_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "ip.library"); + + /** + * http + */ + public static final String SCHEMA_HTTP = FlowWriteConfigurations.getStringProperty(0, "schema.http"); + +} \ No newline at end of file diff --git a/src/java/cn/ac/iie/spout/CustomizedKafkaSpout.java b/src/java/cn/ac/iie/spout/CustomizedKafkaSpout.java new file mode 100644 index 0000000..6b2d82b --- /dev/null +++ b/src/java/cn/ac/iie/spout/CustomizedKafkaSpout.java @@ -0,0 +1,81 @@ +package cn.ac.iie.spout; + +import cn.ac.iie.common.FlowWriteConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.log4j.Logger; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; + +import java.util.Collections; +import java.util.Map; +import java.util.Properties; + +/** + * kafkaSpout + * + * @author Administrator + */ +public class CustomizedKafkaSpout extends BaseRichSpout { + private static final long serialVersionUID = -3363788553406229592L; + private KafkaConsumer consumer; + private SpoutOutputCollector collector = null; + private TopologyContext context = null; + private final static Logger logger = Logger.getLogger(CustomizedKafkaSpout.class); + + + private static Properties createConsumerConfig() { + Properties props = new Properties(); + props.put("bootstrap.servers", FlowWriteConfig.BOOTSTRAP_SERVERS); + props.put("group.id", FlowWriteConfig.GROUP_ID); + props.put("session.timeout.ms", "60000"); + props.put("max.poll.records", 3000); + props.put("max.partition.fetch.bytes", 31457280); + props.put("auto.offset.reset", FlowWriteConfig.AUTO_OFFSET_RESET); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + return props; + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + // TODO Auto-generated method stub + this.collector = collector; + this.context = context; + Properties prop = createConsumerConfig(); + this.consumer = new KafkaConsumer<>(prop); + this.consumer.subscribe(Collections.singletonList(FlowWriteConfig.KAFKA_TOPIC)); + } + + @Override + public void close() { + consumer.close(); + } + + @Override + public void nextTuple() { + try { + // TODO Auto-generated method stub + ConsumerRecords records = consumer.poll(10000L); + Thread.sleep(FlowWriteConfig.TOPOLOGY_SPOUT_SLEEP_TIME); + for (ConsumerRecord record : records) { + this.collector.emit(new Values(record.value())); + } + } catch (Exception e) { + logger.error("KafkaSpout发送消息出现异常!", e); + e.printStackTrace(); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + // TODO Auto-generated method stub + declarer.declare(new Fields("source")); + } +} diff --git a/src/java/cn/ac/iie/topology/LogFlowWriteTopology.java b/src/java/cn/ac/iie/topology/LogFlowWriteTopology.java new file mode 100644 index 0000000..ca90ca6 --- /dev/null +++ b/src/java/cn/ac/iie/topology/LogFlowWriteTopology.java @@ -0,0 +1,92 @@ +package cn.ac.iie.topology; + + +import cn.ac.iie.bolt.CompletionBolt; +import cn.ac.iie.bolt.NtcLogSendBolt; +import cn.ac.iie.bolt.radius.RadiusCompletionBolt; + +import cn.ac.iie.common.FlowWriteConfig; +import cn.ac.iie.spout.CustomizedKafkaSpout; +import org.apache.log4j.Logger; +import org.apache.storm.Config; +import org.apache.storm.generated.AlreadyAliveException; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.topology.TopologyBuilder; + +/** + * Storm程序主类 + * + * @author Administrator + */ + +public class LogFlowWriteTopology { + private static Logger logger = Logger.getLogger(LogFlowWriteTopology.class); + private final String topologyName; + private final Config topologyConfig; + private TopologyBuilder builder; + + private LogFlowWriteTopology() { + this(LogFlowWriteTopology.class.getSimpleName()); + } + + private LogFlowWriteTopology(String topologyName) { + this.topologyName = topologyName; + topologyConfig = createTopologConfig(); + } + + private Config createTopologConfig() { + Config conf = new Config(); + conf.setDebug(false); + conf.setMessageTimeoutSecs(60); + conf.setMaxSpoutPending(FlowWriteConfig.TOPOLOGY_CONFIG_MAX_SPOUT_PENDING); + conf.setNumAckers(FlowWriteConfig.TOPOLOGY_NUM_ACKS); + return conf; + } + + private void runLocally() throws InterruptedException { + topologyConfig.setMaxTaskParallelism(1); + StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600); + } + + private void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { + topologyConfig.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS); + //设置过高会导致很多问题,如心跳线程饿死、吞吐量大幅下跌 + topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8); + StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig); + } + + private void buildTopology() { + builder = new TopologyBuilder(); + builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM); + + builder.setBolt(FlowWriteConfig.KAFKA_TOPIC, new CompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); + + builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping(FlowWriteConfig.KAFKA_TOPIC); + + + } + + public static void main(String[] args) throws Exception { + LogFlowWriteTopology csst = null; + boolean runLocally = true; + String parameter = "remote"; + int size = 2; + if (args.length >= size && parameter.equalsIgnoreCase(args[1])) { + runLocally = false; + csst = new LogFlowWriteTopology(args[0]); + } else { + csst = new LogFlowWriteTopology(); + } + + csst.buildTopology(); + + if (runLocally) { + logger.info("执行本地模式..."); + csst.runLocally(); + } else { + logger.info("执行远程部署模式..."); + csst.runRemotely(); + } + } +} diff --git a/src/java/cn/ac/iie/topology/StormRunner.java b/src/java/cn/ac/iie/topology/StormRunner.java new file mode 100644 index 0000000..f5094a4 --- /dev/null +++ b/src/java/cn/ac/iie/topology/StormRunner.java @@ -0,0 +1,35 @@ +package cn.ac.iie.topology; + + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.AlreadyAliveException; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.topology.TopologyBuilder; + +/** + * @author Administrator + */ +public final class StormRunner{ + private static final int MILLS_IN_SEC = 1000; + + private StormRunner() {} + + public static void runTopologyLocally(TopologyBuilder builder, String topologyName, Config conf, int runtimeInSeconds) throws InterruptedException { + + LocalCluster localCluster = new LocalCluster(); + localCluster.submitTopology(topologyName, conf, builder.createTopology()); + Thread.sleep((long) runtimeInSeconds * MILLS_IN_SEC); + localCluster.shutdown(); + + } + + public static void runTopologyRemotely(TopologyBuilder builder, String topologyName, Config conf ) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { + + StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology()); + } + + +} diff --git a/src/java/cn/ac/iie/utils/general/Aggregate.java b/src/java/cn/ac/iie/utils/general/Aggregate.java new file mode 100644 index 0000000..e427574 --- /dev/null +++ b/src/java/cn/ac/iie/utils/general/Aggregate.java @@ -0,0 +1,152 @@ +package cn.ac.iie.utils.general; + +import cn.ac.iie.bean.ValueTuple; +import cn.ac.iie.common.FlowWriteConfig; +import cn.ac.iie.utils.json.JsonParseUtil; +import cn.ac.iie.utils.tuple.TwoTuple; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.utils.StringUtil; + +import java.util.HashMap; + +public class Aggregate { + + + /** + * 在内存中加载反射类用的map + */ + private static HashMap map = JsonParseUtil.getMapFromHttp(FlowWriteConfig.SCHEMA_HTTP); + + /** + * 反射成一个类 + */ + private static Object mapObject = JsonParseUtil.generateObject(map); + + private static String key; + + private static HashMap resultMap = new HashMap<>(); + + private static Object conn; + + private static ValueTuple valueTuple = new ValueTuple(); + + private static String test = "{\"bgp_as_num\":\"100\",\"bgp_route\":\"192.168.222.0/24\",\"bgp_type\":1,\"common_action\":4,\"common_address_list\":\"\",\"common_address_type\":4,\"common_app_id\":0,\"common_app_label\":\"\",\"common_c2s_byte_num\":650,\"common_c2s_pkt_num\":7,\"common_client_asn\":\"9198\",\"common_client_ip\":\"95.56.198.87\",\"common_client_location\":\"Pervomayskiy,Almaty oblysy,哈萨克斯坦\",\"common_client_port\":13555,\"common_con_duration_ms\":154122,\"common_device_id\":\"2506398\",\"common_direction\":0,\"common_encapsulation\":0,\"common_end_time\":1590388545,\"common_entrance_id\":20,\"common_has_dup_traffic\":1,\"common_isp\":\"CMCC\",\"common_l4_protocol\":\"VLAN\",\"common_link_id\":1,\"common_log_id\":126995036504993794,\"common_policy_id\":0,\"common_protocol_id\":0,\"common_recv_time\":1590388694,\"common_s2c_byte_num\":9921,\"common_s2c_pkt_num\":21,\"common_schema_type\":\"SSL\",\"common_server_asn\":\"12876\",\"common_server_ip\":\"62.210.101.44\",\"common_server_location\":\"法国\",\"common_server_port\":22,\"common_service\":7,\"common_sled_ip\":\"192.168.10.36\",\"common_start_time\":1590388490,\"common_stream_dir\":2,\"common_stream_error\":\"\",\"common_stream_trace_id\":6193492085736674541,\"common_user_region\":\"prysUgOCWSmUYcGRL5rcUvVc8zbI9MOtlb9KOvH8rZCMRVqnIEyQVtQfBp94IIIjha24tGQ4x33qtC3jSx8udADkuezGGzrTrxCB\",\"common_user_tags\":\"9PD3v4GaIgS97l4wQnRtahW00YBi3933RDQg8PpiB8R9ftXhELHploJ0Ocg1Pj0xH06svaPbY2Tp1Chb0usQPttRqhpNbHTkW3En\",\"dns_aa\":0,\"dns_ancount\":64,\"dns_arcount\":22,\"dns_cname\":\"bFh2JvWJMWTCNcVEyuroMimLhoNM3O4effDDiNA9SRlCFdzaev10\",\"dns_message_id\":744559,\"dns_nscount\":59,\"dns_opcode\":0,\"dns_qclass\":2,\"dns_qdcount\":26,\"dns_qname\":\"kankanews.com12041281\",\"dns_qr\":1,\"dns_qtype\":5,\"dns_ra\":0,\"dns_rcode\":9,\"dns_rd\":0,\"dns_rr\":\"{\\\"aEWseVK\\\":\\\"UEUZ4qlk8qOjJBZ4\\\",\\\"9jGNxy0\\\":\\\"s075dZOXDXZ\\\",\\\"yyNXYD9G\\\":\\\"EEKxB99FuYDHH2E6NrV\\\",\\\"al23zn\\\":\\\"4dX1qd5L0A\\\"}\",\"dns_sub\":1,\"dns_tc\":0,\"ftp_account\":\"JXU2RDRCJXU4QkQ1\",\"ftp_content\":\"JXU2RDRCJXU4QkQ1\",\"ftp_url\":\"ftp://test:test@76.95.92.196/soft/list.txt\",\"http_content_length\":\"37339\",\"http_content_type\":\"application/x-javascript\",\"http_cookie\":\"BD_UPN=12314753\",\"http_domain\":\"163.com\",\"http_host\":\"v.163.com\",\"http_proxy_flag\":1,\"http_referer\":\"https://www.baidu.com/\",\"http_request_body\":\"\",\"http_request_body_key\":\"\",\"http_request_header\":\"\",\"http_request_line\":\"GET www.baidu.com/ HTTP/1.1\",\"http_response_body\":\"\",\"http_response_body_key\":\"\",\"http_response_header\":\"\",\"http_response_line\":\"HTTP/1.1 200 OK\",\"http_sequence\":9,\"http_set_cookie\":\"delPer=0; path=/; domain=.baidu.com\",\"http_snapshot\":\"\",\"http_url\":\"http://v.163.com/movie/2011/7/0/3/M7B9K1R60_M7BAANT03.html\",\"http_user_agent\":\"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.87 Safari/537.36\",\"http_version\":\"http1\",\"mail_account\":\"123456789\",\"mail_attachment_content\":\"\",\"mail_attachment_name\":\"%u6D4B%u8BD5%u90AE%u4EF6\",\"mail_attachment_name_charset\":\"UTF-8\",\"mail_bcc\":\"\",\"mail_cc\":\"0yrj459uw4c@msn.com\",\"mail_content\":\"\",\"mail_content_charset\":\"\",\"mail_eml_file\":\"\",\"mail_from\":\"jcgkljyfkx@msn.com\",\"mail_from_cmd\":\"5pk@163.net\",\"mail_protocol_type\":\"POP3\",\"mail_snapshot\":\"\",\"mail_subject\":\"%u6D4B%u8BD5%u90AE%u4EF6\",\"mail_subject_charset\":\"UTF-8\",\"mail_to\":\"cbs@yahoo.com\",\"mail_to_cmd\":\"xlk7nj@hotmail.com\",\"ssl_cert_verify\":0,\"ssl_client_side_latency\":5237,\"ssl_client_side_version\":\"SSLv3\",\"ssl_cn\":\"\",\"ssl_con_latency_ms\":3,\"ssl_error\":\"\",\"ssl_intercept_state\":0,\"ssl_pinningst\":1,\"ssl_san\":\"\",\"ssl_server_side_latency\":4644,\"ssl_server_side_version\":\"TLSv1.1\",\"ssl_sni\":\"cztv.com11547021\",\"ssl_version\":\"V3\",\"streaming_media_protocol\":\"RTP\",\"streaming_media_url\":\"http://home.sogua.com/lujingai/mv/play.aspx?id=30195689\",\"voip_called_account\":\"13307536537\",\"voip_called_number\":\"15301710004\",\"voip_calling_account\":\"15901848931\",\"voip_calling_number\":\"13908208553\"}"; + + + public static void main(String[] args) { + + + + resultMap = aggregateJsonToMap(resultMap, test); + + System.out.println("聚合一次之后: " + resultMap.get(key)); + + resultMap = aggregateJsonToMap(resultMap, test); + + System.out.println("聚合两次之后: " + resultMap.get(key)); + + + } + + /** + * 将一条新数据累加到HashMap中 + * @param map + * @param message + * @return + */ + public static HashMap aggregateJsonToMap(HashMap map, String message) { + + ValueTuple valueTuple = JSON.parseObject(message, ValueTuple.class); + + key = getKey(message); + + map.put(key,addTuple(map.get(key), valueTuple)); + + return map; + } + + /** + * 两个ValueTuple类型的对象做相应属性的聚合 + * @param result + * @param message + * @return + */ + public static ValueTuple addTuple(ValueTuple result,ValueTuple message){ + + if (result == null){ + + result = new ValueTuple(); + } + + result.setCommon_s2c_byte_num(result.getCommon_s2c_byte_num() + message.getCommon_s2c_byte_num()); + result.setCommon_c2s_byte_num(result.getCommon_c2s_byte_num() + message.getCommon_c2s_byte_num()); + result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + message.getCommon_s2c_pkt_num()); + result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + message.getCommon_c2s_pkt_num()); + result.setCommon_sessions(result.getCommon_sessions() + message.getCommon_sessions()); + + return result; + } + + public static String getKey(String message){ + Object conn = JSONObject.parseObject(message, mapObject.getClass()); + //TODO key + Object common_policy_id = JsonParseUtil.getValue(conn, "common_policy_id"); + Object common_action = JsonParseUtil.getValue(conn, "common_action"); + Object common_sub_action = JsonParseUtil.getValue(conn, "common_sub_action"); + Object common_client_ip = JsonParseUtil.getValue(conn, "common_client_ip"); + Object common_client_location = JsonParseUtil.getValue(conn, "common_client_location"); + Object common_sled_ip = JsonParseUtil.getValue(conn, "common_sled_ip"); + Object common_device_id = JsonParseUtil.getValue(conn, "common_device_id"); + Object common_subscriber_id = JsonParseUtil.getValue(conn, "common_subscriber_id"); + Object common_server_ip = JsonParseUtil.getValue(conn, "common_server_ip"); + Object common_server_location = JsonParseUtil.getValue(conn, "common_server_location"); + Object common_server_port = JsonParseUtil.getValue(conn, "common_server_port"); + Object common_l4_protocol = JsonParseUtil.getValue(conn, "common_l4_protocol"); + Object http_domain = JsonParseUtil.getValue(conn, "http_domain"); + Object ssl_sni = JsonParseUtil.getValue(conn, "ssl_sni"); + + + StringBuilder builder = new StringBuilder(); + builder.append(common_policy_id).append("_") + .append(common_action).append("_") + .append(common_sub_action).append("_") + .append(common_client_ip).append("_") + .append(common_client_location).append("_") + .append(common_sled_ip).append("_") + .append(common_device_id).append("_") + .append(common_subscriber_id).append("_") + .append(common_server_ip).append("_") + .append(common_server_location).append("_") + .append(common_server_port).append("_") + .append(common_l4_protocol).append("_") + .append(http_domain).append("_") + .append(ssl_sni); + + return builder.toString(); + } + + +/* public static ValueTuple getValueTuple(String message){ + + conn = JSONObject.parseObject(message, mapObject.getClass()); + Object common_sessions = JsonParseUtil.getValue(conn, "common_sessions"); + + if (StringUtil.isEmpty(common_sessions)) { + common_sessions = 0; + } + Object common_c2s_pkt_num = JsonParseUtil.getValue(conn, "common_c2s_pkt_num"); + Object common_s2c_pkt_num = JsonParseUtil.getValue(conn, "common_s2c_pkt_num"); + Object common_c2s_byte_num = JsonParseUtil.getValue(conn, "common_c2s_byte_num"); + Object common_s2c_byte_num = JsonParseUtil.getValue(conn, "common_s2c_byte_num"); + + valueTuple.setCommon_sessions((int) common_sessions); + valueTuple.setCommon_c2s_pkt_num((int) common_c2s_pkt_num); + valueTuple.setCommon_s2c_pkt_num((int) common_s2c_pkt_num); + valueTuple.setCommon_c2s_byte_num((int) common_c2s_byte_num); + valueTuple.setCommon_s2c_byte_num((int) common_s2c_byte_num); + + return valueTuple; + + }*/ +} diff --git a/src/java/cn/ac/iie/utils/http/HttpClientUtil.java b/src/java/cn/ac/iie/utils/http/HttpClientUtil.java new file mode 100644 index 0000000..347a69b --- /dev/null +++ b/src/java/cn/ac/iie/utils/http/HttpClientUtil.java @@ -0,0 +1,55 @@ +package cn.ac.iie.utils.http; + +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; + +/** + * 获取网关schema的工具类 + * + * @author qidaijie + */ +public class HttpClientUtil { + + /** + * 请求网关获取schema + * @param http 网关url + * @return schema + */ + public static String requestByGetMethod(String http) { + CloseableHttpClient httpClient = HttpClients.createDefault(); + StringBuilder entityStringBuilder = null; + try { + HttpGet get = new HttpGet(http); + try (CloseableHttpResponse httpResponse = httpClient.execute(get)) { + HttpEntity entity = httpResponse.getEntity(); + entityStringBuilder = new StringBuilder(); + if (null != entity) { + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024); + String line = null; + while ((line = bufferedReader.readLine()) != null) { + entityStringBuilder.append(line); + } + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + try { + if (httpClient != null) { + httpClient.close(); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + return entityStringBuilder.toString(); + } + +} diff --git a/src/java/cn/ac/iie/utils/json/JsonParseUtil.java b/src/java/cn/ac/iie/utils/json/JsonParseUtil.java new file mode 100644 index 0000000..0350e7f --- /dev/null +++ b/src/java/cn/ac/iie/utils/json/JsonParseUtil.java @@ -0,0 +1,204 @@ +package cn.ac.iie.utils.json; + +import cn.ac.iie.common.FlowWriteConfig; +import cn.ac.iie.utils.http.HttpClientUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.utils.StringUtil; +import net.sf.cglib.beans.BeanGenerator; +import net.sf.cglib.beans.BeanMap; + +import java.util.*; + +/** + * 使用FastJson解析json的工具类 + * + * @author qidaijie + */ +public class JsonParseUtil { + + /** + * 模式匹配,给定一个类型字符串返回一个类类型 + * + * @param type 类型 + * @return 类类型 + */ + + private static Class getClassName(String type) { + Class clazz; + + switch (type) { + case "int": + clazz = Integer.class; + break; + case "String": + clazz = String.class; + break; + case "long": + clazz = long.class; + break; + case "Integer": + clazz = Integer.class; + break; + case "double": + clazz = double.class; + break; + case "float": + clazz = float.class; + break; + case "char": + clazz = char.class; + break; + case "byte": + clazz = byte.class; + break; + case "boolean": + clazz = boolean.class; + break; + case "short": + clazz = short.class; + break; + default: + clazz = String.class; + } + return clazz; + } + + /** + * 根据反射生成对象的方法 + * + * @param properties 反射类用的map + * @return 生成的Object类型的对象 + */ + public static Object generateObject(Map properties) { + BeanGenerator generator = new BeanGenerator(); + Set keySet = properties.keySet(); + for (Object aKeySet : keySet) { + String key = (String) aKeySet; + generator.addProperty(key, (Class) properties.get(key)); + } + return generator.create(); + } + + /** + * 获取属性值的方法 + * + * @param obj 对象 + * @param property key + * @return 属性的值 + */ + public static Object getValue(Object obj, String property) { + BeanMap beanMap = BeanMap.create(obj); + return beanMap.get(property); + } + + /** + * 更新属性值的方法 + * + * @param obj 对象 + * @param property 更新的key + * @param value 更新的值 + */ + public static void setValue(Object obj, String property, Object value) { + BeanMap beanMap = BeanMap.create(obj); + beanMap.put(property, value); + } + + /** + * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象 + * + * @param http 网关schema地址 + * @return 用于反射生成schema类型的对象的一个map集合 + */ + public static HashMap getMapFromHttp(String http) { + HashMap map = new HashMap<>(); + + String schema = HttpClientUtil.requestByGetMethod(http); + Object data = JSON.parseObject(schema).get("data"); + + //获取fields,并转化为数组,数组的每个元素都是一个name doc type + JSONObject schemaJson = JSON.parseObject(data.toString()); + JSONArray fields = (JSONArray) schemaJson.get("fields"); + + for (Object field : fields) { + String name = JSON.parseObject(field.toString()).get("name").toString(); + String type = JSON.parseObject(field.toString()).get("type").toString(); + //组合用来生成实体类的map + map.put(name, getClassName(type)); + } + + return map; + } + + + /** + * 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList paramlist) + * + * @param http 网关url + * @return 任务列表 + */ + public static ArrayList getJobListFromHttp(String http) { + ArrayList list = new ArrayList<>(); + + String schema = HttpClientUtil.requestByGetMethod(http); + //解析data + Object data = JSON.parseObject(schema).get("data"); + + //获取fields,并转化为数组,数组的每个元素都是一个name doc type + JSONObject schemaJson = JSON.parseObject(data.toString()); + JSONArray fields = (JSONArray) schemaJson.get("fields"); + + for (Object field : fields) { + + if (JSON.parseObject(field.toString()).containsKey("doc")) { + Object doc = JSON.parseObject(field.toString()).get("doc"); + + if (JSON.parseObject(doc.toString()).containsKey("format")) { + String name = JSON.parseObject(field.toString()).get("name").toString(); + Object format = JSON.parseObject(doc.toString()).get("format"); + JSONObject formatObject = JSON.parseObject(format.toString()); + + String functions = formatObject.get("functions").toString(); + String appendTo = null; + String params = null; + + if (formatObject.containsKey("appendTo")) { + appendTo = formatObject.get("appendTo").toString(); + } + + if (formatObject.containsKey("param")) { + params = formatObject.get("param").toString(); + } + + + if (StringUtil.isNotBlank(appendTo) && StringUtil.isBlank(params)) { + String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER); + String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER); + + for (int i = 0; i < functionArray.length; i++) { + list.add(new String[]{name, appendToArray[i], functionArray[i], null}); + } + + } else if (StringUtil.isNotBlank(appendTo) && StringUtil.isNotBlank(params)) { + String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER); + String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER); + String[] paramArray = params.split(FlowWriteConfig.FORMAT_SPLITTER); + + for (int i = 0; i < functionArray.length; i++) { + list.add(new String[]{name, appendToArray[i], functionArray[i], paramArray[i]}); + + } + } else { + list.add(new String[]{name, name, functions, params}); + } + + } + } + + } + return list; + } + + +} \ No newline at end of file diff --git a/src/java/cn/ac/iie/utils/kafka/KafkaLogNtc.java b/src/java/cn/ac/iie/utils/kafka/KafkaLogNtc.java new file mode 100644 index 0000000..09097c2 --- /dev/null +++ b/src/java/cn/ac/iie/utils/kafka/KafkaLogNtc.java @@ -0,0 +1,80 @@ +package cn.ac.iie.utils.kafka; + +import cn.ac.iie.common.FlowWriteConfig; +import org.apache.kafka.clients.producer.*; +import org.apache.log4j.Logger; + +import java.util.List; +import java.util.Properties; + +/** + * NTC系统配置产生日志写入数据中心类 + * + * @author Administrator + * @create 2018-08-13 15:11 + */ + +public class KafkaLogNtc { + private static Logger logger = Logger.getLogger(KafkaLogNtc.class); + + /** + * kafka生产者,用于向kafka中发送消息 + */ + private static Producer kafkaProducer; + + /** + * kafka生产者适配器(单例),用来代理kafka生产者发送消息 + */ + private static KafkaLogNtc kafkaLogNtc; + + private KafkaLogNtc() { + initKafkaProducer(); + } + + public static KafkaLogNtc getInstance() { + if (kafkaLogNtc == null) { + kafkaLogNtc = new KafkaLogNtc(); + } + return kafkaLogNtc; + } + + + public void sendMessage(List list) { + final int[] errorSum = {0}; + for (String value : list) { + kafkaProducer.send(new ProducerRecord<>(FlowWriteConfig.RESULTS_OUTPUT_TOPIC, value), new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception != null) { + logger.error("写入" + FlowWriteConfig.RESULTS_OUTPUT_TOPIC + "出现异常", exception); + errorSum[0]++; + } + } + }); + if (errorSum[0] > FlowWriteConfig.MAX_FAILURE_NUM) { + list.clear(); + } + } + kafkaProducer.flush(); + logger.debug("Log sent to National Center successfully!!!!!"); + } + + /** + * 根据kafka生产者配置信息初始化kafka消息生产者,只初始化一次 + */ + private void initKafkaProducer() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", FlowWriteConfig.BOOTSTRAP_SERVERS); + properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("acks", "1"); + properties.put("linger.ms", "2"); + properties.put("request.timeout.ms", 30000); + properties.put("batch.size", 262144); + properties.put("buffer.memory", 33554432); + properties.put("compression.type", FlowWriteConfig.KAFKA_COMPRESSION_TYPE); + kafkaProducer = new KafkaProducer<>(properties); + } + + +} diff --git a/src/java/cn/ac/iie/utils/system/FlowWriteConfigurations.java b/src/java/cn/ac/iie/utils/system/FlowWriteConfigurations.java new file mode 100644 index 0000000..273a5f8 --- /dev/null +++ b/src/java/cn/ac/iie/utils/system/FlowWriteConfigurations.java @@ -0,0 +1,65 @@ +package cn.ac.iie.utils.system; + +import java.util.Properties; + + +/** + * @author Administrator + */ + +public final class FlowWriteConfigurations { + + // private static Properties propCommon = new Properties(); + private static Properties propService = new Properties(); + + + public static String getStringProperty(Integer type, String key) { + if (type == 0) { + return propService.getProperty(key); +// } else if (type == 1) { +// return propCommon.getProperty(key); + } else { + return null; + } + + } + + public static Integer getIntProperty(Integer type, String key) { + if (type == 0) { + return Integer.parseInt(propService.getProperty(key)); +// } else if (type == 1) { +// return Integer.parseInt(propCommon.getProperty(key)); + } else { + return null; + } + } + + public static Long getLongProperty(Integer type, String key) { + if (type == 0) { + return Long.parseLong(propService.getProperty(key)); +// } else if (type == 1) { +// return Long.parseLong(propCommon.getProperty(key)); + } else { + return null; + } + } + + public static Boolean getBooleanProperty(Integer type, String key) { + if (type == 0) { + return "true".equals(propService.getProperty(key).toLowerCase().trim()); +// } else if (type == 1) { +// return "true".equals(propCommon.getProperty(key).toLowerCase().trim()); + } else { + return null; + } + } + + static { + try { + propService.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties")); + } catch (Exception e) { +// propCommon = null; + propService = null; + } + } +} diff --git a/src/java/cn/ac/iie/utils/system/TupleUtils.java b/src/java/cn/ac/iie/utils/system/TupleUtils.java new file mode 100644 index 0000000..53e14ca --- /dev/null +++ b/src/java/cn/ac/iie/utils/system/TupleUtils.java @@ -0,0 +1,23 @@ +package cn.ac.iie.utils.system; + +import org.apache.storm.Constants; +import org.apache.storm.tuple.Tuple; + +/** + * 用于检测是否是系统发送的tuple + * + * @author Administrator + */ +public final class TupleUtils { + /** + * 判断是否系统自动发送的Tuple + * + * @param tuple 元组 + * @return true or false + */ + public static boolean isTick(Tuple tuple) { + return tuple != null + && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent()) + && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId()); + } +} diff --git a/src/java/cn/ac/iie/utils/tuple/ThreeTuple.java b/src/java/cn/ac/iie/utils/tuple/ThreeTuple.java new file mode 100644 index 0000000..0529035 --- /dev/null +++ b/src/java/cn/ac/iie/utils/tuple/ThreeTuple.java @@ -0,0 +1,20 @@ +package cn.ac.iie.utils.tuple; + +public class ThreeTuple { + + public String first; + + public long second; + + public int third; + + public ThreeTuple(String name,long time, int sum){ + first = name; + second = time; + third = sum; + } + + public String toString(){ + return "[" + first + ", " + second + ", " + third + "]"; + } +} diff --git a/src/java/cn/ac/iie/utils/tuple/TupleAggregate.java b/src/java/cn/ac/iie/utils/tuple/TupleAggregate.java new file mode 100644 index 0000000..b3d2e71 --- /dev/null +++ b/src/java/cn/ac/iie/utils/tuple/TupleAggregate.java @@ -0,0 +1,195 @@ +package cn.ac.iie.utils.tuple; + + +import com.alibaba.fastjson.JSON; + + +import java.util.HashMap; + +public class TupleAggregate { + + + private static TwoTuple a = new TwoTuple<>("192.168.40.101", 1); + private static TwoTuple b = new TwoTuple<>("192.168.40.101", 1); + + private static ThreeTuple a1 = new ThreeTuple<>("lxk", 30L, 2); + private static ThreeTuple b1 = new ThreeTuple<>("lxk", 20L, 2); + + + public static TwoTuple parseJsonToTuple(String json) { + + CONN conn = JSON.parseObject(json, CONN.class); + //二元组 key + TwoTuple key = new TwoTuple<>(conn.getCip(), conn.getNum()); + //三元组 value + ThreeTuple value = new ThreeTuple<>(conn.getName(), conn.getTime(), conn.getSum()); + + + return new TwoTuple<>(key, value); + } + + /** + * 聚合两个三元组 + * + * @param tuple1 + * @param tuple2 + * @return + */ + public static ThreeTuple addTuple(ThreeTuple tuple1, ThreeTuple tuple2) { + + + tuple1.second += tuple2.second; + tuple1.third += tuple2.third; + + return tuple1; + + } + + /** + * 将一条新数据累加到HashMap中 + * + * @param map + * @param json + * @return map1 + */ + public static HashMap aggregate(HashMap map, String json) { + + + //TODO json解析成对象,取出key聚合组成tuple 与HashMap中具有相同key的聚合 + + /** + * 还存在的问题 + * 1. key是对象 就算值一样也不会相同 (重写HashCode和equal方法) + * + * 2. 拿key去map中取值,如果为null,后面聚合会报错 空指针异常 + */ + + //一条日志 ==》 两元组 + TwoTuple tuple = parseJsonToTuple(json); + //取出key + TwoTuple key = tuple.first; + //内存中的HashMap中获取具有相同key的value + ThreeTuple value = map.get(key); + //将两个value聚合,赋值给value + value = addTuple(value, tuple.second); + //聚合的结果放回到内存中的HashMap + map.put(key, value); + + return map; + + } + + public static void main(String[] args) { + +// HashMap map1 = new HashMap<>(); +// a1 = addTuple(a1, b1); +// System.out.println("聚合成功:" + a1); + + + CONN conn1 = new CONN(); + CONN conn2 = new CONN(); + + conn1.setCip("192.168.40.101"); + conn2.setCip("192.168.40.101"); + + conn1.setNum(1); + conn2.setNum(1); + + conn1.setName("lxk"); + conn2.setName("lxk"); + + conn1.setTime(100L); + conn2.setTime(200L); + + conn1.setSum(10); + conn2.setSum(20); + + System.out.println("conn1" + conn1); + System.out.println("conn2" + conn2); + + String json1 = JSON.toJSONString(conn1); + String json2 = JSON.toJSONString(conn2); + + + HashMap map = new HashMap<>(); + + map.put(a, a1); + + System.out.println("开始的map:" + map); + + map = aggregate(map, json1); + + System.out.println("后来的map:" + map); + + System.out.println("a的HashCode: " + a.hashCode()); + System.out.println("b的HashCode: " + b.hashCode()); + + + + } + + +} + +class CONN { + + //二元组使用的 key + private String cip; + private int num; + + //三元组使用的 value + private String name; + private long time; + private int sum; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public long getTime() { + return time; + } + + public void setTime(long time) { + this.time = time; + } + + public int getSum() { + return sum; + } + + public void setSum(int sum) { + this.sum = sum; + } + + public String getCip() { + return cip; + } + + public void setCip(String cip) { + this.cip = cip; + } + + public int getNum() { + return num; + } + + public void setNum(int num) { + this.num = num; + } + + @Override + public String toString() { + return "CONN{" + + "cip='" + cip + '\'' + + ", num=" + num + + ", name='" + name + '\'' + + ", time=" + time + + ", sum=" + sum + + '}'; + } +} \ No newline at end of file diff --git a/src/java/cn/ac/iie/utils/tuple/TwoTuple.java b/src/java/cn/ac/iie/utils/tuple/TwoTuple.java new file mode 100644 index 0000000..618f3ee --- /dev/null +++ b/src/java/cn/ac/iie/utils/tuple/TwoTuple.java @@ -0,0 +1,32 @@ +package cn.ac.iie.utils.tuple; + +public class TwoTuple { + + public A first; + + public B second; + + public TwoTuple(){}; + public TwoTuple(A cip, B num) { + first = cip; + second = num; + } + + public String toString() { + return "[" + first + ", " + second + "]"; + } + + @Override + public int hashCode() { + + int result = (first != null && second != null) ? (first.toString() + second).hashCode() : 0; + + return result; + } + + @Override + public boolean equals(Object o) { + + return this.first.toString().equals(((TwoTuple) o).first.toString()) && this.first.toString().equals(((TwoTuple) o).first.toString()); + } +} diff --git a/src/java/log4j.properties b/src/java/log4j.properties new file mode 100644 index 0000000..17c0e9a --- /dev/null +++ b/src/java/log4j.properties @@ -0,0 +1,23 @@ +#Log4j +log4j.rootLogger=info,console,file +# 控制台日志设置 +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.Threshold=info +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n + +# 文件日志设置 +log4j.appender.file=org.apache.log4j.DailyRollingFileAppender +log4j.appender.file.Threshold=error +log4j.appender.file.encoding=UTF-8 +log4j.appender.file.Append=true +#路径请用相对路径,做好相关测试输出到应用目下 +log4j.appender.file.file=storm-topology.log +log4j.appender.file.DatePattern='.'yyyy-MM-dd +log4j.appender.file.layout=org.apache.log4j.PatternLayout +#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n +log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n +#MyBatis 配置,com.nis.web.dao是mybatis接口所在包 +log4j.logger.com.nis.web.dao=debug +#bonecp数据源配置 +log4j.category.com.jolbox=debug,console \ No newline at end of file