hopr_strategy/
promiscuous.rs

1//! ## Promiscuous Strategy
//! This strategy automatically opens or closes channels based on the following rules:
//! - if node quality is below the `network_quality_close_threshold` and we have a channel opened to it,
//!   the strategy will close it
//! - if node quality is at or above the `network_quality_open_threshold` and no channel is opened yet, it will try
//!   to open a channel to it (with initial stake `new_channel_stake`). However, the channel is opened only if both
//!   of the following are true:
//!   - the Safe balance does not drop below `minimum_safe_balance`
//!   - the number of channels opened by this strategy does not exceed `max_channels`
10//!
//! Also, the candidates for opening (quality >= `network_quality_open_threshold`) are sorted by best quality first.
//! That means if some nodes cannot have a channel opened to them because we hit `minimum_safe_balance` or
//! `max_channels`, the better quality ones take precedence.
14//!
//! The candidates are shuffled and then sorted with an unstable sort, so that nodes which have the same quality end
//! up in random order. The constant `k` can also be set to a value > 1, which will make the strategy open more
//! channels for smaller networks, while keeping the same asymptotic properties.
//! By default, `k` = 1.
19//!
//! The strategy starts acting only after `initial_delay` has elapsed since the strategy was created, which
//! means it does not start opening/closing channels earlier than `initial_delay` after the
//! node has started.
23//!
24//! For details on default parameters see [PromiscuousStrategyConfig].
25use std::{
26    collections::HashMap,
27    fmt::{Debug, Display, Formatter},
28    time::Duration,
29};
30
31use async_trait::async_trait;
32use futures::StreamExt;
33use hopr_chain_actions::channels::ChannelActions;
34use hopr_db_sql::{HoprDbAllOperations, api::peers::PeerSelector, errors::DbSqlError};
35use hopr_internal_types::prelude::*;
36#[cfg(all(feature = "prometheus", not(test)))]
37use hopr_metrics::metrics::{SimpleCounter, SimpleGauge};
38use hopr_primitive_types::prelude::*;
39use rand::seq::SliceRandom;
40use serde::{Deserialize, Serialize};
41use serde_with::{DisplayFromStr, serde_as};
42use tracing::{debug, error, info, trace, warn};
43
44use crate::{
45    Strategy,
46    errors::{Result, StrategyError::CriteriaNotSatisfied},
47    strategy::SingularStrategy,
48};
49
// Metrics are only compiled in when the "prometheus" feature is enabled and never in test builds.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Incremented each time a channel opening was successfully issued by the strategy.
    static ref METRIC_COUNT_OPENS: SimpleCounter =
        SimpleCounter::new("hopr_strategy_promiscuous_opened_channels_count", "Count of open channel decisions").unwrap();
    // Incremented each time a channel closure was successfully issued by the strategy.
    static ref METRIC_COUNT_CLOSURES: SimpleCounter =
        SimpleCounter::new("hopr_strategy_promiscuous_closed_channels_count", "Count of close channel decisions").unwrap();
    // Set to the channel upper bound computed while collecting a tick decision.
    static ref METRIC_MAX_AUTO_CHANNELS: SimpleGauge =
        SimpleGauge::new("hopr_strategy_promiscuous_max_auto_channels", "Count of maximum number of channels managed by the strategy").unwrap();
}
59
/// A decision made by the Promiscuous strategy on each tick.
///
/// It lists which channels should be closed and to which peers a new channel
/// should be opened, together with the stake of each new channel.
#[derive(Clone, Debug, PartialEq, Default)]
struct ChannelDecision {
    /// Channels the strategy wants to close.
    to_close: Vec<ChannelEntry>,
    /// Destination and stake of every channel the strategy wants to open.
    to_open: Vec<(Address, HoprBalance)>,
}
69
70impl ChannelDecision {
71    pub fn will_channel_be_closed(&self, counter_party: &Address) -> bool {
72        self.to_close.iter().any(|c| &c.destination == counter_party)
73    }
74
75    pub fn add_to_close(&mut self, entry: ChannelEntry) {
76        self.to_close.push(entry);
77    }
78
79    pub fn add_to_open(&mut self, address: Address, balance: HoprBalance) {
80        self.to_open.push((address, balance));
81    }
82
83    pub fn get_to_close(&self) -> &Vec<ChannelEntry> {
84        &self.to_close
85    }
86
87    pub fn get_to_open(&self) -> &Vec<(Address, HoprBalance)> {
88        &self.to_open
89    }
90}
91
92impl Display for ChannelDecision {
93    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
94        write!(
95            f,
96            "channel decision: opening ({}), closing({})",
97            self.to_open.len(),
98            self.to_close.len()
99        )
100    }
101}
102
/// Default stake of a channel opened by the strategy
/// (documented on [PromiscuousStrategyConfig] as 10 wxHOPR).
#[inline]
fn default_new_channel_stake() -> HoprBalance {
    HoprBalance::new_base(10)
}

/// Default minimum balance of the node's Safe
/// (documented on [PromiscuousStrategyConfig] as 1000 wxHOPR).
#[inline]
fn default_min_safe_balance() -> HoprBalance {
    HoprBalance::new_base(1000)
}
112
/// Default peer quality required before a channel is opened to the peer.
#[inline]
fn default_network_quality_open_threshold() -> f64 {
    const OPEN_THRESHOLD: f64 = 0.9;
    OPEN_THRESHOLD
}

/// Default peer quality below which an existing channel to the peer is closed.
#[inline]
fn default_network_quality_close_threshold() -> f64 {
    const CLOSE_THRESHOLD: f64 = 0.2;
    CLOSE_THRESHOLD
}

/// Default number of heartbeats that must be sent to a peer before it is considered.
#[inline]
fn default_minimum_pings() -> u32 {
    const MINIMUM_PINGS: u32 = 50;
    MINIMUM_PINGS
}

/// Serde default provider for boolean options that are enabled unless configured otherwise.
#[inline]
fn just_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
132
/// Default waiting period after startup before the strategy takes decisions: 5 minutes.
#[inline]
fn default_initial_delay() -> Duration {
    const FIVE_MINUTES_IN_SECS: u64 = 5 * 60;
    Duration::from_secs(FIVE_MINUTES_IN_SECS)
}
137
/// Lower bound of the automatically derived channel limit, which applies when `max_channels` is not configured.
const MIN_AUTO_DETECTED_MAX_AUTO_CHANNELS: usize = 10;
139
/// Configuration of [PromiscuousStrategy].
#[serde_as]
#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault, Serialize, Deserialize)]
pub struct PromiscuousStrategyConfig {
    /// A quality threshold between 0 and 1 used to determine whether the strategy should open channel with the peer.
    /// A peer becomes a candidate once its quality is at or above this value.
    /// Validation requires it to be greater than `network_quality_close_threshold`.
    ///
    /// Default is 0.9
    #[serde(default = "default_network_quality_open_threshold")]
    #[default(default_network_quality_open_threshold())]
    pub network_quality_open_threshold: f64,

    /// A quality threshold between 0 and 1 used to determine whether the strategy should close channel with the peer.
    /// A channel is closed when the peer's quality is strictly below this value,
    /// so if set to 0, no channels will be closed because of low quality.
    ///
    /// Default is 0.2
    #[serde(default = "default_network_quality_close_threshold")]
    #[default(default_network_quality_close_threshold())]
    pub network_quality_close_threshold: f64,

    /// Number of heartbeats sent to the peer before it is considered for selection.
    /// Must be greater than 0.
    ///
    /// Default is 50.
    #[serde(default = "default_minimum_pings")]
    #[default(default_minimum_pings())]
    pub minimum_peer_pings: u32,

    /// Initial delay from startup before the strategy starts taking decisions.
    ///
    /// Default is 5 minutes.
    #[serde(default = "default_initial_delay")]
    #[default(default_initial_delay())]
    pub initial_delay: Duration,

    /// A stake of tokens that should be allocated to a channel opened by the strategy.
    /// Must be greater than 0.
    ///
    /// Default is 10 wxHOPR
    #[serde_as(as = "DisplayFromStr")]
    #[serde(default = "default_new_channel_stake")]
    #[default(default_new_channel_stake())]
    pub new_channel_stake: HoprBalance,

    /// Minimum token balance of the node's Safe.
    /// When reached, the strategy will not open any new channels.
    ///
    /// Default is 1000 wxHOPR
    #[serde_as(as = "DisplayFromStr")]
    #[serde(default = "default_min_safe_balance")]
    #[default(default_min_safe_balance())]
    pub minimum_safe_balance: HoprBalance,

    /// The maximum number of opened channels the strategy should maintain.
    /// If set, it must be greater than 0.
    ///
    /// Defaults to the square root of the number of online peers (rounded up), the minimum is 10.
    pub max_channels: Option<usize>,

    /// If set, the strategy will aggressively close channels
    /// (even with peers above the `network_quality_close_threshold`)
    /// if the number of opened outgoing channels (regardless if opened by the strategy or manually) exceeds the
    /// `max_channels` limit.
    ///
    /// Default is true.
    #[serde(default = "just_true")]
    #[default(true)]
    pub enforce_max_channels: bool,
}
205
206impl validator::Validate for PromiscuousStrategyConfig {
207    fn validate(&self) -> std::result::Result<(), validator::ValidationErrors> {
208        let mut errors = validator::ValidationErrors::new();
209
210        if !(0.0..=1.0).contains(&self.network_quality_open_threshold) {
211            errors.add(
212                "network_quality_open_threshold",
213                validator::ValidationError::new("must be in [0..1]"),
214            );
215        }
216
217        if !(0.0..=1.0).contains(&self.network_quality_close_threshold) {
218            errors.add(
219                "network_quality_close_threshold",
220                validator::ValidationError::new("must be in [0..1]"),
221            );
222        }
223
224        if self.network_quality_open_threshold <= self.network_quality_close_threshold {
225            errors.add(
226                "network_quality_open_threshold,network_quality_close_threshold",
227                validator::ValidationError::new(
228                    "network_quality_open_threshold must be greater than network_quality_close_threshold",
229                ),
230            );
231        }
232
233        if self.minimum_peer_pings == 0 {
234            errors.add(
235                "minimum_peer_pings",
236                validator::ValidationError::new("must be greater than 0"),
237            );
238        }
239
240        if self.new_channel_stake.is_zero() {
241            errors.add(
242                "new_channel_stake",
243                validator::ValidationError::new("must be greater than 0"),
244            );
245        }
246
247        if self.max_channels.is_some_and(|m| m == 0) {
248            errors.add(
249                "max_channels",
250                validator::ValidationError::new("must be greater than 0"),
251            );
252        }
253
254        if errors.is_empty() { Ok(()) } else { Err(errors) }
255    }
256}
257
/// This strategy opens outgoing channels to peers whose quality is at or above the configured
/// `network_quality_open_threshold`.
/// At the same time, it closes outgoing channels opened to peers whose quality dropped below the configured
/// `network_quality_close_threshold`.
pub struct PromiscuousStrategy<Db, A>
where
    Db: HoprDbAllOperations + Clone,
    A: ChannelActions,
{
    /// Source of the channels, network peers and Safe balance the decisions are based on.
    db: Db,
    /// Used to issue the channel opening and closing actions.
    hopr_chain_actions: A,
    /// Strategy configuration.
    cfg: PromiscuousStrategyConfig,
    /// Moment the strategy was created; `initial_delay` is measured from it.
    started_at: std::time::Instant,
}
270
/// Snapshot of the network peers as seen by the strategy on a single tick.
#[derive(Debug, Default)]
struct NetworkStats {
    /// Maps the on-chain address of a peer to its average quality and the number of heartbeats sent to it.
    pub peers_with_quality: HashMap<Address, (f64, u64)>,
    /// Number of peers whose quality is greater than zero.
    pub num_online_peers: usize,
}
276
277impl<Db, A> PromiscuousStrategy<Db, A>
278where
279    Db: HoprDbAllOperations + Clone,
280    A: ChannelActions,
281{
282    pub fn new(cfg: PromiscuousStrategyConfig, db: Db, hopr_chain_actions: A) -> Self {
283        #[cfg(all(feature = "prometheus", not(test)))]
284        {
285            lazy_static::initialize(&METRIC_MAX_AUTO_CHANNELS);
286            lazy_static::initialize(&METRIC_COUNT_CLOSURES);
287            lazy_static::initialize(&METRIC_COUNT_OPENS);
288        }
289
290        Self {
291            db,
292            hopr_chain_actions,
293            cfg,
294            started_at: std::time::Instant::now(),
295        }
296    }
297
298    async fn get_network_stats(&self) -> Result<NetworkStats> {
299        let mut num_online_peers = 0;
300        Ok(NetworkStats {
301            peers_with_quality: self
302                .db
303                .get_network_peers(PeerSelector::default(), false)
304                .await?
305                .inspect(|status| {
306                    if status.quality > 0.0 {
307                        num_online_peers += 1;
308                    } else {
309                        trace!(peer = %status.id.1, "peer is not online");
310                    }
311                })
312                .filter_map(|status| async move {
313                    // Resolve peer's chain key and average quality
314                    if let Ok(addr) = self
315                        .db
316                        .resolve_chain_key(&status.id.0)
317                        .await
318                        .and_then(|addr| addr.ok_or(DbSqlError::MissingAccount.into()))
319                    {
320                        Some((addr, (status.get_average_quality(), status.heartbeats_sent)))
321                    } else {
322                        error!(address = %status.id.1, "could not find on-chain address");
323                        None
324                    }
325                })
326                .collect()
327                .await,
328            num_online_peers,
329        })
330    }
331
332    async fn collect_tick_decision(&self) -> Result<ChannelDecision> {
333        let mut tick_decision = ChannelDecision::default();
334        let mut new_channel_candidates: Vec<(Address, f64)> = Vec::new();
335
336        // Get all opened outgoing channels from this node
337        let our_outgoing_open_channels = self
338            .db
339            .get_outgoing_channels(None)
340            .await
341            .map_err(hopr_db_sql::api::errors::DbError::from)?
342            .into_iter()
343            .filter(|channel| channel.status == ChannelStatus::Open)
344            .collect::<Vec<_>>();
345        debug!(
346            count = our_outgoing_open_channels.len(),
347            "tracking open outgoing channels"
348        );
349
350        let network_stats = self.get_network_stats().await?;
351        debug!(?network_stats, "retrieved network stats");
352
353        // Close all channels to nodes that are not in the network peers
354        // The initial_delay should take care of prior heartbeats to take place.
355        our_outgoing_open_channels
356            .iter()
357            .filter(|channel| !network_stats.peers_with_quality.contains_key(&channel.destination))
358            .for_each(|channel| {
359                debug!(destination = %channel.destination, "destination of opened channel is not between the network peers");
360                tick_decision.add_to_close(*channel);
361            });
362
363        // Go through all the peer ids and their qualities
364        // to find out which channels should be closed and
365        // which peer ids should become candidates for a new channel
366        for (address, (quality, num_pings)) in network_stats.peers_with_quality.iter() {
367            // Get the channel we have opened with it
368            let channel_with_peer = our_outgoing_open_channels.iter().find(|c| c.destination.eq(address));
369
370            if let Some(channel) = channel_with_peer {
371                if *quality < self.cfg.network_quality_close_threshold
372                    && *num_pings >= self.cfg.minimum_peer_pings as u64
373                {
374                    // Need to close the channel because quality has dropped
375                    debug!(destination = %channel.destination, quality = %quality, threshold = self.cfg.network_quality_close_threshold,
376                        "strategy proposes to close existing channel"
377                    );
378                    tick_decision.add_to_close(*channel);
379                }
380            } else if *quality >= self.cfg.network_quality_open_threshold
381                && *num_pings >= self.cfg.minimum_peer_pings as u64
382            {
383                // Try to open a channel with this peer, because it is high-quality,
384                // and we don't yet have a channel with it
385                debug!(destination = %address, quality = %quality, threshold = self.cfg.network_quality_open_threshold,
386                    "strategy proposes to open a new channel");
387                new_channel_candidates.push((*address, *quality));
388            }
389        }
390        debug!(
391            proposed_closures = tick_decision.get_to_close().len(),
392            proposed_openings = new_channel_candidates.len(),
393            "channel decision proposal summary"
394        );
395
396        // We compute the upper bound for channels as a square-root of the perceived network size
397        let max_auto_channels = self.cfg.max_channels.unwrap_or(
398            MIN_AUTO_DETECTED_MAX_AUTO_CHANNELS.max((network_stats.num_online_peers as f64).sqrt().ceil() as usize),
399        );
400        debug!(
401            max_auto_channels,
402            "current upper bound for maximum number of auto-channels"
403        );
404
405        #[cfg(all(feature = "prometheus", not(test)))]
406        METRIC_MAX_AUTO_CHANNELS.set(max_auto_channels as f64);
407
408        // Count all the effectively opened channels (i.e., after the decisions have been made)
409        let occupied = our_outgoing_open_channels
410            .len()
411            .saturating_sub(tick_decision.get_to_close().len());
412
413        // If there are still more channels opened than we allow, close some
414        // lowest-quality ones that passed the threshold
415        if occupied > max_auto_channels && self.cfg.enforce_max_channels {
416            warn!(
417                count = occupied,
418                max_auto_channels, "the strategy allows only less occupied channels"
419            );
420
421            // Get all open channels that are not planned to be closed
422            let mut sorted_channels = our_outgoing_open_channels
423                .iter()
424                .filter(|c| !tick_decision.will_channel_be_closed(&c.destination))
425                .collect::<Vec<_>>();
426
427            // Sort by quality, lowest-quality first
428            sorted_channels.sort_unstable_by(|p1, p2| {
429                let q1 = match network_stats.peers_with_quality.get(&p1.destination) {
430                    Some((q, _)) => *q,
431                    None => {
432                        error!(channel = ?p1, "could not determine peer quality");
433                        0_f64
434                    }
435                };
436                let q2 = match network_stats.peers_with_quality.get(&p2.destination) {
437                    Some((q, _)) => *q,
438                    None => {
439                        error!(peer = %p2, "could not determine peer quality");
440                        0_f64
441                    }
442                };
443                q1.partial_cmp(&q2).expect("invalid comparison")
444            });
445
446            // Close the lowest-quality channels (those we did not mark for closing yet) to enforce the limit
447            sorted_channels
448                .into_iter()
449                .take(occupied - max_auto_channels)
450                .for_each(|channel| {
451                    debug!(destination = %channel.destination, "enforcing channel closure");
452                    tick_decision.add_to_close(*channel);
453                });
454        } else if max_auto_channels > occupied {
455            // Sort the new channel candidates by the best quality first, then truncate to the number of available slots
456            // This way, we'll prefer candidates with higher quality, when we don't have enough node balance.
457            // Shuffle first, so the equal candidates are randomized and then use unstable sorting for that purpose.
458            new_channel_candidates.shuffle(&mut hopr_crypto_random::rng());
459            new_channel_candidates
460                .sort_unstable_by(|(_, q1), (_, q2)| q1.partial_cmp(q2).expect("should be comparable").reverse());
461            new_channel_candidates.truncate(max_auto_channels - occupied);
462            debug!(count = new_channel_candidates.len(), "got new channel candidates");
463
464            let current_safe_balance = self
465                .db
466                .get_safe_hopr_balance(None)
467                .await
468                .map_err(hopr_db_sql::api::errors::DbError::from)?;
469
470            // Check if we do not surpass the minimum node's balance while opening new channels
471            let max_to_open = ((current_safe_balance - self.cfg.minimum_safe_balance).amount()
472                / self.cfg.new_channel_stake.amount())
473            .as_usize();
474            debug!(%current_safe_balance, max_to_open, num_candidates = new_channel_candidates.len(), "maximum number of channel openings with current balance");
475            new_channel_candidates
476                .into_iter()
477                .take(max_to_open)
478                .for_each(|(address, _)| tick_decision.add_to_open(address, self.cfg.new_channel_stake));
479        } else {
480            // max_channels == occupied
481            info!(
482                count = occupied,
483                "not going to allocate new channels, maximum number of effective channels is reached"
484            )
485        }
486
487        Ok(tick_decision)
488    }
489}
490
491impl<Db, A> Debug for PromiscuousStrategy<Db, A>
492where
493    Db: HoprDbAllOperations + Clone,
494    A: ChannelActions,
495{
496    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
497        write!(f, "{:?}", Strategy::Promiscuous(self.cfg.clone()))
498    }
499}
500
501impl<Db, A> Display for PromiscuousStrategy<Db, A>
502where
503    Db: HoprDbAllOperations + Clone,
504    A: ChannelActions,
505{
506    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
507        write!(f, "{}", Strategy::Promiscuous(self.cfg.clone()))
508    }
509}
510
511#[async_trait]
512impl<Db, A> SingularStrategy for PromiscuousStrategy<Db, A>
513where
514    Db: HoprDbAllOperations + Clone + Send + Sync,
515    A: ChannelActions + Send + Sync,
516{
517    async fn on_tick(&self) -> Result<()> {
518        let safe_balance = self
519            .db
520            .get_safe_hopr_balance(None)
521            .await
522            .map_err(hopr_db_sql::api::errors::DbError::from)?;
523        if safe_balance <= self.cfg.minimum_safe_balance {
524            error!(
525                "strategy cannot work with safe token balance already being less or equal than minimum node balance"
526            );
527            return Err(CriteriaNotSatisfied);
528        }
529
530        if self.started_at.elapsed() < self.cfg.initial_delay {
531            debug!("strategy is not yet ready to execute, waiting for initial delay");
532            return Err(CriteriaNotSatisfied);
533        }
534
535        let tick_decision = self.collect_tick_decision().await?;
536        debug!(%tick_decision, "collected channel decision");
537
538        for channel_to_close in tick_decision.get_to_close() {
539            match self
540                .hopr_chain_actions
541                .close_channel(channel_to_close.destination, ChannelDirection::Outgoing, false)
542                .await
543            {
544                Ok(_) => {
545                    // Intentionally do not await result of the channel transaction
546                    debug!(destination = %channel_to_close.destination, "issued channel closing");
547
548                    #[cfg(all(feature = "prometheus", not(test)))]
549                    METRIC_COUNT_CLOSURES.increment();
550                }
551                Err(e) => {
552                    error!(error = %e, "error while closing channel");
553                }
554            }
555        }
556
557        for channel_to_open in tick_decision.get_to_open() {
558            match self
559                .hopr_chain_actions
560                .open_channel(channel_to_open.0, channel_to_open.1)
561                .await
562            {
563                Ok(_) => {
564                    // Intentionally do not await result of the channel transaction
565                    debug!(destination = %channel_to_open.0, "issued channel opening");
566
567                    #[cfg(all(feature = "prometheus", not(test)))]
568                    METRIC_COUNT_OPENS.increment();
569                }
570                Err(e) => {
571                    error!(error = %e, channel = %channel_to_open.0, "error while issuing channel opening");
572                }
573            }
574        }
575
576        info!(%tick_decision, "on tick executed");
577        Ok(())
578    }
579}
580
581/// Unit tests of pure Rust code
582#[cfg(test)]
583mod tests {
584    use anyhow::Context;
585    use futures::{FutureExt, future::ok};
586    use hex_literal::hex;
587    use hopr_chain_actions::action_queue::{ActionConfirmation, PendingAction};
588    use hopr_chain_types::{actions::Action, chain_events::ChainEventType};
589    use hopr_crypto_random::random_bytes;
590    use hopr_crypto_types::prelude::*;
591    use hopr_db_sql::{
592        HoprDbGeneralModelOperations, accounts::HoprDbAccountOperations, api::peers::HoprDbPeersOperations,
593        channels::HoprDbChannelOperations, db::HoprDb, info::HoprDbInfoOperations,
594    };
595    use hopr_transport_network::{PeerId, network::PeerOrigin};
596    use lazy_static::lazy_static;
597    use mockall::mock;
598
599    use super::*;
600
    lazy_static! {
        // Chain keypair of the node under test; its address is also the first entry of PEERS.
        static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!(
            "492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775"
        ))
        .expect("lazy static keypair should be valid");
        // Test peers as (on-chain address, peer id) pairs.
        // The second hex literal of each entry is an off-chain private key, which the `map` below
        // turns into the corresponding peer id.
        static ref PEERS: [(Address, PeerId); 10] = [
            (
                ALICE.public().to_address().into(),
                hex!("e03640d3184c8aa6f9d4ccd533281c51974a170c0c4d0fe1da9296a081ab1fd9")
            ),
            (
                hex!("5f98dc63889681eb4306f0e3b5ee2e04b13af7c8"),
                hex!("82a3cec1660697d8f3eb798f82ae281fc885c3e5370ef700c95c17397846c1e7")
            ),
            (
                hex!("6e0bed94a8d2da952ad4468ff81157b6137a5566"),
                hex!("2b93fcca9db2c5c12d1add5c07dd81d20c68eb713e99aa5c488210179c7505e3")
            ),
            (
                hex!("8275b9ce8a3d2fe14029111f85b72ab05aa0f5d3"),
                hex!("5cfd16dc160fd43396bfaff06e7c2e62cd087317671c159ce7cbc31c34fc32b6")
            ),
            (
                hex!("3231673fd10c9ebeb9330745f1709c91db9cf40f"),
                hex!("7f5b421cc58cf8449f5565756697261723fb96bba5f0aa2ba83c4973e0e994bf")
            ),
            (
                hex!("585f4ca77b07ac7a3bf37de3069b641ba97bf76f"),
                hex!("848af931ce57f54fbf96d7250eda8b0f36e3d1988ec8048c892e8d8ff0798f2f")
            ),
            (
                hex!("ba413645edb6ddbd46d5911466264b119087dfea"),
                hex!("d79258fc521dba8ded208066fe98fd8a857cf2e8f42f1b71c8f6e29b8f47e406")
            ),
            (
                hex!("9ea8c0f3766022f84c41abd524c942971bd22d23"),
                hex!("cd7a06caebcb90f95690c72472127cae8732b415440a1783c6ff9f9cb0bacf1e")
            ),
            (
                hex!("9790b6cf8afe6a7d80102570fac18a322e26ef83"),
                hex!("2dc3ff226be59333127ebfd3c79517eac8f81e0333abaa45189aae309880e55a")
            ),
            (
                hex!("f6ab491cd4e2eccbe60a7f87aeaacfc408dabde8"),
                hex!("5826ed44f52b3a26c472621812165bb2d3e60a9929e06db8b8df4e4d23068eba")
            ),
        ]
        .map(|(addr, privkey)| (
            addr.into(),
            OffchainKeypair::from_secret(&privkey)
                .expect("lazy static keypair should be valid")
                .public()
                .into()
        ));
    }
656
    // Generates `MockChannelAct`, a mock of `ChannelActions`, so that the test can assert
    // which channel operations the strategy issues.
    mock! {
        ChannelAct { }
        #[async_trait]
        impl ChannelActions for ChannelAct {
            async fn open_channel(&self, destination: Address, amount: HoprBalance) -> hopr_chain_actions::errors::Result<PendingAction>;
            async fn fund_channel(&self, channel_id: Hash, amount: HoprBalance) -> hopr_chain_actions::errors::Result<PendingAction>;
            async fn close_channel(
                &self,
                counterparty: Address,
                direction: ChannelDirection,
                redeem_before_close: bool,
            ) -> hopr_chain_actions::errors::Result<PendingAction>;
        }
    }
671
672    async fn mock_channel(db: HoprDb, dst: Address, balance: HoprBalance) -> anyhow::Result<ChannelEntry> {
673        let channel = ChannelEntry::new(
674            PEERS[0].0,
675            dst,
676            balance,
677            U256::zero(),
678            ChannelStatus::Open,
679            U256::zero(),
680        );
681        db.upsert_channel(None, channel).await?;
682
683        Ok(channel)
684    }
685
686    async fn prepare_network(db: HoprDb, qualities: Vec<f64>) -> anyhow::Result<()> {
687        assert_eq!(qualities.len(), PEERS.len() - 1, "invalid network setup");
688
689        for (i, quality) in qualities.into_iter().enumerate() {
690            let peer = &PEERS[i + 1].1;
691
692            db.add_network_peer(peer, PeerOrigin::Initialization, vec![], 0.0, 10)
693                .await?;
694
695            let mut status = db.get_network_peer(peer).await?.expect("should be present");
696            status.heartbeats_sent = 200;
697            while status.get_average_quality() < quality {
698                status.update_quality(quality);
699            }
700            db.update_network_peer(status).await?;
701        }
702
703        Ok(())
704    }
705
    /// Initializes the database within a single transaction: sets the Safe balance and allowance
    /// to `node_balance` and inserts an account entry for every test peer.
    async fn init_db(db: HoprDb, node_balance: HoprBalance) -> anyhow::Result<()> {
        db.begin_transaction()
            .await?
            .perform(|tx| {
                Box::pin(async move {
                    db.set_safe_hopr_balance(Some(tx), node_balance).await?;
                    db.set_safe_hopr_allowance(Some(tx), node_balance).await?;
                    // Link each peer id to its on-chain address
                    for (chain_key, peer_id) in PEERS.iter() {
                        db.insert_account(
                            Some(tx),
                            AccountEntry {
                                public_key: OffchainPublicKey::from_peerid(peer_id).expect("should be valid PeerId"),
                                chain_addr: *chain_key,
                                entry_type: AccountType::NotAnnounced,
                                published_at: 1,
                            },
                        )
                        .await?;
                    }
                    Ok::<_, DbSqlError>(())
                })
            })
            .await?;

        Ok(())
    }
732
733    fn mock_action_confirmation_closure(channel: ChannelEntry) -> ActionConfirmation {
734        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
735        ActionConfirmation {
736            tx_hash: random_hash,
737            event: Some(ChainEventType::ChannelClosureInitiated(channel)),
738            action: Action::CloseChannel(channel, ChannelDirection::Outgoing),
739        }
740    }
741
742    fn mock_action_confirmation_opening(address: Address, balance: HoprBalance) -> ActionConfirmation {
743        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
744        ActionConfirmation {
745            tx_hash: random_hash,
746            event: Some(ChainEventType::ChannelOpened(ChannelEntry::new(
747                PEERS[0].0,
748                address,
749                balance,
750                U256::zero(),
751                ChannelStatus::Open,
752                U256::zero(),
753            ))),
754            action: Action::OpenChannel(address, balance),
755        }
756    }
757
758    #[test_log::test(tokio::test)]
759    async fn test_promiscuous_strategy_tick_decisions() -> anyhow::Result<()> {
760        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
761
762        let qualities_that_alice_sees = vec![0.7, 0.9, 0.8, 0.98, 0.1, 0.3, 0.1, 0.2, 1.0];
763
764        init_db(db.clone(), 1000.into()).await?;
765        prepare_network(db.clone(), qualities_that_alice_sees).await?;
766
767        mock_channel(db.clone(), PEERS[1].0, 10.into()).await?;
768        mock_channel(db.clone(), PEERS[2].0, 10.into()).await?;
769        let for_closing = mock_channel(db.clone(), PEERS[5].0, 10.into()).await?;
770
771        // Peer 3 has an accepted pre-release version
772        let status_3 = db
773            .get_network_peer(&PEERS[3].1)
774            .await?
775            .context("peer should be present")?;
776        db.update_network_peer(status_3).await?;
777
778        // Peer 10 has an old node version
779        let status_10 = db
780            .get_network_peer(&PEERS[9].1)
781            .await?
782            .context("peer should be present")?;
783        db.update_network_peer(status_10).await?;
784
785        let strat_cfg = PromiscuousStrategyConfig {
786            max_channels: Some(3),
787            network_quality_open_threshold: 0.5,
788            network_quality_close_threshold: 0.3,
789            new_channel_stake: 10.into(),
790            minimum_safe_balance: 50.into(),
791            initial_delay: Duration::ZERO,
792            ..Default::default()
793        };
794
795        // Situation:
796        // - There are max 3 channels and also 3 are currently opened.
797        // - Strategy will close channel to peer 5, because it has quality 0.1
798        // - Because of the closure, this means there can be 1 additional channel opened:
799        // - Strategy can open channel either to peer 3, 4 or 9 (with qualities 0.8, 0.98 and 1.0 respectively)
800        // - It will ignore peer 9 even though it has the highest quality, but does not meet minimum node version
801        // - It will prefer peer 4 because it has higher quality than node 3
802
803        let mut actions = MockChannelAct::new();
804        actions
805            .expect_close_channel()
806            .times(1)
807            .withf(|dst, dir, _| PEERS[5].0.eq(dst) && ChannelDirection::Outgoing.eq(dir))
808            .return_once(move |_, _, _| Ok(ok(mock_action_confirmation_closure(for_closing)).boxed()));
809
810        let new_stake = strat_cfg.new_channel_stake;
811        actions
812            .expect_open_channel()
813            .times(1)
814            .withf(move |dst, b| PEERS[9].0.eq(dst) && new_stake.eq(b))
815            .return_once(move |_, _| Ok(ok(mock_action_confirmation_opening(PEERS[4].0, new_stake)).boxed()));
816
817        let strat = PromiscuousStrategy::new(strat_cfg.clone(), db, actions);
818
819        tokio::time::sleep(Duration::from_millis(100)).await;
820
821        strat.on_tick().await?;
822
823        Ok(())
824    }
825}