This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
zhangyang-libzt/examples/nodejs/libzt.js
2021-02-23 21:32:02 -08:00

779 lines
19 KiB
JavaScript

'use strict';
const net = require('net');
const stream = require('stream');
const { types: { isUint8Array } } = require('util');
const nbind = require('@mcesystems/nbind')
const ZeroTier = nbind.init().lib.ZeroTier
const {
errnoException,
writevGeneric,
writeGeneric,
onStreamRead,
kAfterAsyncWrite,
kHandle,
kUpdateTimer,
// setStreamTimeout,
kBuffer,
kBufferCb,
kBufferGen
} = require('./stream_commons');
const kLastWriteQueueSize = Symbol('lastWriteQueueSize');
/*
* EXAMPLE of Low-level usage
* Usage: `nc -lv 4444`
*/
function example(nwid, address, port) {
// Start ZeroTier service
ZeroTier.start(".zerotier", 9994);
// Join virtual network
ZeroTier.join(nwid);
// Connect the socket
const _connect = (address, port, callback) => {
// Open the socket
const fd = ZeroTier.openStream();
if (fd < 0) { callback(new Error('Could not open socket, errno: ' + fd)); return; }
// Try connect
const status = ZeroTier.connect(fd, address, port);
console.log(status);
if (status === 0) {
callback(null, fd);
} else {
// Close previous socket
ZeroTier.close(fd);
setTimeout(_connect, 250, address, port, callback);
}
}
// Receive some data
const _read = (fd, callback) => {
const buf = Buffer.alloc(32)
let bytes = -1
do {
bytes = ZeroTier.recv(fd, buf, 0)
if (bytes > 0) { callback(null, buf); }
} while (bytes > 0);
if (!ZeroTier.getMyNode().online || buf.toString('utf8').includes("exit")) {
callback('end');
} else {
setTimeout(_read, 500, fd, callback)
}
}
_connect(address, port, (err, fd) => {
if (err) { console.error(err); return; }
console.debug("Connected.");
// Send some data
ZeroTier.send(fd, Buffer.from("Name?\n", 'utf8'), 0);
// Set blocking read mode
// ZeroTier.setBlocking(fd, true);
const heartbeat = setInterval(() => process.stderr.write('.'), 100);
_read(fd, (stop, buf) => {
if (stop) {
// Close the socket
ZeroTier.close(fd);
// Stop ZeroTier service
ZeroTier.stop();
// Clear the interval
clearInterval(heartbeat);
return;
}
process.stdout.write(buf.toString('utf8'));
});
});
}
// Target API:
//
// let s = net.connect({port: 80, host: 'google.com'}, function() {
// ...
// });
//
// There are various forms:
//
// connect(options, [cb])
// connect(port, [host], [cb])
// connect(path, [cb]);
//
function connect(...args) {
const normalized = net._normalizeArgs(args);
const options = normalized[0];
// debug('createConnection', normalized);
const socket = new Socket(Object.assign({ handle: new ZTCP() }, options));
if (options.timeout) {
socket.setTimeout(options.timeout);
}
return socket.connect(normalized);
}
/*
* https://github.com/nodejs/node/blob/v12.18.3/lib/net.js#L567
*/
function tryReadStart(socket) {
// Not already reading, start the flow
// debug('Socket._handle.readStart');
socket._handle.reading = true;
const err = socket._handle.readStart();
if (err)
socket.destroy(errnoException(err, 'read'));
}
/*
* https://github.com/nodejs/node/blob/v12.18.3/lib/net.js#L1107
*/
// function afterConnect(status, self, req, readable, writable) {
// // const self = handle[owner_symbol];
// // Callback may come after call to destroy
// if (self.destroyed) {
// return;
// }
// // debug('afterConnect');
// // assert(self.connecting);
// self.connecting = false;
// self._sockname = null;
// if (status === 0) {
// self.readable = readable;
// if (!self._writableState.ended)
// self.writable = writable;
// self._unrefTimer();
// self.emit('connect');
// self.emit('ready');
// // Start the first read, or get an immediate EOF.
// // this doesn't actually consume any bytes, because len=0.
// if (readable && !self.isPaused())
// self.read(0);
// } else {
// self.connecting = false;
// let details;
// if (req.localAddress && req.localPort) {
// details = req.localAddress + ':' + req.localPort;
// }
// const ex = new Error(status,
// 'connect',
// req.address,
// req.port,
// details);
// if (details) {
// ex.localAddress = req.localAddress;
// ex.localPort = req.localPort;
// }
// self.destroy(ex);
// }
// }
// function afterShutdown(self, _status) {
// // const self = this.handle[owner_symbol];
// // debug('afterShutdown destroyed=%j', self.destroyed,
// // self._readableState);
// this.callback();
// // Callback may come after call to destroy.
// if (self.destroyed)
// return;
// if (!self.readable || self.readableEnded) {
// // debug('readableState ended, destroying');
// self.destroy();
// }
// }
// function writeGeneric(self, chunk, encoding, callback) {
// const decodeStrings = self._writableState && self._writableState.decodeStrings
// const buf = (!decodeStrings && !Buffer.isBuffer(chunk)) ? Buffer.from(chunk, encoding) : chunk
// let bytes
// const err = ZeroTier.send(self._fd, buf, 0)
// switch (err) {
// case -1:
// callback(new Error("ZeroTier Socket error"))
// break
// case -2:
// callback(new Error("ZeroTier Service error"))
// break
// case -3:
// callback(new Error("ZeroTier Invalid argument"))
// break
// default:
// bytes = err
// callback()
// }
// return {
// async: true,
// bytes: bytes,
// }
// }
// function writevGeneric(self, chunks, callback) {
// const decodeStrings = self._writableState && self._writableState.decodeStrings
// const bufs = chunks.map(({ chunk, encoding }) => (!decodeStrings && !Buffer.isBuffer(chunk)) ? Buffer.from(chunk, encoding) : chunk)
// let bytes
// const err = ZeroTier.writev(self._fd, bufs)
// switch (err) {
// case -1:
// callback(new Error("ZeroTier Socket error"))
// break
// case -2:
// callback(new Error("ZeroTier Service error"))
// break
// case -3:
// callback(new Error("ZeroTier Invalid argument"))
// break
// default:
// bytes = err
// callback()
// }
// return {
// async: true,
// bytes: bytes,
// }
// }
class ZTCP {
bytesRead = 0
bytesWritten = 0
writeQueueSize = 0
_fd = null
_reading = false
readTimer = null
get reading() {
return this._reading;
}
set reading(val) {
return this._reading = val;
}
readStart() {
if (!this._buf) {
this._buf = Buffer.alloc(128);
}
let bytes = 0
do {
bytes = ZeroTier.read(this._fd, this._buf)
if (bytes >= 0) {
this.bytesRead += bytes;
bytes = 0;
}
switch (bytes) {
case -2:
throw new Error("ZeroTier Service error")
case -3:
throw new Error("ZeroTier Invalid argument")
default:
if (bytes > 0) {
this.bytesRead += bytes
this._buf = this.onread(this._buf)
}
}
} while (bytes > 0 && this._reading)
if (this._reading) { readTimer = setTimeout(() => this._read(size), 500) }
}
readStop() {
if (readTimer) {
clearTimeout(readTimer);
readTimer = null;
}
this._reading = false
}
writev(req, chunks, allBuffers) {
let bufs = [];
if (allBuffers) {
bufs = chunks;
} else {
const arr = chunks;
for (let i = 0; i < arr.length; i+=2) {
const chunk = arr[i];
const encoding = arr[i+1];
chunks.push(Buffer.from(chunk, encoding));
}
}
let bytes = ZeroTier.writev(this._fd, bufs);
if (bytes >= 0) {
this.bytesWritten += bytes;
bytes = 0;
}
const status = bytes;
// https://github.com/nodejs/node/blob/v12.18.3/lib/internal/stream_base_commons.js#L80
if (req.oncomplete) { req.oncomplete.call(req, status); }
return status;
}
writeBuffer(req, buf) {
let bytes = ZeroTier.write(this._fd, buf);
if (bytes >= 0) {
this.bytesWritten += bytes;
bytes = 0;
}
const status = bytes;
// https://github.com/nodejs/node/blob/v12.18.3/lib/internal/stream_base_commons.js#L80
if (req.oncomplete) { req.oncomplete.call(req, status); }
return status;
}
writeLatin1String(req, data) {
return this.writeBuffer(req, Buffer.from(data, 'latin1'));
}
writeUtf8String(req, data) {
return this.writeBuffer(req, Buffer.from(data, 'utf8'));
}
writeAsciiString(req, data) {
return this.writeBuffer(req, Buffer.from(data, 'ascii'));
}
writeUcs2String(req, data) {
return this.writeBuffer(req, Buffer.from(data, 'ucs2'));
}
getAsyncId() {
return -1;
}
useUserBuffer(buf) {
this._buf = buf;
}
setBlocking(newValue) {
return ZeroTier.setBlocking(this._fd, newValue);
}
setNoDelay(newValue) {
return ZeroTier.setNoDelay(this._fd, newValue);
}
setKeepalive(enable, initialDelay) {
ZeroTier.setKeepidle(initialDelay);
return ZeroTier.setKeepalive(this._fd, +enable);
}
bind(localAddress, localPort) {
return ZeroTier.bind(this._fd, localAddress, localPort);
}
bind6(localAddress, localPort, _flags) {
return ZeroTier.bind6(this._fd, localAddress, localPort);
}
open(fd) {
if (fd) {
this._fd = fd;
return 0;
} else {
const err = ZeroTier.openStream();
if (err < 0) {
return err;
} else {
this._fd = err;
return 0;
}
}
}
close(callback) {
const err = ZeroTier.close(this._fd);
this._fd = null;
if (callback) { callback(err); }
}
shutdown(req) {
const status = ZeroTier.shutdown(this._fd);
// https://github.com/nodejs/node/blob/v12.18.3/test/parallel/test-tcp-wrap-connect.js
if (req.oncomplete) { req.oncomplete.call(req, status, this); }
return status;
}
connect(req, address, port) {
let status = ZeroTier.connect(this._fd, address, port);
// Retries are often required since ZT uses transport-triggered links
if (status !== 0) {
let count = 0;
while (count < 10) {
// Close previous socket
this.close();
status = this.open();
if (status !== 0) {
// Break if reopen-socket fails
break;
}
// Reconnect
status = ZeroTier.connect(this._fd, address, port);
if (status === 0) { break; }
count++;
}
}
// https://github.com/nodejs/node/blob/v12.18.3/test/parallel/test-tcp-wrap-connect.js
if (req && req.oncomplete) { req.oncomplete.call(status, this, req, true, true); }
return status;
}
connect6(req, address, port) {
let status = ZeroTier.connect6(this._fd, address, port);
// Retries are often required since ZT uses transport-triggered links
if (status !== 0) {
let count = 0;
while (count < 10) {
// Close previous socket
this.close();
status = this.open();
if (status !== 0) {
// Break if reopen-socket fails
break;
}
// Reconnect
status = ZeroTier.connect6(this._fd, address, port);
if (status === 0) { break; }
count++;
}
}
// https://github.com/nodejs/node/blob/v12.18.3/test/parallel/test-tcp-wrap-connect.js
if (req.oncomplete) { req.oncomplete.call(status, this, req, true, true); }
return status;
}
getpeername(out) {
const in4 = ZeroTier.getpeername(this._fd);
out.address = ZeroTier.inet_ntop(in4);
out.family = in4.sin_family;
out.port = in4.sin_port;
return 0
}
getsockname(out) {
const in4 = ZeroTier.getsockname(this._fd);
out.address = ZeroTier.inet_ntop(in4);
out.family = in4.sin_family;
out.port = in4.sin_port;
return 0;
}
listen(port) {
// TODO
// this.onconnection
}
fchmod(mode) {
// TODO
return 0;
}
}
class Socket extends net.Socket {
[kLastWriteQueueSize] = 0;
[kBuffer] = null;
[kBufferCb] = null;
[kBufferGen] = null;
[kHandle] = null;
get _handle() { return this[kHandle]; }
set _handle(v) { return this[kHandle] = v; }
/*
* https://github.com/nodejs/node/blob/v12.18.3/lib/net.js#L929
*/
// connect(...args) {
// let normalized;
// // If passed an array, it's treated as an array of arguments that have
// // already been normalized (so we don't normalize more than once). This has
// // been solved before in https://github.com/nodejs/node/pull/12342, but was
// // reverted as it had unintended side effects.
// if (Array.isArray(args[0])) {
// normalized = args[0];
// } else {
// normalized = net._normalizeArgs(args);
// }
// const options = normalized[0];
// const cb = normalized[1];
// if (this.write !== net.Socket.prototype.write)
// this.write = net.Socket.prototype.write;
// if (this.destroyed) {
// this._handle = null;
// this._peername = null;
// this._sockname = null;
// }
// if (!this._handle) {
// this._handle = new ZTCP();
// initSocketHandle(this);
// }
// if (cb !== null) {
// this.once('connect', cb);
// }
// this._unrefTimer();
// this.connecting = true;
// this.writable = true;
// const { host, port } = options;
// // If host is an IP, skip performing a lookup
// const addressType = net.isIP(host);
// if (addressType) {
// this._fd = ZeroTier.connectStream(host, port);
// afterConnect(0, this, {}, true, true);
// } else {
// throw new Error("DNS LOOKUP NOT IMPLEMENTED");
// }
// return this;
// }
/*
* https://github.com/nodejs/node/blob/v12.18.3/lib/net.js#L596
*/
pause() {
if (this[kBuffer] && !this.connecting && this._handle &&
this._handle.reading) {
this._handle.reading = false;
if (!this.destroyed) {
const err = this._handle.readStop();
if (err)
this.destroy(errnoException(err, 'read'));
}
}
return stream.Duplex.prototype.pause.call(this);
}
/*
* https://github.com/nodejs/node/blob/v12.18.3/lib/net.js#L610
*/
resume() {
if (this[kBuffer] && !this.connecting && this._handle &&
!this._handle.reading) {
tryReadStart(this);
}
return stream.Duplex.prototype.resume.call(this);
}
/*
* https://github.com/nodejs/node/blob/v12.18.3/lib/net.js#L619
*/
read(n) {
if (this[kBuffer] && !this.connecting && this._handle &&
!this._handle.reading) {
tryReadStart(this);
}
return stream.Duplex.prototype.read.call(this, n);
}
/*
* https://nodejs.org/docs/latest-v12.x/api/stream.html#stream_readable_read_size_1
*/
_read(n) {
// debug('_read');
if (this.connecting || !this._handle) {
// debug('_read wait for connection');
this.once('connect', () => this._read(n));
} else if (!this._handle.reading) {
tryReadStart(this);
}
// if (!this.readChunk || this.readChunk.length < n) {
// this.readChunk = Buffer.alloc(n)
// }
// let bytes = -1
// let moreData = true
// do {
// bytes = ZeroTier.recv(this._fd, this.readChunk, 0)
// switch (bytes) {
// case -2:
// throw new Error("ZeroTier Service error")
// case -3:
// throw new Error("ZeroTier Invalid argument")
// default:
// if (bytes > 0) {
// // this.bytesRead += bytes
// moreData = this.push(this.readChunk)
// }
// }
// } while (bytes > 0 && moreData)
// if (moreData) { setTimeout(() => this._read(n), 500) }
}
/*
* https://nodejs.org/docs/latest-v12.x/api/stream.html#stream_writable_writev_chunks_callback
*/
_writev(chunks, cb) {
this._writeGeneric(true, chunks, '', cb);
}
/*
* https://nodejs.org/docs/latest-v12.x/api/stream.html#stream_writable_write_chunk_encoding_callback_1
*/
_write(data, encoding, cb) {
this._writeGeneric(false, data, encoding, cb);
}
/*
* https://nodejs.org/docs/latest-v12.x/api/stream.html#stream_writable_final_callback
*/
// _final(cb) {
// // If still connecting - defer handling `_final` until 'connect' will happen
// if (this.pending) {
// // debug('_final: not yet connected');
// return this.once('connect', () => this._final(cb));
// }
// if (!this._handle)
// return cb();
// // debug('_final: not ended, call shutdown()');
// // const req = new ShutdownWrap();
// const req = {};
// req.oncomplete = afterShutdown;
// req.handle = this._handle;
// req.callback = cb;
// // const err = this._handle.shutdown(req);
// const err = ZeroTier.shutdown(this._fd);
// return afterShutdown.call(req, this, 0);
// }
/*
* https://github.com/nodejs/node/blob/v12.18.3/lib/net.js#L760
*/
_writeGeneric(writev, data, encoding, cb) {
// If we are still connecting, then buffer this for later.
// The Writable logic will buffer up any more writes while
// waiting for this one to be done.
if (this.connecting) {
this._pendingData = data;
this._pendingEncoding = encoding;
this.once('connect', function connect() {
this._writeGeneric(writev, data, encoding, cb);
});
return;
}
this._pendingData = null;
this._pendingEncoding = '';
if (!this._handle) {
cb(new Error('ERR_SOCKET_CLOSED'));
return false;
}
this._unrefTimer();
let req;
if (writev)
req = writevGeneric(this, data, cb);
else
req = writeGeneric(this, data, encoding, cb);
if (req.async)
this[kLastWriteQueueSize] = req.bytes;
}
/*
* https://github.com/nodejs/node/blob/v12.18.3/lib/net.js#L552
*/
get bufferSize() {
if (this._handle) {
return this[kLastWriteQueueSize] + this.writableLength;
}
}
/*
* https://github.com/nodejs/node/blob/v12.18.3/lib/net.js#L756
*/
[kAfterAsyncWrite]() {
this[kLastWriteQueueSize] = 0;
}
/*
* https://github.com/nodejs/node/blob/v12.18.3/lib/net.js#L468
*/
_onTimeout() {
const handle = this._handle;
const lastWriteQueueSize = this[kLastWriteQueueSize];
if (lastWriteQueueSize > 0 && handle) {
// `lastWriteQueueSize !== writeQueueSize` means there is
// an active write in progress, so we suppress the timeout.
const { writeQueueSize } = handle;
if (lastWriteQueueSize !== writeQueueSize) {
this[kLastWriteQueueSize] = writeQueueSize;
this._unrefTimer();
return;
}
}
// debug('_onTimeout');
this.emit('timeout');
}
get [kUpdateTimer]() {
return this._unrefTimer;
}
}
module.exports = {
start: ZeroTier.start,
join: ZeroTier.join,
restart: ZeroTier.restart,
stop: ZeroTier.stop,
free: ZeroTier.free,
example,
connect,
createConnection: connect,
Socket,
Stream: Socket, // Legacy naming
TCP: ZTCP,
};