hopr_api/chain/
channels.rs

1use std::{
2    error::Error,
3    ops::{Bound, RangeBounds},
4};
5
6use futures::{future::BoxFuture, stream::BoxStream};
7pub use hopr_internal_types::prelude::{ChannelDirection, ChannelEntry, ChannelId, ChannelStatusDiscriminants};
8use hopr_internal_types::prelude::{ChannelStatus, generate_channel_id};
9use hopr_primitive_types::prelude::Address;
10pub use hopr_primitive_types::prelude::HoprBalance;
11pub type DateTime = chrono::DateTime<chrono::Utc>;
12pub use chrono::Utc;
13use strum::IntoDiscriminant;
14
15use crate::chain::ChainReceipt;
16
17/// Selector for channels.
18///
19/// See [`ChainReadChannelOperations::stream_channels`].
20#[derive(Debug, Clone, PartialEq, Eq)]
21pub struct ChannelSelector {
22    /// Filter by source address.
23    pub source: Option<Address>,
24    /// Filter by destination address
25    pub destination: Option<Address>,
26    /// Filter by possible channel states.
27    pub allowed_states: Vec<ChannelStatusDiscriminants>,
28    /// Range of closure times if `PendingToClose` was specified in `allowed_states`,
29    /// otherwise has no effect.
30    pub closure_time_range: (Bound<DateTime>, Bound<DateTime>),
31}
32
33impl Default for ChannelSelector {
34    fn default() -> Self {
35        Self {
36            source: None,
37            destination: None,
38            allowed_states: vec![],
39            closure_time_range: (Bound::Unbounded, Bound::Unbounded),
40        }
41    }
42}
43
44impl ChannelSelector {
45    /// Sets the `source` bound on channel.
46    #[must_use]
47    pub fn with_source<A: Into<Address>>(mut self, address: A) -> Self {
48        self.source = Some(address.into());
49        self
50    }
51
52    /// Sets the `destination` bound on channel.
53    #[must_use]
54    pub fn with_destination<A: Into<Address>>(mut self, address: A) -> Self {
55        self.destination = Some(address.into());
56        self
57    }
58
59    /// Sets the allowed channel states.
60    #[must_use]
61    pub fn with_allowed_states(mut self, allowed_states: &[ChannelStatusDiscriminants]) -> Self {
62        self.allowed_states.extend_from_slice(allowed_states);
63        self
64    }
65
66    /// Sets the channel closure range.
67    ///
68    /// This has an effect only if `PendingToClose` is set in the allowed states.
69    #[must_use]
70    pub fn with_closure_time_range<T: RangeBounds<DateTime>>(mut self, range: T) -> Self {
71        self.closure_time_range = (range.start_bound().cloned(), range.end_bound().cloned());
72        self
73    }
74
75    /// Checks if the given [`channel`](ChannelEntry) satisfies the selector.
76    pub fn satisfies(&self, channel: &ChannelEntry) -> bool {
77        if let Some(source) = &self.source {
78            if channel.source != *source {
79                return false;
80            }
81        }
82
83        if let Some(dst) = &self.destination {
84            if channel.destination != *dst {
85                return false;
86            }
87        }
88
89        if !self.allowed_states.is_empty() && !self.allowed_states.contains(&channel.status.discriminant()) {
90            return false;
91        }
92
93        if self
94            .allowed_states
95            .contains(&ChannelStatusDiscriminants::PendingToClose)
96        {
97            if let ChannelStatus::PendingToClose(time) = &channel.status {
98                let time = DateTime::from(*time);
99                if !self.closure_time_range.contains(&time) {
100                    return false;
101                }
102            }
103        }
104
105        true
106    }
107}
108
109/// On-chain read operations regarding channels.
110#[async_trait::async_trait]
111#[auto_impl::auto_impl(&, Box, Arc)]
112pub trait ChainReadChannelOperations {
113    type Error: Error + Send + Sync + 'static;
114
115    /// Returns on-chain [`Address`] of the current node.
116    fn me(&self) -> &Address;
117
118    /// Returns a single channel given `src` and `dst`.
119    async fn channel_by_parties(&self, src: &Address, dst: &Address) -> Result<Option<ChannelEntry>, Self::Error> {
120        self.channel_by_id(&generate_channel_id(src, dst)).await
121    }
122
123    /// Returns a single channel given `channel_id`.
124    async fn channel_by_id(&self, channel_id: &ChannelId) -> Result<Option<ChannelEntry>, Self::Error>;
125
126    /// Returns a stream of channels given the [`ChannelSelector`].
127    async fn stream_channels<'a>(
128        &'a self,
129        selector: ChannelSelector,
130    ) -> Result<BoxStream<'a, ChannelEntry>, Self::Error>;
131}
132
133/// On-chain write operations regarding channels.
134#[async_trait::async_trait]
135#[auto_impl::auto_impl(&, Box, Arc)]
136pub trait ChainWriteChannelOperations {
137    type Error: Error + Send + Sync + 'static;
138    /// Opens a channel with `dst` and `amount`.
139    async fn open_channel<'a>(
140        &'a self,
141        dst: &'a Address,
142        amount: HoprBalance,
143    ) -> Result<BoxFuture<'a, Result<(ChannelId, ChainReceipt), Self::Error>>, Self::Error>;
144
145    /// Funds an existing channel.
146    async fn fund_channel<'a>(
147        &'a self,
148        channel_id: &'a ChannelId,
149        amount: HoprBalance,
150    ) -> Result<BoxFuture<'a, Result<ChainReceipt, Self::Error>>, Self::Error>;
151
152    /// Closes an existing channel.
153    async fn close_channel<'a>(
154        &'a self,
155        channel_id: &'a ChannelId,
156    ) -> Result<BoxFuture<'a, Result<ChainReceipt, Self::Error>>, Self::Error>;
157}