commit c084f808c593cdddddc4256d8d1c2b66eba7d07d Author: chaoc Date: Fri Aug 11 13:42:39 2023 +0800 feat: init diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d769462 --- /dev/null +++ b/.gitignore @@ -0,0 +1,35 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..cb134e0 --- /dev/null +++ b/pom.xml @@ -0,0 +1,155 @@ + + + 4.0.0 + + com.chaoc.flink + test-json-serialization + 1.0-SNAPSHOT + + + UTF-8 + 1.8 + ${java.version} + ${java.version} + 1.13.6 + 2.12 + 1.7.32 + 2.17.1 + + + + + org.apache.logging.log4j + log4j-slf4j-impl + runtime + + + org.apache.logging.log4j + log4j-api + runtime + + + org.apache.logging.log4j + log4j-core + runtime + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + provided + + + org.apache.flink + flink-clients_${scala.binary.version} + provided + + + org.apache.flink + flink-connector-kafka_${scala.binary.version} + + + + org.apache.flink + flink-json + + + + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${flink.version} + + + org.apache.flink + flink-clients_${scala.binary.version} + ${flink.version} + + + org.apache.flink + flink-connector-kafka_${scala.binary.version} + ${flink.version} + + + org.apache.flink + flink-json + ${flink.version} + + + + org.slf4j + slf4j-api + ${slf4j.version} + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + ${java.version} + ${java.version} + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.1.1 + + + package + + shade + + + false + + + org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + org.apache.logging.log4j:* + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + + + \ No newline at end of file diff --git a/src/main/java/com/chaoc/flink/serialization/JsonTest.java b/src/main/java/com/chaoc/flink/serialization/JsonTest.java new file mode 100644 index 0000000..f00fabe --- /dev/null +++ b/src/main/java/com/chaoc/flink/serialization/JsonTest.java @@ -0,0 +1,42 @@ +package com.chaoc.flink.serialization; + +import org.apache.flink.formats.json.JsonNodeDeserializationSchema; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; +import com.chaoc.flink.serialization.formats.JsonNodeSerializationSchema; + +import java.util.Properties; + +public class JsonTest { + + public static void main(String[] args) throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + final Properties srcProps = new Properties(); + srcProps.setProperty("bootstrap.servers", "192.168.41.32:9092"); + srcProps.setProperty("group.id", "jackson-serialization"); + + final FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>( + "SESSION-RECORD-JSON", + new JsonNodeDeserializationSchema(), + srcProps); + + final Properties dstProps = new Properties(); + dstProps.setProperty("bootstrap.servers", "192.168.41.32:9092"); + + final FlinkKafkaProducer producer = new FlinkKafkaProducer<>( + "SESSION-RECORD-JSON-COMPLETED", + new JsonNodeSerializationSchema(), + dstProps); + + env.addSource(consumer) + .name("SessionRecordJacksonReader") + .addSink(producer) + .name("SessionRecordJacksonWriter"); + + env.execute("Jackson Serialization Test"); + } +} diff --git a/src/main/java/com/chaoc/flink/serialization/formats/JsonNodeSerializationSchema.java b/src/main/java/com/chaoc/flink/serialization/formats/JsonNodeSerializationSchema.java new file mode 100644 index 0000000..0de2f56 --- /dev/null +++ b/src/main/java/com/chaoc/flink/serialization/formats/JsonNodeSerializationSchema.java @@ -0,0 +1,20 @@ +package com.chaoc.flink.serialization.formats; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; + +public class JsonNodeSerializationSchema implements SerializationSchema { + + private final ObjectMapper mapper = new ObjectMapper(); + + @Override + public byte[] serialize(ObjectNode element) { + try { + return mapper.writeValueAsBytes(element); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/main/resources/log4j2.properties b/src/main/resources/log4j2.properties new file mode 100644 index 0000000..e7c4ef9 --- /dev/null +++ b/src/main/resources/log4j2.properties @@ -0,0 +1,25 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +rootLogger.level = WARN +rootLogger.appenderRef.console.ref = ConsoleAppender + +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n