Revised version, 2019-09-05

This commit is contained in:
qidaijie
2019-09-05 17:26:02 +08:00
parent a5070711f6
commit a8af7eea66
8 changed files with 146 additions and 30 deletions

View File

@@ -1,36 +1,39 @@
#kafka broker addresses
#bootstrap.servers=10.4.35.7:9092,10.4.35.8:9092,10.4.35.9:9092
bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092
bootstrap.servers=192.168.6.200:9093,192.168.6.200:9094,192.168.6.200:9095
#zookeeper address
zookeeper.servers=192.168.40.207:2181
zookeeper.servers=192.168.6.200:2181
#zookeeper.servers=192.168.40.207:2181
#latest/earliest
auto.offset.reset=latest
#topic name on the kafka broker
kafka.topic=SESSION-TEST-LOG
kafka.topic=SESSION-RECORD-LOG
#kafka.topic=Snowflake-test
#Consumer group for reading the topic; the offsets consumed by this spout are stored under this name, and the stored offset position determines where the next read resumes so no data is read twice
group.id=session-record-log-g
group.id=session-record-log-z
#output topic
results.output.topic=SESSION-TEST-COMPLETED-LOG
#results.output.topic=SESSION-TEST-COMPLETED-LOG
results.output.topic=SESSION-RECORD-COMPLETED-LOG
#storm topology workers
topology.workers=6
topology.workers=1
#spout parallelism; recommended to match the number of kafka partitions
spout.parallelism=12
spout.parallelism=3
#parallelism of the completion-processing bolt; a multiple of the worker count
datacenter.bolt.parallelism=30
datacenter.bolt.parallelism=3
#parallelism for writing to kafka
kafka.bolt.parallelism=30
kafka.bolt.parallelism=3
#IP geolocation database path
ip.library=/home/ceiec/topology/dat/
ip.library=/dat/
#number of records per kafka batch
batch.insert.num=5000
@@ -57,10 +60,10 @@ check.ip.scope=10,100,192
max.failure.num=20
#influx address
influx.ip=http://10.0.5.19:8086
influx.ip=http://192.168.40.207:8086
#influx username
influx.username=admin
#influx password
influx.password=admin
influx.password=123456
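
The FlowWriteConfig constants referenced in the code below (SPOUT_PARALLELISM, ZOOKEEPER_SERVERS, KAFKA_TOPIC, and so on) are presumably read from this properties file. A minimal sketch of such a loader follows; the resource name and field selection are assumptions, not part of this commit:

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Hypothetical loader illustrating how the keys above map to constants.
public final class FlowWriteConfigSketch {
    private static final Properties PROPS = new Properties();
    static {
        try (InputStream in = FlowWriteConfigSketch.class.getClassLoader()
                .getResourceAsStream("flow_write.properties")) {  // file name assumed
            if (in == null) {
                throw new IllegalStateException("flow_write.properties not found on classpath");
            }
            PROPS.load(in);
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
    public static final String BOOTSTRAP_SERVERS = PROPS.getProperty("bootstrap.servers");
    public static final String ZOOKEEPER_SERVERS = PROPS.getProperty("zookeeper.servers");
    public static final int SPOUT_PARALLELISM = Integer.parseInt(PROPS.getProperty("spout.parallelism", "1"));
    public static final int TOPOLOGY_WORKERS = Integer.parseInt(PROPS.getProperty("topology.workers", "1"));
}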

View File

@@ -60,7 +60,7 @@ public class LogFlowWriteTopology {
builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM);
builder.setBolt("ConnCompletionBolt", new ConnCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("ConnCompletionBolt");
builder.setBolt("SummaryBolt", new SummaryBolt(), 1).localOrShuffleGrouping("NtcLogSendBolt");
// builder.setBolt("SummaryBolt", new SummaryBolt(), 1).localOrShuffleGrouping("NtcLogSendBolt");
}
public static void main(String[] args) throws Exception {
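
The body of main is cut off in this view. For orientation, here is a typical Storm submission pattern consistent with the config above; the topology name is invented, and older Storm releases use the backtype.storm packages instead of org.apache.storm:

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

// Hypothetical sketch, not the commit's actual main body.
public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    // ... wire the spout and bolts exactly as shown in the diff above ...
    Config conf = new Config();
    conf.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS);  // topology.workers=1 after this commit
    if (args.length > 0) {
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());  // cluster mode
    } else {
        new LocalCluster().submitTopology("LogFlowWrite-local", conf, builder.createTopology());
    }
}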

View File

@@ -114,5 +114,10 @@ public class TransFormUtils {
return "";
}
public static void main(String[] args) {
String s = ipLookup.countryLookup("192.168.10.207");
System.out.println(s);
}
}

View File

@@ -105,11 +105,12 @@ public class SnowflakeId {
if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) {
throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
}
if (FlowWriteConfig.DATA_CENTER_ID_NUM > maxDataCenterId || FlowWriteConfig.DATA_CENTER_ID_NUM < 0) {
int dataCenterId = FlowWriteConfig.DATA_CENTER_ID_NUM;
if (dataCenterId > maxDataCenterId || dataCenterId < 0) {
throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDataCenterId));
}
this.workerId = tmpWorkerId;
this.dataCenterId = FlowWriteConfig.DATA_CENTER_ID_NUM;
this.dataCenterId = dataCenterId;
}
// ==============================Methods==========================================
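
For reference, a Snowflake id packs a timestamp, the two ids validated above, and a per-millisecond sequence into a single long. The sketch below uses the classic Twitter bit widths (41-bit timestamp, 5-bit datacenter, 5-bit worker, 12-bit sequence); this project's widths may differ, since the worker-id rotation elsewhere in this commit runs up to 55, which needs 6 bits:

// Illustrative composition only; shifts follow the 41/5/5/12 layout.
static long composeId(long timestamp, long dataCenterId, long workerId, long sequence) {
    final long twepoch = 1288834974657L;   // custom epoch, an assumed value
    return ((timestamp - twepoch) << 22)   // above datacenter(5) + worker(5) + sequence(12)
            | (dataCenterId << 17)         // above worker(5) + sequence(12)
            | (workerId << 12)             // above sequence(12)
            | sequence;
}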

View File

@@ -196,9 +196,13 @@ public class DistributedLock implements Lock, Watcher {
public void run() {
DistributedLock lock = null;
try {
// lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
// lock.lock();
System.out.println(SnowflakeId.generateId());
lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
lock.lock();
// System.out.println(SnowflakeId.generateId());
System.out.println(1);
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (lock != null) {
lock.unlock();
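
The run() above acquires the lock, sleeps, and releases it in finally. A presumable production pattern would guard the ZooKeeper worker-id counter so concurrent topology starts do not race (an assumption; this commit only shows the test):

// Hypothetical call site combining the lock with the counter in ZookeeperUtils below.
DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
lock.lock();
try {
    int workerId = new ZookeeperUtils().modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC);
    // seed SnowflakeId with workerId here
} finally {
    lock.unlock();
}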

View File

@@ -37,16 +37,17 @@ public class ZookeeperUtils implements Watcher {
* @param path node path
*/
public int modifyNode(String path) {
createNode("/Snowflake", null, ZooDefs.Ids.OPEN_ACL_UNSAFE);
createNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
int workerId = 0;
int workerId;
try {
connectZookeeper(FlowWriteConfig.ZOOKEEPER_SERVERS);
connectZookeeper();
Stat stat = zookeeper.exists(path, true);
workerId = Integer.parseInt(getNodeDate(path));
if (workerId > 55){
if (workerId > 55) {
workerId = 0;
zookeeper.setData(path, "1".getBytes(), stat.getVersion());
}else {
} else {
String result = String.valueOf(workerId + 1);
if (stat != null) {
zookeeper.setData(path, result.getBytes(), stat.getVersion());
@@ -67,11 +68,10 @@ public class ZookeeperUtils implements Watcher {
/**
* Connect to zookeeper
*
* @param host address
*/
public void connectZookeeper(String host) {
private void connectZookeeper() {
try {
zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this);
zookeeper = new ZooKeeper(FlowWriteConfig.ZOOKEEPER_SERVERS, SESSION_TIME_OUT, this);
countDownLatch.await();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
@@ -81,7 +81,7 @@ public class ZookeeperUtils implements Watcher {
/**
* Close the connection
*/
public void closeConn() {
private void closeConn() {
try {
if (zookeeper != null) {
zookeeper.close();
@@ -97,7 +97,7 @@ public class ZookeeperUtils implements Watcher {
* @param path node path
* @return node data, or null on exception
*/
public String getNodeDate(String path) {
private String getNodeDate(String path) {
String result = null;
Stat stat = new Stat();
try {
@@ -115,9 +115,9 @@ public class ZookeeperUtils implements Watcher {
* @param date byte[] data stored at the node
* @param acls ACL policy list
*/
public void createNode(String path, byte[] date, List<ACL> acls) {
private void createNode(String path, byte[] date, List<ACL> acls) {
try {
connectZookeeper(FlowWriteConfig.ZOOKEEPER_SERVERS);
connectZookeeper();
Stat exists = zookeeper.exists(path, true);
if (exists == null) {
zookeeper.create(path, date, acls, CreateMode.PERSISTENT);
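
Taken together, modifyNode implements a rotating counter: it ensures /Snowflake/<topic> exists, reads the current value, and writes back value+1, wrapping to 0 once the stored value exceeds 55. A sketch of a call site (the actual caller is not shown in this commit):

// modifyNode remains public; the helpers it uses are now private.
ZookeeperUtils zk = new ZookeeperUtils();
int workerId = zk.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC);  // returns 0..55, then wraps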

View File

@@ -0,0 +1,92 @@
package cn.ac.iie.test.zookeeper;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.influxdb.InfluxDbUtils;
import org.apache.kafka.clients.producer.*;
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
* Writes logs produced by the NTC system configuration to the data center
*
* @author Administrator
* @create 2018-08-13 15:11
*/
public class KafkaLogNtc {
private static Logger logger = Logger.getLogger(KafkaLogNtc.class);
/**
* kafka producer used to send messages to kafka
*/
private static Producer<String, String> kafkaProducer;
/**
* singleton kafka producer adapter that proxies message sending
*/
private static KafkaLogNtc kafkaLogNtc;
private KafkaLogNtc() {
initKafkaProducer();
}
public static KafkaLogNtc getInstance() {
if (kafkaLogNtc == null) {
kafkaLogNtc = new KafkaLogNtc();
}
return kafkaLogNtc;
}
public void sendMessage(List<String> list) {
final int[] errorSum = {0};
for (String value : list) {
// note: the topic is hardcoded to "topic001" in this test class, while the error log below references RESULTS_OUTPUT_TOPIC
kafkaProducer.send(new ProducerRecord<>("topic001", value), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
logger.error("写入" + FlowWriteConfig.RESULTS_OUTPUT_TOPIC + "出现异常", exception);
errorSum[0]++;
}
}
});
if (errorSum[0] > FlowWriteConfig.MAX_FAILURE_NUM) {
InfluxDbUtils.sendKafkaFail(list.size());
list.clear();
// stop iterating: clearing the list mid-loop would otherwise throw ConcurrentModificationException
break;
}
}
kafkaProducer.flush();
logger.warn("Log sent to National Center successfully!!!!!");
}
/**
* Initializes the kafka message producer from the producer config; runs only once
*/
private void initKafkaProducer() {
Properties properties = new Properties();
properties.put("bootstrap.servers", "kafka1:9093");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("acks", "1");
properties.put("linger.ms", "2");
properties.put("request.timeout.ms", 60000);
properties.put("batch.size", 262144);
properties.put("buffer.memory", 33554432);
properties.put("compression.type", "snappy");
kafkaProducer = new KafkaProducer<>(properties);
}
public static void main(String[] args) {
KafkaLogNtc kafkaLogNtc = KafkaLogNtc.getInstance();
List<String> list = new ArrayList<>();
list.add("a");
list.add("b");
list.add("c");
kafkaLogNtc.sendMessage(list);
}
}
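
The sendMessage above is fire-and-forget with a callback counting failures. If a caller needs per-record confirmation, a blocking variant is possible (a sketch, not part of this commit; it additionally requires java.util.concurrent.ExecutionException):

// Hypothetical synchronous send: get() blocks until the broker acks (acks=1 above).
try {
    RecordMetadata meta = kafkaProducer.send(new ProducerRecord<>("topic001", value)).get();
    logger.debug("written to " + meta.topic() + " partition " + meta.partition());
} catch (InterruptedException | ExecutionException e) {
    logger.error("synchronous send failed", e);
}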

View File

@@ -1,5 +1,16 @@
package cn.ac.iie.test.zookeeper;
public class test {
import cn.ac.iie.common.FlowWriteConfig;
import com.zdjizhi.utils.IPUtil;
import com.zdjizhi.utils.IpLookup;
public class test {
private static IpLookup ipLookup = new IpLookup.Builder(false)
.loadDataFileV4("Kazakhstan.mmdb")
.loadDataFileV6("Kazakhstan.mmdb")
.build();
public static void main(String[] args) {
// note: 256.5.5.5 is not a valid IPv4 address (octet > 255), so this exercises the lookup's invalid-input path
System.out.println(ipLookup.cityLookupDetail("256.5.5.5"));
}
}