更新2109版本

This commit is contained in:
qidaijie
2021-09-27 11:11:56 +08:00
parent fcd97b7aab
commit 4b68261130
15 changed files with 168 additions and 206 deletions

33
pom.xml
View File

@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>log-completion-schema</artifactId>
<version>20210728</version>
<version>210908-security</version>
<name>log-completion-schema</name>
<url>http://www.example.com</url>
@@ -135,13 +135,13 @@
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table</artifactId>
<version>${flink.version}</version>
<type>pom</type>
<scope>${scope.type}</scope>
</dependency>
<!--<dependency>-->
<!--<groupId>org.apache.flink</groupId>-->
<!--<artifactId>flink-table</artifactId>-->
<!--<version>${flink.version}</version>-->
<!--<type>pom</type>-->
<!--<scope>${scope.type}</scope>-->
<!--</dependency>-->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
@@ -155,23 +155,32 @@
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
<!--<scope>${scope.type}</scope>-->
<scope>${scope.type}</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
<scope>${scope.type}</scope>
</dependency>
<!--test-->
<!--<dependency>-->
<!--<groupId>org.apache.flink</groupId>-->
<!--<artifactId>flink-connector-hbase-2.2_2.12</artifactId>-->
<!--<version>${flink.version}</version>-->
<!--<scope>${scope.type}</scope>-->
<!--</dependency>-->
<!--test-->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
<!--<scope>${scope.type}</scope>-->
</dependency>

View File

@@ -22,8 +22,29 @@ buffer.memory=134217728
#10M
max.request.size=10485760
#kafka SASL<53><4C>֤<EFBFBD>û<EFBFBD><C3BB><EFBFBD>
kafka.user=admin
#kafka SASL<53><4C>SSL<53><4C>֤<EFBFBD><D6A4><EFBFBD><EFBFBD>
kafka.pin=galaxy2019
#kafka source connection timeout
session.timeout.ms=60000
#kafka source poll
max.poll.records=3000
#kafka source poll bytes
max.partition.fetch.bytes=31457280
#hbase table name
hbase.table.name=subscriber_info
#<23>ʼ<EFBFBD>Ĭ<EFBFBD>ϱ<EFBFBD><CFB1><EFBFBD>
mail.default.charset=UTF-8
mail.default.charset=UTF-8
#kafka source protocol; SSL or SASL
kafka.source.protocol=SASL
#kafka sink protocol; SSL or SASL
kafka.sink.protocol=SASL

View File

@@ -1,37 +1,37 @@
#--------------------------------地址配置------------------------------#
#管理kafka地址
input.kafka.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
input.kafka.servers=192.168.44.12:9091
#管理输出kafka地址
output.kafka.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
output.kafka.servers=192.168.44.12:9091
#zookeeper 地址 用于配置log_id
zookeeper.servers=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181
zookeeper.servers=192.168.44.12:2181
#hbase zookeeper地址 用于连接HBase
hbase.zookeeper.servers=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181
hbase.zookeeper.servers=192.168.44.12:2181
#--------------------------------HTTP/定位库------------------------------#
#定位库地址
ip.library=/home/bigdata/topology/dat/
tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
#网关的schema位置
schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/connection_record_log
schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/session_record
#网关APP_ID 获取接口
app.id.http=http://192.168.44.67:9999/open-api/appDicList
app.id.http=http://192.168.44.12:9999/open-api/appDicList
#--------------------------------Kafka消费组信息------------------------------#
#kafka 接收数据topic
input.kafka.topic=CONNECTION-RECORD-LOG
input.kafka.topic=SESSION-RECORD
#补全数据 输出 topic
output.kafka.topic=CONNECTION-RECORD-COMPLETED-LOG
output.kafka.topic=SESSION-RECORD-COMPLETED
#读取topic,存储该spout id的消费offset信息可通过该拓扑命名;具体存储offset的位置确定下次读取不重复的数据
group.id=connection-record-flink-20210809
group.id=session-record-log-20210902-1
#生产者压缩模式 none or snappy
producer.kafka.compression.type=none
@@ -39,22 +39,13 @@ producer.kafka.compression.type=none
#生产者ack
producer.ack=1
#接收自kafka的消费者 client-id
consumer.client.id=consumer-connection-record
#回写给kafka的生产者 client-id
producer.client.id=producer-connection-record
#--------------------------------topology配置------------------------------#
#consumer 并行度
consumer.parallelism=3
consumer.parallelism=1
#map函数并行度
map.parallelism=3
#producer 并行度
producer.parallelism=3
#转换函数并行度
transform.parallelism=1
#数据中心,取值范围(0-63)
data.center.id.num=0

View File

@@ -1,21 +0,0 @@
package com.zdjizhi.common;
import com.zdjizhi.utils.system.FlowWriteConfigurations;
/**
* @author Administrator
*/
public class DefaultProConfig {
public static final String RETRIES = FlowWriteConfigurations.getStringProperty(1, "retries");
public static final String LINGER_MS = FlowWriteConfigurations.getStringProperty(1, "linger.ms");
public static final Integer REQUEST_TIMEOUT_MS = FlowWriteConfigurations.getIntProperty(1, "request.timeout.ms");
public static final Integer BATCH_SIZE = FlowWriteConfigurations.getIntProperty(1, "batch.size");
public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory");
public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size");
public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name");
}

View File

@@ -15,22 +15,29 @@ public class FlowWriteConfig {
public static final String IF_CONDITION_SPLITTER = "=";
public static final String MODEL = "remote";
public static final String PROTOCOL_SPLITTER = "\\.";
/**
* System
* System config
*/
public static final Integer CONSUMER_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "consumer.parallelism");
public static final Integer MAP_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "map.parallelism");
public static final Integer PRODUCER_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "producer.parallelism");
public static final Integer TRANSFORM_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "transform.parallelism");
public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
public static final Integer APP_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "app.tick.tuple.freq.secs");
public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num");
public static final Integer LOG_NEED_COMPLETE = FlowWriteConfigurations.getIntProperty(0, "log.need.complete");
public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(0, "mail.default.charset");
public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name");
/**
* kafka source config
*/
public static final String SESSION_TIMEOUT_MS = FlowWriteConfigurations.getStringProperty(1, "session.timeout.ms");
public static final String MAX_POLL_RECORDS = FlowWriteConfigurations.getStringProperty(1, "max.poll.records");
public static final String MAX_PARTITION_FETCH_BYTES = FlowWriteConfigurations.getStringProperty(1, "max.partition.fetch.bytes");
/**
* kafka
* kafka sink config
*/
public static final String INPUT_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "input.kafka.servers");
public static final String OUTPUT_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "output.kafka.servers");
@@ -40,14 +47,22 @@ public class FlowWriteConfig {
public static final String OUTPUT_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "output.kafka.topic");
public static final String INPUT_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "input.kafka.topic");
public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(0, "producer.ack");
public static final String IP_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "ip.library");
public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library");
public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");
public static final String KAFKA_SOURCE_PROTOCOL = FlowWriteConfigurations.getStringProperty(1, "kafka.source.protocol");
public static final String KAFKA_SINK_PROTOCOL = FlowWriteConfigurations.getStringProperty(1, "kafka.sink.protocol");
public static final String KAFKA_USER = FlowWriteConfigurations.getStringProperty(1, "kafka.user");
public static final String KAFKA_PIN = FlowWriteConfigurations.getStringProperty(1, "kafka.pin");
/**
* kafka限流配置-20201117
* connection kafka
*/
public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");
public static final String CONSUMER_CLIENT_ID = FlowWriteConfigurations.getStringProperty(0, "consumer.client.id");
public static final String PRODUCER_CLIENT_ID = FlowWriteConfigurations.getStringProperty(0, "producer.client.id");
public static final String RETRIES = FlowWriteConfigurations.getStringProperty(1, "retries");
public static final String LINGER_MS = FlowWriteConfigurations.getStringProperty(1, "linger.ms");
public static final Integer REQUEST_TIMEOUT_MS = FlowWriteConfigurations.getIntProperty(1, "request.timeout.ms");
public static final Integer BATCH_SIZE = FlowWriteConfigurations.getIntProperty(1, "batch.size");
public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory");
public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size");
/**
* http

View File

@@ -32,22 +32,28 @@ public class LogFlowWriteTopology {
if (FlowWriteConfig.LOG_NEED_COMPLETE == 1) {
//对原始日志进行处理补全转换等
DataStream<String> cleaningLog = streamSource.map(new MapCompletedFunction())
.name("TransFormLogs").setParallelism(FlowWriteConfig.MAP_PARALLELISM);
DataStream<String> cleaningLog = streamSource.map(new MapCompletedFunction()).name("TransFormLogs")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
//过滤空数据不发送到Kafka内
DataStream<String> result = cleaningLog.filter(new FilterNullFunction()).name("FilterAbnormalData");
DataStream<String> result = cleaningLog.filter(new FilterNullFunction()).name("FilterAbnormalData")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
//发送数据到Kafka
result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka")
.setParallelism(FlowWriteConfig.PRODUCER_PARALLELISM);
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
} else {
DataStream<String> result = streamSource.filter(new FilterNullFunction()).name("FilterOriginalData");
result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka").setParallelism(FlowWriteConfig.PRODUCER_PARALLELISM);
//过滤空数据不发送到Kafka内
DataStream<String> result = streamSource.filter(new FilterNullFunction()).name("FilterOriginalData")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
//发送数据到Kafka
result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
}
try {
environment.execute(args[0]);
} catch (Exception e) {
logger.error("This Flink task start ERROR! Exception information is :" + e);
e.printStackTrace();
}
}

View File

@@ -30,12 +30,12 @@ public class SnowflakeId {
/**
* 机器id所占的位数
*/
private final long workerIdBits = 7L;
private final long workerIdBits = 8L;
/**
* 数据标识id所占的位数
*/
private final long dataCenterIdBits = 6L;
private final long dataCenterIdBits = 5L;
/**
* 支持的最大机器id结果是63 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数)
@@ -74,12 +74,12 @@ public class SnowflakeId {
private final long sequenceMask = -1L ^ (-1L << sequenceBits);
/**
* 工作机器ID(0~127)
* 工作机器ID(0~255)
*/
private long workerId;
/**
* 数据中心ID(0~63)
* 数据中心ID(0~31)
*/
private long dataCenterId;

View File

@@ -6,7 +6,6 @@ import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.jayway.jsonpath.InvalidPathException;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.common.DefaultProConfig;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.FormatUtils;
import com.zdjizhi.utils.IpLookup;
@@ -34,12 +33,12 @@ class TransFunction {
* IP定位库工具类
*/
private static IpLookup ipLookup = new IpLookup.Builder(false)
.loadDataFileV4(FlowWriteConfig.IP_LIBRARY + "ip_v4.mmdb")
.loadDataFileV6(FlowWriteConfig.IP_LIBRARY + "ip_v6.mmdb")
.loadDataFilePrivateV4(FlowWriteConfig.IP_LIBRARY + "ip_private_v4.mmdb")
.loadDataFilePrivateV6(FlowWriteConfig.IP_LIBRARY + "ip_private_v6.mmdb")
.loadAsnDataFile(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb")
.loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb")
.loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4.mmdb")
.loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6.mmdb")
.loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_private_v4.mmdb")
.loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_private_v6.mmdb")
.loadAsnDataFile(FlowWriteConfig.TOOLS_LIBRARY + "asn_v4.mmdb")
.loadAsnDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "asn_v6.mmdb")
.build();
/**
@@ -93,9 +92,9 @@ class TransFunction {
*/
static String radiusMatch(String ip) {
String account = HBaseUtils.getAccount(ip.trim());
if (StringUtil.isBlank(account)) {
logger.warn("HashMap get account is null, Ip is :" + ip);
}
// if (StringUtil.isBlank(account)) {
// logger.warn("HashMap get account is null, Ip is :" + ip);
// }
return account;
}

View File

@@ -2,7 +2,6 @@ package com.zdjizhi.utils.hbase;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.DefaultProConfig;
import com.zdjizhi.common.FlowWriteConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@@ -46,7 +45,7 @@ public class HBaseUtils {
*/
private HBaseUtils() {
zookeeperIp = FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS;
hBaseTable = DefaultProConfig.HBASE_TABLE_NAME;
hBaseTable = FlowWriteConfig.HBASE_TABLE_NAME;
//获取连接
getConnection();
//拉取所有

View File

@@ -0,0 +1,36 @@
package com.zdjizhi.utils.kafka;
import com.zdjizhi.common.FlowWriteConfig;
import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.kafka
* @Description:
* @date 2021/9/610:37
*/
class CertUtils {
static void chooseCert(String type, Properties properties) {
switch (type) {
case "SSL":
properties.put("security.protocol", "SSL");
properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
properties.put("ssl.keystore.location", FlowWriteConfig.TOOLS_LIBRARY + "keystore.jks");
properties.put("ssl.keystore.password", FlowWriteConfig.KAFKA_PIN);
properties.put("ssl.truststore.location", FlowWriteConfig.TOOLS_LIBRARY + "truststore.jks");
properties.put("ssl.truststore.password", FlowWriteConfig.KAFKA_PIN);
properties.put("ssl.key.password", FlowWriteConfig.KAFKA_PIN);
break;
case "SASL":
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ FlowWriteConfig.KAFKA_USER + " password=" + FlowWriteConfig.KAFKA_PIN + ";");
break;
default:
}
}
}

View File

@@ -4,6 +4,7 @@ import com.zdjizhi.common.FlowWriteConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;
@@ -25,10 +26,8 @@ public class Consumer {
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
/*
* kafka限流配置-20201117
*/
// properties.put(ConsumerConfig.CLIENT_ID_CONFIG, FlowWriteConfig.CONSUMER_CLIENT_ID);
CertUtils.chooseCert(FlowWriteConfig.KAFKA_SOURCE_PROTOCOL,properties);
return properties;
}

View File

@@ -1,12 +1,12 @@
package com.zdjizhi.utils.kafka;
import com.zdjizhi.common.DefaultProConfig;
import com.zdjizhi.common.FlowWriteConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import java.util.Optional;
import java.util.Properties;
/**
@@ -20,21 +20,17 @@ public class Producer {
private static Properties createProducerConfig() {
Properties properties = new Properties();
properties.put("bootstrap.servers", FlowWriteConfig.OUTPUT_KAFKA_SERVERS);
// properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("acks", FlowWriteConfig.PRODUCER_ACK);
properties.put("retries", DefaultProConfig.RETRIES);
properties.put("linger.ms", DefaultProConfig.LINGER_MS);
properties.put("request.timeout.ms", DefaultProConfig.REQUEST_TIMEOUT_MS);
properties.put("batch.size", DefaultProConfig.BATCH_SIZE);
properties.put("buffer.memory", DefaultProConfig.BUFFER_MEMORY);
properties.put("max.request.size", DefaultProConfig.MAX_REQUEST_SIZE);
properties.put("retries", FlowWriteConfig.RETRIES);
properties.put("linger.ms", FlowWriteConfig.LINGER_MS);
properties.put("request.timeout.ms", FlowWriteConfig.REQUEST_TIMEOUT_MS);
properties.put("batch.size", FlowWriteConfig.BATCH_SIZE);
properties.put("buffer.memory", FlowWriteConfig.BUFFER_MEMORY);
properties.put("max.request.size", FlowWriteConfig.MAX_REQUEST_SIZE);
properties.put("compression.type", FlowWriteConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
CertUtils.chooseCert(FlowWriteConfig.KAFKA_SINK_PROTOCOL, properties);
/**
* kafka限流配置-20201117
*/
// properties.put(ProducerConfig.CLIENT_ID_CONFIG, FlowWriteConfig.PRODUCER_CLIENT_ID);
// properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, FlowWriteConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
return properties;
}
@@ -43,9 +39,10 @@ public class Producer {
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
FlowWriteConfig.OUTPUT_KAFKA_TOPIC,
new SimpleStringSchema(),
createProducerConfig());
createProducerConfig(), Optional.empty());
kafkaProducer.setLogFailuresOnly(false);
// kafkaProducer.setWriteTimestampToKafka(true);
return kafkaProducer;

View File

@@ -19,6 +19,7 @@ import java.util.concurrent.CountDownLatch;
*/
public class ZookeeperUtils implements Watcher {
private static final Log logger = LogFactory.get();
private static final int ID_MAX = 255;
private ZooKeeper zookeeper;
@@ -46,7 +47,7 @@ public class ZookeeperUtils implements Watcher {
connectZookeeper(zookeeperIp);
Stat stat = zookeeper.exists(path, true);
workerId = Integer.parseInt(getNodeDate(path));
if (workerId > 63) {
if (workerId > ID_MAX) {
workerId = 0;
zookeeper.setData(path, "1".getBytes(), stat.getVersion());
} else {

View File

@@ -1,92 +0,0 @@
package com.zdjizhi;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.DefaultProConfig;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
/**
* NTC系统配置产生日志写入数据中心类
*
* @author Administrator
* @create 2018-08-13 15:11
*/
public class KafkaLogSend {
private static final Log logger = LogFactory.get();
/**
* kafka生产者用于向kafka中发送消息
*/
private static org.apache.kafka.clients.producer.Producer<String, String> kafkaProducer;
/**
* kafka生产者适配器单例用来代理kafka生产者发送消息
*/
private static KafkaLogSend kafkaLogSend;
private KafkaLogSend() {
initKafkaProducer();
}
public static KafkaLogSend getInstance() {
if (kafkaLogSend == null) {
kafkaLogSend = new KafkaLogSend();
}
return kafkaLogSend;
}
public void sendMessage(String message) {
// for (String value : list) {
kafkaProducer.send(new ProducerRecord<>("test", message), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
logger.error("写入test出现异常", exception);
}
}
});
// }
// kafkaProducer.flush();
logger.debug("Log sent to National Center successfully!!!!!");
}
/**
* 根据kafka生产者配置信息初始化kafka消息生产者,只初始化一次
*/
private void initKafkaProducer() {
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.44.33:9093,192.168.44.34:9093,192.168.44.35:9093");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("acks", "1");
properties.put("retries", DefaultProConfig.RETRIES);
properties.put("linger.ms", DefaultProConfig.LINGER_MS);
properties.put("request.timeout.ms", DefaultProConfig.REQUEST_TIMEOUT_MS);
properties.put("batch.size", DefaultProConfig.BATCH_SIZE);
properties.put("buffer.memory", DefaultProConfig.BUFFER_MEMORY);
properties.put("max.request.size", DefaultProConfig.MAX_REQUEST_SIZE);
properties.put("security.protocol", "SSL");
properties.put("ssl.keystore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.keystore.jks");
properties.put("ssl.keystore.password", "ceiec2019");
properties.put("ssl.truststore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.truststore.jks");
properties.put("ssl.truststore.password", "ceiec2019");
properties.put("ssl.key.password", "ceiec2019");
/*
* kafka限流配置-20201117
*/
// properties.put(ProducerConfig.CLIENT_ID_CONFIG, VoipRelationConfig.PRODUCER_CLIENT_ID);
// properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, VoipRelationConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
kafkaProducer = new KafkaProducer<>(properties);
}
}

View File

@@ -3,6 +3,7 @@ package com.zdjizhi;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;
@@ -17,7 +18,7 @@ public class KafkaTest {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.44.33:9093,192.168.44.34:9093,192.168.44.35:9093");
properties.put("bootstrap.servers", "192.168.44.12:9091");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("acks", "1");
@@ -30,12 +31,13 @@ public class KafkaTest {
properties.put("security.protocol", "SSL");
// properties.put("ssl.keystore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.keystore.jks");
properties.put("ssl.keystore.location", "/usr/ca/client/client.keystore.jks");
properties.put("ssl.keystore.password", "ceiec2019");
properties.put("ssl.keystore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\keystore.jks");
properties.put("ssl.keystore.password", "galaxy2019");
// properties.put("ssl.truststore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.truststore.jks");
properties.put("ssl.truststore.location", "/usr/ca/trust/client.truststore.jks");
properties.put("ssl.truststore.password", "ceiec2019");
properties.put("ssl.key.password", "ceiec2019");
properties.put("ssl.truststore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\truststore.jks");
properties.put("ssl.truststore.password", "galaxy2019");
properties.put("ssl.key.password", "galaxy2019");
properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
Producer<String, String> producer = new KafkaProducer<String, String>(properties);