Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: improve async #160

Merged
merged 4 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion lib/orocos/async/async.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,19 @@
# asynchronous object.
#
module Orocos::Async
KNOWN_ERRORS = [Orocos::ComError,Orocos::NotFound,Typelib::NotFound,Orocos::TypekitTypeNotFound,Orocos::TypekitTypeNotExported,Orocos::StateTransitionFailed,Orocos::ConnectionFailed,OroGen::DefinitionTypekitNotFound]
class Disposed < RuntimeError; end

KNOWN_ERRORS = [
Orocos::ComError,
Orocos::NotFound,
Typelib::NotFound,
Orocos::TypekitTypeNotFound,
Orocos::TypekitTypeNotExported,
Orocos::StateTransitionFailed,
Orocos::ConnectionFailed,
OroGen::DefinitionTypekitNotFound,
Disposed
]
class << self
extend ::Forwardable

Expand Down
2 changes: 1 addition & 1 deletion lib/orocos/async/task_context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def configure_delegation(options = Hash.new)
end

def respond_to_missing?(method_name, include_private = false)
(reachable? && @delegator_obj.respond_to?(method_name)) || super
@delegator_obj&.respond_to?(method_name) || super
end

def method_missing(m,*args)
Expand Down
30 changes: 17 additions & 13 deletions lib/orocos/async/task_context_base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -208,25 +208,29 @@ def configure_delegation(configure_options = Hash.new)
#
# @param [Exception] reason The reason for the disconnect
# @return [Orocos::TaskContext,nil,Utilrb::EventLoop::Event]
def unreachable!(options = Hash.new)
options = Kernel.validate_options options, :error
def unreachable!(error: nil)
# ensure that this is always called from the
# event loop thread
@event_loop.call do
old_task = @mutex.synchronize do
if valid_delegator?
@access_error = options.delete(:error) ||
ArgumentError.new("cannot access the remote task context for an unknown reason")
task = @delegator_obj
invalidate_delegator!
@watchdog_timer.cancel if @watchdog_timer
task
end
dispose(access_error: error)
end
end

def dispose(access_error: nil)
old_task = @mutex.synchronize do
if valid_delegator?
@access_error =
access_error ||
Disposed.new("cannot access the remote task context for an unknown reason")
task = @delegator_obj
invalidate_delegator!
@watchdog_timer&.cancel
task
end
clear_interface
event :unreachable if old_task
old_task
end
clear_interface
old_task
end

def clear_interface
Expand Down
22 changes: 13 additions & 9 deletions lib/orocos/async/task_context_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def initialize(task_proxy,port_name,options=Hash.new)
end

def to_s
"#<Orocos::Async::PortProxy #{full_name}[#{type.name}]>"
"#<Orocos::Async::PortProxy #{full_name}[#{@type&.name}]>"
end

def type_name
Expand Down Expand Up @@ -506,7 +506,7 @@ def initialize(name,options=Hash.new)

on_port_reachable(false) do |name|
p = @ports[name]
if p && !p.reachable?
if p && !p.valid_delegator?
error_callback = Proc.new do |error|
p.emit_error(error)
end
Expand All @@ -517,7 +517,7 @@ def initialize(name,options=Hash.new)
end
on_property_reachable(false) do |name|
p = @properties[name]
if(p && !p.reachable?)
if(p && !p.valid_delegator?)
error_callback = Proc.new do |error|
p.emit_error(error)
end
Expand All @@ -528,7 +528,7 @@ def initialize(name,options=Hash.new)
end
on_attribute_reachable(false) do |name|
a = @attributes[name]
if(a && !a.reachable?)
if(a && !a.valid_delegator?)
error_callback = Proc.new do |error|
a.emit_error(error)
end
Expand Down Expand Up @@ -589,7 +589,8 @@ def property(name,options = Hash.new)
Orocos.warn "ignoring options: #{other_options}"
end

return p if !reachable? || p.reachable?
return p if !valid_delegator? || p.valid_delegator?

if options[:wait]
connect_property(p)
p.wait
Expand Down Expand Up @@ -618,7 +619,8 @@ def attribute(name,options = Hash.new)
Orocos.warn "ignoring options: #{other_options}"
end

return a if !reachable? || a.reachable?
return a if !valid_delegator? || a.valid_delegator?

if options[:wait]
connect_attribute(a)
a.wait
Expand All @@ -632,7 +634,7 @@ def attribute(name,options = Hash.new)

def port(name,options = Hash.new)
name = name.to_str
options,other_options = Kernel.filter_options options,:wait => @options[:wait]
options, other_options = Kernel.filter_options options,:wait => @options[:wait]
wait if options[:wait]

# support for subports
Expand Down Expand Up @@ -664,7 +666,7 @@ def port(name,options = Hash.new)
Orocos.warn "ignoring options: #{other_options}"
end

if reachable? && !p.reachable?
if valid_delegator? && !p.valid_delegator?
if options[:wait]
connect_port(p)
p.wait
Expand Down Expand Up @@ -841,8 +843,10 @@ def unreachable!(options = {:reconnect => false})
# all private methods must be thread safe
def connect_port(port)
return if port.reachable?

p = @mutex.synchronize do
return unless valid_delegator?

@delegator_obj.disable_emitting do
#called in the context of @delegator_obj
begin
Expand Down Expand Up @@ -928,7 +932,7 @@ def disconnect_properties
end

def respond_to_missing?(method_name, include_private = false)
(reachable? && @delegator_obj.respond_to?(method_name)) || super
(valid_delegator? && @delegator_obj.respond_to?(method_name)) || super
end

def method_missing(m,*args)
Expand Down