Added multithreaded unit tests to selftest, standardization pass: conditional statement spacing

This commit is contained in:
Joseph Henry
2017-09-18 11:58:41 -07:00
parent 5ec6f1846b
commit 055a58ac63
5 changed files with 274 additions and 145 deletions

View File

@@ -395,6 +395,7 @@ Darwin:
// int socket_family, int socket_type, int protocol // int socket_family, int socket_type, int protocol
int zts_socket(ZT_SOCKET_SIG) { int zts_socket(ZT_SOCKET_SIG) {
DEBUG_EXTRA();
int err = errno = 0; int err = errno = 0;
if (socket_family < 0 || socket_type < 0 || protocol < 0) { if (socket_family < 0 || socket_type < 0 || protocol < 0) {
errno = EINVAL; errno = EINVAL;
@@ -1097,7 +1098,7 @@ Linux / Darwin:
int zts_close(ZT_CLOSE_SIG) int zts_close(ZT_CLOSE_SIG)
{ {
//DEBUG_EXTRA("fd=%d", fd); DEBUG_EXTRA("fd=%d", fd);
int err = errno = 0; int err = errno = 0;
if (fd < 0 || fd >= ZT_MAX_SOCKETS) { if (fd < 0 || fd >= ZT_MAX_SOCKETS) {
errno = EBADF; errno = EBADF;
@@ -1667,7 +1668,7 @@ int zts_read(ZT_READ_SIG) {
} }
int zts_write(ZT_WRITE_SIG) { int zts_write(ZT_WRITE_SIG) {
//DEBUG_TRANS("fd=%d", fd); DEBUG_TRANS("fd=%d", fd);
return write(fd, buf, len); return write(fd, buf, len);
} }
@@ -2103,14 +2104,14 @@ ZeroTier::VirtualTap *getTapByAddr(ZeroTier::InetAddress *addr)
// check address schemes // check address schemes
for (int j=0; j<s->_ips.size(); j++) { for (int j=0; j<s->_ips.size(); j++) {
if ((s->_ips[j].isV4() && addr->isV4()) || (s->_ips[j].isV6() && addr->isV6())) { if ((s->_ips[j].isV4() && addr->isV4()) || (s->_ips[j].isV6() && addr->isV6())) {
//DEBUG_INFO("looking at tap %s, <addr=%s> --- for <%s>", s->_dev.c_str(), s->_ips[j].toString(ipbuf), addr->toIpString(ipbuf2)); //DEBUG_EXTRA("looking at tap %s, <addr=%s> --- for <%s>", s->_dev.c_str(), s->_ips[j].toString(ipbuf), addr->toIpString(ipbuf2));
if (s->_ips[j].isEqualPrefix(addr) if (s->_ips[j].isEqualPrefix(addr)
|| s->_ips[j].ipsEqual(addr) || s->_ips[j].ipsEqual(addr)
|| s->_ips[j].containsAddress(addr) || s->_ips[j].containsAddress(addr)
|| (addr->isV6() && ipv6_in_subnet(&s->_ips[j], addr)) || (addr->isV6() && ipv6_in_subnet(&s->_ips[j], addr))
) )
{ {
//DEBUG_INFO("selected tap %s, <addr=%s>", s->_dev.c_str(), s->_ips[j].toString(ipbuf)); //DEBUG_EXTRA("selected tap %s, <addr=%s>", s->_dev.c_str(), s->_ips[j].toString(ipbuf));
ZeroTier::_vtaps_lock.unlock(); ZeroTier::_vtaps_lock.unlock();
return s; return s;
} }
@@ -2125,7 +2126,7 @@ ZeroTier::VirtualTap *getTapByAddr(ZeroTier::InetAddress *addr)
nm = target.netmask(); nm = target.netmask();
via = managed_routes->at(i).via; via = managed_routes->at(i).via;
if (target.containsAddress(addr)) { if (target.containsAddress(addr)) {
// DEBUG_INFO("chose tap with route <target=%s, nm=%s, via=%s>", target.toString(ipbuf), nm.toString(ipbuf2), via.toString(ipbuf3)); //DEBUG_EXTRA("chose tap with route <target=%s, nm=%s, via=%s>", target.toString(ipbuf), nm.toString(ipbuf2), via.toString(ipbuf3));
ZeroTier::_vtaps_lock.unlock(); ZeroTier::_vtaps_lock.unlock();
return s; return s;
} }

View File

@@ -24,8 +24,6 @@
* of your own application. * of your own application.
*/ */
// Comprehensive stress test for socket-like API
#include <unistd.h> #include <unistd.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <arpa/inet.h> #include <arpa/inet.h>
@@ -51,14 +49,15 @@
#include <signal.h> #include <signal.h>
#include "libzt.h" #include "libzt.h"
#include "Utils.hpp"
#define EXIT_ON_FAIL false #define EXIT_ON_FAIL false
#define PASSED 1 #define PASSED 1
#define FAILED 0 #define FAILED 0
#define ECHO_INTERVAL 1000000 // us #define ECHO_INTERVAL 1000000 // microseconds
#define SLAM_INTERVAL 500000 #define SLAM_INTERVAL 500000 // microseconds
#define WAIT_FOR_TEST_TO_CONCLUDE 0 #define WAIT_FOR_TEST_TO_CONCLUDE 0
#define WAIT_FOR_TRANSMISSION_TO_COMPLETE 5 #define WAIT_FOR_TRANSMISSION_TO_COMPLETE 5
@@ -80,12 +79,15 @@
#define MIN_PORT 5000 #define MIN_PORT 5000
#define MAX_PORT 50000 #define MAX_PORT 50000
#define TCP_UNIT_TEST_SIG_4 struct sockaddr_in *addr, int op, int cnt, char *details, bool *passed #define TCP_UNIT_TEST_SIG_4 struct sockaddr_in *addr, int op, int cnt, char *details, \
#define UDP_UNIT_TEST_SIG_4 struct sockaddr_in *local_addr, struct sockaddr_in *remote_addr, int op, int cnt, char *details, bool *passed bool *passed
#define UDP_UNIT_TEST_SIG_4 struct sockaddr_in *local_addr, struct sockaddr_in *remote_addr, \
#define TCP_UNIT_TEST_SIG_6 struct sockaddr_in6 *addr, int op, int cnt, char *details, bool *passed int op, int cnt, char *details, bool *passed
#define UDP_UNIT_TEST_SIG_6 struct sockaddr_in6 *local_addr, struct sockaddr_in6 *remote_addr, int op, int cnt, char *details, bool *passed
#define TCP_UNIT_TEST_SIG_6 struct sockaddr_in6 *addr, int op, int cnt, char *details, \
bool *passed
#define UDP_UNIT_TEST_SIG_6 struct sockaddr_in6 *local_addr, struct sockaddr_in6 *remote_addr, \
int op, int cnt, char *details, bool *passed
#define ECHOTEST_MODE_RX 333 #define ECHOTEST_MODE_RX 333
#define ECHOTEST_MODE_TX 666 #define ECHOTEST_MODE_TX 666
@@ -192,7 +194,8 @@ std::map<std::string, std::string> testConf;
/* Helper Functions */ /* Helper Functions */
/****************************************************************************/ /****************************************************************************/
void displayResults(int *results, int size) { void displayResults(int *results, int size)
{
int success = 0, failure = 0; int success = 0, failure = 0;
for (int i=0; i<size; i++) { for (int i=0; i<size; i++) {
if (results[i] == 0) { if (results[i] == 0) {
@@ -207,7 +210,8 @@ void displayResults(int *results, int size) {
std::cout << " - failure = " << (float)failure / (float)size << std::endl; std::cout << " - failure = " << (float)failure / (float)size << std::endl;
} }
void loadTestConfigFile(std::string filepath) { void loadTestConfigFile(std::string filepath)
{
std::string key, value, prefix; std::string key, value, prefix;
std::ifstream testFile; std::ifstream testFile;
testFile.open(filepath.c_str()); testFile.open(filepath.c_str());
@@ -224,19 +228,22 @@ void loadTestConfigFile(std::string filepath) {
testFile.close(); testFile.close();
} }
long int get_now_ts() { long int get_now_ts()
{
struct timeval tp; struct timeval tp;
gettimeofday(&tp, NULL); gettimeofday(&tp, NULL);
return tp.tv_sec * 1000 + tp.tv_usec / 1000; return tp.tv_sec * 1000 + tp.tv_usec / 1000;
} }
// for syncronizing tests // for syncronizing tests
void wait_until_tplus(long int original_time, int tplus_ms) { void wait_until_tplus(long int original_time, int tplus_ms)
{
while (original_time + tplus_ms > get_now_ts()) { while (original_time + tplus_ms > get_now_ts()) {
sleep(1); sleep(1);
} }
} }
void wait_until_tplus_s(long int original_time, int tplus_s) { void wait_until_tplus_s(long int original_time, int tplus_s)
{
int current_time_offset = (get_now_ts() - original_time) / 1000; int current_time_offset = (get_now_ts() - original_time) / 1000;
fprintf(stderr, "\n\n--- WAITING FOR T+%d --- (current: T+%d)\n\n", tplus_s, current_time_offset); fprintf(stderr, "\n\n--- WAITING FOR T+%d --- (current: T+%d)\n\n", tplus_s, current_time_offset);
if (current_time_offset > tplus_s) { if (current_time_offset > tplus_s) {
@@ -249,15 +256,24 @@ void wait_until_tplus_s(long int original_time, int tplus_s) {
wait_until_tplus(original_time, tplus_s * 1000); wait_until_tplus(original_time, tplus_s * 1000);
} }
void generate_random_data(void *buf, size_t n, int min, int max) { int rand_in_range(int min, int max)
{
unsigned int seed;
ZeroTier::Utils::getSecureRandom((void*)&seed,sizeof(seed));
srand(seed);
return min + rand() % static_cast<int>(max - min + 1);
}
void generate_random_data(void *buf, size_t n, int min, int max)
{
char *b = (char*)buf; char *b = (char*)buf;
srand((unsigned)time(0));
for (int i=0; i<n; i++) { for (int i=0; i<n; i++) {
b[i] = min + (rand() % static_cast<int>(max - min + 1)); b[i] = rand_in_range(min, max);
} }
} }
void str2addr(std::string ipstr, int port, int ipv, struct sockaddr *saddr) { void str2addr(std::string ipstr, int port, int ipv, struct sockaddr *saddr)
{
if (ipv == 4) { if (ipv == 4) {
struct sockaddr_in *in4 = (struct sockaddr_in*)saddr; struct sockaddr_in *in4 = (struct sockaddr_in*)saddr;
in4->sin_port = htons(port); in4->sin_port = htons(port);
@@ -1544,7 +1560,8 @@ void tcp_perf_rx_echo_4(TCP_UNIT_TEST_SIG_4)
int obscure_api_test(bool *passed) int obscure_api_test(bool *passed)
{ {
DEBUG_TEST("obscure_api_test"); fprintf(stderr, "\n\nobscure API test\n\n");
/* /*
// --- // ---
// getpeername() // getpeername()
@@ -1595,14 +1612,14 @@ int obscure_api_test(bool *passed)
socklen_t flag_len = sizeof(optval); socklen_t flag_len = sizeof(optval);
int fd = SOCKET(AF_INET, SOCK_STREAM, 0); int fd = SOCKET(AF_INET, SOCK_STREAM, 0);
DEBUG_TEST("setting level=%d, optname=%d, optval=%d...", level, optname, optval); DEBUG_TEST("setting level=%d, optname=%d, optval=%d...", level, optname, optval);
int err = SETSOCKOPT(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&optval, sizeof(int)); int err = SETSOCKOPT(fd, level, optname, (char *)&optval, sizeof(int));
if (err < 0) { if (err < 0) {
DEBUG_ERROR("error while setting optval on socket"); DEBUG_ERROR("error while setting optval on socket");
*passed = false; *passed = false;
err = -1; err = -1;
} }
optval = -99; // set junk value to test against optval = -99; // set junk value to test against
if ((err = GETSOCKOPT(fd, IPPROTO_TCP, TCP_NODELAY, &optval, &flag_len)) < 0) { if ((err = GETSOCKOPT(fd, level, optname, &optval, &flag_len)) < 0) {
DEBUG_ERROR("error while getting the optval"); DEBUG_ERROR("error while getting the optval");
*passed = false; *passed = false;
err = -1; err = -1;
@@ -1617,7 +1634,7 @@ int obscure_api_test(bool *passed)
if (optval > 0) { // TODO: what should be expected for each platform? Should this mirror them? if (optval > 0) { // TODO: what should be expected for each platform? Should this mirror them?
optval = 0; optval = 0;
DEBUG_TEST("setting level=%d, optname=%d, optval=%d...", level, optname, optval); DEBUG_TEST("setting level=%d, optname=%d, optval=%d...", level, optname, optval);
if ((err = SETSOCKOPT(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &optval, (socklen_t)sizeof(int))) < 0) { if ((err = SETSOCKOPT(fd, level, optname, (char *) &optval, (socklen_t)sizeof(int))) < 0) {
DEBUG_ERROR("error while setting on socket"); DEBUG_ERROR("error while setting on socket");
*passed = false; *passed = false;
err = -1; err = -1;
@@ -2059,48 +2076,147 @@ void close_while_writing_test()
// TODO: Close a socket while another thread is writing to it or reading from it // TODO: Close a socket while another thread is writing to it or reading from it
} }
void* create_socket(void *arg) /****************************************************************************/
/* test thread model, and locking */
/****************************************************************************/
#define CONCURRENCY_LEVEL 8 // how many threads we want to test with
#define TIME_GRANULARITY 10000 // multiple in microseconds
#define TIME_MULTIPLIER_MIN 1 //
#define TIME_MULTIPLIER_MAX 10 //
#define WORKER_ITERATIONS 100 // number of times a worker shall do its task
#define MASTER_ITERATIONS 10 // number of times we will create a set of workers
// for passing info to worker threads
struct fd_addr_pair {
int fd;
struct sockaddr_in *remote_addr;
};
pthread_t tid[CONCURRENCY_LEVEL];
// over num_iterations, wait a random time, create a socket, wait a random time, and close the socket
void* worker_create_socket(void *arg)
{ {
/*
unsigned long i = 0;
pthread_t id = pthread_self(); pthread_t id = pthread_self();
int fd, rs, rc;
if (pthread_equal(id,tid[0])) // if (pthread_equal(id,tid[0])) { }
{ for (int i=0; i<WORKER_ITERATIONS; i++) {
printf("\n First thread processing\n"); rs = rand_in_range(TIME_MULTIPLIER_MIN, TIME_MULTIPLIER_MAX);
rc = rand_in_range(TIME_MULTIPLIER_MIN, TIME_MULTIPLIER_MAX);
fprintf(stderr, "id=%d, rs = %d, rc = %d\n", id, rs, rc);
usleep(rs * TIME_GRANULARITY);
fd = SOCKET(AF_INET, SOCK_STREAM, 0);
usleep(rc * TIME_GRANULARITY);
CLOSE(fd);
} }
else
{
printf("\n Second thread processing\n");
}
for(i=0; i<(0xFFFFFFFF);i++);
*/
return NULL; return NULL;
} }
// test the core locking logic by creating large numbers of threads and performing random operations over an extended period of time
void multithread_socket_creation() void multithread_test(int num_iterations, bool *passed)
{ {
int err = 0, i = 0;
fprintf(stderr, "\n\nmultithread_socket_creation\n\n"); fprintf(stderr, "\n\nmultithread_socket_creation\n\n");
/* // test zts_socket() and zts_close()
pthread_t tid[2]; for (int j=0; j<num_iterations; j++) {
fprintf(stderr, "iteration=%d\n", j);
err = pthread_create(&(tid[i]), NULL, &create_socket, NULL); // create threads
if (err != 0) for (int i=0; i<CONCURRENCY_LEVEL; i++) {
printf("\ncan't create thread :[%s]", strerror(err)); fprintf(stderr,"creating thread [%d]\n", i);
else if ((err = pthread_create(&(tid[i]), NULL, &worker_create_socket, NULL)) < 0) {
printf("\n Thread created successfully\n"); fprintf(stderr, "there was a problem while creating thread [%d]\n", i);
*/ *passed = false;
// TODO: return;
}
}
// join all threads
char *b;
for (int i=0; i<CONCURRENCY_LEVEL; i++) {
if ((err = pthread_join(tid[i],(void**)&b)) < 0) {
fprintf(stderr, "error while joining thread [%d]\n", i);
*passed = false;
return;
}
}
}
*passed = true;
} }
void multithread_rw() // write a simple string message to a SOCK_DGRAM socket
void* worker_write_to_udp_socket(void *arg) {
fprintf(stderr, "\n\n\nwrite_to_udp_socket\n\n\n");
struct fd_addr_pair *fdp = (struct fd_addr_pair*)arg;
int fd = fdp->fd;
struct sockaddr_in *remote_addr = fdp->remote_addr;
//fprintf(stderr, "fd=%d\n", fd);
int w = 0;
for (int i=0; i<WORKER_ITERATIONS; i++) {
int r = rand_in_range(TIME_MULTIPLIER_MIN, TIME_MULTIPLIER_MAX);
usleep(r * TIME_GRANULARITY);
if ((w = SENDTO(fd, "hello", 5, 0, (struct sockaddr *)remote_addr, sizeof(*remote_addr))) < 0) {
DEBUG_ERROR("error sending packet, err=%d", errno);
}
}
return NULL;
}
// create a single socket and many threads to write to that single socket
void multithread_udp_write(struct sockaddr_in *local_addr, struct sockaddr_in *remote_addr, bool *passed)
{ {
fprintf(stderr, "\n\nmultithread_rw\n\n"); fprintf(stderr, "\n\nmultithread_udp_broadcast\n\n");
int fd, err;
if((fd = SOCKET(AF_INET, SOCK_DGRAM, 0)) < 0) {
DEBUG_ERROR("error while creating socket");
*passed = false;
return;
}
if ((err = BIND(fd, (struct sockaddr *)local_addr, sizeof(struct sockaddr_in)) < 0)) {
DEBUG_ERROR("error binding to interface (%d)", err);
perror("bind");
*passed = false;
return;
}
// params to send to new threads
struct fd_addr_pair fdp;
fdp.fd = fd;
fdp.remote_addr = remote_addr;
for (int i=0; i<CONCURRENCY_LEVEL; i++) {
fprintf(stderr,"creating thread [%d]\n", i);
if ((err = pthread_create(&(tid[i]), NULL, &worker_write_to_udp_socket, (void*)&fdp)) < 0) {
fprintf(stderr, "there was a problem while creating thread [%d]\n", i);
*passed = false;
return;
}
}
// join all threads
char *b;
for (int i=0; i<CONCURRENCY_LEVEL; i++) {
if ((err = pthread_join(tid[i],(void**)&b)) < 0) {
fprintf(stderr, "error while joining thread [%d]\n", i);
*passed = false;
return;
}
}
CLOSE(fd);
}
void multithread_rw_server()
{
fprintf(stderr, "\n\nmultithread_rw_server\n\n");
// TODO: Test read/writes from multiple threads // TODO: Test read/writes from multiple threads
} }
void multithread_rw_client()
{
fprintf(stderr, "\n\nmultithread_rw_client\n\n");
}
/****************************************************************************/
/* close() */
/****************************************************************************/
// Tests rapid opening and closure of sockets // Tests rapid opening and closure of sockets
void close_test(struct sockaddr *bind_addr) void close_test(struct sockaddr *bind_addr)
{ {
@@ -2310,15 +2426,16 @@ for(int i=0; i<num_repeats; i++)
{ {
DEBUG_TEST("\n\n\n --- COMPREHENSIVE TEST ITERATION: %d out of %d ---\n\n\n", i, num_repeats); DEBUG_TEST("\n\n\n --- COMPREHENSIVE TEST ITERATION: %d out of %d ---\n\n\n", i, num_repeats);
// closure test
#if defined(__SELFTEST__) #if defined(__SELFTEST__)
if (false) {
port = 1000; port = 1000;
// closure test
struct sockaddr_in in4; struct sockaddr_in in4;
DEBUG_TEST("testing closures by binding to: %s", local_ipstr.c_str()); DEBUG_TEST("testing closures by binding to: %s", local_ipstr.c_str());
str2addr(local_ipstr, port, 4, (struct sockaddr *)&in4); str2addr(local_ipstr, port, 4, (struct sockaddr *)&in4);
close_test((struct sockaddr*)&in4); close_test((struct sockaddr*)&in4);
port++; port++;
}
// Test adding, resolving, and removing a DNS server // Test adding, resolving, and removing a DNS server
@@ -2344,7 +2461,18 @@ for(int i=0; i<num_repeats; i++)
//test_bad_args(); //test_bad_args();
// OBSCURE API TEST // OBSCURE API TEST
obscure_api_test(&passed); //obscure_api_test(&passed);
//
ipv = 4;
port = start_port;
str2addr(local_ipstr, port, ipv, (struct sockaddr *)&local_addr);
str2addr(remote_ipstr, port, ipv, (struct sockaddr *)&remote_addr);
multithread_udp_write((struct sockaddr_in *)&local_addr, (struct sockaddr_in *)&remote_addr, &passed);
//
multithread_test(10, &passed);
//exit(0);
#endif // __SELFTEST__ #endif // __SELFTEST__