增加连接层的性能统计

This commit is contained in:
Lu Qiuwen
2018-11-02 13:52:30 +08:00
parent 2e13728bfc
commit b3b65369d8
6 changed files with 152 additions and 92 deletions

View File

@@ -15,7 +15,6 @@
#include <errno.h>
#include <pthread.h>
#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
@@ -39,7 +38,7 @@
#include <MESA/field_stat2.h>
#include <tfe_plugin.h>
static int signals[] = {SIGTERM, SIGQUIT, SIGHUP, SIGPIPE, SIGUSR1};
static int signals[] = {SIGHUP, SIGPIPE, SIGUSR1};
/* Global Resource */
void * g_default_logger = NULL;
@@ -76,7 +75,7 @@ struct tfe_thread_ctx * tfe_proxy_thread_ctx_acquire(struct tfe_proxy * ctx)
unsigned int min_thread_id = 0;
unsigned int min_load = 0;
for(unsigned int tid = 0; tid < ctx->nr_work_threads; tid++)
for (unsigned int tid = 0; tid < ctx->nr_work_threads; tid++)
{
struct tfe_thread_ctx * thread_ctx = ctx->work_threads[tid];
min_thread_id = min_load > thread_ctx->load ? tid : min_thread_id;
@@ -97,7 +96,7 @@ int tfe_proxy_fds_accept(struct tfe_proxy * ctx, const struct tfe_proxy_accept_p
tfe_thread_ctx * worker_thread_ctx = tfe_proxy_thread_ctx_acquire(ctx);
struct tfe_stream * stream = tfe_stream_create(ctx, worker_thread_ctx);
tfe_stream_option_set(stream, TFE_STREAM_OPT_SESSION_TYPE, &para->session_type, sizeof(para->session_type));
tfe_stream_option_set(stream, TFE_STREAM_OPT_SESSION_TYPE, &para->session_type, sizeof(para->session_type));
tfe_stream_option_set(stream, TFE_STREAM_OPT_KEYRING_ID, &para->keyring_id, sizeof(para->keyring_id));
/* FOR DEBUG */
@@ -107,14 +106,15 @@ int tfe_proxy_fds_accept(struct tfe_proxy * ctx, const struct tfe_proxy_accept_p
enum tfe_stream_proto __session_type = STREAM_PROTO_PLAIN;
tfe_stream_option_set(stream, TFE_STREAM_OPT_PASSTHROUGH, &__true, sizeof(__true));
tfe_stream_option_set(stream, TFE_STREAM_OPT_SESSION_TYPE, &__session_type, sizeof(__session_type));
tfe_stream_option_set(stream, TFE_STREAM_OPT_SESSION_TYPE, &__session_type, sizeof(__session_type));
}
int ret = tfe_stream_init_by_fds(stream, para->downstream_fd, para->upstream_fd);
if (ret < 0)
{
TFE_LOG_ERROR(ctx->logger, "%p, Fds(downstream = %d, upstream = %d, type = %d) accept failed.",
stream, para->downstream_fd, para->upstream_fd, para->session_type); goto __errout;
stream, para->downstream_fd, para->upstream_fd, para->session_type);
goto __errout;
}
else
{
@@ -125,6 +125,7 @@ int tfe_proxy_fds_accept(struct tfe_proxy * ctx, const struct tfe_proxy_accept_p
return 0;
__errout:
if(stream != NULL) tfe_stream_destory((struct tfe_stream_private *)stream);
return -1;
}
@@ -151,16 +152,13 @@ static void __signal_handler_cb(evutil_socket_t fd, short what, void * arg)
{
case SIGTERM:
case SIGQUIT:
case SIGHUP:
break;
case SIGUSR1:
break;
case SIGHUP: break;
case SIGUSR1: break;
case SIGPIPE:
ATOMIC_INC(&(ctx->stat_val[STAT_SIGPIPE]));
TFE_PROXY_STAT_INCREASE(STAT_SIGPIPE, 1);
TFE_LOG_ERROR(ctx->logger, "Warning: Received SIGPIPE; ignoring.\n");
break;
default:
TFE_LOG_ERROR(ctx->logger, "Warning: Received unexpected signal %i\n", fd);
default: TFE_LOG_ERROR(ctx->logger, "Warning: Received unexpected signal %i\n", fd);
break;
}
}
@@ -168,8 +166,8 @@ static void __signal_handler_cb(evutil_socket_t fd, short what, void * arg)
static void __gc_handler_cb(evutil_socket_t fd, short what, void * arg)
{
tfe_proxy * ctx = (tfe_proxy *) arg;
int i=0;
for(i=0;i<TFE_STAT_MAX;i++)
int i = 0;
for (i = 0; i < TFE_STAT_MAX; i++)
{
FS_operate(ctx->fs_handle, ctx->fs_id[i], 0, FS_OP_SET, ATOMIC_READ(&(ctx->stat_val[i])));
}
@@ -195,23 +193,22 @@ static void * tfe_work_thread(void * arg)
__currect_thread_id = ctx->thread_id;
char thread_name[16];
snprintf(thread_name, sizeof(thread_name), "tfe:worker-%d", ctx->thread_id);
prctl(PR_SET_NAME,(unsigned long long)thread_name,NULL,NULL,NULL);
prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL);
TFE_LOG_INFO(g_default_logger, "Work thread %u is running...", ctx->thread_id);
event_base_dispatch(ctx->evbase);
assert(0);
event_free(ev);
TFE_LOG_ERROR(g_default_logger, "Work thread %u is exit...", ctx->thread_id);
return (void *)NULL;
return (void *) NULL;
}
void tfe_proxy_work_thread_create_ctx(struct tfe_proxy * proxy)
{
unsigned int i=0;
for(i=0; i<proxy->nr_work_threads;i++)
unsigned int i = 0;
for (i = 0; i < proxy->nr_work_threads; i++)
{
proxy->work_threads[i]=ALLOC(struct tfe_thread_ctx, 1);
proxy->work_threads[i] = ALLOC(struct tfe_thread_ctx, 1);
proxy->work_threads[i]->thread_id = i;
proxy->work_threads[i]->evbase = event_base_new();
}
@@ -219,16 +216,17 @@ void tfe_proxy_work_thread_create_ctx(struct tfe_proxy * proxy)
}
int tfe_proxy_work_thread_run(struct tfe_proxy * proxy)
{
struct tfe_thread_ctx * __thread_ctx=NULL;
unsigned int i=0;
int ret=0;
for(i=0; i<proxy->nr_work_threads;i++)
struct tfe_thread_ctx * __thread_ctx = NULL;
unsigned int i = 0;
int ret = 0;
for (i = 0; i < proxy->nr_work_threads; i++)
{
__thread_ctx=proxy->work_threads[i];
ret = pthread_create(&__thread_ctx->thr, NULL, tfe_work_thread, (void *)__thread_ctx);
__thread_ctx = proxy->work_threads[i];
ret = pthread_create(&__thread_ctx->thr, NULL, tfe_work_thread, (void *) __thread_ctx);
if (unlikely(ret < 0))
{
TFE_LOG_ERROR(proxy->logger, "Failed at pthread_create() for thread %d, error %d: %s", i, errno, strerror(errno));
TFE_LOG_ERROR(proxy->logger, "Failed at pthread_create() for thread %d, error %d: %s", i, errno,
strerror(errno));
return -1;
}
}
@@ -251,35 +249,56 @@ int tfe_proxy_config(struct tfe_proxy * proxy, const char * profile)
return 0;
}
static const char * __str_stat_spec_map[] =
{
[STAT_SIGPIPE] = "SIGPIPE",
[STAT_FD_OPEN_BY_KNI_ACCEPT] = "FdOpenKNI",
[STAT_FD_CLOSE_BY_KNI_ACCEPT_FAIL] = "FdCloseKNI",
[STAT_FD_CLOSE_BY_EVENT_WRITE] = "FdCloseUser",
[STAT_FD_CLOSE_BY_EVENT_EOF] = "FdCloseEOF",
[STAT_FD_CLOSE_BY_EVENT_ERROR] = "FdCloseError",
[STAT_FD_INSTANT_CLOSE] = "FdCloseInstant",
[STAT_FD_DEFER_CLOSE_IN_QUEUE] = "FdCloseDeferInQ",
[STAT_FD_DEFER_CLOSE_SUCCESS] = "FdCloseDeferSuc",
[STAT_STREAM_CREATE] = "StreamCreate",
[STAT_STREAM_DESTROY] = "StreamDestroy",
[STAT_STREAM_TCP_PLAIN] = "StreamTCPPlain",
[STAT_STREAM_TCP_SSL] = "StreamTCPSSL",
[TFE_STAT_MAX] = NULL
};
int tfe_stat_init(struct tfe_proxy * proxy, const char * profile)
{
const char* fieldstat_output="./tfe.fieldstat";
const char* app_name="tfe3a";
int value=0, i=0;
screen_stat_handle_t fs_handle=NULL;
fs_handle=FS_create_handle();
FS_set_para(fs_handle, OUTPUT_DEVICE, fieldstat_output, strlen(fieldstat_output)+1);
value=1;
static const char * fieldstat_output = "./tfe.fieldstat";
static const char * app_name = "tfe3a";
int value = 0, i = 0;
screen_stat_handle_t fs_handle = NULL;
/* TODO: Read Stat-D server and port from profile */
fs_handle = FS_create_handle();
FS_set_para(fs_handle, OUTPUT_DEVICE, fieldstat_output, (int)strlen(fieldstat_output) + 1);
FS_set_para(fs_handle, APP_NAME, app_name, (int)strlen(app_name) + 1);
value = 1;
FS_set_para(fs_handle, PRINT_MODE, &value, sizeof(value));
value=0;
value = 0;
FS_set_para(fs_handle, CREATE_THREAD, &value, sizeof(value));
FS_set_para(fs_handle, APP_NAME, app_name, strlen(app_name)+1);
const char* spec[TFE_STAT_MAX];
spec[STAT_SIGPIPE]="sigpipe";
for(i=0;i<TFE_STAT_MAX; i++)
for (i = 0; i < TFE_STAT_MAX; i++)
{
proxy->fs_id[i]=FS_register(fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT,spec[i]);
}
proxy->fs_id[i] = FS_register(fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, __str_stat_spec_map[i]);
}
FS_start(fs_handle);
proxy->fs_handle=fs_handle;
proxy->fs_handle = fs_handle;
return 0;
}
int main(int argc, char *argv[])
int main(int argc, char * argv[])
{
const char* main_profile="./conf/tfe.conf";
const char * main_profile = "./conf/tfe.conf";
g_default_logger = MESA_create_runtime_log_handle("log/tfe.log", RLOG_LV_DEBUG);
if (unlikely(g_default_logger == NULL))
@@ -290,7 +309,7 @@ int main(int argc, char *argv[])
future_promise_library_init();
tango_cache_global_init();
/* PROXY INSTANCE */
g_default_proxy = ALLOC(struct tfe_proxy, 1);
assert(g_default_proxy);
@@ -301,12 +320,12 @@ int main(int argc, char *argv[])
/* PERFOMANCE MONITOR */
tfe_stat_init(g_default_proxy, main_profile);
/* LOGGER */
g_default_proxy->logger = g_default_logger;
/* adds locking, only required if accessed from separate threads */
evthread_use_pthreads();
/* adds locking, only required if accessed from separate threads */
evthread_use_pthreads();
/* MAIN THREAD EVBASE */
g_default_proxy->evbase = event_base_new();
@@ -317,23 +336,22 @@ int main(int argc, char *argv[])
CHECK_OR_EXIT(g_default_proxy->gcev, "Failed at creating GC event. Exit. ");
/* SSL INIT */
g_default_proxy->ssl_mgr_handler = ssl_manager_init(main_profile, "ssl",
g_default_proxy->evbase, g_default_logger);
g_default_proxy->ssl_mgr_handler = ssl_manager_init(main_profile, "ssl", g_default_proxy->evbase, g_default_logger);
CHECK_OR_EXIT(g_default_proxy->ssl_mgr_handler, "Failed at init SSL manager. Exit.");
for (size_t i = 0; i < (sizeof(signals) / sizeof(int)); i++)
{
g_default_proxy->sev[i] = evsignal_new(g_default_proxy->evbase, signals[i], __signal_handler_cb, g_default_proxy);
CHECK_OR_EXIT( g_default_proxy->sev[i], "Failed at create signal event. Exit.");
evsignal_add(g_default_proxy->sev[i], NULL);
}
for (size_t i = 0; i < (sizeof(signals) / sizeof(int)); i++)
{
g_default_proxy->sev[i] = evsignal_new(g_default_proxy->evbase, signals[i], __signal_handler_cb, g_default_proxy);
CHECK_OR_EXIT(g_default_proxy->sev[i], "Failed at create signal event. Exit.");
evsignal_add(g_default_proxy->sev[i], NULL);
}
struct timeval gc_delay = {2, 0};
evtimer_add(g_default_proxy->gcev , &gc_delay);
evtimer_add(g_default_proxy->gcev, &gc_delay);
/* WORKER THREAD CTX Create */
tfe_proxy_work_thread_create_ctx(g_default_proxy);
/* ACCEPTOR INIT */
g_default_proxy->kni_acceptor_handler = kni_acceptor_init(g_default_proxy, main_profile, g_default_logger);
CHECK_OR_EXIT(g_default_proxy->kni_acceptor_handler, "Failed at init KNI acceptor. Exit. ");
@@ -341,17 +359,17 @@ int main(int argc, char *argv[])
/* PLUGIN INIT */
unsigned int plugin_iterator = 0;
for(struct tfe_plugin * plugin_iter = tfe_plugin_iterate(&plugin_iterator);
plugin_iter != NULL; plugin_iter = tfe_plugin_iterate(&plugin_iterator))
for (struct tfe_plugin * plugin_iter = tfe_plugin_iterate(&plugin_iterator);
plugin_iter != NULL; plugin_iter = tfe_plugin_iterate(&plugin_iterator))
{
ret = plugin_iter->on_init(g_default_proxy);
CHECK_OR_EXIT(ret >= 0, "Plugin %s init failed. Exit. ", plugin_iter->symbol);
TFE_LOG_INFO(g_default_logger, "Plugin %s initialized. ", plugin_iter->symbol);
}
ret=tfe_proxy_work_thread_run(g_default_proxy);
CHECK_OR_EXIT(ret==0, "Failed at creating thread. Exit.");
ret = tfe_proxy_work_thread_run(g_default_proxy);
CHECK_OR_EXIT(ret == 0, "Failed at creating thread. Exit.");
TFE_LOG_ERROR(g_default_logger, "Tango Frontend Engine initialized. ");
event_base_dispatch(g_default_proxy->evbase);
@@ -370,7 +388,7 @@ unsigned int tfe_proxy_get_work_thread_count(void)
struct event_base * tfe_proxy_get_work_thread_evbase(unsigned int thread_id)
{
assert(thread_id<g_default_proxy->nr_work_threads);
assert(thread_id < g_default_proxy->nr_work_threads);
return g_default_proxy->work_threads[thread_id]->evbase;
}
struct event_base * tfe_proxy_get_gc_evbase(void)