提交 TSG-1845

This commit is contained in:
qidaijie
2020-06-12 20:27:05 +08:00
parent 3ad9d0cbd4
commit a556d3ef66
6 changed files with 294 additions and 141 deletions

View File

@@ -22,6 +22,9 @@ import java.util.HashMap;
import java.util.List;
/**
* @author qidaijie
*/
public class FlumeDynamicApp implements Interceptor {
private static Logger logger = Logger.getLogger(FlumeDynamicApp.class);
@@ -38,6 +41,7 @@ public class FlumeDynamicApp implements Interceptor {
private static Object mapObject;
private static ArrayList<String[]> jobList;
@Override
public void initialize() {
map = JsonParseUtil.getMapFromhttp(schemaHttpUrl);
mapObject = JsonParseUtil.generateObject(map);
@@ -74,6 +78,7 @@ public class FlumeDynamicApp implements Interceptor {
this.hbaseTableName = hbaseTableName;
}
@Override
public Event intercept(Event event) {
String message = null;
try {
@@ -94,6 +99,7 @@ public class FlumeDynamicApp implements Interceptor {
return event;
}
@Override
public List<Event> intercept(List<Event> list) {
List resultList = new ArrayList();
for (Event event : list) {
@@ -105,6 +111,7 @@ public class FlumeDynamicApp implements Interceptor {
return resultList;
}
@Override
public void close() {
logger.warn("FlumeDynamicApp is closed.");
}
@@ -138,12 +145,12 @@ public class FlumeDynamicApp implements Interceptor {
JsonParseUtil.setValue(object, appendToKeyName, formatUtils.getSnowflakeId(uidZookeeperIp, dataCenterIdNum));
break;
case "geo_ip_detail":
if (name != null) {
if (name != null && JsonParseUtil.getValue(object, appendToKeyName) == null) {
JsonParseUtil.setValue(object, appendToKeyName, getGeoIpDetail(name.toString()));
}
break;
case "geo_asn":
if (name != null) {
if (name != null && JsonParseUtil.getValue(object, appendToKeyName) == null) {
JsonParseUtil.setValue(object, appendToKeyName, getGeoAsn(name.toString()));
}
break;
@@ -158,7 +165,7 @@ public class FlumeDynamicApp implements Interceptor {
}
break;
case "geo_ip_country":
if (name != null) {
if (name != null && JsonParseUtil.getValue(object, appendToKeyName) == null) {
JsonParseUtil.setValue(object, appendToKeyName, getGeoIpCountry(name.toString()));
}
break;
@@ -261,6 +268,7 @@ public class FlumeDynamicApp implements Interceptor {
private String hbaseTableName;
@Override
public Interceptor build() {
return new FlumeDynamicApp(this.schemaHttpUrl,
this.uidZookeeperIp, this.dataCenterIdNum,
@@ -268,6 +276,7 @@ public class FlumeDynamicApp implements Interceptor {
this.hbaseZookeeperIp, this.hbaseTableName);
}
@Override
public void configure(Context context) {
try {
this.schemaHttpUrl = context.getString("schemaHttpUrl", "");