diff --git a/components/camera/client.go b/components/camera/client.go index 682e82dd732..e4c406f764a 100644 --- a/components/camera/client.go +++ b/components/camera/client.go @@ -661,9 +661,14 @@ func (c *client) Unsubscribe(ctx context.Context, id rtppassthrough.Subscription } func (c *client) trackName() string { - // if c.conn is a *grpc.SharedConn then the client - // is talking to a module and we need to send the fully qualified name - if _, ok := c.conn.(*grpc.SharedConn); ok { + // if c.conn is a *grpc.SharedConn then, this is being used for communication between a + // viam-server and module. The viam-server will have one SharedConn and the module will have a + // different one. When asking a module to start a video stream, we create a track name with the + // full resource name (i.e: rdk:components:camera/foo). + // + // Modules talking back to the viam-server for a camera stream should use the "short + // name"/`SDPTrackName` (i.e: `foo`). + if sc, ok := c.conn.(*grpc.SharedConn); ok && sc.IsConnectedToModule() { return c.Name().String() } @@ -673,6 +678,7 @@ func (c *client) trackName() string { // as the remote doesn't know it's own name from the perspective of the main part return c.Name().PopRemote().SDPTrackName() } + // in this case we are talking to a main part & the remote name (if it exists) needs to be preserved return c.Name().SDPTrackName() } diff --git a/grpc/shared_conn.go b/grpc/shared_conn.go index 79ba0d0d85b..a4c04c17076 100644 --- a/grpc/shared_conn.go +++ b/grpc/shared_conn.go @@ -25,6 +25,10 @@ import ( // a resource may register with a SharedConn which supports WebRTC. type OnTrackCB func(tr *webrtc.TrackRemote, r *webrtc.RTPReceiver) +//nolint +// The following describes the SharedConn lifetime for viam-server's modmanager communicating with +// modules it has spawned. +// // SharedConn wraps both a GRPC connection & (optionally) a peer connection & controls access to both. // For modules, the grpc connection is over a Unix socket. The WebRTC `PeerConnection` is made // separately. The `SharedConn` continues to implement the `rpc.ClientConn` interface by pairing up @@ -71,10 +75,9 @@ type SharedConn struct { // `peerConnMu` synchronizes changes to the underlying `peerConn`. Such that calls consecutive // calls to `GrpcConn` and `PeerConn` will return connections from the same (or newer, but not // prior) "generations". - peerConnMu sync.Mutex - peerConn *webrtc.PeerConnection - peerConnReady <-chan struct{} - peerConnClosed <-chan struct{} + peerConnMu sync.Mutex + peerConn *webrtc.PeerConnection + peerConnReady <-chan struct{} // peerConnFailed gets closed when a PeerConnection fails to connect. The peerConn pointer is // set to nil before this channel is closed. peerConnFailed chan struct{} @@ -82,9 +85,60 @@ type SharedConn struct { onTrackCBByTrackNameMu sync.Mutex onTrackCBByTrackName map[string]OnTrackCB + // isConnectedToViamServer identifies whether this SharedConn is running inside a viam-server + // talking to a module, or a module talking to a viam-server. We use this to determine whether + // to use long names or short names for PeerConnection video (or audio) track names. + isConnectedToViamServer bool + logger logging.Logger } +// NewSharedConnForModule acts as a constructor for `SharedConn` for modules that are communicating +// back to their parent viam-server. +func NewSharedConnForModule(grpcConn rpc.ClientConn, peerConn *webrtc.PeerConnection, logger logging.Logger) *SharedConn { + // We must be passed a ready connection. + pcReady := make(chan struct{}) + close(pcReady) + + ret := &SharedConn{ + peerConn: peerConn, + peerConnReady: pcReady, + // We were passed in a ready connection. Only create this for when `Close` is called. + peerConnFailed: make(chan struct{}), + onTrackCBByTrackName: make(map[string]OnTrackCB), + isConnectedToViamServer: true, + logger: logger, + } + ret.grpcConn.ReplaceConn(grpcConn) + + ret.peerConn.OnTrack(func(trackRemote *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) { + ret.onTrackCBByTrackNameMu.Lock() + onTrackCB, ok := ret.onTrackCBByTrackName[trackRemote.StreamID()] + ret.onTrackCBByTrackNameMu.Unlock() + if !ok { + msg := "Callback not found for StreamID: %s, keys(resOnTrackCBs): %#v" + ret.logger.Errorf(msg, trackRemote.StreamID(), maps.Keys(ret.onTrackCBByTrackName)) + return + } + onTrackCB(trackRemote, rtpReceiver) + }) + + return ret +} + +// IsConnectedToModule returns whether this shared conn is being used to communicate with a module. +func (sc *SharedConn) IsConnectedToModule() bool { + return sc.isConnectedToViamServer +} + +// IsConnectedToViamServer returns whether this shared conn is being used to communicate with a +// viam-server. Note this implies the client is running within a module process. Typical +// clients/remote connections are a pure webrtc connection. As opposed to a frankenstein tcp/unix +// socket + webrtc connection. +func (sc *SharedConn) IsConnectedToViamServer() bool { + return !sc.isConnectedToViamServer +} + // Invoke forwards to the underlying GRPC Connection. func (sc *SharedConn) Invoke( ctx context.Context, @@ -143,8 +197,8 @@ func (sc *SharedConn) PeerConn() *webrtc.PeerConnection { return ret } -// ResetConn acts as a constructor for `SharedConn`. ResetConn replaces the underlying -// connection objects in addition to some other initialization. +// ResetConn acts as a constructor for `SharedConn` inside the viam-server (not modules). ResetConn +// replaces the underlying connection objects in addition to some other initialization. // // The first call to `ResetConn` is guaranteed to happen before any access to connection objects // happens. But subequent calls can be entirely asynchronous to components/services accessing @@ -193,7 +247,7 @@ func (sc *SharedConn) ResetConn(conn rpc.ClientConn, moduleLogger logging.Logger } sc.peerConn = peerConn - sc.peerConnReady, sc.peerConnClosed, err = rpc.ConfigureForRenegotiation(peerConn, rpc.PeerRoleClient, sc.logger) + sc.peerConnReady, _, err = rpc.ConfigureForRenegotiation(peerConn, rpc.PeerRoleServer, sc.logger) if err != nil { sc.logger.Warnw("Unable to create optional renegotiation channel for module. Ignoring.", "err", err) return diff --git a/module/module.go b/module/module.go index cd02bdee8e1..4e7e06f04a8 100644 --- a/module/module.go +++ b/module/module.go @@ -395,6 +395,9 @@ func (m *Module) connectParent(ctx context.Context) error { } m.parent = rc + if m.pc != nil { + m.parent.SetPeerConnection(m.pc) + } return nil } diff --git a/robot/client/client.go b/robot/client/client.go index 55482769893..a35b9ee4263 100644 --- a/robot/client/client.go +++ b/robot/client/client.go @@ -16,6 +16,7 @@ import ( grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" "github.com/jhump/protoreflect/desc" "github.com/jhump/protoreflect/grpcreflect" + "github.com/viamrobotics/webrtc/v3" "go.uber.org/multierr" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -112,6 +113,12 @@ type RobotClient struct { // webrtc. We don't want a network disconnect to result in reconnecting over tcp such that // performance would be impacted. serverIsWebrtcEnabled bool + + // If this client is running in a module process and this client represents the gRPC connection + // back to a viam-server. If this is true, `pc` and `sharedConn` are expected to be set. + isModuleConnection bool + pc *webrtc.PeerConnection + sharedConn *grpc.SharedConn } // RemoteTypeName is the type name used for a remote. This is for internal use. @@ -268,6 +275,7 @@ func New(ctx context.Context, address string, clientLogger logging.ZapCompatible sessionsDisabled: rOpts.disableSessions, heartbeatCtx: heartbeatCtx, heartbeatCtxCancel: heartbeatCtxCancel, + isModuleConnection: rOpts.modName != "", } // interceptors are applied in order from first to last @@ -680,10 +688,10 @@ func (rc *RobotClient) ResourceByName(name resource.Name) (resource.Resource, er func (rc *RobotClient) createClient(name resource.Name) (resource.Resource, error) { apiInfo, ok := resource.LookupGenericAPIRegistration(name.API) if !ok || apiInfo.RPCClient == nil { - return grpc.NewForeignResource(name, &rc.conn), nil + return grpc.NewForeignResource(name, rc.getClientConn()), nil } logger := rc.Logger().Sublogger(resource.RemoveRemoteName(name).ShortName()) - return apiInfo.RPCClient(rc.backgroundCtx, &rc.conn, rc.remoteName, name, logger) + return apiInfo.RPCClient(rc.backgroundCtx, rc.getClientConn(), rc.remoteName, name, logger) } func (rc *RobotClient) resources(ctx context.Context) ([]resource.Name, []resource.RPCAPI, error) { @@ -1299,6 +1307,27 @@ func (rc *RobotClient) Tunnel(ctx context.Context, conn io.ReadWriteCloser, dest return errors.Join(err, readerSenderErr, recvWriterErr) } +// SetPeerConnection is only to be called internally from modules. +func (rc *RobotClient) SetPeerConnection(pc *webrtc.PeerConnection) { + rc.mu.Lock() + rc.pc = pc + rc.mu.Unlock() +} + +func (rc *RobotClient) getClientConn() rpc.ClientConn { + // Must be called with `rc.mu` in ReadLock+ mode. + if rc.sharedConn != nil { + return rc.sharedConn + } + + if rc.pc == nil { + return &rc.conn + } + + rc.sharedConn = grpc.NewSharedConnForModule(&rc.conn, rc.pc, rc.logger.Sublogger("shared_conn")) + return rc.sharedConn +} + func unaryClientInterceptor() googlegrpc.UnaryClientInterceptor { return func( ctx context.Context,