hopr_transport_protocol/
timer.rs1use std::time::Duration;
2
3use futures::{
4 FutureExt,
5 future::{Either, select},
6 pin_mut,
7};
8use hopr_async_runtime::prelude::sleep;
9use hopr_platform::time::native::current_time;
10use hopr_primitive_types::prelude::AsUnixTimestamp;
11use tracing::{trace, warn};
12
13pub async fn execute_on_tick<F>(cycle: Duration, action: impl Fn() -> F, operation: String)
16where
17 F: std::future::Future<Output = ()> + Send,
18{
19 loop {
20 let start = current_time().as_unix_timestamp();
21
22 let timeout = sleep(cycle).fuse();
23 let todo = (action)().fuse();
24
25 pin_mut!(timeout, todo);
26
27 match select(timeout, todo).await {
28 Either::Left(_) => warn!(operation, "Timer tick interrupted by timeout"),
29 Either::Right(_) => {
30 trace!(operation, "Timer tick finished");
31
32 let action_duration = current_time().as_unix_timestamp().saturating_sub(start);
33 if let Some(remaining) = cycle.checked_sub(action_duration) {
34 trace!(
35 remaining_time_in_ms = remaining.as_millis(),
36 "Universal timer sleeping for",
37 );
38 sleep(remaining).await
39 }
40 }
41 };
42 }
43}