#include #include #include #include #include #include #include #include #include #include enum watchdog_kni_conn_state { CONN_STATE_DISCONNECT = 0, CONN_STATE_CONNECTING = 1, CONN_STATE_CONNECTED = 2 }; struct watchdog_kni { struct tfe_proxy * proxy; const char * profile; void * logger; unsigned int enable; enum watchdog_kni_conn_state conn_state; struct sockaddr_in sk_kni_watchdog; struct event_base * ev_base; struct bufferevent * bev; struct event * ev_retry; pthread_t pthread; unsigned int retry_times; }; static void watchdog_kni_eventcb(struct bufferevent *bev, short what, void *ctx); static void watchdog_kni_readcb(struct bufferevent *bev, void *ctx); static int watchdog_kni_fd_make_keepalive(int fd) { unsigned int so_keepalive = 1; unsigned int tcp_keepcnt = 1; unsigned int tcp_keepintvl = 1; unsigned int tcp_keepidle = 1; evutil_make_socket_nonblocking(fd); if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (const void *) &so_keepalive, sizeof(int)) == -1) { TFE_LOG_ERROR(g_default_logger, "watchdog fd setup setsockopt(SO_KEEPALIVE, %d) failed : %s", so_keepalive, strerror(errno)); goto errout; } if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, (const void *) &tcp_keepcnt, sizeof(int)) == -1) { TFE_LOG_ERROR(g_default_logger, "watchdog fd setup setsockopt(TCP_KEEPCNT, %d) failed: %s", tcp_keepcnt, strerror(errno)); goto errout; } if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, (const void *) &tcp_keepintvl, sizeof(int)) == -1) { TFE_LOG_ERROR(g_default_logger, "watchdog fd setup setsockopt(TCP_KEEPINTVL, %d) failed: %s", tcp_keepintvl, strerror(errno)); goto errout; } if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, (const void *) &tcp_keepidle, sizeof(int)) == -1) { TFE_LOG_ERROR(g_default_logger, "watchdog fd setup setsockopt(TCP_KEEPIDLE, %d) failed: %s", tcp_keepidle, strerror(errno)); goto errout; } return 0; errout: return -1; } static void watchdog_kni_readcb(struct bufferevent *bev, void *ctx) { struct evbuffer * evbuffer_in = bufferevent_get_input(bev); evbuffer_drain(evbuffer_in, evbuffer_get_length(evbuffer_in)); } static void watchdog_kni_try_connect(struct watchdog_kni * __ctx) { assert(__ctx->conn_state == CONN_STATE_DISCONNECT); bufferevent_socket_connect(__ctx->bev, (const sockaddr *) &__ctx->sk_kni_watchdog, sizeof(__ctx->sk_kni_watchdog)); bufferevent_setcb(__ctx->bev, watchdog_kni_readcb, NULL, watchdog_kni_eventcb, __ctx); bufferevent_enable(__ctx->bev, EV_READ | EV_WRITE); char str_kni_addr[INET_ADDRSTRLEN] = {}; uint16_t kni_port = ntohs(__ctx->sk_kni_watchdog.sin_port); inet_ntop(AF_INET, &__ctx->sk_kni_watchdog.sin_addr, str_kni_addr, sizeof(str_kni_addr)); TFE_LOG_INFO(g_default_logger, "watchdog connecting(retry times: %d) to %s:%u. ", __ctx->retry_times, str_kni_addr, kni_port); } static void watchdog_kni_reset(struct watchdog_kni * __ctx) { if (__ctx->bev) { bufferevent_disable(__ctx->bev, EV_READ | EV_WRITE); bufferevent_free(__ctx->bev); } __ctx->bev = bufferevent_socket_new(__ctx->ev_base, -1, BEV_OPT_CLOSE_ON_FREE); if (unlikely(__ctx->bev == NULL)) { DIE("Failed at bufferevent_socket_new(), Exit."); return; } __ctx->conn_state = CONN_STATE_DISCONNECT; } static void watchdog_kni_retry_cb(evutil_socket_t fd, short what, void *ctx) { struct watchdog_kni * __ctx = (struct watchdog_kni *) ctx; watchdog_kni_try_connect(__ctx); } static void watchdog_kni_eventcb(struct bufferevent *bev, short what, void *ctx) { struct watchdog_kni * __ctx = (struct watchdog_kni *)ctx; if (what & BEV_EVENT_CONNECTED) { TFE_LOG_INFO(__ctx->bev, "KNI watchdog connection is established."); __ctx->conn_state = CONN_STATE_CONNECTED; __ctx->retry_times = 0; int fd = bufferevent_getfd(bev); watchdog_kni_fd_make_keepalive(fd); assert(fd >= 0); return; } if (what & (BEV_EVENT_EOF | BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) { if (what & BEV_EVENT_TIMEOUT) { TFE_LOG_ERROR(__ctx->logger, "KNI watchdog connection timeout, KNI is shutdown."); goto retry; } if (what & BEV_EVENT_EOF) { TFE_LOG_ERROR(__ctx->logger, "KNI watchdog connection broken, KNI is shutdown."); goto retry; } if (what & BEV_EVENT_ERROR) { TFE_LOG_ERROR(__ctx->logger, "KNI watchdog connection broken: %s.", strerror(errno)); goto retry; } } return; retry: watchdog_kni_reset(__ctx); struct timeval timeval { .tv_sec = 2, .tv_usec = 0}; /* Free the old retry event and alloc a new retry event */ if (__ctx->ev_retry) { event_free(__ctx->ev_retry); __ctx->ev_retry = NULL; } __ctx->ev_retry = event_new(__ctx->ev_base, -1, 0, watchdog_kni_retry_cb, __ctx); if (unlikely(__ctx->ev_retry == NULL)) { DIE("Failed at event_new() for retry event."); return; } event_add(__ctx->ev_retry, &timeval); __ctx->retry_times++; } void * watchdog_kni_thread(void * arg) { struct watchdog_kni * __ctx = (struct watchdog_kni *)arg; while(event_base_dispatch(__ctx->ev_base) >= 0) {} DIE("watchdog thread is terminated."); } struct watchdog_kni * watchdog_kni_create(struct tfe_proxy * proxy, const char * profile, void * logger) { struct watchdog_kni * __ctx = ALLOC(struct watchdog_kni, 1); int ret = 0; __ctx->proxy = proxy; __ctx->profile = profile; __ctx->logger = logger; unsigned int en_watchdog = 0; MESA_load_profile_uint_def(profile, "kni", "watchdog_switch", &en_watchdog, 0); __ctx->enable = en_watchdog; if (!__ctx->enable) { return __ctx; } char str_kni_ip[TFE_STRING_MAX] = {0}; MESA_load_profile_string_def(profile, "kni", "ip", str_kni_ip, sizeof(str_kni_ip), "127.0.0.1"); struct sockaddr_in sk_kni_address{}; sk_kni_address.sin_family = AF_INET; ret = inet_pton(AF_INET, str_kni_ip, &sk_kni_address.sin_addr); if (ret < 0) { TFE_LOG_ERROR(logger, "failed at parsing kni's address, in file %s, section %s, entry %s: %s", profile, "kni", "ip", str_kni_ip); goto __errout; } unsigned int kni_port; MESA_load_profile_uint_def(profile, "kni", "watchdog_port", &kni_port, 2476); __ctx->sk_kni_watchdog = sk_kni_address; __ctx->sk_kni_watchdog.sin_port = htons(kni_port); __ctx->ev_base = event_base_new(); if (!__ctx->ev_base) { TFE_LOG_ERROR(logger, "failed at watchdog event_base_new(): %s", strerror(errno)); goto __errout; } watchdog_kni_reset(__ctx); watchdog_kni_try_connect(__ctx); /* Create a thread to dispatch ctx->evbase */ ret = pthread_create(&__ctx->pthread, NULL, watchdog_kni_thread, (void *) __ctx); if (unlikely(ret < 0)) { TFE_LOG_ERROR(__ctx->logger, "Failed at creating watchdog thread: %s", strerror(errno)); goto __errout; } TFE_LOG_INFO(__ctx->logger, "KNI watchdog module init successfully."); return __ctx; __errout: return NULL; };