Версия для печати темы
Нажмите сюда для просмотра этой темы в оригинальном формате
Форум программистов > C/C++: Программирование под Unix/Linux > Потоковый сервер на С++


Автор: kiruhin 1.4.2013, 21:48
Добрый день.
Стала задача написать сервер на плюсах (под линукс). Признаться, данный язык только начал осваивать, поэтому могу показаться совсем уж нубом (за что прошу больно не пинать smile ). Итак, задача:
1. Реализация потокового (pre-thread) сервера на С++ под Linux (GNU)
2. В данном случае не оговариваются действия сервера (его логика - это сам осилю), а только грамотное построение модели (принял - передал)
3. Подчеркиваю - код для Linux
Начал с того, что гуглил много. В итоге нашел некий "пример" (код ниже), который взял за основу, которую в последствии буду "допиливать". Итак, код был взят отсюда:
http://www.unix.com/programming/17611-multi-threaded-server-pthreads-sleep.html
Дублирую код в топик:
Код

#include <pthread.h>
#include <stdio.h>
#include <sys/timeb.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <string.h>
#include <fcntl.h>
#include <signal.h>
#include <errno.h>

#define MAX_CLIENT_PER_THREAD 300
#define MAX_THREAD 200
#define PORT 3355
#define MAX_CLIENT_BUFFER 256
/*#define DEBUG*/

int listenfd;

typedef struct {
    pthread_t tid;
    int client_count;
    int clients[MAX_CLIENT_PER_THREAD];
} Thread;

pthread_cond_t new_connection_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t new_connection_mutex = PTHREAD_MUTEX_INITIALIZER;


Thread threads[MAX_THREAD];

void nonblock(int sockfd)
{
    int opts;
    opts = fcntl(sockfd, F_GETFL);
    if(opts < 0)
    {
        perror("fcntl(F_GETFL)\n");
        exit(1);
    }
    opts = (opts | O_NONBLOCK);
    if(fcntl(sockfd, F_SETFL, opts) < 0) 
    {
        perror("fcntl(F_SETFL)\n");
        exit(1);
    }
}

void *thread_init_func(void *arg)
{
    int tid = (int) arg;
    
    int readsocks;
    int i;
    char buffer[MAX_CLIENT_BUFFER];
    char c;
    int n;
#ifdef DEBUG
    printf("thread %d created\n", tid);
    printf("sizeof thread.clients: %d\n", sizeof(threads[tid].clients));
#endif
    memset((int *) &threads[tid].clients, 0, sizeof(threads[tid].clients));
    memset((char *) &buffer, 0, sizeof(buffer));    
    while(1)
    {
#ifdef DEBUG
        printf("thread %d running, client count: %d\n", tid, threads[tid].client_count);
        sleep(3);
#endif
        sleep(1); /* <-- it works ??? :-| */

        for(i = 0; i < MAX_CLIENT_PER_THREAD; i++)
        {
            if(threads[tid].clients[i] != 0)
            {
                n = recv(threads[tid].clients[i], buffer, MAX_CLIENT_BUFFER, 0);
                if(n == 0)
                {
#ifdef DEBUG
                    printf("client %d closed connection 0\n", threads[tid].clients[i]);
#endif
                    threads[tid].clients[i] = 0;
                    threads[tid].client_count--;
                    memset((char *) &buffer, 0, strlen(buffer));
                }
                else if(n < 0)
                {
                    if(errno == EAGAIN)
                    {
#ifdef DEBUG
                        printf("errno: EAGAIN\n");
#endif
                    }
                    else {
#ifdef DEBUG
                        printf("errno: %d\n", errno);
#endif
                        threads[tid].clients[i] = 0;
                        threads[tid].client_count--;
                        memset( (char *) &buffer, 0, strlen(buffer));
#ifdef DEBUG
                        printf("client %d closed connection -1\n", threads[tid].clients[i]);
#endif
                    }
                }
                else {
#ifdef DEBUG
                    printf("%d bytes received from %d - %s\n", n, threads[tid].clients[i], buffer);
#endif
                    
                    send(threads[tid].clients[i], buffer, strlen(buffer), 0);
                    memset((char *) &buffer, 0, strlen(buffer));
                }
            }
        }
    }
}

int choose_thread()
{
    int i=MAX_THREAD-1;
    int min = 0;
    while(i > -1)
    {
        if(threads[i].client_count < threads[i-1].client_count)
        {
            min = i;
            break;
        }
        i--;
    }        
    return min;
}

int main()
{
    char c;
    struct sockaddr_in srv, cli;
    int clifd;
    int tid;
    int i;
    int choosen;

    signal(SIGPIPE, SIG_IGN);
    
    if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
    {
        perror("sockfd\n");
        exit(1);
    }
    bzero(&srv, sizeof(srv));
    srv.sin_family = AF_INET;
    srv.sin_addr.s_addr = INADDR_ANY;
    srv.sin_port = htons(PORT);

    if( bind(listenfd, (struct sockaddr *) &srv, sizeof(srv)) < 0)
    {
        perror("bind\n");
        exit(1);
    }
    
    listen(listenfd, 1024);

    
    /* create threads  */
    for(i = 0; i < MAX_THREAD; i++)
    {
        pthread_create(&threads[i].tid, NULL, &thread_init_func, (void *) i);
        threads[i].client_count = 0;
    }
    

    for( ; ; )
    {
        clifd = accept(listenfd, NULL, NULL);
        nonblock(clifd);

        pthread_mutex_lock(&new_connection_mutex);
        
        choosen = choose_thread();

        for(i = 0; i < MAX_CLIENT_PER_THREAD; i++)
        {
            if(threads[choosen].clients[i] == 0)
            {
#ifdef DEBUG
                printf("before threads clifd\n");
#endif
                threads[choosen].clients[i] = clifd;
#ifdef DEBUG
                printf("after threads clifd\n");
#endif
                threads[choosen].client_count++;
                break;
            }
        }

#ifdef DEBUG
        printf("choosen: %d\n", choosen);

        for(i = 0; i < MAX_THREAD; i++)
        {
            printf("threads[%d].client_count:%d\n", i, threads[i].client_count);
        }
#endif

        pthread_mutex_unlock(&new_connection_mutex);
    }

    if(errno)
    {
        printf("errno: %d", errno);
    }
    
    return 0;
}

Сам автор этого кода, в своем посте пишет:
"I am trying to writa a multi-client & multi-threaded TCP server.
There is a thread pool. Each thread in the pool will handle 
requests of multiple clients.

But here I have a problem. I find a solution but it is not how 
it must be... i think. When threads working without sleep(1) 
I can't get response from server but when I put sleep(1) in 
thread function as you will see in code, everything works fine 
and server can make echo nearly for 40000 clients between 
1 or 2 seconds. What am I doing wrong here? 
"
И это действительно так. С одной оговоркой, что если заккоментировать sleep(1), то это сервер работает именно так, как нужно, НО вводит систему в ступор (загрузка ЦП 100%, что очевидно, т.к. while(1) ) С другой стороны, из-за этого sleep(1), во-первых, ответ сервера возвращается только через секунду, во-вторых, если передать серверу несколько пакетов (передать быстро), то он "склеит" это пакеты в один, как следствие, пакеты невозможно будет разобрать.
Пожалуйста, подскажите:
1. Если приведенный пример (код, взятый за основу) корректный - как можно избежать sleep (1)?
2. Если код в своей основе неверный (или морально устарел, т.к. пост датируется аж 2005 годом) - помогите собрать этот покотовый сервер (хоть пример реально работающего)
3. Обязательно ли объявлять сокет неблокируемым (как в приведенном примере)?
Спасибо огромное!

Автор: dershokus 2.4.2013, 10:53
Вот тут плохо
Код

...
    while(1)
    {
#ifdef DEBUG
        printf("thread %d running, client count: %d\n", tid, threads[tid].client_count);
        sleep(3);
#endif
        sleep(1); /* <-- it works ??? :-| */
        for(i = 0; i < MAX_CLIENT_PER_THREAD; i++)
        {
...
        }
    } // end while(1);


Это всеравно что написать программу с бесконечным циклом.
Код

void main(void) { for(;;); }

Т.е. делать это ничего не будет но грузить ЦП будет на 100%. Если долго не париться, то можно написать так:
Код

...
    // пустой заход был?
    BOOL is_empty=true;
    while(1)
    {
#ifdef DEBUG
        printf("thread %d running, client count: %d\n", tid, threads[tid].client_count);
        sleep(3);
#endif
        if(is_empty) sleep(1);
        is_empty=true;
        for(i = 0; i < MAX_CLIENT_PER_THREAD; i++)
        {
            if(threads[tid].clients[i] != 0)
            {
                is_empty=false;  // не пустой
              ...
            }
        }
    } // end while(1);


А вообще все треды должны спать по идее, если пользователя нет и управляющий тред при новом клиенте должен пробуждать тред куда кинул этого клиента. Как-то так.

Автор: fish9370 2.4.2013, 12:23
смысла от неблокируемости в данной реализации нет, неблокируемость нужна, чтобы делать параллельно что-то еще..

ошибка номер один - нет проверки EINTR - чтение нужно повторить
два - кривое обнуление буфера "memset( (char *) &buffer, 0, strlen(buffer))" - собственно зачем оно?
три - задержки сбоку ввиде sleep - кривое решение


собственно, архитектура не верная  - 300 потоков крутятся в холостую -> решается вставкой проверки на присутствие данных (select/poll/epoll)

повторюсь, неблокируемость в этом случае вред, в этом нет никакого смысла

хорошо бы понять, с какими потоками данных будут иметь дело треды,  интенсивность, в какую сторону преобладать - это тоже важно


PS, это не Си++, это чистый Си

Добавлено через 9 минут и 12 секунд
еще бы рекомендовал взглянуть на системные вызовы readv/writev, возможно в данной задаче они будут как никто другой уместны..

Автор: Acer 2.4.2013, 16:33
Цитата(fish9370 @ 2.4.2013,  12:23)
три - задержки сбоку ввиде sleep - кривое решение

А как еще можно реализовать такую систему без sleep или usleep?

Автор: dershokus 2.4.2013, 19:04
Цитата

А как еще можно реализовать такую систему без sleep или usleep?


В смысле именно в такой реализации? Можно семафор поставить и ждать пока его откроют (если я правильно все понял).

Вообще как-то глупо вешать на тред какое-то количество пользователей и в цикле их обходить (вдруг только один пользователь есть?), epoll выглядит здесь уместнее IMHO.

Автор: fish9370 2.4.2013, 19:56
Цитата(Acer @  2.4.2013,  16:33 Найти цитируемый пост)
А как еще можно реализовать такую систему без sleep или usleep? 


Цитата(fish9370 @  2.4.2013,  12:23 Найти цитируемый пост)
решается вставкой проверки на присутствие данных (select/poll/epoll)


заставляет ядро замораживать поток исполнения до возникновения события

Автор: bsa 3.4.2013, 15:40
kiruhin, http://www.boost.org/doc/libs/1_53_0/doc/html/boost_asio.html. Начни с примеров.

Powered by Invision Power Board (http://www.invisionboard.com)
© Invision Power Services (http://www.invisionpower.com)