/*
Copyright (C) 2017 Paweł Redman

This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 3
of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/

#include "shared.h"
#include <netdb.h> // reverse DNS

// Head of the list of runnable jobs. Protected by job_queue_mutex.
static job_t *job_queue;
// Signalled when a job is appended, broadcast when quitting.
static pthread_cond_t job_queue_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t job_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
// Set to 1 by job_quit. Protected by job_queue_mutex.
static int job_quit_flag = 0;

// Allocate a zero-initialized job of the given type bound to a database
// entry. Never returns NULL: the process is aborted if allocation fails.
job_t *job_create(job_type_t type, db_entry *entry)
{
	job_t *job;

	job = calloc(1, sizeof(*job));
	if (!job) {
		fprintf(stderr, "job_create: calloc failed\n");
		abort();
	}

	job->type = type;
	job->entry = entry;
	return job;
}

// Free a job. The database entry it refers to is left untouched.
void job_destroy(job_t *job)
{
	free(job);
}

// Append a job to the run queue and wake one waiting worker.
void job_enqueue(job_t *job)
{
	pthread_mutex_lock(&job_queue_mutex);
	eli_append(&job_queue, job, queue_list);
	pthread_mutex_unlock(&job_queue_mutex);

	pthread_cond_signal(&job_queue_cond);
}

// Raise the quit flag and wake every worker waiting on the queue.
// FIXME: bad name
void job_quit(void)
{
	pthread_mutex_lock(&job_queue_mutex);
	job_quit_flag = 1;
	pthread_mutex_unlock(&job_queue_mutex);

	pthread_cond_broadcast(&job_queue_cond);
}

// Read fp until EOF or error into a freshly allocated, NUL-terminated
// buffer stored in *buffer. The caller owns the buffer and must free it.
//
// Returns 0 on EOF and 1 on a read error. In both cases *buffer is a valid
// C string (on error it holds whatever was read before the failure).
// Running out of memory aborts the process.
// FIXME: refactor with vstrs
static int read_everything(FILE *fp, char **buffer)
{
	size_t read = 0, alloc = 128, partial;
	char *data, *grown;
	int rv = 0;

	data = malloc(alloc);
	if (!data)
		goto oom;

	while (1) {
		size_t left = alloc - read;

		if (left == 0) {
			alloc *= 2;
			// Keep the old pointer until realloc is known to
			// have succeeded.
			grown = realloc(data, alloc);
			if (!grown)
				goto oom;
			data = grown;
			continue;
		}

		partial = fread(data + read, 1, left, fp);
		if (partial == 0) {
			if (ferror(fp))
				rv = 1;
			break;
		}

		read += partial;
	}

	// Trim to size and make room for the terminator. This can grow the
	// buffer (when read == alloc), so it has to be checked as well.
	grown = realloc(data, read + 1);
	if (!grown)
		goto oom;

	grown[read] = '\0';
	*buffer = grown;
	return rv;

oom:
	fprintf(stderr, "read_everything: out of memory\n");
	abort();
}

// Check if the database entry is complete (all fields are DB_VALID).
// If it is then enqueue all of its waiting jobs and empty the waiting list.
// update_entry calls this with entry->mutex held.
static void check_waiting_jobs(db_entry *entry)
{
	job_t *job;

	if (entry->revdns.state != DB_VALID ||
	    entry->whois.state != DB_VALID)
		return;

	eli_for(job, entry->waiting_jobs, waiting_list)
		job_enqueue(job);

	entry->waiting_jobs = NULL;
}

// Update an entry's field and call check_waiting_jobs.
// Takes ownership of data; the field's previous data is freed. The field is
// marked DB_VALID and set to expire 24 hours from now.
static void update_entry(db_entry *entry, db_entry_field *field, char *data)
{
	pthread_mutex_lock(&entry->mutex);

	field->state = DB_VALID;
	free(field->data);
	field->data = data;
	field->exp_time = get_time() + 24 * TIME_HOUR;

	// invalidate the result cache
	entry->cached_result_valid = false;

	check_waiting_jobs(entry);

	pthread_mutex_unlock(&entry->mutex);
}

// This job is queued when both WHOIS and revDNS data is available.
// It calls lists_test (or reuses the entry's cached result) and sends the
// result back to job->ret_addr over server_sockfd.
static void worker_job_reply(job_t *job)
{
	int result;
	char reply[1000];

	pthread_mutex_lock(&job->entry->mutex);

	if (!job->entry->cached_result_valid) {
		DEBUG("result cache miss for %08X\n", job->ipv4);

		result = lists_test(job->entry->revdns.data,
		                    job->entry->whois.data);
		job->entry->cached_result_valid = true;
		job->entry->cached_result = result;
	} else
		result = job->entry->cached_result;

	pthread_mutex_unlock(&job->entry->mutex);

	snprintf(reply, sizeof(reply), "%s%hhu.%hhu.%hhu.%hhu %i\n",
	         RESPONSE_HEADER,
	         (job->ipv4 >> 24) & 0xFF, (job->ipv4 >> 16) & 0xFF,
	         (job->ipv4 >> 8) & 0xFF, job->ipv4 & 0xFF, result);

	// A failed send is only reported; the reply is not retried.
	if (sendto(server_sockfd, reply, strlen(reply), 0,
	           (void*)&job->ret_addr, sizeof(job->ret_addr)) == -1)
		perror("sendto");

	DEBUG("reply %08X %i\n", job->ipv4, result);
}

// This job makes a revDNS query and stores it in the database.
static void worker_job_revdns(job_t *job) { struct sockaddr_in addr; char buffer[NI_MAXHOST], *revdns; int rv; DEBUG("getnameinfo(%08X, ...)\n", job->ipv4); addr.sin_family = AF_INET; addr.sin_addr.s_addr = htonl(job->ipv4); addr.sin_port = 0; rv = getnameinfo((void*)&addr, sizeof(addr), buffer, sizeof(buffer), NULL, 0, NI_NAMEREQD); switch (rv) { case 0: revdns = strdup(buffer); break; case EAI_NONAME: // FIXME: handle errors and missing revDNS entries better revdns = strdup(""); break; case EAI_SYSTEM: revdns = strdup(""); perror("getnameinfo"); break; default: revdns = strdup(""); fprintf(stderr, "getnameinfo: %s\n", gai_strerror(rv)); break; } update_entry(job->entry, &job->entry->revdns, revdns); } // This job makes a WHOIS query and stores it in the database. static void worker_job_whois(job_t *job) { char command[80]; FILE *fp; char *out; snprintf(command, sizeof(command), "whois \"%hhu.%hhu.%hhu.%hhu\"", (job->ipv4 >> 24) & 0xFF, (job->ipv4 >> 16) & 0xFF, (job->ipv4 >> 8) & 0xFF, job->ipv4 & 0xFF); DEBUG("%s\n", command); fp = popen(command, "r"); read_everything(fp, &out); // FIXME: handle errors fclose(fp); update_entry(job->entry, &job->entry->whois, out); } void *worker_main(void *unused) { for (;;) { job_t *job; pthread_mutex_lock(&job_queue_mutex); while (!job_queue && !job_quit_flag) pthread_cond_wait(&job_queue_cond, &job_queue_mutex); if (job_quit_flag) { pthread_mutex_unlock(&job_queue_mutex); DEBUG("quitting\n"); return NULL; } job = job_queue; eli_unlink(&job_queue, job, queue_list); pthread_mutex_unlock(&job_queue_mutex); switch (job->type) { case JOB_REPLY: worker_job_reply(job); break; case JOB_REVDNS: worker_job_revdns(job); break; case JOB_WHOIS: worker_job_whois(job); break; } job_destroy(job); } }