1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
use std::sync::{
    atomic::{AtomicU8, Ordering},
    Arc,
};

use nando_support::ecb_id::EcbId;
use parking_lot::RwLock;
use tokio::sync::Notify;

/// Lifecycle state of an epic as tracked by an `EpicCompletionHandle`.
///
/// The explicit discriminants are the values stored in the handle's `AtomicU8`,
/// so they must stay in sync with the `u8` conversions defined for this type.
/// `#[repr(u8)]` guarantees the enum itself fits in a single byte.
#[repr(u8)]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum EpicStatus {
    /// The epic has been triggered and has not been marked as completed.
    Triggered = 0,
    /// The epic has been marked as completed.
    Completed = 1,
}

impl Into<u8> for EpicStatus {
    fn into(self) -> u8 {
        match self {
            Self::Triggered => 0,
            Self::Completed => 1,
        }
    }
}

impl TryInto<EpicStatus> for u8 {
    type Error = &'static str;

    fn try_into(self) -> Result<EpicStatus, Self::Error> {
        let epic_status = match self {
            0 => EpicStatus::Triggered,
            1 => EpicStatus::Completed,
            _ => return Err("failed to convert u8 to EpicStatus, value out of range"),
        };

        Ok(epic_status)
    }
}

/// Handle for observing and signalling the completion of an epic.
///
/// Every piece of mutable state lives behind an `Arc`, so cloning the handle
/// yields another view onto the same status, timestamp and notifier.
#[derive(Clone)]
pub struct EpicCompletionHandle {
    /// Identifier this handle was created for (presumably the ECB at the root
    /// of the epic — confirm against callers).
    pub root_ecb_id: EcbId,
    /// Current `EpicStatus`, encoded as its `u8` representation.
    epic_status: Arc<AtomicU8>,
    /// Timestamp recorded by `mark_completed_and_notify` when the caller
    /// supplies one; `None` otherwise.
    pub completion_timestamp: Arc<RwLock<Option<std::time::Instant>>>,

    /// Signalled by `notify_all`; awaited by `await_completion`.
    epic_done_notify: Arc<Notify>,
}

impl EpicCompletionHandle {
    /// Creates a handle for `root_ecb_id` whose status starts as
    /// [`EpicStatus::Triggered`] and whose completion timestamp is unset.
    pub fn new_triggered(root_ecb_id: EcbId) -> Self {
        Self::with_initial_status(root_ecb_id, EpicStatus::Triggered)
    }

    /// Creates a handle for `root_ecb_id` whose status starts as
    /// [`EpicStatus::Completed`]. The completion timestamp is left unset.
    pub fn new_completed(root_ecb_id: EcbId) -> Self {
        Self::with_initial_status(root_ecb_id, EpicStatus::Completed)
    }

    /// Shared constructor: fresh status cell, empty timestamp, fresh notifier.
    fn with_initial_status(root_ecb_id: EcbId, initial_status: EpicStatus) -> Self {
        Self {
            root_ecb_id,
            epic_status: Arc::new(AtomicU8::new(initial_status.into())),
            completion_timestamp: Arc::new(RwLock::new(None)),
            epic_done_notify: Arc::new(Notify::new()),
        }
    }

    /// Decodes the current status from the shared atomic.
    ///
    /// # Panics
    ///
    /// Panics if the atomic holds a value that is not a valid `EpicStatus`
    /// discriminant; this block only ever stores values produced from
    /// `EpicStatus`, so that would indicate a bug.
    fn load_status(&self) -> EpicStatus {
        // `Acquire` pairs with the `Release` store in `mark_completed`.
        self.epic_status
            .load(Ordering::Acquire)
            .try_into()
            .expect("epic_status only ever stores valid EpicStatus discriminants")
    }

    /// Resolves once the epic is completed (or immediately if it already is).
    ///
    /// Returns as soon as a notification sent through [`Self::notify_all`] is
    /// received; the status is not re-checked after waking up.
    pub async fn await_completion(&self) {
        // The `Notified` future must be created *before* the status is read.
        // tokio guarantees that a `Notified` future receives `notify_waiters()`
        // wakeups from the moment it is created, even if it has not been polled
        // yet. Checking the status first leaves a window in which a concurrent
        // completion can call `notify_waiters()` before this task registers,
        // and with several racing tasks the single `notify_one()` permit can
        // only rescue one of them, leaving the others waiting forever.
        let notified = self.epic_done_notify.notified();

        match self.load_status() {
            EpicStatus::Completed => {}
            EpicStatus::Triggered => notified.await,
        }
    }

    /// Records `completion_timestamp` (when provided), marks the epic as
    /// completed and then wakes every task waiting in
    /// [`Self::await_completion`].
    pub fn mark_completed_and_notify(&self, completion_timestamp: Option<std::time::Instant>) {
        if let Some(ct) = completion_timestamp {
            // The write guard is a temporary and is released at the end of this
            // statement, before any waiter is notified.
            *self.completion_timestamp.write() = Some(ct);
        }

        // Order matters: the status must be visible before waiters are woken.
        self.mark_completed();
        self.notify_all();
    }

    /// Sets the status to [`EpicStatus::Completed`] without notifying waiters.
    pub fn mark_completed(&self) {
        // `Release` pairs with the `Acquire` load in `load_status`.
        self.epic_status
            .store(EpicStatus::Completed.into(), Ordering::Release);
    }

    /// Wakes all tasks currently waiting on this handle and leaves one permit
    /// behind for a task that starts waiting afterwards.
    pub fn notify_all(&self) {
        self.epic_done_notify.notify_waiters();
        // NOTE: `notify_waiters()` only reaches `Notified` futures that already exist
        // (https://docs.rs/tokio/1.37.0/tokio/sync/struct.Notify.html#method.notify_waiters),
        // so a single permit is stored as well. `await_completion` does not depend on it
        // (it creates its `Notified` future before checking the status), but the call is
        // kept so that the observable behavior of calling `notify_all` directly is unchanged.
        self.epic_done_notify.notify_one();
    }

    /// Returns the current status of the epic.
    ///
    /// This never actually suspends; it is `async` only because existing callers
    /// `.await` it.
    pub async fn get_status(&self) -> EpicStatus {
        self.load_status()
    }
}