Модераторы: xvr

Поиск:

Ответ в темуСоздание новой темы Создание опроса
> Синхронизация pthread потоков при помощи barier, реализация чередующегося вызова потоков 
V
    Опции темы
Gluttton
Дата 22.9.2012, 22:48 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Начинающий
***


Профиль
Группа: Завсегдатай
Сообщений: 1170
Регистрация: 28.8.2008
Где: Феодосия

Репутация: нет
Всего: 54



Итак. Во-первых, хочу поделиться решением вопроса, по которому создавался топик (вдруг по поиску сюда, кто-нибудь попадет).

Еще раз сформулирую задачу.
Есть источник данных и есть потребитель, принимающий данные определенными порциями. Терять данные нельзя. Встает задача гранулирования данных источника для передачи их потребителю. Поскольку операция подготовки данных и операция их обработки ресурсоемкие, то целесообразно реализовать их в отдельных потоках и синхронизировать их таким образом, что бы пока потребитель данных не примет на обработку очередную порцию, поставщик не приступал к накоплению новых данных (т.е. собственно "Терять данные нельзя").

Реализация (суть осталась такая же, как и описано в первом посте). 
Код. Я постарался (насколько у меня это получилось) сделать его прозрачнее и читабельней. Комментариями решил код не захламлять, а несколько пояснений приведу здесь. Задачи поставки данных и их обработки вынесены в отдельные функции (Produce и Consume), которые запускаються, каждая в отдельном одноименном потоке. Суть синхронизации та же, что и изначально: барьер синхронизации обработки данных (barrierDataProcSync) и мютекс на доступ к указателю на актуальные данные (mutexData). Каждые поток готовит/обрабатывает данные, а затем по факту подготовки данных/окончания обработки "лочиться" на барьере. Для исключения проблемы обусловленной случайной очередностью захвата мютекса на указатель данных, которая и приводила к тому, что операции чтения и обработки вызывались с нарушением очередности (например: операция чтения последней захватила мютекс перед барьером и первой - после) введена дополнительная переменная - token, которая обеспечивает захват мютекса в первую очередь обработчиком. Если токен "не включен" (а это значит, что поставщик подобрался к мютексу на данные первым), то поставщик ожидает токен (condToken), а если "включен", то без задержек проходит на следующую итерацию обработки и сбрасывает токен. Обработчик всегда форсированно (без всяких проверок) включает токен и шлет сигнал поставщику. mutexToken - мютекс на доступ к токену.

Добавил в код обработчика проверку на повторение данных (что бы не логи глазами парсить, а программа сама "падала").

Код

#include <iostream>
#include <pthread.h>
#include <stdlib.h>


void * Produce (void * container);
void * Consume (void * container);


pthread_barrier_t * barrierDataProcSync = new pthread_barrier_t;
pthread_mutex_t     mutexData  = PTHREAD_MUTEX_INITIALIZER;
bool                token      = false;
pthread_mutex_t     mutexToken = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t      condToken  = PTHREAD_COND_INITIALIZER;

const unsigned int SET_COUNT = 2;
const unsigned int DATA_SIZE = 42;


int main (int argc, char * argv [])
{
    int ** container  = NULL;
    container = new int * [SET_COUNT + 1];
    container [0] = new int [DATA_SIZE];
    container [1] = new int [DATA_SIZE];
    container [0][0] = 0;
    container [1][0] = 1;

    pthread_barrier_init (barrierDataProcSync, NULL, 2);
    pthread_t producer;
    pthread_t consumer;
    pthread_create (&producer, NULL, Produce, (void *) container);
    pthread_create (&consumer, NULL, Consume, (void *) container);
    pthread_join   (producer,  NULL);
    pthread_join   (consumer,  NULL);

    pthread_barrier_destroy (barrierDataProcSync);
    delete [] container [0];
    delete [] container [1];
    delete [] container;

    return 0;
}


void * Produce (void * container)
{
    int ** data  = static_cast <int **> (container);

    int set = 0;
    data [SET_COUNT] = data [set];

    while (1) {
        data [set][0] += SET_COUNT;

        pthread_mutex_lock   (&mutexData);
        data [SET_COUNT] = data [set];
        std::cout << "Set: " << data [SET_COUNT][0] << " (" << data [SET_COUNT] << ")" << std::endl;
        pthread_mutex_unlock (&mutexData);
        pthread_barrier_wait (barrierDataProcSync);
        pthread_mutex_lock   (&mutexToken);
        if (token == false) {
            pthread_cond_wait (&condToken, &mutexToken);
        }
        token = false;
        pthread_mutex_unlock (&mutexToken);

        ++set %= SET_COUNT;
    }
    
    return NULL;
}


void * Consume (void * container)
{
    int ** data  = static_cast <int **> (container);
    int lastData = 0;

    while (1) {
        pthread_barrier_wait (barrierDataProcSync);
        pthread_mutex_lock   (&mutexData);
        pthread_mutex_lock   (&mutexToken);
        token = true;
        pthread_cond_signal  (&condToken);
        pthread_mutex_unlock (&mutexToken);
        int * i = (static_cast <int **> (data) ) [SET_COUNT];
        if (lastData == i [0]) {
            std::cout << "Error! Get the same data." << std::endl;
            exit (1);
        }
        lastData = i [0];
        std::cout << "Get:\t\t\t" << i [0] << " (" << i << ")" << std::endl;
        pthread_mutex_unlock (&mutexData);
    }

    return NULL;
}


Запускал - работает. Перенес это решение на рабочий проект - тоже работает. Ради интереса "нагрузил" оба потока работой (один читает данные из /dev/urandom, а другой их множит), итеративно пробовал сбалансировать нагрузку на них сделав ее одинаковой и добился загрузки процессора на двухядерном ПК в 187%.

Пытался найти дедлоки аналитически и построил таблицу инвариантов. Вроде бы взаимных блокировок нет. Самая неблагоприятная ситуация - C1P4, т.е. если обработчик "залочен" на барьере, а поставщик "залочен" на условии и ждет сигнала от обработчика, но эта ситуация исключается алгоритмом: if (token == false).
Цитата

        Consume                         C1.                                   C2.                  C3.
                                        barrier_wait (barrierDataProcSync)    lock (&mutexData)    lock (&mutexData)
Produce                                                                                            lock (&mutexToken)

P1. lock (&mutexData)

P2. barrier_wait (barrierDataProcSync)

P3. lock (&mutexToken)

P4. lock (&mutexToken)
    cond_wait (&condToken, &mutexToken)



Теперь касательно вопроса органицазии обработки в целом.
Цитата

Цитата(boostcoder @  19.9.2012,  11:04 Найти цитируемый пост)
а что произойдет если их не забрать?

Они безвозвратно проподут.

Вот в этом месте я был не прав. Драйвер имеет сфой буфер, размер которого удовлетворяет критерию валидности. Таким образом задача моего циклического буфера сводится к временному хранилищу для накопления полной порции данных и нет никакой необходимости разносить в два потока задачу наполнения циклического буфера и чтения из него. "Амортизация" производительности уже и так выполняется в буфере драйвера. Т.о. чтение из псевдоустройства, накопление данных и их валидация - в одном потоке, а обработка - в другом, которые синхронизируются по описанной выше схеме.

Цитата


                *** * * **    ***][******]      [******]

/dev/device1  -------------> CircularBuffer ---------------> Processing ------------>

                                                         |                |
                       therad1                           |     thread2    |
                                                         |                |
 Read data                          Set container        |   Process data |
                                                         |    ---->----   |
-------------------------------------------------------->|--->|       |-->|
                                                         |    ----<----   |
                                                         |                |
                                                         |                |


boostcoderxvr огромное спасибо за помощь!
boostcoder, отдельное спасибо за терпение!

На мой взгляд вопрос решен.


--------------------
Слава Україні!
PM MAIL   Вверх
xvr
Дата 23.9.2012, 09:46 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Эксперт
****


Профиль
Группа: Комодератор
Сообщений: 7046
Регистрация: 28.8.2007
Где: Дублин, Ирландия

Репутация: 20
Всего: 223



У вас получилась классическая multi-thread очередь на 1 элемент.  smile 
Кстати, мне приходилось решать подобную задачу - обработка потока пакетов с данными от PCIe устройства. Я сделал кольцевой буфер прямо в драйвере и экспортировал его в User Space. Синхронизацией занимался драйвер, а прикладная программа из этого буфера могла только читать. Правда размер буфера был довольно скромным - до 3М. Кстати, устройство тоже писало напрямую в этот буфер (по DMA)

PM MAIL   Вверх
Ответ в темуСоздание новой темы Создание опроса
Правила форума "С/С++: Программирование под Unix/Linux"
xvr
  • Проставьте несколько ключевых слов темы, чтобы её можно было легче найти.
  • Не забывайте пользоваться кнопкой "Код".
  • Вопросы мобильной разработки тут
  • Телепатов на форуме нет! Задавайте чёткий, конкретный и полный вопрос. Указывайте полностью ошибки компилятора и компоновщика.
  • Новое сообщение должно иметь прямое отношение к разделу форума. Флуд, флейм, оффтопик запрещены.
  • Категорически запрещается обсуждение вареза, "кряков", взлома программ и т.д.

Если Вам понравилась атмосфера форума, заходите к нам чаще! С уважением, xvr.

 
 
0 Пользователей читают эту тему (0 Гостей и 0 Скрытых Пользователей)
0 Пользователей:
« Предыдущая тема | C/C++: Программирование под Unix/Linux | Следующая тема »


 




[ Время генерации скрипта: 0.0730 ]   [ Использовано запросов: 22 ]   [ GZIP включён ]


Реклама на сайте     Информационное спонсорство

 
По вопросам размещения рекламы пишите на vladimir(sobaka)vingrad.ru
Отказ от ответственности     Powered by Invision Power Board(R) 1.3 © 2003  IPS, Inc.