From 18e72e3773935fa7e316ca3d32e1fa49ef58f44d Mon Sep 17 00:00:00 2001 From: Paweł Redman Date: Thu, 6 Apr 2017 13:39:48 +0200 Subject: Refactor large parts of the program. The new design assures revDNS/WHOIS queries will be done exactly once, no matter how many requests the Schachtmeister gets. This commit also adds cache timeouts. --- Makefile | 2 +- src/cache.c | 74 ------------------------- src/database.c | 94 ++++++++++++++++++++++++++++++++ src/main.c | 85 ++++++++++++++++++++++++++--- src/shared.h | 76 ++++++++++++++++---------- src/worker.c | 168 ++++++++++++++++++++++++++++----------------------------- 6 files changed, 304 insertions(+), 195 deletions(-) delete mode 100644 src/cache.c create mode 100644 src/database.c diff --git a/Makefile b/Makefile index 1fee50d..7062e05 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ PP_CC := $(PP_BOLD)$(shell tput setf 6)CC$(PP_RESET) PP_LD := $(PP_BOLD)$(shell tput setf 2)LD$(PP_RESET) PP_RM := $(PP_BOLD)$(shell tput setf 4)RM$(PP_RESET) -SRC := src/cache.c \ +SRC := src/database.c \ src/lexer.c \ src/lists.c \ src/main.c \ diff --git a/src/cache.c b/src/cache.c deleted file mode 100644 index 6a31a16..0000000 --- a/src/cache.c +++ /dev/null @@ -1,74 +0,0 @@ -/* -Copyright (C) 2017 Paweł Redman - -This program is free software; you can redistribute it and/or -modify it under the terms of the GNU General Public License -as published by the Free Software Foundation; either version 3 -of the License, or (at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. 
- -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software Foundation, -Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA -*/ - -#include "shared.h" - -#define HASH_MAX 4095 - -static size_t cache_hash(uint32_t ipv4) -{ - // FIXME: Use an academically acclaimed hashing algorithm. - return (((ipv4 >> 16) ^ ipv4) * 3137) & HASH_MAX; -} - -static cache_entry_t *cache_ht[HASH_MAX]; -static pthread_mutex_t cache_mutex = PTHREAD_MUTEX_INITIALIZER; - -// Returns an entry with its mutex locked. -cache_entry_t *cache_find(uint32_t ipv4) -{ - cache_entry_t *entry; - size_t hash; - - hash = cache_hash(ipv4); - - pthread_mutex_lock(&cache_mutex); - eli_for (entry, cache_ht[hash], ht_list) - if (entry->ipv4 == ipv4) - break; - - if (!entry) { - entry = calloc(1, sizeof(cache_entry_t)); - entry->ipv4 = ipv4; - pthread_mutex_init(&entry->mutex, NULL); - eli_append(cache_ht + hash, entry, ht_list); - } - - pthread_mutex_lock(&entry->mutex); - pthread_mutex_unlock(&cache_mutex); - - return entry; -} - -void cache_destroy(void) -{ - size_t i; - cache_entry_t *entry, *next; - - for (i = 0; i < HASH_MAX; i++) { - for (entry = cache_ht[i]; entry; entry = next) { - next = entry->ht_list.next; - - free(entry->revdns); - free(entry->whois); - free(entry); - } - - cache_ht[i] = NULL; - } -} diff --git a/src/database.c b/src/database.c new file mode 100644 index 0000000..c42cfb6 --- /dev/null +++ b/src/database.c @@ -0,0 +1,94 @@ +/* +Copyright (C) 2017 Paweł Redman + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 3 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software Foundation, +Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#include "shared.h" + +#define HASH_MAX 4095 + +static size_t db_hash(uint32_t ipv4) +{ + // FIXME: Use an academically acclaimed hashing algorithm. + return (((ipv4 >> 16) ^ ipv4) * 3137) & HASH_MAX; +} + +db_entry *database[HASH_MAX]; +pthread_mutex_t database_mutex = PTHREAD_MUTEX_INITIALIZER; + +// Returns a pointer to the relevant db_entry (newly created if didn't exist). +// This function locks the db_entry's mutex before returning. +// Returns NULL if calloc fails. +db_entry *db_find(uint32_t ipv4) +{ + size_t hash; + db_entry *entry; + + hash = db_hash(ipv4); + + pthread_mutex_lock(&database_mutex); + eli_for (entry, database[hash], ht_chain) + if (entry->ipv4 == ipv4) + break; + + if (!entry) { + entry = calloc(1, sizeof(db_entry)); + if (!entry) + goto out; + entry->ipv4 = ipv4; + pthread_mutex_init(&entry->mutex, NULL); + eli_append(database + hash, entry, ht_chain); + } + + pthread_mutex_lock(&entry->mutex); +out: + pthread_mutex_unlock(&database_mutex); + return entry; +} + +static void destroy_entry(db_entry *entry) +{ + job_t *job, *next; + + for (job = entry->waiting_jobs; job; job = next) { + next = job->waiting_list.next; + job_destroy(job); + } +} + +// Frees all entries and resets the database. 
+void db_destroy(void) +{ + size_t i; + db_entry *entry, *next; + + pthread_mutex_lock(&database_mutex); + + for (i = 0; i < HASH_MAX; i++) { + for (entry = database[i]; entry; entry = next) { + next = entry->ht_chain.next; + + free(entry->revdns.data); + free(entry->whois.data); + destroy_entry(entry); + } + + database[i] = NULL; + } + + pthread_mutex_unlock(&database_mutex); +} + diff --git a/src/main.c b/src/main.c index 69653c7..fc377e3 100644 --- a/src/main.c +++ b/src/main.c @@ -17,9 +17,21 @@ Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ #include "shared.h" +#include <time.h> int server_sockfd; +static struct timespec time_ref; + +uint64_t get_time(void) +{ + static struct timespec now; + + clock_gettime(CLOCK_MONOTONIC, &now); + return (uint64_t)(now.tv_sec - time_ref.tv_sec) * TIME_SECOND + + now.tv_nsec + 1; +} + static int parse_req_addr(char *begin, char *end, uint32_t *out) { char *p; @@ -48,6 +60,68 @@ static int parse_req_addr(char *begin, char *end, uint32_t *out) return 0; } +void handle_request(struct sockaddr_in *ret_addr, uint32_t query) +{ + db_entry *entry; + job_t *reply_job; + uint64_t now = get_time(); + + DEBUG("query %08X\n", query); + + entry = db_find(query); + if (!entry) { + eprintf("handle_request: out of memory\n"); + return; + } + + reply_job = job_create(JOB_REPLY, entry); + reply_job->ipv4 = query; + memcpy(&reply_job->ret_addr, ret_addr, sizeof(struct sockaddr_in)); + + if (entry->revdns.state == DB_VALID + && entry->revdns.exp_time <= now) { + entry->revdns.state = DB_INVALID; + DEBUG("revDNS for %08X expired\n", query); + } + + if (entry->whois.state == DB_VALID + && entry->whois.exp_time <= now) { + entry->whois.state = DB_INVALID; + DEBUG("WHOIS for %08X expired\n", query); + } + + if (entry->revdns.state == DB_VALID && + entry->whois.state == DB_VALID) { + job_enqueue(reply_job); + goto out; + } + + if (entry->revdns.state == DB_INVALID) { + job_t *job; + + DEBUG("revDNS cache miss for %08X\n", query); + job = 
job_create(JOB_REVDNS, entry); + job->ipv4 = query; + job_enqueue(job); + entry->revdns.state = DB_IN_PROGRESS; + } + + if (entry->whois.state == DB_INVALID) { + job_t *job; + + DEBUG("WHOIS cache miss for %08X\n", query); + job = job_create(JOB_WHOIS, entry); + job->ipv4 = query; + job_enqueue(job); + entry->whois.state = DB_IN_PROGRESS; + } + + eli_append(&entry->waiting_jobs, reply_job, waiting_list); + +out: + pthread_mutex_unlock(&entry->mutex); +} + #define NUM_WORKERS 8 // FIXME: shouldn't be hardcoded int main(void) @@ -57,6 +131,8 @@ int main(void) pthread_t workers[NUM_WORKERS]; size_t i; + clock_gettime(CLOCK_MONOTONIC_RAW, &time_ref); + if (lists_load("schachts.list", 0)) { eprintf("fatal error: couldn't load the lists\n"); goto error_lists; @@ -88,7 +164,6 @@ int main(void) socklen_t addrlen = sizeof(addr); ssize_t size; uint32_t query; - job_t *job; size = recvfrom(server_sockfd, buffer, sizeof(buffer), 0, (void*)&addr, &addrlen); @@ -111,11 +186,7 @@ int main(void) &query)) continue; - job = job_create(); - job->type = JOB_TEST; - job->query = query; - memcpy(&job->reply_to, &addr, sizeof(addr)); - job_enqueue(job); + handle_request(&addr, query); } job_quit(); @@ -123,7 +194,7 @@ int main(void) for (i = 0; i < NUM_WORKERS; i++) pthread_join(workers[i], NULL); - cache_destroy(); + db_destroy(); // todo: clear all jobs error_recvfrom: diff --git a/src/shared.h b/src/shared.h index a615f83..77c9d3f 100644 --- a/src/shared.h +++ b/src/shared.h @@ -47,49 +47,71 @@ Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA extern int server_sockfd; +uint64_t get_time(void); + +#define TIME_SECOND (1000000000llu) +#define TIME_MINUTE (60 * TIME_SECOND) +#define TIME_HOUR (60 * TIME_MINUTE) + +// shared + +typedef struct job_s job_t; + +// database.c + +enum { + DB_INVALID, + DB_IN_PROGRESS, + DB_VALID +}; + +typedef struct { + int state; + uint64_t exp_time; + char *data; +} db_entry_field; + +typedef struct { + uint32_t ipv4; + + db_entry_field 
revdns, whois; + + bool cached_result_valid; + int cached_result; + + job_t *waiting_jobs; + + eli_header ht_chain; + pthread_mutex_t mutex; +} db_entry; + +db_entry *db_find(uint32_t ipv4); +void db_destroy(void); + // worker.c typedef enum { - JOB_TEST, JOB_REVDNS, - JOB_WHOIS + JOB_WHOIS, + JOB_REPLY, } job_type_t; -typedef struct job_s job_t; struct job_s { job_type_t type; - uint32_t query; - struct sockaddr_in reply_to; + db_entry *entry; + struct sockaddr_in ret_addr; // for JOB_REPLY + uint32_t ipv4; // for other jobs - eli_header list, dep_list; - job_t *parent, *deps; - size_t num_deps, deps_done; - pthread_mutex_t mutex; + eli_header queue_list, waiting_list; }; -job_t *job_create(void); +job_t *job_create(job_type_t type, db_entry *entry); +void job_destroy(job_t *job); void job_enqueue(job_t *job); void job_quit(void); void *worker_main(void *unused); -// cache.c - -typedef struct { - uint32_t ipv4; - - char *revdns; - uint64_t revdns_exptime; // FIXME: implement this already - char *whois; - uint64_t whois_exptime; - - eli_header ht_list; - pthread_mutex_t mutex; -} cache_entry_t; - -cache_entry_t *cache_find(uint32_t ipv4); -void cache_destroy(void); - // lexer.c typedef struct { diff --git a/src/worker.c b/src/worker.c index 59a19cb..82d4ce6 100644 --- a/src/worker.c +++ b/src/worker.c @@ -24,7 +24,7 @@ static pthread_cond_t job_queue_cond = PTHREAD_COND_INITIALIZER; static pthread_mutex_t job_queue_mutex = PTHREAD_MUTEX_INITIALIZER; static int job_quit_flag = 0; -job_t *job_create(void) +job_t *job_create(job_type_t type, db_entry *entry) { job_t *job; @@ -34,25 +34,26 @@ job_t *job_create(void) abort(); } - pthread_mutex_init(&job->mutex, NULL); + job->type = type; + job->entry = entry; + return job; } +void job_destroy(job_t *job) +{ + free(job); +} + void job_enqueue(job_t *job) { pthread_mutex_lock(&job_queue_mutex); - eli_append(&job_queue, job, list); + eli_append(&job_queue, job, queue_list); pthread_mutex_unlock(&job_queue_mutex); 
pthread_cond_signal(&job_queue_cond); } -static void job_add_dep(job_t *parent, job_t *dep) -{ - eli_append(&parent->deps, dep, dep_list); - dep->parent = parent; - parent->num_deps++; -} - +// FIXME: bad name void job_quit(void) { pthread_mutex_lock(&job_queue_mutex); @@ -61,9 +62,8 @@ void job_quit(void) pthread_cond_broadcast(&job_queue_cond); } -int _host = 0, _whois = 0; - -int read_everything(FILE *fp, char **buffer) +// FIXME: refactor with vstrs +static int read_everything(FILE *fp, char **buffer) { size_t read = 0, alloc = 128, partial; @@ -105,65 +105,83 @@ oom: abort(); } -int schachtest(const char *revdns, const char *whois) +// Check if the database entry is complete (all fields are DB_VALID). +// If it is then enqueue all of its waiting jobs. +static void check_waiting_jobs(db_entry *entry) { - return 0; + job_t *job; + + if (entry->revdns.state != DB_VALID + || entry->whois.state != DB_VALID) + return; + + eli_for(job, entry->waiting_jobs, waiting_list) + job_enqueue(job); + + entry->waiting_jobs = NULL; } -static void worker_job_test(job_t *job) +// Update an entry's field and call check_waiting_jobs. 
+static void update_entry(db_entry *entry, db_entry_field *field, char *data) { - cache_entry_t *entry; - job_t *dep; - int result; - char reply[1000]; + pthread_mutex_lock(&entry->mutex); - entry = cache_find(job->query); + field->state = DB_VALID; + free(field->data); + field->data = data; + field->exp_time = get_time() + 24 * TIME_HOUR; - if (!entry->revdns) { - DEBUG("cache miss on revdns\n"); - dep = job_create(); - dep->type = JOB_REVDNS; - dep->query = job->query; - job_add_dep(job, dep); - } + // invalidate the result cache + entry->cached_result_valid = false; - if (!entry->whois) { - DEBUG("cache miss on whois\n"); - dep = job_create(); - dep->type = JOB_WHOIS; - dep->query = job->query; - job_add_dep(job, dep); - } + check_waiting_jobs(entry); - if (!entry->revdns || !entry->whois) { - pthread_mutex_unlock(&entry->mutex); - return; - } + pthread_mutex_unlock(&entry->mutex); +} - DEBUG("ready for schachtest\n"); +// This job is queued when both WHOIS and revDNS data is available. +// It calls lists_test and sends back the result. 
+static void worker_job_reply(job_t *job) +{ + int result; + char reply[1000]; - result = lists_test(entry->revdns, entry->whois); - pthread_mutex_unlock(&entry->mutex); + pthread_mutex_lock(&job->entry->mutex); + + if (!job->entry->cached_result_valid) { + DEBUG("result cache miss for %08X\n", job->ipv4); + result = lists_test(job->entry->revdns.data, + job->entry->whois.data); + + job->entry->cached_result_valid = true; + job->entry->cached_result = result; + } else + result = job->entry->cached_result; + + pthread_mutex_unlock(&job->entry->mutex); snprintf(reply, sizeof(reply), "%s%hhu.%hhu.%hhu.%hhu %i\n", RESPONSE_HEADER, - (job->query >> 24) & 0xFF, (job->query >> 16) & 0xFF, - (job->query >> 8) & 0xFF, job->query & 0xFF, result); + (job->ipv4 >> 24) & 0xFF, (job->ipv4 >> 16) & 0xFF, + (job->ipv4 >> 8) & 0xFF, job->ipv4 & 0xFF, result); // FIXME: handle sendto errors - sendto(server_sockfd, reply, strlen(reply), 0, (void*)&job->reply_to, - sizeof(job->reply_to)); -} + sendto(server_sockfd, reply, strlen(reply), 0, (void*)&job->ret_addr, + sizeof(job->ret_addr)); + DEBUG("reply %08X %i\n", job->ipv4, result); +} +// This job makes a revDNS query and stores it in the database. static void worker_job_revdns(job_t *job) { struct sockaddr_in addr; char buffer[NI_MAXHOST], *revdns; int rv; - cache_entry_t *entry; + + DEBUG("getnameinfo(%08X, ...)\n", job->ipv4); addr.sin_family = AF_INET; - addr.sin_addr.s_addr = htonl(job->query); + addr.sin_addr.s_addr = htonl(job->ipv4); addr.sin_port = 0; rv = getnameinfo((void*)&addr, sizeof(addr), buffer, sizeof(buffer), @@ -189,33 +207,26 @@ static void worker_job_revdns(job_t *job) break; } - entry = cache_find(job->query); - free(entry->revdns); - entry->revdns = revdns; - DEBUG("revdns is done\n"); - pthread_mutex_unlock(&entry->mutex); + update_entry(job->entry, &job->entry->revdns, revdns); } +// This job makes a WHOIS query and stores it in the database. 
static void worker_job_whois(job_t *job) { char command[80]; FILE *fp; char *out; - cache_entry_t *entry; snprintf(command, sizeof(command), "whois \"%hhu.%hhu.%hhu.%hhu\"", - (job->query >> 24) & 0xFF, (job->query >> 16) & 0xFF, - (job->query >> 8) & 0xFF, job->query & 0xFF); + (job->ipv4 >> 24) & 0xFF, (job->ipv4 >> 16) & 0xFF, + (job->ipv4 >> 8) & 0xFF, job->ipv4 & 0xFF); + DEBUG("%s\n", command); fp = popen(command, "r"); - read_everything(fp, &out); + read_everything(fp, &out); // FIXME: handle errors fclose(fp); - entry = cache_find(job->query); - free(entry->whois); - entry->whois = out; - DEBUG("whois is done\n"); - pthread_mutex_unlock(&entry->mutex); + update_entry(job->entry, &job->entry->whois, out); } void *worker_main(void *unused) @@ -227,16 +238,19 @@ void *worker_main(void *unused) while (!job_queue && !job_quit_flag) pthread_cond_wait(&job_queue_cond, &job_queue_mutex); - if (job_quit_flag) + if (job_quit_flag) { + pthread_mutex_unlock(&job_queue_mutex); + DEBUG("quitting\n"); return NULL; + } job = job_queue; - eli_unlink(&job_queue, job, list); + eli_unlink(&job_queue, job, queue_list); pthread_mutex_unlock(&job_queue_mutex); switch (job->type) { - case JOB_TEST: - worker_job_test(job); + case JOB_REPLY: + worker_job_reply(job); break; case JOB_REVDNS: @@ -248,24 +262,6 @@ void *worker_main(void *unused) break; } - // Handle the dependencies first. Keep this job off the queue. - if (job->deps) { - job_t *dep; - - eli_for(dep, job->deps, dep_list) - job_enqueue(dep); - memset(&job->deps, 0, sizeof(job->deps)); - continue; - } - - if (job->parent) { - pthread_mutex_lock(&job->parent->mutex); - job->parent->deps_done++; - if (job->parent->deps_done == job->parent->num_deps) - job_enqueue(job->parent); - pthread_mutex_unlock(&job->parent->mutex); - } - - free(job); + job_destroy(job); } } -- cgit