hopr_strategy/
promiscuous.rs

1//! ## Promiscuous Strategy
2//! This strategy opens or closes automatically channels based the following rules:
3//! - if node quality is below or equal to a threshold `network_quality_threshold` and we have a channel opened to it, the strategy will close it
4//!   - if node quality is above `network_quality_threshold` and no channel is opened yet, it will try to open channel to it (with initial stake `new_channel_stake`).
5//!     However, the channel is opened only if the following is both true:
6//!   - the total node balance does not drop below `minimum_node_balance`
7//!   - the number of channels opened by this strategy does not exceed `max_channels`
8//!
9//! Also, the candidates for opening (quality > `network_quality_threshold`), are sorted by best quality first.
10//! So that means if some nodes cannot have a channel opened to them, because we hit `minimum_node_balance` or `max_channels`,
11//! the better quality ones were taking precedence.
12//!
13//! The sorting algorithm is intentionally unstable, so that the nodes which have the same quality get random order.
14//! The constant `k` can be also set to a value > 1, which will make the strategy to open more channels for smaller networks,
15//! but it would keep the same asymptotic properties.
16//! Per default `k` = 1.
17//!
18//! The strategy starts acting only after at least `min_network_size_samples` network size samples were gathered, which means
19//! it does not start opening/closing channels earlier than `min_network_size_samples` number of minutes after the node has started.
20//!
21//! For details on default parameters see [PromiscuousStrategyConfig].
22//!
23use hopr_internal_types::prelude::*;
24use hopr_primitive_types::prelude::*;
25use std::collections::HashMap;
26use tracing::{debug, error, info, trace, warn};
27
28use crate::errors::Result;
29use crate::errors::StrategyError::CriteriaNotSatisfied;
30use crate::strategy::SingularStrategy;
31use crate::Strategy;
32use async_trait::async_trait;
33use futures::StreamExt;
34use hopr_chain_actions::channels::ChannelActions;
35use hopr_db_sql::api::peers::PeerSelector;
36use hopr_db_sql::errors::DbSqlError;
37use hopr_db_sql::HoprDbAllOperations;
38use rand::seq::SliceRandom;
39use semver::Version;
40use serde::{Deserialize, Serialize};
41use serde_with::{serde_as, DisplayFromStr};
42use std::fmt::{Debug, Display, Formatter};
43use std::str::FromStr;
44use std::time::Duration;
45
46#[cfg(all(feature = "prometheus", not(test)))]
47use hopr_metrics::metrics::{SimpleCounter, SimpleGauge};
48
49#[cfg(all(feature = "prometheus", not(test)))]
50lazy_static::lazy_static! {
51    static ref METRIC_COUNT_OPENS: SimpleCounter =
52        SimpleCounter::new("hopr_strategy_promiscuous_opened_channels_count", "Count of open channel decisions").unwrap();
53    static ref METRIC_COUNT_CLOSURES: SimpleCounter =
54        SimpleCounter::new("hopr_strategy_promiscuous_closed_channels_count", "Count of close channel decisions").unwrap();
55    static ref METRIC_MAX_AUTO_CHANNELS: SimpleGauge =
56        SimpleGauge::new("hopr_strategy_promiscuous_max_auto_channels", "Count of maximum number of channels managed by the strategy").unwrap();
57}
58
59/// A decision made by the Promiscuous strategy on each tick,
60/// represents which channels should be closed and which should be opened.
61/// Also indicates a number of maximum channels this strategy can open given the current network size.
62/// Note that the number changes as the network size changes.
63#[derive(Clone, Debug, PartialEq, Default)]
64struct ChannelDecision {
65    to_close: Vec<ChannelEntry>,
66    to_open: Vec<(Address, Balance)>,
67}
68
69impl ChannelDecision {
70    pub fn will_channel_be_closed(&self, counter_party: &Address) -> bool {
71        self.to_close.iter().any(|c| &c.destination == counter_party)
72    }
73
74    pub fn add_to_close(&mut self, entry: ChannelEntry) {
75        self.to_close.push(entry);
76    }
77
78    pub fn add_to_open(&mut self, address: Address, balance: Balance) {
79        self.to_open.push((address, balance));
80    }
81
82    pub fn get_to_close(&self) -> &Vec<ChannelEntry> {
83        &self.to_close
84    }
85
86    pub fn get_to_open(&self) -> &Vec<(Address, Balance)> {
87        &self.to_open
88    }
89}
90
91impl Display for ChannelDecision {
92    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
93        write!(
94            f,
95            "channel decision: opening ({}), closing({})",
96            self.to_open.len(),
97            self.to_close.len()
98        )
99    }
100}
101
102#[inline]
103fn default_new_channel_stake() -> Balance {
104    Balance::new_from_str("10000000000000000000", BalanceType::HOPR)
105}
106
107#[inline]
108fn default_min_safe_balance() -> Balance {
109    Balance::new_from_str("1000000000000000000000", BalanceType::HOPR)
110}
111
112#[inline]
113fn default_network_quality_open_threshold() -> f64 {
114    0.9
115}
116
117#[inline]
118fn default_network_quality_close_threshold() -> f64 {
119    0.2
120}
121
122#[inline]
123fn default_minimum_pings() -> u32 {
124    50
125}
126
127#[inline]
128fn just_true() -> bool {
129    true
130}
131
132#[inline]
133fn default_initial_delay() -> Duration {
134    Duration::from_secs(5 * 60)
135}
136
137const MIN_AUTO_DETECTED_MAX_AUTO_CHANNELS: usize = 10;
138
139/// Configuration of [PromiscuousStrategy].
140#[serde_as]
141#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault, Serialize, Deserialize)]
142pub struct PromiscuousStrategyConfig {
143    /// A quality threshold between 0 and 1 used to determine whether the strategy should open channel with the peer.
144    ///
145    /// Default is 0.9
146    #[serde(default = "default_network_quality_open_threshold")]
147    #[default(default_network_quality_open_threshold())]
148    pub network_quality_open_threshold: f64,
149
150    /// A quality threshold between 0 and 1 used to determine whether the strategy should close channel with the peer.
151    /// If set to 0, no channels will be closed.
152    ///
153    /// Default is 0.2
154    #[serde(default = "default_network_quality_close_threshold")]
155    #[default(default_network_quality_close_threshold())]
156    pub network_quality_close_threshold: f64,
157
158    /// Number of heartbeats sent to the peer before it is considered for selection.
159    ///
160    /// Default is 50.
161    #[serde(default = "default_minimum_pings")]
162    #[default(default_minimum_pings())]
163    pub minimum_peer_pings: u32,
164
165    /// Initial delay from startup before the strategy starts taking decisions.
166    ///
167    /// Default is 5 minutes.
168    #[serde(default = "default_initial_delay")]
169    #[default(default_initial_delay())]
170    pub initial_delay: Duration,
171
172    /// A stake of tokens that should be allocated to a channel opened by the strategy.
173    ///
174    /// Default is 10 wxHOPR
175    #[serde_as(as = "DisplayFromStr")]
176    #[serde(default = "default_new_channel_stake")]
177    #[default(default_new_channel_stake())]
178    pub new_channel_stake: Balance,
179
180    /// Minimum token balance of the node's Safe.
181    /// When reached, the strategy will not open any new channels.
182    ///
183    /// Default is 1000 wxHOPR
184    #[serde_as(as = "DisplayFromStr")]
185    #[serde(default = "default_min_safe_balance")]
186    #[default(default_min_safe_balance())]
187    pub minimum_safe_balance: Balance,
188
189    /// The maximum number of opened channels the strategy should maintain.
190    ///
191    /// Defaults to square-root of the sampled network size, the minimum is 10.
192    pub max_channels: Option<usize>,
193
194    /// If set, the strategy will aggressively close channels
195    /// (even with peers above the `network_quality_close_threshold`)
196    /// if the number of opened outgoing channels (regardless if opened by the strategy or manually) exceeds the
197    /// `max_channels` limit.
198    ///
199    /// Default is true.
200    #[serde(default = "just_true")]
201    #[default(true)]
202    pub enforce_max_channels: bool,
203
204    /// Specifies a minimum version (in semver syntax) of the peer the strategy should open a channel to.
205    ///
206    /// Default is ">=2.2.1"
207    #[serde_as(as = "DisplayFromStr")]
208    #[default(">=2.2.1".parse().expect("should be valid default version"))]
209    pub minimum_peer_version: semver::VersionReq,
210}
211
212impl validator::Validate for PromiscuousStrategyConfig {
213    fn validate(&self) -> std::result::Result<(), validator::ValidationErrors> {
214        let mut errors = validator::ValidationErrors::new();
215
216        if !(0.0..=1.0).contains(&self.network_quality_open_threshold) {
217            errors.add(
218                "network_quality_open_threshold",
219                validator::ValidationError::new("must be in [0..1]"),
220            );
221        }
222
223        if !(0.0..=1.0).contains(&self.network_quality_close_threshold) {
224            errors.add(
225                "network_quality_close_threshold",
226                validator::ValidationError::new("must be in [0..1]"),
227            );
228        }
229
230        if self.network_quality_open_threshold <= self.network_quality_close_threshold {
231            errors.add(
232                "network_quality_open_threshold,network_quality_close_threshold",
233                validator::ValidationError::new(
234                    "network_quality_open_threshold must be greater than network_quality_close_threshold",
235                ),
236            );
237        }
238
239        if self.minimum_peer_pings == 0 {
240            errors.add(
241                "minimum_peer_pings",
242                validator::ValidationError::new("must be greater than 0"),
243            );
244        }
245
246        if self.new_channel_stake.is_zero() {
247            errors.add(
248                "new_channel_stake",
249                validator::ValidationError::new("must be greater than 0"),
250            );
251        }
252
253        if self.max_channels.is_some_and(|m| m == 0) {
254            errors.add(
255                "max_channels",
256                validator::ValidationError::new("must be greater than 0"),
257            );
258        }
259
260        if semver::VersionReq::parse(self.minimum_peer_version.to_string().as_str()).is_err() {
261            errors.add(
262                "minimum_peer_version",
263                validator::ValidationError::new("must be a valid semver expression"),
264            );
265        }
266
267        if errors.is_empty() {
268            Ok(())
269        } else {
270            Err(errors)
271        }
272    }
273}
274
275/// This strategy opens outgoing channels to peers, which have quality above a given threshold.
276/// At the same time, it closes outgoing channels opened to peers whose quality dropped below this threshold.
277pub struct PromiscuousStrategy<Db, A>
278where
279    Db: HoprDbAllOperations + Clone,
280    A: ChannelActions,
281{
282    db: Db,
283    hopr_chain_actions: A,
284    cfg: PromiscuousStrategyConfig,
285    started_at: std::time::Instant,
286}
287
288#[derive(Debug, Default)]
289struct NetworkStats {
290    pub peers_with_quality: HashMap<Address, (f64, u64)>,
291    pub num_online_peers: usize,
292}
293
294impl<Db, A> PromiscuousStrategy<Db, A>
295where
296    Db: HoprDbAllOperations + Clone,
297    A: ChannelActions,
298{
299    pub fn new(cfg: PromiscuousStrategyConfig, db: Db, hopr_chain_actions: A) -> Self {
300        #[cfg(all(feature = "prometheus", not(test)))]
301        {
302            lazy_static::initialize(&METRIC_MAX_AUTO_CHANNELS);
303            lazy_static::initialize(&METRIC_COUNT_CLOSURES);
304            lazy_static::initialize(&METRIC_COUNT_OPENS);
305        }
306
307        Self {
308            db,
309            hopr_chain_actions,
310            cfg,
311            started_at: std::time::Instant::now(),
312        }
313    }
314
315    async fn get_network_stats(&self) -> Result<NetworkStats> {
316        let mut num_online_peers = 0;
317        Ok(NetworkStats {
318            peers_with_quality: self
319                .db
320                .get_network_peers(PeerSelector::default(), false)
321                .await?
322                .inspect(|status| {
323                    if status.quality > 0.0 {
324                        num_online_peers += 1;
325                    } else {
326                        trace!(peer = %status.id.1, "peer is not online");
327                    }
328                })
329                .filter_map(|status| async move {
330                    // Check if peer reports any version
331                    if let Some(version) = status.peer_version.clone().and_then(|v| {
332                        semver::Version::from_str(&v)
333                            .ok() // Workaround for https://github.com/dtolnay/semver/issues/315
334                            .map(|v| Version::new(v.major, v.major, v.patch))
335                    }) {
336                        // Check if the reported version matches the version semver expression
337                        if self.cfg.minimum_peer_version.matches(&version) {
338                            // Resolve peer's chain key and average quality
339                            if let Ok(addr) = self
340                                .db
341                                .resolve_chain_key(&status.id.0)
342                                .await
343                                .and_then(|addr| addr.ok_or(DbSqlError::MissingAccount.into()))
344                            {
345                                Some((addr, (status.get_average_quality(), status.heartbeats_sent)))
346                            } else {
347                                error!(address = %status.id.1, "could not find on-chain address");
348                                None
349                            }
350                        } else {
351                            debug!(peer = %status.id.1, ?version, "version of peer does not match the expectation");
352                            None
353                        }
354                    } else {
355                        error!(peer = %status.id.1, "cannot get version");
356                        None
357                    }
358                })
359                .collect()
360                .await,
361            num_online_peers,
362        })
363    }
364
365    async fn collect_tick_decision(&self) -> Result<ChannelDecision> {
366        let mut tick_decision = ChannelDecision::default();
367        let mut new_channel_candidates: Vec<(Address, f64)> = Vec::new();
368
369        // Get all opened outgoing channels from this node
370        let our_outgoing_open_channels = self
371            .db
372            .get_outgoing_channels(None)
373            .await
374            .map_err(hopr_db_sql::api::errors::DbError::from)?
375            .into_iter()
376            .filter(|channel| channel.status == ChannelStatus::Open)
377            .collect::<Vec<_>>();
378        debug!(
379            count = our_outgoing_open_channels.len(),
380            "tracking open outgoing channels"
381        );
382
383        let network_stats = self.get_network_stats().await?;
384        debug!(?network_stats, "retrieved network stats");
385
386        // Close all channels to nodes that are not in the network peers
387        // The initial_delay should take care of prior heartbeats to take place.
388        our_outgoing_open_channels
389            .iter()
390            .filter(|channel| !network_stats.peers_with_quality.contains_key(&channel.destination))
391            .for_each(|channel| {
392                debug!(destination = %channel.destination, "destination of opened channel is not between the network peers");
393                tick_decision.add_to_close(*channel);
394            });
395
396        // Go through all the peer ids and their qualities
397        // to find out which channels should be closed and
398        // which peer ids should become candidates for a new channel
399        for (address, (quality, num_pings)) in network_stats.peers_with_quality.iter() {
400            // Get the channel we have opened with it
401            let channel_with_peer = our_outgoing_open_channels.iter().find(|c| c.destination.eq(address));
402
403            if let Some(channel) = channel_with_peer {
404                if *quality < self.cfg.network_quality_close_threshold
405                    && *num_pings >= self.cfg.minimum_peer_pings as u64
406                {
407                    // Need to close the channel because quality has dropped
408                    debug!(destination = %channel.destination, quality = %quality, threshold = self.cfg.network_quality_close_threshold,
409                        "strategy proposes to close existing channel"
410                    );
411                    tick_decision.add_to_close(*channel);
412                }
413            } else if *quality >= self.cfg.network_quality_open_threshold
414                && *num_pings >= self.cfg.minimum_peer_pings as u64
415            {
416                // Try to open a channel with this peer, because it is high-quality,
417                // and we don't yet have a channel with it
418                debug!(destination = %address, quality = %quality, threshold = self.cfg.network_quality_open_threshold,
419                    "strategy proposes to open a new channel");
420                new_channel_candidates.push((*address, *quality));
421            }
422        }
423        debug!(
424            proposed_closures = tick_decision.get_to_close().len(),
425            proposed_openings = new_channel_candidates.len(),
426            "channel decision proposal summary"
427        );
428
429        // We compute the upper bound for channels as a square-root of the perceived network size
430        let max_auto_channels = self.cfg.max_channels.unwrap_or(
431            MIN_AUTO_DETECTED_MAX_AUTO_CHANNELS.max((network_stats.num_online_peers as f64).sqrt().ceil() as usize),
432        );
433        debug!(
434            max_auto_channels,
435            "current upper bound for maximum number of auto-channels"
436        );
437
438        #[cfg(all(feature = "prometheus", not(test)))]
439        METRIC_MAX_AUTO_CHANNELS.set(max_auto_channels as f64);
440
441        // Count all the effectively opened channels (i.e., after the decisions have been made)
442        let occupied = our_outgoing_open_channels
443            .len()
444            .saturating_sub(tick_decision.get_to_close().len());
445
446        // If there are still more channels opened than we allow, close some
447        // lowest-quality ones that passed the threshold
448        if occupied > max_auto_channels && self.cfg.enforce_max_channels {
449            warn!(
450                count = occupied,
451                max_auto_channels, "the strategy allows only less occupied channels"
452            );
453
454            // Get all open channels that are not planned to be closed
455            let mut sorted_channels = our_outgoing_open_channels
456                .iter()
457                .filter(|c| !tick_decision.will_channel_be_closed(&c.destination))
458                .collect::<Vec<_>>();
459
460            // Sort by quality, lowest-quality first
461            sorted_channels.sort_unstable_by(|p1, p2| {
462                let q1 = match network_stats.peers_with_quality.get(&p1.destination) {
463                    Some((q, _)) => *q,
464                    None => {
465                        error!(channel = ?p1, "could not determine peer quality");
466                        0_f64
467                    }
468                };
469                let q2 = match network_stats.peers_with_quality.get(&p2.destination) {
470                    Some((q, _)) => *q,
471                    None => {
472                        error!(peer = %p2, "could not determine peer quality");
473                        0_f64
474                    }
475                };
476                q1.partial_cmp(&q2).expect("invalid comparison")
477            });
478
479            // Close the lowest-quality channels (those we did not mark for closing yet) to enforce the limit
480            sorted_channels
481                .into_iter()
482                .take(occupied - max_auto_channels)
483                .for_each(|channel| {
484                    debug!(destination = %channel.destination, "enforcing channel closure");
485                    tick_decision.add_to_close(*channel);
486                });
487        } else if max_auto_channels > occupied {
488            // Sort the new channel candidates by the best quality first, then truncate to the number of available slots
489            // This way, we'll prefer candidates with higher quality, when we don't have enough node balance.
490            // Shuffle first, so the equal candidates are randomized and then use unstable sorting for that purpose.
491            new_channel_candidates.shuffle(&mut hopr_crypto_random::rng());
492            new_channel_candidates
493                .sort_unstable_by(|(_, q1), (_, q2)| q1.partial_cmp(q2).expect("should be comparable").reverse());
494            new_channel_candidates.truncate(max_auto_channels - occupied);
495            debug!(count = new_channel_candidates.len(), "got new channel candidates");
496
497            let current_safe_balance = self
498                .db
499                .get_safe_hopr_balance(None)
500                .await
501                .map_err(hopr_db_sql::api::errors::DbError::from)?;
502
503            // Check if we do not surpass the minimum node's balance while opening new channels
504            let max_to_open = ((current_safe_balance - self.cfg.minimum_safe_balance).amount()
505                / self.cfg.new_channel_stake.amount())
506            .as_usize();
507            debug!(%current_safe_balance, max_to_open, num_candidates = new_channel_candidates.len(), "maximum number of channel openings with current balance");
508            new_channel_candidates
509                .into_iter()
510                .take(max_to_open)
511                .for_each(|(address, _)| tick_decision.add_to_open(address, self.cfg.new_channel_stake));
512        } else {
513            // max_channels == occupied
514            info!(
515                count = occupied,
516                "not going to allocate new channels, maximum number of effective channels is reached"
517            )
518        }
519
520        Ok(tick_decision)
521    }
522}
523
524impl<Db, A> Debug for PromiscuousStrategy<Db, A>
525where
526    Db: HoprDbAllOperations + Clone,
527    A: ChannelActions,
528{
529    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
530        write!(f, "{:?}", Strategy::Promiscuous(self.cfg.clone()))
531    }
532}
533
534impl<Db, A> Display for PromiscuousStrategy<Db, A>
535where
536    Db: HoprDbAllOperations + Clone,
537    A: ChannelActions,
538{
539    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
540        write!(f, "{}", Strategy::Promiscuous(self.cfg.clone()))
541    }
542}
543
544#[async_trait]
545impl<Db, A> SingularStrategy for PromiscuousStrategy<Db, A>
546where
547    Db: HoprDbAllOperations + Clone + Send + Sync,
548    A: ChannelActions + Send + Sync,
549{
550    async fn on_tick(&self) -> Result<()> {
551        let safe_balance = self
552            .db
553            .get_safe_hopr_balance(None)
554            .await
555            .map_err(hopr_db_sql::api::errors::DbError::from)?;
556        if safe_balance <= self.cfg.minimum_safe_balance {
557            error!(
558                "strategy cannot work with safe token balance already being less or equal than minimum node balance"
559            );
560            return Err(CriteriaNotSatisfied);
561        }
562
563        if self.started_at.elapsed() < self.cfg.initial_delay {
564            debug!("strategy is not yet ready to execute, waiting for initial delay");
565            return Err(CriteriaNotSatisfied);
566        }
567
568        let tick_decision = self.collect_tick_decision().await?;
569        debug!(%tick_decision, "collected channel decision");
570
571        for channel_to_close in tick_decision.get_to_close() {
572            match self
573                .hopr_chain_actions
574                .close_channel(channel_to_close.destination, ChannelDirection::Outgoing, false)
575                .await
576            {
577                Ok(_) => {
578                    // Intentionally do not await result of the channel transaction
579                    debug!(destination = %channel_to_close.destination, "issued channel closing");
580
581                    #[cfg(all(feature = "prometheus", not(test)))]
582                    METRIC_COUNT_CLOSURES.increment();
583                }
584                Err(e) => {
585                    error!(error = %e, "error while closing channel");
586                }
587            }
588        }
589
590        for channel_to_open in tick_decision.get_to_open() {
591            match self
592                .hopr_chain_actions
593                .open_channel(channel_to_open.0, channel_to_open.1)
594                .await
595            {
596                Ok(_) => {
597                    // Intentionally do not await result of the channel transaction
598                    debug!(destination = %channel_to_open.0, "issued channel opening");
599
600                    #[cfg(all(feature = "prometheus", not(test)))]
601                    METRIC_COUNT_OPENS.increment();
602                }
603                Err(e) => {
604                    error!(error = %e, channel = %channel_to_open.0, "error while issuing channel opening");
605                }
606            }
607        }
608
609        info!(%tick_decision, "on tick executed");
610        Ok(())
611    }
612}
613
614/// Unit tests of pure Rust code
615#[cfg(test)]
616mod tests {
617    use super::*;
618    use anyhow::Context;
619    use futures::{future::ok, FutureExt};
620    use hex_literal::hex;
621    use hopr_chain_actions::action_queue::{ActionConfirmation, PendingAction};
622    use hopr_chain_types::actions::Action;
623    use hopr_chain_types::chain_events::ChainEventType;
624    use hopr_crypto_random::random_bytes;
625    use hopr_crypto_types::prelude::*;
626    use hopr_db_sql::accounts::HoprDbAccountOperations;
627    use hopr_db_sql::api::peers::HoprDbPeersOperations;
628    use hopr_db_sql::channels::HoprDbChannelOperations;
629    use hopr_db_sql::db::HoprDb;
630    use hopr_db_sql::info::HoprDbInfoOperations;
631    use hopr_db_sql::HoprDbGeneralModelOperations;
632    use hopr_transport_network::{network::PeerOrigin, PeerId};
633    use lazy_static::lazy_static;
634    use mockall::mock;
635
636    lazy_static! {
637        static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!(
638            "492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775"
639        ))
640        .expect("lazy static keypair should be valid");
641        static ref PEERS: [(Address, PeerId); 10] = [
642            (
643                ALICE.public().to_address().into(),
644                hex!("e03640d3184c8aa6f9d4ccd533281c51974a170c0c4d0fe1da9296a081ab1fd9")
645            ),
646            (
647                hex!("5f98dc63889681eb4306f0e3b5ee2e04b13af7c8"),
648                hex!("82a3cec1660697d8f3eb798f82ae281fc885c3e5370ef700c95c17397846c1e7")
649            ),
650            (
651                hex!("6e0bed94a8d2da952ad4468ff81157b6137a5566"),
652                hex!("2b93fcca9db2c5c12d1add5c07dd81d20c68eb713e99aa5c488210179c7505e3")
653            ),
654            (
655                hex!("8275b9ce8a3d2fe14029111f85b72ab05aa0f5d3"),
656                hex!("5cfd16dc160fd43396bfaff06e7c2e62cd087317671c159ce7cbc31c34fc32b6")
657            ),
658            (
659                hex!("3231673fd10c9ebeb9330745f1709c91db9cf40f"),
660                hex!("7f5b421cc58cf8449f5565756697261723fb96bba5f0aa2ba83c4973e0e994bf")
661            ),
662            (
663                hex!("585f4ca77b07ac7a3bf37de3069b641ba97bf76f"),
664                hex!("848af931ce57f54fbf96d7250eda8b0f36e3d1988ec8048c892e8d8ff0798f2f")
665            ),
666            (
667                hex!("ba413645edb6ddbd46d5911466264b119087dfea"),
668                hex!("d79258fc521dba8ded208066fe98fd8a857cf2e8f42f1b71c8f6e29b8f47e406")
669            ),
670            (
671                hex!("9ea8c0f3766022f84c41abd524c942971bd22d23"),
672                hex!("cd7a06caebcb90f95690c72472127cae8732b415440a1783c6ff9f9cb0bacf1e")
673            ),
674            (
675                hex!("9790b6cf8afe6a7d80102570fac18a322e26ef83"),
676                hex!("2dc3ff226be59333127ebfd3c79517eac8f81e0333abaa45189aae309880e55a")
677            ),
678            (
679                hex!("f6ab491cd4e2eccbe60a7f87aeaacfc408dabde8"),
680                hex!("5826ed44f52b3a26c472621812165bb2d3e60a9929e06db8b8df4e4d23068eba")
681            ),
682        ]
683        .map(|(addr, privkey)| (
684            addr.into(),
685            OffchainKeypair::from_secret(&privkey)
686                .expect("lazy static keypair should be valid")
687                .public()
688                .into()
689        ));
690    }
691
692    mock! {
693        ChannelAct { }
694        #[async_trait]
695        impl ChannelActions for ChannelAct {
696            async fn open_channel(&self, destination: Address, amount: Balance) -> hopr_chain_actions::errors::Result<PendingAction>;
697            async fn fund_channel(&self, channel_id: Hash, amount: Balance) -> hopr_chain_actions::errors::Result<PendingAction>;
698            async fn close_channel(
699                &self,
700                counterparty: Address,
701                direction: ChannelDirection,
702                redeem_before_close: bool,
703            ) -> hopr_chain_actions::errors::Result<PendingAction>;
704        }
705    }
706
707    async fn mock_channel(db: HoprDb, dst: Address, balance: Balance) -> anyhow::Result<ChannelEntry> {
708        let channel = ChannelEntry::new(
709            PEERS[0].0,
710            dst,
711            balance,
712            U256::zero(),
713            ChannelStatus::Open,
714            U256::zero(),
715        );
716        db.upsert_channel(None, channel).await?;
717
718        Ok(channel)
719    }
720
721    async fn prepare_network(db: HoprDb, qualities: Vec<f64>) -> anyhow::Result<()> {
722        assert_eq!(qualities.len(), PEERS.len() - 1, "invalid network setup");
723
724        for (i, quality) in qualities.into_iter().enumerate() {
725            let peer = &PEERS[i + 1].1;
726
727            db.add_network_peer(peer, PeerOrigin::Initialization, vec![], 0.0, 10)
728                .await?;
729
730            let mut status = db.get_network_peer(peer).await?.expect("should be present");
731            status.peer_version = Some("2.2.0".into());
732            status.heartbeats_sent = 200;
733            while status.get_average_quality() < quality {
734                status.update_quality(quality);
735            }
736            db.update_network_peer(status).await?;
737        }
738
739        Ok(())
740    }
741
742    async fn init_db(db: HoprDb, node_balance: Balance) -> anyhow::Result<()> {
743        db.begin_transaction()
744            .await?
745            .perform(|tx| {
746                Box::pin(async move {
747                    db.set_safe_hopr_balance(Some(tx), node_balance).await?;
748                    db.set_safe_hopr_allowance(Some(tx), node_balance).await?;
749                    for (chain_key, peer_id) in PEERS.iter() {
750                        db.insert_account(
751                            Some(tx),
752                            AccountEntry::new(
753                                OffchainPublicKey::try_from(*peer_id).expect("should be valid PeerId"),
754                                *chain_key,
755                                AccountType::NotAnnounced,
756                            ),
757                        )
758                        .await?;
759                    }
760                    Ok::<_, DbSqlError>(())
761                })
762            })
763            .await?;
764
765        Ok(())
766    }
767
768    fn mock_action_confirmation_closure(channel: ChannelEntry) -> ActionConfirmation {
769        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
770        ActionConfirmation {
771            tx_hash: random_hash,
772            event: Some(ChainEventType::ChannelClosureInitiated(channel)),
773            action: Action::CloseChannel(channel, ChannelDirection::Outgoing),
774        }
775    }
776
777    fn mock_action_confirmation_opening(address: Address, balance: Balance) -> ActionConfirmation {
778        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
779        ActionConfirmation {
780            tx_hash: random_hash,
781            event: Some(ChainEventType::ChannelOpened(ChannelEntry::new(
782                PEERS[0].0,
783                address,
784                balance,
785                U256::zero(),
786                ChannelStatus::Open,
787                U256::zero(),
788            ))),
789            action: Action::OpenChannel(address, balance),
790        }
791    }
792
793    #[test]
794    fn test_semver() -> anyhow::Result<()> {
795        // See https://github.com/dtolnay/semver/issues/315
796        let ver: semver::Version = "2.1.0-rc.3+commit.f75bc6c8".parse()?;
797        let stripped = semver::Version::new(ver.major, ver.minor, ver.patch);
798        let req = semver::VersionReq::from_str(">=2.0.0")?;
799
800        assert!(req.matches(&stripped), "constraint must match");
801
802        Ok(())
803    }
804
805    #[test_log::test(async_std::test)]
806    async fn test_promiscuous_strategy_tick_decisions() -> anyhow::Result<()> {
807        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
808
809        let qualities_that_alice_sees = vec![0.7, 0.9, 0.8, 0.98, 0.1, 0.3, 0.1, 0.2, 1.0];
810
811        init_db(db.clone(), BalanceType::HOPR.balance(1000)).await?;
812        prepare_network(db.clone(), qualities_that_alice_sees).await?;
813
814        mock_channel(db.clone(), PEERS[1].0, BalanceType::HOPR.balance(10)).await?;
815        mock_channel(db.clone(), PEERS[2].0, BalanceType::HOPR.balance(10)).await?;
816        let for_closing = mock_channel(db.clone(), PEERS[5].0, BalanceType::HOPR.balance(10)).await?;
817
818        // Peer 3 has an accepted pre-release version
819        let mut status_3 = db
820            .get_network_peer(&PEERS[3].1)
821            .await?
822            .context("peer should be present")?;
823        status_3.peer_version = Some("2.1.0-rc.3+commit.f75bc6c8".into());
824        db.update_network_peer(status_3).await?;
825
826        // Peer 10 has an old node version
827        let mut status_10 = db
828            .get_network_peer(&PEERS[9].1)
829            .await?
830            .context("peer should be present")?;
831        status_10.peer_version = Some("1.92.0".into());
832        db.update_network_peer(status_10).await?;
833
834        let strat_cfg = PromiscuousStrategyConfig {
835            max_channels: Some(3),
836            network_quality_open_threshold: 0.5,
837            network_quality_close_threshold: 0.3,
838            new_channel_stake: BalanceType::HOPR.balance(10),
839            minimum_safe_balance: BalanceType::HOPR.balance(50),
840            minimum_peer_version: ">=2.2.0".parse()?,
841            initial_delay: Duration::ZERO,
842            ..Default::default()
843        };
844
845        /*
846            Situation:
847            - There are max 3 channels and also 3 are currently opened.
848            - Strategy will close channel to peer 5, because it has quality 0.1
849            - Because of the closure, this means there can be 1 additional channel opened:
850                - Strategy can open channel either to peer 3, 4 or 10 (with qualities 0.8, 0.98 and 1.0 respectively)
851                - It will ignore peer 10 even though it has the highest quality, but does not meet minimum node version
852                - It will prefer peer 4 because it has higher quality than node 3
853        */
854
855        let mut actions = MockChannelAct::new();
856        actions
857            .expect_close_channel()
858            .times(1)
859            .withf(|dst, dir, _| PEERS[5].0.eq(dst) && ChannelDirection::Outgoing.eq(dir))
860            .return_once(move |_, _, _| Ok(ok(mock_action_confirmation_closure(for_closing)).boxed()));
861
862        let new_stake = strat_cfg.new_channel_stake;
863        actions
864            .expect_open_channel()
865            .times(1)
866            .withf(move |dst, b| PEERS[4].0.eq(dst) && new_stake.eq(b))
867            .return_once(move |_, _| Ok(ok(mock_action_confirmation_opening(PEERS[4].0, new_stake)).boxed()));
868
869        let strat = PromiscuousStrategy::new(strat_cfg.clone(), db, actions);
870
871        async_std::task::sleep(Duration::from_millis(100)).await;
872
873        strat.on_tick().await?;
874
875        Ok(())
876    }
877}