1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
use std::collections::{HashMap, HashSet};
use std::mem::MaybeUninit;
use std::sync::Once;

use async_std::sync::Arc;
use nando_support::{ObjectId, ObjectVersion};
use parking_lot::RwLock;

use crate::orchestration::HostIdx;

/// A span of consecutive object versions during which one host owned an object.
///
/// `OwnershipRegistry` keeps one range per object as its current ownership record and appends
/// ranges to a per-object history whenever ownership is transferred.
#[derive(Clone)]
pub struct VersionRange {
    /// Host that owns the object for the versions covered by this range.
    pub host_idx: HostIdx,
    /// First version covered by this range.
    pub start_version: ObjectVersion,
    /// Last version covered by this range (the following range starts at `end_version + 1`).
    /// The registry writes `0` here for a range that has not been closed by a transfer yet.
    pub end_version: ObjectVersion,
}

impl VersionRange {
    /// Creates the initial range for `host_idx`: it starts at version 1 and carries an
    /// `end_version` of 0.
    fn new(host_idx: HostIdx) -> Self {
        let (start_version, end_version) = (1, 0);

        VersionRange {
            host_idx,
            start_version,
            end_version,
        }
    }
}

/// Tracks which host owns each object, both currently and over earlier version ranges.
///
/// Both maps are guarded by an outer lock (protecting the set of keys) and a per-object inner
/// lock (protecting that object's record).
pub struct OwnershipRegistry {
    /// Per-object history of ownership ranges, in the order the transfers were recorded.
    past_ownership: RwLock<HashMap<ObjectId, RwLock<Vec<VersionRange>>>>,
    /// Per-object range describing the current owner; objects without an entry are treated as
    /// unpublished by the lookup methods.
    pub current_ownership: RwLock<HashMap<ObjectId, RwLock<VersionRange>>>,
}

impl OwnershipRegistry {
    fn new() -> Self {
        Self {
            past_ownership: RwLock::new(HashMap::new()),
            current_ownership: RwLock::new(HashMap::new()),
        }
    }

    /// Returns the process-wide registry, creating it on the first call.
    ///
    /// Every call yields a reference to the same `Arc`, so all callers share one registry.
    pub(crate) fn get_ownership_registry() -> &'static Arc<OwnershipRegistry> {
        static mut INSTANCE: MaybeUninit<Arc<OwnershipRegistry>> = MaybeUninit::uninit();
        // `Once` synchronizes through `&self`, so it does not need to be a `static mut`.
        static ONCE: Once = Once::new();

        // `INSTANCE` is only ever touched through raw pointers: taking `&`/`&mut` to a
        // `static mut` is what the `static_mut_refs` lint rejects. `MaybeUninit<T>` has the same
        // layout as `T`, which makes the pointer casts below valid.
        ONCE.call_once(|| {
            // SAFETY: `call_once` runs this closure at most once, and no reference into
            // `INSTANCE` is handed out before the closure has completed, so nothing can observe
            // or alias the slot while it is being written.
            unsafe {
                std::ptr::addr_of_mut!(INSTANCE)
                    .cast::<Arc<OwnershipRegistry>>()
                    .write(Arc::new(OwnershipRegistry::new()));
            }
        });

        // SAFETY: `call_once` only returns after the initializing closure has finished (on this
        // or another thread), so the slot holds a valid `Arc`. It is never written again, which
        // makes handing out shared `'static` references sound.
        unsafe { &*std::ptr::addr_of!(INSTANCE).cast::<Arc<OwnershipRegistry>>() }
    }

    /// Looks up the host that currently owns `object_id`.
    ///
    /// Returns `None` when the registry has no current ownership entry for the object.
    pub fn get_owning_host(&self, object_id: &ObjectId) -> Option<HostIdx> {
        let ownership = self.current_ownership.read();

        ownership
            .get(object_id)
            .map(|range_lock| range_lock.read().host_idx)
    }

    /// Groups `object_ids` by the host that currently owns each of them.
    ///
    /// Within each group the ids keep the order in which they appear in `object_ids`. The outer
    /// ownership map stays read-locked for the whole lookup.
    ///
    /// # Panics
    ///
    /// Panics if any id has no current ownership entry (i.e. the object is unpublished).
    pub fn get_owning_hosts(&self, object_ids: &Vec<ObjectId>) -> HashMap<HostIdx, Vec<ObjectId>> {
        let mut res: HashMap<HostIdx, Vec<ObjectId>> = HashMap::new();
        let current_ownership = self.current_ownership.read();
        for object_id in object_ids {
            let Some(vr_lock) = current_ownership.get(object_id) else {
                // FIXME
                panic!(
                    "attempting to get owning host of unpublished object {}",
                    object_id
                );
            };

            let host_idx = vr_lock.read().host_idx;
            // Build the per-host vector lazily: `or_insert(expr)` would evaluate `expr` (and
            // allocate) on every iteration, even when the host already has an entry.
            res.entry(host_idx)
                .or_insert_with(|| Vec::with_capacity(object_ids.len()))
                .push(*object_id);
        }

        res
    }

    /// Returns the owning host if every object in `object_ids` is currently owned by the same
    /// host, and `None` otherwise.
    ///
    /// An empty `object_ids` has no owning host and therefore also yields `None`.
    ///
    /// # Panics
    ///
    /// Panics (via `todo!`) if any id has no current ownership entry.
    pub fn objects_are_colocated(&self, object_ids: &Vec<ObjectId>) -> Option<HostIdx> {
        // FIXME this is not atomic over the set of keys. If we used something like flurry we would
        // get a snapshot read through the iterator, which might be more appropriate for us.
        // Take the outer read lock once for the whole scan instead of once per object.
        let current_ownership = self.current_ownership.read();
        let owning_hosts: HashSet<HostIdx> = object_ids
            .iter()
            .map(|oid| match current_ownership.get(oid) {
                Some(e) => e.read().host_idx,
                // FIXME we should return a proper error here instead of panicking. This branch
                // is reached when a request names an object that has not been published.
                None => todo!("request to schedule on unpublished object {}", oid),
            })
            .collect();

        // Exactly one distinct host means colocated; zero (empty input) or several means not.
        let mut hosts = owning_hosts.into_iter();
        match (hosts.next(), hosts.next()) {
            (Some(host), None) => Some(host),
            _ => None,
        }
    }

    /// Records `owner` as the owner of a newly published object.
    ///
    /// Returns `true` if the object was unpublished (it is now registered to `owner`) or was
    /// already registered to `owner`. Returns `false`, leaving the registry untouched, if the
    /// object is already registered to a different host.
    pub fn handle_publish(&self, published_object_id: ObjectId, owner: HostIdx) -> bool {
        // FIXME return type
        // Fast path: an already-published object only needs the read lock.
        if let Some(e) = self.current_ownership.read().get(&published_object_id) {
            return e.read().host_idx == owner;
        }

        // Slow path: the read lock was released above, so another publisher may have registered
        // the object in the meantime. Going through the entry API under the write lock makes the
        // check and the insertion a single atomic step and never overwrites an existing owner.
        let mut current_ownership = self.current_ownership.write();
        let recorded_owner = current_ownership
            .entry(published_object_id)
            .or_insert_with(|| RwLock::new(VersionRange::new(owner)))
            .read()
            .host_idx;

        recorded_owner == owner
    }

    pub fn update_ownership(
        &self,
        target_object: ObjectId,
        whomstone_version: ObjectVersion,
        old_owner: HostIdx,
        new_owner: HostIdx,
    ) {
        // FIXME change return type to `Result`
        // FIXME we should store the timestamp of the last move in the current_ownership map, so
        // that we can use it in the cost model to weigh potential objects moves.
        // NOTE we can probably avoid locking the current ownership entry twice, as longa as we are
        // a bit careful on the reader's side.
        let current_ownership_range_clone = {
            let mut current_ownership = self.current_ownership.write();
            let current_ownership_range = match current_ownership.get(&target_object) {
                None => {
                    current_ownership
                        .insert(target_object, RwLock::new(VersionRange::new(old_owner)));
                    current_ownership.get(&target_object).unwrap()
                }
                Some(r) => r,
            };
            let mut current_ownership_range = current_ownership_range.write();
            current_ownership_range.end_version = whomstone_version;

            current_ownership_range.clone()
        };

        #[cfg(debug_assertions)]
        println!(
            "updating ownership of {}, whomstone version {}",
            target_object, whomstone_version
        );

        // if only we could upgrade a read lock to a write lock
        match {
            let past_ownership = self.past_ownership.read();
            match past_ownership.get(&target_object) {
                None => None,
                Some(e) => {
                    let mut version_ranges = e.write();
                    let last_range = version_ranges.last().unwrap();
                    let start_version = last_range.end_version + 1;
                    version_ranges.push(VersionRange {
                        host_idx: old_owner,
                        start_version,
                        end_version: whomstone_version,
                    });

                    Some(())
                }
            }
        } {
            None => {
                let mut ownership_range_vec = Vec::with_capacity(32);
                ownership_range_vec.push(current_ownership_range_clone);
                let mut past_ownership = self.past_ownership.write();
                past_ownership.insert(target_object, RwLock::new(ownership_range_vec));
            }
            _ => (),
        }

        let new_version = whomstone_version + 1;

        let current_ownership = self.current_ownership.read();
        let mut current_ownership_range = current_ownership.get(&target_object).unwrap().write();
        current_ownership_range.host_idx = new_owner;
        current_ownership_range.start_version = new_version;
        current_ownership_range.end_version = 0;
    }

    /// Discards every ownership record: the per-object history first, then the current owners.
    pub fn reset_state(&self) {
        // Each write guard is a temporary that is released at the end of its own statement, so
        // the two maps are never locked at the same time.
        self.past_ownership.write().clear();
        self.current_ownership.write().clear();
    }
}