// hopr_transport_protocol/timer.rs
use futures::future::{select, Either};
use futures::pin_mut;
use futures::FutureExt;
use std::time::Duration;
use tracing::{trace, warn};

use hopr_async_runtime::prelude::sleep;
use hopr_platform::time::native::current_time;
use hopr_primitive_types::prelude::AsUnixTimestamp;

/// Runs `action` in an endless loop, starting a new run roughly once per `cycle`.
///
/// Every iteration obtains a fresh future from `action()` and races it against a
/// `cycle`-long sleep:
///
/// - If the sleep finishes first, a warning is logged and the next iteration starts
///   immediately. The unfinished action future is not resumed; it is dropped when the
///   iteration ends.
/// - If the action finishes first, the loop sleeps for whatever is left of `cycle`
///   (or not at all if the measured duration already reached `cycle`) before the
///   next iteration.
///
/// `operation` is a label attached to every log event emitted by this loop, so that
/// events of concurrently running timers can be told apart.
///
/// The loop has no exit condition, so the returned future never completes.
pub async fn execute_on_tick<F>(cycle: Duration, action: impl Fn() -> F, operation: String)
where
    F: std::future::Future<Output = ()> + Send,
{
    loop {
        let start = current_time().as_unix_timestamp();

        let timeout = sleep(cycle).fuse();
        let todo = (action)().fuse();

        // `select` requires `Unpin` futures, so pin both to the stack first.
        pin_mut!(timeout, todo);

        match select(timeout, todo).await {
            Either::Left(_) => warn!(operation, "Timer tick interrupted by timeout"),
            Either::Right(_) => {
                trace!(operation, "Timer tick finished");

                // `saturating_sub` yields zero instead of panicking should the second
                // timestamp be earlier than `start`.
                let action_duration = current_time().as_unix_timestamp().saturating_sub(start);

                // `checked_sub` is `None` when the action took at least a full cycle,
                // in which case the next tick starts without any additional delay.
                if let Some(remaining) = cycle.checked_sub(action_duration) {
                    trace!(
                        operation,
                        remaining_time_in_ms = remaining.as_millis(),
                        "Universal timer sleeping for",
                    );
                    sleep(remaining).await
                }
            }
        };
    }
}