TSG-5978 TFE 与 KNI 保活的 watchdog 线程增加对 tfe worker 线程健康状态检测的功能
This commit is contained in:
@@ -94,8 +94,7 @@ do { if(!(condition)) { TFE_LOG_ERROR(g_default_logger, fmt, ##__VA_ARGS__); abo
|
|||||||
#define ATOMIC_READ(x) __atomic_fetch_add(x,0,__ATOMIC_RELAXED)
|
#define ATOMIC_READ(x) __atomic_fetch_add(x,0,__ATOMIC_RELAXED)
|
||||||
#define ATOMIC_ADD(x, y) __atomic_fetch_add(x,y,__ATOMIC_RELAXED)
|
#define ATOMIC_ADD(x, y) __atomic_fetch_add(x,y,__ATOMIC_RELAXED)
|
||||||
#define ATOMIC_ZERO(x) __atomic_fetch_and(x,0,__ATOMIC_RELAXED)
|
#define ATOMIC_ZERO(x) __atomic_fetch_and(x,0,__ATOMIC_RELAXED)
|
||||||
|
#define ATOMIC_SET(x,y) __atomic_store_n(x,y,__ATOMIC_RELAXED)
|
||||||
|
|
||||||
#ifndef MAX
|
#ifndef MAX
|
||||||
#define MAX(a, b) (((a) > (b)) ? (a) : (b))
|
#define MAX(a, b) (((a) > (b)) ? (a) : (b))
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ struct tfe_thread_ctx
|
|||||||
struct event_base * evbase;
|
struct event_base * evbase;
|
||||||
struct evdns_base* dnsbase;
|
struct evdns_base* dnsbase;
|
||||||
struct evhttp_connection* evhttp;
|
struct evhttp_connection* evhttp;
|
||||||
unsigned char running;
|
time_t lastime;
|
||||||
|
|
||||||
unsigned int nr_modules;
|
unsigned int nr_modules;
|
||||||
const struct tfe_plugin * modules;
|
const struct tfe_plugin * modules;
|
||||||
|
|||||||
@@ -100,6 +100,7 @@ struct tfe_proxy
|
|||||||
void * fs_handle;
|
void * fs_handle;
|
||||||
unsigned int nr_work_threads;
|
unsigned int nr_work_threads;
|
||||||
struct tfe_thread_ctx * work_threads[TFE_THREAD_MAX];
|
struct tfe_thread_ctx * work_threads[TFE_THREAD_MAX];
|
||||||
|
int make_work_thread_sleep;
|
||||||
|
|
||||||
unsigned int nr_modules;
|
unsigned int nr_modules;
|
||||||
struct tfe_plugin * modules;
|
struct tfe_plugin * modules;
|
||||||
|
|||||||
@@ -57,7 +57,7 @@
|
|||||||
extern struct ssl_policy_enforcer* ssl_policy_enforcer_create(void* logger);
|
extern struct ssl_policy_enforcer* ssl_policy_enforcer_create(void* logger);
|
||||||
extern enum ssl_stream_action ssl_policy_enforce(struct ssl_stream *upstream, void* u_para);
|
extern enum ssl_stream_action ssl_policy_enforce(struct ssl_stream *upstream, void* u_para);
|
||||||
|
|
||||||
static int signals[] = {SIGHUP, SIGPIPE, SIGUSR1};
|
static int signals[] = {SIGHUP, SIGPIPE, SIGUSR1, SIGUSR2};
|
||||||
|
|
||||||
/* Global Resource */
|
/* Global Resource */
|
||||||
void * g_default_logger = NULL;
|
void * g_default_logger = NULL;
|
||||||
@@ -228,8 +228,18 @@ void tfe_proxy_free(tfe_proxy * ctx)
|
|||||||
|
|
||||||
static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg)
|
static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg)
|
||||||
{
|
{
|
||||||
//printf("%s alive\n",__FUNCTION__);
|
struct tfe_thread_ctx *ctx = (struct tfe_thread_ctx *)arg;
|
||||||
return;
|
struct timespec now;
|
||||||
|
|
||||||
|
clock_gettime(CLOCK_MONOTONIC, &now);
|
||||||
|
ATOMIC_SET(&(ctx->lastime), now.tv_sec);
|
||||||
|
TFE_LOG_DEBUG(g_default_logger, "Worker thread %d Update time %lds", ctx->thread_id, now.tv_sec);
|
||||||
|
|
||||||
|
while (ATOMIC_READ(&(ctx->proxy->make_work_thread_sleep)) > 0)
|
||||||
|
{
|
||||||
|
TFE_LOG_ERROR(g_default_logger, "recv SIGUSR1, make worker thread %d sleep", ctx->thread_id);
|
||||||
|
sleep(1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void __signal_handler_cb(evutil_socket_t fd, short what, void * arg)
|
static void __signal_handler_cb(evutil_socket_t fd, short what, void * arg)
|
||||||
@@ -243,7 +253,16 @@ static void __signal_handler_cb(evutil_socket_t fd, short what, void * arg)
|
|||||||
TFE_LOG_ERROR(ctx->logger, "recv SIGHUP, reload zlog.conf");
|
TFE_LOG_ERROR(ctx->logger, "recv SIGHUP, reload zlog.conf");
|
||||||
MESA_handle_runtime_log_reconstruction(NULL);
|
MESA_handle_runtime_log_reconstruction(NULL);
|
||||||
break;
|
break;
|
||||||
case SIGUSR1: break;
|
case SIGUSR1:
|
||||||
|
// enable work thread sleep
|
||||||
|
TFE_LOG_ERROR(ctx->logger, "recv SIGUSR1, make worker thread sleep");
|
||||||
|
ATOMIC_SET(&(ctx->make_work_thread_sleep), 1);
|
||||||
|
break;
|
||||||
|
case SIGUSR2:
|
||||||
|
// disable work thread sleep
|
||||||
|
TFE_LOG_ERROR(ctx->logger, "recv SIGUSR2, wake worker thread from sleep");
|
||||||
|
ATOMIC_ZERO(&(ctx->make_work_thread_sleep));
|
||||||
|
break;
|
||||||
case SIGPIPE:
|
case SIGPIPE:
|
||||||
TFE_PROXY_STAT_INCREASE(STAT_SIGPIPE, 1);
|
TFE_PROXY_STAT_INCREASE(STAT_SIGPIPE, 1);
|
||||||
TFE_LOG_ERROR(ctx->logger, "Warning: Received SIGPIPE; ignoring.\n");
|
TFE_LOG_ERROR(ctx->logger, "Warning: Received SIGPIPE; ignoring.\n");
|
||||||
@@ -269,9 +288,9 @@ static void __gc_handler_cb(evutil_socket_t fd, short what, void * arg)
|
|||||||
static void * tfe_work_thread(void * arg)
|
static void * tfe_work_thread(void * arg)
|
||||||
{
|
{
|
||||||
struct tfe_thread_ctx * ctx = (struct tfe_thread_ctx *) arg;
|
struct tfe_thread_ctx * ctx = (struct tfe_thread_ctx *) arg;
|
||||||
struct timeval timer_delay = {60, 0};
|
struct timeval timer_delay = {2, 0};
|
||||||
|
|
||||||
struct event * ev = event_new(ctx->evbase, -1, EV_PERSIST, __dummy_event_handler, NULL);
|
struct event * ev = event_new(ctx->evbase, -1, EV_PERSIST, __dummy_event_handler, ctx);
|
||||||
if (unlikely(ev == NULL))
|
if (unlikely(ev == NULL))
|
||||||
{
|
{
|
||||||
TFE_LOG_ERROR(g_default_logger, "Failed at creating dummy event for thread %u", ctx->thread_id);
|
TFE_LOG_ERROR(g_default_logger, "Failed at creating dummy event for thread %u", ctx->thread_id);
|
||||||
@@ -279,7 +298,6 @@ static void * tfe_work_thread(void * arg)
|
|||||||
}
|
}
|
||||||
|
|
||||||
evtimer_add(ev, &timer_delay);
|
evtimer_add(ev, &timer_delay);
|
||||||
ctx->running = 1;
|
|
||||||
__currect_thread_id = ctx->thread_id;
|
__currect_thread_id = ctx->thread_id;
|
||||||
char thread_name[16];
|
char thread_name[16];
|
||||||
snprintf(thread_name, sizeof(thread_name), "tfe:worker-%d", ctx->thread_id);
|
snprintf(thread_name, sizeof(thread_name), "tfe:worker-%d", ctx->thread_id);
|
||||||
|
|||||||
@@ -7,6 +7,8 @@
|
|||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
|
|
||||||
|
#include <proxy.h>
|
||||||
|
#include <platform.h>
|
||||||
#include <tfe_utils.h>
|
#include <tfe_utils.h>
|
||||||
#include <watchdog_kni.h>
|
#include <watchdog_kni.h>
|
||||||
#include <MESA/MESA_prof_load.h>
|
#include <MESA/MESA_prof_load.h>
|
||||||
@@ -205,10 +207,32 @@ void * watchdog_kni_thread(void * arg)
|
|||||||
DIE("watchdog thread is terminated.");
|
DIE("watchdog thread is terminated.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void health_check_for_thread_worker(evutil_socket_t fd, short what, void * arg)
|
||||||
|
{
|
||||||
|
struct tfe_proxy *proxy = (struct tfe_proxy *)arg;
|
||||||
|
struct timespec now;
|
||||||
|
time_t temp;
|
||||||
|
|
||||||
|
clock_gettime(CLOCK_MONOTONIC, &now);
|
||||||
|
|
||||||
|
for (unsigned int i = 0; i < proxy->nr_work_threads; i++)
|
||||||
|
{
|
||||||
|
temp = ATOMIC_READ(&(proxy->work_threads[i]->lastime));
|
||||||
|
if (temp + 2 + 2 + 1 < now.tv_sec)
|
||||||
|
{
|
||||||
|
TFE_LOG_ERROR(g_default_logger, "Watchdog thread nowtime %ld, Worker thread %d lastime %ld, Worker thread no reply, Exit ! ! ! ", now.tv_sec, proxy->work_threads[i]->thread_id, temp);
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
TFE_LOG_DEBUG(g_default_logger, "Watchdog thread nowtime %ld, Worker thread %d lastime %lds ", now.tv_sec, proxy->work_threads[i]->thread_id, temp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct watchdog_kni * watchdog_kni_create(struct tfe_proxy * proxy, const char * profile, void * logger)
|
struct watchdog_kni * watchdog_kni_create(struct tfe_proxy * proxy, const char * profile, void * logger)
|
||||||
{
|
{
|
||||||
struct watchdog_kni * __ctx = ALLOC(struct watchdog_kni, 1);
|
struct watchdog_kni * __ctx = ALLOC(struct watchdog_kni, 1);
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
|
struct event *ev = NULL;
|
||||||
|
struct timeval timer_delay = {2, 0};
|
||||||
|
|
||||||
__ctx->proxy = proxy;
|
__ctx->proxy = proxy;
|
||||||
__ctx->profile = profile;
|
__ctx->profile = profile;
|
||||||
@@ -232,7 +256,7 @@ struct watchdog_kni * watchdog_kni_create(struct tfe_proxy * proxy, const char *
|
|||||||
|
|
||||||
if (ret < 0)
|
if (ret < 0)
|
||||||
{
|
{
|
||||||
TFE_LOG_ERROR(logger, "failed at parsing kni's address, in file %s, section %s, entry %s: %s",
|
TFE_LOG_ERROR(__ctx->logger, "failed at parsing kni's address, in file %s, section %s, entry %s: %s",
|
||||||
profile, "kni", "ip", str_kni_ip);
|
profile, "kni", "ip", str_kni_ip);
|
||||||
goto __errout;
|
goto __errout;
|
||||||
}
|
}
|
||||||
@@ -245,12 +269,22 @@ struct watchdog_kni * watchdog_kni_create(struct tfe_proxy * proxy, const char *
|
|||||||
__ctx->ev_base = event_base_new();
|
__ctx->ev_base = event_base_new();
|
||||||
if (!__ctx->ev_base)
|
if (!__ctx->ev_base)
|
||||||
{
|
{
|
||||||
TFE_LOG_ERROR(logger, "failed at watchdog event_base_new(): %s", strerror(errno));
|
TFE_LOG_ERROR(__ctx->logger, "failed at watchdog event_base_new(): %s", strerror(errno));
|
||||||
/* after log, reset errno */
|
/* after log, reset errno */
|
||||||
errno = 0;
|
errno = 0;
|
||||||
goto __errout;
|
goto __errout;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ev = event_new(__ctx->ev_base, -1, EV_PERSIST, health_check_for_thread_worker, proxy);
|
||||||
|
if (unlikely(ev == NULL))
|
||||||
|
{
|
||||||
|
TFE_LOG_ERROR(__ctx->logger, "Failed at creating health check event for worker thread");
|
||||||
|
/* after log, reset errno */
|
||||||
|
errno = 0;
|
||||||
|
goto __errout;
|
||||||
|
}
|
||||||
|
evtimer_add(ev, &timer_delay);
|
||||||
|
|
||||||
watchdog_kni_reset(__ctx);
|
watchdog_kni_reset(__ctx);
|
||||||
watchdog_kni_try_connect(__ctx);
|
watchdog_kni_try_connect(__ctx);
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user