#include #include #include #include "cJSON.h" #include "stellar/stellar.h" #include "stellar/session.h" #include "stellar/stellar_mq.h" #include "stellar/stellar_exdata.h" #include "stellar/stratum_decoder.h" #include "stellar/log.h" #define unused(x) ((void)(x)) #define STRATUM_LOG_MOUDLE "STRATUM_DECODER" //字段内容宏 #define METHOD_STR "method" #define WORKER_STR "worker" #define PARAMS_STR "params" #define AGENT_STR "agent" enum stratum_field_type { CRYPTOCURRENCY_FLAG = 1, MINING_POOL_FLAG, MINING_PROGRAM_FLAG, MINING_SUBSCRIBE_FLAG, }; struct stratum_decoder_info { int plugin_id; struct stellar *st; struct logger *log_handle; int tcp_topic_id; int stratum_decoder_topic_id; }; void free_stratum_filed(struct stratum_field *stratum_field) { if(stratum_field!=NULL){ if(stratum_field->mining_program.iov_base != NULL){ free(stratum_field->mining_program.iov_base); stratum_field->mining_program.iov_len = 0; } if(stratum_field->mining_pools.iov_base != NULL){ free(stratum_field->mining_pools.iov_base); stratum_field->mining_pools.iov_len = 0; } if(stratum_field->mining_subscribe.iov_base != NULL){ free(stratum_field->mining_subscribe.iov_base); stratum_field->mining_subscribe.iov_len = 0; } free(stratum_field); } return; } void stratum_decoder_session_msg_free_cb(void *msg, void *msg_free_arg) { unused(msg_free_arg); struct stratum_field *stratum_field = (struct stratum_field *)msg; free_stratum_filed(stratum_field); return; } int stratum_field_assign(char *content, struct stratum_field* stratum_field,int field_range) { switch (field_range) { case MINING_POOL_FLAG: stratum_field->mining_pools.iov_len=strlen(content); stratum_field->mining_pools.iov_base=(char*)malloc(stratum_field->mining_pools.iov_len+1); memcpy(stratum_field->mining_pools.iov_base, content, stratum_field->mining_pools.iov_len); ((char*)stratum_field->mining_pools.iov_base)[stratum_field->mining_pools.iov_len]='\0'; break; case MINING_PROGRAM_FLAG: stratum_field->mining_program.iov_len=strlen(content); 
stratum_field->mining_program.iov_base=(char*)malloc(stratum_field->mining_program.iov_len+1); memcpy(stratum_field->mining_program.iov_base, content, stratum_field->mining_program.iov_len); ((char*)stratum_field->mining_program.iov_base)[stratum_field->mining_program.iov_len]='\0'; break; case MINING_SUBSCRIBE_FLAG: stratum_field->mining_subscribe.iov_len=strlen(content); stratum_field->mining_subscribe.iov_base=(char*)malloc(stratum_field->mining_subscribe.iov_len+1); memcpy(stratum_field->mining_subscribe.iov_base, content,stratum_field->mining_subscribe.iov_len); ((char*)stratum_field->mining_subscribe.iov_base)[stratum_field->mining_subscribe.iov_len]='\0'; break; default: return -1; } return 0; } struct stratum_field *stratum_json_decode(struct stratum_decoder_info *stratum_decoder_info, char *raw_json_str) { if(raw_json_str==NULL){ STELLAR_LOG_INFO(stratum_decoder_info->log_handle, STRATUM_LOG_MOUDLE, "json_decode error!:raw_json_str is null! "); return NULL; } cJSON *root = cJSON_Parse(raw_json_str); if (root == NULL){ STELLAR_LOG_INFO(stratum_decoder_info->log_handle, STRATUM_LOG_MOUDLE, "json_decode error!:root_json is null,error: %s ",cJSON_GetErrorPtr()); return NULL; } cJSON *method_item = cJSON_GetObjectItem(root, METHOD_STR); if(method_item==NULL){ STELLAR_LOG_INFO(stratum_decoder_info->log_handle, STRATUM_LOG_MOUDLE, "json_decode error!:not_found '%s' in raw_json_str ", METHOD_STR); cJSON_Delete(root); return NULL; } struct stratum_field *stratum_field = (struct stratum_field *)calloc(1, sizeof(struct stratum_field)); int field_range = 0; if(cJSON_IsString(method_item)){ int assign_ret = 0; STELLAR_LOG_DEBUG(stratum_decoder_info->log_handle, STRATUM_LOG_MOUDLE, "json_decode,find '%s':%s\n", METHOD_STR, method_item->valuestring); if((strncasecmp("mining.subscribe", method_item->valuestring,strlen("mining.subscribe")) == 0) ||(strncasecmp("login", method_item->valuestring,strlen("login")) == 0)){ field_range = MINING_SUBSCRIBE_FLAG; assign_ret = 
stratum_field_assign(raw_json_str, stratum_field, field_range); cJSON *paras_item = cJSON_GetObjectItem(root, PARAMS_STR); if(paras_item == NULL){ STELLAR_LOG_INFO(stratum_decoder_info->log_handle, STRATUM_LOG_MOUDLE, "json_decode:error!not_found '%s' in raw_json_str ", PARAMS_STR); goto err_ret; } if(cJSON_IsArray(paras_item)){ int asize = cJSON_GetArraySize(paras_item); stratum_field->type = OTHER; cJSON *array_item = cJSON_GetArrayItem(paras_item, 0); if(cJSON_IsString(array_item)){ field_range = MINING_PROGRAM_FLAG; assign_ret = stratum_field_assign(array_item->valuestring, stratum_field, field_range); } if(asize>2){ array_item = cJSON_GetArrayItem(paras_item, 2); if(cJSON_IsString(array_item)){ field_range = MINING_POOL_FLAG; assign_ret = stratum_field_assign(array_item->valuestring, stratum_field, field_range); } } }else if(cJSON_IsObject(paras_item) || cJSON_IsRaw(paras_item)){ cJSON *agent_item = cJSON_GetObjectItem(paras_item, AGENT_STR); if(agent_item == NULL){ STELLAR_LOG_INFO(stratum_decoder_info->log_handle, STRATUM_LOG_MOUDLE, "json_decode:error!not_found '%s' in raw_json_str ", AGENT_STR); goto err_ret; } if(cJSON_IsString(agent_item)){ field_range = MINING_PROGRAM_FLAG; stratum_field->type = OTHER; assign_ret = stratum_field_assign(agent_item->valuestring, stratum_field, field_range); } }else{ STELLAR_LOG_INFO(stratum_decoder_info->log_handle, STRATUM_LOG_MOUDLE, "json_decode:error!found '%s',json_value_type:%d\n ", PARAMS_STR, paras_item->type); goto err_ret; } }else if(strncasecmp("eth_submitLogin", method_item->valuestring, strlen("eth_submitLogin")) == 0){ field_range = MINING_SUBSCRIBE_FLAG; assign_ret = stratum_field_assign(raw_json_str, stratum_field, field_range); cJSON *worker_item = cJSON_GetObjectItem(root, WORKER_STR); if(worker_item == NULL){ STELLAR_LOG_INFO(stratum_decoder_info->log_handle, STRATUM_LOG_MOUDLE, "json_decode:error!not_found '%s' in raw_json_str ", WORKER_STR); goto err_ret; } if(cJSON_IsString(worker_item)){ field_range 
= MINING_PROGRAM_FLAG; stratum_field->type = ETH; assign_ret = stratum_field_assign(worker_item->valuestring, stratum_field, field_range); }else{ STELLAR_LOG_INFO(stratum_decoder_info->log_handle, STRATUM_LOG_MOUDLE, "json_decode:error!found '%s',json_value_type:%d\n ", WORKER_STR, worker_item->type); goto err_ret; } }else{ STELLAR_LOG_INFO(stratum_decoder_info->log_handle, STRATUM_LOG_MOUDLE, "json_decode:error!found '%s',json_value_type:%d\n ", METHOD_STR, method_item->type); goto err_ret; } if(assign_ret<0){ STELLAR_LOG_INFO(stratum_decoder_info->log_handle, STRATUM_LOG_MOUDLE, "field_assign_error:raw_json_str:%s\n ", raw_json_str); goto err_ret; } cJSON_Delete(root); return stratum_field; }else{ STELLAR_LOG_INFO(stratum_decoder_info->log_handle, STRATUM_LOG_MOUDLE, "json_decode:error!found '%s',json_value_type:%d\n ", METHOD_STR, method_item->type); goto err_ret; } err_ret: cJSON_Delete(root); free_stratum_filed(stratum_field); return NULL; } struct stratum_field *stratum_data_process(struct stratum_decoder_info *stratum_decoder_info, const char *tcpdata, size_t datalen) { if(tcpdata == NULL || datalen == 0) { return NULL; } char *tcp_json=(char*)malloc(datalen+1); memcpy(tcp_json, tcpdata, datalen); tcp_json[datalen]='\0'; struct stratum_field *stratum_field=stratum_json_decode(stratum_decoder_info, tcp_json); if(stratum_field==NULL) { STELLAR_LOG_DEBUG(stratum_decoder_info->log_handle, STRATUM_LOG_MOUDLE, "payload_decode failed!stratum_field is null"); } free(tcp_json); tcp_json=NULL; return stratum_field; } void stratum_decoder_tcp_on_msg_cb(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env) { unused(topic_id); unused(per_session_ctx); unused(msg); struct stratum_decoder_info *stratum_decoder_info = (struct stratum_decoder_info *)plugin_env; const struct packet *pkt = session_get0_current_packet(sess); const char *payload = packet_get_payload(pkt); size_t payload_len = packet_get_payload_len(pkt); struct 
stratum_field *stratum_field = stratum_data_process(stratum_decoder_info, payload, payload_len); if (stratum_field != NULL) { if (session_mq_publish_message(sess, stratum_decoder_info->stratum_decoder_topic_id, stratum_field) < 0) { STELLAR_LOG_FATAL(stratum_decoder_info->log_handle, STRATUM_LOG_MOUDLE, "stratum_decoder_tcp_on_msg_cb:session_mq_publish_message failed"); free_stratum_filed(stratum_field); } stellar_session_plugin_dettach_current_session(sess); } return; } extern "C" void *stratum_decoder_init(struct stellar *st) { struct stratum_decoder_info *stratum_decoder_info = (struct stratum_decoder_info *)malloc(sizeof(struct stratum_decoder_info)); stratum_decoder_info->st = st; stratum_decoder_info->log_handle = stellar_get_logger(st); stratum_decoder_info->plugin_id = stellar_session_plugin_register(st, NULL, NULL, stratum_decoder_info); if (stratum_decoder_info->plugin_id < 0) { STELLAR_LOG_FATAL(stratum_decoder_info->log_handle, STRATUM_LOG_MOUDLE, "stellar_session_plugin_register failed"); goto ERROR; } stratum_decoder_info->stratum_decoder_topic_id = stellar_mq_create_topic(st, STRATUM_MESSAGE_TOPIC, stratum_decoder_session_msg_free_cb, NULL); if (stratum_decoder_info->stratum_decoder_topic_id < 0) { STELLAR_LOG_FATAL(stratum_decoder_info->log_handle, STRATUM_LOG_MOUDLE, "stellar_session_mq_create_topic failed"); goto ERROR; } stratum_decoder_info->tcp_topic_id = stellar_mq_get_topic_id(st, TOPIC_TCP_STREAM); if (stratum_decoder_info->tcp_topic_id < 0) { STELLAR_LOG_FATAL(stratum_decoder_info->log_handle, STRATUM_LOG_MOUDLE, "stellar_session_mq_get_topic_id failed"); goto ERROR; } if (stellar_session_mq_subscribe(st, stratum_decoder_info->tcp_topic_id, stratum_decoder_tcp_on_msg_cb, stratum_decoder_info->plugin_id) < 0) { STELLAR_LOG_FATAL(stratum_decoder_info->log_handle, STRATUM_LOG_MOUDLE, "stellar_session_mq_subscribe failed"); goto ERROR; } return stratum_decoder_info; ERROR: if (stratum_decoder_info) { free(stratum_decoder_info); } return NULL; } 
/**
 * Plugin teardown: release the state object returned by
 * stratum_decoder_init().
 *
 * @param plugin_env  plugin state pointer; NULL is accepted.
 */
extern "C" void stratum_decoder_exit(void *plugin_env)
{
    /* free() ignores NULL, so no explicit guard is required. */
    free(plugin_env);
}