Example Project

An alternative threading strategy

In the last example we saw a multithreaded server built on the "thread per connection" pattern. In today's example we are going to explore an alternative pattern, the worker thread pattern. In this pattern a server is composed of one or more threads that run for the lifetime of the server. These threads handle any work that is available for them to do. After finishing a task, a worker thread will not exit; instead, it will check to see if more work is available for it to do.

A work queue

An important component of the worker thread pattern is a task queue data structure that can store tasks to be handled by the worker threads. Each worker thread consists of an infinite loop. On each iteration of the loop the work will check the task queue to see if any work is available for it to do. If there is work available on the queue, the worker will remove a task from the queue, handle the task, and then go back to the queue to check for more work.

Because the task queue is going to be shared by multiple worker threads, we have to exercise care in setting up the task queue data structure to make the structure and its methods thread safe.

Using semaphores

In the last lecture we saw that we can place a lock on a critical section of code with a mutex. The mutex will prevent other threads from running the same code until the thread that set the lock leaves the critical section and unlocks the mutex.

An alternative to using a mutex is to use a semaphore. A semaphore is a data type that stores a single unsigned integer. The semaphore provides thread safe functions to increment and decrement the integer.

To use the semaphore functions we include the semaphore.h header file.

#include <semaphore.h>

To create a semaphore we use the sem_init() function:

sem_t semaphore;
sem_init(&semaphore,0,1);

The third parameter to sem_init() is the initial value for the semaphore. The second parameter is an option setting that specifies whether the semaphore will be shared across threads or processes. The value 0 for this option indicates that we only want to share the semaphore across threads in the same process.

To increment and decrement the semaphore we use the sem_post() and sem_wait() functions. The sem_post() function increments the semaphore and returns immediately. If the semaphore currently has a nonzero value, sem_wait() decrements the semaphore and returns immediately. If the semaphore has a value of 0, sem_wait() will block until the semaphore has a nonzero value: it will then try to decrement the semaphore and return.

One simple use of a semaphore is to replicate the behavior of a mutex. The mutex code

// Outside the threads we do:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

// In our thread code we do:
pthread_mutex_lock(&mutex);
// Do some stuff
pthread_mutex_unlock(&mutex);

is equivalent to the semaphore code

// Outside the threads we do:
sem_t semaphore;
sem_init(&semaphore,0,1);

// In our thread code we do:
sem_wait(&semaphore);
// Do some stuff
sem_post(&semaphore);

The task queue

In the worker thread version of the web server application we will need a task queue that can store file descriptors for client socket connections. When a client arrives the server will put the file descriptor for that client socket on the task queue and go back to listening for new client connections. The worker threads will consist of infinite loops that check the task queue for file descriptors to serve.

The task queue will be implemented as a queue struct:

#define QUEUE_SIZE 16

typedef struct {
    int d[QUEUE_SIZE];
    int front;
    int back;
    sem_t mutex;
    sem_t slots;
    sem_t items;
} queue;

The following functions will be the methods we will use with the queue:

queue* queueCreate();
void enqueue(queue* q,int fd);
int dequeue(queue* q);

To make the queue thread safe we will use three separate semaphores. The mutex semaphore will implement a simple mutex that will protect critical sections of code in the methods. The items semaphore will maintain a count of how many file descriptors are currently stored in the queue. The slots semaphore will maintain a count of how many free spaces are available in the queue to store additional file descriptors.

Here now is the code for these methods:

queue* queueCreate() {
    queue *q = (queue*) malloc(sizeof(queue));
    q->front = 0;
    q->back = 0;
    sem_init(&q->mutex,0,1);
    sem_init(&q->slots,0,QUEUE_SIZE);
    sem_init(&q->items,0,0);
    return q;
}

void enqueue(queue* q,int fd) {
    sem_wait(&q->slots);
    sem_wait(&q->mutex);
    q->d[q->back] = fd;
    q->back = (q->back+1)%QUEUE_SIZE;
    sem_post(&q->mutex);
    sem_post(&q->items);
}

int dequeue(queue* q) {
    int fd;
    sem_wait(&q->items);
    sem_wait(&q->mutex);
    fd = q->d[q->front];
    q->front = (q->front+1)%QUEUE_SIZE;
    sem_post(&q->mutex);
    sem_post(&q->slots);
    return fd;
}

The three semaphores will cooperate to regulate interactions with the queue. For example, consider the dequeue() method. The first thing we do in that method is to interact with the items semaphore, which can tell us how many items are in the queue. We start by trying to decrement this semaphore, because we are about to remove an item from the queue. If there are currently some items in the queue, the decrement succeeds immediately and we go on the next step. If there are currently no items in the queue, the call to sem_wait() will block until the items semaphore rises above 0.

If items are available, we lock the mutex, proceed to the code that removes a file descriptor from the queue, and then unlock the mutex. On our way out of the method we increment the slots semaphore, since we have just created an additional free slot in the queue by removing an item.

Server code

In the main() function of the server we start by setting up the task queue and the worker threads:

 // Set up the queue
 queue* q = queueCreate();

 // Set up the worker threads
 pthread_t w1,w2;
 pthread_create(&w1,NULL,workerThread,q);
 pthread_create(&w2,NULL,workerThread,q);

We then run the usual code to set up the server socket and start listening for client connections. The code to accept the client connections simply takes each new client socket that gets created on the task queue for the workers to handle and then goes back to listening for a new connection:

while(1) {
    struct sockaddr_in client;
    int new_socket , c = sizeof(struct sockaddr_in);
    new_socket = accept(server_socket, (struct sockaddr *) &client, (socklen_t*)&c);
    if(new_socket != -1) {
      enqueue(q,new_socket);
    }
  }

The thread function for the worker threads is very simple:

void* workerThread(void *arg) {
  queue* q = (queue*) arg;
  while(1) {
    int fd = dequeue(q);
    serveRequest(fd);
  }
  return NULL;
}

As before, the interaction with the client is handled by the serveRequest() function, which is identical to the function we saw in the first version of the server program.

Compiling a multifile project

Another new feature of this version of the web server is the use of multiple source files in the project. The queue data structure is implemented in a header file named queue.h and a source code file queue.c. The code for the server is in a separate source code file, miniweb.c.

The best way to handle a project with multiple source code files is to set up a makefile for the project. Here is the makefile I set up for this project:

miniweb: miniweb.o queue.o
  gcc miniweb.o queue.o -pthread -o miniweb

miniweb.o: miniweb.c queue.h
  gcc miniweb.c -c -g -pthread

queue.o: queue.c queue.h
  gcc queue.c -c -g -pthread

The main target for the project, the miniweb executable, depends on object files compiled from the source files. To compile an object file from an individual source code file we compile the source code file using the gcc -c option, which tells the compiler to compile the C code to an object code without linking the code into an executable.

The build command for the miniweb executable will operate on the object files and link them together to make the miniweb executable.

Note the use of the -pthread option in all of these build commands. The option is required for any project that uses POSIX threads.

Once you have set up a makefile for a project you can use the Makefile extension in Visual Studio Code to compile and run the project. Click the Makefile tab in Code to access the buttons to build, run, and debug the project.