Skip to content

Commit

Permalink
Merge pull request #162 from openforcefield/bugfix/issue-138
Browse files Browse the repository at this point in the history
Only action `Task`s that are `status` 'waiting', 'error', or 'running'
  • Loading branch information
dotsdl authored Aug 8, 2023
2 parents 26c845f + ec0cb08 commit ea8d802
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 0 deletions.
1 change: 1 addition & 0 deletions alchemiscale/interface/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ def action_tasks(
A Task cannot be actioned:
- to an AlchemicalNetwork in a different Scope.
- if it extends another Task that is not complete.
- if it has any status other than 'waiting', 'running', or 'error'
Parameters
----------
Expand Down
5 changes: 5 additions & 0 deletions alchemiscale/storage/statestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,9 @@ def action_tasks(
A given compute task can be represented in any number of
AlchemicalNetwork TaskHubs, or none at all.
Only Tasks with status 'waiting', 'running', or 'error' can be
actioned.
"""
with self.transaction() as tx:
actioned_sks = []
Expand All @@ -1086,8 +1089,10 @@ def action_tasks(
MATCH (task:Task {{_scoped_key: '{t}'}})-[:PERFORMS]->(tf:Transformation)<-[:DEPENDS_ON]-(an)
// only proceed for cases where task is not already actioned on hub
// and where the task is either in 'waiting', 'running', or 'error' status
WITH th, an, task
WHERE NOT (th)-[:ACTIONS]->(task)
AND task.status IN ['waiting', 'running', 'error']
// create the connection
CREATE (th)-[ar:ACTIONS {{weight: 1.0}}]->(task)
Expand Down
29 changes: 29 additions & 0 deletions alchemiscale/tests/integration/storage/test_statestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,35 @@ def test_action_task(self, n4js: Neo4jStore, network_tyk2, scope_test):
task_sks_fail = n4js.action_tasks(task_sks, taskhub_sk2)
assert all([i is None for i in task_sks_fail])

def test_action_task_other_statuses(
self, n4js: Neo4jStore, network_tyk2, scope_test
):
an = network_tyk2
network_sk = n4js.create_network(an, scope_test)
taskhub_sk: ScopedKey = n4js.create_taskhub(network_sk)

transformation = list(an.edges)[0]
transformation_sk = n4js.get_scoped_key(transformation, scope_test)

# create 10 tasks
task_sks = [n4js.create_task(transformation_sk) for i in range(6)]

# set all but first task to running
n4js.set_task_running(task_sks[1:])

# set 1 task for each available status
n4js.set_task_error(task_sks[2:3])
n4js.set_task_complete(task_sks[3:4])
n4js.set_task_invalid(task_sks[4:5])
n4js.set_task_deleted(task_sks[5:6])

# action all tasks; only those that are 'waiting', 'running', or
# 'error' should be actioned
actioned = n4js.action_tasks(task_sks, taskhub_sk)

assert actioned[:3] == task_sks[:3]
assert actioned[3:] == [None] * 3

def test_action_task_extends(self, n4js: Neo4jStore, network_tyk2, scope_test):
an = network_tyk2
network_sk = n4js.create_network(an, scope_test)
Expand Down

0 comments on commit ea8d802

Please sign in to comment.