This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
stellar-stellar/deps/utable/ipfix_exporter_example.cpp
2024-11-25 19:22:41 +08:00

197 lines
6.1 KiB
C++

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <pthread.h>
#include "utable.h"
#include "cjson/cJSON.h"
// Number of data-flow worker threads main() starts. The same value is passed
// as the last argument of utable_ipfix_exporter_schema_new() and is the number
// of indexes the template thread asks utable_ipfix_template_flow_get0() for.
#define THREAD_MAX 8
// Number of entries in template_id_list[]; every loop over that table uses it.
#define TEMPLATE_MAX 13
// Pairs a template name with the id the exporter schema reports for it.
struct ipfix_template_id_list
{
// Name handed to utable_ipfix_template_get() and written into the
// "decoded_as" field of every record the workers export.
const char *template_name;
// Filled in by main() from utable_ipfix_template_get(); 0 until then.
int template_id;
};
// Templates exercised by this example. All ids start at 0 and are resolved in
// main(). The worker thread uses the same index i both for this table and for
// the "templates" array of the schema JSON file, so the order here is
// presumably expected to match the order in that file -- confirm against the
// schema before reordering.
struct ipfix_template_id_list template_id_list[TEMPLATE_MAX] = {
{"BASE", 0},
{"SSL", 0},
{"HTTP", 0},
{"MAIL", 0},
{"DNS", 0},
{"DTLS", 0},
{"QUIC", 0},
{"FTP", 0},
{"SIP", 0},
{"RTP", 0},
{"SSH", 0},
{"RDP", 0},
{"Stratum", 0}};
// Socket shared by every sender thread; assigned in main() from
// ipfix_exporter_get_socket_fd() before any thread is created.
int g_udp_sock_fd = 0;
// Path of the schema JSON file (argv[1]); the workers re-read it on every pass.
const char *ipfix_schema_json_path = NULL;
// Exporter schema created in main() and read by all threads.
struct ipfix_exporter_schema *g_ipfix_schema = NULL;
// Create a UDP socket and connect() it to the collector, so that later send()
// calls need no destination address.
//
// collector_ip:   IPv4 address of the collector in dotted-decimal form.
// collector_port: collector UDP port in host byte order.
//
// Returns the connected socket descriptor (>= 0), or -1 when the address is
// missing or malformed, or when socket()/connect() fails. No descriptor is
// left open on failure.
static int ipfix_exporter_get_socket_fd(char *collector_ip, uint16_t collector_port)
{
    if (collector_ip == NULL)
    {
        return -1;
    }

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr)); // also clears sin_zero padding
    addr.sin_family = AF_INET;
    addr.sin_port = htons(collector_port);
    // inet_pton reports malformed input explicitly; inet_addr() would return
    // INADDR_NONE, which is indistinguishable from 255.255.255.255.
    if (inet_pton(AF_INET, collector_ip, &addr.sin_addr) != 1)
    {
        printf("connect error, illegal collector ip or port\n");
        printf("example: ./ipfix_exporter_example ipfix_schema.json 127.0.0.1 4397\n");
        return -1;
    }

    int sock_fd = socket(AF_INET, SOCK_DGRAM, 0);
    if (sock_fd < 0) // 0 is a valid descriptor; only negative values signal failure
    {
        return -1;
    }

    if (connect(sock_fd, (struct sockaddr *)&addr, sizeof(addr)) == -1)
    {
        printf("connect error, illegal collector ip or port\n");
        printf("example: ./ipfix_exporter_example ipfix_schema.json 127.0.0.1 4397\n");
        close(sock_fd);
        return -1;
    }
    return sock_fd;
}
// Thread entry point: periodically re-sends the IPFIX template flow of every
// worker index to the collector over g_udp_sock_fd.
//
// arg: pointer to an int holding the resend interval in seconds; it is read
//      once at start-up.
//
// Never returns in practice (infinite loop); the trailing return only
// satisfies the pthread start-routine signature.
void *ipfix_template_send_thread_loop(void *arg)
{
    int interval_s = (*(int *)arg);
    while (1)
    {
        for (int i = 0; i < THREAD_MAX; i++)
        {
            // Reset per index so a failed lookup cannot reuse the previous
            // index's length.
            size_t blob_len = 0;
            // The blob is not freed here, matching the original code;
            // presumably it stays owned by the schema -- confirm against utable.h.
            char *blob = (char *)utable_ipfix_template_flow_get0(g_ipfix_schema, i, &blob_len);
            if (blob == NULL || blob_len == 0)
            {
                continue; // nothing to send for this index
            }
            // Best-effort UDP export: a failed send is not retried.
            send(g_udp_sock_fd, blob, blob_len, 0);
        }
        sleep(interval_s);
    }
    return NULL;
}
// Defined outside this file. As used below, it is expected to store a buffer
// in *pp_out (released by the caller with free()) and its size in *out_sz;
// the exact contract and return value are not visible here -- confirm against
// the definition.
extern "C" int load_file_to_memory(const char *file_name, unsigned char **pp_out, size_t *out_sz);
void ipfix_exporter_test_utable_init(struct utable *table, int index, const char *file_name)
{
size_t json_size = 0;
unsigned char *json_str = NULL;
load_file_to_memory(file_name, &json_str, &json_size);
if (json_str == NULL || json_size == 0)
{
return;
}
cJSON *root = NULL;
root = cJSON_Parse((const char *)json_str);
cJSON *template_item = cJSON_GetArrayItem(cJSON_GetObjectItem(root, "templates"), index);
cJSON *template_key_array = cJSON_GetObjectItem(template_item, "elements");
for (int i = 0; i < cJSON_GetArraySize(template_key_array); i++)
{
char *template_key = cJSON_GetArrayItem(template_key_array, i)->valuestring;
cJSON *elements_array = cJSON_GetObjectItem(root, template_key);
for (int j = 0; j < cJSON_GetArraySize(elements_array); j++)
{
cJSON *element = cJSON_GetArrayItem(elements_array, j);
char *element_key = cJSON_GetObjectItem(element, "element_name")->valuestring;
if (strcmp(cJSON_GetObjectItem(element, "element_type")->valuestring, "string") == 0)
{
char temp[128] = {0};
snprintf(temp, 128, "%s_%s_%d", element_key, "string", cJSON_GetObjectItem(element, "element_id")->valueint);
utable_add_cstring(table, element_key, temp, strlen(temp));
}
else if (strcmp(cJSON_GetObjectItem(element, "element_type")->valuestring, "unsigned64") == 0 ||
strcmp(cJSON_GetObjectItem(element, "element_type")->valuestring, "unsigned32") == 0 ||
strcmp(cJSON_GetObjectItem(element, "element_type")->valuestring, "unsigned16") == 0 ||
strcmp(cJSON_GetObjectItem(element, "element_type")->valuestring, "unsigned8") == 0)
{
utable_add_integer(table, element_key, cJSON_GetObjectItem(element, "element_id")->valueint);
}
}
}
free(json_str);
cJSON_Delete(root);
}
// Thread entry point: every 5 seconds, builds one synthetic record per entry
// of template_id_list and exports it to the collector over g_udp_sock_fd.
//
// arg: pointer to a uint16_t worker id, read once at start-up and passed to
//      utable_ipfix_data_flow_exporter().
//
// Never returns in practice (infinite loop); the trailing return only
// satisfies the pthread start-routine signature.
void *ipfix_worker_thread_data_flow_send(void *arg)
{
    uint16_t worker_id = (*(uint16_t *)arg);
    while (1)
    {
        for (int i = 0; i < TEMPLATE_MAX; i++)
        {
            struct utable *table = utable_new();
            if (table == NULL)
            {
                continue; // allocation failed; try again with the next template
            }

            ipfix_exporter_test_utable_init(table, i, ipfix_schema_json_path);

            // Replace whatever "decoded_as" the schema supplied with the
            // template name of this entry.
            const char *template_name = template_id_list[i].template_name;
            utable_delete(table, "decoded_as", strlen("decoded_as"));
            utable_add_cstring(table, "decoded_as", template_name, strlen(template_name));

            size_t blob_len = 0;
            char *blob = NULL;
            utable_ipfix_data_flow_exporter(table, g_ipfix_schema, template_id_list[i].template_id, worker_id, &blob, &blob_len);
            if (blob != NULL && blob_len > 0)
            {
                // Best-effort UDP export: a failed send is not retried.
                send(g_udp_sock_fd, blob, blob_len, 0);
            }
            free(blob); // the blob is released by the caller; free(NULL) is a no-op
            utable_free(table);
        }
        sleep(5);
    }
    return NULL;
}
// Usage: ./ipfix_exporter_example ipfix_schema.json 127.0.0.1 4397
//
// Loads the exporter schema, resolves the id of every template in
// template_id_list, connects a UDP socket to the collector, then runs one
// template thread plus THREAD_MAX data-flow threads for 1000 seconds.
//
// Returns 0 after a normal run and -1 on bad arguments or any set-up failure.
extern "C" int main(int argc, char *argv[])
{
    if (argc != 4)
    {
        printf("example: ./ipfix_exporter_example ipfix_schema.json 127.0.0.1 4397\n");
        return -1;
    }

    // Validate the port; atoi() would silently turn garbage into 0.
    char *port_end = NULL;
    long port = strtol(argv[3], &port_end, 10);
    if (port_end == argv[3] || *port_end != '\0' || port < 1 || port > 65535)
    {
        printf("illegal collector port: %s\n", argv[3]);
        return -1;
    }

    ipfix_schema_json_path = argv[1];
    g_ipfix_schema = utable_ipfix_exporter_schema_new(ipfix_schema_json_path, 1, THREAD_MAX);
    if (g_ipfix_schema == NULL)
    {
        printf("ipfix_exporter_schema_init error, illegal ipfix_schema_json_path: %s\n", ipfix_schema_json_path);
        return -1;
    }
    for (int i = 0; i < TEMPLATE_MAX; i++)
    {
        template_id_list[i].template_id = utable_ipfix_template_get(g_ipfix_schema, template_id_list[i].template_name);
    }

    g_udp_sock_fd = ipfix_exporter_get_socket_fd(argv[2], (uint16_t)port);
    if (g_udp_sock_fd < 0)
    {
        // Do not start senders on an invalid descriptor.
        utable_ipfix_exporter_schema_free(g_ipfix_schema);
        g_ipfix_schema = NULL;
        return -1;
    }

    int rc = 0;
    // interval_s and worker_id[] live in main()'s frame, which outlives every
    // thread because all threads are joined before main() returns.
    int interval_s = 100;
    pthread_t template_thread_id;
    int template_started = (pthread_create(&template_thread_id, NULL, ipfix_template_send_thread_loop, (void *)&interval_s) == 0);

    uint16_t worker_id[THREAD_MAX];
    pthread_t pid[THREAD_MAX];
    int workers_started = 0;
    for (int i = 0; i < THREAD_MAX; i++)
    {
        worker_id[i] = i;
        if (pthread_create(&pid[i], NULL, ipfix_worker_thread_data_flow_send, (void *)&worker_id[i]) != 0)
        {
            break;
        }
        workers_started++;
    }

    if (!template_started || workers_started != THREAD_MAX)
    {
        printf("pthread_create error\n");
        rc = -1;
    }
    else
    {
        sleep(1000);
    }

    // The thread loops never end on their own, so request cancellation first
    // (they block in sleep()/send(), both cancellation points) and join after
    // that. Only then is it safe to release the schema and socket they use.
    if (template_started)
    {
        pthread_cancel(template_thread_id);
        pthread_join(template_thread_id, NULL);
    }
    for (int i = 0; i < workers_started; i++)
    {
        pthread_cancel(pid[i]);
    }
    for (int i = 0; i < workers_started; i++)
    {
        pthread_join(pid[i], NULL);
    }

    utable_ipfix_exporter_schema_free(g_ipfix_schema);
    g_ipfix_schema = NULL;
    close(g_udp_sock_fd);
    return rc;
}