This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
PushM 0b6174da96 1、fakecc删去发来掩码
2、c3pzff的url和sso的url动态配置
3、修改application-test从环境变量获取配置
2024-04-29 22:53:35 +08:00

107 lines
3.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import socket
from confluent_kafka import Producer
import socket
import struct
def convert_ipv4_address(ip_int):
return socket.inet_ntoa(struct.pack('!I', ip_int))
def parse_and_convert_ip(data_part):
parts = data_part.split('-')
src_ip = convert_ipv4_address(int(parts[0]))
dst_ip = convert_ipv4_address(int(parts[1]))
src_port = int(parts[2])
dst_port = int(parts[3])
protocol = int(parts[4])
return src_ip, dst_ip, src_port, dst_port, protocol
def send_test_data(host, port, data):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((host, port))
s.sendall(data.encode())
def format_data_to_json(test_data):
data_parts = test_data.split(",")
task_id = data_parts[29]
rule_id_value = data_parts[30]
protect_object_is_src_dst = data_parts[31]
src_ip_v4, dst_ip_v4, src_port_v4, dst_port_v4, protocol_v4 = parse_and_convert_ip(data_parts[1])
print("task_id:", task_id, "rule_id:", rule_id_value, "protect_object_is_src_dst:", protect_object_is_src_dst, "src_ip_v4:", src_ip_v4, "dst_ip_v4:", dst_ip_v4, "src_port_v4:", src_port_v4, "dst_port_v4:", dst_port_v4, "protocol_v4:", protocol_v4)
formatted_data = {
"task_id": task_id,
"rule_id": rule_id_value,
"protect_object_is_src_dst": protect_object_is_src_dst,
"five_tuple_with_mask": {
"src_ip": src_ip_v4,
"dst_ip": dst_ip_v4,
"src_port": src_port_v4,
"dst_port": dst_port_v4,
"protocol": protocol_v4,
"event_id": data_parts[24],
# test_data_2.split(",")[-7],
# "mask_src_ip": data_parts[5],
# "mask_dst_ip": data_parts[6],
# "mask_src_port": int(data_parts[7]),
# "mask_dst_ip": int(data_parts[8]),
# "mask_protocol": int(data_parts[9])
},
"content": test_data
}
return formatted_data
def delivery_report(err, msg):
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
conf = {
'bootstrap.servers': "192.168.107.49:9092",
# bootstrap.servers: '10.58.72.125:9092,10.58.72.126:9092,10.58.72.127:9092,10.58.72.128:9092,10.58.72.129:9092,\
# 10.58.72.130:9092,10.58.72.131:9092,10.58.72.132:9092,10.58.72.133:9092,10.58.72.134:9092',
'client.id': 'python-server'
}
producer = Producer(conf)
def process_data(data, topic):
producer.produce(topic, data.encode('utf-8'), callback=delivery_report)
producer.poll(0)
def start_server(host, port, topic):
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((host, port))
server_socket.listen(1)
print(f"Listening on {host}:{port}")
while True:
conn, addr = server_socket.accept()
print(f"Connected by {addr}")
try:
while True:
data = conn.recv(4096)
if not data:
break
print(f"Received data: {data}")
formatted_json = format_data_to_json(data.decode())
print(f"Formatted JSON: {formatted_json}")
import json
json_data = json.dumps(formatted_json)
process_data(json_data, topic)
finally:
conn.close()
if __name__ == "__main__":
HOST = '127.0.0.1'
PORT = 65432
TOPIC = 'topic-alert'
start_server(HOST, PORT, TOPIC)