Модераторы: LSD, AntonSaburov
  

Поиск:

Ответ в темуСоздание новой темы Создание опроса
> Многопоточная очередь заявок, СМО поисковая машина 
:(
    Опции темы
Atum
Дата 17.5.2012, 14:05 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Опытный
**


Профиль
Группа: Участник
Сообщений: 265
Регистрация: 3.10.2008

Репутация: нет
Всего: нет



Добрый день : 

Нужна помощь, чего хочу добиться !? 

(система поиска! к поиску приходят запросы - в много потоков - они выстраиваются в очередь , и хранятся там не более 2х секунд ... есть несколько воркеров которые ищут ответы на поисковые запросы -и возвращают ответ пользователю ... если время на поиск ответа превышает время t  то такой запрос перестает выполняться а Воркер берет новый запрос из очереди на исполнение.)

и так заявки должны  приходить в несколько потоков .
как это организовать и смоделировать ?

Что есть : 
класс список - который удаляет элементы из себя автоматом если они пролежали в нем больше 2-х секунд.


Нет синхронизации :(  при удалении и добавлении элементов (добавлять элементы могу  многие потоки).
Нужно как то следить за удалением и обработкой элементов.

обрабатывать запросы - так же могут несколько Воркеров 
(забирать запросы и обрабатывать их возвращая пользователю ответ на его запрос ).

к функции  removeFirst() как раз и обращаются  воркеры, когда один из них становится свободным .



Код

import java.util.LinkedList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledLinkedList<T> extends LinkedList<T> implements Runnable{
      ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
      static long removeCount = 0;
      
      public void scheduledRemove() {
        
        removeCount = this.size();
        
        System.out.println("***   "+ this.size()+"   ***");
        System.out.println(""+this);
        service.scheduleAtFixedRate(this, 0, 2, TimeUnit.SECONDS);
    }
    @Override
   synchronized  public T removeFirst() {
        if(removeCount>0)removeCount--;
         return super.removeFirst();
    }
      
      
    @Override
    public void run() {
        long count = removeCount;
        System.out.println("["+this.size() +" - "+count + " = " + (this.size() - count)+"] : "  + this) ;
         System.err.print("Ушли : [");
        for (long i = 0; i < count; i++) {
            System.err.print(" " + removeFirst());
        }
         System.err.println("]");
        removeCount = this.size();
    }
}


Код

public static void main(String[] args) throws InterruptedException {
        
        final ScheduledLinkedList<String> list = new ScheduledLinkedList<String>();
        for (int i = 0; i < 10; i++) {
            list.add("" + i);
        }
        list.scheduledRemove();
        
         ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
         
          service.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.err.println("Обсдужил : [ " + list.removeFirst() + " " + list.removeFirst() + " " + list.removeFirst()+" " + list.removeFirst() + " ]");
                 System.out.println("Ожидают  : ("+list.size()+") " + list);
            }
        }, 1, 1, TimeUnit.SECONDS);
        
        while (true) {
            long sleep = Math.round(Math.random() * 1000);
            long count = Math.round(Math.random() * 10)+1;
            for (int i = 1; i < count+1; i++) {
                list.add(count + "a" + i);
            }
            System.out.println("add : "+count+  " sleep : " + sleep + " ms" );
            Thread.sleep(sleep);
            
            
        }
    }

PM MAIL   Вверх
priam220
Дата 17.5.2012, 16:31 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Опытный
**


Профиль
Группа: Участник
Сообщений: 291
Регистрация: 4.6.2010

Репутация: 7
Всего: 8



Код

Нет синхронизации :(  при удалении и добавлении элементов (добавлять элементы могу  многие потоки)


Насколько я понял, основная проблема именно в этом. И наверное, ничего стандартного из Коллекций для хранения запросов не подойдет. Хотя могу ошибаться, конечно.
Если все так, надо смотреть в сторону lock-free алгоритмов. 


Это сообщение отредактировал(а) priam220 - 17.5.2012, 16:32
PM MAIL   Вверх
Ares4322
Дата 18.5.2012, 08:35 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Опытный
**


Профиль
Группа: Участник
Сообщений: 339
Регистрация: 25.9.2007
Где: Россия, Москва

Репутация: 1
Всего: 3



http://docs.oracle.com/javase/6/docs/api/j...ge-summary.html
Скорее всего в качестве коллекции Вам подойдет http://docs.oracle.com/javase/6/docs/api/j...inkedQueue.html (если проблема все-так и в этом)

PM MAIL   Вверх
Atum
Дата 18.5.2012, 12:25 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Опытный
**


Профиль
Группа: Участник
Сообщений: 265
Регистрация: 3.10.2008

Репутация: нет
Всего: нет



Цитата(Ares4322 @ 18.5.2012,  08:35)
http://docs.oracle.com/javase/6/docs/api/j...ge-summary.html
Скорее всего в качестве коллекции Вам подойдет http://docs.oracle.com/javase/6/docs/api/j...inkedQueue.html (если проблема все-так и в этом)

ConcurrentLinkedQueue  мне кажется что это очень медленное и не эффективное решение ... 

ибо это универсальная штука ... а когда известно чего хочется то можно написать код более эффективно и не использовать 

ConcurrentLinkedQueue и пр.

Но я могу и ошибаться ! 

ConcurrentLinkedQueue- отличное решение - для решения без оптимизации.

Добавлено через 55 секунд
Цитата(priam220 @ 17.5.2012,  16:31)

я запросов не подойдет. Хотя могу ошибаться, конечно.
Если все так, надо смотреть в сторону lock-free алгоритмов.

lock-free алгоритмов.  можно по подробнее ?
PM MAIL   Вверх
Ares4322
Дата 18.5.2012, 12:36 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Опытный
**


Профиль
Группа: Участник
Сообщений: 339
Регистрация: 25.9.2007
Где: Россия, Москва

Репутация: 1
Всего: 3



Ну так чтобы не ошибаться, лучше ознакомьтесь с его реализацией. Вполне возможно, что он Вам и подойдет.
PM MAIL   Вверх
priam220
Дата 18.5.2012, 22:28 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Опытный
**


Профиль
Группа: Участник
Сообщений: 291
Регистрация: 4.6.2010

Репутация: 7
Всего: 8



Цитата

можно по подробнее ?

Я как раз и имел ввиду что то вроде того, что предложил Ares4322, но он оказался еще круче (wait-free). 
Я думаю Вы вряд ли реализуете что то лучше, если только на языке более низкого уровня.

PM MAIL   Вверх
Atum
Дата 14.11.2013, 08:40 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Опытный
**


Профиль
Группа: Участник
Сообщений: 265
Регистрация: 3.10.2008

Репутация: нет
Всего: нет



Скажем так  небольшой тест И пара вопросов : 
poll vs remove  - remove  вызывает poll 
add vs offer  - add - обертка над offer  .

В чем профит?

LinkedBlockingQueue vs ConcurrentLinkedQueue ?

Код

import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;


public class QueueExample {
    
    private final static int pool = 10;
    private static ExecutorService service = Executors.newFixedThreadPool(pool);
    private static AtomicInteger next = new AtomicInteger(0);

     private static Lock lock = new ReentrantLock();
    
    public static void main(String[] args) throws InterruptedException {
//        LinkedBlockingQueue<String> workQueue = new LinkedBlockingQueue<String>();
//        workQueue.add("1");
//        workQueue.add("2");
//        workQueue.add("3");
//        workQueue.put("4");
//        
//        System.out.println(workQueue.take());
//        System.out.println(workQueue.element());
//        System.out.println(workQueue.peek());
//        System.out.println(workQueue.remove());
//        System.out.println(workQueue.poll());
//        System.out.println(workQueue);
//        
//        System.exit(0);
        
        
     final ConcurrentLinkedQueue<Integer> linkedQueue = new ConcurrentLinkedQueue<Integer>();
        
        
        System.out.println(linkedQueue);

       // добавляем если очередь меньше 10.

        service.submit(new Runnable() {

            @Override
            public void run() {
                while(true){
                    if(linkedQueue.size() < 10){
                    Integer n = next.incrementAndGet();
                    System.out.printf(" add ... %d\n",n);  
                    linkedQueue.add(n);
                    }
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException ex) {
                        Logger.getLogger(QueueExample.class.getName()).log(Level.SEVERE, null, ex);
                    }
                }
            }
        });
        
// Просто вывод каждую секунду того что есть в очереди.
        service.submit(new Runnable() {
            @Override
            public void run() {
                while(true){
                    try {
                        Thread.sleep(1000);
                        System.out.println(linkedQueue);
                    } catch (InterruptedException ex) {
                        Logger.getLogger(QueueExample.class.getName()).log(Level.SEVERE, null, ex);
                    }
                }
            }
        });

        System.out.println("delete... ");
       //удаление из очереди
        while (true) {
            Thread.sleep(500);
            service.submit(new Runnable() {
                @Override
                public void run() {
                     //TODO 
                     //lock.lock();
                    if (!linkedQueue.isEmpty()) {
                        Integer elem = linkedQueue.poll();
                        System.out.println(" delete ... " + elem);
                    }
                     //lock.unlock();
                }
            });
        }
    }
}


PM MAIL   Вверх
Ares4322
Дата 14.11.2013, 15:00 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Опытный
**


Профиль
Группа: Участник
Сообщений: 339
Регистрация: 25.9.2007
Где: Россия, Москва

Репутация: 1
Всего: 3



Непонятный тест.  Добавляет один поток, выводит один поток.
Надо чтоб писало много потоков и читало много потоков. И тогда эти две реализации очереди можно будет сравнивать. А так они, конечно, одинаковы.

Посмотри исходники этих двух классов. 
LinkedBlockingQueue при добавлении будет лочить доступ к очереди (т.е. это блокирующая очередь), а ConcurrentLinkedQueue использует CAS и блокировок внутри нет. 
Можно использовать и то, и то. Но ConcurrentLinkedQueue будет производительнее. Так как CAS быстрее локов.
PM MAIL   Вверх
  
Ответ в темуСоздание новой темы Создание опроса
Правила форума "Java"
LSD   AntonSaburov
powerOn   tux
javastic
  • Прежде, чем задать вопрос, прочтите это!
  • Книги по Java собираются здесь.
  • Документация и ресурсы по Java находятся здесь.
  • Используйте теги [code=java][/code] для подсветки кода. Используйтe чекбокс "транслит", если у Вас нет русских шрифтов.
  • Помечайте свой вопрос как решённый, если на него получен ответ. Ссылка "Пометить как решённый" находится над первым постом.
  • Действия модераторов можно обсудить здесь.
  • FAQ раздела лежит здесь.

Если Вам помогли, и атмосфера форума Вам понравилась, то заходите к нам чаще! С уважением, LSD, AntonSaburov, powerOn, tux, javastic.

 
0 Пользователей читают эту тему (0 Гостей и 0 Скрытых Пользователей)
0 Пользователей:
« Предыдущая тема | Java: Общие вопросы | Следующая тема »


 




[ Время генерации скрипта: 0.1104 ]   [ Использовано запросов: 22 ]   [ GZIP включён ]


Реклама на сайте     Информационное спонсорство

 
По вопросам размещения рекламы пишите на vladimir(sobaka)vingrad.ru
Отказ от ответственности     Powered by Invision Power Board(R) 1.3 © 2003  IPS, Inc.