summaryrefslogtreecommitdiff
path: root/src/worker.c
diff options
context:
space:
mode:
authorPaweł Redman <pawel.redman@gmail.com>2017-04-05 23:09:06 +0200
committerPaweł Redman <pawel.redman@gmail.com>2017-04-05 23:09:06 +0200
commitda4626cb5a741a9b7861dd54f8570a00753a5d92 (patch)
tree3731ac96db396ceaf83d0c867aff06810a08daa3 /src/worker.c
Initial commit.
Diffstat (limited to 'src/worker.c')
-rw-r--r--src/worker.c271
1 files changed, 271 insertions, 0 deletions
diff --git a/src/worker.c b/src/worker.c
new file mode 100644
index 0000000..59a19cb
--- /dev/null
+++ b/src/worker.c
@@ -0,0 +1,271 @@
+/*
+Copyright (C) 2017 Paweł Redman
+
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License
+as published by the Free Software Foundation; either version 3
+of the License, or (at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software Foundation,
+Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#include "shared.h"
+#include <netdb.h> // reverse DNS
+
+static job_t *job_queue;
+static pthread_cond_t job_queue_cond = PTHREAD_COND_INITIALIZER;
+static pthread_mutex_t job_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
+static int job_quit_flag = 0;
+
+job_t *job_create(void)
+{
+ job_t *job;
+
+ job = calloc(1, sizeof(job_t));
+ if (!job) {
+ fprintf(stderr, "job_create: calloc failed\n");
+ abort();
+ }
+
+ pthread_mutex_init(&job->mutex, NULL);
+ return job;
+}
+
+void job_enqueue(job_t *job)
+{
+ pthread_mutex_lock(&job_queue_mutex);
+ eli_append(&job_queue, job, list);
+ pthread_mutex_unlock(&job_queue_mutex);
+ pthread_cond_signal(&job_queue_cond);
+}
+
+static void job_add_dep(job_t *parent, job_t *dep)
+{
+ eli_append(&parent->deps, dep, dep_list);
+ dep->parent = parent;
+ parent->num_deps++;
+}
+
+void job_quit(void)
+{
+ pthread_mutex_lock(&job_queue_mutex);
+ job_quit_flag = 1;
+ pthread_mutex_unlock(&job_queue_mutex);
+ pthread_cond_broadcast(&job_queue_cond);
+}
+
+int _host = 0, _whois = 0;
+
+int read_everything(FILE *fp, char **buffer)
+{
+ size_t read = 0, alloc = 128, partial;
+
+ alloc = 128;
+ *buffer = malloc(alloc);
+ if (!*buffer)
+ goto oom;
+
+ while (1) {
+ size_t left = alloc - read;
+
+ if (left == 0) {
+ alloc *= 2;
+ *buffer = realloc(*buffer, alloc);
+ if (!buffer)
+ goto oom;
+
+ continue;
+ }
+
+ partial = fread((*buffer) + read, 1, left, fp);
+ if (partial == 0) {
+ if (ferror(fp))
+ return 1;
+
+ break;
+ }
+
+ read += partial;
+ }
+
+
+ *buffer = realloc(*buffer, read + 1);
+ (*buffer)[read] = '\0';
+ return 0;
+
+oom:
+ fprintf(stderr, "read_everything: out of memory\n");
+ abort();
+}
+
+int schachtest(const char *revdns, const char *whois)
+{
+ return 0;
+}
+
+static void worker_job_test(job_t *job)
+{
+ cache_entry_t *entry;
+ job_t *dep;
+ int result;
+ char reply[1000];
+
+ entry = cache_find(job->query);
+
+ if (!entry->revdns) {
+ DEBUG("cache miss on revdns\n");
+ dep = job_create();
+ dep->type = JOB_REVDNS;
+ dep->query = job->query;
+ job_add_dep(job, dep);
+ }
+
+ if (!entry->whois) {
+ DEBUG("cache miss on whois\n");
+ dep = job_create();
+ dep->type = JOB_WHOIS;
+ dep->query = job->query;
+ job_add_dep(job, dep);
+ }
+
+ if (!entry->revdns || !entry->whois) {
+ pthread_mutex_unlock(&entry->mutex);
+ return;
+ }
+
+ DEBUG("ready for schachtest\n");
+
+ result = lists_test(entry->revdns, entry->whois);
+ pthread_mutex_unlock(&entry->mutex);
+
+ snprintf(reply, sizeof(reply), "%s%hhu.%hhu.%hhu.%hhu %i\n",
+ RESPONSE_HEADER,
+ (job->query >> 24) & 0xFF, (job->query >> 16) & 0xFF,
+ (job->query >> 8) & 0xFF, job->query & 0xFF, result);
+ // FIXME: handle sendto errors
+ sendto(server_sockfd, reply, strlen(reply), 0, (void*)&job->reply_to,
+ sizeof(job->reply_to));
+}
+
+
+static void worker_job_revdns(job_t *job)
+{
+ struct sockaddr_in addr;
+ char buffer[NI_MAXHOST], *revdns;
+ int rv;
+ cache_entry_t *entry;
+
+ addr.sin_family = AF_INET;
+ addr.sin_addr.s_addr = htonl(job->query);
+ addr.sin_port = 0;
+
+ rv = getnameinfo((void*)&addr, sizeof(addr), buffer, sizeof(buffer),
+ NULL, 0, NI_NAMEREQD);
+ switch (rv) {
+ case 0:
+ revdns = strdup(buffer);
+ break;
+
+ case EAI_NONAME:
+ // FIXME: handle errors and missing revDNS entries better
+ revdns = strdup("<none>");
+ break;
+
+ case EAI_SYSTEM:
+ revdns = strdup("<error>");
+ perror("getnameinfo");
+ break;
+
+ default:
+ revdns = strdup("<error>");
+ fprintf(stderr, "getnameinfo: %s\n", gai_strerror(rv));
+ break;
+ }
+
+ entry = cache_find(job->query);
+ free(entry->revdns);
+ entry->revdns = revdns;
+ DEBUG("revdns is done\n");
+ pthread_mutex_unlock(&entry->mutex);
+}
+
+static void worker_job_whois(job_t *job)
+{
+ char command[80];
+ FILE *fp;
+ char *out;
+ cache_entry_t *entry;
+
+ snprintf(command, sizeof(command), "whois \"%hhu.%hhu.%hhu.%hhu\"",
+ (job->query >> 24) & 0xFF, (job->query >> 16) & 0xFF,
+ (job->query >> 8) & 0xFF, job->query & 0xFF);
+
+ fp = popen(command, "r");
+ read_everything(fp, &out);
+ fclose(fp);
+
+ entry = cache_find(job->query);
+ free(entry->whois);
+ entry->whois = out;
+ DEBUG("whois is done\n");
+ pthread_mutex_unlock(&entry->mutex);
+}
+
+void *worker_main(void *unused)
+{
+ for (;;) {
+ job_t *job;
+
+ pthread_mutex_lock(&job_queue_mutex);
+ while (!job_queue && !job_quit_flag)
+ pthread_cond_wait(&job_queue_cond, &job_queue_mutex);
+
+ if (job_quit_flag)
+ return NULL;
+
+ job = job_queue;
+ eli_unlink(&job_queue, job, list);
+ pthread_mutex_unlock(&job_queue_mutex);
+
+ switch (job->type) {
+ case JOB_TEST:
+ worker_job_test(job);
+ break;
+
+ case JOB_REVDNS:
+ worker_job_revdns(job);
+ break;
+
+ case JOB_WHOIS:
+ worker_job_whois(job);
+ break;
+ }
+
+ // Handle the dependencies first. Keep this job off the queue.
+ if (job->deps) {
+ job_t *dep;
+
+ eli_for(dep, job->deps, dep_list)
+ job_enqueue(dep);
+ memset(&job->deps, 0, sizeof(job->deps));
+ continue;
+ }
+
+ if (job->parent) {
+ pthread_mutex_lock(&job->parent->mutex);
+ job->parent->deps_done++;
+ if (job->parent->deps_done == job->parent->num_deps)
+ job_enqueue(job->parent);
+ pthread_mutex_unlock(&job->parent->mutex);
+ }
+
+ free(job);
+ }
+}