diff --git a/include/stellar/log.h b/include/stellar/log.h index a075267..52cc174 100644 --- a/include/stellar/log.h +++ b/include/stellar/log.h @@ -16,37 +16,61 @@ enum log_level }; #define STELLAR_LOG_TRACE(logger, module, format, ...) \ - if (log_check_level((logger), LOG_TRACE)) \ + if ((logger) == NULL) \ + { \ + printf("TRACE: (%s): " format "\n", module, ##__VA_ARGS__); \ + } \ + else if (log_check_level((logger), LOG_TRACE)) \ { \ log_print((logger), LOG_TRACE, (module), (format), ##__VA_ARGS__); \ } #define STELLAR_LOG_DEBUG(logger, module, format, ...) \ - if (log_check_level((logger), LOG_DEBUG)) \ + if ((logger) == NULL) \ + { \ + printf("DEBUG: (%s): " format "\n", module, ##__VA_ARGS__); \ + } \ + else if (log_check_level((logger), LOG_DEBUG)) \ { \ log_print((logger), LOG_DEBUG, (module), (format), ##__VA_ARGS__); \ } #define STELLAR_LOG_INFO(logger, module, format, ...) \ - if (log_check_level((logger), LOG_INFO)) \ + if ((logger) == NULL) \ + { \ + printf("INFO: (%s): " format "\n", module, ##__VA_ARGS__); \ + } \ + else if (log_check_level((logger), LOG_INFO)) \ { \ log_print((logger), LOG_INFO, (module), (format), ##__VA_ARGS__); \ } #define STELLAR_LOG_WARN(logger, module, format, ...) \ - if (log_check_level((logger), LOG_WARN)) \ + if ((logger) == NULL) \ + { \ + printf("WARN: (%s): " format "\n", module, ##__VA_ARGS__); \ + } \ + else if (log_check_level((logger), LOG_WARN)) \ { \ log_print((logger), LOG_WARN, (module), (format), ##__VA_ARGS__); \ } #define STELLAR_LOG_ERROR(logger, module, format, ...) \ - if (log_check_level((logger), LOG_ERROR)) \ + if ((logger) == NULL) \ + { \ + printf("ERROR: (%s): " format "\n", module, ##__VA_ARGS__); \ + } \ + else if (log_check_level((logger), LOG_ERROR)) \ { \ log_print((logger), LOG_ERROR, (module), (format), ##__VA_ARGS__); \ } #define STELLAR_LOG_FATAL(logger, module, format, ...) \ - if (log_check_level((logger), LOG_FATAL)) \ + if ((logger) == NULL) \ + { \ + printf("FATAL: (%s): " format "\n", module, ##__VA_ARGS__); \ + } \ + else if (log_check_level((logger), LOG_FATAL)) \ { \ log_print((logger), LOG_FATAL, (module), (format), ##__VA_ARGS__); \ } diff --git a/include/stellar/packet.h b/include/stellar/packet.h index d1431a0..5ea2e75 100644 --- a/include/stellar/packet.h +++ b/include/stellar/packet.h @@ -185,16 +185,6 @@ uint16_t packet_get_raw_len(const struct packet *pkt); const char *packet_get_payload(const struct packet *pkt); uint16_t packet_get_payload_len(const struct packet *pkt); -enum packet_stage -{ - PACKET_STAGE_PREROUTING, - PACKET_STAGE_INPUT, - PACKET_STAGE_FORWARD, - PACKET_STAGE_OUTPUT, - PACKET_STAGE_POSTROUTING, - PACKET_STAGE_MAX, -}; - #ifdef __cplusplus } #endif diff --git a/include/stellar/packet_manager.h b/include/stellar/packet_manager.h index 2a31974..87e6aed 100644 --- a/include/stellar/packet_manager.h +++ b/include/stellar/packet_manager.h @@ -7,6 +7,16 @@ extern "C" #include "packet.h" +enum packet_stage +{ + PACKET_STAGE_PREROUTING, + PACKET_STAGE_INPUT, + PACKET_STAGE_FORWARD, + PACKET_STAGE_OUTPUT, + PACKET_STAGE_POSTROUTING, + PACKET_STAGE_MAX, +}; + struct packet_manager; struct packet_manager_schema; struct packet_manager_runtime; @@ -15,8 +25,10 @@ struct packet_manager_schema *packet_manager_get_schema(struct packet_manager *p struct packet_manager_runtime *packet_manager_get_runtime(struct packet_manager *pkt_mgr, uint16_t thr_idx); typedef void on_packet_stage_callback(enum packet_stage stage, struct packet *pkt, void *args); -int packet_manager_schema_add_subscriber(struct packet_manager_schema *schema, enum packet_stage stage, on_packet_stage_callback cb, void *args); +int packet_manager_schema_add_subscriber(struct packet_manager_schema *pkt_mgr_schema, enum packet_stage stage, on_packet_stage_callback cb, void *args); +// take 只执行一次 ??? +// 同一 stage 后面的 msg 是否中断???不中断 void packet_manager_runtime_take_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt); void packet_manager_runtime_schedule_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt, enum packet_stage stage); diff --git a/infra/packet_manager/CMakeLists.txt b/infra/packet_manager/CMakeLists.txt index f764a51..ada4303 100644 --- a/infra/packet_manager/CMakeLists.txt +++ b/infra/packet_manager/CMakeLists.txt @@ -11,6 +11,6 @@ target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/uthash target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/logger) target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/include) target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra) -target_link_libraries(packet_manager tuple logger dablooms) +target_link_libraries(packet_manager tuple logger dablooms mq) add_subdirectory(test) \ No newline at end of file diff --git a/infra/packet_manager/packet_manager.c b/infra/packet_manager/packet_manager.c index efc14a7..b13802f 100644 --- a/infra/packet_manager/packet_manager.c +++ b/infra/packet_manager/packet_manager.c @@ -1,9 +1,12 @@ +#include + #include "utils.h" #include "stellar/mq.h" #include "packet_private.h" #include "packet_manager_private.h" #define PACKET_MANAGER_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "packet manager", format, ##__VA_ARGS__) +#define PACKET_MANAGER_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, "packet manager", format, ##__VA_ARGS__) #define PACKET_MANAGER_LOG_INFO(format, ...) STELLAR_LOG_WARN(__thread_local_logger, "packet manager", format, ##__VA_ARGS__) TAILQ_HEAD(packet_queue, packet); @@ -32,6 +35,7 @@ struct packet_manager_schema struct packet_manager_runtime { + uint16_t idx; struct mq_runtime *mq; struct packet_queue queue[PACKET_STAGE_MAX]; struct packet_manager_stat stat; @@ -44,6 +48,30 @@ struct packet_manager struct packet_manager_runtime *runtime[MAX_THREAD_NUM]; }; +/****************************************************************************** + * packet stage + ******************************************************************************/ + +const char *packet_stage_to_str(enum packet_stage stage) +{ + switch (stage) + { + case PACKET_STAGE_PREROUTING: + return "prerouting"; + case PACKET_STAGE_INPUT: + return "input"; + case PACKET_STAGE_FORWARD: + return "forward"; + case PACKET_STAGE_OUTPUT: + return "output"; + case PACKET_STAGE_POSTROUTING: + return "postrouting"; + default: + assert(0); + return "unknown"; + } +} + /****************************************************************************** * packet manager config ******************************************************************************/ @@ -66,12 +94,14 @@ static struct packet_manager_config *packet_manager_config_new(const char *toml_ return NULL; } - if (load_and_validate_toml_integer_config(toml_file, "packet_io.nr_worker_thread", (uint64_t *)&cfg->nr_worker_thread, 1, MAX_THREAD_NUM) != 0) + uint64_t val = 0; + if (load_and_validate_toml_integer_config(toml_file, "packet_io.nr_worker_thread", &val, 1, MAX_THREAD_NUM) != 0) { PACKET_MANAGER_LOG_ERROR("failed to load packet_io.nr_worker_thread from %s", toml_file); free(cfg); return NULL; } + cfg->nr_worker_thread = val; return cfg; } @@ -80,8 +110,11 @@ static struct packet_manager_config *packet_manager_config_new(const char *toml_ * packet manager schema ******************************************************************************/ -static void on_packet_stage_dispatch(int topic_id, void *msg, on_msg_cb_func *cb, void *cb_arg, void *dispatch_arg) +static void on_packet_stage_dispatch(int topic_id, const void *msg, on_msg_cb_func *cb, void *cb_arg, void *dispatch_arg) { + assert(msg); + assert(dispatch_arg); + struct packet_manager_schema *schema = (struct packet_manager_schema *)dispatch_arg; struct packet *pkt = (struct packet *)msg; @@ -98,69 +131,59 @@ static void on_packet_stage_dispatch(int topic_id, void *msg, on_msg_cb_func *cb ((on_packet_stage_callback *)cb)(stage, pkt, cb_arg); } -static void packet_manager_schema_free(struct packet_manager_schema *schema) +static void packet_manager_schema_free(struct packet_manager_schema *pkt_mgr_schema) { - if (schema) + if (pkt_mgr_schema) { - if (schema->mq) + if (pkt_mgr_schema->mq) { for (int i = 0; i < PACKET_STAGE_MAX; i++) { - if (schema->topic_id[i] != -1) + if (pkt_mgr_schema->topic_id[i] >= 0) { - mq_schema_destroy_topic(schema->mq, schema->topic_id[i]); + mq_schema_destroy_topic(pkt_mgr_schema->mq, pkt_mgr_schema->topic_id[i]); } } - - mq_schema_free(schema->mq); } - free(schema); - schema = NULL; + free(pkt_mgr_schema); + pkt_mgr_schema = NULL; } } -static struct packet_manager_schema *packet_manager_schema_new() +static struct packet_manager_schema *packet_manager_schema_new(struct mq_schema *mq) { - struct packet_manager_schema *schema = calloc(1, sizeof(struct packet_manager_schema)); - if (schema == NULL) + struct packet_manager_schema *pkt_mgr_schema = calloc(1, sizeof(struct packet_manager_schema)); + if (pkt_mgr_schema == NULL) { PACKET_MANAGER_LOG_ERROR("failed to allocate memory for packet_manager_schema"); return NULL; } - schema->mq = mq_schema_new(); - if (schema->mq == NULL) - { - PACKET_MANAGER_LOG_ERROR("failed to create mq_schema"); - goto error_out; - } + pkt_mgr_schema->mq = mq; for (int i = 0; i < PACKET_STAGE_MAX; i++) { - schema->topic_id[i] = -1; - } - schema->topic_id[PACKET_STAGE_PREROUTING] = mq_schema_create_topic(schema->mq, "packet_stage_prerouting", on_packet_stage_dispatch, schema, NULL, NULL); - schema->topic_id[PACKET_STAGE_INPUT] = mq_schema_create_topic(schema->mq, "packet_stage_input", on_packet_stage_dispatch, schema, NULL, NULL); - schema->topic_id[PACKET_STAGE_FORWARD] = mq_schema_create_topic(schema->mq, "packet_stage_forward", on_packet_stage_dispatch, schema, NULL, NULL); - schema->topic_id[PACKET_STAGE_OUTPUT] = mq_schema_create_topic(schema->mq, "packet_stage_output", on_packet_stage_dispatch, schema, NULL, NULL); - schema->topic_id[PACKET_STAGE_POSTROUTING] = mq_schema_create_topic(schema->mq, "packet_stage_postrouting", on_packet_stage_dispatch, schema, NULL, NULL); - for (int i = 0; i < PACKET_STAGE_MAX; i++) - { - if (schema->topic_id[i] < 0) + pkt_mgr_schema->topic_id[i] = mq_schema_create_topic(pkt_mgr_schema->mq, packet_stage_to_str(i), (on_msg_dispatch_cb_func *)on_packet_stage_dispatch, pkt_mgr_schema, NULL, NULL); + if (pkt_mgr_schema->topic_id[i] < 0) { - PACKET_MANAGER_LOG_ERROR("failed to create topic"); + PACKET_MANAGER_LOG_ERROR("failed to create topic %s", packet_stage_to_str(i)); goto error_out; } } - return schema; + return pkt_mgr_schema; error_out: - packet_manager_schema_free(schema); + packet_manager_schema_free(pkt_mgr_schema); return NULL; } +int packet_manager_schema_add_subscriber(struct packet_manager_schema *pkt_mgr_schema, enum packet_stage stage, on_packet_stage_callback cb, void *args) +{ + return mq_schema_subscribe(pkt_mgr_schema->mq, pkt_mgr_schema->topic_id[stage], (on_msg_cb_func *)cb, args); +} + /****************************************************************************** * packet manager runtime ******************************************************************************/ @@ -169,10 +192,6 @@ static void packet_manager_runtime_free(struct packet_manager_runtime *runtime) { if (runtime) { - if (runtime->mq) - { - mq_runtime_free(runtime->mq); - } for (int i = 0; i < PACKET_STAGE_MAX; i++) { struct packet *pkt = NULL; @@ -189,7 +208,7 @@ static void packet_manager_runtime_free(struct packet_manager_runtime *runtime) runtime = NULL; } -static struct packet_manager_runtime *packet_manager_runtime_new(struct packet_manager_schema *schema) +static struct packet_manager_runtime *packet_manager_runtime_new(uint16_t idx) { struct packet_manager_runtime *runtime = calloc(1, sizeof(struct packet_manager_runtime)); if (runtime == NULL) @@ -198,13 +217,7 @@ static struct packet_manager_runtime *packet_manager_runtime_new(struct packet_m return NULL; } - runtime->mq = mq_runtime_new(schema->mq); - if (runtime->mq == NULL) - { - PACKET_MANAGER_LOG_ERROR("failed to create mq_runtime"); - free(runtime); - return NULL; - } + runtime->idx = idx; for (int i = 0; i < PACKET_STAGE_MAX; i++) { @@ -216,78 +229,20 @@ static struct packet_manager_runtime *packet_manager_runtime_new(struct packet_m static void packet_manager_runtime_stat(struct packet_manager_runtime *runtime) { - PACKET_MANAGER_LOG_INFO("input_pkts: %lu, output_pkts: %lu, take_pkts: %lu, schedule_pkts: %lu", - runtime->stat.input_pkts, runtime->stat.output_pkts, runtime->stat.take_pkts, runtime->stat.schedule_pkts); - for (int i = 0; i < PACKET_STAGE_MAX; i++) - { - PACKET_MANAGER_LOG_INFO("curr_queue_len[%d]: %lu", i, runtime->stat.curr_queue_len[i]); - } + PACKET_MANAGER_LOG_DEBUG("runtime[%d] => input_pkts: %lu, output_pkts: %lu, take_pkts: %lu, schedule_pkts: %lu, queue_len: {pre_routing: %lu, input: %lu, forward: %lu, output: %lu, post_routing: %lu}", + runtime->idx, + runtime->stat.input_pkts, runtime->stat.output_pkts, + runtime->stat.take_pkts, runtime->stat.schedule_pkts, + runtime->stat.curr_queue_len[PACKET_STAGE_PREROUTING], + runtime->stat.curr_queue_len[PACKET_STAGE_INPUT], + runtime->stat.curr_queue_len[PACKET_STAGE_FORWARD], + runtime->stat.curr_queue_len[PACKET_STAGE_OUTPUT], + runtime->stat.curr_queue_len[PACKET_STAGE_POSTROUTING]); } -/****************************************************************************** - * Public API - ******************************************************************************/ - -struct packet_manager *packet_manager_new(const char *toml_file) +void packet_manager_runtime_init(struct packet_manager_runtime *pkt_mgr_rt, struct mq_runtime *mq_rt) { - struct packet_manager *pkt_mgr = calloc(1, sizeof(struct packet_manager)); - if (pkt_mgr == NULL) - { - PACKET_MANAGER_LOG_ERROR("failed to allocate memory for packet_manager"); - return NULL; - } - - pkt_mgr->cfg = packet_manager_config_new(toml_file); - if (pkt_mgr->cfg == NULL) - { - PACKET_MANAGER_LOG_ERROR("failed to create packet_manager_config"); - goto error_out; - } - - pkt_mgr->schema = packet_manager_schema_new(); - if (pkt_mgr->schema == NULL) - { - PACKET_MANAGER_LOG_ERROR("failed to create packet_manager_schema"); - goto error_out; - } - - for (uint16_t i = 0; i < pkt_mgr->cfg->nr_worker_thread; i++) - { - pkt_mgr->runtime[i] = packet_manager_runtime_new(pkt_mgr->schema); - if (pkt_mgr->runtime[i] == NULL) - { - PACKET_MANAGER_LOG_ERROR("failed to create packet_manager_runtime"); - goto error_out; - } - } - - return pkt_mgr; - -error_out: - packet_manager_free(pkt_mgr); - return NULL; -} - -void packet_manager_free(struct packet_manager *pkt_mgr) -{ - if (pkt_mgr) - { - packet_manager_config_free(pkt_mgr->cfg); - packet_manager_schema_free(pkt_mgr->schema); - - for (uint16_t i = 0; i < pkt_mgr->cfg->nr_worker_thread; i++) - { - if (pkt_mgr->runtime[i]) - { - PACKET_MANAGER_LOG_INFO("worker thread %d packet manager runtime stat", i); - packet_manager_runtime_stat(pkt_mgr->runtime[i]); - packet_manager_runtime_free(pkt_mgr->runtime[i]); - } - } - - free(pkt_mgr); - pkt_mgr = NULL; - } + pkt_mgr_rt->mq = mq_rt; } void packet_manager_runtime_input(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt) @@ -313,11 +268,14 @@ void packet_manager_runtime_dispatch(struct packet_manager_runtime *pkt_mgr_rt) { for (int i = 0; i < PACKET_STAGE_MAX; i++) { + // packet_manager_runtime_stat(pkt_mgr_rt); + struct packet *pkt = NULL; while ((pkt = TAILQ_FIRST(&pkt_mgr_rt->queue[i]))) { - pkt_mgr_rt->stat.curr_queue_len[i]--; TAILQ_REMOVE(&pkt_mgr_rt->queue[i], pkt, stage_tqe); + pkt_mgr_rt->stat.curr_queue_len[i]--; + mq_runtime_publish_message(pkt_mgr_rt->mq, i, pkt); mq_runtime_dispatch(pkt_mgr_rt->mq); @@ -328,17 +286,97 @@ void packet_manager_runtime_dispatch(struct packet_manager_runtime *pkt_mgr_rt) if (i + 1 == PACKET_STAGE_MAX) { + TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[i], pkt, stage_tqe); + pkt_mgr_rt->stat.curr_queue_len[i]++; break; } else { - pkt_mgr_rt->stat.curr_queue_len[i + 1]++; TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[i + 1], pkt, stage_tqe); + pkt_mgr_rt->stat.curr_queue_len[i + 1]++; } } } } +void packet_manager_runtime_take_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt) +{ + pkt_mgr_rt->stat.take_pkts++; + packet_set_stolen(pkt, true); +} + +void packet_manager_runtime_schedule_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt, enum packet_stage stage) +{ + pkt_mgr_rt->stat.schedule_pkts++; + packet_set_stolen(pkt, false); + TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[stage], pkt, stage_tqe); +} + +/****************************************************************************** + * packet manager + ******************************************************************************/ + +struct packet_manager *packet_manager_new(struct mq_schema *mq, const char *toml_file) +{ + struct packet_manager *pkt_mgr = calloc(1, sizeof(struct packet_manager)); + if (pkt_mgr == NULL) + { + PACKET_MANAGER_LOG_ERROR("failed to allocate memory for packet_manager"); + return NULL; + } + + pkt_mgr->cfg = packet_manager_config_new(toml_file); + if (pkt_mgr->cfg == NULL) + { + PACKET_MANAGER_LOG_ERROR("failed to create packet_manager_config"); + goto error_out; + } + + pkt_mgr->schema = packet_manager_schema_new(mq); + if (pkt_mgr->schema == NULL) + { + PACKET_MANAGER_LOG_ERROR("failed to create packet_manager_schema"); + goto error_out; + } + + for (uint16_t i = 0; i < pkt_mgr->cfg->nr_worker_thread; i++) + { + pkt_mgr->runtime[i] = packet_manager_runtime_new(i); + if (pkt_mgr->runtime[i] == NULL) + { + PACKET_MANAGER_LOG_ERROR("failed to create packet_manager_runtime"); + goto error_out; + } + } + + return pkt_mgr; + +error_out: + packet_manager_free(pkt_mgr); + return NULL; +} + +void packet_manager_free(struct packet_manager *pkt_mgr) +{ + if (pkt_mgr) + { + for (uint16_t i = 0; i < pkt_mgr->cfg->nr_worker_thread; i++) + { + if (pkt_mgr->runtime[i]) + { + packet_manager_runtime_stat(pkt_mgr->runtime[i]); + packet_manager_runtime_free(pkt_mgr->runtime[i]); + } + } + + packet_manager_schema_free(pkt_mgr->schema); + packet_manager_config_free(pkt_mgr->cfg); + + free(pkt_mgr); + pkt_mgr = NULL; + } +} + struct packet_manager_schema *packet_manager_get_schema(struct packet_manager *pkt_mgr) { if (pkt_mgr) @@ -362,21 +400,3 @@ struct packet_manager_runtime *packet_manager_get_runtime(struct packet_manager return NULL; } } - -int packet_manager_schema_add_subscriber(struct packet_manager_schema *schema, enum packet_stage stage, on_packet_stage_callback cb, void *args) -{ - return mq_schema_subscribe(schema->mq, schema->topic_id[stage], (on_msg_cb_func *)cb, args); -} - -void packet_manager_runtime_take_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt) -{ - pkt_mgr_rt->stat.take_pkts++; - packet_set_stolen(pkt, true); -} - -void packet_manager_runtime_schedule_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt, enum packet_stage stage) -{ - pkt_mgr_rt->stat.schedule_pkts++; - packet_set_stolen(pkt, false); - TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[stage], pkt, stage_tqe); -} \ No newline at end of file diff --git a/infra/packet_manager/packet_manager_private.h b/infra/packet_manager/packet_manager_private.h index 85c5106..3f0dcf3 100644 --- a/infra/packet_manager/packet_manager_private.h +++ b/infra/packet_manager/packet_manager_private.h @@ -7,12 +7,15 @@ extern "C" #include "stellar/packet_manager.h" -struct packet_manager *packet_manager_new(const char *toml_file); +struct packet_manager *packet_manager_new(struct mq_schema *mq_schema, const char *toml_file); void packet_manager_free(struct packet_manager *pkt_mgr); +void packet_manager_runtime_init(struct packet_manager_runtime *pkt_mgr_rt, struct mq_runtime *mq_rt); void packet_manager_runtime_input(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt); struct packet *packet_manager_runtime_output(struct packet_manager_runtime *pkt_mgr_rt); void packet_manager_runtime_dispatch(struct packet_manager_runtime *pkt_mgr_rt); +// for debug +const char *packet_stage_to_str(enum packet_stage stage); #ifdef __cplusplus } diff --git a/infra/packet_manager/test/CMakeLists.txt b/infra/packet_manager/test/CMakeLists.txt index 3d12184..0c8314c 100644 --- a/infra/packet_manager/test/CMakeLists.txt +++ b/infra/packet_manager/test/CMakeLists.txt @@ -55,6 +55,9 @@ target_link_libraries(gtest_packet_filter packet_manager gtest) add_executable(gtest_packet_ldbc gtest_packet_ldbc.cpp) target_link_libraries(gtest_packet_ldbc packet_manager gtest) +add_executable(gtest_packet_manager gtest_packet_manager.cpp) +target_link_libraries(gtest_packet_manager packet_manager gtest) + include(GoogleTest) gtest_discover_tests(gtest_tunnel) gtest_discover_tests(gtest_udp_utils) @@ -74,4 +77,7 @@ gtest_discover_tests(gtest_packet_frag) gtest_discover_tests(gtest_packet_parser) gtest_discover_tests(gtest_packet_builder) gtest_discover_tests(gtest_packet_filter) -gtest_discover_tests(gtest_packet_ldbc) \ No newline at end of file +gtest_discover_tests(gtest_packet_ldbc) +gtest_discover_tests(gtest_packet_manager) + +file(COPY ../../../conf/ DESTINATION ./conf/) \ No newline at end of file diff --git a/infra/packet_manager/test/gtest_packet_manager.cpp b/infra/packet_manager/test/gtest_packet_manager.cpp new file mode 100644 index 0000000..2fee4a4 --- /dev/null +++ b/infra/packet_manager/test/gtest_packet_manager.cpp @@ -0,0 +1,167 @@ +#include + +#include "stellar/mq.h" +#include "packet_parser.h" +#include "packet_private.h" +#include "packet_manager_private.h" + +/****************************************************************************** + * [Protocols in frame: eth:ethertype:ip:ipv6:tcp] + ****************************************************************************** + * + * Frame 1: 106 bytes on wire (848 bits), 106 bytes captured (848 bits) + * Ethernet II, Src: JuniperN_45:88:29 (2c:6b:f5:45:88:29), Dst: JuniperN_2a:a2:00 (5c:5e:ab:2a:a2:00) + * Destination: JuniperN_2a:a2:00 (5c:5e:ab:2a:a2:00) + * Source: JuniperN_45:88:29 (2c:6b:f5:45:88:29) + * Type: IPv4 (0x0800) + * Internet Protocol Version 4, Src: 210.77.88.163, Dst: 59.66.4.50 + * 0100 .... = Version: 4 + * .... 0101 = Header Length: 20 bytes (5) + * Differentiated Services Field: 0x00 (DSCP: CS0, ECN: Not-ECT) + * Total Length: 92 + * Identification: 0x0b4d (2893) + * 000. .... = Flags: 0x0 + * ...0 0000 0000 0000 = Fragment Offset: 0 + * Time to Live: 59 + * Protocol: IPv6 (41) + * Header Checksum: 0x09c8 [validation disabled] + * [Header checksum status: Unverified] + * Source Address: 210.77.88.163 + * Destination Address: 59.66.4.50 + * Internet Protocol Version 6, Src: 2001:da8:200:900e:200:5efe:d24d:58a3, Dst: 2600:140e:6::1702:1058 + * 0110 .... = Version: 6 + * .... 0000 0000 .... .... .... .... .... = Traffic Class: 0x00 (DSCP: CS0, ECN: Not-ECT) + * .... 0000 0000 0000 0000 0000 = Flow Label: 0x00000 + * Payload Length: 32 + * Next Header: TCP (6) + * Hop Limit: 64 + * Source Address: 2001:da8:200:900e:200:5efe:d24d:58a3 + * Destination Address: 2600:140e:6::1702:1058 + * [Source ISATAP IPv4: 210.77.88.163] + * Transmission Control Protocol, Src Port: 52556, Dst Port: 80, Seq: 0, Len: 0 + * Source Port: 52556 + * Destination Port: 80 + * [Stream index: 0] + * [Conversation completeness: Complete, WITH_DATA (31)] + * [TCP Segment Len: 0] + * Sequence Number: 0 (relative sequence number) + * Sequence Number (raw): 2172673142 + * [Next Sequence Number: 1 (relative sequence number)] + * Acknowledgment Number: 0 + * Acknowledgment number (raw): 0 + * 1000 .... = Header Length: 32 bytes (8) + * Flags: 0x002 (SYN) + * Window: 8192 + * [Calculated window size: 8192] + * Checksum: 0xf757 [unverified] + * [Checksum Status: Unverified] + * Urgent Pointer: 0 + * Options: (12 bytes), Maximum segment size, No-Operation (NOP), Window scale, No-Operation (NOP), No-Operation (NOP), SACK permitted + * [Timestamps] + */ + +unsigned char data[] = { + 0x5c, 0x5e, 0xab, 0x2a, 0xa2, 0x00, 0x2c, 0x6b, 0xf5, 0x45, 0x88, 0x29, 0x08, 0x00, 0x45, 0x00, 0x00, 0x5c, 0x0b, 0x4d, 0x00, 0x00, 0x3b, 0x29, 0x09, 0xc8, + 0xd2, 0x4d, 0x58, 0xa3, 0x3b, 0x42, 0x04, 0x32, 0x60, 0x00, 0x00, 0x00, 0x00, 0x20, 0x06, 0x40, 0x20, 0x01, 0x0d, 0xa8, 0x02, 0x00, 0x90, 0x0e, 0x02, 0x00, + 0x5e, 0xfe, 0xd2, 0x4d, 0x58, 0xa3, 0x26, 0x00, 0x14, 0x0e, 0x00, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x17, 0x02, 0x10, 0x58, 0xcd, 0x4c, 0x00, 0x50, + 0x81, 0x80, 0x5c, 0x76, 0x00, 0x00, 0x00, 0x00, 0x80, 0x02, 0x20, 0x00, 0xf7, 0x57, 0x00, 0x00, 0x02, 0x04, 0x04, 0xc4, 0x01, 0x03, 0x03, 0x08, 0x01, 0x01, + 0x04, 0x02}; + +#if 1 +TEST(PACKET_MANAGER, NEW_FREE) +{ + struct mq_schema *mq_schema = mq_schema_new(); + EXPECT_TRUE(mq_schema); + + struct packet_manager *pkt_mgr = packet_manager_new(mq_schema, "./conf/stellar.toml"); + EXPECT_TRUE(pkt_mgr); + + EXPECT_TRUE(packet_manager_get_schema(pkt_mgr)); + EXPECT_TRUE(packet_manager_get_runtime(pkt_mgr, 0)); + EXPECT_TRUE(packet_manager_get_runtime(pkt_mgr, 1) == NULL); + + packet_manager_free(pkt_mgr); + + mq_schema_free(mq_schema); +} +#endif + +static void on_packet_stage(enum packet_stage stage, struct packet *pkt, void *args) +{ + printf("stage: %s\n", packet_stage_to_str(stage)); + + static int count = 0; + EXPECT_TRUE(count == stage); + count++; + + EXPECT_TRUE(packet_is_ctrl(pkt)); + EXPECT_TRUE(args == NULL); +} + +#if 1 +TEST(PACKET_MANAGER, NORNMAL) +{ + // global init + struct mq_schema *mq_schema = mq_schema_new(); + EXPECT_TRUE(mq_schema); + struct mq_runtime *mq_rt = mq_runtime_new(mq_schema); + EXPECT_TRUE(mq_rt); + + // module init + struct packet_manager *pkt_mgr = packet_manager_new(mq_schema, "./conf/stellar.toml"); + EXPECT_TRUE(pkt_mgr); + struct packet_manager_schema *schema = packet_manager_get_schema(pkt_mgr); + EXPECT_TRUE(schema); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_PREROUTING, on_packet_stage, NULL) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_INPUT, on_packet_stage, NULL) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_FORWARD, on_packet_stage, NULL) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_OUTPUT, on_packet_stage, NULL) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_POSTROUTING, on_packet_stage, NULL) == 0); + + // per-thread init + struct packet_manager_runtime *runtime = packet_manager_get_runtime(pkt_mgr, 0); + EXPECT_TRUE(runtime); + packet_manager_runtime_init(runtime, mq_rt); + + // per-thread run + struct packet pkt; + memset(&pkt, 0, sizeof(pkt)); + packet_parse(&pkt, (const char *)data, sizeof(data)); + packet_set_ctrl(&pkt, true); + + packet_manager_runtime_input(runtime, &pkt); + packet_manager_runtime_dispatch(runtime); + EXPECT_TRUE(packet_manager_runtime_output(runtime) == &pkt); + + // per-thread free + + // module free + packet_manager_free(pkt_mgr); + + // global free + mq_runtime_free(mq_rt); + mq_schema_free(mq_schema); +} +#endif + +#if 1 +TEST(PACKET_MANAGER, TAKE) +{ + // TODO + // packet_manager_runtime_take_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt); +} +#endif + +#if 1 +TEST(PACKET_MANAGER, SCHEDULE) +{ + // TODO + // packet_manager_runtime_schedule_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt, enum packet_stage stage); +} +#endif + +int main(int argc, char **argv) +{ + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}