diff --git a/.gitignore b/.gitignore index 5f95c16..26aed40 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,7 @@ build/ docker-ubuntu/protection_ubuntu.tar VSCode.zip Writerside* +venv ### STS ### .apt_generated diff --git a/fake_dynamic_test/fakecc_test_send2.py b/fake_dynamic_test/fakecc_test_send2.py new file mode 100644 index 0000000..14e3dc1 --- /dev/null +++ b/fake_dynamic_test/fakecc_test_send2.py @@ -0,0 +1,114 @@ +import socket +import struct +from flask import request, jsonify,Flask +import json + +app = Flask(__name__) + +def convert_ipv4_address(ip_int): + + return socket.inet_ntoa(struct.pack('!I', ip_int)) + +def parse_and_convert_ip(data_part): + + parts = data_part.split('-') + src_ip = convert_ipv4_address(int(parts[0])) + dst_ip = convert_ipv4_address(int(parts[1])) + src_port = int(parts[2]) + dst_port = int(parts[3]) + protocol = int(parts[4]) + return src_ip, dst_ip, src_port, dst_port, protocol + + +def send_test_data(host, port, data): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.connect((host, port)) + s.sendall(data.encode()) + + +def format_data_to_json(test_data): + data_parts = test_data.split(",") + + src_ip_v4, dst_ip_v4, src_port_v4, dst_port_v4, protocol_v4 = parse_and_convert_ip(data_parts[1]) + + formatted_data = { + "task_id": 30650, + "rule_id": 39, + "five_tuple_with_mask": { + "src_ip": src_ip_v4, + "dst_ip": dst_ip_v4, + "src_port": src_port_v4, + "dst_port": dst_port_v4, + "protocol": protocol_v4, + "mask_src_ip": data_parts[5], + "mask_dst_ip": data_parts[6], + "mask_src_port": int(data_parts[7]), + "mask_dst_ip": int(data_parts[8]), + "mask_protocol": int(data_parts[9]) + }, + "content": test_data # Keeping the original content in the 'content' field + } + return formatted_data + + + +# if __name__ == "__main__": +# HOST = '127.0.0.1' +# PORT = 65432 +# send_test_data(HOST, PORT, test_data) +# +# test_data="1702017420-1-175833107,1921297587-310737541-53420-6379-6-127-0,140717936336976-140717936336992-0-0-0-131-4481,0,000,440000,1,7,107,1,111,Amazon Data Services UK,0,0,440100,0,0,0,NULL,NULL,NULL,NULL,NULL,NULL,NULL,,9,10060101189,tcp.banner=$5115,39,0" +# test_data_2= "log_info,stream_v4,stream_v6,tunnel_v4,s_boundary,s_region,d_boundary,d_region,s_operators,s_owner,d_operators,d_owner,priority,flowid,s_city,s_district,d_city,d_district,msisdn,imsi,meid,uli,bsid,apn,event_id,return_info,file_type,file_name,file,url,cookie" +# print(len(test_data.split(",")), len(test_data_2.split(","))) +# print(test_data.split(",")[-7]) +# print(test_data_2.split(",")[-7]) +# print(test_data_2.split(",")[24]) +# +# # formatted_json = format_data_to_json(test_data) +# # print(formatted_json) +# # +# # send_test_data(HOST, PORT, formatted_json) +# send_test_data(HOST, PORT, test_data) + + +# +# @app.route('/api/v1/kafkasend', methods=['POST']) +# def kafka_send(): +# if request.method == 'POST' and request.is_json: +# data = request.json +# task_id = data.get('task_id', '') +# rule_id = data.get('rule_id', '') +# +# test_data = "1702017420-1-175833107,1921297587-310737541-53420-6379-6-127-0,140717936336976-140717936336992-0-0-0-131-4481,0,000,440000,1,7,107,1,111,Amazon Data Services UK,0,0,440100,0,0,0,NULL,NULL,NULL,NULL,NULL,NULL,NULL,,9,10060101189,tcp.banner=$5115,39,0" +# +# modified_data = f"{test_data}, {task_id}, {rule_id},{0}" +# HOST = '127.0.0.1' +# PORT = 65432 +# +# send_test_data(HOST, PORT, modified_data) +# return jsonify({'message': 'Data processed successfully'}), 200 +# +# +# return jsonify({'error': 'Invalid request'}), 400 + +@app.route('/api/v1/kafkasend', methods=['POST']) +def kafka_send(): + if request.method == 'POST' and request.is_json: + data = request.get_json() + + base_data = "1702017420-1-175833107,1921297587-310737541-53420-6379-6-127-0,140717936336976-140717936336992-0-0-0-131-4481,0,000,440000,1,7,107,1,111,Amazon Data Services UK,0,0,440100,0,0,0,NULL,NULL,NULL,NULL,NULL,NULL,NULL,,9,10060101189,tcp.banner=$5115" + for item in data: + task_id = item.get('task_id', '') + rule_id = item.get('rule_id', '') + modified_data = f"{base_data}, {task_id}, {rule_id},{0}" + HOST = '127.0.0.1' + PORT = 65432 + send_test_data(HOST, PORT, modified_data) + + return jsonify({'message': 'Data processed successfully'}), 200 + + return jsonify({'error': 'Invalid request'}), 400 + + +if __name__ == "__main__": + app.run(host='0.0.0.0', port=8081, debug=True) \ No newline at end of file diff --git a/fake_dynamic_test/main.py b/fake_dynamic_test/main.py new file mode 100644 index 0000000..1d07ecd --- /dev/null +++ b/fake_dynamic_test/main.py @@ -0,0 +1,104 @@ +import socket +from confluent_kafka import Producer + + +import socket +import struct + +def convert_ipv4_address(ip_int): + + return socket.inet_ntoa(struct.pack('!I', ip_int)) + +def parse_and_convert_ip(data_part): + + parts = data_part.split('-') + src_ip = convert_ipv4_address(int(parts[0])) + dst_ip = convert_ipv4_address(int(parts[1])) + src_port = int(parts[2]) + dst_port = int(parts[3]) + protocol = int(parts[4]) + return src_ip, dst_ip, src_port, dst_port, protocol + + +def send_test_data(host, port, data): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.connect((host, port)) + s.sendall(data.encode()) + + +def format_data_to_json(test_data): + data_parts = test_data.split(",") + + task_id = data_parts[29] + rule_id_value = data_parts[30] + protect_object_is_src_dst = data_parts[31] + + src_ip_v4, dst_ip_v4, src_port_v4, dst_port_v4, protocol_v4 = parse_and_convert_ip(data_parts[1]) + print("task_id:", task_id, "rule_id:", rule_id_value, "protect_object_is_src_dst:", protect_object_is_src_dst, "src_ip_v4:", src_ip_v4, "dst_ip_v4:", dst_ip_v4, "src_port_v4:", src_port_v4, "dst_port_v4:", dst_port_v4, "protocol_v4:", protocol_v4) + formatted_data = { + "task_id": task_id, + "rule_id": rule_id_value, + "protect_object_is_src_dst": protect_object_is_src_dst, + "five_tuple_with_mask": { + "src_ip": src_ip_v4, + "dst_ip": dst_ip_v4, + "src_port": src_port_v4, + "dst_port": dst_port_v4, + "protocol": protocol_v4, + "event_id": data_parts[24], + # test_data_2.split(",")[-7], + "mask_src_ip": data_parts[5], + "mask_dst_ip": data_parts[6], + "mask_src_port": int(data_parts[7]), + "mask_dst_ip": int(data_parts[8]), + "mask_protocol": int(data_parts[9]) + }, + "content": test_data + } + return formatted_data + +def delivery_report(err, msg): + if err is not None: + print('Message delivery failed: {}'.format(err)) + else: + print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) + +conf = { + 'bootstrap.servers': "192.168.107.49:9092", + 'client.id': 'python-server' + } + +producer = Producer(conf) +def process_data(data, topic): + producer.produce(topic, data.encode('utf-8'), callback=delivery_report) + producer.poll(0) +def start_server(host, port, topic): + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server_socket.bind((host, port)) + server_socket.listen(1) + print(f"Listening on {host}:{port}") + + while True: + conn, addr = server_socket.accept() + print(f"Connected by {addr}") + + try: + while True: + data = conn.recv(4096) + if not data: + break + print(f"Received data: {data}") + formatted_json = format_data_to_json(data.decode()) + print(f"Formatted JSON: {formatted_json}") + import json + json_data = json.dumps(formatted_json) + + process_data(json_data, topic) + finally: + conn.close() + +if __name__ == "__main__": + HOST = '127.0.0.1' + PORT = 65432 + TOPIC = 'topic-test' + start_server(HOST, PORT, TOPIC) diff --git a/fake_dynamic_test/test_0.py b/fake_dynamic_test/test_0.py new file mode 100644 index 0000000..9961bf5 --- /dev/null +++ b/fake_dynamic_test/test_0.py @@ -0,0 +1,55 @@ +import requests +import json + + +test_data = [ + { + "task_id": 1, + "rule_id": 1, + "source_system": "BW系统", + "event_type": "xxx", + "start_time": "2024-01-17T00:00:00.000Z", + "end_time": "2024-01-31T00:00:00.000Z", + "log_rule_id": "11343234524145136635224567746", + "protect_objects": [ + { + "ip": "192.168.122.11", + "port": 1656, + "url": "alice.bob.com", + "protocol": "TCP" + }, + { + "ip": "192.168.122.114", + "port": 514, + "url": "alice.bob.com", + "protocol": "UDP" + } + ] + }, + { + "task_id": 2, + "rule_id": 2, + "source_system": "BW系统", + "event_type": "yyy", + "start_time": "2024-02-17T00:00:00.000Z", + "end_time": "2024-02-31T00:00:00.000Z", + "log_rule_id": "22343234524145136635224567746", + "protect_objects": [ + { + "ip": "192.168.123.11", + "port": 1657, + "url": "charlie.dave.com", + "protocol": "TCP" + } + ] + } +] + +url = 'http://127.0.0.1:8081/api/v1/kafkasend' + + +response = requests.post(url, json=test_data) + +# 打印响应内容 +print(f"Status Code: {response.status_code}") +print(f"Response Body: {response.text}") diff --git a/src/main/java/com/realtime/protection/configuration/entity/task/FiveTupleWithMask.java b/src/main/java/com/realtime/protection/configuration/entity/task/FiveTupleWithMask.java index e868f5d..baf5747 100644 --- a/src/main/java/com/realtime/protection/configuration/entity/task/FiveTupleWithMask.java +++ b/src/main/java/com/realtime/protection/configuration/entity/task/FiveTupleWithMask.java @@ -1,6 +1,7 @@ package com.realtime.protection.configuration.entity.task; import com.fasterxml.jackson.annotation.JsonProperty; +import com.realtime.protection.configuration.utils.enums.ProtocolEnum; import io.swagger.v3.oas.annotations.media.Schema; import jakarta.validation.constraints.Max; import jakarta.validation.constraints.Min; @@ -83,4 +84,10 @@ public class FiveTupleWithMask { this.maskDestinationPort = original.maskDestinationPort; this.maskProtocol = original.maskProtocol; } + + public void setProtocolNum() { + ProtocolEnum protocol = ProtocolEnum.getProtocolEnumByProtocol(this.protocol); + assert protocol != null; + this.protocolNum = protocol.getNumber(); + } } diff --git a/src/main/java/com/realtime/protection/configuration/entity/task/TaskCommandInfo.java b/src/main/java/com/realtime/protection/configuration/entity/task/TaskCommandInfo.java index 98258e9..e4c1ac9 100644 --- a/src/main/java/com/realtime/protection/configuration/entity/task/TaskCommandInfo.java +++ b/src/main/java/com/realtime/protection/configuration/entity/task/TaskCommandInfo.java @@ -127,7 +127,9 @@ public class TaskCommandInfo { this.templateId = original.templateId; this.protectLevel = original.protectLevel; this.taskStatus = original.taskStatus; - } + public void setProtocolNum() { + this.fiveTupleWithMask.setProtocolNum(); + } } diff --git a/src/main/java/com/realtime/protection/configuration/utils/enums/ProtocolEnum.java b/src/main/java/com/realtime/protection/configuration/utils/enums/ProtocolEnum.java index 2d578ab..56ee4c5 100644 --- a/src/main/java/com/realtime/protection/configuration/utils/enums/ProtocolEnum.java +++ b/src/main/java/com/realtime/protection/configuration/utils/enums/ProtocolEnum.java @@ -1,30 +1,35 @@ package com.realtime.protection.configuration.utils.enums; +import lombok.Getter; + import java.util.HashMap; import java.util.Map; +@Getter public enum ProtocolEnum { - TCP(6), - UDP(17); + TCP(6, "TCP"), + UDP(17, "UDP"); private final Integer number; - private static final Map map = new HashMap<>(); + private final String protocol; + + private static final Map protocolNumMap = new HashMap<>(); + private static final Map protocolMap = new HashMap<>(); static { for (ProtocolEnum protocol : ProtocolEnum.values()) { - map.put(protocol.getProtocolNumber(), protocol); + protocolNumMap.put(protocol.getNumber(), protocol); + protocolMap.put(protocol.getProtocol(), protocol); } } - ProtocolEnum(int protocolNumber) { + ProtocolEnum(int protocolNumber, String protocol) { this.number = protocolNumber; - } - - public Integer getProtocolNumber() { - return this.number; + this.protocol = protocol; } public static ProtocolEnum getProtocolEnumByNumber(Integer protocolNum) { - return map.get(protocolNum); + return protocolNumMap.get(protocolNum); } + public static ProtocolEnum getProtocolEnumByProtocol(String protocol) { return protocolMap.get(protocol); } } diff --git a/src/main/java/com/realtime/protection/server/task/TaskService.java b/src/main/java/com/realtime/protection/server/task/TaskService.java index d61a16a..20f5847 100644 --- a/src/main/java/com/realtime/protection/server/task/TaskService.java +++ b/src/main/java/com/realtime/protection/server/task/TaskService.java @@ -108,7 +108,9 @@ public class TaskService { } public List getStaticCommandInfos(Long taskId) { - return taskMapper.getStaticCommandInfos(taskId); + List staticCommandInfos = taskMapper.getStaticCommandInfos(taskId); + staticCommandInfos.forEach(TaskCommandInfo::setProtocolNum); + return staticCommandInfos; } public List getDynamicTaskInfos(Long taskId) {