4 Commits

Author SHA1 Message Date
梁超
4179a0a887 Merge branch 'bugfix/some-err' into 'main'
[GAL-444] fix: output unmatched sip record

See merge request galaxy/tsg_olap/sip-rtp-correlation!19
2023-12-06 10:17:02 +00:00
chaoc
6ebefc9026 chore: update version 2023-12-06 18:15:37 +08:00
chaoc
701019c38a [GAL-444] fix: output unmatched sip record 2023-12-06 18:14:52 +08:00
chaoc
6ae7fdef06 [GAL-444] fix: use default '1' instead of '0' 2023-12-06 18:14:33 +08:00
4 changed files with 16 additions and 4 deletions

View File

@@ -7,7 +7,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>sip-rtp-correlation</artifactId>
<version>1.2-rc1</version>
<version>1.2-rc2</version>
<name>Flink : SIP-RTP : Correlation</name>

View File

@@ -99,7 +99,9 @@ public class CorrelateApp {
new JsonNodeSerializationSchema(),
fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
voIpOperator.union(sipDoubleDirOperator).addSink(producer);
voIpOperator
.union(sipDoubleDirOperator.getSideOutput(SIPPairingFunction.SIP_OUTPUT_TAG))
.addSink(producer);
env.execute("SIP-RTP-CORRELATION");
}

View File

@@ -7,11 +7,13 @@ import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
* A KeyedProcessFunction that pairs SIP records based on their addresses and stream direction.
@@ -23,6 +25,9 @@ import org.apache.flink.util.Collector;
public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>
implements FunctionHelper {
public static final OutputTag<ObjectNode> SIP_OUTPUT_TAG =
new OutputTag<>("unmatched-sip", TypeInformation.of(ObjectNode.class));
private transient Time fireInterval;
private transient ValueState<ObjectNode> valueState;
@@ -63,7 +68,7 @@ public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, Str
out.collect(value);
valueState.clear();
} else {
// If the address is not yet in the mapState.
// If the address is not yet in the valueState.
valueState.update(value);
}
} else {
@@ -77,6 +82,10 @@ public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, Str
public void onTimer(long timestamp,
KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>.OnTimerContext ctx,
Collector<ObjectNode> out) throws Exception {
final ObjectNode value = valueState.value();
if (value != null) {
ctx.output(SIP_OUTPUT_TAG, value);
}
valueState.clear();
}
}

View File

@@ -61,7 +61,8 @@ public class Record {
* @return The VSys ID as an integer.
*/
public int getVSysID() {
return Record.getInt(obj, F_COMMON_VSYS_ID);
int v = Record.getInt(obj, F_COMMON_VSYS_ID);
return v == 0 ? 1 : v;
}
/**