diff --git a/examples/node/binding.cc b/examples/node/binding.cc index 1cf58b9..f1d8b0e 100644 --- a/examples/node/binding.cc +++ b/examples/node/binding.cc @@ -127,6 +127,19 @@ void myZeroTierEventCallback(void *msgPtr) } } +zts_sockaddr_in sockaddr_in(const char *remoteAddr, const int remotePort) +{ + struct zts_sockaddr_in in4; + in4.sin_port = zts_htons(remotePort); +#if defined(_WIN32) + in4.sin_addr.S_addr = zts_inet_addr(remoteAddr); +#else + in4.sin_addr.s_addr = zts_inet_addr(remoteAddr); +#endif + in4.sin_family = ZTS_AF_INET; + return in4; +} + /** * * IDENTITIES and AUTHORIZATION: @@ -211,6 +224,8 @@ void myZeroTierEventCallback(void *msgPtr) class ZeroTier { public: + static Node getMyNode() { return myNode; } + /** * @brief Starts the ZeroTier service and notifies user application of events via callback * @@ -299,89 +314,220 @@ public: return err; } - static int connectStream(const char *remoteAddr, const int remotePort) + static int openStream() { - return connect(ZTS_AF_INET, ZTS_SOCK_STREAM, 0, remoteAddr, remotePort); + return open(ZTS_AF_INET, ZTS_SOCK_STREAM, 0); } - static int connectDgram(const char *remoteAddr, const int remotePort) + static int openDgram() { - return connect(ZTS_AF_INET, ZTS_SOCK_DGRAM, 0, remoteAddr, remotePort); + return open(ZTS_AF_INET, ZTS_SOCK_DGRAM, 0); } - static int connectRaw(const char *remoteAddr, const int remotePort, const int protocol) + static int openRaw(const int protocol) { - return connect(ZTS_AF_INET, ZTS_SOCK_RAW, protocol, remoteAddr, remotePort); + return open(ZTS_AF_INET, ZTS_SOCK_RAW, protocol); } - static int connectStream6(const char *remoteAddr, const int remotePort) + static int openStream6() { - return connect(ZTS_AF_INET6, ZTS_SOCK_STREAM, 0, remoteAddr, remotePort); + return open(ZTS_AF_INET6, ZTS_SOCK_STREAM, 0); } - static int connectDgram6(const char *remoteAddr, const int remotePort) + static int openDgram6() { - return connect(ZTS_AF_INET6, ZTS_SOCK_DGRAM, 0, remoteAddr, remotePort); + return open(ZTS_AF_INET6, ZTS_SOCK_DGRAM, 0); } - static int connectRaw6(const char *remoteAddr, const int remotePort, const int protocol) + static int openRaw6(const int protocol) { - return connect(ZTS_AF_INET6, ZTS_SOCK_RAW, protocol, remoteAddr, remotePort); + return open(ZTS_AF_INET6, ZTS_SOCK_RAW, protocol); + } + + /** + * @brief Create a socket (sets zts_errno) + * + * @param socket_family Address family (ZTS_AF_INET, ZTS_AF_INET6) + * @param socket_type Type of socket (ZTS_SOCK_STREAM, ZTS_SOCK_DGRAM, ZTS_SOCK_RAW) + * @param protocol Protocols supported on this socket + * @return Numbered file descriptor on success. ZTS_ERR_SERVICE or ZTS_ERR_SOCKET on failure. + */ + static int open(const int socket_family, const int socket_type, const int protocol) + { + int fd; + if ((fd = zts_socket(socket_family, socket_type, protocol)) < 0) { + printf("Error creating ZeroTier socket (fd=%d, zts_errno=%d).\n", fd, zts_errno); + } + return fd; + } + + /** + * @brief Close a socket (sets zts_errno) + * + * @param fd Socket file descriptor + * @return ZTS_ERR_OK on success. ZTS_ERR_SOCKET, ZTS_ERR_SERVICE on failure. + */ + static int close(int fd) + { + return zts_close(fd); + } + + /** + * @brief Shut down some aspect of a socket (sets zts_errno) + * + * @param fd Socket file descriptor + * @return ZTS_ERR_OK on success. ZTS_ERR_SOCKET, ZTS_ERR_SERVICE, ZTS_ERR_ARG on failure. + */ + static int shutdown(int fd) + { + return zts_shutdown(fd, ZTS_SHUT_RDWR); + } + + /** + * @brief Bind a socket to a virtual interface (sets zts_errno) + * + * @param fd Socket file descriptor + * @param remoteAddr Remote Address to connect to + * @param remotePort Remote Port to connect to + * @return ZTS_ERR_OK on success. ZTS_ERR_SOCKET, ZTS_ERR_SERVICE, ZTS_ERR_ARG on failure. + */ + static int bind(int fd, const char *localAddr, const int localPort) + { + struct zts_sockaddr_in in4 = sockaddr_in(localAddr, localPort); + + int err = ZTS_ERR_OK; + if ((err = zts_bind(fd, (const struct zts_sockaddr *)&in4, sizeof(in4))) < 0) { + printf("Error binding to interface (fd=%d, ret=%d, zts_errno=%d).\n", + fd, err, zts_errno); + } + return err; + } + + static int bind6(int fd, const char *remoteAddr, const int remotePort) + { + printf("IPv6 NOT IMPLEMENTED.\n"); + return ZTS_ERR_ARG; } /** * @brief Connect a socket to a remote host (sets zts_errno) * - * @param socket_family Address family (ZTS_AF_INET, ZTS_AF_INET6) - * @param socket_type Type of socket (ZTS_SOCK_STREAM, ZTS_SOCK_DGRAM, ZTS_SOCK_RAW) - * @param protocol Protocols supported on this socket + * @param fd Socket file descriptor * @param remoteAddr Remote Address to connect to * @param remotePort Remote Port to connect to * @return ZTS_ERR_OK on success. ZTS_ERR_SOCKET, ZTS_ERR_SERVICE, ZTS_ERR_ARG on failure. */ - static int connect(const int socket_family, const int socket_type, const int protocol, const char *remoteAddr, const int remotePort) + static int connect(int fd, const char *remoteAddr, const int remotePort) { - if (socket_family == ZTS_AF_INET6) { - printf("IPv6 NOT IMPLEMENTED.\n"); - return -1; - } - struct zts_sockaddr_in in4 = sockaddr_in(remoteAddr, remotePort); - int fd; - if ((fd = zts_socket(socket_family, socket_type, protocol)) < 0) { - printf("Error creating ZeroTier socket (fd=%d, zts_errno=%d).\n", fd, zts_errno); - return -1; - } - // Retries are often required since ZT uses transport-triggered links (explained above) int err = ZTS_ERR_OK; - for (;;) { - printf("Connecting to remote host...\n"); - if ((err = zts_connect(fd, (const struct zts_sockaddr *)&in4, sizeof(in4))) < 0) { - printf("Error connecting to remote host (fd=%d, ret=%d, zts_errno=%d). Trying again.\n", + if ((err = zts_connect(fd, (const struct zts_sockaddr *)&in4, sizeof(in4))) < 0) { + printf("Error connecting to remote host (fd=%d, ret=%d, zts_errno=%d).\n", fd, err, zts_errno); - zts_close(fd); - // printf("Creating socket...\n"); - if ((fd = zts_socket(socket_family, socket_type, protocol)) < 0) { - printf("Error creating ZeroTier socket (fd=%d, zts_errno=%d).\n", fd, zts_errno); - return -1; - } - zts_delay_ms(250); - } - else { - printf("Connected.\n"); - break; - } + } else { + // Set non-blocking mode + fcntl(fd, ZTS_F_SETFL, ZTS_O_NONBLOCK); } + return err; - // Set non-blocking mode - fcntl(fd, ZTS_F_SETFL, ZTS_O_NONBLOCK); - - return fd; + // int err = ZTS_ERR_OK; + // for (;;) { + // printf("Connecting to remote host...\n"); + // if ((err = zts_connect(fd, (const struct zts_sockaddr *)&in4, sizeof(in4))) < 0) { + // printf("Error connecting to remote host (fd=%d, ret=%d, zts_errno=%d). Trying again.\n", + // fd, err, zts_errno); + // zts_close(fd); + // // printf("Creating socket...\n"); + // if ((fd = zts_socket(socket_family, socket_type, protocol)) < 0) { + // printf("Error creating ZeroTier socket (fd=%d, zts_errno=%d).\n", fd, zts_errno); + // return -1; + // } + // zts_delay_ms(250); + // } + // else { + // printf("Connected.\n"); + // break; + // } + // } } - static int fcntlSetBlocking(int fd, bool isBlocking) + static int connect6(int fd, const char *remoteAddr, const int remotePort) + { + printf("IPv6 NOT IMPLEMENTED.\n"); + return ZTS_ERR_ARG; + } + + /** + * @brief Read bytes from socket onto buffer (sets zts_errno) + * + * @param fd Socket file descriptor + * @param buf Pointer to data buffer + * @return Byte count received on success. ZTS_ERR_SOCKET, ZTS_ERR_SERVICE, ZTS_ERR_ARG on failure. + */ + static ssize_t read(int fd, nbind::Buffer buf) + { + return zts_read(fd, buf.data(), buf.length()); + } + + /** + * @brief Write bytes from buffer to socket (sets zts_errno) + * + * @param fd Socket file descriptor + * @param buf Pointer to data buffer + * @return Byte count sent on success. ZTS_ERR_SOCKET, ZTS_ERR_SERVICE, ZTS_ERR_ARG on failure. + */ + static ssize_t write(int fd, nbind::Buffer buf) + { + return zts_write(fd, buf.data(), buf.length()); + } + + /** + * @brief Write data from multiple buffers to socket. (sets zts_errno) + * + * @param fd Socket file descriptor + * @param bufs Array of source buffers + * @return Byte count sent on success. ZTS_ERR_SOCKET, ZTS_ERR_SERVICE, ZTS_ERR_ARG on failure. + */ + static ssize_t writev(int fd, std::vector bufs) + { + std::size_t size = bufs.size(); + zts_iovec iov[size]; + for (std::size_t i = 0; i != size; ++i) { + iov[i].iov_base = bufs[i].data(); + iov[i].iov_len = bufs[i].length(); + } + return zts_writev(fd, iov, bufs.size()); + } + + /** + * @brief Receive data from remote host (sets zts_errno) + * + * @param fd Socket file descriptor + * @param buf Pointer to data buffer + * @param flags + * @return Byte count received on success. ZTS_ERR_SOCKET, ZTS_ERR_SERVICE, ZTS_ERR_ARG on failure. + */ + static ssize_t recv(int fd, nbind::Buffer buf, int flags) + { + return zts_recv(fd, buf.data(), buf.length(), flags); + } + + /** + * @brief Send data to remote host (sets zts_errno) + * + * @param fd Socket file descriptor + * @param buf data buffer + * @param flags + * @return Byte count sent on success. ZTS_ERR_SOCKET, ZTS_ERR_SERVICE, ZTS_ERR_ARG on failure. + */ + static ssize_t send(int fd, nbind::Buffer buf, int flags) + { + return zts_send(fd, buf.data(), buf.length(), flags); + } + + static int setBlocking(int fd, bool isBlocking) { int flags = fcntl(fd, ZTS_F_GETFL, 0); if (isBlocking) { @@ -392,6 +538,27 @@ public: return fcntl(fd, ZTS_F_SETFL, flags); } + static int setNoDelay(int fd, bool isNdelay) + { + int flags = fcntl(fd, ZTS_F_GETFL, 0); + if (isNdelay) { + flags &= ~ZTS_O_NDELAY; + } else { + flags &= ZTS_O_NDELAY; + } + return fcntl(fd, ZTS_F_SETFL, flags); + } + + static int setKeepalive(int fd, int yes) + { + return setsockopt(fd, ZTS_SOL_SOCKET, ZTS_SO_KEEPALIVE, &yes, sizeof(yes)); + } + + static int setKeepidle(int fd, int idle) + { + return setsockopt(fd, ZTS_IPPROTO_TCP, ZTS_TCP_KEEPIDLE, &idle, sizeof(idle)); + } + /** * @brief Issue file control commands on a socket * @@ -405,56 +572,109 @@ public: return zts_fcntl(fd, cmd, flags); } - static ssize_t read(int fd, nbind::Buffer buf) + /** + * @brief Set socket options (sets zts_errno) + * + * @param fd Socket file descriptor + * @param level Protocol level to which option name should apply + * @param optname Option name to set + * @param optval Source of option value to set + * @param optlen Length of option value + * @return ZTS_ERR_OK on success. ZTS_ERR_SOCKET, ZTS_ERR_SERVICE, ZTS_ERR_ARG on failure. + */ + static int setsockopt(int fd, int level, int optname, const void *optval, zts_socklen_t optlen) { - return zts_read(fd, buf.data(), buf.length()); + return zts_setsockopt(fd, level, optname, optval, optlen); } - static ssize_t write(int fd, nbind::Buffer buf) + /** + * @brief Get socket options (sets zts_errno) + * + * @param fd Socket file descriptor + * @param level Protocol level to which option name should apply + * @param optname Option name to get + * @param optval Where option value will be stored + * @param optlen Length of value + * @return ZTS_ERR_OK on success. ZTS_ERR_SOCKET, ZTS_ERR_SERVICE, ZTS_ERR_ARG on failure. + */ + static int getsockopt(int fd, int level, int optname, void *optval, zts_socklen_t *optlen) { - return zts_write(fd, buf.data(), buf.length()); + return zts_getsockopt(fd, level, optname, optval, optlen); } - static ssize_t writev(int fd, std::vector bufs) + /** + * @brief Get socket name (sets zts_errno) + * + * @param fd Socket file descriptor + * @param addr Name associated with this socket + * @param addrlen Length of name + * @return Sockaddress structure + */ + static zts_sockaddr_in getsockname(int fd) { - std::size_t size = bufs.size(); - zts_iovec iov[size]; - for (std::size_t i = 0; i != size; ++i) { - iov[i].iov_base = bufs[i].data(); - iov[i].iov_len = bufs[i].length(); - } - return zts_writev(fd, iov, bufs.size()); - } - - static ssize_t recv(int fd, nbind::Buffer buf, int flags) - { - return zts_recv(fd, buf.data(), buf.length(), flags); - } - - static ssize_t send(int fd, nbind::Buffer buf, int flags) - { - return zts_send(fd, buf.data(), buf.length(), flags); - } - - static int close(int fd) - { - return zts_close(fd); - } - - static zts_sockaddr_in sockaddr_in(const char *remoteAddr, const int remotePort) - { - struct zts_sockaddr_in in4; - in4.sin_port = zts_htons(remotePort); - #if defined(_WIN32) - in4.sin_addr.S_addr = zts_inet_addr(remoteAddr); - #else - in4.sin_addr.s_addr = zts_inet_addr(remoteAddr); - #endif - in4.sin_family = ZTS_AF_INET; + struct zts_sockaddr_in in4; + zts_socklen_t addrlen; + zts_getsockname(fd, (struct zts_sockaddr *)&in4, &addrlen); return in4; - } + } - static Node getMyNode() { return myNode; } + static zts_sockaddr_in6 getsockname6(int fd) + { + struct zts_sockaddr_in6 in6; + zts_socklen_t addrlen; + zts_getsockname(fd, (struct zts_sockaddr *)&in6, &addrlen); + return in6; + } + + /** + * @brief Get the peer name for the remote end of a connected socket + * + * @param fd Socket file descriptor + * @param addr Name associated with remote end of this socket + * @param addrlen Length of name + * @return Sockaddress structure + */ + static zts_sockaddr_in getpeername(int fd) + { + struct zts_sockaddr_in in4; + zts_socklen_t addrlen; + zts_getpeername(fd, (struct zts_sockaddr *)&in4, &addrlen); + return in4; + } + + static zts_sockaddr_in6 getpeername6(int fd) + { + struct zts_sockaddr_in6 in6; + zts_socklen_t addrlen; + zts_getpeername(fd, (struct zts_sockaddr *)&in6, &addrlen); + return in6; + } + + /** + * Convert IPv4 and IPv6 address structures to human-readable text form. + * + * @param af Address family (ZTS_AF_INET, ZTS_AF_INET6) + * @param src Pointer to source address structure + * @param dst Pointer to destination character array + * @param size Size of the destination buffer + * @return On success, returns a non-null pointer to the destination character array + */ + static const char * inet_ntop(const zts_sockaddr in) + { + if (in.sa_family == ZTS_AF_INET) { + const zts_sockaddr_in *in4 = (const zts_sockaddr_in *)∈ + char ipstr[ZTS_INET_ADDRSTRLEN]; + zts_inet_ntop(ZTS_AF_INET, &(in4->sin_addr), ipstr, ZTS_INET_ADDRSTRLEN); + return ipstr; + } else if (in.sa_family == ZTS_AF_INET6) { + const zts_sockaddr_in6 *in6 = (const zts_sockaddr_in6 *)∈ + char ipstr[ZTS_INET6_ADDRSTRLEN]; + zts_inet_ntop(ZTS_AF_INET6, &(in6->sin6_addr), ipstr, ZTS_INET6_ADDRSTRLEN); + return ipstr; + } else { + return ""; + } + } }; #include "nbind/nbind.h" @@ -467,24 +687,44 @@ NBIND_CLASS(Node) { NBIND_CLASS(ZeroTier) { method(start); + method(restart); + method(stop); + method(free); + method(join); - method(connectStream); - method(connectDgram); - method(connectRaw); - method(connectStream6); - method(connectDgram6); - method(connectRaw6); + + method(openStream); + method(openDgram); + method(openRaw); + method(openStream6); + method(openDgram6); + method(openRaw6); + method(open); + method(close); + method(shutdown); + + method(bind); + method(bind6); method(connect); + method(connect6); + method(read); method(write); method(writev); method(recv); method(send); - method(fcntlSetBlocking); + + method(setBlocking); + method(setNoDelay); + method(setKeepalive); + method(setKeepidle); method(fcntl); - method(close); - method(restart); - method(stop); - method(free); + + method(getsockname); + method(getsockname6); + method(getpeername); + method(getpeername6); + method(inet_ntop); + method(getMyNode); } diff --git a/examples/node/libzt.js b/examples/node/libzt.js index 0908765..683f596 100644 --- a/examples/node/libzt.js +++ b/examples/node/libzt.js @@ -1,53 +1,101 @@ 'use strict'; const net = require('net'); +const stream = require('stream'); +const { types: { isUint8Array } } = require('util'); + const nbind = require('@mcesystems/nbind') const ZeroTier = nbind.init().lib.ZeroTier +const { + errnoException, + writevGeneric, + writeGeneric, + onStreamRead, + kAfterAsyncWrite, + kHandle, + kUpdateTimer, + // setStreamTimeout, + kBuffer, + kBufferCb, + kBufferGen +} = require('./stream_commons'); + +const kLastWriteQueueSize = Symbol('lastWriteQueueSize'); + /* - * EXAMPLE USAGE + * EXAMPLE of Low-level usage * Usage: `nc -lv 4444` */ -function example() { +function example(nwid, address, port) { // Start ZeroTier service ZeroTier.start(".zerotier", 9994); // Join virtual network - ZeroTier.join("8056c2e21c000001"); + ZeroTier.join(nwid); - // Open the socket - let fd = ZeroTier.connectStream("29.49.7.203", 4444); + // Connect the socket + const _connect = (address, port, callback) => { + // Open the socket + const fd = ZeroTier.openStream(); + if (fd < 0) { callback(new Error('Could not open socket, errno: ' + fd)); return; } - // Send some data - ZeroTier.send(fd, Buffer.from("Name?\n", 'utf8'), 0) + // Try connect + const status = ZeroTier.connect(fd, address, port); - // Set blocking read mode - // ZeroTier.fcntlSetBlocking(fd, true); - let heartbeat = setInterval(() => process.stderr.write('.'), 100) + console.log(status); + if (status === 0) { + callback(null, fd); + } else { + // Close previous socket + ZeroTier.close(fd); + setTimeout(_connect, 250, address, port, callback); + } + } // Receive some data - const _read = () => { + const _read = (fd, callback) => { const buf = Buffer.alloc(32) let bytes = -1 do { bytes = ZeroTier.recv(fd, buf, 0) - if (bytes > 0) { process.stdout.write(buf.toString('utf8')) } + if (bytes > 0) { callback(null, buf); } } while (bytes > 0); if (!ZeroTier.getMyNode().online || buf.toString('utf8').includes("exit")) { - // Close the socket - ZeroTier.close(fd) - // Stop ZeroTier service - ZeroTier.stop() - // Clear the interval - clearInterval(heartbeat) + callback('end'); } else { - setTimeout(_read, 500) + setTimeout(_read, 500, fd, callback) } } - _read() + + _connect(address, port, (err, fd) => { + if (err) { console.error(err); return; } + console.debug("Connected."); + + // Send some data + ZeroTier.send(fd, Buffer.from("Name?\n", 'utf8'), 0); + + // Set blocking read mode + // ZeroTier.setBlocking(fd, true); + const heartbeat = setInterval(() => process.stderr.write('.'), 100); + + _read(fd, (stop, buf) => { + if (stop) { + // Close the socket + ZeroTier.close(fd); + // Stop ZeroTier service + ZeroTier.stop(); + // Clear the interval + clearInterval(heartbeat); + return; + } + + process.stdout.write(buf.toString('utf8')); + }); + }); } @@ -67,7 +115,8 @@ function connect(...args) { const normalized = net._normalizeArgs(args); const options = normalized[0]; // debug('createConnection', normalized); - const socket = new Socket(options); + + const socket = new Socket(Object.assign({ handle: new ZTCP() }, options)); if (options.timeout) { socket.setTimeout(options.timeout); @@ -76,199 +125,171 @@ function connect(...args) { return socket.connect(normalized); } +/* + * https://github.com/nodejs/node/blob/v12.18.3/lib/net.js#L567 + */ +function tryReadStart(socket) { + // Not already reading, start the flow + // debug('Socket._handle.readStart'); + socket._handle.reading = true; + const err = socket._handle.readStart(); + if (err) + socket.destroy(errnoException(err, 'read')); +} + /* * https://github.com/nodejs/node/blob/v12.18.3/lib/net.js#L1107 */ -function afterConnect(status, self, req, readable, writable) { - // const self = handle[owner_symbol]; +// function afterConnect(status, self, req, readable, writable) { +// // const self = handle[owner_symbol]; - // Callback may come after call to destroy - if (self.destroyed) { - return; +// // Callback may come after call to destroy +// if (self.destroyed) { +// return; +// } + +// // debug('afterConnect'); + +// // assert(self.connecting); +// self.connecting = false; +// self._sockname = null; + +// if (status === 0) { +// self.readable = readable; +// if (!self._writableState.ended) +// self.writable = writable; +// self._unrefTimer(); + +// self.emit('connect'); +// self.emit('ready'); + +// // Start the first read, or get an immediate EOF. +// // this doesn't actually consume any bytes, because len=0. +// if (readable && !self.isPaused()) +// self.read(0); + +// } else { +// self.connecting = false; +// let details; +// if (req.localAddress && req.localPort) { +// details = req.localAddress + ':' + req.localPort; +// } +// const ex = new Error(status, +// 'connect', +// req.address, +// req.port, +// details); +// if (details) { +// ex.localAddress = req.localAddress; +// ex.localPort = req.localPort; +// } +// self.destroy(ex); +// } +// } + +// function afterShutdown(self, _status) { +// // const self = this.handle[owner_symbol]; + +// // debug('afterShutdown destroyed=%j', self.destroyed, +// // self._readableState); + +// this.callback(); + +// // Callback may come after call to destroy. +// if (self.destroyed) +// return; + +// if (!self.readable || self.readableEnded) { +// // debug('readableState ended, destroying'); +// self.destroy(); +// } +// } + +// function writeGeneric(self, chunk, encoding, callback) { +// const decodeStrings = self._writableState && self._writableState.decodeStrings +// const buf = (!decodeStrings && !Buffer.isBuffer(chunk)) ? Buffer.from(chunk, encoding) : chunk + +// let bytes +// const err = ZeroTier.send(self._fd, buf, 0) +// switch (err) { +// case -1: +// callback(new Error("ZeroTier Socket error")) +// break +// case -2: +// callback(new Error("ZeroTier Service error")) +// break +// case -3: +// callback(new Error("ZeroTier Invalid argument")) +// break +// default: +// bytes = err +// callback() +// } + +// return { +// async: true, +// bytes: bytes, +// } +// } + +// function writevGeneric(self, chunks, callback) { +// const decodeStrings = self._writableState && self._writableState.decodeStrings +// const bufs = chunks.map(({ chunk, encoding }) => (!decodeStrings && !Buffer.isBuffer(chunk)) ? Buffer.from(chunk, encoding) : chunk) + +// let bytes +// const err = ZeroTier.writev(self._fd, bufs) +// switch (err) { +// case -1: +// callback(new Error("ZeroTier Socket error")) +// break +// case -2: +// callback(new Error("ZeroTier Service error")) +// break +// case -3: +// callback(new Error("ZeroTier Invalid argument")) +// break +// default: +// bytes = err +// callback() +// } + +// return { +// async: true, +// bytes: bytes, +// } +// } + + + +class ZTCP { + bytesRead = 0 + bytesWritten = 0 + writeQueueSize = 0 + + _fd = null + _reading = false + readTimer = null + + get reading() { + return this._reading; } - // debug('afterConnect'); - - // assert(self.connecting); - self.connecting = false; - self._sockname = null; - - if (status === 0) { - self.readable = readable; - if (!self._writableState.ended) - self.writable = writable; - self._unrefTimer(); - - self.emit('connect'); - self.emit('ready'); - - // Start the first read, or get an immediate EOF. - // this doesn't actually consume any bytes, because len=0. - if (readable && !self.isPaused()) - self.read(0); - - } else { - self.connecting = false; - let details; - if (req.localAddress && req.localPort) { - details = req.localAddress + ':' + req.localPort; - } - const ex = new Error(status, - 'connect', - req.address, - req.port, - details); - if (details) { - ex.localAddress = req.localAddress; - ex.localPort = req.localPort; - } - self.destroy(ex); - } -} - -function writeGeneric(self, chunk, encoding, callback) { - const decodeStrings = self._writableState && self._writableState.decodeStrings - const buf = (!decodeStrings && !Buffer.isBuffer(chunk)) ? Buffer.from(chunk, encoding) : chunk - - let bytes - const err = ZeroTier.send(self._fd, buf, 0) - switch (err) { - case -1: - callback(new Error("ZeroTier Socket error")) - break - case -2: - callback(new Error("ZeroTier Service error")) - break - case -3: - callback(new Error("ZeroTier Invalid argument")) - break - default: - bytes = err - callback(null) + set reading(val) { + return this._reading = val; } - return { - async: true, - bytes: bytes, - } -} - -function writevGeneric(self, chunks, callback) { - const decodeStrings = self._writableState && self._writableState.decodeStrings - const bufs = chunks.map(({ chunk, encoding }) => (!decodeStrings && !Buffer.isBuffer(chunk)) ? Buffer.from(chunk, encoding) : chunk) - - let bytes - const err = ZeroTier.writev(self._fd, bufs) - switch (err) { - case -1: - callback(new Error("ZeroTier Socket error")) - break - case -2: - callback(new Error("ZeroTier Service error")) - break - case -3: - callback(new Error("ZeroTier Invalid argument")) - break - default: - bytes = err - callback(null) - } - - return { - async: true, - bytes: bytes, - } -} - -class Socket extends net.Socket { - /* - * https://github.com/nodejs/node/blob/v12.18.3/lib/net.js#L929 - */ - connect(...args) { - let normalized; - // If passed an array, it's treated as an array of arguments that have - // already been normalized (so we don't normalize more than once). This has - // been solved before in https://github.com/nodejs/node/pull/12342, but was - // reverted as it had unintended side effects. - if (Array.isArray(args[0])) { - normalized = args[0]; - } else { - normalized = net._normalizeArgs(args); - } - const options = normalized[0]; - const cb = normalized[1]; - - // if (this.write !== net.Socket.prototype.write) - // this.write = net.Socket.prototype.write; - - if (this.destroyed) { - this._handle = null; - this._peername = null; - this._sockname = null; + readStart() { + if (!this._buf) { + this._buf = Buffer.alloc(128); } - // const { path } = options; - // const pipe = !!path; - // debug('pipe', pipe, path); - - // if (!this._handle) { - // this._handle = pipe ? - // new Pipe(PipeConstants.SOCKET) : - // new TCP(TCPConstants.SOCKET); - // initSocketHandle(this); - // } - - if (cb !== null) { - this.once('connect', cb); - } - - this._unrefTimer(); - - this.connecting = true; - this.writable = true; - - // if (pipe) { - // validateString(path, 'options.path'); - // defaultTriggerAsyncIdScope( - // this[async_id_symbol], internalConnect, this, path - // ); - // } else { - // lookupAndConnect(this, options); - // } - - const { host, port } = options; - // If host is an IP, skip performing a lookup - const addressType = net.isIP(host); - if (addressType) { - this._fd = ZeroTier.connectStream(host, port); - afterConnect(0, this, {}, true, true); - } else { - throw new Error("DNS LOOKUP NOT IMPLEMENTED"); - } - - return this; - } - - /* - * https://nodejs.org/docs/latest-v12.x/api/stream.html#stream_readable_read_size_1 - */ - _read(size) { - // debug('_read'); - - if (this.connecting) { - // debug('_read wait for connection'); - this.once('connect', () => this._read(size)); - return - } - - if (!this.readChunk || this.readChunk.length < size) { - this.readChunk = Buffer.alloc(size) - } - - let bytes = -1 - let moreData = true + let bytes = 0 do { - bytes = ZeroTier.recv(this._fd, this.readChunk, 0) + bytes = ZeroTier.read(this._fd, this._buf) + if (bytes >= 0) { + this.bytesRead += bytes; + bytes = 0; + } switch (bytes) { case -2: throw new Error("ZeroTier Service error") @@ -276,13 +297,356 @@ class Socket extends net.Socket { throw new Error("ZeroTier Invalid argument") default: if (bytes > 0) { - // this.bytesRead += bytes - moreData = this.push(this.readChunk) + this.bytesRead += bytes + this._buf = this.onread(this._buf) } } - } while (bytes > 0 && moreData) + } while (bytes > 0 && this._reading) - if (moreData) { setTimeout(() => this._read(size), 500) } + if (this._reading) { readTimer = setTimeout(() => this._read(size), 500) } + } + + readStop() { + if (readTimer) { + clearTimeout(readTimer); + readTimer = null; + } + this._reading = false + } + + writev(req, chunks, allBuffers) { + let bufs = []; + + if (allBuffers) { + bufs = chunks; + } else { + const arr = chunks; + for (let i = 0; i < arr.length; i+=2) { + const chunk = arr[i]; + const encoding = arr[i+1]; + chunks.push(Buffer.from(chunk, encoding)); + } + } + + let bytes = ZeroTier.writev(this._fd, bufs); + if (bytes >= 0) { + this.bytesWritten += bytes; + bytes = 0; + } + + const status = bytes; + // https://github.com/nodejs/node/blob/v12.18.3/lib/internal/stream_base_commons.js#L80 + if (req.oncomplete) { req.oncomplete.call(req, status); } + return status; + } + + writeBuffer(req, buf) { + let bytes = ZeroTier.write(this._fd, buf); + if (bytes >= 0) { + this.bytesWritten += bytes; + bytes = 0; + } + + const status = bytes; + // https://github.com/nodejs/node/blob/v12.18.3/lib/internal/stream_base_commons.js#L80 + if (req.oncomplete) { req.oncomplete.call(req, status); } + return status; + } + + writeLatin1String(req, data) { + return this.writeBuffer(req, Buffer.from(data, 'latin1')); + } + + writeUtf8String(req, data) { + return this.writeBuffer(req, Buffer.from(data, 'utf8')); + } + + writeAsciiString(req, data) { + return this.writeBuffer(req, Buffer.from(data, 'ascii')); + } + + writeUcs2String(req, data) { + return this.writeBuffer(req, Buffer.from(data, 'ucs2')); + } + + getAsyncId() { + return -1; + } + + useUserBuffer(buf) { + this._buf = buf; + } + + setBlocking(newValue) { + return ZeroTier.setBlocking(this._fd, newValue); + } + + setNoDelay(newValue) { + return ZeroTier.setNoDelay(this._fd, newValue); + } + + setKeepalive(enable, initialDelay) { + ZeroTier.setKeepidle(initialDelay); + return ZeroTier.setKeepalive(this._fd, +enable); + } + + bind(localAddress, localPort) { + return ZeroTier.bind(this._fd, localAddress, localPort); + } + + bind6(localAddress, localPort, _flags) { + return ZeroTier.bind6(this._fd, localAddress, localPort); + } + + open(fd) { + if (fd) { + this._fd = fd; + return 0; + } else { + const err = ZeroTier.openStream(); + if (err < 0) { + return err; + } else { + this._fd = err; + return 0; + } + } + } + + close(callback) { + const err = ZeroTier.close(this._fd); + this._fd = null; + if (callback) { callback(err); } + } + + shutdown(req) { + const status = ZeroTier.shutdown(this._fd); + // https://github.com/nodejs/node/blob/v12.18.3/test/parallel/test-tcp-wrap-connect.js + if (req.oncomplete) { req.oncomplete.call(req, status, this); } + return status; + } + + connect(req, address, port) { + let status = ZeroTier.connect(this._fd, address, port); + + // Retries are often required since ZT uses transport-triggered links + if (status !== 0) { + let count = 0; + while (count < 10) { + // Close previous socket + this.close(); + status = this.open(); + if (status !== 0) { + // Break if reopen-socket fails + break; + } + + // Reconnect + status = ZeroTier.connect(this._fd, address, port); + if (status === 0) { break; } + + count++; + } + } + + // https://github.com/nodejs/node/blob/v12.18.3/test/parallel/test-tcp-wrap-connect.js + if (req && req.oncomplete) { req.oncomplete.call(status, this, req, true, true); } + + return status; + } + + connect6(req, address, port) { + let status = ZeroTier.connect6(this._fd, address, port); + + // Retries are often required since ZT uses transport-triggered links + if (status !== 0) { + let count = 0; + while (count < 10) { + // Close previous socket + this.close(); + status = this.open(); + if (status !== 0) { + // Break if reopen-socket fails + break; + } + + // Reconnect + status = ZeroTier.connect6(this._fd, address, port); + if (status === 0) { break; } + + count++; + } + } + + // https://github.com/nodejs/node/blob/v12.18.3/test/parallel/test-tcp-wrap-connect.js + if (req.oncomplete) { req.oncomplete.call(status, this, req, true, true); } + + return status; + } + + getpeername(out) { + const in4 = ZeroTier.getpeername(this._fd); + out.address = ZeroTier.inet_ntop(in4); + out.family = in4.sin_family; + out.port = in4.sin_port; + return 0 + } + + getsockname(out) { + const in4 = ZeroTier.getsockname(this._fd); + out.address = ZeroTier.inet_ntop(in4); + out.family = in4.sin_family; + out.port = in4.sin_port; + return 0; + } + + listen(port) { + // TODO + // this.onconnection + } + + fchmod(mode) { + // TODO + return 0; + } +} + +class Socket extends net.Socket { + [kLastWriteQueueSize] = 0; + [kBuffer] = null; + [kBufferCb] = null; + [kBufferGen] = null; + + [kHandle] = null; + get _handle() { return this[kHandle]; } + set _handle(v) { return this[kHandle] = v; } + + /* + * https://github.com/nodejs/node/blob/v12.18.3/lib/net.js#L929 + */ + // connect(...args) { + // let normalized; + // // If passed an array, it's treated as an array of arguments that have + // // already been normalized (so we don't normalize more than once). This has + // // been solved before in https://github.com/nodejs/node/pull/12342, but was + // // reverted as it had unintended side effects. + // if (Array.isArray(args[0])) { + // normalized = args[0]; + // } else { + // normalized = net._normalizeArgs(args); + // } + // const options = normalized[0]; + // const cb = normalized[1]; + + // if (this.write !== net.Socket.prototype.write) + // this.write = net.Socket.prototype.write; + + // if (this.destroyed) { + // this._handle = null; + // this._peername = null; + // this._sockname = null; + // } + + // if (!this._handle) { + // this._handle = new ZTCP(); + // initSocketHandle(this); + // } + + // if (cb !== null) { + // this.once('connect', cb); + // } + + // this._unrefTimer(); + + // this.connecting = true; + // this.writable = true; + + // const { host, port } = options; + // // If host is an IP, skip performing a lookup + // const addressType = net.isIP(host); + // if (addressType) { + // this._fd = ZeroTier.connectStream(host, port); + // afterConnect(0, this, {}, true, true); + // } else { + // throw new Error("DNS LOOKUP NOT IMPLEMENTED"); + // } + + // return this; + // } + + /* + * https://github.com/nodejs/node/blob/v12.18.3/lib/net.js#L596 + */ + pause() { + if (this[kBuffer] && !this.connecting && this._handle && + this._handle.reading) { + this._handle.reading = false; + if (!this.destroyed) { + const err = this._handle.readStop(); + if (err) + this.destroy(errnoException(err, 'read')); + } + } + return stream.Duplex.prototype.pause.call(this); + } + + /* + * https://github.com/nodejs/node/blob/v12.18.3/lib/net.js#L610 + */ + resume() { + if (this[kBuffer] && !this.connecting && this._handle && + !this._handle.reading) { + tryReadStart(this); + } + return stream.Duplex.prototype.resume.call(this); + } + + /* + * https://github.com/nodejs/node/blob/v12.18.3/lib/net.js#L619 + */ + read(n) { + if (this[kBuffer] && !this.connecting && this._handle && + !this._handle.reading) { + tryReadStart(this); + } + return stream.Duplex.prototype.read.call(this, n); + } + + /* + * https://nodejs.org/docs/latest-v12.x/api/stream.html#stream_readable_read_size_1 + */ + _read(n) { + // debug('_read'); + + if (this.connecting || !this._handle) { + // debug('_read wait for connection'); + this.once('connect', () => this._read(n)); + } else if (!this._handle.reading) { + tryReadStart(this); + } + + // if (!this.readChunk || this.readChunk.length < n) { + // this.readChunk = Buffer.alloc(n) + // } + + // let bytes = -1 + // let moreData = true + // do { + // bytes = ZeroTier.recv(this._fd, this.readChunk, 0) + // switch (bytes) { + // case -2: + // throw new Error("ZeroTier Service error") + // case -3: + // throw new Error("ZeroTier Invalid argument") + // default: + // if (bytes > 0) { + // // this.bytesRead += bytes + // moreData = this.push(this.readChunk) + // } + // } + // } while (bytes > 0 && moreData) + + // if (moreData) { setTimeout(() => this._read(n), 500) } } /* @@ -302,20 +666,27 @@ class Socket extends net.Socket { /* * https://nodejs.org/docs/latest-v12.x/api/stream.html#stream_writable_final_callback */ - _final(callback) { - const err = ZeroTier.close(this._fd) - - switch (err) { - case -1: - return callback(new Error("ZeroTier Socket error")) - break - case -2: - return callback(new Error("ZeroTier Service error")) - break - default: - return super._final(callback) - } - } + // _final(cb) { + // // If still connecting - defer handling `_final` until 'connect' will happen + // if (this.pending) { + // // debug('_final: not yet connected'); + // return this.once('connect', () => this._final(cb)); + // } + + // if (!this._handle) + // return cb(); + + // // debug('_final: not ended, call shutdown()'); + + // // const req = new ShutdownWrap(); + // const req = {}; + // req.oncomplete = afterShutdown; + // req.handle = this._handle; + // req.callback = cb; + // // const err = this._handle.shutdown(req); + // const err = ZeroTier.shutdown(this._fd); + // return afterShutdown.call(req, this, 0); + // } /* * https://github.com/nodejs/node/blob/v12.18.3/lib/net.js#L760 @@ -334,34 +705,74 @@ class Socket extends net.Socket { } this._pendingData = null; this._pendingEncoding = ''; - - // if (!this._handle) { - // cb(new ERR_SOCKET_CLOSED()); - // return false; - // } - + + if (!this._handle) { + cb(new Error('ERR_SOCKET_CLOSED')); + return false; + } + this._unrefTimer(); - + let req; if (writev) req = writevGeneric(this, data, cb); else req = writeGeneric(this, data, encoding, cb); - if (req.async) { - // this[kLastWriteQueueSize] = req.bytes; + if (req.async) + this[kLastWriteQueueSize] = req.bytes; + } + + /* + * https://github.com/nodejs/node/blob/v12.18.3/lib/net.js#L552 + */ + get bufferSize() { + if (this._handle) { + return this[kLastWriteQueueSize] + this.writableLength; } } + + /* + * https://github.com/nodejs/node/blob/v12.18.3/lib/net.js#L756 + */ + [kAfterAsyncWrite]() { + this[kLastWriteQueueSize] = 0; + } + + /* + * https://github.com/nodejs/node/blob/v12.18.3/lib/net.js#L468 + */ + _onTimeout() { + const handle = this._handle; + const lastWriteQueueSize = this[kLastWriteQueueSize]; + if (lastWriteQueueSize > 0 && handle) { + // `lastWriteQueueSize !== writeQueueSize` means there is + // an active write in progress, so we suppress the timeout. + const { writeQueueSize } = handle; + if (lastWriteQueueSize !== writeQueueSize) { + this[kLastWriteQueueSize] = writeQueueSize; + this._unrefTimer(); + return; + } + } + // debug('_onTimeout'); + this.emit('timeout'); + } + + get [kUpdateTimer]() { + return this._unrefTimer; + } } module.exports = { - example, start: ZeroTier.start, join: ZeroTier.join, + restart: ZeroTier.restart, + stop: ZeroTier.stop, + free: ZeroTier.free, + example, connect, createConnection: connect, Socket, Stream: Socket, // Legacy naming - restart: ZeroTier.restart, - stop: ZeroTier.stop, - free: ZeroTier.free, + TCP: ZTCP, }; diff --git a/examples/node/stream_commons.js b/examples/node/stream_commons.js new file mode 100644 index 0000000..072641d --- /dev/null +++ b/examples/node/stream_commons.js @@ -0,0 +1,242 @@ +'use strict'; + +const kMaybeDestroy = Symbol('kMaybeDestroy'); +const kUpdateTimer = Symbol('kUpdateTimer'); +const kAfterAsyncWrite = Symbol('kAfterAsyncWrite'); +const kHandle = Symbol('kHandle'); +const kSession = Symbol('kSession'); + +// const debug = require('internal/util/debuglog').debuglog('stream'); +const kBuffer = Symbol('kBuffer'); +const kBufferGen = Symbol('kBufferGen'); +const kBufferCb = Symbol('kBufferCb'); + +let excludedStackFn; + +function errnoException(err, syscall, original) { + // TODO(joyeecheung): We have to use the type-checked + // getSystemErrorName(err) to guard against invalid arguments from users. + // This can be replaced with [ code ] = errmap.get(err) when this method + // is no longer exposed to user land. + if (util === undefined) util = require('util'); + const code = util.getSystemErrorName(err); + const message = original ? + `${syscall} ${code} ${original}` : `${syscall} ${code}`; + + // eslint-disable-next-line no-restricted-syntax + const ex = new Error(message); + // TODO(joyeecheung): errno is supposed to err, like in uvException + ex.code = ex.errno = code; + ex.syscall = syscall; + + // eslint-disable-next-line no-restricted-syntax + Error.captureStackTrace(ex, excludedStackFn || errnoException); + return ex; +} + +function handleWriteReq(req, data, encoding) { + const { handle } = req; + + switch (encoding) { + case 'buffer': + { + const ret = handle.writeBuffer(req, data); + // if (streamBaseState[kLastWriteWasAsync]) + // req.buffer = data; + return ret; + } + case 'latin1': + case 'binary': + return handle.writeLatin1String(req, data); + case 'utf8': + case 'utf-8': + return handle.writeUtf8String(req, data); + case 'ascii': + return handle.writeAsciiString(req, data); + case 'ucs2': + case 'ucs-2': + case 'utf16le': + case 'utf-16le': + return handle.writeUcs2String(req, data); + default: + { + const buffer = Buffer.from(data, encoding); + const ret = handle.writeBuffer(req, buffer); + // if (streamBaseState[kLastWriteWasAsync]) + // req.buffer = buffer; + return ret; + } + } +} + +function onWriteComplete(status) { + debug('onWriteComplete', status, this.error); + + const stream = this.handle[owner_symbol]; + + if (stream.destroyed) { + if (typeof this.callback === 'function') + this.callback(null); + return; + } + + if (status < 0) { + const ex = errnoException(status, 'write', this.error); + stream.destroy(ex, this.callback); + return; + } + + stream[kUpdateTimer](); + stream[kAfterAsyncWrite](this); + + if (typeof this.callback === 'function') + this.callback(null); +} + +function createWriteWrap(handle) { + // const req = new WriteWrap(); + const req = {}; + + req.handle = handle; + req.oncomplete = onWriteComplete; + req.async = false; + req.bytes = 0; + req.buffer = null; + + return req; +} + +function writevGeneric(self, data, cb) { + const req = createWriteWrap(self[kHandle]); + const allBuffers = data.allBuffers; + let chunks; + if (allBuffers) { + chunks = data; + for (let i = 0; i < data.length; i++) + data[i] = data[i].chunk; + } else { + chunks = new Array(data.length << 1); + for (let i = 0; i < data.length; i++) { + const entry = data[i]; + chunks[i * 2] = entry.chunk; + chunks[i * 2 + 1] = entry.encoding; + } + } + const err = req.handle.writev(req, chunks, allBuffers); + + // Retain chunks + if (err === 0) req._chunks = chunks; + + afterWriteDispatched(self, req, err, cb); + return req; +} + +function writeGeneric(self, data, encoding, cb) { + const req = createWriteWrap(self[kHandle]); + const err = handleWriteReq(req, data, encoding); + + afterWriteDispatched(self, req, err, cb); + return req; +} + +function afterWriteDispatched(self, req, err, cb) { + // req.bytes = streamBaseState[kBytesWritten]; + // req.async = !!streamBaseState[kLastWriteWasAsync]; + + if (err !== 0) + return self.destroy(errnoException(err, 'write', req.error), cb); + + if (!req.async) { + cb(); + } else { + req.callback = cb; + } +} + +function onStreamRead(arrayBuffer, offset, nread) { + // const nread = streamBaseState[kReadBytesOrError]; + + const handle = this; + const stream = this[owner_symbol]; + + stream[kUpdateTimer](); + + if (nread > 0 && !stream.destroyed) { + let ret; + let result; + const userBuf = stream[kBuffer]; + if (userBuf) { + result = (stream[kBufferCb](nread, userBuf) !== false); + const bufGen = stream[kBufferGen]; + if (bufGen !== null) { + const nextBuf = bufGen(); + if (isUint8Array(nextBuf)) + stream[kBuffer] = ret = nextBuf; + } + } else { + // const offset = streamBaseState[kArrayBufferOffset]; + const buf = Buffer.from(arrayBuffer, offset, nread); + result = stream.push(buf); + } + if (!result) { + handle.reading = false; + if (!stream.destroyed) { + const err = handle.readStop(); + if (err) + stream.destroy(errnoException(err, 'read')); + } + } + + return ret; + } + + if (nread === 0) { + return; + } + + // if (nread !== UV_EOF) { + // return stream.destroy(errnoException(nread, 'read')); + // } + + // Defer this until we actually emit end + if (stream._readableState.endEmitted) { + if (stream[kMaybeDestroy]) + stream[kMaybeDestroy](); + } else { + if (stream[kMaybeDestroy]) + stream.on('end', stream[kMaybeDestroy]); + + // TODO(ronag): Without this `readStop`, `onStreamRead` + // will be called once more (i.e. after Readable.ended) + // on Windows causing a ECONNRESET, failing the + // test-https-truncate test. + if (handle.readStop) { + const err = handle.readStop(); + if (err) + return stream.destroy(errnoException(err, 'read')); + } + + // Push a null to signal the end of data. + // Do it before `maybeDestroy` for correct order of events: + // `end` -> `close` + stream.push(null); + stream.read(0); + } +} + +module.exports = { + errnoException, + createWriteWrap, + writevGeneric, + writeGeneric, + onStreamRead, + kAfterAsyncWrite, + kMaybeDestroy, + kUpdateTimer, + kHandle, + kSession, + // setStreamTimeout, + kBuffer, + kBufferCb, + kBufferGen +}; diff --git a/examples/node/test.js b/examples/node/test.js index 2aeb3cb..91c3a82 100644 --- a/examples/node/test.js +++ b/examples/node/test.js @@ -2,7 +2,7 @@ const libzt = require('./libzt') -// libzt.example() +// libzt.example("8056c2e21c000001", "29.49.7.203", 4444) libzt.start(".zerotier", 9994) @@ -10,15 +10,16 @@ libzt.join("8056c2e21c000001") // Usage: `nc -lv 4444` let client = libzt.createConnection({ port: 4444, host: '29.49.7.203' }, () => { - // 'connect' listener. console.log('connected to server!'); - // client.write('world!\r\n'); }); -client.write("Name?\n", 'utf8'); +client.on('ready', () => { + client.write("Name?\n", 'utf8'); +}); client.on('data', (data) => { - console.log(data.toString()); - client.end(); + console.log(data.toString('utf8').trimEnd()); + if (data.toString('utf8').includes("exit")) { client.end(); } }); client.on('end', () => { console.log('disconnected from server'); + libzt.stop() });