diff options
author | Paweł Redman <pawel.redman@gmail.com> | 2017-04-05 23:09:06 +0200 |
---|---|---|
committer | Paweł Redman <pawel.redman@gmail.com> | 2017-04-05 23:09:06 +0200 |
commit | da4626cb5a741a9b7861dd54f8570a00753a5d92 (patch) | |
tree | 3731ac96db396ceaf83d0c867aff06810a08daa3 /src/worker.c |
Initial commit.
Diffstat (limited to 'src/worker.c')
-rw-r--r-- | src/worker.c | 271 |
1 files changed, 271 insertions, 0 deletions
diff --git a/src/worker.c b/src/worker.c new file mode 100644 index 0000000..59a19cb --- /dev/null +++ b/src/worker.c @@ -0,0 +1,271 @@ +/* +Copyright (C) 2017 Paweł Redman + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 3 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software Foundation, +Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#include "shared.h" +#include <netdb.h> // reverse DNS + +static job_t *job_queue; +static pthread_cond_t job_queue_cond = PTHREAD_COND_INITIALIZER; +static pthread_mutex_t job_queue_mutex = PTHREAD_MUTEX_INITIALIZER; +static int job_quit_flag = 0; + +job_t *job_create(void) +{ + job_t *job; + + job = calloc(1, sizeof(job_t)); + if (!job) { + fprintf(stderr, "job_create: calloc failed\n"); + abort(); + } + + pthread_mutex_init(&job->mutex, NULL); + return job; +} + +void job_enqueue(job_t *job) +{ + pthread_mutex_lock(&job_queue_mutex); + eli_append(&job_queue, job, list); + pthread_mutex_unlock(&job_queue_mutex); + pthread_cond_signal(&job_queue_cond); +} + +static void job_add_dep(job_t *parent, job_t *dep) +{ + eli_append(&parent->deps, dep, dep_list); + dep->parent = parent; + parent->num_deps++; +} + +void job_quit(void) +{ + pthread_mutex_lock(&job_queue_mutex); + job_quit_flag = 1; + pthread_mutex_unlock(&job_queue_mutex); + pthread_cond_broadcast(&job_queue_cond); +} + +int _host = 0, _whois = 0; + +int read_everything(FILE *fp, char **buffer) +{ + size_t read = 0, alloc = 128, partial; + + alloc = 128; + *buffer = malloc(alloc); + if (!*buffer) + goto oom; + + while (1) { + size_t left = alloc - read; + + if (left == 0) { + alloc *= 2; + *buffer = realloc(*buffer, alloc); + if (!buffer) + goto oom; + + continue; + } + + partial = fread((*buffer) + read, 1, left, fp); + if (partial == 0) { + if (ferror(fp)) + return 1; + + break; + } + + read += partial; + } + + + *buffer = realloc(*buffer, read + 1); + (*buffer)[read] = '\0'; + return 0; + +oom: + fprintf(stderr, "read_everything: out of memory\n"); + abort(); +} + +int schachtest(const char *revdns, const char *whois) +{ + return 0; +} + +static void worker_job_test(job_t *job) +{ + cache_entry_t *entry; + job_t *dep; + int result; + char reply[1000]; + + entry = cache_find(job->query); + + if (!entry->revdns) { + DEBUG("cache miss on revdns\n"); + dep = job_create(); + dep->type = JOB_REVDNS; + dep->query = job->query; + job_add_dep(job, dep); + } + + if (!entry->whois) { + DEBUG("cache miss on whois\n"); + dep = job_create(); + dep->type = JOB_WHOIS; + dep->query = job->query; + job_add_dep(job, dep); + } + + if (!entry->revdns || !entry->whois) { + pthread_mutex_unlock(&entry->mutex); + return; + } + + DEBUG("ready for schachtest\n"); + + result = lists_test(entry->revdns, entry->whois); + pthread_mutex_unlock(&entry->mutex); + + snprintf(reply, sizeof(reply), "%s%hhu.%hhu.%hhu.%hhu %i\n", + RESPONSE_HEADER, + (job->query >> 24) & 0xFF, (job->query >> 16) & 0xFF, + (job->query >> 8) & 0xFF, job->query & 0xFF, result); + // FIXME: handle sendto errors + sendto(server_sockfd, reply, strlen(reply), 0, (void*)&job->reply_to, + sizeof(job->reply_to)); +} + + +static void worker_job_revdns(job_t *job) +{ + struct sockaddr_in addr; + char buffer[NI_MAXHOST], *revdns; + int rv; + cache_entry_t *entry; + + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(job->query); + addr.sin_port = 0; + + rv = getnameinfo((void*)&addr, sizeof(addr), buffer, sizeof(buffer), + NULL, 0, NI_NAMEREQD); + switch (rv) { + case 0: + revdns = strdup(buffer); + break; + + case EAI_NONAME: + // FIXME: handle errors and missing revDNS entries better + revdns = strdup("<none>"); + break; + + case EAI_SYSTEM: + revdns = strdup("<error>"); + perror("getnameinfo"); + break; + + default: + revdns = strdup("<error>"); + fprintf(stderr, "getnameinfo: %s\n", gai_strerror(rv)); + break; + } + + entry = cache_find(job->query); + free(entry->revdns); + entry->revdns = revdns; + DEBUG("revdns is done\n"); + pthread_mutex_unlock(&entry->mutex); +} + +static void worker_job_whois(job_t *job) +{ + char command[80]; + FILE *fp; + char *out; + cache_entry_t *entry; + + snprintf(command, sizeof(command), "whois \"%hhu.%hhu.%hhu.%hhu\"", + (job->query >> 24) & 0xFF, (job->query >> 16) & 0xFF, + (job->query >> 8) & 0xFF, job->query & 0xFF); + + fp = popen(command, "r"); + read_everything(fp, &out); + fclose(fp); + + entry = cache_find(job->query); + free(entry->whois); + entry->whois = out; + DEBUG("whois is done\n"); + pthread_mutex_unlock(&entry->mutex); +} + +void *worker_main(void *unused) +{ + for (;;) { + job_t *job; + + pthread_mutex_lock(&job_queue_mutex); + while (!job_queue && !job_quit_flag) + pthread_cond_wait(&job_queue_cond, &job_queue_mutex); + + if (job_quit_flag) + return NULL; + + job = job_queue; + eli_unlink(&job_queue, job, list); + pthread_mutex_unlock(&job_queue_mutex); + + switch (job->type) { + case JOB_TEST: + worker_job_test(job); + break; + + case JOB_REVDNS: + worker_job_revdns(job); + break; + + case JOB_WHOIS: + worker_job_whois(job); + break; + } + + // Handle the dependencies first. Keep this job off the queue. + if (job->deps) { + job_t *dep; + + eli_for(dep, job->deps, dep_list) + job_enqueue(dep); + memset(&job->deps, 0, sizeof(job->deps)); + continue; + } + + if (job->parent) { + pthread_mutex_lock(&job->parent->mutex); + job->parent->deps_done++; + if (job->parent->deps_done == job->parent->num_deps) + job_enqueue(job->parent); + pthread_mutex_unlock(&job->parent->mutex); + } + + free(job); + } +} |