Skip to content

Commit

Permalink
Return errors using channles and not embedded in result type
Browse files Browse the repository at this point in the history
Asynchronous functions should return errors over channels instead of embedding the error in the result type.

Closes #9974
  • Loading branch information
gammazero committed Nov 20, 2024
1 parent d506003 commit dd033d4
Show file tree
Hide file tree
Showing 12 changed files with 248 additions and 170 deletions.
46 changes: 23 additions & 23 deletions client/rpc/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,49 +62,49 @@ type pinLsObject struct {
Type string
}

func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan iface.Pin, error) {
func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan iface.Pin, <-chan error) {
pins := make(chan iface.Pin)
errOut := make(chan error, 1)

options, err := caopts.PinLsOptions(opts...)
if err != nil {
return nil, err
errOut <- err
close(pins)
close(errOut)
return pins, errOut
}

res, err := api.core().Request("pin/ls").
Option("type", options.Type).
Option("stream", true).
Send(ctx)
if err != nil {
return nil, err
errOut <- err
close(pins)
close(errOut)
return pins, errOut
}

pins := make(chan iface.Pin)
go func(ch chan<- iface.Pin) {
go func(ch chan<- iface.Pin, errCh chan<- error) {
defer res.Output.Close()
defer close(ch)
defer close(errCh)

dec := json.NewDecoder(res.Output)
var out pinLsObject
for {
switch err := dec.Decode(&out); err {
case nil:
case io.EOF:
return
default:
select {
case ch <- pin{err: err}:
return
case <-ctx.Done():
return
err := dec.Decode(&out)
if err != nil {
if err != io.EOF {
errCh <- err
}
return
}

c, err := cid.Parse(out.Cid)
if err != nil {
select {
case ch <- pin{err: err}:
return
case <-ctx.Done():
return
}
errCh <- err
return
}

select {
Expand All @@ -113,8 +113,8 @@ func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan i
return
}
}
}(pins)
return pins, nil
}(pins, errOut)
return pins, errOut
}

// IsPinned returns whether or not the given cid is pinned
Expand Down
47 changes: 23 additions & 24 deletions client/rpc/unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,16 @@ type lsOutput struct {
Objects []lsObject
}

func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.UnixfsLsOption) (<-chan iface.DirEntry, error) {
func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.UnixfsLsOption) (<-chan iface.DirEntry, <-chan error) {
out := make(chan iface.DirEntry)
errOut := make(chan error, 1)

options, err := caopts.UnixfsLsOptions(opts...)
if err != nil {
return nil, err
errOut <- err
close(out)
close(errOut)
return out, errOut
}

resp, err := api.core().Request("ls", p.String()).
Expand All @@ -156,56 +162,49 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.Unixfs
Option("stream", true).
Send(ctx)
if err != nil {
return nil, err
errOut <- err
close(out)
close(errOut)
return out, errOut
}
if resp.Error != nil {
return nil, resp.Error
errOut <- resp.Error
close(out)
close(errOut)
return out, errOut
}

dec := json.NewDecoder(resp.Output)
out := make(chan iface.DirEntry)

go func() {
defer resp.Close()
defer close(out)
defer close(errOut)

for {
var link lsOutput
if err := dec.Decode(&link); err != nil {
if err == io.EOF {
return
}
select {
case out <- iface.DirEntry{Err: err}:
case <-ctx.Done():
if err != io.EOF {
errOut <- err
}
return
}

if len(link.Objects) != 1 {
select {
case out <- iface.DirEntry{Err: errors.New("unexpected Objects len")}:
case <-ctx.Done():
}
errOut <- errors.New("unexpected Objects len")
return
}

if len(link.Objects[0].Links) != 1 {
select {
case out <- iface.DirEntry{Err: errors.New("unexpected Links len")}:
case <-ctx.Done():
}
errOut <- errors.New("unexpected Links len")
return
}

l0 := link.Objects[0].Links[0]

c, err := cid.Decode(l0.Hash)
if err != nil {
select {
case out <- iface.DirEntry{Err: err}:
case <-ctx.Done():
}
errOut <- err
return
}

Expand Down Expand Up @@ -235,7 +234,7 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.Unixfs
}
}()

return out, nil
return out, errOut
}

func (api *UnixfsAPI) core() *HttpApi {
Expand Down
11 changes: 4 additions & 7 deletions core/commands/ls.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,11 @@ The JSON output contains type information.
return err
}

results, err := api.Unixfs().Ls(req.Context, pth,
results, errCh := api.Unixfs().Ls(req.Context, pth,
options.Unixfs.ResolveChildren(resolveSize || resolveType))
if err != nil {
return err
}

processLink, dirDone = processDir()
for link := range results {
if link.Err != nil {
return link.Err
}
var ftype unixfs_pb.Data_DataType
switch link.Type {
case iface.TFile:
Expand All @@ -174,6 +168,9 @@ The JSON output contains type information.
return err
}
}
if err = <-errCh; err != nil {
return err
}
dirDone(i)
}
return done()
Expand Down
12 changes: 2 additions & 10 deletions core/commands/pin/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,15 +557,8 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api
panic("unhandled pin type")
}

pins, err := api.Pin().Ls(req.Context, opt, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name))
if err != nil {
return err
}

pins, errCh := api.Pin().Ls(req.Context, opt, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name))
for p := range pins {
if err := p.Err(); err != nil {
return err
}
err = emit(PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: p.Type(),
Expand All @@ -577,8 +570,7 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api
return err
}
}

return nil
return <-errCh
}

const (
Expand Down
35 changes: 18 additions & 17 deletions core/commands/pin/remotepin.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,19 +285,15 @@ Pass '--status=queued,pinning,pinned,failed' to list pins in all states.
cmds.DelimitedStringsOption(",", pinStatusOptionName, "Return pins with the specified statuses (queued,pinning,pinned,failed).").WithDefault([]string{"pinned"}),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
ctx, cancel := context.WithCancel(req.Context)
defer cancel()

c, err := getRemotePinServiceFromRequest(req, env)
if err != nil {
return err
}

psCh, errCh, err := lsRemote(ctx, req, c)
if err != nil {
return err
}
ctx, cancel := context.WithCancel(req.Context)
defer cancel()

psCh, errCh := lsRemote(ctx, req, c)
for ps := range psCh {
if err := res.Emit(toRemotePinOutput(ps)); err != nil {
return err
Expand All @@ -317,7 +313,7 @@ Pass '--status=queued,pinning,pinned,failed' to list pins in all states.
}

// Executes GET /pins/?query-with-filters
func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client) (chan pinclient.PinStatusGetter, chan error, error) {
func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client) (<-chan pinclient.PinStatusGetter, <-chan error) {
opts := []pinclient.LsOption{}
if name, nameFound := req.Options[pinNameOptionName]; nameFound {
nameStr := name.(string)
Expand All @@ -330,7 +326,12 @@ func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client) (chan
for _, rawCID := range cidsRawArr {
parsedCID, err := cid.Decode(rawCID)
if err != nil {
return nil, nil, fmt.Errorf("CID %q cannot be parsed: %v", rawCID, err)
psCh := make(chan pinclient.PinStatusGetter)
errCh := make(chan error, 1)
errCh <- fmt.Errorf("CID %q cannot be parsed: %v", rawCID, err)
close(psCh)
close(errCh)
return psCh, errCh
}
parsedCIDs = append(parsedCIDs, parsedCID)
}
Expand All @@ -342,16 +343,19 @@ func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client) (chan
for _, rawStatus := range statusRawArr {
s := pinclient.Status(rawStatus)
if s.String() == string(pinclient.StatusUnknown) {
return nil, nil, fmt.Errorf("status %q is not valid", rawStatus)
psCh := make(chan pinclient.PinStatusGetter)
errCh := make(chan error, 1)
errCh <- fmt.Errorf("status %q is not valid", rawStatus)
close(psCh)
close(errCh)
return psCh, errCh
}
parsedStatuses = append(parsedStatuses, s)
}
opts = append(opts, pinclient.PinOpts.FilterStatus(parsedStatuses...))
}

psCh, errCh := c.Ls(ctx, opts...)

return psCh, errCh, nil
return c.Ls(ctx, opts...)
}

var rmRemotePinCmd = &cmds.Command{
Expand Down Expand Up @@ -403,10 +407,7 @@ To list and then remove all pending pin requests, pass an explicit status list:

rmIDs := []string{}
if len(req.Arguments) == 0 {
psCh, errCh, err := lsRemote(ctx, req, c)
if err != nil {
return err
}
psCh, errCh := lsRemote(ctx, req, c)
for ps := range psCh {
rmIDs = append(rmIDs, ps.GetRequestId())
}
Expand Down
Loading

0 comments on commit dd033d4

Please sign in to comment.