list) {
+ List resultList = new ArrayList();
+ for (Event event : list) {
+ Event r = intercept(event);
+ if (r != null) {
+ resultList.add(r);
+ }
+ }
+ return resultList;
+ }
+
+ @Override
+ public void close() {
+ logger.warn("FlumeAvroApp is closed.");
+ }
+
+ /**
+ * 解析日志,并补全
+ *
+ * @param message Security原始日志
+ * @return 补全后的日志
+ *
+ */
+ private String parsingMessage(String header, byte[] message) {
+ String result = null;
+ switch (header) {
+ case "t_cdr_f":
+ result = deserlializationAvro(message, schemaF);
+ break;
+ case "t_cdr_k":
+ result = deserlializationAvro(message, schemaK);
+ break;
+ default:
+ }
+ return result;
+ }
+
+ private static String deserlializationAvro(byte[] avro, Schema schema) {
+ String result = null;
+ try {
+ GenericRecord readUser = null;
+ GenericDatumReader objectGenericDatumReader = new GenericDatumReader<>(schema);
+ BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(avro, null);
+ GenericRecord read = objectGenericDatumReader.read(readUser, binaryDecoder);
+ GenericDatumWriter objectGenericDatumWriter = new GenericDatumWriter<>(schema);
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ JsonEncoder jsonEncoder = EncoderFactory.get().jsonEncoder(schema, outputStream);
+ objectGenericDatumWriter.write(read, jsonEncoder);
+ jsonEncoder.flush();
+ result = String.valueOf(read);
+ } catch (RuntimeException | IOException e) {
+ e.printStackTrace();
+ }
+ return result;
+ }
+
+ public static class FlumeAvroAppBuilder implements Builder {
+ private String kSchemaDir;
+ private String fSchemaDir;
+ private String schemaType;
+ private int commonVsysId;
+ private String gtpVersion;
+
+
+ @Override
+ public Interceptor build() {
+ return new Avro2Gtpc(this.kSchemaDir,
+ this.fSchemaDir, this.schemaType,this.commonVsysId,this.gtpVersion);
+ }
+
+ @Override
+ public void configure(Context context) {
+ try {
+ this.kSchemaDir = context.getString("kSchemaDir", "");
+ Preconditions.checkNotNull("".equals(kSchemaDir), "kSchemaDir must be set!!");
+ logger.info("FlumeAvroApp Read kSchemaDir from configuration : " + kSchemaDir);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("FlumeDynamicApp kSchemaDir invalid", e);
+ } catch (Exception e) {
+ logger.error("FlumeAvroApp Get kSchemaDir is error : " + e);
+ }
+
+ try {
+ this.fSchemaDir = context.getString("fSchemaDir", "");
+ Preconditions.checkNotNull("".equals(fSchemaDir), "fSchemaDir must be set!!");
+ logger.info("FlumeAvroApp Read fSchemaDir from configuration : " + fSchemaDir);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("FlumeDynamicApp fSchemaDir invalid", e);
+ } catch (Exception e) {
+ logger.error("FlumeAvroApp Get fSchemaDir is error : " + e);
+ }
+
+ try {
+ this.schemaType = context.getString("schemaType", "");
+ Preconditions.checkNotNull("".equals(schemaType), "schemaType must be set!!");
+ logger.info("FlumeAvroApp Read schemaType from configuration : " + schemaType);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("FlumeDynamicApp schemaType invalid", e);
+ } catch (Exception e) {
+ logger.error("FlumeAvroApp Get schemaType is error : " + e);
+ }
+
+ this.commonVsysId = context.getInteger("commonVsysId",1);
+ logger.info("FlumeAvroApp Read commonVsysId from configuration : " + commonVsysId);
+ this.gtpVersion = context.getString("gtpVersion","null");
+ logger.info("FlumeAvroApp Read gtpVersion from configuration : " + gtpVersion);
+ }
+ }
+
+}
+
diff --git a/src/main/log4j.properties b/src/main/log4j.properties
new file mode 100644
index 0000000..a53ec40
--- /dev/null
+++ b/src/main/log4j.properties
@@ -0,0 +1,25 @@
+#Log4j
+log4j.rootLogger=console
+# 控制台日志设置
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=info
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
+
+# 文件日志设置
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.file.Threshold=info
+log4j.appender.file.encoding=UTF-8
+log4j.appender.file.Append=true
+#路径请用相对路径,做好相关测试输出到应用目下
+log4j.appender.file.file=${nis.root}/log/galaxy-name.log
+log4j.appender.file.DatePattern='.'yyyy-MM-dd
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
+log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
+##MyBatis 配置,com.nis.web.dao是mybatis接口所在包
+#log4j.logger.com.nis.web.dao=debug
+##bonecp数据源配置
+#log4j.category.com.jolbox=debug,console
+
+
diff --git a/src/test/java/com/zdjizhi/avro/SplitIp.java b/src/test/java/com/zdjizhi/avro/SplitIp.java
new file mode 100644
index 0000000..d8b5485
--- /dev/null
+++ b/src/test/java/com/zdjizhi/avro/SplitIp.java
@@ -0,0 +1,17 @@
+package com.zdjizhi.avro;
+
+public class SplitIp {
+ public static void main(String[] args) {
+ String s = "2409:8134:2801:5c81::1";
+ String trim = s.trim();
+ if (trim.contains("|")) {
+ String[] arr = trim.split("\\|");
+ System.out.println(arr[0].trim());
+ System.out.println(arr[1].trim());
+ } else if (trim.contains(".")) {
+ System.out.println(trim);
+ } else if (trim.contains(":")) {
+ System.out.println(trim);
+ }
+ }
+}