28#define SCHED_LOG(FMT, ...) DBG("SCHED", FMT __VA_OPT__(, ) __VA_ARGS__)
35#define SCHED_TRACE(FMT, ...) DBG("SCHED", FMT __VA_OPT__(, ) __VA_ARGS__)
37#define SCHED_TRACE(...) \
44#define SRN_MAX_WORKERS 256
209#define SRN_FIBER_LOCAL_RING_CAP 256
211 "SRN_FIBER_LOCAL_RING_CAP must be a power of two");
267 "failed to initialise the scheduler lock");
270 "failed to initialise the scheduler condition");
325 "srn_sched_shutdown must be called from outside the worker pool");
331 "srn_sched_shutdown called while srn_sched_run is active; call "
332 "srn_sched_stop and let the run return first");
338 for (
int i = 1; i < sched->
nworkers; i++) {
356 while (fiber !=
nullptr) {
358 SCHED_LOG(
"shutdown reaping unfinished fiber '%s' (suspended with no waker?)",
359 fiber->
name !=
nullptr ? fiber->
name :
"<unnamed>");
405 atomic_fetch_add(&sched->
runnable, 1);
407 if (atomic_load(&sched->
idle) > 0) {
410 SCHED_TRACE(
"waking a parked os thread (runnable=%ld)", (
long)atomic_load(&sched->
runnable));
430 intptr_t b = atomic_load_explicit(&w->
bottom, memory_order_relaxed);
431 intptr_t t = atomic_load_explicit(&w->
top, memory_order_acquire);
445 atomic_thread_fence(memory_order_release);
446 atomic_store_explicit(&w->
bottom, b + 1, memory_order_relaxed);
448 SCHED_TRACE(
"worker %d local-push fiber %p", w->
id, (
void *)fiber);
460 intptr_t b = atomic_load_explicit(&w->
bottom, memory_order_relaxed) - 1;
461 atomic_store_explicit(&w->
bottom, b, memory_order_relaxed);
463 atomic_thread_fence(memory_order_seq_cst);
464 intptr_t t = atomic_load_explicit(&w->
top, memory_order_relaxed);
474 if (atomic_compare_exchange_strong_explicit(&w->
top, &t, t + 1, memory_order_seq_cst,
475 memory_order_relaxed)) {
476 SCHED_TRACE(
"worker %d popped the last fiber %p", w->
id, (
void *)fiber);
479 SCHED_TRACE(
"worker %d lost the last fiber %p to a thief", w->
id, (
void *)fiber);
482 atomic_store_explicit(&w->
bottom, b + 1, memory_order_relaxed);
486 atomic_store_explicit(&w->
bottom, b + 1, memory_order_relaxed);
497 intptr_t t = atomic_load_explicit(&victim->
top, memory_order_acquire);
505 atomic_thread_fence(memory_order_seq_cst);
507 intptr_t b = atomic_load_explicit(&victim->
bottom, memory_order_acquire);
512 memory_order_relaxed);
513 if (!atomic_compare_exchange_strong_explicit(&victim->
top, &t, t + 1, memory_order_seq_cst,
514 memory_order_relaxed)) {
533 fiber->
link =
nullptr;
542 atomic_fetch_add(&sched->
runnable, 1);
544 SCHED_TRACE(
"global-push fiber %p (runnable=%ld)", (
void *)fiber,
545 (
long)atomic_load(&sched->
runnable));
547 if (atomic_load(&sched->
idle) > 0) {
559 if (fiber !=
nullptr) {
564 fiber->
link =
nullptr;
586 SCHED_TRACE(
"worker %d local deque full, overflow fiber %p to global", w->
id, (
void *)fiber);
620 if (fiber ==
nullptr) {
624 if (fiber ==
nullptr) {
625 for (
int i = 1; i < sched->
nworkers; i++) {
632 if (fiber !=
nullptr) {
633 SCHED_TRACE(
"worker %d stole fiber %p from worker %d", w->
id, (
void *)fiber, victim->
id);
639 if (fiber !=
nullptr) {
640 atomic_fetch_sub(&sched->
runnable, 1);
665 if (fiber ==
nullptr) {
674 atomic_fetch_add(&sched->
idle, 1);
682 while (atomic_load(&sched->
runnable) == 0 &&
691 atomic_fetch_sub(&sched->
idle, 1);
732 if (!commit(fiber, park_arg)) {
748 while (waiters !=
nullptr) {
769 worker->current =
nullptr;
786 PANIC_IF(sched->
destroyed,
"srn_sched_run called on a scheduler that was already shut down");
808 for (
int i = 0; i < nworkers; i++) {
816 atomic_init(&w->
top, 0);
817 atomic_init(&w->
bottom, 0);
832 for (
int i = 1; i < nworkers; i++) {
834 PANIC(
"failed to spawn an os thread");
static srn_fiber_result_t worker(srn_context_t *ctx, void *arg)
static srn_fiber_result_t waiter(srn_context_t *ctx, void *arg)
void srn_mm_free(srn_mm_t *mm, void *ptr)
Release a pointer previously returned by srn_mm_allocate or srn_mm_reallocate.
void * srn_mm_allocate(srn_mm_t *mm, size_t size)
Generic allocations that do not participate in the block based pools.
void srn_fiber_init_thread(srn_fiber_t *f)
Represent the calling OS thread as the running fiber ("#0"), so the scheduler or a test can switch aw...
void srn_fiber_switch(srn_fiber_t *from, srn_fiber_t *to)
Compiled without AddressSanitizer instrumentation: in stack-use-after-return mode ASan would place fr...
void srn_fiber_on_reap(srn_fiber_t *fiber)
Call when a finished fiber is reaped, after it has switched away for the last time.
AI Generated (🤦) Fiber subsystem overview.
#define srn_fiber_get_scheduler_m(fiber)
@ SRN_FIBER_RUNNING
Currently executing.
@ SRN_FIBER_READY
On the run queue, eligible to run.
@ SRN_FIBER_DONE
Entry returned. The result is final.
@ SRN_FIBER_SUSPENDED
Parked off the run queue, awaits srn_fiber_ready.
bool(* srn_fiber_park_fn)(srn_fiber_t *self, void *arg)
Suspend commit callback.
void * srn_fiber_result_t
enum srn_fiber_state_e srn_fiber_state_t
#define srn_mm_immortal_allocate(mm, T)
#define SRN_MAX_WORKERS
Upper bound on workers per run.
void srn_sched_register(srn_scheduler_t *sched, srn_fiber_t *fiber)
Record a fiber in the scheduler's registry of live fibers, where it stays until it is reaped.
srn_fiber_t * srn_fiber_worker_loop(void)
The worker's loop of the worker running on the calling os thread.
static void registry_add(srn_scheduler_t *sched, srn_fiber_t *fiber)
Insert at the head of the registry. Caller must hold sched->lock.
#define SCHED_LOG(FMT,...)
static void ready_fiber(srn_scheduler_t *sched, srn_fiber_t *fiber)
Wake a parked fiber by flipping SUSPENDED to READY and enqueuing it.
void srn_fiber_ready(srn_fiber_t *fiber)
Mark a suspended fiber runnable again.
static void worker_run(srn_worker_t *worker)
Run the worker routine over worker on the calling os thread.
srn_fiber_result_t srn_fiber_wait_for(srn_fiber_t *target)
Block the calling fiber until target finishes, then return its result.
static srn_fiber_t * local_pop(srn_worker_t *w)
Owner only.
static _Thread_local srn_worker_t * current_worker
The worker the calling os thread is running, or null when this os thread is not running the worker ro...
void srn_sched_shutdown(srn_scheduler_t *sched)
The one stop tear down of the fiber subsystem, should be called once srn_sched_run has returned.
static bool local_push(srn_worker_t *w, srn_fiber_t *fiber)
This operation is only for the owner of the ring.
static void registry_remove(srn_scheduler_t *sched, srn_fiber_t *fiber)
Unlink from the registry.
static void push_ready(srn_scheduler_t *sched, srn_fiber_t *fiber)
Put a runnable fiber on a queue, with its state already set to READY.
#define SRN_FIBER_LOCAL_RING_CAP
Capacity of each worker's local work-stealing deque.
void srn_sched_stop(srn_scheduler_t *sched)
Ask a running scheduler to stop.
void srn_sched_enqueue(srn_scheduler_t *sched, srn_fiber_t *fiber)
Place a fiber on a scheduler's ready queue, making it eligible to run.
static void announce_work(srn_scheduler_t *sched)
Wake the os thread of one parked worker after a fiber has joined a queue.
srn_scheduler_t * srn_sched_init(srn_engine_t *engine)
srn_fiber_t * srn_fiber_current(void)
The fiber currently running on this os thread (the bootstrap fiber if none).
void srn_sched_run(srn_scheduler_t *sched, int nworkers)
Run the scheduler with nworkers os threads draining it, returning once the pool goes quiescent (every...
static void worker_main(void *arg)
The entry an os thread starts in.
srn_sched_state_t
The scheduler's lifecycle as one atomic value.
static srn_fiber_t * global_take(srn_scheduler_t *sched)
Pop the head of the global queue, or null when empty.
static void global_enqueue(srn_scheduler_t *sched, srn_fiber_t *fiber)
Append a fiber to the global/overflow queue.
static srn_fiber_t * find_work(srn_worker_t *w)
Find a fiber to run: the worker's own deque first, then the global queue, then a steal of one fiber f...
static bool wait_for_park(srn_fiber_t *self, void *arg)
Add the calling fiber to the target's waiter list and stay parked, unless the target has already fini...
void srn_fiber_suspend(srn_fiber_park_fn commit, void *arg)
A suspended fiber is on no scheduler queue, and the scheduler does not track what it waits on – whoev...
void srn_fiber_yield(void)
Yield cooperatively: re-enqueue the running fiber and run the next ready one.
#define SCHED_TRACE(...)
Per-operation deque and queue tracing (push, pop, steal, wake).
static srn_fiber_t * local_steal(srn_worker_t *victim)
Thief side.
void srn_fiber_stack_free(srn_fiber_stack_t stack)
Engine is a structure to own the long living and main pieces of the compiler.
srn_mm_t * mm
Memory manager.
_Atomic srn_fiber_state_t state
The lifecycle state.
srn_fiber_t * link
Intrusive link threading this fiber onto one of the scheduler's singly-linked lists (the ready run qu...
srn_fiber_t * waiters
Head of the list of fibers blocked in srn_fiber_wait_for on this fiber.
srn_fiber_result_t result
Set when state reaches SRN_FIBER_DONE.
srn_fiber_park_fn park_commit
While this fiber is suspending, the commit the worker routine runs once the fiber is off the stack,...
srn_fiber_t * reg_prev
Registry links.
const char * name
For debugging purposes.
bool destroyed
Set once srn_sched_shutdown has torn the scheduler down.
_Atomic bool run_active
True for the duration of an srn_sched_run call.
srn_fiber_t * registry
Registry: head of the doubly-linked list (through reg_prev/reg_next) of every live fiber,...
srn_mutex_t lock
Global lock.
srn_cond_t work
Worker coordination.
srn_fiber_t * ready_head
Global / overflow queue.
srn_thread_t * os_threads
_Atomic srn_sched_state_t state
srn_worker_t * workers
srn_sched_run allocates these two arrays and srn_sched_shutdown frees them.
The state one os thread uses to run fibers.
atomic_intptr_t top
Chase-Lev deque.
srn_thread_t, srn_mutex_t, and srn_cond_t model the thread-level operations the runtime needs,...
srn_thread_status_t srn_mutex_destroy(srn_mutex_t *m)
Release a mutex's resources.
srn_thread_status_t srn_mutex_init(srn_mutex_t *m)
srn_thread_status_t srn_cond_destroy(srn_cond_t *c)
Release a condition's resources.
srn_thread_status_t srn_thread_join(srn_thread_t *t)
Block until the thread started for t returns.
srn_thread_status_t srn_mutex_unlock(srn_mutex_t *m)
srn_thread_status_t srn_cond_wait(srn_cond_t *c, srn_mutex_t *m)
Release m, sleep until notified, then re-acquire m before returning.
srn_thread_status_t srn_mutex_lock(srn_mutex_t *m)
srn_thread_status_t srn_cond_init(srn_cond_t *c)
srn_thread_status_t srn_cond_notify_one(srn_cond_t *c)
Wake one waiter.
srn_thread_status_t srn_cond_notify_all(srn_cond_t *c)
Wake every waiter.
srn_thread_status_t srn_thread_spawn(srn_thread_t *t, void(*fn)(void *), void *arg)
Run fn(arg) on a new OS thread.
#define PANIC_IF_NULL(ptr)
#define PANIC_IF(cond, msg)