From 6836348386b5771a39a0b0c63d4b2ca2b67ab704 Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Wed, 24 Aug 2016 13:08:19 -0700 Subject: [PATCH] Updated UDP packetization between service and client-side socket API implementation --- src/SDK.h | 7 ++++++ src/SDK_EthernetTap.cpp | 56 ++++++++++++++++++++++------------------- src/SDK_EthernetTap.hpp | 8 +++--- src/SDK_Service.cpp | 40 ++++++++++++++++++++--------- src/SDK_Sockets.c | 17 ++++++++----- 5 files changed, 80 insertions(+), 48 deletions(-) diff --git a/src/SDK.h b/src/SDK.h index 9ea4df0..d8262d5 100644 --- a/src/SDK.h +++ b/src/SDK.h @@ -43,6 +43,7 @@ extern "C" { #define INTERCEPT_ENABLED 111 #define INTERCEPT_DISABLED 222 +#define TEMP_MTU 2800 extern void load_symbols(); extern void zt_init_rpc(const char *path, const char *nwid); @@ -96,6 +97,7 @@ void zts_get_addresses(const char * nwid, char * addrstr); int zts_get_device_id(); void zts_stop_service(); bool zts_is_relayed(); +char *zts_get_homepath(); // ZT Intercept/RPC Controls void set_intercept_status(int mode); /* TODO: Rethink this */ @@ -140,10 +142,15 @@ ssize_t zts_recvmsg(RECVMSG_SIG); JNIEXPORT jboolean JNICALL Java_ZeroTier_SDK_zt_1running(JNIEnv *env, jobject thisObj); JNIEXPORT jobject JNICALL Java_ZeroTier_SDK_zt_1get_1addresses(JNIEnv *env, jobject thisObj, jstring nwid); JNIEXPORT jboolean JNICALL Java_ZeroTier_SDK_zt_1is_1relayed(); + // Returns the homepath + JNIEXPORT jstring JNICALL Java_ZeroTier_SDK_zt_1get_1homepath(JNIEnv *env, jobject thisObj); + // Exported JNI : SOCKS5 PROXY SERVER CONTROLS + // Stops the SOCKS5 proxy server for a given ZeroTier network JNIEXPORT jint JNICALL Java_ZeroTier_SDK_zt_1start_1proxy_1server(JNIEnv *env, jobject thisObj, jstring nwid, jobject zaddr); JNIEXPORT jint JNICALL Java_ZeroTier_SDK_zt_1stop_1proxy_1server(JNIEnv *env, jobject thisObj, jstring nwid); JNIEXPORT jint JNICALL Java_ZeroTier_SDK_zt_1get_1proxy_1server_1address(JNIEnv *env, jobject thisObj, jstring nwid, jobject zaddr); + // Exported JNI : SOCKET API JNIEXPORT jint JNICALL Java_ZeroTier_SDK_zt_1socket(JNIEnv *env, jobject thisObj, jint family, jint type, jint protocol); JNIEXPORT jint JNICALL Java_ZeroTier_SDK_zt_1connect(JNIEnv *env, jobject thisObj, jint fd, jstring addrstr, jint port); diff --git a/src/SDK_EthernetTap.cpp b/src/SDK_EthernetTap.cpp index a49a5fc..abe7911 100644 --- a/src/SDK_EthernetTap.cpp +++ b/src/SDK_EthernetTap.cpp @@ -431,19 +431,21 @@ void NetconEthernetTap::phyOnUnixClose(PhySocket *sock,void **uptr) { void NetconEthernetTap::processReceivedData(PhySocket *sock,void **uptr,bool lwip_invoked) { - dwr(MSG_DEBUG_EXTRA,"processReceivedData(sock=%p): lwip_invoked = %d\n", - (void*)&sock, lwip_invoked); + //dwr(MSG_DEBUG_EXTRA,"processReceivedData(sock=%p): lwip_invoked = %d\n", (void*)&sock, lwip_invoked); if(!lwip_invoked) { _tcpconns_m.lock(); _rx_buf_m.lock(); } Connection *conn = getConnection(sock); if(conn && conn->rxsz) { - long n = _phy.streamSend(conn->sock, conn->rxbuf, conn->rxsz); - if(n > 0) { - if(conn->rxsz-n > 0) - memcpy(conn->rxbuf, conn->rxbuf+n, conn->rxsz-n); - conn->rxsz -= n; + float max = conn->type == SOCK_STREAM ? (float)DEFAULT_TCP_RX_BUF_SZ : (float)DEFAULT_UDP_RX_BUF_SZ; + long n = _phy.streamSend(conn->sock, conn->rxbuf, TEMP_MTU); + int payload_sz; + memcpy(&payload_sz, conn->rxbuf, sizeof(int)); // OPT: + if(n == TEMP_MTU) { + if(conn->rxsz-n > 0) // If more remains on buffer + memcpy(conn->rxbuf, conn->rxbuf+TEMP_MTU, conn->rxsz - TEMP_MTU); + conn->rxsz -= TEMP_MTU; if(conn->type==SOCK_DGRAM){ _phy.setNotifyWritable(conn->sock, false); @@ -457,15 +459,13 @@ void NetconEthernetTap::processReceivedData(PhySocket *sock,void **uptr,bool lwi d[2] = (ip >> 16) & 0xFF; d[3] = (ip >> 24) & 0xFF; - dwr(MSG_TRANSFER,"UDP RX <--- :: {TX: ------, RX: ------, sock=%x} :: %d bytes (%d.%d.%d.%d:%d)\n", - conn->sock, n, d[0],d[1],d[2],d[3], port); + dwr(MSG_TRANSFER,"UDP RX <--- :: {TX: %.3f%%, RX: %d, sock=%x} :: payload = %d bytes (%d.%d.%d.%d:%d)\n", + (float)conn->txsz / max, conn->rxsz/* / max*/, conn->sock, payload_sz, d[0],d[1],d[2],d[3], port); #endif - conn->unread_udp_packet = false; } //dwr(MSG_DEBUG, "phyOnUnixWritable(): tid = %d\n", pthread_mach_thread_np(pthread_self())); if(conn->type==SOCK_STREAM) { // Only acknolwedge receipt of TCP packets lwipstack->__tcp_recved(conn->TCP_pcb, n); - float max = conn->type == SOCK_STREAM ? (float)DEFAULT_TCP_TX_BUF_SZ : (float)DEFAULT_UDP_TX_BUF_SZ; dwr(MSG_TRANSFER,"TCP RX <--- :: {TX: %.3f%%, RX: %.3f%%, sock=%x} :: %d bytes\n", (float)conn->txsz / max, (float)conn->rxsz / max, conn->sock, n); } @@ -487,7 +487,7 @@ void NetconEthernetTap::processReceivedData(PhySocket *sock,void **uptr,bool lwi void NetconEthernetTap::phyOnUnixWritable(PhySocket *sock,void **uptr,bool lwip_invoked) { - dwr(MSG_DEBUG_EXTRA," phyOnUnixWritable(sock=%p): lwip_invoked = %d\n", (void*)&sock, lwip_invoked); + //dwr(MSG_DEBUG_EXTRA," phyOnUnixWritable(sock=%p): lwip_invoked = %d\n", (void*)&sock, lwip_invoked); processReceivedData(sock,uptr,lwip_invoked); } @@ -744,40 +744,44 @@ err_t NetconEthernetTap::nc_accept(void *arg, struct tcp_pcb *newPCB, err_t err) void NetconEthernetTap::nc_udp_recved(void * arg, struct udp_pcb * upcb, struct pbuf * p, struct ip_addr * addr, u16_t port) { Larg *l = (Larg*)arg; - dwr(MSG_DEBUG_EXTRA, "nc_udp_recved(conn=%p,pcb=%p,port=%d)\n", (void*)&(l->conn), (void*)&upcb, port); + //dwr(MSG_DEBUG_EXTRA, "nc_udp_recved(conn=%p,pcb=%p,port=%d)\n", (void*)&(l->conn), (void*)&upcb, port); int tot = 0; + unsigned char *nextpos, *sizepos; struct pbuf* q = p; Mutex::Lock _l2(l->tap->_rx_buf_m); // Cycle through pbufs and write them to the RX buffer // The RX "buffer" will be emptied via phyOnUnixWritable() - if(l->conn->unread_udp_packet) { - dwr(MSG_DEBUG, "nc_udp_recved(): dropping packet\n"); - l->tap->lwipstack->__pbuf_free(q); - return; - } if(p) { // assign provided address info to "connection" struct sockaddr_in addr_in; addr_in.sin_addr.s_addr = addr->addr; addr_in.sin_port = port; l->conn->peer_addr = (struct sockaddr_storage*)&addr_in; - // reset buffer contents - l->conn->rxsz = 0; - memset(l->conn->rxbuf, 0, DEFAULT_UDP_RX_BUF_SZ); + + if(l->conn->rxsz == DEFAULT_UDP_RX_BUF_SZ) { // if UDP buffer full + dwr(MSG_DEBUG, "nc_udp_recved(): UDP RX buffer full. Discarding oldest payload segment\n"); + memmove(l->conn->rxbuf, l->conn->rxbuf + TEMP_MTU, DEFAULT_UDP_RX_BUF_SZ - TEMP_MTU); + sizepos = l->conn->rxbuf + (DEFAULT_UDP_RX_BUF_SZ - TEMP_MTU); + l->conn->rxsz -= TEMP_MTU; + } + else + sizepos = l->conn->rxbuf + l->conn->rxsz; // where we'll prepend the size of the payload + nextpos = sizepos + sizeof(tot); // next position we can write data to } while(p != NULL) { if(p->len <= 0) break; int len = p->len; - memcpy(l->conn->rxbuf + (l->conn->rxsz), p->payload, len); - l->conn->rxsz += len; + memcpy(nextpos, p->payload, len); + nextpos = nextpos + len; p = p->next; tot += len; } if(tot) { - dwr(MSG_DEBUG_EXTRA, " nc_udp_recved(): data_len = %d, rxsz = %d, addr_info_len = %d\n", - tot, l->conn->rxsz, sizeof(u32_t) + sizeof(u16_t)); - l->conn->unread_udp_packet = true; + l->conn->rxsz += TEMP_MTU; + memcpy(sizepos, &tot, sizeof(tot)); + //dwr(MSG_DEBUG_EXTRA, " nc_udp_recved(): data_len = %d, rxsz = %d, addr_info_len = %d\n", + // tot, l->conn->rxsz, sizeof(u32_t) + sizeof(u16_t)); l->tap->phyOnUnixWritable(l->conn->sock, NULL, true); l->tap->_phy.setNotifyWritable(l->conn->sock, true); } diff --git a/src/SDK_EthernetTap.hpp b/src/SDK_EthernetTap.hpp index 95ff448..5ba7b77 100644 --- a/src/SDK_EthernetTap.hpp +++ b/src/SDK_EthernetTap.hpp @@ -76,8 +76,8 @@ struct accept_st; #define DEFAULT_TCP_RX_BUF_SOFTMIN DEFAULT_TCP_RX_BUF_SZ * 0.20 // UDP Buffer sizes (should be about the size of your MTU) -#define DEFAULT_UDP_TX_BUF_SZ 1500 -#define DEFAULT_UDP_RX_BUF_SZ 1500 +#define DEFAULT_UDP_TX_BUF_SZ ZT_MAX_MTU +#define DEFAULT_UDP_RX_BUF_SZ ZT_MAX_MTU * 128 namespace ZeroTier { @@ -101,7 +101,6 @@ namespace ZeroTier { unsigned char rxbuf[DEFAULT_TCP_RX_BUF_SZ]; // TODO: necessary still? - bool unread_udp_packet; int proxy_conn_state; }; @@ -167,6 +166,8 @@ namespace ZeroTier { void phyOnFileDescriptorActivity(PhySocket *sock,void **uptr,bool readable,bool writable); // --- end Proxy + std::string _homePath; + private: // LWIP callbacks // NOTE: these are called from within LWIP, meaning that lwipstack->_lock is ALREADY @@ -496,7 +497,6 @@ namespace ZeroTier { MAC _mac; Thread _thread; - std::string _homePath; std::string _dev; // path to Unix domain socket std::vector _multicastGroups; diff --git a/src/SDK_Service.cpp b/src/SDK_Service.cpp index 1b446f7..c85bc63 100644 --- a/src/SDK_Service.cpp +++ b/src/SDK_Service.cpp @@ -148,10 +148,7 @@ void zts_join_network(const char * nwid) { // SOCKS5 Proxy server // Default is 127.0.0.1:RANDOM_PORT - LOGV("-----USE_SOCKS_PROXY ?\n"); #if defined(USE_SOCKS_PROXY) - LOGV("-----USE_SOCKS_PROXY!\\n"); - zts_start_proxy_server(homeDir.c_str(), nwid, NULL); // NULL addr for default #endif } @@ -193,6 +190,10 @@ bool zts_is_relayed() { return false; } +char *zts_get_homepath() { + return (char*)givenHomeDir.c_str(); +} + // Android JNI wrapper // JNI naming convention: Java_PACKAGENAME_CLASSNAME_METHODNAME #if defined(__ANDROID__) @@ -265,20 +266,35 @@ bool zts_is_relayed() { } // Returns the local address of the SOCKS5 Proxy server JNIEXPORT jint JNICALL Java_ZeroTier_SDK_zt_1get_1proxy_1server_1address(JNIEnv *env, jobject thisObj, jstring nwid, jobject ztaddr) { - // TODO - //const char *nwid_str = env->GetStringUTFChars(nwid, NULL); - //return zts_get_proxy_server_address(nwid_str, addr); - return 0; + struct sockaddr_in addr; + int err = zts_get_proxy_server_address(env->GetStringUTFChars(nwid, NULL), (struct sockaddr_storage*)&addr); + // SET ZTAddress fields + jfieldID fid; + jclass cls = env->GetObjectClass(ztaddr); + fid = env->GetFieldID(cls, "port", "I"); + env->SetIntField(ztaddr, fid, addr.sin_port); + fid = env->GetFieldID(cls,"_rawAddr", "J"); + env->SetLongField(ztaddr, fid,addr.sin_addr.s_addr); + return err; } // Starts a SOCKS5 proxy server for a given ZeroTier network JNIEXPORT jint JNICALL Java_ZeroTier_SDK_zt_1start_1proxy_1server(JNIEnv *env, jobject thisObj, jstring nwid, jobject ztaddr) { - // TODO - return 0; + const char *nwidstr = env->GetStringUTFChars(nwid, NULL); + struct sockaddr_in addr; + // GET ZTAddress fields + jclass cls = env->GetObjectClass(ztaddr); + jfieldID fid = env->GetFieldID(cls, "port", "I"); + addr.sin_port = htons(env->GetIntField(ztaddr, fid)); + fid = env->GetFieldID(cls, "_rawAddr", "J"); + addr.sin_addr.s_addr = env->GetLongField(ztaddr, fid); + return zts_start_proxy_server((char *)zts_get_homepath, nwidstr, (struct sockaddr_storage *)&addr); } - // Stops the SOCKS5 proxy server for a given ZeroTier network + // JNIEXPORT jint JNICALL Java_ZeroTier_SDK_zt_1stop_1proxy_1server(JNIEnv *env, jobject thisObj, jstring nwid) { - // TODO - return 0; + return zts_stop_proxy_server((char*)env->GetStringUTFChars(nwid, NULL)); + } + JNIEXPORT jstring JNICALL Java_ZeroTier_SDK_zt_1get_1homepath(JNIEnv *env, jobject thisObj) { + return (*env).NewStringUTF(zts_get_homepath()); } #endif diff --git a/src/SDK_Sockets.c b/src/SDK_Sockets.c index 3f857b5..63b4e55 100644 --- a/src/SDK_Sockets.c +++ b/src/SDK_Sockets.c @@ -254,11 +254,14 @@ int (*realclose)(CLOSE_SIG); #if defined(__ANDROID__) // UDP RX JNIEXPORT jint JNICALL Java_ZeroTier_SDK_zt_1recvfrom( - JNIEnv *env, jobject thisObj, jint fd, jarray buf, jint len, jint flags, jobject ztaddr) + JNIEnv *env, jobject thisObj, jint fd, jbyteArray buf, jint len, jint flags, jobject ztaddr) { struct sockaddr_in addr; jbyte *body = (*env)->GetByteArrayElements(env, buf, 0); - int recvd_bytes = zts_recvfrom(fd, body, len, flags, &addr, sizeof(struct sockaddr)); + unsigned char buffer[TEMP_MTU]; + int rxbytes = zts_recvfrom(fd, &buffer, len, flags, &addr, sizeof(struct sockaddr)); + if(rxbytes > 0) + memcpy(body, (jbyte*)buffer + sizeof(int), rxbytes); (*env)->ReleaseByteArrayElements(env, buf, body, 0); // Update fields of Java ZTAddress object jfieldID fid; @@ -267,7 +270,7 @@ int (*realclose)(CLOSE_SIG); (*env)->SetIntField(env, ztaddr, fid, addr.sin_port); fid = (*env)->GetFieldID(env, cls,"_rawAddr", "J"); (*env)->SetLongField(env, ztaddr, fid,addr.sin_addr.s_addr); - return recvd_bytes; + return rxbytes; } #endif @@ -278,15 +281,17 @@ int (*realclose)(CLOSE_SIG); ssize_t zts_recvfrom(RECVFROM_SIG) #endif { - dwr(MSG_DEBUG_EXTRA,"zt_recvfrom(%d, ...)\n", socket); - ssize_t err = read(socket, buffer, length); + // dwr(MSG_DEBUG_EXTRA,"zt_recvfrom(%d, ...)\n", socket); + ssize_t err = read(socket, buffer, TEMP_MTU); + int tmpsz; + memcpy(&tmpsz, buffer, sizeof(tmpsz)); if(err < 0) { perror("read:\n"); } else { zts_getpeername(socket, address, address_len); } - return err; + return tmpsz; } //#endif