Skip to main content

hopr_api/chain/
channels.rs

1use std::{
2    error::Error,
3    ops::{Bound, RangeBounds},
4};
5
6use futures::{future::BoxFuture, stream::BoxStream};
7pub use hopr_internal_types::prelude::{ChannelDirection, ChannelEntry, ChannelId, ChannelStatusDiscriminants};
8use hopr_internal_types::prelude::{ChannelStatus, generate_channel_id};
9use hopr_primitive_types::prelude::Address;
10pub use hopr_primitive_types::prelude::HoprBalance;
11pub type DateTime = chrono::DateTime<chrono::Utc>;
12pub use chrono::Utc;
13use strum::IntoDiscriminant;
14
15use crate::chain::ChainReceipt;
16
17/// Selector for channels.
18///
19/// See [`ChainReadChannelOperations::stream_channels`].
20#[derive(Debug, Clone, PartialEq, Eq)]
21pub struct ChannelSelector {
22    /// Filter by source address.
23    pub source: Option<Address>,
24    /// Filter by destination address
25    pub destination: Option<Address>,
26    /// Filter by possible channel states.
27    pub allowed_states: Vec<ChannelStatusDiscriminants>,
28    /// Range of closure times if `PendingToClose` was specified in `allowed_states`,
29    /// otherwise has no effect.
30    pub closure_time_range: (Bound<DateTime>, Bound<DateTime>),
31}
32
33impl Default for ChannelSelector {
34    fn default() -> Self {
35        Self {
36            source: None,
37            destination: None,
38            allowed_states: vec![],
39            closure_time_range: (Bound::Unbounded, Bound::Unbounded),
40        }
41    }
42}
43
44impl ChannelSelector {
45    /// Sets the `source` bound on channel.
46    #[must_use]
47    pub fn with_source<A: Into<Address>>(mut self, address: A) -> Self {
48        self.source = Some(address.into());
49        self
50    }
51
52    /// Sets the `destination` bound on channel.
53    #[must_use]
54    pub fn with_destination<A: Into<Address>>(mut self, address: A) -> Self {
55        self.destination = Some(address.into());
56        self
57    }
58
59    /// Sets the allowed channel states.
60    #[must_use]
61    pub fn with_allowed_states(mut self, allowed_states: &[ChannelStatusDiscriminants]) -> Self {
62        self.allowed_states.extend_from_slice(allowed_states);
63        self
64    }
65
66    /// Sets the channel closure range.
67    ///
68    /// This has an effect only if `PendingToClose` is set in the allowed states.
69    #[must_use]
70    pub fn with_closure_time_range<T: RangeBounds<DateTime>>(mut self, range: T) -> Self {
71        self.closure_time_range = (range.start_bound().cloned(), range.end_bound().cloned());
72        self
73    }
74
75    /// Checks if the given [`channel`](ChannelEntry) satisfies the selector.
76    pub fn satisfies(&self, channel: &ChannelEntry) -> bool {
77        if let Some(source) = &self.source
78            && channel.source != *source
79        {
80            return false;
81        }
82
83        if let Some(dst) = &self.destination
84            && channel.destination != *dst
85        {
86            return false;
87        }
88
89        if !self.allowed_states.is_empty() && !self.allowed_states.contains(&channel.status.discriminant()) {
90            return false;
91        }
92
93        if self
94            .allowed_states
95            .contains(&ChannelStatusDiscriminants::PendingToClose)
96            && let ChannelStatus::PendingToClose(time) = &channel.status
97        {
98            let time = DateTime::from(*time);
99            if !self.closure_time_range.contains(&time) {
100                return false;
101            }
102        }
103
104        true
105    }
106}
107
108/// On-chain read operations regarding channels.
109#[async_trait::async_trait]
110#[auto_impl::auto_impl(&, Box, Arc)]
111pub trait ChainReadChannelOperations {
112    type Error: Error + Send + Sync + 'static;
113
114    /// Returns on-chain [`Address`] of the current node.
115    fn me(&self) -> &Address;
116
117    /// Returns a single channel given `src` and `dst`.
118    async fn channel_by_parties(&self, src: &Address, dst: &Address) -> Result<Option<ChannelEntry>, Self::Error> {
119        self.channel_by_id(&generate_channel_id(src, dst)).await
120    }
121
122    /// Returns a single channel given `channel_id`.
123    async fn channel_by_id(&self, channel_id: &ChannelId) -> Result<Option<ChannelEntry>, Self::Error>;
124
125    /// Returns a stream of channels given the [`ChannelSelector`].
126    async fn stream_channels<'a>(
127        &'a self,
128        selector: ChannelSelector,
129    ) -> Result<BoxStream<'a, ChannelEntry>, Self::Error>;
130}
131
132/// On-chain write operations regarding channels.
133#[async_trait::async_trait]
134#[auto_impl::auto_impl(&, Box, Arc)]
135pub trait ChainWriteChannelOperations {
136    type Error: Error + Send + Sync + 'static;
137    /// Opens a channel with `dst` and `amount`.
138    async fn open_channel<'a>(
139        &'a self,
140        dst: &'a Address,
141        amount: HoprBalance,
142    ) -> Result<BoxFuture<'a, Result<ChainReceipt, Self::Error>>, Self::Error>;
143
144    /// Funds an existing channel.
145    async fn fund_channel<'a>(
146        &'a self,
147        channel_id: &'a ChannelId,
148        amount: HoprBalance,
149    ) -> Result<BoxFuture<'a, Result<ChainReceipt, Self::Error>>, Self::Error>;
150
151    /// Closes an existing channel.
152    async fn close_channel<'a>(
153        &'a self,
154        channel_id: &'a ChannelId,
155    ) -> Result<BoxFuture<'a, Result<ChainReceipt, Self::Error>>, Self::Error>;
156}