TCP reassembly add stat of TCP retransmit and TCP overlap

This commit is contained in:
luwenpeng
2024-05-06 15:54:16 +08:00
parent 309736f9f1
commit 61ee619689
8 changed files with 167 additions and 70 deletions

View File

@@ -81,6 +81,9 @@ enum session_stat
STAT_TCP_SEGS_EXPIRED,
STAT_TCP_PLDS_EXPIRED,
STAT_TCP_SEGS_RETRANSMIT,
STAT_TCP_PLDS_RETRANSMIT,
STAT_TCP_SEGS_OVERLAP,
STAT_TCP_PLDS_OVERLAP,

View File

@@ -225,16 +225,6 @@ static int check_options(const struct session_manager_options *opts)
* TCP
******************************************************************************/
/*
* The next routines deal with comparing 32 bit unsigned ints
* and worry about wraparound (automatic with unsigned arithmetic).
*/
static inline bool before(uint32_t seq1, uint32_t seq2)
{
return (int32_t)(seq1 - seq2) < 0;
}
static void tcp_clean(struct session_manager *mgr, struct session *sess)
{
struct tcp_reassembly *c2s_ssembler = sess->tcp_halfs[SESSION_DIRECTION_C2S].assembler;
@@ -279,6 +269,11 @@ static int tcp_init(struct session_manager *mgr, struct session *sess)
return -1;
}
SESSION_LOG_DEBUG("session %lu %s new c2s tcp assembler %p, s2c tcp assembler %p",
session_get_id(sess), session_get_tuple_str(sess),
sess->tcp_halfs[SESSION_DIRECTION_C2S].assembler,
sess->tcp_halfs[SESSION_DIRECTION_S2C].assembler);
return 0;
}
@@ -340,6 +335,7 @@ static void tcp_update(struct session_manager *mgr, struct session *sess, enum s
mgr->stat.nr_tcp_seg_received++;
uint32_t rcv_nxt = tcp_reassembly_get_recv_next(half->assembler);
// in order
if (half->seq == rcv_nxt)
{
session_inc_stat(sess, dir, STAT_TCP_SEGS_INORDER, 1);
@@ -350,16 +346,22 @@ static void tcp_update(struct session_manager *mgr, struct session *sess, enum s
half->in_order.len = len;
tcp_reassembly_inc_recv_next(half->assembler, len);
}
else if (before(half->seq, rcv_nxt))
// retransmission
else if (uint32_before(uint32_add(half->seq, len), rcv_nxt))
{
session_inc_stat(sess, dir, STAT_TCP_SEGS_OVERLAP, 1);
session_inc_stat(sess, dir, STAT_TCP_PLDS_OVERLAP, len);
mgr->stat.nr_tcp_seg_overlap++;
session_inc_stat(sess, dir, STAT_TCP_SEGS_RETRANSMIT, 1);
session_inc_stat(sess, dir, STAT_TCP_PLDS_RETRANSMIT, len);
mgr->stat.nr_tcp_seg_retransmit++;
}
else if ((seg = tcp_segment_new(half->seq, tcp_layer->pld_ptr, len)))
{
switch (tcp_reassembly_push(half->assembler, seg, now))
{
case -2:
session_inc_stat(sess, dir, STAT_TCP_SEGS_RETRANSMIT, 1);
session_inc_stat(sess, dir, STAT_TCP_PLDS_RETRANSMIT, len);
mgr->stat.nr_tcp_seg_retransmit++;
tcp_segment_free(seg);
case -1:
session_inc_stat(sess, dir, STAT_TCP_SEGS_NOSPACE, 1);
session_inc_stat(sess, dir, STAT_TCP_PLDS_NOSPACE, len);

View File

@@ -85,14 +85,15 @@ struct session_manager_stat
uint64_t nr_udp_pkts_evctd_bypass; // sum
// TCP segments
uint64_t nr_tcp_seg_received; // sum
uint64_t nr_tcp_seg_expired; // sum
uint64_t nr_tcp_seg_overlap; // sum
uint64_t nr_tcp_seg_no_space; // sum
uint64_t nr_tcp_seg_inorder; // sum
uint64_t nr_tcp_seg_reorded; // sum
uint64_t nr_tcp_seg_buffered; // sum
uint64_t nr_tcp_seg_released; // sum
uint64_t nr_tcp_seg_received; // sum
uint64_t nr_tcp_seg_expired; // sum
uint64_t nr_tcp_seg_retransmit; // sum
uint64_t nr_tcp_seg_overlap; // sum
uint64_t nr_tcp_seg_no_space; // sum
uint64_t nr_tcp_seg_inorder; // sum
uint64_t nr_tcp_seg_reorded; // sum
uint64_t nr_tcp_seg_buffered; // sum
uint64_t nr_tcp_seg_released; // sum
};
struct session_manager;

View File

@@ -334,6 +334,46 @@ TEST(CASE, TCP_FAST_OPEN)
EXPECT_TRUE(session_get_current_direction(sess) == SESSION_DIRECTION_C2S);
EXPECT_TRUE(session_get_1st_packet(sess, SESSION_DIRECTION_C2S) != NULL);
EXPECT_TRUE(session_get_1st_packet(sess, SESSION_DIRECTION_S2C) == NULL);
// TCP Segment
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_C2S, STAT_TCP_SEGS_RX) == 1);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_C2S, STAT_TCP_PLDS_RX) == 166);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_C2S, STAT_TCP_SEGS_EXPIRED) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_C2S, STAT_TCP_PLDS_EXPIRED) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_C2S, STAT_TCP_SEGS_RETRANSMIT) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_C2S, STAT_TCP_PLDS_RETRANSMIT) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_C2S, STAT_TCP_SEGS_OVERLAP) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_C2S, STAT_TCP_PLDS_OVERLAP) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_C2S, STAT_TCP_SEGS_NOSPACE) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_C2S, STAT_TCP_PLDS_NOSPACE) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_C2S, STAT_TCP_SEGS_INORDER) == 1);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_C2S, STAT_TCP_PLDS_INORDER) == 166);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_C2S, STAT_TCP_SEGS_REORDERED) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_C2S, STAT_TCP_PLDS_REORDERED) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_C2S, STAT_TCP_SEGS_BUFFERED) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_C2S, STAT_TCP_PLDS_BUFFERED) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_C2S, STAT_TCP_SEGS_RELEASED) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_C2S, STAT_TCP_PLDS_RELEASED) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_S2C, STAT_TCP_SEGS_RX) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_S2C, STAT_TCP_PLDS_RX) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_S2C, STAT_TCP_SEGS_EXPIRED) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_S2C, STAT_TCP_PLDS_EXPIRED) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_S2C, STAT_TCP_SEGS_RETRANSMIT) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_S2C, STAT_TCP_PLDS_RETRANSMIT) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_S2C, STAT_TCP_SEGS_OVERLAP) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_S2C, STAT_TCP_PLDS_OVERLAP) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_S2C, STAT_TCP_SEGS_NOSPACE) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_S2C, STAT_TCP_PLDS_NOSPACE) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_S2C, STAT_TCP_SEGS_INORDER) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_S2C, STAT_TCP_PLDS_INORDER) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_S2C, STAT_TCP_SEGS_REORDERED) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_S2C, STAT_TCP_PLDS_REORDERED) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_S2C, STAT_TCP_SEGS_BUFFERED) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_S2C, STAT_TCP_PLDS_BUFFERED) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_S2C, STAT_TCP_SEGS_RELEASED) == 0);
EXPECT_TRUE(session_get_stat(sess, SESSION_DIRECTION_S2C, STAT_TCP_PLDS_RELEASED) == 0);
session_print(sess);
struct tcp_segment *seg = session_get_tcp_segment(sess);

View File

@@ -102,6 +102,7 @@ struct stat_id
// TCP segments
int nr_tcp_seg_received;
int nr_tcp_seg_expired;
int nr_tcp_seg_retransmit;
int nr_tcp_seg_overlap;
int nr_tcp_seg_no_space;
int nr_tcp_seg_inorder;
@@ -226,6 +227,7 @@ struct stellar_stat *stellar_stat_new(uint16_t nr_thread)
// TCP segments
stat->ids.nr_tcp_seg_received = fieldstat_easy_register_counter(stat->fs, "tcp_seg_received");
stat->ids.nr_tcp_seg_expired = fieldstat_easy_register_counter(stat->fs, "tcp_seg_expired");
stat->ids.nr_tcp_seg_retransmit = fieldstat_easy_register_counter(stat->fs, "tcp_seg_retransmit");
stat->ids.nr_tcp_seg_overlap = fieldstat_easy_register_counter(stat->fs, "tcp_seg_overlap");
stat->ids.nr_tcp_seg_no_space = fieldstat_easy_register_counter(stat->fs, "tcp_seg_no_space");
stat->ids.nr_tcp_seg_inorder = fieldstat_easy_register_counter(stat->fs, "tcp_seg_inorder");
@@ -345,6 +347,7 @@ void stellar_stat_output(struct stellar_stat *stat)
stat->sess_stat.nr_tcp_seg_received += stat->thr_sess_stat[i].nr_tcp_seg_received;
stat->sess_stat.nr_tcp_seg_expired += stat->thr_sess_stat[i].nr_tcp_seg_expired;
stat->sess_stat.nr_tcp_seg_retransmit += stat->thr_sess_stat[i].nr_tcp_seg_retransmit;
stat->sess_stat.nr_tcp_seg_overlap += stat->thr_sess_stat[i].nr_tcp_seg_overlap;
stat->sess_stat.nr_tcp_seg_no_space += stat->thr_sess_stat[i].nr_tcp_seg_no_space;
stat->sess_stat.nr_tcp_seg_inorder += stat->thr_sess_stat[i].nr_tcp_seg_inorder;
@@ -432,6 +435,7 @@ void stellar_stat_output(struct stellar_stat *stat)
// TCP segments
fieldstat_easy_counter_set(stat->fs, 0, stat->ids.nr_tcp_seg_received, NULL, 0, stat->sess_stat.nr_tcp_seg_received);
fieldstat_easy_counter_set(stat->fs, 0, stat->ids.nr_tcp_seg_expired, NULL, 0, stat->sess_stat.nr_tcp_seg_expired);
fieldstat_easy_counter_set(stat->fs, 0, stat->ids.nr_tcp_seg_retransmit, NULL, 0, stat->sess_stat.nr_tcp_seg_retransmit);
fieldstat_easy_counter_set(stat->fs, 0, stat->ids.nr_tcp_seg_overlap, NULL, 0, stat->sess_stat.nr_tcp_seg_overlap);
fieldstat_easy_counter_set(stat->fs, 0, stat->ids.nr_tcp_seg_no_space, NULL, 0, stat->sess_stat.nr_tcp_seg_no_space);
fieldstat_easy_counter_set(stat->fs, 0, stat->ids.nr_tcp_seg_inorder, NULL, 0, stat->sess_stat.nr_tcp_seg_inorder);

View File

@@ -90,9 +90,12 @@ void tcp_reassembly_free(struct tcp_reassembly *assembler)
}
}
// return: 1: success (seg overlap)
// return: 0: success
// return: -1: failed (no space)
/*
* return: 1: push tcp segment success (segment overlap)
* return: 0: push tcp segment success
* return: -1: push tcp segment failed (no space)
* return: -2: push tcp segment failed (segment repeat)
*/
int tcp_reassembly_push(struct tcp_reassembly *assembler, struct tcp_segment *seg, uint64_t now)
{
if (assembler == NULL)
@@ -102,17 +105,32 @@ int tcp_reassembly_push(struct tcp_reassembly *assembler, struct tcp_segment *se
if (assembler->cur_seg_num >= assembler->max_seg_num)
{
TCP_REASSEMBLY_LOG_ERROR("assembler is full");
TCP_REASSEMBLY_LOG_ERROR("assembler %p is full", assembler);
return -1;
}
int ret = 0;
struct tcp_segment_private *p = container_of(seg, struct tcp_segment_private, seg);
if (interval_tree_iter_first(&assembler->root, p->node.start, p->node.last))
struct interval_tree_node *node = interval_tree_iter_first(&assembler->root, p->node.start, p->node.last);
if (node)
{
TCP_REASSEMBLY_LOG_DEBUG("seg overlap");
do
{
struct tcp_segment_private *t = container_of(node, struct tcp_segment_private, node);
if (t->node.start == p->node.start && t->node.last == p->node.last)
{
TCP_REASSEMBLY_LOG_DEBUG("assembler %p push segment %p [%lu, %lu] failed, segment repeat", assembler, seg, p->node.start, p->node.last);
return -2;
}
} while ((node = interval_tree_iter_next(node, p->node.start, p->node.last)));
TCP_REASSEMBLY_LOG_DEBUG("assembler %p push segment %p [%lu, %lu], but segment overlap", assembler, seg, p->node.start, p->node.last);
ret = 1;
}
else
{
TCP_REASSEMBLY_LOG_DEBUG("assembler %p push segment %p [%lu, %lu]", assembler, seg, p->node.start, p->node.last);
}
p->ts = now;
p->id = assembler->sum_seg_num++;
@@ -137,6 +155,7 @@ struct tcp_segment *tcp_reassembly_pop(struct tcp_reassembly *assembler)
return NULL;
}
uint64_t overlap = 0;
uint64_t min_id = UINT64_MAX;
struct tcp_segment_private *oldest = NULL;
while (node)
@@ -158,17 +177,15 @@ struct tcp_segment *tcp_reassembly_pop(struct tcp_reassembly *assembler)
if (oldest->node.start < assembler->recv_next)
{
// trim overlap
uint64_t overlap = assembler->recv_next - oldest->node.start;
overlap = assembler->recv_next - oldest->node.start;
oldest->seg.len -= overlap;
oldest->seg.data = (char *)oldest->data + overlap;
}
TCP_REASSEMBLY_LOG_DEBUG("assembler %p pop segment %p [%lu, %lu], trim overlap %lu", assembler, &oldest->seg, oldest->node.start, oldest->node.last, overlap);
// update recv_next
assembler->recv_next = oldest->node.last + 1;
if (assembler->recv_next > UINT32_MAX)
{
assembler->recv_next = assembler->recv_next % 4294967296;
}
assembler->recv_next = uint32_add(assembler->recv_next, oldest->seg.len);
return &oldest->seg;
}
@@ -191,6 +208,7 @@ struct tcp_segment *tcp_reassembly_expire(struct tcp_reassembly *assembler, uint
assembler->cur_seg_num--;
list_del(&p->lru);
interval_tree_remove(&p->node, &assembler->root);
TCP_REASSEMBLY_LOG_DEBUG("assembler %p expire segment %p [%lu, %lu]", assembler, &p->seg, p->node.start, p->node.last);
return &p->seg;
}
else
@@ -206,11 +224,8 @@ void tcp_reassembly_inc_recv_next(struct tcp_reassembly *assembler, uint32_t off
return;
}
assembler->recv_next += offset;
if (assembler->recv_next > UINT32_MAX)
{
assembler->recv_next = assembler->recv_next % 4294967296;
}
assembler->recv_next = uint32_add(assembler->recv_next, offset);
TCP_REASSEMBLY_LOG_DEBUG("assembler %p inc recv_next %u to %lu", assembler, offset, assembler->recv_next);
}
void tcp_reassembly_set_recv_next(struct tcp_reassembly *assembler, uint32_t seq)
@@ -221,6 +236,7 @@ void tcp_reassembly_set_recv_next(struct tcp_reassembly *assembler, uint32_t seq
}
assembler->recv_next = seq;
TCP_REASSEMBLY_LOG_DEBUG("assembler %p set recv_next %u", assembler, seq);
}
uint32_t tcp_reassembly_get_recv_next(struct tcp_reassembly *assembler)

View File

@@ -15,8 +15,8 @@ extern "C"
struct tcp_segment
{
uint32_t len;
const void *data;
uint32_t len;
const void *data;
};
struct tcp_segment *tcp_segment_new(uint32_t seq, const void *data, uint32_t len);
@@ -25,9 +25,12 @@ void tcp_segment_free(struct tcp_segment *seg);
struct tcp_reassembly *tcp_reassembly_new(uint64_t max_timeout, uint64_t max_seg_num);
void tcp_reassembly_free(struct tcp_reassembly *assembler);
// return: 1: success (seg overlap)
// return: 0: success
// return: -1: failed (no space)
/*
* return: 1: push tcp segment success (segment overlap)
* return: 0: push tcp segment success
* return: -1: push tcp segment failed (no space)
* return: -2: push tcp segment failed (segment repeat)
*/
int tcp_reassembly_push(struct tcp_reassembly *assembler, struct tcp_segment *seg, uint64_t now);
struct tcp_segment *tcp_reassembly_pop(struct tcp_reassembly *assembler);
struct tcp_segment *tcp_reassembly_expire(struct tcp_reassembly *assembler, uint64_t now);
@@ -36,6 +39,30 @@ void tcp_reassembly_inc_recv_next(struct tcp_reassembly *assembler, uint32_t off
void tcp_reassembly_set_recv_next(struct tcp_reassembly *assembler, uint32_t seq);
uint32_t tcp_reassembly_get_recv_next(struct tcp_reassembly *assembler);
/*
* The next routines deal with comparing 32 bit unsigned ints
* and worry about wraparound (automatic with unsigned arithmetic).
*/
static inline bool uint32_before(uint32_t seq1, uint32_t seq2)
{
return (int32_t)(seq1 - seq2) < 0;
}
static inline uint32_t uint32_add(uint32_t seq, uint32_t inc)
{
if (seq > UINT32_MAX - inc)
{
seq = ((uint64_t)seq + (uint64_t)inc) % (4294967296);
}
else
{
seq += inc;
}
return seq;
}
#ifdef __cplusplus
}
#endif

View File

@@ -137,7 +137,8 @@ TEST(TCP_REASSEMBLY, REPEAT)
seg = tcp_segment_new(100, "ABCDEFGHIJ", 10);
EXPECT_TRUE(seg != NULL);
EXPECT_TRUE(tcp_reassembly_push(queue, seg, 0) == 1);
EXPECT_TRUE(tcp_reassembly_push(queue, seg, 0) == -2); // repeat
tcp_segment_free(seg);
seg = tcp_reassembly_pop(queue);
EXPECT_TRUE(seg != NULL);
@@ -299,46 +300,49 @@ TEST(TCP_REASSEMBLY, MAX_TIMEOUT)
tcp_reassembly_set_recv_next(queue, 90);
/*
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* |0|1|2|3|4|5|6|7|8|9|a|b|c|d|e|f|g|h|i|j|
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* | |A|B|C|D|E|F|G|H|I|J|
* | +-+-+-+-+-+-+-+-+-+-+
* | | |
* +---> 90 +---> 100 +---> 109
* +-+-+-+-+-+-+-+-+-+-+
* |0|1|2|3|4|5|6|7|8|9|
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* | |A|B|C|D|E|F|G|H|I|J|
* | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* | | |a|b|c|d|e|f|g|h|i|j|
* | | +-+-+-+-+-+-+-+-+-+-+
* | | |
* +---> 90 +-->95 +---> 100
*/
seg = tcp_segment_new(100, "abcdefghij", 10);
EXPECT_TRUE(seg != NULL);
EXPECT_TRUE(tcp_reassembly_push(queue, seg, 1) == 0);
seg = tcp_segment_new(100, "ABCDEFGHIJ", 10);
EXPECT_TRUE(seg != NULL);
EXPECT_TRUE(tcp_reassembly_push(queue, seg, 2) == 1);
seg = tcp_segment_new(90, "0123456789", 10);
EXPECT_TRUE(seg != NULL);
EXPECT_TRUE(tcp_reassembly_push(queue, seg, 3) == 0);
EXPECT_TRUE(tcp_reassembly_push(queue, seg, 1) == 0);
seg = tcp_reassembly_expire(queue, 11);
seg = tcp_segment_new(95, "ABCDEFGHIJ", 10);
EXPECT_TRUE(seg != NULL);
EXPECT_TRUE(seg->len == 10);
EXPECT_TRUE(memcmp(seg->data, "abcdefghij", seg->len) == 0);
tcp_segment_free(seg);
EXPECT_TRUE(tcp_reassembly_push(queue, seg, 2) == 1);
seg = tcp_segment_new(100, "abcdefghij", 10);
EXPECT_TRUE(seg != NULL);
EXPECT_TRUE(tcp_reassembly_push(queue, seg, 3) == 1);
seg = tcp_reassembly_expire(queue, 11);
EXPECT_TRUE(seg == NULL);
seg = tcp_reassembly_pop(queue);
EXPECT_TRUE(seg != NULL);
EXPECT_TRUE(seg->len == 10);
EXPECT_TRUE(memcmp(seg->data, "0123456789", seg->len) == 0);
tcp_segment_free(seg);
seg = tcp_reassembly_expire(queue, 11);
EXPECT_TRUE(seg == NULL);
tcp_reassembly_inc_recv_next(queue, 10);
seg = tcp_reassembly_pop(queue);
EXPECT_TRUE(seg != NULL);
EXPECT_TRUE(seg->len == 10);
EXPECT_TRUE(memcmp(seg->data, "ABCDEFGHIJ", seg->len) == 0);
EXPECT_TRUE(seg->len == 5);
EXPECT_TRUE(memcmp(seg->data, "FGHIJ", seg->len) == 0);
tcp_segment_free(seg);
seg = tcp_reassembly_pop(queue);
EXPECT_TRUE(seg != NULL);
EXPECT_TRUE(seg->len == 5);
EXPECT_TRUE(memcmp(seg->data, "fghij", seg->len) == 0);
tcp_segment_free(seg);
EXPECT_TRUE(tcp_reassembly_pop(queue) == NULL);