diff --git a/pom.xml b/pom.xml
index 5d943cd..aeefdca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.zdjizhi
file-chunk-combiner
- 1.2.1
+ 1.3.0
@@ -199,6 +199,56 @@
commons-jexl3
3.2.1
+
+ com.github.ben-manes.caffeine
+ caffeine
+ 2.9.3
+
+
+
+ org.ehcache
+ ehcache
+ 3.10.8
+
+
+ org.glassfish.jaxb
+ jaxb-runtime
+
+
+ javax.cache
+ cache-api
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+
+ org.glassfish.jaxb
+ jaxb-runtime
+ 2.3.9
+
+
+ javax.cache
+ cache-api
+ 1.1.0
+
+
+ com.sun.xml.bind
+ jaxb-core
+ 2.3.0.1
+
+
+ javax.xml.bind
+ jaxb-api
+ 2.3.1
+
+
+ com.sun.xml.bind
+ jaxb-impl
+ 2.3.1
+
diff --git a/src/main/java/com/zdjizhi/FileChunkCombiner.java b/src/main/java/com/zdjizhi/FileChunkCombiner.java
index f64b6bd..705f673 100644
--- a/src/main/java/com/zdjizhi/FileChunkCombiner.java
+++ b/src/main/java/com/zdjizhi/FileChunkCombiner.java
@@ -1,29 +1,38 @@
package com.zdjizhi;
+import cn.hutool.core.util.StrUtil;
import com.zdjizhi.config.Configs;
import com.zdjizhi.function.*;
+import com.zdjizhi.function.map.ParseMessagePackMapFunction;
+import com.zdjizhi.function.map.ParseProxyFileMetaFlatMapFunction;
+import com.zdjizhi.function.map.ParseSessionFileMetaFlatMapFunction;
+import com.zdjizhi.function.map.SideOutputMapFunction;
+import com.zdjizhi.kafka.FileMetaKafkaConsumer;
import com.zdjizhi.pojo.*;
-import com.zdjizhi.sink.HBaseSink;
-import com.zdjizhi.sink.HosSink;
+import com.zdjizhi.sink.*;
import com.zdjizhi.kafka.KafkaConsumer;
+import com.zdjizhi.trigger.IdleTimeTrigger;
import com.zdjizhi.trigger.LastChunkOrNoDataInTimeTrigger;
+import com.zdjizhi.trigger.LastChunkTrigger;
import com.zdjizhi.trigger.MultipleTrigger;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
public class FileChunkCombiner {
@@ -33,53 +42,211 @@ public class FileChunkCombiner {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.getConfig().setGlobalJobParameters(configuration);
- WatermarkStrategy watermarkStrategy = WatermarkStrategy
- .forBoundedOutOfOrderness(Duration.ofSeconds(0))
- .withTimestampAssigner((FileChunk, timestamp) -> FileChunk.getTimestamp() / 1000);
-
- SingleOutputStreamOperator parseMessagePackStream = environment
- .addSource(KafkaConsumer.byteArrayConsumer(configuration))
- .name("Kafka Source")
- .map(new ParseMessagePackMapFunction(configuration.get(Configs.ENABLE_RATE_LIMIT), configuration.get(Configs.RATE_LIMIT_THRESHOLD), configuration.get(Configs.RATE_LIMIT_EXCLUSION_EXPRESSION)))
- .name("Map: Parse Message Pack")
- .filter(new FileChunkFilterFunction(configuration.getLong(Configs.FILE_MAX_SIZE), configuration.getString(Configs.FILTER_EXPRESSION)))
- .assignTimestampsAndWatermarks(watermarkStrategy);
-
+ SingleOutputStreamOperator windowStream;
OutputTag delayedChunkOutputTag = new OutputTag("delayed-chunk") {
};
+ if (configuration.getInteger(Configs.COMBINER_WINDOW_TYPE) == 0) {
+ WatermarkStrategy watermarkStrategy = WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(configuration.get(Configs.COMBINER_WINDOW_ALLOWED_LATENESS)))
+ .withTimestampAssigner((FileChunk, timestamp) -> FileChunk.getTimestamp() / 1000);
- List> triggers = new ArrayList<>();
- triggers.add(EventTimeTrigger.create());
- triggers.add(LastChunkOrNoDataInTimeTrigger.of(configuration.get(Configs.COMBINER_WINDOW_IDLE_TIME) * 1000));
- Trigger