Feature connect kafka with sasl plaintext
This commit is contained in:
@@ -510,18 +510,17 @@ static unsigned char do_action_reset(const struct streaminfo *a_stream, Maat_rul
|
|||||||
|
|
||||||
static unsigned char do_action_drop(const struct streaminfo *a_stream, Maat_rule_t *p_result, tsg_protocol_t protocol)
|
static unsigned char do_action_drop(const struct streaminfo *a_stream, Maat_rule_t *p_result, tsg_protocol_t protocol)
|
||||||
{
|
{
|
||||||
if(protocol==PROTO_DNS)
|
switch(protocol)
|
||||||
{
|
{
|
||||||
return STATE_GIVEME|STATE_DROPPKT;
|
case PROTO_DNS:
|
||||||
}
|
return STATE_GIVEME|STATE_DROPPKT;
|
||||||
|
default:
|
||||||
if(g_tsg_para.deploy_mode==DEPLOY_MODE_MIRROR)
|
set_drop_stream(a_stream);
|
||||||
{
|
if(g_tsg_para.deploy_mode==DEPLOY_MODE_MIRROR)
|
||||||
return do_action_reset(a_stream, p_result, protocol);
|
{
|
||||||
}
|
return do_action_reset(a_stream, p_result, protocol);
|
||||||
else
|
}
|
||||||
{
|
break;
|
||||||
set_drop_stream(a_stream);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return STATE_DROPME|STATE_DROPPKT;
|
return STATE_DROPME|STATE_DROPPKT;
|
||||||
@@ -673,6 +672,11 @@ unsigned char tsg_deal_deny_action(const struct streaminfo *a_stream, Maat_rule_
|
|||||||
int method_type=TSG_METHOD_TYPE_RESET;
|
int method_type=TSG_METHOD_TYPE_RESET;
|
||||||
struct compile_user_region *user_region=NULL;
|
struct compile_user_region *user_region=NULL;
|
||||||
|
|
||||||
|
if(p_result->action==TSG_ACTION_BYPASS)
|
||||||
|
{
|
||||||
|
return ((type==ACTION_RETURN_TYPE_PROT) ? PROT_STATE_DROPME : APP_STATE_GIVEME);
|
||||||
|
}
|
||||||
|
|
||||||
user_region=(struct compile_user_region *)Maat_rule_get_ex_data(g_tsg_maat_feather, p_result, g_tsg_para.table_id[TABLE_SECURITY_COMPILE]);
|
user_region=(struct compile_user_region *)Maat_rule_get_ex_data(g_tsg_maat_feather, p_result, g_tsg_para.table_id[TABLE_SECURITY_COMPILE]);
|
||||||
if(user_region!=NULL)
|
if(user_region!=NULL)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -1421,6 +1421,8 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile)
|
|||||||
|
|
||||||
MESA_load_profile_string_def(conffile, "TSG_LOG", "COMMON_FIELD_FILE", _instance->common_field_file, sizeof(_instance->common_field_file), NULL);
|
MESA_load_profile_string_def(conffile, "TSG_LOG", "COMMON_FIELD_FILE", _instance->common_field_file, sizeof(_instance->common_field_file), NULL);
|
||||||
MESA_load_profile_string_def(conffile, "TSG_LOG", "BROKER_LIST", _instance->broker_list, sizeof(_instance->broker_list), NULL);
|
MESA_load_profile_string_def(conffile, "TSG_LOG", "BROKER_LIST", _instance->broker_list, sizeof(_instance->broker_list), NULL);
|
||||||
|
MESA_load_profile_string_def(conffile, "TSG_LOG", "SASL_USERNAME", _instance->sasl_username, sizeof(_instance->sasl_username), "admin");
|
||||||
|
MESA_load_profile_string_def(conffile, "TSG_LOG", "SASL_PASSWD", _instance->sasl_passwd, sizeof(_instance->sasl_passwd), "galaxy2019");
|
||||||
|
|
||||||
MESA_load_profile_string_def(conffile, "TSG_LOG", "SEND_QUEUE_MAX_MESSAGE", _instance->send_queue_max_msg, sizeof(_instance->send_queue_max_msg), "1000000");
|
MESA_load_profile_string_def(conffile, "TSG_LOG", "SEND_QUEUE_MAX_MESSAGE", _instance->send_queue_max_msg, sizeof(_instance->send_queue_max_msg), "1000000");
|
||||||
MESA_load_profile_string_def(conffile, "TSG_LOG", "REFRESH_INTERVAL_MS", _instance->refresh_interval_ms, sizeof(_instance->refresh_interval_ms), "600000");
|
MESA_load_profile_string_def(conffile, "TSG_LOG", "REFRESH_INTERVAL_MS", _instance->refresh_interval_ms, sizeof(_instance->refresh_interval_ms), "600000");
|
||||||
@@ -1462,7 +1464,11 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile)
|
|||||||
rd_kafka_conf_set(rdkafka_conf, "request.required.acks", _instance->require_ack, kafka_errstr, sizeof(kafka_errstr));
|
rd_kafka_conf_set(rdkafka_conf, "request.required.acks", _instance->require_ack, kafka_errstr, sizeof(kafka_errstr));
|
||||||
rd_kafka_conf_set(rdkafka_conf, "socket.keepalive.enable", "true", kafka_errstr, sizeof(kafka_errstr));
|
rd_kafka_conf_set(rdkafka_conf, "socket.keepalive.enable", "true", kafka_errstr, sizeof(kafka_errstr));
|
||||||
rd_kafka_conf_set(rdkafka_conf, "bootstrap.servers", _instance->broker_list, kafka_errstr, sizeof(kafka_errstr));
|
rd_kafka_conf_set(rdkafka_conf, "bootstrap.servers", _instance->broker_list, kafka_errstr, sizeof(kafka_errstr));
|
||||||
|
rd_kafka_conf_set(rdkafka_conf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr));
|
||||||
|
rd_kafka_conf_set(rdkafka_conf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr));
|
||||||
|
rd_kafka_conf_set(rdkafka_conf, "sasl.username", _instance->sasl_username, kafka_errstr, sizeof(kafka_errstr));
|
||||||
|
rd_kafka_conf_set(rdkafka_conf, "sasl.password", _instance->sasl_passwd, kafka_errstr, sizeof(kafka_errstr));
|
||||||
|
|
||||||
if(!(kafka_handle=rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr))))
|
if(!(kafka_handle=rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr))))
|
||||||
{
|
{
|
||||||
MESA_handle_runtime_log(_instance->logger, RLOG_LV_FATAL, "KAFKA_INIT", "rd_kafka_new is error");
|
MESA_handle_runtime_log(_instance->logger, RLOG_LV_FATAL, "KAFKA_INIT", "rd_kafka_new is error");
|
||||||
|
|||||||
@@ -144,6 +144,8 @@ struct tsg_log_instance_t
|
|||||||
char udp_label[MAX_STRING_LEN];
|
char udp_label[MAX_STRING_LEN];
|
||||||
char common_field_file[MAX_STRING_LEN*4];
|
char common_field_file[MAX_STRING_LEN*4];
|
||||||
char broker_list[MAX_STRING_LEN*8];
|
char broker_list[MAX_STRING_LEN*8];
|
||||||
|
char sasl_username[MAX_STRING_LEN];
|
||||||
|
char sasl_passwd[MAX_STRING_LEN];
|
||||||
char send_queue_max_msg[MAX_STRING_LEN];
|
char send_queue_max_msg[MAX_STRING_LEN];
|
||||||
char require_ack[MAX_STRING_LEN];
|
char require_ack[MAX_STRING_LEN];
|
||||||
char refresh_interval_ms[MAX_STRING_LEN];
|
char refresh_interval_ms[MAX_STRING_LEN];
|
||||||
|
|||||||
Reference in New Issue
Block a user