aboutsummaryrefslogtreecommitdiffstats
path: root/executor/executor.cc
diff options
context:
space:
mode:
Diffstat (limited to 'executor/executor.cc')
-rw-r--r--executor/executor.cc49
1 files changed, 33 insertions, 16 deletions
diff --git a/executor/executor.cc b/executor/executor.cc
index 96676a676..e126d466d 100644
--- a/executor/executor.cc
+++ b/executor/executor.cc
@@ -12,9 +12,11 @@
#include <fcntl.h>
#include <time.h>
#include <sys/types.h>
+#include <sys/time.h>
#include <sys/syscall.h>
#include <sys/stat.h>
#include <sys/mman.h>
+#include <linux/futex.h>
#include <pthread.h>
#include <grp.h>
#include <algorithm>
@@ -68,8 +70,8 @@ struct thread_t {
pthread_t th;
uint32_t cover_data[16 << 10];
uint64_t* copyout_pos;
- bool ready;
- bool done;
+ int ready;
+ int done;
bool handled;
int call_n;
int call_index;
@@ -87,7 +89,7 @@ __attribute__((noreturn)) void fail(const char* msg, ...);
__attribute__((noreturn)) void error(const char* msg, ...);
__attribute__((noreturn)) void exitf(const char* msg, ...);
void debug(const char* msg, ...);
-uint64_t read_input(uint64_t** input_posp);
+uint64_t read_input(uint64_t** input_posp, bool peek = false);
uint64_t read_arg(uint64_t** input_posp);
uint64_t read_result(uint64_t** input_posp);
void write_output(uint32_t v);
@@ -196,21 +198,36 @@ retry:
if (collide && (n % 2)) {
// Don't wait for every other call.
// We already have results from the previous execution.
- } else if (flag_threaded) {
+ }
+ else if (flag_threaded) {
// Wait for call completion.
uint64_t start = current_time_ms();
- while (!__atomic_load_n(&th->done, __ATOMIC_ACQUIRE) && (current_time_ms() - start) < 100)
- usleep(10);
+ uint64_t now = start;
+ for (;;) {
+ timespec ts = {};
+ ts.tv_sec = 0;
+ ts.tv_nsec = (100 - (now - start)) * 1000 * 1000;
+ syscall(SYS_futex, &th->done, FUTEX_WAIT, 0, &ts);
+ if (__atomic_load_n(&th->done, __ATOMIC_RELAXED))
+ break;
+ now = current_time_ms();
+ if (now - start > 100)
+ break;
+ }
if (__atomic_load_n(&th->done, __ATOMIC_ACQUIRE))
handle_completion(th);
// Check if any of previous calls have completed.
- usleep(100);
+ // Give them some additional time, because they could have been
+ // just unblocked by the current call.
+ bool last = read_input(&input_pos, true);
+ usleep(last ? 1000 : 100);
for (int i = 0; i < kMaxThreads; i++) {
th = &threads[i];
if (__atomic_load_n(&th->done, __ATOMIC_ACQUIRE) && !th->handled)
handle_completion(th);
}
- } else {
+ }
+ else {
// Execute directly.
if (th != &threads[0])
fail("using non-main thread in non-thread mode");
@@ -218,7 +235,6 @@ retry:
handle_completion(th);
}
}
- // TODO: handle hanged threads.
if (flag_collide) {
if (!collide) {
@@ -226,8 +242,6 @@ retry:
collide = true;
goto retry;
}
- // Give any pending calls time to complete, since we did not wait for them.
- usleep(1000);
}
debug("exiting\n");
@@ -270,7 +284,8 @@ thread_t* schedule_call(int n, int call_index, int call_num, uint64_t num_args,
th->num_args = num_args;
for (int i = 0; i < kMaxArgs; i++)
th->args[i] = args[i];
- __atomic_store_n(&th->ready, true, __ATOMIC_RELEASE);
+ __atomic_store_n(&th->ready, 1, __ATOMIC_RELEASE);
+ syscall(SYS_futex, &th->ready, FUTEX_WAKE);
return th;
}
@@ -315,7 +330,7 @@ void* worker_thread(void* arg)
cover_init(th);
for (;;) {
while (!__atomic_load_n(&th->ready, __ATOMIC_ACQUIRE))
- usleep(10);
+ syscall(SYS_futex, &th->ready, FUTEX_WAIT, 0, 0);
execute_call(th);
}
return 0;
@@ -345,7 +360,8 @@ void execute_call(thread_t* th)
debug("#%d: %s = errno(%d)\n", th->id, call->name, errno0);
else
debug("#%d: %s = %lx\n", th->id, call->name, th->res);
- __atomic_store_n(&th->done, true, __ATOMIC_RELEASE);
+ __atomic_store_n(&th->done, 1, __ATOMIC_RELEASE);
+ syscall(SYS_futex, &th->done, FUTEX_WAKE);
}
void cover_open()
@@ -484,12 +500,13 @@ uint64_t read_result(uint64_t** input_posp)
return arg;
}
-uint64_t read_input(uint64_t** input_posp)
+uint64_t read_input(uint64_t** input_posp, bool peek)
{
uint64_t* input_pos = *input_posp;
if ((char*)input_pos >= input_data + kMaxInput)
fail("input command overflows input");
- *input_posp = input_pos + 1;
+ if (!peek)
+ *input_posp = input_pos + 1;
return *input_pos;
}