diff --git a/plugin/business/traffic-mirror/CMakeLists.txt b/plugin/business/traffic-mirror/CMakeLists.txt index 07848f5..a8cbc65 100644 --- a/plugin/business/traffic-mirror/CMakeLists.txt +++ b/plugin/business/traffic-mirror/CMakeLists.txt @@ -1,4 +1,4 @@ add_library(traffic-mirror src/entry.cpp src/ethdev.cpp src/rebuild.cpp) target_include_directories(traffic-mirror PRIVATE include) -target_link_libraries(traffic-mirror common cjson pcap) +target_link_libraries(traffic-mirror common cjson pcap mrzcpd) diff --git a/plugin/business/traffic-mirror/include/traffic_mirror.h b/plugin/business/traffic-mirror/include/traffic_mirror.h index b91196b..ca1525f 100644 --- a/plugin/business/traffic-mirror/include/traffic_mirror.h +++ b/plugin/business/traffic-mirror/include/traffic_mirror.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -50,11 +51,28 @@ struct profile_table_ex_data struct ether_addr * ether_addrs; }; +struct traffic_mirror_ethdev_pcap +{ + pcap_t * pcap_device_handle; + char * sendbuf[TFE_THREAD_MAX]; +}; + +struct traffic_mirror_ethdev_mr4 +{ + /* DEVICE INSTANCES */ + struct mr_instance * instance; + struct mr_vdev * device; + struct mr_sendpath * sendpath; + + /* THREAD SENDBUF */ + unsigned int nr_tx_threads; + marsio_buff_t * sendbuf[TFE_THREAD_MAX]; +}; + struct traffic_mirror_ethdev { /* PUBLIC */ char str_device[TFE_SYMBOL_MAX]; - enum traffic_mirror_ethdev_type type; unsigned int mtu; unsigned int en_offload_vlan; @@ -63,23 +81,42 @@ struct traffic_mirror_ethdev /* PRIVATE, FOR PCAP */ pcap_t * pcap_device_handle; + char local_ether_addr[6]; + enum traffic_mirror_ethdev_type type; + union + { + struct traffic_mirror_ethdev_pcap * detail_pcap; + struct traffic_mirror_ethdev_mr4 * detail_mr4; + }; + /* FUNCTIONS */ - int (*fn_inject)(struct traffic_mirror_ethdev *, const char * pkt, unsigned int pktlen); + char * (*fn_send_prepare)(struct traffic_mirror_ethdev *, unsigned int tid); + int (*fn_send_finish)(struct traffic_mirror_ethdev * ethdev, unsigned int, unsigned int); + void (*fn_send_abort)(struct traffic_mirror_ethdev * ethdev, unsigned int tid); void (*fn_destroy)(struct traffic_mirror_ethdev * ethdev); }; -struct traffic_mirror_ethdev * traffic_mirror_ethdev_pcap_create(const char * str_ethdev, void * logger); -void traffic_mirror_ethdev_destroy(struct traffic_mirror_ethdev * ethdev); -int traffic_mirror_ethdev_inject(struct traffic_mirror_ethdev * ethdev, const char * pkt, unsigned int pktlen); +struct traffic_mirror_rebuild_target +{ + struct ether_addr ether_addr; + unsigned int vlan_tci; +}; +struct traffic_mirror_ethdev * traffic_mirror_ethdev_pcap_create(const char * str_ethdev, void * logger); +struct traffic_mirror_ethdev * traffic_mirror_ethdev_mr4_create(const char * str_ethdev, + unsigned int nr_threads, void * logger); + +void traffic_mirror_ethdev_destroy(struct traffic_mirror_ethdev * ethdev); +int traffic_mirror_ethdev_finish(struct traffic_mirror_ethdev * ethdev, unsigned int tid, unsigned int pktlen); struct traffic_mirror_rebuild * traffic_mirror_rebuild_create(struct tfe_stream_addr * addr, - struct profile_table_ex_data * target, struct traffic_mirror_ethdev * ethdev); -void traffic_mirror_rebuild_destroy(struct traffic_mirror_rebuild * instance); -void traffic_mirror_rebuild_handshake(struct traffic_mirror_rebuild * instance); -void traffic_mirror_rebuild_data(struct traffic_mirror_rebuild * instance, const char * data, - unsigned int datalen, enum tfe_conn_dir dir); -void traffic_mirror_rebuild_farewell(struct traffic_mirror_rebuild * instance); + struct traffic_mirror_rebuild_target * target, struct traffic_mirror_ethdev * ethdev); + +void traffic_mirror_rebuild_destroy(struct traffic_mirror_rebuild * instance); +void traffic_mirror_rebuild_handshake(struct traffic_mirror_rebuild * instance, unsigned int tid); +void traffic_mirror_rebuild_data(struct traffic_mirror_rebuild * instance, unsigned int tid, const char * data, + unsigned int datalen, enum tfe_conn_dir dir); +void traffic_mirror_rebuild_farewell(struct traffic_mirror_rebuild * instance, unsigned int tid); diff --git a/plugin/business/traffic-mirror/src/entry.cpp b/plugin/business/traffic-mirror/src/entry.cpp index d276a86..3c31394 100644 --- a/plugin/business/traffic-mirror/src/entry.cpp +++ b/plugin/business/traffic-mirror/src/entry.cpp @@ -429,11 +429,28 @@ static int traffic_mirror_ethdev_init(struct traffic_mirror_instance * instance) return -1; } - instance->ethdev = traffic_mirror_ethdev_pcap_create(str_ethdev, instance->logger); + unsigned int device_type; + MESA_load_profile_uint_def(profile, "traffic_mirror", "type", &device_type, TRAFFIC_MIRROR_ETHDEV_AF_PACKET); + + if (device_type == TRAFFIC_MIRROR_ETHDEV_AF_PACKET) + { + instance->ethdev = traffic_mirror_ethdev_pcap_create(str_ethdev, instance->logger); + } + else if(device_type == TRAFFIC_MIRROR_ETHDEV_MARSIO) + { + instance->ethdev = traffic_mirror_ethdev_mr4_create(str_ethdev, + tfe_proxy_get_work_thread_count(), instance->logger); + } + else + { + TFE_LOG_ERROR(instance->logger, "invalid traffic mirror device type, [traffic_mirror]type = %d", device_type); + return -2; + } + if (!instance->ethdev) { - TFE_LOG_ERROR(instance->logger, "failed at traffic mirror device init "); - return -2; + TFE_LOG_ERROR(instance->logger, "failed at traffic mirror device init. "); + return -3; } return 0; @@ -515,6 +532,9 @@ int traffic_mirror_on_open_cb(const struct tfe_stream * stream, unsigned int thr struct traffic_mirror_instance * instance = g_traffic_mirror_instance; struct tfe_cmsg * cmsg = tfe_stream_get0_cmsg(stream); + unsigned int target_id; + struct traffic_mirror_rebuild_target * rebuild_target = NULL; + assert(instance != NULL); assert(cmsg != NULL); @@ -527,7 +547,7 @@ int traffic_mirror_on_open_cb(const struct tfe_stream * stream, unsigned int thr struct policy_table_ex_data * policy_ex_data = NULL; struct profile_table_ex_data * profile_ex_data = NULL; - int ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_POLICY_ID, (unsigned char*)&opt_val, sizeof(opt_val), &opt_out_size); + int ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_POLICY_ID, (unsigned char *) &opt_val, sizeof(opt_val), &opt_out_size); if (ret < 0) { TFE_LOG_ERROR(instance->logger, "failed at getting policy id from cmsg, detach the stream."); @@ -535,7 +555,7 @@ int traffic_mirror_on_open_cb(const struct tfe_stream * stream, unsigned int thr } snprintf(str_policy_id, sizeof(str_policy_id), "%u", opt_val); - policy_ex_data = (struct policy_table_ex_data *)Maat_plugin_get_EX_data(instance->maat_feather, + policy_ex_data = (struct policy_table_ex_data *) Maat_plugin_get_EX_data(instance->maat_feather, instance->policy_table_id, str_policy_id); if (!policy_ex_data) @@ -550,7 +570,7 @@ int traffic_mirror_on_open_cb(const struct tfe_stream * stream, unsigned int thr } snprintf(str_profile_id, sizeof(str_policy_id), "%u", policy_ex_data->profile_id); - profile_ex_data = (struct profile_table_ex_data *)Maat_plugin_get_EX_data(instance->maat_feather, + profile_ex_data = (struct profile_table_ex_data *) Maat_plugin_get_EX_data(instance->maat_feather, instance->profile_table_id, str_profile_id); if (!profile_ex_data) @@ -560,16 +580,21 @@ int traffic_mirror_on_open_cb(const struct tfe_stream * stream, unsigned int thr goto detach; } + target_id = random() % profile_ex_data->nr_targets; + rebuild_target = ALLOC(struct traffic_mirror_rebuild_target, 1); + rebuild_target->vlan_tci = profile_ex_data->vlans[target_id]; + rebuild_target->ether_addr = profile_ex_data->ether_addrs[target_id]; me = ALLOC(struct traffic_mirror_me, 1); - me->rebuild_ctx = traffic_mirror_rebuild_create(stream->addr, profile_ex_data, instance->ethdev); + me->rebuild_ctx = traffic_mirror_rebuild_create(stream->addr, rebuild_target, instance->ethdev); me->profile_ex_data = profile_ex_data; + *pme = (void *) me; - /* profile_ex_data's ownership is transfer to me */ + /* the ownership is transfer to struct me and rebuild_target */ profile_ex_data = NULL; - traffic_mirror_rebuild_handshake(me->rebuild_ctx); + rebuild_target = NULL; - *pme = (void *)me; + traffic_mirror_rebuild_handshake(me->rebuild_ctx, thread_id); return ACTION_FORWARD_DATA; detach: @@ -588,15 +613,20 @@ detach: profile_table_ex_data_free(profile_ex_data); } + if (rebuild_target) + { + free(rebuild_target); + } + tfe_stream_detach(stream); - return ACTION_FORWARD_DATA; + return ACTION_FORWARD_DATA; } enum tfe_stream_action traffic_mirror_on_data_cb(const struct tfe_stream * stream, unsigned int thread_id, enum tfe_conn_dir dir, const unsigned char * data, size_t len, void ** pme) { struct traffic_mirror_me * me = (struct traffic_mirror_me *)(*pme); - traffic_mirror_rebuild_data(me->rebuild_ctx, (const char *)data, (size_t)len, dir); + traffic_mirror_rebuild_data(me->rebuild_ctx, 0, (const char *) data, (size_t) len, dir); return ACTION_FORWARD_DATA; } @@ -604,7 +634,7 @@ void traffic_mirror_on_close_cb(const struct tfe_stream * stream, unsigned int t enum tfe_stream_close_reason reason, void ** pme) { struct traffic_mirror_me * me = (struct traffic_mirror_me *)(*pme); - traffic_mirror_rebuild_farewell(me->rebuild_ctx); + traffic_mirror_rebuild_farewell(me->rebuild_ctx, 0); traffic_mirror_rebuild_destroy(me->rebuild_ctx); profile_table_ex_data_free(me->profile_ex_data); diff --git a/plugin/business/traffic-mirror/src/ethdev.cpp b/plugin/business/traffic-mirror/src/ethdev.cpp index 12deae9..e5824ea 100644 --- a/plugin/business/traffic-mirror/src/ethdev.cpp +++ b/plugin/business/traffic-mirror/src/ethdev.cpp @@ -3,21 +3,110 @@ #include #include #include +#include #include #include #include -struct traffic_mirror_ethdev_pkt_desc{}; -struct traffic_mirror_ethdev_pkt_desc_pcap +static char * mr4_ethdev_send_prepare(struct traffic_mirror_ethdev * ethdev, unsigned int tid) { - unsigned int pktlen; - char * pkt; -}; + struct traffic_mirror_ethdev_mr4 * detail_mr4 = ethdev->detail_mr4; + assert(tid < detail_mr4->nr_tx_threads); -static int pcap_ethdev_inject(struct traffic_mirror_ethdev * ethdev, const char * pkt, unsigned int pktlen) + marsio_buff_t * buff; + int ret = marsio_buff_malloc_device(detail_mr4->device, &buff, 1, MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY); + if (unlikely(ret < 0)) + { + assert(0); + return NULL; + } + + char * buff_start = marsio_buff_mtod(buff); + if (unlikely(!buff_start)) + { + assert(0); + return NULL; + } + + detail_mr4->sendbuf[tid] = buff; + return buff_start; +} + +static int mr4_ethdev_send_finish(struct traffic_mirror_ethdev * ethdev, unsigned int tid, unsigned int pktlen) { - return pcap_sendpacket(ethdev->pcap_device_handle, (const u_char *)pkt, pktlen); + struct traffic_mirror_ethdev_mr4 * detail_mr4 = ethdev->detail_mr4; + assert(detail_mr4 != NULL && detail_mr4->sendbuf[tid] != NULL); + + marsio_buff_t * buff = detail_mr4->sendbuf[tid]; + char * append_ptr = marsio_buff_append(buff, pktlen); + assert(append_ptr != NULL); + + int ret = marsio_send_burst_with_options(detail_mr4->sendpath, tid, &buff, 1, MARSIO_SEND_OPT_REHASH); + if (unlikely(ret < 0)) + { + marsio_buff_free(detail_mr4->instance, &buff, 1, MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY); + return -1; + } + + return 0; +} + +static void mr4_ethdev_send_abort(struct traffic_mirror_ethdev * ethdev, unsigned int tid) +{ + struct traffic_mirror_ethdev_mr4 * detail_mr4 = ethdev->detail_mr4; + assert(detail_mr4 != NULL && detail_mr4->sendbuf[tid] != NULL); + + marsio_buff_t * buff = detail_mr4->sendbuf[tid]; + marsio_buff_free(detail_mr4->instance, &buff, 1, MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY); + detail_mr4->sendbuf[tid] = NULL; +} + +static void mr4_ethdev_destroy(struct traffic_mirror_ethdev * ethdev) +{ + struct traffic_mirror_ethdev_mr4 * detail_mr4 = ethdev->detail_mr4; + if(detail_mr4 && detail_mr4->sendpath) + { + marsio_sendpath_destory(detail_mr4->sendpath); + } + + if(detail_mr4 && detail_mr4->device) + { + marsio_close_device(detail_mr4->device); + } + + if(detail_mr4 && detail_mr4->instance) + { + marsio_destory(detail_mr4->instance); + } + + if(detail_mr4) + { + free(detail_mr4); + } + + free(ethdev); +} + +/* =============================== MODE LIBPCAP ========================================== */ +static char * pcap_ethdev_send_prepare(struct traffic_mirror_ethdev *ethdev, unsigned int tid) +{ + struct traffic_mirror_ethdev_pcap * detail_pcap = ethdev->detail_pcap; + return detail_pcap->sendbuf[tid]; +} + +static int pcap_ethdev_send_finish(struct traffic_mirror_ethdev * ethdev, unsigned int tid, unsigned int pktlen) +{ + struct traffic_mirror_ethdev_pcap * detail_pcap = ethdev->detail_pcap; + assert(detail_pcap != NULL && detail_pcap->sendbuf[tid] != NULL); + + char * sendbuf = detail_pcap->sendbuf[tid]; + return pcap_sendpacket(ethdev->pcap_device_handle, (const u_char *)sendbuf, pktlen); +} + +static void pcap_ethdev_send_abort(struct traffic_mirror_ethdev * ethdev, unsigned int tid) +{ + return; } static void pcap_ethdev_destroy(struct traffic_mirror_ethdev * ethdev) @@ -26,14 +115,77 @@ static void pcap_ethdev_destroy(struct traffic_mirror_ethdev * ethdev) return free(ethdev); } +/* =============================== VIRTUAL INTERFACE START ========================================== */ void traffic_mirror_ethdev_destroy(struct traffic_mirror_ethdev * ethdev) { ethdev->fn_destroy(ethdev); } -int traffic_mirror_ethdev_inject(struct traffic_mirror_ethdev * ethdev, const char * pkt, unsigned int pktlen) +int traffic_mirror_ethdev_finish(struct traffic_mirror_ethdev * ethdev, unsigned int tid, unsigned int pktlen) { - return ethdev->fn_inject(ethdev, pkt, pktlen); + return ethdev->fn_send_finish(ethdev, tid, pktlen); +} + +char * traffic_mirror_ethdev_send_prepare(struct traffic_mirror_ethdev * ethdev, unsigned int tid) +{ + return ethdev->fn_send_prepare(ethdev, tid); +} + +/* =============================== VIRTUAL INTERFACE END =========================================== */ + +struct traffic_mirror_ethdev * traffic_mirror_ethdev_mr4_create(const char * str_ethdev, + unsigned int nr_threads, void * logger) +{ + struct traffic_mirror_ethdev * ethdev = ALLOC(struct traffic_mirror_ethdev, 1); + ethdev->type = TRAFFIC_MIRROR_ETHDEV_MARSIO; + + ethdev->detail_mr4 = ALLOC(struct traffic_mirror_ethdev_mr4, 1); + strncpy(ethdev->str_device, str_ethdev, sizeof(ethdev->str_device) - 1); + + /* PREPARE MR4 INSTANCES */ + struct traffic_mirror_ethdev_mr4 * detail_mr4 = ethdev->detail_mr4; + detail_mr4->nr_tx_threads = nr_threads; + + /* create a marsio instance and setup options, + * the instance need to run in no-thread-affinity mode */ + detail_mr4->instance = marsio_create(); + assert(detail_mr4->instance != NULL); + + if (marsio_init(detail_mr4->instance, "tfe-traffic-mirror") < 0) + { + TFE_LOG_ERROR(logger, "failed at init MARSIOv4 instance."); + goto errout; + } + + detail_mr4->device = marsio_open_device(detail_mr4->instance, str_ethdev, 0, nr_threads); + if (!detail_mr4->device) + { + TFE_LOG_ERROR(logger, "failed at open device %s, nr_threads = %u", str_ethdev, nr_threads); + goto errout; + } + + detail_mr4->sendpath = marsio_sendpath_create_by_vdev(detail_mr4->device); + if (!detail_mr4->sendpath) + { + TFE_LOG_ERROR(logger, "failed at creating sendpath for device %s", str_ethdev); + goto errout; + } + + /* TODO: load the mtu and mac address from mrzcpd */ + ethdev->mtu = ETHER_MAX_LEN; + ethdev->fn_send_prepare = mr4_ethdev_send_prepare; + ethdev->fn_send_finish = mr4_ethdev_send_finish; + ethdev->fn_send_abort = mr4_ethdev_send_abort; + ethdev->fn_destroy = mr4_ethdev_destroy; + + return ethdev; + +errout: + if (ethdev) + { + mr4_ethdev_destroy(ethdev); + } + return NULL; } struct traffic_mirror_ethdev * traffic_mirror_ethdev_pcap_create(const char * str_ethdev, void * logger) @@ -96,7 +248,9 @@ struct traffic_mirror_ethdev * traffic_mirror_ethdev_pcap_create(const char * st ethdev->local_ether_addr[2], ethdev->local_ether_addr[3], ethdev->local_ether_addr[4], ethdev->local_ether_addr[5], ethdev->mtu); - ethdev->fn_inject = pcap_ethdev_inject; + ethdev->fn_send_prepare = pcap_ethdev_send_prepare; + ethdev->fn_send_finish = pcap_ethdev_send_finish; + ethdev->fn_send_abort = pcap_ethdev_send_abort; ethdev->fn_destroy = pcap_ethdev_destroy; close(fd); diff --git a/plugin/business/traffic-mirror/src/rebuild.cpp b/plugin/business/traffic-mirror/src/rebuild.cpp index fe856d9..4c80fd5 100644 --- a/plugin/business/traffic-mirror/src/rebuild.cpp +++ b/plugin/business/traffic-mirror/src/rebuild.cpp @@ -3,13 +3,16 @@ #include #include #include +#include +#include +#include struct traffic_mirror_rebuild { struct tfe_stream_addr * c_s_addr; struct tfe_stream_addr * s_c_addr; - struct profile_table_ex_data * target; + struct traffic_mirror_rebuild_target * target; struct traffic_mirror_ethdev * ethdev; uint32_t c_seq; @@ -35,6 +38,8 @@ struct tcp_hdr { uint16_t tcp_urp; /**< TCP urgent pointer, if any. */ } __attribute__((__packed__)); + + #define TCP_URG_FLAG 0x20 #define TCP_ACK_FLAG 0x10 #define TCP_PSH_FLAG 0x08 @@ -75,8 +80,8 @@ static inline uint32_t __rte_raw_cksum(const void *buf, size_t len, uint32_t sum static inline uint16_t __rte_raw_cksum_reduce(uint32_t sum) { - sum = ((sum & 0xffff0000) >> 16) + (sum & 0xffff); - sum = ((sum & 0xffff0000) >> 16) + (sum & 0xffff); + sum = ((sum & 0xffff0000U) >> 16) + (sum & 0xffffU); + sum = ((sum & 0xffff0000U) >> 16) + (sum & 0xffffU); return (uint16_t) sum; } @@ -92,8 +97,8 @@ static inline uint16_t __ipv4_cksum(const struct iphdr * ipv4_hdr) return (cksum == 0xffff) ? cksum : ~cksum; } -static inline uint16_t __tcpudp_cksum_by_stream_addr_v4(const struct tfe_stream_addr * addr, - const void *l4_hdr, unsigned int l4_len) +static inline uint16_t __tcp_cksum_by_stream_addr_v4(const struct tfe_stream_addr * addr, + const void * l4_hdr, unsigned int l4_len) { struct ipv4_psd_header { @@ -113,14 +118,32 @@ static inline uint16_t __tcpudp_cksum_by_stream_addr_v4(const struct tfe_stream_ uint32_t cksum = rte_raw_cksum(l4_hdr, l4_len); cksum += rte_raw_cksum(&psd_hdr, sizeof(psd_hdr)); - cksum = ((cksum & 0xffff0000) >> 16) + (cksum & 0xffff); - cksum = (~cksum) & 0xffff; - if (cksum == 0) - cksum = 0xffff; - + cksum = ((cksum & 0xffff0000U) >> 16U) + (cksum & 0xffffU); + cksum = (~cksum) & 0xffffU; + if (cksum == 0) cksum = 0xffff; return cksum; } +static inline uint16_t __tcp_cksum_by_stream_addr_v6(const struct tfe_stream_addr * addr, + const void * l4_hdr, unsigned int l4_len) +{ + uint32_t sum; + struct + { + uint32_t len; /* L4 length. */ + uint32_t proto; /* L4 protocol - top 3 bytes must be zero */ + } psd_hdr + { + .len = l4_len, + .proto =(uint32_t)(IPPROTO_TCP << 24U) + }; + + sum = __rte_raw_cksum(&addr->tuple4_v6->saddr, sizeof(addr->tuple4_v6->saddr), 0); + sum = __rte_raw_cksum(&addr->tuple4_v6->daddr, sizeof(addr->tuple4_v6->daddr), sum); + sum = __rte_raw_cksum(&psd_hdr, sizeof(psd_hdr), sum); + return __rte_raw_cksum_reduce(sum); +} + static int tcp_header_construct(unsigned char *buf, unsigned short sp, unsigned short dp, unsigned int seq, unsigned int ack, unsigned char flags, unsigned short win, unsigned short urg) @@ -142,12 +165,11 @@ static int tcp_header_construct(unsigned char *buf, unsigned short sp, static int tcp_header_construct_by_stream_addr(struct tfe_stream_addr * addr, unsigned char *buf, unsigned int seq, unsigned int ack, unsigned char flags, unsigned short win, unsigned short urg, - unsigned int payload_len) + unsigned char * payload, unsigned int payload_len) { unsigned short sport; unsigned short dport; unsigned short cksum; - unsigned int tcphdr_len; if (addr->addrtype == TFE_ADDR_STREAM_TUPLE4_V4) { @@ -164,23 +186,21 @@ static int tcp_header_construct_by_stream_addr(struct tfe_stream_addr * addr, un assert(0); } - tcphdr_len = tcp_header_construct(buf, sport, dport, seq, ack, flags, win, urg); + unsigned int tcphdr_len = tcp_header_construct(buf, sport, dport, seq, ack, flags, win, urg); + memcpy(buf + tcphdr_len, payload, payload_len); + if (addr->addrtype == TFE_ADDR_STREAM_TUPLE4_V4) { - cksum = __tcpudp_cksum_by_stream_addr_v4(addr, (void *)buf, tcphdr_len + payload_len); + cksum = __tcp_cksum_by_stream_addr_v4(addr, (void *) buf, tcphdr_len + payload_len); } else if (addr->addrtype == TFE_ADDR_STREAM_TUPLE4_V6) { - assert(0); - } - else - { - assert(0); + cksum = __tcp_cksum_by_stream_addr_v6(addr, (void *)buf, tcphdr_len + payload_len); } struct tcp_hdr * tcp_hdr = (struct tcp_hdr *)buf; tcp_hdr->cksum = cksum; - return tcphdr_len; + return tcphdr_len + payload_len; } static int ipv4_header_construct(unsigned char *buf, unsigned short carry_layer_len, @@ -205,6 +225,22 @@ static int ipv4_header_construct(unsigned char *buf, unsigned short carry_layer_ return sizeof(struct iphdr); } +static int ipv6_header_construct(unsigned char *buf, unsigned short carry_layer_len, + unsigned char ttl, unsigned char protocol, struct in6_addr * src, struct in6_addr * dst) +{ + struct ip6_hdr * ip6_hdr = (struct ip6_hdr *) buf; + + ip6_hdr->ip6_vfc = ntohl(0x60000000U); + ip6_hdr->ip6_flow = 0; + ip6_hdr->ip6_plen = htons(carry_layer_len); + ip6_hdr->ip6_nxt = protocol; + ip6_hdr->ip6_hlim = 128; + + ip6_hdr->ip6_src = *src; + ip6_hdr->ip6_dst = *dst; + return sizeof(ip6_hdr); +} + static int ip_header_construct_by_stream_addr(struct tfe_stream_addr * addr, unsigned char *buf, unsigned short carry_layer_len, unsigned char tos, unsigned short id, unsigned short frag, unsigned char ttl, unsigned char protocol) @@ -216,12 +252,8 @@ static int ip_header_construct_by_stream_addr(struct tfe_stream_addr * addr, } else if (addr->addrtype == TFE_ADDR_STREAM_TUPLE4_V6) { - /* TODO: IPv6 */ - assert(0); - } - else - { - assert(0); + return ipv6_header_construct(buf, carry_layer_len, ttl, protocol, + &addr->tuple4_v6->saddr, &addr->tuple4_v6->daddr); } assert(0); @@ -244,79 +276,78 @@ static void vlan_tag_construct(unsigned char *buf, unsigned short tci, unsigned vlan_hdr->eth_proto = htons(type); } -static void l2_send_to_target(struct traffic_mirror_ethdev * ethdev, - unsigned char * snd_buffer, unsigned int l3_data_offset, unsigned int l3_data_len, +static int ether_header_construct(struct traffic_mirror_ethdev * ethdev, char * buffer, struct ether_addr * target_addr, unsigned int vlan_tci, unsigned l3_protocol) { - assert(l3_data_offset >= (sizeof(struct ethhdr) + sizeof(struct vlan_hdr))); - unsigned int header_offset = l3_data_offset; unsigned int header_len = 0; + unsigned int eth_protocol = vlan_tci > 0 ? ETH_P_8021Q : l3_protocol; + + ether_header_construct((unsigned char *)buffer + header_len, + (unsigned char *)target_addr->ether_addr_octet, + (unsigned char *)ethdev->local_ether_addr, eth_protocol); + + header_len += sizeof(struct ethhdr); /* need to construct vlan header */ if (vlan_tci > 0) { - header_offset -= sizeof(struct vlan_hdr); header_len += sizeof(struct vlan_hdr); - vlan_tag_construct(snd_buffer + header_offset, vlan_tci, l3_protocol); + vlan_tag_construct((unsigned char *)buffer + header_len, vlan_tci, l3_protocol); } - unsigned int eth_protocol = vlan_tci > 0 ? ETH_P_8021Q : l3_protocol; - header_offset -= sizeof(struct ethhdr); - header_len += sizeof(struct ethhdr); - - ether_header_construct(snd_buffer + header_offset, (unsigned char *)target_addr->ether_addr_octet, - (unsigned char *)ethdev->local_ether_addr, eth_protocol); - - traffic_mirror_ethdev_inject(ethdev, (char *)snd_buffer + header_offset, header_len + l3_data_len); + return header_len; } -static void l2_send_to_target_group(struct traffic_mirror_ethdev * ethdev, struct profile_table_ex_data * t_group, - unsigned char * snd_buffer, unsigned int l3_data_offset, unsigned int l3_data_len, unsigned l3_protocol) +static void tcp_segment_send_to_target_group(struct tfe_stream_addr * addr, + struct traffic_mirror_ethdev * ethdev, struct traffic_mirror_rebuild_target * target, + unsigned int tid, const char * payload, unsigned int payload_len, + unsigned int seq, unsigned int ack, char flags) { - for(unsigned int i = 0; i < t_group->nr_targets; i++) + char * pkt_buffer = ethdev->fn_send_prepare(ethdev, tid); + assert(pkt_buffer != NULL); + + unsigned int l3_protocol = 0; + switch(addr->addrtype) { - l2_send_to_target(ethdev, snd_buffer, l3_data_offset, l3_data_len, - &t_group->ether_addrs[i], t_group->vlans[i], l3_protocol); + case TFE_ADDR_STREAM_TUPLE4_V4: { l3_protocol = ETH_P_IP; break;} + case TFE_ADDR_STREAM_TUPLE4_V6: { l3_protocol = ETH_P_IPV6; break;} + default: assert(0); + } + + unsigned int pkt_len = 0; + + /* Ethernet and VLAN header */ + pkt_len += ether_header_construct(ethdev, pkt_buffer, &target->ether_addr, target->vlan_tci, l3_protocol); + + /* IPv4/IPv6 Header */ + pkt_len += ip_header_construct_by_stream_addr(addr, + (unsigned char *)pkt_buffer + pkt_len, + payload_len + sizeof(struct tcphdr), 0, 0x1000, 0, 128, IPPROTO_TCP); + + /* TCP header and payload */ + pkt_len += tcp_header_construct_by_stream_addr(addr, + (unsigned char *)pkt_buffer + pkt_len, seq, ack, flags, 0xffff, 0, + (unsigned char *)payload, payload_len); + + int ret = traffic_mirror_ethdev_finish(ethdev, tid, pkt_len); + if (unlikely(ret < 0)) + { + //TODO: 统计计数 + return; } } -static void tcp_segment_send_to_target_group(struct tfe_stream_addr * addr, struct traffic_mirror_ethdev * ethdev, - struct profile_table_ex_data * t_group, const char * payload, unsigned int payload_len, - unsigned int seq, unsigned int ack, char flags) -{ - char pkt[ETHER_MAX_LEN]; - - unsigned sz_pkt_prepend = sizeof(struct ethhdr) + sizeof(struct vlan_hdr) + - sizeof(struct iphdr) + sizeof(struct tcphdr); - - unsigned header_len = 0; - - assert(sizeof(pkt) - sz_pkt_prepend >= payload_len); - memcpy(pkt + sz_pkt_prepend, payload, payload_len); - - sz_pkt_prepend -= sizeof(struct tcp_hdr); - header_len += tcp_header_construct_by_stream_addr(addr, (unsigned char *)pkt + sz_pkt_prepend, - seq, ack, flags, 0xffff, 0, payload_len); - - sz_pkt_prepend -= sizeof(struct iphdr); - header_len += ip_header_construct_by_stream_addr(addr, (unsigned char *)pkt + sz_pkt_prepend, - header_len + payload_len, 0, 0x1000, 0, 128, IPPROTO_TCP); - - l2_send_to_target_group(ethdev, t_group, (unsigned char *)pkt, sz_pkt_prepend, - header_len + payload_len, ETHERTYPE_IP); -} - -static void tcp_send_to_target_group(struct tfe_stream_addr * addr, struct traffic_mirror_ethdev * ethdev, - struct profile_table_ex_data * t_group, const char * payload, - unsigned int payload_len, unsigned int seq, unsigned int ack, char flags) +static void tcp_send_to_target(struct tfe_stream_addr * addr, struct traffic_mirror_ethdev * ethdev, + struct traffic_mirror_rebuild_target * target, unsigned int tid, const char * payload, + unsigned int payload_len, unsigned int seq, unsigned int ack, char flags) { unsigned int payload_offset = 0; - unsigned mss = ethdev->mtu - (sizeof(struct iphdr) + sizeof(struct tcphdr)); + unsigned mss = ethdev->mtu - (MAX(sizeof(struct iphdr), sizeof(struct ip6_hdr)) + sizeof(struct tcphdr)); - /* Handshake or farewall */ + /* handshake or farewell */ if (payload == NULL || payload_len == 0) { - tcp_segment_send_to_target_group(addr, ethdev, t_group, NULL, 0, seq, ack, flags); + tcp_segment_send_to_target_group(addr, ethdev, target, tid, NULL, 0, seq, ack, flags); return; } @@ -325,14 +356,14 @@ static void tcp_send_to_target_group(struct tfe_stream_addr * addr, struct traff unsigned int payload_sz_seg = MIN(payload_len - payload_offset, mss); const char * payload_ptr_seg = payload + payload_offset; - tcp_segment_send_to_target_group(addr, ethdev, t_group, payload_ptr_seg, payload_sz_seg, seq, ack, flags); + tcp_segment_send_to_target_group(addr, ethdev, target, tid, payload_ptr_seg, payload_sz_seg, seq, ack, flags); seq += payload_sz_seg; payload_offset += payload_sz_seg; } } struct traffic_mirror_rebuild * traffic_mirror_rebuild_create(struct tfe_stream_addr * addr, - struct profile_table_ex_data * target, struct traffic_mirror_ethdev * ethdev) + struct traffic_mirror_rebuild_target * target, struct traffic_mirror_ethdev * ethdev) { struct traffic_mirror_rebuild * instance = ALLOC(struct traffic_mirror_rebuild, 1); instance->c_s_addr = addr; @@ -353,22 +384,22 @@ void traffic_mirror_rebuild_destroy(struct traffic_mirror_rebuild * instance) free(instance); } -void traffic_mirror_rebuild_handshake(struct traffic_mirror_rebuild * instance) +void traffic_mirror_rebuild_handshake(struct traffic_mirror_rebuild * instance, unsigned int tid) { - tcp_send_to_target_group(instance->c_s_addr, instance->ethdev, instance->target, + tcp_send_to_target(instance->c_s_addr, instance->ethdev, instance->target, tid, NULL, 0, instance->c_seq, 0, TCP_SYN_FLAG); - tcp_send_to_target_group(instance->s_c_addr, instance->ethdev, instance->target, + tcp_send_to_target(instance->s_c_addr, instance->ethdev, instance->target, tid, NULL, 0, instance->s_seq, instance->c_seq + 1, TCP_SYN_FLAG | TCP_ACK_FLAG); - tcp_send_to_target_group(instance->c_s_addr, instance->ethdev, instance->target, + tcp_send_to_target(instance->c_s_addr, instance->ethdev, instance->target, tid, NULL, 0, instance->c_seq + 1, instance->s_seq + 1, TCP_ACK_FLAG); instance->s_seq++; instance->c_seq++; } -void traffic_mirror_rebuild_data(struct traffic_mirror_rebuild * instance, +void traffic_mirror_rebuild_data(struct traffic_mirror_rebuild * instance, unsigned int tid, const char * data, unsigned int datalen, enum tfe_conn_dir dir) { if (data == NULL || datalen == 0) @@ -378,36 +409,36 @@ void traffic_mirror_rebuild_data(struct traffic_mirror_rebuild * instance, if (dir == CONN_DIR_DOWNSTREAM) { - tcp_send_to_target_group(instance->c_s_addr, instance->ethdev, instance->target, + tcp_send_to_target(instance->c_s_addr, instance->ethdev, instance->target, tid, data, datalen, instance->c_seq, instance->s_seq, TCP_ACK_FLAG); instance->c_seq += datalen; } else { - tcp_send_to_target_group(instance->s_c_addr, instance->ethdev, instance->target, + tcp_send_to_target(instance->s_c_addr, instance->ethdev, instance->target, tid, data, datalen, instance->s_seq, instance->c_seq, TCP_ACK_FLAG); instance->s_seq += datalen; } } -void traffic_mirror_rebuild_farewell(struct traffic_mirror_rebuild * instance) +void traffic_mirror_rebuild_farewell(struct traffic_mirror_rebuild * instance, unsigned int tid) { /* C->S FIN */ - tcp_send_to_target_group(instance->c_s_addr, instance->ethdev, instance->target, + tcp_send_to_target(instance->c_s_addr, instance->ethdev, instance->target, tid, NULL, 0, instance->c_seq, instance->s_seq, TCP_FIN_FLAG | TCP_ACK_FLAG); /* C->S FIN, ACK */ - tcp_send_to_target_group(instance->s_c_addr, instance->ethdev, instance->target, + tcp_send_to_target(instance->s_c_addr, instance->ethdev, instance->target, tid, NULL, 0, instance->s_seq, instance->c_seq + 1, TCP_ACK_FLAG); instance->c_seq += 1; /* S->C FIN */ - tcp_send_to_target_group(instance->s_c_addr, instance->ethdev, instance->target, + tcp_send_to_target(instance->s_c_addr, instance->ethdev, instance->target, tid, NULL, 0, instance->s_seq, instance->c_seq, TCP_FIN_FLAG | TCP_ACK_FLAG); /* C->S FIN, ACK */ - tcp_send_to_target_group(instance->c_s_addr, instance->ethdev, instance->target, + tcp_send_to_target(instance->c_s_addr, instance->ethdev, instance->target, tid, NULL, 0, instance->c_seq, instance->s_seq + 1, TCP_ACK_FLAG); instance->s_seq += 1; diff --git a/vendor/CMakeLists.txt b/vendor/CMakeLists.txt index 40e2028..e08926f 100644 --- a/vendor/CMakeLists.txt +++ b/vendor/CMakeLists.txt @@ -122,6 +122,8 @@ set_property(TARGET gmock PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${INSTALL_DIR}/ set(MESA_FRAMEWORK_LIB_DIR /opt/MESA/lib) set(MESA_FRAMEWORK_INCLUDE_DIR /opt/MESA/include) +set(MRZCPD_LIB_DIR /opt/mrzcpd/lib) +set(MRZCPD_INCLUDE_DIR /opt/mrzcpd/include) add_library(MESA_handle_logger SHARED IMPORTED GLOBAL) set_property(TARGET MESA_handle_logger PROPERTY IMPORTED_LOCATION ${MESA_FRAMEWORK_LIB_DIR}/libMESA_handle_logger.so) @@ -155,6 +157,10 @@ add_library(librdkafka SHARED IMPORTED GLOBAL) set_property(TARGET librdkafka PROPERTY IMPORTED_LOCATION ${MESA_FRAMEWORK_LIB_DIR}/librdkafka.so) set_property(TARGET librdkafka PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${MESA_FRAMEWORK_INCLUDE_DIR}) +add_library(mrzcpd SHARED IMPORTED GLOBAL) +set_property(TARGET mrzcpd PROPERTY IMPORTED_LOCATION ${MRZCPD_LIB_DIR}/libmarsio.so) +set_property(TARGET mrzcpd PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${MRZCPD_INCLUDE_DIR}) + ### cJSON ExternalProject_Add(cJSON PREFIX cJSON URL ${CMAKE_CURRENT_SOURCE_DIR}/cJSON-1.7.7.tar.gz