package com.zdjizhi.utils.ck;

import cn.hutool.core.io.IoUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import static com.zdjizhi.common.FlowWriteConfig.*;

/**
 * Flink sink that writes each incoming record to a ClickHouse table over JDBC,
 * one INSERT per record.
 *
 * <p>Every record is a map whose keys are used as column names and whose values
 * are bound as the matching statement parameters. Failures while connecting or
 * inserting are logged and otherwise ignored, so a failing record is dropped
 * rather than failing the job.
 */
public class ClickhouseSingleSink extends RichSinkFunction<Map<String, Object>> {

    private static final Log log = LogFactory.get();

    /** JDBC connection created in {@link #open}; stays {@code null} if connecting failed. */
    private transient Connection connection;

    /** Name of the target table (kept public for backward compatibility). */
    public String sink;

    /**
     * @param sink name of the table records are inserted into
     */
    public ClickhouseSingleSink(String sink) {
        this.sink = sink;
    }

    /**
     * @return name of the table records are inserted into
     */
    public String getSink() {
        return sink;
    }

    /**
     * @param sink name of the table records are inserted into
     */
    public void setSink(String sink) {
        this.sink = sink;
    }

    /**
     * Inserts one record into the configured table.
     *
     * @param logs    column name to value mapping of the record
     * @param context sink context supplied by Flink (unused)
     */
    @Override
    public void invoke(Map<String, Object> logs, Context context) throws Exception {
        executeInsert(logs, getSink());
    }

    /**
     * Loads the ClickHouse JDBC driver and opens the connection.
     *
     * <p>A failure is logged and not rethrown; {@link #executeInsert} then drops
     * records with an error message because no connection is available.
     *
     * @param parameters Flink configuration (unused)
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        try {
            Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
            connection = DriverManager.getConnection(
                    "jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
            // NOTE(review): an earlier draft sketched a BalancedClickhouseDataSource over
            // several nodes here instead of a single DriverManager connection.
            log.info("get clickhouse connection success");
        } catch (ClassNotFoundException | SQLException e) {
            // Pass the exception through the Throwable overload so the stack trace is logged.
            log.error(e, "clickhouse connection error ,{}", e.getMessage());
        }
    }

    /**
     * Closes the JDBC connection, if one was opened.
     */
    @Override
    public void close() throws Exception {
        IoUtil.close(connection);
    }

    /**
     * Inserts a single record into {@code tableName} and commits it.
     *
     * <p>Entries with a {@code null} key are skipped together with their value, so
     * the bound parameters always line up with the column list produced by
     * {@link #preparedSql}. Any failure is logged and swallowed.
     *
     * @param data      column name to value mapping of the record
     * @param tableName name of the target table
     */
    public void executeInsert(Map<String, Object> data, String tableName) {
        if (connection == null) {
            log.error("clickhouse connection is not available, record for table {} is dropped", tableName);
            return;
        }
        try {
            // Collect columns and values from the same entries to keep them aligned.
            List<String> columns = new ArrayList<>(data.size());
            List<Object> values = new ArrayList<>(data.size());
            for (Map.Entry<String, Object> entry : data.entrySet()) {
                if (entry.getKey() != null) {
                    columns.add(entry.getKey());
                    values.add(entry.getValue());
                }
            }
            if (columns.isEmpty()) {
                log.warn("record for table {} has no columns, skipped", tableName);
                return;
            }

            connection.setAutoCommit(false);
            // The statement is scoped to this record and closed afterwards; the previous
            // version replaced a field on every call and never closed the old statement.
            try (PreparedStatement statement = connection.prepareStatement(preparedSql(columns, tableName))) {
                bindParameters(statement, values);
                // The earlier per-call counter never exceeded 1, so every record was
                // flushed immediately; that behavior is now stated explicitly.
                statement.addBatch();
                statement.executeBatch();
                connection.commit();
            }
        } catch (Exception ex) {
            log.error(ex, "ClickhouseSink插入报错");
        }
    }

    /**
     * Binds {@code values} to the statement parameters in order, starting at index 1.
     *
     * <p>{@code Long}, {@code Integer} and {@code Boolean} values use the matching typed
     * setter; everything else (including {@code null}) is converted with
     * {@code StrUtil.toString} and bound as a string, as before.
     */
    private static void bindParameters(PreparedStatement statement, List<Object> values) throws SQLException {
        int index = 1;
        for (Object value : values) {
            if (value instanceof Long) {
                statement.setLong(index, (Long) value);
            } else if (value instanceof Integer) {
                statement.setInt(index, (Integer) value);
            } else if (value instanceof Boolean) {
                statement.setBoolean(index, (Boolean) value);
            } else {
                statement.setString(index, StrUtil.toString(value));
            }
            index++;
        }
    }

    /**
     * Builds the parameterized INSERT statement for the given columns.
     *
     * <p>{@code null} entries in {@code fields} are ignored. Column and table names
     * are concatenated into the SQL text (identifiers cannot be bound as
     * parameters), so callers must only pass trusted names.
     *
     * @param fields    column names, in parameter order
     * @param tableName name of the target table inside {@code CK_DATABASE}
     * @return SQL of the form {@code INSERT INTO db.table(c1, c2) VALUES (?, ?)}
     */
    public static String preparedSql(List<String> fields, String tableName) {
        List<String> nonNullFields = fields.stream()
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
        String placeholders = nonNullFields.stream()
                .map(f -> "?")
                .collect(Collectors.joining(", "));
        String columns = String.join(", ", nonNullFields);
        String sql = StrUtil.concat(true,
                "INSERT INTO ", CK_DATABASE, ".", tableName,
                "(", columns, ") VALUES (", placeholders, ")");
        log.debug(sql);
        return sql;
    }
}