Модераторы: xvr

Поиск:

Ответ в темуСоздание новой темы Создание опроса
> Синхронизация pthread потоков при помощи barier, реализация чередующегося вызова потоков 
V
    Опции темы
Gluttton
Дата 17.9.2012, 23:11 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Начинающий
***


Профиль
Группа: Завсегдатай
Сообщений: 1170
Регистрация: 28.8.2008
Где: Феодосия

Репутация: нет
Всего: 54



Доброго времени суток!

Постановка задачи.
Существует некий источник, данные из которого в гранулированном виде должны быть отправлены на обработку. Поскольку операция буферизации данных относительно протяженная по времени, а операция обработки данных ресурсоемкая (и как следсвие тоже протяженная по времени) вполне логично встает вопрос паралельной реализации этих двух операций.

Необходимо реализовать взаимодействие этих двух потоков таким образом, что бы в случае готовности новой порции данных поставщик данных ожидал готовности обработчика принять эти данные и наоборот в случае готовности обработчика данных он ожидал новой порции от поставщика (собственно ничего неочевидного).

Реализация.
Как вариант принято решение реализовать следующую схему: поставщик вместо одного контейнера, для передачи данных обработчику будет имеет массив таких контейнеров, так что бы, отдавая один их элементов такого массива на обработку, не ожидая ее окончания, приступать к наполнению данных в другой элемент массива. При готовности новой порции данных и окончания обработки старой поставщик и обработчик "меняются" контейнерами (опять же ничего выдающегося).

Код

#include <iostream>
#include <pthread.h>


void * Extern (void * input);

pthread_mutex_t     mutex   = PTHREAD_MUTEX_INITIALIZER;
pthread_barrier_t * barrier = new pthread_barrier_t;
// Количество контейнеров (наборов данных).
// Реализация будет происходить в два потока, а раз так, то нам будет достаточно
// массива размерностью 2 для разделения операций чтения и обработки.
const unsigned int SET_COUNT = 2;


int main (int argc, char * argv [])
{
    // Массив контейнеров (в качестве контейнера используется тоже массив).
    int ** data  = NULL;
    // Номер текущего (с точки зрения поставщика данных) контейнера (набора данных).
    int set      = 0;

    // Дополнительный элемент массива создается для того, что бы служить указателем на контейнер подлежащий обработке.
    data = new int * [SET_COUNT + 1];
    data [0] = new int [42];
    data [1] = new int [42];
    data [0][0] = 1;
    data [1][0] = 2;
    // Вот он, указатель на контейнер подлежащей обработке.
    data [SET_COUNT] = data [set];

    // Вывод отладочной информации.
    std::cout << "Init: data:        " << data        << std::endl;
    std::cout << "Init: data [0]:    " << data [0]    << std::endl;
    std::cout << "Init: data [1]:    " << data [1]    << std::endl;
    std::cout << "Init: data [2]:    " << data [2]    << std::endl;

    pthread_barrier_init (barrier, NULL, 2);
    pthread_t worker;
    pthread_create (&worker, NULL, Extern, (void *) data);

    // Цикл в котором работает т.н. поставщик данных.
    while (1) {
        //sleep (1);
        // Имитация заполнения контейнера (при ожидаемой работе на отладочную печать
        // пойдет непрерывная последовательность чисел от 3 и далее).
        data [set][0] += SET_COUNT;

        // Обмен пустого контейнера на полный.
        pthread_mutex_lock (&mutex);
        data [SET_COUNT] = data [set];
        // Вывод отладочной информации.
        std::cout << "Set: " << data [SET_COUNT][0] << " (" << data [SET_COUNT] << ")" << std::endl;
        pthread_mutex_unlock (&mutex);
        // Ждем до тех пор пока обработчик данных не будет готов принять порцию данных.
        pthread_barrier_wait (barrier);

        ++set %= SET_COUNT;
    }

    delete [] data [0];
    delete [] data [1];
    delete [] data;

    return 0;
}


void * Extern (void * input)
{
    // Цикл в котором работает т.н. обработчик данных.
    while (1) {
        // Ждем до тех пор, пока не поступят новые данные.
        pthread_barrier_wait (barrier);
        pthread_mutex_lock (&mutex);
        // Здесь должна выполняться обработка данных, но вместо этого просто отладочная печать.
        int * i = (static_cast <int **> (input) ) [SET_COUNT];
        std::cout << "Get:\t\t\t" << i [0] << " (" << i << ")" << std::endl;
        pthread_mutex_unlock (&mutex);
        //sleep (1);
    }

    return 0;
}


Компилируем:
Цитата

g++ main.cpp -Wall -lpthread -o sync


Для отладки запустип программу скомпилированную с раскомментированными строками "sleep (1);" сначала для поставщика, а затем обработчика.
При этом получим следующее.
Для случая, когда поставщик данных в цикле уходи в сон:
Цитата

Init: data:        0x861040
Init: data [0]:    0x861060
Init: data [1]:    0x861110
Init: data [2]:    0x861060
Set: 3 (0x861060)
Get:    3 (0x861060)
Set: 4 (0x861110)
Get:    4 (0x861110)
Set: 5 (0x861060)
Get:    5 (0x861060)
Set: 6 (0x861110)
Get:    6 (0x861110)
Set: 7 (0x861060)
Get:    7 (0x861060)


Для случая, когда обработчик в цикле уходит в сон:
Цитата

Init: data:        0x18e4040
Init: data [0]:    0x18e4060
Init: data [1]:    0x18e4110
Init: data [2]:    0x18e4060
Set: 3 (0x18e4060)
Set: 4 (0x18e4110)
Get:    4 (0x18e4110)
Get:    4 (0x18e4110)
Set: 5 (0x18e4060)
Get:    5 (0x18e4060)
Set: 6 (0x18e4110)
Get:    6 (0x18e4110)
Set: 7 (0x18e4060)
Get:    7 (0x18e4060)


Причем в дальнейшем "сдвоенные" вызовы встречаются еще.

Вопрос
Как достичь строго поочередного выполнения операций? Пригоден ли описанный пример для реализации поставленной задачи в принципе и если да, то какая в нем ошибка? А если нет, то как лучше реализовать поставленную задачу?
Буду благодарен за любую помщь или совет!


P.S. Я так понимаю, что "сдвоенные" вызовы - это результат случайной очередности захвата mutex'a потоками после преодоления ими барьера? Поскольку требования к очередности захвата mutex'a, исходя из задачи, определены - первым mutex должен всегда захватывать обработчик, то напрашивается pthread_cond_wait... Что то вроде такого:
Код

pthread_cond_t      cond    = PTHREAD_COND_INITIALIZER;

...

    while (1) {
        data [set][0] += SET_COUNT;

        pthread_mutex_lock (&mutex);
        data [SET_COUNT] = data [set];
        std::cout << "Set: " << data [SET_COUNT][0] << " (" << data [SET_COUNT] << ")" << std::endl;
        pthread_mutex_unlock (&mutex);
        pthread_barrier_wait (barrier);

        pthread_mutex_lock (&mutex);
        pthread_cond_wait  (&cond, &mutex);
        pthread_mutex_unlock (&mutex);

        ++set %= SET_COUNT;
    }

...

    while (1) {
        pthread_barrier_wait (barrier);
        pthread_mutex_lock (&mutex);
        pthread_cond_signal (&cond);
        int * i = (static_cast <int **> (input) ) [SET_COUNT];
        std::cout << "Get:\t\t\t" << i [0] << " (" << i << ")" << std::endl;
        pthread_mutex_unlock (&mutex);
    }

Но не будет ли это "перегруз" который сведет на нет всю оптимизацию - это во-первых, а во-вторых: если хоть один pthread_cond_signal будет отправлен до того, как наступит pthread_cond_wait, то мы получим deadlock.


--------------------
Слава Україні!
PM MAIL   Вверх
boostcoder
Дата 18.9.2012, 09:28 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


pattern`щик
****


Профиль
Группа: Завсегдатай
Сообщений: 5458
Регистрация: 1.4.2010

Репутация: 16
Всего: 110



Gluttton, во-первых - ЯП реализации какой? а то код смешанный.
во-вторых - производитель и потребитель всегда в кол-ве одной штуки?
в-третьих - расскажите больше про производитель: 1)откуда берет данные, 2)алгоритм получения данных, 3)суммарное время задержек на получение данных относительно 100% времени работы программы.
в-четвертых - расскажите больше о потребителе: чем ограничивается пропускная способность.

PM WWW   Вверх
Gluttton
Дата 18.9.2012, 11:03 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Начинающий
***


Профиль
Группа: Завсегдатай
Сообщений: 1170
Регистрация: 28.8.2008
Где: Феодосия

Репутация: нет
Всего: 54



1. С++.
2. Да, производитель всегда один и потребитель тоже всегда один.
3.1. Больше всего боялся этого вопроса... Есть некое PCI-устройство, которое генерирует данные со скоростью 4 Мб/с. Данные из устройства читает character device драйвер, который для меня предствален как файл /dev/device1. Это все не мое, теперь обо мне. Центральный элемент - обработчик данных, все остальные компоненты - сервисные (т.е. призваны удовлетворить его потребности). Обработчик данных принимает данные на обработку некоторыми логическими порциями, поскольку алгоритм обработки существенно зависит от поступивших на обоботку данных, которые по своей природе случайные, то и вычислительные затраты на обработку некоторой порции данных в общем случае случайны. Т.о. "пропускная способность" обработчика колеблется. Терять данные плохо - это с одной стороны, но данные достаточно быстро устаревают (валидность данных около 1 с) - с другой стороны. 
3.2. Поэтому реализован следующий механизм: данные из устройства в отдельном потоке читаются в циклический буфер размер которого соответствует критерию валидности данных, данные из циклического буфера читаются в другом потоке и (а вот тут загвоздочка) в отдельном потоке выполняется их обработка.
3.3. Сложный вопрос... Наверное с этого и нужно было бы начать... Не могу сказать однозначно, но в том потоке в котором производится наполнение циклического буфера в user space я успеваю вычитать данные из устройства (переданные мне драйвером из kernel space) и если при следующем сеансе чтения я получу меньше данных, чем запросил (а прошу я их всегда по 32 КБ), то я делаю, вывод, что данных мало и иду спать на сотню-другую наносекунд - и в целом при таком подходе я успеваю. Т.е. опосредованно я прихожу к выводу, что операция чтения составляет малый (менее 10) процент от общего времени работы программы.
4. См. 3.1. (если не достаточно, то уточню).


--------------------
Слава Україні!
PM MAIL   Вверх
xvr
Дата 18.9.2012, 11:50 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Эксперт
****


Профиль
Группа: Комодератор
Сообщений: 7046
Регистрация: 28.8.2007
Где: Дублин, Ирландия

Репутация: 20
Всего: 223



Одного барьера вам явно недостаточно  - надо как то сигнализировать, что есть данные для обработки.
Тут напрашивается очередь (хотя бы в виде std::deque<>) и семафор, который будет считать количество пакетов данных в очереди

PM MAIL   Вверх
boostcoder
Дата 18.9.2012, 19:03 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


pattern`щик
****


Профиль
Группа: Завсегдатай
Сообщений: 5458
Регистрация: 1.4.2010

Репутация: 16
Всего: 110



Gluttton, я не вижу надобности ни в одном потоке, кроме основного.
сейчас обдумаю и выдам...

уточните один момент: Вы считываете 32кб и отдаете на обработку, и потом снова считываете? и так по кругу?

PM WWW   Вверх
Gluttton
Дата 18.9.2012, 23:08 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Начинающий
***


Профиль
Группа: Завсегдатай
Сообщений: 1170
Регистрация: 28.8.2008
Где: Феодосия

Репутация: нет
Всего: 54



Цитата(boostcoder @  18.9.2012,  19:03 Найти цитируемый пост)
уточните один момент: Вы считываете 32кб и отдаете на обработку, и потом снова считываете? и так по кругу?

Я читаю так:
Код

readBytesCount  = read (file, &buffer [head], readingBytesCount);

read - функция из unistd.h, т.е. я прошу 32 КБ, а дают мне как правило меньше. Больше просить вообще смысла нет, т.к. больше 32 КБ все-равно не приходит. Старшие товарищи подсказывают, что это связано с механизмом передачи данных из простанства ядра в пользовательское. Размер порции очень сильно колеблеться, но в среднем около 16 КБ, иногда, что бы получить порцию нужно обратиться несколько (2 - 3) раза, а иногда достаточно и одного.

Цитата(boostcoder @  18.9.2012,  19:03 Найти цитируемый пост)
я не вижу надобности ни в одном потоке, кроме основного.

Мне начинает казаться, что это яркий пример преждевременной оптимизации. На самом деле в производительность мы еще не упорлись, просто я, ожидая больших нагрузок, пытаюсь максимально "вылизать" эту операцию...


Аргумент в пользу наполнения циклического буфера и вычитки данных из него в разных потоках.
Данные из устройства поступают с постоянной скоростью. Размер порции величина переменная, но детерменированная (определяется пользователем) и постоянна, на заданном большом, относительно периода обновления данных, отрезке времени. Продолжительность работы алгоритма обработки величина случайная. Так вот. Наличие буфера, способного растягиваться в небольших пределах полезно, поскольку для тех случаев, когда алгоритм не уложится в отведенный квант времени (между получением порций данных) данные не потеряются. Скорее всего, за данными, которые требую значительных затрат на обработку, будут следовать данные, не требующие таких затрат и данные накопленные в буфере будут в "ускоренном" варианте обработаны без потерь, после чего буфер "сожмется".

А вот аргументом в пользу разноса по потокам обработки данных и гранулирования (тема топика) было то, что... Ну... Пока данные обрабатываются, мы готовим новую порцию, и когда старые данные обработаны, нам не нужно тратьить время на формирование новой порции, а она уже готовая и ждет, а раз так, то мы выигрываем время на обработку данных (!), поскольку мы должны успеть сделать все между поступлением порций данных, и если наполнять контейнер и обрабатывать данные в один поток, то это две операции, а если выполнять их паралельно... Ну в общем суть ясна.
С другой стороны, честно говоря, операция по наполнению контейнера не особо трудоемка и в процентном соотношении (условно) к операции обработки составляет не более одного процента. По сути это 60-70 строк "плоского" кода (до десятка условий и присвоения) с одним циклом внутри которого простая начинка с шагом два байта по порции данных (валидация).


--------------------
Слава Україні!
PM MAIL   Вверх
boostcoder
Дата 18.9.2012, 23:37 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


pattern`щик
****


Профиль
Группа: Завсегдатай
Сообщений: 5458
Регистрация: 1.4.2010

Репутация: 16
Всего: 110



Цитата(Gluttton @  18.9.2012,  23:08 Найти цитируемый пост)
readBytesCount  = read (file, &buffer [head], readingBytesCount);

и, наверное, последний нюанс... после этой строки добавь плиз этот код, и покажи что выводит:
Код

if ( readBytesCount != readingBytesCount ) {
   int errorc = errno;
   std::cout << errorc << ": " << strerror(errorc) << std::endl;
}


Добавлено через 9 минут и 7 секунд
Цитата(Gluttton @  18.9.2012,  23:08 Найти цитируемый пост)
Пока данные обрабатываются, мы готовим новую порцию, и когда старые данные обработаны, нам не нужно тратьить время на формирование новой порции, а она уже готовая и ждет

стоп!
приведенный тобою фрагмент получения данных из девайса, и пояснение "я прошу 32 КБ, а дают мне как правило меньше." говорят лишь о том, что девайс не блокирует код производящий чтение. таким образом, задержек быть не может в принципе. ну...разве что на сискол и на копирование между разными адресными пространствами.

Цитата(Gluttton @  18.9.2012,  23:08 Найти цитируемый пост)
Старшие товарищи подсказывают, что это связано с механизмом передачи данных из простанства ядра в пользовательское.

не совсем так... лишним является этот фрагмент: "это связано с механизмом передачи данных из простанства ядра в пользовательское."
это поведение объясняется механизмом передачи данных модулем ядра в псевдодевайс в лице ' /dev/device1'.

PM WWW   Вверх
Gluttton
Дата 19.9.2012, 01:04 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Начинающий
***


Профиль
Группа: Завсегдатай
Сообщений: 1170
Регистрация: 28.8.2008
Где: Феодосия

Репутация: нет
Всего: 54



Цитата(boostcoder @  18.9.2012,  23:37 Найти цитируемый пост)
и, наверное, последний нюанс... после этой строки добавь плиз этот код, и покажи что выводит:

ОК.


Цитата(boostcoder @  18.9.2012,  23:37 Найти цитируемый пост)
не совсем так...

Спасибо!


Цитата(boostcoder @  18.9.2012,  23:37 Найти цитируемый пост)
стоп!

Данные из физического устройства поступают постоянным потоком байт (строка 1).
Данные из псевдоустройства поступают неритмичным потоком байт (строка 2).
Данные из буфера поступают в виде порций одинакогого размера, ну а раз так, то с постоянным периодом (строка 3).
Цитата

* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *

*   *** *   ** * ** *  ** * *     *****  *  *** *  *  ** **  *  ** **

   [*****]   [*****]   [*****]   [*****]   [*****]   [*****]   [*****]

Рис. 1 - Гранулирование данных.

Цитата(boostcoder @  18.9.2012,  23:37 Найти цитируемый пост)
таким образом, задержек быть не может в принципе

Если все действия производить в один поток, то в случае, когда обработка продлиться больше периода поступления порции данных, данные будут утеряны:

Цитата

* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *

*   *** *   ** * ** *  ** * *     *****  *  *** *  *  ** **  *  ** **

   [*****]   [*****]   [*****]   [*****]   [*****]   [*****]   [*****]

   -++++     -+++      -+++++++++++++++++++++        -++       -+++

Рис. 2 - Потеря данных в случае протяженной обрабтки (- - формирование порции данных; + - выполнение обработки); порции 4 и 5 утеряны (вообще говоря, утеряны не они, 4 + N и 5 + N, где N - колличество порций помещающихся в буфере) из-за того, что мы не забрали вовремя данные из устройства пока были заняты обработкой.

Чтение и передача данных на обработку в разных потоках помогают избежать потери данных
Цитата

                *** * * **    ***][******]      [******]

/dev/device1  -------------> CircularBuffer ---------------> Processing ------------>

                                   |                                   |
               therad1             |               thread2             |
                                   |                                   |
 Read data                         | Set container        Process data |
                                   |                       ^--->--->   |
---------------------------------->|---------------------->|       |-->|
                                   |                       <---<---v   |
                                   |                                   |
                                   |                                   |



* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *

*   *** *   ** * ** *  ** * *     *****  *  *** *  *  ** **  *  ** **

   [*****]   [*****]   [*****]   [*****]   [*****]   [*****]   [*****]

   -++++     -+++      -+++++++++++++++++++++ -+++ -++++ -++   -+++

Рис. 3 - Чтение и передача данных на обработку в два потока (существующая реализация), буфер сначала будет "растянут" thread'ом 1 во время длительной обработки, а затем будет "сжат" thread'ом 2, после того, как появиться "свободное время".

Но что бы максимально уменьшить длительность работ между поступлением порций данных, я хотел сделать вот так:
Цитата

                *** * * **    ***][******]      [******]

/dev/device1  -------------> CircularBuffer ---------------> Processing ------------>

                                   |                  |                |
               therad1             |    thread2       |     thread3    |
                                   |                  |                |
 Read data                         | Set container    |   Process data |
                                   |                  |    ---->----   |
---------------------------------->|----------------->|--->|       |-->|
                                   |                  |    ----<----   |
                                   |                  |                |
                                   |                  |                |


* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *

*   *** *   ** * ** *  ** * *     *****  *  *** *  *  ** **  *  ** **

   [*****]   [*****]   [*****]   [*****]   [*****]   [*****]   [*****]

   ++++      +++       +++++++++++++++++++++ +++ ++++ ++       +++
   -         -         -                     -   -    -        -

Рис. 4 - Чтение и передача данных на обработку и обработка в три потока (желаемая реализация) при существующих критерях валидности данных позволит выдерживать большие задержки при обработке.

Это сообщение отредактировал(а) Gluttton - 19.9.2012, 01:22


--------------------
Слава Україні!
PM MAIL   Вверх
Gluttton
Дата 19.9.2012, 01:20 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Начинающий
***


Профиль
Группа: Завсегдатай
Сообщений: 1170
Регистрация: 28.8.2008
Где: Феодосия

Репутация: нет
Всего: 54



Цитата(Gluttton @  19.9.2012,  01:04 Найти цитируемый пост)
Данные из буфера поступают в виде порций одинакогого размера, ну а раз так, то с постоянным периодом (строка 3).

А вот и фигушки, не обязательно...

Запросто может быть и так:
Цитата

* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *

*    ***  *   ** * ** * ** **      ***** *  *** *  *  ** **  *  ** **

     [*****]   [*****] [*****]    [*****]  [*****]   [*****]   [*****]


Что то я начинаю путаться в показаниях...


--------------------
Слава Україні!
PM MAIL   Вверх
Gluttton
Дата 19.9.2012, 10:02 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Начинающий
***


Профиль
Группа: Завсегдатай
Сообщений: 1170
Регистрация: 28.8.2008
Где: Феодосия

Репутация: нет
Всего: 54



Цитата(boostcoder @  18.9.2012,  23:37 Найти цитируемый пост)
и, наверное, последний нюанс... после этой строки добавь плиз этот код, и покажи что выводит:

За двадцать секунд работы выдало полторы тысячи:
Цитата

0: Success

И больше ничего.


--------------------
Слава Україні!
PM MAIL   Вверх
boostcoder
Дата 19.9.2012, 11:04 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


pattern`щик
****


Профиль
Группа: Завсегдатай
Сообщений: 5458
Регистрация: 1.4.2010

Репутация: 16
Всего: 110



т.е. получается так, что пока происходит обработка, необходимо в это время забирать данные из девайса?
а что произойдет если их не забрать?

PM WWW   Вверх
Gluttton
Дата 19.9.2012, 12:07 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Начинающий
***


Профиль
Группа: Завсегдатай
Сообщений: 1170
Регистрация: 28.8.2008
Где: Феодосия

Репутация: нет
Всего: 54



Цитата(boostcoder @  19.9.2012,  11:04 Найти цитируемый пост)
т.е. получается так, что пока происходит обработка, необходимо в это время забирать данные из девайса?

Угу.


Цитата(boostcoder @  19.9.2012,  11:04 Найти цитируемый пост)
а что произойдет если их не забрать?

Они безвозвратно проподут.
Но тут есть нюанс:
Цитата(Gluttton @  18.9.2012,  11:03 Найти цитируемый пост)
Терять данные плохо - это с одной стороны, но данные достаточно быстро устаревают (валидность данных около 1 с) - с другой стороны. 

Т.е. если мы "подвисли" на 10 секунд, то сожалеть о потеряных тысячах порций данных уже не нужно, т.к. они никому не нужны. Нужно все отбросить и начать заново.
А вот если мы начном терять по одной порции по несклько раз в секунду, то это очень плохо.
Это связано с тем, что результат работы обработки зависит от накопления данных. И если мы потеряли данные за 10 секунд, то мы скажем об этом пользователю и продолжим выдавать достоверные результаты, а вот если у нас будут хоть и маленькие, но постоянные пропуски, то наши результаты будут хоть и очень близки к достоверным, но не будут таковыми являться никогда.

Вообще терять данные это плохо, хоть валидные хоть не валидные. Просто спасать невалидные нет смысла, т.к. они уже не нужны.


--------------------
Слава Україні!
PM MAIL   Вверх
boostcoder
Дата 19.9.2012, 12:23 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


pattern`щик
****


Профиль
Группа: Завсегдатай
Сообщений: 5458
Регистрация: 1.4.2010

Репутация: 16
Всего: 110



значит нужен основной поток, и вспомогательный для выборки данных.

пишу псевдокодом с использованием boost.

ща...
PM WWW   Вверх
boostcoder
Дата 20.9.2012, 10:32 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


pattern`щик
****


Профиль
Группа: Завсегдатай
Сообщений: 5458
Регистрация: 1.4.2010

Репутация: 16
Всего: 110



заняли вчера меня. сегодня отпишусь.
PM WWW   Вверх
Gluttton
Дата 20.9.2012, 10:36 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Начинающий
***


Профиль
Группа: Завсегдатай
Сообщений: 1170
Регистрация: 28.8.2008
Где: Феодосия

Репутация: нет
Всего: 54



Цитата(boostcoder @  20.9.2012,  10:32 Найти цитируемый пост)
заняли вчера меня. сегодня отпишусь.



Не вопрос! 


И это, того, особо не торопись, а то я тут для себя некоторые факты открываю ;) , отпишусь позднее.


--------------------
Слава Україні!
PM MAIL   Вверх
Gluttton
Дата 22.9.2012, 22:48 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Начинающий
***


Профиль
Группа: Завсегдатай
Сообщений: 1170
Регистрация: 28.8.2008
Где: Феодосия

Репутация: нет
Всего: 54



Итак. Во-первых, хочу поделиться решением вопроса, по которому создавался топик (вдруг по поиску сюда, кто-нибудь попадет).

Еще раз сформулирую задачу.
Есть источник данных и есть потребитель, принимающий данные определенными порциями. Терять данные нельзя. Встает задача гранулирования данных источника для передачи их потребителю. Поскольку операция подготовки данных и операция их обработки ресурсоемкие, то целесообразно реализовать их в отдельных потоках и синхронизировать их таким образом, что бы пока потребитель данных не примет на обработку очередную порцию, поставщик не приступал к накоплению новых данных (т.е. собственно "Терять данные нельзя").

Реализация (суть осталась такая же, как и описано в первом посте). 
Код. Я постарался (насколько у меня это получилось) сделать его прозрачнее и читабельней. Комментариями решил код не захламлять, а несколько пояснений приведу здесь. Задачи поставки данных и их обработки вынесены в отдельные функции (Produce и Consume), которые запускаються, каждая в отдельном одноименном потоке. Суть синхронизации та же, что и изначально: барьер синхронизации обработки данных (barrierDataProcSync) и мютекс на доступ к указателю на актуальные данные (mutexData). Каждые поток готовит/обрабатывает данные, а затем по факту подготовки данных/окончания обработки "лочиться" на барьере. Для исключения проблемы обусловленной случайной очередностью захвата мютекса на указатель данных, которая и приводила к тому, что операции чтения и обработки вызывались с нарушением очередности (например: операция чтения последней захватила мютекс перед барьером и первой - после) введена дополнительная переменная - token, которая обеспечивает захват мютекса в первую очередь обработчиком. Если токен "не включен" (а это значит, что поставщик подобрался к мютексу на данные первым), то поставщик ожидает токен (condToken), а если "включен", то без задержек проходит на следующую итерацию обработки и сбрасывает токен. Обработчик всегда форсированно (без всяких проверок) включает токен и шлет сигнал поставщику. mutexToken - мютекс на доступ к токену.

Добавил в код обработчика проверку на повторение данных (что бы не логи глазами парсить, а программа сама "падала").

Код

#include <iostream>
#include <pthread.h>
#include <stdlib.h>


void * Produce (void * container);
void * Consume (void * container);


pthread_barrier_t * barrierDataProcSync = new pthread_barrier_t;
pthread_mutex_t     mutexData  = PTHREAD_MUTEX_INITIALIZER;
bool                token      = false;
pthread_mutex_t     mutexToken = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t      condToken  = PTHREAD_COND_INITIALIZER;

const unsigned int SET_COUNT = 2;
const unsigned int DATA_SIZE = 42;


int main (int argc, char * argv [])
{
    int ** container  = NULL;
    container = new int * [SET_COUNT + 1];
    container [0] = new int [DATA_SIZE];
    container [1] = new int [DATA_SIZE];
    container [0][0] = 0;
    container [1][0] = 1;

    pthread_barrier_init (barrierDataProcSync, NULL, 2);
    pthread_t producer;
    pthread_t consumer;
    pthread_create (&producer, NULL, Produce, (void *) container);
    pthread_create (&consumer, NULL, Consume, (void *) container);
    pthread_join   (producer,  NULL);
    pthread_join   (consumer,  NULL);

    pthread_barrier_destroy (barrierDataProcSync);
    delete [] container [0];
    delete [] container [1];
    delete [] container;

    return 0;
}


void * Produce (void * container)
{
    int ** data  = static_cast <int **> (container);

    int set = 0;
    data [SET_COUNT] = data [set];

    while (1) {
        data [set][0] += SET_COUNT;

        pthread_mutex_lock   (&mutexData);
        data [SET_COUNT] = data [set];
        std::cout << "Set: " << data [SET_COUNT][0] << " (" << data [SET_COUNT] << ")" << std::endl;
        pthread_mutex_unlock (&mutexData);
        pthread_barrier_wait (barrierDataProcSync);
        pthread_mutex_lock   (&mutexToken);
        if (token == false) {
            pthread_cond_wait (&condToken, &mutexToken);
        }
        token = false;
        pthread_mutex_unlock (&mutexToken);

        ++set %= SET_COUNT;
    }
    
    return NULL;
}


void * Consume (void * container)
{
    int ** data  = static_cast <int **> (container);
    int lastData = 0;

    while (1) {
        pthread_barrier_wait (barrierDataProcSync);
        pthread_mutex_lock   (&mutexData);
        pthread_mutex_lock   (&mutexToken);
        token = true;
        pthread_cond_signal  (&condToken);
        pthread_mutex_unlock (&mutexToken);
        int * i = (static_cast <int **> (data) ) [SET_COUNT];
        if (lastData == i [0]) {
            std::cout << "Error! Get the same data." << std::endl;
            exit (1);
        }
        lastData = i [0];
        std::cout << "Get:\t\t\t" << i [0] << " (" << i << ")" << std::endl;
        pthread_mutex_unlock (&mutexData);
    }

    return NULL;
}


Запускал - работает. Перенес это решение на рабочий проект - тоже работает. Ради интереса "нагрузил" оба потока работой (один читает данные из /dev/urandom, а другой их множит), итеративно пробовал сбалансировать нагрузку на них сделав ее одинаковой и добился загрузки процессора на двухядерном ПК в 187%.

Пытался найти дедлоки аналитически и построил таблицу инвариантов. Вроде бы взаимных блокировок нет. Самая неблагоприятная ситуация - C1P4, т.е. если обработчик "залочен" на барьере, а поставщик "залочен" на условии и ждет сигнала от обработчика, но эта ситуация исключается алгоритмом: if (token == false).
Цитата

        Consume                         C1.                                   C2.                  C3.
                                        barrier_wait (barrierDataProcSync)    lock (&mutexData)    lock (&mutexData)
Produce                                                                                            lock (&mutexToken)

P1. lock (&mutexData)

P2. barrier_wait (barrierDataProcSync)

P3. lock (&mutexToken)

P4. lock (&mutexToken)
    cond_wait (&condToken, &mutexToken)



Теперь касательно вопроса органицазии обработки в целом.
Цитата

Цитата(boostcoder @  19.9.2012,  11:04 Найти цитируемый пост)
а что произойдет если их не забрать?

Они безвозвратно проподут.

Вот в этом месте я был не прав. Драйвер имеет сфой буфер, размер которого удовлетворяет критерию валидности. Таким образом задача моего циклического буфера сводится к временному хранилищу для накопления полной порции данных и нет никакой необходимости разносить в два потока задачу наполнения циклического буфера и чтения из него. "Амортизация" производительности уже и так выполняется в буфере драйвера. Т.о. чтение из псевдоустройства, накопление данных и их валидация - в одном потоке, а обработка - в другом, которые синхронизируются по описанной выше схеме.

Цитата


                *** * * **    ***][******]      [******]

/dev/device1  -------------> CircularBuffer ---------------> Processing ------------>

                                                         |                |
                       therad1                           |     thread2    |
                                                         |                |
 Read data                          Set container        |   Process data |
                                                         |    ---->----   |
-------------------------------------------------------->|--->|       |-->|
                                                         |    ----<----   |
                                                         |                |
                                                         |                |


boostcoderxvr огромное спасибо за помощь!
boostcoder, отдельное спасибо за терпение!

На мой взгляд вопрос решен.


--------------------
Слава Україні!
PM MAIL   Вверх
xvr
Дата 23.9.2012, 09:46 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Эксперт
****


Профиль
Группа: Комодератор
Сообщений: 7046
Регистрация: 28.8.2007
Где: Дублин, Ирландия

Репутация: 20
Всего: 223



У вас получилась классическая multi-thread очередь на 1 элемент.  smile 
Кстати, мне приходилось решать подобную задачу - обработка потока пакетов с данными от PCIe устройства. Я сделал кольцевой буфер прямо в драйвере и экспортировал его в User Space. Синхронизацией занимался драйвер, а прикладная программа из этого буфера могла только читать. Правда размер буфера был довольно скромным - до 3М. Кстати, устройство тоже писало напрямую в этот буфер (по DMA)

PM MAIL   Вверх
Страницы: (2) [Все] 1 2 
Ответ в темуСоздание новой темы Создание опроса
Правила форума "С/С++: Программирование под Unix/Linux"
xvr
  • Проставьте несколько ключевых слов темы, чтобы её можно было легче найти.
  • Не забывайте пользоваться кнопкой "Код".
  • Вопросы мобильной разработки тут
  • Телепатов на форуме нет! Задавайте чёткий, конкретный и полный вопрос. Указывайте полностью ошибки компилятора и компоновщика.
  • Новое сообщение должно иметь прямое отношение к разделу форума. Флуд, флейм, оффтопик запрещены.
  • Категорически запрещается обсуждение вареза, "кряков", взлома программ и т.д.

Если Вам понравилась атмосфера форума, заходите к нам чаще! С уважением, xvr.

 
 
0 Пользователей читают эту тему (0 Гостей и 0 Скрытых Пользователей)
0 Пользователей:
« Предыдущая тема | C/C++: Программирование под Unix/Linux | Следующая тема »


 




[ Время генерации скрипта: 0.1241 ]   [ Использовано запросов: 22 ]   [ GZIP включён ]


Реклама на сайте     Информационное спонсорство

 
По вопросам размещения рекламы пишите на vladimir(sobaka)vingrad.ru
Отказ от ответственности     Powered by Invision Power Board(R) 1.3 © 2003  IPS, Inc.