Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4179a0a887 | ||
|
|
6ebefc9026 | ||
|
|
701019c38a | ||
|
|
6ae7fdef06 |
2
pom.xml
2
pom.xml
@@ -7,7 +7,7 @@
|
||||
|
||||
<groupId>com.zdjizhi</groupId>
|
||||
<artifactId>sip-rtp-correlation</artifactId>
|
||||
<version>1.2-rc1</version>
|
||||
<version>1.2-rc2</version>
|
||||
|
||||
<name>Flink : SIP-RTP : Correlation</name>
|
||||
|
||||
|
||||
@@ -99,7 +99,9 @@ public class CorrelateApp {
|
||||
new JsonNodeSerializationSchema(),
|
||||
fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
|
||||
|
||||
voIpOperator.union(sipDoubleDirOperator).addSink(producer);
|
||||
voIpOperator
|
||||
.union(sipDoubleDirOperator.getSideOutput(SIPPairingFunction.SIP_OUTPUT_TAG))
|
||||
.addSink(producer);
|
||||
|
||||
env.execute("SIP-RTP-CORRELATION");
|
||||
}
|
||||
|
||||
@@ -7,11 +7,13 @@ import org.apache.flink.api.common.state.StateTtlConfig;
|
||||
import org.apache.flink.api.common.state.ValueState;
|
||||
import org.apache.flink.api.common.state.ValueStateDescriptor;
|
||||
import org.apache.flink.api.common.time.Time;
|
||||
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
||||
import org.apache.flink.api.java.tuple.Tuple3;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
|
||||
import org.apache.flink.util.Collector;
|
||||
import org.apache.flink.util.OutputTag;
|
||||
|
||||
/**
|
||||
* A KeyedProcessFunction that pairs SIP records based on their addresses and stream direction.
|
||||
@@ -23,6 +25,9 @@ import org.apache.flink.util.Collector;
|
||||
public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>
|
||||
implements FunctionHelper {
|
||||
|
||||
public static final OutputTag<ObjectNode> SIP_OUTPUT_TAG =
|
||||
new OutputTag<>("unmatched-sip", TypeInformation.of(ObjectNode.class));
|
||||
|
||||
private transient Time fireInterval;
|
||||
|
||||
private transient ValueState<ObjectNode> valueState;
|
||||
@@ -63,7 +68,7 @@ public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, Str
|
||||
out.collect(value);
|
||||
valueState.clear();
|
||||
} else {
|
||||
// If the address is not yet in the mapState.
|
||||
// If the address is not yet in the valueState.
|
||||
valueState.update(value);
|
||||
}
|
||||
} else {
|
||||
@@ -77,6 +82,10 @@ public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, Str
|
||||
public void onTimer(long timestamp,
|
||||
KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>.OnTimerContext ctx,
|
||||
Collector<ObjectNode> out) throws Exception {
|
||||
final ObjectNode value = valueState.value();
|
||||
if (value != null) {
|
||||
ctx.output(SIP_OUTPUT_TAG, value);
|
||||
}
|
||||
valueState.clear();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -61,7 +61,8 @@ public class Record {
|
||||
* @return The VSys ID as an integer.
|
||||
*/
|
||||
public int getVSysID() {
|
||||
return Record.getInt(obj, F_COMMON_VSYS_ID);
|
||||
int v = Record.getInt(obj, F_COMMON_VSYS_ID);
|
||||
return v == 0 ? 1 : v;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user