Модераторы: LSD, AntonSaburov
  

Поиск:

Ответ в темуСоздание новой темы Создание опроса
> ThreadPoolExecutor, Вопрос по beforeExecute и afterExecute 
V
    Опции темы
Anime
Дата 8.10.2014, 21:41 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Шустрый
*


Профиль
Группа: Awaiting Authorisation
Сообщений: 88
Регистрация: 13.11.2006
Где: Киев

Репутация: нет
Всего: нет



Примерный код
Код

class Executer implements Callable<Object> {
    @Override
    public Executer call() throws Exception {
        Thread.sleep(5000);
        //тут выполняется какое длительное задание
        return null;
    }
}

class SuperPool extends ThreadPoolExecutor {
    @Override
    protected void beforeExecute(Thread thread, Runnable runnable) {
        try {
            Data data = datas.take(); //класс дата является LinkedBlockingDeque
            data.increment();

            ComparableFutureTask task = (ComparableFutureTask) runnable;
            Executer executer = (Executer) task.callable;

            executer.setData(data);
        } catch(InterruptedException e) {
        }
        
        super.beforeExecute(thread, runnable);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
        super.afterExecute(runnable, throwable);
        
        ComparableFutureTask future = (ComparableFutureTask) runnable;

        if (throwable == null && runnable instanceof Future<?>) {
            try {
                if(future.isDone()) {
                    Object obj = (Object) future.get();
                    //после завершения можно в этом методе обработать полученные данные
                    parse(obj);
                    
                    //если отработано второстепенное сообщение до добавляем его в начало списка
                    //... предварительные манипуляции по извлечению data...
                    datas.addFirst(data);
                }
            } catch (CancellationException | ExecutionException ce) {
                throwable = ce;
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        
        if(throwable != null) {
            if(throwable instanceof Exception) {
                Executer exec = (Executer) future.callable;
                submit(exec, MAX_PRIORITY);
            }
        }
    }
}


Есть реализованный ThreadPoolExecutor когда произошел submit, метод beforeExecute выдает сообщение Executer`у ( LinkedBlockingDeque = data.take() ) , после выполнения метод afterExecute выдает полученные данные и передает на вторичную обработку, методу parse (т.е. парсить данные)

Так вот вопрос все эти манипуляция надо делать в классе Executer (которые реализованные в методах beforeExecute и afterExecute) ? Или оставить метод beforeExecute как есть, но метод parse перенести в Executer ?

Это сообщение отредактировал(а) Anime - 9.10.2014, 10:10
PM   Вверх
korian
Дата 9.10.2014, 02:31 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Опытный
**


Профиль
Группа: Участник
Сообщений: 651
Регистрация: 8.3.2008
Где: Украина, Харьков

Репутация: нет
Всего: 17



не очень понятно, что вообще надо.
beforeExecute и afterExecute имеет смысл переопределять в том случае, если это касаеться именно ThreadPoolExecutor. Т.е. если код, который будет в beforeExecute и afterExecute необходим для "правильной" работы самого пула.
а у вас там походу бизнес логика какая-то.
PM   Вверх
Anime
Дата 9.10.2014, 10:05 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Шустрый
*


Профиль
Группа: Awaiting Authorisation
Сообщений: 88
Регистрация: 13.11.2006
Где: Киев

Репутация: нет
Всего: нет



Идея тут такова.
Есть два массива с сообщениями: первый массив сообщений это PriorityBlockingQueue, второй это LinkedBlockingDeque.
Так вот первый массив должен ждать сообщений от второго, т.е. он не выполниться пока во второй массив не придут сообщения.

И так. Класс SuperPool создан на основе PriorityBlockingQueue и имеет приоритет в выполнении сообщений. При поступлении сообщения он выполняется с нормальным приоритетом и в методе beforeExecute выдается ему второе сообщение и счетчик выданного второго сообщения повышается. В методе afterExecute возвращает результат выполнения и если он не выполнился с ошибкой, сабмитится заново но с высоким приоритетом.

Вот псевдо пример запуска
Код

LinkedBlockingDeque<Data> data = new LinkedBlockingDeque<>();
for(int i = 0; i < 20; i++) {
    data.add(new Data("world" + i));
}

int corePoolSize = 100;
int maximumPoolSize = 100;

SuperPool pool = new SuperPool(corePoolSize, maximumPoolSize, new PriorityBlockingQueue());
pool.setData(data);
for(int i = 0; i < 100; i++) {
    pool.submit(new Executer("hello" + i), NORMAL_PRIOR);
}

в данном примере он запустит 100 потоков с сообщениями hello, но из них отработают тока 20, так как в data тока 20 сообщений типа world, а все остальные 80 будут ждать прихода сообщений в data.
Подправил малость первый пост...

Правильно ли в методе beforeExecute реализовывать выдачу второстепенного сообщения, а в методе afterExecute  после обработки второстепенного сообщения заново его выбрасывать в массив datas? Или по хорошему это надо делать в самом потоке Executer? Или в каких то отдельных потоках реализовывать?... надеюсь объяснил как-то-)


Это сообщение отредактировал(а) Anime - 9.10.2014, 10:12
PM   Вверх
korian
Дата 9.10.2014, 16:33 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Опытный
**


Профиль
Группа: Участник
Сообщений: 651
Регистрация: 8.3.2008
Где: Украина, Харьков

Репутация: нет
Всего: 17



А можно описать решаемую задачу? %) Т.е. что вообще надо сделать?
Без всяких там "сообщений" и других слов из программисткого жаргона. Чтобы было понятно даже 7-ми летнему ребенку.
Или это просто надуманная задача и не имеет физического описания?

Цитата(Anime @  9.10.2014,  09:05 Найти цитируемый пост)
Есть два массива с сообщениями: первый массив сообщений это PriorityBlockingQueue, второй это LinkedBlockingDeque.
Так вот первый массив должен ждать сообщений от второго, т.е. он не выполниться пока во второй массив не придут сообщения.

почему их вообще два? почему нельзя сделать то, что надо сразу? Т.е. почему нельзя иметь только LinkedBlockingDeque? из которого беруться данные и дальше выполняется все, что необходимо.

Цитата(Anime @  9.10.2014,  09:05 Найти цитируемый пост)
в данном примере он запустит 100 потоков с сообщениями hello, но из них отработают тока 20, так как в data тока 20 сообщений типа world, а все остальные 80 будут ждать прихода сообщений в data.

зачем создавать 100 потоков, если используются из них реально только 20? Это ресурсы, которые таки надо экономить.

Цитата(Anime @  9.10.2014,  09:05 Найти цитируемый пост)
Правильно ли в методе beforeExecute реализовывать выдачу второстепенного сообщения, а в методе afterExecute  после обработки второстепенного сообщения заново его выбрасывать в массив datas? Или по хорошему это надо делать в самом потоке Executer? Или в каких то отдельных потоках реализовывать?... 

Похоже то, что вам надо, должно быть реализовано в самом потоке, но...
С точки зрения философии, так сказать, не правильно в обоих случаях. Задачу реализовывать надо в классах, а не в методах. Каждый класс должен делать одну и только одну работу. И называться соответственно этой работе.
ThreadPoolExecutor - это класс, который занимается тем, что распределяет потоки для предоставленных задач. И это его работа. Другой работы он делать не должен. Наследоваться от этого класса имеет смысл, только если вы хотите делать ту же работу но немного по другому. Ваш код никак не связан с этой работой. Поэтому абсолютно весь код, который у вас написан в методах beforeExecute и afterExecute должен быть где-то в другом месте.


И еще про касты типа:
ComparableFutureTask task = (ComparableFutureTask) runnable;
Если брать java после того, как там появились generics, то можно сказать, что в java кастить нельзя.
А точнее, если вам приходиться кастить, то вы пишите, что-то не так.
Кастить можно только там, где без этого нельзя обойтись. А таких случаев где-то пару штук на милион.
Например, когда приходиться работать через reflection.
Или когда чужой код засавляет это.
Или когда вам прсото лень и вы экономите время.

Это сообщение отредактировал(а) korian - 9.10.2014, 16:36
PM   Вверх
  
Ответ в темуСоздание новой темы Создание опроса
Правила форума "Java"
LSD   AntonSaburov
powerOn   tux
javastic
  • Прежде, чем задать вопрос, прочтите это!
  • Книги по Java собираются здесь.
  • Документация и ресурсы по Java находятся здесь.
  • Используйте теги [code=java][/code] для подсветки кода. Используйтe чекбокс "транслит", если у Вас нет русских шрифтов.
  • Помечайте свой вопрос как решённый, если на него получен ответ. Ссылка "Пометить как решённый" находится над первым постом.
  • Действия модераторов можно обсудить здесь.
  • FAQ раздела лежит здесь.

Если Вам помогли, и атмосфера форума Вам понравилась, то заходите к нам чаще! С уважением, LSD, AntonSaburov, powerOn, tux, javastic.

 
1 Пользователей читают эту тему (1 Гостей и 0 Скрытых Пользователей)
0 Пользователей:
« Предыдущая тема | Java: Общие вопросы | Следующая тема »


 




[ Время генерации скрипта: 0.0694 ]   [ Использовано запросов: 21 ]   [ GZIP включён ]


Реклама на сайте     Информационное спонсорство

 
По вопросам размещения рекламы пишите на vladimir(sobaka)vingrad.ru
Отказ от ответственности     Powered by Invision Power Board(R) 1.3 © 2003  IPS, Inc.