summaryrefslogtreecommitdiff
path: root/src/worker.c
diff options
context:
space:
mode:
authorPaweł Redman <pawel.redman@gmail.com>2017-04-06 13:39:48 +0200
committerPaweł Redman <pawel.redman@gmail.com>2017-04-06 13:39:48 +0200
commit18e72e3773935fa7e316ca3d32e1fa49ef58f44d (patch)
treed91006956cc8073b1b50fd30660a886459c6eb03 /src/worker.c
parentda4626cb5a741a9b7861dd54f8570a00753a5d92 (diff)
Refactor large parts of the program.
The new design assures revDNS/WHOIS queries will be done exactly once, no matter how many requests the Schachtmeister gets. This commit also adds cache timeouts.
Diffstat (limited to 'src/worker.c')
-rw-r--r--src/worker.c168
1 files changed, 82 insertions, 86 deletions
diff --git a/src/worker.c b/src/worker.c
index 59a19cb..82d4ce6 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -24,7 +24,7 @@ static pthread_cond_t job_queue_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t job_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
static int job_quit_flag = 0;
-job_t *job_create(void)
+job_t *job_create(job_type_t type, db_entry *entry)
{
job_t *job;
@@ -34,25 +34,26 @@ job_t *job_create(void)
abort();
}
- pthread_mutex_init(&job->mutex, NULL);
+ job->type = type;
+ job->entry = entry;
+
return job;
}
+void job_destroy(job_t *job)
+{
+ free(job);
+}
+
void job_enqueue(job_t *job)
{
pthread_mutex_lock(&job_queue_mutex);
- eli_append(&job_queue, job, list);
+ eli_append(&job_queue, job, queue_list);
pthread_mutex_unlock(&job_queue_mutex);
pthread_cond_signal(&job_queue_cond);
}
-static void job_add_dep(job_t *parent, job_t *dep)
-{
- eli_append(&parent->deps, dep, dep_list);
- dep->parent = parent;
- parent->num_deps++;
-}
-
+// FIXME: bad name
void job_quit(void)
{
pthread_mutex_lock(&job_queue_mutex);
@@ -61,9 +62,8 @@ void job_quit(void)
pthread_cond_broadcast(&job_queue_cond);
}
-int _host = 0, _whois = 0;
-
-int read_everything(FILE *fp, char **buffer)
+// FIXME: refactor with vstrs
+static int read_everything(FILE *fp, char **buffer)
{
size_t read = 0, alloc = 128, partial;
@@ -105,65 +105,83 @@ oom:
abort();
}
-int schachtest(const char *revdns, const char *whois)
+// Check if the database entry is complete (all fields are DB_VALID).
+// If it is then enqueue all of its waiting jobs.
+static void check_waiting_jobs(db_entry *entry)
{
- return 0;
+ job_t *job;
+
+ if (entry->revdns.state != DB_VALID
+ || entry->whois.state != DB_VALID)
+ return;
+
+ eli_for(job, entry->waiting_jobs, waiting_list)
+ job_enqueue(job);
+
+ entry->waiting_jobs = NULL;
}
-static void worker_job_test(job_t *job)
+// Update an entry's field and call check_waiting_jobs.
+static void update_entry(db_entry *entry, db_entry_field *field, char *data)
{
- cache_entry_t *entry;
- job_t *dep;
- int result;
- char reply[1000];
+ pthread_mutex_lock(&entry->mutex);
- entry = cache_find(job->query);
+ field->state = DB_VALID;
+ free(field->data);
+ field->data = data;
+ field->exp_time = get_time() + 24 * TIME_HOUR;
- if (!entry->revdns) {
- DEBUG("cache miss on revdns\n");
- dep = job_create();
- dep->type = JOB_REVDNS;
- dep->query = job->query;
- job_add_dep(job, dep);
- }
+ // invalidate the result cache
+ entry->cached_result_valid = false;
- if (!entry->whois) {
- DEBUG("cache miss on whois\n");
- dep = job_create();
- dep->type = JOB_WHOIS;
- dep->query = job->query;
- job_add_dep(job, dep);
- }
+ check_waiting_jobs(entry);
- if (!entry->revdns || !entry->whois) {
- pthread_mutex_unlock(&entry->mutex);
- return;
- }
+ pthread_mutex_unlock(&entry->mutex);
+}
- DEBUG("ready for schachtest\n");
+// This job is queued when both WHOIS and revDNS data is available.
+// It calls lists_test and sends back the result.
+static void worker_job_reply(job_t *job)
+{
+ int result;
+ char reply[1000];
- result = lists_test(entry->revdns, entry->whois);
- pthread_mutex_unlock(&entry->mutex);
+ pthread_mutex_lock(&job->entry->mutex);
+
+ if (!job->entry->cached_result_valid) {
+ DEBUG("result cache miss for %08X\n", job->ipv4);
+ result = lists_test(job->entry->revdns.data,
+ job->entry->whois.data);
+
+ job->entry->cached_result_valid = true;
+ job->entry->cached_result = result;
+ } else
+ result = job->entry->cached_result;
+
+ pthread_mutex_unlock(&job->entry->mutex);
snprintf(reply, sizeof(reply), "%s%hhu.%hhu.%hhu.%hhu %i\n",
RESPONSE_HEADER,
- (job->query >> 24) & 0xFF, (job->query >> 16) & 0xFF,
- (job->query >> 8) & 0xFF, job->query & 0xFF, result);
+ (job->ipv4 >> 24) & 0xFF, (job->ipv4 >> 16) & 0xFF,
+ (job->ipv4 >> 8) & 0xFF, job->ipv4 & 0xFF, result);
// FIXME: handle sendto errors
- sendto(server_sockfd, reply, strlen(reply), 0, (void*)&job->reply_to,
- sizeof(job->reply_to));
-}
+ sendto(server_sockfd, reply, strlen(reply), 0, (void*)&job->ret_addr,
+ sizeof(job->ret_addr));
+ DEBUG("reply %08X %i\n", job->ipv4, result);
+}
+// This job makes a revDNS query and stores it in the database.
static void worker_job_revdns(job_t *job)
{
struct sockaddr_in addr;
char buffer[NI_MAXHOST], *revdns;
int rv;
- cache_entry_t *entry;
+
+ DEBUG("getnameinfo(%08X, ...)\n", job->ipv4);
addr.sin_family = AF_INET;
- addr.sin_addr.s_addr = htonl(job->query);
+ addr.sin_addr.s_addr = htonl(job->ipv4);
addr.sin_port = 0;
rv = getnameinfo((void*)&addr, sizeof(addr), buffer, sizeof(buffer),
@@ -189,33 +207,26 @@ static void worker_job_revdns(job_t *job)
break;
}
- entry = cache_find(job->query);
- free(entry->revdns);
- entry->revdns = revdns;
- DEBUG("revdns is done\n");
- pthread_mutex_unlock(&entry->mutex);
+ update_entry(job->entry, &job->entry->revdns, revdns);
}
+// This job makes a WHOIS query and stores it in the database.
static void worker_job_whois(job_t *job)
{
char command[80];
FILE *fp;
char *out;
- cache_entry_t *entry;
snprintf(command, sizeof(command), "whois \"%hhu.%hhu.%hhu.%hhu\"",
- (job->query >> 24) & 0xFF, (job->query >> 16) & 0xFF,
- (job->query >> 8) & 0xFF, job->query & 0xFF);
+ (job->ipv4 >> 24) & 0xFF, (job->ipv4 >> 16) & 0xFF,
+ (job->ipv4 >> 8) & 0xFF, job->ipv4 & 0xFF);
+ DEBUG("%s\n", command);
fp = popen(command, "r");
- read_everything(fp, &out);
+ read_everything(fp, &out); // FIXME: handle errors
fclose(fp);
- entry = cache_find(job->query);
- free(entry->whois);
- entry->whois = out;
- DEBUG("whois is done\n");
- pthread_mutex_unlock(&entry->mutex);
+ update_entry(job->entry, &job->entry->whois, out);
}
void *worker_main(void *unused)
@@ -227,16 +238,19 @@ void *worker_main(void *unused)
while (!job_queue && !job_quit_flag)
pthread_cond_wait(&job_queue_cond, &job_queue_mutex);
- if (job_quit_flag)
+ if (job_quit_flag) {
+ pthread_mutex_unlock(&job_queue_mutex);
+ DEBUG("quitting\n");
return NULL;
+ }
job = job_queue;
- eli_unlink(&job_queue, job, list);
+ eli_unlink(&job_queue, job, queue_list);
pthread_mutex_unlock(&job_queue_mutex);
switch (job->type) {
- case JOB_TEST:
- worker_job_test(job);
+ case JOB_REPLY:
+ worker_job_reply(job);
break;
case JOB_REVDNS:
@@ -248,24 +262,6 @@ void *worker_main(void *unused)
break;
}
- // Handle the dependencies first. Keep this job off the queue.
- if (job->deps) {
- job_t *dep;
-
- eli_for(dep, job->deps, dep_list)
- job_enqueue(dep);
- memset(&job->deps, 0, sizeof(job->deps));
- continue;
- }
-
- if (job->parent) {
- pthread_mutex_lock(&job->parent->mutex);
- job->parent->deps_done++;
- if (job->parent->deps_done == job->parent->num_deps)
- job_enqueue(job->parent);
- pthread_mutex_unlock(&job->parent->mutex);
- }
-
- free(job);
+ job_destroy(job);
}
}