package com.zdjizhi.utils.ck; import cn.hutool.core.util.StrUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; import static com.zdjizhi.common.FlowWriteConfig.*; public class ClickhouseSink extends RichSinkFunction> { private static final Log log = LogFactory.get(); private static Connection connection; private static PreparedStatement preparedStatement; public String sink; static { try { Class.forName("ru.yandex.clickhouse.ClickHouseDriver"); connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN); // BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node01:8123,node02:8123,node03:8123/default", props); // connection = dataSource.getConnection(); log.info("get clickhouse connection success"); } catch (ClassNotFoundException | SQLException e) { log.error("clickhouse connection error ,{}", e); } } public ClickhouseSink(String sink) { this.sink = sink; } public String getSink() { return sink; } public void setSink(String sink) { this.sink = sink; } @Override public void invoke(Map log, Context context) throws Exception { executeInsert(log, getSink()); } @Override public void open(Configuration parameters) throws Exception { } @Override public void close() throws Exception { if (null != connection) { connection.close(); } if (null != preparedStatement) { preparedStatement.close(); } } public void executeInsert(Map data, String tableName) { try { int count = 1; List keys = new LinkedList<>(data.keySet()); connection.setAutoCommit(false); preparedStatement = connection.prepareStatement(preparedSql(keys, tableName)); List values = new LinkedList<>(data.values()); for (int i = 1; i <= values.size(); i++) { Object val = values.get(i - 1); if (val instanceof Long) { preparedStatement.setLong((i), Long.valueOf(StrUtil.toString(val))); } else if (val instanceof Integer) { preparedStatement.setInt((i), Integer.valueOf(StrUtil.toString(val))); } else if (val instanceof Boolean) { preparedStatement.setBoolean((i), Boolean.valueOf(StrUtil.toString(val))); } else { preparedStatement.setString((i), StrUtil.toString(val)); } } preparedStatement.addBatch(); count = count + 1; try { //1w提交一次 if (count % 10000 == 0) { preparedStatement.executeBatch(); connection.commit(); preparedStatement.clearBatch(); count = 1; } preparedStatement.executeBatch(); connection.commit(); } catch (Exception ee) { log.error("数据插入clickhouse 报错:", ee); } } catch (Exception ex) { log.error("ClickhouseSink插入报错", ex); } } public String preparedSql(List fields, String tableName) { String placeholders = fields.stream() .filter(Objects::nonNull) .map(f -> "?") .collect(Collectors.joining(", ")); String columns = fields.stream() .filter(Objects::nonNull) .collect(Collectors.joining(", ")); String sql = StrUtil.concat(true, "INSERT INTO ", CK_DATABASE, ".", tableName, "(", columns, ") VALUES (", placeholders, ")"); log.info(sql); return sql; } }