Всем привет, собственно игрался я на досуге с построением эффективной модели многопоточности для своего приложения, наткнулся на подводные камни.
Вкратце описание задачи.
Есть NIO сервер, крутится себе в своем потоке, общается с клиентами. Есть клиенты, которые передают запросы на выполнение. Запросы, естественно, не выполняются в потоке Selector"а, а передаются на выполнение ThreadPoolExecutor"ам.
Есть одно ограничение: Запросы клиента должны выполняться последовательно для клиента, т.е. вот такое является нормальным ходом выполнения.
к = клиент, з = запрос.
к1з1, к2з1, к3з1, к3з2, к1з2, к1з3, к2з2...
Запросы поступают не сразу. Между ними может пройти какое-то время. В общем картина такова, что клиентам между собой синхронизация не нужна, а запросы от клиентов должны выполняться последовательно, по мере поступления относительно клиента.
Как вариант - можно все это дело запихнуть в 1 поток, но не нравится оно мне 
Суть проблемы в том что некоторые передаваемые на выполнение Runnable просто игнорируются, а RejectedExecutionhandler"ы молчат :(
P.S. BlockingThreadFactory - это я смотрел насколько выгодно ограничивать количество параллельных тридов. Как оказалось - не выгодно, общее време выполнения увеличивалось. Во всяком случае в даном примере.
Код | import java.util.HashMap; import java.util.concurrent.*;
public class ThreadPoolManager {
private static final int MAX_THREADS = 200; private static final BlockingThreadFactory BLOCKING_THREAD_FACTORY = new BlockingThreadFactory(MAX_THREADS); private static final HashMap<Object, ExecutorService> contextExecutors = new HashMap<Object, ExecutorService>(); private static ExecutorService taskDeliverer = Executors.newSingleThreadExecutor();
static void executeInContextPrivate(ContextTask contextTask) {
ExecutorService executorService = contextExecutors.get(contextTask.getContext());
if (executorService == null) { // Use this // executorService = new ThreadPoolExecutor(0, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), BLOCKING_THREAD_FACTORY); // Or this executorService = Executors.newSingleThreadExecutor(); contextExecutors.put(contextTask.getContext(), executorService); }
executorService.execute(contextTask.getTask()); }
public static void executeInContext(Object object, Runnable task) { taskDeliverer.execute(new ContextTask(object, task)); }
// Test code and some other crap
public static void main(String[] args) {
Object[] objects = new Object[PARALEL_CLIENTS]; for (int i = 0; i < PARALEL_CLIENTS; i++) { objects[i] = new Object(); }
Runnable r = new Runnable() { public void run() { done(); } };
time = System.currentTimeMillis(); for (int i = 0; i < TASKS_PER_CLIENT; i++) { for (int o = 0; o < PARALEL_CLIENTS; o++) { executeInContext(objects[o], r); } }
new Thread(new Runnable(){ public void run(){ try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(counter); System.exit(0); } }).start(); }
private static long time; private static int counter; private static final int TASKS_PER_CLIENT = 5000; private static final int PARALEL_CLIENTS = 100;
public static void done() { counter++; // System.out.println(counter); if (counter == PARALEL_CLIENTS * TASKS_PER_CLIENT) { System.out.println("Done in " + (System.currentTimeMillis() - time) / 1000D + " seconds"); } } }
|
Код | public class ContextTask implements Runnable{
private Object context;
private Runnable task;
public ContextTask(Object context, Runnable task) { this.context = context; this.task = task; }
public Object getContext() { return context; }
public void setContext(Object context) { this.context = context; }
public Runnable getTask() { return task; }
public void setTask(Runnable task) { this.task = task; }
public void run() { ThreadPoolManager.executeInContextPrivate(this); } }
|
Код | import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory;
public class BlockingThreadFactory implements ThreadFactory {
private final Semaphore semaphore;
public BlockingThreadFactory(int maxThreads) { this.semaphore = new Semaphore(maxThreads, false); }
public Thread newThread(Runnable r) {
try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } return new ContextThread(r); }
private class ContextThread extends Thread {
private Runnable target;
private ContextThread(Runnable target) { this.target = target; }
public void run() {
if (target != null) { try { target.run(); } catch (Throwable t) { t.printStackTrace(); } }
semaphore.release(); } } }
|
|