From da4626cb5a741a9b7861dd54f8570a00753a5d92 Mon Sep 17 00:00:00 2001 From: Paweł Redman Date: Wed, 5 Apr 2017 23:09:06 +0200 Subject: Initial commit. --- src/cache.c | 74 ++++++++++++++++ src/eli.h | 84 ++++++++++++++++++ src/lexer.c | 254 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/lists.c | 153 +++++++++++++++++++++++++++++++++ src/main.c | 137 ++++++++++++++++++++++++++++++ src/shared.h | 140 ++++++++++++++++++++++++++++++ src/worker.c | 271 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 1113 insertions(+) create mode 100644 src/cache.c create mode 100644 src/eli.h create mode 100644 src/lexer.c create mode 100644 src/lists.c create mode 100644 src/main.c create mode 100644 src/shared.h create mode 100644 src/worker.c (limited to 'src') diff --git a/src/cache.c b/src/cache.c new file mode 100644 index 0000000..6a31a16 --- /dev/null +++ b/src/cache.c @@ -0,0 +1,74 @@ +/* +Copyright (C) 2017 Paweł Redman + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 3 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software Foundation, +Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#include "shared.h" + +#define HASH_MAX 4095 + +static size_t cache_hash(uint32_t ipv4) +{ + // FIXME: Use an academically acclaimed hashing algorithm. + return (((ipv4 >> 16) ^ ipv4) * 3137) & HASH_MAX; +} + +static cache_entry_t *cache_ht[HASH_MAX]; +static pthread_mutex_t cache_mutex = PTHREAD_MUTEX_INITIALIZER; + +// Returns an entry with its mutex locked. +cache_entry_t *cache_find(uint32_t ipv4) +{ + cache_entry_t *entry; + size_t hash; + + hash = cache_hash(ipv4); + + pthread_mutex_lock(&cache_mutex); + eli_for (entry, cache_ht[hash], ht_list) + if (entry->ipv4 == ipv4) + break; + + if (!entry) { + entry = calloc(1, sizeof(cache_entry_t)); + entry->ipv4 = ipv4; + pthread_mutex_init(&entry->mutex, NULL); + eli_append(cache_ht + hash, entry, ht_list); + } + + pthread_mutex_lock(&entry->mutex); + pthread_mutex_unlock(&cache_mutex); + + return entry; +} + +void cache_destroy(void) +{ + size_t i; + cache_entry_t *entry, *next; + + for (i = 0; i < HASH_MAX; i++) { + for (entry = cache_ht[i]; entry; entry = next) { + next = entry->ht_list.next; + + free(entry->revdns); + free(entry->whois); + free(entry); + } + + cache_ht[i] = NULL; + } +} diff --git a/src/eli.h b/src/eli.h new file mode 100644 index 0000000..8cd75a6 --- /dev/null +++ b/src/eli.h @@ -0,0 +1,84 @@ +/* +Copyright (C) 2017 Paweł Redman + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 3 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software Foundation, +Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#include // size_t + +#define eli_rebase(x, xm, y) \ +((void*)((char*)(y) + ((char*)(xm) - (char*)(x)))) + +typedef struct { + void *prev, *next; +} eli_header; + +// Link an element to the end of a list. +static inline void eli_append_real(void **head, void *entry, + eli_header *head_h, eli_header *entry_h) +{ + eli_header *last_h; + + // Empty list. + if (__builtin_expect(!*head, 0)) { + *head = entry; + entry_h->prev = entry; + entry_h->next = NULL; + return; + } + + entry_h->prev = head_h->prev; + entry_h->next = NULL; + + last_h = eli_rebase(entry, entry_h, head_h->prev); + last_h->next = entry; + + head_h->prev = entry; +} + +#define eli_append(pphead, entry, member) \ +eli_append_real((void**)(pphead), (entry), &((*(pphead))->member), &((entry)->member)) + +// Unlink an element (from anywhere in a list). +static inline void eli_unlink_real(void **head, void *entry, + eli_header *head_h, eli_header *entry_h) +{ + if (entry_h->prev && *head != entry) { + eli_header *prev_h; + + prev_h = eli_rebase(entry, entry_h, head_h->prev); + prev_h->next = entry_h->next; + } + + if (entry_h->next) { + eli_header *next_h; + + next_h = eli_rebase(entry, entry_h, head_h->next); + next_h->prev = entry_h->prev; + } else { + head_h->prev = entry_h->prev; + } + + if (*head == entry) + *head = entry_h->next; + +} + +#define eli_unlink(pphead, entry, member) \ +eli_unlink_real((void**)(pphead), (entry), &((*(pphead))->member), &((entry)->member)) + +// Loop over a list +#define eli_for(i, head, member) \ +for ((i) = (head); (i); (i) = (i)->member.next) diff --git a/src/lexer.c b/src/lexer.c new file mode 100644 index 0000000..5863ef6 --- /dev/null +++ b/src/lexer.c @@ -0,0 +1,254 @@ +/* +Copyright (C) 2017 Paweł Redman + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 3 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software Foundation, +Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#include "shared.h" +#include + +void vstr_init(vstr_t *vstr) +{ + memset(vstr, 0, sizeof(*vstr)); +} + +void vstr_destroy(vstr_t *vstr) +{ + free(vstr->data); +} + +void vstr_clear(vstr_t *vstr) +{ + vstr->size = 0; +} + +static int vstr_enlarge(vstr_t *vstr) +{ + size_t new_alloc; + + new_alloc = (vstr->alloc + 4) * 3 / 2; + + vstr->data = realloc(vstr->data, new_alloc); + if (!vstr->data) + return 1; + + vstr->alloc = new_alloc; + return 0; +} + +int vstr_putc(vstr_t *vstr, char ch) +{ + // note: keep at least one character free at all times for vstr_termz + if (vstr->size + 2 > vstr->alloc) + if (vstr_enlarge(vstr)) + return -ENOMEM; + + vstr->data[vstr->size] = ch; + vstr->size++; + return 0; +} + +int vstr_cmp(vstr_t *vstr, const char *str) +{ + size_t len; + + len = strlen(str); + if (vstr->size < len) + len = vstr->size; + + return memcmp(vstr->data, str, len); +} + +char *vstr_strdup(vstr_t *vstr) +{ + char *str; + + str = malloc(vstr->size + 1); + if (!str) + return NULL; + + memcpy(str, vstr->data, vstr->size); + str[vstr->size] = 0; + + return str; +} + +char *vstr_to_cstr(vstr_t *vstr) +{ + vstr->data[vstr->size] = '\0'; + return vstr->data; +} + +int lexer_open(lexer_state_t *ls, const char *path, vstr_t *token) +{ + ls->error = 0; + ls->path = path; + + ls->fp = fopen(path, "r"); + if (!ls->fp) + return -errno; + + ls->eof = false; + + ls->token = token; + ls->buf_e = ls->buf_c = ls->buf; + ls->cc = ls->lc = ls->Cc = 0; + + ls->in_token = false; + ls->in_quote = false; + ls->in_comment = false; + + return 0; +} + +void lexer_close(lexer_state_t *ls) +{ + vstr_destroy(ls->token); + fclose(ls->fp); +} + +//RETURN VALUES +// <0 on error +// 0 on success +// note: sets ls->eof to true if there's no more data left +static int fill_buffer(lexer_state_t *ls) +{ + size_t read; + + read = fread(ls->buf, 1, sizeof(ls->buf), ls->fp); + if (read < sizeof(ls->buf)) { + if (ferror(ls->fp)) + return -errno; + + ls->eof = true; + } + + ls->buf_c = ls->buf; + ls->buf_e = ls->buf + read; + return 0; +} + +//RETURN VALUES +// -ENOMEM +// -EAGAIN when the buffer runs out +// 0 on success +// 1 when no data is left +static int read_buffer(lexer_state_t *ls) +{ + while (ls->buf_c < ls->buf_e) { + bool ret_token = false; + + if (*ls->buf_c == '\n') { + ls->lc++; + ls->Cc = 0; + } + + if (ls->in_comment) { + if (*ls->buf_c == '\n') + ls->in_comment = false; + } else if (isspace(*ls->buf_c) && !ls->in_quote) { + if (ls->in_token) { + ls->in_token = false; + ret_token = true; + } + } else if (*ls->buf_c == '/' && (ls->cc && ls->last == '/')) { + ls->in_comment = true; + ls->in_token = false; + + ls->token->size--; // remove the first slash + if (ls->token->size) + ret_token = true; + } else if (*ls->buf_c == '\"' && + (ls->cc && ls->last != '\\')) { + ls->in_quote = !ls->in_quote; + + if (!ls->in_quote) { + ls->in_token = false; + ret_token = true; + } + } else { + if (!ls->in_token) { + ls->in_token = true; + } + } + + if (ls->in_token) + if (vstr_putc(ls->token, *ls->buf_c)) { + ls->error = ENOMEM; + return -ENOMEM; + } + + ls->last = *ls->buf_c; + ls->buf_c++; + ls->cc++; + ls->Cc++; + + if (ret_token) + return 0; + } + + if (ls->eof) { + if (ls->token->size > 0) + return 0; + return 1; + } + + return -EAGAIN; +} + +//RETURN VALUES +// <0 on error +// 0 on success +// 1 when no data is left +int lexer_get_token(lexer_state_t *ls) +{ + int ret; + + vstr_clear(ls->token); + + while (1) { + ret = read_buffer(ls); + if (ret != -EAGAIN) + return ret; + + ret = fill_buffer(ls); + if (ret < 0) + return ret; + } +} + +void lexer_perror(lexer_state_t *ls, const char *fmt, ...) +{ + va_list vl; + + eprintf("%s:%zu:%zu: ", ls->path, ls->lc + 1, ls->Cc + 1); + + if (ls->error) { + perror(NULL); + } else { + va_start(vl, fmt); + vfprintf(stderr, fmt, vl); + va_end(vl); + } +} + +void lexer_perror_eg(lexer_state_t *ls, const char *expected) +{ + if (ls->eof && ls->buf_c == ls->buf_e) + lexer_perror(ls, "expected %s, got EOF\n", expected); + else + lexer_perror(ls, "expected %s, got \"%s\"\n", expected, + vstr_to_cstr(ls->token)); +} diff --git a/src/lists.c b/src/lists.c new file mode 100644 index 0000000..00e29ba --- /dev/null +++ b/src/lists.c @@ -0,0 +1,153 @@ +/* +Copyright (C) 2017 Paweł Redman + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 3 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software Foundation, +Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#include "shared.h" + +typedef enum { + ENTRY_REVDNS, + ENTRY_WHOIS +} entry_type_t; + +typedef struct { + entry_type_t type; + + char *pattern; + int niceness; + + eli_header list; +} entry_t; + +static entry_t *entry_list; +static size_t total_entries; + +static int parse_entry(lexer_state_t *lexer, vstr_t *token, entry_type_t type) +{ + entry_t *entry; + int niceness; + char *pattern; + + if (lexer_get_token(lexer)) { + lexer_perror_eg(lexer, "the niceness adjustment"); + return 1; + } + + niceness = atoi(vstr_to_cstr(token)); + + if (lexer_get_token(lexer)) { + lexer_perror_eg(lexer, "the pattern"); + return 1; + } + + pattern = vstr_strdup(token); + if (!pattern) { + lexer_perror(lexer, "out of memory\n"); + return 1; + } + + entry = malloc(sizeof(entry_t)); + if (!entry) { + free(pattern); + lexer_perror(lexer, "out of memory\n"); + return 1; + } + + entry->type = type; + entry->niceness = niceness; + entry->pattern = pattern; + eli_append(&entry_list, entry, list); + total_entries++; + + return 0; +} + +// FIXME: Make this MT-safe (lock the list before adding shit to it) +int lists_load(const char *file, size_t depth) +{ + int error = 1, rv; + lexer_state_t lexer; + vstr_t token = VSTR_INITIALIZER; + + if (depth > 20) { + eprintf("%s: error: deep recursion (probably a circular dependency)\n", file); + return 1; + } + + if ((rv = lexer_open(&lexer, file, &token))) { + perror(file); + return 1; + } + + while (!lexer_get_token(&lexer)) { + if (!vstr_cmp(&token, "include")) { + if (lexer_get_token(&lexer)) { + lexer_perror_eg(&lexer, "the filename"); + goto out; + } + + // FIXME: relative paths + if (lists_load(vstr_to_cstr(&token), depth + 1)) { + lexer_perror(&lexer, "included from here\n"); + goto out; + } + } else if (!vstr_cmp(&token, "revdns")) { + if (parse_entry(&lexer, &token, ENTRY_REVDNS)) + goto out; + } else if (!vstr_cmp(&token, "whois")) { + if (parse_entry(&lexer, &token, ENTRY_WHOIS)) + goto out; + } else { + lexer_perror_eg(&lexer, "'whois' or 'revdns'"); + goto out; + } + } + + if (depth == 0) + DEBUG("loaded entries: %zu\n", total_entries); + + error = 0; +out: + lexer_close(&lexer); + return error; +} + +void lists_destroy(void) +{ + // TODO... +} + +int lists_test(const char *revdns, const char *whois) +{ + entry_t *entry; + int total = 0; + + eli_for(entry, entry_list, list) { + // TODO: regexps + if (entry->type == ENTRY_REVDNS && + !strstr(revdns, entry->pattern)) + continue; + + if (entry->type == ENTRY_WHOIS && + !strstr(whois, entry->pattern)) + continue; + + // TODO: avoid overflows + total += entry->niceness; + } + + return total; +} diff --git a/src/main.c b/src/main.c new file mode 100644 index 0000000..69653c7 --- /dev/null +++ b/src/main.c @@ -0,0 +1,137 @@ +/* +Copyright (C) 2017 Paweł Redman + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 3 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software Foundation, +Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#include "shared.h" + +int server_sockfd; + +static int parse_req_addr(char *begin, char *end, uint32_t *out) +{ + char *p; + size_t i = 0, parts[4] = {0, 0, 0, 0}; + + for (p = begin; p < end; p++) { + if (*p >= '0' && *p <= '9') { + parts[i] *= 10; + parts[i] += *p - '0'; + + if (parts[i] > 255) + return 1; + } else if (*p == '.') { + i++; + + if (i > 3) + return 1; + } else + break; + } + + if (i != 3) + return 1; + + *out = (parts[0] << 24) | (parts[1] << 16) | (parts[2] << 8) | parts[3]; + return 0; +} + +#define NUM_WORKERS 8 // FIXME: shouldn't be hardcoded + +int main(void) +{ + int error = 0; // return value + struct sockaddr_in sockaddr; + pthread_t workers[NUM_WORKERS]; + size_t i; + + if (lists_load("schachts.list", 0)) { + eprintf("fatal error: couldn't load the lists\n"); + goto error_lists; + } + + server_sockfd = socket(AF_INET, SOCK_DGRAM, 0); + if (server_sockfd == -1) { + perror("socket"); + error = 1; + goto error_socket; + } + + sockaddr.sin_family = AF_INET; + sockaddr.sin_addr.s_addr = INADDR_ANY; + sockaddr.sin_port = htons(1337); + + if (bind(server_sockfd, (void*)&sockaddr, sizeof(sockaddr)) == -1) { + perror("bind"); + error = 1; + goto error_bind; + } + + for (i = 0; i < NUM_WORKERS; i++) + pthread_create(workers + i, NULL, worker_main, NULL); + + while (1) { + char buffer[256]; + struct sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + ssize_t size; + uint32_t query; + job_t *job; + + size = recvfrom(server_sockfd, buffer, sizeof(buffer), 0, + (void*)&addr, &addrlen); + if (size == -1) { + perror("recvfrom"); + goto error_recvfrom; + } + + if ((ntohl(addr.sin_addr.s_addr) & LOCALHOST_MASK) + != LOCALHOST_NETWORK) + continue; + + if (size < REQUEST_HEADER_LEN) + continue; + + if (memcmp(buffer, REQUEST_HEADER, REQUEST_HEADER_LEN)) + continue; + + if (parse_req_addr(buffer + REQUEST_HEADER_LEN, buffer + size, + &query)) + continue; + + job = job_create(); + job->type = JOB_TEST; + job->query = query; + memcpy(&job->reply_to, &addr, sizeof(addr)); + job_enqueue(job); + } + + job_quit(); + + for (i = 0; i < NUM_WORKERS; i++) + pthread_join(workers[i], NULL); + + cache_destroy(); + // todo: clear all jobs + +error_recvfrom: +error_bind: + shutdown(server_sockfd, SHUT_RDWR); + close(server_sockfd); +error_socket: + lists_destroy(); +error_lists: + return error; +} diff --git a/src/shared.h b/src/shared.h new file mode 100644 index 0000000..a615f83 --- /dev/null +++ b/src/shared.h @@ -0,0 +1,140 @@ +/* +Copyright (C) 2017 Paweł Redman + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 3 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software Foundation, +Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include + +#include "eli.h" + +#define eprintf(fmt, ...) fprintf(stderr, fmt, ##__VA_ARGS__) +#define DEBUG(fmt, ...) fprintf(stderr, "%s: " fmt, __func__, ##__VA_ARGS__) + +#define LOCALHOST_NETWORK 0x7F000000 +#define LOCALHOST_MASK 0xFF000000 + +#define REQUEST_HEADER "query " +#define REQUEST_HEADER_LEN (sizeof(REQUEST_HEADER) - 1) +#define RESPONSE_HEADER "reply " + +// main.c + +extern int server_sockfd; + +// worker.c + +typedef enum { + JOB_TEST, + JOB_REVDNS, + JOB_WHOIS +} job_type_t; + +typedef struct job_s job_t; +struct job_s { + job_type_t type; + + uint32_t query; + struct sockaddr_in reply_to; + + eli_header list, dep_list; + job_t *parent, *deps; + size_t num_deps, deps_done; + pthread_mutex_t mutex; +}; + +job_t *job_create(void); +void job_enqueue(job_t *job); +void job_quit(void); +void *worker_main(void *unused); + +// cache.c + +typedef struct { + uint32_t ipv4; + + char *revdns; + uint64_t revdns_exptime; // FIXME: implement this already + char *whois; + uint64_t whois_exptime; + + eli_header ht_list; + pthread_mutex_t mutex; +} cache_entry_t; + +cache_entry_t *cache_find(uint32_t ipv4); +void cache_destroy(void); + +// lexer.c + +typedef struct { + char *data; + size_t size, alloc; +} vstr_t; + +#define VSTR_INITIALIZER ((vstr_t){NULL, 0, 0}) + +void vstr_init(vstr_t *vstr); +void vstr_destroy(vstr_t *vstr); +void vstr_clear(vstr_t *vstr); +int vstr_putc(vstr_t *vstr, char ch); +int vstr_cmp(vstr_t *vstr, const char *str); +char *vstr_strdup(vstr_t *vstr); +char *vstr_to_cstr(vstr_t *vstr); + +#define LEXER_BUFFER 1024 + +typedef struct { + int error; + const char *path; + FILE *fp; + bool eof; + + vstr_t *token; + char buf[LEXER_BUFFER]; + char *buf_c, *buf_e; + + size_t cc, lc, Cc; // character, line, and column counters + char last; + + bool in_token; + bool in_quote; + bool in_comment; +} lexer_state_t; + +int lexer_open(lexer_state_t *ls, const char *path, vstr_t *token); +void lexer_close(lexer_state_t *ls); +int lexer_get_token(lexer_state_t *ls); +void lexer_perror(lexer_state_t *ls, const char *fmt, ...); +void lexer_perror_eg(lexer_state_t *ls, const char *expected); + +// lists.c + +int lists_load(const char *file, size_t depth); +void lists_destroy(void); +int lists_test(const char *revdns, const char *whois); diff --git a/src/worker.c b/src/worker.c new file mode 100644 index 0000000..59a19cb --- /dev/null +++ b/src/worker.c @@ -0,0 +1,271 @@ +/* +Copyright (C) 2017 Paweł Redman + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 3 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software Foundation, +Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#include "shared.h" +#include // reverse DNS + +static job_t *job_queue; +static pthread_cond_t job_queue_cond = PTHREAD_COND_INITIALIZER; +static pthread_mutex_t job_queue_mutex = PTHREAD_MUTEX_INITIALIZER; +static int job_quit_flag = 0; + +job_t *job_create(void) +{ + job_t *job; + + job = calloc(1, sizeof(job_t)); + if (!job) { + fprintf(stderr, "job_create: calloc failed\n"); + abort(); + } + + pthread_mutex_init(&job->mutex, NULL); + return job; +} + +void job_enqueue(job_t *job) +{ + pthread_mutex_lock(&job_queue_mutex); + eli_append(&job_queue, job, list); + pthread_mutex_unlock(&job_queue_mutex); + pthread_cond_signal(&job_queue_cond); +} + +static void job_add_dep(job_t *parent, job_t *dep) +{ + eli_append(&parent->deps, dep, dep_list); + dep->parent = parent; + parent->num_deps++; +} + +void job_quit(void) +{ + pthread_mutex_lock(&job_queue_mutex); + job_quit_flag = 1; + pthread_mutex_unlock(&job_queue_mutex); + pthread_cond_broadcast(&job_queue_cond); +} + +int _host = 0, _whois = 0; + +int read_everything(FILE *fp, char **buffer) +{ + size_t read = 0, alloc = 128, partial; + + alloc = 128; + *buffer = malloc(alloc); + if (!*buffer) + goto oom; + + while (1) { + size_t left = alloc - read; + + if (left == 0) { + alloc *= 2; + *buffer = realloc(*buffer, alloc); + if (!buffer) + goto oom; + + continue; + } + + partial = fread((*buffer) + read, 1, left, fp); + if (partial == 0) { + if (ferror(fp)) + return 1; + + break; + } + + read += partial; + } + + + *buffer = realloc(*buffer, read + 1); + (*buffer)[read] = '\0'; + return 0; + +oom: + fprintf(stderr, "read_everything: out of memory\n"); + abort(); +} + +int schachtest(const char *revdns, const char *whois) +{ + return 0; +} + +static void worker_job_test(job_t *job) +{ + cache_entry_t *entry; + job_t *dep; + int result; + char reply[1000]; + + entry = cache_find(job->query); + + if (!entry->revdns) { + DEBUG("cache miss on revdns\n"); + dep = job_create(); + dep->type = JOB_REVDNS; + dep->query = job->query; + job_add_dep(job, dep); + } + + if (!entry->whois) { + DEBUG("cache miss on whois\n"); + dep = job_create(); + dep->type = JOB_WHOIS; + dep->query = job->query; + job_add_dep(job, dep); + } + + if (!entry->revdns || !entry->whois) { + pthread_mutex_unlock(&entry->mutex); + return; + } + + DEBUG("ready for schachtest\n"); + + result = lists_test(entry->revdns, entry->whois); + pthread_mutex_unlock(&entry->mutex); + + snprintf(reply, sizeof(reply), "%s%hhu.%hhu.%hhu.%hhu %i\n", + RESPONSE_HEADER, + (job->query >> 24) & 0xFF, (job->query >> 16) & 0xFF, + (job->query >> 8) & 0xFF, job->query & 0xFF, result); + // FIXME: handle sendto errors + sendto(server_sockfd, reply, strlen(reply), 0, (void*)&job->reply_to, + sizeof(job->reply_to)); +} + + +static void worker_job_revdns(job_t *job) +{ + struct sockaddr_in addr; + char buffer[NI_MAXHOST], *revdns; + int rv; + cache_entry_t *entry; + + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(job->query); + addr.sin_port = 0; + + rv = getnameinfo((void*)&addr, sizeof(addr), buffer, sizeof(buffer), + NULL, 0, NI_NAMEREQD); + switch (rv) { + case 0: + revdns = strdup(buffer); + break; + + case EAI_NONAME: + // FIXME: handle errors and missing revDNS entries better + revdns = strdup(""); + break; + + case EAI_SYSTEM: + revdns = strdup(""); + perror("getnameinfo"); + break; + + default: + revdns = strdup(""); + fprintf(stderr, "getnameinfo: %s\n", gai_strerror(rv)); + break; + } + + entry = cache_find(job->query); + free(entry->revdns); + entry->revdns = revdns; + DEBUG("revdns is done\n"); + pthread_mutex_unlock(&entry->mutex); +} + +static void worker_job_whois(job_t *job) +{ + char command[80]; + FILE *fp; + char *out; + cache_entry_t *entry; + + snprintf(command, sizeof(command), "whois \"%hhu.%hhu.%hhu.%hhu\"", + (job->query >> 24) & 0xFF, (job->query >> 16) & 0xFF, + (job->query >> 8) & 0xFF, job->query & 0xFF); + + fp = popen(command, "r"); + read_everything(fp, &out); + fclose(fp); + + entry = cache_find(job->query); + free(entry->whois); + entry->whois = out; + DEBUG("whois is done\n"); + pthread_mutex_unlock(&entry->mutex); +} + +void *worker_main(void *unused) +{ + for (;;) { + job_t *job; + + pthread_mutex_lock(&job_queue_mutex); + while (!job_queue && !job_quit_flag) + pthread_cond_wait(&job_queue_cond, &job_queue_mutex); + + if (job_quit_flag) + return NULL; + + job = job_queue; + eli_unlink(&job_queue, job, list); + pthread_mutex_unlock(&job_queue_mutex); + + switch (job->type) { + case JOB_TEST: + worker_job_test(job); + break; + + case JOB_REVDNS: + worker_job_revdns(job); + break; + + case JOB_WHOIS: + worker_job_whois(job); + break; + } + + // Handle the dependencies first. Keep this job off the queue. + if (job->deps) { + job_t *dep; + + eli_for(dep, job->deps, dep_list) + job_enqueue(dep); + memset(&job->deps, 0, sizeof(job->deps)); + continue; + } + + if (job->parent) { + pthread_mutex_lock(&job->parent->mutex); + job->parent->deps_done++; + if (job->parent->deps_done == job->parent->num_deps) + job_enqueue(job->parent); + pthread_mutex_unlock(&job->parent->mutex); + } + + free(job); + } +} -- cgit