Skip to content

Commit

Permalink
Address viam-server/module name mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
dgottlieb committed Feb 4, 2025
1 parent 923e3b9 commit 6f6e058
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 12 deletions.
12 changes: 9 additions & 3 deletions components/camera/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,9 +661,14 @@ func (c *client) Unsubscribe(ctx context.Context, id rtppassthrough.Subscription
}

func (c *client) trackName() string {
// if c.conn is a *grpc.SharedConn then the client
// is talking to a module and we need to send the fully qualified name
if _, ok := c.conn.(*grpc.SharedConn); ok {
// if c.conn is a *grpc.SharedConn then, this is being used for communication between a
// viam-server and module. The viam-server will have one SharedConn and the module will have a
// different one. When asking a module to start a video stream, we create a track name with the
// full resource name (i.e: rdk:components:camera/foo).
//
// Modules talking back to the viam-server for a camera stream should use the "short
// name"/`SDPTrackName` (i.e: `foo`).
if sc, ok := c.conn.(*grpc.SharedConn); ok && sc.IsConnectedToModule() {
return c.Name().String()
}

Expand All @@ -673,6 +678,7 @@ func (c *client) trackName() string {
// as the remote doesn't know it's own name from the perspective of the main part
return c.Name().PopRemote().SDPTrackName()
}

// in this case we are talking to a main part & the remote name (if it exists) needs to be preserved
return c.Name().SDPTrackName()
}
Expand Down
68 changes: 61 additions & 7 deletions grpc/shared_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import (
// a resource may register with a SharedConn which supports WebRTC.
type OnTrackCB func(tr *webrtc.TrackRemote, r *webrtc.RTPReceiver)

//nolint
// The following describes the SharedConn lifetime for viam-server's modmanager communicating with
// modules it has spawned.
//
// SharedConn wraps both a GRPC connection & (optionally) a peer connection & controls access to both.
// For modules, the grpc connection is over a Unix socket. The WebRTC `PeerConnection` is made
// separately. The `SharedConn` continues to implement the `rpc.ClientConn` interface by pairing up
Expand Down Expand Up @@ -71,20 +75,70 @@ type SharedConn struct {
// `peerConnMu` synchronizes changes to the underlying `peerConn`. Such that calls consecutive
// calls to `GrpcConn` and `PeerConn` will return connections from the same (or newer, but not
// prior) "generations".
peerConnMu sync.Mutex
peerConn *webrtc.PeerConnection
peerConnReady <-chan struct{}
peerConnClosed <-chan struct{}
peerConnMu sync.Mutex
peerConn *webrtc.PeerConnection
peerConnReady <-chan struct{}
// peerConnFailed gets closed when a PeerConnection fails to connect. The peerConn pointer is
// set to nil before this channel is closed.
peerConnFailed chan struct{}

onTrackCBByTrackNameMu sync.Mutex
onTrackCBByTrackName map[string]OnTrackCB

// isConnectedToViamServer identifies whether this SharedConn is running inside a viam-server
// talking to a module, or a module talking to a viam-server. We use this to determine whether
// to use long names or short names for PeerConnection video (or audio) track names.
isConnectedToViamServer bool

logger logging.Logger
}

// NewSharedConnForModule acts as a constructor for `SharedConn` for modules that are communicating
// back to their parent viam-server.
func NewSharedConnForModule(grpcConn rpc.ClientConn, peerConn *webrtc.PeerConnection, logger logging.Logger) *SharedConn {
// We must be passed a ready connection.
pcReady := make(chan struct{})
close(pcReady)

ret := &SharedConn{
peerConn: peerConn,
peerConnReady: pcReady,
// We were passed in a ready connection. Only create this for when `Close` is called.
peerConnFailed: make(chan struct{}),
onTrackCBByTrackName: make(map[string]OnTrackCB),
isConnectedToViamServer: true,
logger: logger,
}
ret.grpcConn.ReplaceConn(grpcConn)

ret.peerConn.OnTrack(func(trackRemote *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) {
ret.onTrackCBByTrackNameMu.Lock()
onTrackCB, ok := ret.onTrackCBByTrackName[trackRemote.StreamID()]
ret.onTrackCBByTrackNameMu.Unlock()
if !ok {
msg := "Callback not found for StreamID: %s, keys(resOnTrackCBs): %#v"
ret.logger.Errorf(msg, trackRemote.StreamID(), maps.Keys(ret.onTrackCBByTrackName))
return
}
onTrackCB(trackRemote, rtpReceiver)
})

return ret
}

// IsConnectedToModule returns whether this shared conn is being used to communicate with a module.
func (sc *SharedConn) IsConnectedToModule() bool {
return sc.isConnectedToViamServer
}

// IsConnectedToViamServer returns whether this shared conn is being used to communicate with a
// viam-server. Note this implies the client is running within a module process. Typical
// clients/remote connections are a pure webrtc connection. As opposed to a frankenstein tcp/unix
// socket + webrtc connection.
func (sc *SharedConn) IsConnectedToViamServer() bool {
return !sc.isConnectedToViamServer
}

// Invoke forwards to the underlying GRPC Connection.
func (sc *SharedConn) Invoke(
ctx context.Context,
Expand Down Expand Up @@ -143,8 +197,8 @@ func (sc *SharedConn) PeerConn() *webrtc.PeerConnection {
return ret
}

// ResetConn acts as a constructor for `SharedConn`. ResetConn replaces the underlying
// connection objects in addition to some other initialization.
// ResetConn acts as a constructor for `SharedConn` inside the viam-server (not modules). ResetConn
// replaces the underlying connection objects in addition to some other initialization.
//
// The first call to `ResetConn` is guaranteed to happen before any access to connection objects
// happens. But subequent calls can be entirely asynchronous to components/services accessing
Expand Down Expand Up @@ -193,7 +247,7 @@ func (sc *SharedConn) ResetConn(conn rpc.ClientConn, moduleLogger logging.Logger
}

sc.peerConn = peerConn
sc.peerConnReady, sc.peerConnClosed, err = rpc.ConfigureForRenegotiation(peerConn, rpc.PeerRoleClient, sc.logger)
sc.peerConnReady, _, err = rpc.ConfigureForRenegotiation(peerConn, rpc.PeerRoleServer, sc.logger)
if err != nil {
sc.logger.Warnw("Unable to create optional renegotiation channel for module. Ignoring.", "err", err)
return
Expand Down
3 changes: 3 additions & 0 deletions module/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,9 @@ func (m *Module) connectParent(ctx context.Context) error {
}

m.parent = rc
if m.pc != nil {
m.parent.SetPeerConnection(m.pc)
}
return nil
}

Expand Down
33 changes: 31 additions & 2 deletions robot/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/grpcreflect"
"github.com/viamrobotics/webrtc/v3"
"go.uber.org/multierr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -112,6 +113,12 @@ type RobotClient struct {
// webrtc. We don't want a network disconnect to result in reconnecting over tcp such that
// performance would be impacted.
serverIsWebrtcEnabled bool

// If this client is running in a module process and this client represents the gRPC connection
// back to a viam-server. If this is true, `pc` and `sharedConn` are expected to be set.
isModuleConnection bool
pc *webrtc.PeerConnection
sharedConn *grpc.SharedConn
}

// RemoteTypeName is the type name used for a remote. This is for internal use.
Expand Down Expand Up @@ -268,6 +275,7 @@ func New(ctx context.Context, address string, clientLogger logging.ZapCompatible
sessionsDisabled: rOpts.disableSessions,
heartbeatCtx: heartbeatCtx,
heartbeatCtxCancel: heartbeatCtxCancel,
isModuleConnection: rOpts.modName != "",
}

// interceptors are applied in order from first to last
Expand Down Expand Up @@ -680,10 +688,10 @@ func (rc *RobotClient) ResourceByName(name resource.Name) (resource.Resource, er
func (rc *RobotClient) createClient(name resource.Name) (resource.Resource, error) {
apiInfo, ok := resource.LookupGenericAPIRegistration(name.API)
if !ok || apiInfo.RPCClient == nil {
return grpc.NewForeignResource(name, &rc.conn), nil
return grpc.NewForeignResource(name, rc.getClientConn()), nil
}
logger := rc.Logger().Sublogger(resource.RemoveRemoteName(name).ShortName())
return apiInfo.RPCClient(rc.backgroundCtx, &rc.conn, rc.remoteName, name, logger)
return apiInfo.RPCClient(rc.backgroundCtx, rc.getClientConn(), rc.remoteName, name, logger)
}

func (rc *RobotClient) resources(ctx context.Context) ([]resource.Name, []resource.RPCAPI, error) {
Expand Down Expand Up @@ -1299,6 +1307,27 @@ func (rc *RobotClient) Tunnel(ctx context.Context, conn io.ReadWriteCloser, dest
return errors.Join(err, readerSenderErr, recvWriterErr)
}

// SetPeerConnection is only to be called internally from modules.
func (rc *RobotClient) SetPeerConnection(pc *webrtc.PeerConnection) {
rc.mu.Lock()
rc.pc = pc
rc.mu.Unlock()
}

func (rc *RobotClient) getClientConn() rpc.ClientConn {
// Must be called with `rc.mu` in ReadLock+ mode.
if rc.sharedConn != nil {
return rc.sharedConn
}

if rc.pc == nil {
return &rc.conn
}

rc.sharedConn = grpc.NewSharedConnForModule(&rc.conn, rc.pc, rc.logger.Sublogger("shared_conn"))
return rc.sharedConn
}

func unaryClientInterceptor() googlegrpc.UnaryClientInterceptor {
return func(
ctx context.Context,
Expand Down

0 comments on commit 6f6e058

Please sign in to comment.