diff --git a/pom.xml b/pom.xml index 936ee44..8ab5cba 100644 --- a/pom.xml +++ b/pom.xml @@ -5,8 +5,8 @@ 4.0.0 com.zdjizhi - log-completion-schema - 230907 + log-stream-doublewrite + 24.01 log-completion-schema http://www.example.com diff --git a/src/main/java/com/zdjizhi/operator/map/TypeMapCompleted.java b/src/main/java/com/zdjizhi/operator/map/TypeMapCompleted.java index f88aeb1..a31d73a 100644 --- a/src/main/java/com/zdjizhi/operator/map/TypeMapCompleted.java +++ b/src/main/java/com/zdjizhi/operator/map/TypeMapCompleted.java @@ -75,7 +75,6 @@ public class TypeMapCompleted extends ProcessFunction { jsonObject.put("common_action", actionMap.get(record.get("proxy_action").toString().replace(" ",""))); } - jsonObject.put("common_ingestion_time", ctx.timestamp() / 1000); TransForm.transformLog(jsonObject); MetaUtil.typeTransform(jsonObject); diff --git a/src/main/java/com/zdjizhi/operator/process/DealFileProcess.java b/src/main/java/com/zdjizhi/operator/process/DealFileProcess.java index d893c78..3bba57d 100644 --- a/src/main/java/com/zdjizhi/operator/process/DealFileProcess.java +++ b/src/main/java/com/zdjizhi/operator/process/DealFileProcess.java @@ -10,7 +10,6 @@ import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.common.pojo.FileMeta; import com.zdjizhi.common.pojo.SourceList; import com.zdjizhi.tools.general.FileEdit; -import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.tuple.Tuple7; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.ProcessFunction; diff --git a/src/main/java/com/zdjizhi/tools/general/IpLookupUtils.java b/src/main/java/com/zdjizhi/tools/general/IpLookupUtils.java index 57db453..5696f4b 100644 --- a/src/main/java/com/zdjizhi/tools/general/IpLookupUtils.java +++ b/src/main/java/com/zdjizhi/tools/general/IpLookupUtils.java @@ -7,7 +7,6 @@ import com.alibaba.fastjson2.*; import com.geedgenetworks.utils.IpLookupV2; import com.geedgenetworks.utils.StringUtil; import com.google.common.base.Joiner; -import com.zdjizhi.common.CommonConfig; import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.common.pojo.KnowlegeBaseMeta; import com.zdjizhi.tools.connections.http.HttpClientService; @@ -204,9 +203,4 @@ public class IpLookupUtils { } return knowlegeBaseMeta; } - - public static void main(String[] args) { - final String countryLookup = IpLookupUtils.getIpLookup().asnLookup("10.64.10.7"); - System.out.println(countryLookup); - } } diff --git a/src/main/java/com/zdjizhi/tools/logtransformation/ConvertRecordToPERCENT.java b/src/main/java/com/zdjizhi/tools/logtransformation/ConvertRecordToPERCENT.java index 439f77d..cc1c5a7 100644 --- a/src/main/java/com/zdjizhi/tools/logtransformation/ConvertRecordToPERCENT.java +++ b/src/main/java/com/zdjizhi/tools/logtransformation/ConvertRecordToPERCENT.java @@ -1,6 +1,5 @@ package com.zdjizhi.tools.logtransformation; -import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import java.util.*; @@ -32,22 +31,6 @@ public class ConvertRecordToPERCENT { percent.put("common_start_time", (long) record.get("start_timestamp_ms") / 1000); percent.put("common_end_time", (long) record.get("end_timestamp_ms") / 1000); - - if (record.containsKey("security_rule_list")) { - percent.put("common_policy_id", (Integer) JSONArray.from(record.get("security_rule_list")).get(0)); - percent.put("common_action", fillingCommonAction((String) record.get("security_action"))); - } - - if (record.containsKey("monitor_rule_list")) { - percent.put("common_policy_id", (Integer) JSONArray.from(record.get("monitor_rule_list")).get(0)); - percent.put("common_action", 1); - } - - if (record.containsKey("proxy_rule_list")) { - percent.put("common_policy_id", (Integer) JSONArray.from(record.get("proxy_rule_list")).get(0)); - percent.put("common_action", fillingCommonAction((String) record.get("proxy_action"))); - } - //填充common_sessions percent.put("common_sessions", 1); @@ -77,58 +60,6 @@ public class ConvertRecordToPERCENT { return percent; } - private int fillingCommonAction(String action) { - int number; - switch (action) { - case "none": - number = 0; - break; - case "Monitor": - case "monitor": - number = 1; - break; - case "Intercept": - case "intercept": - number = 2; - break; - case "No Intercept": - number = 3; - break; - case "Active Defence": - number = 4; - break; - case "WAN NAT": - number = 8; - break; - case "Reject": - case "Deny": - number = 16; - break; - case "Shaping": - number = 32; - break; - case "Manipulate": - number = 48; - break; - case "Service Chaining": - number = 64; - break; - case "Allow": - case "Bypass": - number = 96; - break; - case "Shunt": - number = 128; - break; - case "Statistics": - number = 129; - break; - default: - number = 0; - } - return number; - } - public JSONObject removeFields(JSONObject record) { for (Map.Entry entry : recordSchema.entrySet()) { if (record.containsKey(entry.getValue())) { diff --git a/src/test/java/com/zdjizhi/hdfs/FileUtilsTest.java b/src/test/java/com/zdjizhi/hdfs/FileUtilsTest.java deleted file mode 100644 index bd5a33d..0000000 --- a/src/test/java/com/zdjizhi/hdfs/FileUtilsTest.java +++ /dev/null @@ -1,75 +0,0 @@ -package com.zdjizhi.hdfs; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.zdjizhi.common.FlowWriteConfig; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.io.IOUtils; -import org.junit.Test; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; - -/** - * @author qidaijie - * @Package com.zdjizhi.tools.connections.hadoop - * @Description: - * @date 2022/11/217:57 - */ -public class FileUtilsTest { - private static final Log logger = LogFactory.get(); - - private static FileSystem fileSystem; - - static { - Configuration configuration = new Configuration(); - try { - configuration.set("fs.defaultFS","hdfs://ns1"); - configuration.set("hadoop.proxyuser.root.hosts","*"); - configuration.set("hadoop.proxyuser.root.groups","*"); - configuration.set("ha.zookeeper.quorum","192.168.44.83:2181,192.168.44.84:2181,192.168.44.85:2181"); - configuration.set("dfs.nameservices","ns1"); - configuration.set("dfs.ha.namenodes.ns1","nn1,nn2"); - configuration.set("dfs.namenode.rpc-address.ns1.nn1","192.168.44.85:9000"); - configuration.set("dfs.namenode.rpc-address.ns1.nn2","192.168.44.86:9000"); - configuration.set("dfs.client.failover.proxy.provider.ns1","org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); - //创建fileSystem,用于连接hdfs - fileSystem = FileSystem.get(configuration); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Test - public void mkdir() throws Exception{ - fileSystem.mkdirs(new Path("/knowledgebase/test")); - } - - @Test - public void create() throws Exception{ - FSDataOutputStream outputStream = fileSystem.create(new Path("/knowledgebase/test/test.txt")); - outputStream.write("Hello World".getBytes()); - outputStream.flush(); - outputStream.close(); - } - - @Test - public void cat() throws Exception{ - FSDataInputStream inputStream = fileSystem.open(new Path("/knowledgebase/test/test.txt")); - IOUtils.copyBytes(inputStream, System.out, 1024); - inputStream.close(); - } - - @Test - public void rename() throws Exception{ - fileSystem.rename(new Path("/knowledgebase/test/test.txt"), new Path("/knowledgebase/test/test1.txt")); - } - - @Test - public void delete() throws Exception{ - fileSystem.delete(new Path("/knowledgebase/test"),true);//是否递归删除 - } -} -