Respecting the scheduler in Erlang NIFs

Companion code for this post is available on GitHub

Recently, I did a small writeup on creating natively implemented functions (NIFs) in Erlang. But as was brought up by several people on reddit and lobste.rs, that example did not account for any sort of coöperative scheduling with the VM.

The BEAM has one scheduler per CPU core, and these (along with, in later versions of Erlang, the dirty schedulers) are the executors on which all code is run. In order for the BEAM to try and guarantee a soft-real-time execution environment, it needs to be able to track how much work each process has done, so that it can keep CPU hogs in check and not starve other processes. To do this, it uses the concept of reductions. Each time a statement is evaluated (reduced), the number of reductions performed by that process is incremented, until it reaches a set limit (CONTEXT_REDS, currently 4,000) and the process is context switched out.

If we take a look at the erl_nif man page, we can see that there is one method that deals with accounting for NIF time: enif_consume_timeslice. This method expects to be called relatively regularly from your NIF, with an argument giving the percentage of a time slice you believe you have used since the last call. The man page is somewhat vague about what exactly a time slice is:

The time is specified as a percent of the timeslice that a process is allowed to execute Erlang code until it can be suspended to give time for other runnable processes. The scheduling timeslice is not an exact entity, but can usually be approximated to about 1 millisecond.

If we take a look at the actual implementation, we can see what's happening here:

int enif_consume_timeslice(ErlNifEnv* env, int percent) {
    Process *proc;
    Sint reds;

    execution_state(env, &proc, NULL);

    ASSERT(is_proc_bound(env) && percent >= 1 && percent <= 100);
    if (percent < 1) percent = 1;
    else if (percent > 100) percent = 100;

    reds = ((CONTEXT_REDS+99) / 100) * percent;
    ASSERT(reds > 0 && reds <= CONTEXT_REDS);
    BUMP_REDS(proc, reds);
    return ERTS_BIF_REDS_LEFT(proc) == 0;
}

When we call this method, we bump the number of reductions this process has executed by the given percentage of CONTEXT_REDS (the total number of reductions a process may perform before it should be context-switched out). The return value tells us whether the process has now exhausted its reduction budget and ought to yield.
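
To make the arithmetic concrete, here is a small standalone sketch of the same calculation (assuming CONTEXT_REDS is 4,000, its value in recent OTP releases):

#include <stdio.h>

// The reduction budget per scheduling slice; 4,000 in recent OTP releases.
#define CONTEXT_REDS 4000

int main(void) {
    int percents[] = {1, 25, 50, 100};
    for (int i = 0; i < 4; i++) {
        int percent = percents[i];
        // Same arithmetic as enif_consume_timeslice:
        // (CONTEXT_REDS + 99) / 100 rounds the one-percent quantum up.
        int reds = ((CONTEXT_REDS + 99) / 100) * percent;
        printf("percent = %3d -> bump reductions by %4d\n", percent, reds);
    }
    return 0;
}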

So far so good. But now we need to see how we can restructure our code to allow it to be context switched out. The best way to do this is the primitive enif_schedule_nif, which lets you specify a function pointer and arguments with which execution should continue. Note that a NIF scheduled in this way does not need to be exported, meaning it is fairly safe to create helper NIFs under the assumption that end users will not try to call them. So let's take a look at how we might rewrite our Levenshtein example to perform its work in chunks:

// The first thing you'll want to do is create a struct to maintain any
// internal state you want to pass between calls to your NIF.
// In this case, I need to keep track of my matrix, the input strings,
// and which x, y position I had iterated up to.
struct LevenshteinState {
    // The matrix being used to calculate the distance
    unsigned int *matrix;

    // The input strings + their sizes
    unsigned char *s1;
    unsigned s1len;
    unsigned char *s2;
    unsigned s2len;

    // The index of the last processed row of the matrix,
    // so that the next iteration can pick up where we left off
    unsigned int lastX;
    unsigned int lastY;
};

// Now, let's rewrite our entry point so that all it does is read in the
// command arguments, validate them and then yield a call to our internal NIF
// that will do all the actual work.
static ERL_NIF_TERM erl_levenshtein(ErlNifEnv* env, int argc,
                                    const ERL_NIF_TERM argv[]) {
    // Not pictured: verifying argc, the type of the arguments,
    // and casting the binaries to ErlNifBinary structs.

    // Retrieve the state resource descriptor from our priv data,
    // and allocate a new structure
    // PrivData here is a custom struct, initialized during our module load
    // callback. See the code on github for the full implementation with
    // regards to this.
    struct PrivData *priv_data = enif_priv_data(env);
    struct LevenshteinState* state = enif_alloc_resource(
        priv_data->levenshtein_state_resource,
        sizeof(struct LevenshteinState)
    );

    //// Initialize the calculation state
    // Allocate our matrix
    size_t matrix_size = (
        sizeof(unsigned int) * (binary1.size + 1) * (binary2.size + 1)
    );
    state->matrix = malloc(matrix_size);
    // Copy the binary term info
    state->s1 = binary1.data;
    state->s1len = binary1.size;
    state->s2 = binary2.data;
    state->s2len = binary2.size;
    // Set our initial X and Y values
    state->lastX = 1;
    state->lastY = 1;

    // In the full version, here is where I also initialize the first row and
    // column of the matrix, in order to simplify the code in the helper NIF.

    // Now that the setup is complete, we can call enif_schedule_nif to
    // tell the BEAM our continuation function.
    // First, we need to wrap our state resource so that it can be passed
    // through the BEAM. enif_make_resource takes our state pointer and
    // returns an ERL_NIF_TERM.
    ERL_NIF_TERM state_term = enif_make_resource(env, state);
    // The scheduled NIF takes a single argument: our wrapped state
    ERL_NIF_TERM args[1] = {state_term};
    // The NIF name here does not seem to be used for determining what code to
    // call, and is likely only used when debugging what code is running.
    return enif_schedule_nif(
        env,
        "levenshtein_yielding", // NIF to call
        0, // Flags
        erl_levenshtein_yielding,
        1,
        args
    );
}

Now that the glue code is out of the way, we can create our erl_levenshtein_yielding method which, for any workload that takes longer than about a millisecond, we should expect to be called multiple times for a given input. It takes a single argument, our wrapped state from before, unwraps it, and continues wherever the previous call left off.

static ERL_NIF_TERM erl_levenshtein_yielding(ErlNifEnv* env, int argc,
                                             const ERL_NIF_TERM argv[]) {
    // Not pictured: argc check

    // Extract the state term. In the same way we wrapped it before, we need
    // to now unwrap the resource we used to pass our struct through.
    struct PrivData *priv_data = enif_priv_data(env);
    struct LevenshteinState* state;
    if (!enif_get_resource(env, argv[0],
                           priv_data->levenshtein_state_resource,
                           ((void*) (&state)))) {
        return mk_error(env, "bad_internal_state");
    }

    // Start processing wherever the previous slice left off
    const unsigned int xsize = state->s1len + 1;
    unsigned int x = state->lastX;
    unsigned int y = state->lastY;

    // Specs for tracking function runtime
    struct timespec start_time;
    struct timespec current_time;

    // Grab the function start time
    clock_gettime(CLOCK_MONOTONIC, &start_time);

    // Create a tracker for the number of loop iterations we've done.
    // This operation count will act as a punctuator for us to check
    // whether it's time for us to yield again.
    unsigned long operations = 0;

    // This is a bit slimy, but is the simplest way to preload
    // the x and y loop vars for the first inner loop iteration
    goto loop_inner;

    // Loop over the matrix
    for (x = state->lastX; x <= state->s2len; x++) {
        for (y = 1; y <= state->s1len; y++) {
loop_inner:
            // Ordinary Levenshtein implementation
            MATRIX_ELEMENT(state->matrix, xsize, x, y) = MIN3(
                MATRIX_ELEMENT(state->matrix, xsize, x-1, y) + 1,
                MATRIX_ELEMENT(state->matrix, xsize, x, y-1) + 1,
                MATRIX_ELEMENT(state->matrix, xsize, x-1, y-1) +
                    (state->s1[y-1] == state->s2[x-1] ? 0 : 1)
            );

            // For each cell, increment the op count until we hit our
            // check threshold.
            if (unlikely(operations++ > OPERATIONS_BETWEEN_TIMECHECKS)) {
                // When we do, get the current time
                clock_gettime(CLOCK_MONOTONIC, &current_time);

                // Figure out how many nanoseconds have passed since we started
                // calculating
                unsigned long nanoseconds_diff = (
                    (current_time.tv_nsec - start_time.tv_nsec) +
                    (current_time.tv_sec - start_time.tv_sec) * 1000000000
                );

                // Convert that to a percentage of a timeslice, assuming that
                // a time slice is 1 millisecond.
                int slice_percent = (nanoseconds_diff * 100) / TIMESLICE_NANOSECONDS;

                // enif_consume_timeslice requires a percentage in the range
                // 1 <= timeslice <= 100
                if (slice_percent < 1) {
                    slice_percent = 1;
                } else if (slice_percent > 100) {
                    slice_percent = 100;
                }

                // Consume that amount of a timeslice.
                // If the result is 1, then we have consumed the entire slice and
                // should now yield.
                if (enif_consume_timeslice(env, slice_percent)) {
                    // Break out of both loops
                    goto loop_exit;
                }

                // If we're not done, shift the times over and keep looping
                start_time.tv_sec = current_time.tv_sec;
                start_time.tv_nsec = current_time.tv_nsec;
                operations = 0;
            }
        }
    }

loop_exit:
    // If we exited the loop via jump, we must have run out of time
    // in this slice. Update our state and yield the next cycle.
    if (likely(x <= state->s2len || y <= state->s1len)) {
        // Update the state with the next cell to process
        state->lastX = x;
        state->lastY = y;

        // Yield another call to ourselves.
        // We can re-use our argv, since we're reusing the same state struct.
        return enif_schedule_nif(
            env,
            "levenshtein_yielding", // NIF to call
            0, // Flags
            erl_levenshtein_yielding,
            1,
            argv
        );
    }

    // If we are done, grab the result
    unsigned int result = MATRIX_ELEMENT(
        state->matrix, xsize, state->s2len, state->s1len);

    // We've finished, so free the matrix and release the state resource
    free(state->matrix);
    // Clear the pointer so a resource destructor (if registered)
    // can't double-free it
    state->matrix = NULL;
    enif_release_resource(state);

    // Return the calculated value
    return enif_make_int(env, result);
}
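
One housekeeping note: the listing above leans on a few helper macros that aren't shown. Plausible definitions, consistent with how they are used here (the exact values in the companion code may differ), would be:

// Access element (x, y) of a matrix stored as a flat array,
// where xsize is the row stride (s1len + 1)
#define MATRIX_ELEMENT(m, xsize, x, y) ((m)[(x) * (xsize) + (y)])

// Minimum of three values
#define MIN3(a, b, c) \
    ((a) < (b) ? ((a) < (c) ? (a) : (c)) : ((b) < (c) ? (b) : (c)))

// Branch-prediction hints (GCC/Clang builtins)
#define likely(x)   __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)

// One timeslice is approximately 1 millisecond
#define TIMESLICE_NANOSECONDS 1000000

// How many matrix cells to process between clock_gettime calls.
// This value is a guess at a reasonable trade-off; the companion
// code's value may differ.
#define OPERATIONS_BETWEEN_TIMECHECKS 1000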

Now that we’ve added all that complexity, let’s see whether it was worth it. The hypothesis is that without the timeslice accounting, we would hog the scheduler and not allow other processes to run in time. So to test that, let’s create a process that tries to sleep for exactly one second, and prints how much over/under one second it actually slept for. While it’s running, we’ll also saturate all of our cores with processes that do nothing but run Levenshtein on large inputs. Here’s how we’ll do it:

realtime_test() ->
    % Allocate two large binaries
    A = << <<0>> || _ <- lists:seq(1, 10000) >>,
    B = << <<1>> || _ <- lists:seq(1, 10000) >>,

    % Create a printer process that tries to print regularly
    _Printer = spawn_link(fun() -> realtime_printer(os:system_time()) end),

    % Create enough adversarial worker processes to saturate all cores
    _Workers = [
        spawn_link(fun() -> realtime_worker(A, B) end)
        || _ <- lists:seq(1, erlang:system_info(logical_processors_available))
    ],
    ok.

% Spins forever, running our NIF on the input strings
realtime_worker(A, B) ->
    levenshtein:levenshtein(A, B),
    realtime_worker(A, B).

% Attempt to run exactly every second, and print how much we were off by.
realtime_printer(LastRan) ->
    timer:sleep(1000),
    Delta = os:system_time() - LastRan,
    DeltaMs = Delta / 1000000,
    Jitter = 1000000000 - Delta,
    JitterMs = Jitter / 1000000,
    io:format("Time since last schedule: ~p ms, Jitter: ~p ms~n", [
        DeltaMs, JitterMs
    ]),
    realtime_printer(os:system_time()).

First, as a baseline, let’s actually run this with an entirely Erlang version of Levenshtein to see what amount of jitter we should expect:

2> perftest:realtime_test().
Time since last schedule: 1004.905694 ms, Jitter: -4.905694 ms
Time since last schedule: 1003.176042 ms, Jitter: -3.176042 ms
Time since last schedule: 1003.292757 ms, Jitter: -3.292757 ms
Time since last schedule: 1003.264791 ms, Jitter: -3.264791 ms

As expected, we have a fairly low amount of deviation from our expected one second print loop. Now let’s see how it looks with our scheduler-friendly NIF implementation:

1> % With the yielding NIF implementation
1> perftest:realtime_test().
ok
Time since last schedule: 1002.378093 ms, Jitter: -2.378093 ms
Time since last schedule: 1002.232311 ms, Jitter: -2.232311 ms
Time since last schedule: 1003.469838 ms, Jitter: -3.469838 ms
Time since last schedule: 1002.724563 ms, Jitter: -2.724563 ms
Time since last schedule: 1002.120373 ms, Jitter: -2.120373 ms
Time since last schedule: 1003.110727 ms, Jitter: -3.110727 ms
Time since last schedule: 1002.924888 ms, Jitter: -2.924888 ms
Time since last schedule: 1002.408802 ms, Jitter: -2.408802 ms
Time since last schedule: 1002.575524 ms, Jitter: -2.575524 ms

Hardly any difference! Looks like our time slice management is enough to keep call latencies in check. Let’s see what happens when we run this test case on our previous non-yielding NIF implementation:

1> perftest:realtime_test().
ok
[shell becomes unresponsive]

In the end I wasn’t able to compare the jitter between this scheduler-friendly implementation and the older one, because the older version hogs the scheduler so completely that the shell becomes inoperable. I think we can consider that a good reason to make NIFs that perform heavy lifting report their timeslice usage!

While much better at respecting the soft-realtime nature of the BEAM, the naïve implementation here adds a lot of calls to clock_gettime, plus the overhead of yielding and being rescheduled. If we compare its performance against the old version, we do see a decrease:

1> perftest:perftest(100000, fun levenshtein:unfair_levenshtein/2).
209974.53409952734
2> perftest:perftest(100000, fun levenshtein:yielding_levenshtein/2).
149317.74831923426

Our unfair code is approximately 1.4x the speed of the fair code: a noticeable difference, but one worth paying if you are running this calculation in an environment that also has time-sensitive processes you do not want to disrupt.
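
If that overhead matters to you, one possible refinement (a sketch, not something the companion code does) is to drop the clock_gettime calls entirely and charge a fixed percentage per batch of cells, using an offline-calibrated estimate of how many cells fit in a millisecond:

// Hypothetical: assume a calibrated number of matrix cells per 1 ms
// timeslice, instead of measuring elapsed time on every check.
// CELLS_PER_TIMESLICE is a made-up constant; measure before trusting it.
#define CELLS_PER_TIMESLICE 500000

// This would replace the clock-based check inside the inner loop:
if (unlikely(operations++ > CELLS_PER_TIMESLICE / 100)) {
    // Each batch of cells is treated as roughly 1% of a slice
    if (enif_consume_timeslice(env, 1)) {
        goto loop_exit; // Out of budget, yield
    }
    operations = 0;
}

The trade-off is accuracy: if the estimate is off, you will either yield too eagerly or overshoot the slice.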

As before, a full working copy of the source code is available on GitHub.
