Producer–consumer problem

In computing, the producer-consumer problem (also known as the bounded-buffer problem) is a family of problems that Edsger W. Dijkstra described from 1965 onward.

Dijkstra found the solution to the producer-consumer problem while working as a consultant for the Electrologica X1 and X8 computers: "The first use of producer-consumer was partly software, partly hardware: The component taking care of the information transport between store and peripheral was called 'a channel' ... Synchronization was controlled by two counting semaphores in what we now know as the producer/consumer arrangement: the one semaphore indicating the length of the queue, was incremented (in a V) by the CPU and decremented (in a P) by the channel, the other one, counting the number of unacknowledged completions, was incremented by the channel and decremented by the CPU. [The second semaphore being positive would raise the corresponding interrupt flag.]" [1]

Dijkstra wrote about the unbounded buffer case: "We consider two processes, which are called the 'producer' and the 'consumer' respectively. The producer is a cyclic process and each time it goes through its cycle it produces a certain portion of information, that has to be processed by the consumer. The consumer is also a cyclic process and each time it goes through its cycle, it can process the next portion of information, as has been produced by the producer ... We assume the two processes to be connected for this purpose via a buffer with unbounded capacity." [2]

He wrote about the bounded buffer case: "We have studied a producer and a consumer coupled via a buffer with unbounded capacity ... The relation becomes symmetric, if the two are coupled via a buffer of finite size, say N portions." [3]

And about the multiple producer-consumer case: "We consider a number of producer/consumer pairs, where pairᵢ is coupled via an information stream containing nᵢ portions. We assume ... the finite buffer that should contain all portions of all streams to have a capacity of 'tot' portions." [4]

Per Brinch Hansen and Niklaus Wirth soon saw the problem with semaphores: "I have come to the same conclusion with regard to semaphores, namely that they are not suitable for higher level languages. Instead, the natural synchronization events are exchanges of message." [5]

Dijkstra's bounded buffer solution

The original semaphore solution for the bounded buffer was written in ALGOL style. The buffer can store N portions or elements. The "number of queueing portions" semaphore counts the filled locations in the buffer, the "number of empty positions" semaphore counts the empty locations, and the "buffer manipulation" semaphore works as a mutex for the buffer put and get operations. If the buffer is full, that is, the number of empty positions is zero, the producer thread waits in the P(number of empty positions) operation. If the buffer is empty, that is, the number of queueing portions is zero, the consumer thread waits in the P(number of queueing portions) operation. The P() operation decrements the semaphore value, waiting first if the value is zero, so the value never drops below zero. The V() operation increments the semaphore value and thereby releases the semaphore; as a side effect, a waiting thread can move from the wait queue to the ready queue. [6]

```
begin integer number of queueing portions, number of empty positions,
        buffer manipulation;
    number of queueing portions:= 0;
    number of empty positions:= N;
    buffer manipulation:= 1;
    parbegin
    producer: begin
            again 1: produce next portion;
                P(number of empty positions);
                P(buffer manipulation);
                add portion to buffer;
                V(buffer manipulation);
                V(number of queueing portions); goto again 1 end;
    consumer: begin
            again 2: P(number of queueing portions);
                P(buffer manipulation);
                take portion from buffer;
                V(buffer manipulation);
                V(number of empty positions);
                process portion taken; goto again 2 end
    parend
end
```
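To make the P and V semantics described above concrete, here is a minimal C++ sketch of a counting semaphore built from a std::mutex and a std::condition_variable. The class name Semaphore and its members are illustrative assumptions, not a standard API (C++20's std::counting_semaphore names these operations acquire and release): P() waits until the value is positive and then decrements it, and V() increments the value and wakes one waiting thread.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical counting semaphore mirroring Dijkstra's P and V.
class Semaphore {
public:
    explicit Semaphore(int initial) : value_(initial) {}

    // P: wait until the value is positive, then decrement it.
    // The value therefore never drops below zero.
    void P() {
        std::unique_lock<std::mutex> lock(mtx_);
        cv_.wait(lock, [this] { return value_ > 0; });
        --value_;
    }

    // V: increment the value and wake one waiting thread, if any.
    void V() {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            ++value_;
        }
        cv_.notify_one();
    }

    int value() {
        std::lock_guard<std::mutex> lock(mtx_);
        return value_;
    }

private:
    std::mutex mtx_;
    std::condition_variable cv_;
    int value_;
};

// A thread blocked in P() on a zero-valued semaphore resumes only after
// another thread calls V(). Returns 1: the waiter ran, and the value is 0 again.
int p_waits_for_v() {
    Semaphore s(0);
    int observed = 0;
    std::thread waiter([&] {
        s.P();          // blocks until V() below
        observed = 1;
    });
    s.V();
    waiter.join();      // join makes the write to observed visible here
    return observed + s.value();
}
```

The notify_one call in V() plays the role of moving one thread from the semaphore's wait queue to the ready queue.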

Since C++20, semaphores are part of the C++ standard library, so Dijkstra's solution can be written directly in modern C++. The variable buffer_manipulation is a std::mutex rather than a semaphore, because the semaphore's ability to be acquired in one thread and released in another is not needed for mutual exclusion. Using std::lock_guard instead of an explicit lock() and unlock() pair applies the C++ RAII idiom: the lock_guard destructor releases the lock even if an exception is thrown. This solution can handle multiple producer threads, multiple consumer threads, or both.

```cpp
#include <thread>
#include <mutex>
#include <semaphore>

// N, Portion, produce_next_portion, add_portion_to_buffer,
// take_portion_from_buffer and process_portion_taken are assumed
// to be declared elsewhere.
std::counting_semaphore<N> number_of_queueing_portions{0};
std::counting_semaphore<N> number_of_empty_positions{N};
std::mutex buffer_manipulation;

void producer() {
    for (;;) {
        Portion portion = produce_next_portion();
        number_of_empty_positions.acquire();
        {
            std::lock_guard<std::mutex> g(buffer_manipulation);
            add_portion_to_buffer(portion);
        }
        number_of_queueing_portions.release();
    }
}

void consumer() {
    for (;;) {
        number_of_queueing_portions.acquire();
        Portion portion;
        {
            std::lock_guard<std::mutex> g(buffer_manipulation);
            portion = take_portion_from_buffer();
        }
        number_of_empty_positions.release();
        process_portion_taken(portion);
    }
}

int main() {
    std::thread t1(producer);
    std::thread t2(consumer);
    t1.join();
    t2.join();
}
```

Using monitors

Per Brinch Hansen defined the monitor: "I will use the term monitor to denote a shared variable and the set of meaningful operations on it. The purpose of a monitor is to control the scheduling of resources among individual processes according to a certain policy." [7] Tony Hoare laid a theoretical foundation for the monitor. [8]

```
bounded buffer: monitor
  begin buffer: array 0..N-1 of portion;
    head, tail: 0..N-1;
    count: 0..N;
    nonempty, nonfull: condition;
  procedure append(x: portion);
    begin if count = N then nonfull.wait;
      note 0 <= count < N;
      buffer[tail] := x;
      tail := tail (+) 1;
      count := count + 1;
      nonempty.signal
    end append;
  procedure remove(result x: portion);
    begin if count = 0 then nonempty.wait;
      note 0 < count <= N;
      x := buffer[head];
      head := head (+) 1;
      count := count - 1;
      nonfull.signal
    end remove;
  head := 0; tail := 0; count := 0;
end bounded buffer;
```

The monitor is an object that contains the variables buffer, head, tail and count, which realize a circular buffer; the condition variables nonempty and nonfull for synchronization; and the methods append and remove for accessing the bounded buffer. The monitor operation wait corresponds to the semaphore operation P or acquire, and signal corresponds to V or release. The circled operation (+) denotes addition modulo N. The Pascal-style pseudocode shows a Hoare monitor. A Mesa monitor uses while count instead of if count, because a thread woken by signal must recheck the condition. A C++ version is:

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// N and Portion are assumed to be defined elsewhere.
class Bounded_buffer {
    Portion buffer[N];    // 0..N-1
    unsigned head, tail;  // 0..N-1
    unsigned count;       // 0..N
    std::condition_variable nonempty, nonfull;
    std::mutex mtx;
public:
    void append(Portion x) {
        std::unique_lock<std::mutex> lck(mtx);
        nonfull.wait(lck, [&]{ return !(N == count); });
        assert(0 <= count && count < N);
        buffer[tail++] = x;
        tail %= N;
        ++count;
        nonempty.notify_one();
    }
    Portion remove() {
        std::unique_lock<std::mutex> lck(mtx);
        nonempty.wait(lck, [&]{ return !(0 == count); });
        assert(0 < count && count <= N);
        Portion x = buffer[head++];
        head %= N;
        --count;
        nonfull.notify_one();
        return x;
    }
    Bounded_buffer() {
        head = 0; tail = 0; count = 0;
    }
};
```

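The C++ version needs an explicit mutex because, unlike a monitor procedure, a C++ member function does not lock anything implicitly, and std::condition_variable::wait must be given a lock on that mutex. It uses assert to check the conditions stated by the note lines of the pseudocode.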
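The Hoare monitor tests count with if, whereas a Mesa monitor rechecks the condition in a loop. The following sketch spells out that while loop explicitly around std::condition_variable::wait; the class name MesaBoundedBuffer, its template parameters for element type and capacity, and the mesa_demo driver are illustrative assumptions.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>

// Mesa-style bounded buffer: a woken thread is not guaranteed that the
// condition still holds, so every wait sits inside a while loop.
template <typename Portion, std::size_t N>
class MesaBoundedBuffer {
public:
    void append(Portion x) {
        std::unique_lock<std::mutex> lck(mtx_);
        while (count_ == N)        // "while count = N do nonfull.wait"
            nonfull_.wait(lck);
        buffer_[tail_] = x;
        tail_ = (tail_ + 1) % N;
        ++count_;
        nonempty_.notify_one();
    }

    Portion remove() {
        std::unique_lock<std::mutex> lck(mtx_);
        while (count_ == 0)        // "while count = 0 do nonempty.wait"
            nonempty_.wait(lck);
        Portion x = buffer_[head_];
        head_ = (head_ + 1) % N;
        --count_;
        nonfull_.notify_one();
        return x;
    }

private:
    Portion buffer_[N];
    std::size_t head_ = 0, tail_ = 0, count_ = 0;
    std::mutex mtx_;
    std::condition_variable nonempty_, nonfull_;
};

// One producer appends 1..1000 and one consumer removes them through a
// 3-slot buffer; returns the sum of the removed values.
long long mesa_demo() {
    MesaBoundedBuffer<int, 3> buf;
    long long sum = 0;
    std::thread producer([&] { for (int i = 1; i <= 1000; ++i) buf.append(i); });
    std::thread consumer([&] { for (int i = 0; i < 1000; ++i) sum += buf.remove(); });
    producer.join();
    consumer.join();
    return sum;
}
```

The predicate overload wait(lck, pred) used in the Bounded_buffer class is specified as equivalent to while (!pred()) wait(lck);, so that class already follows the Mesa discipline; the explicit loop here only makes it visible.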

Using channels

The very first producer-consumer solution in the Electrologica computers used 'channels'. Hoare defined channels: "An alternative to explicit naming of source and destination would be to name a port through which communication is to take place. The port names would be local to the processes, and the manner in which pairs of ports are to be connected by channels could be declared in the head of a parallel command." [9] Brinch Hansen implemented channels in the programming languages Joyce and Super Pascal. Alef, the programming language of the Plan 9 operating system, and Limbo, the programming language of the Inferno operating system, also have channels. The following C source code compiles on Plan 9 from User Space:

```c
#include "u.h"
#include "libc.h"
#include "thread.h"

enum { STACK = 8192 };

void producer(void *v) {
	Channel *ch = v;
	for (uint i = 1; ; ++i) {
		sleep(400);
		print("p %d\n", i);
		sendul(ch, i);
	}
}

void consumer(void *v) {
	Channel *ch = v;
	for (;;) {
		uint p = recvul(ch);
		print("\t\tc %d\n", p);
		sleep(200 + nrand(600));
	}
}

void threadmain(int argc, char **argv) {
	int (*mk)(void (*fn)(void*), void *arg, uint stack);
	mk = threadcreate;
	Channel *ch = chancreate(sizeof(ulong), 1);
	mk(producer, ch, STACK);
	mk(consumer, ch, STACK);
	/* block forever on a channel that nobody sends to */
	recvp(chancreate(sizeof(void*), 0));
	threadexitsall(0);
}
```

The program entry point is the function threadmain. The call ch = chancreate(sizeof(ulong), 1) creates a channel that buffers one element, the call sendul(ch, i) sends a value into the channel, and the call p = recvul(ch) receives a value from it. The Go programming language also has channels. A Go example:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

var sendMsg = 0

func produceMessage() int {
	time.Sleep(400 * time.Millisecond)
	sendMsg++
	fmt.Printf("sendMsg = %v\n", sendMsg)
	return sendMsg
}

func consumeMessage(recvMsg int) {
	fmt.Printf("\t\trecvMsg = %v\n", recvMsg)
	time.Sleep(time.Duration(200+rand.Intn(600)) * time.Millisecond)
}

func main() {
	ch := make(chan int, 3)
	go func() {
		for {
			ch <- produceMessage()
		}
	}()
	for recvMsg := range ch {
		consumeMessage(recvMsg)
	}
}
```

The Go solution uses the main goroutine as the consumer and creates a new, anonymous goroutine for the producer. The two goroutines are connected by the channel ch, which can queue up to three int values. The statement ch := make(chan int, 3) creates the channel, the statement ch <- produceMessage() sends a value into the channel, and the loop for recvMsg := range ch receives values from the channel. [10] Memory allocation, scheduling of the goroutines, and synchronization are handled automatically by the language and its runtime.
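Channels can also be approximated in languages that lack them. The sketch below is an illustration under assumptions, not a port of Plan 9's thread library or of Go's runtime: a hypothetical Channel template built from a mutex, two condition variables and a std::deque, whose send, receive and close loosely correspond to ch <- v, <-ch and Go's close(ch). Returning std::nullopt from receive once the channel is closed and empty lets a loop end the way a range loop over a Go channel does.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical bounded channel; capacity must be at least 1.
template <typename T>
class Channel {
public:
    explicit Channel(std::size_t capacity) : capacity_(capacity) {}

    // Like "ch <- v": blocks while the channel is full.
    void send(T v) {
        std::unique_lock<std::mutex> lck(mtx_);
        not_full_.wait(lck, [&] { return items_.size() < capacity_; });
        items_.push_back(std::move(v));
        not_empty_.notify_one();
    }

    // Like "v, ok := <-ch": blocks while empty; returns std::nullopt
    // once the channel is closed and drained.
    std::optional<T> receive() {
        std::unique_lock<std::mutex> lck(mtx_);
        not_empty_.wait(lck, [&] { return !items_.empty() || closed_; });
        if (items_.empty()) return std::nullopt;
        T v = std::move(items_.front());
        items_.pop_front();
        not_full_.notify_one();
        return v;
    }

    // Like close(ch): wakes all receivers so their loops can finish.
    void close() {
        std::lock_guard<std::mutex> lck(mtx_);
        closed_ = true;
        not_empty_.notify_all();
    }

private:
    std::size_t capacity_;
    std::deque<T> items_;
    bool closed_ = false;
    std::mutex mtx_;
    std::condition_variable not_empty_, not_full_;
};

// A producer thread sends 1..5 through a 3-slot channel and closes it;
// the calling thread drains it like "for recvMsg := range ch".
std::vector<int> channel_demo() {
    Channel<int> ch(3);
    std::thread producer([&] {
        for (int i = 1; i <= 5; ++i) ch.send(i);
        ch.close();
    });
    std::vector<int> received;
    while (auto msg = ch.receive()) received.push_back(*msg);
    producer.join();
    return received;
}
```

Unlike Go, this sketch does not support unbuffered channels (a capacity of 0 would block every send forever rather than rendezvous) and does not detect a send on a closed channel. The Go example never closes ch, so its range loop runs indefinitely.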

Without semaphores or monitors

Leslie Lamport documented a bounded buffer producer-consumer solution for one producer and one consumer: "We assume that the buffer can hold at most b messages, b >= 1. In our solution, we let k be a constant greater than b, and let s and r be integer variables assuming values between 0 and k-1. We assume that initially s=r and the buffer is empty. By choosing k to be a multiple of b, the buffer can be implemented as an array B [0: b - 1]. The producer simply puts each new message into B[s mod b], and the consumer takes each message from B[r mod b]." [11] The algorithm is shown below, generalized for infinite k.

```
Producer:
    L: if (s - r) mod k = b then goto L fi;
       put message in buffer;
       s := (s + 1) mod k;
       goto L;
Consumer:
    L: if (s - r) mod k = 0 then goto L fi;
       take message from buffer;
       r := (r + 1) mod k;
       goto L;
```

Lamport's solution uses busy waiting in the thread instead of waiting in the scheduler. It also neglects the impact of a thread switch by the scheduler at an inconvenient time: if the first thread has read a variable's value from memory, the scheduler then switches to the second thread, which changes that value, and the scheduler finally switches back, the first thread continues with the old value of the variable, not the current one. Atomic read-modify-write operations solve this problem. Modern C++ offers atomic variables and operations for multithreaded programming. The following busy-waiting C++11 solution for one producer and one consumer uses the atomic read-modify-write operations fetch_add and fetch_sub on the atomic variable count.

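```cpp
#include <atomic>
#include <thread>

// Message, produceMessage() and consumeMessage() are assumed to be
// defined elsewhere.
enum { N = 4 };
Message buffer[N];
std::atomic<unsigned> count{0};

void producer() {
    unsigned tail{0};
    for (;;) {
        Message message = produceMessage();
        while (N == count)
            ;  // busy waiting
        buffer[tail++] = message;
        tail %= N;
        // Release ordering (relaxed would be a data race on buffer): paired
        // with the consumer's sequentially consistent load of count, it makes
        // the slot write above visible before the consumer reads the slot.
        count.fetch_add(1, std::memory_order_release);
    }
}

void consumer() {
    unsigned head{0};
    for (;;) {
        while (0 == count)
            ;  // busy waiting
        Message message = buffer[head++];
        head %= N;
        // Release ordering: the slot read above completes before the
        // producer can observe the decremented count and overwrite the slot.
        count.fetch_sub(1, std::memory_order_release);
        consumeMessage(message);
    }
}

int main() {
    std::thread t1(producer);
    std::thread t2(consumer);
    t1.join();
    t2.join();
}
```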

The circular buffer index variables head and tail are each used by only one thread and are therefore not relevant for memory consistency. The variable count controls the busy waiting of the producer and consumer threads.
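For comparison, Lamport's s and r formulation can also be written with C++ atomics. This is a sketch under assumptions that are not part of the quoted description: b = 4, k = 2b, int messages, and acquire/release memory ordering added for C++. Because s is written only by the producer and r only by the consumer, this sketch gets by with atomic loads and stores instead of read-modify-write operations. The names LamportBuffer, put, take and lamport_demo are illustrative.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Assumed parameters: b slots, and k = 2b so that "full" ((s - r) mod k = b)
// can be told apart from "empty" ((s - r) mod k = 0).
constexpr unsigned b = 4;
constexpr unsigned k = 2 * b;

struct LamportBuffer {
    int B[b];
    std::atomic<unsigned> s{0};  // messages put, mod k; written only by the producer
    std::atomic<unsigned> r{0};  // messages taken, mod k; written only by the consumer

    void put(int message) {
        unsigned s_now = s.load(std::memory_order_relaxed);  // own variable
        // L: if (s - r) mod k = b then goto L fi   (buffer full)
        while ((s_now + k - r.load(std::memory_order_acquire)) % k == b) {}
        B[s_now % b] = message;
        s.store((s_now + 1) % k, std::memory_order_release);
    }

    int take() {
        unsigned r_now = r.load(std::memory_order_relaxed);  // own variable
        // L: if (s - r) mod k = 0 then goto L fi   (buffer empty)
        while ((s.load(std::memory_order_acquire) + k - r_now) % k == 0) {}
        int message = B[r_now % b];
        r.store((r_now + 1) % k, std::memory_order_release);
        return message;
    }
};

// Transfers 0..999 from a producer thread to a consumer thread and
// checks that they arrive in FIFO order.
bool lamport_demo() {
    LamportBuffer buf;
    const int n = 1000;
    std::vector<int> got;
    std::thread producer([&] { for (int i = 0; i < n; ++i) buf.put(i); });
    std::thread consumer([&] { for (int i = 0; i < n; ++i) got.push_back(buf.take()); });
    producer.join();
    consumer.join();
    for (int i = 0; i < n; ++i)
        if (got[i] != i) return false;
    return true;
}
```

Adding k before subtracting keeps the unsigned arithmetic from wrapping, so (s_now + k - r) % k equals (s - r) mod k for s and r in 0..k-1.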

References
