/* This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>. */
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
/* Timestamp / duration expressed in nanoseconds. */
typedef uint64_t ntime_t;

/*
 * Read CLOCK_MONOTONIC_RAW and return it as a single nanosecond count
 * (seconds * 1e9 + nanoseconds). The result of clock_gettime() is not
 * checked.
 */
ntime_t nclock(void)
{
	struct timespec now;
	ntime_t nanos;

	clock_gettime(CLOCK_MONOTONIC_RAW, &now);

	/* Widen the seconds first so the multiplication is done in 64 bits. */
	nanos = (ntime_t)now.tv_sec;
	nanos *= 1000000000;
	nanos += now.tv_nsec;
	return nanos;
}
/*
 * Scalar newline counter.
 *
 * data: start of the byte range to scan (no alignment requirement).
 * size: number of bytes to scan; may be 0, in which case data is not read.
 *
 * Returns the number of '\n' bytes found in the range.
 */
static size_t count_newlines(void *data, size_t size)
{
	const char *cursor = data;
	size_t newlines = 0;

	while (size-- > 0) {
		if (*cursor++ == '\n')
			newlines++;
	}
	return newlines;
}
/*
 * SSE2 newline counter.
 *
 * blocks:     start of the data; must be 16-byte aligned because it is read
 *             with _mm_load_si128.
 * num_blocks: number of consecutive 16-byte blocks to scan (0 is allowed).
 *
 * Returns the number of '\n' bytes in the num_blocks * 16 bytes scanned.
 *
 * Each of the 16 byte lanes keeps its own 8-bit match counter, so a lane can
 * record at most 255 matches before wrapping. The counters are therefore
 * flushed into the scalar total after every 255 blocks, which keeps the
 * result exact for any num_blocks.
 */
static size_t count_newlines_sse2(void *blocks, size_t num_blocks)
{
	const __m128i *block = blocks;
	const __m128i newline = _mm_set1_epi8('\n');
	size_t total = 0;

	while (num_blocks) {
		__attribute__((aligned(16))) uint8_t parts[16];
		__m128i counters = _mm_setzero_si128();
		size_t batch = num_blocks < 255 ? num_blocks : 255;
		size_t i;

		for (i = 0; i < batch; i++) {
			__m128i data = _mm_load_si128(block + i);
			/* Matching lanes become 0xFF (-1); subtracting adds 1. */
			__m128i hits = _mm_cmpeq_epi8(data, newline);

			counters = _mm_sub_epi8(counters, hits);
		}

		/* Horizontal sum of the 16 per-lane counters. */
		_mm_store_si128((__m128i *)parts, counters);
		for (i = 0; i < 16; i++)
			total += (size_t)parts[i];

		block += batch;
		num_blocks -= batch;
	}
	return total;
}
/*
 * Shared work queue for the file currently being counted. turbowc() fills in
 * cursor/left/total before starting the workers; thread_f() consumes it.
 */
struct {
/* First byte that has not yet been claimed by a worker. */
void *cursor;
/* Number of bytes from cursor to the end of the data. */
size_t left;
/* Held by thread_f() while it reads and updates cursor and left. */
pthread_mutex_t mutex;
/* Newline count accumulated so far for the current file. */
size_t total;
/* Held by thread_f() while it adds its local count to total. */
pthread_mutex_t total_mutex;
} data_pool = {
.mutex = PTHREAD_MUTEX_INITIALIZER,
.total_mutex = PTHREAD_MUTEX_INITIALIZER
};
/*
 * Worker routine (also called directly on the main thread).
 *
 * Repeatedly claims a chunk from data_pool under data_pool.mutex and counts
 * its newlines with count_newlines_sse2(). A chunk is a whole number of
 * 16-byte blocks and at most 16 * 256 * 64 bytes (256 KiB). The worker stops
 * once fewer than 16 bytes remain; that tail is left in data_pool for the
 * caller. The per-thread count is added to data_pool.total under
 * data_pool.total_mutex just before returning.
 *
 * arg is unused. Always returns NULL.
 */
static void *thread_f(void *arg)
{
	enum {
		BLOCK_BYTES = 16,
		MAX_BLOCKS_PER_CALL = 255,	/* limit passed to count_newlines_sse2() */
		MAX_CHUNK_BYTES = 16 * 256 * 64
	};
	size_t local_count = 0;

	(void)arg;

	for (;;) {
		char *chunk;
		size_t chunk_bytes;

		/* Claim the next chunk; only whole 16-byte blocks are taken. */
		pthread_mutex_lock(&data_pool.mutex);
		chunk_bytes = data_pool.left & ~(size_t)15;
		if (chunk_bytes == 0) {
			pthread_mutex_unlock(&data_pool.mutex);
			break;
		}
		if (chunk_bytes > MAX_CHUNK_BYTES)
			chunk_bytes = MAX_CHUNK_BYTES;
		chunk = data_pool.cursor;
		data_pool.cursor = chunk + chunk_bytes;
		data_pool.left -= chunk_bytes;
		pthread_mutex_unlock(&data_pool.mutex);

		/* chunk_bytes is a non-zero multiple of 16 at this point. */
		do {
			size_t nblocks = chunk_bytes / BLOCK_BYTES;

			if (nblocks > MAX_BLOCKS_PER_CALL)
				nblocks = MAX_BLOCKS_PER_CALL;
			local_count += count_newlines_sse2(chunk, nblocks);
			chunk += nblocks * BLOCK_BYTES;
			chunk_bytes -= nblocks * BLOCK_BYTES;
		} while (chunk_bytes != 0);
	}

	/* Publish this thread's share of the count. */
	pthread_mutex_lock(&data_pool.total_mutex);
	data_pool.total += local_count;
	pthread_mutex_unlock(&data_pool.total_mutex);

	return NULL;
}
/*
 * Running totals over every file processed, used by main() for the -s
 * statistics output.
 */
struct {
/* size: sum of file sizes in bytes; lines: sum of newline counts. */
size_t size, lines;
/* Elapsed time for all files, in nclock() units (nanoseconds); set by main(). */
ntime_t time;
} totals;
/*
 * Count the newlines in one file and print "<count> <path>" on stdout.
 *
 * path:        file to open and map read-only.
 * num_threads: total number of threads to use, including the calling thread;
 *              num_threads - 1 workers are created. 0 is treated like 1.
 *
 * The file is mapped with mmap(), handed to thread_f() through data_pool, and
 * the tail of fewer than 16 bytes left behind by the workers is counted with
 * count_newlines(). On success the file size and line count are added to
 * `totals`. An empty file is not mapped (mmap() rejects a zero length) and is
 * reported as 0 lines.
 *
 * Returns 0 on success, 1 on failure (after printing a message on stderr).
 */
static int turbowc(const char *path, size_t num_threads)
{
	int ret = 1, fd;
	struct stat st;
	void *map = NULL;
	size_t file_size, num_workers, i;
	pthread_t *threads = NULL;

	fd = open(path, O_RDONLY);
	if (fd == -1) {
		perror(path);
		goto error_open;
	}

	if (fstat(fd, &st) == -1) {
		perror("fstat");
		goto error_fstat;
	}
	file_size = (size_t)st.st_size;

	if (file_size) {
		map = mmap(NULL, file_size, PROT_READ, MAP_SHARED, fd, 0);
		/* mmap() reports failure with MAP_FAILED, not NULL. */
		if (map == MAP_FAILED) {
			perror("mmap");
			goto error_mmap;
		}
	}

	data_pool.cursor = map;
	data_pool.left = file_size;
	data_pool.total = 0;

	/* The calling thread does its share too, so one fewer worker is needed. */
	num_workers = num_threads ? num_threads - 1 : 0;
	if (num_workers) {
		threads = calloc(num_workers, sizeof *threads);
		if (!threads) {
			perror("malloc");
			goto error_malloc;
		}
	}

	for (i = 0; i < num_workers; i++) {
		int rv = pthread_create(&threads[i], NULL, thread_f, NULL);

		if (rv) {
			/* pthread_create() returns the error; it does not set errno. */
			fprintf(stderr, "pthread_create: error %d\n", rv);

			/*
			 * Empty the pool so the workers already running stop after
			 * their current chunk, then wait for them: the mapping must
			 * not be released while they may still be reading it.
			 */
			pthread_mutex_lock(&data_pool.mutex);
			data_pool.left = 0;
			pthread_mutex_unlock(&data_pool.mutex);
			while (i--)
				pthread_join(threads[i], NULL);
			goto error_pthread;
		}
	}

	thread_f(NULL);
	for (i = 0; i < num_workers; i++)
		pthread_join(threads[i], NULL);

	/* Workers only take whole 16-byte blocks; count what is left over. */
	data_pool.total += count_newlines(data_pool.cursor, data_pool.left);

	printf("%zu %s\n", data_pool.total, path);
	totals.size += file_size;
	totals.lines += data_pool.total;
	ret = 0;

error_pthread:
	free(threads);
error_malloc:
	if (file_size)
		munmap(map, file_size);
error_mmap:
error_fstat:
	close(fd);
error_open:
	return ret;
}
/* Print the usage line on stderr and return the exit status for a usage error. */
static int turbowc_usage(void)
{
	fprintf(stderr, "usage: turbowc [-v] [-s] [-t THREADS] [FILE]\n");
	return 1;
}

/*
 * Entry point.
 *
 * Options:
 *   -v          print the program banner (processing continues afterwards)
 *   -s          print statistics (lines, size, time, throughput) at the end
 *   -t THREADS  number of threads to use per file; must be a positive
 *               decimal integer (default 1)
 *
 * Every remaining argument is a file passed to turbowc(). At least one file
 * is required. Returns 0 on success, 1 on a usage error or as soon as any
 * file fails.
 */
int main(int argc, char **argv)
{
	int opt;
	size_t num_threads = 1;
	_Bool print_stats = 0;
	ntime_t t0;

	while ((opt = getopt(argc, argv, "vt:s")) != -1) {
		switch (opt) {
		case 'v':
			printf("turbowc, with mmap, SSE2 and multithreading\n");
			break;
		case 't': {
			char *end;
			unsigned long long requested = strtoull(optarg, &end, 10);

			/*
			 * Reject empty, non-numeric, trailing-garbage and zero
			 * values, and anything that does not fit in size_t.
			 */
			if (end == optarg || *end != '\0' || requested == 0 ||
			    (unsigned long long)(size_t)requested != requested)
				return turbowc_usage();
			num_threads = (size_t)requested;
			break;
		}
		case 's':
			print_stats = 1;
			break;
		default:
			return turbowc_usage();
		}
	}

	if (optind >= argc)
		return turbowc_usage();

	t0 = nclock();
	for (; optind < argc; optind++) {
		if (turbowc(argv[optind], num_threads))
			return 1;
	}
	totals.time = nclock() - t0;

	if (print_stats) {
		/* Columns: total, per line, per byte. */
		printf("Lines\t%zu\n", totals.lines);
		printf("Size\t%zu\t%f\t-\n",
		       totals.size,
		       (double)totals.size / totals.lines);
		printf("Time\t%f s\t%f ns\t%f ps\n",
		       totals.time * 1.0e-9,
		       (double)totals.time / totals.lines,
		       totals.time * 1.0e+3 / totals.size);
		/* Bytes per nanosecond is numerically GB/s. */
		printf("Thput\t%f GB/s\n", (double)totals.size / totals.time);
	}
	return 0;
}