🧪 test(packet injector test): upgrade plugin manager

This commit is contained in:
yangwei
2024-05-17 19:57:22 +08:00
committed by luwenpeng
parent e0ec3f2d52
commit ccaddf7fdb
5 changed files with 49 additions and 73 deletions

View File

@@ -15,6 +15,7 @@
#include "session_priv.h"
#include "inject_priv.h"
#include "stellar/tuple.h"
#include "stellar/session_mq.h"
#define MOCK_PLUGIN_LOG_DEBUG(format, ...) LOG_DEBUG("mock plugin", format, ##__VA_ARGS__)
@@ -49,7 +50,7 @@ struct inject_rule
uint64_t count_num;
} rule;
static void inject_packet_plugin(struct session *sess, struct packet *pkt, struct inject_rule *rule)
static void inject_packet_plugin(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
{
char buffer[1024] = {0};
const struct tuple6 *tuple = session_get_tuple6(sess);
@@ -71,11 +72,11 @@ static void inject_packet_plugin(struct session *sess, struct packet *pkt, struc
{
return;
}
if (rule->count_dir == AFTER_RECV_C2S_N_PACKET && session_get_stat(sess, FLOW_DIRECTION_C2S, STAT_RAW_PACKETS_RECEIVED) != rule->count_num)
if (p_rule->count_dir == AFTER_RECV_C2S_N_PACKET && session_get_stat(sess, FLOW_DIRECTION_C2S, STAT_RAW_PACKETS_RECEIVED) != p_rule->count_num)
{
return;
}
if (rule->count_dir == AFTER_RECV_S2C_N_PACKET && session_get_stat(sess, FLOW_DIRECTION_S2C, STAT_RAW_PACKETS_RECEIVED) != rule->count_num)
if (p_rule->count_dir == AFTER_RECV_S2C_N_PACKET && session_get_stat(sess, FLOW_DIRECTION_S2C, STAT_RAW_PACKETS_RECEIVED) != p_rule->count_num)
{
return;
}
@@ -84,7 +85,7 @@ static void inject_packet_plugin(struct session *sess, struct packet *pkt, struc
{
return;
}
switch (rule->inject_type)
switch (p_rule->inject_type)
{
case INJECT_TYPE_TCP_RST:
packet_set_action(pkt, PACKET_ACTION_DROP);
@@ -121,73 +122,18 @@ static void inject_packet_plugin(struct session *sess, struct packet *pkt, struc
* mock plugin manager
******************************************************************************/
struct plugin_manager
void *plugin_manager_new_ctx(struct session *sess, void *plugin_env)
{
};
void *plugin_manager_new_ctx(struct session *sess)
{
return sess;
return NULL;
}
void plugin_manager_free_ctx(void *ctx)
void plugin_manager_free_ctx(struct session *sess, void *ctx, void *plugin_env)
{
struct session *sess = (struct session *)ctx;
char buff[4096] = {0};
session_to_json(sess, buff, sizeof(buff));
MOCK_PLUGIN_LOG_DEBUG("=> session: %s", buff);
}
struct plugin_manager *plugin_manager_new(void)
{
static struct plugin_manager mgr;
return &mgr;
}
void plugin_manager_free(struct plugin_manager *mgr)
{
}
void plugin_manager_dispatch_session(struct plugin_manager *mgr, struct session *sess, struct packet *pkt)
{
struct tcp_segment *seg;
enum session_type type = session_get_type(sess);
uint16_t thr_idx = stellar_get_current_thread_index();
MOCK_PLUGIN_LOG_DEBUG("=> thread: %d, session: %lu %s, type: %s, state: %s, c2s packet received: %lu, s2c packet received: %lu", thr_idx,
session_get_id(sess), session_get_tuple6_str(sess),
session_type_to_str(type), session_state_to_str(session_get_state(sess)),
session_get_stat(sess, FLOW_DIRECTION_C2S, STAT_RAW_PACKETS_RECEIVED),
session_get_stat(sess, FLOW_DIRECTION_S2C, STAT_RAW_PACKETS_RECEIVED));
if (packet_is_ctrl(pkt))
{
}
else
{
switch (type)
{
case SESSION_TYPE_TCP:
while ((seg = session_get_tcp_segment(sess)) != NULL)
{
session_free_tcp_segment(sess, seg);
}
break;
case SESSION_TYPE_UDP:
break;
default:
assert(0);
break;
}
inject_packet_plugin(sess, pkt, &rule);
}
}
void plugin_manager_dispatch_packet(struct plugin_manager *mgr, struct packet *pkt)
{
}
/******************************************************************************
* main
******************************************************************************/
@@ -396,7 +342,7 @@ static int parse_cmdline(int argc, char **argv, struct inject_rule *rule)
return 0;
}
int main(int argc, char **argv)
int inject_packet_main(int argc, char **argv)
{
if (parse_cmdline(argc, argv, &rule) != 0)
{
@@ -404,6 +350,10 @@ int main(int argc, char **argv)
}
stellar_update_time_cache();
struct stellar st = {runtime};
int sess_plug_id;
int tcp_topic_id;
int udp_topic_id;
signal(SIGINT, signal_handler);
signal(SIGQUIT, signal_handler);
@@ -436,8 +386,7 @@ int main(int argc, char **argv)
STELLAR_LOG_ERROR("unable to create stellar stat");
goto error_out;
}
runtime->plug_mgr = plugin_manager_new();
runtime->plug_mgr = plugin_manager_init(&st, "./stellar_plugin/spec.toml");
if (runtime->plug_mgr == NULL)
{
STELLAR_LOG_ERROR("unable to create plugin manager");
@@ -451,6 +400,13 @@ int main(int argc, char **argv)
goto error_out;
}
sess_plug_id = stellar_session_plugin_register(&st, plugin_manager_new_ctx, plugin_manager_free_ctx, NULL);
tcp_topic_id = stellar_session_mq_get_topic_id(&st, TOPIC_TCP);
udp_topic_id = stellar_session_mq_get_topic_id(&st, TOPIC_UDP);
stellar_session_mq_subscribe(&st, tcp_topic_id, inject_packet_plugin, sess_plug_id);
stellar_session_mq_subscribe(&st, udp_topic_id, inject_packet_plugin, sess_plug_id);
if (stellar_thread_init(runtime, config) != 0)
{
STELLAR_LOG_ERROR("unable to init thread context");
@@ -486,10 +442,15 @@ error_out:
stellar_thread_join(runtime, config);
stellar_thread_clean(runtime, config);
packet_io_free(runtime->packet_io);
plugin_manager_free(runtime->plug_mgr);
plugin_manager_exit(runtime->plug_mgr);
stellar_stat_free(runtime->stat);
STELLAR_LOG_STATE("stellar exit !!!\n");
log_free();
return 0;
}
int __attribute__((weak)) main(int argc, char **argv)
{
return inject_packet_main(argc, argv);
}