diff --git a/bin/conf/main.conf b/bin/conf/main.conf index 0820712..8b8607e 100644 --- a/bin/conf/main.conf +++ b/bin/conf/main.conf @@ -27,6 +27,7 @@ UnixSocketRecvAddr=/home/mesasoft/frag_rssb/un_recv #udp socket:send frag +UdpSendSwitch=0 UdpSendIP=127.0.0.1;127.0.0.1; UdpSendIPNum=2 UdpSendPort=33082;33083 @@ -237,4 +238,9 @@ capacity=32 cost=32 wlb_report_interval=10 -bfd_recv_port=0 +/*端口为0表示不开启保活功能*/ +bfd_recv_port=3785 +/*wait_queue设置的故障阈值*/ +down_threshold=90 +/*故障判定次数*/ +down_num=20 diff --git a/frag_rssb瀹夎浣跨敤鎵嬪唽.docx b/frag_rssb瀹夎浣跨敤鎵嬪唽.docx index 4e34160..0be6b1b 100644 Binary files a/frag_rssb瀹夎浣跨敤鎵嬪唽.docx and b/frag_rssb瀹夎浣跨敤鎵嬪唽.docx differ diff --git a/src/AV_sendback.h b/src/AV_sendback.h index 07de83b..151c1a8 100644 --- a/src/AV_sendback.h +++ b/src/AV_sendback.h @@ -22,7 +22,7 @@ #define PROTOCOL_MMS 0x09 #define PROTOCOL_RTMP 0x0A #define PROTOCOL_SIP 0x0B - + /*媒体类型*/ #define MEDIA_TYPE_UNKNOWN 0x00 /*媒体类型:视频支持的媒体类型*/ diff --git a/src/frag_send.c b/src/frag_send.c index f13b68d..58b053c 100644 --- a/src/frag_send.c +++ b/src/frag_send.c @@ -271,14 +271,16 @@ void send_data_by_unixsocket(const char* data, uint32_t datalen, int thread_id) } void send_data(const char* data, uint32_t datalen, int thread_id) -{ - /*采用UDP回传*/ -#if K_PROJECT - send_data_by_udp(data, datalen, thread_id); - /*采用unix socket回传*/ -#else - send_data_by_unixsocket(data, datalen, thread_id); -#endif +{ + if(g_frag_cfg.send_udp_switch) + { + send_data_by_udp(data, datalen, thread_id); + } + else + { + /*采用unix socket回传*/ + send_data_by_unixsocket(data, datalen, thread_id); + } } void send_data_bizman(const char* data, uint32_t datalen, uint64_t mid,uint32_t ip, int thread_id) @@ -660,7 +662,7 @@ void pack_and_send_media_info(media_info_t* media_info, frag_in_t* frg, int thre if(g_frag_run.usm_on_flag) { send_data_usm(sendbuf,sendbuflen,thread_id); - } + } else { send_data(sendbuf, sendbuflen, thread_id); diff --git a/src/hard_keepalive.c b/src/hard_keepalive.c index ded8991..e25fbea 100644 --- a/src/hard_keepalive.c +++ b/src/hard_keepalive.c @@ -1,174 +1,202 @@ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include //offsetof - -#include "hard_keepalive.h" - -int udp_socket_recv(int sockfd, uint32_t *src_ip, uint16_t *src_port, uint8_t *buf, uint32_t buf_size) -{ - if (NULL == buf) return -1; - - int numbytes; - struct sockaddr_storage their_addr; - socklen_t addr_len = sizeof(their_addr); - - if ((numbytes = recvfrom(sockfd, buf, buf_size , 0,(struct sockaddr *)&their_addr, &addr_len)) == -1) - { - perror("recvfrom"); - return -1; - } - *src_ip = ((struct sockaddr_in *)&their_addr)->sin_addr.s_addr; - *src_port = ((struct sockaddr_in *)&their_addr)->sin_port; - return numbytes; -} - -// send udp packet -int udp_socket_send(int sockfd, uint32_t addr, uint16_t port, char *data, int datalen) -{ - struct sockaddr_in dst_addr; /* connector's address information */ - dst_addr.sin_family = AF_INET; /* host byte order */ - dst_addr.sin_port = port; /* short, network byte order */ - dst_addr.sin_addr.s_addr = addr; - bzero(&(dst_addr.sin_zero), 8); /* zero the rest of the struct */ - int to_send_len=datalen; - int already_sended_len=0; - while(to_send_len>0) - { - already_sended_len=sendto(sockfd,data, - to_send_len-already_sended_len, - 0, - (struct sockaddr *)&(dst_addr), - sizeof(dst_addr)); - if(already_sended_len==-1) - { - if((EAGAIN == errno)||( EINTR == errno )|| (EWOULDBLOCK==errno)) - { - continue; - } - else - { - return -1; - } - } - to_send_len-=already_sended_len; - } - return already_sended_len; -} - -int create_recv_udp_socket(uint16_t port) -{ - int sockfd = socket(AF_INET, SOCK_DGRAM, 0); - if (-1 == sockfd) - { - perror("listener: socket"); - return -1; - } - struct sockaddr_in my_addr; /* my address information */ - my_addr.sin_family = AF_INET; /* host byte order */ - my_addr.sin_port = port; /* short, network byte order */ - my_addr.sin_addr.s_addr = INADDR_ANY; /* auto-fill with my IP */ - bzero(&(my_addr.sin_zero), 8); /* zero the rest of the struct */ - - if (bind(sockfd, (struct sockaddr *)&my_addr, sizeof(struct sockaddr)) == -1) - { - perror("listener: bind"); - close(sockfd); - return -1; - } - return sockfd; -} - -void* thread_hard_keepalive(void *param) -{ - char buf[1500] = {0}; - char discriminator_temp[ID_SIZE] = {0}; - bfd_port_t* bfd_port = (bfd_port_t*)param; - int size = 0; - uint32_t src_ip = 0; - uint16_t src_port = 0; - int send_rec = 0; - fd_set rset; - int recv_pkt_sd = create_recv_udp_socket(htons(bfd_port->recv_port)); - - if(-1==recv_pkt_sd) - { - printf("hard_keepalive create socket error.\n"); - return NULL; - } - - while(1) - { - FD_ZERO(&rset); - FD_SET(recv_pkt_sd,&rset); - if(-1==select(recv_pkt_sd+1,&rset,NULL,NULL,NULL)) - { - continue; - } - if(FD_ISSET(recv_pkt_sd, &rset)) - { - size = udp_socket_recv(recv_pkt_sd, &src_ip, &src_port, (unsigned char*)buf, sizeof(buf)); - if(size>0) - { - /*交换discriminator*/ - memcpy(discriminator_temp, buf+MY_ID_OFFSET, ID_SIZE); - memcpy(buf+MY_ID_OFFSET, buf+YOUR_ID_OFFSET, ID_SIZE); - memcpy(buf+YOUR_ID_OFFSET, discriminator_temp, ID_SIZE); - send_rec = udp_socket_send(recv_pkt_sd, - src_ip, - src_port, - (char*)buf,size); - if(-1==send_rec) - { - printf("hard_keepalive send pkt error.\n"); - } - } - } - else - { - continue; - } - } - - return NULL; -} - -int hard_keepalive_run(void* bfd_port) -{ - pthread_t thread_desc; - pthread_attr_t attr; - - memset(&thread_desc, 0, sizeof(thread_desc)); - memset(&attr, 0, sizeof(attr)); - - if(0 != pthread_attr_init(&(attr))) - { - return -1; - } - if(0 != pthread_attr_setdetachstate(&(attr), PTHREAD_CREATE_DETACHED)) - { - return -1; - } - if(0 != pthread_create(&(thread_desc), &(attr), thread_hard_keepalive, (void*)bfd_port)) - { - pthread_attr_destroy(&(attr)); - return -1; - } - pthread_attr_destroy(&(attr)); - return 0; -} - + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include //offsetof + +#include "hard_keepalive.h" +#include "main.h" + +extern frag_rssb_parameter_t g_frag_run; +extern frag_rssb_configure_t g_frag_cfg; +extern frag_rssb_status_t g_frag_stat; + + +int udp_socket_recv(int sockfd, uint32_t *src_ip, uint16_t *src_port, uint8_t *buf, uint32_t buf_size) +{ + if (NULL == buf) return -1; + + int numbytes; + struct sockaddr_storage their_addr; + socklen_t addr_len = sizeof(their_addr); + + if ((numbytes = recvfrom(sockfd, buf, buf_size , 0,(struct sockaddr *)&their_addr, &addr_len)) == -1) + { + perror("recvfrom"); + return -1; + } + *src_ip = ((struct sockaddr_in *)&their_addr)->sin_addr.s_addr; + *src_port = ((struct sockaddr_in *)&their_addr)->sin_port; + return numbytes; +} + +// send udp packet +int udp_socket_send(int sockfd, uint32_t addr, uint16_t port, char *data, int datalen) +{ + struct sockaddr_in dst_addr; /* connector's address information */ + dst_addr.sin_family = AF_INET; /* host byte order */ + dst_addr.sin_port = port; /* short, network byte order */ + dst_addr.sin_addr.s_addr = addr; + bzero(&(dst_addr.sin_zero), 8); /* zero the rest of the struct */ + int to_send_len=datalen; + int already_sended_len=0; + while(to_send_len>0) + { + already_sended_len=sendto(sockfd,data, + to_send_len-already_sended_len, + 0, + (struct sockaddr *)&(dst_addr), + sizeof(dst_addr)); + if(already_sended_len==-1) + { + if((EAGAIN == errno)||( EINTR == errno )|| (EWOULDBLOCK==errno)) + { + continue; + } + else + { + return -1; + } + } + to_send_len-=already_sended_len; + } + return already_sended_len; +} + +int create_recv_udp_socket(uint16_t port) +{ + int sockfd = socket(AF_INET, SOCK_DGRAM, 0); + if (-1 == sockfd) + { + perror("listener: socket"); + return -1; + } + struct sockaddr_in my_addr; /* my address information */ + my_addr.sin_family = AF_INET; /* host byte order */ + my_addr.sin_port = port; /* short, network byte order */ + my_addr.sin_addr.s_addr = INADDR_ANY; /* auto-fill with my IP */ + bzero(&(my_addr.sin_zero), 8); /* zero the rest of the struct */ + + if (bind(sockfd, (struct sockaddr *)&my_addr, sizeof(struct sockaddr)) == -1) + { + perror("listener: bind"); + close(sockfd); + return -1; + } + return sockfd; +} + +void* thread_hard_keepalive(void *param) +{ + char buf[1500] = {0}; + char discriminator_temp[ID_SIZE] = {0}; + bfd_port_t* bfd_port = (bfd_port_t*)param; + int size = 0; + uint32_t src_ip = 0; + uint16_t src_port = 0; + int send_rec = 0; + fd_set rset; + int recv_pkt_sd = create_recv_udp_socket(htons(bfd_port->recv_port)); + + if(-1==recv_pkt_sd) + { + printf("hard_keepalive create socket error.\n"); + return NULL; + } + + while(1) + { + FD_ZERO(&rset); + FD_SET(recv_pkt_sd,&rset); + if(-1==select(recv_pkt_sd+1,&rset,NULL,NULL,NULL)) + { + continue; + } + if(FD_ISSET(recv_pkt_sd, &rset)) + { + size = udp_socket_recv(recv_pkt_sd, &src_ip, &src_port, (unsigned char*)buf, sizeof(buf)); + if(size>0) + { + /*交换discriminator*/ + memcpy(discriminator_temp, buf+MY_ID_OFFSET, ID_SIZE); + memcpy(buf+MY_ID_OFFSET, buf+YOUR_ID_OFFSET, ID_SIZE); + memcpy(buf+YOUR_ID_OFFSET, discriminator_temp, ID_SIZE); + /*发送队列state状态*/ + int state = 0; + for(int i=0;iflags,8); + setbit(((bfd_header_t*)buf )->flags,7); + printf("wait_queue is overflow, app is down.\n"); + MESA_handle_runtime_log(g_frag_run.logger, RLOG_LV_FATAL, FRAG_REASSEMBLY_MODULE_NAME, + "{%s:%d} wait_queue is overflow, app is down.", + __FILE__,__LINE__); + } + else + { + /*STATE_UP 2bit: 1 1*/ + setbit(((bfd_header_t*)buf )->flags,8); + setbit(((bfd_header_t*)buf )->flags,7); + } + send_rec = udp_socket_send(recv_pkt_sd, + src_ip, + src_port, + (char*)buf,size); + if(-1==send_rec) + { + printf("hard_keepalive send pkt error.\n"); + } + } + } + else + { + continue; + } + } + + return NULL; +} + +int hard_keepalive_run(void* bfd_port) +{ + pthread_t thread_desc; + pthread_attr_t attr; + + memset(&thread_desc, 0, sizeof(thread_desc)); + memset(&attr, 0, sizeof(attr)); + + if(0 != pthread_attr_init(&(attr))) + { + return -1; + } + if(0 != pthread_attr_setdetachstate(&(attr), PTHREAD_CREATE_DETACHED)) + { + return -1; + } + if(0 != pthread_create(&(thread_desc), &(attr), thread_hard_keepalive, (void*)bfd_port)) + { + pthread_attr_destroy(&(attr)); + return -1; + } + pthread_attr_destroy(&(attr)); + return 0; +} + diff --git a/src/hard_keepalive.h b/src/hard_keepalive.h index 7eaa7d3..22f08c2 100644 --- a/src/hard_keepalive.h +++ b/src/hard_keepalive.h @@ -10,10 +10,15 @@ typedef struct bfd_port_s #define MY_ID_OFFSET 4 #define YOUR_ID_OFFSET 8 #define ID_SIZE 4 + +#define setbit(x,y) x|=(1<> (y)&1) + typedef struct bfd_header_s { unsigned char version_diag; - unsigned char flags; + unsigned char flags; /*flag前2bit是state*/ unsigned char detect_time_multiplier; unsigned char length; unsigned char my_discriminator[4]; /* 业?潞? 虏虏芒*/ diff --git a/src/log.c b/src/log.c index 1439e38..c0997f9 100644 --- a/src/log.c +++ b/src/log.c @@ -194,23 +194,25 @@ void* thread_stat_output(void *param) char buf[32] = {0}; int send_dest_num = 0; -#if K_PROJECT - send_dest_num = g_frag_cfg.send_dest_udp_ip_num; - for(uint32_t m=0;m0) { for(i=0; i