diff --git a/common/include/tfe_utils.h b/common/include/tfe_utils.h index c059c64..937b0d3 100644 --- a/common/include/tfe_utils.h +++ b/common/include/tfe_utils.h @@ -94,8 +94,7 @@ do { if(!(condition)) { TFE_LOG_ERROR(g_default_logger, fmt, ##__VA_ARGS__); abo #define ATOMIC_READ(x) __atomic_fetch_add(x,0,__ATOMIC_RELAXED) #define ATOMIC_ADD(x, y) __atomic_fetch_add(x,y,__ATOMIC_RELAXED) #define ATOMIC_ZERO(x) __atomic_fetch_and(x,0,__ATOMIC_RELAXED) - - +#define ATOMIC_SET(x,y) __atomic_store_n(x,y,__ATOMIC_RELAXED) #ifndef MAX #define MAX(a, b) (((a) > (b)) ? (a) : (b)) #endif diff --git a/platform/include/internal/platform.h b/platform/include/internal/platform.h index 9868353..00371ca 100644 --- a/platform/include/internal/platform.h +++ b/platform/include/internal/platform.h @@ -17,7 +17,7 @@ struct tfe_thread_ctx struct event_base * evbase; struct evdns_base* dnsbase; struct evhttp_connection* evhttp; - unsigned char running; + time_t lastime; unsigned int nr_modules; const struct tfe_plugin * modules; diff --git a/platform/include/internal/proxy.h b/platform/include/internal/proxy.h index 9c19108..e0978fc 100644 --- a/platform/include/internal/proxy.h +++ b/platform/include/internal/proxy.h @@ -100,6 +100,7 @@ struct tfe_proxy void * fs_handle; unsigned int nr_work_threads; struct tfe_thread_ctx * work_threads[TFE_THREAD_MAX]; + int make_work_thread_sleep; unsigned int nr_modules; struct tfe_plugin * modules; diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp index 4f1dc2e..0da98c0 100644 --- a/platform/src/proxy.cpp +++ b/platform/src/proxy.cpp @@ -57,7 +57,7 @@ extern struct ssl_policy_enforcer* ssl_policy_enforcer_create(void* logger); extern enum ssl_stream_action ssl_policy_enforce(struct ssl_stream *upstream, void* u_para); -static int signals[] = {SIGHUP, SIGPIPE, SIGUSR1}; +static int signals[] = {SIGHUP, SIGPIPE, SIGUSR1, SIGUSR2}; /* Global Resource */ void * g_default_logger = NULL; @@ -228,8 +228,18 @@ void tfe_proxy_free(tfe_proxy * ctx) static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg) { - //printf("%s alive\n",__FUNCTION__); - return; + struct tfe_thread_ctx *ctx = (struct tfe_thread_ctx *)arg; + struct timespec now; + + clock_gettime(CLOCK_MONOTONIC, &now); + ATOMIC_SET(&(ctx->lastime), now.tv_sec); + TFE_LOG_DEBUG(g_default_logger, "Worker thread %d Update time %lds", ctx->thread_id, now.tv_sec); + + while (ATOMIC_READ(&(ctx->proxy->make_work_thread_sleep)) > 0) + { + TFE_LOG_ERROR(g_default_logger, "recv SIGUSR1, make worker thread %d sleep", ctx->thread_id); + sleep(1); + } } static void __signal_handler_cb(evutil_socket_t fd, short what, void * arg) @@ -243,7 +253,16 @@ static void __signal_handler_cb(evutil_socket_t fd, short what, void * arg) TFE_LOG_ERROR(ctx->logger, "recv SIGHUP, reload zlog.conf"); MESA_handle_runtime_log_reconstruction(NULL); break; - case SIGUSR1: break; + case SIGUSR1: + // enable work thread sleep + TFE_LOG_ERROR(ctx->logger, "recv SIGUSR1, make worker thread sleep"); + ATOMIC_SET(&(ctx->make_work_thread_sleep), 1); + break; + case SIGUSR2: + // disable work thread sleep + TFE_LOG_ERROR(ctx->logger, "recv SIGUSR2, wake worker thread from sleep"); + ATOMIC_ZERO(&(ctx->make_work_thread_sleep)); + break; case SIGPIPE: TFE_PROXY_STAT_INCREASE(STAT_SIGPIPE, 1); TFE_LOG_ERROR(ctx->logger, "Warning: Received SIGPIPE; ignoring.\n"); @@ -269,9 +288,9 @@ static void __gc_handler_cb(evutil_socket_t fd, short what, void * arg) static void * tfe_work_thread(void * arg) { struct tfe_thread_ctx * ctx = (struct tfe_thread_ctx *) arg; - struct timeval timer_delay = {60, 0}; + struct timeval timer_delay = {2, 0}; - struct event * ev = event_new(ctx->evbase, -1, EV_PERSIST, __dummy_event_handler, NULL); + struct event * ev = event_new(ctx->evbase, -1, EV_PERSIST, __dummy_event_handler, ctx); if (unlikely(ev == NULL)) { TFE_LOG_ERROR(g_default_logger, "Failed at creating dummy event for thread %u", ctx->thread_id); @@ -279,7 +298,6 @@ static void * tfe_work_thread(void * arg) } evtimer_add(ev, &timer_delay); - ctx->running = 1; __currect_thread_id = ctx->thread_id; char thread_name[16]; snprintf(thread_name, sizeof(thread_name), "tfe:worker-%d", ctx->thread_id); diff --git a/platform/src/watchdog_kni.cpp b/platform/src/watchdog_kni.cpp index 1769b16..f0300b6 100644 --- a/platform/src/watchdog_kni.cpp +++ b/platform/src/watchdog_kni.cpp @@ -7,6 +7,8 @@ #include #include +#include +#include #include #include #include @@ -205,10 +207,32 @@ void * watchdog_kni_thread(void * arg) DIE("watchdog thread is terminated."); } +static void health_check_for_thread_worker(evutil_socket_t fd, short what, void * arg) +{ + struct tfe_proxy *proxy = (struct tfe_proxy *)arg; + struct timespec now; + time_t temp; + + clock_gettime(CLOCK_MONOTONIC, &now); + + for (unsigned int i = 0; i < proxy->nr_work_threads; i++) + { + temp = ATOMIC_READ(&(proxy->work_threads[i]->lastime)); + if (temp + 2 + 2 + 1 < now.tv_sec) + { + TFE_LOG_ERROR(g_default_logger, "Watchdog thread nowtime %ld, Worker thread %d lastime %ld, Worker thread no reply, Exit ! ! ! ", now.tv_sec, proxy->work_threads[i]->thread_id, temp); + exit(-1); + } + TFE_LOG_DEBUG(g_default_logger, "Watchdog thread nowtime %ld, Worker thread %d lastime %lds ", now.tv_sec, proxy->work_threads[i]->thread_id, temp); + } +} + struct watchdog_kni * watchdog_kni_create(struct tfe_proxy * proxy, const char * profile, void * logger) { struct watchdog_kni * __ctx = ALLOC(struct watchdog_kni, 1); int ret = 0; + struct event *ev = NULL; + struct timeval timer_delay = {2, 0}; __ctx->proxy = proxy; __ctx->profile = profile; @@ -232,7 +256,7 @@ struct watchdog_kni * watchdog_kni_create(struct tfe_proxy * proxy, const char * if (ret < 0) { - TFE_LOG_ERROR(logger, "failed at parsing kni's address, in file %s, section %s, entry %s: %s", + TFE_LOG_ERROR(__ctx->logger, "failed at parsing kni's address, in file %s, section %s, entry %s: %s", profile, "kni", "ip", str_kni_ip); goto __errout; } @@ -245,12 +269,22 @@ struct watchdog_kni * watchdog_kni_create(struct tfe_proxy * proxy, const char * __ctx->ev_base = event_base_new(); if (!__ctx->ev_base) { - TFE_LOG_ERROR(logger, "failed at watchdog event_base_new(): %s", strerror(errno)); + TFE_LOG_ERROR(__ctx->logger, "failed at watchdog event_base_new(): %s", strerror(errno)); /* after log, reset errno */ errno = 0; goto __errout; } + ev = event_new(__ctx->ev_base, -1, EV_PERSIST, health_check_for_thread_worker, proxy); + if (unlikely(ev == NULL)) + { + TFE_LOG_ERROR(__ctx->logger, "Failed at creating health check event for worker thread"); + /* after log, reset errno */ + errno = 0; + goto __errout; + } + evtimer_add(ev, &timer_delay); + watchdog_kni_reset(__ctx); watchdog_kni_try_connect(__ctx);