use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use nando_support::activation_intent;
use nandoize::{nandoize, PersistableDerive};
use object_lib::allocators::bump_allocator::BumpAllocator;
use object_lib::allocators::persistently_allocatable::PersistentlyAllocatable;
use object_lib::collections::pmap::PHashMap;
use object_lib::pstring::PString;
use object_lib::{IPtr, ObjectId, Persistable};
use object_tracker::ObjectTracker;
use parking_lot::RwLock;
const BUCKET_OBJECT_IDS: [ObjectId; 4] = [
29137423903577562994337864214303417648,
25148377545275998100238667688304102095,
35988441269193809587021109209775173014,
72379670391941473510834640684194650870,
];
pub async fn init_bucket_objects(object_tracker: &Arc<ObjectTracker>) -> Vec<IPtr> {
println!("[DEBUG] Initting kvs buckets");
let mut res = Vec::with_capacity(BUCKET_OBJECT_IDS.len());
for bucket_object_id in BUCKET_OBJECT_IDS {
{
let object = object_tracker
.allocate(
std::mem::size_of::<StorageBucket<ObjectId>>(),
Some(bucket_object_id),
)
.unwrap();
match object.is_initialized() {
false => {
let object_data = object.allocate::<StorageBucket<ObjectId>>().unwrap();
object.set_inner_allocator(object_data);
object_data.inner.with_capacity(100);
}
true => {
let object_data = unsafe {
object
.read_into::<StorageBucket<ObjectId>>(None)
.unwrap()
.as_mut()
}
.unwrap();
object.set_inner_allocator(object_data);
}
};
object_tracker.push_initial_version(object.id, (&*object).into());
object.bump_version().expect(&format!(
"failed to bump version of just-initialized bucket {}",
bucket_object_id
));
object.flush();
}
res.push(IPtr::new(bucket_object_id, 0, 0));
}
res
}
pub fn set_allocator_for_bucket_object(object_tracker: &Arc<ObjectTracker>, object_id: ObjectId) {
if !BUCKET_OBJECT_IDS.contains(&object_id) {
return;
}
let bucket_object = match object_tracker.get(object_id) {
Some(o) => o,
None => return,
};
let object_data = unsafe {
bucket_object
.read_into::<StorageBucket<ObjectId>>(None)
.unwrap()
.as_mut()
}
.unwrap();
object_data
.inner
.set_allocator(Arc::clone(&bucket_object.backing_allocator));
}
pub fn compute_bucket_object_id(
activation_intent: &mut activation_intent::NandoActivationIntent,
) -> ObjectId {
let key: String = match &activation_intent.args[0] {
activation_intent::NandoArgument::Value(v) => {
v.try_into().expect(&format!("failed to parse key {:?}", v))
}
_ => panic!("expected key as the first arg in {:#?}", activation_intent),
};
let key_hash = {
let mut hasher = DefaultHasher::default();
key.hash(&mut hasher);
hasher.finish() as u128
};
let bucket_object_id = {
let mut max_weight = 0;
let mut max_weight_bucket = 0;
let mult_constant = 1103515245;
for bucket_object_id in BUCKET_OBJECT_IDS {
let bucket_object_id_digest: u128 = bucket_object_id & 0xffffffff;
let weight: u128 = (mult_constant
* ((mult_constant * bucket_object_id_digest + 12345) ^ key_hash)
+ 12345)
% (2u128.pow(31) - 1);
#[cfg(debug_assertions)]
println!("weight for {} and {} is {}", key, bucket_object_id, weight);
if max_weight >= weight {
continue;
}
max_weight = weight;
max_weight_bucket = bucket_object_id;
}
max_weight_bucket
};
activation_intent.prepend_object_argument(object_lib::IPtr {
object_id: bucket_object_id,
offset: 0,
size: 0,
});
bucket_object_id
}
#[repr(C)]
#[derive(PersistableDerive)]
pub struct StorageBucket<V>
where
V: Persistable,
{
inner: PHashMap<PString, V>,
}
impl<V> StorageBucket<V>
where
V: Persistable,
{
pub fn new() -> Self {
Self {
inner: PHashMap::new(),
}
}
}
impl<V> PersistentlyAllocatable for StorageBucket<V>
where
V: Persistable,
{
fn set_allocator(&mut self, allocator: Arc<RwLock<BumpAllocator>>) {
self.inner.set_allocator(allocator);
}
fn get_allocator(&self) -> Option<Arc<RwLock<BumpAllocator>>> {
self.inner.get_allocator()
}
}
impl<V> StorageBucket<V> where V: Persistable {}
#[nandoize]
pub fn put<V: Persistable + Copy + std::fmt::Debug>(
bucket: &mut StorageBucket<V>,
key: String,
value: V,
) {
bucket.inner.insert(key, value);
}
#[nandoize]
pub fn get<V: Persistable + Copy + 'static>(bucket: &StorageBucket<V>, key: String) -> Option<V> {
bucket.inner.get(&key).copied()
}