diff --git a/src/main/java/com/zdjizhi/config/Configs.java b/src/main/java/com/zdjizhi/config/Configs.java index e28426e..699f6f1 100644 --- a/src/main/java/com/zdjizhi/config/Configs.java +++ b/src/main/java/com/zdjizhi/config/Configs.java @@ -127,5 +127,4 @@ public class Configs { public static final ConfigOption FILTER_EXPRESSION = ConfigOptions.key("filter.expression") .stringType() .defaultValue(""); - } diff --git a/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java b/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java index e686006..3e0fc34 100644 --- a/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java +++ b/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java @@ -75,5 +75,4 @@ public class ParseMessagePackMapFunction extends RichMapFunction { private transient Counter pcapDelayedChunkCounter; @@ -22,7 +24,7 @@ public class SideOutputMapFunction extends RichMapFunction @Override public FileChunk map(FileChunk fileChunk) { fileChunk.setChunkCount(1); - if ("seek".equals(fileChunk.getCombineMode())) { + if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) { trafficDelayedChunkCounter.inc(); } else { fileChunk.setChunkNumbers(fileChunk.getTimestamp() + "-" + fileChunk.getChunk().length + ";"); @@ -30,5 +32,4 @@ public class SideOutputMapFunction extends RichMapFunction } return fileChunk; } - } diff --git a/src/main/java/com/zdjizhi/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/kafka/KafkaConsumer.java index df1fd32..c19fe1d 100644 --- a/src/main/java/com/zdjizhi/kafka/KafkaConsumer.java +++ b/src/main/java/com/zdjizhi/kafka/KafkaConsumer.java @@ -43,5 +43,4 @@ public abstract class KafkaConsumer extends ByteArrayDeserializationSchema { kafkaConsumer.setStartFromGroupOffsets(); return kafkaConsumer; } - } diff --git a/src/main/java/com/zdjizhi/sink/HBaseSink.java b/src/main/java/com/zdjizhi/sink/HBaseSink.java index 15c2c01..5fed4f4 100644 --- a/src/main/java/com/zdjizhi/sink/HBaseSink.java +++ b/src/main/java/com/zdjizhi/sink/HBaseSink.java @@ -206,5 +206,4 @@ public class HBaseSink extends RichSinkFunction { IoUtil.close(syncHBaseConnection); IoUtil.close(AsyncHBaseConnection); } - } diff --git a/src/main/java/com/zdjizhi/sink/HosSink.java b/src/main/java/com/zdjizhi/sink/HosSink.java index 502c678..dd186f4 100644 --- a/src/main/java/com/zdjizhi/sink/HosSink.java +++ b/src/main/java/com/zdjizhi/sink/HosSink.java @@ -178,7 +178,7 @@ public class HosSink extends RichSinkFunction { httpPut.setHeader(HOS_META_PREFIX + StrUtil.toSymbolCase(meta, CharUtil.DASHED), metaMap.get(meta) + ""); } } - httpPut.setEntity(new ByteArrayEntity(fileChunk.getChunk())); + httpPut.setEntity(new ByteArrayEntity(data)); executeRequest(httpPut); } } @@ -242,5 +242,4 @@ public class HosSink extends RichSinkFunction { } } } - } diff --git a/src/main/java/com/zdjizhi/utils/HBaseColumnConstants.java b/src/main/java/com/zdjizhi/utils/HBaseColumnConstants.java index e4f3f51..bfe47a3 100644 --- a/src/main/java/com/zdjizhi/utils/HBaseColumnConstants.java +++ b/src/main/java/com/zdjizhi/utils/HBaseColumnConstants.java @@ -3,7 +3,6 @@ package com.zdjizhi.utils; import org.apache.hadoop.hbase.util.Bytes; public interface HBaseColumnConstants { - String FAMILY_DATA = "data"; String FAMILY_META = "meta"; String COLUMN_FILENAME = "filename"; @@ -46,5 +45,4 @@ public interface HBaseColumnConstants { byte[] BYTE_BUCKET_COLUMN_TTL = Bytes.toBytes(BUCKET_COLUMN_TTL); byte[] BYTE_BUCKET_COLUMN_WAL = Bytes.toBytes(BUCKET_COLUMN_WAL); byte[] BYTE_BUCKET_COLUMN_LOCATION = Bytes.toBytes(BUCKET_COLUMN_LOCATION); - } diff --git a/src/main/java/com/zdjizhi/utils/HBaseConnectionUtil.java b/src/main/java/com/zdjizhi/utils/HBaseConnectionUtil.java index be82770..03d76b6 100644 --- a/src/main/java/com/zdjizhi/utils/HBaseConnectionUtil.java +++ b/src/main/java/com/zdjizhi/utils/HBaseConnectionUtil.java @@ -27,7 +27,6 @@ public class HBaseConnectionUtil { hbaseConfiguration.set(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, "1073741800"); hbaseConfiguration.set(ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY, configuration.get(Configs.SINK_HBASE_CLIENT_WRITE_BUFFER) + ""); hbaseConfiguration.set(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, configuration.get(Configs.SINK_HBASE_CLIENT_IPC_POOL_SIZE) + ""); - } public static synchronized HBaseConnectionUtil getInstance(Configuration configuration) { diff --git a/src/main/java/com/zdjizhi/utils/HttpClientUtil.java b/src/main/java/com/zdjizhi/utils/HttpClientUtil.java index f2e5d33..89640ea 100644 --- a/src/main/java/com/zdjizhi/utils/HttpClientUtil.java +++ b/src/main/java/com/zdjizhi/utils/HttpClientUtil.java @@ -175,5 +175,4 @@ public class HttpClientUtil { .setConnectionManager(getAsyncSslClientManager()) .build(); } - } diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index b71bda5..55031b1 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -25,7 +25,7 @@ combiner.window.time=10 combiner.window.idle.time=5 file.max.chunk.count=100000 file.max.size=1073741824 -#eval表达式,根据字段过滤 +#根据字段过滤,java表达式 #filter.expression=FileChunk.fileType == "txt" || FileChunk.fileType == "eml" #sink相关配置 sink.parallelism=2