#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "tango_cache_client.h" struct event_base *ev_base; struct tango_cache_instance *tango_instance; #define MSG_OUT stdout /* Send info to stdout, change to stderr if you want */ struct future_pdata { struct future * future; FILE *fp; char filename[256]; }; void get_future_success(future_result_t* result, void * user) { struct tango_cache_result *res = tango_cache_read_result(result); struct future_pdata *pdata = (struct future_pdata *)user; char buffer[1024]; switch(res->type) { case RESULT_TYPE_USERTAG: case RESULT_TYPE_HEADER: memcpy(buffer, res->data_frag, res->size>=1024?1023:res->size); buffer[res->size] = '\0'; printf("%s", buffer); break; case RESULT_TYPE_BODY: fwrite(res->data_frag, res->size, 1, pdata->fp); break; case RESULT_TYPE_MISS: printf("cache not hit/fresh\n"); case RESULT_TYPE_END: if(res->type != RESULT_TYPE_MISS) printf("get cache over, total length: %ld\n", res->tlength); future_destroy(pdata->future); fclose(pdata->fp); free(pdata); break; default:break; } } void get_future_failed(enum e_future_error err, const char * what, void * user) { struct future_pdata *pdata = (struct future_pdata *)user; future_destroy(pdata->future); fclose(pdata->fp); free(pdata); printf("GET fail: %s\n", what); } void head_future_success(future_result_t* result, void * user) { struct tango_cache_result *res = tango_cache_read_result(result); struct future_pdata *pdata = (struct future_pdata *)user; char buffer[1024]; switch(res->type) { case RESULT_TYPE_USERTAG: case RESULT_TYPE_HEADER: memcpy(buffer, res->data_frag, res->size>=1024?1023:res->size); buffer[res->size] = '\0'; printf("%s", buffer); break; case RESULT_TYPE_BODY: assert(0); break; case RESULT_TYPE_MISS: printf("cache not hit/fresh\n"); case RESULT_TYPE_END: if(res->type != RESULT_TYPE_MISS) printf("HEAD cache over, total length: %ld\n", res->tlength); future_destroy(pdata->future); free(pdata); break; default:break; } } void head_future_failed(enum e_future_error err, const char * what, void * user) { struct future_pdata *pdata = (struct future_pdata *)user; future_destroy(pdata->future); free(pdata); printf("HEAD fail: %s\n", what); } void put_future_success(future_result_t* result, void * user) { struct future_pdata *pdata = (struct future_pdata *)user; printf("PUT %s succ\n", pdata->filename); future_destroy(pdata->future); free(pdata); } void put_future_failed(enum e_future_error err, const char * what, void * user) { struct future_pdata *pdata = (struct future_pdata *)user; printf("PUT %s fail: %s\n", what, pdata->filename); future_destroy(pdata->future); free(pdata); } void del_future_success(future_result_t* result, void * user) { struct future_pdata *pdata = (struct future_pdata *)user; printf("DEL %s succ\n", pdata->filename); future_destroy(pdata->future); free(pdata); } void del_future_failed(enum e_future_error err, const char * what, void * user) { struct future_pdata *pdata = (struct future_pdata *)user; printf("DEL %s fail: %s\n", pdata->filename, what); future_destroy(pdata->future); free(pdata); } char * get_file_content(const char *filename, size_t *filelen_out) { char *buffer; FILE *fp; size_t filelen = 0; struct stat filestat; int readlen; fp = fopen(filename, "rb"); if(fp == NULL) { printf("fopen file %s failed.\n", filename); return NULL; } if(fstat(fileno(fp), &filestat)) { printf("fstat %s failed.\n", filename); return NULL; } buffer = (char *)malloc(filestat.st_size); while(filelen < (size_t)filestat.st_size) { readlen = fread(buffer + filelen, 1, filestat.st_size - filelen, fp); if(readlen < 0) { printf("read error: %s\n", strerror(errno)); return NULL; } filelen += readlen; } fclose(fp); *filelen_out = filestat.st_size; return buffer; } int tango_cache_multi_delete(struct tango_cache_instance *instance, struct future* future, char *objlist[], u_int32_t num); static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg) { char s[1024]; long int rv = 0; int n = 0; FILE *input = (FILE *)arg; static int index=0; char filename[128], method[16], buffer[1024], *p; char *dellist[16]; char *pstart, *save_ptr=NULL; int delnum=0; struct tango_cache_meta_put putmeta; struct tango_cache_meta_get getmeta; struct future_pdata *pdata; struct tango_cache_ctx *ctx; do { s[0]='\0'; rv = fscanf(input, "%[^:]:%1023s%n", method, s, &n); if(strlen(s) > 0) { p = method; memset(&putmeta, 0, sizeof(struct tango_cache_meta_put)); memset(&getmeta, 0, sizeof(struct tango_cache_meta_get)); putmeta.url = s; putmeta.std_hdr[HDR_CONTENT_TYPE] = "Content-Type: maintype/subtype"; putmeta.std_hdr[HDR_CONTENT_ENCODING] = "Content-Encoding: gzip"; putmeta.usertag = "Etag: hgdkqkwdwqekdfjwjfjwelkjfkwfejwhf\r\n"; putmeta.usertag_len = strlen(putmeta.usertag); getmeta.url = s; while(*p=='\r'||*p=='\n')p++; if(*p=='\0') continue; if(!strcasecmp(p, "GET")) { sprintf(filename, "file_index_%u.bin", index++); pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); pdata->fp = fopen(filename, "w"); pdata->future = future_create("_get", get_future_success, get_future_failed, (void *)pdata); if(tango_cache_fetch_object(tango_instance, pdata->future, &getmeta, OBJECT_IN_UNKNOWN) < 0) { get_future_failed(FUTURE_ERROR_CANCEL, "", pdata); } } else if(!strcasecmp(p, "HEAD")) { pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); pdata->future = future_create("_head", head_future_success, head_future_failed, (void *)pdata); if(tango_cache_head_object(tango_instance, pdata->future, &getmeta) < 0) { head_future_failed(FUTURE_ERROR_CANCEL, "", pdata); } } else if(!strcasecmp(p, "PUTONCE")) { size_t filelen; p = get_file_content(s, &filelen); pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); pdata->future = future_create("_putnoce", put_future_success, put_future_failed, (void *)pdata); if(tango_cache_upload_once_data(tango_instance, pdata->future, PUT_MEM_FREE, p, filelen, &putmeta, pdata->filename, 256)) { put_future_failed(FUTURE_ERROR_CANCEL, "", pdata); } } else if(!strcasecmp(p, "PUTONCEEV")) { size_t readlen; pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); pdata->future = future_create("_putonceev", put_future_success, put_future_failed, pdata); struct evbuffer *evbuf = evbuffer_new(); char buffer[1024]; FILE *fp = fopen(s, "rb"); while(!feof(fp)) { readlen = fread(buffer, 1, 1024, fp); if(readlen < 0) { assert(0); } evbuffer_add(evbuf, buffer, readlen); } fclose(fp); if(tango_cache_upload_once_evbuf(tango_instance, pdata->future, EVBUFFER_MOVE, evbuf, &putmeta, pdata->filename, 256)) { put_future_failed(FUTURE_ERROR_CANCEL, "", pdata); } } else if(!strcasecmp(p, "DEL")) { pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); pdata->future = future_create("_del", del_future_success, del_future_failed, pdata); sprintf(pdata->filename, "%s", s); tango_cache_delete_object(tango_instance, pdata->future, s); } else if(!strcasecmp(p, "DELMUL")) //TODO { pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); pdata->future = future_create("_delmul", del_future_success, del_future_failed, pdata); sprintf(pdata->filename, "%s", s); for(pstart = strtok_r(s, ";", &save_ptr); pstart != NULL; pstart = strtok_r(NULL, ";", &save_ptr)) { dellist[delnum++] = pstart; } tango_cache_multi_delete(tango_instance, pdata->future, dellist, delnum); } else { pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); pdata->future = future_create("_default", put_future_success, put_future_failed, pdata); ctx = tango_cache_update_start(tango_instance, pdata->future, &putmeta); if(ctx==NULL) { put_future_failed(FUTURE_ERROR_CANCEL, "tango_cache_update_start_NULL", pdata); continue; } FILE *fp = fopen(s, "r"); while(!feof(fp)) { n = fread(buffer, 1, 1024, fp); assert(n>=0); tango_cache_update_frag_data(ctx, buffer, n); } fclose(fp); if(tango_cache_update_end(ctx, pdata->filename, 256) < 0) { put_future_failed(FUTURE_ERROR_CANCEL, "", pdata); } } } else { break; } } while(rv != EOF); } struct event fifo_event; static int init_fifo(void) { struct stat st; curl_socket_t sockfd; static const char *fifo = "cache.fifo"; fprintf(MSG_OUT, "Creating named pipe \"%s\"\n", fifo); if(lstat (fifo, &st) == 0) { if((st.st_mode & S_IFMT) == S_IFREG) { errno = EEXIST; perror("lstat"); exit(1); } } unlink(fifo); if(mkfifo (fifo, 0600) == -1) { perror("mkfifo"); exit(1); } sockfd = open(fifo, O_RDWR | O_NONBLOCK, 0); if(sockfd == -1) { perror("open"); exit(1); } FILE *input = fdopen(sockfd, "r"); memset(&fifo_event, 0, sizeof(struct event)); fprintf(MSG_OUT, "Now, pipe some URL's into > %s\n", fifo); event_assign(&fifo_event, ev_base, sockfd, EV_READ|EV_PERSIST, dummy_accept_callback, input); event_add(&fifo_event, NULL); return (0); } void timer_cb(evutil_socket_t fd, short what, void *arg) { struct timeval tv; struct cache_statistics out; tv.tv_sec = 10; tv.tv_usec = 0; /*static int num=0; num++; if(ctx_global!=NULL && num>5) { tango_cache_update_end(ctx_global); ctx_global = NULL; }*/ tango_cache_get_statistics(tango_instance, &out); printf("-------------------------------------------------------------------------------------------\n" "get_recv: %llu, get_http: %llu, get_redis: %llu, get_fail_http: %llu, get_fail_redis: %llu, get_miss: %llu\n" "put_recv: %llu, put_http: %llu, put_redis: %llu, put_fail_http: %llu, put_fail_redis: %llu\n" "del_recv: %llu, del_succ: %llu, del_fail: %llu, drop_num: %llu, session_redis: %llu, session_http: %llu, memory: %llu\n", out.get_recv_num, out.get_succ_http, out.get_succ_redis, out.get_err_http, out.get_err_redis, out.get_miss_num, out.put_recv_num, out.put_succ_http, out.put_succ_redis, out.put_err_http, out.put_err_redis, out.del_recv_num, out.del_succ_num, out.del_error_num, out.totaldrop_num, out.session_redis, out.session_http, out.memory_used); event_add((struct event *)arg, &tv); } int main(int crgc, char **arg) { struct event ev_timer; struct timeval tv; void *runtime_log; struct tango_cache_parameter *parameter; future_promise_library_init(NULL); runtime_log = MESA_create_runtime_log_handle("./runtime.log", 10); if(NULL==runtime_log) { return -1; } ev_base = event_base_new(); if (!ev_base) { printf("create socket error!\n"); return 0; } init_fifo(); tango_cache_global_init(); parameter = tango_cache_parameter_new("./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log); assert(parameter != NULL); tango_instance = tango_cache_instance_new(parameter, ev_base, runtime_log); tv.tv_sec = 10; tv.tv_usec = 0; evtimer_assign(&ev_timer, ev_base, timer_cb, &ev_timer); evtimer_add(&ev_timer, &tv); event_base_dispatch(ev_base); return 0; }