From a8af7eea6604b85762edc6857a9d0a028b9ff711 Mon Sep 17 00:00:00 2001 From: qidaijie Date: Thu, 5 Sep 2019 17:26:02 +0800 Subject: [PATCH] =?UTF-8?q?2019-09-05=E4=BF=AE=E6=94=B9=E7=89=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- properties/service_flow_config.properties | 27 +++--- .../ac/iie/topology/LogFlowWriteTopology.java | 2 +- .../ac/iie/utils/general/TransFormUtils.java | 5 + .../cn/ac/iie/utils/system/SnowflakeId.java | 5 +- .../iie/utils/zookeeper/DistributedLock.java | 10 +- .../iie/utils/zookeeper/ZookeeperUtils.java | 22 ++--- .../cn/ac/iie/test/zookeeper/KafkaLogNtc.java | 92 +++++++++++++++++++ .../java/cn/ac/iie/test/zookeeper/test.java | 13 ++- 8 files changed, 146 insertions(+), 30 deletions(-) create mode 100644 src/test/java/cn/ac/iie/test/zookeeper/KafkaLogNtc.java diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 8bfb59b..06c7f56 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,36 +1,39 @@ #管理kafka地址 #bootstrap.servers=10.4.35.7:9092,10.4.35.8:9092,10.4.35.9:9092 -bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092 +bootstrap.servers=192.168.6.200:9093,192.168.6.200:9094,192.168.6.200:9095 #zookeeper 地址 -zookeeper.servers=192.168.40.207:2181 +zookeeper.servers=192.168.6.200:2181 +#zookeeper.servers=192.168.40.207:2181 #latest/earliest auto.offset.reset=latest #kafka broker下的topic名称 -kafka.topic=SESSION-TEST-LOG +kafka.topic=SESSION-RECORD-LOG +#kafka.topic=Snowflake-test #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; -group.id=session-record-log-g +group.id=session-record-log-z #输出topic -results.output.topic=SESSION-TEST-COMPLETED-LOG +#results.output.topic=SESSION-TEST-COMPLETED-LOG +results.output.topic=SESSION-RECORD-COMPLETED-LOG #storm topology workers -topology.workers=6 +topology.workers=1 #spout并行度 建议与kafka分区数相同 -spout.parallelism=12 +spout.parallelism=3 #处理补全操作的bolt并行度-worker的倍数 -datacenter.bolt.parallelism=30 +datacenter.bolt.parallelism=3 #写入kafkad的并行度 -kafka.bolt.parallelism=30 +kafka.bolt.parallelism=3 #定位库地址 -ip.library=/home/ceiec/topology/dat/ +ip.library=/dat/ #kafka批量条数 batch.insert.num=5000 @@ -57,10 +60,10 @@ check.ip.scope=10,100,192 max.failure.num=20 #influx地址 -influx.ip=http://10.0.5.19:8086 +influx.ip=http://192.168.40.207:8086 #influx用户名 influx.username=admin #influx密码 -influx.password=admin \ No newline at end of file +influx.password=123456 \ No newline at end of file diff --git a/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java b/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java index af9cd74..ede06c3 100644 --- a/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java +++ b/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java @@ -60,7 +60,7 @@ public class LogFlowWriteTopology { builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM); builder.setBolt("ConnCompletionBolt", new ConnCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("ConnCompletionBolt"); - builder.setBolt("SummaryBolt", new SummaryBolt(), 1).localOrShuffleGrouping("NtcLogSendBolt"); +// builder.setBolt("SummaryBolt", new SummaryBolt(), 1).localOrShuffleGrouping("NtcLogSendBolt"); } public static void main(String[] args) throws Exception { diff --git a/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java b/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java index 57e461f..9c0bb8f 100644 --- a/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java +++ b/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java @@ -114,5 +114,10 @@ public class TransFormUtils { return ""; } + public static void main(String[] args) { + String s = ipLookup.countryLookup("192.168.10.207"); + System.out.println(s); + } + } diff --git a/src/main/java/cn/ac/iie/utils/system/SnowflakeId.java b/src/main/java/cn/ac/iie/utils/system/SnowflakeId.java index e697202..5f77996 100644 --- a/src/main/java/cn/ac/iie/utils/system/SnowflakeId.java +++ b/src/main/java/cn/ac/iie/utils/system/SnowflakeId.java @@ -105,11 +105,12 @@ public class SnowflakeId { if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) { throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId)); } - if (FlowWriteConfig.DATA_CENTER_ID_NUM > maxDataCenterId || FlowWriteConfig.DATA_CENTER_ID_NUM < 0) { + int dataCenterId = FlowWriteConfig.DATA_CENTER_ID_NUM; + if (dataCenterId > maxDataCenterId || dataCenterId < 0) { throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDataCenterId)); } this.workerId = tmpWorkerId; - this.dataCenterId = FlowWriteConfig.DATA_CENTER_ID_NUM; + this.dataCenterId = dataCenterId; } // ==============================Methods========================================== diff --git a/src/main/java/cn/ac/iie/utils/zookeeper/DistributedLock.java b/src/main/java/cn/ac/iie/utils/zookeeper/DistributedLock.java index e57b0a5..15f4506 100644 --- a/src/main/java/cn/ac/iie/utils/zookeeper/DistributedLock.java +++ b/src/main/java/cn/ac/iie/utils/zookeeper/DistributedLock.java @@ -196,9 +196,13 @@ public class DistributedLock implements Lock, Watcher { public void run() { DistributedLock lock = null; try { -// lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1"); -// lock.lock(); - System.out.println(SnowflakeId.generateId()); + lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1"); + lock.lock(); +// System.out.println(SnowflakeId.generateId()); + System.out.println(1); + Thread.sleep(3000); + } catch (InterruptedException e) { + e.printStackTrace(); } finally { if (lock != null) { lock.unlock(); diff --git a/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java b/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java index be6a74a..639b50c 100644 --- a/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java +++ b/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java @@ -37,16 +37,17 @@ public class ZookeeperUtils implements Watcher { * @param path 节点路径 */ public int modifyNode(String path) { + createNode("/Snowflake", null, ZooDefs.Ids.OPEN_ACL_UNSAFE); createNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE); - int workerId = 0; + int workerId; try { - connectZookeeper(FlowWriteConfig.ZOOKEEPER_SERVERS); + connectZookeeper(); Stat stat = zookeeper.exists(path, true); workerId = Integer.parseInt(getNodeDate(path)); - if (workerId > 55){ + if (workerId > 55) { workerId = 0; zookeeper.setData(path, "1".getBytes(), stat.getVersion()); - }else { + } else { String result = String.valueOf(workerId + 1); if (stat != null) { zookeeper.setData(path, result.getBytes(), stat.getVersion()); @@ -67,11 +68,10 @@ public class ZookeeperUtils implements Watcher { /** * 连接zookeeper * - * @param host 地址 */ - public void connectZookeeper(String host) { + private void connectZookeeper() { try { - zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this); + zookeeper = new ZooKeeper(FlowWriteConfig.ZOOKEEPER_SERVERS, SESSION_TIME_OUT, this); countDownLatch.await(); } catch (IOException | InterruptedException e) { e.printStackTrace(); @@ -81,7 +81,7 @@ public class ZookeeperUtils implements Watcher { /** * 关闭连接 */ - public void closeConn() { + private void closeConn() { try { if (zookeeper != null) { zookeeper.close(); @@ -97,7 +97,7 @@ public class ZookeeperUtils implements Watcher { * @param path 节点路径 * @return 内容/异常null */ - public String getNodeDate(String path) { + private String getNodeDate(String path) { String result = null; Stat stat = new Stat(); try { @@ -115,9 +115,9 @@ public class ZookeeperUtils implements Watcher { * @param date 节点所存储的数据的byte[] * @param acls 控制权限策略 */ - public void createNode(String path, byte[] date, List acls) { + private void createNode(String path, byte[] date, List acls) { try { - connectZookeeper(FlowWriteConfig.ZOOKEEPER_SERVERS); + connectZookeeper(); Stat exists = zookeeper.exists(path, true); if (exists == null) { zookeeper.create(path, date, acls, CreateMode.PERSISTENT); diff --git a/src/test/java/cn/ac/iie/test/zookeeper/KafkaLogNtc.java b/src/test/java/cn/ac/iie/test/zookeeper/KafkaLogNtc.java new file mode 100644 index 0000000..72f670a --- /dev/null +++ b/src/test/java/cn/ac/iie/test/zookeeper/KafkaLogNtc.java @@ -0,0 +1,92 @@ +package cn.ac.iie.test.zookeeper; + +import cn.ac.iie.common.FlowWriteConfig; +import cn.ac.iie.utils.influxdb.InfluxDbUtils; +import org.apache.kafka.clients.producer.*; +import org.apache.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * NTC系统配置产生日志写入数据中心类 + * + * @author Administrator + * @create 2018-08-13 15:11 + */ + +public class KafkaLogNtc { + private static Logger logger = Logger.getLogger(KafkaLogNtc.class); + + /** + * kafka生产者,用于向kafka中发送消息 + */ + private static Producer kafkaProducer; + + /** + * kafka生产者适配器(单例),用来代理kafka生产者发送消息 + */ + private static KafkaLogNtc kafkaLogNtc; + + private KafkaLogNtc() { + initKafkaProducer(); + } + + public static KafkaLogNtc getInstance() { + if (kafkaLogNtc == null) { + kafkaLogNtc = new KafkaLogNtc(); + } + return kafkaLogNtc; + } + + + public void sendMessage(List list) { + final int[] errorSum = {0}; + for (String value : list) { + kafkaProducer.send(new ProducerRecord<>("topic001", value), new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception != null) { + logger.error("写入" + FlowWriteConfig.RESULTS_OUTPUT_TOPIC + "出现异常", exception); + errorSum[0]++; + } + } + }); + if (errorSum[0] > FlowWriteConfig.MAX_FAILURE_NUM) { + InfluxDbUtils.sendKafkaFail(list.size()); + list.clear(); + } + } + kafkaProducer.flush(); + logger.warn("Log sent to National Center successfully!!!!!"); + } + + /** + * 根据kafka生产者配置信息初始化kafka消息生产者,只初始化一次 + */ + private void initKafkaProducer() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", "kafka1:9093"); + properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("acks", "1"); + properties.put("linger.ms", "2"); + properties.put("request.timeout.ms", 60000); + properties.put("batch.size", 262144); + properties.put("buffer.memory", 33554432); + properties.put("compression.type", "snappy"); + kafkaProducer = new KafkaProducer<>(properties); + } + + public static void main(String[] args) { + KafkaLogNtc kafkaLogNtc = KafkaLogNtc.getInstance(); + List list = new ArrayList<>(); + list.add("a"); + list.add("b"); + list.add("c"); + kafkaLogNtc.sendMessage(list); + + } + +} diff --git a/src/test/java/cn/ac/iie/test/zookeeper/test.java b/src/test/java/cn/ac/iie/test/zookeeper/test.java index b7cb523..75ac221 100644 --- a/src/test/java/cn/ac/iie/test/zookeeper/test.java +++ b/src/test/java/cn/ac/iie/test/zookeeper/test.java @@ -1,5 +1,16 @@ package cn.ac.iie.test.zookeeper; -public class test { +import cn.ac.iie.common.FlowWriteConfig; +import com.zdjizhi.utils.IPUtil; +import com.zdjizhi.utils.IpLookup; +public class test { + private static IpLookup ipLookup = new IpLookup.Builder(false) + .loadDataFileV4("Kazakhstan.mmdb") + .loadDataFileV6("Kazakhstan.mmdb") + .build(); + + public static void main(String[] args) { + System.out.println(ipLookup.cityLookupDetail("256.5.5.5")); + } }