Версия для печати темы
Нажмите сюда для просмотра этой темы в оригинальном формате
Форум программистов > C/C++: Общие вопросы > Проектирование системы


Автор: Бегемот 19.8.2014, 18:12
Добрый день. 
Уважаемые программисты, дайте совет, как лучше спроектировать такую вот систему...

Есть несколько (n) потоков. В каждом потоке происходит одно и то же, а именно... Примерно 1000 раз в секунду приходит сообщение определенного формата. Сообщения складываются в пакеты. Максимальное количество сообщений в пакете - 16. Сообщение содержит информацию о своем номере в пакете.
Есть еще один поток. Задача данного потока находить в каждом пакете всех потоков сообщения с одинаковыми номерами и производить с ними определенные действия. После чего сообщения уничтожаются. В каком-то из потоков (или в нескольких потоках) могут быть пропуски сообщений. Например в потоке k отутствует сообщение с номером 7. В таком случае никакие действия не производятся. Во всех остальных потоках сообщения с номером 7 уничтожаются. Т.е ищутся полные соответствия для всех потоков. Вполне возможно, что обработка будет выполняться дольше, чем прием, поэтому требуется хранить всю цепочку сообщений и периодически при достижении определенного размера цепочки, пропускать часть данных. 
Систему приема данных я решил сделать следующим образом...
Класс message
Его задача разделение сообщения на номер и данные.
Класс packet
Его задача формирование пакета, а именно списка объектов типа message.
Класс block
Его задача формирования блока данных для своего потока, а именно списка объектов типа packet

Как организовать систему обработки данных?
На рисунке я привел простой пример. В отдельном потоке есть объект некоторого класса. Вызвав его метод join, мы соединяем строки для определенного пакета, определенного сообщения каждого потока. Зеленая рамка показывает, что пакет с номером 1 найден и такие данные поступают на обработку. Красная линия показывает, что в потоке n был пропуск пакета 1, и поэтому данные остальных потоков для пакета 1 игнорируются (уничтожаются).

Спасибо

Автор: bsa 28.8.2014, 17:17
По потоками ты понимаешь stream или thread? Просто если thread, то твоя система смысла не имеет.

Правильно ли я понял:
1. собираем n пакетов по не более чем 16 сообщений из n источников
2. обрабатываем сообщения с одинаковыми номерами во всех пакетах, остальные удаляем.
3. повторяем п. 1

Автор: Бегемот 31.8.2014, 09:39
bsa, спасибо за ответ.

Цитата(bsa @  28.8.2014,  20:17 Найти цитируемый пост)
Правильно ли я понял:
1. собираем n пакетов по не более чем 16 сообщений из n источников
2. обрабатываем сообщения с одинаковыми номерами во всех пакетах, остальные удаляем.
3. повторяем п. 1 


Да, все так и есть. Обработка должна вестись параллельно, закончеными пакетами. Т.е пакет заполняется сообщениями. Как только пакет наполнился, он добавляется в блок. Поток обработки следит за появлением в блоках новых пакетов. Если блок не пустой, то производится поиск сообщений с одинаковыми номерами, попутно их обрабатывая и удаляя непарные. В идеале в очереди (block) должно быть не более одного пакета.  

Что касается потоков, то для потока обработки данных (на рис. thread data processing) применяется TThread (C++ Builder), потоки данных - это TidUDPServer с включенным ThreadedEvent=true.
В любом случае объясните, пожалуйста, почему не имеет смысла.

Автор: bsa 2.9.2014, 17:26
Бегемот, ты на бумажке распиши, что от чего зависит. Потоки собирающие данные основное время простаивают. А время переключения контекста потоков на малоядерных системах очень высокое. Поэтому очень много ресурсов процессора будет уходить на бесполезную работу переключения контекстов. В твоем случае лучше сократить число потоков принимающих данные и увеличить число обработчиков совпадающих сообщений.

Автор: Бегемот 3.9.2014, 15:09
OK. 
Количество потоков в примере я ограничик четырьмя. Все описание основывается на рисунке из первого поста 
Глобально переменные...
Код

// Массивы объектов "пакет" и "блок" по количеству потоков
const ThreadCnt=4;
Packet packet[ThreadCnt];
Block block[ThreadCnt];


Далее на форму выложены компоненты TIdUDPServer в количестве равном количеству потоков, т.е в данном случае 4
Все обработчики OnUDPRead повешаны на следующий код...
Код

void __fastcall TMainForm::IdUDPServer1UDPRead(TIdUDPListenerThread *AThread, TBytes AData,
             TIdSocketHandle *ABinding)
{
    // Идентификатор вызывающего объекта
    int idx = IdUDPServer1->Tag;
    // Буффер для данных
    byte buffer[SIZE];
    // Длина полученного пакета
    int len = AData.Length;
    // Преобразование в массив байт
    BytesToRaw(AData, &buffer, len);

    // Передаем массив объекту класса Message.
    // Внутри производится разбиение сообщения на номер и данные.
    // На самом деле внутри могут быть сложные операции, требующие
    // определенного времени, поэтому сказать, что поток простаивает нельза
    Message message;
    message.SetValue(buffer);

    // Получаем номер текущего сообщения
    int NumberMsg = message.GetNumber();

    // Передаем в пакет номер текущего сообщения, тем самым проверяя заполнен пакет или нет.
    // Сообщения идут строго по возрастанию номеров, поэтому если номер последнего
    // сообщения в пакете больше чем номер текущего сообщения, то пакет считается
    // заполненым (true), а текущее сообщение принадлежит следующему пакету.
    if(packet[idx].IsFull(NumberMsg))
    {
        // Пакет заполнен. Записываем пакет в блок.
        block[idx].AddPacket(packet[idx]);
        // Очищаем содержимое пакета
        packet[idx].Clear();
    }
    // Записываем сообщение в пакет
    packet[idx].AddMessage(message);
}


Таким образом производится получение и упорядочивание данных для обработки.
Обработка должна производиться параллельно в отдельном потоке.

Вот в том и вопрос как лучше организовать обрабатывающую структуру. Во первых, раз объекты глобальные (понимаю, что плохо, но не понимаю, как в моем случае сделать по другому). Соответственно требуется организовать совместный доступ из разных потоков. Предположим можно будет обойтись критическими секциями. При условии, что в блоке все пакеты также нумерованы, можно конечно вытаскивать из каждого блока по пакету. Затем в этих пакетах (из четырех блоков) искать сообщения с одинаковыми номерами. Но как-то через то место все получается.
В целом я описал структуру довольно подробно. Как бы вы спроектировали архитектуру такой программы. Смотрел в сторону паттернов проектирования. Ничего подходящего не нашел.

Автор: Бегемот 3.9.2014, 15:46
А может быть организавать что-то типа таблицы (см. рис). Каждый поток будет иметь ней свой столбец. Номер сообщения - это номер строки. Таким образом приходит сообщение, например, с номером четыре из первого потока, в таблицу в соответствующую ячейку записывается указатель на данные, инкрементируется счетчик заполненых ячеек для строки 4 и вызывается метод проверки, заполнена ли строка полностью. Если заполнена, то имея указатели на все сообщения производим обработку. Удаляем из таблицы строку. В таблице будут копиться записи, где в каком-то (каких-то) из столбцов отсутствуют ссылки сообщения. Эти записи так же периодически чистятся. Плюс избавляемся от дополнительных прослоек "пакет" и "блок". Так же сам механизм обработки будет запускаться не в результате поиска, а в результате добавления в строку последнего значения. Как думаете?

Автор: bsa 4.9.2014, 11:38
Бегемот, этот вариант выглядит значительно лучше.

Автор: Бегемот 4.9.2014, 12:12
Мне в этом варианте не нравится вот что...
У меня будет один объект класса Table и к нему примерно 1000 раз в секунду каждый из потоков будет обращать. Как вы полагаете это будет работать?
Опять же объект глобальный. 
Как-то вот так...

Код

const ThreadCnt=4;
// Создаем таблицу с ThreadCnt столбцами
Table table(ThreadCnt);


Код

void __fastcall TMainForm::IdUDPServer1UDPRead(TIdUDPListenerThread *AThread, TBytes AData,
                 TIdSocketHandle *ABinding)
{
     // Идентификатор вызывающего объекта
     int idx = IdUDPServer1->Tag;
     // Буффер для данных
     byte buffer[SIZE];
     // Длина полученного пакета
     int len = AData.Length;
     // Преобразование в массив байт
     BytesToRaw(AData, &buffer, len);
     // Передаем массив объекту класса Message.
     // Внутри производится разбиение сообщения на номер и данные.
     // На самом деле внутри могут быть сложные операции, требующие
     // определенного времени, поэтому сказать, что поток простаивает нельза
     Message *message = new Message;
     message->SetValue(buffer);
     // Получаем номер текущего сообщения
     int NumberMsg = message.GetNumber();

     // Что-то типа вот этого...
     // table.SetValue(int Column, int Row, Message* data_ptr)
     table.SetValue(idx, NumberMsg, message);
}

Автор: bsa 4.9.2014, 16:58
Бегемот, не обязательно делать глобальную таблицу. можно сделать только столбцы. В итоге обращений к каждому будет меньше, чем к таблице.
Как вариант, можно сделать доступ к ячейкам через свой мьютекс, тогда обращение к одной ячейке таблицы не будет мешать обращениям к другим.

Powered by Invision Power Board (http://www.invisionboard.com)
© Invision Power Services (http://www.invisionpower.com)