diff --git a/common/include/tfe_http.h b/common/include/tfe_http.h index 774cd9d..cffed8f 100644 --- a/common/include/tfe_http.h +++ b/common/include/tfe_http.h @@ -236,6 +236,7 @@ struct tfe_http_session_ops void (* ops_drop)(struct tfe_http_session * session); void (* ops_suspend)(struct tfe_http_session * session); void (* ops_resume)(struct tfe_http_session * session); + void (* ops_kill)(struct tfe_http_session * session); void (* ops_request_set)(struct tfe_http_session * session, struct tfe_http_half * req); void (* ops_response_set)(struct tfe_http_session * session, struct tfe_http_half * resp); @@ -397,6 +398,11 @@ static inline void tfe_http_session_drop(struct tfe_http_session * session) return session->ops->ops_drop(session); } +static inline void tfe_http_session_kill(struct tfe_http_session * session) +{ + return session->ops->ops_kill(session); +} + static inline void tfe_http_session_suspend(struct tfe_http_session * session) { return session->ops->ops_suspend(session); diff --git a/common/include/tfe_stream.h b/common/include/tfe_stream.h index 0c07bb1..4f40bda 100644 --- a/common/include/tfe_stream.h +++ b/common/include/tfe_stream.h @@ -90,6 +90,7 @@ void tfe_stream_resume(const struct tfe_stream * stream); //close both sides of the stream. int tfe_stream_shutdown(const struct tfe_stream * stream); int tfe_stream_shutdown_dir(const struct tfe_stream * stream, enum tfe_conn_dir dir); +void tfe_stream_kill(const struct tfe_stream * stream); /** * @brief Write linear text for given stream diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index b5bef27..d069cfd 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -1183,3 +1183,27 @@ void tfe_stream_write_access_log(const struct tfe_stream * stream, int level, co MESA_handle_runtime_log(_stream->stream_logger, level, "access", "%s %s", _stream->str_stream_addr, __tmp_buffer); free(__tmp_buffer); } + + +int tfe_stream_shutdown(const struct tfe_stream * stream) +{ + return 0; +} + +int tfe_stream_shutdown_dir(const struct tfe_stream * stream, enum tfe_conn_dir dir) +{ + return 0; +} + +void tfe_stream_kill(const struct tfe_stream * stream) +{ + struct tfe_stream_private * _stream = to_stream_private(stream); + const static struct linger sl {.l_onoff = 1, .l_linger = 0}; + + /* Set SO_LINGER, the fd will be closed by RST */ + setsockopt(_stream->conn_upstream->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl)); + setsockopt(_stream->conn_downstream->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl)); + + /* Destroy STREAM */ + return tfe_stream_destory(_stream); +} \ No newline at end of file diff --git a/plugin/protocol/http/include/internal/http_common.h b/plugin/protocol/http/include/internal/http_common.h index 72c89f5..9adef8a 100644 --- a/plugin/protocol/http/include/internal/http_common.h +++ b/plugin/protocol/http/include/internal/http_common.h @@ -58,6 +58,8 @@ struct http_session_private int suspend_counter; /* IN GC QUEUE, means the connection of session has destroyed */ bool in_gc_queue; + /* KILL */ + bool kill_signal; }; struct http_connection_private diff --git a/plugin/protocol/http/src/http_entry.cpp b/plugin/protocol/http/src/http_entry.cpp index 43f0b92..108a174 100644 --- a/plugin/protocol/http/src/http_entry.cpp +++ b/plugin/protocol/http/src/http_entry.cpp @@ -446,6 +446,12 @@ enum tfe_stream_action http_connection_entry(const struct tfe_stream * stream, e goto __passthrough; } + if(hs_private->kill_signal) + { + tfe_stream_kill(stream); + return ACTION_DROP_DATA; + } + ret = (dir == CONN_DIR_DOWNSTREAM) ? __on_request_handle_user_req_or_resp(stream, hs_private, hf_private_in, __need_to_close_the_session) : __on_response_handle_user_req_or_resp(stream, hs_private, hf_private_in, __need_to_close_the_session); diff --git a/plugin/protocol/http/src/http_half.cpp b/plugin/protocol/http/src/http_half.cpp index 424aceb..f37314e 100644 --- a/plugin/protocol/http/src/http_half.cpp +++ b/plugin/protocol/http/src/http_half.cpp @@ -900,6 +900,12 @@ void hs_ops_detach(const struct tfe_http_session * session) void hs_ops_drop(struct tfe_http_session * session) {} +void hs_ops_kill(struct tfe_http_session * session) +{ + struct http_session_private * hs_private = to_hs_private((struct tfe_http_session *) session); + hs_private->kill_signal = true; +} + void hs_ops_suspend(struct tfe_http_session * session) { struct http_session_private * hs_private = to_hs_private(session); @@ -1028,6 +1034,7 @@ struct tfe_http_session_ops __http_session_ops = .ops_drop = hs_ops_drop, .ops_suspend = hs_ops_suspend, .ops_resume = hs_ops_resume, + .ops_kill = hs_ops_kill, .ops_request_set = hs_ops_request_set, .ops_response_set = hs_ops_response_set, .ops_request_create = hs_ops_request_create, diff --git a/plugin/protocol/http/test/test_http_convert.cpp b/plugin/protocol/http/test/test_http_convert.cpp index 4151322..72e5b3c 100644 --- a/plugin/protocol/http/test/test_http_convert.cpp +++ b/plugin/protocol/http/test/test_http_convert.cpp @@ -384,6 +384,9 @@ unsigned int tfe_proxy_get_work_thread_count() return 0; } +void tfe_stream_kill(const struct tfe_stream * stream) +{} + const char * tfe_version() { return NULL; diff --git a/plugin/protocol/http/test/test_http_half.cpp b/plugin/protocol/http/test/test_http_half.cpp index 4da6603..a27968d 100644 --- a/plugin/protocol/http/test/test_http_half.cpp +++ b/plugin/protocol/http/test/test_http_half.cpp @@ -1539,6 +1539,9 @@ int tfe_stream_preempt(const struct tfe_stream * stream) return 0; } +void tfe_stream_kill(const struct tfe_stream * stream) +{} + unsigned int tfe_proxy_get_work_thread_count() { return 0;