diff --git a/examples/ztproxy/ztproxy.cpp b/examples/ztproxy/ztproxy.cpp index c24f8f6..129ebc7 100644 --- a/examples/ztproxy/ztproxy.cpp +++ b/examples/ztproxy/ztproxy.cpp @@ -38,6 +38,7 @@ #include #include #include +#include #include #include @@ -46,6 +47,7 @@ #include #include +#include "RingBuffer.hpp" #include "ztproxy.hpp" #include "libzt.h" @@ -104,217 +106,260 @@ namespace ZeroTier { void ZTProxy::threadMain() throw() { + TcpConnection *conn = NULL; + + uint32_t msecs = 1; + struct timeval tv; + tv.tv_sec = msecs / 1000; + tv.tv_usec = (msecs % 1000) * 1000; + int ret = 0; + + // Main I/O loop + // Moves data between client application socket and libzt VirtualSocket while(_run) { - _phy.poll(10); + + _phy.poll(5); + + conn_m.lock(); + // build fd_sets to select upon + FD_ZERO(&read_set); + FD_ZERO(&write_set); + nfds = 0; + for (int i=0; izfd, &read_set); + FD_SET(clist[i]->zfd, &write_set); + nfds = clist[i]->zfd > nfds ? clist[i]->zfd : nfds; + } + + ret = zts_select(nfds + 1, &read_set, &write_set, NULL, &tv); + + if (ret > 0) { + for (int fd_i=0; fd_irx_m.lock(); + if (conn->RXbuf->count() > 0) { + DEBUG_INFO("libzt has incoming data on fd=%d, we will receive it via conn=%p, sock=%p", conn->zfd, conn, conn->client_sock); + } + if ((rd = zts_read(conn->zfd, conn->RXbuf->get_buf(),ZT_MAX_MTU)) < 0) { + DEBUG_ERROR("there was an error while reading data from libzt, err=%d", rd); + } + else { + //DEBUG_INFO("LIBZT -> RXBUFFER = %d bytes", rd); + conn->RXbuf->produce(rd); + } + // attempt to write data to client from buffer + if ((wr = _phy.streamSend(conn->client_sock, conn->RXbuf->get_buf(), conn->RXbuf->count())) < 0) { + DEBUG_ERROR("there was an error while writing the data from the RXbuf to the client PhySocket, err=%d", wr); + } + else { + //DEBUG_INFO("RXBUFFER -> CLIENT = %d bytes", wr); + conn->RXbuf->consume(wr); + } + conn->rx_m.unlock(); + } + + // TX, Handle data outgoing from client to libzt + if (FD_ISSET(fd_i, &write_set)) { + int rd, wr = 0; + conn = zmap[fd_i]; + if (conn == NULL) { + DEBUG_ERROR("invalid conn=%p", conn); + exit(0); + } + // read data from client and place it on ring buffer + + // + conn->tx_m.lock(); + if (conn->TXbuf->count() > 0) { + DEBUG_INFO("client has outgoing data of len=%d on fd=%d, we will send it via conn=%p, sock=%p", conn->TXbuf->count(), conn->zfd, conn, conn->client_sock); + wr = zts_write(conn->zfd, conn->TXbuf->get_buf(), conn->TXbuf->count()); + if (wr < 0) { + DEBUG_ERROR("there was an error while sending the data over libzt, err=%d", wr); + } + else { + //DEBUG_INFO("TXBUFFER -> LIBZT = %d bytes", wr); + conn->TXbuf->consume(wr); // data is presumed sent, mark it as such in the ringbuffer + } + } + conn->tx_m.unlock(); + + } + } + } + conn_m.unlock(); } } void ZTProxy::phyOnTcpData(PhySocket *sock,void **uptr,void *data,unsigned long len) { - DEBUG_INFO("phyOnTcpData(sock=%p, len=%lu)", sock, len); + int wr = 0, zfd = -1, err = 0; + DEBUG_INFO("sock=%p, len=%lu", sock, len); unsigned char *buf = (unsigned char*)data; std::string host = _internal_addr; // Get the TcpConnection object TcpConnection *conn = cmap[sock]; if(conn == NULL) { - conn = cmap[dmap[sock]]; - if(conn == NULL) { - DEBUG_ERROR("no connection object"); - return; // Nothing - } + DEBUG_ERROR("no connection object"); + return; } - if(!conn->destination_sock) { // no connection yet + if(conn->zfd < 0) { // no connection yet + DEBUG_INFO("no connection yet, proxying..."); if(host != "") { uint16_t dest_port, ipv; dest_port = _internal_port; - - // Save buffer to TcpConnection's write buffer, we'll forward - // this data along only after the phyOnTcpConnect callback is called successfully - // Got data for connection but it hasn't been fully established, save to buffer for later writing - conn->tcp_client_m.lock(); - memcpy(conn->client_buf, buf, len); - conn->client_buf_len = len; - conn->tcp_client_m.unlock(); - host = _internal_addr; - // check for address type - if(host.find(":") != std::string::npos) - ipv = 6; - else - ipv = 4; + ipv = host.find(":") != std::string::npos ? 6 : 4; - bool connected; - if(ipv == 4) - { + if(ipv == 4) { // Connect to proxied host via libzt - DEBUG_INFO("ipv4, %s -> %s:%d", host.c_str(), host.c_str(), dest_port); + DEBUG_INFO("attempting to proxy [0.0.0.0:%d -> %s:%d]", _proxy_listen_port, host.c_str(), dest_port); struct sockaddr_in in4; memset(&in4,0,sizeof(in4)); in4.sin_family = AF_INET; in4.sin_addr.s_addr = inet_addr(host.c_str()); in4.sin_port = Utils::hton(dest_port); - int sockfd = zts_socket(AF_INET, SOCK_STREAM, 0); - if(zts_connect(sockfd, (const struct sockaddr *)&in4, sizeof(in4)) < 0) { - DEBUG_ERROR("error while connecting to remote host"); - } - else { - conn->destination_sock = _phy.wrapSocket(sockfd); - conn->origin_sock = sock; - cmap[conn->destination_sock] = conn; - // Once connection through libzt is established, write data we received from the local host - conn->tcp_client_m.lock(); - int n = 0, tot = conn->client_buf_len; - while(tot > 0) { - if((n = _phy.streamSend(conn->destination_sock, conn->client_buf, conn->client_buf_len)) > 0) { - tot -= n; - if(n < conn->client_buf_len) { // If we couldn't write the entire buffer - memmove(conn->client_buf, conn->client_buf+n, BUF_SZ-n); - conn->client_buf_len-=n; - } - else { - conn->client_buf_len = 0; - } - } - else - DEBUG_ERROR(" an error occured while writing to the destination_sock"); - } - conn->tcp_client_m.unlock(); - } + zfd = zts_socket(AF_INET, SOCK_STREAM, 0); + err = zts_connect(zfd, (const struct sockaddr *)&in4, sizeof(in4)); } - if(ipv == 6) - { - DEBUG_INFO("ipv6, %s -> [%s]:%d\n", host.c_str(), host.c_str(), dest_port); + if(ipv == 6) { + // Connect to proxied host via libzt + //DEBUG_INFO("attempting to proxy [0.0.0.0:%d -> %s:%d]", _proxy_listen_port, host.c_str(), dest_port); struct sockaddr_in6 in6; memset(&in6,0,sizeof(in6)); in6.sin6_family = AF_INET; struct hostent *server; server = gethostbyname2((char*)host.c_str(),AF_INET6); memmove((char *) &in6.sin6_addr.s6_addr, (char *) server->h_addr, server->h_length); - in6.sin6_port = Utils::hton(dest_port); - conn->destination_sock = _phy.tcpConnect((const struct sockaddr *)&in6, connected, this); + in6.sin6_port = Utils::hton(dest_port); + zfd = zts_socket(AF_INET, SOCK_STREAM, 0); + err = zts_connect(zfd, (const struct sockaddr *)&in6, sizeof(in6)); } - dmap[conn->destination_sock] = conn->origin_sock; // for reverse lookup from callbacks - if(!conn->destination_sock) { - DEBUG_ERROR(" there was an error connecting to the remote host"); + if (zfd < 0 || err < 0) { + // now release TX buffer contents we previously saved, since we can't connect + DEBUG_ERROR("error while connecting to remote host (zfd=%d, err=%d)", zfd, err); + conn->tx_m.lock(); + DEBUG_INFO("resetting TX buffer"); + conn->TXbuf->reset(); + conn->tx_m.unlock(); return; - } + } + else { + DEBUG_INFO("successfully connected to remote host"); + } } + + conn_m.lock(); + // on success, add connection entry to map, set physock for later + clist.push_back(conn); + conn->zfd = zfd; + conn->client_sock = sock; + cmap[conn->client_sock] = conn; + zmap[zfd] = conn; + conn_m.unlock(); + } + // Write data coming from client TCP connection to its TX buffer, later emptied into libzt by threadMain I/O loop + conn->tx_m.lock(); + if ((wr = conn->TXbuf->write((const unsigned char *)data, len)) < 0) { + DEBUG_ERROR("there was an error while writing data from client to tx buffer, err=%d", wr); } else { - // Read data from localhost socket and send it into libzt - conn->tcp_client_m.lock(); - int n = 0, tot = len; - while(tot > 0) { - if((n = _phy.streamSend(conn->destination_sock, buf, tot)) > 0) { - tot -= n; - printf("sent %d into libzt", n); - } - } - conn->tcp_client_m.unlock(); + DEBUG_INFO("CLIENT -> TXBUFFER = %d bytes", wr); } + conn->tx_m.unlock(); } void ZTProxy::phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *localAddr,const struct sockaddr *from,void *data,unsigned long len) { // Not used, connections are handled via user space network stack and VirtualTap - DEBUG_INFO("phyOnDatagram"); + DEBUG_INFO("not used. exiting..."); exit(0); } void ZTProxy::phyOnTcpWritable(PhySocket *sock,void **uptr) { // Not used, connections are handled via user space network stack and VirtualTap - DEBUG_INFO("phyOnTcpWritable"); - exit(0); + DEBUG_INFO(); + //exit(0); } void ZTProxy::phyOnFileDescriptorActivity(PhySocket *sock,void **uptr,bool readable,bool writable) { // Not used, connections are handled via user space network stack and VirtualTap - DEBUG_INFO("phyOnFileDescriptorActivity, sock=%p", sock); - exit(0); + DEBUG_INFO("sock=%p", sock); + //exit(0); } void ZTProxy::phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) { // Not used, connections are handled via user space network stack and VirtualTap - DEBUG_INFO("phyOnTcpConnect, sock=%p", sock); - exit(0); + DEBUG_INFO("sock=%p", sock); + //exit(0); } void ZTProxy::phyOnUnixClose(PhySocket *sock,void **uptr) { // Not used, connections are handled via user space network stack and VirtualTap - DEBUG_INFO("phyOnUnixClose, sock=%p", sock); - exit(0); + DEBUG_INFO("sock=%p", sock); + //exit(0); } void ZTProxy::phyOnTcpAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN,const struct sockaddr *from) { - DEBUG_INFO("phyOnTcpAccept, sockL=%p, sockN=%p", sockL, sockN); - TcpConnection *conn; - // try to recycle TcpConnection objects instead of allocating new ones - if(cqueue.size()) { - conn = cqueue.front(); - cqueue.pop(); - } - else { - conn = new TcpConnection(); - } - conn->origin_sock = sockN; - cmap[sockN]=conn; // add new connection + DEBUG_INFO("sockL=%p, sockN=%p", sockL, sockN); + TcpConnection *conn = new TcpConnection(); + conn->client_sock = sockN; + cmap[sockN]=conn; } void ZTProxy::phyOnUnixData(PhySocket *sock,void **uptr,void *data,ssize_t len) { - DEBUG_INFO("phyOnUnixData(sock=%p, len=%lu)", sock, len); + DEBUG_INFO("sock=%p, len=%lu", sock, len); unsigned char *buf = (unsigned char*)data; - // Get the TcpConnection object TcpConnection *conn = cmap[sock]; if(conn == NULL) { - conn = cmap[dmap[sock]]; - if(conn == NULL) { - DEBUG_ERROR("no connection object"); - return; // Nothing - } + DEBUG_ERROR("no connection object"); + return; } else // If connection to host already established, just forward the data in the correct direction { - int n = 0; - if(sock == conn->destination_sock) { // RX - conn->tcp_client_m.lock(); - if(!conn->client_buf_len) // If nothing is buffered, attempt to write, otherwise copy to buffer to preserver order - n = _phy.streamSend(conn->origin_sock, buf, len); - if(n < len) { - memcpy(conn->client_buf+conn->client_buf_len, buf+n, len-n); - conn->client_buf_len += len-n; - _phy.setNotifyWritable(conn->origin_sock, true); - } - conn->tcp_client_m.unlock(); - } + } } void ZTProxy::phyOnUnixWritable(PhySocket *sock,void **uptr,bool lwip_invoked) { - DEBUG_INFO("phyOnUnixWritable, sock=%p", sock); + DEBUG_INFO("sock=%p", sock); exit(0); } void ZTProxy::phyOnTcpClose(PhySocket *sock,void **uptr) { - DEBUG_INFO("phyOnTcpClose, sock=%p", sock); - + DEBUG_INFO("sock=%p", sock); + conn_m.lock(); TcpConnection *conn = cmap[sock]; - if(conn) - { - conn->origin_sock=NULL; - conn->destination_sock=NULL; - conn->client_buf_len=0; - cqueue.push(conn); + if(conn) { + conn->client_sock=NULL; + cmap.erase(sock); + for (int i=0; izfd] = NULL; + delete conn; + conn = NULL; } - cmap.erase(sock); - dmap.erase(sock); close(_phy.getDescriptor(sock)); + conn_m.unlock(); } } @@ -346,4 +391,3 @@ int main(int argc, char **argv) } return 0; } -//#endif \ No newline at end of file diff --git a/examples/ztproxy/ztproxy.hpp b/examples/ztproxy/ztproxy.hpp index d1dd545..9af905a 100644 --- a/examples/ztproxy/ztproxy.hpp +++ b/examples/ztproxy/ztproxy.hpp @@ -35,6 +35,9 @@ #include "OSUtils.hpp" #include +#include +#include +#include #define BUF_SZ 1024*1024 @@ -43,18 +46,29 @@ namespace ZeroTier { typedef void PhySocket; class ZTProxy; - struct TcpConnection + class TcpConnection { - PhySocket *origin_sock; - PhySocket *destination_sock; - char client_buf[BUF_SZ]; - int client_buf_len; + public: + int zfd; + PhySocket *client_sock; + RingBuffer *TXbuf; + RingBuffer *RXbuf; + Mutex tx_m, rx_m; - char server_buf[BUF_SZ]; - int server_buf_len; + TcpConnection() { + printf("TcpConnection()\n"); + zfd = -1; + TXbuf = new RingBuffer(BUF_SZ); + RXbuf = new RingBuffer(BUF_SZ); + } - Mutex tcp_client_m; - Mutex tcp_server_m; + ~TcpConnection() { + printf("~TcpConnection()\n"); + delete TXbuf; + delete RXbuf; + TXbuf = NULL; + RXbuf = NULL; + } }; class ZTProxy @@ -90,6 +104,10 @@ namespace ZeroTier { volatile bool _enabled; volatile bool _run; + Mutex conn_m; + fd_set read_set, write_set; + int nfds = 0; + int _proxy_listen_port; int _internal_port; std::string _nwid; @@ -100,10 +118,12 @@ namespace ZeroTier { PhySocket *_tcpListenSocket; PhySocket *_tcpListenSocket6; + // mapping from ZeroTier VirtualSocket fd to TcpConnection pointer + std::map zmap; + // mapping from ZeroTier PhySocket to TcpConnection pointer std::map cmap; - std::map dmap; - std::queue cqueue; // for recycling TcpConnection objects + std::vector clist; }; }