107 lines
3.5 KiB
Python
107 lines
3.5 KiB
Python
import socket
|
||
from confluent_kafka import Producer
|
||
|
||
|
||
import socket
|
||
import struct
|
||
|
||
def convert_ipv4_address(ip_int):
|
||
|
||
return socket.inet_ntoa(struct.pack('!I', ip_int))
|
||
|
||
def parse_and_convert_ip(data_part):
|
||
|
||
parts = data_part.split('-')
|
||
src_ip = convert_ipv4_address(int(parts[0]))
|
||
dst_ip = convert_ipv4_address(int(parts[1]))
|
||
src_port = int(parts[2])
|
||
dst_port = int(parts[3])
|
||
protocol = int(parts[4])
|
||
return src_ip, dst_ip, src_port, dst_port, protocol
|
||
|
||
|
||
def send_test_data(host, port, data):
|
||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||
s.connect((host, port))
|
||
s.sendall(data.encode())
|
||
|
||
|
||
def format_data_to_json(test_data):
|
||
data_parts = test_data.split(",")
|
||
|
||
task_id = data_parts[29]
|
||
rule_id_value = data_parts[30]
|
||
protect_object_is_src_dst = data_parts[31]
|
||
|
||
src_ip_v4, dst_ip_v4, src_port_v4, dst_port_v4, protocol_v4 = parse_and_convert_ip(data_parts[1])
|
||
print("task_id:", task_id, "rule_id:", rule_id_value, "protect_object_is_src_dst:", protect_object_is_src_dst, "src_ip_v4:", src_ip_v4, "dst_ip_v4:", dst_ip_v4, "src_port_v4:", src_port_v4, "dst_port_v4:", dst_port_v4, "protocol_v4:", protocol_v4)
|
||
formatted_data = {
|
||
"task_id": task_id,
|
||
"rule_id": rule_id_value,
|
||
"protect_object_is_src_dst": protect_object_is_src_dst,
|
||
"five_tuple_with_mask": {
|
||
"src_ip": src_ip_v4,
|
||
"dst_ip": dst_ip_v4,
|
||
"src_port": src_port_v4,
|
||
"dst_port": dst_port_v4,
|
||
"protocol": protocol_v4,
|
||
"event_id": data_parts[24],
|
||
# test_data_2.split(",")[-7],
|
||
# "mask_src_ip": data_parts[5],
|
||
# "mask_dst_ip": data_parts[6],
|
||
# "mask_src_port": int(data_parts[7]),
|
||
# "mask_dst_ip": int(data_parts[8]),
|
||
# "mask_protocol": int(data_parts[9])
|
||
},
|
||
"content": test_data
|
||
}
|
||
return formatted_data
|
||
|
||
def delivery_report(err, msg):
|
||
if err is not None:
|
||
print('Message delivery failed: {}'.format(err))
|
||
else:
|
||
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
|
||
|
||
conf = {
|
||
'bootstrap.servers': "192.168.107.49:9092",
|
||
# ‘bootstrap.servers’: '10.58.72.125:9092,10.58.72.126:9092,10.58.72.127:9092,10.58.72.128:9092,10.58.72.129:9092,\
|
||
# 10.58.72.130:9092,10.58.72.131:9092,10.58.72.132:9092,10.58.72.133:9092,10.58.72.134:9092',
|
||
'client.id': 'python-server'
|
||
}
|
||
|
||
producer = Producer(conf)
|
||
def process_data(data, topic):
|
||
producer.produce(topic, data.encode('utf-8'), callback=delivery_report)
|
||
producer.poll(0)
|
||
def start_server(host, port, topic):
|
||
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
server_socket.bind((host, port))
|
||
server_socket.listen(1)
|
||
print(f"Listening on {host}:{port}")
|
||
|
||
while True:
|
||
conn, addr = server_socket.accept()
|
||
print(f"Connected by {addr}")
|
||
|
||
try:
|
||
while True:
|
||
data = conn.recv(4096)
|
||
if not data:
|
||
break
|
||
print(f"Received data: {data}")
|
||
formatted_json = format_data_to_json(data.decode())
|
||
print(f"Formatted JSON: {formatted_json}")
|
||
import json
|
||
json_data = json.dumps(formatted_json)
|
||
|
||
process_data(json_data, topic)
|
||
finally:
|
||
conn.close()
|
||
|
||
if __name__ == "__main__":
|
||
HOST = '127.0.0.1'
|
||
PORT = 65432
|
||
TOPIC = 'topic-alert'
|
||
start_server(HOST, PORT, TOPIC)
|