diff --git a/pom.xml b/pom.xml index 2e03234..59a11e9 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,7 @@ nexus Team Nexus Repository - http://192.168.10.125:8099/content/groups/public + http://192.168.40.125:8099/content/groups/public @@ -67,6 +67,7 @@ properties **/*.properties + false @@ -84,6 +85,8 @@ UTF-8 1.0.0 1.0.2 + 1.4.9 + 2.7.1 @@ -181,6 +184,90 @@ + + + org.apache.hbase + hbase-client + ${hbase.version} + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + + org.apache.hbase + hbase-server + ${hbase.version} + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + + org.apache.hadoop + hadoop-hdfs + ${hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + diff --git a/properties/core-site.xml b/properties/core-site.xml new file mode 100644 index 0000000..66742f0 --- /dev/null +++ b/properties/core-site.xml @@ -0,0 +1,64 @@ + + + + + + + + + fs.defaultFS + hdfs://ns1 + + + hadoop.tmp.dir + file:/opt/hadoop/tmp + + + io.file.buffer.size + 131702 + + + hadoop.proxyuser.root.hosts + * + + + hadoop.proxyuser.root.groups + * + + + hadoop.logfile.size + 10000000 + The max size of each log file + + + + hadoop.logfile.count + 1 + The max number of log files + + + ha.zookeeper.quorum + 192.168.40.202:2181,192.168.40.203:2181,192.168.40.206:2181 + + +io.compression.codecs +com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec + + +io.compression.codec.lzo.class +com.hadoop.compression.lzo.LzoCodec + + + diff --git a/properties/hbase-site.xml b/properties/hbase-site.xml new file mode 100644 index 0000000..b81715c --- /dev/null +++ b/properties/hbase-site.xml @@ -0,0 +1,81 @@ + + + + + + hbase.rootdir + hdfs://ns1/hbase/hbase-1.4.9 + + + hbase.cluster.distributed + true + + + hbase.zookeeper.quorum + 192.168.40.202,192.168.40.203,192.168.40.206 + + +hbase.master.info.port +60010 + + + + phoenix.schema.isNamespaceMappingEnabled + true + + + phoenix.schema.mapSystemTablesToNamespace + true + + + hbase.client.keyvalue.maxsize + 99428800 + + + hbase.server.keyvalue.maxsize + 99428800 + + + hbase.regionserver.wal.codec + org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec + + + phoenix.query.timeoutMs + 1800000 + + + hbase.regionserver.lease.period + 1200000 + + + hbase.rpc.timeout + 1200000 + + + hbase.client.scanner.caching + 1000 + + + hbase.client.scanner.timeout.period + 1200000 + + diff --git a/properties/hdfs-site.xml b/properties/hdfs-site.xml new file mode 100644 index 0000000..5bf742b --- /dev/null +++ b/properties/hdfs-site.xml @@ -0,0 +1,116 @@ + + + + + + + + + dfs.namenode.name.dir + file:/home/bigdata/hadoop/dfs/name + + + dfs.datanode.data.dir + file:/home/bigdata/hadoop/dfs/data + + + dfs.replication + 2 + + + dfs.namenode.secondary.http-address + 192.168.40.202:9001 + + + dfs.webhdfs.enabled + true + + + dfs.permissions + false + + + dfs.permissions.enabled + false + + + dfs.nameservices + ns1 + + + dfs.blocksize + 134217728 + + + dfs.ha.namenodes.ns1 + nn1,nn2 + + + + dfs.namenode.rpc-address.ns1.nn1 + 192.168.40.202:9000 + + + + dfs.namenode.http-address.ns1.nn1 + 192.168.40.202:50070 + + + + dfs.namenode.rpc-address.ns1.nn2 + 192.168.40.203:9000 + + + + dfs.namenode.http-address.ns1.nn2 + 192.168.40.203:50070 + + + + dfs.namenode.shared.edits.dir + qjournal://192.168.40.203:8485;192.168.40.206:8485;192.168.40.202:8485/ns1 + + + + dfs.journalnode.edits.dir + /home/bigdata/hadoop/journal + + + + dfs.client.failover.proxy.provider.ns1 + org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider + + + + dfs.ha.fencing.methods + sshfence + + + + dfs.ha.fencing.ssh.private-key-files + /root/.ssh/id_rsa + + + + dfs.ha.fencing.ssh.connect-timeout + 30000 + + + + dfs.ha.automatic-failover.enabled + true + + + diff --git a/properties/redis_config.properties b/properties/redis_config.properties index f99d396..3c529de 100644 --- a/properties/redis_config.properties +++ b/properties/redis_config.properties @@ -1,8 +1,8 @@ #*****************jedis连接参数设置********************* #redis服务器ip -redis.ip=192.168.40.123 +redis.ip=192.168.40.153 #redis服务器端口号 -redis.port=6379 +redis.port=19000 #与服务器建立连接的超时时间 redis.timeout=3000 #************************jedis池参数设置******************* diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 3077812..0e64802 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,27 +1,29 @@ #管理kafka地址 -#bootstrap.servers=10.4.35.7:9092,10.4.35.8:9092,10.4.35.9:9092 -bootstrap.servers=192.168.6.200:9093,192.168.6.200:9094,192.168.6.200:9095 +bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092 #zookeeper 地址 -zookeeper.servers=192.168.6.200:2181 -#zookeeper.servers=192.168.40.207:2181 +zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181 + +#hbase zookeeper地址 +hbase.zookeeper.servers=192.168.40.203:2186 + +#hbase tablename +hbase.table.name=subcriber_info #latest/earliest auto.offset.reset=latest #kafka broker下的topic名称 kafka.topic=SESSION-RECORD-LOG -#kafka.topic=Snowflake-test #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; -group.id=session-record-log-z +group.id=session-completion-program #输出topic -#results.output.topic=SESSION-TEST-COMPLETED-LOG results.output.topic=SESSION-RECORD-COMPLETED-LOG #storm topology workers -topology.workers=1 +topology.workers=2 #spout并行度 建议与kafka分区数相同 spout.parallelism=3 @@ -33,16 +35,16 @@ datacenter.bolt.parallelism=3 kafka.bolt.parallelism=3 #定位库地址 -ip.library=/dat/ +ip.library=/home/ceiec/topology/dat/ #kafka批量条数 batch.insert.num=5000 #数据中心(UID) -data.center.id.num=15 +data.center.id.num=10 #tick时钟频率 -topology.tick.tuple.freq.secs=5 +topology.tick.tuple.freq.secs=60 #当bolt性能受限时,限制spout接收速度,理论看ack开启才有效 topology.config.max.spout.pending=150000 @@ -66,4 +68,4 @@ influx.ip=http://192.168.40.151:8086 influx.username=admin #influx密码 -influx.password=admin \ No newline at end of file +influx.password=admin diff --git a/src/main/java/cn/ac/iie/bolt/ConnCompletionBolt.java b/src/main/java/cn/ac/iie/bolt/ConnCompletionBolt.java index e67b6cf..109f856 100644 --- a/src/main/java/cn/ac/iie/bolt/ConnCompletionBolt.java +++ b/src/main/java/cn/ac/iie/bolt/ConnCompletionBolt.java @@ -1,5 +1,8 @@ package cn.ac.iie.bolt; +import cn.ac.iie.common.FlowWriteConfig; +import cn.ac.iie.utils.hbase.HbaseUtils; +import cn.ac.iie.utils.system.TupleUtils; import com.zdjizhi.utils.StringUtil; import org.apache.log4j.Logger; import org.apache.storm.task.TopologyContext; @@ -10,6 +13,7 @@ import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; +import java.util.HashMap; import java.util.Map; import static cn.ac.iie.utils.general.TransFormUtils.getJsonMessage; @@ -31,15 +35,28 @@ public class ConnCompletionBolt extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { try { - String message = tuple.getString(0); - if (StringUtil.isNotBlank(message)) { - basicOutputCollector.emit(new Values(getJsonMessage(message))); + if (TupleUtils.isTick(tuple)) { +// HbaseUtils.change(); + } else { + String message = tuple.getString(0); + if (StringUtil.isNotBlank(message)) { + basicOutputCollector.emit(new Values(getJsonMessage(message))); + } } + } catch (Exception e) { logger.error("接收解析过程出现异常", e); } } + @Override + public Map getComponentConfiguration() { + Map conf = new HashMap(16); + conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, + FlowWriteConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS); + return conf; + } + @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("connLog")); diff --git a/src/main/java/cn/ac/iie/bolt/NtcLogSendBolt.java b/src/main/java/cn/ac/iie/bolt/NtcLogSendBolt.java index c19acff..e260447 100644 --- a/src/main/java/cn/ac/iie/bolt/NtcLogSendBolt.java +++ b/src/main/java/cn/ac/iie/bolt/NtcLogSendBolt.java @@ -60,7 +60,7 @@ public class NtcLogSendBolt extends BaseBasicBolt { } } } catch (Exception e) { - logger.error("日志发送Kafka过程出现异常 ", e); + logger.error("日志发送Kafka过程出现异常"); e.printStackTrace(); } } diff --git a/src/main/java/cn/ac/iie/bolt/radius/RadiusCompletionBolt.java b/src/main/java/cn/ac/iie/bolt/radius/RadiusCompletionBolt.java new file mode 100644 index 0000000..9b6ebde --- /dev/null +++ b/src/main/java/cn/ac/iie/bolt/radius/RadiusCompletionBolt.java @@ -0,0 +1,49 @@ +package cn.ac.iie.bolt.radius; + +import com.zdjizhi.utils.StringUtil; +import org.apache.log4j.Logger; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.util.Map; + +import static cn.ac.iie.utils.general.TransFormUtils.getRadiusMessage; + +/** + * 通联关系日志补全 + * + * @author qidaijie + */ +public class RadiusCompletionBolt extends BaseBasicBolt { + private static final long serialVersionUID = -3657802387129063952L; + private final static Logger logger = Logger.getLogger(RadiusCompletionBolt.class); + + @Override + public void prepare(Map stormConf, TopologyContext context) { + + } + + @Override + public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { + try { + String message = tuple.getString(0); + if (StringUtil.isNotBlank(message)) { + basicOutputCollector.emit(new Values(getRadiusMessage(message))); + } + } catch (Exception e) { + logger.error("接收解析过程出现异常", e); + } + } + + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(new Fields("connLog")); + } + +} diff --git a/src/main/java/cn/ac/iie/common/FlowWriteConfig.java b/src/main/java/cn/ac/iie/common/FlowWriteConfig.java index 26e2173..66e124d 100644 --- a/src/main/java/cn/ac/iie/common/FlowWriteConfig.java +++ b/src/main/java/cn/ac/iie/common/FlowWriteConfig.java @@ -40,6 +40,8 @@ public class FlowWriteConfig { */ public static final String BOOTSTRAP_SERVERS = FlowWriteConfigurations.getStringProperty(0, "bootstrap.servers"); public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers"); + public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers"); + public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(0, "hbase.table.name"); public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id"); public static final String RESULTS_OUTPUT_TOPIC = FlowWriteConfigurations.getStringProperty(0, "results.output.topic"); public static final String KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "kafka.topic"); diff --git a/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java b/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java index ede06c3..6b677d8 100644 --- a/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java +++ b/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java @@ -3,7 +3,7 @@ package cn.ac.iie.topology; import cn.ac.iie.bolt.ConnCompletionBolt; import cn.ac.iie.bolt.NtcLogSendBolt; -import cn.ac.iie.bolt.SummaryBolt; +import cn.ac.iie.bolt.radius.RadiusCompletionBolt; import cn.ac.iie.common.FlowWriteConfig; import cn.ac.iie.spout.CustomizedKafkaSpout; import org.apache.log4j.Logger; @@ -58,8 +58,13 @@ public class LogFlowWriteTopology { private void buildTopology() { builder = new TopologyBuilder(); builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM); - builder.setBolt("ConnCompletionBolt", new ConnCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); - builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("ConnCompletionBolt"); + if ("RADIUS-LOG".equals(FlowWriteConfig.KAFKA_TOPIC)) { + builder.setBolt("RadiusCompletionBolt", new RadiusCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); + builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("RadiusCompletionBolt"); + } else { + builder.setBolt("ConnCompletionBolt", new ConnCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); + builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("ConnCompletionBolt"); + } // builder.setBolt("SummaryBolt", new SummaryBolt(), 1).localOrShuffleGrouping("NtcLogSendBolt"); } diff --git a/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java b/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java index 9c0bb8f..3790c35 100644 --- a/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java +++ b/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java @@ -2,6 +2,7 @@ package cn.ac.iie.utils.general; import cn.ac.iie.bean.SessionRecordLog; import cn.ac.iie.common.FlowWriteConfig; +import cn.ac.iie.utils.hbase.HbaseUtils; import cn.ac.iie.utils.redis.RedisPollUtils; import cn.ac.iie.utils.system.SnowflakeId; import cn.ac.iie.utils.zookeeper.DistributedLock; @@ -31,8 +32,6 @@ public class TransFormUtils { .loadAsnDataFileV4(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb") .loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb") .build(); -// private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils(); -// private static SnowflakeId snowflakeId = new SnowflakeId(); /** * 解析日志,并补全 @@ -52,14 +51,40 @@ public class TransFormUtils { sessionRecordLog.setServer_asn(ipLookup.asnLookup(serverIp, true)); sessionRecordLog.setDomain(getTopDomain(sessionRecordLog.getSni(), sessionRecordLog.getHost())); sessionRecordLog.setRecv_time(System.currentTimeMillis() / 1000); -// sessionRecordLog.setSubscribe_id(getSubscribeId(clientIp)); +// sessionRecordLog.setSubscribe_id(HbaseUtils.getAccount(clientIp)); return JSONObject.toJSONString(sessionRecordLog); } catch (Exception e) { - logger.error("日志解析过程出现异常", e); + logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志解析过程出现异常", e); return ""; } } + + /** + * 解析日志,并补全 + * + * @param message radius原始日志 + * @return 补全后的日志 + */ + public static String getRadiusMessage(String message) { + SessionRecordLog sessionRecordLog = JSONObject.parseObject(message, SessionRecordLog.class); + String serverIp = sessionRecordLog.getServer_ip(); + String clientIp = sessionRecordLog.getClient_ip(); + try { + sessionRecordLog.setUid(SnowflakeId.generateId()); + sessionRecordLog.setServer_location(ipLookup.countryLookup(serverIp)); + sessionRecordLog.setClient_location(ipLookup.cityLookupDetail(clientIp)); + sessionRecordLog.setClient_asn(ipLookup.asnLookup(clientIp, true)); + sessionRecordLog.setServer_asn(ipLookup.asnLookup(serverIp, true)); + sessionRecordLog.setRecv_time(System.currentTimeMillis() / 1000); + return JSONObject.toJSONString(sessionRecordLog); + } catch (Exception e) { + logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志解析过程出现异常", e); + return ""; + } + } + + /** * 有sni通过sni获取域名,有hots根据host获取域名 * @@ -67,7 +92,7 @@ public class TransFormUtils { * @param host host * @return 顶级域名 */ - private static String getTopDomain(String sni, String host) { + public static String getTopDomain(String sni, String host) { if (StringUtil.isNotBlank(sni)) { return getDomain(sni); } else if (StringUtil.isNotBlank(host)) { diff --git a/src/main/java/cn/ac/iie/utils/hbase/HbaseUtils.java b/src/main/java/cn/ac/iie/utils/hbase/HbaseUtils.java new file mode 100644 index 0000000..c7b05e3 --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/hbase/HbaseUtils.java @@ -0,0 +1,124 @@ +package cn.ac.iie.utils.hbase; + +import cn.ac.iie.common.FlowWriteConfig; +import com.zdjizhi.utils.StringUtil; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * @author qidaijie + */ +public class HbaseUtils { + private final static Logger logger = Logger.getLogger(HbaseUtils.class); + private static Map subIdMap = new HashMap<>(16); + private static Connection connection; + private static Long time; + +// static { +// // 管理Hbase的配置信息 +// Configuration configuration = HBaseConfiguration.create(); +// // 设置zookeeper节点 +// configuration.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS); +// try { +// connection = ConnectionFactory.createConnection(configuration); +// time = System.currentTimeMillis(); +// getAll(); +// } catch (IOException e) { +// logger.error("获取HBase连接失败"); +// e.printStackTrace(); +// } +// } + + /** + * 更新变量 + */ + public static void change() { + Long nowTime = System.currentTimeMillis(); + timestampsFilter(time - 1500, nowTime + 500); + } + + /** + * 获取变更内容 + * + * @param startTime 开始时间 + * @param endTime 结束时间 + */ + private static void timestampsFilter(Long startTime, Long endTime) { + Table table = null; + TableName tableName = TableName.valueOf("sub:" + FlowWriteConfig.HBASE_TABLE_NAME); + Admin admin = null; + try { + table = connection.getTable(TableName.valueOf("sub:" + FlowWriteConfig.HBASE_TABLE_NAME)); + Scan scan2 = new Scan(); + scan2.setTimeRange(startTime, endTime); + ResultScanner scanner = table.getScanner(scan2); + for (Result result : scanner) { + Cell[] cells = result.rawCells(); + for (Cell cell : cells) { + subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell))); + } + } + admin = connection.getAdmin(); + admin.flush(tableName); + logger.warn("当前集合长度" + subIdMap.keySet().size()); + logger.warn("更新后集合keys:" + subIdMap.keySet()); + time = endTime; + scanner.close(); + } catch (IOException e) { + e.printStackTrace(); + } finally { + if (table != null) { + try { + table.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } + + /** + * 获取所有的 key value + */ + private static void getAll() { + try { + Table table = connection.getTable(TableName.valueOf("sub:" + FlowWriteConfig.HBASE_TABLE_NAME)); + Scan scan2 = new Scan(); + ResultScanner scanner = table.getScanner(scan2); + for (Result result : scanner) { + Cell[] cells = result.rawCells(); + for (Cell cell : cells) { + subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell))); + } + } + logger.warn("获取全量后集合keys:" + subIdMap.keySet()); + scanner.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * 获取 account + * + * @param ip + * @return + */ + public static String getAccount(String ip) { + if (StringUtil.isNotBlank(ip)) { + return subIdMap.get(ip); + } else { + return ""; + } + } +} diff --git a/src/main/java/cn/ac/iie/utils/kafka/KafkaLogNtc.java b/src/main/java/cn/ac/iie/utils/kafka/KafkaLogNtc.java index 11ae57a..1bf76eb 100644 --- a/src/main/java/cn/ac/iie/utils/kafka/KafkaLogNtc.java +++ b/src/main/java/cn/ac/iie/utils/kafka/KafkaLogNtc.java @@ -59,7 +59,7 @@ public class KafkaLogNtc { } } kafkaProducer.flush(); - logger.warn("Log sent to National Center successfully!!!!!"); + logger.debug("Log sent to National Center successfully!!!!!"); } /** @@ -75,7 +75,7 @@ public class KafkaLogNtc { properties.put("request.timeout.ms", 60000); properties.put("batch.size", 262144); properties.put("buffer.memory", 33554432); - properties.put("compression.type", "snappy"); +// properties.put("compression.type", "snappy"); kafkaProducer = new KafkaProducer<>(properties); } diff --git a/src/main/java/cn/ac/iie/utils/redis/RedisPollUtils.java b/src/main/java/cn/ac/iie/utils/redis/RedisPollUtils.java index 378bef5..0394378 100644 --- a/src/main/java/cn/ac/iie/utils/redis/RedisPollUtils.java +++ b/src/main/java/cn/ac/iie/utils/redis/RedisPollUtils.java @@ -43,7 +43,7 @@ public class RedisPollUtils { poolConfig.setTestOnBorrow(Boolean.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_TESTONBORROW))); // 根据配置实例化jedis池 jedisPool = new JedisPool(poolConfig, props.getProperty(FlowWriteConfig.REDIS_IP), - Integer.valueOf(props.getProperty(FlowWriteConfig.REDIS_PORT))); + Integer.valueOf(props.getProperty(FlowWriteConfig.REDIS_PORT)), 3000, "123456"); } catch (Exception e) { logger.error("Redis连接池初始化错误", e); } @@ -96,19 +96,15 @@ public class RedisPollUtils { // return workId; // } - public static Integer getWorkerId(String key) { - int workId = 0; + public static String getWorkerId(String key) { + String sub = ""; try (Jedis jedis = RedisPollUtils.getJedis()) { - if (jedis != null) { - workId = Integer.parseInt(jedis.get(key)); - jedis.set(key, String.valueOf(workId + 2)); - logger.error("\n工作id是:" + workId + "\n"); - } + sub = jedis.get(key); } catch (Exception e) { logger.error("通过Redis获取用户名出现异常", e); - workId = RandomUtils.nextInt(0, 31); + } - return workId; + return sub; } diff --git a/src/main/java/cn/ac/iie/utils/system/SnowflakeId.java b/src/main/java/cn/ac/iie/utils/system/SnowflakeId.java index 5f77996..5c3462d 100644 --- a/src/main/java/cn/ac/iie/utils/system/SnowflakeId.java +++ b/src/main/java/cn/ac/iie/utils/system/SnowflakeId.java @@ -2,6 +2,7 @@ package cn.ac.iie.utils.system; import cn.ac.iie.common.FlowWriteConfig; import cn.ac.iie.utils.zookeeper.DistributedLock; +import cn.ac.iie.utils.zookeeper.ZooKeeperLock; import cn.ac.iie.utils.zookeeper.ZookeeperUtils; import org.apache.log4j.Logger; @@ -98,19 +99,40 @@ public class SnowflakeId { /** * 构造函数 */ +// private SnowflakeId() { +// DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1"); +// lock.lock(); +// int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC); +// if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) { +// throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId)); +// } +// int dataCenterId = FlowWriteConfig.DATA_CENTER_ID_NUM; +// if (dataCenterId > maxDataCenterId || dataCenterId < 0) { +// throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDataCenterId)); +// } +// this.workerId = tmpWorkerId; +// this.dataCenterId = dataCenterId; +// } + private SnowflakeId() { - DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1"); - lock.lock(); - int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC); - if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) { - throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId)); + ZooKeeperLock lock = new ZooKeeperLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "/locks", "disLocks"); + if (lock.lock()) { + int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC); + if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) { + throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId)); + } + int dataCenterId = FlowWriteConfig.DATA_CENTER_ID_NUM; + if (dataCenterId > maxDataCenterId || dataCenterId < 0) { + throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDataCenterId)); + } + this.workerId = tmpWorkerId; + this.dataCenterId = dataCenterId; + try { + lock.unlock(); + } catch (InterruptedException e) { + e.printStackTrace(); + } } - int dataCenterId = FlowWriteConfig.DATA_CENTER_ID_NUM; - if (dataCenterId > maxDataCenterId || dataCenterId < 0) { - throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDataCenterId)); - } - this.workerId = tmpWorkerId; - this.dataCenterId = dataCenterId; } // ==============================Methods========================================== diff --git a/src/main/java/cn/ac/iie/utils/zookeeper/DistributedLock.java b/src/main/java/cn/ac/iie/utils/zookeeper/DistributedLock.java index 15f4506..f85a84e 100644 --- a/src/main/java/cn/ac/iie/utils/zookeeper/DistributedLock.java +++ b/src/main/java/cn/ac/iie/utils/zookeeper/DistributedLock.java @@ -190,27 +190,14 @@ public class DistributedLock implements Lock, Watcher { public static void main(String[] args) { ZookeeperUtils zookeeperUtils = new ZookeeperUtils(); - Runnable runnable = new Runnable() { @Override public void run() { - DistributedLock lock = null; - try { - lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1"); - lock.lock(); -// System.out.println(SnowflakeId.generateId()); - System.out.println(1); - Thread.sleep(3000); - } catch (InterruptedException e) { - e.printStackTrace(); - } finally { - if (lock != null) { - lock.unlock(); - } - } + DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1"); + lock.lock(); + lock.unlock(); } }; - for (int i = 0; i < 10; i++) { Thread t = new Thread(runnable); t.start(); diff --git a/src/main/java/cn/ac/iie/utils/zookeeper/ZooKeeperLock.java b/src/main/java/cn/ac/iie/utils/zookeeper/ZooKeeperLock.java new file mode 100644 index 0000000..6cff343 --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/zookeeper/ZooKeeperLock.java @@ -0,0 +1,170 @@ +package cn.ac.iie.utils.zookeeper; + +import org.apache.zookeeper.*; +import org.apache.zookeeper.data.Stat; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class ZooKeeperLock implements Watcher { + private ZooKeeper zk = null; + private String rootLockNode; // 锁的根节点 + private String lockName; // 竞争资源,用来生成子节点名称 + private String currentLock; // 当前锁 + private String waitLock; // 等待的锁(前一个锁) + private CountDownLatch countDownLatch; // 计数器(用来在加锁失败时阻塞加锁线程) + private int sessionTimeout = 30000; // 超时时间 + + // 1. 构造器中创建ZK链接,创建锁的根节点 + public ZooKeeperLock(String zkAddress, String rootLockNode, String lockName) { + this.rootLockNode = rootLockNode; + this.lockName = lockName; + try { + // 创建连接,zkAddress格式为:IP:PORT + zk = new ZooKeeper(zkAddress, this.sessionTimeout, this); + // 检测锁的根节点是否存在,不存在则创建 + Stat stat = zk.exists(rootLockNode, false); + if (null == stat) { + zk.create(rootLockNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + } catch (IOException | InterruptedException | KeeperException e) { + e.printStackTrace(); + } + } + + // 2. 加锁方法,先尝试加锁,不能加锁则等待上一个锁的释放 + public boolean lock() { + if (this.tryLock()) { + System.out.println("线程【" + Thread.currentThread().getName() + "】加锁(" + this.currentLock + ")成功!"); + return true; + } else { + return waitOtherLock(this.waitLock, this.sessionTimeout); + } + } + + public boolean tryLock() { + // 分隔符 + String split = "_lock_"; + if (this.lockName.contains("_lock_")) { + throw new RuntimeException("lockName can't contains '_lock_' "); + } + try { + // 创建锁节点(临时有序节点) + this.currentLock = zk.create(this.rootLockNode + "/" + this.lockName + split, new byte[0], + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + System.out.println("线程【" + Thread.currentThread().getName() + + "】创建锁节点(" + this.currentLock + ")成功,开始竞争..."); + // 取所有子节点 + List nodes = zk.getChildren(this.rootLockNode, false); + // 取所有竞争lockName的锁 + List lockNodes = new ArrayList(); + for (String nodeName : nodes) { + if (nodeName.split(split)[0].equals(this.lockName)) { + lockNodes.add(nodeName); + } + } + Collections.sort(lockNodes); + // 取最小节点与当前锁节点比对加锁 + String currentLockPath = this.rootLockNode + "/" + lockNodes.get(0); + if (this.currentLock.equals(currentLockPath)) { + return true; + } + // 加锁失败,设置前一节点为等待锁节点 + String currentLockNode = this.currentLock.substring(this.currentLock.lastIndexOf("/") + 1); + int preNodeIndex = Collections.binarySearch(lockNodes, currentLockNode) - 1; + this.waitLock = lockNodes.get(preNodeIndex); + } catch (KeeperException | InterruptedException e) { + e.printStackTrace(); + } + return false; + } + + private boolean waitOtherLock(String waitLock, int sessionTimeout) { + boolean islock = false; + try { + // 监听等待锁节点 + String waitLockNode = this.rootLockNode + "/" + waitLock; + Stat stat = zk.exists(waitLockNode, true); + if (null != stat) { + System.out.println("线程【" + Thread.currentThread().getName() + + "】锁(" + this.currentLock + ")加锁失败,等待锁(" + waitLockNode + ")释放..."); + // 设置计数器,使用计数器阻塞线程 + this.countDownLatch = new CountDownLatch(1); + islock = this.countDownLatch.await(sessionTimeout, TimeUnit.MILLISECONDS); + this.countDownLatch = null; + if (islock) { + System.out.println("线程【" + Thread.currentThread().getName() + "】锁(" + + this.currentLock + ")加锁成功,锁(" + waitLockNode + ")已经释放"); + } else { + System.out.println("线程【" + Thread.currentThread().getName() + "】锁(" + + this.currentLock + ")加锁失败..."); + } + } else { + islock = true; + } + } catch (KeeperException | InterruptedException e) { + e.printStackTrace(); + } + return islock; + } + + // 3. 释放锁 + public void unlock() throws InterruptedException { + try { + Stat stat = zk.exists(this.currentLock, false); + if (null != stat) { + System.out.println("线程【" + Thread.currentThread().getName() + "】释放锁 " + this.currentLock); + zk.delete(this.currentLock, -1); + this.currentLock = null; + } + } catch (InterruptedException | KeeperException e) { + e.printStackTrace(); + } finally { + zk.close(); + } + } + + // 4. 监听器回调 + @Override + public void process(WatchedEvent watchedEvent) { + if (null != this.countDownLatch && watchedEvent.getType() == Event.EventType.NodeDeleted) { + // 计数器减一,恢复线程操作 + this.countDownLatch.countDown(); + } + } + + + public static void doSomething() { + + } + + public static void main(String[] args) { + Runnable runnable = new Runnable() { + @Override + public void run() { + ZooKeeperLock lock = null; + lock = new ZooKeeperLock("192.168.40.119:2181", "/locks", "test1"); + if (lock.lock()) { + doSomething(); + try { +// Thread.sleep(1000); + lock.unlock(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + }; + + for (int i = 0; i < 10; i++) { + Thread t = new Thread(runnable); + t.start(); + } + } + + +} diff --git a/src/test/java/cn/ac/iie/test/test.java b/src/test/java/cn/ac/iie/test/test.java index 9d2d332..705a0f7 100644 --- a/src/test/java/cn/ac/iie/test/test.java +++ b/src/test/java/cn/ac/iie/test/test.java @@ -4,43 +4,30 @@ import cn.ac.iie.bean.SessionRecordLog; import cn.ac.iie.common.FlowWriteConfig; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson.serializer.SerializerFeature; import com.zdjizhi.utils.IpLookup; import org.junit.Test; import javax.servlet.http.HttpServletRequest; import java.math.BigInteger; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; public class test { public static void main(String[] args) { - String message = "{\"str_ea_m-t-r-a-ceid\":\"JSON\"}"; + String message = "{\"str_ea_m-t-r-a-ceid\":\"JSON\",\"uid\":\"0\"}"; SessionRecordLog sessionRecordLog = JSONObject.parseObject(message, SessionRecordLog.class); - System.out.println(sessionRecordLog.getStream_trace_id()); - - String message2 = "{\"streamtraceid\":\"JSON\"}"; - SessionRecordLog sessionRecordLog2 = JSONObject.parseObject(message2, SessionRecordLog.class); - System.out.println(sessionRecordLog2.getStream_trace_id()); - - JSONObject jsonObject = JSON.parseObject(message); - System.out.println("\n" + Arrays.toString(jsonObject.keySet().toArray())); - - HttpServletRequest request = null; - if (request != null) { - String contextPath = request.getScheme() + "://" + request.getServerName() + ":" + request.getServerPort() + request.getContextPath(); - } - - System.out.println(System.currentTimeMillis() / 1000); + System.out.println(JSONObject.toJSONString(sessionRecordLog)); } @Test public void test2() { -// String minTimeStampStr = "00000000000000000000000000000000000000000"; - String minTimeStampStr = "000000000000000000000000000000000000000"; - long minTimeStamp = new BigInteger(minTimeStampStr, 2).longValue(); -// String maxTimeStampStr = "11111111111111111111111111111111111111111"; - String maxTimeStampStr = "111111111111111111111111111111111111111"; - long maxTimeStamp = new BigInteger(maxTimeStampStr, 2).longValue(); - long oneYearMills = 1L * 1000 * 60 * 60 * 24 * 365; - System.out.println((maxTimeStamp - minTimeStamp) / oneYearMills); + Map map = new HashMap<>(); + map.put("a","a"); + map.put("b","a"); + map.put("c","a"); + + System.out.println(map.keySet()); } } diff --git a/src/test/java/cn/ac/iie/test/zookeeper/DistributedLock.java b/src/test/java/cn/ac/iie/test/zookeeper/DistributedLock.java index f442ebe..1423f98 100644 --- a/src/test/java/cn/ac/iie/test/zookeeper/DistributedLock.java +++ b/src/test/java/cn/ac/iie/test/zookeeper/DistributedLock.java @@ -1,26 +1,182 @@ package cn.ac.iie.test.zookeeper; +import cn.ac.iie.common.FlowWriteConfig; +import org.apache.zookeeper.*; +import org.apache.zookeeper.data.Stat; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; -public interface DistributedLock { - /** - * 获取锁,如果没有得到就等待 - */ - public void acquire() throws Exception; +public class DistributedLock implements Lock, Watcher { + private ZooKeeper zk; + private String root = "/locks";//根 + private String lockName;//竞争资源的标志 + private String waitNode;//等待前一个锁 + private String myZnode;//当前锁 + private CountDownLatch latch;//计数器 + private CountDownLatch connectedSignal = new CountDownLatch(1); + private int sessionTimeout = 2000; + public static void main(String[] args) { + DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1"); + lock.lock(); + System.out.println(1); + if (lock!=null){ + lock.unlock(); + } + } /** - * 获取锁,直到超时 + * 创建分布式锁,使用前请确认config配置的zookeeper服务可用 * - * @param unit time参数的单位 - * @throws Exception - * @return是否获取到锁 + * @param config 192.168.1.127:2181 + * @param lockName 竞争资源标志,lockName中不能包含单词_lock_ */ - public boolean acquire(long time, TimeUnit unit) throws Exception; + public DistributedLock(String config, String lockName) { + this.lockName = lockName; + // 创建一个与服务器的连接 + try { + zk = new ZooKeeper(config, sessionTimeout, this); + connectedSignal.await(); + Stat stat = zk.exists(root, false);//此去不执行 Watcher + if (stat == null) { + // 创建根节点 + zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + } catch (IOException e) { + throw new LockException(e); + } catch (KeeperException e) { + throw new LockException(e); + } catch (InterruptedException e) { + throw new LockException(e); + } + } /** - * 释放锁 - * - * @throws Exception + * zookeeper节点的监视器 */ - public void release() throws Exception; -} + public void process(WatchedEvent event) { + //建立连接用 + if (event.getState() == Event.KeeperState.SyncConnected) { + connectedSignal.countDown(); + return; + } + //其他线程放弃锁的标志 + if (this.latch != null) { + this.latch.countDown(); + } + } + + public void lock() { + try { + if (this.tryLock()) { + System.out.println("Thread " + Thread.currentThread().getId() + " " + myZnode + " get lock true"); + return; + } else { + waitForLock(waitNode, sessionTimeout);//等待锁 + } + } catch (KeeperException e) { + throw new LockException(e); + } catch (InterruptedException e) { + throw new LockException(e); + } + } + + public boolean tryLock() { + try { + String splitStr = "_lock_"; + if (lockName.contains(splitStr)) + throw new LockException("lockName can not contains \\u000B"); + //创建临时子节点 + myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + System.out.println(myZnode + " is created "); + //取出所有子节点 + List subNodes = zk.getChildren(root, false); + //取出所有lockName的锁 + List lockObjNodes = new ArrayList(); + for (String node : subNodes) { + String _node = node.split(splitStr)[0]; + if (_node.equals(lockName)) { + lockObjNodes.add(node); + } + } + Collections.sort(lockObjNodes); + + if (myZnode.equals(root + "/" + lockObjNodes.get(0))) { + //如果是最小的节点,则表示取得锁 + System.out.println(myZnode + "==" + lockObjNodes.get(0)); + return true; + } + //如果不是最小的节点,找到比自己小1的节点 + String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1); + waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);//找到前一个子节点 + } catch (KeeperException e) { + throw new LockException(e); + } catch (InterruptedException e) { + throw new LockException(e); + } + return false; + } + + public boolean tryLock(long time, TimeUnit unit) { + try { + if (this.tryLock()) { + return true; + } + return waitForLock(waitNode, time); + } catch (Exception e) { + e.printStackTrace(); + } + return false; + } + + private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException { + Stat stat = zk.exists(root + "/" + lower, true);//同时注册监听。 + //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听 + if (stat != null) { + System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower); + this.latch = new CountDownLatch(1); + this.latch.await(waitTime, TimeUnit.MILLISECONDS);//等待,这里应该一直等待其他线程释放锁 + this.latch = null; + } + return true; + } + + public void unlock() { + try { + System.out.println("unlock " + myZnode); + zk.delete(myZnode, -1); + myZnode = null; + zk.close(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (KeeperException e) { + e.printStackTrace(); + } + } + + public void lockInterruptibly() throws InterruptedException { + this.lock(); + } + + public Condition newCondition() { + return null; + } + + public class LockException extends RuntimeException { + private static final long serialVersionUID = 1L; + + public LockException(String e) { + super(e); + } + + public LockException(Exception e) { + super(e); + } + } +} \ No newline at end of file diff --git a/src/test/java/cn/ac/iie/test/zookeeper/RandomTest.java b/src/test/java/cn/ac/iie/test/zookeeper/RandomTest.java new file mode 100644 index 0000000..3730073 --- /dev/null +++ b/src/test/java/cn/ac/iie/test/zookeeper/RandomTest.java @@ -0,0 +1,8 @@ +package cn.ac.iie.test.zookeeper; + +public class RandomTest { + public static void main(String[] args) { + + + } +}