Модераторы: Daevaorn
  

Поиск:

Ответ в темуСоздание новой темы Создание опроса
> Проектирование системы 
:(
    Опции темы
Бегемот
Дата 19.8.2014, 18:12 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Бывалый
*


Профиль
Группа: Участник
Сообщений: 219
Регистрация: 26.1.2005
Где: На границе Европы и Азии

Репутация: нет
Всего: 3



Добрый день. 
Уважаемые программисты, дайте совет, как лучше спроектировать такую вот систему...

Есть несколько (n) потоков. В каждом потоке происходит одно и то же, а именно... Примерно 1000 раз в секунду приходит сообщение определенного формата. Сообщения складываются в пакеты. Максимальное количество сообщений в пакете - 16. Сообщение содержит информацию о своем номере в пакете.
Есть еще один поток. Задача данного потока находить в каждом пакете всех потоков сообщения с одинаковыми номерами и производить с ними определенные действия. После чего сообщения уничтожаются. В каком-то из потоков (или в нескольких потоках) могут быть пропуски сообщений. Например в потоке k отутствует сообщение с номером 7. В таком случае никакие действия не производятся. Во всех остальных потоках сообщения с номером 7 уничтожаются. Т.е ищутся полные соответствия для всех потоков. Вполне возможно, что обработка будет выполняться дольше, чем прием, поэтому требуется хранить всю цепочку сообщений и периодически при достижении определенного размера цепочки, пропускать часть данных. 
Систему приема данных я решил сделать следующим образом...
Класс message
Его задача разделение сообщения на номер и данные.
Класс packet
Его задача формирование пакета, а именно списка объектов типа message.
Класс block
Его задача формирования блока данных для своего потока, а именно списка объектов типа packet

Как организовать систему обработки данных?
На рисунке я привел простой пример. В отдельном потоке есть объект некоторого класса. Вызвав его метод join, мы соединяем строки для определенного пакета, определенного сообщения каждого потока. Зеленая рамка показывает, что пакет с номером 1 найден и такие данные поступают на обработку. Красная линия показывает, что в потоке n был пропуск пакета 1, и поэтому данные остальных потоков для пакета 1 игнорируются (уничтожаются).

Спасибо

Присоединённый файл ( Кол-во скачиваний: 24 )
Присоединённый файл  pic.GIF 11,62 Kb
PM MAIL   Вверх
bsa
Дата 28.8.2014, 17:17 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Эксперт
****


Профиль
Группа: Модератор
Сообщений: 9185
Регистрация: 6.4.2006
Где: Москва, Россия

Репутация: 63
Всего: 196



По потоками ты понимаешь stream или thread? Просто если thread, то твоя система смысла не имеет.

Правильно ли я понял:
1. собираем n пакетов по не более чем 16 сообщений из n источников
2. обрабатываем сообщения с одинаковыми номерами во всех пакетах, остальные удаляем.
3. повторяем п. 1
PM   Вверх
Бегемот
Дата 31.8.2014, 09:39 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Бывалый
*


Профиль
Группа: Участник
Сообщений: 219
Регистрация: 26.1.2005
Где: На границе Европы и Азии

Репутация: нет
Всего: 3



bsa, спасибо за ответ.

Цитата(bsa @  28.8.2014,  20:17 Найти цитируемый пост)
Правильно ли я понял:
1. собираем n пакетов по не более чем 16 сообщений из n источников
2. обрабатываем сообщения с одинаковыми номерами во всех пакетах, остальные удаляем.
3. повторяем п. 1 


Да, все так и есть. Обработка должна вестись параллельно, закончеными пакетами. Т.е пакет заполняется сообщениями. Как только пакет наполнился, он добавляется в блок. Поток обработки следит за появлением в блоках новых пакетов. Если блок не пустой, то производится поиск сообщений с одинаковыми номерами, попутно их обрабатывая и удаляя непарные. В идеале в очереди (block) должно быть не более одного пакета.  

Что касается потоков, то для потока обработки данных (на рис. thread data processing) применяется TThread (C++ Builder), потоки данных - это TidUDPServer с включенным ThreadedEvent=true.
В любом случае объясните, пожалуйста, почему не имеет смысла.
PM MAIL   Вверх
bsa
Дата 2.9.2014, 17:26 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Эксперт
****


Профиль
Группа: Модератор
Сообщений: 9185
Регистрация: 6.4.2006
Где: Москва, Россия

Репутация: 63
Всего: 196



Бегемот, ты на бумажке распиши, что от чего зависит. Потоки собирающие данные основное время простаивают. А время переключения контекста потоков на малоядерных системах очень высокое. Поэтому очень много ресурсов процессора будет уходить на бесполезную работу переключения контекстов. В твоем случае лучше сократить число потоков принимающих данные и увеличить число обработчиков совпадающих сообщений.
PM   Вверх
Бегемот
Дата 3.9.2014, 15:09 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Бывалый
*


Профиль
Группа: Участник
Сообщений: 219
Регистрация: 26.1.2005
Где: На границе Европы и Азии

Репутация: нет
Всего: 3



OK. 
Количество потоков в примере я ограничик четырьмя. Все описание основывается на рисунке из первого поста 
Глобально переменные...
Код

// Массивы объектов "пакет" и "блок" по количеству потоков
const ThreadCnt=4;
Packet packet[ThreadCnt];
Block block[ThreadCnt];


Далее на форму выложены компоненты TIdUDPServer в количестве равном количеству потоков, т.е в данном случае 4
Все обработчики OnUDPRead повешаны на следующий код...
Код

void __fastcall TMainForm::IdUDPServer1UDPRead(TIdUDPListenerThread *AThread, TBytes AData,
             TIdSocketHandle *ABinding)
{
    // Идентификатор вызывающего объекта
    int idx = IdUDPServer1->Tag;
    // Буффер для данных
    byte buffer[SIZE];
    // Длина полученного пакета
    int len = AData.Length;
    // Преобразование в массив байт
    BytesToRaw(AData, &buffer, len);

    // Передаем массив объекту класса Message.
    // Внутри производится разбиение сообщения на номер и данные.
    // На самом деле внутри могут быть сложные операции, требующие
    // определенного времени, поэтому сказать, что поток простаивает нельза
    Message message;
    message.SetValue(buffer);

    // Получаем номер текущего сообщения
    int NumberMsg = message.GetNumber();

    // Передаем в пакет номер текущего сообщения, тем самым проверяя заполнен пакет или нет.
    // Сообщения идут строго по возрастанию номеров, поэтому если номер последнего
    // сообщения в пакете больше чем номер текущего сообщения, то пакет считается
    // заполненым (true), а текущее сообщение принадлежит следующему пакету.
    if(packet[idx].IsFull(NumberMsg))
    {
        // Пакет заполнен. Записываем пакет в блок.
        block[idx].AddPacket(packet[idx]);
        // Очищаем содержимое пакета
        packet[idx].Clear();
    }
    // Записываем сообщение в пакет
    packet[idx].AddMessage(message);
}


Таким образом производится получение и упорядочивание данных для обработки.
Обработка должна производиться параллельно в отдельном потоке.

Вот в том и вопрос как лучше организовать обрабатывающую структуру. Во первых, раз объекты глобальные (понимаю, что плохо, но не понимаю, как в моем случае сделать по другому). Соответственно требуется организовать совместный доступ из разных потоков. Предположим можно будет обойтись критическими секциями. При условии, что в блоке все пакеты также нумерованы, можно конечно вытаскивать из каждого блока по пакету. Затем в этих пакетах (из четырех блоков) искать сообщения с одинаковыми номерами. Но как-то через то место все получается.
В целом я описал структуру довольно подробно. Как бы вы спроектировали архитектуру такой программы. Смотрел в сторону паттернов проектирования. Ничего подходящего не нашел.

Это сообщение отредактировал(а) Бегемот - 3.9.2014, 15:18
PM MAIL   Вверх
Бегемот
Дата 3.9.2014, 15:46 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Бывалый
*


Профиль
Группа: Участник
Сообщений: 219
Регистрация: 26.1.2005
Где: На границе Европы и Азии

Репутация: нет
Всего: 3



А может быть организавать что-то типа таблицы (см. рис). Каждый поток будет иметь ней свой столбец. Номер сообщения - это номер строки. Таким образом приходит сообщение, например, с номером четыре из первого потока, в таблицу в соответствующую ячейку записывается указатель на данные, инкрементируется счетчик заполненых ячеек для строки 4 и вызывается метод проверки, заполнена ли строка полностью. Если заполнена, то имея указатели на все сообщения производим обработку. Удаляем из таблицы строку. В таблице будут копиться записи, где в каком-то (каких-то) из столбцов отсутствуют ссылки сообщения. Эти записи так же периодически чистятся. Плюс избавляемся от дополнительных прослоек "пакет" и "блок". Так же сам механизм обработки будет запускаться не в результате поиска, а в результате добавления в строку последнего значения. Как думаете?

Это сообщение отредактировал(а) Бегемот - 3.9.2014, 16:07

Присоединённый файл ( Кол-во скачиваний: 9 )
Присоединённый файл  pic2.GIF 12,01 Kb
PM MAIL   Вверх
bsa
Дата 4.9.2014, 11:38 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Эксперт
****


Профиль
Группа: Модератор
Сообщений: 9185
Регистрация: 6.4.2006
Где: Москва, Россия

Репутация: 63
Всего: 196



Бегемот, этот вариант выглядит значительно лучше.
PM   Вверх
Бегемот
Дата 4.9.2014, 12:12 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Бывалый
*


Профиль
Группа: Участник
Сообщений: 219
Регистрация: 26.1.2005
Где: На границе Европы и Азии

Репутация: нет
Всего: 3



Мне в этом варианте не нравится вот что...
У меня будет один объект класса Table и к нему примерно 1000 раз в секунду каждый из потоков будет обращать. Как вы полагаете это будет работать?
Опять же объект глобальный. 
Как-то вот так...

Код

const ThreadCnt=4;
// Создаем таблицу с ThreadCnt столбцами
Table table(ThreadCnt);


Код

void __fastcall TMainForm::IdUDPServer1UDPRead(TIdUDPListenerThread *AThread, TBytes AData,
                 TIdSocketHandle *ABinding)
{
     // Идентификатор вызывающего объекта
     int idx = IdUDPServer1->Tag;
     // Буффер для данных
     byte buffer[SIZE];
     // Длина полученного пакета
     int len = AData.Length;
     // Преобразование в массив байт
     BytesToRaw(AData, &buffer, len);
     // Передаем массив объекту класса Message.
     // Внутри производится разбиение сообщения на номер и данные.
     // На самом деле внутри могут быть сложные операции, требующие
     // определенного времени, поэтому сказать, что поток простаивает нельза
     Message *message = new Message;
     message->SetValue(buffer);
     // Получаем номер текущего сообщения
     int NumberMsg = message.GetNumber();

     // Что-то типа вот этого...
     // table.SetValue(int Column, int Row, Message* data_ptr)
     table.SetValue(idx, NumberMsg, message);
}


Это сообщение отредактировал(а) Бегемот - 4.9.2014, 12:24
PM MAIL   Вверх
bsa
Дата 4.9.2014, 16:58 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Эксперт
****


Профиль
Группа: Модератор
Сообщений: 9185
Регистрация: 6.4.2006
Где: Москва, Россия

Репутация: 63
Всего: 196



Бегемот, не обязательно делать глобальную таблицу. можно сделать только столбцы. В итоге обращений к каждому будет меньше, чем к таблице.
Как вариант, можно сделать доступ к ячейкам через свой мьютекс, тогда обращение к одной ячейке таблицы не будет мешать обращениям к другим.
PM   Вверх
  
Ответ в темуСоздание новой темы Создание опроса
Правила форума "С++:Общие вопросы"
Earnest Daevaorn

Добро пожаловать!

  • Черновик стандарта C++ (за октябрь 2005) можно скачать с этого сайта. Прямая ссылка на файл черновика(4.4мб).
  • Черновик стандарта C (за сентябрь 2005) можно скачать с этого сайта. Прямая ссылка на файл черновика (3.4мб).
  • Прежде чем задать вопрос, прочтите это и/или это!
  • Здесь хранится весь мировой запас ссылок на документы, связанные с C++ :)
  • Не брезгуйте пользоваться тегами [code=cpp][/code].
  • Пожалуйста, не просите написать за вас программы в этом разделе - для этого существует "Центр Помощи".
  • C++ FAQ

Если Вам понравилась атмосфера форума, заходите к нам чаще! С уважением, Earnest Daevaorn

 
1 Пользователей читают эту тему (1 Гостей и 0 Скрытых Пользователей)
0 Пользователей:
« Предыдущая тема | C/C++: Общие вопросы | Следующая тема »


 




[ Время генерации скрипта: 0.0858 ]   [ Использовано запросов: 21 ]   [ GZIP включён ]


Реклама на сайте     Информационное спонсорство

 
По вопросам размещения рекламы пишите на vladimir(sobaka)vingrad.ru
Отказ от ответственности     Powered by Invision Power Board(R) 1.3 © 2003  IPS, Inc.