diff --git a/pom.xml b/pom.xml
index c7ddb06..bad0fba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>log-completion-schema</artifactId>
- <version>20210728</version>
+ <version>210908-security</version>
<name>log-completion-schema</name>
<url>http://www.example.com</url>
@@ -135,13 +135,13 @@
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table</artifactId>
- <version>${flink.version}</version>
- <type>pom</type>
- <scope>${scope.type}</scope>
- </dependency>
+
+
+
+
+
+
+
@@ -155,23 +155,32 @@
<groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.11</artifactId>
+ <artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
-
+ <scope>${scope.type}</scope>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_2.11</artifactId>
+ <artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
<scope>${scope.type}</scope>
+
+
+
+
+
+
+
+
+
<groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka_2.11</artifactId>
+ <artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
diff --git a/properties/default_config.properties b/properties/default_config.properties
index d82130d..c11eeb7 100644
--- a/properties/default_config.properties
+++ b/properties/default_config.properties
@@ -22,8 +22,29 @@ buffer.memory=134217728
#10M
max.request.size=10485760
+#Kafka SASL username
+kafka.user=admin
+
+#Kafka SASL and SSL password
+kafka.pin=galaxy2019
+
+#Kafka consumer session timeout (ms)
+session.timeout.ms=60000
+
+#Kafka consumer max records per poll
+max.poll.records=3000
+
+#Kafka consumer max fetch bytes per partition
+max.partition.fetch.bytes=31457280
+
#hbase table name
hbase.table.name=subscriber_info
#default mail charset
-mail.default.charset=UTF-8
\ No newline at end of file
+mail.default.charset=UTF-8
+
+#kafka source protocol; SSL or SASL
+kafka.source.protocol=SASL
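+#any other value leaves the client on plaintext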
+
+#kafka sink protocol; SSL or SASL
+kafka.sink.protocol=SASL
\ No newline at end of file
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 9bb2f84..362a264 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,37 +1,37 @@
#--------------------------------Address configuration------------------------------#
#management Kafka servers
-input.kafka.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
+input.kafka.servers=192.168.44.12:9091
#management output Kafka servers
-output.kafka.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
+output.kafka.servers=192.168.44.12:9091
#ZooKeeper servers, used to configure log_id
-zookeeper.servers=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181
+zookeeper.servers=192.168.44.12:2181
#HBase ZooKeeper servers, used to connect to HBase
-hbase.zookeeper.servers=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181
+hbase.zookeeper.servers=192.168.44.12:2181
#--------------------------------HTTP / location library------------------------------#
#location library path
-ip.library=/home/bigdata/topology/dat/
+tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
#gateway schema location
-schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/connection_record_log
+schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/session_record
#gateway APP_ID fetch API
-app.id.http=http://192.168.44.67:9999/open-api/appDicList
+app.id.http=http://192.168.44.12:9999/open-api/appDicList
#--------------------------------Kafka consumer group info------------------------------#
#Kafka input data topic
-input.kafka.topic=CONNECTION-RECORD-LOG
+input.kafka.topic=SESSION-RECORD
#completed-data output topic
-output.kafka.topic=CONNECTION-RECORD-COMPLETED-LOG
+output.kafka.topic=SESSION-RECORD-COMPLETED
#consumer group for the input topic; stores this spout id's consumed offsets (can be named after the topology); the stored offset position ensures the next read does not repeat data
-group.id=connection-record-flink-20210809
+group.id=session-record-log-20210902-1
#producer compression mode: none or snappy
producer.kafka.compression.type=none
@@ -39,22 +39,13 @@ producer.kafka.compression.type=none
#producer ack
producer.ack=1
-#consumer client-id for reading from Kafka
-consumer.client.id=consumer-connection-record
-
-#producer client-id for writing back to Kafka
-producer.client.id=producer-connection-record
-
#--------------------------------Topology configuration------------------------------#
#consumer parallelism
-consumer.parallelism=3
+consumer.parallelism=1
-#map function parallelism
-map.parallelism=3
-
-#producer parallelism
-producer.parallelism=3
+#transform function parallelism
+transform.parallelism=1
#data center id, value range (0-63)
data.center.id.num=0
diff --git a/src/main/java/com/zdjizhi/common/DefaultProConfig.java b/src/main/java/com/zdjizhi/common/DefaultProConfig.java
deleted file mode 100644
index b98ea53..0000000
--- a/src/main/java/com/zdjizhi/common/DefaultProConfig.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package com.zdjizhi.common;
-
-
-import com.zdjizhi.utils.system.FlowWriteConfigurations;
-
-/**
- * @author Administrator
- */
-public class DefaultProConfig {
-
-
- public static final String RETRIES = FlowWriteConfigurations.getStringProperty(1, "retries");
- public static final String LINGER_MS = FlowWriteConfigurations.getStringProperty(1, "linger.ms");
- public static final Integer REQUEST_TIMEOUT_MS = FlowWriteConfigurations.getIntProperty(1, "request.timeout.ms");
- public static final Integer BATCH_SIZE = FlowWriteConfigurations.getIntProperty(1, "batch.size");
- public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory");
- public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size");
- public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name");
-
-
-}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
index bf82757..aa3c757 100644
--- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -15,22 +15,29 @@ public class FlowWriteConfig {
public static final String IF_CONDITION_SPLITTER = "=";
public static final String MODEL = "remote";
public static final String PROTOCOL_SPLITTER = "\\.";
+
/**
- * System
+ * System config
*/
public static final Integer CONSUMER_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "consumer.parallelism");
- public static final Integer MAP_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "map.parallelism");
- public static final Integer PRODUCER_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "producer.parallelism");
+ public static final Integer TRANSFORM_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "transform.parallelism");
public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
public static final Integer APP_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "app.tick.tuple.freq.secs");
public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num");
public static final Integer LOG_NEED_COMPLETE = FlowWriteConfigurations.getIntProperty(0, "log.need.complete");
public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(0, "mail.default.charset");
+ public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name");
+ /**
+ * kafka source config
+ */
+ public static final String SESSION_TIMEOUT_MS = FlowWriteConfigurations.getStringProperty(1, "session.timeout.ms");
+ public static final String MAX_POLL_RECORDS = FlowWriteConfigurations.getStringProperty(1, "max.poll.records");
+ public static final String MAX_PARTITION_FETCH_BYTES = FlowWriteConfigurations.getStringProperty(1, "max.partition.fetch.bytes");
/**
- * kafka
+ * kafka sink config
*/
public static final String INPUT_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "input.kafka.servers");
public static final String OUTPUT_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "output.kafka.servers");
@@ -40,14 +47,22 @@ public class FlowWriteConfig {
public static final String OUTPUT_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "output.kafka.topic");
public static final String INPUT_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "input.kafka.topic");
public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(0, "producer.ack");
- public static final String IP_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "ip.library");
+ public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library");
+ public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");
+ public static final String KAFKA_SOURCE_PROTOCOL = FlowWriteConfigurations.getStringProperty(1, "kafka.source.protocol");
+ public static final String KAFKA_SINK_PROTOCOL = FlowWriteConfigurations.getStringProperty(1, "kafka.sink.protocol");
+ public static final String KAFKA_USER = FlowWriteConfigurations.getStringProperty(1, "kafka.user");
+ public static final String KAFKA_PIN = FlowWriteConfigurations.getStringProperty(1, "kafka.pin");
/**
- * kafka rate-limiting config - 20201117
+ * kafka producer connection settings
*/
- public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");
- public static final String CONSUMER_CLIENT_ID = FlowWriteConfigurations.getStringProperty(0, "consumer.client.id");
- public static final String PRODUCER_CLIENT_ID = FlowWriteConfigurations.getStringProperty(0, "producer.client.id");
+ public static final String RETRIES = FlowWriteConfigurations.getStringProperty(1, "retries");
+ public static final String LINGER_MS = FlowWriteConfigurations.getStringProperty(1, "linger.ms");
+ public static final Integer REQUEST_TIMEOUT_MS = FlowWriteConfigurations.getIntProperty(1, "request.timeout.ms");
+ public static final Integer BATCH_SIZE = FlowWriteConfigurations.getIntProperty(1, "batch.size");
+ public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory");
+ public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size");
/**
* http
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index a9b38ca..5c89522 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -32,22 +32,28 @@ public class LogFlowWriteTopology {
if (FlowWriteConfig.LOG_NEED_COMPLETE == 1) {
//process the raw logs: completion, transformation, etc.
- DataStream cleaningLog = streamSource.map(new MapCompletedFunction())
- .name("TransFormLogs").setParallelism(FlowWriteConfig.MAP_PARALLELISM);
+ DataStream cleaningLog = streamSource.map(new MapCompletedFunction()).name("TransFormLogs")
+ .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
//filter out empty records so they are not sent to Kafka
- DataStream result = cleaningLog.filter(new FilterNullFunction()).name("FilterAbnormalData");
+ DataStream result = cleaningLog.filter(new FilterNullFunction()).name("FilterAbnormalData")
+ .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
//send the data to Kafka
result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka")
- .setParallelism(FlowWriteConfig.PRODUCER_PARALLELISM);
+ .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
} else {
- DataStream result = streamSource.filter(new FilterNullFunction()).name("FilterOriginalData");
- result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka").setParallelism(FlowWriteConfig.PRODUCER_PARALLELISM);
+ //filter out empty records so they are not sent to Kafka
+ DataStream result = streamSource.filter(new FilterNullFunction()).name("FilterOriginalData")
+ .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+ //send the data to Kafka
+ result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka")
+ .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
}
try {
environment.execute(args[0]);
} catch (Exception e) {
logger.error("This Flink task start ERROR! Exception information is :" + e);
+ e.printStackTrace();
}
}
diff --git a/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java b/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java
index d203a2b..168fec2 100644
--- a/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java
+++ b/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java
@@ -30,12 +30,12 @@ public class SnowflakeId {
/**
* number of bits used for the worker id
*/
- private final long workerIdBits = 7L;
+ private final long workerIdBits = 8L;
/**
* number of bits used for the data center id
*/
- private final long dataCenterIdBits = 6L;
+ private final long dataCenterIdBits = 5L;
/**
* maximum supported worker id (this shift expression quickly computes the largest decimal value an N-bit binary number can represent)
@@ -74,12 +74,12 @@ public class SnowflakeId {
private final long sequenceMask = -1L ^ (-1L << sequenceBits);
/**
- * worker machine id (0~127)
+ * worker machine id (0~255)
*/
private long workerId;
/**
- * data center id (0~63)
+ * data center id (0~31)
*/
private long dataCenterId;
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
index 9fada7b..7dc806e 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFunction.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
@@ -6,7 +6,6 @@ import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.jayway.jsonpath.InvalidPathException;
import com.jayway.jsonpath.JsonPath;
-import com.zdjizhi.common.DefaultProConfig;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.FormatUtils;
import com.zdjizhi.utils.IpLookup;
@@ -34,12 +33,12 @@ class TransFunction {
* IP location library helper class
*/
private static IpLookup ipLookup = new IpLookup.Builder(false)
- .loadDataFileV4(FlowWriteConfig.IP_LIBRARY + "ip_v4.mmdb")
- .loadDataFileV6(FlowWriteConfig.IP_LIBRARY + "ip_v6.mmdb")
- .loadDataFilePrivateV4(FlowWriteConfig.IP_LIBRARY + "ip_private_v4.mmdb")
- .loadDataFilePrivateV6(FlowWriteConfig.IP_LIBRARY + "ip_private_v6.mmdb")
- .loadAsnDataFile(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb")
- .loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb")
+ .loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4.mmdb")
+ .loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6.mmdb")
+ .loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_private_v4.mmdb")
+ .loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_private_v6.mmdb")
+ .loadAsnDataFile(FlowWriteConfig.TOOLS_LIBRARY + "asn_v4.mmdb")
+ .loadAsnDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "asn_v6.mmdb")
.build();
/**
@@ -93,9 +92,9 @@ class TransFunction {
*/
static String radiusMatch(String ip) {
String account = HBaseUtils.getAccount(ip.trim());
- if (StringUtil.isBlank(account)) {
- logger.warn("HashMap get account is null, Ip is :" + ip);
- }
+// if (StringUtil.isBlank(account)) {
+// logger.warn("HashMap get account is null, Ip is :" + ip);
+// }
return account;
}
diff --git a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
index 60b3d09..710e4b9 100644
--- a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
+++ b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
@@ -2,7 +2,6 @@ package com.zdjizhi.utils.hbase;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.DefaultProConfig;
import com.zdjizhi.common.FlowWriteConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@@ -46,7 +45,7 @@ public class HBaseUtils {
*/
private HBaseUtils() {
zookeeperIp = FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS;
- hBaseTable = DefaultProConfig.HBASE_TABLE_NAME;
+ hBaseTable = FlowWriteConfig.HBASE_TABLE_NAME;
//get the connection
getConnection();
//fetch all
diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
new file mode 100644
index 0000000..b09eedb
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
@@ -0,0 +1,36 @@
+package com.zdjizhi.utils.kafka;
+
+import com.zdjizhi.common.FlowWriteConfig;
+import org.apache.kafka.common.config.SslConfigs;
+
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.kafka
+ * @Description:
+ * @date 2021/9/6 10:37
+ */
+class CertUtils {
+ static void chooseCert(String type, Properties properties) {
+ switch (type) {
+ case "SSL":
+ properties.put("security.protocol", "SSL");
+ properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+ properties.put("ssl.keystore.location", FlowWriteConfig.TOOLS_LIBRARY + "keystore.jks");
+ properties.put("ssl.keystore.password", FlowWriteConfig.KAFKA_PIN);
+ properties.put("ssl.truststore.location", FlowWriteConfig.TOOLS_LIBRARY + "truststore.jks");
+ properties.put("ssl.truststore.password", FlowWriteConfig.KAFKA_PIN);
+ properties.put("ssl.key.password", FlowWriteConfig.KAFKA_PIN);
+ break;
+ case "SASL":
+ properties.put("security.protocol", "SASL_PLAINTEXT");
+ properties.put("sasl.mechanism", "PLAIN");
+ properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ + FlowWriteConfig.KAFKA_USER + " password=" + FlowWriteConfig.KAFKA_PIN + ";");
+ break;
+ default:
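+ // any other value: leave the properties unchanged, the client stays on PLAINTEXT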
+ }
+
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
index c220064..1036fe9 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
@@ -4,6 +4,7 @@ import com.zdjizhi.common.FlowWriteConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;
@@ -25,10 +26,8 @@ public class Consumer {
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- /*
- * kafka rate-limiting config - 20201117
- */
-// properties.put(ConsumerConfig.CLIENT_ID_CONFIG, FlowWriteConfig.CONSUMER_CLIENT_ID);
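+ // apply SSL/SASL client settings according to kafka.source.protocol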
+ CertUtils.chooseCert(FlowWriteConfig.KAFKA_SOURCE_PROTOCOL, properties);
+
return properties;
}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/Producer.java
index 077ae71..e1a5b22 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/Producer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/Producer.java
@@ -1,12 +1,12 @@
package com.zdjizhi.utils.kafka;
-import com.zdjizhi.common.DefaultProConfig;
import com.zdjizhi.common.FlowWriteConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import java.util.Optional;
import java.util.Properties;
/**
@@ -20,21 +20,17 @@ public class Producer {
private static Properties createProducerConfig() {
Properties properties = new Properties();
properties.put("bootstrap.servers", FlowWriteConfig.OUTPUT_KAFKA_SERVERS);
-// properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-// properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("acks", FlowWriteConfig.PRODUCER_ACK);
- properties.put("retries", DefaultProConfig.RETRIES);
- properties.put("linger.ms", DefaultProConfig.LINGER_MS);
- properties.put("request.timeout.ms", DefaultProConfig.REQUEST_TIMEOUT_MS);
- properties.put("batch.size", DefaultProConfig.BATCH_SIZE);
- properties.put("buffer.memory", DefaultProConfig.BUFFER_MEMORY);
- properties.put("max.request.size", DefaultProConfig.MAX_REQUEST_SIZE);
+ properties.put("retries", FlowWriteConfig.RETRIES);
+ properties.put("linger.ms", FlowWriteConfig.LINGER_MS);
+ properties.put("request.timeout.ms", FlowWriteConfig.REQUEST_TIMEOUT_MS);
+ properties.put("batch.size", FlowWriteConfig.BATCH_SIZE);
+ properties.put("buffer.memory", FlowWriteConfig.BUFFER_MEMORY);
+ properties.put("max.request.size", FlowWriteConfig.MAX_REQUEST_SIZE);
+ properties.put("compression.type", FlowWriteConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
+
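+ // apply SSL/SASL client settings according to kafka.sink.protocol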
+ CertUtils.chooseCert(FlowWriteConfig.KAFKA_SINK_PROTOCOL, properties);
- /**
- * kafka rate-limiting config - 20201117
- */
-// properties.put(ProducerConfig.CLIENT_ID_CONFIG, FlowWriteConfig.PRODUCER_CLIENT_ID);
-// properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, FlowWriteConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
return properties;
}
@@ -43,9 +39,10 @@ public class Producer {
FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer(
FlowWriteConfig.OUTPUT_KAFKA_TOPIC,
new SimpleStringSchema(),
- createProducerConfig());
+ createProducerConfig(), Optional.empty());
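+ // Optional.empty() drops the default FlinkFixedPartitioner; Kafka's own partitioner then spreads records across partitions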
kafkaProducer.setLogFailuresOnly(false);
+
// kafkaProducer.setWriteTimestampToKafka(true);
return kafkaProducer;
diff --git a/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java b/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java
index ebf4368..9efbd46 100644
--- a/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java
+++ b/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java
@@ -19,6 +19,7 @@ import java.util.concurrent.CountDownLatch;
*/
public class ZookeeperUtils implements Watcher {
private static final Log logger = LogFactory.get();
+ private static final int ID_MAX = 255;
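+ // keep in sync with SnowflakeId.workerIdBits = 8 (2^8 - 1 = 255)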
private ZooKeeper zookeeper;
@@ -46,7 +47,7 @@ public class ZookeeperUtils implements Watcher {
connectZookeeper(zookeeperIp);
Stat stat = zookeeper.exists(path, true);
workerId = Integer.parseInt(getNodeDate(path));
- if (workerId > 63) {
+ if (workerId > ID_MAX) {
workerId = 0;
zookeeper.setData(path, "1".getBytes(), stat.getVersion());
} else {
diff --git a/src/test/java/com/zdjizhi/KafkaLogSend.java b/src/test/java/com/zdjizhi/KafkaLogSend.java
deleted file mode 100644
index 5c3feb3..0000000
--- a/src/test/java/com/zdjizhi/KafkaLogSend.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package com.zdjizhi;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.DefaultProConfig;
-import org.apache.kafka.clients.producer.*;
-
-import java.util.Properties;
-
-/**
- * Writes logs produced by the NTC system configuration to the data center
- *
- * @author Administrator
- * @create 2018-08-13 15:11
- */
-
-public class KafkaLogSend {
- private static final Log logger = LogFactory.get();
-
- /**
- * Kafka producer used to send messages to Kafka
- */
- private static org.apache.kafka.clients.producer.Producer kafkaProducer;
-
- /**
- * Kafka producer adapter (singleton) that proxies sending messages through the Kafka producer
- */
- private static KafkaLogSend kafkaLogSend;
-
- private KafkaLogSend() {
- initKafkaProducer();
- }
-
- public static KafkaLogSend getInstance() {
- if (kafkaLogSend == null) {
- kafkaLogSend = new KafkaLogSend();
- }
- return kafkaLogSend;
- }
-
-
- public void sendMessage(String message) {
-// for (String value : list) {
- kafkaProducer.send(new ProducerRecord<>("test", message), new Callback() {
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (exception != null) {
- logger.error("鍐欏叆test鍑虹幇寮傚父", exception);
- }
- }
- });
-// }
-// kafkaProducer.flush();
- logger.debug("Log sent to National Center successfully!!!!!");
- }
-
- /**
- * Initialize the Kafka message producer from the producer config; runs only once
- */
- private void initKafkaProducer() {
- Properties properties = new Properties();
- properties.put("bootstrap.servers", "192.168.44.33:9093,192.168.44.34:9093,192.168.44.35:9093");
- properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- properties.put("acks", "1");
- properties.put("retries", DefaultProConfig.RETRIES);
- properties.put("linger.ms", DefaultProConfig.LINGER_MS);
- properties.put("request.timeout.ms", DefaultProConfig.REQUEST_TIMEOUT_MS);
- properties.put("batch.size", DefaultProConfig.BATCH_SIZE);
- properties.put("buffer.memory", DefaultProConfig.BUFFER_MEMORY);
- properties.put("max.request.size", DefaultProConfig.MAX_REQUEST_SIZE);
-
- properties.put("security.protocol", "SSL");
- properties.put("ssl.keystore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.keystore.jks");
- properties.put("ssl.keystore.password", "ceiec2019");
- properties.put("ssl.truststore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.truststore.jks");
- properties.put("ssl.truststore.password", "ceiec2019");
- properties.put("ssl.key.password", "ceiec2019");
-
-
- /*
- * kafka rate-limiting config - 20201117
- */
-// properties.put(ProducerConfig.CLIENT_ID_CONFIG, VoipRelationConfig.PRODUCER_CLIENT_ID);
-// properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, VoipRelationConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
-
-
- kafkaProducer = new KafkaProducer<>(properties);
- }
-
-
-}
diff --git a/src/test/java/com/zdjizhi/KafkaTest.java b/src/test/java/com/zdjizhi/KafkaTest.java
index 3bb6d1c..4b034a3 100644
--- a/src/test/java/com/zdjizhi/KafkaTest.java
+++ b/src/test/java/com/zdjizhi/KafkaTest.java
@@ -3,6 +3,7 @@ package com.zdjizhi;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.kafka.clients.producer.*;
+import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;
@@ -17,7 +18,7 @@ public class KafkaTest {
public static void main(String[] args) {
Properties properties = new Properties();
- properties.put("bootstrap.servers", "192.168.44.33:9093,192.168.44.34:9093,192.168.44.35:9093");
+ properties.put("bootstrap.servers", "192.168.44.12:9091");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("acks", "1");
@@ -30,12 +31,13 @@ public class KafkaTest {
properties.put("security.protocol", "SSL");
// properties.put("ssl.keystore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.keystore.jks");
- properties.put("ssl.keystore.location", "/usr/ca/client/client.keystore.jks");
- properties.put("ssl.keystore.password", "ceiec2019");
+ properties.put("ssl.keystore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\keystore.jks");
+ properties.put("ssl.keystore.password", "galaxy2019");
// properties.put("ssl.truststore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.truststore.jks");
- properties.put("ssl.truststore.location", "/usr/ca/trust/client.truststore.jks");
- properties.put("ssl.truststore.password", "ceiec2019");
- properties.put("ssl.key.password", "ceiec2019");
+ properties.put("ssl.truststore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\truststore.jks");
+ properties.put("ssl.truststore.password", "galaxy2019");
+ properties.put("ssl.key.password", "galaxy2019");
+ properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
Producer producer = new KafkaProducer(properties);