This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
stellar-stellar/test/debug_plugin/debug_plugin.cpp
2024-08-21 18:18:12 +08:00

317 lines
10 KiB
C++

#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdarg.h>
#include <time.h>
#include <pthread.h>
#include <arpa/inet.h>
#include "utils.h"
#include "packet_dump.h"
#include "session_private.h"
#include "stellar/session.h"
#include "stellar/packet.h"
#include "stellar/stellar_mq.h"
#include "stellar/stellar_exdata.h"
#pragma GCC diagnostic ignored "-Wunused-parameter"
// NOTE: packet hexdump or tcp segment hexdump may be too long, so we need direct output to fd, instead of using log_print
static void log_print(int fd, const char *module, const char *fmt, ...)
{
static unsigned char weekday_str[7][4] = {"Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"};
static unsigned char month_str[12][4] = {"Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"};
int nwrite;
char buf[4096 * 2] = {0};
char *p = buf;
char *end = buf + sizeof(buf);
va_list args;
struct tm local;
time_t t;
time(&t);
localtime_r(&t, &local);
// add time
p += snprintf(p, end - p, "%s %s %d %02d:%02d:%02d %d ",
weekday_str[local.tm_wday], month_str[local.tm_mon], local.tm_mday, local.tm_hour, local.tm_min, local.tm_sec, local.tm_year + 1900);
// add tid
p += snprintf(p, end - p, "%lu ", pthread_self());
// add module
p += snprintf(p, end - p, "(%s), ", module);
// add content
va_start(args, fmt);
p += vsnprintf(p, end - p, fmt, args);
va_end(args);
// add end of line
p += snprintf(p, end - p, "\n");
do
{
nwrite = write(fd, buf, p - buf);
} while (nwrite == -1 && errno == EINTR);
}
struct plugin_ctx
{
struct stellar *st;
char work_dir[1024];
int sess_exdata_idx;
int sess_plug_id;
int tcp_topic_id;
int udp_topic_id;
int tcp_stream_topic_id;
int fd;
int c2s_tcp_seg_hexdump_fd;
int s2c_tcp_seg_hexdump_fd;
pthread_spinlock_t lock; // for hexdump thread safe
};
struct session_exdata
{
uint64_t c2s_rx_pkts;
uint64_t s2c_rx_pkts;
uint64_t c2s_rx_bytes;
uint64_t s2c_rx_bytes;
uint64_t c2s_rx_tcp_seg;
uint64_t s2c_rx_tcp_seg;
uint64_t c2s_rx_tcp_bytes;
uint64_t s2c_rx_tcp_bytes;
int c2s_tcp_seg_hexdump_fd;
int s2c_tcp_seg_hexdump_fd;
};
static void *on_sess_new(struct session *sess, void *plugin_ctx)
{
char buff[4096];
struct plugin_ctx *ctx = (struct plugin_ctx *)plugin_ctx;
struct session_exdata *exdata = (struct session_exdata *)calloc(1, sizeof(struct session_exdata));
if (session_get_type(sess) == SESSION_TYPE_TCP)
{
memset(buff, 0, sizeof(buff));
sprintf(buff, "%s/log/debug_plugin_%s_c2s_segment", ctx->work_dir, session_get0_readable_addr(sess));
ctx->c2s_tcp_seg_hexdump_fd = open(buff, O_WRONLY | O_APPEND | O_CREAT, 0644);
memset(buff, 0, sizeof(buff));
sprintf(buff, "%s/log/debug_plugin_%s_s2c_segment", ctx->work_dir, session_get0_readable_addr(sess));
ctx->s2c_tcp_seg_hexdump_fd = open(buff, O_WRONLY | O_APPEND | O_CREAT, 0644);
}
session_exdata_set(sess, ctx->sess_exdata_idx, exdata);
memset(buff, 0, sizeof(buff));
session_to_str(sess, 1, buff, sizeof(buff) - 1);
log_print(ctx->fd, "debug plugin", "sess new: %s", buff);
return NULL;
}
static void on_sess_free(struct session *sess, void *sess_ctx, void *plugin_ctx)
{
char buff[4096];
struct plugin_ctx *ctx = (struct plugin_ctx *)plugin_ctx;
struct session_exdata *exdata = (struct session_exdata *)session_exdata_get(sess, ctx->sess_exdata_idx);
memset(buff, 0, sizeof(buff));
session_to_str(sess, 0, buff, sizeof(buff) - 1);
log_print(ctx->fd, "debug plugin", "sess free: %s", buff);
log_print(ctx->fd, "debug plugin", "session %lu %s stat:\n"
"C2S rx packets: %6lu, C2S rx bytes: %6lu\n"
"S2C rx packets: %6lu, S2C rx bytes: %6lu\n"
"C2S rx TCP segments: %6lu, C2S rx TCP bytes: %6lu\n"
"S2C rx TCP segments: %6lu, S2C rx TCP bytes: %6lu\n",
session_get_id(sess), session_get0_readable_addr(sess),
exdata->c2s_rx_pkts, exdata->c2s_rx_bytes,
exdata->s2c_rx_pkts, exdata->s2c_rx_bytes,
exdata->c2s_rx_tcp_seg, exdata->c2s_rx_tcp_bytes,
exdata->s2c_rx_tcp_seg, exdata->s2c_rx_tcp_bytes);
if (exdata->c2s_tcp_seg_hexdump_fd > 0)
{
close(exdata->c2s_tcp_seg_hexdump_fd);
}
if (exdata->s2c_tcp_seg_hexdump_fd > 0)
{
close(exdata->s2c_tcp_seg_hexdump_fd);
}
}
static void on_sess_udp_msg(struct session *sess, int topic_id, const void *msg, void *sess_ctx, void *plugin_ctx)
{
if (msg == NULL)
{
return;
}
char buff[4096];
struct packet *pkt = (struct packet *)msg;
struct plugin_ctx *ctx = (struct plugin_ctx *)plugin_ctx;
struct session_exdata *exdata = (struct session_exdata *)session_exdata_get(sess, ctx->sess_exdata_idx);
if (session_get_current_flow_direction(sess) == FLOW_DIRECTION_C2S)
{
exdata->c2s_rx_pkts++;
exdata->c2s_rx_bytes += packet_get_raw_len(pkt);
}
else
{
exdata->s2c_rx_pkts++;
exdata->s2c_rx_bytes += packet_get_raw_len(pkt);
}
memset(buff, 0, sizeof(buff));
session_to_str(sess, 1, buff, sizeof(buff) - 1);
log_print(ctx->fd, "debug plugin", "on UDP msg: %s", buff);
memset(buff, 0, sizeof(buff));
packet_dump_str(pkt, buff, sizeof(buff) - 1);
log_print(ctx->fd, "debug plugin", "rx UDP packet: \n%s", buff);
pthread_spin_lock(&ctx->lock);
packet_dump_hex(pkt, ctx->fd);
pthread_spin_unlock(&ctx->lock);
}
static void on_sess_tcp_msg(struct session *sess, int topic_id, const void *msg, void *sess_ctx, void *plugin_ctx)
{
if (msg == NULL)
{
return;
}
char buff[4096];
struct packet *pkt = (struct packet *)msg;
struct plugin_ctx *ctx = (struct plugin_ctx *)plugin_ctx;
struct session_exdata *exdata = (struct session_exdata *)session_exdata_get(sess, ctx->sess_exdata_idx);
if (session_get_current_flow_direction(sess) == FLOW_DIRECTION_C2S)
{
exdata->c2s_rx_pkts++;
exdata->c2s_rx_bytes += packet_get_raw_len(pkt);
}
else
{
exdata->s2c_rx_pkts++;
exdata->s2c_rx_bytes += packet_get_raw_len(pkt);
}
memset(buff, 0, sizeof(buff));
session_to_str(sess, 1, buff, sizeof(buff) - 1);
log_print(ctx->fd, "debug plugin", "on TCP msg: %s", buff);
memset(buff, 0, sizeof(buff));
packet_dump_str(pkt, buff, sizeof(buff) - 1);
log_print(ctx->fd, "debug plugin", "rx TCP packet: \n%s", buff);
pthread_spin_lock(&ctx->lock);
packet_dump_hex(pkt, ctx->fd);
pthread_spin_unlock(&ctx->lock);
}
static void on_sess_tcp_stream_msg(struct session *sess, int topic_id, const void *msg, void *sess_ctx, void *plugin_ctx)
{
if (msg == NULL)
{
return;
}
char buff[4096];
struct tcp_segment *seg = (struct tcp_segment *)msg;
struct plugin_ctx *ctx = (struct plugin_ctx *)plugin_ctx;
const char *data = tcp_segment_get_data(seg);
uint16_t len = tcp_segment_get_len(seg);
struct session_exdata *exdata = (struct session_exdata *)session_exdata_get(sess, ctx->sess_exdata_idx);
memset(buff, 0, sizeof(buff));
session_to_str(sess, 1, buff, sizeof(buff) - 1);
log_print(ctx->fd, "debug plugin", "on TCP stream msg: %s", buff);
pthread_spin_lock(&ctx->lock);
if (session_get_current_flow_direction(sess) == FLOW_DIRECTION_C2S)
{
log_print(ctx->fd, "debug plugin", "rx C2S TCP segment: len: %d, data: %p", len, data);
hexdump_to_fd(ctx->fd, exdata->c2s_rx_tcp_bytes, data, len);
hexdump_to_fd(ctx->c2s_tcp_seg_hexdump_fd, exdata->c2s_rx_tcp_bytes, data, len);
exdata->c2s_rx_tcp_seg++;
exdata->c2s_rx_tcp_bytes += len;
}
else
{
log_print(ctx->fd, "debug plugin", "rx S2C TCP segment: len: %d, data: %p", len, data);
hexdump_to_fd(ctx->fd, exdata->s2c_rx_tcp_bytes, data, len);
hexdump_to_fd(ctx->s2c_tcp_seg_hexdump_fd, exdata->s2c_rx_tcp_bytes, data, len);
exdata->s2c_rx_tcp_seg++;
exdata->s2c_rx_tcp_bytes += len;
}
pthread_spin_unlock(&ctx->lock);
}
/******************************************************************************
* Plugin API
******************************************************************************/
extern "C"
{
void *debug_plugin_init(struct stellar *st)
{
struct plugin_ctx *ctx = (struct plugin_ctx *)calloc(1, sizeof(struct plugin_ctx));
if (ctx == NULL)
{
return NULL;
}
ctx->fd = open("./log/debug_plugin_log", O_WRONLY | O_APPEND | O_CREAT, 0644);
if (ctx->fd == -1)
{
printf("[debug plugin] open log file failed: %s\n", strerror(errno));
free(ctx);
return NULL;
}
if (getcwd(ctx->work_dir, sizeof(ctx->work_dir)) == NULL)
{
printf("[debug plugin] getcwd failed: %s\n", strerror(errno));
close(ctx->fd);
free(ctx);
return NULL;
}
pthread_spin_init(&ctx->lock, PTHREAD_PROCESS_PRIVATE);
ctx->st = st;
ctx->sess_exdata_idx = stellar_exdata_new_index(st, "DEBUG_PLUGIN_SESS_EXDATA", stellar_exdata_free_default, NULL);
ctx->sess_plug_id = stellar_session_plugin_register(st, on_sess_new, on_sess_free, ctx);
ctx->udp_topic_id = stellar_mq_get_topic_id(st, TOPIC_UDP);
ctx->tcp_topic_id = stellar_mq_get_topic_id(st, TOPIC_TCP);
ctx->tcp_stream_topic_id = stellar_mq_get_topic_id(st, TOPIC_TCP_STREAM);
stellar_session_mq_subscribe(st, ctx->udp_topic_id, on_sess_udp_msg, ctx->sess_plug_id);
stellar_session_mq_subscribe(st, ctx->tcp_topic_id, on_sess_tcp_msg, ctx->sess_plug_id);
stellar_session_mq_subscribe(st, ctx->tcp_stream_topic_id, on_sess_tcp_stream_msg, ctx->sess_plug_id);
log_print(ctx->fd, "debug plugin", "init");
return ctx;
}
void debug_plugin_exit(void *plugin_ctx)
{
struct plugin_ctx *ctx = (struct plugin_ctx *)plugin_ctx;
if (ctx)
{
log_print(ctx->fd, "debug plugin", "exit");
if (ctx->fd > 0)
{
close(ctx->fd);
}
pthread_spin_destroy(&ctx->lock);
free(ctx);
}
}
}