diff --git a/pkg/beam/examples/beamsh/beamsh.go b/pkg/beam/examples/beamsh/beamsh.go index f3930e0ebb..f4bc2bf2bd 100644 --- a/pkg/beam/examples/beamsh/beamsh.go +++ b/pkg/beam/examples/beamsh/beamsh.go @@ -407,44 +407,23 @@ func GetHandler(name string) Handler { defer stderr.Close() var tasks sync.WaitGroup defer tasks.Wait() - for { - payload, attachment, err := in.Receive() - if err != nil { - return - } - cmd := data.Message(payload).Get("cmd") - if attachment != nil && len(cmd) > 0 && cmd[0] == "log" { - w, err := beam.SendPipe(out, payload) - if err != nil { - attachment.Close() - fmt.Fprintf(stderr, "sendpipe: %v\n", err) - return - } - tasks.Add(1) - go func(payload []byte, attachment *os.File, sink *os.File) { - defer tasks.Done() - defer attachment.Close() - defer sink.Close() - cmd := data.Message(payload).Get("cmd") - if cmd == nil || len(cmd) == 0 { - return - } - if cmd[0] != "log" { - return - } - var output io.Writer - if len(cmd) == 1 || cmd[1] == "stdout" { - output = os.Stdout - } else if cmd[1] == "stderr" { - output = os.Stderr - } - io.Copy(io.MultiWriter(output, sink), attachment) - }(payload, attachment, w) - } else { - if err := out.Send(payload, attachment); err != nil { - return - } - } + + r := beam.NewRouter(out) + r.NewRoute().HasAttachment().KeyStartsWith("cmd", "log").Handler(func(payload []byte, attachment *os.File) error { + tasks.Add(1) + go func() { + defer tasks.Done() + defer attachment.Close() + io.Copy(os.Stdout, attachment) + attachment.Close() + }() + return nil + }).Tee(out) + + if _, err := beam.Copy(r, in); err != nil { + Fatal(err) + fmt.Fprintf(stderr, "%v\n", err) + return } } } else if name == "echo" {