Threads C++ – Parte 2: Mutex

por Fabio A. Mazzarino

Quando falamos de processamento paralelo existe um problema que pode escalar de maneira muito destrutiva, são as racing conditions, ou condições de corrida. São situações em que vários processos ou threads que estão rodando em paralelo competem pelo mesmo recurso, causando problemas e bugs que podem se tornar bem destrutivos.

Muitos Problemas

Gosto de citar um dos bugs mais famosos da história, o bug do Therac-25, um equipamento de radioterapia que devido a uma racing condition o equipamento poderia entregar uma dose possivelmente fatal de radiação, matando quatro pessoas e incapacitando outras duas. Portanto cuidado com threads não é brincadeira.

Vamos começar com um software simples de gerador e consumidor:

#include <chrono>
#include <iostream>
#include <queue>
#include <random>
#include <thread>

class DataQueue {
    private: 
        std::queue<unsigned> queue;
    public:
        void push(unsigned data) {
 queue.push(data);
 }
        unsigned pop() {
            unsigned data = queue.front();
            queue.pop();
            return data;
        }
        bool empty() {
 return queue.empty();
 }
};

class Supplier {
    public:
        void operator()(const char* name, DataQueue *queue) {
            std::cout << "[" << name << "] Starting Supplier" << std::endl;
            std::default_random_engine generator;
            std::uniform_int_distribution<unsigned> distr(0, 99);
            for ( ; ; ) {
                unsigned data = distr(generator);
                std::cout << "[" << name << "] Generating data: " << data << std::endl;
                queue->push(data);
                 std::this_thread::sleep_for(std::chrono::seconds(5));
            }
        }
};

class Consumer {
    public:
        void operator()(const char* name, DataQueue *queue) {
            std::cout << "[" << name << "] Starting Consumer" << std::endl;
            for ( ; ; ) {
                if (!queue->empty()) {
                    unsigned data = queue->pop();
                    std::cout << "[" << name << "] Consuming data: " << data << std::endl;
                }
                std::this_thread::sleep_for(std::chrono::seconds(2));
            }
       }
};

int main () {
    DataQueue queue;

    std::thread supplier(Supplier(), "Supplier", &queue);
    std::thread consumer1(Consumer(), "Consumer #1", &queue);
    std::thread consumer2(Consumer(), "Consumer #2", &queue);

    supplier.join();
    consumer1.join();
    consumer2.join();
    return 0;
}

A classe DataQueue é somente um encapsulamento para a classe std::queue. A classe Supplier é uma classe que gera números randômicos que são disponibilizados para consumo em um objeto DataQueue. A classe Consumer consome dados colocados em um objeto DataQueue. Para indicar geração e consumo vamos utilizar std::cout, imprimindo os dados na saída.

Note que neste programa utilizamos de forma diferente o std::thread, através de uma classe que sobrecarrega o operador ().

O programa é executado continuamente por tempo indeterminado. Os primeiros segundos da execução entrega a seguinte saída:

[Supplier] Starting Supplier
[Supplier] Generating data: [Consumer #1] Starting Consumer
0
[Consumer #2] Starting Consumer
[Consumer #2] Consuming data: 0
[Supplier] Generating data: 13
[Consumer #1] Consuming data: 13
[Supplier] Generating data: 75
[Consumer #1] Consuming data: 75
[Supplier] Generating data: 45
[Consumer #2] Consuming data: 45
[Supplier] Generating data: 53
[Consumer #2] Consuming data: 53
[Supplier] Generating data: 21
[Consumer #2] Consuming data: 21

Note a segunda linha da saída. Os dados das duas primeiras threads estão misturados. São duas threads competindo pelo mesmo recurso, gerando uma saída confusa.

O mesmo pode acontecer com o acesso aos dados da fila, fazendo com que os dois consumidores entendam que existe um dado disponível na vila, e os dois podem consumir o mesmo dado, o um deles consumir um dado errado.

Solução: Mutex

Mutex (MUTual EXclusion) é uma técnica muito utilizada, consistem em criar um mecanismo de semáforo que evita que mais de uma thread acesse simultaneamente o mesmo recurso.

Para usar um objeto mutex basta solicitar o lock antes de acessar os dados, através do método std::mutex::lock(), o método ficara bloqueado até que seja possível efetuar o lock exclusivo. Para liberar o lock basta chamar o método std::mutex::unlock().

Para corrigir o exemplo anterior vamos usar dois mutexes, um para controlar a saída padrão, e outro pra controlar a fila de dados:


#include <chrono>
#include <iostream>
#include <mutex>
#include <queue>
#include <random>
#include <string>
#include <thread>

class DataQueue {
    public:
        std::queue<unsigned> queue;
        std::mutex mutex;

        void lock() { mutex.lock(); }
        void unlock() { mutex.unlock(); }
        bool empty() { return queue.empty(); }
        void push(unsigned data) { queue.push(data); }
        unsigned pop() {
            unsigned data = queue.front();
            queue.pop();
            return data;
        }
};

class Supplier {
    public:
        void operator()(const char* name, DataQueue *queue, std::mutex *coutmtx) {
            coutmtx->lock();
            std::cout << "[" << name << "] Starting Supplier" << std::endl;
            coutmtx->unlock();

            std::default_random_engine generator;
            std::uniform_int_distribution<unsigned> distr(0, 99);

            for ( ; ; ) {
                unsigned data = distr(generator);
                coutmtx->lock();
                std::cout << "[" << name << "] Generating data: " << data << std::endl;
                coutmtx->unlock();

                queue->lock();
                queue->push(data);
                queue->unlock();
                std::this_thread::sleep_for(std::chrono::seconds(5));
            }
        }
};

class Consumer {
    public:
        void operator()(const char* name, DataQueue *queue, std::mutex *coutmtx) {
            coutmtx->lock();
            std::cout << "[" << name << "] Starting Consumer" << std::endl;
            coutmtx->unlock();

            for ( ; ; ) {
                queue->lock();
                if (!queue->empty()) {
                    unsigned data = queue->pop();
                    coutmtx->lock();
                    std::cout << "[" << name << "] Consuming data: " << data << std::endl;
                    coutmtx->unlock();
                }
                queue->unlock();
                std::this_thread::sleep_for(std::chrono::seconds(3));
            }
       }
};


int main () {
    DataQueue queue;
    std::mutex coutmtx;

    std::thread supplier(Supplier(), "Supplier", &queue, &coutmtx);
    std::thread consumer1(Consumer(), "Consumer #1", &queue, &coutmtx);
    std::thread consumer2(Consumer(), "Consumer #2", &queue, &coutmtx);

    supplier.join();
    consumer1.join();
    consumer2.join();
    return 0;
}

Para evitar racing condition na fila de dados é utilizado um mutex dentro da classe DataQueue, adicionando métodos para bloquear e desbloquear o acesso aos dados. Assim qualquer acesso aos dados dos objetos DataQueue devem ser precedidos de uma chamada ao método lock, e seguido de uma chamada ao método unlock.

Já para o acesso ao std::cout foi criado um mutex coutmtx, e da mesma forma é necessário efetuar o lock e o unlock pela thread que estiver usando o std::cout.

A saída passará a ser:

[Supplier] Starting Supplier
[Supplier] Generating data: 0
[Consumer #1] Starting Consumer
[Consumer #1] Consuming data: 0
[Consumer #2] Starting Consumer
[Supplier] Generating data: 13
[Consumer #2] Consuming data: 13
[Supplier] Generating data: 75
[Consumer #2] Consuming data: 75
[Supplier] Generating data: 45
[Consumer #2] Consuming data: 45
[Supplier] Generating data: 53
[Consumer #1] Consuming data: 53
[Supplier] Generating data: 21
[Consumer #2] Consuming data: 21
[Supplier] Generating data: 4
[Consumer #1] Consuming data: 4

Note que logo de cara não ocorre a racing condition no início da execução, assim como não irá ocorrer falha caso duas threads de Consumer tente acessar o DataQueue simultaneamente.

Oportunidades de Melhoria

Uma boa oportunidade de melhoria é criar mecanismos para que as funções de lock e unlock sejam obrigatórios durante os acessos aos objetos de DataQueue. Para isso é necessário utilizar o método std::muntex::try_lock, que tenta fazer o lock, mas caso não consiga retorna imediatamente com o valor false, assim se não houver nenhum lock o acesso tem que ser negado.