diff --git a/src/test/java/com/zdjizhi/flink/voip/data/Generator.java b/src/test/java/com/zdjizhi/flink/voip/data/Generator.java new file mode 100644 index 0000000..20f4174 --- /dev/null +++ b/src/test/java/com/zdjizhi/flink/voip/data/Generator.java @@ -0,0 +1,103 @@ +package com.zdjizhi.flink.voip.data; + +import com.github.javafaker.Faker; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicReference; + +/** + * The abstract class Generator serves as the base class for implementing record generators. + * It provides common functionalities for generating random data and controlling the generation ratio. + * + * @param The type of records to be generated. + * @author chaoc + * @since 1.0 + */ +public abstract class Generator { + + protected final ThreadLocalRandom random; + /** + * The ratio of generated records. The total number of records generated will be 100 / ratio. + */ + protected final int ratio; + + private final Faker faker; + + private final AtomicReference state; + + /** + * Creates a new Generator with the given ratio. + * + * @param ratio The ratio of generated records. The total number of records generated will be 100 / ratio. + */ + public Generator(final int ratio) { + this.ratio = ratio; + this.faker = new Faker(); + this.random = ThreadLocalRandom.current(); + this.state = new AtomicReference<>(); + } + + /** + * Generates the next record based on the specified ratio and the current state. + * It randomly selects whether to generate a new record or return the last generated record. + * + * @return The next generated record of type T. + */ + public final T next() { + int i = random.nextInt(100); + if (i < ratio && state.get() != null) { + T t = afterState(state.get()); + state.set(null); + return t; + } else { + return state.updateAndGet(t -> generate()); + } + } + + /** + * Generates a new record of type T. + * + * @return The newly generated record of type T. + */ + protected abstract T generate(); + + /** + * Performs post-processing on the generated record of type T. + * Subclasses can override this method to modify the generated record before returning it. + * + * @param v The generated record of type T. + * @return The post-processed record of type T. + */ + protected abstract T afterState(T v); + + /** + * Generates a random IP address (either IPv4 or IPv6) . + * + * @return A randomly generated IP address as a string. + */ + public final String nextIp() { + if (random.nextBoolean()) { + return faker.internet().ipV4Address(); + } + return faker.internet().ipV6Address(); + } + + /** + * Generates a random ID number. + * + * @return A randomly generated ID number as a string. + */ + public final String nextId() { + return faker.idNumber().valid(); + } + + /** + * Generates a random port number within the range of 0 to 65535 (inclusive). + * + * @return A randomly generated port number as an integer. + */ + public final int nextPort() { + return random.nextInt(65535); + } +} + diff --git a/src/test/java/com/zdjizhi/flink/voip/data/SIPGenerator.java b/src/test/java/com/zdjizhi/flink/voip/data/SIPGenerator.java new file mode 100644 index 0000000..cdb9dff --- /dev/null +++ b/src/test/java/com/zdjizhi/flink/voip/data/SIPGenerator.java @@ -0,0 +1,67 @@ +package com.zdjizhi.flink.voip.data; + +import com.zdjizhi.flink.voip.records.Record; +import com.zdjizhi.flink.voip.records.SIPRecord; +import com.zdjizhi.flink.voip.records.StreamDir; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; + +/** + * SIPGenerator extends Generator and is responsible for generating SIP (Session Initiation Protocol) records. + * It generates random SIP records with specific properties such as call ID, stream dir, server IP, server port, + * client IP, client port... information for both originator and responder. + * + * @author chaoc + * @since 1.0 + */ +public class SIPGenerator extends Generator { + private final ObjectMapper mapper; + + public SIPGenerator(final int ratio) { + super(ratio); + this.mapper = new ObjectMapper(); + } + + /** + * Creates a new SIPGenerator with the default ratio of 40. + */ + public SIPGenerator() { + this(40); + } + + @Override + protected ObjectNode generate() { + final ObjectNode obj = mapper.createObjectNode(); + + final SIPRecord record = new SIPRecord(obj); + + record.setString(SIPRecord.F_CALL_ID, nextId()); + record.setInt(SIPRecord.F_COMMON_STREAM_DIR, (random.nextBoolean() ? StreamDir.S2C : StreamDir.C2S).getValue()); + + record.setString(Record.F_COMMON_SERVER_IP, nextIp()); + record.setInt(Record.F_COMMON_SERVER_PORT, nextPort()); + record.setString(Record.F_COMMON_CLIENT_IP, nextIp()); + record.setInt(Record.F_COMMON_CLIENT_PORT, nextPort()); + + record.setString(SIPRecord.F_ORIGINATOR_SDP_CONNECT_IP, nextIp()); + record.setInt(SIPRecord.F_ORIGINATOR_SDP_MEDIA_PORT, nextPort()); + record.setString(SIPRecord.F_RESPONDER_SDP_CONNECT_IP, nextIp()); + record.setInt(SIPRecord.F_ORIGINATOR_SDP_MEDIA_PORT, nextPort()); + return obj; + } + + @Override + protected ObjectNode afterState(ObjectNode v) { + final Record record = new Record(v); + switch (StreamDir.of(record.getStreamDir())) { + case C2S: + record.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.S2C.getValue()); + break; + case S2C: + record.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.C2S.getValue()); + break; + default: + } + return v; + } +}