1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
use std::sync::Arc;

use async_lib::AsyncRuntimeManager;
use fast_rsync::Signature;
#[cfg(feature = "observability")]
use lazy_static::lazy_static;
use object_lib::ObjectId;
use object_tracker::ObjectTracker;
#[cfg(feature = "observability")]
use prometheus::{register_histogram_vec, HistogramVec};

use crate::net::data_exchange_client::DataExchangeClient;
use crate::object_move_handle::{ObjectMoveHandle, ObjectMoveStatus};
use crate::rsync_lib;
use crate::MoveHandleMap;

// Latency histogram for the signature-diff step performed in
// `trigger_object_move`. Compiled only when the `observability` feature is
// enabled.
#[cfg(feature = "observability")]
lazy_static! {
    // Unlabelled histogram (the label-name list is empty). Observations are
    // recorded in milliseconds, with bucket bounds of 0.1, 1, 10 and 100.
    // The trailing `unwrap` turns a failed registration into a panic.
    static ref OWNERSHIP_SIGNATURE_DIFF_HISTOGRAM: HistogramVec = register_histogram_vec!(
        "location_manager_ownership_signature_diff_milliseconds",
        "Location Manager signature diff latencies in milliseconds",
        &[],
        vec![0.1, 1.0, 10.0, 100.0],
    )
    .unwrap();
}

/// Looks up `object_id` in `object_tracker` and returns an owned copy of its
/// byte representation.
///
/// Returns `None` when `ObjectTracker::get_as_bytes` reports an error; the
/// error value itself is discarded.
pub fn get_object_bytes(
    object_id: ObjectId,
    object_tracker: Arc<ObjectTracker>,
) -> Option<Vec<u8>> {
    object_tracker
        .get_as_bytes(object_id)
        .ok()
        .map(|bytes| bytes.to_vec())
}

pub async fn trigger_object_move(
    object_id: ObjectId,
    move_handles: Arc<MoveHandleMap>,
    target_host_id: String,
    foreign_signature: &Signature,
    object_bytes: &[u8],
    data_exchange_client: Arc<DataExchangeClient>,
    async_rt_manager: Arc<AsyncRuntimeManager>,
) -> () {
    {
        let move_handle = match move_handles.get(&object_id) {
            Some(mh) => Some(mh.clone()),
            None => {
                let move_handle =
                    ObjectMoveHandle::new(object_id, Some(ObjectMoveStatus::InProgress));
                move_handles.insert(object_id, move_handle);
                None
            }
        };

        if let Some(move_handle) = move_handle {
            move_handle.set_status_in_progress().await;
        }
    }

    let mut out = vec![];
    #[cfg(feature = "observability")]
    let signature_diff_start = tokio::time::Instant::now();
    match rsync_lib::calculate_diff(&foreign_signature, &object_bytes, &mut out) {
        Ok(()) => {
            #[cfg(feature = "observability")]
            {
                let signature_diff_duration = signature_diff_start.elapsed();
                OWNERSHIP_SIGNATURE_DIFF_HISTOGRAM
                    .with_label_values(&[])
                    .observe(signature_diff_duration.as_micros() as f64 / 1000.0);
            }
            // asynchronously invoke `apply` on target host
            let move_handles = Arc::clone(&move_handles);
            async_rt_manager.spawn(async move {
                let result = data_exchange_client
                    .apply_delta(&out, object_id, target_host_id.to_string())
                    .await;

                let move_handle = move_handles.get(&object_id).unwrap().clone();
                match result {
                    Ok(_) => {
                        move_handle.mark_move_done().await;
                    }
                    Err(e) => {
                        eprintln!(
                            "Failed to apply delta on remote host {} for object {}: {}",
                            target_host_id, object_id, e
                        );

                        move_handle.mark_move_denied().await;
                    }
                }
            });
        }
        Err(_e) => {
            eprintln!("Could not calculate diff for object {}", object_id);
            panic!("Could not calculate diff for object");
        }
    }
}