Skip to content

Commit

Permalink
RSDK-9839: Have Go module clients be webrtc aware.
Browse files Browse the repository at this point in the history
  • Loading branch information
dgottlieb committed Jan 28, 2025
1 parent 5c3446a commit e5b46e0
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 9 deletions.
53 changes: 46 additions & 7 deletions grpc/shared_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import (
// a resource may register with a SharedConn which supports WebRTC.
type OnTrackCB func(tr *webrtc.TrackRemote, r *webrtc.RTPReceiver)

//nolint
// The following describes the SharedConn lifetime for viam-server's modmanager communicating with
// modules it has spawned.
//
// SharedConn wraps both a GRPC connection & (optionally) a peer connection & controls access to both.
// For modules, the grpc connection is over a Unix socket. The WebRTC `PeerConnection` is made
// separately. The `SharedConn` continues to implement the `rpc.ClientConn` interface by pairing up
Expand Down Expand Up @@ -71,10 +75,9 @@ type SharedConn struct {
// `peerConnMu` synchronizes changes to the underlying `peerConn`. Such that calls consecutive
// calls to `GrpcConn` and `PeerConn` will return connections from the same (or newer, but not
// prior) "generations".
peerConnMu sync.Mutex
peerConn *webrtc.PeerConnection
peerConnReady <-chan struct{}
peerConnClosed <-chan struct{}
peerConnMu sync.Mutex
peerConn *webrtc.PeerConnection
peerConnReady <-chan struct{}
// peerConnFailed gets closed when a PeerConnection fails to connect. The peerConn pointer is
// set to nil before this channel is closed.
peerConnFailed chan struct{}
Expand All @@ -85,6 +88,42 @@ type SharedConn struct {
logger logging.Logger
}

// NewSharedConnForModule acts as a constructor for `SharedConn` for modules that are communicating
// back to their parent viam-server.
func NewSharedConnForModule(grpcConn rpc.ClientConn, peerConn *webrtc.PeerConnection, logger logging.Logger) *SharedConn {
// We must be passed a ready connection.
pcReady := make(chan struct{})
close(pcReady)

ret := &SharedConn{
peerConn: peerConn,
peerConnReady: pcReady,
// We were passed in a ready connection. Only create this for when `Close` is called.
peerConnFailed: make(chan struct{}),
onTrackCBByTrackName: make(map[string]OnTrackCB),
logger: logger,
}
ret.grpcConn.ReplaceConn(grpcConn)

ret.peerConn.OnTrack(func(trackRemote *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) {
sid := trackRemote.StreamID()
if sid == "rtsp-1" {
sid = "rdk:component:camera/rtsp-1"
}
ret.onTrackCBByTrackNameMu.Lock()
onTrackCB, ok := ret.onTrackCBByTrackName[sid]
ret.onTrackCBByTrackNameMu.Unlock()
if !ok {
msg := "Callback not found for StreamID: %s, keys(resOnTrackCBs): %#v"
ret.logger.Errorf(msg, trackRemote.StreamID(), maps.Keys(ret.onTrackCBByTrackName))
return
}
onTrackCB(trackRemote, rtpReceiver)
})

return ret
}

// Invoke forwards to the underlying GRPC Connection.
func (sc *SharedConn) Invoke(
ctx context.Context,
Expand Down Expand Up @@ -143,8 +182,8 @@ func (sc *SharedConn) PeerConn() *webrtc.PeerConnection {
return ret
}

// ResetConn acts as a constructor for `SharedConn`. ResetConn replaces the underlying
// connection objects in addition to some other initialization.
// ResetConn acts as a constructor for `SharedConn` inside the viam-server (not modules). ResetConn
// replaces the underlying connection objects in addition to some other initialization.
//
// The first call to `ResetConn` is guaranteed to happen before any access to connection objects
// happens. But subequent calls can be entirely asynchronous to components/services accessing
Expand Down Expand Up @@ -193,7 +232,7 @@ func (sc *SharedConn) ResetConn(conn rpc.ClientConn, moduleLogger logging.Logger
}

sc.peerConn = peerConn
sc.peerConnReady, sc.peerConnClosed, err = rpc.ConfigureForRenegotiation(peerConn, rpc.PeerRoleClient, sc.logger)
sc.peerConnReady, _, err = rpc.ConfigureForRenegotiation(peerConn, rpc.PeerRoleServer, sc.logger)
if err != nil {
sc.logger.Warnw("Unable to create optional renegotiation channel for module. Ignoring.", "err", err)
return
Expand Down
3 changes: 3 additions & 0 deletions module/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,9 @@ func (m *Module) connectParent(ctx context.Context) error {
}

m.parent = rc
if m.pc != nil {
m.parent.SetPeerConnection(m.pc)
}
return nil
}

Expand Down
33 changes: 31 additions & 2 deletions robot/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/grpcreflect"
"github.com/viamrobotics/webrtc/v3"
"go.uber.org/multierr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -111,6 +112,12 @@ type RobotClient struct {
// webrtc. We don't want a network disconnect to result in reconnecting over tcp such that
// performance would be impacted.
serverIsWebrtcEnabled bool

// If this client is running in a module process and this client represents the gRPC connection
// back to a viam-server. If this is true, `pc` and `sharedConn` are expected to be set.
isModuleConnection bool
pc *webrtc.PeerConnection
sharedConn *grpc.SharedConn
}

// RemoteTypeName is the type name used for a remote. This is for internal use.
Expand Down Expand Up @@ -267,6 +274,7 @@ func New(ctx context.Context, address string, clientLogger logging.ZapCompatible
sessionsDisabled: rOpts.disableSessions,
heartbeatCtx: heartbeatCtx,
heartbeatCtxCancel: heartbeatCtxCancel,
isModuleConnection: rOpts.modName != "",
}

// interceptors are applied in order from first to last
Expand Down Expand Up @@ -679,10 +687,10 @@ func (rc *RobotClient) ResourceByName(name resource.Name) (resource.Resource, er
func (rc *RobotClient) createClient(name resource.Name) (resource.Resource, error) {
apiInfo, ok := resource.LookupGenericAPIRegistration(name.API)
if !ok || apiInfo.RPCClient == nil {
return grpc.NewForeignResource(name, &rc.conn), nil
return grpc.NewForeignResource(name, rc.getClientConn()), nil
}
logger := rc.Logger().Sublogger(resource.RemoveRemoteName(name).ShortName())
return apiInfo.RPCClient(rc.backgroundCtx, &rc.conn, rc.remoteName, name, logger)
return apiInfo.RPCClient(rc.backgroundCtx, rc.getClientConn(), rc.remoteName, name, logger)
}

func (rc *RobotClient) resources(ctx context.Context) ([]resource.Name, []resource.RPCAPI, error) {
Expand Down Expand Up @@ -1190,6 +1198,27 @@ func (rc *RobotClient) Version(ctx context.Context) (robot.VersionResponse, erro
return mVersion, nil
}

// SetPeerConnection is only to be called internally from modules.
func (rc *RobotClient) SetPeerConnection(pc *webrtc.PeerConnection) {
rc.mu.Lock()
rc.pc = pc
rc.mu.Unlock()
}

func (rc *RobotClient) getClientConn() rpc.ClientConn {
// Must be called with `rc.mu` in ReadLock+ mode.
if rc.sharedConn != nil {
return rc.sharedConn
}

if rc.pc == nil {
return &rc.conn
}

rc.sharedConn = grpc.NewSharedConnForModule(&rc.conn, rc.pc, rc.logger.Sublogger("shared_conn"))
return rc.sharedConn
}

func unaryClientInterceptor() googlegrpc.UnaryClientInterceptor {
return func(
ctx context.Context,
Expand Down

0 comments on commit e5b46e0

Please sign in to comment.