Версия для печати темы
Нажмите сюда для просмотра этой темы в оригинальном формате
Форум программистов > Java: Общие вопросы > ThreadPoolExecutors, не выполняются таски.


Автор: SoulKeeper 30.6.2008, 13:08
Всем привет, собственно игрался я на досуге с построением эффективной модели многопоточности для своего приложения, наткнулся на подводные камни.

Вкратце описание задачи.

Есть NIO сервер, крутится себе в своем потоке, общается с клиентами.
Есть клиенты, которые передают запросы на выполнение. 
Запросы, естественно, не выполняются в потоке Selector"а, а передаются на выполнение ThreadPoolExecutor"ам.

Есть одно ограничение:
Запросы клиента должны выполняться последовательно для клиента, т.е. вот такое является нормальным ходом выполнения.

к = клиент, з = запрос. 

к1з1, к2з1, к3з1, к3з2, к1з2, к1з3, к2з2...

Запросы поступают не сразу. Между ними может пройти какое-то время.
В общем картина такова, что клиентам между собой синхронизация не нужна, а запросы от клиентов должны выполняться последовательно, по мере поступления относительно клиента.

Как вариант - можно все это дело запихнуть в 1 поток, но не нравится оно мне smile

Суть проблемы в том что некоторые передаваемые на выполнение Runnable просто игнорируются, а RejectedExecutionhandler"ы молчат :(

P.S. BlockingThreadFactory - это я смотрел насколько выгодно ограничивать количество параллельных тридов. Как оказалось - не выгодно, общее време выполнения увеличивалось. Во всяком случае в даном примере.

Код

import java.util.HashMap;
import java.util.concurrent.*;

public class ThreadPoolManager {

    private static final int MAX_THREADS = 200;
    private static final BlockingThreadFactory BLOCKING_THREAD_FACTORY = new BlockingThreadFactory(MAX_THREADS);
    private static final HashMap<Object, ExecutorService> contextExecutors = new HashMap<Object, ExecutorService>();
    private static ExecutorService taskDeliverer = Executors.newSingleThreadExecutor();

    static void executeInContextPrivate(ContextTask contextTask) {

        ExecutorService executorService = contextExecutors.get(contextTask.getContext());

        if (executorService == null) {
// Use this
//            executorService = new ThreadPoolExecutor(0, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), BLOCKING_THREAD_FACTORY);
// Or this
            executorService = Executors.newSingleThreadExecutor();
            contextExecutors.put(contextTask.getContext(), executorService);
        }

        executorService.execute(contextTask.getTask());
    }

    public static void executeInContext(Object object, Runnable task) {
        taskDeliverer.execute(new ContextTask(object, task));
    }

    // Test code and some other crap

    public static void main(String[] args) {

        Object[] objects = new Object[PARALEL_CLIENTS];
        for (int i = 0; i < PARALEL_CLIENTS; i++) {
            objects[i] = new Object();
        }

        Runnable r = new Runnable() {
            public void run() {
                done();
            }
        };

        time = System.currentTimeMillis();
        for (int i = 0; i < TASKS_PER_CLIENT; i++) {
            for (int o = 0; o < PARALEL_CLIENTS; o++) {
                executeInContext(objects[o], r);
            }
        }

        new Thread(new Runnable(){
            public void run(){
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(counter);
                System.exit(0);
            }
        }).start();
    }

    private static long time;
    private static int counter;
    private static final int TASKS_PER_CLIENT = 5000;
    private static final int PARALEL_CLIENTS = 100;


    public static void done() {
        counter++;
//        System.out.println(counter);
        if (counter == PARALEL_CLIENTS * TASKS_PER_CLIENT) {
            System.out.println("Done in " + (System.currentTimeMillis() - time) / 1000D + " seconds");
        }
    }
}


Код

public class ContextTask implements Runnable{

    private Object context;

    private Runnable task;

    public ContextTask(Object context, Runnable task) {
        this.context = context;
        this.task = task;
    }

    public Object getContext() {
        return context;
    }

    public void setContext(Object context) {
        this.context = context;
    }

    public Runnable getTask() {
        return task;
    }

    public void setTask(Runnable task) {
        this.task = task;
    }

    public void run() {
        ThreadPoolManager.executeInContextPrivate(this);
    }
}


Код

import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;

public class BlockingThreadFactory implements ThreadFactory {

    private final Semaphore semaphore;

    public BlockingThreadFactory(int maxThreads) {
        this.semaphore = new Semaphore(maxThreads, false);
    }

    public Thread newThread(Runnable r) {

        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new ContextThread(r);
    }

    private class ContextThread extends Thread {

        private Runnable target;

        private ContextThread(Runnable target) {
            this.target = target;
        }

        public void run() {

            if (target != null) {
                try {
                    target.run();
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }

            semaphore.release();
        }
    }
}

Автор: ivg 30.6.2008, 15:15
Цитата(SoulKeeper @  30.6.2008,  16:08 Найти цитируемый пост)
Суть проблемы в том что некоторые передаваемые на выполнение Runnable просто игнорируются

А как вы это выяснили? По значению поля counter выводимого на консоль?  smile
Сделайте метод done() synchronized и тогда всё поймёте.

Автор: SoulKeeper 30.6.2008, 15:29
Thx! smile

Я что-то думал что примитивы не страдают от возможных проблем с синхронизацией... Как оказалось - страдают.

Автор: Дрон 30.6.2008, 15:39
Цитата(SoulKeeper @  30.6.2008,  15:29 Найти цитируемый пост)
Я что-то думал что примитивы не страдают от возможных проблем с синхронизацией... Как оказалось - страдают. 

Примитивы "страдают" от кэширования в рабочей памяти потока. Попробуй объявить счётчик вот так:
Код
private static volatile int counter;


Хотя synchronized -- более общее решение.

Автор: SoulKeeper 30.6.2008, 15:43
Цитата(Дрон @ 30.6.2008,  15:39)
Код
private static volatile int counter;

volatile не помогает

Автор: ivg 30.6.2008, 15:49
synchronized - чтобы понять проблему, а вообще можно например java.util.concurrent.atomic.AtomicInteger

Автор: Дрон 30.6.2008, 15:50
Цитата(SoulKeeper @  30.6.2008,  15:43 Найти цитируемый пост)
volatile не помогает 

Странно, ну видимо есть ещё какие-то тонкости. Я в детали твоего кода не вникал smile

Автор: SoulKeeper 30.6.2008, 16:04
Цитата(ivg @ 30.6.2008,  15:49)
synchronized - чтобы понять проблему, а вообще можно например java.util.concurrent.atomic.AtomicInteger

Еше раз thx smile Знаем такого зверя, будем использовать smile

Powered by Invision Power Board (http://www.invisionboard.com)
© Invision Power Services (http://www.invisionpower.com)