Модераторы: Daevaorn

Поиск:

Ответ в темуСоздание новой темы Создание опроса
> многопоточный даунлоадер картинок, делаю многопоточный закатчик картинок 
:(
    Опции темы
kulibinka
Дата 11.12.2006, 20:30 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Бывалый
*


Профиль
Группа: Участник
Сообщений: 191
Регистрация: 20.11.2006

Репутация: 2
Всего: 4



Нужно сделать многопоточный закатчик картинок.

Разбирался с многопоточностью по этому коду http://effbot.org/librarybook/queue.htm.

Немножко разобрался.

Код

# -*- coding: windows-1251 -*-

import os, time, urllib, threading, Queue

save_to = 'images'
f_images = 'images.txt'
potokov = 5


def download_image_to_file (url, filename):
    """скачивает ТОЛЬКО КАРТИНКИ из интернета"""
    log_message = ''

    try:
        f = urllib.urlopen(url)
        info = f.info()

        contentType = info.get('Content-Type')
        if contentType.find('image') != -1:
            contentLength = info.get('Content-Length')

            if not contentLength: 
                s = f.read()
            else: 
                s = f.read(int(contentLength))
            f.close()

            f = open(filename, 'wb')
            f.write(s)
            f.close()
            log_message = 'success'
        else:
            log_message = 'not image'
    except:
        log_message = 'bad url'
    if log_message != 'success':
        print '\n', url, log_message, '\n'

class Downloader(threading.Thread):
    """ооо - это умный даунлоадер. Качает все по очереди :)
    идея взята отсюда http://effbot.org/librarybook/queue.htm"""

    def __init__(self, queue):
        self.__queue = queue
        threading.Thread.__init__(self)

    def run(self):
        while 1:
            item = self.__queue.get()
            if item is None:
                break # reached end of queue

            num, info = item
            url, filename = info
            print num, '\t', self.getName(), '\t', "Fetching ", url
            download_image_to_file (url, filename)
            print '\t', self.getName(), '\t', "Saved in ", filename, '\t', time.ctime()



def download_list (lst, save_to_dir = '', DownloaderS = 5):
    """а эта штука берет список урл-куда сохранить и сохраняет"""
    print 'OBRABOTATJ: ', len(lst)
    if not os.path.isdir(save_to_dir):
        os.mkdir(save_to_dir)
    
    new_lst = []
    for (url, filename) in lst:
        new_lst.append( (url, os.path.join(save_to_dir, filename)) )

    queue = Queue.Queue(1)

    for i in range(DownloaderS):
        Downloader(queue).start()

    for i in range(len(new_lst)):
#        print "push", item
        queue.put((i, new_lst[i]))

    for i in range(DownloaderS):
        queue.put(None)

def create_lst (image_file):
    """получает файлик с картинками и генерирует список урл-куда сохранить"""
    rezult = []
    f = open(image_file, 'r').read()
    all_urls = f.split('\n')
    for i in range(len(all_urls)):
        rezult.append( (all_urls[i], str(i)) )
    return rezult
    


if not os.path.isdir(save_to):
    os.mkdir(save_to)

images = create_lst(f_images)
download_list(images, save_to, potokov)


В файл 'images.txt' для теста положил следующие картинки



Суть: скрипт получает файлик с урлами + сколько потоков одновременно запускать.
Ну а после этого он начинает качать, но не все а только картинки.

Эта моя версия качает успешно.

Теперь вопрос: мне нужно чтобы он вел лог-файл. А вот как сделать так чтобы все потоки отдавали log_message в одно место я не знаю...

В этом и вопрос - как при многопоточности сделать так, чтобы все потоки могли отсылать информацию в одно общее место?
PM MAIL   Вверх
cherep
Дата 12.12.2006, 01:31 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Шустрый
*


Профиль
Группа: Участник
Сообщений: 74
Регистрация: 11.1.2006
Где: Москва

Репутация: 1
Всего: 4



Видимо нужно на время отсылки тредом информации залочить все остальные треды, поместив нужный тред в монитор (Хоара). Пока он в мониторе, может работать только он один, все остальные ждут. Таким образом можно предотвратить порчу разделяемого ресурса конкурирующими тредами. Для примера могу привести 2 кусочка кода. Сравните, как они работают.

Код

import threading, time, random

def makerun(i):
    def run():
        for i in range(5):
            time.sleep(random.random() * 0.2)
            print str(i)*10
    return run
    
threads = []

for i in range(10):
    threads += [threading.Thread(target=makerun(i))]
    
for t in threads:
    t.start()


Вывод:

Код

8888888888
1111111111
6666666666
7777777777
1111111111
7777777777
9999999999
3333333333
2222222222
1111111111
1111111111
5555555555
8888888888
6666666666
...


Как видите, все "мешается" из-за того, что треды работают одновременно.

Добавим кое-что:
Код

import threading, time, random

lock = threading.Lock()

def makerun(i):
    def run():
        global lock
        lock.acquire()
        
        for j in range(5):
            time.sleep(random.random() * 0.2)
            print str(i)*10
            
        lock.release()
    return run
    
threads = []

for i in range(10):
    threads += [threading.Thread(target=makerun(i))]
    
for t in threads:
    t.start()


Результат:

Код

0000000000
0000000000
0000000000
0000000000
0000000000
1111111111
1111111111
1111111111
1111111111
1111111111
2222222222
2222222222
2222222222
2222222222
2222222222
...


Как видите, мы окружили критический участок кода вызовами
lock.acquire() и lock.release(). В данном случае lock и есть тем самым монитором (синхронизационным примитивом, если хотите smile )
PM MAIL WWW ICQ   Вверх
kulibinka
Дата 12.12.2006, 02:42 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Бывалый
*


Профиль
Группа: Участник
Сообщений: 191
Регистрация: 20.11.2006

Репутация: 2
Всего: 4



Сделал как Вы посоветовали - заработало, спасибо smile

Код

# -*- coding: windows-1251 -*-

import os, time, urllib, threading, Queue

save_to = 'images'
f_images = 'images.txt'
f_log = 'log_file.txt'
potokov = 5
lock = threading.Lock()
full_log = []


def download_image_to_file (url, filename):
    """скачивает ТОЛЬКО КАРТИНКИ из интернета"""
    global lock
    global full_log

    log_message = ''

    try:
        f = urllib.urlopen(url)
        info = f.info()

        contentType = info.get('Content-Type')
        if contentType.find('image') != -1:
            contentLength = info.get('Content-Length')

            if not contentLength: 
                s = f.read()
            else: 
                s = f.read(int(contentLength))
            f.close()

            f = open(filename, 'wb')
            f.write(s)
            f.close()
            log_message = 'success'
        else:
            log_message = 'not image'
    except:
        log_message = 'bad url'
    if log_message != 'success':
        print '\n', url, log_message, filename, '\n'

    lock.acquire()
    full_log.append(filename + ' ' + url + ' ' + log_message)
    lock.release()


class Downloader(threading.Thread):
    """ооо - это умный даунлоадер. Качает все по очереди :)
    идея взята отсюда http://effbot.org/librarybook/queue.htm"""

    def __init__(self, queue):
        self.__queue = queue
        threading.Thread.__init__(self)

    def run(self):
        while 1:
            item = self.__queue.get()
            if item is None:
                break # reached end of queue

            num, info = item
            url, filename = info
            print num, '\t', self.getName(), '\t', "Fetching ", url
            download_image_to_file (url, filename)
            print '\t', self.getName(), '\t', "Saved in ", filename, '\t', time.ctime()



def download_list (lst, save_to_dir = '', DownloaderS = 5):
    """а эта штука берет список урл-куда сохранить и сохраняет"""
    print 'OBRABOTATJ: ', len(lst)
    if not os.path.isdir(save_to_dir):
        os.mkdir(save_to_dir)
    
    new_lst = []
    for (url, filename) in lst:
        new_lst.append( (url, os.path.join(save_to_dir, filename)) )

    queue = Queue.Queue(1)

    for i in range(DownloaderS):
        Downloader(queue).start()

    for i in range(len(new_lst)):
#        print "push", item
        queue.put((i, new_lst[i]))

    for i in range(DownloaderS):
        queue.put(None)
    
    f = open(f_log, 'w')
    f.write("\n".join(full_log))
    f.close()
    print 'SAVED :)'


def create_lst (image_file):
    """получает файлик с картинками и генерирует список урл-куда сохранить"""
    rezult = []
    f = open(image_file, 'r').read()
    all_urls = f.split('\n')
    for i in range(len(all_urls)):
        rezult.append( (all_urls[i], str(i)) )
    return rezult
    


if not os.path.isdir(save_to):
    os.mkdir(save_to)

images = create_lst(f_images)
download_list(images, save_to, potokov)


Вопрос в догонку - мне нужно запустить ф-ю сохранения лога только после того, как все потоки отработают.
Как это делается - ввести переменную, в которой сохранять количество процессов, и перед сохранением циклить пока она не станет 0, или же есть какой-либо способ покрасивее?
PM MAIL   Вверх
cherep
Дата 12.12.2006, 03:04 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Шустрый
*


Профиль
Группа: Участник
Сообщений: 74
Регистрация: 11.1.2006
Где: Москва

Репутация: 1
Всего: 4



Цитата

Сделал как Вы посоветовали - заработало, спасибо smile


Пожалуйста, я рад!

На счет второго вопроса, думаю все что Вам нужно - это за-join-ить все запускаемые треды на главный тред (основной поток выполнения)
Это хорошо написано в хэлпе. Теория тут такова: вы в треде A (в данном случае это основная программа) запускаете тред B:

B.start() и сразу же делаете B.join()

Таким образом тред A (основная программа) будет ожидать завершения треда B и лишь потом продолжит выполняться...

На примере ранее приведенной программы

Код

import threading, time, random

lock = threading.Lock()

def makerun(i):
    def run():
        global lock
        lock.acquire()
        
        for j in range(5):
            time.sleep(random.random() * 0.2)
            print str(i)*10
            
        lock.release()
    return run
    
threads = []

for i in range(10):
    t = threading.Thread(target=makerun(i))
    threads += [t]
        
for t in threads:
    t.start()
    t.join() # <-- here
    
print 'All done!!!'


Вам нужно поставить сохранять лог на месте print 'All done!!!'  smile 
PM MAIL WWW ICQ   Вверх
_Viper_
Дата 12.12.2006, 12:14 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Шустрый
*


Профиль
Группа: Участник
Сообщений: 87
Регистрация: 8.11.2006

Репутация: 10
Всего: 11



cherep, так как ты написал 
Код
for t in threads:
    t.start()
    t.join() # <-- here
все потоки будут выполняться последовательно, join стоит вынести за пределы цикла. К тому же если закончился один из потоков, то это совсем не означает что закончились все.
Так же 
Код

        lock.acquire()
        
        for j in range(5):
            time.sleep(random.random() * 0.2)
            print str(i)*10
            
        lock.release()
смысла не имеет, поскольку это заблокирует все потоки, будет работать только один - все остальные ждать. Думаю имеет смысл блокировать ресурс именно перед записью 
Код
        
        for j in range(5):
            time.sleep(random.random() * 0.2)
            lock.acquire()
            print str(i)*10
            lock.release()

PM MAIL   Вверх
cherep
Дата 12.12.2006, 13:47 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Шустрый
*


Профиль
Группа: Участник
Сообщений: 74
Регистрация: 11.1.2006
Где: Москва

Репутация: 1
Всего: 4



Цитата

cherep, так как ты написал  
Код

for t in threads:
    t.start()
    t.join() # <-- here

все потоки будут выполняться последовательно, join стоит вынести за пределы цикла. К тому же если закончился один из потоков, то это совсем не означает что закончились все.


Да, ты совершенно прав. Каждый вызов join просто-напросто будет блокировать работу цикла по запуску тредов до окончания последнего запущенного треда. Нужно делать так:
Код

for t in threads:
    t.start()
    
for t in threads:
    t.join() # <-- here


Насчет второго замечания, то тут ты не так понял. Суть в том, что весь участок будет выполнен как "единое целое". В данном случае да, потоки по большому счету будут выполняться последовательно. Нам важно было именно синхронизовать "всю записывающую" часть программы. Имеется ввиду, что там еще есть некоторая "полезная работа".

Код

def makerun(i):
    def run():
        global lock

        WORK() # <-- полезная работа

        lock.acquire()
        
        for j in range(5):
            time.sleep(random.random() * 0.2)
            print str(i)*10
            
        lock.release()
    return run

PM MAIL WWW ICQ   Вверх
_Viper_
Дата 12.12.2006, 14:09 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Шустрый
*


Профиль
Группа: Участник
Сообщений: 87
Регистрация: 8.11.2006

Репутация: 10
Всего: 11



Цитата(cherep @  12.12.2006,  13:47 Найти цитируемый пост)
 Имеется ввиду, что там еще есть некоторая "полезная работа".
Я просто рассматривал 
Код
time.sleep(random.random() * 0.2)
как имитацию "полезной работы".
PM MAIL   Вверх
cherep
Дата 12.12.2006, 14:21 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Шустрый
*


Профиль
Группа: Участник
Сообщений: 74
Регистрация: 11.1.2006
Где: Москва

Репутация: 1
Всего: 4



Цитата

Я просто рассматривал 
Код

time.sleep(random.random() * 0.2)

как имитацию "полезной работы". 


Это была имитация "протяженности" операции записи в лог  smile 
PM MAIL WWW ICQ   Вверх
kulibinka
Дата 12.12.2006, 15:12 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Бывалый
*


Профиль
Группа: Участник
Сообщений: 191
Регистрация: 20.11.2006

Репутация: 2
Всего: 4



Методом перебора пытался применить join к потокам если там еще и Queue прилеплено - все время вылазили ошибки.

А потом посмотрел http://docs.python.org/lib/QueueObjects.html и увидел что в питоне 2.5 можно делать банально     queue.join(). 

Поставил 2.5, потестил - да, действительно можно и все работает smile

Код

# -*- coding: windows-1251 -*-

import os, time, urllib, threading, Queue

save_to = 'images'
f_images = 'images.txt'
f_log = 'log_file.txt'
potokov = 30
lock = threading.Lock()
full_log = []


def download_image_to_file (url, filename):
    """скачивает ТОЛЬКО КАРТИНКИ из интернета"""
    global lock
    global full_log

    log_message = ''

    try:
        f = urllib.urlopen(url)
        info = f.info()

        contentType = info.get('Content-Type')
        if contentType.find('image') != -1:
            contentLength = info.get('Content-Length')

            if not contentLength: 
                s = f.read()
            else: 
                s = f.read(int(contentLength))
            f.close()

            f = open(filename, 'wb')
            f.write(s)
            f.close()
            log_message = 'success'
        else:
            log_message = 'not image'
    except:
        log_message = 'bad url'
    if log_message != 'success':
        print '\n', url, log_message, filename, '\n'

    lock.acquire()
    full_log.append(filename + ' ' + url + ' ' + log_message)
    lock.release()


class Downloader(threading.Thread):
    """ооо - это умный даунлоадер. Качает все по очереди :)
    идея взята отсюда http://effbot.org/librarybook/queue.htm"""

    def __init__(self, queue):
        self.__queue = queue
        threading.Thread.__init__(self)

    def run(self):
        while 1:
            item = self.__queue.get()
            if item is None:
                break # reached end of queue

            num, info = item
            url, filename = info
            print num, '\t', self.getName(), '\t', "Fetching ", url
            download_image_to_file (url, filename)
            print '\t', self.getName(), '\t', "Saved in ", filename, '\t', time.ctime()
            self.__queue.task_done()



def download_list (lst, save_to_dir = '', DownloaderS = 5):
    """а эта штука берет список урл-куда сохранить и сохраняет"""
    t1 = time.time()
    print 'OBRABOTATJ: ', len(lst)
    if not os.path.isdir(save_to_dir):
        os.mkdir(save_to_dir)
    
    new_lst = []
    for (url, filename) in lst:
        new_lst.append( (url, os.path.join(save_to_dir, filename)) )

    queue = Queue.Queue()

    all_downloaders = []
    for i in range(DownloaderS):
        all_downloaders.append(Downloader(queue).start())
        

    for i in range(len(new_lst)):
        queue.put((i, new_lst[i]))

#    ждем пока все не доделается
    queue.join()

    f = open(f_log, 'w')
    f.write("\n".join(full_log))
    f.close()
    print 'SAVED :)'
    print time.time() - t1


def create_lst (image_file):
    """получает файлик с картинками и генерирует список урл-куда сохранить"""
    rezult = []
    f = open(image_file, 'r').read()
    all_urls = f.split('\n')
    for i in range(len(all_urls)):
        rezult.append( (all_urls[i], str(i)) )
    return rezult


if not os.path.isdir(save_to):
    os.mkdir(save_to)

images = create_lst(f_images)
download_list(images, save_to, potokov)


Спасибо всем за наводки.
Но если кто подскажет как queue и join связать в 2.4 - буду немеряно благодарный smile
PM MAIL   Вверх
_Viper_
Дата 12.12.2006, 15:24 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Шустрый
*


Профиль
Группа: Участник
Сообщений: 87
Регистрация: 8.11.2006

Репутация: 10
Всего: 11



Цитата(kulibinka @  12.12.2006,  15:12 Найти цитируемый пост)
Методом перебора пытался применить join к потокам если там еще и Queue прилеплено - все время вылазили ошибки.
В документации по 2.4 написано, что доступность Queue зависит от наличия модуля thread. А потоки из этого модуля не поддерживают join. Так что скорее всего использовать одновременно  Queue и join никак не удастся.
PM MAIL   Вверх
kulibinka
Дата 12.12.2006, 15:28 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Бывалый
*


Профиль
Группа: Участник
Сообщений: 191
Регистрация: 20.11.2006

Репутация: 2
Всего: 4



Ок. 
Тогда другой вопрос - а что это моя программа ждет после того как все отработала?
SAVED написала, все сделала (по результатам видно), и все равно продолжает что-то делать...
PM MAIL   Вверх
cherep
Дата 12.12.2006, 15:30 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Шустрый
*


Профиль
Группа: Участник
Сообщений: 74
Регистрация: 11.1.2006
Где: Москва

Репутация: 1
Всего: 4



Цитата

Но если кто подскажет как queue и join связать в 2.4 - буду немеряно благодарный smile


Самое простое и глупое решение - берешь файлик Queue.py из python25\Lib и ложишь в директории с исходником своей проги.  Думаю, должно сработать. Идея, думаю, понятна - он будет импортироваться как в 24 так и в 25 -> все будет работать идентично.
PM MAIL WWW ICQ   Вверх
_Viper_
Дата 12.12.2006, 16:02 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Шустрый
*


Профиль
Группа: Участник
Сообщений: 87
Регистрация: 8.11.2006

Репутация: 10
Всего: 11



Цитата(kulibinka @  12.12.2006,  15:28 Найти цитируемый пост)
Тогда другой вопрос - а что это моя программа ждет после того как все отработала?SAVED написала, все сделала (по результатам видно), и все равно продолжает что-то делать...
Такое бывает когда не все потоки закончили выполнение. Проверь нормально ли останавливаются твои потоки.

PM MAIL   Вверх
kulibinka
Дата 12.12.2006, 20:05 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Бывалый
*


Профиль
Группа: Участник
Сообщений: 191
Регистрация: 20.11.2006

Репутация: 2
Всего: 4



Цитата

Такое бывает когда не все потоки закончили выполнение. Проверь нормально ли останавливаются твои потоки.


я так понимал что  queue.join() для того и создано чтобы это проверять...

Добавлено @ 20:07 
проверил - запустил все в один поток
оно все сделало, отчиталось, но все продолжает чего-то ждать...
PM MAIL   Вверх
cherep
Дата 12.12.2006, 21:16 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Шустрый
*


Профиль
Группа: Участник
Сообщений: 74
Регистрация: 11.1.2006
Где: Москва

Репутация: 1
Всего: 4



Попробуй глянуть чему равно

Код

threading.activeCount( ) 


Попробуй в самом конце
Код

import sys
sys.exit(0)


PS. А вообще совет. Попробуй добиться того же эфекта (описанный баг) на более простом случае (те же потоки, но без закачки картинок). Если будет также - пости сюда, нам будет легче тебе помочь.

Это сообщение отредактировал(а) cherep - 12.12.2006, 21:21
PM MAIL WWW ICQ   Вверх
Ответ в темуСоздание новой темы Создание опроса
1 Пользователей читают эту тему (1 Гостей и 0 Скрытых Пользователей)
0 Пользователей:
« Предыдущая тема | Python: Общие вопросы | Следующая тема »


 




[ Время генерации скрипта: 0.1195 ]   [ Использовано запросов: 21 ]   [ GZIP включён ]


Реклама на сайте     Информационное спонсорство

 
По вопросам размещения рекламы пишите на vladimir(sobaka)vingrad.ru
Отказ от ответственности     Powered by Invision Power Board(R) 1.3 © 2003  IPS, Inc.