The rendezvous system allows each pipeline process to advertise its available streams and to discover those of other pipelines. This is accomplished by a centralized "rendezvous point" which maintains and relays endpoint connection and stream information.
Note: The RendezvousClient and RendezvousServer components below rely on TCP sockets, and all communication happens in the clear. These communication channels are not secure, and the user must ensure the security of the network as appropriate.
Rendezvous Information
The Rendezvous class holds information about available streams of data as a hierarchy of processes, containing endpoints, containing streams. Each Rendezvous.Process is named, as well as optionally versioned, and contains a collection of Rendezvous.Endpoints. Endpoints represent the information needed to connect to the various types of network stream protocols available in \psi.
- Rendezvous.TcpEndpoint represents TcpWriter output (address and port)
- Rendezvous.NetMQEndpoint represents NetMQWriters (address)
- Rendezvous.RemoteExporterEndpoint represents RemoteExporters (host, port and transport [TCP/UDP/Named Pipes])
- Rendezvous.RemoteClockExporterEndpoint represents RemoteClockExporters (host and port)
Each endpoint contains a collection of Rendezvous.Streams which describe the streams available (names, message types). TcpEndpoints happen to always map to a single stream, while NetMQEndpoints may have one per topic and RemoteExporterEndpoints often have many \psi streams.
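To make the process, endpoint and stream hierarchy concrete, here is a minimal Python sketch. It is an illustration only, not the \psi API: the `Stream`, `Endpoint` and `Process` dataclasses, the string endpoint kinds, and the port numbers are all assumptions made for this example.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Stream:
    name: str        # stream (or topic) name
    type_name: str   # name of the message type carried by the stream


@dataclass
class Endpoint:
    kind: str        # hypothetical label, e.g. 'Tcp', 'NetMQ', 'RemoteClockExporter'
    connection: dict # connection details, which differ per endpoint kind
    streams: List[Stream] = field(default_factory=list)


@dataclass
class Process:
    name: str
    version: Optional[str] = None  # processes are optionally versioned
    endpoints: List[Endpoint] = field(default_factory=list)


# One process exposing three endpoints (addresses and ports are arbitrary).
app = Process('MyApp', 'Version1.0', [
    Endpoint('Tcp', {'address': '123.123.123.123', 'port': 11223},
             [Stream('MyStream', 'System.Double')]),
    Endpoint('NetMQ', {'address': 'tcp://127.0.0.1:30000'},
             [Stream('MyTopic', 'System.Double'),
              Stream('MyOtherTopic', 'System.Int32')]),
    Endpoint('RemoteClockExporter', {'host': '123.123.123.123', 'port': 5000}),
])

# Count the streams under each endpoint kind.
stream_counts = {e.kind: len(e.streams) for e in app.endpoints}
print(stream_counts)
```

The TCP endpoint carries exactly one stream, the NetMQ endpoint one per topic, and the clock endpoint carries connection information but no streams.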
Batches of endpoints come and go en masse with \psi pipelines. That is, as pipelines containing components exposing network endpoints are started and stopped, these endpoints come and go together. These "batches" of endpoints are represented by the named Rendezvous.Processes. On a Rendezvous point, we may call TryAddProcess(...) and TryRemoveProcess(...), and enumerate the current Processes. There are also ProcessAdded and ProcessRemoved events to which we can subscribe.
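The add, remove and event behavior can be sketched in a few lines of Python. This is a toy stand-in, not the \psi implementation: the `RendezvousPoint` class and its method names are hypothetical, and the choice to return `False` for a duplicate or unknown process name is an assumption about what the "Try" prefix implies.

```python
class RendezvousPoint:
    """Toy rendezvous point: named processes plus added/removed callbacks."""

    def __init__(self):
        self._processes = {}
        self.on_added = []    # callbacks invoked with each added process
        self.on_removed = []  # callbacks invoked with each removed process

    def try_add_process(self, process):
        """Add a process; return False if the name is already present (assumed)."""
        name = process['name']
        if name in self._processes:
            return False
        self._processes[name] = process
        for callback in self.on_added:
            callback(process)
        return True

    def try_remove_process(self, name):
        """Remove a process by name; return False if it was not present (assumed)."""
        process = self._processes.pop(name, None)
        if process is None:
            return False
        for callback in self.on_removed:
            callback(process)
        return True

    @property
    def processes(self):
        """Snapshot of the currently known processes."""
        return list(self._processes.values())


events = []
point = RendezvousPoint()
point.on_added.append(lambda p: events.append(('added', p['name'])))
point.on_removed.append(lambda p: events.append(('removed', p['name'])))

first = point.try_add_process({'name': 'MyApp', 'endpoints': []})
second = point.try_add_process({'name': 'MyApp', 'endpoints': []})  # duplicate name
removed = point.try_remove_process('MyApp')
print(events)
```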
Server
The RendezvousServer relays the state of a Rendezvous point to one or more clients. It listens on a TCP port (default 13331) for clients that may be interested in rendezvous information. It may be hosted by an app:
var server = new RendezvousServer();
server.Start();
An application may create a \psi pipeline containing components such as RemoteExporter, RemoteClockExporter, NetMQWriter and TcpWriter:
using (var pipeline = Pipeline.Create())
{
    // create pipeline components, including:
    var remoteExporter = new RemoteExporter(pipeline);
    myStream.Write("MyStream", remoteExporter);
    myOtherStream.Write("MyOtherStream", remoteExporter);
    var remoteClock = new RemoteClockExporter(Remoting.DefaultClockSyncPort);
    var netMqWriter = new NetMQWriter<double>(pipeline, "MyTopic", "tcp://127.0.0.1:30000", MessagePackFormat.Instance);
    var receiver = netMqWriter.AddTopic<int>("MyOtherTopic");
    // TCP port numbers must not exceed 65535
    var tcpWriter = new TcpWriter<double>(pipeline, "MyStream", port: 11223, MessagePackFormat.Instance);
}
Notice that some endpoints may expose multiple message streams under a single endpoint: for example, the myStream and myOtherStream streams written to remoteExporter above, or the "MyTopic" and "MyOtherTopic" topics added to the netMqWriter. Other endpoints implicitly expose just a single stream (e.g. tcpWriter). Still others (e.g. remoteClock) may expose connection information on the endpoint but no message streams. Information about these endpoints is published by adding a Rendezvous.Process representing the pipeline, containing Rendezvous.Endpoint instances representing each endpoint exposed by components within.
var currentMachine = "123.123.123.123";
server.Rendezvous.TryAddProcess(
    new Rendezvous.Process(
        "MyApp",
        new[] {
            remoteClock.ToRendezvousEndpoint(host: currentMachine),
            remoteExporter.ToRendezvousEndpoint(host: currentMachine),
            tcpWriter.ToRendezvousEndpoint(address: currentMachine),
            netMqWriter.ToRendezvousEndpoint(),
        },
        "Version1.0"));
As pipelines are shut down, server.Rendezvous.TryRemoveProcess("MyApp") should be called.
Processes may be added directly at the server, or the server may merely wait for clients to connect and push updates. When updates are made, they are replicated to all of the clients. In this way all of the clients and the server maintain a synchronized view of the rendezvous information.
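The replication described above can be pictured with a small in-memory sketch. It assumes nothing about the actual wire protocol: the `Node` and `RelayServer` classes and the `'add'`/`'remove'` action names are hypothetical, and real clients talk to the server over TCP rather than through direct method calls.

```python
class Node:
    """Holds one participant's view of the rendezvous information."""

    def __init__(self):
        self.processes = {}

    def apply(self, action, process):
        # Apply a single update to the local view.
        if action == 'add':
            self.processes[process['name']] = process
        elif action == 'remove':
            self.processes.pop(process['name'], None)


class RelayServer(Node):
    """Toy server: applies each update locally, then relays it to every client."""

    def __init__(self):
        super().__init__()
        self.clients = []

    def connect(self, client):
        # A newly connected client is first brought up to date.
        for process in self.processes.values():
            client.apply('add', process)
        self.clients.append(client)

    def publish(self, action, process):
        # Updates made at the server or pushed by a client all pass through here.
        self.apply(action, process)
        for client in self.clients:
            client.apply(action, process)


server = RelayServer()
server.publish('add', {'name': 'MyApp', 'endpoints': []})  # added at the server

client_a, client_b = Node(), Node()
server.connect(client_a)
server.connect(client_b)
server.publish('add', {'name': 'Foo', 'endpoints': []})    # as if pushed by a client

views = [sorted(n.processes) for n in (server, client_a, client_b)]
print(views)
```

All three views list the same process names, which is the synchronized state the server aims to maintain.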
Clients
RendezvousClients connect to servers and similarly relay Rendezvous state to/from the server. Beginning with only the server address and port (default 13331), a client may discover all of the other \psi processes, endpoints and streams available on the network. For example, to discover the endpoints published above:
// `pipeline` is assumed to be an existing \psi pipeline in the client application
var client = new RendezvousClient(address: "123.123.123.123");
client.Rendezvous.ProcessAdded += (_, process) =>
{
    if (process.Name == "MyApp")
    {
        if (process.Version != "Version1.0")
        {
            throw new Exception("Unexpected process version.");
        }

        foreach (var endpoint in process.Endpoints)
        {
            if (endpoint is Rendezvous.RemoteExporterEndpoint remoteExporterEndpoint)
            {
                var remoteImporter = remoteExporterEndpoint.ToRemoteImporter(pipeline);
                foreach (var stream in remoteExporterEndpoint.Streams)
                {
                    if (stream.StreamName == "MyStream" && stream.TypeName == typeof(double).ToString())
                    {
                        var myStream = remoteImporter.Importer.OpenStream<double>(stream.StreamName);
                    }
                }
            }
            else if (endpoint is Rendezvous.RemoteClockExporterEndpoint remoteClockEndpoint)
            {
                var remoteClockImporter = remoteClockEndpoint.ToRemoteClockImporter(pipeline);
            }
            else if (endpoint is Rendezvous.TcpSourceEndpoint tcpSourceEndpoint)
            {
                var tcpSource = tcpSourceEndpoint.ToTcpSource<double>(pipeline, MessagePackFormat.Instance);
            }
            else if (endpoint is Rendezvous.NetMQSourceEndpoint netMqSourceEndpoint)
            {
                foreach (var stream in netMqSourceEndpoint.Streams)
                {
                    if (stream.StreamName == "MyTopic")
                    {
                        var netMqSource = netMqSourceEndpoint.ToNetMQSource<double>(pipeline, stream.StreamName, MessagePackFormat.Instance);
                    }
                }
            }
        }
    }
};

// block until the connection and initial update are complete
client.Connected.WaitOne();
Upon initially connecting, clients will immediately be updated with the current state at the server. To ensure that the connection and initial update are complete, call client.Connected.WaitOne().
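The wait-for-connection pattern can be sketched in Python with `threading.Event`. This assumes that `Connected` behaves like a wait handle that is signaled once the initial state has arrived; the `ToyClient` class below is hypothetical and fakes the network with a background thread.

```python
import threading


class ToyClient:
    """Hypothetical client that signals `connected` after the initial update."""

    def __init__(self):
        self.connected = threading.Event()
        self.processes = []

    def start(self, initial_state):
        def run():
            # Stand-in for connecting and receiving the server's current state.
            self.processes = list(initial_state)
            # Signal only after the initial update has been applied.
            self.connected.set()

        threading.Thread(target=run, daemon=True).start()


client = ToyClient()
client.start(['MyApp'])

# Block until the initial update is complete (with a timeout as a safeguard).
ok = client.connected.wait(timeout=5.0)
print(ok, client.processes)
```

Because the event is set after the state is stored, code that runs after the wait can rely on seeing the initial set of processes.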
Notice also that extension methods such as ToRemoteImporter(...), ToRemoteClockImporter(...), ToTcpSource(...) and ToNetMQSource(...) may be used to easily convert the Rendezvous.Endpoint information to a concrete component communicating with the endpoint.
Python Client
A Python client implementation is available. This may be used along with ZeroMQ to bridge to \psi. Example usage:
client = RendezvousClient('123.123.123.123')
client.start(
    lambda p: print(f"Added: {p['name']}"),
    lambda p: print(f"Removed: {p['name']}"))
It may be used for discovery or process information may be added/removed on the Python side:
process = { 'name': 'Foo',
            'endpoints': [
                { 'endpoint': RendezvousClient.Endpoint.NetMQSource,
                  'address': 'tcp://1.2.3.4:567',
                  'streams': [{ 'name': 'Bar',
                                'type': 'SomeType' },
                              { 'name': 'Baz',
                                'type': 'SomeOtherType' }]}]}
client.removeProcess(process)
client.addProcess(process)
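For discovery, a process dictionary of the shape above can be searched for a particular stream. The sketch below is self-contained and does not use the client library: `find_stream` is a hypothetical helper, and the string `'NetMQSource'` stands in for `RendezvousClient.Endpoint.NetMQSource` so that the example runs on its own.

```python
process = {'name': 'Foo',
           'endpoints': [
               {'endpoint': 'NetMQSource',  # stand-in for the client's endpoint constant
                'address': 'tcp://1.2.3.4:567',
                'streams': [{'name': 'Bar', 'type': 'SomeType'},
                            {'name': 'Baz', 'type': 'SomeOtherType'}]}]}


def find_stream(process, stream_name, type_name=None):
    """Return (endpoint, stream) pairs matching the name and, if given, the type."""
    matches = []
    for endpoint in process.get('endpoints', []):
        for stream in endpoint.get('streams', []):
            name_matches = stream['name'] == stream_name
            type_matches = type_name is None or stream['type'] == type_name
            if name_matches and type_matches:
                matches.append((endpoint, stream))
    return matches


matches = find_stream(process, 'Bar', 'SomeType')
for endpoint, stream in matches:
    # The endpoint address is what a ZeroMQ subscriber would connect to.
    print(endpoint['address'], stream['name'], stream['type'])
```

Checking the type as well as the name mirrors the C# client example, which compares both before opening a stream.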
Helpers
To assist in constructing endpoints to describe connection information, extension methods are available in the Microsoft.Psi.Interop.Rendezvous namespace. These allow conversion from a TcpWriter, NetMQWriter or RemoteExporter to an endpoint via ToRendezvousEndpoint(...), and from each type of endpoint (Tcp, NetMQ or RemoteExporter) back to a corresponding component via ToTcpSource(...), ToNetMQSource(...) or ToRemoteImporter(...).