hopr_strategy/
promiscuous.rs

//! ## Promiscuous Strategy
//! This strategy automatically opens or closes channels based on the following rules:
//! - if a peer's quality drops below `network_quality_close_threshold` and there is a channel opened to it,
//!   the strategy will close that channel
//! - if a peer's quality is at least `network_quality_open_threshold` and no channel is opened to it yet, the
//!   strategy will try to open a channel to it (with initial stake `new_channel_stake`). However, the channel is
//!   opened only if both of the following hold:
//!   - the Safe token balance does not drop below `minimum_safe_balance`
//!   - the number of opened outgoing channels does not exceed `max_channels`
//!
//! A peer is only considered for either decision after at least `minimum_peer_pings` heartbeats have been sent to it,
//! and only if it reports a version matching `minimum_peer_version`.
//!
//! The candidates for opening (quality >= `network_quality_open_threshold`) are sorted by best quality first,
//! so if some of them cannot get a channel opened (because `minimum_safe_balance` or `max_channels` would be hit),
//! the better-quality ones take precedence.
//! The sorting algorithm is intentionally unstable, so that candidates with the same quality end up in random order.
//!
//! If `max_channels` is not set explicitly, it defaults to the square root of the currently observed network size,
//! but never less than 10.
//!
//! The strategy starts acting only after `initial_delay` has elapsed since node startup, so that enough heartbeats
//! can be gathered beforehand.
//!
//! For details on default parameters see [PromiscuousStrategyConfig].
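//!
//! As a rough configuration sketch (illustrative values only, not tuned recommendations), the strategy config could
//! be built programmatically as follows; all field names match [PromiscuousStrategyConfig]:
//!
//! ```ignore
//! use std::time::Duration;
//!
//! let cfg = PromiscuousStrategyConfig {
//!     network_quality_open_threshold: 0.9,
//!     network_quality_close_threshold: 0.2,
//!     max_channels: Some(20),
//!     new_channel_stake: 10.into(),
//!     minimum_safe_balance: 1000.into(),
//!     initial_delay: Duration::from_secs(5 * 60),
//!     ..Default::default()
//! };
//! ```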
use std::{
    collections::HashMap,
    fmt::{Debug, Display, Formatter},
    str::FromStr,
    time::Duration,
};

use async_trait::async_trait;
use futures::StreamExt;
use hopr_chain_actions::channels::ChannelActions;
use hopr_db_sql::{HoprDbAllOperations, api::peers::PeerSelector, errors::DbSqlError};
use hopr_internal_types::prelude::*;
#[cfg(all(feature = "prometheus", not(test)))]
use hopr_metrics::metrics::{SimpleCounter, SimpleGauge};
use hopr_primitive_types::prelude::*;
use rand::seq::SliceRandom;
use semver::Version;
use serde::{Deserialize, Serialize};
use serde_with::{DisplayFromStr, serde_as};
use tracing::{debug, error, info, trace, warn};

use crate::{
    Strategy,
    errors::{Result, StrategyError::CriteriaNotSatisfied},
    strategy::SingularStrategy,
};

#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_COUNT_OPENS: SimpleCounter =
        SimpleCounter::new("hopr_strategy_promiscuous_opened_channels_count", "Count of open channel decisions").unwrap();
    static ref METRIC_COUNT_CLOSURES: SimpleCounter =
        SimpleCounter::new("hopr_strategy_promiscuous_closed_channels_count", "Count of close channel decisions").unwrap();
    static ref METRIC_MAX_AUTO_CHANNELS: SimpleGauge =
        SimpleGauge::new("hopr_strategy_promiscuous_max_auto_channels", "Count of maximum number of channels managed by the strategy").unwrap();
}

/// A decision made by the Promiscuous strategy on each tick,
/// representing which channels should be closed and which should be opened.
#[derive(Clone, Debug, PartialEq, Default)]
struct ChannelDecision {
    to_close: Vec<ChannelEntry>,
    to_open: Vec<(Address, HoprBalance)>,
}

impl ChannelDecision {
    pub fn will_channel_be_closed(&self, counter_party: &Address) -> bool {
        self.to_close.iter().any(|c| &c.destination == counter_party)
    }

    pub fn add_to_close(&mut self, entry: ChannelEntry) {
        self.to_close.push(entry);
    }

    pub fn add_to_open(&mut self, address: Address, balance: HoprBalance) {
        self.to_open.push((address, balance));
    }

    pub fn get_to_close(&self) -> &Vec<ChannelEntry> {
        &self.to_close
    }

    pub fn get_to_open(&self) -> &Vec<(Address, HoprBalance)> {
        &self.to_open
    }
}

impl Display for ChannelDecision {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "channel decision: opening ({}), closing ({})",
            self.to_open.len(),
            self.to_close.len()
        )
    }
}

#[inline]
fn default_new_channel_stake() -> HoprBalance {
    HoprBalance::new_base(10)
}

#[inline]
fn default_min_safe_balance() -> HoprBalance {
    HoprBalance::new_base(1000)
}

#[inline]
fn default_network_quality_open_threshold() -> f64 {
    0.9
}

#[inline]
fn default_network_quality_close_threshold() -> f64 {
    0.2
}

#[inline]
fn default_minimum_pings() -> u32 {
    50
}

#[inline]
fn just_true() -> bool {
    true
}

#[inline]
fn default_initial_delay() -> Duration {
    Duration::from_secs(5 * 60)
}

const MIN_AUTO_DETECTED_MAX_AUTO_CHANNELS: usize = 10;

/// Configuration of [PromiscuousStrategy].
#[serde_as]
#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault, Serialize, Deserialize)]
pub struct PromiscuousStrategyConfig {
    /// A quality threshold between 0 and 1 used to determine whether the strategy should open a channel with the peer.
    ///
    /// Default is 0.9.
    #[serde(default = "default_network_quality_open_threshold")]
    #[default(default_network_quality_open_threshold())]
    pub network_quality_open_threshold: f64,

    /// A quality threshold between 0 and 1 used to determine whether the strategy should close a channel with the peer.
    /// If set to 0, no channels will be closed.
    ///
    /// Default is 0.2.
    #[serde(default = "default_network_quality_close_threshold")]
    #[default(default_network_quality_close_threshold())]
    pub network_quality_close_threshold: f64,

    /// Number of heartbeats sent to the peer before it is considered for selection.
    ///
    /// Default is 50.
    #[serde(default = "default_minimum_pings")]
    #[default(default_minimum_pings())]
    pub minimum_peer_pings: u32,

    /// Initial delay from startup before the strategy starts taking decisions.
    ///
    /// Default is 5 minutes.
    #[serde(default = "default_initial_delay")]
    #[default(default_initial_delay())]
    pub initial_delay: Duration,

    /// A stake of tokens that should be allocated to a channel opened by the strategy.
    ///
    /// Default is 10 wxHOPR.
    #[serde_as(as = "DisplayFromStr")]
    #[serde(default = "default_new_channel_stake")]
    #[default(default_new_channel_stake())]
    pub new_channel_stake: HoprBalance,

    /// Minimum token balance of the node's Safe.
    /// When reached, the strategy will not open any new channels.
    ///
    /// Default is 1000 wxHOPR.
    #[serde_as(as = "DisplayFromStr")]
    #[serde(default = "default_min_safe_balance")]
    #[default(default_min_safe_balance())]
    pub minimum_safe_balance: HoprBalance,

    /// The maximum number of opened channels the strategy should maintain.
    ///
    /// Defaults to the square root of the sampled network size, with a minimum of 10.
    pub max_channels: Option<usize>,

    /// If set, the strategy will aggressively close channels
    /// (even with peers above the `network_quality_close_threshold`)
    /// if the number of opened outgoing channels (regardless of whether they were opened by the strategy or manually)
    /// exceeds the `max_channels` limit.
    ///
    /// Default is true.
    #[serde(default = "just_true")]
    #[default(true)]
    pub enforce_max_channels: bool,

    /// Specifies a minimum version (in semver syntax) of the peer the strategy should open a channel to.
    ///
    /// Default is ">=2.2.1".
    #[serde_as(as = "DisplayFromStr")]
    #[default(">=2.2.1".parse().expect("should be valid default version"))]
    pub minimum_peer_version: semver::VersionReq,
}

impl validator::Validate for PromiscuousStrategyConfig {
    fn validate(&self) -> std::result::Result<(), validator::ValidationErrors> {
        let mut errors = validator::ValidationErrors::new();

        if !(0.0..=1.0).contains(&self.network_quality_open_threshold) {
            errors.add(
                "network_quality_open_threshold",
                validator::ValidationError::new("must be in [0..1]"),
            );
        }

        if !(0.0..=1.0).contains(&self.network_quality_close_threshold) {
            errors.add(
                "network_quality_close_threshold",
                validator::ValidationError::new("must be in [0..1]"),
            );
        }

        if self.network_quality_open_threshold <= self.network_quality_close_threshold {
            errors.add(
                "network_quality_open_threshold,network_quality_close_threshold",
                validator::ValidationError::new(
                    "network_quality_open_threshold must be greater than network_quality_close_threshold",
                ),
            );
        }

        if self.minimum_peer_pings == 0 {
            errors.add(
                "minimum_peer_pings",
                validator::ValidationError::new("must be greater than 0"),
            );
        }

        if self.new_channel_stake.is_zero() {
            errors.add(
                "new_channel_stake",
                validator::ValidationError::new("must be greater than 0"),
            );
        }

        if self.max_channels.is_some_and(|m| m == 0) {
            errors.add(
                "max_channels",
                validator::ValidationError::new("must be greater than 0"),
            );
        }

        if semver::VersionReq::parse(self.minimum_peer_version.to_string().as_str()).is_err() {
            errors.add(
                "minimum_peer_version",
                validator::ValidationError::new("must be a valid semver expression"),
            );
        }

        if errors.is_empty() { Ok(()) } else { Err(errors) }
    }
}

/// This strategy opens outgoing channels to peers whose quality is above a configured opening threshold.
/// At the same time, it closes outgoing channels to peers whose quality dropped below a separate closing threshold.
pub struct PromiscuousStrategy<Db, A>
where
    Db: HoprDbAllOperations + Clone,
    A: ChannelActions,
{
    db: Db,
    hopr_chain_actions: A,
    cfg: PromiscuousStrategyConfig,
    started_at: std::time::Instant,
}

#[derive(Debug, Default)]
struct NetworkStats {
    pub peers_with_quality: HashMap<Address, (f64, u64)>,
    pub num_online_peers: usize,
}

impl<Db, A> PromiscuousStrategy<Db, A>
where
    Db: HoprDbAllOperations + Clone,
    A: ChannelActions,
{
    pub fn new(cfg: PromiscuousStrategyConfig, db: Db, hopr_chain_actions: A) -> Self {
        #[cfg(all(feature = "prometheus", not(test)))]
        {
            lazy_static::initialize(&METRIC_MAX_AUTO_CHANNELS);
            lazy_static::initialize(&METRIC_COUNT_CLOSURES);
            lazy_static::initialize(&METRIC_COUNT_OPENS);
        }

        Self {
            db,
            hopr_chain_actions,
            cfg,
            started_at: std::time::Instant::now(),
        }
    }

    async fn get_network_stats(&self) -> Result<NetworkStats> {
        let mut num_online_peers = 0;
        Ok(NetworkStats {
            peers_with_quality: self
                .db
                .get_network_peers(PeerSelector::default(), false)
                .await?
                .inspect(|status| {
                    if status.quality > 0.0 {
                        num_online_peers += 1;
                    } else {
                        trace!(peer = %status.id.1, "peer is not online");
                    }
                })
                .filter_map(|status| async move {
                    // Check if peer reports any version
                    if let Some(version) = status.peer_version.clone().and_then(|v| {
                        semver::Version::from_str(&v)
                            .ok() // Workaround for https://github.com/dtolnay/semver/issues/315
                            .map(|v| Version::new(v.major, v.minor, v.patch))
                    }) {
                        // Check if the reported version matches the version semver expression
                        if self.cfg.minimum_peer_version.matches(&version) {
                            // Resolve peer's chain key and average quality
                            if let Ok(addr) = self
                                .db
                                .resolve_chain_key(&status.id.0)
                                .await
                                .and_then(|addr| addr.ok_or(DbSqlError::MissingAccount.into()))
                            {
                                Some((addr, (status.get_average_quality(), status.heartbeats_sent)))
                            } else {
                                error!(address = %status.id.1, "could not find on-chain address");
                                None
                            }
                        } else {
                            debug!(peer = %status.id.1, ?version, "version of peer does not match the expectation");
                            None
                        }
                    } else {
                        error!(peer = %status.id.1, "cannot get version");
                        None
                    }
                })
                .collect()
                .await,
            num_online_peers,
        })
    }

    async fn collect_tick_decision(&self) -> Result<ChannelDecision> {
        let mut tick_decision = ChannelDecision::default();
        let mut new_channel_candidates: Vec<(Address, f64)> = Vec::new();

        // Get all opened outgoing channels from this node
        let our_outgoing_open_channels = self
            .db
            .get_outgoing_channels(None)
            .await
            .map_err(hopr_db_sql::api::errors::DbError::from)?
            .into_iter()
            .filter(|channel| channel.status == ChannelStatus::Open)
            .collect::<Vec<_>>();
        debug!(
            count = our_outgoing_open_channels.len(),
            "tracking open outgoing channels"
        );

        let network_stats = self.get_network_stats().await?;
        debug!(?network_stats, "retrieved network stats");

        // Close all channels to nodes that are not among the network peers.
        // The initial_delay should ensure that enough heartbeats have taken place before this happens.
        our_outgoing_open_channels
            .iter()
            .filter(|channel| !network_stats.peers_with_quality.contains_key(&channel.destination))
            .for_each(|channel| {
                debug!(destination = %channel.destination, "destination of opened channel is not among the network peers");
                tick_decision.add_to_close(*channel);
            });

        // Go through all the peer ids and their qualities
        // to find out which channels should be closed and
        // which peer ids should become candidates for a new channel
        for (address, (quality, num_pings)) in network_stats.peers_with_quality.iter() {
            // Get the channel we have opened with it
            let channel_with_peer = our_outgoing_open_channels.iter().find(|c| c.destination.eq(address));

            if let Some(channel) = channel_with_peer {
                if *quality < self.cfg.network_quality_close_threshold
                    && *num_pings >= self.cfg.minimum_peer_pings as u64
                {
                    // Need to close the channel because quality has dropped
                    debug!(destination = %channel.destination, quality = %quality, threshold = self.cfg.network_quality_close_threshold,
                        "strategy proposes to close existing channel"
                    );
                    tick_decision.add_to_close(*channel);
                }
            } else if *quality >= self.cfg.network_quality_open_threshold
                && *num_pings >= self.cfg.minimum_peer_pings as u64
            {
                // Try to open a channel with this peer, because it is high-quality,
                // and we don't yet have a channel with it
                debug!(destination = %address, quality = %quality, threshold = self.cfg.network_quality_open_threshold,
                    "strategy proposes to open a new channel");
                new_channel_candidates.push((*address, *quality));
            }
        }
        debug!(
            proposed_closures = tick_decision.get_to_close().len(),
            proposed_openings = new_channel_candidates.len(),
            "channel decision proposal summary"
        );

        // We compute the upper bound for channels as a square-root of the perceived network size
        let max_auto_channels = self.cfg.max_channels.unwrap_or(
            MIN_AUTO_DETECTED_MAX_AUTO_CHANNELS.max((network_stats.num_online_peers as f64).sqrt().ceil() as usize),
        );
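        // Worked example (illustrative numbers): with `max_channels` unset and 200 online peers,
        // ceil(sqrt(200)) = 15, so up to 15 auto-channels are allowed; with 50 online peers,
        // ceil(sqrt(50)) = 8, which is clamped up to MIN_AUTO_DETECTED_MAX_AUTO_CHANNELS = 10.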
        debug!(
            max_auto_channels,
            "current upper bound for maximum number of auto-channels"
        );

        #[cfg(all(feature = "prometheus", not(test)))]
        METRIC_MAX_AUTO_CHANNELS.set(max_auto_channels as f64);

        // Count all the effectively opened channels (i.e., after the decisions have been made)
        let occupied = our_outgoing_open_channels
            .len()
            .saturating_sub(tick_decision.get_to_close().len());
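        // Example (illustrative): with 5 open outgoing channels and 2 of them already marked for closure above,
        // `occupied` is 3, i.e. the number of channels expected to remain open after this tick's closures.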

        // If there are still more channels opened than we allow, close some
        // lowest-quality ones that passed the threshold
        if occupied > max_auto_channels && self.cfg.enforce_max_channels {
            warn!(
                count = occupied,
                max_auto_channels, "number of open channels exceeds the maximum this strategy allows"
            );

            // Get all open channels that are not planned to be closed
            let mut sorted_channels = our_outgoing_open_channels
                .iter()
                .filter(|c| !tick_decision.will_channel_be_closed(&c.destination))
                .collect::<Vec<_>>();

            // Sort by quality, lowest-quality first
            sorted_channels.sort_unstable_by(|p1, p2| {
                let q1 = match network_stats.peers_with_quality.get(&p1.destination) {
                    Some((q, _)) => *q,
                    None => {
                        error!(channel = ?p1, "could not determine peer quality");
                        0_f64
                    }
                };
                let q2 = match network_stats.peers_with_quality.get(&p2.destination) {
                    Some((q, _)) => *q,
                    None => {
                        error!(peer = %p2, "could not determine peer quality");
                        0_f64
                    }
                };
                q1.partial_cmp(&q2).expect("invalid comparison")
            });

            // Close the lowest-quality channels (those we did not mark for closing yet) to enforce the limit
            sorted_channels
                .into_iter()
                .take(occupied - max_auto_channels)
                .for_each(|channel| {
                    debug!(destination = %channel.destination, "enforcing channel closure");
                    tick_decision.add_to_close(*channel);
                });
        } else if max_auto_channels > occupied {
            // Sort the new channel candidates by the best quality first, then truncate to the number of available slots.
            // This way, we prefer the higher-quality candidates when the balance does not suffice for all of them.
            // Shuffle first, so that candidates with equal quality end up in random order (the sort below is unstable).
            new_channel_candidates.shuffle(&mut hopr_crypto_random::rng());
            new_channel_candidates
                .sort_unstable_by(|(_, q1), (_, q2)| q1.partial_cmp(q2).expect("should be comparable").reverse());
            new_channel_candidates.truncate(max_auto_channels - occupied);
            debug!(count = new_channel_candidates.len(), "got new channel candidates");

            let current_safe_balance = self
                .db
                .get_safe_hopr_balance(None)
                .await
                .map_err(hopr_db_sql::api::errors::DbError::from)?;

            // Make sure opening new channels does not bring the Safe balance below the configured minimum
            let max_to_open = ((current_safe_balance - self.cfg.minimum_safe_balance).amount()
                / self.cfg.new_channel_stake.amount())
            .as_usize();
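            // Worked example (illustrative numbers): with a Safe balance of 1040 wxHOPR, a minimum_safe_balance of
            // 1000 wxHOPR, and a new_channel_stake of 10 wxHOPR, at most (1040 - 1000) / 10 = 4 channels can be
            // opened on this tick, regardless of how many candidates passed the quality threshold.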
            debug!(%current_safe_balance, max_to_open, num_candidates = new_channel_candidates.len(), "maximum number of channel openings with current balance");
            new_channel_candidates
                .into_iter()
                .take(max_to_open)
                .for_each(|(address, _)| tick_decision.add_to_open(address, self.cfg.new_channel_stake));
        } else {
            // max_channels == occupied
            info!(
                count = occupied,
                "not going to allocate new channels, maximum number of effective channels is reached"
            )
        }

        Ok(tick_decision)
    }
}

impl<Db, A> Debug for PromiscuousStrategy<Db, A>
where
    Db: HoprDbAllOperations + Clone,
    A: ChannelActions,
{
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", Strategy::Promiscuous(self.cfg.clone()))
    }
}

impl<Db, A> Display for PromiscuousStrategy<Db, A>
where
    Db: HoprDbAllOperations + Clone,
    A: ChannelActions,
{
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", Strategy::Promiscuous(self.cfg.clone()))
    }
}

#[async_trait]
impl<Db, A> SingularStrategy for PromiscuousStrategy<Db, A>
where
    Db: HoprDbAllOperations + Clone + Send + Sync,
    A: ChannelActions + Send + Sync,
{
    async fn on_tick(&self) -> Result<()> {
        let safe_balance = self
            .db
            .get_safe_hopr_balance(None)
            .await
            .map_err(hopr_db_sql::api::errors::DbError::from)?;
        if safe_balance <= self.cfg.minimum_safe_balance {
            error!(
                "strategy cannot work with a safe token balance that is already less than or equal to the minimum safe balance"
            );
            return Err(CriteriaNotSatisfied);
        }

        if self.started_at.elapsed() < self.cfg.initial_delay {
            debug!("strategy is not yet ready to execute, waiting for initial delay");
            return Err(CriteriaNotSatisfied);
        }

        let tick_decision = self.collect_tick_decision().await?;
        debug!(%tick_decision, "collected channel decision");

        for channel_to_close in tick_decision.get_to_close() {
            match self
                .hopr_chain_actions
                .close_channel(channel_to_close.destination, ChannelDirection::Outgoing, false)
                .await
            {
                Ok(_) => {
                    // Intentionally do not await result of the channel transaction
                    debug!(destination = %channel_to_close.destination, "issued channel closing");

                    #[cfg(all(feature = "prometheus", not(test)))]
                    METRIC_COUNT_CLOSURES.increment();
                }
                Err(e) => {
                    error!(error = %e, "error while closing channel");
                }
            }
        }

        for channel_to_open in tick_decision.get_to_open() {
            match self
                .hopr_chain_actions
                .open_channel(channel_to_open.0, channel_to_open.1)
                .await
            {
                Ok(_) => {
                    // Intentionally do not await result of the channel transaction
                    debug!(destination = %channel_to_open.0, "issued channel opening");

                    #[cfg(all(feature = "prometheus", not(test)))]
                    METRIC_COUNT_OPENS.increment();
                }
                Err(e) => {
                    error!(error = %e, channel = %channel_to_open.0, "error while issuing channel opening");
                }
            }
        }

        info!(%tick_decision, "on tick executed");
        Ok(())
    }
}

/// Unit tests of pure Rust code
#[cfg(test)]
mod tests {
    use anyhow::Context;
    use futures::{FutureExt, future::ok};
    use hex_literal::hex;
    use hopr_chain_actions::action_queue::{ActionConfirmation, PendingAction};
    use hopr_chain_types::{actions::Action, chain_events::ChainEventType};
    use hopr_crypto_random::random_bytes;
    use hopr_crypto_types::prelude::*;
    use hopr_db_sql::{
        HoprDbGeneralModelOperations, accounts::HoprDbAccountOperations, api::peers::HoprDbPeersOperations,
        channels::HoprDbChannelOperations, db::HoprDb, info::HoprDbInfoOperations,
    };
    use hopr_transport_network::{PeerId, network::PeerOrigin};
    use lazy_static::lazy_static;
    use mockall::mock;

    use super::*;

    lazy_static! {
        static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!(
            "492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775"
        ))
        .expect("lazy static keypair should be valid");
        static ref PEERS: [(Address, PeerId); 10] = [
            (
                ALICE.public().to_address().into(),
                hex!("e03640d3184c8aa6f9d4ccd533281c51974a170c0c4d0fe1da9296a081ab1fd9")
            ),
            (
                hex!("5f98dc63889681eb4306f0e3b5ee2e04b13af7c8"),
                hex!("82a3cec1660697d8f3eb798f82ae281fc885c3e5370ef700c95c17397846c1e7")
            ),
            (
                hex!("6e0bed94a8d2da952ad4468ff81157b6137a5566"),
                hex!("2b93fcca9db2c5c12d1add5c07dd81d20c68eb713e99aa5c488210179c7505e3")
            ),
            (
                hex!("8275b9ce8a3d2fe14029111f85b72ab05aa0f5d3"),
                hex!("5cfd16dc160fd43396bfaff06e7c2e62cd087317671c159ce7cbc31c34fc32b6")
            ),
            (
                hex!("3231673fd10c9ebeb9330745f1709c91db9cf40f"),
                hex!("7f5b421cc58cf8449f5565756697261723fb96bba5f0aa2ba83c4973e0e994bf")
            ),
            (
                hex!("585f4ca77b07ac7a3bf37de3069b641ba97bf76f"),
                hex!("848af931ce57f54fbf96d7250eda8b0f36e3d1988ec8048c892e8d8ff0798f2f")
            ),
            (
                hex!("ba413645edb6ddbd46d5911466264b119087dfea"),
                hex!("d79258fc521dba8ded208066fe98fd8a857cf2e8f42f1b71c8f6e29b8f47e406")
            ),
            (
                hex!("9ea8c0f3766022f84c41abd524c942971bd22d23"),
                hex!("cd7a06caebcb90f95690c72472127cae8732b415440a1783c6ff9f9cb0bacf1e")
            ),
            (
                hex!("9790b6cf8afe6a7d80102570fac18a322e26ef83"),
                hex!("2dc3ff226be59333127ebfd3c79517eac8f81e0333abaa45189aae309880e55a")
            ),
            (
                hex!("f6ab491cd4e2eccbe60a7f87aeaacfc408dabde8"),
                hex!("5826ed44f52b3a26c472621812165bb2d3e60a9929e06db8b8df4e4d23068eba")
            ),
        ]
        .map(|(addr, privkey)| (
            addr.into(),
            OffchainKeypair::from_secret(&privkey)
                .expect("lazy static keypair should be valid")
                .public()
                .into()
        ));
    }

    mock! {
        ChannelAct { }
        #[async_trait]
        impl ChannelActions for ChannelAct {
            async fn open_channel(&self, destination: Address, amount: HoprBalance) -> hopr_chain_actions::errors::Result<PendingAction>;
            async fn fund_channel(&self, channel_id: Hash, amount: HoprBalance) -> hopr_chain_actions::errors::Result<PendingAction>;
            async fn close_channel(
                &self,
                counterparty: Address,
                direction: ChannelDirection,
                redeem_before_close: bool,
            ) -> hopr_chain_actions::errors::Result<PendingAction>;
        }
    }

    async fn mock_channel(db: HoprDb, dst: Address, balance: HoprBalance) -> anyhow::Result<ChannelEntry> {
        let channel = ChannelEntry::new(
            PEERS[0].0,
            dst,
            balance,
            U256::zero(),
            ChannelStatus::Open,
            U256::zero(),
        );
        db.upsert_channel(None, channel).await?;

        Ok(channel)
    }

    async fn prepare_network(db: HoprDb, qualities: Vec<f64>) -> anyhow::Result<()> {
        assert_eq!(qualities.len(), PEERS.len() - 1, "invalid network setup");

        for (i, quality) in qualities.into_iter().enumerate() {
            let peer = &PEERS[i + 1].1;

            db.add_network_peer(peer, PeerOrigin::Initialization, vec![], 0.0, 10)
                .await?;

            let mut status = db.get_network_peer(peer).await?.expect("should be present");
            status.peer_version = Some("2.2.0".into());
            status.heartbeats_sent = 200;
            while status.get_average_quality() < quality {
                status.update_quality(quality);
            }
            db.update_network_peer(status).await?;
        }

        Ok(())
    }

    async fn init_db(db: HoprDb, node_balance: HoprBalance) -> anyhow::Result<()> {
        db.begin_transaction()
            .await?
            .perform(|tx| {
                Box::pin(async move {
                    db.set_safe_hopr_balance(Some(tx), node_balance).await?;
                    db.set_safe_hopr_allowance(Some(tx), node_balance).await?;
                    for (chain_key, peer_id) in PEERS.iter() {
                        db.insert_account(
                            Some(tx),
                            AccountEntry {
                                public_key: OffchainPublicKey::try_from(*peer_id).expect("should be valid PeerId"),
                                chain_addr: *chain_key,
                                entry_type: AccountType::NotAnnounced,
                                published_at: 1,
                            },
                        )
                        .await?;
                    }
                    Ok::<_, DbSqlError>(())
                })
            })
            .await?;

        Ok(())
    }

    fn mock_action_confirmation_closure(channel: ChannelEntry) -> ActionConfirmation {
        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
        ActionConfirmation {
            tx_hash: random_hash,
            event: Some(ChainEventType::ChannelClosureInitiated(channel)),
            action: Action::CloseChannel(channel, ChannelDirection::Outgoing),
        }
    }

    fn mock_action_confirmation_opening(address: Address, balance: HoprBalance) -> ActionConfirmation {
        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
        ActionConfirmation {
            tx_hash: random_hash,
            event: Some(ChainEventType::ChannelOpened(ChannelEntry::new(
                PEERS[0].0,
                address,
                balance,
                U256::zero(),
                ChannelStatus::Open,
                U256::zero(),
            ))),
            action: Action::OpenChannel(address, balance),
        }
    }

    #[test]
    fn test_semver() -> anyhow::Result<()> {
        // See https://github.com/dtolnay/semver/issues/315
        let ver: semver::Version = "2.1.0-rc.3+commit.f75bc6c8".parse()?;
        let stripped = semver::Version::new(ver.major, ver.minor, ver.patch);
        let req = semver::VersionReq::from_str(">=2.0.0")?;

        assert!(req.matches(&stripped), "constraint must match");

        Ok(())
    }

    #[test_log::test(tokio::test)]
    async fn test_promiscuous_strategy_tick_decisions() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ALICE.clone()).await?;

        let qualities_that_alice_sees = vec![0.7, 0.9, 0.8, 0.98, 0.1, 0.3, 0.1, 0.2, 1.0];

        init_db(db.clone(), 1000.into()).await?;
        prepare_network(db.clone(), qualities_that_alice_sees).await?;

        mock_channel(db.clone(), PEERS[1].0, 10.into()).await?;
        mock_channel(db.clone(), PEERS[2].0, 10.into()).await?;
        let for_closing = mock_channel(db.clone(), PEERS[5].0, 10.into()).await?;

        // Peer 3 reports a pre-release version
        let mut status_3 = db
            .get_network_peer(&PEERS[3].1)
            .await?
            .context("peer should be present")?;
        status_3.peer_version = Some("2.1.0-rc.3+commit.f75bc6c8".into());
        db.update_network_peer(status_3).await?;

        // Peer 10 has an old node version
        let mut status_10 = db
            .get_network_peer(&PEERS[9].1)
            .await?
            .context("peer should be present")?;
        status_10.peer_version = Some("1.92.0".into());
        db.update_network_peer(status_10).await?;

        let strat_cfg = PromiscuousStrategyConfig {
            max_channels: Some(3),
            network_quality_open_threshold: 0.5,
            network_quality_close_threshold: 0.3,
            new_channel_stake: 10.into(),
            minimum_safe_balance: 50.into(),
            minimum_peer_version: ">=2.2.0".parse()?,
            initial_delay: Duration::ZERO,
            ..Default::default()
        };

        // Situation:
        // - At most 3 channels are allowed and 3 are currently opened.
        // - The strategy will close the channel to peer 5, because it has quality 0.1.
        // - Because of that closure, there is room for 1 additional channel:
        // - The strategy could open a channel to peer 3, 4 or 10 (with qualities 0.8, 0.98 and 1.0 respectively).
        // - It will ignore peer 10, even though it has the highest quality, because it does not meet the minimum node version.
        // - It will prefer peer 4, because it has a higher quality than peer 3.

        let mut actions = MockChannelAct::new();
        actions
            .expect_close_channel()
            .times(1)
            .withf(|dst, dir, _| PEERS[5].0.eq(dst) && ChannelDirection::Outgoing.eq(dir))
            .return_once(move |_, _, _| Ok(ok(mock_action_confirmation_closure(for_closing)).boxed()));

        let new_stake = strat_cfg.new_channel_stake;
        actions
            .expect_open_channel()
            .times(1)
            .withf(move |dst, b| PEERS[4].0.eq(dst) && new_stake.eq(b))
            .return_once(move |_, _| Ok(ok(mock_action_confirmation_opening(PEERS[4].0, new_stake)).boxed()));

        let strat = PromiscuousStrategy::new(strat_cfg.clone(), db, actions);

        tokio::time::sleep(Duration::from_millis(100)).await;

        strat.on_tick().await?;

        Ok(())
    }
}