diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 22ba14b..9df2956 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -2,4 +2,5 @@ add_subdirectory(timestamp)
 add_subdirectory(tuple)
 add_subdirectory(packet)
 add_subdirectory(session)
-add_subdirectory(stellar)
\ No newline at end of file
+add_subdirectory(stellar)
+add_subdirectory(dablooms)
\ No newline at end of file
diff --git a/src/dablooms/CMakeLists.txt b/src/dablooms/CMakeLists.txt
new file mode 100644
index 0000000..9ea1fe1
--- /dev/null
+++ b/src/dablooms/CMakeLists.txt
@@ -0,0 +1,9 @@
+###############################################################################
+# dablooms
+###############################################################################
+
+add_library(dablooms dablooms.cpp murmur.cpp)
+target_include_directories(dablooms PUBLIC ${CMAKE_SOURCE_DIR}/src/dablooms)
+target_link_libraries(dablooms)
+
+add_subdirectory(test)
\ No newline at end of file
diff --git a/src/dablooms/README.md b/src/dablooms/README.md
new file mode 100644
index 0000000..d606233
--- /dev/null
+++ b/src/dablooms/README.md
@@ -0,0 +1,259 @@
+Dablooms: A Scalable, Counting, Bloom Filter
+----------------------------------
+
+_Note_: this project has been mostly unmaintained for a while.
+
+### Overview
+This project aims to demonstrate a novel Bloom filter implementation that can
+scale, and that provides not only the addition of new members, but also reliable
+removal of existing members.
+
+Bloom filters are probabilistic data structures that provide space-efficient
+storage of elements at the cost of possible false positives on membership
+queries.
+
+**dablooms** implements such a structure, taking additional metadata to classify
+elements in order to make an intelligent decision about which Bloom filter an
+element should belong to.
+
+### Features
+**dablooms**, in addition to the above, has several features:
+
+* Implemented as a static C library
+* Memory mapped
+* 4-bit counters
+* Sequence counters for clean/dirty checks
+* Python wrapper
+
+For performance, the low-level operations are implemented in C. The filter is also
+memory mapped, which provides async flushing and persistence at low cost.
+In an effort to maintain memory efficiency, rather than using integers, or
+even whole bytes, as counters, we use only four-bit counters. These four-bit
+counters allow up to 15 items to share a counter in the map. If more than a
+small handful of items are sharing a counter, the Bloom filter would be overloaded
+(resulting in excessive false positives anyway) at any sane error rate, so
+there is no benefit in supporting larger counters.
+
+The Bloom filter also employs change sequence numbers to track operations performed
+on the Bloom filter. These allow the application to determine if a write might have
+only partially completed (perhaps due to a crash), leaving the filter in an
+inconsistent state. The application can thus determine if a filter is OK or needs
+to be recreated. The sequence number can also be used to determine what a consistent
+but out-of-date filter missed, and bring it up to date.
+
+There are two sequence numbers (and helper functions to get them): "mem_seqnum" and
+"disk_seqnum". The "mem" variant is useful if the user is sure the OS didn't crash,
+and the "disk" variant is useful if the OS might have crashed since the Bloom filter
+was last changed. Both values can either be "0", meaning the filter is possibly
+inconsistent from that point of view, or a non-zero sequence number that the filter
+is consistent with.
The "mem" variant is often non-zero, but the "disk" variant only +becomes non-zero right after a (manual) flush. This can be expensive (it's an fsync), +so the value can be ignored if not relevant for the application. For example, if the +Bloom file exists in a directory which is cleared at boot (like `/tmp`), then the +application can safely assume that any existing file was not affected by an OS crash, +and never bother to flush or check disk_seqnum. Schemes involving batching up changes +are also possible. + +The dablooms library is not inherently thread safe, this is the clients responsibility. +Bindings are also not thread safe, unless they state otherwise. + +### Installing +Clone the repo, or download and extract a tarball of a tagged version +[from github](https://github.com/bitly/dablooms/tags). +In the source tree, type `make`, `make install` (`sudo` may be needed). +This will only install static and dynamic versions of the C dablooms library "libdablooms". + +To use a specific build directory, install prefix, or destination directory for packaging, +specify `BLDDIR`, `prefix`, or `DESTDIR` to make. For example: +`make install BLDDIR=/tmp/dablooms/bld DESTDIR=/tmp/dablooms/pkg prefix=/usr` + +Look at the output of `make help` for more options and targets. + +Also available are bindings for various other languages: + +#### Python (pydablooms) +To install the Python bindings "pydablooms" (currently only compatibly with python 2.x) +run `make pydablooms`, `make install_pydablooms` (`sudo` may be needed). + +To use and install for a specific version of Python installed on your system, +use the `PYTHON` option to make. For example: `make install_pydablooms PYTHON=python2.7`. +You can override the module install location with the `PY_MOD_DIR` option to make, +and the `BLDDIR` and `DESTDIR` options also affect pydablooms. + +The Makefile attempts to determine the python module location `PY_MOD_DIR` +automatically. It prefers a location in `/usr/local`, but you can specify +`PY_MOD_DIR_ARG=--user` to try to use the location which `pip install --user` +would use in your HOME dir. You can instead specify `PY_MOD_DIR_ARG=--system` +to prefer the normal/central system python module dir. + +See pydablooms/README.md for more info. + +#### Go (godablooms) +The Go bindings "godablooms" are not integrated into the Makefile. +Install libdablooms first, then look at `godablooms/README.md` + +### Contributing +If you make changes to C portions of dablooms which you would like merged into the +upstream repository, it would help to have your code match our C coding style. We use +[astyle](http://astyle.sourceforge.net/), svn rev 353 or later, on our code, with the +following options: + + astyle --style=1tbs --lineend=linux --convert-tabs --preserve-date \ + --fill-empty-lines --pad-header --indent-switches \ + --align-pointer=name --align-reference=name --pad-oper -n + +### Testing +To run a quick and dirty test, type `make test`. This test uses a list of words +and defaults to `/usr/share/dict/words`. If your path differs, you can use the +`WORDS` flag to specific its location, such as `make test WORDS=/usr/dict/words`. + +This will run a simple test that iterates through a word list and +adds each word to dablooms. It iterates again, removing every fifth +element. Lastly, it saves the file, opens a new filter, and iterates a third time +checking the existence of each word. It prints results of the true negatives, +false positives, true positives, and false negatives, and the false positive rate. 
+
+The false positive rate is calculated as "false positives / (false positives + true negatives)",
+i.e. the fraction of true negatives that are reported as false positives. This is the
+interesting statistic because the rate of false negatives should always be zero.
+
+The test uses a maximum error rate of .05 (5%) and an initial capacity of 100k. If
+the dictionary is near 500k words, we should have created 4 new filters in order to scale to size.
+
+A second test adds every other word in the list and removes no words, causing each
+used filter to stay at maximum capacity, which is a worst case for accuracy.
+
+Check out the performance yourself, and check out the size of the resulting file!
+
+## Bloom Filter Basics
+Bloom filters are probabilistic data structures that provide
+space-efficient storage of elements at the cost of occasional false positives on
+membership queries, i.e. a Bloom filter may answer true to a query even though it does
+not in fact contain said element. A Bloom filter is traditionally implemented as an array of
+`M` bits, where `M` is the size of the Bloom filter. On initialization all bits are
+set to zero. A filter is also parameterized by a constant `k` that defines the number
+of hash functions used to set and test bits in the filter. Each hash function should
+output one index in `M`. When inserting an element `x` into the filter, the bits
+at the `k` indices `h1(x), h2(x), ..., hk(x)` are set.
+
+In order to query a Bloom filter, say for element `x`, it suffices to verify whether
+all bits at indices `h1(x), h2(x), ..., hk(x)` are set. If one or more of these
+bits is not set, then the queried element is definitely not present in the
+filter. However, if all these bits are set, then the element is considered to
+be in the filter. Given this procedure, an error probability exists for positive
+matches, since the tested indices might have been set by the insertion of other
+elements.
+
+### Counting Bloom Filters: Solving Removals
+The same property that results in false positives *also* makes it
+difficult to remove an element from the filter, as there is no
+easy means of discerning whether another element hashes to the same bit.
+Unsetting a bit that is shared by multiple elements can cause **false
+negatives**. Using a counter, instead of a bit, circumvents this issue:
+the counter is incremented when an element is hashed to a
+given location, and decremented upon removal. Membership queries rely on whether a
+given counter is greater than zero. This reduces the exceptional
+space-efficiency provided by the standard Bloom filter.
+
+### Scalable Bloom Filters: Solving Scale
+Another important property of a Bloom filter is the linear relationship between its size
+and its storage capacity. If the maximum allowable error probability and the number of
+elements to store are both known, it is relatively straightforward to dimension an
+appropriate filter. However, it is not always possible to know a priori how many elements
+will need to be stored. There is a trade-off between over-dimensioning a filter and
+suffering from a ballooning error probability as it fills.
+
+Almeida, Baquero, Preguiça, and Hutchison published a 2006 paper on
+[Scalable Bloom Filters](http://www.sciencedirect.com/science/article/pii/S0020019006003127),
+which suggested building a scalable Bloom filter from what is essentially
+a list of Bloom filters that act together as one large Bloom filter. When greater
+capacity is desired, a new filter is added to the list, as the sketch below illustrates.
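+
+With dablooms' own API (declared in `dablooms.h` and described in the sections below),
+the practical effect is that a caller can keep adding past the initial capacity and the
+structure quietly appends tighter filters behind the scenes. A minimal sketch — the
+capacity and error-rate values here are arbitrary:
+
+    #include <stdint.h>
+    #include <stdio.h>
+    #include <string.h>
+    #include "dablooms.h"
+
+    int main(void)
+    {
+        /* initial capacity of 1000, but we insert 10x that */
+        scaling_bloom_t *bloom = new_scaling_bloom(1000, 0.01);
+        char key[32];
+        uint64_t id;
+
+        for (id = 0; id < 10000; id++) {
+            snprintf(key, sizeof(key), "element-%lu", (unsigned long)id);
+            scaling_bloom_add(bloom, key, strlen(key), id);
+        }
+
+        /* the last element went into a filter that did not exist at the start */
+        snprintf(key, sizeof(key), "element-%lu", (unsigned long)9999);
+        printf("present: %d, filters in use: %u\n",
+               scaling_bloom_check(bloom, key, strlen(key)), bloom->num_blooms);
+
+        free_scaling_bloom(bloom);
+        return 0;
+    }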
+
+Membership queries are conducted on each filter, with a positive result
+returned if the element is found in any one of the filters. Naively, this
+leads to an increasing, compounding error probability, since the error probability
+of the structure as a whole evaluates to:
+
+    1 - ∏(1 - Pi)
+
+It is possible to bound this error probability by applying a tightening
+ratio, `r`. As a result, the bounded error probability is represented as:
+
+    1 - ∏(1 - P0 * r^i), where r is chosen so that 0 < r < 1
+
+Since size is simply a function of an error probability and a capacity, a
+variety of growth functions can be applied to scale the size of the Bloom filter
+as necessary. We found it sufficient to pick .9 for `r`.
+
+## Problems with Mixing Scalable and Counting Bloom Filters
+Scalable Bloom filters do not allow for the removal of elements from the filter.
+In addition, simply converting each Bloom filter in a scalable Bloom filter into
+a counting filter also poses problems. Since an element can be in any filter, and
+Bloom filters inherently allow for false positives, a given element may appear to
+be in two or more filters. If an element is inadvertently removed from a filter
+which did not contain it, it would introduce the possibility of **false negatives**.
+
+If, however, an element can be removed from the correct filter, it maintains
+the integrity of said filter, i.e. it prevents the possibility of false negatives. Thus,
+a scaling, counting, Bloom filter is possible if, upon additions and deletions,
+one can correctly decide which Bloom filter contains the element.
+
+There are several advantages to using a Bloom filter. A Bloom filter gives the
+application cheap, memory-efficient set operations, with no actual data stored
+about the given element. Rather, Bloom filters allow the application to test,
+with some given error probability, the membership of an item. This leads to the
+conclusion that the majority of operations performed on Bloom filters are
+membership queries, rather than additions and removals of elements. Thus,
+for a scaling, counting, Bloom filter, we can optimize for membership queries at
+the expense of additions and removals. This expense comes not in performance,
+but in the addition of more metadata concerning an element and its relation to
+the Bloom filter. With the addition of some sort of identifier for an
+element, which does not need to be unique as long as it is fairly distributed, it
+is possible to correctly determine which filter an element belongs to, and thereby
+to maintain the integrity of a given Bloom filter with accurate additions
+and removals.
+
+## Enter dablooms
+dablooms is one such implementation of a scaling, counting, Bloom filter that takes
+additional metadata during additions and deletions in the form of a (generally)
+monotonically increasing integer to classify elements (possibly a timestamp).
+This is used during additions/removals to easily determine the correct Bloom filter
+for an element (each filter is assigned a range). Checking an item against the Bloom
+filter, which is assumed to be the dominant activity, does not use the id (it works
+like a normal scaling Bloom filter).
+
+dablooms is designed to scale itself using these identifiers and the given capacity.
+When a Bloom filter is at capacity, dablooms will create a new Bloom filter which
+starts at the next id after the greatest id of the previous Bloom filter. Because the
+identifiers increase monotonically, new elements will be added to the
+newest Bloom filter.
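+
+In code, that selection is essentially a reverse scan over the filter list, which is
+what `scaling_bloom_add()` and `scaling_bloom_remove()` do internally (see
+dablooms.cpp). The trimmed-down sketch below omits error handling, and the helper name
+`filter_for_id()` is made up for illustration:
+
+    #include <stdint.h>
+    #include "dablooms.h"
+
+    counting_bloom_t *filter_for_id(scaling_bloom_t *bloom, uint64_t id)
+    {
+        counting_bloom_t *cur = NULL;
+        int i;
+
+        /* newest-to-oldest: stop at the first filter whose starting id is
+         * <= the element's id; that filter's range covers the element */
+        for (i = (int)bloom->num_blooms - 1; i >= 0; i--) {
+            cur = bloom->blooms[i];
+            if (id >= cur->header->id) {
+                break;
+            }
+        }
+        /* if nothing matched, cur is the oldest filter, mirroring what
+         * scaling_bloom_add()/scaling_bloom_remove() end up using */
+        return cur;
+    }
+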
Note, in theory and as implemented, nothing prevents one from +adding an element to any "older" filter. You just run the increasing risk of the +error probability growing beyond the bound as it becomes "overfilled". + +You can then remove any element from any Bloom filter using the identifier to intelligently +pick which Bloom filter to remove from. Consequently, as you continue to remove elements +from Bloom filters that you are not continuing to add to, these Bloom filters will become +more accurate. + +The "id" of an element does not need to be known to check the Bloom filter, but does need +to be known when the element is removed (and the same as when it was added). This might +be convenient if the item already has an appropriate id (almost always increasing for new +items) associated with it. + +### Example use case +There is a database with a collection of entries. There is a series of items, each of which +you want to look up in the database; most will have no entry in the database, but some +will. Perhaps it's a database of spam links. If you use dablooms in front of the database, +you can avoid needing to check the database for almost all items which won't be found in +it anyway, and save a lot of time and effort. It's also much easier to distribute the +Bloom filter than the entire database. But to make it work, you need to determine an "id" +whenever you add to or remove from the Bloom filter. You could store the timestamp when +you add the item to the database as another column in the database, and give it to +`scaling_bloom_add()` as well. When you remove the item, you look it up in the database +first and pass the timestamp stored there to `scaling_bloom_remove()`. The timestamps for +new items will be equal or greater, and definitely greater over time. Instead of +timestamps, you could also use an auto-incrementing index. Checks against the Bloom +don't need to know the id and should be quick. If a check comes back negative, you can be +sure the item isn't in the database, and skip that query completely. If a check comes +back positive, you have to query the database, because there's a slight chance that the +item isn't actually in there. diff --git a/src/dablooms/dablooms.cpp b/src/dablooms/dablooms.cpp new file mode 100644 index 0000000..2964f60 --- /dev/null +++ b/src/dablooms/dablooms.cpp @@ -0,0 +1,705 @@ +/* Copyright @2012 by Justin Hines at Bitly under a very liberal license. See LICENSE in the source distribution. 
*/
+
+#include <errno.h>
+#include <fcntl.h>
+#include <math.h>
+#include <stdio.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/mman.h>
+#include "murmur.h"
+#include "dablooms.h"
+
+#define DABLOOMS_VERSION "0.9.1"
+
+#define ERROR_TIGHTENING_RATIO 0.5
+#define SALT_CONSTANT 0x97c29b3a
+
+const char *dablooms_version(void)
+{
+    return DABLOOMS_VERSION;
+}
+
+void free_bitmap(bitmap_t *bitmap)
+{
+#if 0
+    if ((munmap(bitmap->array, bitmap->bytes)) < 0) {
+        perror("Error, unmapping memory");
+    }
+#else
+    free(bitmap->array);
+#endif
+    free(bitmap);
+}
+
+bitmap_t *bitmap_resize(bitmap_t *bitmap, size_t old_size, size_t new_size)
+{
+
+#if 0
+    /* resize if mmap exists and possible on this os, else new mmap */
+    if (bitmap->array != NULL) {
+#if __linux
+        bitmap->array = mremap(bitmap->array, old_size, new_size, MREMAP_MAYMOVE);
+        if (bitmap->array == MAP_FAILED) {
+            perror("Error resizing mmap");
+            free_bitmap(bitmap);
+            return NULL;
+        }
+#else
+        if (munmap(bitmap->array, bitmap->bytes) < 0) {
+            perror("Error unmapping memory");
+            free_bitmap(bitmap);
+            return NULL;
+        }
+        bitmap->array = NULL;
+#endif
+    }
+    if (bitmap->array == NULL) {
+        bitmap->array = mmap(NULL, new_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, -1, 0);
+        if (bitmap->array == MAP_FAILED) {
+            perror("Error init mmap");
+            free_bitmap(bitmap);
+            return NULL;
+        }
+    }
+#else
+    if (bitmap->array != NULL)
+    {
+        /* keep the old block on failure so free_bitmap() can release it */
+        char *resized = (char *)realloc(bitmap->array, new_size);
+        if (resized == NULL)
+        {
+            perror("Error resizing memory");
+            free_bitmap(bitmap);
+            return NULL;
+        }
+        bitmap->array = resized;
+        memset(bitmap->array + old_size, 0, new_size - old_size);
+    }
+    else
+    {
+        bitmap->array = (char *)malloc(new_size);
+        if (bitmap->array == NULL)
+        {
+            perror("Error init memory");
+            free_bitmap(bitmap);
+            return NULL;
+        }
+        memset(bitmap->array, 0, new_size);
+    }
+#endif
+    bitmap->bytes = new_size;
+    return bitmap;
+}
+
+/* Create a new bitmap, not full featured, simple to give
+ * us a means of interacting with the 4 bit counters */
+bitmap_t *new_bitmap(size_t bytes)
+{
+    bitmap_t *bitmap;
+
+    if ((bitmap = (bitmap_t *)malloc(sizeof(bitmap_t))) == NULL)
+    {
+        return NULL;
+    }
+
+    bitmap->bytes = bytes;
+    bitmap->array = NULL;
+
+    if ((bitmap = bitmap_resize(bitmap, 0, bytes)) == NULL)
+    {
+        return NULL;
+    }
+
+    return bitmap;
+}
+
+/* increments the four bit counter */
+int bitmap_increment(bitmap_t *bitmap, unsigned int index, long offset)
+{
+    long access = index / 2 + offset;
+    uint8_t temp;
+    __builtin_prefetch(&(bitmap->array[access]), 0, 1);
+    uint8_t n = bitmap->array[access];
+    if (index % 2 != 0)
+    {
+        temp = (n & 0x0f);
+        n = (n & 0xf0) + ((n & 0x0f) + 0x01);
+    }
+    else
+    {
+        temp = (n & 0xf0) >> 4;
+        n = (n & 0x0f) + ((n & 0xf0) + 0x10);
+    }
+
+    if (temp == 0x0f)
+    {
+        // fprintf(stderr, "Error, 4 bit int Overflow\n");
+        return -1;
+    }
+
+    __builtin_prefetch(&(bitmap->array[access]), 1, 1);
+    bitmap->array[access] = n;
+    return 0;
+}
+
+/* decrements the four bit counter */
+int bitmap_decrement(bitmap_t *bitmap, unsigned int index, long offset)
+{
+    long access = index / 2 + offset;
+    uint8_t temp;
+    uint8_t n = bitmap->array[access];
+
+    if (index % 2 != 0)
+    {
+        temp = (n & 0x0f);
+        n = (n & 0xf0) + ((n & 0x0f) - 0x01);
+    }
+    else
+    {
+        temp = (n & 0xf0) >> 4;
+        n = (n & 0x0f) + ((n & 0xf0) - 0x10);
+    }
+
+    if (temp == 0x00)
+    {
+        // fprintf(stderr, "Error, Decrementing zero\n");
+        return -1;
+    }
+
+    bitmap->array[access] = n;
+    return 0;
+}
+
+/* reads the four bit counter; non-zero means set */
+int bitmap_check(bitmap_t *bitmap, unsigned int index, long offset)
+{
+    long access = index / 2 +
offset; + if (index % 2 != 0) + { + return bitmap->array[access] & 0x0f; + } + else + { + return bitmap->array[access] & 0xf0; + } +} + +int bitmap_flush(bitmap_t *bitmap) +{ +#if 0 + if ((msync(bitmap->array, bitmap->bytes, MS_SYNC) < 0)) { + perror("Error, flushing bitmap to disk"); + return -1; + } else { + return 0; + } +#else + return 0; +#endif +} + +/* + * Perform the actual hashing for `key` + * + * Only call the hash once to get a pair of initial values (h1 and + * h2). Use these values to generate all hashes in a quick loop. + * + * See paper by Kirsch, Mitzenmacher [2006] + * http://www.eecs.harvard.edu/~michaelm/postscripts/rsa2008.pdf + */ +void hash_func(counting_bloom_t *bloom, const char *key, size_t key_len, uint32_t *hashes) +{ + uint32_t checksum[4]; + + MurmurHash3_x64_128(key, key_len, SALT_CONSTANT, checksum); + uint32_t h1 = checksum[0]; + uint32_t h2 = checksum[1]; + + for (size_t i = 0; i < bloom->nfuncs; i++) + { + hashes[i] = (h1 + i * h2) % bloom->counts_per_func; + } +} + +int free_counting_bloom(counting_bloom_t *bloom) +{ + if (bloom != NULL) + { + free(bloom->hashes); + bloom->hashes = NULL; + free_bitmap(bloom->bitmap); + free(bloom); + bloom = NULL; + } + return 0; +} + +counting_bloom_t *counting_bloom_init(unsigned int capacity, double error_rate, long offset) +{ + counting_bloom_t *bloom; + + if ((bloom = (counting_bloom_t *)malloc(sizeof(counting_bloom_t))) == NULL) + { + fprintf(stderr, "Error, could not realloc a new bloom filter\n"); + return NULL; + } + bloom->bitmap = NULL; + bloom->capacity = capacity; + bloom->error_rate = error_rate; + bloom->offset = offset + sizeof(counting_bloom_header_t); + bloom->nfuncs = (int)ceil(log(1 / error_rate) / log(2)); + bloom->counts_per_func = (int)ceil(capacity * fabs(log(error_rate)) / (bloom->nfuncs * pow(log(2), 2))); + bloom->size = bloom->nfuncs * bloom->counts_per_func; + /* rounding-up integer divide by 2 of bloom->size */ + bloom->num_bytes = ((bloom->size + 1) / 2) + sizeof(counting_bloom_header_t); + bloom->hashes = (uint32_t *)calloc(bloom->nfuncs, sizeof(uint32_t)); + + return bloom; +} + +counting_bloom_t *new_counting_bloom(unsigned int capacity, double error_rate) +{ + counting_bloom_t *cur_bloom; + + cur_bloom = counting_bloom_init(capacity, error_rate, 0); + cur_bloom->bitmap = new_bitmap(cur_bloom->num_bytes); + cur_bloom->header = (counting_bloom_header_t *)(cur_bloom->bitmap->array); + return cur_bloom; +} + +int counting_bloom_add(counting_bloom_t *bloom, const char *s, size_t len) +{ + unsigned int index, offset; + unsigned int *hashes = bloom->hashes; + + hash_func(bloom, s, len, hashes); + + for (size_t i = 0; i < bloom->nfuncs; i++) + { + offset = i * bloom->counts_per_func; + index = hashes[i] + offset; + bitmap_increment(bloom->bitmap, index, bloom->offset); + } + bloom->header->count++; + + return 0; +} + +int counting_bloom_remove(counting_bloom_t *bloom, const char *s, size_t len) +{ + unsigned int index, offset; + unsigned int *hashes = bloom->hashes; + + hash_func(bloom, s, len, hashes); + + for (size_t i = 0; i < bloom->nfuncs; i++) + { + offset = i * bloom->counts_per_func; + index = hashes[i] + offset; + bitmap_decrement(bloom->bitmap, index, bloom->offset); + } + bloom->header->count--; + + return 0; +} + +int counting_bloom_check(counting_bloom_t *bloom, const char *s, size_t len) +{ + unsigned int index, offset; + unsigned int *hashes = bloom->hashes; + + hash_func(bloom, s, len, hashes); + + for (size_t i = 0; i < bloom->nfuncs; i++) + { + offset = i * 
bloom->counts_per_func; + index = hashes[i] + offset; + if (!(bitmap_check(bloom->bitmap, index, bloom->offset))) + { + return 0; + } + } + return 1; +} + +int free_scaling_bloom(scaling_bloom_t *bloom) +{ + int i; + for (i = bloom->num_blooms - 1; i >= 0; i--) + { + free(bloom->blooms[i]->hashes); + bloom->blooms[i]->hashes = NULL; + free(bloom->blooms[i]); + bloom->blooms[i] = NULL; + } + free(bloom->blooms); + free_bitmap(bloom->bitmap); + free(bloom); + return 0; +} + +/* creates a new counting bloom filter from a given scaling bloom filter, with count and id */ +counting_bloom_t *new_counting_bloom_from_scale(scaling_bloom_t *bloom) +{ + long offset; + double error_rate; + counting_bloom_t *cur_bloom; + + error_rate = bloom->error_rate * (pow(ERROR_TIGHTENING_RATIO, bloom->num_blooms + 1)); + + if ((bloom->blooms = (counting_bloom_t **)realloc(bloom->blooms, (bloom->num_blooms + 1) * sizeof(counting_bloom_t *))) == NULL) + { + fprintf(stderr, "Error, could not realloc a new bloom filter\n"); + return NULL; + } + + cur_bloom = counting_bloom_init(bloom->capacity, error_rate, bloom->num_bytes); + bloom->blooms[bloom->num_blooms] = cur_bloom; + + bloom->bitmap = bitmap_resize(bloom->bitmap, bloom->num_bytes, bloom->num_bytes + cur_bloom->num_bytes); + + /* reset header pointer, as mmap may have moved */ + bloom->header = (scaling_bloom_header_t *)bloom->bitmap->array; + + /* Set the pointers for these header structs to the right location since mmap may have moved */ + bloom->num_blooms++; + for (unsigned int i = 0; i < bloom->num_blooms; i++) + { + offset = bloom->blooms[i]->offset - sizeof(counting_bloom_header_t); + bloom->blooms[i]->header = (counting_bloom_header_t *)(bloom->bitmap->array + offset); + } + + bloom->num_bytes += cur_bloom->num_bytes; + cur_bloom->bitmap = bloom->bitmap; + + return cur_bloom; +} + +uint64_t scaling_bloom_clear_seqnums(scaling_bloom_t *bloom) +{ + uint64_t seqnum; + + if (bloom->header->disk_seqnum != 0) + { + // disk_seqnum cleared on disk before any other changes + bloom->header->disk_seqnum = 0; + bitmap_flush(bloom->bitmap); + } + seqnum = bloom->header->mem_seqnum; + bloom->header->mem_seqnum = 0; + return seqnum; +} + +int scaling_bloom_add(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id) +{ + int i; + uint64_t seqnum; + + counting_bloom_t *cur_bloom = NULL; + for (i = bloom->num_blooms - 1; i >= 0; i--) + { + cur_bloom = bloom->blooms[i]; + if (id >= cur_bloom->header->id) + { + break; + } + } + + seqnum = scaling_bloom_clear_seqnums(bloom); + + if ((id > bloom->header->max_id) && (cur_bloom->header->count >= cur_bloom->capacity - 1)) + { + cur_bloom = new_counting_bloom_from_scale(bloom); + cur_bloom->header->count = 0; + cur_bloom->header->id = bloom->header->max_id + 1; + } + if (bloom->header->max_id < id) + { + bloom->header->max_id = id; + } + counting_bloom_add(cur_bloom, s, len); + + bloom->header->mem_seqnum = seqnum + 1; + + return 1; +} + +int scaling_bloom_remove(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id) +{ + counting_bloom_t *cur_bloom; + int i; + uint64_t seqnum; + + for (i = bloom->num_blooms - 1; i >= 0; i--) + { + cur_bloom = bloom->blooms[i]; + if (id >= cur_bloom->header->id) + { + seqnum = scaling_bloom_clear_seqnums(bloom); + + counting_bloom_remove(cur_bloom, s, len); + + bloom->header->mem_seqnum = seqnum + 1; + return 1; + } + } + return 0; +} + +int scaling_bloom_check(scaling_bloom_t *bloom, const char *s, size_t len) +{ + int i; + counting_bloom_t *cur_bloom; + for (i = 
bloom->num_blooms - 1; i >= 0; i--) + { + cur_bloom = bloom->blooms[i]; + if (counting_bloom_check(cur_bloom, s, len)) + { + return 1; + } + } + return 0; +} + +int scaling_bloom_flush(scaling_bloom_t *bloom) +{ + if (bitmap_flush(bloom->bitmap) != 0) + { + return -1; + } + // all changes written to disk before disk_seqnum set + if (bloom->header->disk_seqnum == 0) + { + bloom->header->disk_seqnum = bloom->header->mem_seqnum; + return bitmap_flush(bloom->bitmap); + } + return 0; +} + +uint64_t scaling_bloom_mem_seqnum(scaling_bloom_t *bloom) +{ + return bloom->header->mem_seqnum; +} + +uint64_t scaling_bloom_disk_seqnum(scaling_bloom_t *bloom) +{ + return bloom->header->disk_seqnum; +} + +scaling_bloom_t *scaling_bloom_init(unsigned int capacity, double error_rate) +{ + scaling_bloom_t *bloom; + + if ((bloom = (scaling_bloom_t *)malloc(sizeof(scaling_bloom_t))) == NULL) + { + return NULL; + } + if ((bloom->bitmap = new_bitmap(sizeof(scaling_bloom_header_t))) == NULL) + { + fprintf(stderr, "Error, Could not create bitmap with file\n"); + free_scaling_bloom(bloom); + return NULL; + } + + bloom->header = (scaling_bloom_header_t *)bloom->bitmap->array; + bloom->capacity = capacity; + bloom->error_rate = error_rate; + bloom->num_blooms = 0; + bloom->num_bytes = sizeof(scaling_bloom_header_t); + bloom->blooms = NULL; + + return bloom; +} + +scaling_bloom_t *new_scaling_bloom(unsigned int capacity, double error_rate) +{ + + scaling_bloom_t *bloom; + counting_bloom_t *cur_bloom; + + bloom = scaling_bloom_init(capacity, error_rate); + + if (!(cur_bloom = new_counting_bloom_from_scale(bloom))) + { + fprintf(stderr, "Error, Could not create counting bloom\n"); + free_scaling_bloom(bloom); + return NULL; + } + cur_bloom->header->count = 0; + cur_bloom->header->id = 0; + + bloom->header->mem_seqnum = 1; + return bloom; +} + +struct expiry_dablooms_handle +{ + scaling_bloom_t *cur_bloom; + scaling_bloom_t *next_bloom; + time_t cur_bloom_start; + time_t next_bloom_start; + time_t last_bloom_check; + uint64_t cur_bloom_inc_id; + uint64_t next_bloom_inc_id; + unsigned int capacity; + int expiry_time; + time_t cur_time; + double error_rate; +}; + +char *expiry_dablooms_errno_trans(enum expiry_dablooms_errno _errno) +{ + switch (_errno) + { + case EXPIRY_DABLOOMS_ERRNO_BLOOM_NULL: + return (char *)"scaling_bloom_null"; + case EXPIRY_DABLOOMS_ERRNO_NEW_BLOOM_FAIL: + return (char *)"new_scaling_bloom_fail"; + default: + return (char *)"unknown"; + } +} + +void expiry_dablooms_destroy(struct expiry_dablooms_handle *handle) +{ + if (handle != NULL) + { + if (handle->cur_bloom != NULL) + { + free_scaling_bloom(handle->cur_bloom); + } + if (handle->next_bloom != NULL) + { + free_scaling_bloom(handle->next_bloom); + } + FREE(&handle); + } +} + +struct expiry_dablooms_handle *expiry_dablooms_init(unsigned int capacity, double error_rate, time_t cur_time, int expiry_time) +{ + struct expiry_dablooms_handle *handle = ALLOC(struct expiry_dablooms_handle, 1); + scaling_bloom_t *cur_bloom = new_scaling_bloom(capacity, error_rate); + if (cur_bloom == NULL) + { + goto error_out; + } + handle->cur_bloom = cur_bloom; + handle->cur_bloom_inc_id = 0; + handle->cur_bloom_start = cur_time; + handle->capacity = capacity; + handle->error_rate = error_rate; + handle->expiry_time = expiry_time; + handle->cur_time = cur_time; + return handle; + +error_out: + expiry_dablooms_destroy(handle); + return NULL; +} + +int expiry_dablooms_element_count_get(struct expiry_dablooms_handle *handle, uint64_t *count) +{ + if (handle == NULL || 
handle->cur_bloom == NULL) + { + return EXPIRY_DABLOOMS_ERRNO_BLOOM_NULL; + } + *count = handle->cur_bloom_inc_id; + return 0; +} + +static int bloom_expired_check(struct expiry_dablooms_handle *handle, time_t cur_time) +{ + if (handle == NULL || handle->cur_bloom == NULL) + { + return EXPIRY_DABLOOMS_ERRNO_BLOOM_NULL; + } + if (cur_time <= handle->last_bloom_check) + { + return 0; + } + time_t delta_time = cur_time - handle->cur_bloom_start; + handle->cur_time = cur_time; + if (delta_time >= handle->expiry_time) + { + free_scaling_bloom(handle->cur_bloom); + if (handle->next_bloom != NULL) + { + handle->cur_bloom = handle->next_bloom; + handle->cur_bloom_start = handle->next_bloom_start; + handle->cur_bloom_inc_id = handle->next_bloom_inc_id; + handle->next_bloom = NULL; + handle->last_bloom_check = 0; + } + else + { + scaling_bloom_t *cur_bloom = new_scaling_bloom(handle->capacity, handle->error_rate); + if (cur_bloom == NULL) + { + return EXPIRY_DABLOOMS_ERRNO_NEW_BLOOM_FAIL; + } + handle->cur_bloom = cur_bloom; + handle->cur_bloom_inc_id = 0; + handle->cur_bloom_start = cur_time; + handle->last_bloom_check = 0; + } + } + else + { + handle->last_bloom_check = cur_time; + } + return 0; +} + +int expiry_dablooms_add(struct expiry_dablooms_handle *handle, const char *key, size_t len, time_t cur_time) +{ + if (key == NULL || len == 0 || handle == NULL) + { + return -1; + } + int ret = bloom_expired_check(handle, cur_time); + if (ret < 0) + { + return ret; + } + + scaling_bloom_add(handle->cur_bloom, key, len, handle->cur_bloom_inc_id); + handle->cur_bloom_inc_id++; + time_t delta_time = cur_time - handle->cur_bloom_start; + handle->cur_time = cur_time; + if (delta_time >= handle->expiry_time) + { + if (handle->next_bloom == NULL) + { + scaling_bloom_t *next_bloom = new_scaling_bloom(handle->capacity, handle->error_rate); + if (next_bloom == NULL) + { + return EXPIRY_DABLOOMS_ERRNO_NEW_BLOOM_FAIL; + } + handle->next_bloom = next_bloom; + handle->next_bloom_inc_id = 0; + handle->next_bloom_start = cur_time; + } + scaling_bloom_add(handle->next_bloom, key, len, handle->next_bloom_inc_id); + handle->next_bloom_inc_id++; + } + return 0; +} + +int expiry_dablooms_search(struct expiry_dablooms_handle *handle, const char *key, size_t len, time_t cur_time) +{ + if (key == NULL || len == 0 || handle == NULL) + { + return -1; + } + int ret = bloom_expired_check(handle, cur_time); + if (ret < 0) + { + return ret; + } + int bloom_hit = scaling_bloom_check(handle->cur_bloom, key, len); + return bloom_hit; +} diff --git a/src/dablooms/dablooms.h b/src/dablooms/dablooms.h new file mode 100644 index 0000000..9083818 --- /dev/null +++ b/src/dablooms/dablooms.h @@ -0,0 +1,100 @@ +/* Copyright @2012 by Justin Hines at Bitly under a very liberal license. See LICENSE in the source distribution. 
*/
+
+#ifndef __BLOOM_H__
+#define __BLOOM_H__
+#include <stdint.h>
+#include <stdlib.h>
+#include <time.h>
+
+#define ALLOC(type, number) ((type *)calloc(sizeof(type), number))
+#define FREE(p)       \
+    {                 \
+        free(*p);     \
+        *p = NULL;    \
+    }
+
+const char *dablooms_version(void);
+
+typedef struct
+{
+    size_t bytes;
+    char *array;
+} bitmap_t;
+
+bitmap_t *bitmap_resize(bitmap_t *bitmap, size_t old_size, size_t new_size);
+bitmap_t *new_bitmap(size_t bytes);
+
+int bitmap_increment(bitmap_t *bitmap, unsigned int index, long offset);
+int bitmap_decrement(bitmap_t *bitmap, unsigned int index, long offset);
+int bitmap_check(bitmap_t *bitmap, unsigned int index, long offset);
+int bitmap_flush(bitmap_t *bitmap);
+
+void free_bitmap(bitmap_t *bitmap);
+
+typedef struct
+{
+    uint64_t id;
+    uint32_t count;
+    uint32_t _pad;
+} counting_bloom_header_t;
+
+typedef struct
+{
+    counting_bloom_header_t *header;
+    unsigned int capacity;
+    long offset;
+    unsigned int counts_per_func;
+    uint32_t *hashes;
+    size_t nfuncs;
+    size_t size;
+    size_t num_bytes;
+    double error_rate;
+    bitmap_t *bitmap;
+} counting_bloom_t;
+
+int free_counting_bloom(counting_bloom_t *bloom);
+counting_bloom_t *new_counting_bloom(unsigned int capacity, double error_rate);
+int counting_bloom_add(counting_bloom_t *bloom, const char *s, size_t len);
+int counting_bloom_remove(counting_bloom_t *bloom, const char *s, size_t len);
+int counting_bloom_check(counting_bloom_t *bloom, const char *s, size_t len);
+
+typedef struct
+{
+    uint64_t max_id;
+    uint64_t mem_seqnum;
+    uint64_t disk_seqnum;
+} scaling_bloom_header_t;
+
+typedef struct
+{
+    scaling_bloom_header_t *header;
+    unsigned int capacity;
+    unsigned int num_blooms;
+    size_t num_bytes;
+    double error_rate;
+    counting_bloom_t **blooms;
+    bitmap_t *bitmap;
+} scaling_bloom_t;
+
+scaling_bloom_t *new_scaling_bloom(unsigned int capacity, double error_rate);
+int free_scaling_bloom(scaling_bloom_t *bloom);
+int scaling_bloom_add(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id);
+int scaling_bloom_remove(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id);
+int scaling_bloom_check(scaling_bloom_t *bloom, const char *s, size_t len);
+int scaling_bloom_flush(scaling_bloom_t *bloom);
+uint64_t scaling_bloom_mem_seqnum(scaling_bloom_t *bloom);
+uint64_t scaling_bloom_disk_seqnum(scaling_bloom_t *bloom);
+
+struct expiry_dablooms_handle;
+enum expiry_dablooms_errno
+{
+    EXPIRY_DABLOOMS_ERRNO_BLOOM_NULL = -1,
+    EXPIRY_DABLOOMS_ERRNO_NEW_BLOOM_FAIL = -2,
+};
+char *expiry_dablooms_errno_trans(enum expiry_dablooms_errno _errno);
+void expiry_dablooms_destroy(struct expiry_dablooms_handle *handle);
+struct expiry_dablooms_handle *expiry_dablooms_init(unsigned int capacity, double error_rate, time_t cur_time, int expiry_time);
+int expiry_dablooms_element_count_get(struct expiry_dablooms_handle *handle, uint64_t *count);
+int expiry_dablooms_add(struct expiry_dablooms_handle *handle, const char *key, size_t len, time_t cur_time);
+int expiry_dablooms_search(struct expiry_dablooms_handle *handle, const char *key, size_t len, time_t cur_time);
+
+#endif
diff --git a/src/dablooms/murmur.cpp b/src/dablooms/murmur.cpp
new file mode 100644
index 0000000..a36c5fa
--- /dev/null
+++ b/src/dablooms/murmur.cpp
@@ -0,0 +1,153 @@
+//-----------------------------------------------------------------------------
+// MurmurHash3 was written by Austin Appleby, and is placed in the public
+// domain. The author hereby disclaims copyright to this source code.
+ +// Note - The x86 and x64 versions do _not_ produce the same results, as the +// algorithms are optimized for their respective platforms. You can still +// compile and run any of them on any platform, but your performance with the +// non-native version will be less than optimal. + +#include "murmur.h" + +#define FORCE_INLINE inline static + +FORCE_INLINE uint64_t rotl64(uint64_t x, int8_t r) +{ + return (x << r) | (x >> (64 - r)); +} + +#define ROTL64(x, y) rotl64(x, y) + +#define BIG_CONSTANT(x) (x##LLU) + +#define getblock(x, i) (x[i]) + +//----------------------------------------------------------------------------- +// Finalization mix - force all bits of a hash block to avalanche + +FORCE_INLINE uint64_t fmix64(uint64_t k) +{ + k ^= k >> 33; + k *= BIG_CONSTANT(0xff51afd7ed558ccd); + k ^= k >> 33; + k *= BIG_CONSTANT(0xc4ceb9fe1a85ec53); + k ^= k >> 33; + + return k; +} + +//----------------------------------------------------------------------------- + +void MurmurHash3_x64_128(const void *key, const int len, const uint32_t seed, void *out) +{ + const uint8_t *data = (const uint8_t *)key; + const int nblocks = len / 16; + + uint64_t h1 = seed; + uint64_t h2 = seed; + + uint64_t c1 = BIG_CONSTANT(0x87c37b91114253d5); + uint64_t c2 = BIG_CONSTANT(0x4cf5ad432745937f); + + int i; + + //---------- + // body + + const uint64_t *blocks = (const uint64_t *)(data); + + for (i = 0; i < nblocks; i++) + { + uint64_t k1 = getblock(blocks, i * 2 + 0); + uint64_t k2 = getblock(blocks, i * 2 + 1); + + k1 *= c1; + k1 = ROTL64(k1, 31); + k1 *= c2; + h1 ^= k1; + + h1 = ROTL64(h1, 27); + h1 += h2; + h1 = h1 * 5 + 0x52dce729; + + k2 *= c2; + k2 = ROTL64(k2, 33); + k2 *= c1; + h2 ^= k2; + + h2 = ROTL64(h2, 31); + h2 += h1; + h2 = h2 * 5 + 0x38495ab5; + } + + //---------- + // tail + + const uint8_t *tail = (const uint8_t *)(data + nblocks * 16); + + uint64_t k1 = 0; + uint64_t k2 = 0; + + switch (len & 15) + { + case 15: + k2 ^= ((uint64_t)tail[14]) << 48; + case 14: + k2 ^= ((uint64_t)tail[13]) << 40; + case 13: + k2 ^= ((uint64_t)tail[12]) << 32; + case 12: + k2 ^= ((uint64_t)tail[11]) << 24; + case 11: + k2 ^= ((uint64_t)tail[10]) << 16; + case 10: + k2 ^= ((uint64_t)tail[9]) << 8; + case 9: + k2 ^= ((uint64_t)tail[8]) << 0; + k2 *= c2; + k2 = ROTL64(k2, 33); + k2 *= c1; + h2 ^= k2; + + case 8: + k1 ^= ((uint64_t)tail[7]) << 56; + case 7: + k1 ^= ((uint64_t)tail[6]) << 48; + case 6: + k1 ^= ((uint64_t)tail[5]) << 40; + case 5: + k1 ^= ((uint64_t)tail[4]) << 32; + case 4: + k1 ^= ((uint64_t)tail[3]) << 24; + case 3: + k1 ^= ((uint64_t)tail[2]) << 16; + case 2: + k1 ^= ((uint64_t)tail[1]) << 8; + case 1: + k1 ^= ((uint64_t)tail[0]) << 0; + k1 *= c1; + k1 = ROTL64(k1, 31); + k1 *= c2; + h1 ^= k1; + } + + //---------- + // finalization + + h1 ^= len; + h2 ^= len; + + h1 += h2; + h2 += h1; + + h1 = fmix64(h1); + h2 = fmix64(h2); + + h1 += h2; + h2 += h1; + + ((uint64_t *)out)[0] = h1; + ((uint64_t *)out)[1] = h2; +} + +//----------------------------------------------------------------------------- diff --git a/src/dablooms/murmur.h b/src/dablooms/murmur.h new file mode 100644 index 0000000..6e3c133 --- /dev/null +++ b/src/dablooms/murmur.h @@ -0,0 +1,12 @@ +//----------------------------------------------------------------------------- +// MurmurHash3 was written by Austin Appleby, and is placed in the public +// domain. The author hereby disclaims copyright to this source code. 
+
+#ifndef _MURMURHASH3_H_
+#define _MURMURHASH3_H_
+
+#include <stdint.h>
+
+void MurmurHash3_x64_128(const void *key, int len, uint32_t seed, void *out);
+
+#endif // _MURMURHASH3_H_
diff --git a/src/dablooms/test/CMakeLists.txt b/src/dablooms/test/CMakeLists.txt
new file mode 100644
index 0000000..9254651
--- /dev/null
+++ b/src/dablooms/test/CMakeLists.txt
@@ -0,0 +1,10 @@
+###############################################################################
+# gtest
+###############################################################################
+
+add_executable(gtest_dablooms gtest_dablooms.cpp)
+target_include_directories(gtest_dablooms PUBLIC ${CMAKE_CURRENT_LIST_DIR})
+target_link_libraries(gtest_dablooms dablooms gtest)
+
+include(GoogleTest)
+gtest_discover_tests(gtest_dablooms)
\ No newline at end of file
diff --git a/src/dablooms/test/gtest_dablooms.cpp b/src/dablooms/test/gtest_dablooms.cpp
new file mode 100644
index 0000000..36740a2
--- /dev/null
+++ b/src/dablooms/test/gtest_dablooms.cpp
@@ -0,0 +1,71 @@
+#include <gtest/gtest.h>
+
+#include <stdio.h>
+#include <time.h>
+#include <unistd.h>
+
+#include "dablooms.h"
+
+struct packet_key
+{
+    unsigned int tcp_seq;
+    unsigned int tcp_ack;
+    unsigned short sport;
+    unsigned short dport;
+    unsigned short checksum;
+    unsigned short ip_id;
+    unsigned int ip_src;
+    unsigned int ip_dst;
+} __attribute__((packed, aligned(1)));
+
+struct duplicate_packet_identify_config
+{
+    int enable;
+
+    unsigned int capacity;
+    double error_rate;
+    int expiry_time;
+} config = {
+    .enable = 1,
+    .capacity = 1000000,
+    .error_rate = 0.00001,
+    .expiry_time = 10,
+};
+
+struct packet_key key = {
+    .tcp_seq = 2172673142,
+    .tcp_ack = 2198097831,
+    .sport = 46582,
+    .dport = 443,
+    .checksum = 0x2c4b,
+    .ip_id = 65535,
+    .ip_src = 123456,
+    .ip_dst = 789000,
+};
+
+TEST(DABLOOMS, ADD_SEARCH)
+{
+    struct expiry_dablooms_handle *handle = expiry_dablooms_init(config.capacity, config.error_rate, time(NULL), config.expiry_time);
+    EXPECT_TRUE(handle != nullptr);
+
+    EXPECT_TRUE(expiry_dablooms_search(handle, (const char *)&key, sizeof(key), time(NULL)) != 1); // not present yet
+    EXPECT_TRUE(expiry_dablooms_add(handle, (const char *)&key, sizeof(key), time(NULL)) == 0);    // add succeeds
+
+    for (int i = 0; i < 15; i++)
+    {
+        if (i < config.expiry_time)
+        {
+            EXPECT_TRUE(expiry_dablooms_search(handle, (const char *)&key, sizeof(key), time(NULL)) == 1); // still present before expiry
+        }
+        else
+        {
+            EXPECT_TRUE(expiry_dablooms_search(handle, (const char *)&key, sizeof(key), time(NULL)) != 1); // expired, no longer present
+        }
+        sleep(1);
+        printf("sleep[%02d] 1s\n", i);
+    }
+
+    expiry_dablooms_destroy(handle);
+}
+
+int main(int argc, char **argv)
+{
+    ::testing::InitGoogleTest(&argc, argv);
+    return RUN_ALL_TESTS();
+}