use std::sync::Arc;
use execution_definitions::activation::{
ActivationFunction, ObjectArgument, ResolvedNandoArgument,
};
use execution_definitions::nando_handle::SharedHandleState;
use execution_definitions::txn_context::TxnContext;
use nando_support::{
activation_intent::NandoResult,
nando_metadata::{NandoKind, NandoMetadata},
resolve_object, resolve_read_only_object,
};
#[cfg(feature = "object-caching")]
use nando_support::{
activation_intent::{NandoActivationIntent, NandoArgument},
iptr::IPtr,
};
use object_lib::{tls::IntoObjectMapping, Object, ObjectId};
use crate::nando_manager::NandoManager;
use crate::NandoManagerBase;
/// Flattens the object arguments nested inside every resolved nando argument into a
/// single `Vec` of object mappings, preserving argument order.
///
/// Each element of `$args` must provide `get_inner_object_arguments()`. Both the
/// read-write and the read-only variants are mapped with `into_mapping()`.
///
/// # Panics
///
/// Hits `unreachable!()` if an `ObjectArgument::UnresolvedObject` is encountered.
macro_rules! generate_object_mappings {
    ($args:expr) => {{
        let mut mappings = Vec::new();
        for resolved in $args.iter() {
            for object_arg in resolved.get_inner_object_arguments().iter() {
                let mapping = match object_arg {
                    ObjectArgument::RWObject(writable) => writable.into_mapping(),
                    ObjectArgument::ROObject(readable) => readable.into_mapping(),
                    ObjectArgument::UnresolvedObject(_) => unreachable!(),
                };
                mappings.push(mapping);
            }
        }
        mappings
    }};
}
/// Evaluates to the object reference held by the argument at position `$idx`.
///
/// # Panics
///
/// * with `$err_msg` if `$args` has no element at `$idx`;
/// * if the argument is not a `ResolvedNandoArgument::Object`;
/// * if the object argument cannot be viewed as an object reference
///   (`as_object_ref()` returns `None`).
macro_rules! extract_object_ref {
    ($args:ident, $idx:expr, $err_msg:expr) => {{
        match $args.get($idx).expect(&$err_msg) {
            ResolvedNandoArgument::Object(wrapped) => match wrapped.as_object_ref() {
                Some(object_ref) => object_ref,
                None => panic!("Could not extract object ref from nando arg {}", $idx),
            },
            other => panic!("Could not resolve nando object arg {}: {:?}", $idx, other),
        }
    }};
}
/// Evaluates to a collection of object references taken from the
/// `ResolvedNandoArgument::Objects` argument at position `$idx`. The concrete
/// collection type is chosen by the caller through `collect()`.
///
/// # Panics
///
/// * with `$err_msg` if `$args` has no element at `$idx`;
/// * if the argument is not a `ResolvedNandoArgument::Objects`;
/// * if any contained object yields `None` from `as_object_ref()`.
macro_rules! extract_object_refs {
    ($args:ident, $idx:expr, $err_msg:expr) => {{
        let resolved = $args.get($idx).expect(&$err_msg);
        let ResolvedNandoArgument::Objects(handles) = resolved else {
            panic!(
                "Could not resolve nando objects arg {}: {:?}",
                $idx, resolved
            );
        };
        handles
            .iter()
            .map(|handle| handle.as_object_ref().unwrap())
            .collect()
    }};
}
/// Looks up the built-in nando implementation registered under `intent_name`.
///
/// Every implementation is a non-capturing closure taking the transaction context,
/// the shared activation handle (used to append results) and the already-resolved
/// arguments. The cache-management nandos are only compiled in when the
/// `object-caching` feature is enabled.
///
/// # Returns
///
/// `Some` with the shared function when `intent_name` names a built-in nando,
/// `None` otherwise.
///
/// # Panics
///
/// This function itself does not panic. The returned closures panic when they are
/// invoked with a missing argument or an argument of an unexpected kind.
pub fn resolve_function(intent_name: &str) -> Option<Arc<ActivationFunction>> {
    let f = match intent_name {
        // Bumps the version of the object at argument 0, flushes it, and reports the
        // version the object had *before* the bump.
        "whomstone" => |_ctx: TxnContext,
                        activation_handle: SharedHandleState,
                        args: &Vec<ResolvedNandoArgument>| {
            let error_msg = format!(
                "invalid number of arguments to whomstone: expected 1 but received {}",
                args.len()
            );
            let obj = extract_object_ref!(args, 0, error_msg);
            let whomstone_version: u64 = obj.get_version().try_into().unwrap();
            obj.bump_version()
                .expect("failed to bump object version during whomstoning");
            obj.flush();
            let activation_handle = activation_handle.borrow_mut();
            activation_handle.append_result(NandoResult::Value(whomstone_version.into()));
        },
        // Same as `whomstone`, applied to every object of the object list at
        // argument 0; one pre-bump version is appended per object, in list order.
        "whomstone_multi" => |_ctx: TxnContext,
                              activation_handle: SharedHandleState,
                              args: &Vec<ResolvedNandoArgument>| {
            let error_msg = format!(
                "invalid number of arguments to whomstone_multi: expected 1 but received {}",
                args.len()
            );
            let objs: Vec<&Object> = extract_object_refs!(args, 0, error_msg);
            let activation_handle = activation_handle.borrow_mut();
            for obj in &objs {
                let whomstone_version: u64 = obj.get_version().try_into().unwrap();
                obj.bump_version()
                    .expect("failed to bump object version during whomstoning");
                obj.flush();
                activation_handle.append_result(NandoResult::Value(whomstone_version.into()));
            }
        },
        // Does nothing besides a trace line in debug builds.
        "noop" => |_ctx: TxnContext,
                   _activation_handle: SharedHandleState,
                   _args: &Vec<ResolvedNandoArgument>| {
            #[cfg(debug_assertions)]
            println!("noop");
        },
        // Stores `value` (argument 2) under `key` (argument 1) in the writable
        // storage bucket at argument 0.
        "put" => |ctx: TxnContext,
                  _activation_handle: SharedHandleState,
                  args: &Vec<ResolvedNandoArgument>| {
            let error_msg = format!(
                "invalid number of arguments to put: expected 3 but received {}",
                args.len()
            );
            let bucket_obj = resolve_object!(
                args,
                0,
                error_msg,
                sample_programs::kvs::StorageBucket<ObjectId>,
                ctx.get_log_entry()
            );
            let key = match args.get(1).expect(&error_msg) {
                ResolvedNandoArgument::Value(v) => v.try_into().unwrap(),
                other => panic!("put: expected a value for the key argument, got {:?}", other),
            };
            let value: ObjectId = match args.get(2).expect(&error_msg) {
                ResolvedNandoArgument::Value(v) => v.try_into().unwrap(),
                other => panic!(
                    "put: expected a value for the value argument, got {:?}",
                    other
                ),
            };
            let object_mappings = generate_object_mappings!(args);
            NandoManagerBase::put_nando(
                &ctx,
                &object_mappings,
                ctx.get_log_entry(),
                bucket_obj,
                key,
                value,
            );
        },
        // Looks `key` (argument 1) up in the read-only storage bucket at argument 0.
        // A hit is appended to the activation handle; a miss appends nothing.
        "get" => |ctx: TxnContext,
                  activation_handle: SharedHandleState,
                  args: &Vec<ResolvedNandoArgument>| {
            let error_msg = format!(
                "invalid number of arguments to get: expected 2 but received {}",
                args.len()
            );
            let bucket_obj = resolve_read_only_object!(
                args,
                0,
                error_msg,
                sample_programs::kvs::StorageBucket<ObjectId>,
                ctx.get_log_entry()
            );
            let key: String = match args.get(1).expect(&error_msg) {
                ResolvedNandoArgument::Value(v) => v.try_into().unwrap(),
                other => panic!("get: expected a value for the key argument, got {:?}", other),
            };
            // `key` is moved into `get_nando`; keep a copy only when the debug trace
            // below is actually compiled in.
            #[cfg(debug_assertions)]
            let debug_key = key.clone();
            let object_mappings = generate_object_mappings!(args);
            let res = NandoManagerBase::get_nando::<ObjectId>(
                &ctx,
                &object_mappings,
                ctx.get_log_entry(),
                bucket_obj,
                key,
            );
            #[cfg(debug_assertions)]
            println!("get of '{}': {:?}", debug_key, res);
            let res: i32 = match res {
                None => return,
                Some(r) => r.try_into().unwrap(),
            };
            let activation_handle = activation_handle.borrow_mut();
            activation_handle.append_result(NandoResult::Value(res.into()));
        },
        // Marks every object from argument 1 onwards as being under invalidation by
        // the control block given as argument 0.
        #[cfg(feature = "object-caching")]
        "set_under_invalidation" => {
            |_ctx: TxnContext,
             _activation_handle: SharedHandleState,
             args: &Vec<ResolvedNandoArgument>| {
                let error_msg = "invalid argument to set_under_invalidation".to_string();
                let invalidation_trigger_id = match args.first().expect(&error_msg) {
                    ResolvedNandoArgument::ControlBlock(ecb_id) => *ecb_id,
                    _ => panic!("missing invalidation triggering task in set_under_invalidation"),
                };
                for i in 1..args.len() {
                    let resolved_obj = extract_object_ref!(args, i, error_msg);
                    resolved_obj.set_under_invalidation(invalidation_trigger_id);
                }
            }
        }
        // Marks every object argument as cacheable again.
        #[cfg(feature = "object-caching")]
        "set_caching_permissible" => {
            |_ctx: TxnContext,
             _activation_handle: SharedHandleState,
             args: &Vec<ResolvedNandoArgument>| {
                let error_msg = "invalid argument to set_caching_permissible".to_string();
                for i in 0..args.len() {
                    let resolved_obj = extract_object_ref!(args, i, error_msg);
                    resolved_obj.set_caching_permissible();
                }
            }
        }
        // Creates a cache copy of the object at argument 0.
        //   arg 1: promised version; a non-zero value must match the object's current
        //          version and the object must be cacheable, otherwise nothing happens.
        //   arg 2: optional id to use for the cache object.
        // Results: a reference to the cache object, then the source object's version.
        #[cfg(feature = "object-caching")]
        "spawn_cache" => |ctx: TxnContext,
                          activation_handle: SharedHandleState,
                          args: &Vec<ResolvedNandoArgument>| {
            let error_msg = "invalid argument to spawn_cache".to_string();
            let log_entry = ctx.get_log_entry();
            let mut log_entry = log_entry.borrow_mut();
            let object_to_cache = extract_object_ref!(args, 0, error_msg);
            match args.get(1).expect(&error_msg) {
                ResolvedNandoArgument::Value(v) => {
                    let promised_version: nando_support::ObjectVersion = v.try_into().unwrap();
                    if promised_version != 0
                        && (!object_to_cache.object_is_cacheable()
                            || object_to_cache.get_version() != promised_version)
                    {
                        return;
                    }
                }
                _ => panic!("could not get promised cache version"),
            };
            let requested_cache_id = match args.get(2) {
                None => None,
                Some(ResolvedNandoArgument::Value(v)) => Some(
                    v.try_into()
                        .expect("failed to parse cache id argument in spawn_cache"),
                ),
                Some(_) => panic!("invalid argument type for cache id in spawn_cache"),
            };
            if !object_to_cache.is_cached() {
                object_to_cache.flush();
            }
            #[cfg(debug_assertions)]
            println!(
                "about to spawn cache of {} with id {:?}",
                object_to_cache.id, requested_cache_id
            );
            let object_tracker = ctx.get_object_tracker();
            let cache_object_id =
                match object_tracker.copy_underlying_object(object_to_cache, requested_cache_id) {
                    None => return,
                    Some(oid) => oid,
                };
            // Only caches whose id was not requested by the caller are added to the
            // write set.
            if requested_cache_id.is_none() {
                log_entry.add_object_to_write_set(cache_object_id, 0);
            }
            object_to_cache.add_cache(cache_object_id);
            #[cfg(debug_assertions)]
            println!(
                "spawned cache of {} with id {}",
                object_to_cache.id, cache_object_id
            );
            let cached_object = object_tracker
                .open(cache_object_id)
                .expect("failed to open allocated object");
            ctx.get_ownership_tracker().mark_owned(cache_object_id);
            cached_object.reset_caches();
            {
                // Scoped so the control-block borrow ends before results are appended.
                let ecb = ctx.get_ecb().unwrap();
                let mut ecb = ecb.borrow_mut();
                let cache_iptr = IPtr::new(cache_object_id, 0, 0);
                ecb.set_result(cache_iptr.into());
            }
            let activation_handle = activation_handle.borrow_mut();
            activation_handle.append_result(NandoResult::Ref(IPtr::new(cache_object_id, 0, 0)));
            activation_handle
                .append_result(NandoResult::Value(object_to_cache.get_version().into()));
        },
        // Refreshes the caches of the object at argument 0. When caches already exist
        // the actual copy is delegated to a spawned `update_caches_internal` task;
        // otherwise argument 1 gives the number of fresh caches to create here.
        #[cfg(feature = "object-caching")]
        "update_caches" => |ctx: TxnContext,
                            _activation_handle: SharedHandleState,
                            args: &Vec<ResolvedNandoArgument>| {
            let error_msg = "invalid argument to update_caches".to_string();
            let log_entry = ctx.get_log_entry();
            let mut log_entry = log_entry.borrow_mut();
            let object_to_cache = extract_object_ref!(args, 0, error_msg);
            let object_caches = object_to_cache.get_cached_versions();
            let caches_exist = !object_caches.is_empty();
            let object_caches: Vec<Option<u128>> = if caches_exist {
                object_caches.iter().map(|e| Some(*e)).collect()
            } else {
                let caches_to_spawn: usize = match args.get(1).expect(&error_msg) {
                    ResolvedNandoArgument::Value(v) => v.try_into().unwrap(),
                    other => panic!(
                        "update_caches: expected a value for the cache count argument, got {:?}",
                        other
                    ),
                };
                #[cfg(debug_assertions)]
                println!("No existing caches to update, will spawn");
                vec![None; caches_to_spawn]
            };
            let object_tracker = ctx.get_object_tracker();
            if caches_exist {
                let ecb = ctx.get_ecb().unwrap();
                let mut ecb = ecb.borrow_mut();
                let mut cache_res = Vec::with_capacity(object_caches.len());
                let mut args = Vec::with_capacity(1 + object_caches.len());
                args.push(NandoArgument::Ref(object_to_cache.iptr_of()));
                for object_cache in &object_caches {
                    // Every entry is `Some` on this path: the list was built from the
                    // existing cache ids above.
                    let object_cache_id = object_cache.unwrap();
                    let cache_iptr = IPtr::new(object_cache_id, 0, 0);
                    ctx.get_ownership_tracker().mark_owned(object_cache_id);
                    cache_res.push(cache_iptr);
                }
                args.push(cache_res.clone().into());
                ecb.set_result(cache_res.into());
                ecb.add_new_spawned_task(NandoActivationIntent {
                    host_idx: None,
                    name: "update_caches_internal".to_string(),
                    args,
                });
                return;
            }
            if !object_to_cache.is_cached() {
                object_to_cache.flush();
            }
            let mut cache_res = Vec::with_capacity(object_caches.len());
            for requested_cache_id in &object_caches {
                let cache_object_id = match object_tracker
                    .copy_underlying_object(object_to_cache, *requested_cache_id)
                {
                    None => return,
                    Some(oid) => oid,
                };
                object_to_cache.add_cache(cache_object_id);
                if requested_cache_id.is_none() {
                    log_entry.add_object_to_write_set(cache_object_id, 0);
                }
                cache_res.push(IPtr::new(cache_object_id, 0, 0));
                let cached_object = object_tracker
                    .open(cache_object_id)
                    .expect("failed to open allocated object");
                cached_object.reset_caches();
                ctx.get_ownership_tracker().mark_owned(cache_object_id);
            }
            let ecb = ctx.get_ecb().unwrap();
            let mut ecb = ecb.borrow_mut();
            ecb.set_result(cache_res.into());
        },
        // Copies the object at argument 0 over each existing cache object listed in
        // argument 1, reusing the caches' ids.
        #[cfg(feature = "object-caching")]
        "update_caches_internal" => {
            |ctx: TxnContext,
             _activation_handle: SharedHandleState,
             args: &Vec<ResolvedNandoArgument>| {
                let error_msg = "invalid argument to update_caches_internal".to_string();
                let object_to_cache = extract_object_ref!(args, 0, error_msg);
                if !object_to_cache.is_cached() {
                    object_to_cache.flush();
                }
                let object_caches: Vec<&Object> = extract_object_refs!(args, 1, error_msg);
                let object_tracker = ctx.get_object_tracker();
                for object_cache in &object_caches {
                    let requested_cache_id = Some(object_cache.id);
                    let cache_object_id = match object_tracker
                        .copy_underlying_object(object_to_cache, requested_cache_id)
                    {
                        None => return,
                        Some(oid) => oid,
                    };
                    let cached_object = object_tracker
                        .open(cache_object_id)
                        .expect("failed to open allocated object");
                    cached_object.reset_caches();
                    ctx.get_ownership_tracker().mark_owned(cache_object_id);
                }
            }
        }
        // Spawns one `invalidate` task per cache of the object at argument 0. Each task
        // receives the source object, the cache object and the source's version.
        #[cfg(feature = "object-caching")]
        "spawn_cache_invalidations" => {
            |ctx: TxnContext,
             _activation_handle: SharedHandleState,
             args: &Vec<ResolvedNandoArgument>| {
                let error_msg = "invalid argument to spawn_cache_invalidations".to_string();
                let cached_object = extract_object_ref!(args, 0, error_msg);
                let object_caches = cached_object.get_cached_versions();
                if object_caches.is_empty() {
                    return;
                }
                let ecb = ctx.get_ecb().unwrap();
                let mut ecb = ecb.borrow_mut();
                for object_cache in &object_caches {
                    let args = vec![
                        cached_object.iptr_of().into(),
                        IPtr::new(*object_cache, 0, 0).into(),
                        cached_object.get_version().into(),
                    ];
                    ecb.add_new_spawned_task(NandoActivationIntent {
                        host_idx: None,
                        name: "invalidate".to_string(),
                        args,
                    });
                }
            }
        }
        // Intentionally empty body.
        "reset_scheduler_state" => {
            |_ctx: TxnContext,
             _activation_handle: SharedHandleState,
             _args: &Vec<ResolvedNandoArgument>| {
            }
        }
        _ => return None,
    };
    Some(Arc::new(f))
}
pub fn get_nando_metadata(intent_name: &str) -> NandoMetadata {
match intent_name {
"whomstone" => NandoMetadata {
kind: NandoKind::WriteOnly,
spawns_nandos: true,
mutable_argument_indices: Some(&[0]),
invalidate_on_completion: None,
},
"whomstone_multi" => NandoMetadata {
kind: NandoKind::WriteOnly,
spawns_nandos: true,
mutable_argument_indices: None,
invalidate_on_completion: None,
},
"whomstone_and_move" => NandoMetadata {
kind: NandoKind::WriteOnly,
spawns_nandos: false,
mutable_argument_indices: Some(&[0]),
invalidate_on_completion: None,
},
"noop" => NandoMetadata {
kind: NandoKind::ReadOnly,
spawns_nandos: false,
mutable_argument_indices: None,
invalidate_on_completion: None,
},
"put" => NandoMetadata {
kind: NandoKind::WriteOnly,
spawns_nandos: false,
mutable_argument_indices: Some(&[0]),
invalidate_on_completion: None,
},
"get" => NandoMetadata {
kind: NandoKind::ReadOnly,
spawns_nandos: false,
mutable_argument_indices: Some(&[0]),
invalidate_on_completion: None,
},
"set_under_invalidation" => NandoMetadata {
kind: NandoKind::WriteOnly,
spawns_nandos: true,
mutable_argument_indices: None,
invalidate_on_completion: None,
},
"set_caching_permissible" => NandoMetadata {
kind: NandoKind::WriteOnly,
spawns_nandos: false,
mutable_argument_indices: None,
invalidate_on_completion: None,
},
"spawn_cache" => NandoMetadata {
kind: NandoKind::WriteOnly,
spawns_nandos: false,
mutable_argument_indices: Some(&[0]),
invalidate_on_completion: None,
},
"update_caches" => NandoMetadata {
kind: NandoKind::WriteOnly,
spawns_nandos: true,
mutable_argument_indices: Some(&[0]),
invalidate_on_completion: None,
},
"update_caches_internal" => NandoMetadata {
kind: NandoKind::WriteOnly,
spawns_nandos: false,
mutable_argument_indices: None,
invalidate_on_completion: None,
},
"reset_scheduler_state" => NandoMetadata {
kind: NandoKind::ReadOnly,
spawns_nandos: false,
mutable_argument_indices: None,
invalidate_on_completion: None,
},
"spawn_cache_invalidations" => NandoMetadata {
kind: NandoKind::WriteOnly,
spawns_nandos: true,
mutable_argument_indices: Some(&[0]),
invalidate_on_completion: None,
},
_ => NandoMetadata {
kind: NandoKind::WriteOnly,
spawns_nandos: true,
mutable_argument_indices: None,
invalidate_on_completion: None,
},
}
}