Skip to content

Commit

Permalink
making task creator use in memory state
Browse files Browse the repository at this point in the history
  • Loading branch information
diptanu committed Feb 2, 2025
1 parent 7e1a55b commit 893f31c
Show file tree
Hide file tree
Showing 6 changed files with 350 additions and 270 deletions.
92 changes: 71 additions & 21 deletions server/processor/src/graph_processor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
use std::{sync::Arc, vec};

use anyhow::Result;
use data_model::{ChangeType, StateChange};
use data_model::{
ChangeType,
GraphInvocationCtx,
StateChange,
TaskOutcome,
TaskOutputsIngestedEvent,
};
use state_store::{
requests::{
DeleteComputeGraphRequest,
Expand All @@ -13,8 +19,7 @@ use state_store::{
RequestPayload,

Check warning on line 19 in server/processor/src/graph_processor.rs

View workflow job for this annotation

GitHub Actions / Build Indexify Server

Diff in /home/runner/work/indexify/indexify/server/processor/src/graph_processor.rs
StateMachineUpdateRequest,
TaskAllocationUpdateRequest,
},
IndexifyState,
}, IndexifyState
};
use tokio::sync::Notify;
use tracing::{error, info};
Expand Down Expand Up @@ -173,7 +178,10 @@ impl GraphProcessor {
info!("invoking compute graph: {:?}", event);
let task_creation_result = self
.task_creator
.handle_invoke_compute_graph(event.clone())
.handle_invoke_compute_graph(
event.clone(),
&self.indexify_state.in_memory_state().await,
)
.await?;
Ok(task_creation_result_to_sm_update(
&event.namespace,
Expand All @@ -183,24 +191,33 @@ impl GraphProcessor {
&state_change,
))
}
ChangeType::TaskOutputsIngested(event) => Ok(StateMachineUpdateRequest {
payload: RequestPayload::FinalizeTask(FinalizeTaskRequest {
namespace: event.namespace.clone(),
compute_graph: event.compute_graph.clone(),
compute_fn: event.compute_fn.clone(),
invocation_id: event.invocation_id.clone(),
task_id: event.task_id.clone(),
task_outcome: event.outcome.clone(),
executor_id: event.executor_id.clone(),
diagnostics: event.diagnostic.clone(),
}),
processed_state_changes: vec![state_change.clone()],
}),
ChangeType::TaskOutputsIngested(event) => {
let graph_ctx =
update_graph_ctx_finalize_task(event, self.indexify_state.clone()).await;
if let None = graph_ctx {
return Ok(StateMachineUpdateRequest {
payload: RequestPayload::Noop,
processed_state_changes: vec![state_change.clone()],
});
}
Ok(StateMachineUpdateRequest {
payload: RequestPayload::FinalizeTask(FinalizeTaskRequest {
namespace: event.namespace.clone(),
compute_graph: event.compute_graph.clone(),
compute_fn: event.compute_fn.clone(),
invocation_id: event.invocation_id.clone(),
task_id: event.task_id.clone(),
task_outcome: event.outcome.clone(),
executor_id: event.executor_id.clone(),
diagnostics: event.diagnostic.clone(),
invocation_ctx: graph_ctx,
}),
processed_state_changes: vec![state_change.clone()],
})
}
ChangeType::TaskFinalized(event) => {
let task_creation_result = self
.task_creator
.handle_task_finished_inner(self.indexify_state.clone(), event)
.await?;
let task_creation_result =
self.task_creator.handle_task_finished_inner(event).await?;
Ok(task_creation_result_to_sm_update(
&event.namespace,
&event.compute_graph,
Expand Down Expand Up @@ -292,6 +309,7 @@ fn task_creation_result_to_sm_update(
new_reduction_tasks: task_creation_result.new_reduction_tasks,
processed_reduction_tasks: task_creation_result.processed_reduction_tasks,
},
invocation_ctx: task_creation_result.invocation_ctx,
}),
processed_state_changes: vec![state_change.clone()],
}
Expand All @@ -310,3 +328,35 @@ fn task_placement_result_to_sm_update(
processed_state_changes: vec![state_change.clone()],
}
}

async fn update_graph_ctx_finalize_task(
event: &TaskOutputsIngestedEvent,
indexify_state: Arc<IndexifyState>,

Check warning on line 334 in server/processor/src/graph_processor.rs

View workflow job for this annotation

GitHub Actions / Build Indexify Server

Diff in /home/runner/work/indexify/indexify/server/processor/src/graph_processor.rs
) -> Option<GraphInvocationCtx> {
let in_memory_state = indexify_state.in_memory_state().await;
let invocation_ctx_map= in_memory_state
.invocation_ctx
.clone();

let may_be_invocation_ctx = invocation_ctx_map
.get(&format!(
"{}|{}|{}",
event.namespace, event.compute_graph, event.invocation_id
));
if may_be_invocation_ctx.is_none() {
error!("namespace: {}, invocation: {}, cg: {}, fn: {}, task: {} no invocation ctx found for task outputs ingested event",
event.namespace, event.invocation_id, event.compute_graph, event.compute_fn, event.task_id);
return None;
}
let mut invocation_ctx = may_be_invocation_ctx.unwrap().clone();
invocation_ctx.outstanding_tasks -= 1;
invocation_ctx
.fn_task_analytics
.entry(event.compute_fn.clone())
.and_modify(|e| match event.outcome {
TaskOutcome::Success => e.success(),
TaskOutcome::Failure => e.fail(),
_ => {}
});
Some(invocation_ctx)
}
Loading

0 comments on commit 893f31c

Please sign in to comment.