结果输出时仅在最内层输出app_name
This commit is contained in:
@@ -18,6 +18,7 @@ import java.util.Map;
|
|||||||
*/
|
*/
|
||||||
public class ResultFlatMapFunction implements FlatMapFunction<String, String> {
|
public class ResultFlatMapFunction implements FlatMapFunction<String, String> {
|
||||||
private static String[] jobList = JsonParseUtil.getHierarchy();
|
private static String[] jobList = JsonParseUtil.getHierarchy();
|
||||||
|
private static final String APP_NAME = "app_name";
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@@ -26,6 +27,8 @@ public class ResultFlatMapFunction implements FlatMapFunction<String, String> {
|
|||||||
String name = jobList[0];
|
String name = jobList[0];
|
||||||
Map<String, Object> jsonObject = (Map<String, Object>) JsonMapper.fromJsonString(value, Map.class);
|
Map<String, Object> jsonObject = (Map<String, Object>) JsonMapper.fromJsonString(value, Map.class);
|
||||||
String protocol = JsonParseUtil.getString(jsonObject, name);
|
String protocol = JsonParseUtil.getString(jsonObject, name);
|
||||||
|
String appName = JsonParseUtil.getString(jsonObject, APP_NAME);
|
||||||
|
jsonObject.remove(APP_NAME);
|
||||||
if (StringUtil.isNotBlank(protocol)) {
|
if (StringUtil.isNotBlank(protocol)) {
|
||||||
String[] protocolIds = protocol.split(StreamAggregateConfig.PROTOCOL_SPLITTER);
|
String[] protocolIds = protocol.split(StreamAggregateConfig.PROTOCOL_SPLITTER);
|
||||||
for (String proto : protocolIds) {
|
for (String proto : protocolIds) {
|
||||||
@@ -35,6 +38,9 @@ public class ResultFlatMapFunction implements FlatMapFunction<String, String> {
|
|||||||
out.collect(JsonMapper.toJsonString(jsonObject));
|
out.collect(JsonMapper.toJsonString(jsonObject));
|
||||||
} else {
|
} else {
|
||||||
stringBuffer.append(jobList[1]).append(proto);
|
stringBuffer.append(jobList[1]).append(proto);
|
||||||
|
if (proto.equals(appName)) {
|
||||||
|
jsonObject.put(APP_NAME, appName);
|
||||||
|
}
|
||||||
jsonObject.put(name, stringBuffer.toString());
|
jsonObject.put(name, stringBuffer.toString());
|
||||||
out.collect(JsonMapper.toJsonString(jsonObject));
|
out.collect(JsonMapper.toJsonString(jsonObject));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -66,13 +66,6 @@ public class ParseMapFunction implements MapFunction<String, Tuple3<String, Stri
|
|||||||
case "hierarchy":
|
case "hierarchy":
|
||||||
String key = JsonParseUtil.getString(dimensionsObj, resultKeyName) + "@" + ThreadLocalRandom.current().nextInt(StreamAggregateConfig.RANDOM_RANGE_NUM);
|
String key = JsonParseUtil.getString(dimensionsObj, resultKeyName) + "@" + ThreadLocalRandom.current().nextInt(StreamAggregateConfig.RANDOM_RANGE_NUM);
|
||||||
return new Tuple3<>(key, JsonMapper.toJsonString(dimensionsObj), ParseFunctions.getMetricsLog(originalLog));
|
return new Tuple3<>(key, JsonMapper.toJsonString(dimensionsObj), ParseFunctions.getMetricsLog(originalLog));
|
||||||
// case "dismantling":
|
|
||||||
// if (StringUtil.isNotBlank(parameters)) {
|
|
||||||
// if (logsValue != null) {
|
|
||||||
// JsonParseUtil.setValue(message, logsKeyName, ParseFunctions.dismantlingUtils(parameters, logsValue));
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// break;
|
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -59,7 +59,6 @@ public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple3<Strin
|
|||||||
if (!cacheMap.isEmpty()) {
|
if (!cacheMap.isEmpty()) {
|
||||||
for (String dimensions : cacheMap.keySet()) {
|
for (String dimensions : cacheMap.keySet()) {
|
||||||
Map<String, Object> resultMap = cacheMap.get(dimensions);
|
Map<String, Object> resultMap = cacheMap.get(dimensions);
|
||||||
System.out.println("resultMap"+resultMap.toString());
|
|
||||||
output.collect(new Tuple2<>(dimensions, JsonMapper.toJsonString(resultMap)));
|
output.collect(new Tuple2<>(dimensions, JsonMapper.toJsonString(resultMap)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -118,13 +118,12 @@ public class ParseFunctions {
|
|||||||
/**
|
/**
|
||||||
* 根据表达式解析json
|
* 根据表达式解析json
|
||||||
* <p>
|
* <p>
|
||||||
* // * @param message json
|
* //* @param message json
|
||||||
*
|
*
|
||||||
* @param expr 解析表达式
|
* @param expr 解析表达式
|
||||||
* @return 解析结果
|
* @return 解析结果
|
||||||
*/
|
*/
|
||||||
public static void flattenSpec(Map<String, Object> dimensions, String expr, String resultKeyName, String logsKeyValue) {
|
public static void flattenSpec(Map<String, Object> dimensions, String expr, String resultKeyName, String logsKeyValue) {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (StringUtil.isNotBlank(expr)) {
|
if (StringUtil.isNotBlank(expr)) {
|
||||||
ArrayList<Object> read = JsonPath.parse(logsKeyValue).read(expr);
|
ArrayList<Object> read = JsonPath.parse(logsKeyValue).read(expr);
|
||||||
|
|||||||
Reference in New Issue
Block a user