diff options
Diffstat (limited to 'src/worker.c')
-rw-r--r-- | src/worker.c | 168 |
1 files changed, 82 insertions, 86 deletions
diff --git a/src/worker.c b/src/worker.c index 59a19cb..82d4ce6 100644 --- a/src/worker.c +++ b/src/worker.c @@ -24,7 +24,7 @@ static pthread_cond_t job_queue_cond = PTHREAD_COND_INITIALIZER; static pthread_mutex_t job_queue_mutex = PTHREAD_MUTEX_INITIALIZER; static int job_quit_flag = 0; -job_t *job_create(void) +job_t *job_create(job_type_t type, db_entry *entry) { job_t *job; @@ -34,25 +34,26 @@ job_t *job_create(void) abort(); } - pthread_mutex_init(&job->mutex, NULL); + job->type = type; + job->entry = entry; + return job; } +void job_destroy(job_t *job) +{ + free(job); +} + void job_enqueue(job_t *job) { pthread_mutex_lock(&job_queue_mutex); - eli_append(&job_queue, job, list); + eli_append(&job_queue, job, queue_list); pthread_mutex_unlock(&job_queue_mutex); pthread_cond_signal(&job_queue_cond); } -static void job_add_dep(job_t *parent, job_t *dep) -{ - eli_append(&parent->deps, dep, dep_list); - dep->parent = parent; - parent->num_deps++; -} - +// FIXME: bad name void job_quit(void) { pthread_mutex_lock(&job_queue_mutex); @@ -61,9 +62,8 @@ void job_quit(void) pthread_cond_broadcast(&job_queue_cond); } -int _host = 0, _whois = 0; - -int read_everything(FILE *fp, char **buffer) +// FIXME: refactor with vstrs +static int read_everything(FILE *fp, char **buffer) { size_t read = 0, alloc = 128, partial; @@ -105,65 +105,83 @@ oom: abort(); } -int schachtest(const char *revdns, const char *whois) +// Check if the database entry is complete (all fields are DB_VALID). +// If it is then enqueue all of its waiting jobs. +static void check_waiting_jobs(db_entry *entry) { - return 0; + job_t *job; + + if (entry->revdns.state != DB_VALID + || entry->whois.state != DB_VALID) + return; + + eli_for(job, entry->waiting_jobs, waiting_list) + job_enqueue(job); + + entry->waiting_jobs = NULL; } -static void worker_job_test(job_t *job) +// Update an entry's field and call check_waiting_jobs. +static void update_entry(db_entry *entry, db_entry_field *field, char *data) { - cache_entry_t *entry; - job_t *dep; - int result; - char reply[1000]; + pthread_mutex_lock(&entry->mutex); - entry = cache_find(job->query); + field->state = DB_VALID; + free(field->data); + field->data = data; + field->exp_time = get_time() + 24 * TIME_HOUR; - if (!entry->revdns) { - DEBUG("cache miss on revdns\n"); - dep = job_create(); - dep->type = JOB_REVDNS; - dep->query = job->query; - job_add_dep(job, dep); - } + // invalidate the result cache + entry->cached_result_valid = false; - if (!entry->whois) { - DEBUG("cache miss on whois\n"); - dep = job_create(); - dep->type = JOB_WHOIS; - dep->query = job->query; - job_add_dep(job, dep); - } + check_waiting_jobs(entry); - if (!entry->revdns || !entry->whois) { - pthread_mutex_unlock(&entry->mutex); - return; - } + pthread_mutex_unlock(&entry->mutex); +} - DEBUG("ready for schachtest\n"); +// This job is queued when both WHOIS and revDNS data is available. +// It calls lists_test and sends back the result. +static void worker_job_reply(job_t *job) +{ + int result; + char reply[1000]; - result = lists_test(entry->revdns, entry->whois); - pthread_mutex_unlock(&entry->mutex); + pthread_mutex_lock(&job->entry->mutex); + + if (!job->entry->cached_result_valid) { + DEBUG("result cache miss for %08X\n", job->ipv4); + result = lists_test(job->entry->revdns.data, + job->entry->whois.data); + + job->entry->cached_result_valid = true; + job->entry->cached_result = result; + } else + result = job->entry->cached_result; + + pthread_mutex_unlock(&job->entry->mutex); snprintf(reply, sizeof(reply), "%s%hhu.%hhu.%hhu.%hhu %i\n", RESPONSE_HEADER, - (job->query >> 24) & 0xFF, (job->query >> 16) & 0xFF, - (job->query >> 8) & 0xFF, job->query & 0xFF, result); + (job->ipv4 >> 24) & 0xFF, (job->ipv4 >> 16) & 0xFF, + (job->ipv4 >> 8) & 0xFF, job->ipv4 & 0xFF, result); // FIXME: handle sendto errors - sendto(server_sockfd, reply, strlen(reply), 0, (void*)&job->reply_to, - sizeof(job->reply_to)); -} + sendto(server_sockfd, reply, strlen(reply), 0, (void*)&job->ret_addr, + sizeof(job->ret_addr)); + DEBUG("reply %08X %i\n", job->ipv4, result); +} +// This job makes a revDNS query and stores it in the database. static void worker_job_revdns(job_t *job) { struct sockaddr_in addr; char buffer[NI_MAXHOST], *revdns; int rv; - cache_entry_t *entry; + + DEBUG("getnameinfo(%08X, ...)\n", job->ipv4); addr.sin_family = AF_INET; - addr.sin_addr.s_addr = htonl(job->query); + addr.sin_addr.s_addr = htonl(job->ipv4); addr.sin_port = 0; rv = getnameinfo((void*)&addr, sizeof(addr), buffer, sizeof(buffer), @@ -189,33 +207,26 @@ static void worker_job_revdns(job_t *job) break; } - entry = cache_find(job->query); - free(entry->revdns); - entry->revdns = revdns; - DEBUG("revdns is done\n"); - pthread_mutex_unlock(&entry->mutex); + update_entry(job->entry, &job->entry->revdns, revdns); } +// This job makes a WHOIS query and stores it in the database. static void worker_job_whois(job_t *job) { char command[80]; FILE *fp; char *out; - cache_entry_t *entry; snprintf(command, sizeof(command), "whois \"%hhu.%hhu.%hhu.%hhu\"", - (job->query >> 24) & 0xFF, (job->query >> 16) & 0xFF, - (job->query >> 8) & 0xFF, job->query & 0xFF); + (job->ipv4 >> 24) & 0xFF, (job->ipv4 >> 16) & 0xFF, + (job->ipv4 >> 8) & 0xFF, job->ipv4 & 0xFF); + DEBUG("%s\n", command); fp = popen(command, "r"); - read_everything(fp, &out); + read_everything(fp, &out); // FIXME: handle errors fclose(fp); - entry = cache_find(job->query); - free(entry->whois); - entry->whois = out; - DEBUG("whois is done\n"); - pthread_mutex_unlock(&entry->mutex); + update_entry(job->entry, &job->entry->whois, out); } void *worker_main(void *unused) @@ -227,16 +238,19 @@ void *worker_main(void *unused) while (!job_queue && !job_quit_flag) pthread_cond_wait(&job_queue_cond, &job_queue_mutex); - if (job_quit_flag) + if (job_quit_flag) { + pthread_mutex_unlock(&job_queue_mutex); + DEBUG("quitting\n"); return NULL; + } job = job_queue; - eli_unlink(&job_queue, job, list); + eli_unlink(&job_queue, job, queue_list); pthread_mutex_unlock(&job_queue_mutex); switch (job->type) { - case JOB_TEST: - worker_job_test(job); + case JOB_REPLY: + worker_job_reply(job); break; case JOB_REVDNS: @@ -248,24 +262,6 @@ void *worker_main(void *unused) break; } - // Handle the dependencies first. Keep this job off the queue. - if (job->deps) { - job_t *dep; - - eli_for(dep, job->deps, dep_list) - job_enqueue(dep); - memset(&job->deps, 0, sizeof(job->deps)); - continue; - } - - if (job->parent) { - pthread_mutex_lock(&job->parent->mutex); - job->parent->deps_done++; - if (job->parent->deps_done == job->parent->num_deps) - job_enqueue(job->parent); - pthread_mutex_unlock(&job->parent->mutex); - } - - free(job); + job_destroy(job); } } |