Модераторы: Poseidon, Snowy, bems, MetalFan
  

Поиск:

Ответ в темуСоздание новой темы Создание опроса
> Многопоточная загрузка, проблема с одновременностью 
:(
    Опции темы
RelictOs
Дата 4.1.2013, 13:48 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Новичок



Профиль
Группа: Участник
Сообщений: 45
Регистрация: 14.2.2012

Репутация: нет
Всего: 1



Всем привет  smile 
Недавно меня посетила идея написать нечто вроде многопоточного загрузщика файлов. Такие данные, как размер, скорость и главное состояние загрузки я решил выводить в TNextGrid (аналог TStringGrid с расширенными возможностями вроде колонок-прогрессбаров).
Как я уже сказал, концепция программы предполагает многопоточность, которую в целом и общем я реализовал. Но есть одно но (точнее даже 2).

Начну с первого и главного - при загрузке нескольких файлов одновременно они загружаются не одновременно, а по очереди.
Код выполнения потока:
Код

procedure TDownloadThread.Execute;
var
  hInet,  //соединение
  hURL: HINtERNET;    //ссылка на файл
  BufArr: array[1..1024] of byte; //буфер, в который будем скачивать
  BufLen: DWORD;
  sFile: file; //наш файл
  i: integer;
begin
  hInet := InternetOpen(PChar(Application.ExeName),INTERNET_OPEN_TYPE_PrECONFIG,nil,nil,0); //открываем соединение

  for i := 0 to Length(filelist)-1 do  //где filelist - чтото вроде набора файлов для скачивания
    begin
      hURL := InternetOpenURL(hInet,PAnsiChar(filelist[i].url),nil,0,0,0);
      if FileExists(fclientpath+'\'+filelist[i].folder) then
        DeleteFile(fclientpath+'\'+filelist[i].folder);
      AssignFile(sFile,fclientpath+'\'+filelist[i].folder);
      Rewrite(sFile,1);

      repeat
        InternetReadFile(hURL,@BufArr,Sizeof(BufArr),BufLen);
        downloaded := downloaded+ BufLen;
        BlockWrite(sFile,BufArr,BufLen);
        Synchronize(showprogress);
      until BufLen=0;
      CloseFile(sFile);
      hURL := 0;
    end;
  InternetCloseHandle(hInet); //закрываем соединение
end;

//и процедура вывода ShowProgress
procedure TDownloadThread.ShowProgress;
begin
  fGrid.Cell[3,frow].AsInteger := round(downloaded/totalsize*100); //fGrid - указатель на наш NextGrid; Downloaded - сколько загруженно
  //a TotalSize - размер всех всех файлов в списке, который вычисляется еще до Execute
  Application.ProcessMessages;
end;



Надеюсь, в коде все понятно прокомментированно, и проблема лишь в моей невнимательности или глупости  smile 
С нетерпением жду ответов  smile 


Теперь вторая проблема - очень хотелось бы реализовать паузу при загрузке, но и это оказалось не так просто.
Поскольку Suspend в данном случае (насколько я понял) не лучший метод (хотя и он не работает), я решил сделать это через булевскую переменную.
Проблема же в том, что при вызове функции возникает AccessViolation, и я понятия не имею почему и что с ним делать.

Код создания потока и собственно паузы:

Код

constructor TDownloadThread.Create(list: TXMLDocument; var Grid: TNextGrid; row: integer; clientpath: string);
begin
  inherited Create(true);
  FreeOnTerminate := true; //поток освобождается при завершении
  frow := row;             //id или наша строка таблицы
  fclientpath := clientpath; //папка для общей загрузки
  SetFileList(list);       //составляем список файлов для скачивания

  fEnablePause := false; //нельзя ставить поток на паузу
  fPaused := false; // переменная паузы в false

  if Length(filelist) <= 0 then  // если скачивать нечего - закрываемся
    begin
      Grid.Cell[3,frow].AsInteger := 100;
      Self.Terminate;
      exit;
    end;

  totalsize := GetTotalSize; //вычисляем полный размер файлов
  downloaded := 0; // загружено - пока 0
  fGrid := @Grid;   // присваиваем указатель на таблицу
  fGrid.Cell[5,frow].AsString := inttostr(round(totalsize/1024))+ ' KB';
end;

//и собственно функция* паузы
function TDownloadThread.Pause: boolean;
begin
  result := false;
  if (EnablePause) or (fPaused) then exit;
  fPaused := true;
  result := true;
end;


Хочу уточнить, что в коде первой проблемы вы не заметите применения цикла вроде
Код

while fPaused do
 Application.ProcessMessages;

потому что из-за изменения алгоритма загрузки(раньше использовал Indy) я ее временно убрал. Но суть проблемы не в несратываемости, а в ошибке при ее вызове из программы.
PM MAIL ICQ Skype   Вверх
Illusion Dolphin
Дата 4.1.2013, 23:14 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Эксперт
***


Профиль
Группа: Участник Клуба
Сообщений: 1198
Регистрация: 3.5.2003

Репутация: 36
Всего: 63



Цитата

при загрузке нескольких файлов одновременно они загружаются не одновременно, а по очереди.

Ну так в коде они загружатся по очереди, откуда им загружаться параллельно?

Чтобы реализовать параллельность надо писать пул потоков:
1) Есть список задач, он, например, сохраняется в общее место (класс, потокобезопасный)
2) Запускается N потоков, где N - макс. количество одновременно выполняющихся заданий. N - находится эксперементально, я не думаю что больше 4-5 будет смысл
3) Каждый поток работает по принципу:
  3.1) Берёт из общего места задачу (урл и путь) 
  3.2) Если задачи закончились, то - выход потока
  3.3) Работает
  3.4) Переход в 3.1

При этом общий полный размер и общее количество загруженных битиков тоже можно хранить в этом общем хранилище с синхронизацией. Для уменьшения блокировок потоков - по таймеру где-то 0,5-1с форма запрашивает у общего хранилище текущее состояние и отобращает прогресс.

А теперь по коду.

Цитата

fGrid.Cell[3,frow].AsInteger := round(downloaded/totalsize*100); //fGrid - указатель на наш NextGrid; Downloaded - сколько загруженно

Не стоит из потока лазить в элементы формы, лучше вызывать call-back процедуру, и форма сама будет решать что делать с прогрессом

Цитата

Application.ProcessMessages;

Не нужно, убрать

Цитата

  if Length(filelist) <= 0 then  // если скачивать нечего - закрываемся
    begin
      Grid.Cell[3,frow].AsInteger := 100;
      Self.Terminate;
      exit;
    end;

Вызовет утечку памяти, поток останется в памяти. Поток не стоит вызывать если Length(filelist) = 0. Условие на меньше в этом месте - это клиника. 

Цитата

fGrid := @Grid;

Изврат100%, указатель на объект. Вообще не надо передавать объект, и тем более не стоит делать указатели на классы.



--------------------
В мире всего две бесконечности: вселенная и человеческая глупость... На счёт вселенной я не уверен.
Шифрование и организация фотографий - Photo Database 4.5
PM MAIL WWW ICQ   Вверх
RelictOs
  Дата 5.1.2013, 09:54 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Новичок



Профиль
Группа: Участник
Сообщений: 45
Регистрация: 14.2.2012

Репутация: нет
Всего: 1



Illusion Dolphin, огромное спасибо  smile 
сейчас попробую сделать чтото вроде массива рекордов, в каждом из которых будет храниться ячейка таблицы и инфа(то есть номер ячейки), и поток, который при изменении кол-ва элементов массива будет писать уже в грид (тоесть только он будет иметь доступ к ней). Гениально  smile 
PM MAIL ICQ Skype   Вверх
RelictOs
Дата 5.1.2013, 14:51 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Новичок



Профиль
Группа: Участник
Сообщений: 45
Регистрация: 14.2.2012

Репутация: нет
Всего: 1



Попробовал (на мой взгляд) тот способ, что вы предложили. С простым действием(вроде обычного изменения полоски progressbara) сработало.
Но когда то же самое я попробовал со скачкой файла - проблема та же.

Код всего тестового файла:
Код

unit Unit1;

interface

uses
  Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
  Dialogs, StdCtrls, ComCtrls, WinInet, syncobjs;

type
  TProg = record
    id: integer;
    progress: integer;
  end;

  TProgressThread = class(tThread)
  private
    hId: integer;
    folder, url: string;
  public
    constructor Create(id: integer; sfolder, surl: string);
    procedure DoWork;
  protected
    procedure Execute;override;
  end;

  TControlThread = class(TThread)
  protected
    procedure Execute;override;
  end;

  TForm1 = class(TForm)
    ProgressBar1: TProgressBar;
    ProgressBar2: TProgressBar;
    Button1: TButton;
    Memo1: TMemo;
    Button2: TButton;
    procedure Button1Click(Sender: TObject);
    procedure Button2Click(Sender: TObject);
  private
    { Private declarations }
  public
    { Public declarations }
  end;

var
  Form1: TForm1;
  progress: array of TProg;
  cs: TCriticalSection;
implementation

{$R *.dfm}

constructor TProgressThread.Create(id: integer; sfolder, surl: string);
begin
  inherited Create(false);
  self.Priority :=  tpLower;
  hId := id;
  folder := sfolder;
  url := surl;
end;

procedure TProgressThread.Execute;
begin

      Synchronize(DoWork);
      sleep(50);
end;

procedure TProgressThread.DoWork;
var
  hInet,  //соединение
  hURL: HINtERNET;    //ссылка на файл
  BufArr: array[1..1024] of byte;
  BufLen: DWORD;
  sFile: file;
  i: integer;
begin
  hInet := InternetOpen(PChar(Application.ExeName),INTERNET_OPEN_TYPE_PrECONFIG,nil,nil,0);

      hURL := InternetOpenURL(hInet,PAnsiChar(url),nil,0,0,0);
      if FileExists(ExtractFilePath(Application.ExeName)+folder) then
        DeleteFile(ExtractFilePath(Application.ExeName)+folder);
      AssignFile(sFile,ExtractFilePath(Application.ExeName)+folder);
      Rewrite(sFile,1);
      repeat

        InternetReadFile(hURL,@BufArr,Sizeof(BufArr),BufLen);
        BlockWrite(sFile,BufArr,BufLen);
        setLength(progress,length(progress)+1);
        progress[length(progress)-1].id := hId;
        progress[length(progress)-1].progress := round(random(100));

        sleep(0);
      until BufLen=0;
      CloseFile(sFile);

end;

procedure TControlThread.Execute;
var
  i: integer;
begin
  while not terminated do
  begin
    if length(progress)> 0 then
      begin
        for i := 0 to Length(progress)-1 do
          begin
          if progress[i].id = 1 then
            Form1.ProgressBar1.Position := progress[i].progress
          else
            Form1.ProgressBar2.Position := progress[i].progress;
          Form1.Memo1.Lines.Add(timetostr(now)+'_'+inttostr(progress[i].id));
      Application.ProcessMessages;    
          end;
      SetLength(progress,0);
      end;
   sleep(0);
  end;
end;

procedure TForm1.Button1Click(Sender: TObject);
var
  ctrl: TControlThread;
  pr1: TProgressThread;
begin
  ctrl := TControlThread.Create(false);
  pr1 := TProgressThread.Create(1,'Data\file1.txt','http://127.0.0.1/Launcher/download/theme.rlt');
  
end;

procedure TForm1.Button2Click(Sender: TObject);
var
  pr2: TProgressThread;
begin
pr2 := TProgressThread.Create(2,'Data\file2.txt','http://127.0.0.1/Launcher/download/theme.rlt');
end;

end.


PM MAIL ICQ Skype   Вверх
Illusion Dolphin
Дата 5.1.2013, 17:57 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Эксперт
***


Профиль
Группа: Участник Клуба
Сообщений: 1198
Регистрация: 3.5.2003

Репутация: 36
Всего: 63



Код

procedure TProgressThread.Execute;
begin
      Synchronize(DoWork);
      sleep(50);
end;

Полная бессмысленность - всё что находится в Synchronize выполняется в основном потоке, т.е. смысла от потока ноль.

Попробуйте вот это:
Код

unit Unit30;

interface

uses
  Generics.Collections,
  System.SyncObjs,
  System.Math,
  Vcl.ExtCtrls, Vcl.StdCtrls, System.Classes, Vcl.Controls, Vcl.ComCtrls,
  Winapi.Windows, Winapi.Messages, System.SysUtils, System.Variants, Vcl.Forms, Vcl.Graphics;

type
  TForm30 = class(TForm)
    ProgressBar1: TProgressBar;
    Button1: TButton;
    Timer1: TTimer;
    procedure Button1Click(Sender: TObject);
    procedure Timer1Timer(Sender: TObject);
  private
    { Private declarations }
  public
    { Public declarations }
  end;

  TTask = class
  public
    FileName: string;
    FileSize: Int64;
    DownloadedSize: Int64;
  end;

  TTaskStorage = class
  private
    FData: TQueue<TTask>;
    FSync: TCriticalSection;
    FTotalSize: Int64;
    FDownloadedTotal: Int64;
    function GetCount: Integer;
    function GetDownloadedTotal: Int64;
    function GetTotalSize: Int64;
  public
    constructor Create;
    destructor Destroy; override;
    procedure AddTask(Task: TTask);
    function ExtractTask: TTask;
    procedure DownloadedFilePart(PartSize: Integer);
    property Count: Integer read GetCount;
    property TotalSize: Int64 read GetTotalSize;
    property DownloadedTotal: Int64 read GetDownloadedTotal;
  end;

  TWorkerThread = class(TThread)
  private
    procedure ProcessTask(Task: TTask);
  protected
    procedure Execute; override;
  end;

var
  Form30: TForm30;

implementation

var
  TaskStorage: TTaskStorage = nil;

{$R *.dfm}

procedure TForm30.Button1Click(Sender: TObject);
const
  ThreadCount = 5;
  FilesCount = 50;
var
  I: Integer;
  T: TTask;
begin
  for I := 1 to FilesCount do
  begin
    T := TTask.Create;
    T.FileName := IntToStr(I);
    T.FileSize := Random(1024 * 1024);
    T.DownloadedSize := 0;

    TaskStorage.AddTask(T);
  end;

  for I := 1 to ThreadCount do
    TWorkerThread.Create;
end;

procedure TForm30.Timer1Timer(Sender: TObject);
begin
  ProgressBar1.Max := TaskStorage.TotalSize;
  ProgressBar1.Position := TaskStorage.DownloadedTotal;
end;

{ TTaskStorage }

procedure TTaskStorage.AddTask(Task: TTask);
begin
  FSync.Enter;
  try
    FTotalSize := FTotalSize + Task.FileSize;
    FData.Enqueue(Task);
  finally
    FSync.Leave;
  end;
end;

constructor TTaskStorage.Create;
begin
  FData := TQueue<TTask>.Create;
  FSync := TCriticalSection.Create;
end;

destructor TTaskStorage.Destroy;
begin
  FreeAndNil(FData);
  FreeAndNil(FSync);
  inherited;
end;

procedure TTaskStorage.DownloadedFilePart(PartSize: Integer);
begin
  FSync.Enter;
  try
    FDownloadedTotal := FDownloadedTotal + PartSize;
  finally
    FSync.Leave;
  end;
end;

function TTaskStorage.ExtractTask: TTask;
begin
  FSync.Enter;
  try
    if FData.Count = 0 then
      Exit(nil);

    Result := FData.Dequeue;
  finally
    FSync.Leave;
  end;
end;

function TTaskStorage.GetCount: Integer;
begin
  FSync.Enter;
  try
    Result := FData.Count;
  finally
    FSync.Leave;
  end;
end;

function TTaskStorage.GetDownloadedTotal: Int64;
begin
  FSync.Enter;
  try
    Result := FDownloadedTotal;
  finally
    FSync.Leave;
  end;
end;

function TTaskStorage.GetTotalSize: Int64;
begin
  FSync.Enter;
  try
    Result := FTotalSize;
  finally
    FSync.Leave;
  end;
end;

{ TWorkerThread }

procedure TWorkerThread.Execute;
var
  T: TTask;
begin
  FreeOnTerminate := True;

  T := TaskStorage.ExtractTask;
  while T <> nil do
  begin

    ProcessTask(T);

    T := TaskStorage.ExtractTask;
  end;
end;

procedure TWorkerThread.ProcessTask(Task: TTask);
const
  Speed = 1024 * 1024;
var
  I: Integer;
  BytesCopied: Integer;
begin

  while Task.FileSize > Task.DownloadedSize do
  begin
    BytesCopied := Min(Random(Speed), Task.FileSize - Task.DownloadedSize);
    Task.DownloadedSize := Task.DownloadedSize + BytesCopied;

    TaskStorage.DownloadedFilePart(BytesCopied);
    Sleep(Random(1000));
  end;
end;

initialization
  TaskStorage := TTaskStorage.Create;

finalization
  FreeAndNil(TaskStorage);

end.




--------------------
В мире всего две бесконечности: вселенная и человеческая глупость... На счёт вселенной я не уверен.
Шифрование и организация фотографий - Photo Database 4.5
PM MAIL WWW ICQ   Вверх
  
Ответ в темуСоздание новой темы Создание опроса
Правила форума "Delphi: Общие вопросы"
SnowyMetalFan
bemsPoseidon
Rrader

Запрещается!

1. Публиковать ссылки на вскрытые компоненты

2. Обсуждать взлом компонентов и делиться вскрытыми компонентами

  • Литературу по Дельфи обсуждаем здесь
  • Действия модераторов можно обсудить здесь
  • С просьбами о написании курсовой, реферата и т.п. обращаться сюда
  • Вопросы по реализации алгоритмов рассматриваются здесь
  • 90% ответов на свои вопросы можно найти в DRKB (Delphi Russian Knowledge Base) - крупнейшем в рунете сборнике материалов по Дельфи


Если Вам понравилась атмосфера форума, заходите к нам чаще! С уважением, Snowy, MetalFan, bems, Poseidon, Rrader.

 
1 Пользователей читают эту тему (1 Гостей и 0 Скрытых Пользователей)
0 Пользователей:
« Предыдущая тема | Delphi: Общие вопросы | Следующая тема »


 




[ Время генерации скрипта: 0.0729 ]   [ Использовано запросов: 21 ]   [ GZIP включён ]


Реклама на сайте     Информационное спонсорство

 
По вопросам размещения рекламы пишите на vladimir(sobaka)vingrad.ru
Отказ от ответственности     Powered by Invision Power Board(R) 1.3 © 2003  IPS, Inc.