Multiple threads can make processing a message queue faster. Eugene Surman needs the right data structure.
For the past five years I have mostly been developing multi-threaded messaging applications. While they were all quite different, one situation kept recurring: the sequential order of incoming and outgoing messages had to be preserved, even though the messages were handled concurrently by multiple threads and did not necessarily finish processing in the order they were received. I searched many ready-made messaging libraries for a solution but did not find anything satisfactory, so I developed my own: the PRQueue, a Queue with Position Reservation (or ‘seat reservation’).
PRQueue is implemented in C++ using two STL deques and the pthread library. Two simple classes, Mutex and Lock, are used in the example to demonstrate the logic. A sample message is represented by the StringMsg class, and the QueueTest class is used as a test-bed application.
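I chose deque as the main building block of the design because it has all the operations (including operator[]) needed to implement PRQueue. In particular, it is important that push_back() and pop_front() do not invalidate pointers and references to the other elements of the deque. (Iterators are a different matter: push_back() invalidates all iterators, so a saved position must be held as a pointer or reference rather than an iterator.)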
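The guarantee above can be checked with a short standalone function. This is a sketch in standard C++, not part of the article's code (the name seat_pointer_survives is hypothetical): it keeps a pointer to one element, grows the deque at the back, pops an older element from the front, and then writes through the saved pointer.

```cpp
#include <cstddef>
#include <deque>

// Returns true if a pointer taken to one deque element still refers to that
// element after many push_back() calls and a pop_front() of an *earlier*
// element. This is the property a reserved "seat" pointer relies on.
// (Iterators, unlike pointers and references, ARE invalidated by push_back().)
bool seat_pointer_survives(std::size_t extra_pushes) {
    std::deque<int> q;
    q.push_back(-1);              // an older element, popped below
    q.push_back(42);              // the reserved seat
    int* seat = &q.back();        // pointer to the seat, as in a reservation

    for (std::size_t i = 0; i < extra_pushes; ++i)
        q.push_back(static_cast<int>(i));   // grow the deque at the back
    q.pop_front();                          // erase only the older element

    *seat = 7;                    // "fill" the seat through the saved pointer
    return &q.front() == seat && q.front() == 7;
}
```

Calling seat_pointer_survives(100000) forces the deque to allocate many new blocks, yet the saved pointer stays valid.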
Here is a simple example of how PRQueue can be used. Say we need to log a stream of large multi-field messages. Converting numeric fields to text strings is slow and not mission-critical, so we decide to offload this task to dedicated threads that generate the log.
Initially, the processing diagram may look like Figure 1.
Figure 1
We assume that logging must be done after the messages are processed by the core routines. Since the core processing takes place in multiple threads, the messages may become ready in an order that differs from the input queue order. For example, one thread may take a message off the input queue and then be suspended, while another thread takes the next message, runs to completion, and places its processed message in the output queue ahead of the first thread's message. As a result, the log entries may appear out of order.
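The race just described can be reproduced deterministically. The following sketch is not the article's code: it uses C++11 std::thread, std::mutex and std::promise (instead of pthreads) to force worker A to stall after taking message 0 until worker B has taken and delivered message 1. The names PlainOut and reorder_demo are hypothetical.

```cpp
#include <deque>
#include <future>
#include <mutex>
#include <thread>
#include <vector>

// A plain mutex-protected output queue with no position reservation.
struct PlainOut {
    std::mutex m;
    std::vector<int> items;
    void push(int v) { std::lock_guard<std::mutex> lk(m); items.push_back(v); }
};

// Returns the order in which messages 0 and 1 reach the output queue when
// the worker that took message 0 is "descheduled" until the other finishes.
std::vector<int> reorder_demo() {
    std::deque<int> input = {0, 1};
    std::mutex in_m;
    PlainOut out;
    std::promise<void> a_took, b_done;
    std::future<void> a_took_f = a_took.get_future();
    std::future<void> b_done_f = b_done.get_future();

    auto take = [&] {               // take the next message off the input
        std::lock_guard<std::mutex> lk(in_m);
        int v = input.front();
        input.pop_front();
        return v;
    };

    std::thread a([&] {
        int v = take();             // takes message 0
        a_took.set_value();         // let B proceed
        b_done_f.wait();            // simulate A being suspended
        out.push(v);
    });
    std::thread b([&] {
        a_took_f.wait();            // make sure A took message 0 first
        int v = take();             // takes message 1
        out.push(v);                // finishes first
        b_done.set_value();
    });
    a.join();
    b.join();
    return out.items;               // {1, 0}: out of order
}
```

With an output queue that does not reserve positions, the result is {1, 0}, the same kind of inversion visible in Figure 2.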
Listing 1 is an example illustrating this point. It uses the standard STL queue and three threads, and generates the output shown in Figure 2.
...
QueueTest quetest(3);
int i1 =0;
for( int i =10000; i; i--)
{
  quetest.push( "| %d", i1++ );
  quetest.push( "- %d", i1++ );
}
...
Listing 1
Th# Time-stamp              Msg#
1: 101108 15:04:49.576167 - 5243
3: 101108 15:04:49.576170 | 5244
1: 101108 15:04:49.576174 - 5245
3: 101108 15:04:49.576177 | 5246
3: 101108 15:04:49.576182 | 5248   // out
2: 101108 15:04:49.571945 | 4338   // of
1: 101108 15:04:49.576179 - 5247   // order
3: 101108 15:04:49.576188 - 5249
2: 101108 15:04:49.576189 | 5250
1: 101108 15:04:49.576191 - 5251
Figure 2
Using PRQueue avoids this scenario. It ensures that the order of messages in the output queue matches their order in the input queue, regardless of the order in which the core routines finish processing them.
The basic logic behind PRQueue is simple: when the next message is taken off the input queue, the next push-back position (or ‘seat’) in the output queue is reserved while the input queue's lock is still held. The lock is then released and processing continues. When the message is fully processed, it is placed into the output queue at the previously reserved position.
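The seat idea can be illustrated without any threads. In this hypothetical SeatQueue sketch, each slot keeps its value and its ‘filled’ flag together in a single deque, whereas PRQueue uses two parallel deques. reserve() returns a pointer to the new back slot, and try_pop() refuses to return the front until that slot has been filled.

```cpp
#include <deque>
#include <vector>

// Single-threaded sketch of position reservation (not the article's code).
template <typename T>
class SeatQueue {
public:
    struct Slot { T value; bool filled; };
    typedef Slot* Seat;   // a reserved position

    // Reserve the next back position. The pointer stays valid while other
    // elements are pushed at the back or popped from the front.
    Seat reserve() {
        slots_.push_back(Slot{T(), false});
        return &slots_.back();
    }
    // Fill a previously reserved seat; seats may be filled in any order.
    void fill(Seat seat, const T& v) { seat->value = v; seat->filled = true; }

    // Pop the front element only if its seat has been filled.
    bool try_pop(T& out) {
        if (slots_.empty() || !slots_.front().filled) return false;
        out = slots_.front().value;
        slots_.pop_front();
        return true;
    }

private:
    std::deque<Slot> slots_;
};

// Reserve three seats in arrival order, fill them in a different order,
// then drain: the output follows the reservation order.
std::vector<int> drain_demo() {
    SeatQueue<int> q;
    SeatQueue<int>::Seat s0 = q.reserve();
    SeatQueue<int>::Seat s1 = q.reserve();
    SeatQueue<int>::Seat s2 = q.reserve();
    q.fill(s2, 30);   // the last message finishes first...
    q.fill(s0, 10);
    q.fill(s1, 20);   // ...and the middle one finishes last
    std::vector<int> out;
    int v;
    while (q.try_pop(v)) out.push_back(v);
    return out;       // {10, 20, 30}
}
```

Keeping value and flag in one slot lets a single pointer describe a seat; with the two-deque layout used by PRQueue, a seat needs a pair of pointers (the position structure) instead.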
Figure 3 shows the output of the previous example rewritten to use PRQueue. The order of the messages in the log is now preserved.
Th# Time-stamp              Msg#
2: 101108 15:04:49.571945 | 4338
...
...
1: 101108 15:04:49.576167 - 5243
3: 101108 15:04:49.576170 | 5244
1: 101108 15:04:49.576174 - 5245
3: 101108 15:04:49.576177 | 5246
1: 101108 15:04:49.576179 - 5247
3: 101108 15:04:49.576182 | 5248
3: 101108 15:04:49.576188 - 5249
2: 101108 15:04:49.576189 | 5250
1: 101108 15:04:49.576191 - 5251
Figure 3
PRQueue is built from two deques: ‘data’ and ‘filled’. An element of the ‘filled’ deque indicates whether the corresponding position has been filled with data and can be popped from PRQueue. A wrapper class, DataQueue, holds the ‘data’ and ‘filled’ deques, and the PRQueue methods are, for the most part, ‘mutexed’ wrappers of DataQueue methods.
This design separates the thread-safety code from the actual queue logic and hides it, so the user does not need to write any locking or unlocking code.
Let’s discuss PRQueue’s functionality in a bit more detail.
The PRQueue pop method does two things: it pops data from the input queue and reserves a push position in the output queue. The push method uses the previously reserved position to store data in the output queue.
To test PRQueue with multiple threads, every spawned thread executes the function process_msg. It pops a StringMsg from the input queue, processes it by calling the StringMsg::process() method, and pushes it to the output queue (see Listing 2).
// The function 'process_msg' is executed by every
// spawned input thread. The signature corresponds
// to the pthread_create 'start_routine'
// File prqueue.cpp
void* process_msg( void* arg)
{
  // Note: Thidx is shared by all threads; if it is a
  // plain int, this increment is a data race (an atomic
  // or mutex-protected counter avoids that)
  int thidx = ++Thidx;
  QueueTest* quetest =(QueueTest*)arg;
  Msg* msg;
  PRQueue< Msg*>::position pos;
  cout << "Input thread=" << thidx << " started" << endl;
  for(;;)
  {
    // Wait for the next available message in the
    // input queue and pop it; get the next push
    // position reserved in the output queue
    quetest->input_que.pop( msg, quetest->output_que, pos);
    // Process message
    msg->process( thidx);
    // Push processed message into output queue
    // using reserved position
    quetest->output_que.push( msg, pos);
  }
  return NULL;
}
Listing 2
The pop method does not only wait for the next message to arrive in the input queue; it also checks whether the front message is ready to be popped by examining the corresponding element of the ‘filled’ deque. If the data has not been filled yet, pop goes back to sleep and waits.
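Here is a sketch of that waiting behaviour in C++11, assuming std::mutex and std::condition_variable in place of the article's pthread-based Mutex and Lock classes (WaitingSeatQueue and first_popped are hypothetical names). The key detail is the wait predicate: the consumer sleeps until the queue is non-empty and its front slot is filled, so a later slot being filled first does not let it jump ahead.

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical sketch: pop() blocks until the FRONT slot is filled.
template <typename T>
class WaitingSeatQueue {
public:
    struct Slot { T value; bool filled; };
    typedef Slot* Seat;

    Seat reserve() {
        std::lock_guard<std::mutex> lk(m_);
        slots_.push_back(Slot{T(), false});
        return &slots_.back();
    }
    void fill(Seat seat, const T& v) {
        {
            std::lock_guard<std::mutex> lk(m_);
            seat->value = v;
            seat->filled = true;
        }
        cv_.notify_all();   // waiters re-check whether the front is now filled
    }
    T pop() {
        std::unique_lock<std::mutex> lk(m_);
        // wait() releases m_ while sleeping and re-evaluates the predicate
        // after every wake-up, including spurious ones.
        cv_.wait(lk, [this] { return !slots_.empty() && slots_.front().filled; });
        T v = slots_.front().value;
        slots_.pop_front();
        return v;
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<Slot> slots_;
};

// Seat 1 is filled immediately and seat 0 only after a short delay; the
// consumer nevertheless receives seat 0's value first.
int first_popped() {
    WaitingSeatQueue<int> q;
    WaitingSeatQueue<int>::Seat s0 = q.reserve();
    WaitingSeatQueue<int>::Seat s1 = q.reserve();
    std::thread producer([&] {
        q.fill(s1, 200);
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
        q.fill(s0, 100);
    });
    int first = q.pop();   // blocks until seat 0 is filled
    producer.join();
    return first;
}
```

The sketch does not track whether the slot being filled is currently at the front, so it wakes all waiters with notify_all() and lets each one re-check the predicate; the result does not depend on the sleep, which only makes the out-of-order fill visible.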
Pop logic (Listing 3):
- Lock input queue
- If the input queue is not empty and its front element is filled with data, pop it (otherwise release the lock, sleep until notified, and check again)
- Lock output queue
- Reserve the back position in the output queue
- Unlock output queue
- Unlock input queue
// Pop data from input queue and reserve position
// in output queue (file prqueue.hpp)
void PRQueue::pop( DATA& data, PRQueue& outque,
                   PRQueue::position& pos)
{
  Lock lk( m_mux);
  // Waiting for the message in input queue - pop
  // message
  while( true)
  {
    if( m_que.pop( data))
      break;
    // Either the message has not arrived or the
    // position is not filled. wait_while_empty()
    // must release m_mux while sleeping (as
    // pthread_cond_wait does); otherwise push()
    // could never lock it to add or fill data
    wait_while_empty();
  }
  // Reserve position in output queue (locks the
  // output queue while this lock is still held)
  outque.reserve_pos( pos);
}

void PRQueue::reserve_pos( PRQueue::position& pos)
{
  Lock lk( m_mux);
  m_que.reserve( pos);
}
Listing 3
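The pop steps and Listing 3 can be combined into one self-contained sketch of a two-stage pipeline. This is a C++11 rewrite under stated assumptions, not the article's code: PRQueueSketch, its Slot and Seat types, and run_pipeline are hypothetical names; std::mutex and std::condition_variable replace the pthread wrappers; and negative integers serve as stop tokens so the worker threads can terminate. As in Listing 3, the output queue's lock is taken while the input queue's lock is still held, and it is released first.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical C++11 sketch of pop-with-reservation and push-by-position.
template <typename T>
class PRQueueSketch {
public:
    struct Slot { T value; bool filled; };
    typedef Slot* Seat;   // valid until that slot is popped

    // Plain push: append an element that is already filled.
    void push(const T& v) {
        { std::lock_guard<std::mutex> lk(m_); slots_.push_back(Slot{v, true}); }
        cv_.notify_all();
    }
    // Fill a seat that an upstream queue's pop(v, *this, seat) reserved.
    void push(const T& v, Seat seat) {
        { std::lock_guard<std::mutex> lk(m_); seat->value = v; seat->filled = true; }
        cv_.notify_all();
    }
    // Simple pop: wait until the front slot is filled, then take it.
    void pop(T& v) {
        std::unique_lock<std::mutex> lk(m_);
        wait_front(lk);
        v = take_front();
    }
    // Pop with reservation: lock this queue, wait for and pop the front,
    // then (still holding this lock) lock `out` and reserve its back slot.
    // out_lk is destroyed before lk, so `out` is unlocked first.
    void pop(T& v, PRQueueSketch& out, Seat& seat) {
        std::unique_lock<std::mutex> lk(m_);
        wait_front(lk);
        v = take_front();
        std::lock_guard<std::mutex> out_lk(out.m_);
        out.slots_.push_back(Slot{T(), false});
        seat = &out.slots_.back();
    }

private:
    // Sleeps (releasing m_) until the queue is non-empty and its front is filled.
    void wait_front(std::unique_lock<std::mutex>& lk) {
        cv_.wait(lk, [this] { return !slots_.empty() && slots_.front().filled; });
    }
    T take_front() { T v = slots_.front().value; slots_.pop_front(); return v; }

    std::mutex m_;
    std::condition_variable cv_;
    std::deque<Slot> slots_;
};

// Sends 0..n-1 through `workers` threads into an output queue and reads them
// back with a single consumer. One stop token (-1) per worker is queued after
// the real messages.
std::vector<int> run_pipeline(int n, int workers) {
    PRQueueSketch<int> in, out;
    for (int i = 0; i < n; ++i) in.push(i);
    for (int w = 0; w < workers; ++w) in.push(-1);

    std::vector<std::thread> pool;
    for (int w = 0; w < workers; ++w) {
        pool.emplace_back([&in, &out] {
            for (;;) {
                int msg;
                PRQueueSketch<int>::Seat seat;
                in.pop(msg, out, seat);                        // take + reserve
                if (msg % 7 == 0) std::this_thread::yield();   // uneven "work"
                out.push(msg, seat);                           // fill the seat
                if (msg < 0) break;                            // stop token passed on
            }
        });
    }

    std::vector<int> result;
    for (int stops = 0; stops < workers;) {
        int msg;
        out.pop(msg);
        if (msg < 0) ++stops; else result.push_back(msg);
    }
    for (std::thread& t : pool) t.join();
    return result;   // 0, 1, ..., n-1 in input order
}
```

Every pop-with-reservation locks the upstream queue before the downstream one, and no thread locks them in the opposite order, so the two mutexes cannot deadlock. The same argument applies to a longer chain, as long as each stage only reserves in the queue directly after it.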
The push method copies data to the reserved position in the output queue and sets the ‘filled’ indicator to true. It then wakes threads waiting on the queue's condition variable by sending a notification (see Listing 4, from prqueue.hpp).
// Push data using reserved position into output
// queue (prqueue.hpp)
void PRQueue::push( const DATA& data,
                    const PRQueue::position& pos)
{
  Lock lk( m_mux);
  m_que.fill( data, pos);
  notify_not_empty();
}
Listing 4
The messages now arrive in the output queue in order. To extend the processing conveyor further, another PRQueue can be appended to the chain. The test case above does not do this: a single output thread simply reads processed messages from the output queue and prints them. In that final step, a ‘simple’ pop method is used, without the second and third arguments (the output queue and the position). See Listing 5.
// The function 'print_msg' executed by the final
// single output thread (file prqueue.cpp)
void* print_msg( void* arg)
{
  QueueTest* quetest =(QueueTest*)arg;
  Msg* msg;
  cout << "Output thread started" << endl;
  for(;;)
  {
    // Pop message from output queue and print it
    quetest->output_que.pop( msg);
    msg->print();
    delete msg;
  }
  return NULL;
}
Listing 5
Now, let’s take a look at the auxiliary class DataQueue.
As mentioned before, DataQueue holds two STL deques: ‘data’ and ‘filled’. DataQueue also defines the position structure and the methods in which the key steps of position reservation and data popping happen.
DataQueue is included in PRQueue as the data member m_que (see Listing 6).
#include <deque>

// An auxiliary class DataQueue - holder of 'data'
// and 'filled' deques
template< typename DATA>
class DataQueue
{
public:
  typedef typename std::deque< DATA>::pointer data_pointer;
  typedef typename std::deque< bool>::pointer filled_pointer;

  // Structure to hold pointers of reserved
  // position
  struct position
  {
    position() : data_pnt(0), filled_pnt(0) {}
    data_pointer data_pnt;
    filled_pointer filled_pnt;
  };

  // Value-initialize m_dummy so that reserve() never
  // copies an indeterminate value (e.g. when DATA is
  // a raw pointer such as Msg*)
  DataQueue() : m_dummy() {}

  // Check if data deque is not empty and front
  // element is 'filled'.
  // Copy front data out, pop front elements
  // of both deques
  bool pop( DATA& out)
  {
    if( m_data_que.empty() || ! m_filled_que.front())
      return false;
    out = m_data_que.front();
    m_data_que.pop_front();
    m_filled_que.pop_front();
    return true;
  }

  // Add dummy elements to the back of both
  // deques.
  // Save pointers of both elements to the output
  // position
  void reserve( position& pos)
  {
    m_data_que.push_back( m_dummy);
    m_filled_que.push_back( false);
    pos.data_pnt = &m_data_que[ m_data_que.size() -1];
    pos.filled_pnt = &m_filled_que[ m_filled_que.size() -1];
  }

  // Copy data and set 'filled' indicator by
  // position
  void fill( const DATA& data, const position& pos)
  {
    *pos.data_pnt = data;
    *pos.filled_pnt = true;
  }

  void push( const DATA& data)
  {
    m_data_que.push_back( data);
    m_filled_que.push_back( true);
  }

private:
  std::deque<DATA> m_data_que;
  std::deque<bool> m_filled_que;
  DATA m_dummy;
};//DataQueue
Listing 6
To compile and run the PRQueue test, use the commands in Figure 4.
c++ -I. prqueue.cpp -lpthread                          # PRQueue test
c++ -I. prqueue.cpp -lpthread -DSIMPLE_QUE             # SimpleQueue test
# Try long message
c++ -I. prqueue.cpp -lpthread -DLONG_MSG
c++ -I. prqueue.cpp -lpthread -DLONG_MSG -DSIMPLE_QUE
./a.out [number-of-messages]
Figure 4
Conclusion
The Queue with Position Reservation (PRQueue) presented here can be useful in multi-threaded applications where the order of streaming messages must be preserved. PRQueue ensures that the order of messages in the output queue exactly matches their order in the input queue, because the next push-back position in the output queue is reserved in the same critical section in which the message is taken off the input queue. The reserved position is filled with data later, once the message has been processed.
Reference
A zip file containing the code is available at: prqueue.zip