Модераторы: LSD, AntonSaburov
  

Поиск:

Ответ в темуСоздание новой темы Создание опроса
> Реализация сервера на базе Apache MINA 
:(
    Опции темы
LSD
Дата 18.10.2007, 17:21 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Leprechaun Software Developer
****


Профиль
Группа: Модератор
Сообщений: 15718
Регистрация: 24.3.2004
Где: Dublin

Репутация: 210
Всего: 538



Спасибо Platon за хорошую статью.

Ну, что ж, уважаемые. Сочту за честь поделиться накопленным за 2 дня опытом в сфере Apache MINA Demuxing
Предыстория:
Эпиграф: "Яви мне чудо!"
Началось это давно, когда я решил написать тестовую систему задачек для школьников и студентов. Взялся с ходу накатал сервер, клиент, даже архитектура была приятной, легко расширяемый протокол сервера, но 1 НО, проблема с потоками, я сделал простенькую схему 1 соединение - 1 поток. Нет, не думайте, что я лузер, и моя система полетела, нет она работала нормально, даже 500 клиентов легко держала, но меня угрызали сомнения: "Все равно не по умному сделал, расход ресурсов большой, да и 500-1000 потоков - это какое-то больное приложение". Сел делать опять же свою сетевую архитектуру. Все, теперь были worker'ы в пуле, все путем, все красиво, но так и ни разу не откомпилировав, ни разу не запустив, обратился я к его величеству "Google", с 1 только вопросом: "О, Великий Google ты знаешь всех и вся, скажи маюсь ли я дурью?", Google немного подумал, и выдал мне свой немногословный вердикт: "Маешься", любезно указав на сайт http://mina.apache.org просвещенный от беседы с гуру, я направился в святая святых - сайт проекта MINA, и нашел я там своё умиротворение.

Причины:
В MINA есть много всяких разных протоколов, которые подходят для рядовых задач(типа протокола текстовых строк или серриализации), но я привык экономить на спичках, поэтому решил выбрать самый экономный(на траффике) и гибкий протокол а именно некий Demux

Т.к. на сайте еще не было ни одной статьи по MINA (чему я был очень удивлен), я взял смелось возложить на себя такую ответственность и размочить нулевой счет.

Действие:
Я не собираюсь давать основы MINA, они довольно толково описаны на сайте http://mina.apache.org/documentation.html, повторяюсь, я лишь делюсь полученным за 2 дня опытом.

Итак, Demux - это гибкий и экономящий траффик механизм, сразу скажу, что далеко не самый простой, если не сказать самый сложный, но как говорится - "Скупой платит дважды".

Введем такие классы как
Код

package platon.mina.examples.common;

public class CountMessage {
    public int a;
    public int b;
    public static final int TYPE = 1;
}

и 
Код

package platon.mina.examples.common;

public class HelloMessage {
    public String hello = "Hello";
    public static final int TYPE = 2;
}

и
Код

package platon.mina.examples.common;

public class Options {
    public static final int PORT = 4321;
}

Начнем с сервера:

Код

package platon.mina.examples.server;

import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.transport.socket.nio.SocketAcceptor;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.handler.demux.DemuxingIoHandler;

import java.net.InetSocketAddress;
import java.io.IOException;

import platon.mina.examples.common.Options;
import platon.mina.examples.common.CountMessage;
import platon.mina.examples.common.HelloMessage;
import platon.mina.examples.server.handlers.HelloHandler;
import platon.mina.examples.server.handlers.CountHandler;

public class Server {
    public static void main(String[] args) {
        ByteBuffer.setUseDirectBuffers(false);
        ByteBuffer.setAllocator(new SimpleByteBufferAllocator());

        IoAcceptor acceptor = new SocketAcceptor();
        
        DemuxingProtocolCodecFactory demuxFactory = new DemuxingProtocolCodecFactory();
        demuxFactory.register(new MyDecoder());

        SocketAcceptorConfig cfg = new SocketAcceptorConfig();
        cfg.getSessionConfig().setReuseAddress( true );
        cfg.getFilterChain().addLast( "codec", new ProtocolCodecFilter(demuxFactory));

        DemuxingIoHandler d = new DemuxingIoHandler();
        d.addMessageHandler(CountMessage.class, new CountHandler());
        d.addMessageHandler(HelloMessage.class, new HelloHandler());

        try {
            acceptor.bind( new InetSocketAddress(Options.PORT), d, cfg);
            System.out.println("Initialized");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Стандартная структура строительства сервера.

Код

DemuxingProtocolCodecFactory demuxFactory = new DemuxingProtocolCodecFactory();
demuxFactory.register(new MyDecoder());

Здесь мы инициализируем demux фабрику и регистрируем всех обработчиков входных данных
В моем случае сделан просто 1, но можно сделать и 2 (CounerDecoder и HelloDecoder) и зарегистрировать их по очереди
Код

demuxFactory.register(new (CounerDecoder());
demuxFactory.register(new HelloDecoder());

Я сделал 1 обработчик для простоты.

Еще 1 отличающий момент - это обработчик сообщений:
Код

DemuxingIoHandler d = new DemuxingIoHandler();
d.addMessageHandler(CountMessage.class, new CountHandler());
d.addMessageHandler(HelloMessage.class, new HelloHandler());

Как мы видим здесь к каждому сообщению (причем оно уже высокоуровневое, это очень приятно) мы приписываем свой обработчик, т.е. CountHandler обрабатывает все сообщения типа CountMessage. а HelloHandler обрабатывает все сообщения типа HelloMessage.

Подведем итоги:
В экземпляр класса DemuxingProtocolCodecFactory мы регистрируем обработчика низкоуровневых сообщений (буквально поток байтов), который ддолжен выдавать нам высокоуровневое сообщение в виде POJO
В экземпляре DemuxingIoHandler мы связываем тип сообщения с его обработчиком. (Не попробовал случай когда 1 сообщению привязаны 2 обработчика или наоборот, и думаю не стоит)


Слудующий обработчик низкоуровневых сообщений
Код

package platon.mina.examples.server;

import org.apache.mina.filter.codec.demux.MessageDecoder;
import org.apache.mina.filter.codec.demux.MessageDecoderResult;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.ByteBuffer;
import platon.mina.examples.common.CountMessage;
import platon.mina.examples.common.HelloMessage;

import java.nio.charset.Charset;

public class MyDecoder implements MessageDecoder {


    public MessageDecoderResult decodable(IoSession session, ByteBuffer in) {
        if (in.remaining() < 4)
            return MessageDecoderResult.NEED_DATA;
        int type = in.getInt();
        if (type != CountMessage.TYPE && type != HelloMessage.TYPE)
            return MessageDecoderResult.NOT_OK;
        return MessageDecoderResult.OK;
    }

    public MessageDecoderResult decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception {
        if (in.remaining() < 4)
            return MessageDecoderResult.NEED_DATA;
        int type = in.getInt();
        switch(type) {
            case CountMessage.TYPE : {
                if (in.remaining() < 8)
                    return MessageDecoderResult.NEED_DATA;
                CountMessage message = new CountMessage();
                message.a = in.getInt();
                message.b = in.getInt();
                out.write(message);
                break;
            }
            case HelloMessage.TYPE : {
                // Длина строки "hello" в формате UTF-8
                if (in.remaining() < 5)
                    return MessageDecoderResult.NEED_DATA;
                HelloMessage message = new HelloMessage();
                message.hello = in.getString(Charset.forName("UTF-8").newDecoder());
                out.write(message);
                break;
            }
            default: return MessageDecoderResult.NOT_OK;
        }
        return MessageDecoderResult.OK;
    }

    public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
        // пока не знаю зачем эта вещь
    }
}


Метод decodable сообщает нам есть ли возможность прочитать данным обработчиком текущий поток данных. Во всех чтения из буфера необходимо убедиться, что данные в нем есть и их хватит на проведение операции. Если у нас не хватает данных, мы сообщаем MessageDecoderResult.NEED_DATA.
В нашем случае мы допускаем сообщения с типом счетного сообщения или сообщения приветствия. Где и как мы генерируем тип будет видно дальше по тексту в классе MyEncoder

в методе decode мы мы непосредственно декодируем полученные в наше управление данные.
извлекаем тип и по типу восстанавливаем POJO объекты, причем мы видим, что если сначала мы смотрели как бы нам хватило 4 байт чтобы узнать тип, то тепеь в каждой ветке условия мы проверяем свой индивидуальный размер, для CountMessage это 8 ибо 2 int столько весят, с сообщением сложнее, я пока не знаю как декодировать строку, поэтому сделал ее фиксированной длинны, 5 потому что "hello" в UTF-8 весит столько, но "Привет" весит 12.

С декодированием (самой сложной частью) разобрались, теперь беремся за клиента
Код

package platon.mina.examples.client;

import org.apache.mina.common.IoSession;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.handler.demux.DemuxingIoHandler;

import java.net.SocketAddress;
import java.net.InetSocketAddress;

import platon.mina.examples.common.CountMessage;
import platon.mina.examples.common.HelloMessage;
import platon.mina.examples.common.Options;

public class Client {

    public void connect(SocketConnector connector, SocketAddress address) {
        try {
            SocketConnectorConfig config = new SocketConnectorConfig();
            DemuxingProtocolCodecFactory factory = new DemuxingProtocolCodecFactory();
            factory.register(new MyEncoder());
            config.getFilterChain().addLast( "codec", new ProtocolCodecFilter(factory));
            ConnectFuture future1 = connector.connect(address, new DemuxingIoHandler(), config);
            future1.join();
            if (!future1.isConnected()) {
                return;
            }

            CountMessage message = new CountMessage();
            message.a = 4;
            message.b = 6;
            IoSession session = future1.getSession();
            System.out.println("send count");
            session.write(message);
            System.out.println("send hello");
            session.write(new HelloMessage());


        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new Client().connect(new SocketConnector(), new InetSocketAddress("127.0.0.1", Options.PORT));
    }
}

Тоже стандартная конструкция, которую я выдрал из мана.
Здесь опять пишем 
Код

DemuxingProtocolCodecFactory factory = new DemuxingProtocolCodecFactory();
factory.register(new MyEncoder());

Абсолютно ничего не поменялось по сравнению с сервером, разве что только мы создаем кодировщик из высокоуровневых сообщений в низкоуровневые.

Собственно и всё, здесь не предусмотренно никаких входных данных, поэтому мы с чистой совестью можем создать просто demux обработчик, не связывая в нем никаких обработчиков
Код

ConnectFuture future1 = connector.connect(address, new DemuxingIoHandler(), config);


Теперь MyEncoder
Код

package platon.mina.examples.client;

import org.apache.mina.filter.codec.demux.MessageEncoder;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.ByteBuffer;

import java.util.Set;
import java.util.HashSet;
import java.nio.charset.Charset;

import platon.mina.examples.common.CountMessage;
import platon.mina.examples.common.HelloMessage;

public class MyEncoder implements MessageEncoder {

    private static final Set<Class<?>> TYPES = new HashSet<Class<?>>();

    static {
        TYPES.add(CountMessage.class);
        TYPES.add(HelloMessage.class);
    }

    public Set<Class<?>> getMessageTypes() {
        return TYPES;
    }

    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
        // обратите внимание : это не java.nio.ByteBuffer, это org.apache.mina.common.ByteBuffer
        ByteBuffer buffer = ByteBuffer.allocate(16);
        if (message instanceof CountMessage) {
            CountMessage countMessage = (CountMessage)message;
            // Зададим тип 1 счетному сообщению
            buffer.putInt(CountMessage.TYPE);
            buffer.putInt(countMessage.a);
            buffer.putInt(countMessage.b);
        } else if (message instanceof HelloMessage) {
            // Зададим тип 2 сообщению приветствия
            buffer.putInt(HelloMessage.TYPE);
            buffer.putString(((HelloMessage)message).hello, Charset.forName("UTF-8").newEncoder());
        }
        buffer.flip();
        out.write(buffer);
    }
}

Тут интересность состоит в private static final Set<Class<?>> TYPES. В этой коллекции содержатся все типы классов (не знаю как это правильно сказать), которые подлежат обработке именно этим обработчиком.

encode просто делает свое дело, нам надо просто диффиринцировать мух от котлет, чем мы и занимаемся, проверяя тип объекта. Далее мы видим, то что упоминалось ранее: тип мы записываем 1-м в сообщении, затем все данные по очереди, при чем ОБЯЗАТЕЛЬНО в том порядке в котором они будут считываться, или если по-русски, то наоборот.

Осталась самая простая мелочь, которую мы не рассмотрели на сервере - это обработчики высокоуровневых сообщений.
Код

package platon.mina.examples.server.handlers;

import org.apache.mina.handler.demux.MessageHandler;
import org.apache.mina.common.IoSession;
import platon.mina.examples.common.CountMessage;

public class CountHandler implements MessageHandler<CountMessage> {

    public void messageReceived(IoSession session, CountMessage message) throws Exception {
        System.out.println("message received from " + session + ". Counting " + message.a + "+" + message.b + "=" + (message.a + message.b));
    }
}


и 

Код

package platon.mina.examples.server.handlers;

import org.apache.mina.handler.demux.MessageHandler;
import org.apache.mina.common.IoSession;
import platon.mina.examples.common.HelloMessage;

public class HelloHandler implements MessageHandler<HelloMessage> {

    public void messageReceived(IoSession session, HelloMessage message) throws Exception {
        System.out.println("message received from " + session + "\"" + message.hello + "\"");
    }
}


Как мы видим, тут все просто. Мы принимаем высокоуровневое сообщение и делаем с ним что захотим с приятной возможностью отписывать в session прямо тут же.
Автор: Platon Источник: Vingrad


--------------------
Disclaimer: this post contains explicit depictions of personal opinion. So, if it sounds sarcastic, don't take it seriously. If it sounds dangerous, do not try this at home or at all. And if it offends you, just don't read it.
PM MAIL WWW   Вверх
Maksym
Дата 18.10.2007, 17:33 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


.
***


Профиль
Группа: Участник Клуба
Сообщений: 1456
Регистрация: 19.8.2005
Где: Odessa, Black Sea

Репутация: 14
Всего: 62



Platon браво! +1
PM MAIL   Вверх
Samotnik
Дата 18.10.2007, 18:11 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Super star !
****


Профиль
Группа: Awaiting Authorisation
Сообщений: 7192
Регистрация: 4.11.2006
Где: Минск City

Репутация: 8
Всего: 191



 smile 
Круто, правда мне до этого еще далеко,  как минимум полгода ))))))
PM MAIL   Вверх
Platon
Дата 20.10.2007, 21:45 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Эксперт
***


Профиль
Группа: Завсегдатай
Сообщений: 1801
Регистрация: 25.4.2006

Репутация: 16
Всего: 40



Samotnik, я бы может этим вопросом и не занялся, если бы не моя задачка, на свою голову предложил преподавателю в качестве курсовой такую шляпу, до сих пор развязаться не могу.
PM MAIL ICQ   Вверх
Samotnik
Дата 21.10.2007, 13:11 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Super star !
****


Профиль
Группа: Awaiting Authorisation
Сообщений: 7192
Регистрация: 4.11.2006
Где: Минск City

Репутация: 8
Всего: 191



Platon,   smile    Прикольнинько,  сам добровольно!?!??!?!? Молодец! Я тут тоже со своим курсачом сижу. Правда, он по-проще твоего,  всего лишь "Клиент-сервер" + БД (SQL) + GUI интерфейс.
PM MAIL   Вверх
Platon
Дата 25.1.2008, 15:40 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Эксперт
***


Профиль
Группа: Завсегдатай
Сообщений: 1801
Регистрация: 25.4.2006

Репутация: 16
Всего: 40



Кстати, я сделал 2 класса-билдера для более удобной работы с библиотекой

Код

package platon.network.mina;

import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory;
import org.apache.mina.filter.codec.demux.MessageDecoder;
import org.apache.mina.filter.codec.demux.MessageEncoder;
import org.apache.mina.handler.demux.DemuxingIoHandler;
import org.apache.mina.handler.demux.MessageHandler;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.socket.nio.SocketConnectorConfig;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;

/**
 * @version 1.0
 *          Дата: 16.12.2007
 *          Время: 14:12:36
 *
 */
public class ClientUnit {
    private DemuxingProtocolCodecFactory factory;
    private DemuxingIoHandler handler;

    public ClientUnit() {
        factory = new DemuxingProtocolCodecFactory();
        handler = new DemuxingIoHandler();
    }

    public IoSession establishConnection(String host, int port) {
        try {
            return establishConnection(InetAddress.getByName(host), port);
        } catch (UnknownHostException e) {
            e.printStackTrace();
            return null;
        }
    }

    public IoSession establishConnection(InetAddress addr, int port) {
        return establishConnection(new InetSocketAddress(addr, port));
    }

    public IoSession establishConnection(InetSocketAddress address) {
        ByteBuffer.setUseDirectBuffers(false);
        ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
        SocketConnectorConfig config = new SocketConnectorConfig();
        config.getFilterChain().addLast( "codec", new ProtocolCodecFilter(factory));
        SocketConnector connector = new SocketConnector();
        ConnectFuture future1 = connector.connect(address, new DemuxingIoHandler(), config);
        future1.join();
        if (!future1.isConnected()) {
            return null;
        }
        return future1.getSession();
    }

    public void register(MessageEncoder encoder) {
        factory.register(encoder);
    }

    public void register(MessageDecoder decoder) {
        factory.register(decoder);
    }

    public <E> void addMessageHandler(Class<E> messageClass, MessageHandler<? super E> messageHandler) {
        handler.addMessageHandler(messageClass, messageHandler);
    }
}



Код

package platon.network.mina;

import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory;
import org.apache.mina.filter.codec.demux.MessageDecoder;
import org.apache.mina.filter.codec.demux.MessageEncoder;
import org.apache.mina.handler.demux.DemuxingIoHandler;
import org.apache.mina.handler.demux.MessageHandler;
import org.apache.mina.transport.socket.nio.SocketAcceptor;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;

/**
 * @version 1.0
 *          Дата: 16.12.2007
 *          Время: 14:12:36
 *
 */
public class ServerUnit {
    private DemuxingProtocolCodecFactory factory;
    private DemuxingIoHandler handler;
    private IoAcceptor acceptor;

    public ServerUnit() {
        factory = new DemuxingProtocolCodecFactory();
        handler = new DemuxingIoHandler();
    }

    public void establishServer(String host, int port) throws IOException {
        establishServer(new InetSocketAddress(host, port));
    }

    public void establishServer(InetAddress addr, int port) throws IOException {
        establishServer(new InetSocketAddress(addr, port));
    }

    public void establishServer(int port) throws IOException {
        establishServer(new InetSocketAddress(port));
    }

    public void establishServer(InetSocketAddress address) throws IOException {
        ByteBuffer.setUseDirectBuffers(false);
        ByteBuffer.setAllocator(new SimpleByteBufferAllocator());

        acceptor = new SocketAcceptor();

        SocketAcceptorConfig cfg = new SocketAcceptorConfig();
        cfg.getSessionConfig().setReuseAddress( true );
        cfg.getFilterChain().addLast( "codec", new ProtocolCodecFilter(factory));

        acceptor.bind(address, handler, cfg);
    }

    public void stopServer() {
        acceptor.unbindAll();
    }

    public void register(MessageEncoder encoder) {
        factory.register(encoder);
    }

    public void register(MessageDecoder decoder) {
        factory.register(decoder);
    }

    public <E> void addMessageHandler(Class<E> messageClass, MessageHandler<? super E> messageHandler) {
        handler.addMessageHandler(messageClass, messageHandler);
    }


    public DemuxingIoHandler getHandler() {
        return handler;
    }

    public void setHandler(DemuxingIoHandler handler) {
        this.handler = handler;
    }
}

PM MAIL ICQ   Вверх
  
Ответ в темуСоздание новой темы Создание опроса
Правила форума "Java"
LSD   AntonSaburov
powerOn   tux
javastic
  • Прежде, чем задать вопрос, прочтите это!
  • Книги по Java собираются здесь.
  • Документация и ресурсы по Java находятся здесь.
  • Используйте теги [code=java][/code] для подсветки кода. Используйтe чекбокс "транслит", если у Вас нет русских шрифтов.
  • Помечайте свой вопрос как решённый, если на него получен ответ. Ссылка "Пометить как решённый" находится над первым постом.
  • Действия модераторов можно обсудить здесь.
  • FAQ раздела лежит здесь.

Если Вам помогли, и атмосфера форума Вам понравилась, то заходите к нам чаще! С уважением, LSD, AntonSaburov, powerOn, tux, javastic.

 
0 Пользователей читают эту тему (0 Гостей и 0 Скрытых Пользователей)
0 Пользователей:
« Предыдущая тема | Java: Общие вопросы | Следующая тема »


 




[ Время генерации скрипта: 0.0792 ]   [ Использовано запросов: 23 ]   [ GZIP включён ]


Реклама на сайте     Информационное спонсорство

 
По вопросам размещения рекламы пишите на vladimir(sobaka)vingrad.ru
Отказ от ответственности     Powered by Invision Power Board(R) 1.3 © 2003  IPS, Inc.