Модераторы: LSD, AntonSaburov
  

Поиск:

Ответ в темуСоздание новой темы Создание опроса
> ThreadPoolExecutors, не выполняются таски. 
V
    Опции темы
SoulKeeper
Дата 30.6.2008, 13:08 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Опытный
**


Профиль
Группа: Участник
Сообщений: 375
Регистрация: 14.1.2007
Где: Ukraine, Lviv.

Репутация: 11
Всего: 15



Всем привет, собственно игрался я на досуге с построением эффективной модели многопоточности для своего приложения, наткнулся на подводные камни.

Вкратце описание задачи.

Есть NIO сервер, крутится себе в своем потоке, общается с клиентами.
Есть клиенты, которые передают запросы на выполнение. 
Запросы, естественно, не выполняются в потоке Selector"а, а передаются на выполнение ThreadPoolExecutor"ам.

Есть одно ограничение:
Запросы клиента должны выполняться последовательно для клиента, т.е. вот такое является нормальным ходом выполнения.

к = клиент, з = запрос. 

к1з1, к2з1, к3з1, к3з2, к1з2, к1з3, к2з2...

Запросы поступают не сразу. Между ними может пройти какое-то время.
В общем картина такова, что клиентам между собой синхронизация не нужна, а запросы от клиентов должны выполняться последовательно, по мере поступления относительно клиента.

Как вариант - можно все это дело запихнуть в 1 поток, но не нравится оно мне smile

Суть проблемы в том что некоторые передаваемые на выполнение Runnable просто игнорируются, а RejectedExecutionhandler"ы молчат :(

P.S. BlockingThreadFactory - это я смотрел насколько выгодно ограничивать количество параллельных тридов. Как оказалось - не выгодно, общее време выполнения увеличивалось. Во всяком случае в даном примере.

Код

import java.util.HashMap;
import java.util.concurrent.*;

public class ThreadPoolManager {

    private static final int MAX_THREADS = 200;
    private static final BlockingThreadFactory BLOCKING_THREAD_FACTORY = new BlockingThreadFactory(MAX_THREADS);
    private static final HashMap<Object, ExecutorService> contextExecutors = new HashMap<Object, ExecutorService>();
    private static ExecutorService taskDeliverer = Executors.newSingleThreadExecutor();

    static void executeInContextPrivate(ContextTask contextTask) {

        ExecutorService executorService = contextExecutors.get(contextTask.getContext());

        if (executorService == null) {
// Use this
//            executorService = new ThreadPoolExecutor(0, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), BLOCKING_THREAD_FACTORY);
// Or this
            executorService = Executors.newSingleThreadExecutor();
            contextExecutors.put(contextTask.getContext(), executorService);
        }

        executorService.execute(contextTask.getTask());
    }

    public static void executeInContext(Object object, Runnable task) {
        taskDeliverer.execute(new ContextTask(object, task));
    }

    // Test code and some other crap

    public static void main(String[] args) {

        Object[] objects = new Object[PARALEL_CLIENTS];
        for (int i = 0; i < PARALEL_CLIENTS; i++) {
            objects[i] = new Object();
        }

        Runnable r = new Runnable() {
            public void run() {
                done();
            }
        };

        time = System.currentTimeMillis();
        for (int i = 0; i < TASKS_PER_CLIENT; i++) {
            for (int o = 0; o < PARALEL_CLIENTS; o++) {
                executeInContext(objects[o], r);
            }
        }

        new Thread(new Runnable(){
            public void run(){
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(counter);
                System.exit(0);
            }
        }).start();
    }

    private static long time;
    private static int counter;
    private static final int TASKS_PER_CLIENT = 5000;
    private static final int PARALEL_CLIENTS = 100;


    public static void done() {
        counter++;
//        System.out.println(counter);
        if (counter == PARALEL_CLIENTS * TASKS_PER_CLIENT) {
            System.out.println("Done in " + (System.currentTimeMillis() - time) / 1000D + " seconds");
        }
    }
}


Код

public class ContextTask implements Runnable{

    private Object context;

    private Runnable task;

    public ContextTask(Object context, Runnable task) {
        this.context = context;
        this.task = task;
    }

    public Object getContext() {
        return context;
    }

    public void setContext(Object context) {
        this.context = context;
    }

    public Runnable getTask() {
        return task;
    }

    public void setTask(Runnable task) {
        this.task = task;
    }

    public void run() {
        ThreadPoolManager.executeInContextPrivate(this);
    }
}


Код

import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;

public class BlockingThreadFactory implements ThreadFactory {

    private final Semaphore semaphore;

    public BlockingThreadFactory(int maxThreads) {
        this.semaphore = new Semaphore(maxThreads, false);
    }

    public Thread newThread(Runnable r) {

        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new ContextThread(r);
    }

    private class ContextThread extends Thread {

        private Runnable target;

        private ContextThread(Runnable target) {
            this.target = target;
        }

        public void run() {

            if (target != null) {
                try {
                    target.run();
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }

            semaphore.release();
        }
    }
}

PM MAIL   Вверх
ivg
Дата 30.6.2008, 15:15 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Autonomous R&D
**


Профиль
Группа: Участник
Сообщений: 686
Регистрация: 8.2.2006
Где: Екатеринбург

Репутация: 33
Всего: 81



Цитата(SoulKeeper @  30.6.2008,  16:08 Найти цитируемый пост)
Суть проблемы в том что некоторые передаваемые на выполнение Runnable просто игнорируются

А как вы это выяснили? По значению поля counter выводимого на консоль?  smile
Сделайте метод done() synchronized и тогда всё поймёте.
PM MAIL   Вверх
SoulKeeper
Дата 30.6.2008, 15:29 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Опытный
**


Профиль
Группа: Участник
Сообщений: 375
Регистрация: 14.1.2007
Где: Ukraine, Lviv.

Репутация: 11
Всего: 15



Thx! smile

Я что-то думал что примитивы не страдают от возможных проблем с синхронизацией... Как оказалось - страдают.
PM MAIL   Вверх
Дрон
Дата 30.6.2008, 15:39 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Java-ненавистник :)
****


Профиль
Группа: Участник Клуба
Сообщений: 3179
Регистрация: 29.12.2002
Где: Санкт-Петербург

Репутация: 6
Всего: 92



Цитата(SoulKeeper @  30.6.2008,  15:29 Найти цитируемый пост)
Я что-то думал что примитивы не страдают от возможных проблем с синхронизацией... Как оказалось - страдают. 

Примитивы "страдают" от кэширования в рабочей памяти потока. Попробуй объявить счётчик вот так:
Код
private static volatile int counter;


Хотя synchronized -- более общее решение.

Это сообщение отредактировал(а) Дрон - 30.6.2008, 15:39


--------------------
Да. Именно так.
PM   Вверх
SoulKeeper
Дата 30.6.2008, 15:43 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Опытный
**


Профиль
Группа: Участник
Сообщений: 375
Регистрация: 14.1.2007
Где: Ukraine, Lviv.

Репутация: 11
Всего: 15



Цитата(Дрон @ 30.6.2008,  15:39)
Код
private static volatile int counter;

volatile не помогает
PM MAIL   Вверх
ivg
Дата 30.6.2008, 15:49 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Autonomous R&D
**


Профиль
Группа: Участник
Сообщений: 686
Регистрация: 8.2.2006
Где: Екатеринбург

Репутация: 33
Всего: 81



synchronized - чтобы понять проблему, а вообще можно например java.util.concurrent.atomic.AtomicInteger
PM MAIL   Вверх
Дрон
Дата 30.6.2008, 15:50 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Java-ненавистник :)
****


Профиль
Группа: Участник Клуба
Сообщений: 3179
Регистрация: 29.12.2002
Где: Санкт-Петербург

Репутация: 6
Всего: 92



Цитата(SoulKeeper @  30.6.2008,  15:43 Найти цитируемый пост)
volatile не помогает 

Странно, ну видимо есть ещё какие-то тонкости. Я в детали твоего кода не вникал smile



--------------------
Да. Именно так.
PM   Вверх
SoulKeeper
Дата 30.6.2008, 16:04 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Опытный
**


Профиль
Группа: Участник
Сообщений: 375
Регистрация: 14.1.2007
Где: Ukraine, Lviv.

Репутация: 11
Всего: 15



Цитата(ivg @ 30.6.2008,  15:49)
synchronized - чтобы понять проблему, а вообще можно например java.util.concurrent.atomic.AtomicInteger

Еше раз thx smile Знаем такого зверя, будем использовать smile
PM MAIL   Вверх
  
Ответ в темуСоздание новой темы Создание опроса
Правила форума "Java"
LSD   AntonSaburov
powerOn   tux
javastic
  • Прежде, чем задать вопрос, прочтите это!
  • Книги по Java собираются здесь.
  • Документация и ресурсы по Java находятся здесь.
  • Используйте теги [code=java][/code] для подсветки кода. Используйтe чекбокс "транслит", если у Вас нет русских шрифтов.
  • Помечайте свой вопрос как решённый, если на него получен ответ. Ссылка "Пометить как решённый" находится над первым постом.
  • Действия модераторов можно обсудить здесь.
  • FAQ раздела лежит здесь.

Если Вам помогли, и атмосфера форума Вам понравилась, то заходите к нам чаще! С уважением, LSD, AntonSaburov, powerOn, tux, javastic.

 
1 Пользователей читают эту тему (1 Гостей и 0 Скрытых Пользователей)
0 Пользователей:
« Предыдущая тема | Java: Общие вопросы | Следующая тема »


 




[ Время генерации скрипта: 0.0794 ]   [ Использовано запросов: 21 ]   [ GZIP включён ]


Реклама на сайте     Информационное спонсорство

 
По вопросам размещения рекламы пишите на vladimir(sobaka)vingrad.ru
Отказ от ответственности     Powered by Invision Power Board(R) 1.3 © 2003  IPS, Inc.