更新pom文件
This commit is contained in:
@@ -32,7 +32,7 @@
|
|||||||
<configuration>
|
<configuration>
|
||||||
<transformers>
|
<transformers>
|
||||||
<transformer>
|
<transformer>
|
||||||
<mainClass>cn.ac.iie.flume.interceptor.Json2Csv</mainClass>
|
<mainClass>cn.ac.iie.flume.interceptor.File2Json</mainClass>
|
||||||
</transformer>
|
</transformer>
|
||||||
<transformer>
|
<transformer>
|
||||||
<resource>META-INF/spring.handlers</resource>
|
<resource>META-INF/spring.handlers</resource>
|
||||||
|
|||||||
80
pom.xml
80
pom.xml
@@ -7,6 +7,72 @@
|
|||||||
<groupId>cn.ac.iie.flume</groupId>
|
<groupId>cn.ac.iie.flume</groupId>
|
||||||
<artifactId>flume-interceptor</artifactId>
|
<artifactId>flume-interceptor</artifactId>
|
||||||
<version>1.0-SNAPSHOT</version>
|
<version>1.0-SNAPSHOT</version>
|
||||||
|
<packaging>jar</packaging>
|
||||||
|
|
||||||
|
|
||||||
|
<build>
|
||||||
|
<plugins>
|
||||||
|
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
|
<artifactId>maven-shade-plugin</artifactId>
|
||||||
|
<version>2.4.2</version>
|
||||||
|
<executions>
|
||||||
|
<execution>
|
||||||
|
<phase>package</phase>
|
||||||
|
<goals>
|
||||||
|
<goal>shade</goal>
|
||||||
|
</goals>
|
||||||
|
<configuration>
|
||||||
|
<transformers>
|
||||||
|
<transformer
|
||||||
|
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
|
||||||
|
<mainClass>cn.ac.iie.flume.interceptor.File2Json</mainClass>
|
||||||
|
</transformer>
|
||||||
|
<transformer
|
||||||
|
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
|
||||||
|
<resource>META-INF/spring.handlers</resource>
|
||||||
|
</transformer>
|
||||||
|
<transformer
|
||||||
|
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
|
||||||
|
<resource>META-INF/spring.schemas</resource>
|
||||||
|
</transformer>
|
||||||
|
</transformers>
|
||||||
|
</configuration>
|
||||||
|
</execution>
|
||||||
|
</executions>
|
||||||
|
</plugin>
|
||||||
|
|
||||||
|
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
|
<artifactId>maven-compiler-plugin</artifactId>
|
||||||
|
<version>2.3.2</version>
|
||||||
|
<configuration>
|
||||||
|
<source>1.8</source>
|
||||||
|
<target>1.8</target>
|
||||||
|
</configuration>
|
||||||
|
</plugin>
|
||||||
|
</plugins>
|
||||||
|
<resources>
|
||||||
|
<resource>
|
||||||
|
<directory>properties</directory>
|
||||||
|
<includes>
|
||||||
|
<include>**/*.properties</include>
|
||||||
|
</includes>
|
||||||
|
<filtering>false</filtering>
|
||||||
|
</resource>
|
||||||
|
<resource>
|
||||||
|
<directory>properties</directory>
|
||||||
|
<includes>
|
||||||
|
<include>log4j.properties</include>
|
||||||
|
</includes>
|
||||||
|
<filtering>false</filtering>
|
||||||
|
</resource>
|
||||||
|
</resources>
|
||||||
|
</build>
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
<dependency>
|
<dependency>
|
||||||
@@ -40,18 +106,4 @@
|
|||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
|
||||||
<plugins>
|
|
||||||
<plugin>
|
|
||||||
<groupId>org.apache.maven.plugins</groupId>
|
|
||||||
<artifactId>maven-compiler-plugin</artifactId>
|
|
||||||
<configuration>
|
|
||||||
<source>1.8</source>
|
|
||||||
<target>1.8</target>
|
|
||||||
<encoding>UTF-8</encoding>
|
|
||||||
</configuration>
|
|
||||||
</plugin>
|
|
||||||
</plugins>
|
|
||||||
</build>
|
|
||||||
|
|
||||||
</project>
|
</project>
|
||||||
@@ -133,8 +133,6 @@ public class NewConn {
|
|||||||
private String voip_called_number;
|
private String voip_called_number;
|
||||||
private String streaming_media_url;
|
private String streaming_media_url;
|
||||||
private String streaming_media_protocol;
|
private String streaming_media_protocol;
|
||||||
|
|
||||||
|
|
||||||
public long getCommon_log_id() {
|
public long getCommon_log_id() {
|
||||||
return common_log_id;
|
return common_log_id;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory;
|
|||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
public class Csv2Json implements Interceptor {
|
public class Csv2Json implements Interceptor {
|
||||||
|
|
||||||
@@ -34,52 +35,64 @@ public class Csv2Json implements Interceptor {
|
|||||||
|
|
||||||
//TODO 处理单个event的代码
|
//TODO 处理单个event的代码
|
||||||
//取出一条csv格式的数据
|
//取出一条csv格式的数据
|
||||||
body = event.getBody();
|
|
||||||
json = new String(body);
|
|
||||||
|
|
||||||
String[] strings = json.replaceAll("\t"," " ).replaceAll("\"","" ).split(",");
|
try{
|
||||||
|
body = event.getBody();
|
||||||
|
|
||||||
newConn = new NewConn();
|
Map<String, String> headers = event.getHeaders();
|
||||||
|
|
||||||
newConn.setCommon_log_id(Long.parseLong("0"));
|
json = new String(body);
|
||||||
newConn.setCommon_policy_id(Integer.parseInt(strings[1]));
|
|
||||||
newConn.setCommon_start_time(Integer.parseInt(strings[2]));
|
String[] strings = json.replaceAll("\t"," " ).replaceAll("\"","" ).split(",");
|
||||||
newConn.setCommon_recv_time(Integer.parseInt(strings[3]));
|
|
||||||
newConn.setCommon_l4_protocol(strings[4]);
|
newConn = new NewConn();
|
||||||
newConn.setCommon_address_type(Integer.parseInt(strings[5]));
|
|
||||||
newConn.setCommon_client_ip(strings[6]);
|
newConn.setCommon_log_id(Long.parseLong("0"));
|
||||||
newConn.setCommon_server_ip(strings[7]);
|
newConn.setCommon_policy_id(Integer.parseInt(strings[1]));
|
||||||
newConn.setCommon_client_port(Integer.parseInt(strings[8]));
|
newConn.setCommon_start_time(Integer.parseInt(strings[2]));
|
||||||
newConn.setCommon_server_port(Integer.parseInt(strings[9]));
|
newConn.setCommon_recv_time(Integer.parseInt(strings[3]));
|
||||||
newConn.setCommon_service(Integer.parseInt(strings[10]));
|
newConn.setCommon_l4_protocol(strings[4]);
|
||||||
newConn.setCommon_entrance_id(Integer.parseInt(strings[11]));
|
newConn.setCommon_address_type(Integer.parseInt(strings[5]));
|
||||||
newConn.setCommon_device_id(strings[12]);
|
newConn.setCommon_client_ip(strings[6]);
|
||||||
newConn.setCommon_link_id(Integer.parseInt(strings[13]));
|
newConn.setCommon_server_ip(strings[7]);
|
||||||
newConn.setCommon_encapsulation(Integer.parseInt(strings[14]));
|
newConn.setCommon_client_port(Integer.parseInt(strings[8]));
|
||||||
newConn.setCommon_direction(Integer.parseInt(strings[15]));
|
newConn.setCommon_server_port(Integer.parseInt(strings[9]));
|
||||||
newConn.setCommon_stream_dir(Integer.parseInt(strings[18]));
|
newConn.setCommon_service(Integer.parseInt(strings[10]));
|
||||||
newConn.setCommon_sled_ip(strings[19]);
|
newConn.setCommon_entrance_id(Integer.parseInt(strings[11]));
|
||||||
newConn.setCommon_address_list(strings[20]);
|
newConn.setCommon_device_id(strings[12]);
|
||||||
newConn.setCommon_server_location(strings[21]);
|
newConn.setCommon_link_id(Integer.parseInt(strings[13]));
|
||||||
newConn.setCommon_client_location(strings[22]);
|
newConn.setCommon_encapsulation(Integer.parseInt(strings[14]));
|
||||||
newConn.setCommon_server_asn(strings[23]);
|
newConn.setCommon_direction(Integer.parseInt(strings[15]));
|
||||||
newConn.setCommon_client_asn(strings[24]);
|
newConn.setCommon_stream_dir(Integer.parseInt(strings[18]));
|
||||||
newConn.setCommon_subscriber_id(strings[26]);
|
newConn.setCommon_sled_ip(strings[19]);
|
||||||
newConn.setCommon_user_region(strings[27]);
|
newConn.setCommon_address_list(strings[20]);
|
||||||
newConn.setCommon_c2s_pkt_num(Integer.parseInt(strings[30]));
|
newConn.setCommon_server_location(strings[21]);
|
||||||
newConn.setCommon_s2c_pkt_num(Integer.parseInt(strings[31]));
|
newConn.setCommon_client_location(strings[22]);
|
||||||
newConn.setCommon_c2s_byte_num(Integer.parseInt(strings[32]));
|
newConn.setCommon_server_asn(strings[23]);
|
||||||
newConn.setCommon_s2c_byte_num(Integer.parseInt(strings[33]));
|
newConn.setCommon_client_asn(strings[24]);
|
||||||
newConn.setCommon_protocol_label(strings[34]);
|
newConn.setCommon_subscriber_id(strings[26]);
|
||||||
newConn.setCommon_app_label(strings[35]);
|
newConn.setCommon_user_region(strings[27]);
|
||||||
newConn.setCommon_action(Integer.parseInt(strings[39]));
|
newConn.setCommon_c2s_pkt_num(Integer.parseInt(strings[30]));
|
||||||
|
newConn.setCommon_s2c_pkt_num(Integer.parseInt(strings[31]));
|
||||||
|
newConn.setCommon_c2s_byte_num(Integer.parseInt(strings[32]));
|
||||||
|
newConn.setCommon_s2c_byte_num(Integer.parseInt(strings[33]));
|
||||||
|
newConn.setCommon_protocol_label(strings[34]);
|
||||||
|
newConn.setCommon_app_label(strings[35]);
|
||||||
|
newConn.setCommon_action(Integer.parseInt(strings[39]));
|
||||||
|
|
||||||
|
|
||||||
json = JSONObject.toJSONString(newConn);
|
json = JSONObject.toJSONString(newConn);
|
||||||
|
|
||||||
|
body = json.getBytes();
|
||||||
|
event.setBody(body);
|
||||||
|
return event;
|
||||||
|
}
|
||||||
|
catch (Exception e){
|
||||||
|
logger.error("消息转换异常========");
|
||||||
|
e.printStackTrace();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
body = json.getBytes();
|
|
||||||
event.setBody(body);
|
|
||||||
return event;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//TODO 处理多个event的函数
|
//TODO 处理多个event的函数
|
||||||
|
|||||||
98
src/main/java/cn/ac/iie/flume/interceptor/File2Json.java
Normal file
98
src/main/java/cn/ac/iie/flume/interceptor/File2Json.java
Normal file
@@ -0,0 +1,98 @@
|
|||||||
|
package cn.ac.iie.flume.interceptor;
|
||||||
|
|
||||||
|
import org.apache.flume.Context;
|
||||||
|
import org.apache.flume.Event;
|
||||||
|
import org.apache.flume.interceptor.Interceptor;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class File2Json implements Interceptor {
|
||||||
|
|
||||||
|
//打印日志,便于测试方法的执行顺序
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(File2Json.class);
|
||||||
|
|
||||||
|
private static byte[] body;
|
||||||
|
|
||||||
|
private static String json;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void initialize() {
|
||||||
|
logger.info("----------自定义拦截器的initialize方法执行");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Event intercept(Event event) {
|
||||||
|
logger.info("----------intercept(Event event)方法执行,处理单个event");
|
||||||
|
|
||||||
|
//TODO 处理单个event的代码
|
||||||
|
//取出一条csv格式的数据
|
||||||
|
|
||||||
|
try{
|
||||||
|
body = event.getBody();
|
||||||
|
|
||||||
|
Map<String, String> headers = event.getHeaders();
|
||||||
|
|
||||||
|
|
||||||
|
logger.info( "" + headers.keySet().size());
|
||||||
|
for (String key : headers.keySet()) {
|
||||||
|
|
||||||
|
logger.info("[ key: " + key +"," + "value: " + headers.get(key) + "]");
|
||||||
|
}
|
||||||
|
|
||||||
|
event.setBody(body);
|
||||||
|
return event;
|
||||||
|
}
|
||||||
|
catch (Exception e){
|
||||||
|
logger.error("消息转换异常========");
|
||||||
|
e.printStackTrace();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
//TODO 处理多个event的函数
|
||||||
|
@Override
|
||||||
|
public List<Event> intercept(List<Event> events) {
|
||||||
|
|
||||||
|
List<Event> results = new ArrayList<>();
|
||||||
|
Event event;
|
||||||
|
for (Event e : events) {
|
||||||
|
|
||||||
|
event = intercept(e);
|
||||||
|
if (event != null) {
|
||||||
|
results.add(event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
logger.info("----------自定义拦截器close方法执行");
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Builder implements Interceptor.Builder {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Interceptor build() {
|
||||||
|
logger.info("----------build方法执行");
|
||||||
|
return new File2Json();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void configure(Context context) {
|
||||||
|
logger.info("----------configure方法执行");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
42
src/main/java/cn/ac/iie/flume/source/MySource.java
Normal file
42
src/main/java/cn/ac/iie/flume/source/MySource.java
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
package cn.ac.iie.flume.source;
|
||||||
|
|
||||||
|
import cn.ac.iie.flume.bean.NewConn;
|
||||||
|
import org.apache.flume.Context;
|
||||||
|
import org.apache.flume.EventDeliveryException;
|
||||||
|
import org.apache.flume.PollableSource;
|
||||||
|
import org.apache.flume.conf.Configurable;
|
||||||
|
import org.apache.flume.source.AbstractSource;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @ClassNameMySource
|
||||||
|
* @Author lixkvip@126.com
|
||||||
|
* @Date2020/7/28 18:29
|
||||||
|
* @Version V1.0
|
||||||
|
**/
|
||||||
|
public class MySource extends AbstractSource implements Configurable, PollableSource {
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Status process() throws EventDeliveryException {
|
||||||
|
|
||||||
|
NewConn newConn = new NewConn();
|
||||||
|
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getBackOffSleepIncrement() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getMaxBackOffSleepInterval() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void configure(Context context) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
21
src/test/java/EventTest.java
Normal file
21
src/test/java/EventTest.java
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
/**
|
||||||
|
* @ClassNameEventTest
|
||||||
|
* @Author lixkvip@126.com
|
||||||
|
* @Date2020/7/29 15:38
|
||||||
|
* @Version V1.0
|
||||||
|
**/
|
||||||
|
public class EventTest {
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
|
||||||
|
String json = "\"0e1178ab-986f-4bbb-91f9-94b1f70ce3a2\",0,1546624800,1546624800,\"IPv4_UDP\",0,\"37.150.25.154\",\"46.20.187.219\",32265,44985,160,2,0,0,0,0,\"\",\"\",1,\"10.3.4.10\",\"48975-14441-46.20.187.219-37.150.25.154\",\"\",\"\",\"42322\",\"9198\",\"\",\"\",\"\",\"\",\"PROTO_ID=0;APP_ID=0;OS_ID=0;BS_ID=0;WEB_ID=0;BEHAV_ID=0;\",3,0,60,0,0,0,0,0,0,0";
|
||||||
|
|
||||||
|
String[] strings = json.replaceAll("\t"," " ).replaceAll("\"","" ).split(",");
|
||||||
|
|
||||||
|
System.out.println(strings[29]);
|
||||||
|
System.out.println(strings[28]);
|
||||||
|
System.out.println(strings[30]);
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user