diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/Address.java b/src/main/java/com/zdjizhi/flink/voip/functions/Address.java new file mode 100644 index 0000000..aacd25c --- /dev/null +++ b/src/main/java/com/zdjizhi/flink/voip/functions/Address.java @@ -0,0 +1,54 @@ +package com.zdjizhi.flink.voip.functions; + +import com.google.common.collect.Lists; +import com.zdjizhi.utils.IPUtil; +import lombok.AllArgsConstructor; +import lombok.Data; +import org.apache.flink.api.java.tuple.Tuple2; + +import java.util.List; + + +/** + * A pojo class representing an address with two IP and port pairs. + * + * @author chaoc + * @since 1.0 + */ +@Data +@AllArgsConstructor +class Address { + + // The first IP address. + private final String ip1; + + //The first port number. + private final int port1; + + //The second IP address. + private final String ip2; + + //The second port number. + private final int port2; + + /** + * Creates an Address instance based on two tuples containing (String, Int) representing address information. + * The method sorts the addresses based on the port number, and if the ports are equal, it sorts them based on + * the numeric value of the IP address. + * + * @param a1 The first address information as a tuple (IP address, port). + * @param a2 The second address information as a tuple (IP address, port). + * @return An Address instance with addresses sorted and reordered. + */ + public static Address of(Tuple2 a1, Tuple2 a2) { + List> list = Lists.newArrayList(a1, a2); + list.sort((a, b) -> { + if (a.f1.equals(b.f1)) { + return Long.compare(IPUtil.getIpDesimal(a.f0), IPUtil.getIpDesimal(b.f0)); + } else { + return a.f1.compareTo(b.f1); + } + }); + return new Address(list.get(0).f0, list.get(0).f1, list.get(1).f0, list.get(1).f1); + } +} diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/ObjectNodeInfo.java b/src/main/java/com/zdjizhi/flink/voip/functions/ObjectNodeInfo.java new file mode 100644 index 0000000..81559fd --- /dev/null +++ b/src/main/java/com/zdjizhi/flink/voip/functions/ObjectNodeInfo.java @@ -0,0 +1,31 @@ +package com.zdjizhi.flink.voip.functions; + +import lombok.AllArgsConstructor; +import lombok.Data; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; + +/** + * A case class representing an ObjectNode with an expiration time and a pair times. + * + * @author chaoc + * @since 1.0 + */ +@Data +@AllArgsConstructor +class ObjectNodeInfo { + + // The ObjectNode containing data. + private final ObjectNode obj; + + // The expiration time for the object. + private final long expireTime; + + // The pair times for the object. + private final int times; + + public ObjectNodeInfo(final ObjectNode obj, final long expireTime) { + this.obj = obj; + this.expireTime = expireTime; + this.times = 0; + } +}