feat (very rough): Add some nu: on startup, accept an optional closure that will be run for every item in the event stream. This closure has access to a custom command "xs cas <hash>", which can fetch the content for a given hash from the xs store.
cablehead committed Aug 11, 2024
1 parent 355e305 commit f4e2ae6
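
To make the new hook concrete, here is a hypothetical sketch (not part of this commit) of how spawn_closure could be driven from Rust. The attach_closure helper and the example Nushell snippet are illustrative only; the snippet reads the frame from pipeline input ($in), which is how the current run code feeds it.

use xs::nu;
use xs::store::Store;

async fn attach_closure(store: &Store) -> Result<(), xs::error::Error> {
    // Plain Nushell source: parsed once at startup, then run for every frame
    // appended to the event stream. `xs cas <hash>` resolves a frame's hash
    // to its content in the store.
    let snippet = r#"{||
        let frame = $in
        if ($frame.hash? != null) {
            $"($frame.topic): (xs cas $frame.hash)"
        } else {
            $frame.topic
        }
    }"#;
    nu::spawn_closure(store, snippet.to_string()).await
}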
Showing 9 changed files with 3,654 additions and 434 deletions.
3,739 changes: 3,306 additions & 433 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions Cargo.toml
@@ -31,6 +31,14 @@ tokio-stream = "0.1.15"
tokio-util = { version = "0.7.11", features = ["compat"] }
url = "2.5.0"

nu-cli = "0.96.1"
nu-command = "0.96.1"
nu-protocol = "0.96.1"
nu-cmd-lang = "0.96.1"
nu-engine = "0.96.1"
nu-parser = "0.96.1"
crossbeam-channel = "0.5.13"

[dev-dependencies]
assert_cmd = "2.0.14"
duct = "0.13.7"
1 change: 1 addition & 0 deletions src/error.rs
@@ -0,0 +1 @@
pub type Error = Box<dyn std::error::Error + Send + Sync>;
2 changes: 2 additions & 0 deletions src/lib.rs
@@ -1,5 +1,7 @@
pub mod api;
pub mod error;
pub mod http;
pub mod listener;
pub mod nu;
pub mod spawn;
pub mod store;
14 changes: 13 additions & 1 deletion src/main.rs
@@ -3,6 +3,7 @@ use std::time::Duration;

use clap::Parser;

use xs::nu;
use xs::store::Store;

#[derive(Parser, Debug)]
@@ -15,7 +16,13 @@ struct Args {
#[clap(long, value_parser, value_name = "LISTEN_ADDR")]
http: Option<String>,

/// Enable discord websocket (temporary)
/// A Nushell closure which will be called for every item added to the stream (temporary; you'll
/// be able to add arbitrary closures at runtime in the future)
#[clap(long, value_parser, value_name = "CLOSURE")]
closure: Option<String>,

/// Enable discord websocket (temporary; you'll be able to spawn arbitrary CLI commands at
/// runtime in the future)
#[clap(long)]
ws: bool,
}
@@ -32,6 +39,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
});
}

if let Some(closure) = args.closure {
let store = store.clone();
nu::spawn_closure(&store, closure).await?;
}

if args.ws {
let store = store.clone();
tokio::spawn(async move {
133 changes: 133 additions & 0 deletions src/nu/engine.rs
@@ -0,0 +1,133 @@
use futures::io::AsyncReadExt;

use nu_cli::{add_cli_context, gather_parent_env_vars};
use nu_cmd_lang::create_default_context;
use nu_command::add_shell_command_context;
use nu_engine::{eval_block, CallExt};
use nu_parser::parse;
use nu_protocol::debugger::WithoutDebug;
use nu_protocol::engine::{Call, Closure};
use nu_protocol::engine::{Command, EngineState, Stack, StateWorkingSet};
use nu_protocol::{Category, PipelineData, ShellError, Signature, Span, SyntaxShape, Type, Value};

use crate::error::Error;
use crate::store::Store;

#[derive(Clone)]
struct XsCasCommand {
store: Store,
}

impl XsCasCommand {
fn new(store: Store) -> Self {
Self { store }
}
}

impl Command for XsCasCommand {
fn name(&self) -> &str {
"xs cas"
}

fn signature(&self) -> Signature {
Signature::build("xs cas")
.input_output_types(vec![(Type::Nothing, Type::String)])
.required(
"hash",
SyntaxShape::String,
"hash of the content to retrieve",
)
.category(Category::Experimental)
}

fn usage(&self) -> &str {
"Retrieve content from the CAS for the given hash"
}

fn run(
&self,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
_input: PipelineData,
) -> Result<PipelineData, ShellError> {
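// Nu's Command::run is synchronous, so bridge to the async store here: spin up a
// short-lived Tokio runtime and block on the CAS read.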
let span = call.head;

let hash: String = call.req(engine_state, stack, 0)?;
eprintln!("hash: {:?}", hash);
let hash: ssri::Integrity = hash.parse().map_err(|e| ShellError::IOError {
msg: format!("YIKES:: {}", e),
})?;
eprintln!("HASH: {:?}", hash);

let rt = tokio::runtime::Runtime::new().map_err(|e| ShellError::IOError {
msg: format!("YIKES:: {}", e),
})?;

let contents = rt.block_on(async {
let mut reader =
self.store
.cas_reader(hash)
.await
.map_err(|e| ShellError::IOError {
msg: format!("R:: {}", e),
})?;
let mut contents = Vec::new();
reader
.read_to_end(&mut contents)
.await
.map_err(|e| ShellError::IOError { msg: e.to_string() })?;
String::from_utf8(contents).map_err(|e| ShellError::IOError { msg: e.to_string() })
})?;

Ok(PipelineData::Value(
Value::String {
val: contents,
internal_span: span,
},
None,
))
}
}

fn add_custom_commands(store: Store, mut engine_state: EngineState) -> EngineState {
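// Register the custom `xs cas` command: record it in a working set, then merge the
// resulting delta back into the engine state.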
let delta = {
let mut working_set = StateWorkingSet::new(&engine_state);
working_set.add_decl(Box::new(XsCasCommand::new(store)));
working_set.render()
};

if let Err(err) = engine_state.merge_delta(delta) {
eprintln!("Error adding custom commands: {err:?}");
}

engine_state
}

pub fn create(store: Store) -> Result<EngineState, Error> {
let mut engine_state = create_default_context();
engine_state = add_shell_command_context(engine_state);
engine_state = add_cli_context(engine_state);
engine_state = add_custom_commands(store, engine_state);

let init_cwd = std::env::current_dir()?;
gather_parent_env_vars(&mut engine_state, init_cwd.as_ref());

Ok(engine_state)
}

pub fn parse_closure(
engine_state: &mut EngineState,
closure_snippet: &str,
) -> Result<Closure, ShellError> {
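// Parse and evaluate the snippet so it yields a Closure value; merging the working-set
// delta keeps the parsed block registered in the engine state.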
let mut working_set = StateWorkingSet::new(engine_state);
let block = parse(&mut working_set, None, closure_snippet.as_bytes(), false);
engine_state.merge_delta(working_set.render())?;

let mut stack = Stack::new();
let result =
eval_block::<WithoutDebug>(engine_state, &mut stack, &block, PipelineData::empty())?;
result.into_value(Span::unknown())?.into_closure()
}
32 changes: 32 additions & 0 deletions src/nu/mod.rs
@@ -0,0 +1,32 @@
use std::sync::Arc;

mod engine;
mod run;
mod thread_pool;

use crate::error::Error;
use crate::store::{FollowOption, ReadOptions, Store};

pub async fn spawn_closure(store: &Store, closure_snippet: String) -> Result<(), Error> {
let mut engine_state = engine::create(store.clone())?;
let closure = engine::parse_closure(&mut engine_state, &closure_snippet)?;
let pool = Arc::new(thread_pool::ThreadPool::new(10));

let mut rx = store
.read(ReadOptions {
follow: FollowOption::On,
tail: false,
last_id: None,
})
.await;

std::thread::spawn(move || {
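// Drain the store's follow subscription on a dedicated OS thread: blocking_recv keeps
// this off the async runtime, and each frame is handed to the Nu thread pool.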
let mut i = 0;
while let Some(frame) = rx.blocking_recv() {
run::line(i, frame, &engine_state, &closure, &pool);
i += 1;
}
});

Ok(())
}
102 changes: 102 additions & 0 deletions src/nu/run.rs
@@ -0,0 +1,102 @@
use std::sync::Arc;

use nu_engine::get_eval_block_with_early_return;
use nu_protocol::engine::Closure;
use nu_protocol::engine::{EngineState, Stack};
use nu_protocol::{PipelineData, Record, ShellError, Span, Value};

use crate::nu::thread_pool;
use crate::store::Frame;

fn frame_to_value(frame: &Frame, span: Span) -> Value {
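// Build the record the closure receives: id and topic always, hash and meta only when
// present on the frame.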
let mut record = Record::new();

record.push("id", Value::string(frame.id.to_string(), span));
record.push("topic", Value::string(frame.topic.clone(), span));

if let Some(hash) = &frame.hash {
record.push("hash", Value::string(hash.to_string(), span));
}

if let Some(meta) = &frame.meta {
record.push("meta", json_to_value(meta, span));
}

Value::record(record, span)
}

fn json_to_value(json: &serde_json::Value, span: Span) -> Value {
match json {
serde_json::Value::Null => Value::nothing(span),
serde_json::Value::Bool(b) => Value::bool(*b, span),
serde_json::Value::Number(n) => {
if let Some(i) = n.as_i64() {
Value::int(i, span)
} else if let Some(f) = n.as_f64() {
Value::float(f, span)
} else {
Value::string(n.to_string(), span)
}
}
serde_json::Value::String(s) => Value::string(s, span),
serde_json::Value::Array(arr) => {
let values: Vec<Value> = arr.iter().map(|v| json_to_value(v, span)).collect();
Value::list(values, span)
}
serde_json::Value::Object(obj) => {
let mut record = Record::new();
for (k, v) in obj {
record.push(k, json_to_value(v, span));
}
Value::record(record, span)
}
}
}

pub fn line(
job_number: usize,
frame: Frame,
engine_state: &EngineState,
closure: &Closure,
pool: &Arc<thread_pool::ThreadPool>,
) {
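// Each frame becomes its own job: clone the engine state and closure, then run on a
// pool thread so closures execute concurrently rather than on the reader thread.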
let engine_state = engine_state.clone();
let closure = closure.clone();
pool.execute(move || {
println!("Thread {} starting execution", job_number);
let input = PipelineData::Value(frame_to_value(&frame, Span::unknown()), None);
match eval_closure(&engine_state, &closure, input) {
Ok(pipeline_data) => match pipeline_data.into_value(Span::unknown()) {
Ok(value) => match value {
Value::String { val, .. } => println!("Thread {}: {}", job_number, val),
Value::List { vals, .. } => {
for val in vals {
println!("Thread {}: {:?}", job_number, val);
}
}
other => println!("Thread {}: {:?}", job_number, other),
},
Err(err) => {
eprintln!(
"Thread {}: Error converting pipeline data: {:?}",
job_number, err
)
}
},
Err(error) => {
eprintln!("Thread {}: Error: {:?}", job_number, error);
}
}
});
}

fn eval_closure(
engine_state: &EngineState,
closure: &Closure,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let block = &engine_state.get_block(closure.block_id);
let mut stack = Stack::new();
let eval_block_with_early_return = get_eval_block_with_early_return(engine_state);
eval_block_with_early_return(engine_state, &mut stack, block, input)
}
57 changes: 57 additions & 0 deletions src/nu/thread_pool.rs
@@ -0,0 +1,57 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

pub struct ThreadPool {
tx: crossbeam_channel::Sender<Box<dyn FnOnce() + Send + 'static>>,
active_count: Arc<AtomicUsize>,
completion_pair: Arc<(Mutex<()>, Condvar)>,
}

impl ThreadPool {
pub fn new(size: usize) -> Self {
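// A zero-capacity (rendezvous) channel: `execute` blocks until a worker is free to
// take the job.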
let (tx, rx) = crossbeam_channel::bounded::<Box<dyn FnOnce() + Send + 'static>>(0);
let active_count = Arc::new(AtomicUsize::new(0));
let completion_pair = Arc::new((Mutex::new(()), Condvar::new()));

for _ in 0..size {
let rx = rx.clone();
let active_count = active_count.clone();
let completion_pair = completion_pair.clone();

thread::spawn(move || {
while let Ok(job) = rx.recv() {
active_count.fetch_add(1, Ordering::SeqCst);
job();
if active_count.fetch_sub(1, Ordering::SeqCst) == 1 {
let (lock, cvar) = &*completion_pair;
let guard = lock.lock().unwrap();
cvar.notify_all();
drop(guard);
}
}
});
}

ThreadPool {
tx,
active_count,
completion_pair,
}
}

pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
self.tx.send(Box::new(f)).unwrap();
}

pub fn wait_for_completion(&self) {
let (lock, cvar) = &*self.completion_pair;
let mut guard = lock.lock().unwrap();
while self.active_count.load(Ordering::SeqCst) > 0 {
guard = cvar.wait(guard).unwrap();
}
}
}
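
For reference, a minimal hypothetical usage sketch (not part of this commit); ThreadPool is crate-private, so this would only make sense inside the xs crate, and the demo function is illustrative only.

use std::sync::Arc;

fn demo(pool: Arc<ThreadPool>) {
    for i in 0..4 {
        pool.execute(move || {
            // Each job runs on one of the pool's worker threads.
            println!("job {} finished", i);
        });
    }
    // Block until the pool reports no in-flight jobs.
    pool.wait_for_completion();
}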
