Rendezvous System
Dan Bohus edited this page 2022-04-20 22:48:28 +00:00

The rendezvous system allows each pipeline process to advertise its available streams and to discover those of other pipelines. This is accomplished by a centralized "rendezvous point" which maintains and relays endpoint connection and stream information.

Note: The RendezvousClient and RendezvousServer components below rely on TCP sockets, and all communication happens in the clear. These communication channels are not secure, and the user must ensure the security of the network as appropriate.

Rendezvous Information

The Rendezvous class holds information about available streams of data as a hierarchy of processes, containing endpoints, containing streams. Each Rendezvous.Process is named, as well as optionally versioned, and contains a collection of Rendezvous.Endpoints. Endpoints represent the information needed to connect to the various types of network stream protocols available in \psi.

  • Rendezvous.TcpSourceEndpoint represents TcpWriter output (address and port)
  • Rendezvous.NetMQSourceEndpoint represents NetMQWriters (address)
  • Rendezvous.RemoteExporterEndpoint represents RemoteExporters (host, port and transport [TCP/UDP/Named Pipes])
  • Rendezvous.RemoteClockExporterEndpoint represents RemoteClockExporters (host and port)

Each endpoint contains a collection of Rendezvous.Streams which describe the streams available (names, message types). TcpSourceEndpoints always map to a single stream, while NetMQSourceEndpoints may have one stream per topic and RemoteExporterEndpoints often have many \psi streams.

Batches of endpoints come and go en masse with \psi pipelines. That is, as pipelines containing components that expose network endpoints are started and stopped, their endpoints appear and disappear together. These "batches" of endpoints are represented by named Rendezvous.Process instances. On a Rendezvous point, we may call TryAddProcess(...) and TryRemoveProcess(...) and enumerate the current Processes. There are also ProcessAdded and ProcessRemoved events to which we can subscribe.
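The hierarchy and the add/remove semantics described above can be sketched with a small in-memory model. This is a minimal illustration in Python, not the \psi implementation: the class names (Stream, Endpoint, Process, RendezvousModel), the field names, and the choice to reject a duplicate process name are all assumptions made for the example.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional


@dataclass
class Stream:
    name: str
    type_name: str


@dataclass
class Endpoint:
    kind: str                      # hypothetical label, e.g. "TcpSource" or "RemoteExporter"
    address: str
    streams: List[Stream] = field(default_factory=list)


@dataclass
class Process:
    name: str
    endpoints: List[Endpoint] = field(default_factory=list)
    version: Optional[str] = None  # processes are optionally versioned


class RendezvousModel:
    """Toy rendezvous point: named processes plus added/removed callbacks."""

    def __init__(self) -> None:
        self._processes: Dict[str, Process] = {}
        self.on_added: List[Callable[[Process], None]] = []
        self.on_removed: List[Callable[[Process], None]] = []

    def try_add_process(self, process: Process) -> bool:
        # Assumption: a process name that is already present is rejected.
        if process.name in self._processes:
            return False
        self._processes[process.name] = process
        for callback in self.on_added:
            callback(process)
        return True

    def try_remove_process(self, name: str) -> bool:
        process = self._processes.pop(name, None)
        if process is None:
            return False
        for callback in self.on_removed:
            callback(process)
        return True

    @property
    def processes(self) -> List[Process]:
        # Return a copy so callers cannot mutate the internal table.
        return list(self._processes.values())
```

Removing a process removes all of its endpoints and streams at once, which mirrors the "batch" behavior of endpoints coming and going with their pipeline.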

Server

The RendezvousServer relays the state of a Rendezvous point to one or more clients. It listens on a TCP port (default 13331) for clients that may be interested in rendezvous information. It may be hosted by an app:

var server = new RendezvousServer();
server.Start();
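Because the server is a plain TCP listener, a quick reachability check from another machine can help when diagnosing connection problems. The following is a minimal sketch using only the Python standard library; the helper name is_tcp_port_open is hypothetical, and the default of 13331 is taken from the default port mentioned above.

```python
import socket


def is_tcp_port_open(host: str, port: int = 13331, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to (host, port) can be established."""
    try:
        # create_connection raises OSError on refusal, timeout or name errors.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

A successful connection only shows that something is listening on that port. It does not confirm that the listener is a rendezvous server.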

An application may create a \psi pipeline containing components such as RemoteExporter, RemoteClockExporter, NetMQWriter and TcpWriter:

using (var pipeline = Pipeline.Create())
{
    // create pipeline components, including:
    var remoteExporter = new RemoteExporter(pipeline);
    myStream.Write("MyStream", remoteExporter);
    myOtherStream.Write("MyOtherStream", remoteExporter);

    var remoteClock = new RemoteClockExporter(Remoting.DefaultClockSyncPort);

    var netMqWriter = new NetMQWriter<double>(pipeline, "MyTopic", "tcp://127.0.0.1:30000", MessagePackFormat.Instance);
    var receiver = netMqWriter.AddTopic<int>("MyOtherTopic");

    // note: the port must be a valid TCP port number (0-65535)
    var tcpWriter = new TcpWriter<double>(pipeline, "MyStream", port: 11223, MessagePackFormat.Instance);
}

Notice that some endpoints may expose multiple message streams under a single endpoint, for example the myStream and myOtherStream streams written to remoteExporter above, or the "MyTopic" and "MyOtherTopic" topics added to the netMqWriter. Other endpoints implicitly expose just a single stream (e.g. tcpWriter). Still others (e.g. remoteClock) may expose connection information on the endpoint but no message streams. Information about these endpoints is published by adding a Rendezvous.Process representing the pipeline, containing Rendezvous.Endpoint instances representing each endpoint exposed by the components within.

var currentMachine = "123.123.123.123";
server.Rendezvous.TryAddProcess(
    new Rendezvous.Process(
        "MyApp",
        new[] {
            remoteClock.ToRendezvousEndpoint(host: currentMachine),
            remoteExporter.ToRendezvousEndpoint(host: currentMachine),
            tcpWriter.ToRendezvousEndpoint(address: currentMachine),
            netMqWriter.ToRendezvousEndpoint(),
        },
        "Version1.0"));

As pipelines are shut down, server.Rendezvous.TryRemoveProcess("MyApp") should be called.

Processes may be added directly at the server, or the server may merely wait for clients to connect and push updates. When updates are made, they are replicated to all of the clients. In this way all of the clients and the server maintain a synchronized view of the rendezvous information.
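The replication behavior can be pictured with a toy in-memory relay. This sketch is an assumption-laden illustration in Python rather than the actual wire protocol: View and ToyServer are hypothetical names, updates are plain dicts keyed by process name, and no networking is involved.

```python
from typing import Dict, List


class View:
    """One participant's local copy of the rendezvous state."""

    def __init__(self) -> None:
        self.processes: Dict[str, dict] = {}

    def apply(self, op: str, process: dict) -> None:
        if op == "add":
            self.processes[process["name"]] = process
        elif op == "remove":
            self.processes.pop(process["name"], None)
        else:
            raise ValueError(f"unknown operation: {op}")


class ToyServer:
    """Holds the server's own view and relays every update to all clients."""

    def __init__(self) -> None:
        self.view = View()
        self.clients: List[View] = []

    def connect(self, client: View) -> None:
        # A newly connected client first receives the current state.
        for process in self.view.processes.values():
            client.apply("add", process)
        self.clients.append(client)

    def publish(self, op: str, process: dict) -> None:
        # An update from any participant is applied at the server
        # and then replicated to every connected client.
        self.view.apply(op, process)
        for client in self.clients:
            client.apply(op, process)
```

After any sequence of publish calls, the server's view and every client's view contain the same set of process names, which is the synchronized state described above.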

Clients

RendezvousClients connect to servers and similarly relay Rendezvous state to/from the server. Beginning with only the server address and port (default 13331), a client may discover all of the other \psi processes, endpoints and streams available on the network. For example, to discover the endpoints published above:

var client = new RendezvousClient(address: "123.123.123.123");
client.Rendezvous.ProcessAdded += (_, process) =>
{
    if (process.Name == "MyApp")
    {
        if (process.Version != "Version1.0")
        {
            throw new Exception("Unexpected process version.");
        }

        foreach (var endpoint in process.Endpoints)
        {
            if (endpoint is Rendezvous.RemoteExporterEndpoint remoteExporterEndpoint)
            {
                var remoteImporter = remoteExporterEndpoint.ToRemoteImporter(pipeline);
                foreach (var stream in remoteExporterEndpoint.Streams)
                {
                    if (stream.StreamName == "MyStream" && stream.TypeName == typeof(double).ToString())
                    {
                        var myStream = remoteImporter.Importer.OpenStream<double>(stream.StreamName);
                    }
                }
            }
            else if (endpoint is Rendezvous.RemoteClockExporterEndpoint remoteClockEndpoint)
            {
                var remoteClockImporter = remoteClockEndpoint.ToRemoteClockImporter(pipeline);
            }
            else if (endpoint is Rendezvous.TcpSourceEndpoint tcpSourceEndpoint)
            {
                var tcpSource = tcpSourceEndpoint.ToTcpSource<double>(pipeline, MessagePackFormat.Instance);
            }
            else if (endpoint is Rendezvous.NetMQSourceEndpoint netMqSourceEndpoint)
            {
                foreach (var stream in netMqSourceEndpoint.Streams)
                {
                    if (stream.StreamName == "MyTopic")
                    {
                        var netMqSource = netMqSourceEndpoint.ToNetMQSource<double>(pipeline, stream.StreamName, MessagePackFormat.Instance);
                    }
                }
            }
        }
    }
};
client.Connected.WaitOne();

Upon initially connecting, clients will immediately be updated with the current state at the server. To ensure that the connection and initial update are complete, call client.Connected.WaitOne().
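The reason for waiting can be illustrated with a toy client whose initial update arrives on a background thread. This is a sketch in Python under stated assumptions: ToyClient is a hypothetical stand-in, the server snapshot is passed in as a list, and threading.Event plays a role analogous to the Connected wait handle.

```python
import threading
from typing import Dict, List


class ToyClient:
    def __init__(self, initial_state: List[dict]) -> None:
        self._initial_state = initial_state   # stands in for the server's snapshot
        self.processes: Dict[str, dict] = {}
        self.connected = threading.Event()    # set once the snapshot is applied

    def start(self) -> None:
        threading.Thread(target=self._run, daemon=True).start()

    def _run(self) -> None:
        for process in self._initial_state:
            self.processes[process["name"]] = process
        # Signal only after the whole snapshot has been applied.
        self.connected.set()


client = ToyClient([{"name": "MyApp"}])
client.start()
# Without this wait, the main thread could read an empty or partial state.
if not client.connected.wait(timeout=5.0):
    raise TimeoutError("initial rendezvous update not received")
names = sorted(client.processes)
```

Using a timeout on the wait is a design choice for the sketch, so that a missing server leads to an error instead of blocking forever.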

Notice also that extension methods such as ToRemoteImporter(...), ToRemoteClockImporter(...), ToTcpSource(...) and ToNetMQSource(...) may be used to easily convert the Rendezvous.Endpoint information into a concrete component communicating with the endpoint.

Python Client

A Python client implementation is available. This may be used along with ZeroMQ to bridge to \psi. Example usage:

client = RendezvousClient('123.123.123.123')
client.start(
    lambda p: print(f"Added: {p['name']}"),
    lambda p: print(f"Removed: {p['name']}"))

It may be used for discovery alone, or process information may also be added and removed on the Python side:

process = { 'name': 'Foo',
            'endpoints': [
                { 'endpoint': RendezvousClient.Endpoint.NetMQSource,
                  'address': 'tcp://1.2.3.4:567',
                  'streams': [{ 'name': 'Bar',
                                'type': 'SomeType' },
                              { 'name': 'Baz',
                                'type': 'SomeOtherType' }]}]}

client.removeProcess(process)
client.addProcess(process)
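When using the client for discovery, it can be handy to flatten a process dict into its streams. The helper below is a hypothetical sketch (iter_streams is not part of the client) that assumes the dict layout shown above: an 'endpoints' list whose entries carry a 'streams' list of 'name'/'type' pairs. The string 'NetMQSource' is a placeholder for the RendezvousClient.Endpoint.NetMQSource value so that the example runs on its own.

```python
from typing import Iterator, Tuple


def iter_streams(process: dict) -> Iterator[Tuple[str, str]]:
    """Yield (stream name, stream type) for every stream in a process dict."""
    for endpoint in process.get('endpoints', []):
        for stream in endpoint.get('streams', []):
            yield stream['name'], stream['type']


example = {'name': 'Foo',
           'endpoints': [
               {'endpoint': 'NetMQSource',  # placeholder for the client's enum value
                'address': 'tcp://1.2.3.4:567',
                'streams': [{'name': 'Bar', 'type': 'SomeType'},
                            {'name': 'Baz', 'type': 'SomeOtherType'}]}]}

found = list(iter_streams(example))
```

The same function could be called from the added-process callback passed to client.start(...), assuming the callback receives a dict of this shape.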

Helpers

To assist in constructing endpoints that describe connection information, extension methods are available in the Microsoft.Psi.Interop.Rendezvous namespace. They convert a TcpWriter, NetMQWriter or RemoteExporter to an endpoint via .ToRendezvousEndpoint(...), and convert each endpoint type (Tcp, NetMQ, RemoteExporter) back to a corresponding component via ToTcpSource(...), ToNetMQSource(...) or ToRemoteImporter(...).