/* Copyright (C) 2017 Paweł Redman
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the Free
 * Software Foundation; either version 3 of the License, or (at your option)
 * any later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */

#include "shared.h"
// reverse DNS. FIX: the header name was missing from this #include in the
// original; <netdb.h> restored from the identifiers used below
// (getnameinfo, NI_MAXHOST, NI_NAMEREQD, EAI_NONAME, gai_strerror).
#include <netdb.h>

// FIFO of pending jobs, protected by job_queue_mutex and signalled through
// job_queue_cond. Workers sleep on the condition variable until a job is
// queued or job_quit() is called.
static job_t *job_queue;
static pthread_cond_t job_queue_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t job_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
static int job_quit_flag = 0;

// Allocate a zeroed job with an initialized mutex. Aborts on OOM (the
// file-wide policy is abort-on-OOM). The caller owns the job until it is
// handed over via job_enqueue().
job_t *job_create(void)
{
	job_t *job;

	job = calloc(1, sizeof(job_t));
	if (!job) {
		fprintf(stderr, "job_create: calloc failed\n");
		abort();
	}

	pthread_mutex_init(&job->mutex, NULL);
	return job;
}

// Append a job to the work queue and wake one worker. Ownership of the job
// passes to whichever worker dequeues it.
void job_enqueue(job_t *job)
{
	pthread_mutex_lock(&job_queue_mutex);
	eli_append(&job_queue, job, list);
	pthread_mutex_unlock(&job_queue_mutex);
	pthread_cond_signal(&job_queue_cond);
}

// Register dep as a prerequisite of parent. Not locked: only called from the
// worker that currently owns parent (the job is off the queue at that point).
static void job_add_dep(job_t *parent, job_t *dep)
{
	eli_append(&parent->deps, dep, dep_list);
	dep->parent = parent;
	parent->num_deps++;
}

// Ask all workers to exit and wake every thread blocked on the queue.
void job_quit(void)
{
	pthread_mutex_lock(&job_queue_mutex);
	job_quit_flag = 1;
	pthread_mutex_unlock(&job_queue_mutex);
	pthread_cond_broadcast(&job_queue_cond);
}

// NOTE(review): file-scope identifiers starting with an underscore are
// reserved; these are also unreferenced in this file — candidates for
// removal (kept for link compatibility).
int _host = 0, _whois = 0;

// Read stream fp to EOF into a malloc'd, NUL-terminated string (*buffer).
// Returns 0 on success. On read error returns 1 with *buffer freed and set
// to NULL (the original leaked an unterminated buffer to the caller here).
// Aborts on OOM.
int read_everything(FILE *fp, char **buffer)
{
	size_t total = 0, alloc = 128, partial;

	*buffer = malloc(alloc);
	if (!*buffer)
		goto oom;

	while (1) {
		size_t left = alloc - total;

		if (left == 0) {
			alloc *= 2;
			*buffer = realloc(*buffer, alloc);
			// FIX: the original tested `!buffer` — the out
			// parameter itself, which is never NULL — so a failed
			// realloc was dereferenced below.
			if (!*buffer)
				goto oom;
			continue;
		}

		partial = fread(*buffer + total, 1, left, fp);
		if (partial == 0) {
			if (ferror(fp)) {
				// FIX: release the partial buffer instead of
				// returning it unterminated.
				free(*buffer);
				*buffer = NULL;
				return 1;
			}
			break; // EOF
		}

		total += partial;
	}

	// Shrink to fit. If the shrinking realloc fails the old, larger block
	// is still valid, so keep it. (FIX: the original assigned the realloc
	// result unchecked.)
	{
		char *shrunk = realloc(*buffer, total + 1);

		if (shrunk)
			*buffer = shrunk;
	}

	(*buffer)[total] = '\0';
	return 0;

oom:
	fprintf(stderr, "read_everything: out of memory\n");
	abort();
}

// Stub: decide based on revDNS and whois data. Always passes for now.
int schachtest(const char *revdns, const char *whois)
{
	return 0;
}

// Process a JOB_TEST job. If the cache entry lacks revDNS or whois data,
// spawn dependency jobs to fetch them and return early — worker_main()
// enqueues the deps and re-enqueues this job once the last one finishes.
// Otherwise run the test and send the verdict back to the client.
// NOTE(review): cache_find() appears to return the entry with entry->mutex
// held — every path below unlocks it; confirm against the cache module.
static void worker_job_test(job_t *job)
{
	cache_entry_t *entry;
	job_t *dep;
	int result;
	char reply[1000];

	entry = cache_find(job->query);

	if (!entry->revdns) {
		DEBUG("cache miss on revdns\n");

		dep = job_create();
		dep->type = JOB_REVDNS;
		dep->query = job->query;
		job_add_dep(job, dep);
	}

	if (!entry->whois) {
		DEBUG("cache miss on whois\n");

		dep = job_create();
		dep->type = JOB_WHOIS;
		dep->query = job->query;
		job_add_dep(job, dep);
	}

	if (!entry->revdns || !entry->whois) {
		pthread_mutex_unlock(&entry->mutex);
		return;
	}

	DEBUG("ready for schachtest\n");
	result = lists_test(entry->revdns, entry->whois);
	pthread_mutex_unlock(&entry->mutex);

	// job->query holds an IPv4 address in host byte order; format it as a
	// dotted quad followed by the test result.
	snprintf(reply, sizeof(reply), "%s%hhu.%hhu.%hhu.%hhu %i\n",
	         RESPONSE_HEADER,
	         (job->query >> 24) & 0xFF, (job->query >> 16) & 0xFF,
	         (job->query >> 8) & 0xFF, job->query & 0xFF, result);

	// FIXME: handle sendto errors
	sendto(server_sockfd, reply, strlen(reply), 0,
	       (void*)&job->reply_to, sizeof(job->reply_to));
}

// Process a JOB_REVDNS job: reverse-resolve the IPv4 address in job->query
// and store the result in the cache entry. An empty string marks "lookup
// done, no name", so worker_job_test() does not retry forever.
static void worker_job_revdns(job_t *job)
{
	// FIX: zero-initialize so padding and unused fields aren't
	// indeterminate when the struct is passed to getnameinfo().
	struct sockaddr_in addr = {0};
	char buffer[NI_MAXHOST], *revdns;
	int rv;
	cache_entry_t *entry;

	addr.sin_family = AF_INET;
	addr.sin_addr.s_addr = htonl(job->query);
	addr.sin_port = 0;

	rv = getnameinfo((void*)&addr, sizeof(addr), buffer, sizeof(buffer),
	                 NULL, 0, NI_NAMEREQD);
	switch (rv) {
	case 0:
		revdns = strdup(buffer);
		break;

	case EAI_NONAME:
		// FIXME: handle errors and missing revDNS entries better
		revdns = strdup("");
		break;

	case EAI_SYSTEM:
		revdns = strdup("");
		perror("getnameinfo");
		break;

	default:
		revdns = strdup("");
		fprintf(stderr, "getnameinfo: %s\n", gai_strerror(rv));
		break;
	}

	entry = cache_find(job->query);
	free(entry->revdns);
	entry->revdns = revdns;
	DEBUG("revdns is done\n");
	pthread_mutex_unlock(&entry->mutex);
}
static void worker_job_whois(job_t *job) { char command[80]; FILE *fp; char *out; cache_entry_t *entry; snprintf(command, sizeof(command), "whois \"%hhu.%hhu.%hhu.%hhu\"", (job->query >> 24) & 0xFF, (job->query >> 16) & 0xFF, (job->query >> 8) & 0xFF, job->query & 0xFF); fp = popen(command, "r"); read_everything(fp, &out); fclose(fp); entry = cache_find(job->query); free(entry->whois); entry->whois = out; DEBUG("whois is done\n"); pthread_mutex_unlock(&entry->mutex); } void *worker_main(void *unused) { for (;;) { job_t *job; pthread_mutex_lock(&job_queue_mutex); while (!job_queue && !job_quit_flag) pthread_cond_wait(&job_queue_cond, &job_queue_mutex); if (job_quit_flag) return NULL; job = job_queue; eli_unlink(&job_queue, job, list); pthread_mutex_unlock(&job_queue_mutex); switch (job->type) { case JOB_TEST: worker_job_test(job); break; case JOB_REVDNS: worker_job_revdns(job); break; case JOB_WHOIS: worker_job_whois(job); break; } // Handle the dependencies first. Keep this job off the queue. if (job->deps) { job_t *dep; eli_for(dep, job->deps, dep_list) job_enqueue(dep); memset(&job->deps, 0, sizeof(job->deps)); continue; } if (job->parent) { pthread_mutex_lock(&job->parent->mutex); job->parent->deps_done++; if (job->parent->deps_done == job->parent->num_deps) job_enqueue(job->parent); pthread_mutex_unlock(&job->parent->mutex); } free(job); } }