TSG-14654: 控制报文格式调整, 增加将cmsg字段发送给TFE, 控制报文采用mpack封装格式

This commit is contained in:
刘学利
2023-05-06 02:23:12 +00:00
parent 5bc9831e03
commit 224f503289
28 changed files with 17186 additions and 396 deletions

View File

@@ -2,53 +2,50 @@
#include <stdlib.h>
#include <string.h>
#include <MESA/cJSON.h>
#include <MESA/MESA_handle_logger.h>
#include "tsg_variable.h"
#include "tsg_sync_state.h"
#include "tsg_send_log.h"
#include "mpack.h"
// i don't need this
int set_exec_profile_ids(const struct streaminfo *a_stream, struct parse_handle *p);
const char *policy_key[ POLICY_UPDATE_MAX] =
const char *policy_key[POLICY_UPDATE_MAX] = {"sce", "shaper", "proxy"};
char *mpack_data = NULL;
size_t mpack_size = 0;
static int tsg_mpack_init_map(const struct streaminfo *a_stream, mpack_writer_t *writer, const char *state)
{
"service_chaining",
"shaping",
};
mpack_writer_init_growable(writer, &mpack_data, &mpack_size);
mpack_build_map(writer);
static int tsg_send_ctrl_pkt(const struct streaminfo *a_stream, cJSON *object)
// tsync : 2.0
mpack_write_cstr(writer, "tsync");
mpack_write_cstr(writer, "2.0");
// session_id
mpack_write_cstr(writer, "session_id");
mpack_write_u64(writer, tsg_get_stream_trace_id((struct streaminfo *)a_stream));
// state
mpack_write_cstr(writer, "state");
mpack_write_cstr(writer, state);
return 0;
}
static int tsg_mpack_send_pkt(const struct streaminfo *a_stream, mpack_writer_t *writer)
{
if (object == NULL)
mpack_complete_map(writer); // tsg_mpack_init_map
if (mpack_writer_destroy(writer) != mpack_ok)
{
MESA_handle_runtime_log(g_tsg_para.logger, RLOG_LV_FATAL, "MPACK_WRITER", "An error occurred encoding the data!");
return -1;
}
char *payload = NULL;
uint64_t session_id = tsg_get_stream_trace_id((struct streaminfo *)a_stream);
// tsg_get_stream_trace_id maybe return -1
if (session_id && session_id != (uint64_t)-1)
{
char trace_id[128]={0};
snprintf(trace_id, sizeof(trace_id), "%lu", session_id);
cJSON_AddStringToObject(object, "session_id", trace_id);
}
cJSON_AddStringToObject(object, "tsync", "1.0");
payload = cJSON_PrintUnformatted(object);
if (payload == NULL)
{
cJSON_Delete(object);
return -1;
}
// send//
sapp_inject_ctrl_pkt((struct streaminfo *)a_stream, SIO_DEFAULT, payload, strlen(payload)+1, a_stream->routedir);
cJSON_free(payload);
cJSON_Delete(object);
MESA_handle_runtime_log(g_tsg_para.logger, RLOG_LV_DEBUG, "MSGPACK_PROXY_BUFF", "send buff_len = %lu", mpack_size);
sapp_inject_ctrl_pkt((struct streaminfo *)a_stream, SIO_DEFAULT, mpack_data, mpack_size, a_stream->routedir);
free(mpack_data);
mpack_data = NULL;
mpack_size = 0;
return 0;
}
@@ -59,23 +56,21 @@ int tsg_send_session_state(const struct streaminfo *a_stream, unsigned char stat
return -1;
}
cJSON *object = cJSON_CreateObject();
if (state== OP_STATE_PENDING)
mpack_writer_t writer;
if (state == OP_STATE_PENDING)
{
cJSON_AddStringToObject(object, "state", "opening");
tsg_mpack_init_map(a_stream, &writer, "opening");
}
else if (state == OP_STATE_CLOSE)
else if (state == OP_STATE_CLOSE)
{
cJSON_AddStringToObject(object, "state", "closing");
tsg_mpack_init_map(a_stream, &writer, "closing");
}
else
{
cJSON_Delete(object);
return -1;
}
return tsg_send_ctrl_pkt(a_stream, object);
return tsg_mpack_send_pkt(a_stream, &writer);
}
int tsg_sync_resetall_state(const struct streaminfo *a_stream)
@@ -85,93 +80,211 @@ int tsg_sync_resetall_state(const struct streaminfo *a_stream)
return -1;
}
cJSON *object = cJSON_CreateObject();
cJSON_AddStringToObject(object, "state", "resetall");
mpack_writer_t writer;
tsg_mpack_init_map(a_stream, &writer, "resetall");
return tsg_send_ctrl_pkt(a_stream, object);
return tsg_mpack_send_pkt(a_stream, &writer);
}
int tsg_sync_policy_update(const struct streaminfo *a_stream, struct update_policy *policy_array, int policy_array_num)
static void tsg_mpack_append_str(mpack_writer_t *writer, char *str)
{
if (a_stream == NULL || policy_array == NULL || policy_array_num > (int) POLICY_UPDATE_MAX || policy_array_num <= 0)
if (str)
{
return -1;
mpack_write_cstr(writer, str);
}
else
{
mpack_write_nil(writer);
}
cJSON *params_object = NULL;
cJSON *policy_arr = NULL;
cJSON *object = cJSON_CreateObject();
cJSON_AddStringToObject(object, "state", "active");
cJSON_AddStringToObject(object, "method", "policy_update");
params_object = cJSON_AddObjectToObject(object, "params");
for (int i = 0; i < policy_array_num; i++)
{
int tmp_ids[8]={0};
int n_tmp_ids=MIN(policy_array[i].n_ids, 8);
for(int j=0; j<n_tmp_ids; j++)
{
tmp_ids[j]=(int)(policy_array[i].ids[j]);
}
policy_arr = cJSON_CreateIntArray(tmp_ids, n_tmp_ids);
if (policy_arr == NULL || policy_array[i].type >= POLICY_UPDATE_MAX)
{
cJSON_Delete(object);
return -1;
}
cJSON_AddItemToObject(params_object, policy_key[policy_array[i].type], policy_arr);
policy_arr = NULL;
}
return tsg_send_ctrl_pkt(a_stream, object);
return;
}
int tsg_recv_control_pkt(const struct streaminfo *a_stream, const void *payload, int payload_len)
static void tsg_mpack_append_array_u32(mpack_writer_t *writer, struct cmsg_int32_array *array)
{
if (a_stream == NULL || payload == NULL || payload_len == 0)
if (array->num > 0)
{
mpack_build_array(writer);
for (size_t i = 0; i < array->num; i++)
{
mpack_write_u32(writer, array->value[i]);
}
mpack_complete_array(writer);
}
else
{
mpack_write_nil(writer);
}
return;
}
static void tsg_mpack_append_array_u16(mpack_writer_t *writer, struct cmsg_int16_array *array)
{
if (array->num > 0)
{
mpack_build_array(writer);
for (size_t i = 0; i < array->num; i++)
{
mpack_write_u16(writer, array->value[i]);
}
mpack_complete_array(writer);
}
else
{
mpack_write_nil(writer);
}
return;
}
static void tsg_mpack_append_array_u8(mpack_writer_t *writer, struct cmsg_int8_array *array)
{
if (array->num > 0)
{
mpack_build_array(writer);
for (size_t i = 0; i < array->num; i++)
{
mpack_write_u8(writer, array->value[i]);
}
mpack_complete_array(writer);
}
else
{
mpack_write_nil(writer);
}
return;
}
static void tsg_mpack_append_cmsg_value(mpack_writer_t *writer, struct proxy_cmsg *cmsg)
{
if (cmsg == NULL)
{
mpack_write_nil(writer);
MESA_handle_runtime_log(g_tsg_para.logger, RLOG_LV_DEBUG, "MSGPACK_PROXY", "No cmsg!");
}
else
{
mpack_build_array(writer); // array
mpack_write_u32(writer, cmsg->tcp_seq);
mpack_write_u32(writer, cmsg->tcp_ack);
mpack_write_u16(writer, cmsg->tcp_mss_client);
mpack_write_u16(writer, cmsg->tcp_mss_server);
if (cmsg->tcp_wsacle_exist == 1)
{
mpack_write_u8(writer, cmsg->tcp_wsacle_client);
mpack_write_u8(writer, cmsg->tcp_wsacle_server);
}
else
{
mpack_write_nil(writer);
mpack_write_nil(writer);
}
mpack_write_u8(writer, cmsg->tcp_sack_client);
mpack_write_u8(writer, cmsg->tcp_sack_server);
mpack_write_u8(writer, cmsg->tcp_ts_client);
mpack_write_u8(writer, cmsg->tcp_ts_server);
mpack_write_u8(writer, cmsg->tcp_protocol);
mpack_write_u16(writer, cmsg->tcp_window_client);
mpack_write_u16(writer, cmsg->tcp_window_server);
mpack_write_u32(writer, cmsg->tcp_ts_client_val);
mpack_write_u32(writer, cmsg->tcp_ts_server_val);
mpack_write_u8(writer, cmsg->tcp_info_packet_cur_dir);
tsg_mpack_append_str(writer, cmsg->src_sub_id);
tsg_mpack_append_str(writer, cmsg->dst_sub_id);
tsg_mpack_append_str(writer, cmsg->src_asn);
tsg_mpack_append_str(writer, cmsg->dst_asn);
tsg_mpack_append_str(writer, cmsg->src_organization);
tsg_mpack_append_str(writer, cmsg->dst_organization);
tsg_mpack_append_str(writer, cmsg->src_ip_location_country);
tsg_mpack_append_str(writer, cmsg->dst_ip_location_country);
tsg_mpack_append_str(writer, cmsg->src_ip_location_provine);
tsg_mpack_append_str(writer, cmsg->dst_ip_location_provine);
tsg_mpack_append_str(writer, cmsg->src_ip_location_city);
tsg_mpack_append_str(writer, cmsg->dst_ip_location_city);
tsg_mpack_append_str(writer, cmsg->src_ip_location_subdivision);
tsg_mpack_append_str(writer, cmsg->dst_ip_location_subdivision);
tsg_mpack_append_str(writer, cmsg->ssl_client_ja3_fingerprint);
// fqdn_cat_id_val
tsg_mpack_append_array_u32(writer, &cmsg->fqdn_cat_id_val);
// tcp_seq_sids
tsg_mpack_append_array_u16(writer, &cmsg->tcp_seq_sids);
// tcp_ack_sids
tsg_mpack_append_array_u16(writer, &cmsg->tcp_ack_sids);
// tcp_seq_route_ctx
tsg_mpack_append_array_u8(writer, &cmsg->tcp_seq_route_ctx);
// tcp_ack_route_ctx
tsg_mpack_append_array_u8(writer, &cmsg->tcp_ack_route_ctx);
mpack_complete_array(writer); // array
}
return;
}
static void tsg_mpack_append_update_policy(mpack_writer_t *writer, struct update_policy *policy_update, enum policy_type type)
{
mpack_write_cstr(writer, policy_key[type]);
mpack_build_map(writer); // update_policy_type
mpack_write_cstr(writer, "rule_ids");
if (policy_update->n_ids > 0)
{
mpack_build_array(writer); // rule_ids
for (int i = 0; i < policy_update->n_ids; i++)
{
mpack_write_i64(writer, policy_update->ids[i]);
}
mpack_complete_array(writer); // rule_ids
}
else
{
mpack_write_nil(writer);
}
if (type == POLICY_UPDATE_INTERCEPT)
{
mpack_write_cstr(writer, "tcp_handshake");
tsg_mpack_append_cmsg_value(writer, &policy_update->cmsg);
}
mpack_complete_map(writer); // update_policy_type
return;
}
int tsg_sync_policy_update(const struct streaminfo *a_stream, struct update_policy *policy_update, size_t n_policy_update)
{
if (a_stream == NULL || policy_update == NULL || policy_update->type >= POLICY_UPDATE_MAX || n_policy_update == 0)
{
return -1;
}
char *state = NULL;
char *method = NULL;
char *tsync = NULL;
cJSON *params_object = NULL;
cJSON *sf_ids_array = NULL;
struct parse_handle result = {0};
mpack_writer_t writer;
cJSON *object = cJSON_Parse((char *)payload);
if (object == NULL)
tsg_mpack_init_map((struct streaminfo *)a_stream, &writer, "active");
// method: policy_update
mpack_write_cstr(&writer, "method");
mpack_write_cstr(&writer, "policy_update");
// params
mpack_write_cstr(&writer, "params");
mpack_build_map(&writer);
for (int i = 0; i < (int)n_policy_update; i++)
{
return -1;
}
tsync = cJSON_GetObjectItem(object, "tsync")->valuestring;
memcpy(result.tsync, tsync, strlen(tsync));
//result.session_id = (uint64_t)atoll(cJSON_GetObjectItem(object, "session_id")->string);
state = cJSON_GetObjectItem(object, "state")->valuestring;
memcpy(result.state, state, strlen(state));
method = cJSON_GetObjectItem(object, "method")->valuestring;
memcpy(result.method, method, strlen(method));
params_object = cJSON_GetObjectItem(object, "params");
sf_ids_array = cJSON_GetObjectItem(params_object, "sf_profile_ids");
result.sf_ids.n_ids = cJSON_GetArraySize(sf_ids_array);
for (int i = 0; i < result.sf_ids.n_ids; i ++)
{
result.sf_ids.ids[i] = cJSON_GetArrayItem(sf_ids_array, i)->valueint;
tsg_mpack_append_update_policy(&writer, &policy_update[i], policy_update[i].type);
}
mpack_complete_map(&writer); // params
//set_exec_profile_ids(a_stream, &result);
cJSON_Delete(object);
return 0;
return tsg_mpack_send_pkt(a_stream, &writer);
}
int tsg_sync_closing_state(const struct streaminfo *a_stream, unsigned char state)
@@ -185,4 +298,3 @@ int tsg_sync_opening_state(const struct streaminfo *a_stream, unsigned char stat
return 0;
}