Boost mutex e condizioni

Proseguendo la lettura dell'articolo di Bill Kempf su Dr.Dobb's che fa da introduzione alla libreria thread di boost, arriviamo alle variabili di condizione.

A volte non basta un lock su una risorsa condivisa, dato che occorre che la risorsa sia in un particolare stato prima di poterla usare. E questo é proprio lo scopo delle variabili di condizione.

L'idea generale é che il thread blocca un mutex sulla risorsa condivisa e quindi verifica che lo stato sia quello atteso. Se non lo é si mette in attesa sulla condizione. Facendo ciò libera il mutex, permettendo l'accesso alla risorsa da parte di un altro thread. Quando lo stato diventa tale per cui il thread in attesa sulla condizione può tornare ad agire, riceve una notifica che gli permette di ripartire.

Nell'esempio che segue, tratto dall'articolo e leggermente modificato da me, vengono creati due thread, uno per leggere e uno per scrivere su di un buffer condiviso. Per essere sicuri che si inizi a leggere quando non c'é ancora niente da leggere - e quindi avere certamente una situazione in cui é necessario che il lettore venga messo in attesa - ho inserito una pausa nel thread principale tra la creazione del thread in lettura e quello in scrittura.

La funzione utilizzata é boost::this_thread::sleep() che fa "dormire" il thread per un determinato intervallo (o fino a un certo momento). Per specificare quanto tempo debba dormire il thread é possibile usare la funzione boost::posix_time::milliseconds() che converte un intero in millisecondi.

Le due funzioni reader e writer sono simmetriche, leggono e scrivono su di un buffer rappresentato dalla classe Buffer. Notiamo che mettono un lock su un mutex per accedere la risorsa condivisa cout.

La classe Buffer é praticamente un wrapper costruito intorno alla risorsa condivisa, una lista circolare di interi. Il nostro problema é che non possiamo leggere da questo container quando é vuoto, e nemmeno scriverci quando é pieno. Dunque, prima di leggere e scrivere dobbiamo verificare la sua condizione.

Vediamo la funzione put(). Per prima cosa crea uno scoped_lock sul mutex che sorveglia l'accesso al circular_buffer, quindi controlla se é pieno. Se questo é il caso, stampa un messaggio (controllando l'accesso a cout con uno scoped_lock sul mutex di competenza) e si mette in attesa sulla condizione relativa al lock.

Abbiamo dunque un mutex, un lock che usa il mutex per gestire l'accesso alla risorsa e una condizione sul lock che permette al thread di passare il controllo in attesa di tempi migliori.

Quando abbiamo la possibilità, mettiamo il valore passato nel container e notifichiamo alla condizione che ne abbiamo cambiato lo stato.

La get() funziona in modo equivalente: se il container é vuoto si mette in attesa. Appena possibile legge un elemento, lo elimina dal container e notifica alla condizione che lo stato del container é cambiato.

Questo il codice risultante:

#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/circular_buffer.hpp>

using namespace std;

namespace {
boost::mutex mio;

class Buffer
{
private:
static const int BUF_SIZE = 3;

boost::mutex mcb;
boost::condition cond;
boost::circular_buffer<int> cb;

public:
Buffer(int size = BUF_SIZE) : cb(size) {}

void put(int m) {
boost::mutex::scoped_lock lock(mcb);
if(cb.full())
{
{
boost::mutex::scoped_lock lock(mio);
cout << "Buffer is full. Waiting..." << endl;
}
while (cb.full())
cond.wait(lock);
}
cb.push_back(m);
cond.notify_one();
}

int get()
{
boost::mutex::scoped_lock lock(mcb);
if (cb.empty())
{
{
boost::mutex::scoped_lock lock(mio);
cout << "Buffer is empty. Waiting..." << endl;
}
while (cb.empty())
cond.wait(lock);
}

int i = cb.front();
cb.pop_front();
cond.notify_one();
return i;
}
};

const int Buffer::BUF_SIZE;

Buffer buf;

const int ITERS = 20;

void writer()
{
for (int n = 0; n < ITERS; ++n)
{
{
boost::mutex::scoped_lock lock(mio);
cout << "sending: " << n << endl;
}
buf.put(n);
}
}

void reader()
{
for (int x = 0; x < ITERS; ++x)
{
int n = buf.get();
{
boost::mutex::scoped_lock lock(mio);
cout << "received: " << n << endl;
}
}
}
}

Nessun commento:

Posta un commento