Port over TCP relay functionality from ZeroTier One

This commit is contained in:
Joseph Henry
2023-07-24 18:56:40 -07:00
parent ca83e941a4
commit 5874e442eb
4 changed files with 404 additions and 33 deletions

View File

@@ -17,6 +17,8 @@
* ZeroTier Node Service
*/
#include <stdlib.h>
#include "NodeService.hpp"
#include "../version.h"
@@ -28,14 +30,16 @@
#include "VirtualTap.hpp"
#if defined(__WINDOWS__)
#include <shlobj.h>
#include <winsock2.h>
#include <windows.h>
#include <iphlpapi.h>
#include <netioapi.h>
#include <shlobj.h>
#include <windows.h>
#include <winsock2.h>
#define stat _stat
#endif
#define ZT_TCP_FALLBACK_RELAY "204.80.128.1/443"
namespace ZeroTier {
static int SnodeVirtualNetworkConfigFunction(
@@ -173,7 +177,12 @@ NodeService::NodeService()
, _randomPortRangeEnd(0)
, _udpPortPickerCounter(0)
, _lastDirectReceiveFromGlobal(0)
, _fallbackRelayAddress(ZT_TCP_FALLBACK_RELAY)
, _allowTcpRelay(true)
, _forceTcpRelay(false)
, _lastSendToGlobalV4(0)
, _lastRestart(0)
, _tcpFallbackTunnel((TcpConnection*)0)
, _nextBackgroundTaskDeadline(0)
, _run(false)
, _termReason(ONE_STILL_RUNNING)
@@ -286,7 +295,8 @@ NodeService::ReasonForTermination NodeService::run()
if (_allowSecondaryPort) {
if (_secondaryPort) {
_ports[1] = _secondaryPort;
} else {
}
else {
_ports[1] = _getRandomPort(minPort, maxPort);
}
}
@@ -301,9 +311,10 @@ NodeService::ReasonForTermination NodeService::run()
if (_ports[1]) {
if (_tertiaryPort) {
_ports[2] = _tertiaryPort;
} else {
}
else {
_ports[2] = minPort + (_ports[0] % 40000);
for(int i=0;;++i) {
for (int i = 0;; ++i) {
if (i > 1000) {
_ports[2] = 0;
break;
@@ -398,7 +409,10 @@ NodeService::ReasonForTermination NodeService::run()
p[pc++] = _ports[i];
}
}
_binder.refresh(_phy, p, pc, explicitBind, *this);
if (! _forceTcpRelay) {
// Only bother binding UDP ports if we aren't forcing TCP-relay mode
_binder.refresh(_phy, p, pc, explicitBind, *this);
}
}
// Generate callback messages for user application
@@ -411,6 +425,12 @@ NodeService::ReasonForTermination NodeService::run()
dl = _nextBackgroundTaskDeadline;
}
// Close TCP fallback tunnel if we have direct UDP
if (! _forceTcpRelay && (_tcpFallbackTunnel)
&& ((now - _lastDirectReceiveFromGlobal) < (ZT_TCP_FALLBACK_AFTER / 2))) {
_phy.close(_tcpFallbackTunnel->sock);
}
// Sync multicast group memberships
if ((now - lastTapMulticastGroupCheck) >= ZT_TAP_CHECK_MULTICAST_INTERVAL) {
lastTapMulticastGroupCheck = now;
@@ -616,6 +636,9 @@ void NodeService::phyOnDatagram(
void* data,
unsigned long len)
{
if (_forceTcpRelay) {
return;
}
ZTS_UNUSED_ARG(uptr);
ZTS_UNUSED_ARG(localAddr);
if ((len >= 16) && (reinterpret_cast<const InetAddress*>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL))
@@ -639,6 +662,185 @@ void NodeService::phyOnDatagram(
}
}
void NodeService::phyOnTcpConnect(PhySocket* sock, void** uptr, bool success)
{
if (! success) {
phyOnTcpClose(sock, uptr);
return;
}
TcpConnection* const tc = reinterpret_cast<TcpConnection*>(*uptr);
if (! tc) { // sanity check
_phy.close(sock, true);
return;
}
tc->sock = sock;
if (tc->type == TcpConnection::TCP_TUNNEL_OUTGOING) {
if (_tcpFallbackTunnel)
_phy.close(_tcpFallbackTunnel->sock);
_tcpFallbackTunnel = tc;
_phy.streamSend(sock, ZT_TCP_TUNNEL_HELLO, sizeof(ZT_TCP_TUNNEL_HELLO));
}
else {
_phy.close(sock, true);
}
}
void NodeService::phyOnTcpClose(PhySocket* sock, void** uptr)
{
TcpConnection* tc = (TcpConnection*)*uptr;
if (tc) {
if (tc == _tcpFallbackTunnel) {
_tcpFallbackTunnel = (TcpConnection*)0;
}
{
Mutex::Lock _l(_tcpConnections_m);
_tcpConnections.erase(
std::remove(_tcpConnections.begin(), _tcpConnections.end(), tc),
_tcpConnections.end());
}
delete tc;
}
}
void NodeService::phyOnTcpData(PhySocket* sock, void** uptr, void* data, unsigned long len)
{
try {
if (! len) {
return; // sanity check, should never happen
}
TcpConnection* tc = reinterpret_cast<TcpConnection*>(*uptr);
tc->lastReceive = OSUtils::now();
switch (tc->type) {
case TcpConnection::TCP_TUNNEL_OUTGOING:
tc->readq.append((const char*)data, len);
while (tc->readq.length() >= 5) {
const char* data = tc->readq.data();
const unsigned long mlen =
(((((unsigned long)data[3]) & 0xff) << 8) | (((unsigned long)data[4]) & 0xff));
if (tc->readq.length() >= (mlen + 5)) {
InetAddress from;
unsigned long plen = mlen; // payload length, modified if there's an IP header
data += 5; // skip forward past pseudo-TLS junk and mlen
if (plen == 4) {
// Hello message, which isn't sent by proxy and would be ignored by client
}
else if (plen) {
// Messages should contain IPv4 or IPv6 source IP address data
switch (data[0]) {
case 4: // IPv4
if (plen >= 7) {
from.set(
(const void*)(data + 1),
4,
((((unsigned int)data[5]) & 0xff) << 8) | (((unsigned int)data[6]) & 0xff));
data += 7; // type + 4 byte IP + 2 byte port
plen -= 7;
}
else {
_phy.close(sock);
return;
}
break;
case 6: // IPv6
if (plen >= 19) {
from.set(
(const void*)(data + 1),
16,
((((unsigned int)data[17]) & 0xff) << 8)
| (((unsigned int)data[18]) & 0xff));
data += 19; // type + 16 byte IP + 2 byte port
plen -= 19;
}
else {
_phy.close(sock);
return;
}
break;
case 0: // none/omitted
++data;
--plen;
break;
default: // invalid address type
_phy.close(sock);
return;
}
if (from) {
InetAddress fakeTcpLocalInterfaceAddress((uint32_t)0xffffffff, 0xffff);
const ZT_ResultCode rc = _node->processWirePacket(
(void*)0,
OSUtils::now(),
-1,
reinterpret_cast<struct sockaddr_storage*>(&from),
data,
plen,
&_nextBackgroundTaskDeadline);
if (ZT_ResultCode_isFatal(rc)) {
char tmp[256];
OSUtils::ztsnprintf(
tmp,
sizeof(tmp),
"fatal error code from processWirePacket: %d",
(int)rc);
Mutex::Lock _l(_termReason_m);
_termReason = ONE_UNRECOVERABLE_ERROR;
_fatalErrorMessage = tmp;
this->terminate();
_phy.close(sock);
return;
}
}
}
if (tc->readq.length() > (mlen + 5)) {
tc->readq.erase(tc->readq.begin(), tc->readq.begin() + (mlen + 5));
}
else {
tc->readq.clear();
}
}
else {
break;
}
}
return;
}
}
catch (...) {
_phy.close(sock);
}
}
void NodeService::phyOnTcpWritable(PhySocket* sock, void** uptr)
{
TcpConnection* tc = reinterpret_cast<TcpConnection*>(*uptr);
bool closeit = false;
{
Mutex::Lock _l(tc->writeq_m);
if (tc->writeq.length() > 0) {
long sent = (long)_phy.streamSend(sock, tc->writeq.data(), (unsigned long)tc->writeq.length(), true);
if (sent > 0) {
if ((unsigned long)sent >= (unsigned long)tc->writeq.length()) {
tc->writeq.clear();
_phy.setNotifyWritable(sock, false);
}
else {
tc->writeq.erase(tc->writeq.begin(), tc->writeq.begin() + sent);
}
}
}
else {
_phy.setNotifyWritable(sock, false);
}
}
if (closeit) {
_phy.close(sock);
}
}
int NodeService::nodeVirtualNetworkConfigFunction(
uint64_t net_id,
void** nuptr,
@@ -1225,7 +1427,7 @@ uint64_t NodeService::getNodeId()
int NodeService::setIdentity(const char* keypair, unsigned int len)
{
if (keypair == NULL || len < ZT_IDENTITY_STRING_BUFFER_LENGTH) {
return ZTS_ERR_ARG;
// return ZTS_ERR_ARG;
}
// Double check user-provided keypair
Identity id;
@@ -1454,6 +1656,79 @@ int NodeService::nodeWirePacketSendFunction(
unsigned int len,
unsigned int ttl)
{
if (_allowTcpRelay) {
if (addr->ss_family == AF_INET) {
// TCP fallback tunnel support, currently IPv4 only
if ((len >= 16)
&& (reinterpret_cast<const InetAddress*>(addr)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) {
// Engage TCP tunnel fallback if we haven't received anything valid from a global
// IP address in ZT_TCP_FALLBACK_AFTER milliseconds. If we do start getting
// valid direct traffic we'll stop using it and close the socket after a while.
const int64_t now = OSUtils::now();
if (_forceTcpRelay
|| (((now - _lastDirectReceiveFromGlobal) > ZT_TCP_FALLBACK_AFTER)
&& ((now - _lastRestart) > ZT_TCP_FALLBACK_AFTER))) {
if (_tcpFallbackTunnel) {
bool flushNow = false;
{
Mutex::Lock _l(_tcpFallbackTunnel->writeq_m);
if (_tcpFallbackTunnel->writeq.size() < (1024 * 64)) {
if (_tcpFallbackTunnel->writeq.length() == 0) {
_phy.setNotifyWritable(_tcpFallbackTunnel->sock, true);
flushNow = true;
}
const unsigned long mlen = len + 7;
_tcpFallbackTunnel->writeq.push_back((char)0x17);
_tcpFallbackTunnel->writeq.push_back((char)0x03);
_tcpFallbackTunnel->writeq.push_back((char)0x03); // fake TLS 1.2 header
_tcpFallbackTunnel->writeq.push_back((char)((mlen >> 8) & 0xff));
_tcpFallbackTunnel->writeq.push_back((char)(mlen & 0xff));
_tcpFallbackTunnel->writeq.push_back((char)4); // IPv4
_tcpFallbackTunnel->writeq.append(
reinterpret_cast<const char*>(reinterpret_cast<const void*>(
&(reinterpret_cast<const struct sockaddr_in*>(addr)->sin_addr.s_addr))),
4);
_tcpFallbackTunnel->writeq.append(
reinterpret_cast<const char*>(reinterpret_cast<const void*>(
&(reinterpret_cast<const struct sockaddr_in*>(addr)->sin_port))),
2);
_tcpFallbackTunnel->writeq.append((const char*)data, len);
}
}
if (flushNow) {
void* tmpptr = (void*)_tcpFallbackTunnel;
phyOnTcpWritable(_tcpFallbackTunnel->sock, &tmpptr);
}
}
else if (
_forceTcpRelay
|| (((now - _lastSendToGlobalV4) < ZT_TCP_FALLBACK_AFTER)
&& ((now - _lastSendToGlobalV4) > (ZT_PING_CHECK_INVERVAL / 2)))) {
const InetAddress addr(_fallbackRelayAddress);
TcpConnection* tc = new TcpConnection();
{
Mutex::Lock _l(_tcpConnections_m);
_tcpConnections.push_back(tc);
}
tc->type = TcpConnection::TCP_TUNNEL_OUTGOING;
tc->remoteAddr = addr;
tc->lastReceive = OSUtils::now();
tc->parent = this;
tc->sock = (PhySocket*)0; // set in connect handler
bool connected = false;
_phy.tcpConnect(reinterpret_cast<const struct sockaddr*>(&addr), connected, (void*)tc, true);
}
}
_lastSendToGlobalV4 = now;
}
}
}
if (_forceTcpRelay) {
// Shortcut here so that we don't emit any UDP packets
return 0;
}
// Even when relaying we still send via UDP. This way if UDP starts
// working we can instantly "fail forward" to it and stop using TCP
// proxy fallback, which is slow.
@@ -1682,9 +1957,9 @@ int NodeService::shouldBindInterface(const char* ifname, const InetAddress& ifad
unsigned int NodeService::_getRandomPort(unsigned int minPort, unsigned int maxPort)
{
unsigned int randp = 0;
Utils::getSecureRandom(&randp,sizeof(randp));
Utils::getSecureRandom(&randp, sizeof(randp));
randp = (randp % (maxPort - minPort + 1)) + minPort;
for(int i=0;;++i) {
for (int i = 0;; ++i) {
if (i > 1000) {
return 0;
}
@@ -1812,6 +2087,21 @@ void NodeService::enableEvents()
_events->enable();
}
void NodeService::setTcpRelayAddress(const char* tcpRelayAddr, unsigned short tcpRelayPort)
{
_fallbackRelayAddress = InetAddress(std::string(std::string(tcpRelayAddr) + std::string("/") + std::to_string(tcpRelayPort)).c_str());
}
void NodeService::allowTcpRelay(bool enabled)
{
_allowTcpRelay = true;
}
void NodeService::forceTcpRelay(bool enabled)
{
_forceTcpRelay = true;
}
int NodeService::setRoots(const void* rootsData, unsigned int len)
{
if (! rootsData || len <= 0 || len > ZTS_STORE_DATA_LEN) {