
std.concurrency

This is a low-level messaging API upon which more structured or restrictive APIs may be built. The general idea is that every messageable entity is represented by a common handle type called a Tid, which allows messages to be sent to logical threads that are executing in both the current process and in external processes using the same interface. This is an important aspect of scalability because it allows the components of a program to be spread across available resources with few to no changes to the actual implementation.
A logical thread is an execution context that has its own stack and which runs asynchronously to other logical threads. These may be preemptively scheduled kernel threads, fibers (cooperative user-space threads), or some other concept with similar behavior.

The type of concurrency used when logical threads are created is determined by the Scheduler selected at initialization time. The default behavior is currently to create a new kernel thread per call to spawn, but other schedulers are available that multiplex fibers across the main thread or use some combination of the two approaches.

Synopsis:

import std.stdio;
import std.concurrency;

void spawnedFunc(Tid ownerTid)
{
    // Receive a message from the owner thread.
    receive(
        (int i) { writeln("Received the number ", i);}
    );

    // Send a message back to the owner thread
    // indicating success.
    send(ownerTid, true);
}

void main()
{
    // Start spawnedFunc in a new thread.
    auto childTid = spawn(&spawnedFunc, thisTid);

    // Send the number 42 to this new thread.
    send(childTid, 42);

    // Receive the result code.
    auto wasSuccessful = receiveOnly!(bool);
    assert(wasSuccessful);
    writeln("Successfully printed number.");
}
License:
Authors:
Sean Kelly, Alex Rønne Petersen, Martin Nowak

Source: std/concurrency.d

class MessageMismatch: object.Exception;
Thrown on calls to receiveOnly if a message of a type other than the one the receiving thread expected is sent.
class OwnerTerminated: object.Exception;
Thrown on calls to receive if the thread that spawned the receiving thread has terminated and no more messages exist.
class LinkTerminated: object.Exception;
Thrown if a linked thread has terminated.
class PriorityMessageException: object.Exception;
Thrown if a message was sent to a thread via std.concurrency.prioritySend and the receiver does not have a handler for a message of this type.
Variant message;
The message that was sent.
class MailboxFull: object.Exception;
Thrown on mailbox crowding if the mailbox is configured with OnCrowding.throwException.
class TidMissingException: object.Exception;
Thrown when a Tid is missing, e.g. when ownerTid doesn't find an owner thread.
struct Tid;
An opaque type used to represent a logical thread.
void toString(scope void delegate(const(char)[]) sink);
Generates a convenient string for identifying this Tid. This is only useful for checking whether Tids that are currently executing are the same or different, e.g. for logging and debugging. A Tid executed in the future may have the same toString() output as another Tid that has already terminated.
@property @safe Tid thisTid();
Returns the caller's Tid.
@property Tid ownerTid();
Returns the Tid of the thread which spawned the caller's thread.
Throws:
TidMissingException if there is no owner thread.
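A minimal sketch of both cases, assuming the program's main thread was not itself spawned through this module and therefore has no owner. The names hasOwner and reportOwner are hypothetical helpers, not part of std.concurrency.

```d
import std.concurrency;

// Hypothetical helper: reports whether the calling thread has an owner.
bool hasOwner()
{
    try
    {
        auto owner = ownerTid; // throws TidMissingException if there is no owner
        return true;
    }
    catch (TidMissingException e)
    {
        return false;
    }
}

// Hypothetical worker: tells its parent whether it sees an owner.
void reportOwner(Tid parent)
{
    send(parent, hasOwner());
}

void main()
{
    // Assumption: the main thread was not spawned, so it has no owner.
    assert(!hasOwner());

    // A spawned thread is owned by the thread that called spawn.
    spawn(&reportOwner, thisTid);
    assert(receiveOnly!bool());
}
```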
Tid spawn(F, T...)(F fn, T args) if (isSpawnable!(F, T));
Starts fn(args) in a new logical thread.
Executes the supplied function in a new logical thread represented by Tid. The calling thread is designated as the owner of the new thread. When the owner thread terminates, an OwnerTerminated message will be sent to the new thread, causing an OwnerTerminated exception to be thrown on receive().
Parameters:
F fn The function to execute.
T args Arguments to the function.
Returns:
A Tid representing the new logical thread.

Notes: args must not have unshared aliasing. In other words, all arguments to fn must either be shared or immutable or have no pointer indirection. This is necessary for enforcing isolation among threads.

Example:

import std.stdio, std.concurrency;

void f1(string str)
{
    writeln(str);
}

void f2(char[] str)
{
    writeln(str);
}

void main()
{
    auto str = "Hello, world";

    // Works:  string is immutable.
    auto tid1 = spawn(&f1, str);

    // Fails to compile:  char[] has mutable aliasing.
    auto tid2 = spawn(&f2, str.dup);
}
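
As a complement to the example above, the following sketch passes an immutable array to a spawned thread and receives a result back. The names sumWorker and sumInThread are hypothetical; only spawn, send, receiveOnly and thisTid come from this module.

```d
import std.concurrency;

// Hypothetical worker: sums immutable data and reports back to its owner.
void sumWorker(Tid owner, immutable(int)[] data)
{
    int total = 0;
    foreach (x; data)
        total += x;
    send(owner, total);
}

// Hypothetical helper: spawns sumWorker and waits for its reply.
int sumInThread(immutable(int)[] data)
{
    // Both arguments are accepted: a Tid, and an array of immutable elements.
    spawn(&sumWorker, thisTid, data);
    return receiveOnly!int();
}

void main()
{
    // .idup yields an immutable copy, which has no unshared aliasing.
    immutable(int)[] data = [1, 2, 3, 4].idup;
    assert(sumInThread(data) == 10);
}
```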
Tid spawnLinked(F, T...)(F fn, T args) if (isSpawnable!(F, T));
Starts fn(args) in a logical thread and will receive a LinkTerminated message when the operation terminates.
Executes the supplied function in a new logical thread represented by Tid. This new thread is linked to the calling thread so that if either it or the calling thread terminates, a LinkTerminated message will be sent to the other, causing a LinkTerminated exception to be thrown on receive(). The owner relationship from spawn() is preserved as well, so if the link between threads is broken, owner termination will still result in an OwnerTerminated exception being thrown on receive().
Parameters:
F fn The function to execute.
T args Arguments to the function.
Returns:
A Tid representing the new thread.
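
A minimal sketch of the link notification, assuming a worker that returns immediately. The names shortLived and seesLinkTermination are hypothetical.

```d
import std.concurrency;

// Hypothetical worker that returns immediately.
void shortLived()
{
}

// Hypothetical helper: returns true if the termination of a linked
// thread surfaces as a LinkTerminated exception in receive.
bool seesLinkTermination()
{
    spawnLinked(&shortLived);
    try
    {
        // No int is ever sent, so receive waits until the link
        // notification arrives.
        receive((int i) { });
        return false;
    }
    catch (LinkTerminated e)
    {
        return true;
    }
}

void main()
{
    assert(seesLinkTermination());
}
```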
void send(T...)(Tid tid, T vals);
Places the values as a message at the back of tid's message queue.
Sends the supplied value to the thread represented by tid. As with std.concurrency.spawn, T must not have unshared aliasing.
void prioritySend(T...)(Tid tid, T vals);
Places the values as a message on the front of tid's message queue.
Send a message to tid but place it at the front of tid's message queue instead of at the back. This function is typically used for out-of-band communication, to signal exceptional conditions, etc.
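
The ordering can be sketched as follows. For determinism, this example sends both messages to the caller's own mailbox, which is an assumption of the sketch rather than a typical usage pattern; firstDelivered is a hypothetical helper.

```d
import std.concurrency;

// Hypothetical helper: queues a normal message, then a priority message,
// and returns a label for whichever one receive hands over first.
string firstDelivered()
{
    send(thisTid, 1);                // placed at the back of the queue
    prioritySend(thisTid, "urgent"); // placed at the front of the queue

    string first;
    receive(
        (int i) { first = "standard"; },
        (string s) { first = "priority"; }
    );

    // Drain the remaining standard message so the mailbox is left empty.
    receive((int i) { });
    return first;
}

void main()
{
    assert(firstDelivered() == "priority");
}
```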
void receive(T...)(T ops);
Receives a message from another thread.
Receive a message from another thread, or block if no messages of the specified types are available. This function works by pattern matching a message against a set of delegates and executing the first match found.

If a delegate that accepts a std.variant.Variant is included as the last argument to receive, it will match any message that was not matched by an earlier delegate. If more than one argument is sent, the Variant will contain a std.typecons.Tuple of all values sent.

Example:

import std.stdio;
import std.variant;
import std.concurrency;

void spawnedFunction()
{
    receive(
        (int i) { writeln("Received an int."); },
        (float f) { writeln("Received a float."); },
        (Variant v) { writeln("Received some other type."); }
    );
}

void main()
{
    auto tid = spawn(&spawnedFunction);
    send(tid, 42);
}
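
The matching rules described above, including the Tuple packing of multi-value messages, can be sketched as follows. The helper classifyNext is hypothetical, and the messages are sent to the caller's own mailbox only to keep the example single-threaded.

```d
import std.concurrency;
import std.typecons : Tuple;
import std.variant : Variant;

// Hypothetical helper: receives one message and names the handler that matched.
string classifyNext()
{
    string kind;
    receive(
        (int i) { kind = "int"; },
        (float f) { kind = "float"; },
        (Variant v)
        {
            // A message sent with several values arrives packed in a Tuple.
            kind = v.peek!(Tuple!(int, string)) !is null ? "tuple" : "other";
        }
    );
    return kind;
}

void main()
{
    send(thisTid, 42);
    assert(classifyNext() == "int");

    send(thisTid, 1.5f);
    assert(classifyNext() == "float");

    send(thisTid, 1, "one");
    assert(classifyNext() == "tuple");
}
```

Handlers are tried in order, so the int handler is listed before the float handler here.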
receiveOnlyRet!T receiveOnly(T...)();
Receives only messages with arguments of types T.
Throws:
MessageMismatch if a message of types other than T is received.
Returns:
The received message. If T.length is greater than one, the message will be packed into a std.typecons.Tuple.

Example:

import std.concurrency;

void spawnedFunc()
{
    auto msg = receiveOnly!(int, string)();
    assert(msg[0] == 42);
    assert(msg[1] == "42");
}

void main()
{
    auto tid = spawn(&spawnedFunc);
    send(tid, 42, "42");
}
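
The MessageMismatch case can be sketched in a similar way. The helper mismatchDetected is hypothetical, and it sends to the caller's own mailbox only to keep the example deterministic.

```d
import std.concurrency;

// Hypothetical helper: queues a string, then asks for an int.
bool mismatchDetected()
{
    send(thisTid, "not a number");
    try
    {
        receiveOnly!int(); // the queued message is a string, not an int
        return false;
    }
    catch (MessageMismatch e)
    {
        return true;
    }
}

void main()
{
    assert(mismatchDetected());
}
```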
bool receiveTimeout(T...)(Duration duration, T ops);
Tries to receive but gives up if no matching message arrives within duration. Does not wait at all if the provided core.time.Duration is negative.
Same as receive except that rather than wait forever for a message, it waits until either it receives a message or the given core.time.Duration has passed. It returns true if it received a message and false if it timed out waiting for one.
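
A minimal sketch of both outcomes, assuming a 50 ms timeout chosen arbitrarily for illustration. The helper pollInt and its -1 sentinel are hypothetical.

```d
import core.time : msecs;
import std.concurrency;

// Hypothetical helper: waits up to 50 ms for an int; returns -1 on timeout.
int pollInt()
{
    int value = -1;
    bool got = receiveTimeout(50.msecs, (int i) { value = i; });
    return got ? value : -1;
}

void main()
{
    // Nothing has been sent yet, so the call times out and returns false.
    assert(pollInt() == -1);

    // Once a matching message is queued, the call returns true.
    send(thisTid, 7);
    assert(pollInt() == 7);
}
```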
enum OnCrowding: int;
These behaviors may be specified when a mailbox is full.
block
Wait until room is available.
throwException
Throw a MailboxFull exception.
ignore
Abort the send and return.
void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis);
Sets a maximum mailbox size.
Sets a limit on the maximum number of user messages allowed in the mailbox. If this limit is reached, the caller attempting to add a new message will execute the behavior specified by doThis. If messages is zero, the mailbox is unbounded.
Parameters:
Tid tid The Tid of the thread for which this limit should be set.
size_t messages The maximum number of messages or zero if no limit.
OnCrowding doThis The behavior executed when a message is sent to a full mailbox.
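
A minimal sketch of a bounded mailbox with OnCrowding.throwException. It limits the caller's own mailbox to a single message, which is an assumption made to keep the example single-threaded; secondSendRejected is a hypothetical helper.

```d
import std.concurrency;

// Hypothetical helper: limits the caller's own mailbox to one message and
// reports whether a second send is rejected with MailboxFull.
bool secondSendRejected()
{
    setMaxMailboxSize(thisTid, 1, OnCrowding.throwException);
    // Restore an unbounded mailbox on exit (zero means no limit).
    scope (exit) setMaxMailboxSize(thisTid, 0, OnCrowding.block);

    send(thisTid, 1); // fits within the limit
    bool rejected = false;
    try
    {
        send(thisTid, 2); // the mailbox already holds one message
    }
    catch (MailboxFull e)
    {
        rejected = true;
    }

    receive((int i) { }); // drain the message that was accepted
    return rejected;
}

void main()
{
    assert(secondSendRejected());
}
```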
void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis);
Sets a maximum mailbox size.
Sets a limit on the maximum number of user messages allowed in the mailbox. If this limit is reached, the caller attempting to add a new message will execute onCrowdingDoThis. If messages is zero, the mailbox is unbounded.
Parameters:
Tid tid The Tid of the thread for which this limit should be set.
size_t messages The maximum number of messages or zero if no limit.
bool function(Tid) onCrowdingDoThis The routine called when a message is sent to a full mailbox.
bool register(string name, Tid tid);
Associates name with tid.
Associates name with tid in a process-local map. When the thread represented by tid terminates, any names associated with it will be automatically unregistered.
Parameters:
string name The name to associate with tid.
Tid tid The Tid to register under name.
Returns:
true if the name is available and tid is not known to represent a defunct thread.
bool unregister(string name);
Removes the registered name associated with a tid.
Parameters:
string name The name to unregister.
Returns:
true if the name is registered, false if not.
Tid locate(string name);
Gets the Tid associated with name.
Parameters:
string name The name to locate within the registry.
Returns:
The associated Tid or Tid.init if name is not registered.
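
The registry functions register, unregister and locate can be exercised together as in the sketch below. The helper registryRoundTrip and the name "worker" are hypothetical, and the sketch assumes the name is not already registered.

```d
import std.concurrency;

// Hypothetical helper: walks one name through register, locate and unregister.
bool registryRoundTrip(string name)
{
    bool ok = locate(name) == Tid.init;     // not registered yet
    ok = ok && register(name, thisTid);     // name is available
    ok = ok && !register(name, thisTid);    // name is already taken
    ok = ok && locate(name) == thisTid;     // lookup finds the caller
    ok = ok && unregister(name);            // name was registered
    ok = ok && !unregister(name);           // name is no longer registered
    ok = ok && locate(name) == Tid.init;    // lookup fails again
    return ok;
}

void main()
{
    assert(registryRoundTrip("worker"));
}
```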
struct ThreadInfo;
Encapsulates all implementation-level data needed for scheduling.
When defining a Scheduler, an instance of this struct must be associated with each logical thread. It contains all implementation-level information needed by the internal API.
static nothrow @property ref auto thisInfo();
Gets a thread-local instance of ThreadInfo.
Gets a thread-local instance of ThreadInfo, which should be used as the default instance when info is requested for a thread not created by the Scheduler.
void cleanup();
Cleans up this ThreadInfo.
This must be called when a scheduled thread terminates. It tears down the messaging system for the thread and notifies interested parties of the thread's termination.
interface Scheduler;
A Scheduler controls how threading is performed by spawn.
Implementing a Scheduler allows the concurrency mechanism used by this module to be customized according to different needs. By default, a call to spawn will create a new kernel thread that executes the supplied routine and terminates when finished. But it is possible to create Schedulers that reuse threads, that multiplex Fibers (coroutines) across a single thread, or any number of other approaches. By making the choice of Scheduler a user-level option, std.concurrency may be used for far more types of application than if this behavior were predefined.

Example:

import std.concurrency;
import std.stdio;

void main()
{
    scheduler = new FiberScheduler;
    scheduler.start(
    {
        writeln("the rest of main goes here");
    });
}

Some schedulers have a dispatching loop that must run if they are to work properly, so for the sake of consistency, when using a scheduler, start() must be called within main(). This yields control to the scheduler and will ensure that any spawned threads are executed in an expected manner.
abstract void start(void delegate() op);
Spawns the supplied op and starts the Scheduler.
This is intended to be called at the start of the program to yield all scheduling to the active Scheduler instance. This is necessary for schedulers that explicitly dispatch threads rather than simply relying on the operating system to do so, and so start should always be called within main() to begin normal program execution.
Parameters:
void delegate() op A wrapper for whatever the main thread would have done in the absence of a custom scheduler. It will be automatically executed via a call to spawn by the Scheduler.
abstract void spawn(void delegate() op);
Assigns a logical thread to execute the supplied op.
This routine is called by spawn. It is expected to instantiate a new logical thread and run the supplied operation. If the scheduled thread is not a kernel thread, it must call thisInfo.cleanup() when it terminates; all kernel threads will have their ThreadInfo cleaned up automatically by a thread-local destructor.
Parameters:
void delegate() op The function to execute. This may be the actual function passed by the user to spawn itself, or may be a wrapper function.
abstract nothrow void yield();
Yields execution to another logical thread.
This routine is called at various points within concurrency-aware APIs to provide a scheduler a chance to yield execution when using some sort of cooperative multithreading model. If this is not appropriate, such as when each logical thread is backed by a dedicated kernel thread, this routine may be a no-op.
abstract nothrow @property ref ThreadInfo thisInfo();
Returns an appropriate ThreadInfo instance.
Returns an instance of ThreadInfo specific to the logical thread that is calling this routine or, if the calling thread was not created by this scheduler, returns ThreadInfo.thisInfo instead.
abstract nothrow Condition newCondition(Mutex m);
Creates a Condition variable analog for signaling.
Creates a new Condition variable analog which is used to check for and to signal the addition of messages to a thread's message queue. Like yield, some schedulers may need to define custom behavior so that calls to Condition.wait() yield to another thread when no new messages are available instead of blocking.
Parameters:
Mutex m The Mutex that will be associated with this condition. It will be locked prior to any operation on the condition, and so in some cases a Scheduler may need to hold this reference and unlock the mutex before yielding execution to another logical thread.
class ThreadScheduler: std.concurrency.Scheduler;
An example Scheduler using kernel threads.
This is an example Scheduler that mirrors the default scheduling behavior of creating one kernel thread per call to spawn. It is fully functional and may be instantiated and used, but is not a necessary part of the default functioning of this module.
void start(void delegate() op);
This simply runs op directly, since no real scheduling is needed by this approach.
void spawn(void delegate() op);
Creates a new kernel thread and assigns it to run the supplied op.
nothrow void yield();
This scheduler does no explicit multiplexing, so this is a no-op.
nothrow @property ref ThreadInfo thisInfo();
Returns ThreadInfo.thisInfo, since it is a thread-local instance of ThreadInfo, which is the correct behavior for this scheduler.
nothrow Condition newCondition(Mutex m);
Creates a new Condition variable. No custom behavior is needed here.
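
A minimal sketch of installing ThreadScheduler explicitly and exchanging messages under it. The names doubler and runWithThreadScheduler are hypothetical; the sketch leaves the scheduler installed afterwards.

```d
import std.concurrency;

// Hypothetical worker: doubles one int and sends it back to its owner.
void doubler(Tid owner)
{
    receive((int n) { send(owner, n * 2); });
}

// Hypothetical helper: runs one request/reply exchange under ThreadScheduler.
int runWithThreadScheduler(int input)
{
    int result;
    scheduler = new ThreadScheduler;
    // start() runs the delegate directly on the calling thread.
    scheduler.start(
    {
        // spawn() now goes through ThreadScheduler.spawn, which
        // creates a new kernel thread for doubler.
        auto tid = spawn(&doubler, thisTid);
        send(tid, input);
        result = receiveOnly!int();
    });
    return result;
}

void main()
{
    assert(runWithThreadScheduler(21) == 42);
}
```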
class FiberScheduler: std.concurrency.Scheduler;
An example Scheduler using Fibers.
This is an example scheduler that creates a new Fiber per call to spawn and multiplexes the execution of all fibers within the main thread.
void start(void delegate() op);
This creates a new Fiber for the supplied op and then starts the dispatcher.
nothrow void spawn(void delegate() op);
This creates a new Fiber for the supplied op and adds it to the dispatch list.
nothrow void yield();
If the caller is a scheduled Fiber, this yields execution to another scheduled Fiber.
nothrow @property ref ThreadInfo thisInfo();
Returns an appropriate ThreadInfo instance.
Returns a ThreadInfo instance specific to the calling Fiber if the Fiber was created by this dispatcher, otherwise it returns ThreadInfo.thisInfo.
nothrow Condition newCondition(Mutex m);
Returns a Condition analog that yields when wait or notify is called.
Scheduler scheduler;
Sets the Scheduler behavior within the program.
This variable sets the Scheduler behavior within this program. Typically, when setting a Scheduler, scheduler.start() should be called in main. This routine will not return until program execution is complete.
nothrow void yield();
If the caller is a Fiber and is not a Generator, this function will call scheduler.yield() or Fiber.yield(), as appropriate.
class Generator(T): Fiber, IsGenerator;
A Generator is a Fiber that periodically returns values of type T to the caller via yield. This is represented as an InputRange.

Example:

import std.concurrency;
import std.stdio;


void main()
{
    auto tid = spawn(
    {
        while (true)
        {
            writeln(receiveOnly!int());
        }
    });

    auto r = new Generator!int(
    {
        foreach (i; 1 .. 10)
            yield(i);
    });

    foreach (e; r)
    {
        tid.send(e);
    }
}
this(void function() fn);
Initializes a generator object which is associated with a static D function. The function will be called once to prepare the range for iteration.
Parameters:
void function() fn The fiber function.

In: fn must not be null.

this(void function() fn, size_t sz);
Initializes a generator object which is associated with a static D function. The function will be called once to prepare the range for iteration.
Parameters:
void function() fn The fiber function.
size_t sz The stack size for this fiber.

In: fn must not be null.

this(void delegate() dg);
Initializes a generator object which is associated with a dynamic D function. The function will be called once to prepare the range for iteration.
Parameters:
void delegate() dg The fiber function.

In: dg must not be null.

this(void delegate() dg, size_t sz);
Initializes a generator object which is associated with a dynamic D function. The function will be called once to prepare the range for iteration.
Parameters:
void delegate() dg The fiber function.
size_t sz The stack size for this fiber.

In: dg must not be null.

final @property bool empty();
Returns true if the generator is empty.
final void popFront();
Obtains the next value from the underlying function.
final @property T front();
Returns the most recently generated value.
void yield(T)(ref T value);
void yield(T)(T value);
Yields a value of type T to the caller of the currently executing generator.
Parameters:
T value The value to yield.
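
A minimal sketch of a Generator consumed as an input range, without any message passing. The helper squares is hypothetical.

```d
import std.concurrency;

// Hypothetical helper: collects the first n square numbers from a Generator.
int[] squares(int n)
{
    auto gen = new Generator!int(
    {
        foreach (i; 0 .. n)
            yield(i * i); // hands one value to the consumer, then suspends
    });

    int[] result;
    // Generator is an input range, so foreach drives empty, front and popFront.
    foreach (value; gen)
        result ~= value;
    return result;
}

void main()
{
    assert(squares(4) == [0, 1, 4, 9]);
}
```

The value passed to yield must have the generator's element type, here int.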
ref auto initOnce(alias var)(lazy typeof(var) init);
Initializes var with the lazy init value in a thread-safe manner.
The implementation guarantees that all threads simultaneously calling initOnce with the same var argument block until var is fully initialized. All side-effects of init are globally visible afterwards.
Parameters:
var The variable to initialize
typeof(var) init The lazy initializer value
Returns:
A reference to the initialized variable
Examples:
A typical use-case is to perform lazy but thread-safe initialization.
static class MySingleton
{
    static MySingleton instance()
    {
        static __gshared MySingleton inst;
        return initOnce!inst(new MySingleton);
    }
}
assert(MySingleton.instance !is null);
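
The following sketch illustrates that the lazy initializer is evaluated only once. It assumes, as in the examples on this page, that the variable is declared __gshared (or shared); the names cachedAnswer, initializerRuns, computeAnswer and answer are hypothetical.

```d
import std.concurrency;

__gshared int cachedAnswer;    // the variable initialized by initOnce
__gshared int initializerRuns; // counts evaluations of the initializer

// Hypothetical stand-in for an expensive computation.
int computeAnswer()
{
    ++initializerRuns;
    return 42;
}

int answer()
{
    // The lazy argument is evaluated only by the first call.
    return initOnce!cachedAnswer(computeAnswer());
}

void main()
{
    assert(answer() == 42);
    assert(answer() == 42);
    assert(initializerRuns == 1);
}
```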
ref auto initOnce(alias var)(lazy typeof(var) init, Mutex mutex);
Same as above, but takes a separate mutex instead of sharing one among all initOnce instances.
This should be used to avoid dead-locks when the init expression waits for the result of another thread that might also call initOnce. Use with care.
Parameters:
var The variable to initialize
typeof(var) init The lazy initializer value
Mutex mutex A mutex to prevent race conditions
Returns:
A reference to the initialized variable
Examples:
Use a separate mutex when init blocks on another thread that might also call initOnce.
static shared bool varA, varB;
__gshared Mutex m;
m = new Mutex;

spawn({
    // use a different mutex for varB to avoid a dead-lock
    initOnce!varB(true, m);
    ownerTid.send(true);
});
// init depends on the result of the spawned thread
initOnce!varA(receiveOnly!bool);
assert(varA == true);
assert(varB == true);