/*
Copyright (C) 2017 Paweł Redman

This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 3
of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
*/

#include "shared.h"

// NOTE(review): the header names of the three system includes below were
// missing from the source. They are restored from what this file actually
// calls (waitpid; getnameinfo/gai_strerror) -- confirm against the original.
#include <sys/wait.h>

// reverse DNS
#include <sys/socket.h>
#include <netdb.h>

// Head of the pending job list. Every access in this file happens with
// job_queue_mutex held.
static job_t *job_queue;
// Signalled when a job is appended, broadcast when job_quit is called.
static pthread_cond_t job_queue_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t job_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
// Set to 1 by job_quit; read by worker_main under job_queue_mutex.
static int job_quit_flag = 0;

// Allocate a new job of the given type, bound to a database entry.
// All other fields of the job are zeroed (calloc).
// Never returns NULL: the process is aborted if the allocation fails.
job_t *job_create(job_type_t type, db_entry *entry)
{
	job_t *job;

	job = calloc(1, sizeof *job);
	if (!job) {
		fprintf(stderr, "job_create: calloc failed\n");
		abort();
	}

	job->type = type;
	job->entry = entry;

	return job;
}

// Free a job. Only the job itself is released, not the entry it points to.
void job_destroy(job_t *job)
{
	free(job);
}

// Append a job to the queue and wake up one waiting worker.
void job_enqueue(job_t *job)
{
	pthread_mutex_lock(&job_queue_mutex);
	eli_append(&job_queue, job, queue_list);
	pthread_mutex_unlock(&job_queue_mutex);

	pthread_cond_signal(&job_queue_cond);
}

// FIXME: bad name
// Raise the quit flag and wake up every worker blocked on the queue.
void job_quit(void)
{
	pthread_mutex_lock(&job_queue_mutex);
	job_quit_flag = 1;
	pthread_mutex_unlock(&job_queue_mutex);

	// broadcast, not signal: all waiting workers have to see the flag
	pthread_cond_broadcast(&job_queue_cond);
}

// Execute a process and read its stdout.
// path: the path to the process.
// argv: the argument list. The first arg is usually expected to be the
//       process's path and the last to be NULL.
// timeout: wait for at most this long, kill the process if it takes too long.
// out: the output.
// NOTE: the caller is supposed to free the returned string // NOTE: it's undefined if this function fails static int get_process_output(const char *path, char * const *argv, const struct timeval *timeout, char **out) { int rv, pipefds[2], status; pid_t child; struct timeval time_left; vstr_t vstr = VSTR_INITIALIZER; if (pipe(pipefds) == -1) { perror("pipe"); return 1; } child = fork(); if (child == -1) { close(pipefds[0]); close(pipefds[1]); perror("fork"); return 1; } if (child == 0) { close(pipefds[0]); dup2(pipefds[1], STDOUT_FILENO); close(pipefds[1]); if (execv(path, argv) == -1) return 1; } close(pipefds[1]); memcpy(&time_left, timeout, sizeof(struct timeval)); for (;;) { fd_set fds; ssize_t i, r; char buffer[1024]; FD_ZERO(&fds); FD_SET(pipefds[0], &fds); rv = select(pipefds[0] + 1, &fds, NULL, NULL, &time_left); if (rv == 0) { eprintf("select timed out\n"); goto error; } else if (rv == -1) { perror("select"); goto error; } r = read(pipefds[0], buffer, sizeof(buffer)); if (r == 0) break; else if (r == -1) { perror("read"); goto error; } // FIXME: don't do this char-by-char for (i = 0; i < r; i++) if (vstr_putc(&vstr, buffer[i])) { eprintf("get_process_output: out of memory\n"); goto error; } } if (waitpid(child, &status, 0) == -1) { perror("waitpid"); // FIXME: figure out it's safe to ignore waitpid errors. // I'm not sure if this can leak resources. 
} else { // FIXME: check the exit code for errors eprintf("subprocess exited with code %i\n", status); } close(pipefds[0]); if (vstr.size == 0) { *out = strdup(""); if (!*out) { eprintf("get_process_output: out of memory\n"); return 1; } } else { if (vstr.alloc > vstr.size + 1) { vstr.data = realloc(vstr.data, vstr.size + 1); if (!vstr.data) { eprintf("get_process_output: realloc failed (wtf?)\n"); return 1; } } vstr.data[vstr.size] = '\0'; *out = vstr.data; } return 0; error: vstr_destroy(&vstr); if (kill(child, SIGTERM) == -1) perror("kill"); close(pipefds[0]); return 1; } // Check if the database entry is complete (all fields are DB_VALID). // If it is then enqueue all of its waiting jobs. static void check_waiting_jobs(db_entry *entry) { job_t *job; if (entry->revdns.state != DB_VALID || entry->whois.state != DB_VALID) return; eli_for(job, entry->waiting_jobs, waiting_list) job_enqueue(job); entry->waiting_jobs = NULL; } // Update an entry's field and call check_waiting_jobs. static void update_entry(db_entry *entry, db_entry_field *field, char *data) { pthread_mutex_lock(&entry->mutex); free(field->data); if (data) { field->state = DB_VALID; field->data = data; field->exp_time = get_time() + 24 * TIME_HOUR; } else { field->state = DB_VALID; field->data = NULL; field->exp_time = get_time() + 30 * TIME_MINUTE; } // invalidate the result cache entry->cached_result_valid = false; check_waiting_jobs(entry); pthread_mutex_unlock(&entry->mutex); } // This job is queued when both WHOIS and revDNS data is available. // It calls lists_test and sends back the result. 
static void worker_job_reply(job_t *job)
{
	int result, len;
	char reply[1000];

	// The cached result is shared by all jobs of this entry.
	pthread_mutex_lock(&job->entry->mutex);

	if (!job->entry->cached_result_valid) {
		DEBUG("result cache miss for %08X\n", job->ipv4);

		result = lists_test(job->entry->revdns.data,
			job->entry->whois.data);
		job->entry->cached_result_valid = true;
		job->entry->cached_result = result;
	} else
		result = job->entry->cached_result;

	pthread_mutex_unlock(&job->entry->mutex);

	// reply format: header, dotted-quad address, space, result, newline
	len = snprintf(reply, sizeof(reply), "%s%hhu.%hhu.%hhu.%hhu %i\n",
		RESPONSE_HEADER,
		(job->ipv4 >> 24) & 0xFF, (job->ipv4 >> 16) & 0xFF,
		(job->ipv4 >> 8) & 0xFF, job->ipv4 & 0xFF, result);
	if (len < 0) {
		// the buffer contents are unspecified, don't send them
		eprintf("worker_job_reply: snprintf failed\n");
		return;
	}

	// On truncation snprintf returns the untruncated length; send only
	// what is actually in the buffer.
	if ((size_t)len >= sizeof(reply))
		len = (int)(sizeof(reply) - 1);

	if (sendto(server_sockfd, reply, (size_t)len, 0,
	           (const struct sockaddr *)&job->ret_addr,
	           sizeof(job->ret_addr)) == -1)
		perror("sendto");

	DEBUG("reply %08X %i\n", job->ipv4, result);
}

// This job makes a revDNS query and stores it in the database.
// A missing PTR record and lookup errors are both stored as an empty string.
static void worker_job_revdns(job_t *job)
{
	struct sockaddr_in addr;
	char buffer[NI_MAXHOST], *revdns;
	int rv;

	DEBUG("getnameinfo(%08X, ...)\n", job->ipv4);

	// zero the whole structure so no uninitialized padding is passed on
	memset(&addr, 0, sizeof(addr));
	addr.sin_family = AF_INET;
	addr.sin_addr.s_addr = htonl(job->ipv4);
	addr.sin_port = 0;

	// NI_NAMEREQD: fail with EAI_NONAME instead of returning the
	// numeric address when there is no name
	rv = getnameinfo((const struct sockaddr *)&addr, sizeof(addr),
		buffer, sizeof(buffer), NULL, 0, NI_NAMEREQD);

	switch (rv) {
	case 0:
		revdns = strdup(buffer);
		break;

	case EAI_NONAME:
		// FIXME: handle errors and missing revDNS entries better
		revdns = strdup("");
		break;

	case EAI_SYSTEM:
		// report before strdup, which may overwrite errno
		perror("getnameinfo");
		revdns = strdup("");
		break;

	default:
		fprintf(stderr, "getnameinfo: %s\n", gai_strerror(rv));
		revdns = strdup("");
		break;
	}

	// If strdup failed revdns is NULL, which update_entry accepts.
	update_entry(job->entry, &job->entry->revdns, revdns);
}

// This job makes a WHOIS query and stores it in the database.
static void worker_job_whois(job_t *job) { char *argv[3], iparg[20], *out; struct timeval timeout = {15, 0}; // FIXME: don't hardcode this snprintf(iparg, sizeof(iparg), "%hhu.%hhu.%hhu.%hhu", (job->ipv4 >> 24) & 0xFF, (job->ipv4 >> 16) & 0xFF, (job->ipv4 >> 8) & 0xFF, job->ipv4 & 0xFF); argv[0] = "/bin/whois"; argv[1] = iparg; argv[2] = NULL; if (get_process_output("/bin/whois", argv, &timeout, &out)) { eprintf("whois failed for %s\n", iparg); out = NULL; } update_entry(job->entry, &job->entry->whois, out); } void *worker_main(void *unused) { for (;;) { job_t *job; pthread_mutex_lock(&job_queue_mutex); while (!job_queue && !job_quit_flag) pthread_cond_wait(&job_queue_cond, &job_queue_mutex); if (job_quit_flag) { pthread_mutex_unlock(&job_queue_mutex); DEBUG("quitting\n"); return NULL; } job = job_queue; eli_unlink(&job_queue, job, queue_list); pthread_mutex_unlock(&job_queue_mutex); switch (job->type) { case JOB_REPLY: worker_job_reply(job); break; case JOB_REVDNS: worker_job_revdns(job); break; case JOB_WHOIS: worker_job_whois(job); break; } job_destroy(job); } }