Mulithreading C++20 : producteur-consommateur

On propose une solution au problème du producteur-consommateur en C++.

Description

Soit 2 processus cycliques appelés producteur et consommateur. À chacune de leur itération respective, le producteur génère une donnée, et le consommateur l’exploite. Les données produites ou consommées sont situées dans une zone mémoire partagée accessible aux 2 processus. Les 2 processus ne peuvent cependant pas travailler simultanément sur la même zone mémoire.

Rôle du producteur

Le producteur remplit la zone tant qu’il y a de l’espace disponible. Il s’arrête lorsque toute la zone est occupée, et reprend son activté lorsqu’une place se libère.

Rôle du consommateur

À l’inverse le consommateur prélève les données de la zone partagée, jusqu’à ce que celle-ci soit vide, auquel cas il s’arrête jusqu’à ce qu’une donnée soit de nouveau disponible.

Hypothèses

On suppose que :

  1. la consommation se fait dans l’ordre de la production;
  2. la zone partagée est de taille fixe;
  3. les vitesses des processus sont quelconques.

Analyse

Communication inter-processus sur la disponibilité de données

Les processus doivent pouvoir se signaler entre eux de la possilité ou non de produire ou consommer les données.

Dans les cas aux limites (la zone est soit pleine, soit vide), une fois que la zone a été remplie par le producteur, il ne pourra continuer que quand le consommateur lui aura signalé qu’une place est de nouveau disponible. À l’inverse, quand le consommateur aura consommé toutes les données, il ne pourra continuer que quand le producteur lui signalera qu’une donnée est de nouveau disponible.

Dans les cas intermédiaires (la zone n’est ni pleine, ni vide), chacun peut réaliser son action.

Ce mécanisme est réalisé à l’aide de sémaphores:

  • un sémaphore de production (pour pouvoir remplir)
  • un sémaphore de consommation (pour pouvoir lire)

Section critique

Dans les cas où aucun des 2 processus n’est stoppé par l’autre (la zone n’est ni pleine, ni vide), il faut assurer la cohérence des données décrivant la zone mémoire, en particulier la valeur indiquant le niveau de remplissage ou la taille actuelle. Cette valeur ne peut être modifiée que par un seul des 2 processus. Pour verrouiller sa modificiation on utilisera un mutex.

Code

Les données partagées

On suppose que le buffer est initialisé vide. Les valeurs des sémaphores de production et consommation seront donc respectivement égales à la taille du buffer et à zéro.

// Fixed-size fuffer
constexpr int SIZE = 50;
std::array<int, SIZE> shared_buffer;

// Index of the last produced value
int current_index = 0;

// Mutex to protect current_index
std::mutex buffer_mutex;

// Initial value of production semphore is equal to the buffer size (empty at start)
std::counting_semaphore<SIZE> sem_production{SIZE};

// Initial value of consuming is 0 (nothing in the buffer)
std::counting_semaphore<SIZE> sem_consumption{0};

Le producteur

Le producteur ne peut pas agir tant que le sémaphore indiquant la possibilté de produire ne peut peut être acquis. Une fois acquis, il ajoute une valeur au buffer (ici un nombre au hasard) et met à jour sa taille courante en prenant soin d’obtenir un accès exclusif.


void producer(void* args) {
  while(true) {

    // Sleeps if it cannot produce
    sem_production.acquire();
    {
      // Lock and updates the buffer
      std::lock_guard<std::mutex> lock(buffer_mutex);
      shared_buffer[current_index++] = std::rand() % 100;
    }

    // Wakes the consumer up
    sem_consumption.release();
  }
}

Une fois fait, il incrémente la valeur du sémaphore de consommation pour signaler la possiblité de lire le buffer et ainsi débloquer le second processus.

Le consommateur

Le comportement est symétrique par rapport au producteur. Il tente d’acquérir le sémaphore de consommation (ou s’endort sinon), réalise son action et incrémente le sémaphore de production.


void consumer(void* args) {
  while(true) {

    // Sleeps if it cannot consume
    sem_consumption.acquire();
    {
      // Locks and updates the buffer
      std::lock_guard<std::mutex> lock(buffer_mutex);
      int value = shared_buffer[--current_index];

      // Do something with value... 
      (void) value;
    }

    // Wakes the producer up
    sem_production.release();
  }
}

Résultat

Le code est disponible sur le dépôt Github