diff --git a/README.md b/README.md index 5be6ff0..5a8ee0d 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# file-stream-combiner +# file-chunk-combiner diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..275475a --- /dev/null +++ b/pom.xml @@ -0,0 +1,251 @@ + + + 4.0.0 + + com.zdjizhi + file-chunk-combiner + 24.01.18 + + + + nexus + Team Nexus Repository + http://192.168.40.153:8099/content/groups/public + + + maven-ali + http://maven.aliyun.com/nexus/content/groups/public/ + + true + + + true + fail + + + + + + 1.13.1 + + + + + com.zdjizhi + galaxy + 1.1.3 + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + org.slf4j + slf4j-api + 1.7.25 + + + org.slf4j + slf4j-log4j12 + 1.7.25 + + + log4j + log4j + 1.2.17 + + + org.apache.flink + flink-core + ${flink.version} + provided + + + org.apache.flink + flink-streaming-java_2.12 + ${flink.version} + provided + tests + + + org.apache.flink + flink-clients_2.12 + ${flink.version} + provided + + + org.apache.flink + flink-connector-kafka_2.12 + ${flink.version} + + + org.apache.flink + flink-java + ${flink.version} + provided + + + org.apache.flink + flink-metrics-dropwizard + ${flink.version} + + + org.apache.flink + flink-runtime_2.12 + ${flink.version} + provided + + + org.apache.flink + flink-test-utils_2.12 + ${flink.version} + test + + + org.msgpack + msgpack-core + 0.9.5 + + + org.msgpack + jackson-dataformat-msgpack + 0.9.5 + + + cn.hutool + hutool-all + 5.8.22 + + + com.alibaba + fastjson + 2.0.32 + + + org.jasypt + jasypt + 1.9.3 + + + junit + junit + 4.12 + test + + + org.mockito + mockito-core + 2.21.0 + test + + + org.apache.httpcomponents + httpclient + 4.5.13 + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.6.1 + + 1.8 + 1.8 + true + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.1.1 + + false + true + true + + + org.apache.http + shade.org.apache.http + + + + + com.google.code.findbugs:jsr305 + org.slf4j:slf4j-api + 
org.slf4j:slf4j-jdk14 + org.slf4j:slf4j-jcl + org.slf4j:slf4j-nop + org.slf4j:slf4j-simple + org.slf4j:slf4j-reload4j + org.slf4j:slf4j-log4j12 + org.slf4j:log4j-over-slf4j + org.slf4j:jcl-over-slf4j + log4j:* + commons-logging:* + ch.qos.logback:* + org.apache.logging.log4j:log4j-api + org.apache.logging.log4j:log4j-core + org.apache.logging.log4j:log4j-slf4j-impl + org.apache.logging.log4j:log4j-1.2-api + org.apache.logging.log4j:log4j-to-slf4j + + + + + + file-chunk-combiner + package + + shade + + + file-chunk-combiner-${version} + + + + *:* + + META-INF + + + + + + com.zdjizhi.FileChunkCombiner + + + + + + + + io.github.zlika + reproducible-build-maven-plugin + 0.2 + + + + strip-jar + + package + + + + + + + \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/FileChunkCombiner.java b/src/main/java/com/zdjizhi/FileChunkCombiner.java new file mode 100644 index 0000000..06f7402 --- /dev/null +++ b/src/main/java/com/zdjizhi/FileChunkCombiner.java @@ -0,0 +1,78 @@ +package com.zdjizhi; + +import com.zdjizhi.config.Configs; +import com.zdjizhi.function.*; +import com.zdjizhi.pojo.*; +import com.zdjizhi.sink.HosSink; +import com.zdjizhi.kafka.KafkaConsumer; +import com.zdjizhi.trigger.LastChunkOrNoDataInTimeTrigger; +import com.zdjizhi.trigger.MultipleTrigger; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; +import 
org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.OutputTag; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +public class FileChunkCombiner extends KafkaConsumer { + + public static void main(String[] args) throws Exception { + final ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[0]); + final Configuration configuration = parameterTool.getConfiguration(); + final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); + environment.getConfig().setGlobalJobParameters(configuration); + + WatermarkStrategy watermarkStrategy = WatermarkStrategy + .forBoundedOutOfOrderness(Duration.ofSeconds(0)) + .withTimestampAssigner((FileChunk, timestamp) -> FileChunk.getTimestamp() / 1000); + + SingleOutputStreamOperator parseMessagePackStream = environment + .addSource(KafkaConsumer.byteArrayConsumer(configuration)) + .name("Kafka Source") + .map(new ParseMessagePackMapFunction()) + .name("Map: Parse Message Pack") + .filter((FilterFunction) Objects::nonNull) + .assignTimestampsAndWatermarks(watermarkStrategy); + + OutputTag delayedChunkOutputTag = new OutputTag("delayed-chunk") { + }; + + List> triggers = new ArrayList<>(); + triggers.add(EventTimeTrigger.create()); + triggers.add(LastChunkOrNoDataInTimeTrigger.of(configuration.get(Configs.COMBINER_WINDOW_IDLE_TIME) * 1000)); + Trigger trigger = MultipleTrigger.of(triggers); + SingleOutputStreamOperator windowStream = parseMessagePackStream + .keyBy(new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO) + .window(TumblingEventTimeWindows.of(Time.seconds(configuration.get(Configs.COMBINER_WINDOW_TIME)))) + .trigger(trigger) + .sideOutputLateData(delayedChunkOutputTag) + .process(new CombineChunkProcessWindowFunction(configuration)) + .name("Window: Combine Chunk") + 
.setParallelism(configuration.get(Configs.COMBINER_WINDOW_PARALLELISM)) + .disableChaining(); + + HosSink hosSink = new HosSink(configuration); + windowStream.addSink(hosSink) + .name("Hos") + .setParallelism(configuration.get(Configs.SINK_HOS_PARALLELISM)); + windowStream.getSideOutput(delayedChunkOutputTag) + .map(new SideOutputMapFunction()) + .addSink(hosSink) + .name("Hos Delayed Chunk"); + + environment.execute(configuration.get(Configs.FLINK_JOB_NAME)); + } + +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/config/Configs.java b/src/main/java/com/zdjizhi/config/Configs.java new file mode 100644 index 0000000..6d79bb8 --- /dev/null +++ b/src/main/java/com/zdjizhi/config/Configs.java @@ -0,0 +1,96 @@ +package com.zdjizhi.config; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +public class Configs { + public static final ConfigOption FLINK_JOB_NAME = ConfigOptions.key("flink.job.name") + .stringType() + .defaultValue("FILE-CHUNK-COMBINER") + .withDescription("The name of job."); + + public static final ConfigOption SOURCE_KAFKA_PARALLELISM = ConfigOptions.key("source.kafka.parallelism") + .intType() + .defaultValue(1); + public static final ConfigOption KAFKA_BROKER = ConfigOptions.key("source.kafka.broker") + .stringType() + .noDefaultValue(); + public static final ConfigOption KAFKA_GROUP_ID = ConfigOptions.key("source.kafka.group.id") + .stringType() + .defaultValue("test1"); + public static final ConfigOption KAFKA_TOPIC = ConfigOptions.key("source.kafka.topic") + .stringType() + .noDefaultValue(); + public static final ConfigOption KAFKA_ENABLE_AUTO_COMMIT = ConfigOptions.key("source.kafka.enable.auto.commit") + .booleanType() + .defaultValue(true); + public static final ConfigOption KAFKA_AUTO_OFFSET_RESET = ConfigOptions.key("source.kafka.auto.offset.reset") + .stringType() + .defaultValue("latest"); + public static final ConfigOption KAFKA_SESSION_TIMEOUT_MS = 
ConfigOptions.key("source.kafka.session.timeout.ms") + .stringType() + .defaultValue("60000"); + public static final ConfigOption KAFKA_MAX_POLL_RECORDS = ConfigOptions.key("source.kafka.max.poll.records") + .stringType() + .defaultValue("1000"); + public static final ConfigOption KAFKA_MAX_PARTITION_FETCH_BYTES = ConfigOptions.key("source.kafka.max.partition.fetch.bytes") + .stringType() + .defaultValue("31457280"); + public static final ConfigOption KAFKA_USER = ConfigOptions.key("source.kafka.user") + .stringType() + .defaultValue("admin"); + public static final ConfigOption KAFKA_PIN = ConfigOptions.key("source.kafka.pin") + .stringType() + .defaultValue("galaxy2019"); + public static final ConfigOption KAFKA_TOOLS_LIBRARY = ConfigOptions.key("source.kafka.tools.library") + .stringType() + .noDefaultValue(); + + public static final ConfigOption PARSE_MESSAGE_PACK_PARALLELISM = ConfigOptions.key("parse.message.pack.parallelism") + .intType() + .defaultValue(1); + public static final ConfigOption COMBINER_WINDOW_PARALLELISM = ConfigOptions.key("combiner.window.parallelism") + .intType() + .defaultValue(1); + public static final ConfigOption COMBINER_WINDOW_TIME = ConfigOptions.key("combiner.window.time") + .longType() + .defaultValue(5L); + public static final ConfigOption COMBINER_WINDOW_IDLE_TIME = ConfigOptions.key("combiner.window.idle.time") + .longType() + .defaultValue(5L); + public static final ConfigOption COMBINER_WINDOW_KEY_MAX_CHUNK = ConfigOptions.key("combiner.window.key.max.chunk") + .longType() + .defaultValue(5L); + + public static final ConfigOption SINK_HOS_PARALLELISM = ConfigOptions.key("sink.hos.parallelism") + .intType() + .defaultValue(1); + public static final ConfigOption SINK_HOS_ENDPOINT = ConfigOptions.key("sink.hos.endpoint") + .stringType() + .noDefaultValue(); + public static final ConfigOption SINK_HOS_BUCKET = ConfigOptions.key("sink.hos.bucket") + .stringType() + .noDefaultValue(); + public static final ConfigOption 
SINK_HOS_TOKEN = ConfigOptions.key("sink.hos.token") + .stringType() + .noDefaultValue(); + public static final ConfigOption SINK_HOS_HTTP_MAX_TOTAL = ConfigOptions.key("sink.hos.http.max.total") + .intType() + .defaultValue(2000); + public static final ConfigOption SINK_HOS_HTTP_MAX_PER_ROUTE = ConfigOptions.key("sink.hos.http.max.per.route") + .intType() + .defaultValue(1000); + public static final ConfigOption SINK_HOS_HTTP_ERROR_RETRY = ConfigOptions.key("sink.hos.http.error.retry") + .intType() + .defaultValue(3); + public static final ConfigOption SINK_HOS_HTTP_CONNECT_TIMEOUT = ConfigOptions.key("sink.hos.http.connect.timeout") + .intType() + .defaultValue(10000); + public static final ConfigOption SINK_HOS_HTTP_REQUEST_TIMEOUT = ConfigOptions.key("sink.hos.http.request.timeout") + .intType() + .defaultValue(10000); + public static final ConfigOption SINK_HOS_HTTP_SOCKET_TIMEOUT = ConfigOptions.key("sink.hos.http.socket.timeout") + .intType() + .defaultValue(60000); + +} diff --git a/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java b/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java new file mode 100644 index 0000000..8f0f40d --- /dev/null +++ b/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java @@ -0,0 +1,44 @@ +package com.zdjizhi.function; + +import com.zdjizhi.config.Configs; +import com.zdjizhi.pojo.FileChunk; +import com.zdjizhi.utils.PublicUtil; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; + +import java.util.List; + +public class CombineChunkProcessWindowFunction extends ProcessWindowFunction { + + private transient Counter duplicateChunkCounter; + private transient Counter combineErrorCounter; + 
private transient Counter seekChunkCounter; + private transient Counter appendChunkCounter; + private final Configuration configuration; + + public CombineChunkProcessWindowFunction(Configuration configuration) { + this.configuration = configuration; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + MetricGroup metricGroup = getRuntimeContext().getMetricGroup(); + duplicateChunkCounter = metricGroup.counter("duplicateChunkCount"); + combineErrorCounter = metricGroup.counter("combineErrorCount"); + seekChunkCounter = metricGroup.counter("seekChunkCount"); + appendChunkCounter = metricGroup.counter("appendChunkCount"); + } + + @Override + public void process(String string, Context context, Iterable elements, Collector out) throws Exception { + List fileChunks = PublicUtil.combine(elements, configuration.get(Configs.COMBINER_WINDOW_KEY_MAX_CHUNK), duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter); + for (FileChunk fileChunk : fileChunks) { + out.collect(fileChunk); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/function/FileChunkKeySelector.java b/src/main/java/com/zdjizhi/function/FileChunkKeySelector.java new file mode 100644 index 0000000..6634c93 --- /dev/null +++ b/src/main/java/com/zdjizhi/function/FileChunkKeySelector.java @@ -0,0 +1,11 @@ +package com.zdjizhi.function; + +import com.zdjizhi.pojo.FileChunk; +import org.apache.flink.api.java.functions.KeySelector; + +public class FileChunkKeySelector implements KeySelector { + @Override + public String getKey(FileChunk value) { + return value.getUuid(); + } +} diff --git a/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java b/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java new file mode 100644 index 0000000..e686006 --- /dev/null +++ b/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java @@ -0,0 +1,79 @@ +package com.zdjizhi.function; + 
+import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.pojo.FileChunk; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.msgpack.core.MessagePack; +import org.msgpack.core.MessageUnpacker; + +import java.util.HashMap; +import java.util.Map; + +public class ParseMessagePackMapFunction extends RichMapFunction { + private static final Log LOG = LogFactory.get(); + + @Override + public FileChunk map(byte[] messagePackData) { + FileChunk fileChunk; + try { + fileChunk = new FileChunk(); + MessageUnpacker messageUnpacker = MessagePack.newDefaultUnpacker(messagePackData); + int numFields = messageUnpacker.unpackMapHeader(); + Map metaMap = new HashMap<>(); + for (int i = 0; i < numFields; i++) { + String fieldName = messageUnpacker.unpackString(); + switch (fieldName) { + case "uuid": + fileChunk.setUuid(messageUnpacker.unpackString()); + break; + case "fileName": + fileChunk.setFileName(messageUnpacker.unpackString()); + break; + case "fileType": + fileChunk.setFileType(messageUnpacker.unpackString()); + break; + case "combineMode": + fileChunk.setCombineMode(messageUnpacker.unpackString()); + break; + case "offset": + fileChunk.setOffset(messageUnpacker.unpackLong()); + break; + case "length": + fileChunk.setLength(messageUnpacker.unpackLong()); + break; + case "lastChunkFlag": + fileChunk.setLastChunkFlag(messageUnpacker.unpackInt()); + break; + case "chunk": + fileChunk.setChunk(messageUnpacker.readPayload(messageUnpacker.unpackRawStringHeader())); + break; + case "timestamp": + fileChunk.setTimestamp(messageUnpacker.unpackLong()); + break; + case "meta": + String meta = messageUnpacker.unpackString(); + JSONObject metaJsonObject = JSONUtil.parseObj(meta); + for (String key : metaJsonObject.keySet()) { + metaMap.put(key, metaJsonObject.get(key)); + } + fileChunk.setMeta(metaMap); + break; + default: + messageUnpacker.skipValue(); + break; + } + } + if 
("append".equals(fileChunk.getCombineMode())) { + fileChunk.setLastChunkFlag(0); + } + } catch (Exception e) { + LOG.error("Parse messagePack failed.", e); + fileChunk = null; + } + return fileChunk; + } + +} diff --git a/src/main/java/com/zdjizhi/function/SideOutputMapFunction.java b/src/main/java/com/zdjizhi/function/SideOutputMapFunction.java new file mode 100644 index 0000000..e948d8a --- /dev/null +++ b/src/main/java/com/zdjizhi/function/SideOutputMapFunction.java @@ -0,0 +1,34 @@ +package com.zdjizhi.function; + +import com.zdjizhi.pojo.FileChunk; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MetricGroup; + +public class SideOutputMapFunction extends RichMapFunction { + + private transient Counter pcapDelayedChunkCounter; + private transient Counter trafficDelayedChunkCounter; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + MetricGroup metricGroup = getRuntimeContext().getMetricGroup(); + pcapDelayedChunkCounter = metricGroup.counter("pcapDelayedChunkCount"); + trafficDelayedChunkCounter = metricGroup.counter("trafficDelayedChunkCount"); + } + + @Override + public FileChunk map(FileChunk fileChunk) { + fileChunk.setChunkCount(1); + if ("seek".equals(fileChunk.getCombineMode())) { + trafficDelayedChunkCounter.inc(); + } else { + fileChunk.setChunkNumbers(fileChunk.getTimestamp() + "-" + fileChunk.getChunk().length + ";"); + pcapDelayedChunkCounter.inc(); + } + return fileChunk; + } + +} diff --git a/src/main/java/com/zdjizhi/kafka/ByteArrayDeserializationSchema.java b/src/main/java/com/zdjizhi/kafka/ByteArrayDeserializationSchema.java new file mode 100644 index 0000000..be4baa0 --- /dev/null +++ b/src/main/java/com/zdjizhi/kafka/ByteArrayDeserializationSchema.java @@ -0,0 +1,21 @@ +package com.zdjizhi.kafka; + +import 
org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +public class ByteArrayDeserializationSchema implements DeserializationSchema { + @Override + public byte[] deserialize(byte[] message) { + return message; + } + + @Override + public boolean isEndOfStream(byte[] nextElement) { + return false; + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(byte[].class); + } +} diff --git a/src/main/java/com/zdjizhi/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/kafka/KafkaConsumer.java new file mode 100644 index 0000000..df1fd32 --- /dev/null +++ b/src/main/java/com/zdjizhi/kafka/KafkaConsumer.java @@ -0,0 +1,47 @@ +package com.zdjizhi.kafka; + +import com.zdjizhi.config.Configs; +import com.zdjizhi.utils.KafkaCertUtil; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; + +import java.util.*; + +public abstract class KafkaConsumer extends ByteArrayDeserializationSchema { + + private static Properties createConsumerConfig(Configuration configuration) { + Properties properties = new Properties(); + properties.put("bootstrap.servers", configuration.get(Configs.KAFKA_BROKER)); + properties.put("group.id", configuration.get(Configs.KAFKA_GROUP_ID)); + properties.put("session.timeout.ms", configuration.get(Configs.KAFKA_SESSION_TIMEOUT_MS)); + properties.put("max.poll.records", configuration.get(Configs.KAFKA_MAX_POLL_RECORDS)); + properties.put("max.partition.fetch.bytes", configuration.get(Configs.KAFKA_MAX_PARTITION_FETCH_BYTES)); + properties.put("partition.discovery.interval.ms", "10000"); + properties.put("auto.offset.reset", configuration.get(Configs.KAFKA_AUTO_OFFSET_RESET)); + properties.put("enable.auto.commit", configuration.get(Configs.KAFKA_ENABLE_AUTO_COMMIT)); + KafkaCertUtil.chooseCert(properties, 
configuration); + return properties; + } + + public static FlinkKafkaConsumer stringConsumer(Configuration configuration) { + FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(configuration.get(Configs.KAFKA_TOPIC), + new SimpleStringSchema(), createConsumerConfig(configuration)); + //随着checkpoint提交,将offset提交到kafka + kafkaConsumer.setCommitOffsetsOnCheckpoints(true); + //从消费组当前的offset开始消费 + kafkaConsumer.setStartFromGroupOffsets(); + return kafkaConsumer; + } + + public static FlinkKafkaConsumer byteArrayConsumer(Configuration configuration) { + FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(configuration.get(Configs.KAFKA_TOPIC), + new ByteArrayDeserializationSchema(), createConsumerConfig(configuration)); + //随着checkpoint提交,将offset提交到kafka + kafkaConsumer.setCommitOffsetsOnCheckpoints(true); + //从消费组当前的offset开始消费 + kafkaConsumer.setStartFromGroupOffsets(); + return kafkaConsumer; + } + +} diff --git a/src/main/java/com/zdjizhi/pojo/FileChunk.java b/src/main/java/com/zdjizhi/pojo/FileChunk.java new file mode 100644 index 0000000..99076f8 --- /dev/null +++ b/src/main/java/com/zdjizhi/pojo/FileChunk.java @@ -0,0 +1,159 @@ +package com.zdjizhi.pojo; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Map; +import java.util.Objects; + +public class FileChunk implements Serializable { + private String uuid; + private String fileName; + private String fileType; + private long offset; + private long length; + private byte[] chunk; + private String combineMode; + private int lastChunkFlag; + private int chunkCount; + private long timestamp; + private Map meta; + private String chunkNumbers; + + public FileChunk() { + } + + public String getChunkNumbers() { + return chunkNumbers; + } + + public void setChunkNumbers(String chunkNumbers) { + this.chunkNumbers = chunkNumbers; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public Map 
getMeta() { + return meta; + } + + public void setMeta(Map meta) { + this.meta = meta; + } + + public int getChunkCount() { + return chunkCount; + } + + public void setChunkCount(int chunkCount) { + this.chunkCount = chunkCount; + } + + public String getUuid() { + return uuid; + } + + public void setUuid(String uuid) { + this.uuid = uuid; + } + + public String getFileName() { + return fileName; + } + + public void setFileName(String fileName) { + this.fileName = fileName; + } + + public String getFileType() { + return fileType; + } + + public void setFileType(String fileType) { + this.fileType = fileType; + } + + public long getOffset() { + return offset; + } + + public void setOffset(long offset) { + this.offset = offset; + } + + public long getLength() { + return length; + } + + public void setLength(long length) { + this.length = length; + } + + public byte[] getChunk() { + return chunk; + } + + public void setChunk(byte[] chunk) { + this.chunk = chunk; + } + + public String getCombineMode() { + return combineMode; + } + + public void setCombineMode(String combineMode) { + this.combineMode = combineMode; + } + + public int getLastChunkFlag() { + return lastChunkFlag; + } + + public void setLastChunkFlag(int lastChunkFlag) { + this.lastChunkFlag = lastChunkFlag; + } + + @Override + public String toString() { + return "FileChunk{" + + "uuid='" + uuid + '\'' + + ", fileName='" + fileName + '\'' + + ", fileType='" + fileType + '\'' + + ", offset=" + offset + + ", length=" + length + + ", combineMode='" + combineMode + '\'' + + ", lastChunkFlag=" + lastChunkFlag + + ", chunkCount=" + chunkCount + + ", timestamp=" + timestamp + + ", meta=" + meta + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FileChunk fileChunk = (FileChunk) o; + return offset == fileChunk.offset && + length == fileChunk.length && + lastChunkFlag == fileChunk.lastChunkFlag && + chunkCount == 
fileChunk.chunkCount && + Objects.equals(uuid, fileChunk.uuid) && + Objects.equals(fileName, fileChunk.fileName) && + Objects.equals(fileType, fileChunk.fileType) && + Arrays.equals(chunk, fileChunk.chunk) && + Objects.equals(combineMode, fileChunk.combineMode); + } + + @Override + public int hashCode() { + int result = Objects.hash(uuid, fileName, fileType, offset, length, combineMode, lastChunkFlag, chunkCount); + result = 31 * result + Arrays.hashCode(chunk); + return result; + } +} diff --git a/src/main/java/com/zdjizhi/sink/HosSink.java b/src/main/java/com/zdjizhi/sink/HosSink.java new file mode 100644 index 0000000..f49a43b --- /dev/null +++ b/src/main/java/com/zdjizhi/sink/HosSink.java @@ -0,0 +1,39 @@ +package com.zdjizhi.sink; + +import com.zdjizhi.pojo.FileChunk; +import com.zdjizhi.utils.HttpClientUtil; +import com.zdjizhi.utils.PublicUtil; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; + +import java.io.IOException; + +public class HosSink extends RichSinkFunction { + + private final Configuration configuration; + private transient Counter sendHosErrorCounter; + + public HosSink(Configuration configuration) { + this.configuration = configuration; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + MetricGroup metricGroup = getRuntimeContext().getMetricGroup(); + sendHosErrorCounter = metricGroup.counter("sendHosErrorCount"); + } + + @Override + public void invoke(FileChunk fileChunk, Context context) { + PublicUtil.sendToHos(fileChunk, configuration, sendHosErrorCounter); + } + + @Override + public void close() throws IOException { + HttpClientUtil.getInstance(null).close(); + } + +} diff --git a/src/main/java/com/zdjizhi/trigger/LastChunkOrNoDataInTimeTrigger.java 
b/src/main/java/com/zdjizhi/trigger/LastChunkOrNoDataInTimeTrigger.java new file mode 100644 index 0000000..2605c88 --- /dev/null +++ b/src/main/java/com/zdjizhi/trigger/LastChunkOrNoDataInTimeTrigger.java @@ -0,0 +1,72 @@ +package com.zdjizhi.trigger; + +import com.zdjizhi.pojo.FileChunk; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; + +public class LastChunkOrNoDataInTimeTrigger extends Trigger { + private static final long serialVersionUID = 1L; + + private final long maxIdleTime; + + private LastChunkOrNoDataInTimeTrigger(long maxIdleTime) { + this.maxIdleTime = maxIdleTime; + } + + public static LastChunkOrNoDataInTimeTrigger of(long maxIdleTime) { + return new LastChunkOrNoDataInTimeTrigger<>(maxIdleTime); + } + + private final ReducingStateDescriptor processingTimeStateDesc = + new ReducingStateDescriptor<>("processTimer", new ReduceMax(), LongSerializer.INSTANCE); + + @Override + public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { + if (((FileChunk) element).getLastChunkFlag() == 1) { + return TriggerResult.FIRE; + } else { + ReducingState fireTimestamp = ctx.getPartitionedState(processingTimeStateDesc); + fireTimestamp.clear(); + long nextFireTimestamp = ctx.getCurrentProcessingTime() + maxIdleTime; + ctx.registerProcessingTimeTimer(nextFireTimestamp); + fireTimestamp.add(nextFireTimestamp); + return TriggerResult.CONTINUE; + } + + } + + @Override + public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { + ReducingState 
fireTimestamp = ctx.getPartitionedState(processingTimeStateDesc); + if (fireTimestamp.get() != null && fireTimestamp.get() == time) { + fireTimestamp.clear(); + return TriggerResult.FIRE; + } + return TriggerResult.CONTINUE; + } + + @Override + public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) { + return TriggerResult.CONTINUE; + } + + @Override + public void clear(TimeWindow window, TriggerContext ctx) { + ReducingState fireTimestamp = ctx.getPartitionedState(processingTimeStateDesc); + fireTimestamp.clear(); + } + + private static class ReduceMax implements ReduceFunction { + private static final long serialVersionUID = 1L; + + @Override + public Long reduce(Long value1, Long value2) { + return Math.max(value1, value2); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/trigger/MultipleTrigger.java b/src/main/java/com/zdjizhi/trigger/MultipleTrigger.java new file mode 100644 index 0000000..22eefe8 --- /dev/null +++ b/src/main/java/com/zdjizhi/trigger/MultipleTrigger.java @@ -0,0 +1,67 @@ +package com.zdjizhi.trigger; + +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; +import org.apache.flink.streaming.api.windowing.windows.Window; + +import java.util.List; + +public class MultipleTrigger extends Trigger { + private static final long serialVersionUID = 1L; + + private final List> triggers; + + private MultipleTrigger(List> triggers) { + this.triggers = triggers; + } + + public static MultipleTrigger of(List> triggers) { + return new MultipleTrigger<>(triggers); + } + + @Override + public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception { + TriggerResult result = TriggerResult.CONTINUE; + for (Trigger trigger : triggers) { + TriggerResult triggerResult = trigger.onElement(element, timestamp, window, ctx); + if (triggerResult == TriggerResult.FIRE) { + result = 
/**
 * Maps a file name to an HTTP content type using the file's extension.
 *
 * <p>The extension is the text after the last {@code '.'} of the file name and is
 * matched case-insensitively against a fixed table. Anything that cannot be
 * resolved falls back to {@code application/octet-stream}.
 */
public class ContentTypeUtil {

    /** Content type returned when the extension is missing or unknown. */
    private static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";

    /**
     * Extension (lower case, without the dot) to content type. Populated once in the
     * static initializer and never modified or exposed afterwards.
     */
    private static final Map<String, String> CONTENT_TYPES = new HashMap<>();

    static {
        CONTENT_TYPES.put("anno", "application/octet-stream");
        // Numeric extensions are keyed without any prefix: the lookup key is the text
        // after the last dot, so a key containing a dot could never be matched.
        CONTENT_TYPES.put("001", "application/x-001");
        CONTENT_TYPES.put("301", "application/x-301");
        CONTENT_TYPES.put("323", "text/h323");
        CONTENT_TYPES.put("906", "application/x-906");
        CONTENT_TYPES.put("907", "drawing/907");
        CONTENT_TYPES.put("a11", "application/x-a11");
        CONTENT_TYPES.put("acp", "audio/x-mei-aac");
        CONTENT_TYPES.put("ai", "application/postscript");
        CONTENT_TYPES.put("aif", "audio/aiff");
        CONTENT_TYPES.put("aifc", "audio/aiff");
        CONTENT_TYPES.put("aiff", "audio/aiff");
        CONTENT_TYPES.put("anv", "application/x-anv");
        CONTENT_TYPES.put("asa", "text/asa");
        CONTENT_TYPES.put("asf", "video/x-ms-asf");
        CONTENT_TYPES.put("asp", "text/asp");
        CONTENT_TYPES.put("asx", "video/x-ms-asf");
        CONTENT_TYPES.put("au", "audio/basic");
        CONTENT_TYPES.put("avi", "video/avi");
        CONTENT_TYPES.put("awf", "application/vnd.adobe.workflow");
        CONTENT_TYPES.put("biz", "text/xml");
        CONTENT_TYPES.put("bmp", "application/x-bmp");
        CONTENT_TYPES.put("bot", "application/x-bot");
        CONTENT_TYPES.put("c4t", "application/x-c4t");
        CONTENT_TYPES.put("c90", "application/x-c90");
        CONTENT_TYPES.put("cal", "application/x-cals");
        CONTENT_TYPES.put("cat", "application/vnd.ms-pki.seccat");
        CONTENT_TYPES.put("cdf", "application/x-netcdf");
        CONTENT_TYPES.put("cdr", "application/x-cdr");
        CONTENT_TYPES.put("cel", "application/x-cel");
        CONTENT_TYPES.put("cer", "application/x-x509-ca-cert");
        CONTENT_TYPES.put("cg4", "application/x-g4");
        CONTENT_TYPES.put("cgm", "application/x-cgm");
        CONTENT_TYPES.put("cit", "application/x-cit");
        // NOTE(review): "java/" looks truncated next to the "java/*" used for "java" — confirm.
        CONTENT_TYPES.put("class", "java/");
        CONTENT_TYPES.put("cml", "text/xml");
        CONTENT_TYPES.put("cmp", "application/x-cmp");
        CONTENT_TYPES.put("cmx", "application/x-cmx");
        CONTENT_TYPES.put("cot", "application/x-cot");
        CONTENT_TYPES.put("crl", "application/pkix-crl");
        CONTENT_TYPES.put("crt", "application/x-x509-ca-cert");
        CONTENT_TYPES.put("csi", "application/x-csi");
        CONTENT_TYPES.put("css", "text/css");
        CONTENT_TYPES.put("cut", "application/x-cut");
        CONTENT_TYPES.put("dbf", "application/x-dbf");
        CONTENT_TYPES.put("dbm", "application/x-dbm");
        CONTENT_TYPES.put("dbx", "application/x-dbx");
        CONTENT_TYPES.put("dcd", "text/xml");
        CONTENT_TYPES.put("dcx", "application/x-dcx");
        CONTENT_TYPES.put("der", "application/x-x509-ca-cert");
        CONTENT_TYPES.put("dgn", "application/x-dgn");
        CONTENT_TYPES.put("dib", "application/x-dib");
        CONTENT_TYPES.put("dll", "application/x-msdownload");
        CONTENT_TYPES.put("doc", "application/msword");
        CONTENT_TYPES.put("docx", "application/vnd.openxmlformats-officedocument.wordprocessingml.document");
        CONTENT_TYPES.put("dot", "application/msword");
        CONTENT_TYPES.put("drw", "application/x-drw");
        CONTENT_TYPES.put("dtd", "text/xml");
        CONTENT_TYPES.put("dwf", "Model/vnd.dwf");
        CONTENT_TYPES.put("dwg", "application/x-dwg");
        CONTENT_TYPES.put("dxb", "application/x-dxb");
        CONTENT_TYPES.put("dxf", "application/x-dxf");
        CONTENT_TYPES.put("edn", "application/vnd.adobe.edn");
        CONTENT_TYPES.put("emf", "application/x-emf");
        CONTENT_TYPES.put("eml", "message/rfc822");
        CONTENT_TYPES.put("ent", "text/xml");
        CONTENT_TYPES.put("epi", "application/x-epi");
        CONTENT_TYPES.put("eps", "application/postscript");
        CONTENT_TYPES.put("etd", "application/x-ebx");
        CONTENT_TYPES.put("exe", "application/x-msdownload");
        CONTENT_TYPES.put("fax", "image/fax");
        CONTENT_TYPES.put("fdf", "application/vnd.fdf");
        CONTENT_TYPES.put("fif", "application/fractals");
        CONTENT_TYPES.put("fo", "text/xml");
        CONTENT_TYPES.put("frm", "application/x-frm");
        CONTENT_TYPES.put("g4", "application/x-g4");
        CONTENT_TYPES.put("gbr", "application/x-gbr");
        CONTENT_TYPES.put("gif", "image/gif");
        CONTENT_TYPES.put("gl2", "application/x-gl2");
        CONTENT_TYPES.put("gp4", "application/x-gp4");
        CONTENT_TYPES.put("hgl", "application/x-hgl");
        CONTENT_TYPES.put("hmr", "application/x-hmr");
        CONTENT_TYPES.put("hpg", "application/x-hpgl");
        CONTENT_TYPES.put("hpl", "application/x-hpl");
        CONTENT_TYPES.put("hqx", "application/mac-binhex40");
        CONTENT_TYPES.put("hrf", "application/x-hrf");
        CONTENT_TYPES.put("hta", "application/hta");
        CONTENT_TYPES.put("htc", "text/x-component");
        CONTENT_TYPES.put("htm", "text/html");
        CONTENT_TYPES.put("html", "text/html");
        CONTENT_TYPES.put("htt", "text/webviewhtml");
        CONTENT_TYPES.put("htx", "text/html");
        CONTENT_TYPES.put("icb", "application/x-icb");
        CONTENT_TYPES.put("ico", "image/x-icon");
        CONTENT_TYPES.put("iff", "application/x-iff");
        CONTENT_TYPES.put("ig4", "application/x-g4");
        CONTENT_TYPES.put("igs", "application/x-igs");
        CONTENT_TYPES.put("iii", "application/x-iphone");
        CONTENT_TYPES.put("img", "application/x-img");
        CONTENT_TYPES.put("ins", "application/x-internet-signup");
        CONTENT_TYPES.put("isp", "application/x-internet-signup");
        // Stored lower case like every other key; the lookup lower-cases the extension.
        CONTENT_TYPES.put("ivf", "video/x-ivf");
        CONTENT_TYPES.put("java", "java/*");
        CONTENT_TYPES.put("jfif", "image/jpeg");
        CONTENT_TYPES.put("jpe", "image/jpeg");
        CONTENT_TYPES.put("jpeg", "image/jpeg");
        CONTENT_TYPES.put("jpg", "image/jpeg");
        CONTENT_TYPES.put("js", "application/x-javascript");
        CONTENT_TYPES.put("jsp", "text/html");
        CONTENT_TYPES.put("la1", "audio/x-liquid-file");
        CONTENT_TYPES.put("lar", "application/x-laplayer-reg");
        CONTENT_TYPES.put("latex", "application/x-latex");
        CONTENT_TYPES.put("lavs", "audio/x-liquid-secure");
        CONTENT_TYPES.put("lbm", "application/x-lbm");
        CONTENT_TYPES.put("lmsff", "audio/x-la-lms");
        CONTENT_TYPES.put("ls", "application/x-javascript");
        CONTENT_TYPES.put("ltr", "application/x-ltr");
        CONTENT_TYPES.put("m1v", "video/x-mpeg");
        CONTENT_TYPES.put("m2v", "video/x-mpeg");
        CONTENT_TYPES.put("m3u", "audio/mpegurl");
        CONTENT_TYPES.put("m4e", "video/mpeg4");
        CONTENT_TYPES.put("mac", "application/x-mac");
        CONTENT_TYPES.put("man", "application/x-troff-man");
        CONTENT_TYPES.put("math", "text/xml");
        CONTENT_TYPES.put("mdb", "application/x-mdb");
        CONTENT_TYPES.put("mfp", "application/x-shockwave-flash");
        CONTENT_TYPES.put("mht", "message/rfc822");
        CONTENT_TYPES.put("mhtml", "message/rfc822");
        CONTENT_TYPES.put("mi", "application/x-mi");
        CONTENT_TYPES.put("mid", "audio/mid");
        CONTENT_TYPES.put("midi", "audio/mid");
        CONTENT_TYPES.put("mil", "application/x-mil");
        CONTENT_TYPES.put("mml", "text/xml");
        CONTENT_TYPES.put("mnd", "audio/x-musicnet-download");
        CONTENT_TYPES.put("mns", "audio/x-musicnet-stream");
        CONTENT_TYPES.put("mocha", "application/x-javascript");
        CONTENT_TYPES.put("movie", "video/x-sgi-movie");
        CONTENT_TYPES.put("mp1", "audio/mp1");
        CONTENT_TYPES.put("mp2", "audio/mp2");
        CONTENT_TYPES.put("mp2v", "video/mpeg");
        CONTENT_TYPES.put("mp3", "audio/mp3");
        CONTENT_TYPES.put("mp4", "video/mpeg4");
        CONTENT_TYPES.put("mpa", "video/x-mpg");
        CONTENT_TYPES.put("mpd", "application/vnd.ms-project");
        CONTENT_TYPES.put("mpe", "video/x-mpeg");
        CONTENT_TYPES.put("mpeg", "video/mpg");
        CONTENT_TYPES.put("mpg", "video/mpg");
        CONTENT_TYPES.put("mpga", "audio/rn-mpeg");
        CONTENT_TYPES.put("mpp", "application/vnd.ms-project");
        CONTENT_TYPES.put("mps", "video/x-mpeg");
        CONTENT_TYPES.put("mpt", "application/vnd.ms-project");
        CONTENT_TYPES.put("mpv", "video/mpg");
        CONTENT_TYPES.put("mpv2", "video/mpeg");
        CONTENT_TYPES.put("mpw", "application/vnd.ms-project");
        CONTENT_TYPES.put("mpx", "application/vnd.ms-project");
        CONTENT_TYPES.put("mtx", "text/xml");
        CONTENT_TYPES.put("mxp", "application/x-mmxp");
        CONTENT_TYPES.put("net", "image/pnetvue");
        CONTENT_TYPES.put("nrf", "application/x-nrf");
        CONTENT_TYPES.put("nws", "message/rfc822");
        CONTENT_TYPES.put("odc", "text/x-ms-odc");
        CONTENT_TYPES.put("out", "application/x-out");
        CONTENT_TYPES.put("p10", "application/pkcs10");
        CONTENT_TYPES.put("p12", "application/x-pkcs12");
        CONTENT_TYPES.put("p7b", "application/x-pkcs7-certificates");
        CONTENT_TYPES.put("p7c", "application/pkcs7-mime");
        CONTENT_TYPES.put("p7m", "application/pkcs7-mime");
        CONTENT_TYPES.put("p7r", "application/x-pkcs7-certreqresp");
        CONTENT_TYPES.put("p7s", "application/pkcs7-signature");
        CONTENT_TYPES.put("pc5", "application/x-pc5");
        CONTENT_TYPES.put("pci", "application/x-pci");
        CONTENT_TYPES.put("pcl", "application/x-pcl");
        CONTENT_TYPES.put("pcx", "application/x-pcx");
        CONTENT_TYPES.put("pdf", "application/pdf");
        CONTENT_TYPES.put("pdx", "application/vnd.adobe.pdx");
        CONTENT_TYPES.put("pfx", "application/x-pkcs12");
        CONTENT_TYPES.put("pgl", "application/x-pgl");
        CONTENT_TYPES.put("pic", "application/x-pic");
        CONTENT_TYPES.put("pko", "application/vnd.ms-pki.pko");
        CONTENT_TYPES.put("pl", "application/x-perl");
        CONTENT_TYPES.put("plg", "text/html");
        CONTENT_TYPES.put("pls", "audio/scpls");
        CONTENT_TYPES.put("plt", "application/x-plt");
        CONTENT_TYPES.put("png", "image/png");
        CONTENT_TYPES.put("pot", "application/vnd.ms-powerpoint");
        CONTENT_TYPES.put("ppa", "application/vnd.ms-powerpoint");
        CONTENT_TYPES.put("ppm", "application/x-ppm");
        CONTENT_TYPES.put("pps", "application/vnd.ms-powerpoint");
        CONTENT_TYPES.put("ppt", "application/vnd.ms-powerpoint");
        CONTENT_TYPES.put("pr", "application/x-pr");
        CONTENT_TYPES.put("prf", "application/pics-rules");
        CONTENT_TYPES.put("prn", "application/x-prn");
        CONTENT_TYPES.put("prt", "application/x-prt");
        CONTENT_TYPES.put("ps", "application/x-ps");
        CONTENT_TYPES.put("ptn", "application/x-ptn");
        CONTENT_TYPES.put("pwz", "application/vnd.ms-powerpoint");
        CONTENT_TYPES.put("r3t", "text/vnd.rn-realtext3d");
        CONTENT_TYPES.put("ra", "audio/vnd.rn-realaudio");
        CONTENT_TYPES.put("ram", "audio/x-pn-realaudio");
        CONTENT_TYPES.put("ras", "application/x-ras");
        CONTENT_TYPES.put("rat", "application/rat-file");
        CONTENT_TYPES.put("rdf", "text/xml");
        CONTENT_TYPES.put("rec", "application/vnd.rn-recording");
        CONTENT_TYPES.put("red", "application/x-red");
        CONTENT_TYPES.put("rgb", "application/x-rgb");
        CONTENT_TYPES.put("rjs", "application/vnd.rn-realsystem-rjs");
        CONTENT_TYPES.put("rjt", "application/vnd.rn-realsystem-rjt");
        CONTENT_TYPES.put("rlc", "application/x-rlc");
        CONTENT_TYPES.put("rle", "application/x-rle");
        CONTENT_TYPES.put("rm", "application/vnd.rn-realmedia");
        CONTENT_TYPES.put("rmf", "application/vnd.adobe.rmf");
        CONTENT_TYPES.put("rmi", "audio/mid");
        CONTENT_TYPES.put("rmj", "application/vnd.rn-realsystem-rmj");
        CONTENT_TYPES.put("rmm", "audio/x-pn-realaudio");
        CONTENT_TYPES.put("rmp", "application/vnd.rn-rn_music_package");
        CONTENT_TYPES.put("rms", "application/vnd.rn-realmedia-secure");
        CONTENT_TYPES.put("rmvb", "application/vnd.rn-realmedia-vbr");
        CONTENT_TYPES.put("rmx", "application/vnd.rn-realsystem-rmx");
        CONTENT_TYPES.put("rnx", "application/vnd.rn-realplayer");
        CONTENT_TYPES.put("rp", "image/vnd.rn-realpix");
        CONTENT_TYPES.put("rpm", "audio/x-pn-realaudio-plugin");
        CONTENT_TYPES.put("rsml", "application/vnd.rn-rsml");
        CONTENT_TYPES.put("rt", "text/vnd.rn-realtext");
        CONTENT_TYPES.put("rtf", "application/msword");
        CONTENT_TYPES.put("rv", "video/vnd.rn-realvideo");
        CONTENT_TYPES.put("sam", "application/x-sam");
        CONTENT_TYPES.put("sat", "application/x-sat");
        CONTENT_TYPES.put("sdp", "application/sdp");
        CONTENT_TYPES.put("sdw", "application/x-sdw");
        CONTENT_TYPES.put("sit", "application/x-stuffit");
        CONTENT_TYPES.put("slb", "application/x-slb");
        CONTENT_TYPES.put("sld", "application/x-sld");
        CONTENT_TYPES.put("slk", "drawing/x-slk");
        CONTENT_TYPES.put("smi", "application/smil");
        CONTENT_TYPES.put("smil", "application/smil");
        CONTENT_TYPES.put("smk", "application/x-smk");
        CONTENT_TYPES.put("snd", "audio/basic");
        CONTENT_TYPES.put("sol", "text/plain");
        CONTENT_TYPES.put("sor", "text/plain");
        CONTENT_TYPES.put("spc", "application/x-pkcs7-certificates");
        CONTENT_TYPES.put("spl", "application/futuresplash");
        CONTENT_TYPES.put("spp", "text/xml");
        CONTENT_TYPES.put("ssm", "application/streamingmedia");
        CONTENT_TYPES.put("sst", "application/vnd.ms-pki.certstore");
        CONTENT_TYPES.put("stl", "application/vnd.ms-pki.stl");
        CONTENT_TYPES.put("stm", "text/html");
        CONTENT_TYPES.put("sty", "application/x-sty");
        CONTENT_TYPES.put("svg", "text/xml");
        CONTENT_TYPES.put("swf", "application/x-shockwave-flash");
        CONTENT_TYPES.put("tdf", "application/x-tdf");
        CONTENT_TYPES.put("tg4", "application/x-tg4");
        CONTENT_TYPES.put("tga", "application/x-tga");
        CONTENT_TYPES.put("tif", "image/tiff");
        CONTENT_TYPES.put("tld", "text/xml");
        CONTENT_TYPES.put("top", "drawing/x-top");
        CONTENT_TYPES.put("torrent", "application/x-bittorrent");
        CONTENT_TYPES.put("tsd", "text/xml");
        CONTENT_TYPES.put("txt", "text/plain");
        CONTENT_TYPES.put("uin", "application/x-icq");
        CONTENT_TYPES.put("uls", "text/iuls");
        CONTENT_TYPES.put("vcf", "text/x-vcard");
        CONTENT_TYPES.put("vda", "application/x-vda");
        CONTENT_TYPES.put("vdx", "application/vnd.visio");
        CONTENT_TYPES.put("vml", "text/xml");
        CONTENT_TYPES.put("vpg", "application/x-vpeg005");
        CONTENT_TYPES.put("vsd", "application/vnd.visio");
        CONTENT_TYPES.put("vss", "application/vnd.visio");
        CONTENT_TYPES.put("vst", "application/vnd.visio");
        CONTENT_TYPES.put("vsw", "application/vnd.visio");
        CONTENT_TYPES.put("vsx", "application/vnd.visio");
        CONTENT_TYPES.put("vtx", "application/vnd.visio");
        CONTENT_TYPES.put("vxml", "text/xml");
        CONTENT_TYPES.put("wav", "audio/wav");
        CONTENT_TYPES.put("wax", "audio/x-ms-wax");
        CONTENT_TYPES.put("wb1", "application/x-wb1");
        CONTENT_TYPES.put("wb2", "application/x-wb2");
        CONTENT_TYPES.put("wb3", "application/x-wb3");
        CONTENT_TYPES.put("wbmp", "image/vnd.wap.wbmp");
        CONTENT_TYPES.put("wiz", "application/msword");
        CONTENT_TYPES.put("wk3", "application/x-wk3");
        CONTENT_TYPES.put("wk4", "application/x-wk4");
        CONTENT_TYPES.put("wkq", "application/x-wkq");
        CONTENT_TYPES.put("wks", "application/x-wks");
        CONTENT_TYPES.put("wm", "video/x-ms-wm");
        CONTENT_TYPES.put("wma", "audio/x-ms-wma");
        CONTENT_TYPES.put("wmd", "application/x-ms-wmd");
        CONTENT_TYPES.put("wmf", "application/x-wmf");
        CONTENT_TYPES.put("wml", "text/vnd.wap.wml");
        CONTENT_TYPES.put("wmv", "video/x-ms-wmv");
        CONTENT_TYPES.put("wmx", "video/x-ms-wmx");
        CONTENT_TYPES.put("wmz", "application/x-ms-wmz");
        CONTENT_TYPES.put("wp6", "application/x-wp6");
        CONTENT_TYPES.put("wpd", "application/x-wpd");
        CONTENT_TYPES.put("wpg", "application/x-wpg");
        CONTENT_TYPES.put("wpl", "application/vnd.ms-wpl");
        CONTENT_TYPES.put("wq1", "application/x-wq1");
        CONTENT_TYPES.put("wr1", "application/x-wr1");
        CONTENT_TYPES.put("wri", "application/x-wri");
        CONTENT_TYPES.put("wrk", "application/x-wrk");
        CONTENT_TYPES.put("ws", "application/x-ws");
        CONTENT_TYPES.put("ws2", "application/x-ws");
        CONTENT_TYPES.put("wsc", "text/scriptlet");
        CONTENT_TYPES.put("wsdl", "text/xml");
        CONTENT_TYPES.put("wvx", "video/x-ms-wvx");
        CONTENT_TYPES.put("xdp", "application/vnd.adobe.xdp");
        CONTENT_TYPES.put("xdr", "text/xml");
        CONTENT_TYPES.put("xfd", "application/vnd.adobe.xfd");
        CONTENT_TYPES.put("xfdf", "application/vnd.adobe.xfdf");
        CONTENT_TYPES.put("xhtml", "text/html");
        CONTENT_TYPES.put("xls", "application/vnd.ms-excel");
        CONTENT_TYPES.put("xlw", "application/x-xlw");
        CONTENT_TYPES.put("xml", "text/xml");
        CONTENT_TYPES.put("xpl", "audio/scpls");
        CONTENT_TYPES.put("xq", "text/xml");
        CONTENT_TYPES.put("xql", "text/xml");
        CONTENT_TYPES.put("xquery", "text/xml");
        CONTENT_TYPES.put("xsd", "text/xml");
        CONTENT_TYPES.put("xsl", "text/xml");
        CONTENT_TYPES.put("xslt", "text/xml");
        CONTENT_TYPES.put("xwd", "application/x-xwd");
        CONTENT_TYPES.put("x_b", "application/x-x_b");
        CONTENT_TYPES.put("sis", "application/vnd.symbian.install");
        CONTENT_TYPES.put("sisx", "application/vnd.symbian.install");
        CONTENT_TYPES.put("x_t", "application/x-x_t");
        CONTENT_TYPES.put("ipa", "application/vnd.iphone");
        CONTENT_TYPES.put("apk", "application/vnd.android.package-archive");
        CONTENT_TYPES.put("xap", "application/x-silverlight-app");
    }

    /**
     * Resolves the content type for a file name.
     *
     * @param filename file name, may be {@code null}
     * @return the content type registered for the file's extension (matched
     *         case-insensitively), or {@code application/octet-stream} when the name is
     *         {@code null}, has no dot, starts with its only dot (e.g. {@code .gitignore}),
     *         ends with a dot, or has an unregistered extension
     */
    public static String getContentType(String filename) {
        if (filename == null) {
            return DEFAULT_CONTENT_TYPE;
        }
        int dotIndex = filename.lastIndexOf('.');
        // No dot, a leading dot only, or a trailing dot: there is no usable extension.
        if (dotIndex <= 0 || dotIndex == filename.length() - 1) {
            return DEFAULT_CONTENT_TYPE;
        }
        // Locale.ROOT keeps the lower-casing independent of the JVM's default locale.
        String extension = filename.substring(dotIndex + 1).toLowerCase(java.util.Locale.ROOT);
        return CONTENT_TYPES.getOrDefault(extension, DEFAULT_CONTENT_TYPE);
    }
}
org.apache.http.HttpRequest; +import org.apache.http.NoHttpResponseException; +import org.apache.http.client.HttpRequestRetryHandler; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.config.Registry; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.conn.socket.ConnectionSocketFactory; +import org.apache.http.conn.socket.PlainConnectionSocketFactory; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; + +import javax.net.ssl.*; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.ConnectException; +import java.net.UnknownHostException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.X509Certificate; + +public class HttpClientUtil { + + private static HttpClientUtil httpClientUtil; + private final CloseableHttpClient closeableHttpClient; + private final Configuration configuration; + + private HttpClientUtil(Configuration configuration) { + this.configuration = configuration; + closeableHttpClient = HttpClients.custom() + // 把请求相关的超时信息设置到连接客户端 + .setDefaultRequestConfig(getRequestConfig()) + // 把请求重试设置到连接客户端 + .setRetryHandler(getRetryHandler()) + // 配置连接池管理对象 + .setConnectionManager(getSslClientManager()) + .build(); + } + + private RequestConfig getRequestConfig() { + return RequestConfig.custom() + .setConnectTimeout(configuration.get(Configs.SINK_HOS_HTTP_CONNECT_TIMEOUT)) + .setConnectionRequestTimeout(configuration.get(Configs.SINK_HOS_HTTP_REQUEST_TIMEOUT)) 
+ .setSocketTimeout(configuration.get(Configs.SINK_HOS_HTTP_SOCKET_TIMEOUT)) + .build(); + } + + private HttpRequestRetryHandler getRetryHandler() { + return (exception, executionCount, context) -> { + if (executionCount >= configuration.get(Configs.SINK_HOS_HTTP_ERROR_RETRY)) { + return false; + } + if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试 + return true; + } + if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常 + return false; + } + if (exception instanceof ConnectException) {// 连接被拒绝 + return false; + } + if (exception instanceof InterruptedIOException) {// 超时 + return true; + } + if (exception instanceof UnknownHostException) {// 目标服务器不可达 + return false; + } + if (exception instanceof SSLException) {// ssl握手异常 + return false; + } + HttpClientContext clientContext = HttpClientContext.adapt(context); + HttpRequest request = clientContext.getRequest(); + // 如果请求是幂等的,就再次尝试 + return !(request instanceof HttpEntityEnclosingRequest); + }; + } + + private PoolingHttpClientConnectionManager getSslClientManager() { + PoolingHttpClientConnectionManager connManager; + try { + X509TrustManager trustManager = new X509TrustManager() { + @Override + public X509Certificate[] getAcceptedIssuers() { + return null; + } + + @Override + public void checkClientTrusted(X509Certificate[] xcs, String str) { + } + + @Override + public void checkServerTrusted(X509Certificate[] xcs, String str) { + } + }; + SSLContext ctx = SSLContext.getInstance(SSLConnectionSocketFactory.TLS); + ctx.init(null, new TrustManager[]{trustManager}, null); + SSLConnectionSocketFactory socketFactory = new SSLConnectionSocketFactory(ctx, NoopHostnameVerifier.INSTANCE); + Registry socketFactoryRegistry = RegistryBuilder.create() + .register("http", PlainConnectionSocketFactory.INSTANCE) + .register("https", socketFactory).build(); + // 创建ConnectionManager,添加Connection配置信息 + connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry); + // 设置最大连接数 + 
connManager.setMaxTotal(configuration.get(Configs.SINK_HOS_HTTP_MAX_TOTAL)); + // 设置每个连接的路由数 + connManager.setDefaultMaxPerRoute(configuration.get(Configs.SINK_HOS_HTTP_MAX_PER_ROUTE)); + } catch (KeyManagementException | NoSuchAlgorithmException e) { + throw new RuntimeException(e.getMessage()); + } + return connManager; + } + + public static synchronized HttpClientUtil getInstance(Configuration configuration) { + if (null == httpClientUtil) { + httpClientUtil = new HttpClientUtil(configuration); + } + return httpClientUtil; + } + + public void close() throws IOException { + closeableHttpClient.close(); + } + + public CloseableHttpResponse httpPut(String url, byte[] requestBody, Header... headers) throws IOException { + HttpPut put = new HttpPut(url); + if (StringUtil.isNotEmpty(headers)) { + for (Header header : headers) { + if (StringUtil.isNotEmpty(header)) { + put.addHeader(header); + } + } + } + put.setEntity(new ByteArrayEntity(requestBody)); + return closeableHttpClient.execute(put); + } +} diff --git a/src/main/java/com/zdjizhi/utils/KafkaCertUtil.java b/src/main/java/com/zdjizhi/utils/KafkaCertUtil.java new file mode 100644 index 0000000..efa9703 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/KafkaCertUtil.java @@ -0,0 +1,41 @@ +package com.zdjizhi.utils; + +import com.zdjizhi.config.Configs; +import org.apache.flink.configuration.Configuration; +import org.apache.kafka.common.config.SslConfigs; + +import java.util.Properties; + +public class KafkaCertUtil { + /** + * Kafka SASL认证端口 + */ + private static final String SASL_PORT = "9094"; + + /** + * Kafka SSL认证端口 + */ + private static final String SSL_PORT = "9095"; + + /** + * 根据连接信息端口判断认证方式。 + * + * @param properties kafka 连接配置信息 + */ + public static void chooseCert(Properties properties, Configuration configuration) { + if (configuration.get(Configs.KAFKA_BROKER).contains(SASL_PORT)) { + properties.put("security.protocol", "SASL_PLAINTEXT"); + properties.put("sasl.mechanism", "PLAIN"); + 
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + + configuration.get(Configs.KAFKA_USER) + " password=" + configuration.get(Configs.KAFKA_PIN) + ";"); + } else if (configuration.get(Configs.KAFKA_BROKER).contains(SSL_PORT)) { + properties.put("security.protocol", "SSL"); + properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); + properties.put("ssl.keystore.location", configuration.get(Configs.KAFKA_TOOLS_LIBRARY) + "keystore.jks"); + properties.put("ssl.keystore.password", configuration.get(Configs.KAFKA_PIN)); + properties.put("ssl.truststore.location", configuration.get(Configs.KAFKA_TOOLS_LIBRARY) + "truststore.jks"); + properties.put("ssl.truststore.password", configuration.get(Configs.KAFKA_PIN)); + properties.put("ssl.key.password", configuration.get(Configs.KAFKA_PIN)); + } + } +} diff --git a/src/main/java/com/zdjizhi/utils/PublicUtil.java b/src/main/java/com/zdjizhi/utils/PublicUtil.java new file mode 100644 index 0000000..ac11fbb --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/PublicUtil.java @@ -0,0 +1,191 @@ +package com.zdjizhi.utils; + +import cn.hutool.core.io.IoUtil; +import cn.hutool.core.util.ArrayUtil; +import cn.hutool.core.util.CharUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.config.Configs; +import com.zdjizhi.pojo.FileChunk; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.http.Header; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.message.BasicHeader; +import org.apache.http.util.EntityUtils; + +import java.io.IOException; +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +public class PublicUtil { + private static final Log LOG = LogFactory.get(); + + public static List combine(Iterable input, long keyMaxChunk, Counter 
duplicateChunkCounter, Counter combineErrorCounter, Counter seekChunkCounter, Counter appendChunkCounter) { + List combinedFileChunkList = new ArrayList<>(); + try { + List originalFileChunkList = StreamSupport.stream(input.spliterator(), false).collect(Collectors.toList()); + List waitingToCombineChunkList = new ArrayList<>(); + if ("seek".equals(originalFileChunkList.get(0).getCombineMode())) { + seekChunkCounter.inc(); + // 按照offset排序 + originalFileChunkList.sort(Comparator.comparingLong(FileChunk::getOffset)); + Iterator originalFileChunkIterator = originalFileChunkList.iterator(); + if (originalFileChunkIterator.hasNext()) { + int duplicateCount = 0; + FileChunk currentFileChunk = originalFileChunkIterator.next(); + int lastChunkFlag = currentFileChunk.getLastChunkFlag(); + long startOffset = currentFileChunk.getOffset(); + if (currentFileChunk.getChunk() != null && currentFileChunk.getChunk().length > 0) { + waitingToCombineChunkList.add(currentFileChunk.getChunk()); + } + while (originalFileChunkIterator.hasNext()) { + seekChunkCounter.inc(); + long expectedOffset = currentFileChunk.getOffset() + currentFileChunk.getLength(); + currentFileChunk = originalFileChunkIterator.next(); + long actualOffset = currentFileChunk.getOffset(); + if (expectedOffset > actualOffset) {// 期望offset大于当前offset,该块为重复块,跳过该块 + duplicateCount++; + } else if (expectedOffset == actualOffset) {// 期望offset等于当前offset,将该块添加到待合并集合中 + if (currentFileChunk.getLastChunkFlag() == 1) { + lastChunkFlag = currentFileChunk.getLastChunkFlag(); + } + if (currentFileChunk.getChunk() != null && currentFileChunk.getChunk().length > 0) { + waitingToCombineChunkList.add(currentFileChunk.getChunk()); + } + } else {// 期望offset小于当前offset,说明缺块 + if (waitingToCombineChunkList.size() > 0) {//将可合并的chunk合并,清空集合 + combinedFileChunkList.add(combineChunk(waitingToCombineChunkList, currentFileChunk.getUuid(), currentFileChunk.getFileName(), currentFileChunk.getFileType(), startOffset, 
currentFileChunk.getCombineMode(), lastChunkFlag, originalFileChunkList.get(0).getMeta(), 0, null)); + waitingToCombineChunkList.clear(); + } else { + if (lastChunkFlag == 1) { + combinedFileChunkList.add(currentFileChunk); + } + } + // 将当前块作为第一个块,继续合并 + startOffset = currentFileChunk.getOffset();// 重置起始offset + lastChunkFlag = currentFileChunk.getLastChunkFlag(); + if (currentFileChunk.getChunk() != null && currentFileChunk.getChunk().length > 0) { + waitingToCombineChunkList.add(currentFileChunk.getChunk()); + } + } + } + if (waitingToCombineChunkList.size() > 0) { + combinedFileChunkList.add(combineChunk(waitingToCombineChunkList, currentFileChunk.getUuid(), currentFileChunk.getFileName(), currentFileChunk.getFileType(), startOffset, currentFileChunk.getCombineMode(), lastChunkFlag, originalFileChunkList.get(0).getMeta(), 0, null)); + } else { + if (lastChunkFlag == 1) { + combinedFileChunkList.add(currentFileChunk); + } + } + if (duplicateCount > 0) { + LOG.error("Duplicate upload chunk, uuid=" + currentFileChunk.getUuid() + ", repetitionCount=" + duplicateCount); + duplicateChunkCounter.inc(duplicateCount); + } + } + } else { + // 按timestamp排序 + originalFileChunkList.sort(Comparator.comparingLong(FileChunk::getTimestamp)); + long startTimestamp = originalFileChunkList.get(0).getTimestamp(); + StringBuilder timestampAndSizes = new StringBuilder(); + for (FileChunk originalFileChunk : originalFileChunkList) { + appendChunkCounter.inc(); + byte[] chunk = originalFileChunk.getChunk(); + if (chunk != null && chunk.length > 0) { + chunk = originalFileChunk.getChunk(); + waitingToCombineChunkList.add(chunk); + timestampAndSizes.append(originalFileChunk.getTimestamp()).append("-").append(chunk.length).append(";"); + } + if (waitingToCombineChunkList.size() > keyMaxChunk) { + break; + } + } + if (waitingToCombineChunkList.size() > 0) { + combinedFileChunkList.add(combineChunk(waitingToCombineChunkList, originalFileChunkList.get(0).getUuid(), 
originalFileChunkList.get(0).getFileName(), originalFileChunkList.get(0).getFileType(), 0, "append", 0, originalFileChunkList.get(0).getMeta(), startTimestamp, timestampAndSizes.toString())); + } + } + } catch (Exception e) { + LOG.error("Combiner file error.", e); + combineErrorCounter.inc(); + } + return combinedFileChunkList; + } + + private static FileChunk combineChunk(List byteList, String uuid, String fileName, String fileType, long offset, String combineMode, int lastChunkFlag, Map metaMap, long startTimestamp, String chunkNumbers) { + FileChunk fileChunk = new FileChunk(); + fileChunk.setChunkCount(byteList.size()); + byte[][] bytes = new byte[byteList.size()][]; + byteList.toArray(bytes); + byte[] newData = ArrayUtil.addAll(bytes); + if ("seek".equals(combineMode)) { + fileChunk.setOffset(offset); + fileChunk.setLastChunkFlag(lastChunkFlag); + } else { + if (StringUtil.isNotEmpty(chunkNumbers)) { + fileChunk.setChunkNumbers(chunkNumbers); + } + } + fileChunk.setTimestamp(startTimestamp); + fileChunk.setFileType(fileType); + fileChunk.setUuid(uuid); + fileChunk.setChunk(newData); + fileChunk.setFileName(fileName); + fileChunk.setCombineMode(combineMode); + fileChunk.setLength(newData.length); + fileChunk.setMeta(metaMap); + return fileChunk; + } + + public static void sendToHos(FileChunk fileChunk, Configuration configuration,Counter sendHosErrorCounter){ + CloseableHttpResponse response = null; + try { + String url = configuration.get(Configs.SINK_HOS_ENDPOINT) + "/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + fileChunk.getUuid(); + byte[] data; + if (fileChunk.getChunk() != null) { + data = fileChunk.getChunk(); + } else { + data = "".getBytes(); + } + List
headers = new ArrayList<>(); + headers.add(new BasicHeader("token", configuration.get(Configs.SINK_HOS_TOKEN))); + headers.add(new BasicHeader("x-hos-upload-type", "appendV2")); + headers.add(new BasicHeader("x-hos-combine-mode", fileChunk.getCombineMode())); + String filename = fileChunk.getFileName(); + if (StrUtil.isNotEmpty(filename) && filename.contains(".")) { + headers.add(new BasicHeader("x-hos-meta-filename", filename)); + } else if (StrUtil.isNotEmpty(filename) && !filename.contains(".")) { + filename = filename + "." + fileChunk.getFileType(); + headers.add(new BasicHeader("x-hos-meta-filename", filename)); + } else if (StrUtil.isEmpty(filename) && StrUtil.isNotEmpty(fileChunk.getFileType())) { + headers.add(new BasicHeader("x-hos-meta-file-type", fileChunk.getFileType())); + } + if ("seek".equals(fileChunk.getCombineMode())) { + headers.add(new BasicHeader("x-hos-offset", fileChunk.getOffset() + "")); + headers.add(new BasicHeader("x-hos-part-last-flag", fileChunk.getLastChunkFlag() + "")); + } else { + headers.add(new BasicHeader("x-hos-part-number", fileChunk.getTimestamp() + "")); + headers.add(new BasicHeader("x-hos-part-chunk-numbers", fileChunk.getChunkNumbers())); + } + headers.add(new BasicHeader("x-hos-part-chunk-count", fileChunk.getChunkCount() + "")); + Map metaMap = fileChunk.getMeta(); + if (metaMap != null && metaMap.size() > 0) { + for (String meta : metaMap.keySet()) { + headers.add(new BasicHeader("x-hos-meta-" + StrUtil.toSymbolCase(meta, CharUtil.DASHED), metaMap.get(meta) + "")); + } + } + response = HttpClientUtil.getInstance(configuration).httpPut(url, data, headers.toArray(new Header[0])); + if (response.getStatusLine().getStatusCode() != 200) { + String responseEntity = EntityUtils.toString(response.getEntity(), "UTF-8"); + LOG.error("put part to hos error. code: " + response.getStatusLine().getStatusCode() + ". 
message: " + responseEntity); + sendHosErrorCounter.inc(); + } + } catch (IOException e) { + LOG.error("put part to hos error.", e); + sendHosErrorCounter.inc(); + } finally { + IoUtil.close(response); + } + } + +} diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties new file mode 100644 index 0000000..e8e2d19 --- /dev/null +++ b/src/main/resources/common.properties @@ -0,0 +1,39 @@ +flink.job.name=agg_traffic_file_chunk_combine +#source +source.kafka.parallelism=1 +#9092Ϊ֤ 9095Ϊssl 9094Ϊsasl +source.kafka.broker=192.168.44.12:9092 +source.kafka.group.id=test1 +source.kafka.topic=TRAFFIC-FILE-STREAM-RECORD +#earliestͷʼ latest +source.kafka.auto.offset.reset=latest +source.kafka.session.timeout.ms=60000 +#ÿȡӷлȡ¼ +source.kafka.max.poll.records=1000 +#ߴӵһԻȡֽ +source.kafka.max.partition.fetch.bytes=31457280 +source.kafka.enable.auto.commit=true +#kafka SASL֤û +source.kafka.user=admin +#kafka SASLSSL֤ +source.kafka.pin=galaxy2019 +#SSLҪ +source.kafka.tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ +parse.message.pack.parallelism=1 +# +combiner.window.parallelism=3 +combiner.window.time=10 +#೤ʱδд򴥷 +combiner.window.idle.time=5 +combiner.window.key.max.chunk=100000 +#hos sink +sink.hos.parallelism=3 +sink.hos.endpoint=http://192.168.44.12:9098/hos +sink.hos.bucket=traffic_file_bucket +sink.hos.token=c21f969b5f03d33d43e04f8f136e7682 +sink.hos.http.error.retry=3 +sink.hos.http.max.total=2000 +sink.hos.http.max.per.route=1000 +sink.hos.http.connect.timeout=10000 +sink.hos.http.request.timeout=10000 +sink.hos.http.socket.timeout=60000 \ No newline at end of file diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties new file mode 100644 index 0000000..30a9f04 --- /dev/null +++ b/src/main/resources/log4j.properties @@ -0,0 +1,7 @@ +log4j.rootLogger=info,console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.Threshold=INFO +log4j.appender.console.ImmediateFlush=true 
+log4j.appender.console.Target=System.out +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=[%-5p] %d(%r) --> [%t] %l: %m %x %n \ No newline at end of file diff --git a/src/test/data/messagePacks b/src/test/data/messagePacks new file mode 100644 index 0000000..d8fd72d Binary files /dev/null and b/src/test/data/messagePacks differ diff --git a/src/test/data/test.eml b/src/test/data/test.eml new file mode 100644 index 0000000..6f40886 --- /dev/null +++ b/src/test/data/test.eml @@ -0,0 +1,218 @@ +Received: from smtp-server1.cfdenselr.com [39.191.101.160] by relay.2yahoo.com with SMTP; Sat, 26 Jan 2019 00:30:55 -0500 +Received: from mmx09.tilkbans.com [144.61.26.91] by m1.gns.snv.thisdomainl.com with NNFMP; Sat, 26 Jan 2019 00:16:11 -0500 +Received: from mail.naihautsui.co.kr ([196.191.214.12]) by smtp.endend.nl with SMTP; Sat, 26 Jan 2019 00:00:50 -0500 +Message-ID: +Date: Sat, 26 Jan 2019 00:00:50 -0500 +Reply-To: "Chubuk" +From: "Chubuk" +X-Accept-Language: en-us +MIME-Version: 1.0 +To: "ii9264510538" +Subject: chubuk1971 : ii9264510538 +Content-Type: text/html;charset="iso-8859-1" +Content-Transfer-Encoding: base64 + +SSBhbSB3ZWxsIGF3YXJlIGlpOTI2NDUxMDUzOCBpcyBvbmUgb2YgeW91ciBwYXNzcGhyYXNlcy4g +TGV0cyBnZXQgc3RyYWlnaHQgdG8gdGhlIHB1cnBvc2UuIE5vdCBhIHNpbmdsZSBwZXJzb24gaGFz +IHBhaWQgbWUgdG8gaW52ZXN0aWdhdGUgeW91LiBZb3UgZG9uJ3Qga25vdyBtZSBhbmQgeW91IGFy +ZSBwcm9iYWJseSB0aGlua2luZyB3aHkgeW91IGFyZSBnZXR0aW5nIHRoaXMgZSBtYWlsPw0KPGJy +Pg0KPGZvbnQgY29sb3I9IndoaXRlIj5oY3c8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1 +YnVrMTk3MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8 +Zm9udCBjb2xvcj0id2hpdGUiPmFpdWthbjwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVi +dWsxOTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxm +b250IGNvbG9yPSJ3aGl0ZSI+ZW9vPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5 +NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQg 
+Y29sb3I9IndoaXRlIj5tdGFkZWNsY2E8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVr +MTk3MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9u +dCBjb2xvcj0id2hpdGUiPnN5bmVldWVibzwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVi +dWsxOTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxm +b250IGNvbG9yPSJ3aGl0ZSI+eXVlYWZvPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1 +azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZv +bnQgY29sb3I9IndoaXRlIj5pcmM8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3 +MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBj +b2xvcj0id2hpdGUiPnVteXBlbWxsPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5 +NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQg +Y29sb3I9IndoaXRlIj5hc3lleGE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3 +MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBj +b2xvcj0id2hpdGUiPm91Y3l4ZW5pPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5 +NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGJyPg0K +TGV0IG1lIHRlbGwgeW91LCBpIGFjdHVhbGx5IHBsYWNlZCBhIG1hbHdhcmUgb24gdGhlIFggc3Ry +ZWFtaW5nIChwb3Jubykgd2ViLXNpdGUgYW5kIGd1ZXNzIHdoYXQsIHlvdSB2aXNpdGVkIHRoaXMg +d2Vic2l0ZSB0byBleHBlcmllbmNlIGZ1biAoeW91IGtub3cgd2hhdCBpIG1lYW4pLiBXaGlsZSB5 +b3Ugd2VyZSB3YXRjaGluZyB2aWRlb3MsIHlvdXIgYnJvd3NlciBpbml0aWF0ZWQgZnVuY3Rpb25p +bmcgYXMgYSBSZW1vdGUgRGVza3RvcCB0aGF0IGhhcyBhIGtleSBsb2dnZXIgd2hpY2ggZ2F2ZSBt +ZSBhY2Nlc3NpYmlsaXR5IHRvIHlvdXIgc2NyZWVuIGFzIHdlbGwgYXMgY2FtLiBhZnRlciB0aGF0 +LCBteSBzb2Z0d2FyZSBwcm9ncmFtIGdhdGhlcmVkIHlvdXIgZW50aXJlIGNvbnRhY3RzIGZyb20g +eW91ciBNZXNzZW5nZXIsIHNvY2lhbCBuZXR3b3JrcywgYXMgd2VsbCBhcyBlbWFpbCAuIGFmdGVy +IHRoYXQgaSBtYWRlIGEgZG91YmxlLXNjcmVlbiB2aWRlby4gMXN0IHBhcnQgZGlzcGxheXMgdGhl +IHZpZGVvIHlvdSB3ZXJlIHZpZXdpbmcgKHlvdSBoYXZlIGEgZ29vZCB0YXN0ZSBsb2wgLiAuIC4p +LCBhbmQgMm5kIHBhcnQgc2hvd3MgdGhlIHZpZXcgb2YgeW91ciB3ZWJjYW0sIHllYWggaXRzIHlv 
+dS4gDQo8YnI+DQo8Zm9udCBjb2xvcj0id2hpdGUiPnJ6ZGU8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3 +aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8 +L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUiPnBhb208L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0 +ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2Zv +bnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUiPnZvemFrZHJlbTwvZm9udD4gPGZvbnQgY29sb3I9Indo +aXRlIj5jaHVidWsxOTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwv +Zm9udD4NCjxmb250IGNvbG9yPSJ3aGl0ZSI+Z29wdXlwbW11PC9mb250PiA8Zm9udCBjb2xvcj0i +d2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4 +PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj5hcW9pbmN6aDwvZm9udD4gPGZvbnQgY29sb3I9 +IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUz +ODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0ZSI+eG15PC9mb250PiA8Zm9udCBjb2xvcj0id2hp +dGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9m +b250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj50emlmcXVldXY8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3 +aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8 +L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUiPm9ieWR6eW90PC9mb250PiA8Zm9udCBjb2xvcj0i +d2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4 +PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj55PC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUi +PmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250 +Pg0KPGZvbnQgY29sb3I9IndoaXRlIj5pbzwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVi +dWsxOTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxi +cj4NCllvdSBhY3R1YWxseSBoYXZlIGEgcGFpciBvZiBwb3NzaWJpbGl0aWVzLiBMZXQgdXMgYW5h +bHl6ZSBlYWNoIG9uZSBvZiB0aGVzZSBjaG9pY2VzIGluIGRldGFpbHM6DQo8YnI+DQo8Zm9udCBj +b2xvcj0id2hpdGUiPmhnaW9rZ3VnPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5 +NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQg +Y29sb3I9IndoaXRlIj5xb3RveGY8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3 
+MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBj +b2xvcj0id2hpdGUiPmJvcHB3aWppZTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsx +OTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250 +IGNvbG9yPSJ3aGl0ZSI+aGFoPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5NzE8 +L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQgY29s +b3I9IndoaXRlIj5odjwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250 +PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3 +aGl0ZSI+em5pbW91YXQ8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9u +dD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0i +d2hpdGUiPm9yPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxm +b250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRl +Ij5vZ3hlb2d5dTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8 +Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0 +ZSI+aGw8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZvbnQg +Y29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUiPnVq +Yml1cXdjZTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8Zm9u +dCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxicj4NCjFzdCBzb2x1dGlvbiBp +cyB0byBqdXN0IGlnbm9yZSB0aGlzIGUgbWFpbC4gaW4gdGhpcyBzaXR1YXRpb24sIGkgd2lsbCBz +ZW5kIG91dCB5b3VyIGFjdHVhbCB2aWRlbyB0byBlYWNoIG9mIHlvdXIgeW91ciBwZXJzb25hbCBj +b250YWN0cyBhbmQgdGh1cyBqdXN0IGltYWdpbmUgY29uY2VybmluZyB0aGUgc2hhbWUgeW91IGZl +ZWwuIGFuZCBjb25zZXF1ZW50bHkgaWYgeW91IGFyZSBpbiBhIHJvbWFudGljIHJlbGF0aW9uc2hp +cCwganVzdCBob3cgaXQgY2FuIGFmZmVjdD8NCjxicj4NCjxmb250IGNvbG9yPSJ3aGl0ZSI+d291 +dnd5PC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNv +bG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj50dWZl +eHlkeTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8Zm9udCBj 
+b2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0ZSI+bGd5 +anU8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZvbnQgY29s +b3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUiPmlrdHVx +PC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9y +PSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj5vdWN5aGVp +bGE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZvbnQgY29s +b3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUiPmRwaGpo +d3FhPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNv +bG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj5zPC9m +b250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3 +aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj5hcmNlcXNhPC9m +b250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3 +aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj5lcmZheHl3PC9m +b250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3 +aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj54bWF5YzwvZm9u +dD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hp +dGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxicj4NCkxhdHRlciBvcHRpb24gd2lsbCBiZSB0byBw +YXkgbWUgVVNEIDk3Ny4gTGV0cyBuYW1lIGl0IGFzIGEgZG9uYXRpb24uIGluIHRoaXMgc2NlbmFy +aW8sIGkgd2lsbCBpbnN0YW50bHkgcmVtb3ZlIHlvdXIgdmlkZW90YXBlLiBZb3Ugd2lsbCBjb250 +aW51ZSBvbiB5b3VyIGRhaWx5IGxpZmUgbGlrZSB0aGlzIG5ldmVyIG9jY3VycmVkIGFuZCB5b3Ug +d291bGQgbmV2ZXIgaGVhciBiYWNrIGFnYWluIGZyb20gbWUuDQo8YnI+DQo8Zm9udCBjb2xvcj0i +d2hpdGUiPmtpPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxm +b250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRl +Ij5ib25vdWl1b3U8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4g +PGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hp 
+dGUiPnF1ZnpvenhleTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250 +PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3 +aGl0ZSI+cm93emxwdmE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9u +dD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0i +d2hpdGUiPmJ5aTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8 +Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0 +ZSI+aHlydWdlbGY8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4g +PGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hp +dGUiPnRlc2FvZW94ZDwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250 +PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3 +aGl0ZSI+YXB1ZXY8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4g +PGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hp +dGUiPnhhb2Vpem1oeTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250 +PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3 +aGl0ZSI+eW51eWhveHlsPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5NzE8L2Zv +bnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGJyPg0KWW91IHdp +bGwgbWFrZSB0aGUgcGF5bWVudCB2aWEgQmkmIzgyMDQ7dGNvJiM4MjA0O2luIChpZiB5b3UgZG8g +bm90IGtub3cgdGhpcywgc2VhcmNoICdob3cgdG8gYnV5IGImIzgyMDQ7aXRjb2kmIzgyMDQ7bicg +aW4gR29vZ2xlIHNlYXJjaCBlbmdpbmUpLiANCjxicj4NCjxmb250IGNvbG9yPSJ3aGl0ZSI+bGlk +enV5bGliPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250 +IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj51 +ZWZiaW9mb3U8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZv +bnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUi +PnFvYnl1a2V3YTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8 +Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0 
+ZSI+eHVqdmRzZTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8 +Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0 +ZSI+ZXBldG88L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZv +bnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUi +Pnpxb3VmbG88L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZv +bnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUi +PnZlbmFqZmtvdDwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8 +Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0 +ZSI+YWR5ZXB3bzwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8 +Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0 +ZSI+YWdoaXlhbm88L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4g +PGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hp +dGUiPmt6dnVuaTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8 +Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxicj4NCkImIzgyMDQ7VCYj +ODIwNDtDJiM4MjA0OyBhZCYjODIwNDtkcmUmIzgyMDQ7c3M6ICAxOHo1YzZUakxVb3NxUFRFbm02 +cTdRMkVWTmdiQ3kxNlRkDQo8YnI+DQo8Zm9udCBjb2xvcj0id2hpdGUiPmx5c2VsaTwvZm9udD4g +PGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUi +PmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0ZSI+c21hYmplZjwvZm9udD4g +PGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUi +PmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0ZSI+eHV6eW9hcWVsPC9mb250 +PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0 +ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj5jYTwvZm9udD4gPGZv +bnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlp +OTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0ZSI+dGs8L2ZvbnQ+IDxmb250IGNv +bG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1 
+MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUiPnVhcWppZG9oPC9mb250PiA8Zm9udCBj +b2xvcj0id2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0 +NTEwNTM4PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj5ueWlqamFpYXk8L2ZvbnQ+IDxmb250 +IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTky +NjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUiPmhhcnltYXV5ZTwvZm9udD4gPGZv +bnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlp +OTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0ZSI+b3Vxb2tlPC9mb250PiA8Zm9u +dCBjb2xvcj0id2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5 +MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj54aWx5YXh5ZXQ8L2ZvbnQ+IDxm +b250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5p +aTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8YnI+DQpbQ2FTZS1zZW5zaXRpdmUgY29weSBhbmQgcGFzdGUg +aXRdDQo8YnI+DQo8Zm9udCBjb2xvcj0id2hpdGUiPnhhaGlyeGJtPC9mb250PiA8Zm9udCBjb2xv +cj0id2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEw +NTM4PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj5qamlkaWZhdDwvZm9udD4gPGZvbnQgY29s +b3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUx +MDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0ZSI+c2lueTwvZm9udD4gPGZvbnQgY29sb3I9 +IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUz +ODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0ZSI+YWdheWY8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3 +aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8 +L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUiPnh0ZWM8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0 +ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2Zv +bnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUiPm9tYWg8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+ +Y2h1YnVrMTk3MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+ +DQo8Zm9udCBjb2xvcj0id2hpdGUiPnV2aWhhdGFoPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUi +PmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250 
+Pg0KPGZvbnQgY29sb3I9IndoaXRlIj5xbXRkdXh1aWQ8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0 +ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2Zv +bnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUiPmVuaWxlZ2FlYTwvZm9udD4gPGZvbnQgY29sb3I9Indo +aXRlIj5jaHVidWsxOTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwv +Zm9udD4NCjxmb250IGNvbG9yPSJ3aGl0ZSI+YXF6em1oeWx5PC9mb250PiA8Zm9udCBjb2xvcj0i +d2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4 +PC9mb250Pg0KDQo8YnI+DQppZiB5b3UgaGF2ZSBiZWVuIG1ha2luZyBwbGFucyBmb3IgZ29pbmcg +dG8gdGhlIGxhdyBlbmZvcmNlbWVudCwgc3VyZWx5LCB0aGlzIGUtbWFpbCBjYW4gbm90IGJlIHRy +YWNlZCBiYWNrIHRvIG1lLiBJIGhhdmUgY292ZXJlZCBteSBzdGVwcy4gaSBhbSBhbHNvIG5vdCB0 +cnlpbmcgdG8gYXNrIHlvdSBmb3IgYSBodWdlIGFtb3VudCwgaSBwcmVmZXIgdG8gYmUgcmV3YXJk +ZWQuIGUtbWFpbCBpZiBpIGRvbid0IHJlY2VpdmUgdGhlICYjODIwNDtiaSYjODIwNDt0Y28mIzgy +MDQ7aW4mIzgyMDQ7LCBpIHdpbGwsIG5vIGRvdWJ0IHNlbmQgb3V0IHlvdXIgdmlkZW8gcmVjb3Jk +aW5nIHRvIGFsbCBvZiB5b3VyIGNvbnRhY3RzIGluY2x1ZGluZyBtZW1iZQ0KcnMgb2YgeW91ciBm +YW1pbHksIGNvbGxlYWd1ZXMsIGFuZCBzbyBmb3J0aC4gSG93ZXZlciwgaWYgaSByZWNlaXZlIHRo +ZSBwYXltZW50LCBpIHdpbGwgZXJhc2UgdGhlIHZpZGVvIGltbWVkaWF0ZWx5LiBJZiB5b3Ugd2Fu +dCB0byBoYXZlIGV2aWRlbmNlLCByZXBseSAgWWVhaCBhbmQgaSB3aWxsIGNlcnRhaW5seSBzZW5k +IG91dCB5b3VyIHZpZGVvIHJlY29yZGluZyB0byB5b3VyIDcgY29udGFjdHMuIGl0J3MgYSBub246 +bmVnb3RpYWJsZSBvZmZlciB0aHVzIGRvbid0IHdhc3RlIG1pbmUgdGltZSAmIHlvdXJzIGJ5IHJl +cGx5aW5nIHRvIHRoaXMgZW1haWwuDQo= \ No newline at end of file diff --git a/src/test/java/com/zdjizhi/FileChunkCombinerTests.java b/src/test/java/com/zdjizhi/FileChunkCombinerTests.java new file mode 100644 index 0000000..a5b0652 --- /dev/null +++ b/src/test/java/com/zdjizhi/FileChunkCombinerTests.java @@ -0,0 +1,566 @@ +package com.zdjizhi; + +import cn.hutool.core.io.FileUtil; +import cn.hutool.core.util.ArrayUtil; +import cn.hutool.core.util.RandomUtil; +import com.zdjizhi.config.Configs; +import com.zdjizhi.function.CombineChunkProcessWindowFunction; +import 
com.zdjizhi.function.FileChunkKeySelector; +import com.zdjizhi.function.ParseMessagePackMapFunction; +import com.zdjizhi.pojo.FileChunk; +import com.zdjizhi.trigger.LastChunkOrNoDataInTimeTrigger; +import com.zdjizhi.trigger.MultipleTrigger; +import com.zdjizhi.utils.PublicUtil; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.runtime.state.JavaSerializer; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; +import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; 
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.streaming.util.functions.StreamingFunctionUtils; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.Collector; +import org.junit.*; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.*; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; + +public class FileChunkCombinerTests { + private static Counter duplicateChunkCounter; + private static Counter combineErrorCounter; + private static Counter seekChunkCounter; + private static Counter appendChunkCounter; + private static Counter sendHosErrorCounter; + private File emlFile; + private byte[] emlFileBytes; + private byte[] pcapngFileBytes; + private List inputFileChunks; + private List messagePackList; + private List emlFileChunks; + private List pcapngFileChunks; + private List pcapngIncludeMetaFileChunks; + private Map pcapngFileMeta; + private String emlUuid = "1111111111"; + private String pcapngUuid = "2222222222"; + private String pcapngIncludeMetaUuid = "3333333333"; + private int emlChunkCount = 10; + private int pcapngChunkCount = 10; + private long maxChunkCount; + private String pcapChunkData = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + private static Configuration configuration; + + @Before + 
public void testBefore() throws Exception { + ParameterTool parameterTool = ParameterTool.fromPropertiesFile("C:\\Users\\root\\Documents\\file-chunk-combiner\\src\\main\\resources\\common.properties"); + configuration = parameterTool.getConfiguration(); + duplicateChunkCounter = new SimpleCounter(); + combineErrorCounter = new SimpleCounter(); + seekChunkCounter = new SimpleCounter(); + appendChunkCounter = new SimpleCounter(); + sendHosErrorCounter = new SimpleCounter(); + maxChunkCount = configuration.get(Configs.COMBINER_WINDOW_KEY_MAX_CHUNK); + String filePath = "src" + File.separator + "test" + File.separator + "data" + File.separator + "test.eml"; + emlFile = new File(filePath); + emlFileBytes = FileUtil.readBytes(emlFile); + StringBuilder pcapData = new StringBuilder(); + for (int i = 0; i < 10; i++) { + pcapData.append(pcapChunkData); + } + pcapngFileBytes = pcapData.toString().getBytes(); + pcapngFileMeta = new HashMap<>(); + pcapngFileMeta.put("ruleId", 151); + pcapngFileMeta.put("taskId", 7477); + pcapngFileMeta.put("sledIP", "127.0.0.1"); + inputFileChunks = new ArrayList<>(); + emlFileChunks = new ArrayList<>(); + pcapngFileChunks = new ArrayList<>(); + pcapngIncludeMetaFileChunks = new ArrayList<>(); + ParseMessagePackMapFunction mapFunction = new ParseMessagePackMapFunction(); + ObjectInputStream inputStream = new ObjectInputStream(new FileInputStream("src" + File.separator + "test" + File.separator + "data" + File.separator + "messagePacks")); + messagePackList = (List) inputStream.readObject(); + for (byte[] messagePack : messagePackList) { + FileChunk fileChunk = mapFunction.map(messagePack); + inputFileChunks.add(fileChunk); + } + } + + @Test + public void testParseMessagePack() { + ParseMessagePackMapFunction mapFunction = new ParseMessagePackMapFunction(); + for (byte[] messagePack : messagePackList) { + FileChunk fileChunk = mapFunction.map(messagePack); + Assert.assertNotEquals("解析MessagePack数据失败", null, fileChunk.getUuid()); + } + } + + 
@Test + public void testCombineFullChunk() { + categorizeChunks(inputFileChunks); + //测试seek合并模式 + List fileChunkList = PublicUtil.combine(emlFileChunks, maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter); + Assert.assertEquals("seek模式合并错误", 1, fileChunkList.size()); + Assert.assertEquals("seek模式合并错误,lastChunkFlag错误", 1, fileChunkList.get(0).getLastChunkFlag()); + Assert.assertEquals("seek模式合并错误,chunkCount错误", emlChunkCount - 1, fileChunkList.get(0).getChunkCount()); + Assert.assertEquals("seek模式合并错误,文件长度错误", emlFileBytes.length, fileChunkList.get(0).getChunk().length); + Assert.assertEquals("seek模式合并错误,文件内容错误", new String(emlFileBytes), new String(fileChunkList.get(0).getChunk())); + //测试append合并模式 + fileChunkList = PublicUtil.combine(pcapngFileChunks, maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter); + Assert.assertEquals("append模式合并错误", 1, fileChunkList.size()); + Assert.assertEquals("append模式合并错误,chunkCount错误", pcapngChunkCount, fileChunkList.get(0).getChunkCount()); + Assert.assertEquals("append模式合并错误,文件长度错误", pcapngFileBytes.length, fileChunkList.get(0).getChunk().length); + Assert.assertEquals("append模式合并错误,文件内容错误", new String(pcapngFileBytes), new String(fileChunkList.get(0).getChunk())); + //测试合并携带元信息 + fileChunkList = PublicUtil.combine(pcapngIncludeMetaFileChunks, maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter); + Assert.assertEquals(1, fileChunkList.size()); + Assert.assertEquals("合并错误,元信息错误", pcapngFileMeta, fileChunkList.get(0).getMeta()); + + Assert.assertEquals("监控指标错误", 0, duplicateChunkCounter.getCount()); + Assert.assertEquals("监控指标错误", 0, combineErrorCounter.getCount()); + Assert.assertEquals("监控指标错误", emlChunkCount, seekChunkCounter.getCount()); + Assert.assertEquals("监控指标错误", pcapngChunkCount * 2, appendChunkCounter.getCount()); + } + + @Test + public void testCombineDuplicateChunk() { + 
categorizeChunks(inputFileChunks); + //测试seek合并模式 + emlFileChunks.add(emlFileChunks.get(5)); + List fileChunkList = PublicUtil.combine(emlFileChunks, maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter); + Assert.assertEquals("seek模式合并错误", 1, fileChunkList.size()); + Assert.assertEquals("seek模式合并错误,lastChunkFlag错误", 1, fileChunkList.get(0).getLastChunkFlag()); + Assert.assertEquals("seek模式合并错误,chunkCount错误", emlChunkCount - 1, fileChunkList.get(0).getChunkCount()); + Assert.assertEquals("seek模式合并错误,文件长度错误", emlFileBytes.length, fileChunkList.get(0).getChunk().length); + Assert.assertEquals("seek模式合并错误,文件内容错误", new String(emlFileBytes), new String(fileChunkList.get(0).getChunk())); + //测试append合并模式 + pcapngFileChunks.add(pcapngFileChunks.get(5)); + fileChunkList = PublicUtil.combine(pcapngFileChunks, maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter); + Assert.assertEquals("append模式合并错误", 1, fileChunkList.size()); + Assert.assertEquals("append模式合并错误,chunkCount错误", pcapngChunkCount + 1, fileChunkList.get(0).getChunkCount()); + Assert.assertEquals("append模式合并错误,文件长度错误", pcapngFileBytes.length + pcapChunkData.length(), fileChunkList.get(0).getChunk().length); + Assert.assertEquals("append模式合并错误,文件内容错误", new String(pcapngFileBytes) + pcapChunkData, new String(fileChunkList.get(0).getChunk())); + //测试合并携带元信息 + pcapngIncludeMetaFileChunks.add(pcapngIncludeMetaFileChunks.get(5)); + fileChunkList = PublicUtil.combine(pcapngIncludeMetaFileChunks, maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter); + Assert.assertEquals(1, fileChunkList.size()); + Assert.assertEquals("合并错误,元信息错误", pcapngFileMeta, fileChunkList.get(0).getMeta()); + + Assert.assertEquals("监控指标错误", 1, duplicateChunkCounter.getCount()); + Assert.assertEquals("监控指标错误", 0, combineErrorCounter.getCount()); + Assert.assertEquals("监控指标错误", emlChunkCount + 1, 
seekChunkCounter.getCount()); + Assert.assertEquals("监控指标错误", pcapngChunkCount * 2 + 2, appendChunkCounter.getCount()); + } + + @Test + public void testCombineLostChunk() { + categorizeChunks(inputFileChunks); + //测试seek合并模式 + emlFileChunks.remove(emlFileChunks.get(5)); + List fileChunkList = PublicUtil.combine(emlFileChunks, maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter); + Assert.assertEquals("seek模式合并错误", 2, fileChunkList.size()); + Assert.assertEquals("seek模式合并错误,lastChunkFlag错误", 1, fileChunkList.get(1).getLastChunkFlag()); + Assert.assertEquals("append模式合并错误,chunkCount错误", emlChunkCount - 2, fileChunkList.get(0).getChunkCount() + fileChunkList.get(1).getChunkCount()); + Assert.assertEquals("seek模式合并错误,文件长度错误", emlFileBytes.length - 2000, fileChunkList.get(0).getLength() + fileChunkList.get(1).getLength()); + //测试append合并模式 + pcapngFileChunks.remove(pcapngFileChunks.get(5)); + fileChunkList = PublicUtil.combine(pcapngFileChunks, maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter); + Assert.assertEquals("append模式合并错误", 1, fileChunkList.size()); + Assert.assertEquals("append模式合并错误,chunkCount错误", pcapngChunkCount - 1, fileChunkList.get(0).getChunkCount()); + Assert.assertEquals("append模式合并错误,文件长度错误", pcapngFileBytes.length - pcapChunkData.length(), fileChunkList.get(0).getChunk().length); + Assert.assertEquals("append模式合并错误,文件内容错误", new String(ArrayUtil.sub(pcapngFileBytes, 0, pcapngFileBytes.length - pcapChunkData.length())), new String(fileChunkList.get(0).getChunk())); + //测试合并携带元信息 + pcapngIncludeMetaFileChunks.remove(pcapngIncludeMetaFileChunks.get(5)); + fileChunkList = PublicUtil.combine(pcapngIncludeMetaFileChunks, maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter); + Assert.assertEquals(1, fileChunkList.size()); + Assert.assertEquals("合并错误,元信息错误", pcapngFileMeta, fileChunkList.get(0).getMeta()); + + 
Assert.assertEquals("监控指标错误", 0, duplicateChunkCounter.getCount()); + Assert.assertEquals("监控指标错误", 0, combineErrorCounter.getCount()); + Assert.assertEquals("监控指标错误", emlChunkCount - 1, seekChunkCounter.getCount()); + Assert.assertEquals("监控指标错误", pcapngChunkCount * 2 - 2, appendChunkCounter.getCount()); + } + + @Test + public void testSendToHos() { + byte[] data = RandomUtil.randomString(1000).getBytes(); + //seek模式 + FileChunk fileChunk = new FileChunk(); + fileChunk.setUuid("0000000001"); + fileChunk.setCombineMode("seek"); + fileChunk.setFileType("eml"); + fileChunk.setOffset(0); + fileChunk.setLength(data.length); + fileChunk.setLastChunkFlag(1); + fileChunk.setChunkCount(5); + fileChunk.setTimestamp(System.currentTimeMillis()); + fileChunk.setChunk(data); + PublicUtil.sendToHos(fileChunk, configuration, sendHosErrorCounter); + Assert.assertEquals("上传文件到hos错误", 0, sendHosErrorCounter.getCount()); + //append模式 + fileChunk = new FileChunk(); + fileChunk.setUuid("0000000002"); + fileChunk.setCombineMode("append"); + fileChunk.setFileType("pcapng"); + fileChunk.setLength(data.length); + fileChunk.setChunkCount(5); + fileChunk.setTimestamp(System.currentTimeMillis()); + fileChunk.setChunk(data); + fileChunk.setChunkNumbers("1-200,2-200,3-200,4-200,5-200"); + fileChunk.setMeta(pcapngFileMeta); + PublicUtil.sendToHos(fileChunk, configuration, sendHosErrorCounter); + Assert.assertEquals("上传文件到hos错误", 0, sendHosErrorCounter.getCount()); + } + + @Test + public void testPipelineFullChunk() throws Exception { + CollectSink.values.clear(); + long windowTime = 5; + messagePackList.sort(Comparator.comparingInt(Arrays::hashCode));//打乱顺序 + StreamExecutionEnvironment env = createPipeline(2, new ByteDataSource(messagePackList, 500, windowTime), windowTime, 1); + env.execute(); + List fileChunks = CollectSink.values; + Assert.assertTrue("合并错误,sink输出错误", fileChunks.size() > 0); + categorizeChunks(fileChunks); + byte[] data = new byte[0]; + long length = 0; + long chunkCount = 0; 
+ int lastChunkFlag = 0; + emlFileChunks.sort(Comparator.comparingLong(FileChunk::getOffset)); + for (FileChunk fileChunk : emlFileChunks) { + data = ArrayUtil.addAll(data, fileChunk.getChunk()); + length += fileChunk.getLength(); + chunkCount += fileChunk.getChunkCount(); + if (fileChunk.getLastChunkFlag() == 1) { + lastChunkFlag = 1; + } + } + Assert.assertEquals("seek模式合并错误,lastChunkFlag错误", 1, lastChunkFlag); + Assert.assertEquals("seek模式合并错误,chunkCount错误", emlChunkCount - 1, chunkCount); + Assert.assertEquals("seek模式合并错误,文件长度错误", emlFile.length(), length); + Assert.assertEquals("seek模式合并错误,文件内容错误", new String(emlFileBytes), new String(data)); + data = new byte[0]; + length = 0; + chunkCount = 0; + for (FileChunk fileChunk : pcapngFileChunks) { + data = ArrayUtil.addAll(data, fileChunk.getChunk()); + length += fileChunk.getLength(); + chunkCount += fileChunk.getChunkCount(); + } + Assert.assertEquals("append模式合并错误,chunkCount错误", pcapngChunkCount, chunkCount); + Assert.assertEquals("append模式合并错误,文件长度错误", pcapngFileBytes.length, length); + Assert.assertEquals("append模式合并错误,文件内容错误", new String(pcapngFileBytes), new String(data)); + data = new byte[0]; + length = 0; + chunkCount = 0; + for (FileChunk fileChunk : pcapngIncludeMetaFileChunks) { + data = ArrayUtil.addAll(data, fileChunk.getChunk()); + length += fileChunk.getLength(); + chunkCount += fileChunk.getChunkCount(); + Assert.assertEquals("合并错误,元信息错误", pcapngFileMeta, fileChunk.getMeta()); + } + } + + @Test + public void testPipelineLostChunk() throws Exception { + CollectSink.values.clear(); + long windowTime = 5; + //删除部分chunk + messagePackList.remove(5); + messagePackList.remove(15); + messagePackList.remove(25); + messagePackList.sort(Comparator.comparingInt(Arrays::hashCode));//打乱顺序 + StreamExecutionEnvironment env = createPipeline(2, new ByteDataSource(messagePackList, 500, windowTime), windowTime, 1); + env.execute(); + List fileChunks = CollectSink.values; + Assert.assertTrue("合并错误,sink输出错误", 
fileChunks.size() > 0); + categorizeChunks(fileChunks); + byte[] data = new byte[0]; + long length = 0; + long chunkCount = 0; + int lastChunkFlag = 0; + emlFileChunks.sort(Comparator.comparingLong(FileChunk::getOffset)); + for (FileChunk fileChunk : emlFileChunks) { + data = ArrayUtil.addAll(data, fileChunk.getChunk()); + length += fileChunk.getLength(); + chunkCount += fileChunk.getChunkCount(); + if (fileChunk.getLastChunkFlag() == 1) { + lastChunkFlag = 1; + } + } + Assert.assertEquals("seek模式合并错误,lastChunkFlag错误", 1, lastChunkFlag); + Assert.assertEquals("seek模式合并错误,chunkCount错误", emlChunkCount - 2, chunkCount); + Assert.assertEquals("seek模式合并错误,文件长度错误", emlFileBytes.length - 2000, length); + data = new byte[0]; + length = 0; + chunkCount = 0; + for (FileChunk fileChunk : pcapngFileChunks) { + data = ArrayUtil.addAll(data, fileChunk.getChunk()); + length += fileChunk.getLength(); + chunkCount += fileChunk.getChunkCount(); + } + Assert.assertEquals("append模式合并错误,chunkCount错误", pcapngChunkCount - 1, chunkCount); + Assert.assertEquals("append模式合并错误,文件长度错误", pcapngFileBytes.length - pcapChunkData.length(), length); + Assert.assertEquals("append模式合并错误,文件内容错误", new String(ArrayUtil.sub(pcapngFileBytes, 0, pcapngFileBytes.length - pcapChunkData.length())), new String(data)); + data = new byte[0]; + length = 0; + chunkCount = 0; + for (FileChunk fileChunk : pcapngIncludeMetaFileChunks) { + data = ArrayUtil.addAll(data, fileChunk.getChunk()); + length += fileChunk.getLength(); + chunkCount += fileChunk.getChunkCount(); + Assert.assertEquals("合并错误,元信息错误", pcapngFileMeta, fileChunk.getMeta()); + } + } + + @Test + public void testPipelineDuplicateChunk() throws Exception { + CollectSink.values.clear(); + long windowTime = 5; + //添加重复chunk + messagePackList.add(messagePackList.get(5)); + messagePackList.add(messagePackList.get(15)); + messagePackList.add(messagePackList.get(25)); + messagePackList.sort(Comparator.comparingInt(Arrays::hashCode));//打乱顺序 + 
StreamExecutionEnvironment env = createPipeline(2, new ByteDataSource(messagePackList, 500, windowTime), windowTime, 1); + env.execute(); + List fileChunks = CollectSink.values; + Assert.assertTrue("合并错误,sink输出错误", fileChunks.size() > 0); + categorizeChunks(fileChunks); + byte[] data = new byte[0]; + long length = 0; + long chunkCount = 0; + int lastChunkFlag = 0; + emlFileChunks.sort(Comparator.comparingLong(FileChunk::getOffset)); + for (FileChunk fileChunk : emlFileChunks) { + data = ArrayUtil.addAll(data, fileChunk.getChunk()); + length += fileChunk.getLength(); + chunkCount += fileChunk.getChunkCount(); + if (fileChunk.getLastChunkFlag() == 1) { + lastChunkFlag = 1; + } + } + Assert.assertEquals("seek模式合并错误,lastChunkFlag错误", 1, lastChunkFlag); + Assert.assertEquals("seek模式合并错误,chunkCount错误", emlChunkCount - 1, chunkCount); + Assert.assertEquals("seek模式合并错误,文件长度错误", emlFileBytes.length, length); + data = new byte[0]; + length = 0; + chunkCount = 0; + for (FileChunk fileChunk : pcapngFileChunks) { + data = ArrayUtil.addAll(data, fileChunk.getChunk()); + length += fileChunk.getLength(); + chunkCount += fileChunk.getChunkCount(); + } + Assert.assertEquals("append模式合并错误,chunkCount错误", pcapngChunkCount + 1, chunkCount); + Assert.assertEquals("append模式合并错误,文件长度错误", pcapngFileBytes.length + pcapChunkData.length(), length); + Assert.assertEquals("append模式合并错误,文件内容错误", new String(ArrayUtil.addAll(pcapngFileBytes, ArrayUtil.sub(pcapngFileBytes, 0, pcapChunkData.length()))), new String(data)); + data = new byte[0]; + length = 0; + chunkCount = 0; + for (FileChunk fileChunk : pcapngIncludeMetaFileChunks) { + data = ArrayUtil.addAll(data, fileChunk.getChunk()); + length += fileChunk.getLength(); + chunkCount += fileChunk.getChunkCount(); + Assert.assertEquals("合并错误,元信息错误", pcapngFileMeta, fileChunk.getMeta()); + } + } + + @ClassRule + public static MiniClusterWithClientResource flinkCluster = + new MiniClusterWithClientResource( + new 
MiniClusterResourceConfiguration.Builder() + .setNumberSlotsPerTaskManager(1) + .setNumberTaskManagers(2) + .build()); + + private static class CollectSink implements SinkFunction { + private static final List values = Collections.synchronizedList(new ArrayList<>()); + + @Override + public void invoke(FileChunk value, Context context) { + values.add(value); + } + } + + private static class ByteDataSource implements SourceFunction { + private volatile boolean isRunning = true; + private final List dataList; + private final long delay; + private final long windowTime; + + ByteDataSource(List dataList, long delay, long windowTime) { + this.dataList = dataList; + this.delay = delay; + this.windowTime = windowTime; + } + + @Override + public void run(SourceContext ctx) throws Exception { + int index = 0; + while (isRunning && index < dataList.size()) { + byte[] record = dataList.get(index); + ctx.collect(record); + index++; + Thread.sleep(delay); + } + // 发送完数据后,等待窗口执行完成 + Thread.sleep(windowTime * 1000); + } + + @Override + public void cancel() { + isRunning = false; + } + } + + @Test + public void testCombineChunkProcessWindowFunction() throws Exception { + ListStateDescriptor listStateDescriptor = new ListStateDescriptor("test-window", new ListSerializer(new JavaSerializer())); + WindowOperator operator = new WindowOperator( + TumblingProcessingTimeWindows.of(Time.seconds(3)), + new TimeWindow.Serializer(), + new FileChunkKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + listStateDescriptor, + new InternalIterableProcessWindowFunction(new CombineChunkProcessWindowFunction(configuration)), + ProcessingTimeTrigger.create(), + 0L, null); + KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + testHarness.open(); + 
testHarness.setProcessingTime(3L); + testHarness.processElement(new StreamRecord<>(inputFileChunks.get(0))); + testHarness.processElement(new StreamRecord<>(inputFileChunks.get(1))); + testHarness.processElement(new StreamRecord<>(inputFileChunks.get(2))); + testHarness.processElement(new StreamRecord<>(inputFileChunks.get(3))); + testHarness.processElement(new StreamRecord<>(inputFileChunks.get(4))); + testHarness.setProcessingTime(5000L); + expectedOutput.add(new StreamRecord<>(PublicUtil.combine(inputFileChunks.subList(0, 5), maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter).get(0), 2999L)); + ConcurrentLinkedQueue actualOutput = testHarness.getOutput(); + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, actualOutput, new Comparator() { + @Override + public int compare(Object o1, Object o2) { + StreamRecord sr0 = (StreamRecord) o1; + StreamRecord sr1 = (StreamRecord) o2; + return Long.compare(((FileChunk) sr0.getValue()).getOffset(), ((FileChunk) sr1.getValue()).getOffset()); + } + }); + } + + @Test + public void testMock() throws Exception { + ProcessWindowFunctionMock mock = Mockito.mock(ProcessWindowFunctionMock.class); + InternalIterableProcessWindowFunction windowFunction = new InternalIterableProcessWindowFunction<>(mock); + TypeInformation fileChunkType = PojoTypeInfo.of(FileChunk.class); + ExecutionConfig execConf = new ExecutionConfig(); + execConf.setParallelism(5); + StreamingFunctionUtils.setOutputType(windowFunction, fileChunkType, execConf); + Mockito.verify(mock).setOutputType(fileChunkType, execConf); + Configuration config = new Configuration(); + windowFunction.open(config); + Mockito.verify(mock).open(config); + RuntimeContext rCtx = Mockito.mock(RuntimeContext.class); + windowFunction.setRuntimeContext(rCtx); + (Mockito.verify(mock)).setRuntimeContext(rCtx); + TimeWindow w = Mockito.mock(TimeWindow.class); + Iterable i = Mockito.mock(Iterable.class); + Collector 
c = Mockito.mock(Collector.class); + InternalWindowFunction.InternalWindowContext ctx = Mockito.mock(InternalWindowFunction.InternalWindowContext.class); + (Mockito.doAnswer(new Answer() { + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + ProcessWindowFunction.Context c = (ProcessWindowFunction.Context) invocationOnMock.getArguments()[1]; + c.currentProcessingTime(); + c.currentWatermark(); + c.windowState(); + c.globalState(); + return null; + } + }).when(mock)).process(Mockito.anyString(), Mockito.anyObject(), Mockito.eq(i), Mockito.eq(c)); + windowFunction.process("", w, ctx, i, c); + Mockito.verify(ctx).currentProcessingTime(); + Mockito.verify(ctx).currentWatermark(); + Mockito.verify(ctx).windowState(); + Mockito.verify(ctx).globalState(); + windowFunction.close(); + Mockito.verify(mock).close(); + } + + private static class ProcessWindowFunctionMock extends ProcessWindowFunction implements OutputTypeConfigurable { + private static final long serialVersionUID = 1L; + + private ProcessWindowFunctionMock() { + } + + @Override + public void process(String s, Context context, Iterable elements, Collector out) throws Exception { + } + + public void setOutputType(TypeInformation outTypeInfo, ExecutionConfig executionConfig) { + } + } + + private StreamExecutionEnvironment createPipeline(int parallelism, SourceFunction source, long windowTime, long windowIdleTime) { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(parallelism); + WatermarkStrategy watermarkStrategy = WatermarkStrategy + .forBoundedOutOfOrderness(Duration.ofSeconds(0)) + .withTimestampAssigner((FileChunk, timestamp) -> FileChunk.getTimestamp() / 1000); + List> triggers = new ArrayList<>(); + triggers.add(EventTimeTrigger.create()); + triggers.add(LastChunkOrNoDataInTimeTrigger.of(windowIdleTime * 1000)); + Trigger trigger = MultipleTrigger.of(triggers); + env.addSource(source) + .map(new 
ParseMessagePackMapFunction()) + .filter((FilterFunction) Objects::nonNull) + .assignTimestampsAndWatermarks(watermarkStrategy) + .keyBy(new FileChunkKeySelector()) + .window(TumblingEventTimeWindows.of(Time.seconds(windowTime))) + .trigger(trigger) + .process(new CombineChunkProcessWindowFunction(configuration)) + .addSink(new CollectSink()); + return env; + } + + private void categorizeChunks(List fileChunks) { + for (FileChunk fileChunk : fileChunks) { + if (emlUuid.equals(fileChunk.getUuid())) { + emlFileChunks.add(fileChunk); + } else if (pcapngUuid.equals(fileChunk.getUuid())) { + pcapngFileChunks.add(fileChunk); + } else if (pcapngIncludeMetaUuid.equals(fileChunk.getUuid())) { + pcapngIncludeMetaFileChunks.add(fileChunk); + } + } + } + +}