Close #169 增加Watchdog连接重试机制,避免KNI退出后TFE也随之退出。
* 原实现在KNI退出后,即退出TFE进程,导致TFE重启后需要过多的时间完成初始化,影响业务的恢复速度; * 现改为KNI退出后,不退出TFE进程,并重试建立与KNI的保活连接。
This commit is contained in:
@@ -5,11 +5,18 @@
|
||||
#include <event2/event.h>
|
||||
#include <event2/buffer.h>
|
||||
#include <unistd.h>
|
||||
#include <assert.h>
|
||||
|
||||
#include <tfe_utils.h>
|
||||
#include <watchdog_kni.h>
|
||||
#include <MESA/MESA_prof_load.h>
|
||||
|
||||
/* Connection state of the watchdog link to KNI, stored in watchdog_kni::conn_state. */
enum watchdog_kni_conn_state
{
    CONN_STATE_DISCONNECT = 0, /* no connection is established */
    CONN_STATE_CONNECTING,     /* == 1 */
    CONN_STATE_CONNECTED       /* == 2, set when BEV_EVENT_CONNECTED is reported */
};
|
||||
|
||||
struct watchdog_kni
|
||||
{
|
||||
@@ -18,16 +25,21 @@ struct watchdog_kni
|
||||
void * logger;
|
||||
|
||||
unsigned int enable;
|
||||
enum watchdog_kni_conn_state conn_state;
|
||||
|
||||
struct sockaddr_in sk_kni_watchdog;
|
||||
int fd;
|
||||
struct event_base * ev_base;
|
||||
struct bufferevent * bev;
|
||||
struct event * ev_retry;
|
||||
pthread_t pthread;
|
||||
unsigned int retry_times;
|
||||
};
|
||||
|
||||
static int watchdog_kni_fd_create()
|
||||
static void watchdog_kni_eventcb(struct bufferevent *bev, short what, void *ctx);
|
||||
static void watchdog_kni_readcb(struct bufferevent *bev, void *ctx);
|
||||
|
||||
static int watchdog_kni_fd_make_keepalive(int fd)
|
||||
{
|
||||
int fd = socket(AF_INET, SOCK_STREAM, 0);
|
||||
unsigned int so_keepalive = 1;
|
||||
unsigned int tcp_keepcnt = 1;
|
||||
unsigned int tcp_keepintvl = 1;
|
||||
@@ -50,62 +62,128 @@ static int watchdog_kni_fd_create()
|
||||
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, (const void *) &tcp_keepintvl, sizeof(int)) == -1)
|
||||
{
|
||||
TFE_LOG_ERROR(g_default_logger, "watchdog fd setup setsockopt(TCP_KEEPINTVL, %d) failed: %s",
|
||||
tcp_keepintvl, strerror(errno));
|
||||
tcp_keepintvl, strerror(errno)); goto errout;
|
||||
}
|
||||
|
||||
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, (const void *) &tcp_keepidle, sizeof(int)) == -1)
|
||||
{
|
||||
TFE_LOG_ERROR(g_default_logger, "watchdog fd setup setsockopt(TCP_KEEPIDLE, %d) failed: %s",
|
||||
tcp_keepidle, strerror(errno));
|
||||
}
|
||||
{
|
||||
TFE_LOG_ERROR(g_default_logger, "watchdog fd setup setsockopt(TCP_KEEPIDLE, %d) failed: %s",
|
||||
tcp_keepidle, strerror(errno)); goto errout;
|
||||
}
|
||||
|
||||
return fd;
|
||||
return 0;
|
||||
|
||||
errout:
|
||||
if(fd > 0) close(fd);
|
||||
return -1;
|
||||
}
|
||||
|
||||
static void watchdog_kni_readcb(struct bufferevent *bev, void *ctx)
|
||||
{
|
||||
struct evbuffer * evbuffer_in = bufferevent_get_input(bev);
|
||||
evbuffer_drain(evbuffer_in, evbuffer_get_length(evbuffer_in));
|
||||
struct evbuffer * evbuffer_in = bufferevent_get_input(bev);
|
||||
evbuffer_drain(evbuffer_in, evbuffer_get_length(evbuffer_in));
|
||||
}
|
||||
|
||||
static void watchdog_kni_try_connect(struct watchdog_kni * __ctx)
|
||||
{
|
||||
assert(__ctx->conn_state == CONN_STATE_DISCONNECT);
|
||||
bufferevent_socket_connect(__ctx->bev, (const sockaddr *) &__ctx->sk_kni_watchdog, sizeof(__ctx->sk_kni_watchdog));
|
||||
bufferevent_setcb(__ctx->bev, watchdog_kni_readcb, NULL, watchdog_kni_eventcb, __ctx);
|
||||
bufferevent_enable(__ctx->bev, EV_READ | EV_WRITE);
|
||||
|
||||
char str_kni_addr[INET_ADDRSTRLEN] = {};
|
||||
uint16_t kni_port = ntohs(__ctx->sk_kni_watchdog.sin_port);
|
||||
inet_ntop(AF_INET, &__ctx->sk_kni_watchdog.sin_addr, str_kni_addr, sizeof(str_kni_addr));
|
||||
|
||||
TFE_LOG_INFO(g_default_logger, "watchdog connecting(retry times: %d) to %s:%u. ",
|
||||
__ctx->retry_times, str_kni_addr, kni_port);
|
||||
}
|
||||
|
||||
static void watchdog_kni_reset(struct watchdog_kni * __ctx)
|
||||
{
|
||||
if (__ctx->bev)
|
||||
{
|
||||
bufferevent_disable(__ctx->bev, EV_READ | EV_WRITE);
|
||||
bufferevent_free(__ctx->bev);
|
||||
}
|
||||
|
||||
__ctx->bev = bufferevent_socket_new(__ctx->ev_base, -1, BEV_OPT_CLOSE_ON_FREE);
|
||||
if (unlikely(__ctx->bev == NULL))
|
||||
{
|
||||
DIE("Failed at bufferevent_socket_new(), Exit.");
|
||||
return;
|
||||
}
|
||||
|
||||
__ctx->conn_state = CONN_STATE_DISCONNECT;
|
||||
}
|
||||
|
||||
/*
 * Callback of the retry event: starts a new connection attempt.
 *
 * fd, what: event arguments, not used.
 * ctx:      the struct watchdog_kni registered with the event.
 */
static void watchdog_kni_retry_cb(evutil_socket_t fd, short what, void *ctx)
{
    (void) fd;
    (void) what;

    watchdog_kni_try_connect((struct watchdog_kni *) ctx);
}
|
||||
|
||||
static void watchdog_kni_eventcb(struct bufferevent *bev, short what, void *ctx)
|
||||
{
|
||||
struct watchdog_kni * __ctx = (struct watchdog_kni *)ctx;
|
||||
if (what & BEV_EVENT_CONNECTED)
|
||||
{
|
||||
TFE_LOG_INFO(__ctx->bev, "KNI watchdog connection is established.");
|
||||
return;
|
||||
}
|
||||
struct watchdog_kni * __ctx = (struct watchdog_kni *)ctx;
|
||||
if (what & BEV_EVENT_CONNECTED)
|
||||
{
|
||||
TFE_LOG_INFO(__ctx->bev, "KNI watchdog connection is established.");
|
||||
__ctx->conn_state = CONN_STATE_CONNECTED;
|
||||
__ctx->retry_times = 0;
|
||||
|
||||
if (what & (BEV_EVENT_EOF | BEV_EVENT_ERROR))
|
||||
{
|
||||
if (what & BEV_EVENT_EOF)
|
||||
{
|
||||
TFE_LOG_ERROR(__ctx->logger, "KNI watchdog connection broken, KNI is shutdown, EXIT.");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
int fd = bufferevent_getfd(bev);
|
||||
watchdog_kni_fd_make_keepalive(fd);
|
||||
assert(fd >= 0);
|
||||
|
||||
if (what & BEV_EVENT_ERROR)
|
||||
{
|
||||
if (errno)
|
||||
{
|
||||
TFE_LOG_ERROR(__ctx->logger, "KNI watchdog connection broken: %s, EXIT.", strerror(errno));
|
||||
}
|
||||
else
|
||||
{
|
||||
TFE_LOG_ERROR(__ctx->logger, "KNI watchdog connection broken: Unknown, EXIT.");
|
||||
}
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
return;
|
||||
if (what & (BEV_EVENT_EOF | BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT))
|
||||
{
|
||||
if (what & BEV_EVENT_TIMEOUT)
|
||||
{
|
||||
TFE_LOG_ERROR(__ctx->logger, "KNI watchdog connection timeout, KNI is shutdown.");
|
||||
goto retry;
|
||||
}
|
||||
|
||||
if (what & BEV_EVENT_EOF)
|
||||
{
|
||||
TFE_LOG_ERROR(__ctx->logger, "KNI watchdog connection broken, KNI is shutdown.");
|
||||
goto retry;
|
||||
}
|
||||
|
||||
if (what & BEV_EVENT_ERROR)
|
||||
{
|
||||
TFE_LOG_ERROR(__ctx->logger, "KNI watchdog connection broken: %s.", strerror(errno));
|
||||
goto retry;
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
|
||||
retry:
|
||||
watchdog_kni_reset(__ctx);
|
||||
struct timeval timeval { .tv_sec = 2, .tv_usec = 0};
|
||||
|
||||
/* Free the old retry event and alloc a new retry event */
|
||||
if (__ctx->ev_retry)
|
||||
{
|
||||
event_free(__ctx->ev_retry);
|
||||
__ctx->ev_retry = NULL;
|
||||
}
|
||||
|
||||
__ctx->ev_retry = event_new(__ctx->ev_base, -1, 0, watchdog_kni_retry_cb, __ctx);
|
||||
if (unlikely(__ctx->ev_retry == NULL))
|
||||
{
|
||||
DIE("Failed at event_new() for retry event.");
|
||||
return;
|
||||
}
|
||||
|
||||
event_add(__ctx->ev_retry, &timeval);
|
||||
__ctx->retry_times++;
|
||||
}
|
||||
|
||||
|
||||
void * watchdog_kni_thread(void * arg)
|
||||
{
|
||||
struct watchdog_kni * __ctx = (struct watchdog_kni *)arg;
|
||||
@@ -115,86 +193,62 @@ void * watchdog_kni_thread(void * arg)
|
||||
|
||||
struct watchdog_kni * watchdog_kni_create(struct tfe_proxy * proxy, const char * profile, void * logger)
|
||||
{
|
||||
struct watchdog_kni * __ctx = ALLOC(struct watchdog_kni, 1);
|
||||
int ret = 0;
|
||||
struct watchdog_kni * __ctx = ALLOC(struct watchdog_kni, 1);
|
||||
int ret = 0;
|
||||
|
||||
__ctx->proxy = proxy;
|
||||
__ctx->profile = profile;
|
||||
__ctx->logger = logger;
|
||||
__ctx->proxy = proxy;
|
||||
__ctx->profile = profile;
|
||||
__ctx->logger = logger;
|
||||
|
||||
unsigned int en_watchdog = 0;
|
||||
MESA_load_profile_uint_def(profile, "kni", "watchdog_switch", &en_watchdog, 0);
|
||||
__ctx->enable = en_watchdog;
|
||||
unsigned int en_watchdog = 0;
|
||||
MESA_load_profile_uint_def(profile, "kni", "watchdog_switch", &en_watchdog, 0);
|
||||
__ctx->enable = en_watchdog;
|
||||
|
||||
if (!__ctx->enable)
|
||||
{
|
||||
return __ctx;
|
||||
}
|
||||
if (!__ctx->enable)
|
||||
{
|
||||
return __ctx;
|
||||
}
|
||||
|
||||
char str_kni_ip[TFE_STRING_MAX] = {0};
|
||||
MESA_load_profile_string_def(profile, "kni", "ip", str_kni_ip, sizeof(str_kni_ip), "127.0.0.1");
|
||||
char str_kni_ip[TFE_STRING_MAX] = {0};
|
||||
MESA_load_profile_string_def(profile, "kni", "ip", str_kni_ip, sizeof(str_kni_ip), "127.0.0.1");
|
||||
|
||||
struct sockaddr_in sk_kni_address{};
|
||||
sk_kni_address.sin_family = AF_INET;
|
||||
ret = inet_pton(AF_INET, str_kni_ip, &sk_kni_address.sin_addr);
|
||||
struct sockaddr_in sk_kni_address{};
|
||||
sk_kni_address.sin_family = AF_INET;
|
||||
ret = inet_pton(AF_INET, str_kni_ip, &sk_kni_address.sin_addr);
|
||||
|
||||
if (ret < 0)
|
||||
{
|
||||
TFE_LOG_ERROR(logger, "failed at parsing kni's address, in file %s, section %s, entry %s: %s",
|
||||
profile, "kni", "ip", str_kni_ip); goto __errout;
|
||||
}
|
||||
if (ret < 0)
|
||||
{
|
||||
TFE_LOG_ERROR(logger, "failed at parsing kni's address, in file %s, section %s, entry %s: %s",
|
||||
profile, "kni", "ip", str_kni_ip);
|
||||
goto __errout;
|
||||
}
|
||||
|
||||
unsigned int kni_port;
|
||||
MESA_load_profile_uint_def(profile, "kni", "watchdog_port", &kni_port, 2476);
|
||||
unsigned int kni_port;
|
||||
MESA_load_profile_uint_def(profile, "kni", "watchdog_port", &kni_port, 2476);
|
||||
|
||||
__ctx->sk_kni_watchdog = sk_kni_address;
|
||||
__ctx->sk_kni_watchdog.sin_port = htons(kni_port);
|
||||
__ctx->sk_kni_watchdog = sk_kni_address;
|
||||
__ctx->sk_kni_watchdog.sin_port = htons(kni_port);
|
||||
__ctx->ev_base = event_base_new();
|
||||
if (!__ctx->ev_base)
|
||||
{
|
||||
TFE_LOG_ERROR(logger, "failed at watchdog event_base_new(): %s", strerror(errno));
|
||||
goto __errout;
|
||||
}
|
||||
|
||||
/* Prepare watchdog fd */
|
||||
__ctx->fd = watchdog_kni_fd_create();
|
||||
if (__ctx->fd < 0)
|
||||
{
|
||||
TFE_LOG_ERROR(logger, "failed at creating watchdog fd : %s", strerror(errno));
|
||||
goto __errout;
|
||||
}
|
||||
watchdog_kni_reset(__ctx);
|
||||
watchdog_kni_try_connect(__ctx);
|
||||
|
||||
__ctx->ev_base = event_base_new();
|
||||
if (!__ctx->ev_base)
|
||||
{
|
||||
TFE_LOG_ERROR(logger, "failed at watchdog event_base_new(): %s", strerror(errno));
|
||||
goto __errout;
|
||||
}
|
||||
/* Create a thread to dispatch ctx->evbase */
|
||||
ret = pthread_create(&__ctx->pthread, NULL, watchdog_kni_thread, (void *) __ctx);
|
||||
if (unlikely(ret < 0))
|
||||
{
|
||||
TFE_LOG_ERROR(__ctx->logger, "Failed at creating watchdog thread: %s", strerror(errno));
|
||||
goto __errout;
|
||||
}
|
||||
|
||||
__ctx->bev = bufferevent_socket_new(__ctx->ev_base, __ctx->fd, BEV_OPT_CLOSE_ON_FREE);
|
||||
if (!__ctx->bev)
|
||||
{
|
||||
TFE_LOG_ERROR(logger, "failed at watchdog bufferevent_socket_new(): %s", strerror(errno));
|
||||
goto __errout;
|
||||
}
|
||||
|
||||
ret = bufferevent_socket_connect(__ctx->bev, (const sockaddr *)&__ctx->sk_kni_watchdog,
|
||||
sizeof(__ctx->sk_kni_watchdog));
|
||||
|
||||
if (ret < 0)
|
||||
{
|
||||
TFE_LOG_ERROR(logger, "failed at watchdog connect(): %s", strerror(errno));
|
||||
goto __errout;
|
||||
}
|
||||
|
||||
bufferevent_setcb(__ctx->bev, watchdog_kni_readcb, NULL, watchdog_kni_eventcb, __ctx);
|
||||
bufferevent_enable(__ctx->bev, EV_READ | EV_WRITE);
|
||||
|
||||
/* Create a thread to dispatch ctx->evbase */
|
||||
ret = pthread_create(&__ctx->pthread, NULL, watchdog_kni_thread, (void *) __ctx);
|
||||
if (unlikely(ret < 0))
|
||||
{
|
||||
TFE_LOG_ERROR(__ctx->logger, "Failed at creating watchdog thread: %s", strerror(errno));
|
||||
goto __errout;
|
||||
}
|
||||
|
||||
TFE_LOG_INFO(__ctx->logger, "KNI watchdong init successfully.");
|
||||
return __ctx;
|
||||
TFE_LOG_INFO(__ctx->logger, "KNI watchdog module init successfully.");
|
||||
return __ctx;
|
||||
|
||||
__errout:
|
||||
return NULL;
|
||||
}
|
||||
return NULL;
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user