/*
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

/*
 * turbowc: count newlines in files using mmap, SSE2 and a pool of threads.
 */

/* Exposes clock_gettime(), CLOCK_MONOTONIC_RAW and getopt() under strict
 * -std=c11 as well as the GNU default dialect. Must precede every include. */
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif

#include <emmintrin.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

/* Time in nanoseconds. */
typedef uint64_t ntime_t;

/*
 * Return the current CLOCK_MONOTONIC_RAW reading in nanoseconds.
 * Only differences between two readings are meaningful.
 */
ntime_t nclock(void)
{
	struct timespec ts = { 0, 0 };

	clock_gettime(CLOCK_MONOTONIC_RAW, &ts);
	return (ntime_t)ts.tv_sec * 1000000000 + ts.tv_nsec;
}

/*
 * Scalar fallback: count '\n' bytes in data[0 .. size).
 * Has no alignment or size requirements; size may be 0.
 */
static size_t count_newlines(const void *data, size_t size)
{
	const char *bytes = data;
	size_t i, total = 0;

	for (i = 0; i < size; i++)
		if (bytes[i] == '\n')
			total++;
	return total;
}

/* Size in bytes of one SSE2 block. */
#define SSE2_BLOCK_SIZE 16

/* Each of the 16 byte lanes is an 8-bit counter, so a single call may
 * accumulate at most this many blocks before a lane could wrap. */
#define SSE2_MAX_BLOCKS 255

/*
 * Count '\n' bytes in num_blocks consecutive 16-byte blocks.
 *
 * blocks      must be 16-byte aligned (aligned loads are used).
 * num_blocks  must not exceed SSE2_MAX_BLOCKS; larger values make the
 *             per-lane 8-bit counters wrap and the result is wrong.
 *
 * Returns the number of newline bytes found.
 */
static size_t count_newlines_sse2(const void *blocks, size_t num_blocks)
{
	const __m128i *block = blocks;
	const __m128i newline = _mm_set1_epi8('\n');
	__m128i counters = _mm_setzero_si128();
	uint8_t lanes[SSE2_BLOCK_SIZE];
	size_t i, total = 0;

	for (i = 0; i < num_blocks; i++) {
		/* Matching lanes compare to 0xFF, i.e. -1: subtracting the
		 * mask therefore adds one to every lane that saw a newline. */
		__m128i hits = _mm_cmpeq_epi8(_mm_load_si128(block + i), newline);

		counters = _mm_sub_epi8(counters, hits);
	}

	_mm_storeu_si128((__m128i *)lanes, counters);
	for (i = 0; i < SSE2_BLOCK_SIZE; i++)
		total += lanes[i];
	return total;
}

/*
 * Work pool shared by all counting threads for the file being processed.
 * cursor/left are protected by mutex, total by total_mutex.
 */
struct {
	void *cursor;                /* next unclaimed byte of the mapping */
	size_t left;                 /* unclaimed bytes starting at cursor */
	pthread_mutex_t mutex;
	size_t total;                /* newlines counted so far */
	pthread_mutex_t total_mutex;
} data_pool = {
	.mutex = PTHREAD_MUTEX_INITIALIZER,
	.total_mutex = PTHREAD_MUTEX_INITIALIZER
};

/* Bytes handed to a thread per pool access; a multiple of SSE2_BLOCK_SIZE. */
#define CHUNK_SIZE (4194304)

/*
 * Worker body: repeatedly claim one full CHUNK_SIZE chunk from data_pool and
 * count its newlines with SSE2, then add the private tally to
 * data_pool.total.
 *
 * Only whole chunks are claimed; the final partial chunk (fewer than
 * CHUNK_SIZE bytes) is left in the pool for the caller to count.
 * arg is unused. Always returns NULL.
 */
static void *thread_f(void *arg)
{
	size_t total = 0;

	(void)arg;

	for (;;) {
		const char *data;
		size_t size;

		pthread_mutex_lock(&data_pool.mutex);
		size = data_pool.left >= CHUNK_SIZE ? CHUNK_SIZE : 0;
		data = data_pool.cursor;
		data_pool.cursor = (char *)data_pool.cursor + size;
		data_pool.left -= size;
		pthread_mutex_unlock(&data_pool.mutex);

		if (size == 0)
			break;

		/* Feed the chunk to the SSE2 counter in runs short enough
		 * that its 8-bit lane counters cannot wrap. */
		while (size) {
			size_t blocks = size / SSE2_BLOCK_SIZE;

			if (blocks > SSE2_MAX_BLOCKS)
				blocks = SSE2_MAX_BLOCKS;
			total += count_newlines_sse2(data, blocks);
			data += blocks * SSE2_BLOCK_SIZE;
			size -= blocks * SSE2_BLOCK_SIZE;
		}
	}

	pthread_mutex_lock(&data_pool.total_mutex);
	data_pool.total += total;
	pthread_mutex_unlock(&data_pool.total_mutex);
	return NULL;
}

/* Running totals over every file processed so far. */
struct {
	size_t size, lines;
	ntime_t time;
} totals;

/*
 * Count the newlines in the file at path and print "<count> <path>".
 *
 * num_threads is the total number of counting threads including the calling
 * one; 0 is treated as 1. On success the file's size and line count are
 * added to totals.
 *
 * Returns 0 on success, 1 on failure (a diagnostic is written to stderr).
 */
static int turbowc(const char *path, size_t num_threads)
{
	int ret = 1, fd, rv = 0;
	struct stat st;
	void *map = NULL;
	size_t size, workers, started, i;
	pthread_t *threads = NULL;

	/* The calling thread always takes part, so there are
	 * num_threads - 1 extra workers; guard against wrap-around at 0. */
	workers = num_threads ? num_threads - 1 : 0;

	fd = open(path, O_RDONLY);
	if (fd == -1) {
		perror(path);
		goto out;
	}

	if (fstat(fd, &st) == -1) {
		perror("fstat");
		goto out_close;
	}
	size = (size_t)st.st_size;

	/* mmap() rejects zero-length mappings, so an empty file is simply
	 * processed as an empty pool with no mapping at all. */
	if (size) {
		map = mmap(NULL, size, PROT_READ, MAP_SHARED, fd, 0);
		if (map == MAP_FAILED) {
			perror("mmap");
			goto out_close;
		}
	}

	data_pool.cursor = map;
	data_pool.left = size;
	data_pool.total = 0;

	if (workers) {
		threads = calloc(workers, sizeof *threads);
		if (!threads) {
			perror("malloc");
			goto out_unmap;
		}
	}

	for (started = 0; started < workers; started++) {
		rv = pthread_create(&threads[started], NULL, thread_f, NULL);
		if (rv) {
			/* pthread_create reports through its return value,
			 * not errno. */
			fprintf(stderr, "pthread_create: %s\n", strerror(rv));
			break;
		}
	}

	if (rv) {
		/* Empty the pool so running workers stop after their current
		 * chunk, then wait for them: the mapping must not be torn
		 * down while any thread may still be reading it. */
		pthread_mutex_lock(&data_pool.mutex);
		data_pool.left = 0;
		pthread_mutex_unlock(&data_pool.mutex);
		for (i = 0; i < started; i++)
			pthread_join(threads[i], NULL);
		goto out_free;
	}

	thread_f(NULL);
	for (i = 0; i < workers; i++)
		pthread_join(threads[i], NULL);

	/* Whatever is left is the tail shorter than one chunk. */
	data_pool.total += count_newlines(data_pool.cursor, data_pool.left);

	printf("%zu %s\n", data_pool.total, path);
	totals.size += size;
	totals.lines += data_pool.total;
	ret = 0;

out_free:
	free(threads);
out_unmap:
	if (size)
		munmap(map, size);
out_close:
	close(fd);
out:
	return ret;
}

/* Define TURBOWC_NO_MAIN to build the counting code without the command-line
 * entry point, e.g. when a test harness provides its own main(). */
#ifndef TURBOWC_NO_MAIN

/* Print the usage line to stderr and return the failure exit status. */
static int usage(void)
{
	fprintf(stderr, "usage: turbowc [-v] [-s] [-t THREADS] [FILE]\n");
	return 1;
}

/*
 * Command-line entry point.
 *
 *   -v          print the program banner
 *   -s          print aggregate statistics after all files
 *   -t THREADS  number of counting threads (default 1)
 *
 * Returns 0 on success, 1 on a usage error or if any file fails.
 */
int main(int argc, char **argv)
{
	int opt;
	size_t num_threads = 1;
	_Bool print_stats = 0;
	ntime_t t0;
	char *end;

	while ((opt = getopt(argc, argv, "vt:s")) != -1) {
		switch (opt) {
		case 'v':
			printf("turbowc, with mmap, SSE2 and multithreading\n");
			break;
		case 't':
			num_threads = strtoull(optarg, &end, 10);
			if (end == optarg || *end != '\0')
				return usage();
			break;
		case 's':
			print_stats = 1;
			break;
		default:
			return usage();
		}
	}

	if (optind >= argc)
		return usage();

	t0 = nclock();
	for (; optind < argc; optind++)
		if (turbowc(argv[optind], num_threads))
			return 1;
	totals.time = nclock() - t0;

	if (print_stats) {
		printf("Lines\t%zu\n", totals.lines);
		printf("Size\t%zu\t%f\t-\n", totals.size,
		       (double)totals.size / totals.lines);
		printf("Time\t%f s\t%f ns\t%f ps\n", totals.time * 1.0e-9,
		       (double)totals.time / totals.lines,
		       totals.time * 1.0e+3 / totals.size);
		printf("Thput\t%f GB/s\n", (double)totals.size / totals.time);
	}

	return 0;
}

#endif /* TURBOWC_NO_MAIN */