test(utils): optimize data generator
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package com.zdjizhi.flink.voip.data;
|
||||
|
||||
import com.github.javafaker.Faker;
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
@@ -23,7 +24,9 @@ public abstract class Generator<T> {
|
||||
|
||||
private final Faker faker;
|
||||
|
||||
private final AtomicReference<T> state;
|
||||
protected final AtomicReference<T> state;
|
||||
|
||||
protected final ObjectMapper mapper;
|
||||
|
||||
/**
|
||||
* Creates a new Generator with the given ratio.
|
||||
@@ -35,6 +38,7 @@ public abstract class Generator<T> {
|
||||
this.faker = new Faker();
|
||||
this.random = ThreadLocalRandom.current();
|
||||
this.state = new AtomicReference<>();
|
||||
this.mapper = new ObjectMapper();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -43,16 +47,7 @@ public abstract class Generator<T> {
|
||||
*
|
||||
* @return The next generated record of type T.
|
||||
*/
|
||||
public final T next() {
|
||||
int i = random.nextInt(100);
|
||||
if (i < ratio && state.get() != null) {
|
||||
T t = afterState(state.get());
|
||||
state.set(null);
|
||||
return t;
|
||||
} else {
|
||||
return state.updateAndGet(t -> generate());
|
||||
}
|
||||
}
|
||||
public abstract T next();
|
||||
|
||||
/**
|
||||
* Generates a new record of type T.
|
||||
|
||||
@@ -2,8 +2,8 @@ package com.zdjizhi.flink.voip.data;
|
||||
|
||||
import com.zdjizhi.flink.voip.records.Record;
|
||||
import com.zdjizhi.flink.voip.records.SIPRecord;
|
||||
import com.zdjizhi.flink.voip.records.SchemaType;
|
||||
import com.zdjizhi.flink.voip.records.StreamDir;
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
|
||||
/**
|
||||
@@ -15,11 +15,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.Obje
|
||||
* @since 1.0
|
||||
*/
|
||||
public class SIPGenerator extends Generator<ObjectNode> {
|
||||
private final ObjectMapper mapper;
|
||||
|
||||
public SIPGenerator(final int ratio) {
|
||||
super(ratio);
|
||||
this.mapper = new ObjectMapper();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -29,12 +26,27 @@ public class SIPGenerator extends Generator<ObjectNode> {
|
||||
this(40);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final ObjectNode next() {
|
||||
int i = random.nextInt(100);
|
||||
if (i < ratio && state.get() != null) {
|
||||
ObjectNode t = afterState(state.get());
|
||||
state.set(null);
|
||||
return t;
|
||||
} else {
|
||||
return state.updateAndGet(t -> generate());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected ObjectNode generate() {
|
||||
final ObjectNode obj = mapper.createObjectNode();
|
||||
|
||||
final SIPRecord record = new SIPRecord(obj);
|
||||
|
||||
record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.SIP.getValue());
|
||||
|
||||
record.setString(SIPRecord.F_CALL_ID, nextId());
|
||||
record.setInt(SIPRecord.F_COMMON_STREAM_DIR, (random.nextBoolean() ? StreamDir.S2C : StreamDir.C2S).getValue());
|
||||
|
||||
@@ -46,7 +58,7 @@ public class SIPGenerator extends Generator<ObjectNode> {
|
||||
record.setString(SIPRecord.F_ORIGINATOR_SDP_CONNECT_IP, nextIp());
|
||||
record.setInt(SIPRecord.F_ORIGINATOR_SDP_MEDIA_PORT, nextPort());
|
||||
record.setString(SIPRecord.F_RESPONDER_SDP_CONNECT_IP, nextIp());
|
||||
record.setInt(SIPRecord.F_ORIGINATOR_SDP_MEDIA_PORT, nextPort());
|
||||
record.setInt(SIPRecord.F_RESPONDER_SDP_MEDIA_PORT, nextPort());
|
||||
return obj;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user