diff --git a/platform/include/internal/platform.h b/platform/include/internal/platform.h index 7a5aad0..b2915c1 100644 --- a/platform/include/internal/platform.h +++ b/platform/include/internal/platform.h @@ -44,9 +44,6 @@ struct tfe_conn_private evutil_socket_t fd; struct bufferevent * bev; uint8_t on_writing; - uint8_t closed; - uint8_t need_shutdown; - struct tfe_stream_write_ctx w_ctx; }; struct tfe_stream_private @@ -59,6 +56,8 @@ struct tfe_stream_private struct tfe_thread_ctx * thread_ref; enum tfe_stream_proto session_type; + struct tfe_stream_write_ctx * w_ctx_upstream; + struct tfe_stream_write_ctx * w_ctx_downstream; struct tfe_conn_private * conn_upstream; struct tfe_conn_private * conn_downstream; diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index bf86feb..e825076 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -148,33 +148,77 @@ void tfe_stream_resume(const struct tfe_stream * stream) struct tfe_stream_write_ctx * tfe_stream_write_frag_start(const struct tfe_stream * stream, enum tfe_conn_dir dir) { struct tfe_stream_private * _stream = to_stream_private(stream); - struct tfe_conn_private * this_conn = __this_conn(_stream, dir); - struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir); + struct tfe_stream_write_ctx ** ref_write_ctx = NULL; + struct tfe_conn_private * this_conn = NULL; + struct tfe_conn_private * peer_conn = NULL; - if (this_conn->on_writing == 1) + if (dir == CONN_DIR_DOWNSTREAM) { - return NULL; + this_conn = _stream->conn_downstream; + peer_conn = _stream->conn_upstream; + ref_write_ctx = &_stream->w_ctx_downstream; + } + else + { + this_conn = _stream->conn_upstream; + peer_conn = _stream->conn_downstream; + ref_write_ctx = &_stream->w_ctx_upstream; } - this_conn->w_ctx.dir = dir; - this_conn->w_ctx._stream = _stream; + if (*ref_write_ctx != NULL) + return NULL; + this_conn->on_writing = 1; + *ref_write_ctx = ALLOC(struct tfe_stream_write_ctx, 1); + (*ref_write_ctx)->_stream = _stream; + (*ref_write_ctx)->dir = dir; + bufferevent_disable(peer_conn->bev, EV_READ); - return &(this_conn->w_ctx); + return *ref_write_ctx; } int tfe_stream_write_frag(struct tfe_stream_write_ctx * w_ctx, const unsigned char * data, size_t size) { - struct tfe_conn_private * this_conn = __this_conn(w_ctx->_stream, w_ctx->dir);; - int ret = bufferevent_write(this_conn->bev, data, size); + struct tfe_conn_private * this_conn = __this_conn(w_ctx->_stream, w_ctx->dir); + int ret = 0; + if (this_conn != NULL) + { + ret = bufferevent_write(this_conn->bev, data, size); + } + else + { + return -EPIPE; + } + return ret; } + void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx) { struct tfe_conn_private * this_conn = __this_conn(w_ctx->_stream, w_ctx->dir); struct tfe_conn_private * peer_conn = __peer_conn(w_ctx->_stream, w_ctx->dir); - this_conn->on_writing = 0; - bufferevent_enable(peer_conn->bev, EV_READ); + + if (this_conn != NULL) + { + this_conn->on_writing = 0; + bufferevent_enable(peer_conn->bev, EV_READ); + } + + if (w_ctx->_stream != NULL) + { + if (w_ctx->dir == CONN_DIR_DOWNSTREAM) + { + assert(w_ctx->_stream->w_ctx_downstream == w_ctx); + w_ctx->_stream->w_ctx_downstream = NULL; + } + else + { + assert(w_ctx->_stream->w_ctx_upstream == w_ctx); + w_ctx->_stream->w_ctx_upstream = NULL; + } + } + + free(w_ctx); } int tfe_stream_write(const struct tfe_stream * stream, enum tfe_conn_dir dir, const unsigned char * data, size_t size) @@ -577,7 +621,8 @@ __close_connection: if (*ref_this_conn != NULL) { - fprintf(stderr, "---- eventcb ----, close this connection, stream = %p, event = %x, dir = %s\n", _stream, events, __str_dir); + fprintf(stderr, "---- eventcb ----, close this connection, " + "stream = %p, event = %x, dir = %s\n", _stream, events, __str_dir); assert((*ref_this_conn)->on_writing == 0); __conn_private_destory_with_ssl(ev_base, *ref_this_conn, *ref_this_ssl_stream);