diff --git a/src/SDK_EthernetTap.cpp b/src/SDK_EthernetTap.cpp index abe7911..f9c95a9 100644 --- a/src/SDK_EthernetTap.cpp +++ b/src/SDK_EthernetTap.cpp @@ -39,6 +39,7 @@ #include "Utils.hpp" #include "OSUtils.hpp" +#include "Constants.hpp" #include "Phy.hpp" #include "SDK_LWIPStack.hpp" @@ -439,18 +440,23 @@ void NetconEthernetTap::processReceivedData(PhySocket *sock,void **uptr,bool lwi Connection *conn = getConnection(sock); if(conn && conn->rxsz) { float max = conn->type == SOCK_STREAM ? (float)DEFAULT_TCP_RX_BUF_SZ : (float)DEFAULT_UDP_RX_BUF_SZ; - long n = _phy.streamSend(conn->sock, conn->rxbuf, TEMP_MTU); - int payload_sz; - memcpy(&payload_sz, conn->rxbuf, sizeof(int)); // OPT: - if(n == TEMP_MTU) { + long n = _phy.streamSend(conn->sock, conn->rxbuf, ZT_MAX_MTU); + int payload_sz, addr_sz_offset = sizeof(struct sockaddr_storage); + memcpy(&payload_sz, conn->rxbuf + addr_sz_offset, sizeof(int)); // OPT: + // extract address + struct sockaddr_storage addr; + memcpy(&addr, conn->rxbuf, addr_sz_offset); + + if(n == ZT_MAX_MTU) { if(conn->rxsz-n > 0) // If more remains on buffer - memcpy(conn->rxbuf, conn->rxbuf+TEMP_MTU, conn->rxsz - TEMP_MTU); - conn->rxsz -= TEMP_MTU; + memcpy(conn->rxbuf, conn->rxbuf+ZT_MAX_MTU, conn->rxsz - ZT_MAX_MTU); + conn->rxsz -= ZT_MAX_MTU; + // DGRAM if(conn->type==SOCK_DGRAM){ _phy.setNotifyWritable(conn->sock, false); #if DEBUG_LEVEL >= MSG_TRANSFER - struct sockaddr_in * addr_in2 = (struct sockaddr_in *)conn->peer_addr; + struct sockaddr_in * addr_in2 = (struct sockaddr_in *)&addr; int port = lwipstack->__lwip_ntohs(addr_in2->sin_port); int ip = addr_in2->sin_addr.s_addr; unsigned char d[4]; @@ -458,11 +464,11 @@ void NetconEthernetTap::processReceivedData(PhySocket *sock,void **uptr,bool lwi d[1] = (ip >> 8) & 0xFF; d[2] = (ip >> 16) & 0xFF; d[3] = (ip >> 24) & 0xFF; - dwr(MSG_TRANSFER,"UDP RX <--- :: {TX: %.3f%%, RX: %d, sock=%x} :: payload = %d bytes (%d.%d.%d.%d:%d)\n", (float)conn->txsz / max, conn->rxsz/* / max*/, conn->sock, payload_sz, d[0],d[1],d[2],d[3], port); #endif } + // STREAM //dwr(MSG_DEBUG, "phyOnUnixWritable(): tid = %d\n", pthread_mach_thread_np(pthread_self())); if(conn->type==SOCK_STREAM) { // Only acknolwedge receipt of TCP packets lwipstack->__tcp_recved(conn->TCP_pcb, n); @@ -746,40 +752,44 @@ void NetconEthernetTap::nc_udp_recved(void * arg, struct udp_pcb * upcb, struct Larg *l = (Larg*)arg; //dwr(MSG_DEBUG_EXTRA, "nc_udp_recved(conn=%p,pcb=%p,port=%d)\n", (void*)&(l->conn), (void*)&upcb, port); int tot = 0; - unsigned char *nextpos, *sizepos; + unsigned char *addr_pos, *sz_pos, *payload_pos; struct pbuf* q = p; + + struct sockaddr_in addr_in; + addr_in.sin_addr.s_addr = addr->addr; + addr_in.sin_port = port; + Mutex::Lock _l2(l->tap->_rx_buf_m); // Cycle through pbufs and write them to the RX buffer // The RX "buffer" will be emptied via phyOnUnixWritable() if(p) { - // assign provided address info to "connection" - struct sockaddr_in addr_in; - addr_in.sin_addr.s_addr = addr->addr; - addr_in.sin_port = port; - l->conn->peer_addr = (struct sockaddr_storage*)&addr_in; - + // Intra-API "packetization" structure: [addr_len|addr|payload_len|payload] if(l->conn->rxsz == DEFAULT_UDP_RX_BUF_SZ) { // if UDP buffer full dwr(MSG_DEBUG, "nc_udp_recved(): UDP RX buffer full. Discarding oldest payload segment\n"); - memmove(l->conn->rxbuf, l->conn->rxbuf + TEMP_MTU, DEFAULT_UDP_RX_BUF_SZ - TEMP_MTU); - sizepos = l->conn->rxbuf + (DEFAULT_UDP_RX_BUF_SZ - TEMP_MTU); - l->conn->rxsz -= TEMP_MTU; + memmove(l->conn->rxbuf, l->conn->rxbuf + ZT_MAX_MTU, DEFAULT_UDP_RX_BUF_SZ - ZT_MAX_MTU); + sz_pos = l->conn->rxbuf + (DEFAULT_UDP_RX_BUF_SZ - ZT_MAX_MTU) + sizeof(struct sockaddr_storage); + l->conn->rxsz -= ZT_MAX_MTU; } - else - sizepos = l->conn->rxbuf + l->conn->rxsz; // where we'll prepend the size of the payload - nextpos = sizepos + sizeof(tot); // next position we can write data to + else { + addr_pos = l->conn->rxbuf + l->conn->rxsz; // where we'll prepend the size of the address + sz_pos = addr_pos + sizeof(struct sockaddr_storage); + } + payload_pos = addr_pos + sizeof(struct sockaddr_storage) + sizeof(tot); // where we'll write the payload + // write remote host address + memcpy(addr_pos, &addr_in, sizeof(struct sockaddr_storage)); } while(p != NULL) { if(p->len <= 0) break; int len = p->len; - memcpy(nextpos, p->payload, len); - nextpos = nextpos + len; + memcpy(payload_pos, p->payload, len); + payload_pos = payload_pos + len; p = p->next; tot += len; } if(tot) { - l->conn->rxsz += TEMP_MTU; - memcpy(sizepos, &tot, sizeof(tot)); + l->conn->rxsz += ZT_MAX_MTU; + memcpy(sz_pos, &tot, sizeof(tot)); //dwr(MSG_DEBUG_EXTRA, " nc_udp_recved(): data_len = %d, rxsz = %d, addr_info_len = %d\n", // tot, l->conn->rxsz, sizeof(u32_t) + sizeof(u16_t)); l->tap->phyOnUnixWritable(l->conn->sock, NULL, true); @@ -957,7 +967,6 @@ void NetconEthernetTap::handleGetsockname(PhySocket *sock, PhySocket *rpcSock, v { Mutex::Lock _l(_tcpconns_m); Connection *conn = getConnection(sock); - if(conn->addr == NULL){ dwr(MSG_DEBUG_EXTRA," handleGetsockname(): No address info available. Is it bound?"); struct sockaddr_storage storage; @@ -972,7 +981,6 @@ void NetconEthernetTap::handleGetpeername(PhySocket *sock, PhySocket *rpcSock, v { Mutex::Lock _l(_tcpconns_m); Connection *conn = getConnection(sock); - if(conn->peer_addr == NULL){ dwr(MSG_DEBUG_EXTRA," handleGetpeername(): No peer address info available. Is it connected?"); struct sockaddr_storage storage; diff --git a/src/SDK_EthernetTap.hpp b/src/SDK_EthernetTap.hpp index 5ba7b77..77f79e4 100644 --- a/src/SDK_EthernetTap.hpp +++ b/src/SDK_EthernetTap.hpp @@ -95,7 +95,7 @@ namespace ZeroTier { struct tcp_pcb *TCP_pcb; struct udp_pcb *UDP_pcb; struct sockaddr_storage *addr; // TODO: Rename - struct sockaddr_storage *peer_addr; + struct sockaddr_storage *peer_addr; // Only set by connection procedure unsigned short port; unsigned char txbuf[DEFAULT_TCP_TX_BUF_SZ]; unsigned char rxbuf[DEFAULT_TCP_RX_BUF_SZ]; diff --git a/src/SDK_Sockets.c b/src/SDK_Sockets.c index 63b4e55..672f89d 100644 --- a/src/SDK_Sockets.c +++ b/src/SDK_Sockets.c @@ -258,10 +258,11 @@ int (*realclose)(CLOSE_SIG); { struct sockaddr_in addr; jbyte *body = (*env)->GetByteArrayElements(env, buf, 0); - unsigned char buffer[TEMP_MTU]; + unsigned char buffer[ZT_MAX_MTU]; + int payload_offset = sizeof(int) + sizeof(struct sockaddr_storage); int rxbytes = zts_recvfrom(fd, &buffer, len, flags, &addr, sizeof(struct sockaddr)); if(rxbytes > 0) - memcpy(body, (jbyte*)buffer + sizeof(int), rxbytes); + memcpy(body, (jbyte*)buffer + payload_offset, rxbytes); (*env)->ReleaseByteArrayElements(env, buf, body, 0); // Update fields of Java ZTAddress object jfieldID fid; @@ -281,15 +282,16 @@ int (*realclose)(CLOSE_SIG); ssize_t zts_recvfrom(RECVFROM_SIG) #endif { + int tmpsz; // payload size // dwr(MSG_DEBUG_EXTRA,"zt_recvfrom(%d, ...)\n", socket); - ssize_t err = read(socket, buffer, TEMP_MTU); - int tmpsz; - memcpy(&tmpsz, buffer, sizeof(tmpsz)); - if(err < 0) { - perror("read:\n"); + ssize_t err = read(socket, buffer, ZT_MAX_MTU); + if(err > 0) { + // TODO: case for address size mismatch? + memcpy(address, buffer, address_len); + memcpy(&tmpsz, buffer + sizeof(struct sockaddr_storage), sizeof(tmpsz)); } else { - zts_getpeername(socket, address, address_len); + perror("read:\n"); } return tmpsz; }