This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files

107 lines
3.5 KiB
Python
Raw Permalink Normal View History

import socket
from confluent_kafka import Producer
import socket
import struct
def convert_ipv4_address(ip_int):
return socket.inet_ntoa(struct.pack('!I', ip_int))
def parse_and_convert_ip(data_part):
parts = data_part.split('-')
src_ip = convert_ipv4_address(int(parts[0]))
dst_ip = convert_ipv4_address(int(parts[1]))
src_port = int(parts[2])
dst_port = int(parts[3])
protocol = int(parts[4])
return src_ip, dst_ip, src_port, dst_port, protocol
def send_test_data(host, port, data):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((host, port))
s.sendall(data.encode())
def format_data_to_json(test_data):
data_parts = test_data.split(",")
task_id = data_parts[29]
rule_id_value = data_parts[30]
protect_object_is_src_dst = data_parts[31]
src_ip_v4, dst_ip_v4, src_port_v4, dst_port_v4, protocol_v4 = parse_and_convert_ip(data_parts[1])
print("task_id:", task_id, "rule_id:", rule_id_value, "protect_object_is_src_dst:", protect_object_is_src_dst, "src_ip_v4:", src_ip_v4, "dst_ip_v4:", dst_ip_v4, "src_port_v4:", src_port_v4, "dst_port_v4:", dst_port_v4, "protocol_v4:", protocol_v4)
formatted_data = {
"task_id": task_id,
"rule_id": rule_id_value,
"protect_object_is_src_dst": protect_object_is_src_dst,
"five_tuple_with_mask": {
"src_ip": src_ip_v4,
"dst_ip": dst_ip_v4,
"src_port": src_port_v4,
"dst_port": dst_port_v4,
"protocol": protocol_v4,
"event_id": data_parts[24],
# test_data_2.split(",")[-7],
# "mask_src_ip": data_parts[5],
# "mask_dst_ip": data_parts[6],
# "mask_src_port": int(data_parts[7]),
# "mask_dst_ip": int(data_parts[8]),
# "mask_protocol": int(data_parts[9])
},
"content": test_data
}
return formatted_data
def delivery_report(err, msg):
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
conf = {
'bootstrap.servers': "192.168.107.49:9092",
# bootstrap.servers: '10.58.72.125:9092,10.58.72.126:9092,10.58.72.127:9092,10.58.72.128:9092,10.58.72.129:9092,\
# 10.58.72.130:9092,10.58.72.131:9092,10.58.72.132:9092,10.58.72.133:9092,10.58.72.134:9092',
'client.id': 'python-server'
}
producer = Producer(conf)
def process_data(data, topic):
producer.produce(topic, data.encode('utf-8'), callback=delivery_report)
producer.poll(0)
def start_server(host, port, topic):
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((host, port))
server_socket.listen(1)
print(f"Listening on {host}:{port}")
while True:
conn, addr = server_socket.accept()
print(f"Connected by {addr}")
try:
while True:
data = conn.recv(4096)
if not data:
break
print(f"Received data: {data}")
formatted_json = format_data_to_json(data.decode())
print(f"Formatted JSON: {formatted_json}")
import json
json_data = json.dumps(formatted_json)
process_data(json_data, topic)
finally:
conn.close()
if __name__ == "__main__":
HOST = '127.0.0.1'
PORT = 65432
TOPIC = 'topic-alert'
start_server(HOST, PORT, TOPIC)