diff options
| author | Paweł Redman <pawel.redman@gmail.com> | 2017-04-06 13:39:48 +0200 | 
|---|---|---|
| committer | Paweł Redman <pawel.redman@gmail.com> | 2017-04-06 13:39:48 +0200 | 
| commit | 18e72e3773935fa7e316ca3d32e1fa49ef58f44d (patch) | |
| tree | d91006956cc8073b1b50fd30660a886459c6eb03 | |
| parent | da4626cb5a741a9b7861dd54f8570a00753a5d92 (diff) | |
Refactor large parts of the program.
The new design assures revDNS/WHOIS queries will be done exactly once, no matter how many requests the Schachtmeister gets.
This commit also adds cache timeouts.
| -rw-r--r-- | Makefile | 2 | ||||
| -rw-r--r-- | src/cache.c | 74 | ||||
| -rw-r--r-- | src/database.c | 94 | ||||
| -rw-r--r-- | src/main.c | 85 | ||||
| -rw-r--r-- | src/shared.h | 76 | ||||
| -rw-r--r-- | src/worker.c | 168 | 
6 files changed, 304 insertions, 195 deletions
@@ -9,7 +9,7 @@ PP_CC := $(PP_BOLD)$(shell tput setf 6)CC$(PP_RESET)  PP_LD := $(PP_BOLD)$(shell tput setf 2)LD$(PP_RESET)  PP_RM := $(PP_BOLD)$(shell tput setf 4)RM$(PP_RESET) -SRC := src/cache.c \ +SRC := src/database.c \         src/lexer.c \         src/lists.c \         src/main.c \ diff --git a/src/cache.c b/src/cache.c deleted file mode 100644 index 6a31a16..0000000 --- a/src/cache.c +++ /dev/null @@ -1,74 +0,0 @@ -/* -Copyright (C) 2017 Paweł Redman - -This program is free software; you can redistribute it and/or -modify it under the terms of the GNU General Public License -as published by the Free Software Foundation; either version 3 -of the License, or (at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software Foundation, -Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA -*/ - -#include "shared.h" - -#define HASH_MAX 4095 - -static size_t cache_hash(uint32_t ipv4) -{ -	// FIXME: Use an academically acclaimed hashing algorithm. -	return (((ipv4 >> 16) ^ ipv4) * 3137) & HASH_MAX; -} - -static cache_entry_t *cache_ht[HASH_MAX]; -static pthread_mutex_t cache_mutex = PTHREAD_MUTEX_INITIALIZER; - -// Returns an entry with its mutex locked. -cache_entry_t *cache_find(uint32_t ipv4) -{ -	cache_entry_t *entry; -	size_t hash; - -	hash = cache_hash(ipv4); - -	pthread_mutex_lock(&cache_mutex); -	eli_for (entry, cache_ht[hash], ht_list) -		if (entry->ipv4 == ipv4) -			break; - -	if (!entry) { -		entry = calloc(1, sizeof(cache_entry_t)); -		entry->ipv4 = ipv4; -		pthread_mutex_init(&entry->mutex, NULL); -		eli_append(cache_ht + hash, entry, ht_list); -	} - -	pthread_mutex_lock(&entry->mutex); -	pthread_mutex_unlock(&cache_mutex); - -	return entry; -} - -void cache_destroy(void) -{ -	size_t i; -	cache_entry_t *entry, *next; - -	for (i = 0; i < HASH_MAX; i++) { -		for (entry = cache_ht[i]; entry; entry = next) { -			next = entry->ht_list.next; - -			free(entry->revdns); -			free(entry->whois); -			free(entry); -		} - -		cache_ht[i] = NULL; -	} -} diff --git a/src/database.c b/src/database.c new file mode 100644 index 0000000..c42cfb6 --- /dev/null +++ b/src/database.c @@ -0,0 +1,94 @@ +/* +Copyright (C) 2017 Paweł Redman + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 3 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software Foundation, +Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA +*/ + +#include "shared.h" + +#define HASH_MAX 4095 + +static size_t db_hash(uint32_t ipv4) +{ +	// FIXME: Use an academically acclaimed hashing algorithm. +	return (((ipv4 >> 16) ^ ipv4) * 3137) & HASH_MAX; +} + +db_entry *database[HASH_MAX]; +pthread_mutex_t database_mutex = PTHREAD_MUTEX_INITIALIZER; + +// Returns a pointer to the relevant db_entry (newly created if didn't exist). +// This function locks the db_entry's mutex before returning. +// Returns NULL if calloc fails. +db_entry *db_find(uint32_t ipv4) +{ +	size_t hash; +	db_entry *entry; + +	hash = db_hash(ipv4); + +	pthread_mutex_lock(&database_mutex); +	eli_for (entry, database[hash], ht_chain) +		if (entry->ipv4 == ipv4) +			break; + +	if (!entry) { +		entry = calloc(1, sizeof(db_entry)); +		if (!entry) +			goto out; +		entry->ipv4 = ipv4; +		pthread_mutex_init(&entry->mutex, NULL); +		eli_append(database + hash, entry, ht_chain); +	} + +	pthread_mutex_lock(&entry->mutex); +out: +	pthread_mutex_unlock(&database_mutex); +	return entry; +} + +static void destroy_entry(db_entry *entry) +{ +	job_t *job, *next; + +	for (job = entry->waiting_jobs; job; job = next) { +		next = job->waiting_list.next; +		job_destroy(job); +	} +} + +// Frees all entries and resets the database. +void db_destroy(void) +{ +	size_t i; +	db_entry *entry, *next; + +	pthread_mutex_lock(&database_mutex); + +	for (i = 0; i < HASH_MAX; i++) { +		for (entry = database[i]; entry; entry = next) { +			next = entry->ht_chain.next; + +			free(entry->revdns.data); +			free(entry->whois.data); +			destroy_entry(entry); +		} + +		database[i] = NULL; +	} + +	pthread_mutex_unlock(&database_mutex); +} + @@ -17,9 +17,21 @@ Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA  */  #include "shared.h" +#include <time.h>  int server_sockfd; +static struct timespec time_ref; + +uint64_t get_time(void) +{ +	static struct timespec now; + +	clock_gettime(CLOCK_MONOTONIC, &now); +	return (uint64_t)(now.tv_sec - time_ref.tv_sec) * TIME_SECOND + +	       now.tv_nsec + 1; +} +  static int parse_req_addr(char *begin, char *end, uint32_t *out)  {  	char *p; @@ -48,6 +60,68 @@ static int parse_req_addr(char *begin, char *end, uint32_t *out)  	return 0;  } +void handle_request(struct sockaddr_in *ret_addr, uint32_t query) +{ +	db_entry *entry; +	job_t *reply_job; +	uint64_t now = get_time(); + +	DEBUG("query %08X\n", query); + +	entry = db_find(query); +	if (!entry) { +		eprintf("handle_request: out of memory\n"); +		return; +	} + +	reply_job = job_create(JOB_REPLY, entry); +	reply_job->ipv4 = query; +	memcpy(&reply_job->ret_addr, ret_addr, sizeof(struct sockaddr_in)); + +	if (entry->revdns.state == DB_VALID +	    && entry->revdns.exp_time <= now) { +		entry->revdns.state = DB_INVALID; +		DEBUG("revDNS for %08X expired\n", query); +	} + +	if (entry->whois.state == DB_VALID +	    && entry->whois.exp_time <= now) { +		entry->whois.state = DB_INVALID; +		DEBUG("WHOIS for %08X expired\n", query); +	} + +	if (entry->revdns.state == DB_VALID && +	    entry->whois.state == DB_VALID) { +		job_enqueue(reply_job); +		goto out; +	} + +	if (entry->revdns.state == DB_INVALID) { +		job_t *job; + +		DEBUG("revDNS cache miss for %08X\n", query); +		job = job_create(JOB_REVDNS, entry); +		job->ipv4 = query; +		job_enqueue(job); +		entry->revdns.state = DB_IN_PROGRESS; +	} + +	if (entry->whois.state == DB_INVALID) { +		job_t *job; + +		DEBUG("WHOIS cache miss for %08X\n", query); +		job = job_create(JOB_WHOIS, entry); +		job->ipv4 = query; +		job_enqueue(job); +		entry->whois.state = DB_IN_PROGRESS; +	} + +	eli_append(&entry->waiting_jobs, reply_job, waiting_list); + +out: +	pthread_mutex_unlock(&entry->mutex); +} +  #define NUM_WORKERS 8 // FIXME: shouldn't be hardcoded  int main(void) @@ -57,6 +131,8 @@ int main(void)  	pthread_t workers[NUM_WORKERS];  	size_t i; +	clock_gettime(CLOCK_MONOTONIC_RAW, &time_ref); +  	if (lists_load("schachts.list", 0)) {  		eprintf("fatal error: couldn't load the lists\n");  		goto error_lists; @@ -88,7 +164,6 @@ int main(void)  		socklen_t addrlen = sizeof(addr);  		ssize_t size;  		uint32_t query; -		job_t *job;  		size = recvfrom(server_sockfd, buffer, sizeof(buffer), 0,  		                (void*)&addr, &addrlen); @@ -111,11 +186,7 @@ int main(void)  		                   &query))  			continue; -		job = job_create(); -		job->type = JOB_TEST; -		job->query = query; -		memcpy(&job->reply_to, &addr, sizeof(addr)); -		job_enqueue(job); +		handle_request(&addr, query);  	}  	job_quit(); @@ -123,7 +194,7 @@ int main(void)  	for (i = 0; i < NUM_WORKERS; i++)  		pthread_join(workers[i], NULL); -	cache_destroy(); +	db_destroy();  	// todo: clear all jobs  error_recvfrom: diff --git a/src/shared.h b/src/shared.h index a615f83..77c9d3f 100644 --- a/src/shared.h +++ b/src/shared.h @@ -47,49 +47,71 @@ Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA  extern int server_sockfd; +uint64_t get_time(void); + +#define TIME_SECOND (1000000000llu) +#define TIME_MINUTE (60 * TIME_SECOND) +#define TIME_HOUR   (60 * TIME_MINUTE) + +// shared + +typedef struct job_s job_t; + +// database.c + +enum { +	DB_INVALID, +	DB_IN_PROGRESS, +	DB_VALID +}; + +typedef struct { +	int state; +	uint64_t exp_time; +	char *data; +} db_entry_field; + +typedef struct { +	uint32_t ipv4; + +	db_entry_field revdns, whois; + +	bool cached_result_valid; +	int cached_result; + +	job_t *waiting_jobs; + +	eli_header ht_chain; +	pthread_mutex_t mutex; +} db_entry; + +db_entry *db_find(uint32_t ipv4); +void db_destroy(void); +  // worker.c  typedef enum { -	JOB_TEST,  	JOB_REVDNS, -	JOB_WHOIS +	JOB_WHOIS, +	JOB_REPLY,  } job_type_t; -typedef struct job_s job_t;  struct job_s {  	job_type_t type; -	uint32_t query; -	struct sockaddr_in reply_to; +	db_entry *entry; +	struct sockaddr_in ret_addr; // for JOB_REPLY +	uint32_t ipv4; // for other jobs -	eli_header list, dep_list; -	job_t *parent, *deps; -	size_t num_deps, deps_done; -	pthread_mutex_t mutex; +	eli_header queue_list, waiting_list;  }; -job_t *job_create(void); +job_t *job_create(job_type_t type, db_entry *entry); +void job_destroy(job_t *job);  void job_enqueue(job_t *job);  void job_quit(void);  void *worker_main(void *unused); -// cache.c - -typedef struct { -	uint32_t ipv4; - -	char *revdns; -	uint64_t revdns_exptime; // FIXME: implement this already -	char *whois; -	uint64_t whois_exptime; - -	eli_header ht_list; -	pthread_mutex_t mutex; -} cache_entry_t; - -cache_entry_t *cache_find(uint32_t ipv4); -void cache_destroy(void); -  // lexer.c  typedef struct { diff --git a/src/worker.c b/src/worker.c index 59a19cb..82d4ce6 100644 --- a/src/worker.c +++ b/src/worker.c @@ -24,7 +24,7 @@ static pthread_cond_t job_queue_cond = PTHREAD_COND_INITIALIZER;  static pthread_mutex_t job_queue_mutex = PTHREAD_MUTEX_INITIALIZER;  static int job_quit_flag = 0; -job_t *job_create(void) +job_t *job_create(job_type_t type, db_entry *entry)  {  	job_t *job; @@ -34,25 +34,26 @@ job_t *job_create(void)  		abort();  	} -	pthread_mutex_init(&job->mutex, NULL); +	job->type = type; +	job->entry = entry; +  	return job;  } +void job_destroy(job_t *job) +{ +	free(job); +} +  void job_enqueue(job_t *job)  {  	pthread_mutex_lock(&job_queue_mutex); -	eli_append(&job_queue, job, list); +	eli_append(&job_queue, job, queue_list);  	pthread_mutex_unlock(&job_queue_mutex);  	pthread_cond_signal(&job_queue_cond);  } -static void job_add_dep(job_t *parent, job_t *dep) -{ -	eli_append(&parent->deps, dep, dep_list); -	dep->parent = parent; -	parent->num_deps++; -} - +// FIXME: bad name  void job_quit(void)  {  	pthread_mutex_lock(&job_queue_mutex); @@ -61,9 +62,8 @@ void job_quit(void)  	pthread_cond_broadcast(&job_queue_cond);  } -int _host = 0, _whois = 0; - -int read_everything(FILE *fp, char **buffer) +// FIXME: refactor with vstrs +static int read_everything(FILE *fp, char **buffer)  {  	size_t read = 0, alloc = 128, partial; @@ -105,65 +105,83 @@ oom:  	abort();  } -int schachtest(const char *revdns, const char *whois) +// Check if the database entry is complete (all fields are DB_VALID). +// If it is then enqueue all of its waiting jobs. +static void check_waiting_jobs(db_entry *entry)  { -	return 0; +	job_t *job; + +	if (entry->revdns.state != DB_VALID +	    || entry->whois.state != DB_VALID) +		return; + +	eli_for(job, entry->waiting_jobs, waiting_list) +		job_enqueue(job); + +	entry->waiting_jobs = NULL;  } -static void worker_job_test(job_t *job) +// Update an entry's field and call check_waiting_jobs. +static void update_entry(db_entry *entry, db_entry_field *field, char *data)  { -	cache_entry_t *entry; -	job_t *dep; -	int result; -	char reply[1000]; +	pthread_mutex_lock(&entry->mutex); -	entry = cache_find(job->query); +	field->state = DB_VALID; +	free(field->data); +	field->data = data; +	field->exp_time = get_time() + 24 * TIME_HOUR; -	if (!entry->revdns) { -		DEBUG("cache miss on revdns\n"); -		dep = job_create(); -		dep->type = JOB_REVDNS; -		dep->query = job->query; -		job_add_dep(job, dep); -	} +	// invalidate the result cache +	entry->cached_result_valid = false; -	if (!entry->whois) { -		DEBUG("cache miss on whois\n"); -		dep = job_create(); -		dep->type = JOB_WHOIS; -		dep->query = job->query; -		job_add_dep(job, dep); -	} +	check_waiting_jobs(entry); -	if (!entry->revdns || !entry->whois) { -		pthread_mutex_unlock(&entry->mutex); -		return; -	} +	pthread_mutex_unlock(&entry->mutex); +} -	DEBUG("ready for schachtest\n"); +// This job is queued when both WHOIS and revDNS data is available. +// It calls lists_test and sends back the result.  +static void worker_job_reply(job_t *job) +{ +	int result; +	char reply[1000]; -	result = lists_test(entry->revdns, entry->whois); -	pthread_mutex_unlock(&entry->mutex); +	pthread_mutex_lock(&job->entry->mutex); + +	if (!job->entry->cached_result_valid) { +		DEBUG("result cache miss for %08X\n", job->ipv4); +		result = lists_test(job->entry->revdns.data, +		                    job->entry->whois.data); + +		job->entry->cached_result_valid = true; +		job->entry->cached_result = result; +	} else +		result = job->entry->cached_result; + +	pthread_mutex_unlock(&job->entry->mutex);  	snprintf(reply, sizeof(reply), "%s%hhu.%hhu.%hhu.%hhu %i\n",  	         RESPONSE_HEADER, -	         (job->query >> 24) & 0xFF, (job->query >> 16) & 0xFF, -	         (job->query >> 8) & 0xFF, job->query & 0xFF, result); +	         (job->ipv4 >> 24) & 0xFF, (job->ipv4 >> 16) & 0xFF, +	         (job->ipv4 >> 8) & 0xFF, job->ipv4 & 0xFF, result);  	// FIXME: handle sendto errors -	sendto(server_sockfd, reply, strlen(reply), 0, (void*)&job->reply_to, -	       sizeof(job->reply_to)); -} +	sendto(server_sockfd, reply, strlen(reply), 0, (void*)&job->ret_addr, +	       sizeof(job->ret_addr)); +	DEBUG("reply %08X %i\n", job->ipv4, result); +} +// This job makes a revDNS query and stores it in the database.  static void worker_job_revdns(job_t *job)  {  	struct sockaddr_in addr;  	char buffer[NI_MAXHOST], *revdns;  	int rv; -	cache_entry_t *entry; + +	DEBUG("getnameinfo(%08X, ...)\n", job->ipv4);  	addr.sin_family = AF_INET; -	addr.sin_addr.s_addr = htonl(job->query); +	addr.sin_addr.s_addr = htonl(job->ipv4);  	addr.sin_port = 0;  	rv = getnameinfo((void*)&addr, sizeof(addr), buffer, sizeof(buffer), @@ -189,33 +207,26 @@ static void worker_job_revdns(job_t *job)  		break;  	} -	entry = cache_find(job->query); -	free(entry->revdns); -	entry->revdns = revdns; -	DEBUG("revdns is done\n"); -	pthread_mutex_unlock(&entry->mutex); +	update_entry(job->entry, &job->entry->revdns, revdns);  } +// This job makes a WHOIS query and stores it in the database.  static void worker_job_whois(job_t *job)  {  	char command[80];  	FILE *fp;  	char *out; -	cache_entry_t *entry;  	snprintf(command, sizeof(command), "whois \"%hhu.%hhu.%hhu.%hhu\"", -	         (job->query >> 24) & 0xFF, (job->query >> 16) & 0xFF, -	         (job->query >> 8) & 0xFF, job->query & 0xFF); +	         (job->ipv4 >> 24) & 0xFF, (job->ipv4 >> 16) & 0xFF, +	         (job->ipv4 >> 8) & 0xFF, job->ipv4 & 0xFF); +	DEBUG("%s\n", command);  	fp = popen(command, "r"); -	read_everything(fp, &out); +	read_everything(fp, &out); // FIXME: handle errors  	fclose(fp); -	entry = cache_find(job->query); -	free(entry->whois); -	entry->whois = out; -	DEBUG("whois is done\n"); -	pthread_mutex_unlock(&entry->mutex); +	update_entry(job->entry, &job->entry->whois, out);  }  void *worker_main(void *unused) @@ -227,16 +238,19 @@ void *worker_main(void *unused)  		while (!job_queue && !job_quit_flag)  			pthread_cond_wait(&job_queue_cond, &job_queue_mutex); -		if (job_quit_flag) +		if (job_quit_flag) { +			pthread_mutex_unlock(&job_queue_mutex); +			DEBUG("quitting\n");  			return NULL; +		}  		job = job_queue; -		eli_unlink(&job_queue, job, list); +		eli_unlink(&job_queue, job, queue_list);  		pthread_mutex_unlock(&job_queue_mutex);  		switch (job->type) { -		case JOB_TEST: -			worker_job_test(job); +		case JOB_REPLY: +			worker_job_reply(job);  			break;  		case JOB_REVDNS: @@ -248,24 +262,6 @@ void *worker_main(void *unused)  			break;  		} -		// Handle the dependencies first. Keep this job off the queue. -		if (job->deps) { -			job_t *dep; - -			eli_for(dep, job->deps, dep_list) -				job_enqueue(dep); -			memset(&job->deps, 0, sizeof(job->deps)); -			continue; -		} - -		if (job->parent) { -			pthread_mutex_lock(&job->parent->mutex); -			job->parent->deps_done++; -			if (job->parent->deps_done == job->parent->num_deps) -				job_enqueue(job->parent); -			pthread_mutex_unlock(&job->parent->mutex); -		} - -		free(job); +		job_destroy(job);  	}  }  | 
