TFE Packet IO支持带封装报文格式

This commit is contained in:
wangmenglan
2023-05-26 18:50:18 +08:00
parent f00ef65963
commit 4d26281338
14 changed files with 1577 additions and 156 deletions

View File

@@ -29,6 +29,8 @@ enum {
MPACK_ARRAY_ACK_SIDS,
MPACK_ARRAY_SEQ_ROUTE_CTX,
MPACK_ARRAY_ACK_ROUTE_CTX,
MPACK_ARRAY_SEQ_PKG_HEADER,
MPACK_ARRAY_ACK_PKG_HEADER,
};
struct mpack_mmap_id2type
@@ -74,9 +76,14 @@ struct mpack_mmap_id2type
{.id = 32, .type = TFE_CMSG_TLV_NR_MAX, .str_name = "TFE_SEQ_SIDS", .size = 2, .array_index = MPACK_ARRAY_SEQ_SIDS},
{.id = 33, .type = TFE_CMSG_TLV_NR_MAX, .str_name = "TFE_ACK_SIDS", .size = 2, .array_index = MPACK_ARRAY_ACK_SIDS},
{.id = 34, .type = TFE_CMSG_TLV_NR_MAX, .str_name = "TFE_SEQ_ROUTE_CTX", .size = 1, .array_index = MPACK_ARRAY_SEQ_ROUTE_CTX},
{.id = 35, .type = TFE_CMSG_TLV_NR_MAX, .str_name = "TFE_ACK_ROUTE_CTX", .size = 1, .array_index = MPACK_ARRAY_ACK_ROUTE_CTX}
{.id = 35, .type = TFE_CMSG_TLV_NR_MAX, .str_name = "TFE_ACK_ROUTE_CTX", .size = 1, .array_index = MPACK_ARRAY_ACK_ROUTE_CTX},
{.id = 36, .type = TFE_CMSG_TLV_NR_MAX, .str_name = "TFE_SEQ_PKG_HEADER", .size = 1, .array_index = MPACK_ARRAY_SEQ_PKG_HEADER},
{.id = 37, .type = TFE_CMSG_TLV_NR_MAX, .str_name = "TFE_ACK_PKG_HEADER", .size = 1, .array_index = MPACK_ARRAY_ACK_PKG_HEADER},
{.id = 38, .type = TFE_CMSG_TLV_NR_MAX, .str_name = "TFE_FLAG", .size = 1, .array_index = MPACK_ARRAY_INIT}
};
extern void * g_packet_io_logger;
static int fqdn_id_set_cmsg(struct ctrl_pkt_parser *handler, mpack_node_t node, int table_index)
{
char empty_str[4] = {0};
@@ -114,15 +121,33 @@ static int sids_array_parse_mpack(struct ctrl_pkt_parser *handler, mpack_node_t
static int route_ctx_parse_mpack(struct ctrl_pkt_parser *handler, mpack_node_t node, int table_index, int is_seq)
{
struct route_ctx *ctx = is_seq ? &handler->seq_route_ctx : &handler->ack_route_ctx;
if (mpack_node_array_length(node) > 64) {
size_t len = mpack_node_bin_size(node);
if (len < 0 || len > 64) {
TFE_LOG_ERROR(g_packet_io_logger, "%s: session %lu unexpected control packet: (%s route len[%ld] is invalid)", LOG_TAG_CTRLPKT, handler->session_id, is_seq ? "seq" : "ack", len);
return -1;
}
ctx->len = mpack_node_array_length(node);
for (int i = 0; i < ctx->len; i++)
{
ctx->data[i] = mpack_node_u8(mpack_node_array_at(node, i));
ctx->len = len;
memcpy(ctx->data, mpack_node_bin_data(node), len);
return 0;
}
static int pkt_header_parse_mpack(struct ctrl_pkt_parser *handler, mpack_node_t node, int table_index, int is_seq)
{
char **header = is_seq ? &handler->seq_header : &handler->ack_header;
int *header_len = is_seq ? &handler->seq_len : &handler->ack_len;
size_t len = mpack_node_bin_size(node);
if (len < 0) {
TFE_LOG_ERROR(g_packet_io_logger, "%s: session %lu unexpected control packet: (%s package header len[%ld] is invalid)", LOG_TAG_CTRLPKT, handler->session_id, is_seq ? "seq" : "ack", len);
return -1;
}
if (len == 0)
return 0;
*header = (char *)calloc(len, sizeof(char));
memcpy(*header, mpack_node_bin_data(node), len);
*header_len = len;
return 0;
}
@@ -155,8 +180,13 @@ static int proxy_parse_messagepack(mpack_node_t node, void *ctx, void *logger)
switch (mpack_node_type(ptr)) {
case mpack_type_uint:
value = mpack_node_u64(ptr);
tfe_cmsg_set(handler->cmsg, mpack_table[i].type, (const unsigned char *)&value, mpack_table[i].size);
if (i == 38) {
handler->intercpet_data = mpack_node_u8(ptr);
}
else {
value = mpack_node_u64(ptr);
tfe_cmsg_set(handler->cmsg, mpack_table[i].type, (const unsigned char *)&value, mpack_table[i].size);
}
break;
case mpack_type_str:
mpack_node_copy_cstr(ptr, cmsg_str, sizeof(cmsg_str));
@@ -165,6 +195,33 @@ static int proxy_parse_messagepack(mpack_node_t node, void *ctx, void *logger)
case mpack_type_nil:
tfe_cmsg_set(handler->cmsg, mpack_table[i].type, (const unsigned char *)empty_str, 0);
break;
case mpack_type_bin:
switch(mpack_table[i].array_index)
{
case MPACK_ARRAY_SEQ_ROUTE_CTX:
ret = route_ctx_parse_mpack(handler, ptr, i, 1);
if (ret != 0)
return -1;
break;
case MPACK_ARRAY_ACK_ROUTE_CTX:
ret = route_ctx_parse_mpack(handler, ptr, i, 0);
if (ret != 0)
return -1;
break;
case MPACK_ARRAY_SEQ_PKG_HEADER:
ret = pkt_header_parse_mpack(handler, ptr, i, 1);
if (ret != 0)
return -1;
break;
case MPACK_ARRAY_ACK_PKG_HEADER:
ret = pkt_header_parse_mpack(handler, ptr, i, 0);
if (ret != 0)
return -1;
break;
default:
break;
}
break;
case mpack_type_array:
switch(mpack_table[i].array_index)
{
@@ -181,16 +238,6 @@ static int proxy_parse_messagepack(mpack_node_t node, void *ctx, void *logger)
if (ret != 0)
return -1;
break;
case MPACK_ARRAY_SEQ_ROUTE_CTX:
ret = route_ctx_parse_mpack(handler, ptr, i, 1);
if (ret != 0)
return -1;
break;
case MPACK_ARRAY_ACK_ROUTE_CTX:
ret = route_ctx_parse_mpack(handler, ptr, i, 0);
if (ret != 0)
return -1;
break;
default:
break;
}
@@ -306,7 +353,6 @@ int ctrl_packet_parser_parse(void *ctx, const char* data, size_t length, void *l
}
handler->cmsg = tfe_cmsg_init();
tfe_cmsg_dup(handler->cmsg);
proxy_map = mpack_node_map_cstr(params, "proxy");
ret = proxy_parse_messagepack(proxy_map, handler, logger);
if (ret != 0)
@@ -318,7 +364,6 @@ succ:
error:
mpack_tree_destroy(&tree);
tfe_cmsg_destroy(handler->cmsg);
tfe_cmsg_destroy(handler->cmsg);
return -1;
}
@@ -348,7 +393,16 @@ void ctrl_packet_cmsg_destroy(struct ctrl_pkt_parser *handler)
{
if (handler) {
tfe_cmsg_destroy(handler->cmsg);
tfe_cmsg_destroy(handler->cmsg);
if (handler->seq_header) {
free(handler->seq_header);
handler->seq_header = NULL;
}
if (handler->ack_header) {
free(handler->ack_header);
handler->ack_header = NULL;
}
}
}