Модераторы: Snowy, bartram, MetalFan, bems, Poseidon, Riply

Поиск:

Закрытая темаСоздание новой темы Создание опроса
> Многопоточность - как это делается в Дельфи. Не используйте потоки, не прочитав это 
:(
    Опции темы
Петрович
Дата 31.7.2005, 21:08 (ссылка) |    (голосов:1) Загрузка ... Загрузка ... Быстрая цитата Цитата


Эксперт
***


Профиль
Группа: Участник Клуба
Сообщений: 1000
Регистрация: 2.12.2003
Где: Москва

Репутация: 15
Всего: 55



Глава 12. Еще о возможностях синхронизации в Win32.

Содержание:
  • Повышенная эффективность с помощью операций взаимоблокировки (interlocked).
  • Атомарность ниоткуда.
  • Счетчики событий и секвенсоры.
  • Другие возможности синхронизации Win32.


Повышенная эффективность с помощью операций взаимоблокировки (interlocked).

Стандартные примитивы синхронизации могут ввести излишние ограничения для простых многопоточных систем, особенно для потоков, которые интенсивно синхронизируются друг с другом. Одна из возможных альтернатив - использование interlocked операций.
Interlocked (взаимосвязанные, взаимоблокировочные) операции первоначально были задуманы как механизм синхронизации низкого уровня для симметричных многопроцессорных систем с разделяемой памятью. Во многопроцессорных системах общая память представляет собой чрезвычайно эффективный путь обмена данными между процессами и потоками. Необходимо обеспечить методы решения проблем атомарности, когда два или более процессоров пытаются использовать один и тот же участок памяти. Почти все современные процессоры поддерживают соответствующие операции взаимоблокировки. Эти такие операции, посредством которыхо процессор может прочитать значение из памяти, модифицировать его, а затем записать атомарно, причем гарантируется, что другие процессоры не получит доступ к тому же участку памяти, а процессор, выполняющий операцию, не будет прерван. Win32 обеспечивает следующие операции взаимоблокировки:
  • InterlockedCompareExchange (только Win NT/2K).
  • InterlockedDecrement.
  • InterlockedExchange.
  • InterlockedExchangeAdd (только Win NT/2K).
  • InterlockedIncrement.
Так почему бы не использовать такие операции всегда? Хорошим примером может служить замок (spin lock). Допустим, есть желание создать что-то подобное критической секции. Однако в критической секции может быть очень мало кода, а доступ к нему нужен очень часто. В таких случаях, полноценный объект синхронизации может оказаться излишним. Spin lock (замок) позволяет нам добиться аналогичного результата, и работает примерно так
Код
var
  Lock:integer;

{Lock Initialization}
Lock := -1;

{Entering the spin lock}
while InterlockedIncrement(Lock)>0 do
begin
  Dec(Lock);
  Sleep(0);
end;

{Leaving the spin lock}
Dec(Lock);

Поток захватывает замок, если при выполнении приращения InterlockedIncrement значение Lock будет нулевым. Если эта величина больше нуля, то замок захвачен другим потоком, и требуется новая попытка. Вызов Sleep нужен, чтобы один поток не ждал впустую слишком долго, пока поток с более низким приоритетом удерживает замок. Для простых планировщиков, если приоритеты потоков равны, то вызов Sleep может и не потребоваться. Операция блокировки необходима, поскольку если поток выполняет чтение значения из памяти, увеличение его, сравнение, а затем записывает назад, то два потока могут захватить замок одновременно.
Излишние действия почти исключены, поскольку лишь несколько инструкций CPU требуется для входа и выхода из замка, а потоку не приходится ждать. Если потокам нужно ожидать существенное время, то процессор работает впустую, так что это подходит только для создания небольших критических секций. Замки полезны для реализации критических секций, которые сами являются частью структур синхронизации. Коллективные данные внутри примитивов синхронизации или планировщиков часто защищены блокировкой подобного типа: часто это необходимо, поскольку примитивы синхронизации уровня OС не могут быть использованы, чтобы осуществлять примитивы синхронизации уровня OС. У такой блокировки имеются те же проблемы с конкуренцией потоков, как и у мьютексов, но отличие состоит в том, что зацикливание происходит не путем замораживания (deadlock), а динамически (livelock). Это несколько худшая ситуация, потому что хотя "блокированные" потоки не выполняют полезного кода, они работают в бесконечном цикле, расходуя время процессора и понижая производительность всей системы. Замки нельзя использовать как семафоры, чтобы "приостанавливать" потоки.

Атомарность ниоткуда.

При достаточной аккуратности возможно создать замок, который является атомарным, вообще не прибегая к взаимоблокировке, при условии, что прерывания будут происходить только между инструкциями CPU. Рассмотрим код
Код
procedure DoSomethingCritical(var Lock: integer);

var
  Temp: integer;

begin
{ Initialize lock }
  Lock := -1;
{ Enter Lock }
  repeat
    Inc(Lock);
    Temp := Lock;
    if Temp > 0 then
      Dec(Lock);
  until not (Temp > 0);
{ Perform operations }
{ Leave Lock }
  Dec(Lock);
end;

procedure AsmDoSomethingCritical(var Lock: integer);
asm
{ Initialize lock }
  lock mov dword ptr[eax],$FFFFFFFF
{ Enter Lock }
@spin:
  lock inc dword ptr[eax]
  mov edx,[eax]
  test edx,edx
  jng @skipdec
  lock dec dword ptr[eax]
@skipdec:
  test edx,edx
  jg @spin
{ Perform operations }
{ Leave Lock }
  lock dec dword ptr[eax]
end;


Сначала обратим внимание на код на Паскале, чтобы понять основную идею. У нас есть замок - целое число в памяти. При попытке войти в замок мы сначала увеличиваем значение в памяти. Затем читаем значение из памяти в локальную переменную, и проверяем, как и раньше, больше ли оно нуля. Если это так, то кто-то еще обладает этим замком, и мы возвращаемся к началу, в противном случае мы захватываем замок.
Самое важное в этом наборе операций то, что при определенных условиях переключение потоков может произойти в любой момент времени, но потокобезопасность все же сохранится. Первое приращение замка является косвенным приращением регистра. Значение всегда находится в памяти, и приращение атомарно. Затем мы читаем значение замка в локальную переменную. Это действие не атомарное. Значение, прочитанное в локальную переменную, может отличаться от результата приращения. Тем не менее, хитрость состоит в том, что поскольку приращение выполняется перед действием чтения, происходящие конфликты потоков всегда будут приводить к слишком высокому прочитанному значению, а не к слишком низкому: в результате конфликтов потоков можно узнать, свободен ли замок.
Часто полезно писать подобные действия на ассемблере, чтобы быть полностью уверенным, что правильные значения останутся в памяти, а не кешируются в регистрах. Компилятор Delphi (по крайней мере Delphi 4), при передаче замка как var-параметра, и с использованием локальной переменной, генерирует корректный код, который будет работать на однопроцессорных машинах. На многопроцессорных машинах косвенные приращения и декременты регистра не атомарны. Эта проблема решена в ассемблерной версии кода путем добавления префикса lock перед инструкциями, которые имеют дело с замком. Этот префикс указывает процессору исключительно заблокировать шину памяти на все время выполнения инструкции, таким образом этими операции становятся атомарными.
Плохо только, что хотя это и теоретически правильно, виртуальная машина Win32 не позволяет процессам пользовательского уровня исполнять инструкции с префиксом lock. Программисты, предполагающие действительно применять этот механизм, должны использовать его только в коде с привилегиями нулевого кольца (ring 0). Другая проблема состоит в том, что поскольку эта версия блокировки не вызывает Sleep, потоки способны монополизировать процессор, пока они ожидают снятия блокировки, а это гарантирует зависание машины.

Счетчики событий и секвенсоры.

Одна из альтернатив для семафоров - использовать два новых вида примитивов: eventcounts и sequencers. Оба они содержат счетчики, но, в отличие от семафоров, с момента их создания счетчики неограниченно возрастают. Некоторым очень нравится идея того, что можно различить 32-е и 33-е появление события в системе. Значения этих счетчиков сделаны доступными для использующих их потоков, и могут быть использованы процессами для упорядочения операций. Счетчики событий поддерживают три действия:
  • EVCount.Advance(): Увеличивает счетчик, возвращает новое значение.
  • EVCount.Read(): Возвращает текущее значение.
  • EVCount.Await(WaitCount:integer): Приостанавливает вызывающий поток, пока внутренний счетчик не станет больше или равным WaitCount.
Секвенсоры поддерживают только одну операцию:
  • Sequencer.Ticket(): Возвращает значение внутреннего счетчика и увеличивает его.
Определение указанных классов может выглядеть примерно так
Код
type
  TEventCount = class
  private
  protected
  public
    constructor Create;
    destructor Destroy; override;
    function Advance: integer;
    function Read: integer;
    procedure Await(WaitCount: integer);
  published
  end;

  TSequencer = class
  private
  protected
  public
    constructor Create;
    destructor Destroy; override;
    function Ticket: integer;
  published
  end;

Теперь довольно легко использовать счетчики событий и секвенсоры для осуществления всех операций, которые можно выполнять с применением семафоров:

Реализация взаимного исключения

Код
{ Enforcing a mutual exclusion }

var
  MyTurn: integer;
  EC: TEventCount;
  S: TSequencer; { assume already created appropriately }

begin
  MyTurn := S.Ticket;
  EC.Await(MyTurn);
    {  Critical operations }
  EC.Advance;
end;


Ограниченный буфер с одним поставщиком данных и одним потребителем

Код
{ Single producer consumer bounded buffer }
{ buffer has N slots }

var
  InE, OutE: TEventCount; { Set up and initially 0 }

{ producer }

var
  I: integer;

begin
  while not terminated do
  begin
    OutE.Await(I - N);
    { insert item at I mod N }
    InE.Advance;
    Inc(I);
  end;
end;

{ consumer }

var
  I: integer;

begin
  while not terminated do
  begin
    InE.Await(I);
    { remove item at i mod N }
    OutE.Advance;
  end;
end;


Ограниченный буфер с произвольным числом поставщиков и потребителей

Код
{ Bounded buffer with multiple producers and consumers }

var
  InE, OutE: TEventCount; { Set up and initially 0 }
  PrTk, CnTk: TSequencer; { Set up and initially 0 }

{ producer }

var
  MyTurn: integer;

begin
  while not terminated do
  begin
    MyTurn := PrTk.Ticket;
    InE.Await(MyTurn);
    OutE.Await(MyTurn - N + 1);
    { insert item at myturn mod N }
    InE.Advance;
  end;
end;

{ consumer }

var
  MyTurn: integer;

begin
  while not terminated do
  begin
    MyTurn := CnTk.Ticket;
    OutE.Await(MyTurn);
    InE.Await(MyTurn + 1);
    { remove item at MyTurn mod N }
    OutE.Advance;
  end;
end;


Одно из преимуществ этого типа примитива синхронизации состоит в том, что операции Advance и Ticket могут быть очень просто реализованы с использованием взаимоблокирующих инструкций сравнения и обмена. Оставим это читателю как несколько более трудное упражнение.

Другие возможности синхронизации Win32.

Waitable timers (ожидаемые таймеры). В Windows NT и Win2K имеются объекты ожидаемых таймеров. Они позволяют потоку или набору потоков определенное время ждать объекта таймера. Таймеры можно использовать для освобождения одного или некоторого числа потоков на повременной основе; разновидность контроля потоков данных. Вдобавок, задержка, обеспечиваемая ожидаемыми таймерами, может быть установлена с очень высокой точностью: наименьшее ее возможное значение около 100 наносекунд, что делает эти таймеры предпочтительнее использования Sleep, если поток следует приостановить на определенное время.
MessageWaits. Когда приложения Delphi ожидают выхода из потоков, главный поток VCL постоянно блокирован. Потенциально эта ситуация может вызывать проблемы, потому что поток VCL не может обрабатывать сообщения. Win32 предоставляет функцию MsgWaitForMultipleObjects для преодоления этих проблем. Поток, осуществляющий ожидание сообщения, блокируется до тех пор, пока или объект синхронизации не перейдет в сигнализированное состояние, или сообщение не будет помещено в очередь сообщений потока. Это означает, что вы можете заставить главный поток VCL ожидать рабочие потоки,и в то же время позволить ему отвечать на сообщения Windows. Хорошую статью по данной теме можно найти по адресу: http://www.midnightbeach.com/jon/pubs/MsgWaits/MsgWaits.html




--------------------
Все знать невозможно, но хочется
PM ICQ   Вверх
Петрович
Дата 31.7.2005, 21:44 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Эксперт
***


Профиль
Группа: Участник Клуба
Сообщений: 1000
Регистрация: 2.12.2003
Где: Москва

Репутация: 15
Всего: 55



Chapter 13. Using threads in conjunction with the BDE, Exceptions and DLLs.

In this chapter:
  • DLL's and Multiprocess programming.
  • Thread and process scope. A single threaded DLL.
  • Writing a multithreaded DLL.
  • DLL Set-up and Tear down.
  • Pitfall 1: The Delphi encapsulation of the Entry Point Function.
  • Writing a multiprocess DLL.
  • Global named objects.
  • The DLL in detail.
  • DLL Initialization.
  • An application using the DLL.
  • Pitfall 2: Thread context in Entry Point Functions
  • Exception Handling.
  • The BDE.


DLL's and Multiprocess programming.

Dynamic link libraries, or DLL's allow a programmer to share executable code between several processes. They are commonly used to provide shared library code. for several programs. Writing code for DLL's is in most respects similar to writing code for executables. Despite this, the shared nature of DLL's means that programmers familiar with multithreading often use them to provide system wide services: that is code which affects several processes that have the DLL loaded. In this chapter, we will look at how to write code for DLL's that operates across more than one process.

Thread and process scope. A single threaded DLL.

Global variables in DLL's have process wide scope. This means that if two separate processes have a DLL loaded, all the global variables in the DLL are local to that process. This is not limited to variables in the users code: it also includes all global variables in the Borland run time libraries, and any units used by code in the DLL. This has the advantage that novice DLL programmers can treat DLL programming in the same way as executable programming: if a DLL contains a global variable, then each process has its own copy. Furthermore, this also means that if a DLL is invoked by a processes which contain only one thread, then no special techniques are required: the DLL need not be thread safe, since all the processes have completely isolated incarnations of the DLL.
We can demonstrate this with a simple DLL which does nothing but store an integer.
Код
library NumberDLL;

{ Martin Harvey 9/10/2000 }

var
  ANumber:integer;

function GetNumber:integer;stdcall;
begin
  Result := ANumber;
end;


procedure SetNumber(NewNumber:integer);stdcall;
begin
  ANumber := NewNumber;
end;

exports
  GetNumber,
  SetNumber;

begin
end.

It exports a couple of functions that enable an application to read and write the value of that integer. We can then write a simple test application which uses this DLL.
Код
unit NumberForm;

interface

uses
  Windows, Messages, SysUtils, Classes, Graphics, Controls, Forms, Dialogs,
  StdCtrls;

type
  TNumberFrm = class(TForm)
    GetBtn: TButton;
    SetBtn: TButton;
    NumberEdit: TEdit;
    procedure GetBtnClick(Sender: TObject);
    procedure SetBtnClick(Sender: TObject);
  private
    { Private declarations }
  public
    { Public declarations }
  end;

var
  NumberFrm: TNumberFrm;

implementation

{$R *.DFM}

function GetNumber:integer;stdcall;external 'NumberDLL.dll';
procedure SetNumber(NewNumber:integer);stdcall;external 'NumberDLL.dll';

procedure TNumberFrm.GetBtnClick(Sender: TObject);
begin
  NumberEdit.Text := IntToStr(GetNumber);
end;

procedure TNumberFrm.SetBtnClick(Sender: TObject);
begin
  try
    SetNumber(StrToInt(NumberEdit.Text));
  except
    on EConvertError do ShowMessage('Not a valid number. Number not set.');
  end;
end;

end.

If several copies of the application are executed, one notes that each application uses its own integer, and no interference exists between any of the applications.

Writing a multithreaded DLL.

Writing a multithreaded DLL is mostly the same as writing multithreaded code in an application. The behaviour of multiple threads inside the DLL is the same as the behaviour of multiple threads in a particular application. As always, there are a couple of pitfalls for the unwary:
The main pitfall one can fall into is the behaviour of the Delphi memory manager. By default, the Delphi memory manager is not thread safe. This is for efficiency reasons: if a program only ever contains one thread, then it is pure wasted overhead to include synchronization in the memory manager. The Delphi memory manager can be made thread safe by setting the IsMultiThread variable to true. This is done automatically for a given module if a descendant class of TThread is created.
The problem is that an executable and the DLL consist of two separate modules, each with their own copy of the Delphi memory manager. Thus, if an executable creates several threads, its memory manager is multithreaded. However, if those two threads call a DLL loaded by the executable, the DLL memory manager is not aware of the fact that it is being called by multiple threads. This can be solved by setting the IsMultiThread variable. It is best to set this by using the DLL entry point function, covered later.
The second pitfall occurs as a result of the same problem; that of having two separate memory managers. Memory allocated by the Delphi memory manager that is passed from the DLL to the executable cannot be allocated in one and disposed of in the other. This occurs most often with long strings, but can occur with memory allocated using New or GetMem, and disposed using Dispose or FreeMem. The solution in this case is to include ShareMem, a unit which keeps the two memory managers in step using techniques discussed later.

DLL Set-up and Tear down.

Mindful of the fact that DLL programmers often need to be aware of how many threads and processes are active in a DLL at any given time, the Win32 system architects provide a method for DLL programmers to keep track of thread and process counts in a DLL. This method is known as the DLL Entry Point Function.
In an executable, the entry point (as specified in the module header) indicates where program execution should start. In a DLL, it points to a function that is executed whenever an executable loads or unloads the DLL, or whenever an executable that is currently using the DLL creates or destroys a thread. The function takes a single integer argument which can be one of the following values:
  • DLL_PROCESS_ATTACH: A process has attached itself to the DLL. If this is the first process, then the DLL has just been loaded.
  • DLL_PROCESS_DETACH: A process has detached from the DLL. If this is the only process using the DLL, then the DLL will be unloaded.
  • DLL_THREAD_ATTACH: A thread in the has attached to the DLL. This will happen once when the process loads the DLL, and subsequently whenever the process creates a new thread.
  • DLL_THREAD_DETACH: A thread has detached from the DLL. This will happen whenever the process destroys a thread, and finally when the process unloads the DLL.
As it turns out, DLL entry points have two characteristics which can lead to misunderstandings and problems when writing entry point code. The first characteristic occurs as a result of the Delphi encapsulation of the entry point function, and is relatively simple to work around. The second occurs as a result of thread context, and will be discussed later on.

Pitfall 1: The Delphi encapsulation of the Entry Point Function.

Delphi uses the DLL entry point function to manage initialization and finalization of units within a DLL as well as execution of the main body of DLL code. The DLL writer can put a hook into the Delphi handling by assigning an appropriate function to the variable DLLProc. The default Delphi handling works as follows:
  • The DLL is loaded, which results in the entry point function being called with DLL_PROCESS_ATTACH
  • Delphi uses this to call the initialization of all the units in the DLL, followed by the main body of the DLL code.
  • The DLL is unloaded, resulting in two calls to the entry point function, with the arguments DLL_PROCESS_DETACH.
Now, the application writer only gets code to execute in response to the entry point function when the DLLProc variable points to a function. The correct point to set this up is in the main body of the DLL. However, this is in response to the second call to the entry point function. In short, what this means is that when using the entry point function in the DLL, the delphi programmer will never see the first process attachment to the DLL. As it turns out, this isn't such a huge problem: one can simply assume that the main body of the DLL is called in response to a process loading the DLL, and hence the process and thread count is 1 at that point. Since the DLLProc variable is replicated on a per process basis, even if more processes attach themselves later, the same argument applies, since each incarnation of the DLL has separate global variables.
In case the reader is still confused, I'll present an example. Here is a modified DLL with a function that displays a message.
Код
library InitDLL;

{Martin Harvey 9/10/2000}

uses
  SysUtils,
  Windows,
  Dialogs,
  TestUnit in 'TestUnit.pas';

procedure FancyMessage;stdcall;
begin
  TestUnit.UnitProc;
end;

procedure EntryPointFunc(Reason:integer);
begin
  case reason of
    DLL_PROCESS_ATTACH:ShowMessage('EntryPoint. Process Attach.');
    DLL_THREAD_ATTACH:ShowMessage('EntryPoint. Thread Attach.');
    DLL_PROCESS_DETACH:ShowMessage('EntryPoint. Process Detach.');
    DLL_THREAD_DETACH:ShowMessage('EntryPoint. Thread Detach.');
  else
    ShowMessage('EntryPoint. Unknown reason');
  end;
end;

exports FancyMessage;

begin
  ShowMessage('Main DLL Body.');
  DLLProc := @EntryPointFunc;
end.


that contains a unit
Код
unit TestUnit;

interface

procedure UnitProc;

implementation

uses Dialogs;

procedure UnitProc;
begin
  ShowMessage('Unit procedure');
end;

initialization
  ShowMessage('Unit initialization.');
finalization
  ShowMessage('Unit finalization.');
end.

As you can see, the main body, unit initialization and DLL entry point hooks all contain "ShowMessage" calls which enable one to trace what is going on. In order to test this DLL, here is a test application. It consists of a form with a button on.
Код
unit InitForm;

interface

uses
  Windows, Messages, SysUtils, Classes, Graphics, Controls, Forms, Dialogs,
  StdCtrls;

type
  TInitFrm = class(TForm)
    SpawnThrd: TButton;
    procedure SpawnThrdClick(Sender: TObject);
  private
    { Private declarations }
  public
    { Public declarations }
  end;

var
  InitFrm: TInitFrm;

implementation

uses DLLCaller;

{$R *.DFM}

procedure TInitFrm.SpawnThrdClick(Sender: TObject);

var
  CallerThread:TDLLCaller;

begin
  CallerThread:=TDLLCaller.Create(true);
  with CallerThread do
  begin
    FreeOnTerminate := true;
    Resume;
  end;
end;

end.

When the button is clicked, a thread is created, which calls the procedure in the DLL, and then destroys itself.
Код
unit DLLCaller;

interface

uses
  Classes;

type
  TDLLCaller = class(TThread)
  private
    { Private declarations }
  protected
    procedure Execute; override;
  end;

implementation

{ TDLLCaller }

procedure FancyMessage;stdcall;external 'InitDLL.dll';

procedure TDLLCaller.Execute;
begin
  FancyMessage;
end;

end.

So, what happens when we run the program?
  • The DLL reports units initialization
  • The DLL reports main DLL body execution
  • Every time the button is clicked the DLL reports:
    • Entry point: thread attach
    • Unit procedure.
    • Entry point: thread detach
  • Note that if we spawn more than one thread from the application, whilst leaving existing threads blocked on the Unit Procedure message box, the total thread attachment count can increase beyond one.
  • When the program is closed, the DLL reports entry point: process detach, followed by unit finalization.

Writing a multiprocess DLL.

Armed with a knowledge of how to use the entry point function, we will now write a multiprocess DLL. This DLL will store some information on a system wide basis using memory shared between processes. It is worth remembering that when code accesses data shared between processes, the programmer must provide appropriate synchronization. Just as multiple threads in a single process are not inherently synchronized, so the main threads in different processes are also not synchronized. We will also look at some subtleties which occur when trying to use the entry point function to keep track of global threads.
This DLL will share a single integer between processes, as well as keeping a count of the number of processes and threads in the DLL at any one time. It consists of a header file:
Код
unit GlobalHdr;

interface

type
  TGlobalInfo = record
    GlobalProcesses: integer;
    GlobalThreads: integer;
    SharedNumber: integer;
  end;

implementation

end.



shared between the DLL and applications that use the DLL, and the DLL project file
Код
library GlobalData;

uses
  Windows,
  GlobalHdr in 'GlobalHdr.pas';

const
  GlobalMutexName = 'MCHGlobalMutex - BBARAgAGBQI4r8mOAAoJEPwzBQsIT9FngEA';
  GlobalMapName = 'MCHGlobalMap   - AJ9Qfi3m4vqgUhKYA7qBfzrMDbkpbQtTWFy';

type
  TLocalInfo = record
    LocalThreads: integer;
  end;

  TSyncHandles = record
    LocalSection: TRTLCriticalSection;
    GlobalMutex: THandle;
  end;

  PGlobalInfo = ^TGlobalInfo;

  TGlobalHandles = record
    FileMapping: THandle;
    GlobalInfo: PGlobalInfo;
  end;

var
  SyncHandles: TSyncHandles;
  LocalInfo: TLocalInfo;
  GlobalHandles: TGlobalHandles;

procedure ReadInfo(var GlobalInfo: TGlobalInfo; var LocalThreads: integer); stdcall;
begin
  WaitForSingleObject(SyncHandles.GlobalMutex, INFINITE);
  EnterCriticalSection(SyncHandles.LocalSection);
  GlobalInfo := GlobalHandles.GlobalInfo^;
  LocalThreads := LocalInfo.LocalThreads;
  LeaveCriticalSection(SyncHandles.LocalSection);
  ReleaseMutex(SyncHandles.GlobalMutex);
end;

procedure SetSharedInteger(NewValue: integer); stdcall;
begin
  WaitForSingleObject(SyncHandles.GlobalMutex, INFINITE);
  GlobalHandles.GlobalInfo.SharedNumber := NewValue;
  ReleaseMutex(SyncHandles.GlobalMutex);
end;

procedure IncSharedInteger; stdcall;
begin
  WaitForSingleObject(SyncHandles.GlobalMutex, INFINITE);
  Inc(GlobalHandles.GlobalInfo.SharedNumber);
  ReleaseMutex(SyncHandles.GlobalMutex);
end;

procedure DecSharedInteger; stdcall;
begin
  WaitForSingleObject(SyncHandles.GlobalMutex, INFINITE);
  Dec(GlobalHandles.GlobalInfo.SharedNumber);
  ReleaseMutex(SyncHandles.GlobalMutex);
end;


function AtomicIncThreadCount: integer;
{returns number of local threads for Delphi Memory Manager}
begin
  WaitForSingleObject(SyncHandles.GlobalMutex, INFINITE);
  EnterCriticalSection(SyncHandles.LocalSection);
  Inc(GlobalHandles.GlobalInfo.GlobalThreads);
  Inc(LocalInfo.LocalThreads);
  result := LocalInfo.LocalThreads;
  LeaveCriticalSection(SyncHandles.LocalSection);
  ReleaseMutex(SyncHandles.GlobalMutex);
end;

procedure AtomicDecThreadCount;
begin
  WaitForSingleObject(SyncHandles.GlobalMutex, INFINITE);
  EnterCriticalSection(SyncHandles.LocalSection);
  Dec(GlobalHandles.GlobalInfo.GlobalThreads);
  Dec(LocalInfo.LocalThreads);
  LeaveCriticalSection(SyncHandles.LocalSection);
  ReleaseMutex(SyncHandles.GlobalMutex);
end;

procedure AtomicIncProcessCount;
begin
  WaitForSingleObject(SyncHandles.GlobalMutex, INFINITE);
  Inc(GlobalHandles.GlobalInfo.GlobalProcesses);
  ReleaseMutex(SyncHandles.GlobalMutex);
end;

procedure AtomicDecProcessCount;
begin
  WaitForSingleObject(SyncHandles.GlobalMutex, INFINITE);
  Dec(GlobalHandles.GlobalInfo.GlobalProcesses);
  ReleaseMutex(SyncHandles.GlobalMutex);
end;

procedure SetupSynchronisation;
begin
  with SyncHandles do
  begin
    InitializeCriticalSection(LocalSection);
    GlobalMutex := CreateMutex(nil, false, GlobalMutexName);
    Assert(GlobalMutex <> INVALID_HANDLE_VALUE);
  end;
end;

procedure CloseSynchronisation;
begin
  with SyncHandles do
  begin
    DeleteCriticalSection(LocalSection);
    CloseHandle(GlobalMutex);
  end;
end;

procedure SetupGlobalInfo;

var
  FirstCaller: boolean;

begin
  with GlobalHandles do
  begin
    WaitForSingleObject(SyncHandles.GlobalMutex, INFINITE);
    FileMapping := CreateFileMapping(High(Cardinal), nil, PAGE_READWRITE,
      0, sizeof(TGlobalInfo), GlobalMapName);
    Assert(FileMapping <> INVALID_HANDLE_VALUE);
    FirstCaller := GetLastError <> ERROR_ALREADY_EXISTS;
    GlobalInfo := MapViewOfFile(FileMapping, FILE_MAP_ALL_ACCESS, 0, 0, 0);
    Assert(GlobalInfo <> nil);
    if FirstCaller then FillMemory(GlobalInfo, sizeof(TGlobalInfo), 0);
    ReleaseMutex(SyncHandles.GlobalMutex);
  end;
end;

procedure CloseGlobalInfo;
begin
  with GlobalHandles do
  begin
    UnmapViewOfFile(GlobalInfo);
    CloseHandle(FileMapping);
  end;
end;

procedure DLLFinalisation;
begin
  AtomicDecThreadCount;
  AtomicDecProcessCount;
  CloseGlobalInfo;
  CloseSynchronisation;
end;

procedure EntryPoint(Reason: integer);
begin
  case Reason of
    DLL_THREAD_ATTACH: if AtomicIncThreadCount > 1 then IsMultiThread := true;
    DLL_THREAD_DETACH: AtomicDecThreadCount;
    DLL_PROCESS_DETACH: DLLFinalisation;
  else
    Assert(false);
  end;
end;

exports
  ReadInfo,
  SetSharedInteger,
  IncSharedInteger,
  DecSharedInteger;

begin
  SetupSynchronisation;
  SetupGlobalInfo;
  AtomicIncProcessCount;
  AtomicIncThreadCount;
  DLLProc := @EntryPoint;
end.

Before we look more closely at the code, it's worth reviewing some Win32 behaviour.

Global named objects.

The Win32 API allows the programmer to create various objects. For some of these objects, they may be created either anonymously, or with a certain name. Objects created anonymously are, on the whole, limited to use by a single process, the exception being that they may be inherited by child processes. Objects created with a name can be shared between processes. Typically, one process will create the object, specifying a name for that object, and other processes will open a handle to that object by specifying its name.
The delightful thing about named objects is that handles to these objects are reference counted throughout the system. That is, several processes can acquire handles to an object, and when all the handles to that object are closed, the object itself is destroyed, and not before. This includes the situation where an application crashes: typically windows does a good job of cleaning up unused handles after a crash.

The DLL in detail.

Our DLL uses this property to maintain a memory mapped file. Normally, memory mapped files are used to create an area of memory which is a mirror image of a file on disk. This has many useful applications, not least "on demand" paging in of executable images from disk. For this DLL however, a special case is used whereby a memory mapped file is created with no corresponding disk image. This allows the programmer to allocate a section of memory which is shared between several processes. This is surprisingly efficient: once the mapping is set up, no memory copying is done between processes. Once the memory mapped file has been set up, a global named mutex is used to synchronize access to that portion of memory.

DLL Initialization.

Initialization consists of four main stages:
  • Creation of synchronization objects (global and otherwise).
  • Creation of shared data.
  • Initial increment of thread and process counts.
  • Hooking the DLL entry point function.
In the first stage, two synchronization objects are created, a global mutex, and a critical section. Little needs to be said about the critical section. The global mutex is created via the CreateMutex API call. This call has the beneficial feature that if the mutex is named, and the named object already exists, then a handle to the existing named object is returned. This occurs atomically. Were this not the case, then a whole range of unpleasant race conditions could potentially occur. Determining the precise range of possible problems and potential solutions (mainly involving optimistic concurrency control) is left as an exercise to the reader. Suffice to say that if operations on handles to global shared objects were not atomic, the application level Win32 programmer would be staring into an abyss...
In the second stage the area of shared memory is set up. Since we have already set up the global mutex, it is used when setting up the file mapping. A view of the "file" is mapped, which maps the (virtual) file into the address space of the calling process. We also check whether we happened to be the process that originally created the file mapping, and if this is the case, then we zero out the data in our mapped view. This is why the procedure is wrapped in a mutex: CreateFileMapping has the same nice atomicity properties as CreateMutex, ensuring that race conditions on handles will never occur. In the general case, however, the same is not necessarily true for the data in the mapping. If the mapping had a backing file, then we might be able to assume validity of the shared data at start-up. For virtual mappings this is not assured. In this case we need to initialize the data in the mapping atomically with setting up a handle to the mapping, hence the mutex.
In the third stage, we perform our first manipulation on the globally shared data, by incrementing the process and thread counts, since the execution of the main body of the DLL is consistent with the addition of another thread and process to those using the DLL. Note that the AtomicIncThreadCount procedure increments both the local and global threads counts whilst both the global mutex and process local critical section have been acquired. This ensures that multiple threads from the same process see a fully consistent view of both counts.
In the final stage, the DLLProc is hooked, thus ensuring that the creation and destruction of other threads in the process is monitored, and the final exit of the process is also registered.

An application using the DLL.

A simple application that uses the DLL is presented here. It consists of the global shared unit
Код
unit GlobalHdr;

interface

type
  TGlobalInfo = record
    GlobalProcesses: integer;
    GlobalThreads: integer;
    SharedNumber: integer;
  end;

implementation

end.



, a unit containing the main form
Код
unit GlobalForm;

interface

uses
  Windows, Messages, SysUtils, Classes, Graphics, Controls, Forms, Dialogs,
  StdCtrls;

type
  TGlobalFrm = class(TForm)
    GlobalProcessEdit: TEdit;
    GlobalThreadEdit: TEdit;
    LocalThreadEdit: TEdit;
    SharedDataEdit: TEdit;
    ReadBtn: TButton;
    IncBtn: TButton;
    DecBtn: TButton;
    SetBtn: TButton;
    CreateThread: TButton;
    procedure ReadBtnClick(Sender: TObject);
    procedure IncBtnClick(Sender: TObject);
    procedure DecBtnClick(Sender: TObject);
    procedure SetBtnClick(Sender: TObject);
    procedure CreateThreadClick(Sender: TObject);
  private
    { Private declarations }
  public
    { Public declarations }
  end;

var
  GlobalFrm: TGlobalFrm;

implementation

uses GlobalHdr, DelayThread;

procedure ReadInfo(var GlobalInfo: TGlobalInfo; var LocalThreads: integer);
  stdcall; external 'GlobalData.dll';

procedure SetSharedInteger(NewValue: integer);
  stdcall; external 'GlobalData.dll';

procedure IncSharedInteger;
  stdcall; external 'GlobalData.dll';

procedure DecSharedInteger;
  stdcall; external 'GlobalData.dll';


{$R *.DFM}

procedure TGlobalFrm.ReadBtnClick(Sender: TObject);

var
  GlobalInfo: TGlobalInfo;
  LocalThreads: integer;

begin
  ReadInfo(GlobalInfo, LocalThreads);
  GlobalProcessEdit.Text := IntToStr(GlobalInfo.GlobalProcesses);
  GlobalThreadEdit.Text := IntToStr(GlobalInfo.GlobalThreads);
  SharedDataEdit.Text := IntToStr(GlobalInfo.SharedNumber);

  LocalThreadEdit.Text := IntToStr(LocalThreads);
end;

procedure TGlobalFrm.IncBtnClick(Sender: TObject);
begin
  IncSharedInteger;
  ReadBtnClick(Sender);
end;

procedure TGlobalFrm.DecBtnClick(Sender: TObject);
begin
  DecSharedInteger;
  ReadBtnClick(Sender);
end;

procedure TGlobalFrm.SetBtnClick(Sender: TObject);

var
  NewInt: integer;

begin
  NewInt := StrToInt(SharedDataEdit.Text);
  SetSharedInteger(NewInt);
end;

procedure TGlobalFrm.CreateThreadClick(Sender: TObject);

var
  NewThread: TDelayThread;

begin
  NewThread := TDelayThread.Create(true);
  NewThread.FreeOnTerminate := true;
  NewThread.Resume;
end;

end.

and a subsidiary unit containing a simple thread
Код
unit DelayThread;

interface

uses
  Windows, Classes;

type
  TDelayThread = class(TThread)
  private
  protected
    procedure Execute; override;
  end;

implementation

{ TDelayThread }

procedure TDelayThread.Execute;
begin
  Sleep(5000);
end;

end.

Five buttons exist on the form, allowing the user to read the data contained in the DLL, increment, decrement and set the shared integer, and create one or more threads within the application, just to verify that local thread counts work. As expected, the thread counts increment whenever a new copy of the application is executed, or one of the applications creates a thread. Note that the thread need not directly use the DLL in order for the DLL to be informed of its presence.

Pitfall 2: Thread context in Entry Point Functions.

Instead of using a simple application, let's try one that does something more advanced. In this situation, the DLL is loaded manually by the application programmer, instead of being automatically loaded. This is possible by replacing the previous form unit with this one
Код
unit GlobalForm;

interface

uses
  Windows, Messages, SysUtils, Classes, Graphics, Controls, Forms, Dialogs,
  StdCtrls, GlobalHdr;

type

  TReadInfoProc = procedure(var GlobalInfo: TGlobalInfo; var LocalThreads: integer); stdcall;
  TSetProc = procedure(NewValue: integer); stdcall;
  TIncDecProc = procedure; stdcall;

  TProcs = record
    ReadProc: TReadInfoProc;
    SetProc: TSetProc;
    IncProc, DecProc: TIncDecProc;
  end;

  TGlobalFrm = class(TForm)
    GlobalProcessEdit: TEdit;
    GlobalThreadEdit: TEdit;
    LocalThreadEdit: TEdit;
    SharedDataEdit: TEdit;
    ReadBtn: TButton;
    IncBtn: TButton;
    DecBtn: TButton;
    SetBtn: TButton;
    CreateThread: TButton;
    LoadDLLBtn: TButton;
    procedure ReadBtnClick(Sender: TObject);
    procedure IncBtnClick(Sender: TObject);
    procedure DecBtnClick(Sender: TObject);
    procedure SetBtnClick(Sender: TObject);
    procedure CreateThreadClick(Sender: TObject);
    procedure LoadDLLBtnClick(Sender: TObject);
    procedure FormCreate(Sender: TObject);
    procedure FormDestroy(Sender: TObject);
  private
    { Private declarations }
    Procs: TProcs;
    Loaded: boolean;
    LibHandle: THandle;
  public
    { Public declarations }
  end;

var
  GlobalFrm: TGlobalFrm;

implementation

uses DelayThread;

{$R *.DFM}

procedure TGlobalFrm.ReadBtnClick(Sender: TObject);

var
  GlobalInfo: TGlobalInfo;
  LocalThreads: integer;

begin
  if Loaded then
  begin
    Procs.ReadProc(GlobalInfo, LocalThreads);
    GlobalProcessEdit.Text := IntToStr(GlobalInfo.GlobalProcesses);
    GlobalThreadEdit.Text := IntToStr(GlobalInfo.GlobalThreads);
    SharedDataEdit.Text := IntToStr(GlobalInfo.SharedNumber);
    LocalThreadEdit.Text := IntToStr(LocalThreads);
  end;
end;

procedure TGlobalFrm.IncBtnClick(Sender: TObject);
begin
  if Loaded then
  begin
    Procs.IncProc;
    ReadBtnClick(Sender);
  end;
end;

procedure TGlobalFrm.DecBtnClick(Sender: TObject);
begin
  if Loaded then
  begin
    Procs.DecProc;
    ReadBtnClick(Sender);
  end;
end;

procedure TGlobalFrm.SetBtnClick(Sender: TObject);

var
  NewInt: integer;

begin
  if Loaded then
  begin
    NewInt := StrToInt(SharedDataEdit.Text);
    Procs.SetProc(NewInt);
  end;
end;

procedure TGlobalFrm.CreateThreadClick(Sender: TObject);

var
  NewThread: TDelayThread;

begin
  NewThread := TDelayThread.Create(true);
  NewThread.FreeOnTerminate := true;
  NewThread.Resume;
end;

procedure TGlobalFrm.LoadDLLBtnClick(Sender: TObject);
begin
  if not Loaded then
  begin
    LibHandle := LoadLibrary('GlobalData.dll');
    if LibHandle = INVALID_HANDLE_VALUE then exit;
    with Procs do
    begin
      ReadProc := GetProcAddress(LibHandle, 'ReadInfo');
      IncProc := GetProcAddress(LibHandle, 'IncSharedInteger');
      DecProc := GetProcAddress(LibHandle, 'DecSharedInteger');
      SetProc := GetProcAddress(LibHandle, 'SetSharedInteger');
      if not (Assigned(ReadProc) and Assigned(IncProc) and Assigned(DecProc)
        and Assigned(SetProc)) then
      begin
        FreeLibrary(LibHandle);
        LibHandle := INVALID_HANDLE_VALUE;
        exit;
      end;
      Loaded := true;
    end;
  end;
end;

procedure TGlobalFrm.FormCreate(Sender: TObject);
begin
  LibHandle := INVALID_HANDLE_VALUE;
end;

procedure TGlobalFrm.FormDestroy(Sender: TObject);
begin
  if LibHandle <> INVALID_HANDLE_VALUE then
    FreeLibrary(LibHandle);
end;

end.

An extra button is added which loads the DLL, and sets up the procedure addressed manually. Try running the program, creating several threads and then loading the DLL. You should find that the DLL no longer correctly keeps track of the number of threads in the various processes that use it. Why is this? The Win32 help file states that when using the entry point function with the arguments DLL_THREAD_ATTACH and DLL_THREAD_DETACH:
"DLL_THREAD_ATTACH Indicates that the current process is creating a new thread. When this occurs, the system calls the entry-point function of all DLLs currently attached to the process. The call is made in the context of the new thread. DLLs can use this opportunity to initialize a TLS slot for the thread. A thread calling the DLL entry-point function with the DLL_PROCESS_ATTACH value does not call the DLL entry-point function with the DLL_THREAD_ATTACH value.
Note that a DLL's entry-point function is called with this value only by threads created after the DLL is attached to the process. When a DLL is attached by LoadLibrary, existing threads do not call the entry-point function of the newly loaded DLL."
It drives the point home by also stating:
"DLL_THREAD_DETACH Indicates that a thread is exiting cleanly. If the DLL has stored a pointer to allocated memory in a TLS slot, it uses this opportunity to free the memory. The operating system calls the entry-point function of all currently loaded DLLs with this value. The call is made in the context of the exiting thread. There are cases in which the entry-point function is called for a terminating thread even if the DLL never attached to the thread.
  • The thread was the initial thread in the process, so the system called the entry-point function with the DLL_PROCESS_ATTACH value.
  • The thread was already running when a call to the LoadLibrary function was made, so the system never called the entry-point function for it"
This behaviour has two potentially unpleasant side effects.
  • It is not possible, in the general case to keep track of how many threads are in the DLL on a global basis unless one can guarantee that an application loads the DLL before creating any child threads. One might mistakenly assume that an application loading a DLL would have the DLL_THREAD_ATTACH entry point called for already existing threads. This is not the case because, having guaranteed that thread attachments and detachments are notified to the DLL in the context of the thread attaching or detaching, it is impossible to call the DLL entry point in the correct context of threads that are already running.
  • Since the DLL entry point can be called by several different threads, race conditions can occur between the entry point function and DLL initialization. If a thread is created at about the same time as the DLL is loaded by an application, then it is possible that the DLL entry point might be called for the thread attachment whilst the thread main body is still being executed. This is why it is always a good idea to set up the entry point function as the very last action in DLL initialization.
Readers would benefit from noting that both these side effects have repercussions when deciding when to set the IsMultiThread variable.

Exception Handling.

When writing robust applications, the programmer should always be prepared for things to go wrong. The same is true for multithreaded programming. Most of the examples presented in this tutorial have been relatively simple, and exception handling has mostly been omitted for clarity. In real world applications, this is likely to be unacceptable.
Recall that threads have their own call stack. This means that an exception in a thread does not fall through the standard VCL exception handling mechanisms. Instead of raising a user-friendly dialog box, and an unhandled exception in a thread will terminate the application. As a result of this, the execute method of a thread is one of the few places where it can be useful to create an exception handler that catches all exceptions. Once an exception has been caught in a thread, dealing with it is also slightly different from ordinary VCL handling. It may not always be appropriate to show a dialog box. Quite often, a valid tactic is to let the thread communicate the fact that a failure has occurred to the main VCL thread, using whatever communication mechanisms are in place, and then let the VCL thread decide what to do. This is particularly useful if the VCL thread has created the child thread to perform a particular operation.
Despite this, there are some situations in threads where dealing with error cases can be particularly difficult. Most of these situations occur when using threads to perform continuous background operations. Recalling chapter 10, the BAB has a couple of threads that forward read and write operations from the VCL thread to a blocking buffer. If an error occurs in either of these threads, the error may show no clear causal relationship with any particular operation in the VCL thread, and it may be difficult to communicate failure instantly back to the VCL thread. Not only this, but an exception in either of these threads is likely to break them out of the read or write loop that they are in, raising the difficult question of whether these threads can be usefully restarted. About the best that can be done is to set some state indicating that all future operations should be failed, forcing the main thread to destroy and re-initialize the buffer.
The best solution is to include the possibility of such problems into the original application design, and to determine best effort recovery attempts that may be made.

The BDE.

In Chapter 7, I indicated that one potential solution to locking problems is to put shared data in a database, and use the BDE to perform concurrency control. The programmer should note that each thread must maintain a separate database connection for this to work properly. Hence, each thread should use a separate TSession object to manage its connection to the database. Each application has a TSessionList component called Sessions to enable this to be done easily. Detailed explanation of multiple sessions is beyond the scope of this document.


--------------------
Все знать невозможно, но хочется
PM ICQ   Вверх
Петрович
Дата 31.7.2005, 22:00 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Эксперт
***


Профиль
Группа: Участник Клуба
Сообщений: 1000
Регистрация: 2.12.2003
Где: Москва

Репутация: 15
Всего: 55



Chapter 14. A real world problem, and its solution.

In this chapter:
  • The problem.
  • The solution.
  • The pipe DLL and interface files.
  • The reader and writer threads.
  • A socket based interface.


The problem.

Over the past couple of years I have been writing a distributed raytracer. This uses TCP/IP to send descriptions of scenes to be rendered across a network from a central server to a collection of clients. The clients render the image, and then return the data to the server. Some beta testers were interested in trying the program out, but mentioned that they did not have a TCP/IP stack loaded on their machine. I decided that it would be useful to write some code that emulated TCP sockets, allowing communication between two applications (both client and server) on the local machine.
Various potential solutions were investigated. The most promising at first seemed to be to use named pipes. Unfortunately a problem soon cropped up: The protocols I was using on top of TCP/IP assumed that connection semantics could be performed on a strictly peer to peer basis: either program could initiate a connection to the other, and either program could disconnect at any time. Both connection and disconnection were perfectly symmetrical: The protocols used on top of TCP performed a three way handshake over and above that performed at the TCP layer to negotiate whether a connection could be closed, and that having occured, either end could close the connection. Unfortunately, named pipes did not provide the correct disconnection semantics, and they did not cope well with various error situations.

The solution.

I do not intend to explain the solution in detail, but more advanced readers may find the code interesting reading. In the end, I decided to use shared memory for data transfer, and to implement all synchronisation from the ground up. The solution was implemented in 3 stages.
  • A DLL was written which provided a bidirectional blocking pipe between two applications.
  • A pair of reader and writer threads were written to allow asynchronous access to the blocking pipes.
  • A wrapper around the threads was written to provide an asynchronous interface similar to nonblocking sockets.


The pipe DLL and interface files.

MCHPipe.dpr:
Код
{ 6/10/00 11:34:50 PM > [martin on PERGOLESI] update:  (0.6) /  }
{ 27-07-1999 2:03:56 AM > [martin on MARTIN] update: Removing unneeded
   "uses" (0.5) /  }
{ 10-05-1999 11:59:56 PM > [martin on MARTIN] update: Reformatting according
   to Delphi guidelines. (0.4) /  }
{ 19-04-1999 8:19:01 PM > [martin on MARTIN] update: Inserting proper
   constant for SEMAPHORE_ALL_ACCESS (0.3) /  }
{ 08-04-1999 11:29:11 PM > [martin on MARTIN] update: Removed debug checks
   (0.2) /  }
{ 08-04-1999 11:10:15 PM > [martin on MARTIN] check in: (0.1) Initial
   Version /  }
library MCHPipe;

{Martin Harvey 23/8/98
 A simple pipe library}

{Note to the casual reader: There are synchronisation subtleties here,
 particularly with respect to blocking status variables. A good texbook on
 operating system theory is a prerequisite.

 The good news is that this code displays a 4 fold symmetry:

 1. The treatment of clients and servers is identical
    (with respect to handle checks). (2)
 2. The treatment of reads and writes is identical
    (with respect to blocking). (2*2=4)

 As a result, I've folded everything up into a set of "generic" procedures.

 The DLL currently only provides one bidirectional pipe. That's because it's all
 I currently need.

 It is expected that a typical use of this DLL will result in
 up to 6 threads being present in the DLL at any one time:

 1. Client reader and writer threads.
 2. Server reader and writer threads.
 3. Client and server set-up and tear-down threads.

 Note that pipes cannot be written to or read from unless both client and server
 are connected.}

{Improvement MCH 7/11/1998.
 In order to eliminate polling, a "WaitForPeer" function has been added.
 This function enables a reader thread to wait for the corresponding writer to
 connect, thus ensuring that the creation and startup of the reader thread does
 not have to be polled}

uses
  Windows,
  MCHPipeTypes in 'MCHPipeTypes.pas';

const
  BufSize = 4096;
  MapName = 'MCHGlobalPipeMap';
  GlobalLockName = 'MCHGlobalPipeLock';
  Cli2ServLockName = 'MCHCli2ServLock';
  Serv2CliLockName = 'MCHServ2CliLock';
  ServPeerWaitSemName = 'MCHServPeerSem';
  CliPeerWaitSemName = 'MCHCliPeerSem';
  Cli2ServReaderSemName = 'MCHCli2ServReaderSem';
  Cli2ServWriterSemName = 'MCHCli2ServWriterSem';
  Serv2CliReaderSemName = 'MCHServ2CliReaderSem';
  Serv2CliWriterSemName = 'MCHServ2CliWriterSem';
  SEMAPHORE_ALL_ACCESS = STANDARD_RIGHTS_REQUIRED or SYNCHRONIZE or 3;
{ From SRC Modula-3 NT header files. The SEMAPHORE_ALL_ACCESS macro is missing
 from windows.pas}

type
  {Misc definitions}
  PByte = ^Byte;

(******************************************************************************)

  {Definition of global data structures which are shared via a memory mapped file
  without a disk image}
  TCycBuf = array[0..BufSize - 1] of byte;
  TDataBuf = record
    ReadPtr: integer;
    WritePtr: integer;
    CycBuf: TCycBuf;
  end;

  TPipe = record
    Buf: TDataBuf;
    ReaderBlocked, WriterBlocked: boolean; {bools to allow for checking before signalling}
  end;
  TBiDirPipe = record
    Cli2ServPipe: TPipe;
    Serv2CliPipe: TPipe;
    ServConnected, CliConnected: boolean; {check connections}
    ServPeerWait, CliPeerWait: boolean; {wait for peer variables}
    ServHandle, CliHandle: TMCHHandle; {handles for reading/writing}
  end;
  PBiDirPipe = ^TBiDirPipe;

(******************************************************************************)

  {definition of local data structures, which consist of synchronisation
   handles initialised by creating or getting handles to named objects}

  TPipeLocks = record
    PipeLock: THandle; {data Mutex}
    ReaderSem, WriterSem: THandle; {semaphores to block operations}
  end;
  PPipeLocks = ^TPipeLocks;
  TBiDirPipeLocks = record
    Cli2ServLocks: TPipeLocks;
    Serv2CliLocks: TPipeLocks;
    ServPeerWaitSem, CliPeerWaitSem: THandle; {wait for peer semaphores}
    BiLock: THandle; {global Mutex}
    MapHandle: THandle;
  end;
  PBiDirPipeLocks = ^TBiDirPipeLocks;

var
  BiDirPipe: PBiDirPipe; {pointer to global shared memory}
  BiDirPipeLocks: TBiDirPipeLocks; {local instance of nested record of synchronisation structures}

{Note on semaphore / mutex ordering:

 1. First global mutex must be aquired.
 2. Then data mutex must be aquired
 3. Releases must occur in the same order
 4. One should not block on the semaphores when holding any mutexes.

 If these rules are not respected, deadlock will occur.
 See a decent O/S theory textbook for more explanation of synchronisation primitives}


{procedures and functions to do with maintaining the cyclic buffers}

procedure InitBuf(var DataBuf: TDataBuf);
begin
  with DataBuf do
  begin
    ReadPtr := 0;
    WritePtr := 0;
  end;
end;

function EntriesUsed(var DataBuf: TDataBuf): integer;
begin
  with DataBuf do
  begin
    if WritePtr >= ReadPtr then
      result := WritePtr - ReadPtr
    else
      result := BufSize - (ReadPtr - WritePtr);
  end;
end;

function EntriesFree(var DataBuf: TDataBuf): integer;

{Note that we have introduced an asymmetry here, to ensure
 that we never completely fill up the buffer}

begin
  with DataBuf do
  begin
    if WritePtr >= ReadPtr then
      result := BufSize - (WritePtr - ReadPtr)
    else
      result := ReadPtr - WritePtr;
  end;
  dec(result);
end;

procedure GetEntries(var DataBuf: TDataBuf; var Dest; Count: integer);

var
  Write: PByte;
  Iter: integer;

begin
  Write := @Dest;
  with DataBuf do
  begin
    if EntriesUsed(DataBuf) >= Count then
    begin
      for iter := 0 to Count - 1 do
      begin
        Write^ := CycBuf[ReadPtr];
        ReadPtr := (ReadPtr + 1) mod BufSize;
        Inc(Write);
      end;
    end;
  end;
end;

procedure AddEntries(var DataBuf: TDataBuf; var Src; Count: integer);

var
  Read: PByte;
  Iter: integer;

begin
  Read := @Src;
  with DataBuf do
  begin
    if EntriesFree(DataBuf) >= Count then
    begin
      for iter := 0 to Count - 1 do
      begin
        CycBuf[WritePtr] := Read^;
        WritePtr := (WritePtr + 1) mod BufSize;
        Inc(Read);
      end;
    end;
  end;
end;

{Connection and disconnection functions. Disconnection should always unblock
 everything}

function GenericConnect(var hHandle: TMCHHandle; Server: boolean): TMCHError;
{Returns error if server already connected}
begin
  result := meOK;
  {Get the global mutex}
  WaitForSingleObject(BiDirPipeLocks.BiLock, INFINITE);
  if Server then
  begin
    if BiDirPipe.ServConnected then
      result := meAlreadyConnected
    else
    begin
      hHandle := BiDirPipe.ServHandle;
      BiDirPipe.ServConnected := true;
      {Now think about peer unblocking functions}
      with BiDirPipe^ do
      begin
        if CliConnected and CliPeerWait then
        begin
          {Unblock client}
          CliPeerWait := FALSE;
          ReleaseSemaphore(BiDirPipeLocks.CliPeerWaitSem, 1, nil);
        end;
      end;
    end;
  end
  else
  begin
    if BiDirPipe.CliConnected then
      result := meAlreadyConnected
    else
    begin
      hHandle := BiDirPipe.CliHandle;
      BiDirPipe.CliConnected := true;
      with BiDirPipe^ do
      begin
        if ServConnected and ServPeerWait then
        begin
          {Unblock server}
          ServPeerWait := FALSE;
          ReleaseSemaphore(BiDirPipeLocks.ServPeerWaitSem, 1, nil);
        end;
      end;
    end;
  end;
  ReleaseMutex(BiDirPipeLocks.BiLock);
end;

function GenericDisconnect(hHandle: TMCHHandle; Server: boolean): TMCHError;
{Returns error if server not connected, or bad handle}

begin
  result := meOK;
  WaitForSingleObject(BiDirPipeLocks.BiLock, INFINITE);
  {Now check handle}
  if Server then
  begin
    if hHandle = BiDirPipe.ServHandle then
      BiDirPipe.ServConnected := false
    else
      result := meServerNotConnected;
  end
  else
  begin
    if hHandle = BiDirPipe.CliHandle then
      BiDirPipe.CliConnected := false
    else
      result := meClientNotConnected;
  end;
  {Now. If quit was successfull, then potentially have to unblock
   all blocked reading and writing threads, so that they can
   return error}
  if Result = meOK then
  begin
    WaitForSingleObject(BiDirPipeLocks.Cli2ServLocks.PipeLock, INFINITE);
    WaitForSingleObject(BiDirPipeLocks.Serv2CliLocks.PipeLock, INFINITE);
    {Now unblock all potentially blocked threads reading/writing on the pipe}
    with BiDirPipeLocks do
    begin
      with Cli2ServLocks do
      begin
        with BiDirPipe.Cli2ServPipe do
        begin
          if ReaderBlocked then
          begin
            ReaderBlocked := false;
            ReleaseSemaphore(ReaderSem, 1, nil);
          end;
          if WriterBlocked then
          begin
            WriterBlocked := false;
            ReleaseSemaphore(WriterSem, 1, nil);
          end;
        end;
      end;
      with Serv2CliLocks do
      begin
        with BiDirPipe.Serv2CliPipe do
        begin
          if ReaderBlocked then
          begin
            ReaderBlocked := false;
            ReleaseSemaphore(ReaderSem, 1, nil);
          end;
          if WriterBlocked then
          begin
            WriterBlocked := false;
            ReleaseSemaphore(WriterSem, 1, nil);
          end;
        end;
      end;
      {Now have to think about functions waiting for peer.}
      {We basically have to unblock all threads blocked waiting for peer on our handle}
      with BiDirPipe^ do
      begin
        if Server then
        begin
          if ServPeerWait then
          begin
            ServPeerWait := false;
            ReleaseSemaphore(ServPeerWaitSem, 1, nil);
          end;
        end
        else
        begin
          if CliPeerWait then
          begin
            CliPeerWait := false;
            ReleaseSemaphore(CLiPeerWaitSem, 1, nil);
          end;
        end;
      end;
    end;
  end;
  {Release mutex before unblocking}
  ReleaseMutex(BiDirPipeLocks.BiLock);
  if Result = meOK then
  begin
    ReleaseMutex(BiDirPipeLocks.Cli2ServLocks.PipeLock);
    ReleaseMutex(BiDirPipeLocks.Serv2CliLocks.PipeLock);
  end;
end;


function ConnectServer(var hHandle: TMCHHandle): TMCHError stdcall;
{Returns error if server already connected}
begin
  SetLastError(0);
  result := GenericConnect(hHandle, true);
end;

function ConnectClient(var hHandle: TMCHHandle): TMCHError stdcall;
{Returns error if client already connected}
begin
  SetLastError(0);
  result := GenericConnect(hHandle, false);
end;

function DisconnectServer(hHandle: TMCHHandle): TMCHError stdcall;
begin
  SetLastError(0);
  result := GenericDisconnect(hHandle, true);
end;

function DisconnectClient(hHandle: TMCHHandle): TMCHError stdcall;
begin
  SetLastError(0);
  result := GenericDisconnect(hHandle, false);
end;

{Generic procedures to prevent duplicity}

{This function is *highly* cunning.
 It simply wraps up both reading and writing by both client and server into one procedure.

 He that writeth less code, debuggeth less at the end of the day.}

function GenericReadWrite(var Buf; Count: integer; var SrcDestPipe: TPipe; var Locks: TPipeLocks; Read: boolean): TMCHError;

var
  BlockSelf, UnblockPeer: boolean;
  DoThisTime: integer;
  SrcDestPtr: PByte;
  Avail: integer;


begin
  {Game plan.
  Check that neither client or server disconnected.
  Read/Write as much as possible and block if required.
  upon unblock, recheck connection status before proceeding.
  Once any data has been read/written, unblock the peer on the buffer.
  Nested mutex aquisition also required here. Respect ordering.}
  result := meOK;
  SrcDestPtr := @Buf;
  repeat
    {connection data critical section}
    WaitForSingleObject(BiDirPipeLocks.BiLock, INFINITE);
    WaitForSingleObject(Locks.PipeLock, INFINITE);
    {Now check connection status}
    if not BiDirPipe.ServConnected then result := meServerNotConnected;
    if not BiDirPipe.CliConnected then result := meClientNotConnected;
    if result <> meOK then
    begin
      {bomb out if not all connected}
      ReleaseMutex(BiDirPipeLocks.BiLock);
      ReleaseMutex(Locks.PipeLock);
      Exit;
    end;
    {So far, it's okay to read/write}
    {Read/write as much as we can this time.}
    if Read then
      Avail := EntriesUsed(SrcDestPipe.Buf)
    else
      Avail := EntriesFree(SrcDestPipe.Buf);
    if Count > Avail then
    begin
      DoThisTime := Avail;
      BlockSelf := true;
    end
    else
    begin
      DoThisTime := Count;
      BlockSelf := false;
    end;
    {work out whether to unblock any peer threads blocked on the converse
     read/write. Local vars are used so we can perform blocking / unblocking
     actions without holding any mutexes}
    if Read then
      UnblockPeer := (DoThisTime > 0) and SrcDestPipe.WriterBlocked
    else
      UnblockPeer := (DoThisTime > 0) and SrcDestPipe.ReaderBlocked;
    {Now do the read/write}
    if Read then
      GetEntries(SrcDestPipe.Buf, SrcDestPtr^, DoThisTime)
    else
      AddEntries(SrcDestPipe.Buf, SrcDestPtr^, DoThisTime);
    {update local vars}
    Count := Count - DoThisTime;
    Inc(SrcDestPtr, DoThisTime);
    {update blocking status variables}
    if Read then
    begin
      SrcDestPipe.WriterBlocked := SrcDestPipe.WriterBlocked and (not UnblockPeer);
      SrcDestPipe.ReaderBlocked := BlockSelf; {it is evident that we currently aren't blocked!}
    end
    else
    begin
      SrcDestPipe.ReaderBlocked := SrcDestPipe.ReaderBlocked and (not UnblockPeer);
      SrcDestPipe.WriterBlocked := BlockSelf; {it is evident that we currently aren't blocked!}
    end;
    {Release data mutexes and perform blocking / unblocking actions}
    ReleaseMutex(BiDirPipeLocks.BiLock);
    ReleaseMutex(Locks.PipeLock);
    if Read then
    begin
      if UnblockPeer then
        ReleaseSemaphore(Locks.WriterSem, 1, nil);
      if BlockSelf then
        WaitForSingleObject(Locks.ReaderSem, INFINITE);
    end
    else
    begin
      if UnblockPeer then
        ReleaseSemaphore(Locks.ReaderSem, 1, nil);
      if BlockSelf then
        WaitForSingleObject(Locks.WriterSem, INFINITE);
    end;
    {All done. If not complete, connection status will be rechecked next iteration}
  until count = 0;
end;


function GenericPeek(var BytesReady: integer; var SrcPipe: TPipe; var Locks: TPipeLocks): TMCHError;
begin
{Nonblocking peek. Fails if not both server and client connected}
  WaitForSingleObject(BiDirPipeLocks.BiLock, INFINITE);
  WaitForSingleObject(Locks.PipeLock, INFINITE);
  if BiDirPipe.CliConnected then
  begin
    if BiDirPipe.ServConnected then
      result := meOK
    else
      result := meServerNotConnected;
  end
  else
    result := meClientNotConnected;
  if result = meOK then BytesReady := EntriesUsed(SrcPipe.Buf);
{Now release in the same order that we aquired}
  ReleaseMutex(BiDirPipeLocks.BiLock);
  ReleaseMutex(Locks.PipeLock);
end;


function ReadWriteData(hHandle: TMCHHandle; var Buf; Count: integer; Read: boolean): TMCHError;
{Returns error if client or server not connected (or disconnects during block)
 Blocks if buffer empty}
begin
  WaitForSingleObject(BiDirPipeLocks.BiLock, INFINITE);
  if hHandle = BiDirPipe.ServHandle then
  begin
    if BiDirPipe.ServConnected then
    begin
      ReleaseMutex(BiDirPipeLocks.BiLock);
      if Read then
        {Server is reading, so read from Cli2Serv buffer}
        result := GenericReadWrite(Buf, Count, BiDirPipe.Cli2ServPipe, BiDirPipeLocks.Cli2ServLocks, Read)
      else
        {Server is writing so write to Serv2Cli buffer}
        result := GenericReadWrite(Buf, Count, BiDirPipe.Serv2CliPipe, BiDirPipeLocks.Serv2CliLocks, Read);
    end
    else
    begin
      ReleaseMutex(BiDirPipeLocks.BiLock);
      result := meServerNotConnected;
    end;
  end
  else
  begin
    if hHandle = BiDirPipe.CliHandle then
    begin
      if BiDirPipe.CliConnected then
      begin
        ReleaseMutex(BiDirPipeLocks.BiLock);
        if Read then
          {Client is reading, so read from Serv2Cli buffer}
          result := GenericReadWrite(Buf, Count, BiDirPipe.Serv2CliPipe, BiDirPipeLocks.Serv2CliLocks, Read)
        else
          {Client is writing, so write from Cli2Serv buffer}
          result := GenericReadWrite(Buf, Count, BiDirPipe.Cli2ServPipe, BiDirPipeLocks.Cli2ServLocks, Read);
      end
      else
      begin
        ReleaseMutex(BiDirPipeLocks.BiLock);
        result := meClientNotConnected;
      end;
    end
    else
    begin
      ReleaseMutex(BiDirPipeLocks.BiLock);
      result := meBadHandle;
    end;
  end;
end;

{Publicly accesible read, write and peek procedures}

function WriteData(hHandle: TMCHHandle; var Buf; Count: integer): TMCHError stdcall;
begin
  SetLastError(0);
  result := ReadWriteData(hHandle, Buf, Count, false);
end;

function ReadData(hHandle: TMCHHandle; var Buf; Count: integer): TMCHError stdcall;
begin
  SetLastError(0);
  result := ReadWriteData(hHandle, Buf, Count, true);
end;

function PeekData(hHandle: TMCHHandle; var BytesReady: integer): TMCHError stdcall;
{Returns error if client or server not connected, never blocks}
begin
  SetLastError(0);
  WaitForSingleObject(BiDirPipeLocks.BiLock, INFINITE);
  if hHandle = BiDirPipe.ServHandle then
  begin
    if BiDirPipe.ServConnected then
    begin
      ReleaseMutex(BiDirPipeLocks.BiLock);
      {Server is peeking, so peek Cli2Srv buffer}
      result := GenericPeek(BytesReady, BiDirPipe.Cli2ServPipe, BiDirPipeLocks.Cli2ServLocks);
    end
    else
    begin
      ReleaseMutex(BiDirPipeLocks.BiLock);
      result := meServerNotConnected;
    end;
  end
  else
  begin
    if hHandle = BiDirPipe.CliHandle then
    begin
      if BiDirPipe.CliConnected then
      begin
        ReleaseMutex(BiDirPipeLocks.BiLock);
        {Client is peeking, so peek Serv2Cli buffer}
        result := GenericPeek(BytesReady, BiDirPipe.Serv2CliPipe, BiDirPipeLocks.Serv2CliLocks);
      end
      else
      begin
        ReleaseMutex(BiDirPipeLocks.BiLock);
        result := meClientNotConnected;
      end;
    end
    else
    begin
      ReleaseMutex(BiDirPipeLocks.BiLock);
      result := meBadHandle;
    end;
  end;
end;

{Wait for peer blocks if self connected and peer not. Returns Okay if both
 connected. Returns error if the state at unblock time is anything other
 than both connected}

 {Compiler is a little bit stupid here, and tells me I might have some uninitialised vars.
  What garbage!}

{$WARNINGS OFF}

function WaitForPeer(hHandle: TMCHHandle): TMCHError; stdcall;

{Note: Only one thread can wait for a peer at any one time.}

var
  Server, Block: boolean;

begin
{Hmmm....
 Game plan.
      1. Get data lock.
      2. Check handles. Determine whether client or server. If error, release lock and quit.
      3. Read connection vars. If self connected and peer disconnected, set var + block (outside crit!)
      4. Upon unblock (or general passthru) determine retcode. If both connected, OK else return appropriate err.
 }
  SetLastError(0);
  WaitForSingleObject(BiDirPipeLocks.BiLock, INFINITE);

  {Check handles}
  result := meOK;
  if hHandle = BiDirPipe.ServHandle then
    Server := true
  else if hHandle = BiDirPipe.CliHandle then
    Server := false
  else result := meBadHandle;
  if Result = meOK then
  begin
    with BiDirPipe^ do
    begin
      if Server then
      begin
        Block := ServConnected and not CliConnected;
        if Block then ServPeerWait := true;
      end
      else
      begin
        Block := CliConnected and not ServConnected;
        if Block then CliPeerWait := true;
      end;
    end;
  end;
  ReleaseMutex(BiDirPipeLocks.BiLock);
  if Result = meOK then
  begin
    if Block then
    begin
      if Server then
        WaitForSingleObject(BiDirPipeLocks.ServPeerWaitSem, INFINITE)
      else
        WaitForSingleObject(BiDirPipeLocks.CliPeerWaitSem, INFINITE);
    end;
    {Regardless of whether we have blocked or not, reaquire global mutex and calculate ret code}
    WaitForSingleObject(BiDirPipeLocks.BiLock, INFINITE);
    if BiDirPipe.CliConnected then
    begin
      if BiDirPipe.ServConnected then
        result := meOK
      else
        result := meServerNotConnected;
    end
    else
      result := meClientNotConnected;
    ReleaseMutex(BiDirPipeLocks.BiLock);
  end;
end;

{$WARNINGS ON}

{
 If we are the process that managed to create the memory mapped file
(rather than opening it), then aquire global and pipe mutexes,
and init the data.

This will prevent multiple initialisations from conflicting.
}

procedure Initialise()stdcall;

var
  SharedMapCreator: boolean;

begin
  SetLastError(0);
  {Create mapping should always succeed}
  {IPC Shared memory creation}
  BiDirPipeLocks.MapHandle := CreateFileMapping($FFFFFFFF, nil, PAGE_READWRITE, 0, SizeOf(TBiDirPipe), MapName);
  SharedMapCreator := not (GetLastError = ERROR_ALREADY_EXISTS);
  {Now set up the file mapping...}
  BiDirPipe := MapViewOfFile(BiDirPipeLocks.MapHandle, FILE_MAP_ALL_ACCESS, 0, 0, SizeOf(TBiDirPipe));
  {Now create synchronisation objects Open all objects without requesting initial ownership}
  with BiDirPipeLocks do
  begin
    BiLock := CreateMutex(nil, false, GlobalLockName); {should always return valid handle}
    CliPeerWaitSem := CreateSemaphore(nil, 0, High(Integer), CliPeerWaitSemName);
    ServPeerWaitSem := CreateSemaphore(nil, 0, High(integer), ServPeerWaitSemName);
    {Now deal with individual pipe locks}
    with Cli2ServLocks do
    begin
      PipeLock := CreateMutex(nil, false, Cli2ServLockName); {should always return valid handle}
      ReaderSem := CreateSemaphore(nil, 0, High(integer), Cli2ServReaderSemName);
      WriterSem := CreateSemaphore(nil, 0, High(integer), Cli2ServWriterSemName);
    end;
    with Serv2CliLocks do
    begin
      PipeLock := CreateMutex(nil, false, Serv2CliLockName);
      ReaderSem := CreateSemaphore(nil, 0, High(integer), Serv2CliReaderSemName);
      WriterSem := CreateSemaphore(nil, 0, High(integer), Serv2CliWriterSemName);
    end;
  end;
  {Okay. Now if we created the memory map, initialise it. Respect mutex ordering}
  if SharedMapCreator then
  begin
    WaitForSingleObject(BiDirPipeLocks.BiLock, INFINITE);
    WaitForSingleObject(BiDirPipeLocks.Cli2ServLocks.PipeLock, INFINITE);
    WaitForSingleObject(BiDirPipeLocks.Serv2CliLocks.PipeLock, INFINITE);
    {Now initialise data structures}
    Randomize;
    with BiDirPipe^ do
    begin
      ServConnected := false;
      CliConnected := false;
      ServPeerWait := false;
      CliPeerWait := false;
      ServHandle := Random(High(integer) - 1);
      CliHandle := ServHandle + 1;
      with Cli2ServPipe do
      begin
        ReaderBlocked := false;
        WriterBlocked := false;
        InitBuf(Buf);
      end;
      with Serv2CliPipe do
      begin
        ReaderBlocked := false;
        WriterBlocked := false;
        InitBuf(Buf);
      end;
    end;
    {Now release locks in the order we aquired them}
    ReleaseMutex(BiDirPipeLocks.BiLock);
    ReleaseMutex(BiDirPipeLocks.Cli2ServLocks.PipeLock);
    ReleaseMutex(BiDirPipeLocks.Serv2CLiLocks.PipeLock);
  end;
  {Finished!}
end;

procedure Finalise()stdcall;
begin
  SetLastError(0);
  {Close just about every handle we have}
  UnMapViewofFile(BiDirPipe);
  with BiDirPipeLocks do
  begin
    CloseHandle(BiLock);
    CloseHandle(ServPeerWaitSem);
    CloseHandle(CliPeerWaitSem);
    CloseHandle(MapHandle);
    with Cli2ServLocks do
    begin
      CloseHandle(PipeLock);
      CloseHandle(ReaderSem);
      CloseHandle(WriterSem);
    end;
    with Serv2CliLocks do
    begin
      CloseHandle(PipeLock);
      CloseHandle(ReaderSem);
      CloseHandle(WriterSem);
    end;
  end;
end;

exports
  ConnectServer,
  ConnectClient,
  WriteData,
  ReadData,
  PeekData,
  WaitForPeer,
  DisconnectServer,
  DisconnectClient,
  Initialise,
  Finalise;

begin
end.


mchpipeinterface2.pas

Код
{ 10-05-1999 10:36:26 PM > [martin on MARTIN] checked out /Reformatting
   according to Delphi guidelines. }
{ 14-04-1999 11:59:06 PM > [martin on MARTIN] update: Changing dynamic
   methods to virtual. (0.1) /  }
{ 14-04-1999 11:52:47 PM > [martin on MARTIN] checked out /Changing dynamic
   methods to virtual. }
{ 06-04-1999 1:46:38 AM > [martin on MARTIN] check in: (0.0) Initial Version
   / None }
unit MCHPipeInterface2;

{Martin Harvey 23/9/1998}
{Another interface unit, but one that allows for dynamic loading of the DLL}

interface

uses MCHPipeTypes,SysUtils;

function LoadPipeDLL:boolean;
function UnloadPipeDLL:boolean;

{Load and unload functions automatically call initialise and finalise DLL procs}

{All the functions below will also raise EDLLNotLoaded}

function ConnectServer(var hHandle:TMCHHandle):TMCHError;
{Returns error if server already connected}
function ConnectClient(var hHandle:TMCHHandle):TMCHError;
{Returns error if client already connected}
function WriteData(hHandle:TMCHHandle;var Buf;Count:integer):TMCHError;
{Returns error if client or server not connected (or disconnects during block)
 Blocks if buffer full}
function ReadData(hHandle:TMCHHandle;var Buf;Count:integer):TMCHError;
{Returns error if client or server not connected (or disconnects during block)
 Blocks if buffer empty}
function PeekData(hHandle:TMCHHandle;var BytesReady:integer):TMCHError;
{Returns error if client or server not connected, never blocks}
function WaitForPeer(hHandle:TMCHHandle):TMCHError;
{Lets a thread wait for the peer to connect}
function DisconnectServer(hHandle:TMCHHandle):TMCHError;
{Returns error if server not connected, or bad handle}
function DisconnectClient(hHandle:TMCHHandle):TMCHError;
{Returns error if client not connected or bad handle}

function GetDLLLoaded:boolean;

implementation

uses Windows;

type
  ConnectProc = function(var hHandle:TMCHHandle):TMCHError stdcall;
  DisconnectProc = function(hHandle:TMCHHandle):TMCHError;stdcall;
  DataProc = function(hHandle:TMCHHandle;var Buf;Count:integer):TMCHError stdcall;
  PeekProc = function(hHandle:TMCHHandle;var BytesReady:integer):TMCHError stdcall;
  ProcNoParams = procedure stdcall;

var
  InitProc,FinalProc:ProcNoParams;
  ConnectServerProc,ConnectClientProc:ConnectProc;
  DisconnectServerProc,DisconnectClientProc:DisconnectProc;
  WaitForPeerProc:DisconnectProc;
  WriteDataProc,ReadDataProc:DataProc;
  PeekDataProc:PeekProc;
  DLLLoaded:boolean;
  DLLInstanceHandle:THandle;

function GetDLLLoaded:boolean;
begin
  result := DLLLoaded;
end;

function LoadPipeDLL:boolean;
begin
  result := false;
  DLLInstanceHandle := LoadLibrary(PipeDLLName);
  if DLLInstanceHandle <> 0 then
  begin
    InitProc := GetProcAddress(DLLInstanceHandle, 'Initialise');
    FinalProc := GetProcAddress(DLLInstanceHandle, 'Finalise');
    ConnectServerProc := GetProcAddress(DLLInstanceHandle, 'ConnectServer');
    ConnectClientProc := GetProcAddress(DLLInstanceHandle, 'ConnectClient');
    DisconnectServerProc := GetProcAddress(DLLInstanceHandle, 'DisconnectServer');
    DisconnectClientProc := GetProcAddress(DLLInstanceHandle, 'DisconnectClient');
    WaitForPeerProc := GetProcAddress(DLLInstanceHandle, 'WaitForPeer');
    WriteDataProc := GetProcAddress(DLLInstanceHandle, 'WriteData');
    ReadDataProc := GetProcAddress(DLLInstanceHandle, 'ReadData');
    PeekDataProc := GetProcAddress(DLLInstanceHandle, 'PeekData');
    result := true;
    InitProc;
  end;
  DLLLoaded := result;
end;

function UnLoadPipeDLL:boolean;
begin
  result := false;
  if DLLLoaded then
  begin
    FinalProc;
    result := FreeLibrary(DLLInstanceHandle);
    DLLLoaded := false;
  end;
end;

function ConnectServer(var hHandle:TMCHHandle):TMCHError;
{Returns error if server already connected}
begin
  if DLLLoaded then
    result := ConnectServerProc(hHandle)
  else
    result := meDLLNotLoaded;
end;

function ConnectClient(var hHandle:TMCHHandle):TMCHError;
{Returns error if client already connected}
begin
  if DLLLoaded then
    result := ConnectClientProc(hHandle)
  else
    result := meDLLNotLoaded;
end;

function DisconnectServer(hHandle:TMCHHandle):TMCHError;
{Returns error if server not connected, or bad handle}
begin
  if DLLLoaded then
    result := DisconnectServerProc(hHandle)
  else
    result := meDLLNotLoaded;
end;

function DisconnectClient(hHandle:TMCHHandle):TMCHError;
{Returns error if client not connected or bad handle}
begin
  if DLLLoaded then
    result := DisconnectClientProc(hHandle)
  else
    result := meDLLNotLoaded;
end;


function WriteData(hHandle:TMCHHandle;var Buf;Count:integer):TMCHError;
{Returns error if client or server not connected (or disconnects during block)
 Blocks if buffer full}
begin
  if DLLLoaded then
    result := WriteDataProc(hHandle,Buf,Count)
  else
    result := meDLLNotLoaded;
end;

function ReadData(hHandle:TMCHHandle;var Buf;Count:integer):TMCHError;
{Returns error if client or server not connected (or disconnects during block)
 Blocks if buffer empty}
begin
  if DLLLoaded then
    result := ReadDataProc(hHandle,Buf,Count)
  else
    result := meDLLNotLoaded;
end;

function PeekData(hHandle:TMCHHandle;var BytesReady:integer):TMCHError;
{Returns error if client or server not connected, never blocks}
begin
  if DLLLoaded then
    result := PeekDataProc(hHandle,BytesReady)
  else
    result := meDLLNotLoaded;
end;

function WaitForPeer(hHandle:TMCHHandle):TMCHError;
{Lets a thread wait for the peer to connect}
begin
  if DLLLoaded then
    result := WaitForPeerProc(hHandle)
  else
    result := meDLLNotLoaded;
end;

begin
  DLLLoaded := false;
end.



Это сообщение отредактировал(а) Петрович - 1.8.2005, 13:22


--------------------
Все знать невозможно, но хочется
PM ICQ   Вверх
Петрович
Дата 31.7.2005, 22:24 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Эксперт
***


Профиль
Группа: Участник Клуба
Сообщений: 1000
Регистрация: 2.12.2003
Где: Москва

Репутация: 15
Всего: 55



mchpipetypes.pas

Код
{ 10-05-1999 10:37:17 PM > [martin on MARTIN] checked out /Reformatting
   according to Delphi guidelines. }
{ 06-04-1999 2:43:08 AM > [martin on MARTIN] checked out /Test changes }
{ 06-04-1999 1:46:39 AM > [martin on MARTIN] check in: (0.0) Initial Version
   / None }
unit MCHPipeTypes;

{Martin Harvey 22/9/98

Definition of types required for MCHPipe DLL}

interface

const
  PipeDLLName = 'MCHPipe';

type
  TMCHHandle = integer;
  TMCHError = (meOK,
    meBadHandle,
    meClientNotConnected,
    meServerNotConnected,
    meAlreadyConnected,
    meDLLNotLoaded);


implementation

end.


This DLL is similar to the bounded buffer example found in chapter 9. Looking back on this code, I can only presume that I'd written it after a couple of weeks frantic hacking in C at work, because it's far more convoluted than it needs to be. One point of interest is that the semaphores used for blocking operations do not assume that the bounded buffers are any particular size; instead state is kept on whether the reader or writer threads are blocked or not.

The reader and writer threads.

mchpipethreads.pas

Код
{ 10-05-1999 10:37:03 PM > [martin on MARTIN] checked out /Reformatting
   according to Delphi guidelines. }
{ 06-04-1999 7:49:40 PM > [martin on MARTIN] checked out /Modifying Class
   Names }
unit MCHPipeThreads;

{Martin Harvey 7/11/98}

{This unit gives us a base pipe thread type with some common support for
error tracking}

interface

uses Classes,MCHPipeTypes,Windows,MCHMemoryStream;

type
  TMCHPipeThread = class(TThread)
  private
    FOnTerminate:TNotifyEvent;
  protected
    FTermReason:TMCHError;
  public
    procedure Execute;override;
  published
    property TermReason:TMCHError read FTermReason;
    property OnTerminate:TNotifyEvent read FOnTerminate write FOnTerminate;
  end;

  TMCHPipeWriterThread = class(TMCHPipeThread)
  private
    FDataMutex,FIdleSemaphore:THandle;
    FPipeWriteHandle:TMCHHandle;
    FData:TMCHMemoryStream;
    FWriteIdx:integer;
  protected
  public
    constructor Create(CreateSuspended:boolean);
    procedure Execute;override;
    destructor Destroy;override;
    function WriteData(InStream:TStream):integer; {returns bytes written = InStream.Size}
    property PipeWriteHandle:TMCHHandle read FPipeWriteHandle write FPipeWriteHandle;
  end;

  TMCHPipeReaderThread = class(TMCHPipeThread)
  private
      { Private declarations }
    FDataMutex:THandle;
    FPipeReadHandle:TMCHHandle;
    FData:TMCHMemoryStream;
    FOnDataRecieved:TNotifyEvent;
    FOnConnect:TNotifyEvent;
  protected
  public
    constructor Create(CreateSuspended:boolean);
    procedure Execute;override;
    destructor Destroy;override;
    function ReadData(OutStream:TStream):integer; {returns bytes read}
    property OnDataRecieved:TNotifyEvent read FOnDataRecieved write FOnDataRecieved;
    property PipeReadHandle:TMCHHandle read FPipeReadHandle write FPipeReadHandle;
    property OnConnect:TNotifyEvent read FOnConnect write FOnConnect;
  end;

implementation

uses MCHPipeInterface2;

const
  BufSize = 4096;

type
  DataBuf = array[0..BufSize - 1] of integer;

procedure TMCHPipeThread.Execute;
begin
  if Assigned(FOnTerminate) then FOnTerminate(Self);
end;

constructor TMCHPipeReaderThread.Create(CreateSuspended:boolean);
begin
  inherited Create(CreateSuspended);
  FDataMutex := CreateMutex(nil,false,nil);
  FData := TMCHMemoryStream.Create;
end;

destructor TMCHPipeReaderThread.Destroy;
begin
  Terminate;
  if Suspended then Resume;
  WaitFor;
  FData.Free;
  CloseHandle(FDataMutex);
  inherited Destroy;
end;

function TMCHPipeReaderThread.ReadData(OutStream:TStream):integer;

begin
  WaitForSingleObject(FDataMutex,INFINITE);
  try
    OutStream.Seek(0,soFromEnd);
    FData.Seek(0,soFromBeginning);
    Result := FData.Size;
    OutStream.CopyFrom(FData,FData.Size);
    FData.Clear;
  finally
    ReleaseMutex(FDataMutex);
  end;
end;

procedure TMCHPipeReaderThread.Execute;

var
  Buffer:DataBuf;
  BytesToRead,BytesThisTime:integer;

begin
  FTermReason := WaitForPeer(FPipeReadHandle);
  if FTermReason <> meOK then
    terminate
  else
    if Assigned(FOnConnect) then FOnConnect(Self);
  while not terminated do
  begin
    FTermReason := PeekData(FPipeReadHandle,BytesToRead);
    if FTermReason <> meOK then
      terminate;
    if (not terminated) then
    begin
      if BytesToRead <= 0 then
      begin
        {Callback handler should implement lazy async notification}
        if Assigned(FOnDataRecieved) then FOnDataRecieved(Self);
        BytesToRead := 1;
      end;
      if BytesToRead > BufSize then
        BytesThisTime := BufSize
      else
        BytesThisTime := BytesToRead;
      FTermReason := MCHPipeInterface2.ReadData(FPipeReadHandle,Buffer,BytesThisTime);
      if FTermReason <> meOK then
        terminate
      else
      begin
        WaitForSingleObject(FDataMutex,INFINITE);
        FData.Seek(0,soFromEnd);
        FData.WriteBuffer(Buffer,BytesThisTime);
        ReleaseMutex(FDataMutex);
      end;
    end;
  end;
  inherited Execute;
end;

constructor TMCHPipeWriterThread.Create(CreateSuspended:boolean);
begin
  inherited Create(CreateSuspended);
  FDataMutex := CreateMutex(nil,false,nil);
  FIdleSemaphore := CreateSemaphore(nil,0,High(Integer),nil);
  FData := TMCHMemoryStream.Create;
end;

destructor TMCHPipeWriterThread.Destroy;
begin
  Terminate;
  ReleaseSemaphore(FIdleSemaphore,1,nil);
  if Suspended then Resume;
  WaitFor;
  FData.Free;
  CloseHandle(FDataMutex);
  inherited Destroy;
end;

function TMCHPipeWriterThread.WriteData(InStream:TStream):integer;

begin
  InStream.Seek(0,soFromBeginning);
  WaitForSingleObject(FDataMutex,INFINITE);
  try
    Result := InStream.Size;
    FData.Seek(0,soFromEnd);
    FData.CopyFrom(InStream,InStream.Size);
  finally
    ReleaseMutex(FDataMutex);
  end;
  ReleaseSemaphore(FIdleSemaphore,1,nil);
end;

procedure TMCHPipeWriterThread.Execute;

var
  Buf:DataBuf;
  BytesThisTime,BytesToWrite:integer;

begin
  while not (terminated) do
  begin
    WaitForSingleObject(FDataMutex,INFINITE);
    BytesToWrite := FData.Size - FWriteIdx;
    ReleaseMutex(FDataMutex);
    while (BytesToWrite > 0) and (not terminated) do
    begin
      if BytesToWrite > BufSize then
        BytesThisTime := BufSize
      else
        BytesThisTime := BytesToWrite;
      WaitForSingleObject(FDataMutex,INFINITE);
      FData.Seek(FWriteIdx,soFromBeginning);
      FData.ReadBuffer(Buf,BytesThisTime);
      ReleaseMutex(FDataMutex);
      {Note that we should not block when we have the mutex!}
      FTermReason := MCHPipeInterface2.WriteData(FPipeWriteHandle,Buf,BytesThisTime);
      if (FTermReason = meOK) then
      begin
        BytesToWrite := BytesToWrite - BytesThisTime;
        FWriteIdx := FWriteIdx + BytesThisTime;
      end
      else
        terminate;
    end;
    if (not (terminated)) then
    begin
      WaitForSingleObject(FDataMutex,INFINITE);
      {Cannot be sure that the stream hasn't been written to in the meantime!}
      {When the expression below is false, the wait on the idle semaphore
       will not block, so the stream should not get unnecessarily large}
      if FWriteIdx = FData.Size then
      begin
        FData.Clear;
        FWriteIdx := 0;
      end;
      ReleaseMutex(FDataMutex);
      WaitForSingleObject(FIdleSemaphore,INFINITE);
    end;
  end;
  inherited Execute;
end;

end.


mchmemorystream.pas

Код
{ 10-05-1999 10:36:02 PM > [martin on MARTIN] checked out /Reformatting
   according to Delphi guidelines. }
{ 06-04-1999 2:42:04 AM > [martin on MARTIN] update: Removed checking
   pragmas Initial Version (0.1) /  }
{ 06-04-1999 1:46:37 AM > [martin on MARTIN] check in: (0.0) Initial Version
   / None }
unit MCHMemoryStream;

{Martin Harvey 18/7/1998

For donkeys years I've had performance problems when reading to
or writing from memory streams in small increments. This unit
intends to fix this problem.

Completely rewritten: Martin Harvey 5/4/1999.

I was unhappy with some performance issues in the original,
and the scheme for calculating size and position was not very logical,
and had special cases. This is now a more consistent and effecient rewrite}

{$RANGECHECKS OFF}
{$STACKCHECKS OFF}
{$IOCHECKS OFF}
{$OVERFLOWCHECKS OFF}

interface

{This stream acts as a memory stream by storing the data in 4k blocks,
 all of which are attached to a TList}

uses Classes;

const
  DataBlockSize = 4096;

type
  TDataBlockOffset = 0..DataBlockSize - 1;

  TDataBlock = array[0..DataBlockSize - 1] of byte;

  PDataBlock = ^TDataBlock;

{New rules for offset values are as follows:

 FPosBlock contains the number of the block which is about to be read or written to,
 given the current position.

 FPosOfs contains the offset of the byte that is about to be read or written to,
 given the current position. Always between 0 and DataBlockSize-1

 The number of blocks in the stream is given by the list count. If we are at the
 end of the stream, and the size of the stream is an exact multiple of the
 block size, then the last block will be empty.

 ie: The last block is never full.

}
  TMCHMemoryStream = class(TStream)
  private
    FBlockList:TList;
    FPosBlock:Longint;
    FPosOfs:TDataBlockOffset;
    FLastOfs:TDataBlockOffset;
    {FLastOfs is the offset of the byte to be read or written just off the end of the stream}
  protected
    function GetSize:longint;
    function GetPosition:longint;
    function ConvertOffsetsToLongint(Blocks:longint;BlockOfs:TDataBlockOffset):longint;
    procedure ConvertLongintToOffsets(Input:longint;var Blocks:longint;var BlockOfs:TDataBlockOffset);
    procedure ResizeBlockList(NewNumBlocks:longint);
  public
    constructor Create;
    destructor Destroy;override;
    {Necessary overrides}
    function Read(var Buffer;Count:longint):longint;override;
    function Write(const Buffer;Count:longint):longint;override;
    function Seek(Offset:Longint;Origin:word):Longint;override;
    {Procedures duplicating TCustomMemoryStream functionality}
    procedure SaveToStream(Stream:TStream);
    procedure SaveToFile(const FileName:string);
    {Procedures Duplicating TMemoryStream functionality}
    procedure Clear;
    procedure LoadFromStream(Stream:TStream);
    procedure LoadFromFile(const FileName:string);
    procedure SetSize(NewSize:longint);override;
  end;


implementation

uses SysUtils,Windows;

procedure TMCHMemoryStream.ResizeBlockList(NewNumBlocks:longint);

var
  iter,CurCount:longint;
  NewBlock:PDataBlock;

begin
  CurCount := FBlockList.Count;
  if NewNumBlocks > CurCount then
  begin
    for iter := CurCount to NewNumBlocks - 1 do
    begin
      New(NewBlock);
      FBlockList.Add(NewBlock);
    end;
  end
  else if NewNumBlocks < CurCount then
  begin
    for iter := NewNumBlocks to CurCount - 1 do
    begin
      Dispose(PDataBlock(FBlockList.Items[FBlockList.Count - 1]));
      FBlockList.Delete(FBlockList.Count - 1);
    end;
  end;
end;

function TMCHMemoryStream.GetSize;
begin
  result := ConvertOffsetsToLongint(FBlockList.Count - 1,FLastOfs);
end;

function TMCHMemoryStream.GetPosition;
begin
  result := ConvertOffsetsToLongint(FPosBlock,FposOfs);
end;

function TMCHMemoryStream.ConvertOffsetsTolongint(Blocks:longint;BlockOfs:TDataBlockOffset):longint;
begin
  Result := Blocks * DataBlockSize;
  Result := Result + BlockOfs;
end;

procedure TMCHMemoryStream.ConvertLongintToOffsets(Input:longint;var Blocks:longint;var BlockOfs:TDataBlockOffset);
begin
  Blocks := Input div DataBlockSize;
  BlockOfs := Input mod DataBlockSize;
end;

procedure TMCHMemoryStream.SetSize(NewSize:longint);

var
  NewNumBlocks:longint;
  CurPosition:longint;

begin
  if NewSize >= 0 then
  begin
    {Calculate current position}
    CurPosition := GetPosition;
    {Calculate end offsets for new size}
    ConvertLongintToOffsets(NewSize,NewNumBlocks,FLastOfs);
    {Now have the number of blocks needed, and the offset in the last block}
    ResizeBlockList(NewNumBlocks + 1);
    {List resized}
    {Now adjust position vars if needed}
    if NewSize < CurPosition then
    begin
      {Set current position to the end of the stream}
      FPosBlock := NewNumBlocks - 1;
      FPosOfs := FLastOfs;
    end;
  end;
end;

procedure TMCHMemoryStream.LoadFromStream(Stream:TStream);

var
  TempBlock:TDataBlock;
  BytesThisIteration:longint;

begin
  Stream.Seek(0,soFromBeginning);
  repeat
    BytesThisIteration := DataBlockSize;
    if BytesThisIteration > (Stream.Size - Stream.Position) then
      BytesThisIteration := Stream.Size - Stream.Position;
    Stream.ReadBuffer(TempBlock,BytesThisIteration);
    WriteBuffer(TempBlock,BytesThisIteration);
  until Stream.Position = Stream.Size;
end;

procedure TMCHMemoryStream.LoadFromFile(const FileName:string);
var
  Stream:TStream;
begin
  Stream := TFileStream.Create(FileName,fmOpenRead);
  try
    LoadFromStream(Stream);
  finally
    Stream.Free;
  end;
end;


procedure TMCHMemoryStream.SaveToStream(Stream:TStream);

var
  TempBlock:TDataBlock;
  BytesThisIteration:longint;

begin
  Seek(0,soFromBeginning);
  repeat
    BytesThisIteration := DataBlockSize;
    if BytesThisIteration > (Size - Position) then
      BytesThisIteration := Size - Position;
    ReadBuffer(TempBlock,BytesThisIteration);
    Stream.WriteBuffer(TempBlock,BytesThisIteration);
  until Position = Size;
end;

procedure TMCHMemoryStream.SaveToFile(const FileName:string);
var
  Stream:TStream;
begin
  Stream := TFileStream.Create(FileName,fmCreate);
  try
    SaveToStream(Stream);
  finally
    Stream.Free;
  end;
end;

function TMCHMemoryStream.Write(const Buffer;Count:longint):longint;

var
  CurPos,CurSize,BytesWritten,BytesThisBlock:longint;
  Src:Pointer;
  DestBlock:PDataBlock;

begin
  {Returns bytes written}
  if Count < 0 then
  begin
    result := 0;
    exit;
  end;
  result := count;
  CurPos := GetPosition;
  CurSize := GetSize;
  if CurPos + Result > CurSize then
    SetSize(CurPos + Result);
  {Enough blocks allocated, may result in zero sized block at end}
  {Now do the write}
  Src := @Buffer;
  BytesWritten := 0;
  repeat
    DestBlock := PDataBlock(FBlockList.Items[FPosBlock]);
    BytesThisBlock := DataBlockSize - FPosOfs;
    if BytesThisBlock > (Result - BytesWritten) then
      BytesThisBlock := Result - BytesWritten;
    CopyMemory(@DestBlock^[FPosOfs],Src,BytesThisBlock);
    {Now update position vars}
    if BytesThisBlock + FPosOfs = DataBlockSize then
    begin
      FPosOfs := 0;
      Inc(FPosBlock);
    end
    else
      FPosOfs := FPosOfs + BytesThisBlock;
    BytesWritten := BytesWritten + BytesThisBlock;
    Src := Pointer(Integer(Src) + BytesThisBlock);
  until BytesWritten = result;
end;

function TMCHMemoryStream.Read(var Buffer;Count:longint):longint;

var
  CurPos,CurSize,BytesRead,BytesThisBlock:longint;
  SrcBlock:PDataBlock;
  Dest:pointer;


begin
  {Returns bytes read}
  CurPos := GetPosition;
  CurSize := GetSize;
  result := Count;
  if result < 0 then result := 0;
  if result > (CurSize - CurPos) then result := CurSize - CurPos;
  if result > 0 then
  begin
    Dest := @Buffer;
    BytesRead := 0;
    repeat
      SrcBlock := PDataBlock(FBlockList.items[FPosBlock]);
      BytesThisBlock := DataBlockSize;
      if FPosBlock = FBlockList.Count - 1 then {We're on the last block}
        BytesThisBlock := FLastOfs;
      BytesThisBlock := BytesThisBlock - FPosOfs;
      if BytesThisBlock > (result - BytesRead) then
        BytesThisBlock := result - BytesRead;
      {Now copy the required number of bytes}
      CopyMemory(Dest,@SrcBlock^[FPosOfs],BytesThisBlock);
      {Now update position state}
      if BytesThisBlock + FPosOfs = DataBlockSize then
      begin
        FPosOfs := 0;
        Inc(FPosBlock);
      end
      else
        FPosOfs := FPosOfs + BytesThisBlock;
      BytesRead := BytesRead + BytesThisBlock;
      Dest := Pointer(Integer(Dest) + BytesThisBlock);
    until BytesRead = result;
  end;
end;


function TMCHMemoryStream.Seek(Offset:Longint;Origin:word):longint;

var
  CurPos,CurSize:longint;

begin
  {Remember that it returns new position}
  CurPos := GetPosition;
  CurSize := GetSize;
  case Origin of
    soFromBeginning:result := Offset;
    soFromCurrent:result := CurPos + Offset;
    soFromEnd:result := CurSize - Offset;
  else
    result := CurPos;
  end;
  ConvertLongintToOffsets(result,FPosBlock,FPosOfs);
end;

procedure TMCHMemoryStream.Clear;

begin
  SetSize(0);
end;

destructor TMCHMemoryStream.Destroy;

begin
  Clear;
  Dispose(PDataBlock(FBlockList.Items[0]));
  FBlockList.Free;
  inherited Destroy;
end;

constructor TMCHMemoryStream.Create;
begin
  inherited Create;
  FBlockList := TList.Create;
  Clear; {Allocates first block}
end;

end.


The pipe threads are exactly analogous to the reader and writer threads in the BAB in chapter 10. Notifications are not used for write operations, instead, the writer thread buffers the data internally. This was allowable given the semantics of higher layer protocols.

A socket based interface.

mchpipesocket.pas

Код
{ 10-05-1999 10:36:51 PM > [martin on MARTIN] checked out /Reformatting
   according to Delphi guidelines. }
{ 14-04-1999 11:59:10 PM > [martin on MARTIN] update: Changing dynamic
   methods to virtual. (0.2) /  }
{ 14-04-1999 11:53:03 PM > [martin on MARTIN] checked out /Changing dynamic
   methods to virtual. }
{ 06-04-1999 7:49:29 PM > [martin on MARTIN] checked out /Modifying Class
   Names }
unit MCHPipeSocket;

{$OVERFLOWCHECKS OFF}

{Martin Harvey 7/11/1998

  This unit does the required interfacing from the socket paradigm to the
  pipe paradigm. It does the required interfacing between the pipe threads, which may
  block, and the pipe transaction manager, which treats all events as aynchronous.

  Main points to consider are:

  DLL Loading will be handled by the session.

  We will regularly have to check the state of the reader and writer threads.
  If either of them terminates, then we need to find out why, and signal that
  as a disconnection event or error event.

  We also need to check for success during connection.

  Note that writer thread will not terminate unless it fails to write data.
  Reader thread may terminate at any time after it has stopped waiting for the
  peer to connect.

  The connection status variable tells us whether we
  are disconnected, fully connected, or waiting for the peer.
  If we are waiting for the peer, ONCP code should not attempt to send anything,
  but signal an error.

  Although we signal a connection, this will probably not be used by the ONCP
  Session. It will just check whether we are connected every time something has
  to be sent, and signal an error if we aren't.

Design modification: 15/12/98

  An additional "issue" has cropped up. It is entirely possible for an immediate
  reconnection attempt after a disconnection to mean that unhandled messages from
  the previous connection get applied to the current connection. *NOT* what we want!

  We get around this by having a "Session Number" integer, which starts at 0,
  and always increments. We only allow messages given by the current connection
  any notice.

  It is not protected, because it's only read when the reader/writer threads
  do call our callbacks, and written when the threads do not exist.

}

interface

uses Classes,MCHPipeThreads,MCHPipeTypes,MCHTransactions,
  Messages,Windows,Controls;

const
  WM_ASYNC_LAZY_READ = WM_USER + 2878;
  WM_ASYNC_TERMINATE = WM_USER + 2879;
  WM_ASYNC_CONNECT = WM_USER + 2880;

type
  psServerType = (psServer,psClient,psPeer);

  TMCHPipeConnectionStatus = (pcsNotConnected,pcsConnecting,pcsConnected);

  TMCHPipeSocket = class(TComponent)
  private
    FSockHandle:TMCHHandle;
    FHWnd:THandle;
    FReaderThread:TMCHPipeReaderThread;
    FWriterThread:TMCHPipeWriterThread;
    FOnDisconnect,FOnConnect:TNotifyEvent;
    FOnSockError:TNotifyEvent;
    FOnRead:TNotifyEvent;
    FConnected:TMCHPipeConnectionStatus;
    FManager:TMCHCustomTransactionManager;
    FServer:psServerType;
    FSessionNumber:Word;
  protected
    procedure HandleDataRecieved(Sender:TObject); {Handle data, Called in separate thread}
    procedure HandleTerminate(Sender:TObject); {Handle termination, Called in Separate Thread}
    procedure HandleConnect(Sender:TObject); {Handle peer connection, Called in separate thread}
    procedure MessageHandler(var Msg:TMessage); {Message handling loop for bridging thread gap}
    procedure DoAsyncHandleTerminate(var Msg:TMessage); {asynchronous handler}
    procedure DoAsyncHandleDataRecieved(var Msg:TMessage); {asynchronous handler}
    procedure DoAsyncHandleConnect(var Msg:TMessage); {asynchronous handler}
    procedure DoDisconnect;virtual; {Event trigger}
    procedure DoSockError;virtual; {Event trigger}
    procedure DoRead;virtual; {Event trigger}
    procedure DoConnect;virtual; {Event trigger}
    procedure StartThreads; {Sets up handles and resumes threads}
  public
    constructor Create(AOwner:TComponent);override;
    destructor Destroy;override;
    function Connect:boolean; {Signals whether connection successful pending remote connection}
    procedure Disconnect; {Closes handles, and Frees threads}
    function ReadData(Stream:TStream):integer; {Appends new data to stream. Returns how many bytes read}
    procedure WriteData(Stream:TStream); {Writes stream data.}
  published
   {On Disconnect is to be treated by the higher layers like a dWinsock
    disconnection was in the original DOP/ONCP stuff.}
    property OnDisconnect:TNotifyEvent read FOnDisconnect write FOnDisconnect;
   {OnPipeError will normally just be handled by the transaction manager,
    which will signal OnFatalError, However, you can assign a handler if you
    want.}
    property OnSockError:TNotifyEvent read FOnSockError write FOnSockError;
    property OnRead:TNotifyEvent read FOnRead write FOnRead;
    property OnConnect:TNotifyEvent read FOnConnect write FOnConnect;
    property Connected:TMCHPipeConnectionStatus read FConnected;
    property Manager:TMCHCustomTransactionManager read FManager write FManager;
    property Server:psServerType read FServer write FServer;
  end;

implementation

uses MCHPipeInterface2,Forms,MCHPipeTransactions;

constructor TMCHPipeSocket.Create(AOwner:TComponent);
begin
  inherited Create(AOwner);
  FHWnd := AllocateHWnd(MessageHandler);
  FManager := TMCHPipeTransactionManager.Create;
  (FManager as TMCHPipeTransactionManager).Socket := Self;
  FSessionNumber := 0;
end;

destructor TMCHPipeSocket.Destroy;
begin
  if Assigned(FManager) then
  begin
    FManager.Free;
    FManager := nil;
  end;
  Disconnect;
  DeallocateHWnd(FHWnd);
  inherited Destroy;
end;

procedure TMCHPipeSocket.HandleDataRecieved(Sender:TObject);
begin
  PostMessage(FHwnd,WM_ASYNC_LAZY_READ,FSessionNumber,0);
end;

procedure TMCHPipeSocket.HandleTerminate(Sender:TObject);
begin
  PostMessage(FHwnd,WM_ASYNC_TERMINATE,FSessionNumber,Longint(Sender));
end;

procedure TMCHPipeSocket.HandleConnect(Sender:TObject);
begin
  PostMessage(FHwnd,WM_ASYNC_CONNECT,FSessionNumber,0);
end;

procedure TMCHPipeSocket.MessageHandler(var Msg:TMessage);
begin
{The session number check gets rid of a multitude of problems.}
{In particular, it means that we only handle the first termination
 message from the two threads... all later messages are discarded}
  if Msg.WParam = FSessionNumber then
  begin
    case Msg.Msg of
      WM_ASYNC_LAZY_READ:DoAsyncHandleDataRecieved(Msg);
      WM_ASYNC_TERMINATE:DoAsyncHandleTerminate(Msg);
      WM_ASYNC_CONNECT:DoAsyncHandleConnect(Msg);
    end;
  end;
end;

procedure TMCHPipeSocket.DoAsyncHandleTerminate(var Msg:TMessage);

var
  Sender:TObject;
  Error:TMCHError;
  OrigConnected:TMCHPipeConnectionStatus;

begin
  Sender := TObject(Msg.LParam);
{Find out termination reason}
  Error := (Sender as TMCHPipeThread).TermReason;
{Call disconnect, to disconnect everything & free both threads}
  OrigConnected := FConnected;
  Disconnect;
  if not ((Error = meClientNotConnected) or (Error = meServerNotConnected)) then
  begin
    {Serious algorithm failure}
    DoSockError;
  end
  else
  begin
    {Normal disconnection by peer}
    {Don't signal this if it's as a result of us disconnecting}
    if OrigConnected <> pcsNotConnected then DoDisconnect;
  end;
end;

procedure TMCHPipeSocket.DoAsyncHandleDataRecieved(var Msg:TMessage);
begin
{Check that we are connected and that we have a thread to read from!}
  if (FConnected = pcsConnected) and Assigned(FReaderThread) then
    DoRead;
end;

procedure TMCHPipeSocket.DoAsyncHandleConnect(var Msg:TMessage);
begin
  if FConnected = pcsConnecting then
  begin
    FConnected := pcsConnected;
    DoConnect;
  end
  else
    {Serious algorithm failure}
    DoSockError;
end;

procedure TMCHPipeSocket.DoDisconnect;
begin
  (Manager as TMCHPipeTransactionManager).HandleDisconnect;
  if Assigned(FOnDisconnect) then FOnDisconnect(Self);
end;

procedure TMCHPipeSocket.DoConnect;
begin
  (Manager as TMCHPipeTransactionManager).HandleConnect;
  if Assigned(FOnConnect) then FOnConnect(Self);
end;

procedure TMCHPipeSocket.DoSockError;
begin
  (Manager as TMCHPipeTransactionManager).HandleSockError;
  if Assigned(FOnSockError) then FOnSockError(Self);
end;

procedure TMCHPipeSocket.DoRead;
begin
  (Manager as TMCHPipeTransactionManager).HandleSockRead;
  if Assigned(FOnRead) then FOnRead(Self);
end;

function TMCHPipeSocket.Connect:boolean;
{Assumes DLL already loaded.}
begin
  if (FConnected = pcsNotConnected) then
  begin
    if Server = psServer then
      result := MCHPipeInterface2.ConnectServer(FSockHandle) = meOK
    else if Server = psClient then
      result := MCHPipeInterface2.ConnectClient(FSockHandle) = meOK
    else {Server=psPeer}
    begin
      result := MCHPipeInterface2.ConnectServer(FSockHandle) = meOK;
      if (not result) then
        result := MCHPipeInterface2.ConnectClient(FSockHandle) = meOK;
    end;
    if result then
    begin
      FConnected := pcsConnecting;
      StartThreads;
    end;
  end
  else
    result := FConnected <> pcsNotConnected;
end;

procedure TMCHPipeSocket.StartThreads;
begin
{Sets both reader and writer threads into a known state.
 This state is where all info is flushed from buffers,
 and they have just made their first read/write request
 or are waiting for the peer. }
{OVERFLOWCHECKS ARE OFF}
  Inc(FSessionNumber);
{Set session number to make socket ignore queued messages from all previous
 connections}
  FReaderThread := TMCHPipeReaderThread.Create(true);
  FWriterThread := TMCHPipeWriterThread.Create(true);
  FReaderThread.PipeReadHandle := FSockHandle;
  FWriterThread.PipeWriteHandle := FSockHandle;
  FReaderThread.OnDataRecieved := HandleDataRecieved;
  FReaderThread.OnConnect := HandleConnect;
  FReaderThread.Resume;
  FReaderThread.OnTerminate := HandleTerminate;
  FWriterThread.OnTerminate := HandleTerminate;
  FWriterThread.Resume;
end;

procedure TMCHPipeSocket.Disconnect;
begin
{Close handles}
  if MCHPipeInterface2.DisconnectClient(FSockHandle) <> meOK then
    MCHPipeInterface2.DisconnectServer(FSockHandle);
{Free threads}
{Reader thread is already unblocked}
{Writer thread may only unblock when it's destructor is called}
  if Assigned(FReaderThread) then
  begin
    with FReaderThread do
    begin
      Terminate; {Don't need to wait. Destructor calls WaitFor}
      Free;
    end;
    FReaderThread := nil;
  end;
  if Assigned(FWriterThread) then
  begin
    with FWriterThread do
    begin
      Terminate; {Don't need to wait. Destructor calls WaitFor}
      Free;
    end;
    FWriterThread := nil;
  end;
  FConnected := pcsNotConnected;
{OVERFLOWCHECKS ARE OFF}
  Inc(FSessionNumber);
end;

function TMCHPipeSocket.ReadData(Stream:TStream):integer;
begin
  if FConnected = pcsConnected then {FReader should be assigned}
    result := FReaderThread.ReadData(Stream)
  else
    result := 0;
end;

procedure TMCHPipeSocket.WriteData(Stream:TStream);
begin
  if FConnected = pcsConnected then
    FWriterThread.WriteData(Stream);
end;


end.


mchpipetypes.pas

Код
{ 10-05-1999 10:37:17 PM > [martin on MARTIN] checked out /Reformatting
   according to Delphi guidelines. }
{ 06-04-1999 2:43:08 AM > [martin on MARTIN] checked out /Test changes }
{ 06-04-1999 1:46:39 AM > [martin on MARTIN] check in: (0.0) Initial Version
   / None }
unit MCHPipeTypes;

{Martin Harvey 22/9/98

Definition of types required for MCHPipe DLL}

interface

const
  PipeDLLName = 'MCHPipe';

type
  TMCHHandle = integer;
  TMCHError = (meOK,
    meBadHandle,
    meClientNotConnected,
    meServerNotConnected,
    meAlreadyConnected,
    meDLLNotLoaded);


implementation

end.


This DLL is similar to the bounded buffer example found in chapter 9. Looking back on this code, I can only presume that I'd written it after a couple of weeks frantic hacking in C at work, because it's far more convoluted than it needs to be. One point of interest is that the semaphores used for blocking operations do not assume that the bounded buffers are any particular size; instead state is kept on whether the reader or writer threads are blocked or not.


Это сообщение отредактировал(а) Петрович - 31.7.2005, 22:33


--------------------
Все знать невозможно, но хочется
PM ICQ   Вверх
Петрович
Дата 31.7.2005, 22:53 (ссылка) | (нет голосов) Загрузка ... Загрузка ... Быстрая цитата Цитата


Эксперт
***


Профиль
Группа: Участник Клуба
Сообщений: 1000
Регистрация: 2.12.2003
Где: Москва

Репутация: 15
Всего: 55



The reader and writer threads.

mchpipethreads.pas

Код
{ 10-05-1999 10:37:03 PM > [martin on MARTIN] checked out /Reformatting
   according to Delphi guidelines. }
{ 06-04-1999 7:49:40 PM > [martin on MARTIN] checked out /Modifying Class
   Names }
unit MCHPipeThreads;

{Martin Harvey 7/11/98}

{This unit gives us a base pipe thread type with some common support for
error tracking}

interface

uses Classes,MCHPipeTypes,Windows,MCHMemoryStream;

type
  TMCHPipeThread = class(TThread)
  private
    FOnTerminate:TNotifyEvent;
  protected
    FTermReason:TMCHError;
  public
    procedure Execute;override;
  published
    property TermReason:TMCHError read FTermReason;
    property OnTerminate:TNotifyEvent read FOnTerminate write FOnTerminate;
  end;

  TMCHPipeWriterThread = class(TMCHPipeThread)
  private
    FDataMutex,FIdleSemaphore:THandle;
    FPipeWriteHandle:TMCHHandle;
    FData:TMCHMemoryStream;
    FWriteIdx:integer;
  protected
  public
    constructor Create(CreateSuspended:boolean);
    procedure Execute;override;
    destructor Destroy;override;
    function WriteData(InStream:TStream):integer; {returns bytes written = InStream.Size}
    property PipeWriteHandle:TMCHHandle read FPipeWriteHandle write FPipeWriteHandle;
  end;

  TMCHPipeReaderThread = class(TMCHPipeThread)
  private
      { Private declarations }
    FDataMutex:THandle;
    FPipeReadHandle:TMCHHandle;
    FData:TMCHMemoryStream;
    FOnDataRecieved:TNotifyEvent;
    FOnConnect:TNotifyEvent;
  protected
  public
    constructor Create(CreateSuspended:boolean);
    procedure Execute;override;
    destructor Destroy;override;
    function ReadData(OutStream:TStream):integer; {returns bytes read}
    property OnDataRecieved:TNotifyEvent read FOnDataRecieved write FOnDataRecieved;
    property PipeReadHandle:TMCHHandle read FPipeReadHandle write FPipeReadHandle;
    property OnConnect:TNotifyEvent read FOnConnect write FOnConnect;
  end;

implementation

uses MCHPipeInterface2;

const
  BufSize = 4096;

type
  DataBuf = array[0..BufSize - 1] of integer;

procedure TMCHPipeThread.Execute;
begin
  if Assigned(FOnTerminate) then FOnTerminate(Self);
end;

constructor TMCHPipeReaderThread.Create(CreateSuspended:boolean);
begin
  inherited Create(CreateSuspended);
  FDataMutex := CreateMutex(nil,false,nil);
  FData := TMCHMemoryStream.Create;
end;

destructor TMCHPipeReaderThread.Destroy;
begin
  Terminate;
  if Suspended then Resume;
  WaitFor;
  FData.Free;
  CloseHandle(FDataMutex);
  inherited Destroy;
end;

function TMCHPipeReaderThread.ReadData(OutStream:TStream):integer;

begin
  WaitForSingleObject(FDataMutex,INFINITE);
  try
    OutStream.Seek(0,soFromEnd);
    FData.Seek(0,soFromBeginning);
    Result := FData.Size;
    OutStream.CopyFrom(FData,FData.Size);
    FData.Clear;
  finally
    ReleaseMutex(FDataMutex);
  end;
end;

procedure TMCHPipeReaderThread.Execute;

var
  Buffer:DataBuf;
  BytesToRead,BytesThisTime:integer;

begin
  FTermReason := WaitForPeer(FPipeReadHandle);
  if FTermReason <> meOK then
    terminate
  else
    if Assigned(FOnConnect) then FOnConnect(Self);
  while not terminated do
  begin
    FTermReason := PeekData(FPipeReadHandle,BytesToRead);
    if FTermReason <> meOK then
      terminate;
    if (not terminated) then
    begin
      if BytesToRead <= 0 then
      begin
        {Callback handler should implement lazy async notification}
        if Assigned(FOnDataRecieved) then FOnDataRecieved(Self);
        BytesToRead := 1;
      end;
      if BytesToRead > BufSize then
        BytesThisTime := BufSize
      else
        BytesThisTime := BytesToRead;
      FTermReason := MCHPipeInterface2.ReadData(FPipeReadHandle,Buffer,BytesThisTime);
      if FTermReason <> meOK then
        terminate
      else
      begin
        WaitForSingleObject(FDataMutex,INFINITE);
        FData.Seek(0,soFromEnd);
        FData.WriteBuffer(Buffer,BytesThisTime);
        ReleaseMutex(FDataMutex);
      end;
    end;
  end;
  inherited Execute;
end;

constructor TMCHPipeWriterThread.Create(CreateSuspended:boolean);
begin
  inherited Create(CreateSuspended);
  FDataMutex := CreateMutex(nil,false,nil);
  FIdleSemaphore := CreateSemaphore(nil,0,High(Integer),nil);
  FData := TMCHMemoryStream.Create;
end;

destructor TMCHPipeWriterThread.Destroy;
begin
  Terminate;
  ReleaseSemaphore(FIdleSemaphore,1,nil);
  if Suspended then Resume;
  WaitFor;
  FData.Free;
  CloseHandle(FDataMutex);
  inherited Destroy;
end;

function TMCHPipeWriterThread.WriteData(InStream:TStream):integer;

begin
  InStream.Seek(0,soFromBeginning);
  WaitForSingleObject(FDataMutex,INFINITE);
  try
    Result := InStream.Size;
    FData.Seek(0,soFromEnd);
    FData.CopyFrom(InStream,InStream.Size);
  finally
    ReleaseMutex(FDataMutex);
  end;
  ReleaseSemaphore(FIdleSemaphore,1,nil);
end;

procedure TMCHPipeWriterThread.Execute;

var
  Buf:DataBuf;
  BytesThisTime,BytesToWrite:integer;

begin
  while not (terminated) do
  begin
    WaitForSingleObject(FDataMutex,INFINITE);
    BytesToWrite := FData.Size - FWriteIdx;
    ReleaseMutex(FDataMutex);
    while (BytesToWrite > 0) and (not terminated) do
    begin
      if BytesToWrite > BufSize then
        BytesThisTime := BufSize
      else
        BytesThisTime := BytesToWrite;
      WaitForSingleObject(FDataMutex,INFINITE);
      FData.Seek(FWriteIdx,soFromBeginning);
      FData.ReadBuffer(Buf,BytesThisTime);
      ReleaseMutex(FDataMutex);
      {Note that we should not block when we have the mutex!}
      FTermReason := MCHPipeInterface2.WriteData(FPipeWriteHandle,Buf,BytesThisTime);
      if (FTermReason = meOK) then
      begin
        BytesToWrite := BytesToWrite - BytesThisTime;
        FWriteIdx := FWriteIdx + BytesThisTime;
      end
      else
        terminate;
    end;
    if (not (terminated)) then
    begin
      WaitForSingleObject(FDataMutex,INFINITE);
      {Cannot be sure that the stream hasn't been written to in the meantime!}
      {When the expression below is false, the wait on the idle semaphore
       will not block, so the stream should not get unnecessarily large}
      if FWriteIdx = FData.Size then
      begin
        FData.Clear;
        FWriteIdx := 0;
      end;
      ReleaseMutex(FDataMutex);
      WaitForSingleObject(FIdleSemaphore,INFINITE);
    end;
  end;
  inherited Execute;
end;

end.


mchpipetypes.pas

Код
{ 10-05-1999 10:37:17 PM > [martin on MARTIN] checked out /Reformatting
   according to Delphi guidelines. }
{ 06-04-1999 2:43:08 AM > [martin on MARTIN] checked out /Test changes }
{ 06-04-1999 1:46:39 AM > [martin on MARTIN] check in: (0.0) Initial Version
   / None }
unit MCHPipeTypes;

{Martin Harvey 22/9/98

Definition of types required for MCHPipe DLL}

interface

const
  PipeDLLName = 'MCHPipe';

type
  TMCHHandle = integer;
  TMCHError = (meOK,
    meBadHandle,
    meClientNotConnected,
    meServerNotConnected,
    meAlreadyConnected,
    meDLLNotLoaded);


implementation

end.


This DLL is similar to the bounded buffer example found in chapter 9. Looking back on this code, I can only presume that I'd written it after a couple of weeks frantic hacking in C at work, because it's far more convoluted than it needs to be. One point of interest is that the semaphores used for blocking operations do not assume that the bounded buffers are any particular size; instead state is kept on whether the reader or writer threads are blocked or not.

The reader and writer threads.

mchpipethreads.pas

Код
{ 10-05-1999 10:37:03 PM > [martin on MARTIN] checked out /Reformatting
   according to Delphi guidelines. }
{ 06-04-1999 7:49:40 PM > [martin on MARTIN] checked out /Modifying Class
   Names }
unit MCHPipeThreads;

{Martin Harvey 7/11/98}

{This unit gives us a base pipe thread type with some common support for
error tracking}

interface

uses Classes,MCHPipeTypes,Windows,MCHMemoryStream;

type
  TMCHPipeThread = class(TThread)
  private
    FOnTerminate:TNotifyEvent;
  protected
    FTermReason:TMCHError;
  public
    procedure Execute;override;
  published
    property TermReason:TMCHError read FTermReason;
    property OnTerminate:TNotifyEvent read FOnTerminate write FOnTerminate;
  end;

  TMCHPipeWriterThread = class(TMCHPipeThread)
  private
    FDataMutex,FIdleSemaphore:THandle;
    FPipeWriteHandle:TMCHHandle;
    FData:TMCHMemoryStream;
    FWriteIdx:integer;
  protected
  public
    constructor Create(CreateSuspended:boolean);
    procedure Execute;override;
    destructor Destroy;override;
    function WriteData(InStream:TStream):integer; {returns bytes written = InStream.Size}
    property PipeWriteHandle:TMCHHandle read FPipeWriteHandle write FPipeWriteHandle;
  end;

  TMCHPipeReaderThread = class(TMCHPipeThread)
  private
      { Private declarations }
    FDataMutex:THandle;
    FPipeReadHandle:TMCHHandle;
    FData:TMCHMemoryStream;
    FOnDataRecieved:TNotifyEvent;
    FOnConnect:TNotifyEvent;
  protected
  public
    constructor Create(CreateSuspended:boolean);
    procedure Execute;override;
    destructor Destroy;override;
    function ReadData(OutStream:TStream):integer; {returns bytes read}
    property OnDataRecieved:TNotifyEvent read FOnDataRecieved write FOnDataRecieved;
    property PipeReadHandle:TMCHHandle read FPipeReadHandle write FPipeReadHandle;
    property OnConnect:TNotifyEvent read FOnConnect write FOnConnect;
  end;

implementation

uses MCHPipeInterface2;

const
  BufSize = 4096;

type
  DataBuf = array[0..BufSize - 1] of integer;

procedure TMCHPipeThread.Execute;
begin
  if Assigned(FOnTerminate) then FOnTerminate(Self);
end;

constructor TMCHPipeReaderThread.Create(CreateSuspended:boolean);
begin
  inherited Create(CreateSuspended);
  FDataMutex := CreateMutex(nil,false,nil);
  FData := TMCHMemoryStream.Create;
end;

destructor TMCHPipeReaderThread.Destroy;
begin
  Terminate;
  if Suspended then Resume;
  WaitFor;
  FData.Free;
  CloseHandle(FDataMutex);
  inherited Destroy;
end;

function TMCHPipeReaderThread.ReadData(OutStream:TStream):integer;

begin
  WaitForSingleObject(FDataMutex,INFINITE);
  try
    OutStream.Seek(0,soFromEnd);
    FData.Seek(0,soFromBeginning);
    Result := FData.Size;
    OutStream.CopyFrom(FData,FData.Size);
    FData.Clear;
  finally
    ReleaseMutex(FDataMutex);
  end;
end;

procedure TMCHPipeReaderThread.Execute;

var
  Buffer:DataBuf;
  BytesToRead,BytesThisTime:integer;

begin
  FTermReason := WaitForPeer(FPipeReadHandle);
  if FTermReason <> meOK then
    terminate
  else
    if Assigned(FOnConnect) then FOnConnect(Self);
  while not terminated do
  begin
    FTermReason := PeekData(FPipeReadHandle,BytesToRead);
    if FTermReason <> meOK then
      terminate;
    if (not terminated) then
    begin
      if BytesToRead <= 0 then
      begin
        {Callback handler should implement lazy async notification}
        if Assigned(FOnDataRecieved) then FOnDataRecieved(Self);
        BytesToRead := 1;
      end;
      if BytesToRead > BufSize then
        BytesThisTime := BufSize
      else
        BytesThisTime := BytesToRead;
      FTermReason := MCHPipeInterface2.ReadData(FPipeReadHandle,Buffer,BytesThisTime);
      if FTermReason <> meOK then
        terminate
      else
      begin
        WaitForSingleObject(FDataMutex,INFINITE);
        FData.Seek(0,soFromEnd);
        FData.WriteBuffer(Buffer,BytesThisTime);
        ReleaseMutex(FDataMutex);
      end;
    end;
  end;
  inherited Execute;
end;

constructor TMCHPipeWriterThread.Create(CreateSuspended:boolean);
begin
  inherited Create(CreateSuspended);
  FDataMutex := CreateMutex(nil,false,nil);
  FIdleSemaphore := CreateSemaphore(nil,0,High(Integer),nil);
  FData := TMCHMemoryStream.Create;
end;

destructor TMCHPipeWriterThread.Destroy;
begin
  Terminate;
  ReleaseSemaphore(FIdleSemaphore,1,nil);
  if Suspended then Resume;
  WaitFor;
  FData.Free;
  CloseHandle(FDataMutex);
  inherited Destroy;
end;

function TMCHPipeWriterThread.WriteData(InStream:TStream):integer;

begin
  InStream.Seek(0,soFromBeginning);
  WaitForSingleObject(FDataMutex,INFINITE);
  try
    Result := InStream.Size;
    FData.Seek(0,soFromEnd);
    FData.CopyFrom(InStream,InStream.Size);
  finally
    ReleaseMutex(FDataMutex);
  end;
  ReleaseSemaphore(FIdleSemaphore,1,nil);
end;

procedure TMCHPipeWriterThread.Execute;

var
  Buf:DataBuf;
  BytesThisTime,BytesToWrite:integer;

begin
  while not (terminated) do
  begin
    WaitForSingleObject(FDataMutex,INFINITE);
    BytesToWrite := FData.Size - FWriteIdx;
    ReleaseMutex(FDataMutex);
    while (BytesToWrite > 0) and (not terminated) do
    begin
      if BytesToWrite > BufSize then
        BytesThisTime := BufSize
      else
        BytesThisTime := BytesToWrite;
      WaitForSingleObject(FDataMutex,INFINITE);
      FData.Seek(FWriteIdx,soFromBeginning);
      FData.ReadBuffer(Buf,BytesThisTime);
      ReleaseMutex(FDataMutex);
      {Note that we should not block when we have the mutex!}
      FTermReason := MCHPipeInterface2.WriteData(FPipeWriteHandle,Buf,BytesThisTime);
      if (FTermReason = meOK) then
      begin
        BytesToWrite := BytesToWrite - BytesThisTime;
        FWriteIdx := FWriteIdx + BytesThisTime;
      end
      else
        terminate;
    end;
    if (not (terminated)) then
    begin
      WaitForSingleObject(FDataMutex,INFINITE);
      {Cannot be sure that the stream hasn't been written to in the meantime!}
      {When the expression below is false, the wait on the idle semaphore
       will not block, so the stream should not get unnecessarily large}
      if FWriteIdx = FData.Size then
      begin
        FData.Clear;
        FWriteIdx := 0;
      end;
      ReleaseMutex(FDataMutex);
      WaitForSingleObject(FIdleSemaphore,INFINITE);
    end;
  end;
  inherited Execute;
end;

end.




--------------------
Все знать невозможно, но хочется
PM ICQ   Вверх
Закрытая темаСоздание новой темы Создание опроса
Правила форума "Delphi: WinAPI и системное программирование"
Snowybartram
MetalFanbems
PoseidonRrader
Riply

Запрещено:

1. Публиковать ссылки на вскрытые компоненты

2. Обсуждать взлом компонентов и делиться вскрытыми компонентами

  • Литературу по Delphi обсуждаем здесь
  • Действия модераторов можно обсудить здесь
  • С просьбами о написании курсовой, реферата и т.п. обращаться сюда
  • Вопросы по реализации алгоритмов рассматриваются здесь
  • 90% ответов на свои вопросы можно найти в DRKB (Delphi Russian Knowledge Base) - крупнейшем в рунете сборнике материалов по Дельфи
  • 99% ответов по WinAPI можно найти в MSDN Library, оставшиеся 1% здесь

Если Вам понравилась атмосфера форума, заходите к нам чаще! С уважением, Snowy, bartram, MetalFan, bems, Poseidon, Rrader, Riply.

 
1 Пользователей читают эту тему (1 Гостей и 0 Скрытых Пользователей)
0 Пользователей:
« Предыдущая тема | Delphi: WinAPI и системное программирование | Следующая тема »


 




[ Время генерации скрипта: 0.2808 ]   [ Использовано запросов: 21 ]   [ GZIP включён ]


Реклама на сайте     Информационное спонсорство

 
По вопросам размещения рекламы пишите на vladimir(sobaka)vingrad.ru
Отказ от ответственности     Powered by Invision Power Board(R) 1.3 © 2003  IPS, Inc.