Saturday, 5 March 2016

Multi-threaded programming in C++11

This page is a collection of examples of multi-threaded programming in C++11 rather than a reference.

Creating a thread

A new thread can be created using the std::thread class. The example below shows how to create a thread.
#include <thread>
#include <iostream>

void run() 
{
    std::cout << __FUNCTION__ << std::endl;
}

int main(int argc, char **argv) {
    std::thread t(run);

    t.join();
}

The above example creates a thread t which executes the run function in the new thread.
An application always has at least one thread, usually called the main thread, even if we never create one ourselves. When we create a new thread, the main thread and the new thread execute concurrently. In the example above, the thread object t is created in the main thread and goes out of scope when main returns. If t is still joinable at that point, that is, it has been neither joined nor detached, its destructor calls std::terminate, regardless of whether the new thread is still running. To avoid this, the main thread must wait for the new thread to complete its execution by calling t.join().
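The join requirement can be sketched with a small helper. The name sumInThread is hypothetical and not part of the original example; it only illustrates that the result written by the new thread can be read safely once join has returned.

```cpp
#include <thread>
#include <vector>

// Hypothetical helper: sums the values in a second thread and
// returns the result once that thread has finished.
int sumInThread(const std::vector<int> &values)
{
    int total = 0;

    // the lambda runs in the new thread; total is captured by reference
    std::thread t([&values, &total] {
        for (int v : values) {
            total += v;
        }
    });

    // without this join, t would be destroyed while still joinable
    // and std::terminate would be called
    t.join();

    // join guarantees that the writes made by the thread are visible here
    return total;
}
```

Calling sumInThread with the values 1, 2, 3 and 4 returns 10. The calling thread does no other work here, so the sketch shows the lifetime rule rather than any speed-up.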

Important functions in std::thread

get_id Retrieves the thread id
joinable Checks whether the thread is joinable
join Joins the thread, i.e. waits for the thread to complete its execution
detach Detaches the thread from the thread object so that it runs independently
swap Swaps two thread objects
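The functions listed above can be exercised together in a short sketch. The helper name threadMembersDemo is an assumption made for illustration; it returns true when each call behaves as described in the list.

```cpp
#include <thread>

// Hypothetical helper that exercises the member functions listed above.
// Returns true when every check behaves as described.
bool threadMembersDemo()
{
    std::thread a([] {});   // a owns a thread
    std::thread b;          // b is default constructed and owns nothing

    bool ok = a.joinable() && !b.joinable();

    // a thread object that owns nothing reports the default id
    ok = ok && (b.get_id() == std::thread::id());
    std::thread::id original = a.get_id();

    // swap exchanges ownership: b now owns the thread, a owns nothing
    a.swap(b);
    ok = ok && !a.joinable() && b.joinable() && (b.get_id() == original);

    // after join the object no longer owns a thread
    b.join();
    ok = ok && !b.joinable();

    // detach also gives up ownership; the thread keeps running on its own
    std::thread c([] {});
    c.detach();
    ok = ok && !c.joinable();

    return ok;
}
```

Note that a thread object stays joinable after its function has returned; only join, detach, or moving the thread away changes that.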

Using Mutex

A mutex can be used to protect shared data from simultaneous access by multiple threads. mutex::lock acquires the mutex, blocking until no other thread holds it, and mutex::unlock releases it.
An example of using std::mutex is shown below.
#include <thread>
#include <mutex>
#include <chrono>
#include <vector>
#include <memory>
#include <iostream>

void run()
{
    static std::mutex m;

    // allow only one thread at a time so that the two
    // printouts of a thread happen one after another
    m.lock();

    std::cout << __FUNCTION__ << " started" << std::endl;

    // wait for 100 milliseconds
    std::this_thread::sleep_for(std::chrono::milliseconds(100));

    std::cout << __FUNCTION__ << " ended" << std::endl;

    m.unlock();
}

int main(int argc, char **argv) {
    std::vector<std::shared_ptr<std::thread> > threads;

    for (int i=0; i<5; ++i) {
        std::shared_ptr<std::thread> t(new std::thread(run));
        threads.push_back(t);
    }

    for (std::shared_ptr<std::thread> t : threads ) {
        t->join();
    }
}
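The example above calls lock and unlock by hand. If the code between the two calls returned early or threw an exception, the mutex would stay locked. A minimal sketch of the same idea with std::lock_guard, which unlocks automatically at the end of its scope, is shown below. The helper name countWithLockGuard and its parameters are assumptions made for illustration.

```cpp
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: starts threadCount threads that each add
// `increments` to a shared counter, and returns the final value.
int countWithLockGuard(int threadCount, int increments)
{
    int counter = 0;
    std::mutex m;
    std::vector<std::thread> threads;

    for (int i = 0; i < threadCount; ++i) {
        threads.emplace_back([&counter, &m, increments] {
            for (int j = 0; j < increments; ++j) {
                // locks m here and unlocks it at the end of the scope
                std::lock_guard<std::mutex> guard(m);
                ++counter;
            }
        });
    }

    for (std::thread &t : threads) {
        t.join();
    }
    return counter;
}
```

With 4 threads and 1000 increments each, the function returns 4000. Without the guard the increments from different threads could interleave and the result would be unpredictable.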

Important functions in std::mutex

lock Locks the mutex, blocking if it is already locked
unlock Unlocks the mutex
try_lock Tries to lock the mutex without blocking; returns false if the mutex is already locked
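The difference between lock and try_lock can be sketched as follows. The helper name canLockFromOtherThread is hypothetical. It reports whether a second thread managed to acquire the mutex at the time of the call. The standard allows try_lock to fail even when the mutex is free, so a false result does not prove that the mutex was held.

```cpp
#include <mutex>
#include <thread>

// Hypothetical helper: returns true if a second thread managed to
// lock m at the moment of the call.
bool canLockFromOtherThread(std::mutex &m)
{
    bool acquired = false;

    std::thread t([&m, &acquired] {
        // try_lock does not block; it returns false if m is held elsewhere
        if (m.try_lock()) {
            acquired = true;
            m.unlock();
        }
    });
    t.join();

    return acquired;
}
```

If the calling thread holds m while calling the helper, the second thread cannot acquire it and the function returns false instead of blocking.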

Condition Variable

A condition variable is a synchronization primitive that allows a thread to wait until a condition occurs.
As a simple example, let's take the first example and make it work without using the join function.
#include <iostream>
#include <mutex>
#include <thread>
#include <chrono>
#include <condition_variable>

std::condition_variable cv;
std::mutex m;

// set to true once the thread is done; guarded by m
bool done = false;

void run() {
    std::cout << __FUNCTION__ << " entered" << std::endl;

    // let's put a small delay
    for (int i=0; i<5; ++i) {
        std::cout << "." << std::flush;
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
    std::cout << std::endl;

    std::cout << __FUNCTION__ << " exiting" << std::endl;

    // record that we are done and notify the waiting thread
    std::unique_lock<std::mutex> l(m);
    done = true;
    cv.notify_one();
}

int main() {
    std::thread t(run);
    t.detach();

    // wait for the thread to finish; the predicate guards against
    // spurious wakeups and against a notification sent before we wait
    std::unique_lock<std::mutex> l(m);
    cv.wait(l, [] { return done; });

    return 0;
}

Important functions in std::condition_variable

wait Waits for a notification
notify_one Notifies one waiting thread
notify_all Notifies all waiting threads
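A condition variable can wake up without a notification, and a notification sent before the other thread starts waiting is lost. For both reasons wait is usually called with a predicate that checks shared state. The sketch below shows this pattern; the helper name waitForValue and the variables ready and result are assumptions made for illustration.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: a second thread publishes a value and the
// calling thread waits for it using a predicate.
int waitForValue(int value)
{
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;   // guarded by m
    int result = 0;       // guarded by m

    std::thread producer([&] {
        {
            std::lock_guard<std::mutex> l(m);
            result = value;
            ready = true;
        }
        // wake the waiting thread after the state has been updated
        cv.notify_one();
    });

    {
        std::unique_lock<std::mutex> l(m);
        // returns immediately if ready is already true, and keeps
        // waiting if the thread is woken while ready is still false
        cv.wait(l, [&ready] { return ready; });
    }

    producer.join();
    return result;
}
```

The call waitForValue(42) returns 42 regardless of whether the producer runs before or after the caller reaches wait.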

Now let's see some examples of multi-threaded programming.

Printing 1 to 10 from two threads

Let's print 1 to 10, with the odd numbers printed from one thread and the even numbers from the other.
#include <iostream>
#include <mutex>
#include <thread>
#include <condition_variable>

// mutex and condition variable shared by both threads
std::mutex m;
std::condition_variable cv;

// true when it is the odd thread's turn to print; guarded by m
bool oddTurn = true;

void evenThread()
{
    for (int i=2; i<11; i+=2) {
        // wait until the odd thread has printed its value
        std::unique_lock<std::mutex> l(m);
        cv.wait(l, [] { return !oddTurn; });

        std::cout << i << std::endl;

        // hand the turn back to the odd thread
        oddTurn = true;
        cv.notify_one();
    }
}

void oddThread()
{
    for (int i=1; i<10; i+=2) {
        // wait until the even thread has printed its value
        std::unique_lock<std::mutex> l(m);
        cv.wait(l, [] { return oddTurn; });

        std::cout << i << std::endl;

        // hand the turn to the even thread
        oddTurn = false;
        cv.notify_one();
    }
}

int main(int argc, char **argv)
{
    std::thread ot(oddThread);
    std::thread et(evenThread);

    ot.join();
    et.join();

    return 0;
}

Thread pool 

Let's create a thread pool which creates a given number of threads and hands each submitted task to one of the available threads. If no thread is available, the task waits in a queue until a thread finishes its current task.
#include <iostream>
#include <mutex>
#include <thread>
#include <chrono>
#include <condition_variable>
#include <atomic>
#include <functional>
#include <vector>
#include <queue>
#include <memory>

class ThreadPool;

// worker class. It will create a thread and wait for a task from the ThreadPool
class Worker {
    std::thread mThread;
    std::atomic_bool mFinished;
    ThreadPool *mPool;

public:
    Worker(ThreadPool *pool)
     : mFinished(false),
       mPool(pool) {
        create();
    }

    ~Worker() {
        join();
    }

    void join() {
        if (mThread.joinable()) {
            mThread.join();
        }
    }

    void terminate() {
        mFinished = true;
    }

    void create();

private:
    void doCreate();
};

class ThreadPool {
private:
    int mWorkerCount;
    // worker vector
    std::vector<std::shared_ptr<Worker>> mThreads;

    // work queue is a queue of callable tasks
    std::queue<std::function<void(void)>> mWorkQueue;

    // lock that guards the queue and the termination flag
    std::mutex mWorkQueueLock;

    // condition variable used to notify task availability or termination
    std::condition_variable mNotifier;

    // set once the pool is shutting down; guarded by mWorkQueueLock
    bool mTerminating = false;

public:
    ThreadPool(int workerCount)
     : mWorkerCount(workerCount) {
        create();
    }

    ~ThreadPool() {
        // let the workers finish the queued tasks, then wait for them
        // to exit; destroying a Worker joins its thread
        terminate();
        mThreads.clear();
    }

    // ask the workers to exit once the queued tasks are done
    void terminate() {
        {
            std::unique_lock<std::mutex> ql(mWorkQueueLock);
            mTerminating = true;
        }
        // wake up every worker that is waiting for a task
        mNotifier.notify_all();
    }

    void submitWork(std::function<void()> const & work) {
        // add task to queue
        {
            std::unique_lock<std::mutex> ql(mWorkQueueLock);
            mWorkQueue.push(work);
        }

        // notify availability of task
        mNotifier.notify_one();
    }

    // returns the next task, or an empty function once the pool is
    // terminating and the queue has been drained
    std::function<void()> getWork() {
        // wait and pop under the same lock so that two workers
        // can never claim the same task
        std::unique_lock<std::mutex> ql(mWorkQueueLock);
        mNotifier.wait(ql, [this] {
            return mTerminating || !mWorkQueue.empty();
        });

        if (mWorkQueue.empty()) {
            return std::function<void()>();
        }

        std::function<void()> work = mWorkQueue.front();
        mWorkQueue.pop();
        return work;
    }

private:
    void create() {
        for (int i=0; i<mWorkerCount; ++i) {
            mThreads.push_back(std::shared_ptr<Worker>(new Worker(this)));
        }
    }

};

void Worker::create() {
    mThread = std::thread([this] {
        doCreate();
    });
}

void Worker::doCreate() {
    while (!mFinished) {
        std::function<void()> work = mPool->getWork();
        if (!work) {
            // the pool is terminating and no task is left
            break;
        }
        work();
    }
}

int main(int argc, char **argv) {
    ThreadPool pool(2);

    pool.submitWork([]{std::cout << "**work1**" << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); });
    pool.submitWork([]{std::cout << "**work2**" << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); });
    pool.submitWork([]{std::cout << "**work3**" << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); });
    pool.submitWork([]{std::cout << "**work4**" << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); });
    pool.submitWork([]{std::cout << "**work5**" << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); });
    pool.submitWork([]{std::cout << "**work6**" << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); });

    return 0;
}
