Имеется следующий граф зависимости потоков:  Также потоки должны передавать дальше результаты своей работы. Класс потоков слева: Код | using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.IO.Pipes; using System.Threading; using System.IO; using System.Runtime.Serialization.Formatters.Binary; using System.Runtime.Serialization.Formatters;
namespace Kramer { public class LeftActivity : Activity { private NamedPipeServerStream outStream;
public LeftActivity(int name,ThreadMaterial material) { this.id = name; outStream = new NamedPipeServerStream(name.ToString(), PipeDirection.Out); this.ThreadMaterial = material; }
private void SerializeMaterial() { Stream stream = null; try { BinaryFormatter formatter = new BinaryFormatter(); stream = new FileStream(this.ID.ToString(), FileMode.Create, FileAccess.Write, FileShare.None); formatter.Serialize(stream, this.ThreadMaterial); } catch { // do nothing, just ignore any possible errors } finally { if (null != stream) stream.Close(); } } public override void Activate() { new Thread(() => { this.ThreadMaterial.Run(); //Console.WriteLine("Causal Activity {0} wait for Dependent Activity", m_name); this.SerializeMaterial(); outStream.WaitForConnection(); //Console.WriteLine("Causal Activity {0} is ready to signal", m_name);
}).Start(); }
} }
|
Класс потоков справа (турникет на семафорах): Код | using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.IO.Pipes; using System.Threading; using System.IO; using System.Runtime.Serialization.Formatters.Binary; using System.Runtime.Serialization.Formatters; using System.Runtime.Serialization;
namespace Kramer { public class RightActivity : Activity { private List<int> causalActivities = new List<int>(); private Printer printer; public RightActivity(Printer pr) { printer = pr; } public void AddCausalActivity(int causalServerPipe) { causalActivities.Add(causalServerPipe); } public override void Activate() { new Thread(() => { Semaphore s = new Semaphore(0,1); Semaphore mutex = new Semaphore(1,1); int count = 0; foreach (var causalServerPipe in causalActivities) { NamedPipeClientStream inStream = new NamedPipeClientStream(".", causalServerPipe.ToString(), PipeDirection.In); inStream.Connect(); try { IFormatter formatter = new BinaryFormatter(); Stream stream = new FileStream(causalServerPipe.ToString(), FileMode.Open, FileAccess.Read, FileShare.None); ThreadMaterial formerResults = (ThreadMaterial)formatter.Deserialize(stream); lock(this) { printer.SetResults(causalServerPipe, formerResults.results); } mutex.WaitOne(); count++; mutex.Release(); if (count == 2) s.Release(); } catch { } s.WaitOne(); s.Release(); } printer.Run();
}).Start(); } } }
|
В средних точках нужно ждать коннекта с двух потоков и потом запускать свой коннект. Я не соображу, как это сделать правильно, приведу сырой нерабочий код, который есть на данный момент: Код | using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.IO.Pipes; using System.Threading; using System.IO; using System.Runtime.Serialization.Formatters.Binary; using System.Runtime.Serialization.Formatters; using System.Runtime.Serialization;
namespace Kramer { public class TwoWayActivity : Activity { private List<int> causalActivities = new List<int>(); private NamedPipeServerStream outStream; public TwoWayActivity (int name) { this.id = name; outStream = new NamedPipeServerStream(name.ToString(), PipeDirection.Out); } public void AddCausalActivity(int causalServerPipe) { causalActivities.Add(causalServerPipe); } private void SerializeMaterial() { Stream stream = null; try { BinaryFormatter formatter = new BinaryFormatter(); stream = new FileStream(this.ID.ToString(), FileMode.Create, FileAccess.Write, FileShare.None); formatter.Serialize(stream, this.ThreadMaterial); } catch { // do nothing, just ignore any possible errors } finally { if (null != stream) stream.Close(); } } public override void Activate() { //string number; Thread t0 = new Thread(TwoWayActivity.Listen); Thread t4 = new Thread(TwoWayActivity.Listen); Thread t5 = new Thread(TwoWayActivity.Listen); t0.Start(); t4.Start(); t5.Start(); t0.Join(); t4.Join(); this.ThreadMaterial.Run(); this.SerializeMaterial(); outStream.WaitForConnection(); t5.Join(); //foreach (var causalServerPipe in causalActivities) //{ // NamedPipeClientStream inStream = new NamedPipeClientStream(".", causalServerPipe.ToString(), PipeDirection.In); // inStream.Connect(); // try // { // IFormatter formatter = new BinaryFormatter(); // Stream stream = new FileStream(inStream.ToString(), FileMode.Open, FileAccess.Read, FileShare.None); // ThreadMaterial formerResults = (ThreadMaterial)formatter.Deserialize(stream); // } // catch // { } //} this.ThreadMaterial.Run(); //Console.WriteLine("Causal Activity {0} wait for Dependent Activity", m_name); this.SerializeMaterial(); outStream.WaitForConnection(); } public static void Listen(string number) { NamedPipeClientStream inStream = new NamedPipeClientStream(".", number.ToString(), PipeDirection.In); inStream.Connect(); try { IFormatter formatter = new BinaryFormatter(); Stream stream = new FileStream(inStream.ToString(), FileMode.Open, FileAccess.Read, FileShare.None); } catch { } } } }
|
Это сообщение отредактировал(а) Hohhi - 7.11.2010, 10:34
|