test(utils): add test data generator

This commit is contained in:
chaoc
2023-08-03 17:18:00 +08:00
parent 14d569edfd
commit cbc7c91abe
2 changed files with 170 additions and 0 deletions

View File

@@ -0,0 +1,103 @@
package com.zdjizhi.flink.voip.data;
import com.github.javafaker.Faker;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
/**
* The abstract class Generator<T> serves as the base class for implementing record generators.
* It provides common functionalities for generating random data and controlling the generation ratio.
*
* @param <T> The type of records to be generated.
* @author chaoc
* @since 1.0
*/
public abstract class Generator<T> {
protected final ThreadLocalRandom random;
/**
* The ratio of generated records. The total number of records generated will be 100 / ratio.
*/
protected final int ratio;
private final Faker faker;
private final AtomicReference<T> state;
/**
* Creates a new Generator with the given ratio.
*
* @param ratio The ratio of generated records. The total number of records generated will be 100 / ratio.
*/
public Generator(final int ratio) {
this.ratio = ratio;
this.faker = new Faker();
this.random = ThreadLocalRandom.current();
this.state = new AtomicReference<>();
}
/**
* Generates the next record based on the specified ratio and the current state.
* It randomly selects whether to generate a new record or return the last generated record.
*
* @return The next generated record of type T.
*/
public final T next() {
int i = random.nextInt(100);
if (i < ratio && state.get() != null) {
T t = afterState(state.get());
state.set(null);
return t;
} else {
return state.updateAndGet(t -> generate());
}
}
/**
* Generates a new record of type T.
*
* @return The newly generated record of type T.
*/
protected abstract T generate();
/**
* Performs post-processing on the generated record of type T.
* Subclasses can override this method to modify the generated record before returning it.
*
* @param v The generated record of type T.
* @return The post-processed record of type T.
*/
protected abstract T afterState(T v);
/**
* Generates a random IP address (either IPv4 or IPv6) .
*
* @return A randomly generated IP address as a string.
*/
public final String nextIp() {
if (random.nextBoolean()) {
return faker.internet().ipV4Address();
}
return faker.internet().ipV6Address();
}
/**
* Generates a random ID number.
*
* @return A randomly generated ID number as a string.
*/
public final String nextId() {
return faker.idNumber().valid();
}
/**
* Generates a random port number within the range of 0 to 65535 (inclusive).
*
* @return A randomly generated port number as an integer.
*/
public final int nextPort() {
return random.nextInt(65535);
}
}

View File

@@ -0,0 +1,67 @@
package com.zdjizhi.flink.voip.data;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SIPRecord;
import com.zdjizhi.flink.voip.records.StreamDir;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
/**
* SIPGenerator extends Generator<ObjectNode> and is responsible for generating SIP (Session Initiation Protocol) records.
* It generates random SIP records with specific properties such as call ID, stream dir, server IP, server port,
* client IP, client port... information for both originator and responder.
*
* @author chaoc
* @since 1.0
*/
public class SIPGenerator extends Generator<ObjectNode> {
private final ObjectMapper mapper;
public SIPGenerator(final int ratio) {
super(ratio);
this.mapper = new ObjectMapper();
}
/**
* Creates a new SIPGenerator with the default ratio of 40.
*/
public SIPGenerator() {
this(40);
}
@Override
protected ObjectNode generate() {
final ObjectNode obj = mapper.createObjectNode();
final SIPRecord record = new SIPRecord(obj);
record.setString(SIPRecord.F_CALL_ID, nextId());
record.setInt(SIPRecord.F_COMMON_STREAM_DIR, (random.nextBoolean() ? StreamDir.S2C : StreamDir.C2S).getValue());
record.setString(Record.F_COMMON_SERVER_IP, nextIp());
record.setInt(Record.F_COMMON_SERVER_PORT, nextPort());
record.setString(Record.F_COMMON_CLIENT_IP, nextIp());
record.setInt(Record.F_COMMON_CLIENT_PORT, nextPort());
record.setString(SIPRecord.F_ORIGINATOR_SDP_CONNECT_IP, nextIp());
record.setInt(SIPRecord.F_ORIGINATOR_SDP_MEDIA_PORT, nextPort());
record.setString(SIPRecord.F_RESPONDER_SDP_CONNECT_IP, nextIp());
record.setInt(SIPRecord.F_ORIGINATOR_SDP_MEDIA_PORT, nextPort());
return obj;
}
@Override
protected ObjectNode afterState(ObjectNode v) {
final Record record = new Record(v);
switch (StreamDir.of(record.getStreamDir())) {
case C2S:
record.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.S2C.getValue());
break;
case S2C:
record.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.C2S.getValue());
break;
default:
}
return v;
}
}