ZX Flume field_splitter_complement Initial commit 202010091814

This commit is contained in:
caohui
2020-10-09 18:14:40 +08:00
commit 899d1d00d5
63 changed files with 2763 additions and 0 deletions

View File

@@ -0,0 +1,64 @@
<?xml version="1.0" encoding="UTF-8"?>
<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_8">
<output url="file://$MODULE_DIR$/target/classes" />
<output-test url="file://$MODULE_DIR$/target/test-classes" />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/target" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flume:flume-ng-core:1.9.0" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flume:flume-ng-sdk:1.9.0" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flume:flume-ng-configuration:1.9.0" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flume.flume-ng-configfilters:flume-ng-config-filter-api:1.9.0" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flume:flume-ng-auth:1.9.0" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.slf4j:slf4j-api:1.7.25" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: com.google.guava:guava:11.0.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: com.google.code.findbugs:jsr305:1.3.9" level="project" />
<orderEntry type="library" name="Maven: commons-io:commons-io:2.1" level="project" />
<orderEntry type="library" name="Maven: commons-codec:commons-codec:1.8" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: commons-cli:commons-cli:1.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: commons-lang:commons-lang:2.5" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.avro:avro:1.7.4" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.codehaus.jackson:jackson-core-asl:1.8.8" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.codehaus.jackson:jackson-mapper-asl:1.8.8" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: com.thoughtworks.paranamer:paranamer:2.3" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.xerial.snappy:snappy-java:1.0.4.1" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.commons:commons-compress:1.4.1" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.tukaani:xz:1.0" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.avro:avro-ipc:1.7.4" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.mortbay.jetty:jetty:6.1.26" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.mortbay.jetty:jetty-util:6.1.26" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.velocity:velocity:1.7" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: commons-collections:commons-collections:3.2.1" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: io.netty:netty:3.10.6.Final" level="project" />
<orderEntry type="library" name="Maven: joda-time:joda-time:2.9.9" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.eclipse.jetty:jetty-servlet:9.4.6.v20170531" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.eclipse.jetty:jetty-security:9.4.6.v20170531" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.eclipse.jetty:jetty-util:9.4.6.v20170531" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.eclipse.jetty:jetty-server:9.4.6.v20170531" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: javax.servlet:javax.servlet-api:3.1.0" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.eclipse.jetty:jetty-http:9.4.6.v20170531" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.eclipse.jetty:jetty-io:9.4.6.v20170531" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.eclipse.jetty:jetty-jmx:9.4.6.v20170531" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: com.google.code.gson:gson:2.2.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.thrift:libthrift:0.9.3" level="project" />
<orderEntry type="library" name="Maven: org.apache.httpcomponents:httpclient:4.4.1" level="project" />
<orderEntry type="library" name="Maven: commons-logging:commons-logging:1.2" level="project" />
<orderEntry type="library" name="Maven: org.apache.httpcomponents:httpcore:4.4.1" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.mina:mina-core:2.0.4" level="project" />
<orderEntry type="library" name="Maven: com.zdjizhi:galaxy:1.0.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.4" level="project" />
<orderEntry type="library" name="Maven: log4j:log4j:1.2.14" level="project" />
<orderEntry type="library" name="Maven: com.maxmind.geoip:geoip-api:1.3.1" level="project" />
<orderEntry type="library" name="Maven: com.maxmind.geoip2:geoip2:2.12.0" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-databind:2.9.5" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-core:2.9.5" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-annotations:2.9.5" level="project" />
<orderEntry type="library" name="Maven: com.maxmind.db:maxmind-db:1.2.2" level="project" />
<orderEntry type="library" name="Maven: com.alibaba:fastjson:1.2.47" level="project" />
</component>
</module>

View File

@@ -0,0 +1,151 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>field_splitter_complement</artifactId>
<groupId>cn.ceiec</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>FlumeFieldSplitterInterceptor</artifactId>
<repositories>
<repository>
<id>nexus</id>
<name>Team Nexus Repository</name>
<url>http://192.168.40.125:8099/content/groups/public</url>
</repository>
<repository>
<id>ebi</id>
<name>www.ebi.ac.uk</name>
<url>http://www.ebi.ac.uk/intact/maven/nexus/content/groups/public/</url>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>cn.ac.iie.flume.interceptor.FlumeFieldSplitterApp</mainClass>
</transformer>
</transformers>
<!--&lt;!&ndash;测试添加,下面&ndash;&gt;-->
<!--<shadedArtifactAttached>false</shadedArtifactAttached>-->
<!--<createSourcesJar>true</createSourcesJar>-->
<!--<relocations>-->
<!--<relocation>-->
<!--<pattern>com.fasterxml.jackson.core</pattern>-->
<!--<shadedPattern>com.shaded.fasterxml.jackson.core</shadedPattern>-->
<!--</relocation>-->
<!--</relocations>-->
<!--&lt;!&ndash;测试添加,上面&ndash;&gt;-->
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>java</executable>
<includeProjectDependencies>true</includeProjectDependencies>
<includePluginDependencies>false</includePluginDependencies>
<classpathScope>compile</classpathScope>
<mainClass>cn.ac.iie.flume.interceptor.FlumeFieldSplitterApp</mainClass>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
<!--<resources>-->
<!--<resource>-->
<!--<directory>config</directory>-->
<!--<includes>-->
<!--<include>realtime_config.properties</include>-->
<!--<include>storm_config.properties</include>-->
<!--<include>db.properties</include>-->
<!--<include>db_pz.properties</include>-->
<!--<include>clickhouse.properties</include>-->
<!--&lt;!&ndash;<include>**/*.properties</include>&ndash;&gt;-->
<!--&lt;!&ndash;<include>**/*.xml</include>&ndash;&gt;-->
<!--</includes>-->
<!--<filtering>false</filtering>-->
<!--</resource>-->
<!--<resource>-->
<!--<directory>src/main/java</directory>-->
<!--<includes>-->
<!--<include>log4j.properties</include>-->
<!--</includes>-->
<!--<filtering>false</filtering>-->
<!--</resource>-->
<!--</resources>-->
</build>
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
<version>1.0.1</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,232 @@
package cn.ac.iie.flume.interceptor;
import cn.ac.iie.flume.interceptor.commonUtils.HashTableConfig;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.utils.IpLookup;
import com.zdjizhi.utils.StringUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.log4j.Logger;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class FlumeFieldSplitterApp implements Interceptor {
private static Logger logger = Logger.getLogger(FlumeFieldSplitterApp.class);
private static HashMap<String, String> hmFromPz;
private String handleField;//从原数据中获取---需要进行切分的字段,即最外层的表字段,即"app_label":"PROTO_ID=10062;APP_ID=0;OS_ID=0;BS_ID=0;WEB_ID=0;BEHAV_ID=0;"中的 app_label
private String splitFields;//用户定义---需要切分的字段的内容字段,key-value形式,key为要补充的名称协议,value为原数据字段名,暂定格式为 proto:PROTO_ID;app:APP_ID;web:WEB_ID;os:OS_ID;bs:BS_ID
private String fieldsSeparator;//多个字段之间的分隔符,例如"app_label":"PROTO_ID=10062;APP_ID=0;OS_ID=0;BS_ID=0;WEB_ID=0;BEHAV_ID=0;"中的 ;
private String fieldDelimiter;//单个字段内部的分隔符,例如"app_label":"PROTO_ID=10062;APP_ID=0;OS_ID=0;BS_ID=0;WEB_ID=0;BEHAV_ID=0;"中的 =
private String defaultFieldValue;//该字段不存在时的默认值value,注意不是json的key,是json的value
public void initialize() {
if (!("noSetSplitFields".equals(splitFields))) {
hmFromPz = new HashMap<>();
if (splitFields.contains(";")) {
String[] splitFieldsToKeyValue = splitFields.split(";");
for (String splitFieldToKeyValue : splitFieldsToKeyValue) {
if (splitFieldToKeyValue.contains(":")) {
String[] splitSingleFieldToKeyValue = splitFieldToKeyValue.split(":");
hmFromPz.put(splitSingleFieldToKeyValue[0], splitSingleFieldToKeyValue[1]);
} else {
logger.error("key-value must split by ===> : <=== !!!");
}
}
} else {
if (splitFields.contains(":")) {
String[] splitSingleFieldToKeyValue = splitFields.split(":");
hmFromPz.put(splitSingleFieldToKeyValue[0], splitSingleFieldToKeyValue[1]);
} else {
logger.error("field must be split by ===> ; <=== ,and key-value must split by ===> : <=== !!!");
}
}
}
}
public FlumeFieldSplitterApp(String handleField, String splitFields, String fieldsSeparator, String fieldDelimiter, String defaultFieldValue) {
this.handleField = handleField;
this.splitFields = splitFields;
this.fieldsSeparator = fieldsSeparator;
this.fieldDelimiter = fieldDelimiter;
this.defaultFieldValue = defaultFieldValue;
}
public Event intercept(Event event) {
String message = null;
try {
message = new String(event.getBody(), "utf-8");
} catch (UnsupportedEncodingException e) {
message = new String(event.getBody());
}
try {
if (StringUtils.isNotBlank(message)) {
logger.info("Field_Splitter_Interceptor this cycle message begin ---><<" + message + ">><---");
Map<String, Object> map = JSONObject.parseObject(message, Map.class);
if (!("noSetHandleField".equals(handleField)) && !("noSetSplitFields".equals(splitFields))
&& !("noSetFieldsSeparator".equals(fieldsSeparator)) && !("noSetFieldDelimiter".equals(fieldDelimiter))) {
/**
* 切分存储原数据中需要映射的原始字段
*/
HashMap<String, String> hmInnerFields = new HashMap<>();
String handleFieldValue = (String) map.get(handleField);
if (StringUtil.isBlank(handleFieldValue)) {
handleFieldValue = defaultFieldValue;
}
String[] splitHandleFieldValues = handleFieldValue.split(fieldsSeparator);
for (int i = 0; i < splitHandleFieldValues.length; i++) {
if (StringUtils.isNotBlank(splitHandleFieldValues[i])) {
String splitHandleFieldValue = splitHandleFieldValues[i];
String[] innerFields = splitHandleFieldValue.split(fieldDelimiter);
hmInnerFields.put(innerFields[0], innerFields[1]);
}
}
/**
* 针对hmInnerFields和hmFromPz开始做映射处理
* hmInnerFields格式为hmInnerFields(PROTO_ID,10062)
* hmFromPz格式为hmFromPz(proto,PROTO_ID)
*/
//进行协议名称映射
if (hmFromPz.containsKey("proto")) {
String protoID = hmInnerFields.get(hmFromPz.get("proto"));
if (StringUtils.isNotBlank(protoID)) {
String protoName = HashTableConfig.PROTOCOL_TYPE.get(protoID);
if (StringUtils.isNotBlank(protoName)) {
map.put(hmFromPz.get("proto").toLowerCase() + "_name", protoName);
}
}
}
//进行应用名称映射
if (hmFromPz.containsKey("app")) {
String appID = hmInnerFields.get(hmFromPz.get("app"));
if (StringUtils.isNotBlank(appID)) {
String appName = HashTableConfig.APP_TYPE.get(appID);
if (StringUtils.isNotBlank(appName)) {
map.put(hmFromPz.get("app").toLowerCase() + "_name", appName);
}
}
}
//进行domain映射
if (hmFromPz.containsKey("web")) {
String webID = hmInnerFields.get(hmFromPz.get("web"));
if (StringUtils.isNotBlank(webID)) {
String webName = HashTableConfig.DOMAIN_TYPE.get(webID);
if (StringUtils.isNotBlank(webName)) {
map.put(hmFromPz.get("web").toLowerCase() + "_name", webName);
}
}
}
//进行OS映射-操作系统映射-20200630新增-os:OS_ID
if (hmFromPz.containsKey("os")) {
String osID = hmInnerFields.get(hmFromPz.get("os"));
if (StringUtils.isNotBlank(osID)) {
String osName = HashTableConfig.OS_TYPE.get(osID);
if (StringUtils.isNotBlank(osName)) {
map.put(hmFromPz.get("os").toLowerCase() + "_name", osName);
}
}
}
//进行BS映射-浏览器映射-20200630新增-bs:BS_ID
if (hmFromPz.containsKey("bs")) {
String bsID = hmInnerFields.get(hmFromPz.get("bs"));
if (StringUtils.isNotBlank(bsID)) {
String bsName = HashTableConfig.BS_TYPE.get(bsID);
if (StringUtils.isNotBlank(bsName)) {
map.put(hmFromPz.get("bs").toLowerCase() + "_name", bsName);
}
}
}
} else {
if ("noSetHandleField".equals(handleField)) {
logger.error("Field_Splitter_Interceptor If you want to split,please set handleField!");
}
if ("noSetSplitFields".equals(splitFields)) {
logger.error("Field_Splitter_Interceptor If you want to split,please set splitFields!");
}
if ("noSetFieldsSeparator".equals(fieldsSeparator)) {
logger.error("Field_Splitter_Interceptor If you want to split,please set fieldsSeparator!");
}
if ("noSetFieldDelimiter".equals(fieldDelimiter)) {
logger.error("Field_Splitter_Interceptor If you want to split,please set fieldDelimiter!");
}
}
message = JSONObject.toJSONString(map);
logger.info("Field_Splitter_Interceptor this cycle message over ---><<" + message + ">><---");
event.setBody(message.getBytes());
return event;
}
} catch (Exception e) {
logger.error("FlumeFieldSplitterApp intercept(Event event) method is error !!!--->{" + e + "}<---,this message is ===>" + message + "<===");
e.printStackTrace();
}
return event;
}
public List<Event> intercept(List<Event> list) {
List resultList = new ArrayList();
for (Event event : list) {
Event r = intercept(event);
if (r != null) {
resultList.add(r);
}
}
return resultList;
}
public void close() {
}
public static class FlumeFieldSplitterAppBuilder implements Interceptor.Builder {
private String handleField;
private String splitFields;
private String fieldsSeparator;
private String fieldDelimiter;
private String defaultFieldValue;
public Interceptor build() {
return new FlumeFieldSplitterApp(this.handleField, this.splitFields,
this.fieldsSeparator, this.fieldDelimiter, this.defaultFieldValue);
}
public void configure(Context context) {
//需要处理的字段,类比于app_label
this.handleField = context.getString("handleField", "noSetHandleField");
//切分字段,暂定格式为proto:PROTO_ID;app:APP_ID;web:WEB_ID;os:OS_ID;bs:BS_ID,每组之间以;分割,组内key-value以:分割
this.splitFields = context.getString("splitFields", "noSetSplitFields");
//多个字段之间的分隔符,例如"app_label":"PROTO_ID=10062;APP_ID=0;OS_ID=0;BS_ID=0;WEB_ID=0;BEHAV_ID=0;"中的 ;
this.fieldsSeparator = context.getString("fieldsSeparator", "noSetFieldsSeparator");
//单个字段内部的分隔符,例如"app_label":"PROTO_ID=10062;APP_ID=0;OS_ID=0;BS_ID=0;WEB_ID=0;BEHAV_ID=0;"中的 =
this.fieldDelimiter = context.getString("fieldDelimiter", "noSetFieldDelimiter");
//切分的字段的默认值,比如"app_label":"PROTO_ID=10062;APP_ID=0;OS_ID=0;BS_ID=0;WEB_ID=0;BEHAV_ID=0;"中的 PROTO_ID=10062;APP_ID=0;OS_ID=0;BS_ID=0;WEB_ID=0;BEHAV_ID=0; 部分的默认值
this.defaultFieldValue = context.getString("defaultFieldValue", "PROTO_ID=0;APP_ID=0;OS_ID=0;BS_ID=0;WEB_ID=0;BEHAV_ID=0;");
logger.info("FlumeFieldSplitterApp defaultFieldValue is set as : {" + handleField + ":" + defaultFieldValue + "}");
}
}
}