diff --git a/pom.xml b/pom.xml index 7d282d6..217901d 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.zdjizhi log-completion-schema - 211206-radius + 220209-ipLookup log-completion-schema http://www.example.com @@ -115,7 +115,7 @@ com.zdjizhi galaxy - 1.0.7 + 1.0.8 slf4j-log4j12 @@ -134,15 +134,6 @@ 1.2.70 - - - - - - - - - org.apache.flink @@ -168,15 +159,6 @@ ${scope.type} - - - - - - - - - org.apache.flink @@ -295,7 +277,7 @@ cn.hutool hutool-all - 5.5.2 + 5.7.17 diff --git a/properties/default_config.properties b/properties/default_config.properties index 71f83b6..ebf7927 100644 --- a/properties/default_config.properties +++ b/properties/default_config.properties @@ -32,7 +32,7 @@ max.request.size=10485760 kafka.source.protocol=SASL #kafka sink protocol; SSL or SASL -kafka.sink.protocol=SSL +kafka.sink.protocol=SASL #kafka SASL验证用户名 kafka.user=admin diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 4480bc9..ddd10f6 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,26 +1,26 @@ #--------------------------------地址配置------------------------------# #管理kafka地址 -source.kafka.servers=10.224.11.14:9094 +source.kafka.servers=192.168.44.11:9094 #管理输出kafka地址 -sink.kafka.servers=10.224.11.14:9095,10.224.11.15:9095,10.224.11.16:9095,10.224.11.17:9095,10.224.11.18:9095,10.224.11.19:9095,10.224.11.20:9095,10.224.11.21:9095,10.224.11.22:9095,10.224.11.23:9095 +sink.kafka.servers=192.168.44.11:9094 #zookeeper 地址 用于配置log_id -zookeeper.servers=10.224.11.11:2181,10.224.11.12:2181,10.224.11.13:2181 +zookeeper.servers=192.168.44.11:2181 #hbase zookeeper地址 用于连接HBase -hbase.zookeeper.servers=10.224.11.11:2181,10.224.11.12:2181,10.224.11.13:2181 +hbase.zookeeper.servers=192.168.44.11:2181 #--------------------------------HTTP/定位库------------------------------# #定位库地址 tools.library=D:\\workerspace\\dat\\ #网关的schema位置 -schema.http=http://10.224.11.244:9999/metadata/schema/v1/fields/session_record +schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/session_record #网关APP_ID 获取接口 -app.id.http=http://10.224.11.244:9999/open-api/appDicList +app.id.http=http://192.168.44.67:9999/open-api/appDicList #--------------------------------Kafka消费组信息------------------------------# @@ -42,16 +42,16 @@ producer.ack=1 #--------------------------------topology配置------------------------------# #consumer 并行度 -source.parallelism=10 +source.parallelism=1 #转换函数并行度 -transform.parallelism=10 +transform.parallelism=1 #kafka producer 并行度 -sink.parallelism=10 +sink.parallelism=1 #数据中心,取值范围(0-63) -data.center.id.num=7 +data.center.id.num=0 #hbase 更新时间,如填写0则不更新缓存 hbase.tick.tuple.freq.secs=180 diff --git a/src/main/java/com/zdjizhi/utils/general/CityHash.java b/src/main/java/com/zdjizhi/utils/general/CityHash.java new file mode 100644 index 0000000..e87ddc9 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/general/CityHash.java @@ -0,0 +1,179 @@ +package com.zdjizhi.utils.general; + + + + +/** + * CityHash64算法对logid进行散列计算 + * 版本规划暂不实现-TSG22.01 + * + * @author qidaijie + */ +public class CityHash { + + private static final long k0 = 0xc3a5c85c97cb3127L; + private static final long k1 = 0xb492b66fbe98f273L; + private static final long k2 = 0x9ae16a3b2f90404fL; + private static final long k3 = 0xc949d7c7509e6557L; + private static final long k5 = 0x9ddfea08eb382d69L; + + private CityHash() {} + + public static long CityHash64(byte[] s, int index, int len) { + if (len <= 16 ) { + return HashLen0to16(s, index, len); + } else if (len > 16 && len <= 32) { + return HashLen17to32(s, index, len); + } else if (len > 32 && len <= 64) { + return HashLen33to64(s, index, len); + } else { + long x = Fetch64(s, index); + long y = Fetch64(s, index + len - 16) ^ k1; + long z = Fetch64(s, index + len - 56) ^ k0; + long[] v = WeakHashLen32WithSeeds(s, len - 64, len, y); + long[] w = WeakHashLen32WithSeeds(s, len - 32, len * k1, k0); + z += ShiftMix(v[1]) * k1; + x = Rotate(z + x, 39) * k1; + y = Rotate(y, 33) * k1; + + len = (len - 1) & ~63; + do { + x = Rotate(x + y + v[0] + Fetch64(s, index + 16), 37) * k1; + y = Rotate(y + v[1] + Fetch64(s, index + 48), 42) * k1; + x ^= w[1]; + y ^= v[0]; + z = Rotate(z ^ w[0], 33); + v = WeakHashLen32WithSeeds(s, index, v[1] * k1, x + w[0]); + w = WeakHashLen32WithSeeds(s, index + 32, z + w[1], y); + long t = z; + z = x; + x = t; + index += 64; + len -= 64; + } while (len != 0); + return HashLen16(HashLen16(v[0], w[0]) + ShiftMix(y) * k1 + z, + HashLen16(v[1], w[1]) + x); + } + } + + private static long HashLen0to16(byte[] s, int index, int len) { + if (len > 8) { + long a = Fetch64(s, index); + long b = Fetch64(s, index + len - 8); + return HashLen16(a, RotateByAtLeastOne(b + len, len)) ^ b; + } + if (len >= 4) { + long a = Fetch32(s, index); + return HashLen16(len + (a << 3), Fetch32(s, index + len - 4)); + } + if (len > 0) { + byte a = s[index]; + byte b = s[index + len >>> 1]; + byte c = s[index + len - 1]; + int y = (a) + (b << 8); + int z = len + (c << 2); + return ShiftMix(y * k2 ^ z * k3) * k2; + } + return k2; + } + + private static long HashLen17to32(byte[] s, int index, int len) { + long a = Fetch64(s, index) * k1; + long b = Fetch64(s, index + 8); + long c = Fetch64(s, index + len - 8) * k2; + long d = Fetch64(s, index + len - 16) * k0; + return HashLen16(Rotate(a - b, 43) + Rotate(c, 30) + d, + a + Rotate(b ^ k3, 20) - c + len); + } + + private static long HashLen33to64(byte[] s, int index, int len) { + long z = Fetch64(s, index + 24); + long a = Fetch64(s, index) + (len + Fetch64(s, index + len - 16)) * k0; + long b = Rotate(a + z, 52); + long c = Rotate(a, 37); + a += Fetch64(s, index + 8); + c += Rotate(a, 7); + a += Fetch64(s, index + 16); + long vf = a + z; + long vs = b + Rotate(a, 31) + c; + a = Fetch64(s, index + 16) + Fetch64(s, index + len - 32); + z = Fetch64(s, index + len - 8); + b = Rotate(a + z, 52); + c = Rotate(a, 37); + a += Fetch64(s, index + len - 24); + c += Rotate(a, 7); + a += Fetch64(s, index + len - 16); + long wf = a + z; + long ws = b + Rotate(a, 31) + c; + long r = ShiftMix((vf + ws) * k2 + (wf + vs) * k0); + return ShiftMix(r * k0 + vs) * k2; + } + + private static long Fetch64(byte[] p, int index) { + return toLongLE(p,index); + } + + private static long Fetch32(byte[] p, int index) { + return toIntLE(p,index); + } + private static long[] WeakHashLen32WithSeeds( + long w, long x, long y, long z, long a, long b) { + a += w; + b = Rotate(b + a + z, 21); + long c = a; + a += x; + a += y; + b += Rotate(a, 44); + return new long[]{a + z, b + c}; + } + + private static long[] WeakHashLen32WithSeeds(byte[] s, int index, long a, long b) { + return WeakHashLen32WithSeeds(Fetch64(s, index), + Fetch64(s, index + 8), + Fetch64(s, index + 16), + Fetch64(s, index + 24), + a, + b); + } + + private static long toLongLE(byte[] b, int i) { + return 0xffffffffffffffffL & (((long) b[i + 7] << 56) + ((long) (b[i + 6] & 255) << 48) + ((long) (b[i + 5] & 255) << 40) + ((long) (b[i + 4] & 255) << 32) + ((long) (b[i + 3] & 255) << 24) + ((b[i + 2] & 255) << 16) + ((b[i + 1] & 255) << 8) + ((b[i + 0] & 255) << 0)); + } + + private static long toIntLE(byte[] b, int i) { + return 0xffffffffL & (((b[i + 3] & 255) << 24) + ((b[i + 2] & 255) << 16) + ((b[i + 1] & 255) << 8) + ((b[i + 0] & 255) << 0)); + } + private static long RotateByAtLeastOne(long val, int shift) { + return (val >>> shift) | (val << (64 - shift)); + } + + private static long ShiftMix(long val) { + return val ^ (val >>> 47); + } + + private static long Uint128Low64(long[] x) { + return x[0]; + } + + private static long Rotate(long val, int shift) { + return shift == 0 ? val : (val >>> shift) | (val << (64 - shift)); + } + + private static long Uint128High64(long[] x) { + return x[1]; + } + + private static long Hash128to64(long[] x) { + long a = (Uint128Low64(x) ^ Uint128High64(x)) * k5; + a ^= (a >>> 47); + long b = (Uint128High64(x) ^ a) * k5; + b ^= (b >>> 47); + b *= k5; + return b; + } + + private static long HashLen16(long u, long v) { + return Hash128to64(new long[]{u,v}); + } + +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java index 765e23e..699470f 100644 --- a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java +++ b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java @@ -88,6 +88,8 @@ public class TransFormTypeMap { break; case "snowflake_id": JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId()); + //版本规划暂不实现TSG-22.01 +// JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getDecimalHash(SnowflakeId.generateId())); break; case "geo_ip_detail": if (logValue != null && appendToKeyValue == null) { diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/utils/general/TransFunction.java index 549f3cc..7923fd1 100644 --- a/src/main/java/com/zdjizhi/utils/general/TransFunction.java +++ b/src/main/java/com/zdjizhi/utils/general/TransFunction.java @@ -1,20 +1,21 @@ package com.zdjizhi.utils.general; import cn.hutool.core.codec.Base64; -import cn.hutool.core.text.StrSpliter; +import cn.hutool.core.util.StrUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.jayway.jsonpath.InvalidPathException; import com.jayway.jsonpath.JsonPath; import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.utils.FormatUtils; -import com.zdjizhi.utils.IpLookup; +import com.zdjizhi.utils.IpLookupV2; import com.zdjizhi.utils.StringUtil; import com.zdjizhi.utils.app.AppUtils; import com.zdjizhi.utils.hbase.HBaseUtils; import com.zdjizhi.utils.json.JsonParseUtil; import com.zdjizhi.utils.json.TypeUtils; +import java.math.BigInteger; import java.util.ArrayList; import java.util.Map; import java.util.regex.Matcher; @@ -24,19 +25,21 @@ import java.util.regex.Pattern; * @author qidaijie */ class TransFunction { - private static final Log logger = LogFactory.get(); + /** + * 校验数字正则 + */ private static final Pattern PATTERN = Pattern.compile("[0-9]*"); /** * IP定位库工具类 */ - private static IpLookup ipLookup = new IpLookup.Builder(false) - .loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4.mmdb") - .loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6.mmdb") - .loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_private_v4.mmdb") - .loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_private_v6.mmdb") + private static IpLookupV2 ipLookup = new IpLookupV2.Builder(false) + .loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_built_in.mmdb") + .loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_built_in.mmdb") + .loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_user_defined.mmdb") + .loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_user_defined.mmdb") .loadAsnDataFile(FlowWriteConfig.TOOLS_LIBRARY + "asn_v4.mmdb") .loadAsnDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "asn_v6.mmdb") .build(); @@ -49,6 +52,21 @@ class TransFunction { return System.currentTimeMillis() / 1000; } + /** + * CityHash64算法 + * 版本规划暂不实现-TSG22.01 + * + * @param data 原始数据 + * @return 散列结果 + */ + static BigInteger getDecimalHash(long data) { + byte[] dataBytes = String.valueOf(data).getBytes(); + long hashValue = CityHash.CityHash64(dataBytes, 0, dataBytes.length); + String decimalValue = Long.toUnsignedString(hashValue, 10); + BigInteger result = new BigInteger(decimalValue); + return result; + } + /** * 根据clientIp获取location信息 * @@ -56,9 +74,7 @@ class TransFunction { * @return ip地址详细信息 */ static String getGeoIpDetail(String ip) { - return ipLookup.cityLookupDetail(ip); - } /** @@ -68,7 +84,6 @@ class TransFunction { * @return ASN */ static String getGeoAsn(String ip) { - return ipLookup.asnLookup(ip); } @@ -102,7 +117,7 @@ class TransFunction { */ static String appMatch(String appIds) { try { - String appId = StrSpliter.split(appIds, FlowWriteConfig.FORMAT_SPLITTER, true, true).get(0); + String appId = StrUtil.split(appIds, FlowWriteConfig.FORMAT_SPLITTER, true, true).get(0); return AppUtils.getAppName(Integer.parseInt(appId)); } catch (NumberFormatException | ClassCastException exception) { logger.error("APP ID列表分割转换异常,异常APP ID列表:" + appIds);