diff options
Diffstat (limited to 'executor/executor.cc')
| -rw-r--r-- | executor/executor.cc | 49 |
1 files changed, 33 insertions, 16 deletions
diff --git a/executor/executor.cc b/executor/executor.cc index 96676a676..e126d466d 100644 --- a/executor/executor.cc +++ b/executor/executor.cc @@ -12,9 +12,11 @@ #include <fcntl.h> #include <time.h> #include <sys/types.h> +#include <sys/time.h> #include <sys/syscall.h> #include <sys/stat.h> #include <sys/mman.h> +#include <linux/futex.h> #include <pthread.h> #include <grp.h> #include <algorithm> @@ -68,8 +70,8 @@ struct thread_t { pthread_t th; uint32_t cover_data[16 << 10]; uint64_t* copyout_pos; - bool ready; - bool done; + int ready; + int done; bool handled; int call_n; int call_index; @@ -87,7 +89,7 @@ __attribute__((noreturn)) void fail(const char* msg, ...); __attribute__((noreturn)) void error(const char* msg, ...); __attribute__((noreturn)) void exitf(const char* msg, ...); void debug(const char* msg, ...); -uint64_t read_input(uint64_t** input_posp); +uint64_t read_input(uint64_t** input_posp, bool peek = false); uint64_t read_arg(uint64_t** input_posp); uint64_t read_result(uint64_t** input_posp); void write_output(uint32_t v); @@ -196,21 +198,36 @@ retry: if (collide && (n % 2)) { // Don't wait for every other call. // We already have results from the previous execution. - } else if (flag_threaded) { + } + else if (flag_threaded) { // Wait for call completion. uint64_t start = current_time_ms(); - while (!__atomic_load_n(&th->done, __ATOMIC_ACQUIRE) && (current_time_ms() - start) < 100) - usleep(10); + uint64_t now = start; + for (;;) { + timespec ts = {}; + ts.tv_sec = 0; + ts.tv_nsec = (100 - (now - start)) * 1000 * 1000; + syscall(SYS_futex, &th->done, FUTEX_WAIT, 0, &ts); + if (__atomic_load_n(&th->done, __ATOMIC_RELAXED)) + break; + now = current_time_ms(); + if (now - start > 100) + break; + } if (__atomic_load_n(&th->done, __ATOMIC_ACQUIRE)) handle_completion(th); // Check if any of previous calls have completed. - usleep(100); + // Give them some additional time, because they could have been + // just unblocked by the current call. + bool last = read_input(&input_pos, true); + usleep(last ? 1000 : 100); for (int i = 0; i < kMaxThreads; i++) { th = &threads[i]; if (__atomic_load_n(&th->done, __ATOMIC_ACQUIRE) && !th->handled) handle_completion(th); } - } else { + } + else { // Execute directly. if (th != &threads[0]) fail("using non-main thread in non-thread mode"); @@ -218,7 +235,6 @@ retry: handle_completion(th); } } - // TODO: handle hanged threads. if (flag_collide) { if (!collide) { @@ -226,8 +242,6 @@ retry: collide = true; goto retry; } - // Give any pending calls time to complete, since we did not wait for them. - usleep(1000); } debug("exiting\n"); @@ -270,7 +284,8 @@ thread_t* schedule_call(int n, int call_index, int call_num, uint64_t num_args, th->num_args = num_args; for (int i = 0; i < kMaxArgs; i++) th->args[i] = args[i]; - __atomic_store_n(&th->ready, true, __ATOMIC_RELEASE); + __atomic_store_n(&th->ready, 1, __ATOMIC_RELEASE); + syscall(SYS_futex, &th->ready, FUTEX_WAKE); return th; } @@ -315,7 +330,7 @@ void* worker_thread(void* arg) cover_init(th); for (;;) { while (!__atomic_load_n(&th->ready, __ATOMIC_ACQUIRE)) - usleep(10); + syscall(SYS_futex, &th->ready, FUTEX_WAIT, 0, 0); execute_call(th); } return 0; @@ -345,7 +360,8 @@ void execute_call(thread_t* th) debug("#%d: %s = errno(%d)\n", th->id, call->name, errno0); else debug("#%d: %s = %lx\n", th->id, call->name, th->res); - __atomic_store_n(&th->done, true, __ATOMIC_RELEASE); + __atomic_store_n(&th->done, 1, __ATOMIC_RELEASE); + syscall(SYS_futex, &th->done, FUTEX_WAKE); } void cover_open() @@ -484,12 +500,13 @@ uint64_t read_result(uint64_t** input_posp) return arg; } -uint64_t read_input(uint64_t** input_posp) +uint64_t read_input(uint64_t** input_posp, bool peek) { uint64_t* input_pos = *input_posp; if ((char*)input_pos >= input_data + kMaxInput) fail("input command overflows input"); - *input_posp = input_pos + 1; + if (!peek) + *input_posp = input_pos + 1; return *input_pos; } |
