#include #include #include #include #include #include #include #include #include #include #include #include static size_t count_newlines(void *data, size_t size) { size_t i, total = 0; for (i = 0; i < size; i++) if (((char*)data)[i] == '\n') total++; return total; } static size_t count_newlines_sse2(void *blocks, size_t num_blocks) { size_t i, total; __m128i const_0A, const_FF, counters; __attribute__((aligned(16))) uint8_t parts[16]; const_0A = _mm_set_epi64x(0x0A0A0A0A0A0A0A0A, 0x0A0A0A0A0A0A0A0A); const_FF = _mm_set_epi64x(0xFFFFFFFFFFFFFFFF, 0xFFFFFFFFFFFFFFFF); counters = const_FF; for (i = 0; i < num_blocks; i++) { __m128i data, tmp; data = _mm_load_si128(blocks + i * 16); tmp = _mm_cmpeq_epi8(data, const_0A); counters = _mm_add_epi8(counters, tmp); } counters = _mm_sub_epi8(const_FF, counters); _mm_store_si128((void*)parts, counters); for (total = 0, i = 0; i < 16; i++) total += (size_t)parts[i]; return total; } struct { void *cursor; size_t left; pthread_mutex_t mutex; size_t total; pthread_mutex_t total_mutex; } data_pool = { .mutex = PTHREAD_MUTEX_INITIALIZER, .total_mutex = PTHREAD_MUTEX_INITIALIZER }; static void *thread_f(void *arg) { void *data; size_t size, total = 0; while (1) { pthread_mutex_lock(&data_pool.mutex); size = data_pool.left & (~15); if (!size) { pthread_mutex_unlock(&data_pool.mutex); break; } else if (size > 16 * 256 * 64) size = 16 * 256 * 64; data = data_pool.cursor; data_pool.cursor += size; data_pool.left -= size; pthread_mutex_unlock(&data_pool.mutex); while (size) { size_t blocks; blocks = size / 16; if (blocks > 255) blocks = 255; total += count_newlines_sse2(data, blocks); data += blocks * 16; size -= blocks * 16; } } pthread_mutex_lock(&data_pool.total_mutex); data_pool.total += total; pthread_mutex_unlock(&data_pool.total_mutex); return NULL; } static int turbowc(const char *path, size_t num_threads) { int ret = 1, opt, fd; struct stat stat; void *map; size_t i; pthread_t *threads; struct timespec t0, t1; double delta; fd = open(path, O_RDONLY); if (fd == -1) { perror(path); goto error_open; } fstat(fd, &stat); if (fd == -1) { perror("fstat"); goto error_fstat; } map = mmap(NULL, stat.st_size, PROT_READ, MAP_SHARED, fd, 0); if (!map) { perror("mmap"); goto error_mmap; } data_pool.cursor = map; data_pool.left = stat.st_size; if (num_threads) { threads = calloc(num_threads, sizeof(pthread_t)); if (!threads) { perror("malloc"); goto error_malloc; } } clock_gettime(CLOCK_MONOTONIC, &t0); for (i = 0; i < num_threads; i++) { int rv; rv = pthread_create(threads + i, NULL, thread_f, NULL); if (rv) { size_t j; for (j = 0; j < i; j++) pthread_cancel(threads[j]); goto error_pthread; } } thread_f(NULL); for (i = 0; i < num_threads; i++) pthread_join(threads[i], NULL); data_pool.total += count_newlines(data_pool.cursor, data_pool.left); clock_gettime(CLOCK_MONOTONIC, &t1); printf("%zu %s\n", data_pool.total, path); delta = (t1.tv_sec - t0.tv_sec) + ((double)t1.tv_nsec - t0.tv_nsec) * 1.0e-9; fprintf(stderr, " Total size: %zu\n", stat.st_size); fprintf(stderr, " Total time: %f s\n", delta); fprintf(stderr, "Time / char: %f ns\n", delta / stat.st_size * 1.0e+9); fprintf(stderr, " Throughput: %f GB/s\n", stat.st_size / delta * 1.0e-9); fprintf(stderr, "Total lines: %zu\n", data_pool.total); fprintf(stderr, " Avg. line: %f char(s)\n", (double)stat.st_size / data_pool.total); fprintf(stderr, "Time / line: %f ns\n", delta / data_pool.total * 1.0e+9); ret = 0; error_pthread: if (num_threads) free(threads); error_malloc: munmap(map, stat.st_size); error_mmap: error_fstat: close(fd); error_open: return ret; } int main(int argc, char **argv) { int opt; size_t num_threads = 1; while ((opt = getopt(argc, argv, "vt:")) != -1) { switch (opt) { case 'v': printf("turbowc, with mmap, SSE2 and multithreading\n"); break; case 't': num_threads = strtoull(optarg, NULL, 10); break; default: print_usage: fprintf(stderr, "usage: turbowc [-v] [-t THREADS] [FILE]\n"); return 1; } } if (optind >= argc) goto print_usage; for (; optind < argc; optind++ ) if (turbowc(argv[optind], num_threads)) return 1; return 0; }