summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaweł Redman <pawel.redman@gmail.com>2017-04-06 13:39:48 +0200
committerPaweł Redman <pawel.redman@gmail.com>2017-04-06 13:39:48 +0200
commit18e72e3773935fa7e316ca3d32e1fa49ef58f44d (patch)
treed91006956cc8073b1b50fd30660a886459c6eb03
parentda4626cb5a741a9b7861dd54f8570a00753a5d92 (diff)
Refactor large parts of the program.
The new design assures revDNS/WHOIS queries will be done exactly once, no matter how many requests the Schachtmeister gets. This commit also adds cache timeouts.
-rw-r--r--Makefile2
-rw-r--r--src/cache.c74
-rw-r--r--src/database.c94
-rw-r--r--src/main.c85
-rw-r--r--src/shared.h76
-rw-r--r--src/worker.c168
6 files changed, 304 insertions, 195 deletions
diff --git a/Makefile b/Makefile
index 1fee50d..7062e05 100644
--- a/Makefile
+++ b/Makefile
@@ -9,7 +9,7 @@ PP_CC := $(PP_BOLD)$(shell tput setf 6)CC$(PP_RESET)
PP_LD := $(PP_BOLD)$(shell tput setf 2)LD$(PP_RESET)
PP_RM := $(PP_BOLD)$(shell tput setf 4)RM$(PP_RESET)
-SRC := src/cache.c \
+SRC := src/database.c \
src/lexer.c \
src/lists.c \
src/main.c \
diff --git a/src/cache.c b/src/cache.c
deleted file mode 100644
index 6a31a16..0000000
--- a/src/cache.c
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
-Copyright (C) 2017 Paweł Redman
-
-This program is free software; you can redistribute it and/or
-modify it under the terms of the GNU General Public License
-as published by the Free Software Foundation; either version 3
-of the License, or (at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program; if not, write to the Free Software Foundation,
-Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-*/
-
-#include "shared.h"
-
-#define HASH_MAX 4095
-
-static size_t cache_hash(uint32_t ipv4)
-{
- // FIXME: Use an academically acclaimed hashing algorithm.
- return (((ipv4 >> 16) ^ ipv4) * 3137) & HASH_MAX;
-}
-
-static cache_entry_t *cache_ht[HASH_MAX];
-static pthread_mutex_t cache_mutex = PTHREAD_MUTEX_INITIALIZER;
-
-// Returns an entry with its mutex locked.
-cache_entry_t *cache_find(uint32_t ipv4)
-{
- cache_entry_t *entry;
- size_t hash;
-
- hash = cache_hash(ipv4);
-
- pthread_mutex_lock(&cache_mutex);
- eli_for (entry, cache_ht[hash], ht_list)
- if (entry->ipv4 == ipv4)
- break;
-
- if (!entry) {
- entry = calloc(1, sizeof(cache_entry_t));
- entry->ipv4 = ipv4;
- pthread_mutex_init(&entry->mutex, NULL);
- eli_append(cache_ht + hash, entry, ht_list);
- }
-
- pthread_mutex_lock(&entry->mutex);
- pthread_mutex_unlock(&cache_mutex);
-
- return entry;
-}
-
-void cache_destroy(void)
-{
- size_t i;
- cache_entry_t *entry, *next;
-
- for (i = 0; i < HASH_MAX; i++) {
- for (entry = cache_ht[i]; entry; entry = next) {
- next = entry->ht_list.next;
-
- free(entry->revdns);
- free(entry->whois);
- free(entry);
- }
-
- cache_ht[i] = NULL;
- }
-}
diff --git a/src/database.c b/src/database.c
new file mode 100644
index 0000000..c42cfb6
--- /dev/null
+++ b/src/database.c
@@ -0,0 +1,94 @@
+/*
+Copyright (C) 2017 Paweł Redman
+
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License
+as published by the Free Software Foundation; either version 3
+of the License, or (at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software Foundation,
+Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#include "shared.h"
+
+#define HASH_MAX 4095
+
+static size_t db_hash(uint32_t ipv4)
+{
+ // FIXME: Use an academically acclaimed hashing algorithm.
+ return (((ipv4 >> 16) ^ ipv4) * 3137) & HASH_MAX;
+}
+
+db_entry *database[HASH_MAX];
+pthread_mutex_t database_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+// Returns a pointer to the relevant db_entry (newly created if didn't exist).
+// This function locks the db_entry's mutex before returning.
+// Returns NULL if calloc fails.
+db_entry *db_find(uint32_t ipv4)
+{
+ size_t hash;
+ db_entry *entry;
+
+ hash = db_hash(ipv4);
+
+ pthread_mutex_lock(&database_mutex);
+ eli_for (entry, database[hash], ht_chain)
+ if (entry->ipv4 == ipv4)
+ break;
+
+ if (!entry) {
+ entry = calloc(1, sizeof(db_entry));
+ if (!entry)
+ goto out;
+ entry->ipv4 = ipv4;
+ pthread_mutex_init(&entry->mutex, NULL);
+ eli_append(database + hash, entry, ht_chain);
+ }
+
+ pthread_mutex_lock(&entry->mutex);
+out:
+ pthread_mutex_unlock(&database_mutex);
+ return entry;
+}
+
+static void destroy_entry(db_entry *entry)
+{
+ job_t *job, *next;
+
+ for (job = entry->waiting_jobs; job; job = next) {
+ next = job->waiting_list.next;
+ job_destroy(job);
+ }
+}
+
+// Frees all entries and resets the database.
+void db_destroy(void)
+{
+ size_t i;
+ db_entry *entry, *next;
+
+ pthread_mutex_lock(&database_mutex);
+
+ for (i = 0; i < HASH_MAX; i++) {
+ for (entry = database[i]; entry; entry = next) {
+ next = entry->ht_chain.next;
+
+ free(entry->revdns.data);
+ free(entry->whois.data);
+ destroy_entry(entry);
+ }
+
+ database[i] = NULL;
+ }
+
+ pthread_mutex_unlock(&database_mutex);
+}
+
diff --git a/src/main.c b/src/main.c
index 69653c7..fc377e3 100644
--- a/src/main.c
+++ b/src/main.c
@@ -17,9 +17,21 @@ Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include "shared.h"
+#include <time.h>
int server_sockfd;
+static struct timespec time_ref;
+
+uint64_t get_time(void)
+{
+ static struct timespec now;
+
+ clock_gettime(CLOCK_MONOTONIC, &now);
+ return (uint64_t)(now.tv_sec - time_ref.tv_sec) * TIME_SECOND +
+ now.tv_nsec + 1;
+}
+
static int parse_req_addr(char *begin, char *end, uint32_t *out)
{
char *p;
@@ -48,6 +60,68 @@ static int parse_req_addr(char *begin, char *end, uint32_t *out)
return 0;
}
+void handle_request(struct sockaddr_in *ret_addr, uint32_t query)
+{
+ db_entry *entry;
+ job_t *reply_job;
+ uint64_t now = get_time();
+
+ DEBUG("query %08X\n", query);
+
+ entry = db_find(query);
+ if (!entry) {
+ eprintf("handle_request: out of memory\n");
+ return;
+ }
+
+ reply_job = job_create(JOB_REPLY, entry);
+ reply_job->ipv4 = query;
+ memcpy(&reply_job->ret_addr, ret_addr, sizeof(struct sockaddr_in));
+
+ if (entry->revdns.state == DB_VALID
+ && entry->revdns.exp_time <= now) {
+ entry->revdns.state = DB_INVALID;
+ DEBUG("revDNS for %08X expired\n", query);
+ }
+
+ if (entry->whois.state == DB_VALID
+ && entry->whois.exp_time <= now) {
+ entry->whois.state = DB_INVALID;
+ DEBUG("WHOIS for %08X expired\n", query);
+ }
+
+ if (entry->revdns.state == DB_VALID &&
+ entry->whois.state == DB_VALID) {
+ job_enqueue(reply_job);
+ goto out;
+ }
+
+ if (entry->revdns.state == DB_INVALID) {
+ job_t *job;
+
+ DEBUG("revDNS cache miss for %08X\n", query);
+ job = job_create(JOB_REVDNS, entry);
+ job->ipv4 = query;
+ job_enqueue(job);
+ entry->revdns.state = DB_IN_PROGRESS;
+ }
+
+ if (entry->whois.state == DB_INVALID) {
+ job_t *job;
+
+ DEBUG("WHOIS cache miss for %08X\n", query);
+ job = job_create(JOB_WHOIS, entry);
+ job->ipv4 = query;
+ job_enqueue(job);
+ entry->whois.state = DB_IN_PROGRESS;
+ }
+
+ eli_append(&entry->waiting_jobs, reply_job, waiting_list);
+
+out:
+ pthread_mutex_unlock(&entry->mutex);
+}
+
#define NUM_WORKERS 8 // FIXME: shouldn't be hardcoded
int main(void)
@@ -57,6 +131,8 @@ int main(void)
pthread_t workers[NUM_WORKERS];
size_t i;
+ clock_gettime(CLOCK_MONOTONIC_RAW, &time_ref);
+
if (lists_load("schachts.list", 0)) {
eprintf("fatal error: couldn't load the lists\n");
goto error_lists;
@@ -88,7 +164,6 @@ int main(void)
socklen_t addrlen = sizeof(addr);
ssize_t size;
uint32_t query;
- job_t *job;
size = recvfrom(server_sockfd, buffer, sizeof(buffer), 0,
(void*)&addr, &addrlen);
@@ -111,11 +186,7 @@ int main(void)
&query))
continue;
- job = job_create();
- job->type = JOB_TEST;
- job->query = query;
- memcpy(&job->reply_to, &addr, sizeof(addr));
- job_enqueue(job);
+ handle_request(&addr, query);
}
job_quit();
@@ -123,7 +194,7 @@ int main(void)
for (i = 0; i < NUM_WORKERS; i++)
pthread_join(workers[i], NULL);
- cache_destroy();
+ db_destroy();
// todo: clear all jobs
error_recvfrom:
diff --git a/src/shared.h b/src/shared.h
index a615f83..77c9d3f 100644
--- a/src/shared.h
+++ b/src/shared.h
@@ -47,49 +47,71 @@ Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
extern int server_sockfd;
+uint64_t get_time(void);
+
+#define TIME_SECOND (1000000000llu)
+#define TIME_MINUTE (60 * TIME_SECOND)
+#define TIME_HOUR (60 * TIME_MINUTE)
+
+// shared
+
+typedef struct job_s job_t;
+
+// database.c
+
+enum {
+ DB_INVALID,
+ DB_IN_PROGRESS,
+ DB_VALID
+};
+
+typedef struct {
+ int state;
+ uint64_t exp_time;
+ char *data;
+} db_entry_field;
+
+typedef struct {
+ uint32_t ipv4;
+
+ db_entry_field revdns, whois;
+
+ bool cached_result_valid;
+ int cached_result;
+
+ job_t *waiting_jobs;
+
+ eli_header ht_chain;
+ pthread_mutex_t mutex;
+} db_entry;
+
+db_entry *db_find(uint32_t ipv4);
+void db_destroy(void);
+
// worker.c
typedef enum {
- JOB_TEST,
JOB_REVDNS,
- JOB_WHOIS
+ JOB_WHOIS,
+ JOB_REPLY,
} job_type_t;
-typedef struct job_s job_t;
struct job_s {
job_type_t type;
- uint32_t query;
- struct sockaddr_in reply_to;
+ db_entry *entry;
+ struct sockaddr_in ret_addr; // for JOB_REPLY
+ uint32_t ipv4; // for other jobs
- eli_header list, dep_list;
- job_t *parent, *deps;
- size_t num_deps, deps_done;
- pthread_mutex_t mutex;
+ eli_header queue_list, waiting_list;
};
-job_t *job_create(void);
+job_t *job_create(job_type_t type, db_entry *entry);
+void job_destroy(job_t *job);
void job_enqueue(job_t *job);
void job_quit(void);
void *worker_main(void *unused);
-// cache.c
-
-typedef struct {
- uint32_t ipv4;
-
- char *revdns;
- uint64_t revdns_exptime; // FIXME: implement this already
- char *whois;
- uint64_t whois_exptime;
-
- eli_header ht_list;
- pthread_mutex_t mutex;
-} cache_entry_t;
-
-cache_entry_t *cache_find(uint32_t ipv4);
-void cache_destroy(void);
-
// lexer.c
typedef struct {
diff --git a/src/worker.c b/src/worker.c
index 59a19cb..82d4ce6 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -24,7 +24,7 @@ static pthread_cond_t job_queue_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t job_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
static int job_quit_flag = 0;
-job_t *job_create(void)
+job_t *job_create(job_type_t type, db_entry *entry)
{
job_t *job;
@@ -34,25 +34,26 @@ job_t *job_create(void)
abort();
}
- pthread_mutex_init(&job->mutex, NULL);
+ job->type = type;
+ job->entry = entry;
+
return job;
}
+void job_destroy(job_t *job)
+{
+ free(job);
+}
+
void job_enqueue(job_t *job)
{
pthread_mutex_lock(&job_queue_mutex);
- eli_append(&job_queue, job, list);
+ eli_append(&job_queue, job, queue_list);
pthread_mutex_unlock(&job_queue_mutex);
pthread_cond_signal(&job_queue_cond);
}
-static void job_add_dep(job_t *parent, job_t *dep)
-{
- eli_append(&parent->deps, dep, dep_list);
- dep->parent = parent;
- parent->num_deps++;
-}
-
+// FIXME: bad name
void job_quit(void)
{
pthread_mutex_lock(&job_queue_mutex);
@@ -61,9 +62,8 @@ void job_quit(void)
pthread_cond_broadcast(&job_queue_cond);
}
-int _host = 0, _whois = 0;
-
-int read_everything(FILE *fp, char **buffer)
+// FIXME: refactor with vstrs
+static int read_everything(FILE *fp, char **buffer)
{
size_t read = 0, alloc = 128, partial;
@@ -105,65 +105,83 @@ oom:
abort();
}
-int schachtest(const char *revdns, const char *whois)
+// Check if the database entry is complete (all fields are DB_VALID).
+// If it is then enqueue all of its waiting jobs.
+static void check_waiting_jobs(db_entry *entry)
{
- return 0;
+ job_t *job;
+
+ if (entry->revdns.state != DB_VALID
+ || entry->whois.state != DB_VALID)
+ return;
+
+ eli_for(job, entry->waiting_jobs, waiting_list)
+ job_enqueue(job);
+
+ entry->waiting_jobs = NULL;
}
-static void worker_job_test(job_t *job)
+// Update an entry's field and call check_waiting_jobs.
+static void update_entry(db_entry *entry, db_entry_field *field, char *data)
{
- cache_entry_t *entry;
- job_t *dep;
- int result;
- char reply[1000];
+ pthread_mutex_lock(&entry->mutex);
- entry = cache_find(job->query);
+ field->state = DB_VALID;
+ free(field->data);
+ field->data = data;
+ field->exp_time = get_time() + 24 * TIME_HOUR;
- if (!entry->revdns) {
- DEBUG("cache miss on revdns\n");
- dep = job_create();
- dep->type = JOB_REVDNS;
- dep->query = job->query;
- job_add_dep(job, dep);
- }
+ // invalidate the result cache
+ entry->cached_result_valid = false;
- if (!entry->whois) {
- DEBUG("cache miss on whois\n");
- dep = job_create();
- dep->type = JOB_WHOIS;
- dep->query = job->query;
- job_add_dep(job, dep);
- }
+ check_waiting_jobs(entry);
- if (!entry->revdns || !entry->whois) {
- pthread_mutex_unlock(&entry->mutex);
- return;
- }
+ pthread_mutex_unlock(&entry->mutex);
+}
- DEBUG("ready for schachtest\n");
+// This job is queued when both WHOIS and revDNS data is available.
+// It calls lists_test and sends back the result.
+static void worker_job_reply(job_t *job)
+{
+ int result;
+ char reply[1000];
- result = lists_test(entry->revdns, entry->whois);
- pthread_mutex_unlock(&entry->mutex);
+ pthread_mutex_lock(&job->entry->mutex);
+
+ if (!job->entry->cached_result_valid) {
+ DEBUG("result cache miss for %08X\n", job->ipv4);
+ result = lists_test(job->entry->revdns.data,
+ job->entry->whois.data);
+
+ job->entry->cached_result_valid = true;
+ job->entry->cached_result = result;
+ } else
+ result = job->entry->cached_result;
+
+ pthread_mutex_unlock(&job->entry->mutex);
snprintf(reply, sizeof(reply), "%s%hhu.%hhu.%hhu.%hhu %i\n",
RESPONSE_HEADER,
- (job->query >> 24) & 0xFF, (job->query >> 16) & 0xFF,
- (job->query >> 8) & 0xFF, job->query & 0xFF, result);
+ (job->ipv4 >> 24) & 0xFF, (job->ipv4 >> 16) & 0xFF,
+ (job->ipv4 >> 8) & 0xFF, job->ipv4 & 0xFF, result);
// FIXME: handle sendto errors
- sendto(server_sockfd, reply, strlen(reply), 0, (void*)&job->reply_to,
- sizeof(job->reply_to));
-}
+ sendto(server_sockfd, reply, strlen(reply), 0, (void*)&job->ret_addr,
+ sizeof(job->ret_addr));
+ DEBUG("reply %08X %i\n", job->ipv4, result);
+}
+// This job makes a revDNS query and stores it in the database.
static void worker_job_revdns(job_t *job)
{
struct sockaddr_in addr;
char buffer[NI_MAXHOST], *revdns;
int rv;
- cache_entry_t *entry;
+
+ DEBUG("getnameinfo(%08X, ...)\n", job->ipv4);
addr.sin_family = AF_INET;
- addr.sin_addr.s_addr = htonl(job->query);
+ addr.sin_addr.s_addr = htonl(job->ipv4);
addr.sin_port = 0;
rv = getnameinfo((void*)&addr, sizeof(addr), buffer, sizeof(buffer),
@@ -189,33 +207,26 @@ static void worker_job_revdns(job_t *job)
break;
}
- entry = cache_find(job->query);
- free(entry->revdns);
- entry->revdns = revdns;
- DEBUG("revdns is done\n");
- pthread_mutex_unlock(&entry->mutex);
+ update_entry(job->entry, &job->entry->revdns, revdns);
}
+// This job makes a WHOIS query and stores it in the database.
static void worker_job_whois(job_t *job)
{
char command[80];
FILE *fp;
char *out;
- cache_entry_t *entry;
snprintf(command, sizeof(command), "whois \"%hhu.%hhu.%hhu.%hhu\"",
- (job->query >> 24) & 0xFF, (job->query >> 16) & 0xFF,
- (job->query >> 8) & 0xFF, job->query & 0xFF);
+ (job->ipv4 >> 24) & 0xFF, (job->ipv4 >> 16) & 0xFF,
+ (job->ipv4 >> 8) & 0xFF, job->ipv4 & 0xFF);
+ DEBUG("%s\n", command);
fp = popen(command, "r");
- read_everything(fp, &out);
+ read_everything(fp, &out); // FIXME: handle errors
fclose(fp);
- entry = cache_find(job->query);
- free(entry->whois);
- entry->whois = out;
- DEBUG("whois is done\n");
- pthread_mutex_unlock(&entry->mutex);
+ update_entry(job->entry, &job->entry->whois, out);
}
void *worker_main(void *unused)
@@ -227,16 +238,19 @@ void *worker_main(void *unused)
while (!job_queue && !job_quit_flag)
pthread_cond_wait(&job_queue_cond, &job_queue_mutex);
- if (job_quit_flag)
+ if (job_quit_flag) {
+ pthread_mutex_unlock(&job_queue_mutex);
+ DEBUG("quitting\n");
return NULL;
+ }
job = job_queue;
- eli_unlink(&job_queue, job, list);
+ eli_unlink(&job_queue, job, queue_list);
pthread_mutex_unlock(&job_queue_mutex);
switch (job->type) {
- case JOB_TEST:
- worker_job_test(job);
+ case JOB_REPLY:
+ worker_job_reply(job);
break;
case JOB_REVDNS:
@@ -248,24 +262,6 @@ void *worker_main(void *unused)
break;
}
- // Handle the dependencies first. Keep this job off the queue.
- if (job->deps) {
- job_t *dep;
-
- eli_for(dep, job->deps, dep_list)
- job_enqueue(dep);
- memset(&job->deps, 0, sizeof(job->deps));
- continue;
- }
-
- if (job->parent) {
- pthread_mutex_lock(&job->parent->mutex);
- job->parent->deps_done++;
- if (job->parent->deps_done == job->parent->num_deps)
- job_enqueue(job->parent);
- pthread_mutex_unlock(&job->parent->mutex);
- }
-
- free(job);
+ job_destroy(job);
}
}