This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tango-tfe/platform/src/acceptor_kni_v2.cpp
2020-08-21 09:45:30 +08:00

261 lines
7.1 KiB
C++

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/errno.h>
#include <pthread.h>
#include <assert.h>
#include <event2/listener.h>
#include <event2/util.h>
#include <event2/bufferevent.h>
#include <MESA/MESA_prof_load.h>
#include <tfe_stream.h>
#include <tfe_cmsg.h>
#include <acceptor_kni_v2.h>
#include <proxy.h>
#include <platform.h>
#include <sys/prctl.h>
/* Path of the unix domain socket file that is passed as the fallback value
 * when the "scm_socket_file" key is loaded from the profile. The #ifndef guard
 * lets a build predefine the macro to a different path. */
#ifndef TFE_CONFIG_SCM_SOCKET_FILE
#define TFE_CONFIG_SCM_SOCKET_FILE "/var/run/.tfe_kmod_scm_socket"
#endif
/* Context of the KNI v2 acceptor: a unix domain datagram socket on which
 * file descriptors are received, plus the event base and the thread that
 * service it. Instances are built by acceptor_kni_v2_create() and released
 * by acceptor_kni_v2_destroy(). */
struct acceptor_kni_v2
{
/* INPUT */
/* Proxy instance the received fd pairs are handed to; also read for the cpu affinity settings. */
struct tfe_proxy * proxy;
/* Profile argument given to acceptor_kni_v2_create(); the pointer is stored, not copied. */
const char * profile;
/* Opaque logger handle passed to the TFE_LOG_* macros. */
void * logger;
/* CONFIG */
/* Path of the unix domain socket file, loaded from section "acceptor_kni_v2", key "scm_socket_file". */
char str_scm_socket[TFE_STRING_MAX];
/* PERSIST RUNTIME RESOURCE */
/* AF_UNIX / SOCK_DGRAM socket bound to str_scm_socket. The destroy function treats 0 as "not opened". */
int fd_scm_socket;
/* Event base dispatched by the acceptor thread. */
struct event_base * ev_base;
/* Persistent EV_READ event on fd_scm_socket, served by acceptor_kni_v2_event(). */
struct event * ev_scm_socket;
/* Thread running acceptor_kni_v2_event_thread_entry(). */
pthread_t thread;
};
/*
 * Read callback of the scm socket event.
 *
 * Receives one message from the unix domain socket. The regular payload is
 * a serialized tfe_cmsg, the ancillary (control) data is expected to carry
 * exactly two file descriptors as an SCM_RIGHTS message. On success both
 * descriptors and the deserialized cmsg are handed to tfe_proxy_fds_accept().
 *
 * @param fd    the scm socket that became readable.
 * @param what  libevent event flags; must contain EV_READ.
 * @param user  the struct acceptor_kni_v2 registered with the event.
 *
 * Failure handling:
 *  - EINTR / EAGAIN from recvmsg(): return silently, the persistent event
 *    fires again later.
 *  - any other recvmsg() failure or a zero-length read: DIE().
 *  - malformed or incomplete message, or a refused accept: every descriptor
 *    that was received is closed and the message is dropped.
 */
void acceptor_kni_v2_event(evutil_socket_t fd, short what, void * user)
{
    struct acceptor_kni_v2 * __ctx = (struct acceptor_kni_v2 *) user;
    struct cmsghdr * __cmsghdr = NULL;
    struct tfe_cmsg * cmsg = NULL;
    unsigned int __nr_fds = 0;

    assert(__ctx != NULL && __ctx->thread == pthread_self());
    assert(what & EV_READ);

    /* We use IOVEC to receive the fds made by KNI.
     * This is a kind of magic skill to share socket fds between two (or more) processes.
     * http://man7.org/tlpi/code/online/dist/sockets/scm_rights_send.c.html
     */
    constexpr static int __TRANS_FDS_MAX = 2;
    constexpr static int __CONTROLLEN = CMSG_SPACE(__TRANS_FDS_MAX * sizeof(int));

    /* Every local with an initializer is declared before the first goto:
     * C++ forbids jumping over an initialization. */
    int __fds[__TRANS_FDS_MAX] = {-1, -1};
    char __buffer[4096] = {0};
    char __cmptr[__CONTROLLEN];
    struct iovec __iovec[1];
    struct msghdr __msghdr{};
    size_t __payload_len = 0;

    __iovec[0].iov_base = __buffer;
    __iovec[0].iov_len = sizeof(__buffer);

    __msghdr.msg_iov = __iovec;
    __msghdr.msg_iovlen = 1;
    __msghdr.msg_name = NULL;
    __msghdr.msg_namelen = 0;
    __msghdr.msg_control = (void *) (__cmptr);
    __msghdr.msg_controllen = (size_t) __CONTROLLEN;

    ssize_t rd = recvmsg(fd, &__msghdr, 0);
    if (rd == -1 && (errno == EINTR || errno == EAGAIN))
    {
        return;
    }
    else if (rd <= 0)
    {
        /* NOTE(review): for rd == 0 errno is not set by recvmsg(), so the
         * logged reason may be stale; behavior kept as in the original. */
        TFE_LOG_ERROR(__ctx->logger, "failed at recvmsg from scm socket: %s. ", strerror(errno));
        goto __die;
    }

    __cmsghdr = CMSG_FIRSTHDR(&__msghdr);
    if (unlikely(__cmsghdr == NULL))
    {
        TFE_LOG_ERROR(__ctx->logger, "failed at fetch CMSG_FIRSTHDR() from incoming fds.");
        goto __drop_recieved_fds;
    }

    /* The control data comes from the peer: validate it instead of asserting
     * on its length, and only interpret the payload as fds for SCM_RIGHTS. */
    if (unlikely(__cmsghdr->cmsg_level != SOL_SOCKET || __cmsghdr->cmsg_type != SCM_RIGHTS
        || __cmsghdr->cmsg_len < CMSG_LEN(0)))
    {
        TFE_LOG_ERROR(__ctx->logger, "unexpected control message from scm socket, drop it.");
        goto __drop_recieved_fds;
    }

    __payload_len = (size_t) __cmsghdr->cmsg_len - CMSG_LEN(0);
    __nr_fds = (unsigned int) (__payload_len / sizeof(int));

    /* The control buffer has room for __TRANS_FDS_MAX fds only; the clamp
     * keeps the copy below in bounds whatever cmsg_len claims. */
    if (__nr_fds > (unsigned int) __TRANS_FDS_MAX)
    {
        __nr_fds = (unsigned int) __TRANS_FDS_MAX;
    }

    /* CMSG_DATA() is not guaranteed to be aligned for int access, copy out. */
    memcpy(__fds, CMSG_DATA(__cmsghdr), __nr_fds * sizeof(int));

    if (unlikely(__nr_fds < 2))
    {
        TFE_LOG_ERROR(__ctx->logger, "No available file descriptors, drop the incoming fds.");
        goto __drop_recieved_fds;
    }

    /* Apply a cmsg structure */
    if (tfe_cmsg_deserialize((const unsigned char *)__buffer, (uint16_t)rd, &cmsg) < 0)
    {
        /* TODO: dump the buffer in hexdump format */
        TFE_LOG_ERROR(__ctx->logger, "failed at cmsg_deserialize(), invalid format.");
        goto __drop_recieved_fds;
    }

    TFE_PROXY_STAT_INCREASE(STAT_FD_OPEN_BY_KNI_ACCEPT, 2);
    if (tfe_proxy_fds_accept(__ctx->proxy, __fds[0], __fds[1], cmsg) < 0)
    {
        goto __drop_recieved_fds;
    }

    return;

__die:
    DIE("Broken kni scm socket connection, abort.");
    return;

__drop_recieved_fds:
    /* Close whatever descriptors arrived with the rejected message. */
    TFE_PROXY_STAT_INCREASE(STAT_FD_CLOSE_BY_KNI_ACCEPT_FAIL, __nr_fds);
    for (unsigned int i = 0; i < __nr_fds; i++)
    {
        evutil_closesocket(__fds[i]);
    }
}
/*
 * Entry point of the acceptor thread.
 *
 * Names the thread "tfe:acceptor-v2", applies the first cpu affinity mask of
 * the proxy when cpu affinity is enabled, then dispatches the event base of
 * the acceptor. event_base_dispatch() returning is treated as fatal.
 *
 * @param args  the struct acceptor_kni_v2 this thread serves.
 * @return      NULL; only reached if DIE() returns.
 */
void * acceptor_kni_v2_event_thread_entry(void * args)
{
    struct acceptor_kni_v2 * __ctx = (struct acceptor_kni_v2 *) args;
    assert(__ctx != NULL && __ctx->thread == pthread_self());

    /* 15 characters plus the terminating NUL fill the buffer exactly. */
    char thread_name[16];
    snprintf(thread_name, sizeof(thread_name), "tfe:acceptor-v2");

    /* prctl() takes its variadic arguments as unsigned long. */
    prctl(PR_SET_NAME, (unsigned long) thread_name, 0UL, 0UL, 0UL);

    char affinity[32] = {0};
    if (__ctx->proxy->enable_cpu_affinity)
    {
        tfe_thread_set_affinity(__ctx->proxy->cpu_affinity_mask[0]);
        snprintf(affinity, sizeof(affinity), "affinity cpu%d", __ctx->proxy->cpu_affinity_mask[0]);
    }

    TFE_LOG_INFO(__ctx->logger, "scm acceptor thread %s is running.", __ctx->proxy->enable_cpu_affinity ? affinity : "");
    event_base_dispatch(__ctx->ev_base);

    DIE("scm acceptor thread is exited, abort.");

    /* Same DIE-then-return pattern as the event callback: never flow off the
     * end of a non-void function. */
    return NULL;
}
void acceptor_kni_v2_destroy(struct acceptor_kni_v2 * ctx)
{
if (ctx != NULL && ctx->ev_base != NULL)
{
event_base_free(ctx->ev_base);
}
if (ctx != NULL && ctx->fd_scm_socket != 0)
{
close(ctx->fd_scm_socket);
}
if (ctx != NULL)
{
free(ctx);
}
return;
}
struct acceptor_kni_v2 * acceptor_kni_v2_create(struct tfe_proxy * proxy, const char * profile, void * logger)
{
struct acceptor_kni_v2 * __ctx = ALLOC(struct acceptor_kni_v2, 1);
struct sockaddr_un __sockaddr_un{};
int ret = 0;
__ctx->proxy = proxy;
__ctx->profile = profile;
__ctx->logger = logger;
/* Read the unix domain socket file, this file is used to recieve fds from KNI */
MESA_load_profile_string_def(profile, "acceptor_kni_v2", "scm_socket_file", __ctx->str_scm_socket,
sizeof(__ctx->str_scm_socket), TFE_CONFIG_SCM_SOCKET_FILE);
__sockaddr_un.sun_family = AF_UNIX;
strncpy(__sockaddr_un.sun_path, __ctx->str_scm_socket, sizeof(__sockaddr_un.sun_path));
if (remove(__ctx->str_scm_socket) < 0 && errno != ENOENT)
{
TFE_LOG_ERROR(__ctx->logger, "Failed at remove(%s) : %s", __ctx->str_scm_socket, strerror(errno));
goto __errout;
}
/* Create new event base, this event base will be dispatched at separated thread */
__ctx->ev_base = event_base_new();
if (unlikely(__ctx->ev_base == NULL))
{
TFE_LOG_ERROR(__ctx->logger, "Failed at creating event_base. ");
goto __errout;
}
__ctx->fd_scm_socket = socket(AF_UNIX, SOCK_DGRAM, 0);
if (unlikely(__ctx->fd_scm_socket < 0))
{
TFE_LOG_ERROR(__ctx->logger, "Failed at create scm socket fd: %s", strerror(errno));
goto __errout;
}
evutil_make_socket_nonblocking(__ctx->fd_scm_socket);
ret = bind(__ctx->fd_scm_socket, (struct sockaddr *)&__sockaddr_un, sizeof(__sockaddr_un));
if (unlikely(ret < 0))
{
TFE_LOG_ERROR(__ctx->logger, "Failed at binding to %s: %s", __sockaddr_un.sun_path, strerror(errno));
goto __errout;
}
__ctx->ev_scm_socket = event_new(__ctx->ev_base, __ctx->fd_scm_socket,
EV_READ | EV_PERSIST, acceptor_kni_v2_event, __ctx);
if (unlikely(__ctx->ev_scm_socket == NULL))
{
TFE_LOG_ERROR(__ctx->logger, "Failed at setup READ event for scm socket fd %d.", __ctx->fd_scm_socket);
goto __errout;
}
ret = event_add(__ctx->ev_scm_socket, NULL);
if (unlikely(ret < 0))
{
TFE_LOG_ERROR(__ctx->logger, "Failed at adding scm socket event to evbase. ");
goto __errout;
}
/* Create a thread to dispatch ctx->evbase */
ret = pthread_create(&__ctx->thread, NULL, acceptor_kni_v2_event_thread_entry, (void *) __ctx);
if (unlikely(ret < 0))
{
TFE_LOG_ERROR(__ctx->logger, "Failed at creating event thread: %s", strerror(errno));
goto __errout;
}
TFE_LOG_INFO(__ctx->logger, "KNIv2 acceptor init successfully, scm socket file: %s", __ctx->str_scm_socket);
return __ctx;
__errout:
acceptor_kni_v2_destroy(__ctx);
return NULL;
}