hopr_db_sql/
peers.rs

1use std::time::Duration;
2
3use async_stream::stream;
4use async_trait::async_trait;
5use futures::{TryStreamExt, stream::BoxStream};
6use hopr_crypto_types::prelude::OffchainPublicKey;
7use hopr_db_api::{
8    errors::Result,
9    peers::{HoprDbPeersOperations, PeerOrigin, PeerSelector, PeerStatus, Stats},
10};
11use hopr_db_entity::network_peer;
12use hopr_primitive_types::prelude::*;
13use libp2p_identity::PeerId;
14use multiaddr::Multiaddr;
15use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder};
16use sea_query::{Condition, Expr, IntoCondition, Order};
17use sqlx::types::chrono::{self, DateTime, Utc};
18use tracing::{error, trace};
19
20use crate::{db::HoprDb, prelude::DbSqlError};
21
22const DB_BINCODE_CONFIGURATION: bincode::config::Configuration = bincode::config::standard()
23    .with_little_endian()
24    .with_variable_int_encoding();
25
26struct WrappedPeerSelector(PeerSelector);
27
28impl From<PeerSelector> for WrappedPeerSelector {
29    fn from(selector: PeerSelector) -> Self {
30        WrappedPeerSelector(selector)
31    }
32}
33
34impl IntoCondition for WrappedPeerSelector {
35    fn into_condition(self) -> Condition {
36        let mut ret = Expr::value(1);
37
38        if let Some(last_seen_l) = self.0.last_seen.0 {
39            ret = ret.and(network_peer::Column::LastSeen.gte(chrono::DateTime::<chrono::Utc>::from(last_seen_l)));
40        }
41
42        if let Some(last_seen_u) = self.0.last_seen.1 {
43            ret = ret.and(network_peer::Column::LastSeen.lte(chrono::DateTime::<chrono::Utc>::from(last_seen_u)));
44        }
45
46        if let Some(quality_l) = self.0.quality.0 {
47            ret = ret.and(network_peer::Column::Quality.gte(quality_l));
48        }
49
50        if let Some(quality_u) = self.0.quality.1 {
51            ret = ret.and(network_peer::Column::Quality.lte(quality_u));
52        }
53
54        ret.into_condition()
55    }
56}
57
58#[async_trait]
59impl HoprDbPeersOperations for HoprDb {
60    #[tracing::instrument(
61        skip(self),
62        name = "HoprDbPeersOperations::add_network_peer",
63        level = "trace",
64        err,
65        ret
66    )]
67    async fn add_network_peer(
68        &self,
69        peer: &PeerId,
70        origin: PeerOrigin,
71        mas: Vec<Multiaddr>,
72        backoff: f64,
73        quality_window: u32,
74    ) -> Result<()> {
75        let peer = *peer;
76        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
77        let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer))
78            .await
79            .map_err(|_| crate::errors::DbSqlError::DecodingError)?;
80
81        let new_peer = hopr_db_entity::network_peer::ActiveModel {
82            packet_key: sea_orm::ActiveValue::Set(Vec::from(pubkey.as_ref())),
83            multi_addresses: sea_orm::ActiveValue::Set(
84                mas.into_iter().map(|m| m.to_string()).collect::<Vec<String>>().into(),
85            ),
86            origin: sea_orm::ActiveValue::Set(origin as i8),
87            backoff: sea_orm::ActiveValue::Set(Some(backoff)),
88            quality_sma: sea_orm::ActiveValue::Set(Some(
89                bincode::serde::encode_to_vec(
90                    SingleSumSMA::<f64>::new(quality_window as usize),
91                    DB_BINCODE_CONFIGURATION,
92                )
93                .map_err(|_| crate::errors::DbSqlError::DecodingError)?,
94            )),
95            ..Default::default()
96        };
97
98        let _ = new_peer.insert(&self.peers_db).await.map_err(DbSqlError::from)?;
99
100        Ok(())
101    }
102
103    #[tracing::instrument(
104        skip(self),
105        name = "HoprDbPeersOperations::remove_network_peer",
106        level = "trace",
107        err,
108        ret
109    )]
110    async fn remove_network_peer(&self, peer: &PeerId) -> Result<()> {
111        let peer = *peer;
112        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
113        let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer))
114            .await
115            .map_err(|_| crate::errors::DbSqlError::DecodingError)?;
116
117        let res = hopr_db_entity::network_peer::Entity::delete_many()
118            .filter(hopr_db_entity::network_peer::Column::PacketKey.eq(Vec::from(pubkey.as_ref())))
119            .exec(&self.peers_db)
120            .await
121            .map_err(DbSqlError::from)?;
122
123        if res.rows_affected > 0 {
124            Ok(())
125        } else {
126            Err(
127                crate::errors::DbSqlError::LogicalError("peer cannot be removed because it does not exist".into())
128                    .into(),
129            )
130        }
131    }
132
133    #[tracing::instrument(
134        skip(self),
135        name = "HoprDbPeersOperations::update_network_peer",
136        level = "trace",
137        err,
138        ret
139    )]
140    async fn update_network_peer(&self, new_status: PeerStatus) -> Result<()> {
141        let row = hopr_db_entity::network_peer::Entity::find()
142            .filter(hopr_db_entity::network_peer::Column::PacketKey.eq(Vec::from(new_status.id.0.as_ref())))
143            .one(&self.peers_db)
144            .await
145            .map_err(DbSqlError::from)?;
146
147        if let Some(model) = row {
148            let mut peer_data: hopr_db_entity::network_peer::ActiveModel = model.into();
149            peer_data.packet_key = sea_orm::ActiveValue::Set(Vec::from(new_status.id.0.as_ref()));
150            peer_data.multi_addresses = sea_orm::ActiveValue::Set(
151                new_status
152                    .multiaddresses
153                    .into_iter()
154                    .map(|m| m.to_string())
155                    .collect::<Vec<String>>()
156                    .into(),
157            );
158            peer_data.origin = sea_orm::ActiveValue::Set(new_status.origin as i8);
159            peer_data.last_seen = sea_orm::ActiveValue::Set(DateTime::<Utc>::from(new_status.last_seen));
160            peer_data.last_seen_latency = sea_orm::ActiveValue::Set(new_status.last_seen_latency.as_millis() as i32);
161            peer_data.ignored_until = sea_orm::ActiveValue::Set(new_status.ignored_until.map(DateTime::<Utc>::from));
162            peer_data.quality = sea_orm::ActiveValue::Set(new_status.quality);
163            peer_data.quality_sma = sea_orm::ActiveValue::Set(Some(
164                bincode::serde::encode_to_vec(&new_status.quality_avg, DB_BINCODE_CONFIGURATION)
165                    .map_err(|e| crate::errors::DbSqlError::LogicalError(format!("cannot serialize sma: {e}")))?,
166            ));
167            peer_data.backoff = sea_orm::ActiveValue::Set(Some(new_status.backoff));
168            peer_data.heartbeats_sent = sea_orm::ActiveValue::Set(Some(new_status.heartbeats_sent as i32));
169            peer_data.heartbeats_successful = sea_orm::ActiveValue::Set(Some(new_status.heartbeats_succeeded as i32));
170
171            peer_data.update(&self.peers_db).await.map_err(DbSqlError::from)?;
172
173            Ok(())
174        } else {
175            Err(crate::errors::DbSqlError::LogicalError(format!(
176                "cannot update a non-existing peer '{}'",
177                new_status.id.1
178            ))
179            .into())
180        }
181    }
182
183    #[tracing::instrument(
184        skip(self),
185        name = "HoprDbPeersOperations::get_network_peer",
186        level = "trace",
187        err,
188        ret
189    )]
190    async fn get_network_peer(&self, peer: &PeerId) -> Result<Option<PeerStatus>> {
191        let peer = *peer;
192        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
193        let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer))
194            .await
195            .map_err(|_| crate::errors::DbSqlError::DecodingError)?;
196        let row = hopr_db_entity::network_peer::Entity::find()
197            .filter(hopr_db_entity::network_peer::Column::PacketKey.eq(Vec::from(pubkey.as_ref())))
198            .one(&self.peers_db)
199            .await
200            .map_err(DbSqlError::from)?;
201
202        if let Some(model) = row {
203            let status: WrappedPeerStatus = model.try_into()?;
204            Ok(Some(status.0))
205        } else {
206            Ok(None)
207        }
208    }
209
210    #[tracing::instrument(skip(self), name = "HoprDbPeersOperations::get_network_peers", level = "trace", err)]
211    async fn get_network_peers<'a>(
212        &'a self,
213        selector: PeerSelector,
214        sort_last_seen_asc: bool,
215    ) -> Result<BoxStream<'a, PeerStatus>> {
216        let selector: WrappedPeerSelector = selector.into();
217        let mut sub_stream = hopr_db_entity::network_peer::Entity::find()
218            .filter(selector)
219            .order_by(
220                network_peer::Column::LastSeen,
221                if sort_last_seen_asc { Order::Asc } else { Order::Desc },
222            )
223            .stream(&self.peers_db)
224            .await
225            .map_err(DbSqlError::from)?;
226
227        Ok(Box::pin(stream! {
228            loop {
229                match sub_stream.try_next().await {
230                    Ok(Some(peer_row)) => {
231                        trace!(?peer_row, "got db network row");
232                        match WrappedPeerStatus::try_from(peer_row) {
233                            Ok(peer_status) => yield peer_status.0,
234                            Err(e) => error!(error = %e, "cannot map peer from row"),
235                        }
236                    },
237                    Ok(None) => {
238                        trace!("fetched all network results");
239                        break;
240                    }
241                    Err(e) => {
242                        error!(error = %e, "failed to retrieve next network row");
243                        break;
244                    }
245                }
246            }
247        }))
248    }
249
250    #[tracing::instrument(
251        skip(self),
252        name = "HoprDbPeersOperations::network_peer_stats",
253        level = "trace",
254        err,
255        ret
256    )]
257    async fn network_peer_stats(&self, quality_threshold: f64) -> Result<Stats> {
258        Ok(Stats {
259            good_quality_public: hopr_db_entity::network_peer::Entity::find()
260                .filter(
261                    sea_orm::Condition::all()
262                        .add(hopr_db_entity::network_peer::Column::IgnoredUntil.is_null())
263                        .add(hopr_db_entity::network_peer::Column::Quality.gt(quality_threshold)),
264                )
265                .count(&self.peers_db)
266                .await
267                .map_err(DbSqlError::from)? as u32,
268            good_quality_non_public: 0u32, // TODO: Only public peers supported in v3
269            bad_quality_public: hopr_db_entity::network_peer::Entity::find()
270                .filter(
271                    sea_orm::Condition::all()
272                        .add(hopr_db_entity::network_peer::Column::IgnoredUntil.is_null())
273                        .add(hopr_db_entity::network_peer::Column::Quality.lte(quality_threshold)),
274                )
275                .count(&self.peers_db)
276                .await
277                .map_err(DbSqlError::from)? as u32,
278            bad_quality_non_public: 0u32, // TODO: Only public peers supported in v3
279        })
280    }
281}
282
283struct WrappedPeerStatus(PeerStatus);
284
285impl From<PeerStatus> for WrappedPeerStatus {
286    fn from(status: PeerStatus) -> Self {
287        WrappedPeerStatus(status)
288    }
289}
290
291impl TryFrom<hopr_db_entity::network_peer::Model> for WrappedPeerStatus {
292    type Error = crate::errors::DbSqlError;
293
294    fn try_from(value: hopr_db_entity::network_peer::Model) -> std::result::Result<Self, Self::Error> {
295        let key = OffchainPublicKey::try_from(value.packet_key.as_slice()).map_err(|_| Self::Error::DecodingError)?;
296        Ok(PeerStatus {
297            id: (key, key.into()),
298            origin: PeerOrigin::try_from(value.origin as u8).map_err(|_| Self::Error::DecodingError)?,
299            last_seen: value.last_seen.into(),
300            last_seen_latency: Duration::from_millis(value.last_seen_latency as u64),
301            heartbeats_sent: value.heartbeats_sent.unwrap_or_default() as u64,
302            heartbeats_succeeded: value.heartbeats_successful.unwrap_or_default() as u64,
303            backoff: value.backoff.unwrap_or(1.0f64),
304            ignored_until: value.ignored_until.map(|v| v.into()),
305            multiaddresses: {
306                if let sea_orm::query::JsonValue::Array(mas) = value.multi_addresses {
307                    mas.into_iter()
308                        .filter_map(|s| {
309                            if let sea_orm::query::JsonValue::String(s) = s {
310                                Some(s)
311                            } else {
312                                None
313                            }
314                        })
315                        .filter(|s| !s.trim().is_empty())
316                        .map(Multiaddr::try_from)
317                        .collect::<std::result::Result<Vec<_>, multiaddr::Error>>()
318                        .map_err(|_| Self::Error::DecodingError)
319                } else {
320                    Err(Self::Error::DecodingError)
321                }?
322            },
323            quality: value.quality,
324            quality_avg: bincode::serde::borrow_decode_from_slice(
325                value
326                    .quality_sma
327                    .ok_or_else(|| Self::Error::LogicalError("the SMA should always be present for every peer".into()))?
328                    .as_slice(),
329                DB_BINCODE_CONFIGURATION,
330            )
331            .map(|(v, _bytes)| v)
332            .map_err(|_| Self::Error::DecodingError)?,
333        }
334        .into())
335    }
336}
337
#[cfg(test)]
mod tests {
    use std::{
        ops::Add,
        time::{Duration, SystemTime},
    };

    use futures::StreamExt;
    use hopr_crypto_types::keypairs::{ChainKeypair, Keypair, OffchainKeypair};
    use libp2p_identity::PeerId;
    use multiaddr::Multiaddr;

    use super::*;

    /// Quality samples fed into a peer status by the update-related tests.
    const QUALITY_SAMPLES: [f64; 3] = [0.1_f64, 0.4_f64, 0.6_f64];

    /// Returns the peer id of a newly generated random offchain keypair.
    fn random_peer_id() -> PeerId {
        OffchainKeypair::random().public().into()
    }

    /// Feeds every sample of `QUALITY_SAMPLES` into `status`, in order.
    fn feed_quality_samples(status: &mut PeerStatus) {
        for sample in QUALITY_SAMPLES {
            status.update_quality(sample);
        }
    }

    /// Builds the expected stats value with the given public counters and
    /// both non-public counters set to zero.
    fn public_stats(good: u32, bad: u32) -> Stats {
        Stats {
            good_quality_public: good,
            bad_quality_public: bad,
            good_quality_non_public: 0,
            bad_quality_non_public: 0,
        }
    }

    #[tokio::test]
    async fn test_add_get() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_id = random_peer_id();
        let first_addr: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
        let second_addr: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;

        db.add_network_peer(
            &peer_id,
            PeerOrigin::IncomingConnection,
            vec![first_addr.clone(), second_addr.clone()],
            0.0,
            25,
        )
        .await?;

        let stored = db.get_network_peer(&peer_id).await?.expect("peer must exist in the db");

        let expected = {
            let mut status = PeerStatus::new(peer_id, PeerOrigin::IncomingConnection, 0.0, 25);
            status.last_seen = SystemTime::UNIX_EPOCH;
            status.last_seen_latency = Duration::from_secs(0);
            status.multiaddresses = vec![first_addr, second_addr];
            status
        };

        assert_eq!(expected, stored, "peer states must match");
        Ok(())
    }

    #[tokio::test]
    async fn test_should_remove_peer() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_id = random_peer_id();
        let addr: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;

        db.add_network_peer(&peer_id, PeerOrigin::IncomingConnection, vec![addr.clone()], 0.0, 25)
            .await?;
        let before_removal = db.get_network_peer(&peer_id).await?;
        assert!(before_removal.is_some(), "must have peer entry");

        db.remove_network_peer(&peer_id).await?;
        let after_removal = db.get_network_peer(&peer_id).await?;
        assert!(after_removal.is_none(), "peer entry must be gone");

        Ok(())
    }

    #[tokio::test]
    async fn test_should_not_remove_non_existing_peer() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let unknown_peer = random_peer_id();

        db.remove_network_peer(&unknown_peer)
            .await
            .expect_err("must not delete non-existent peer");

        Ok(())
    }

    #[tokio::test]
    async fn test_should_not_add_duplicate_peers() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_id = random_peer_id();
        let addr: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;

        // The first insert must succeed, the second one for the same peer must not.
        db.add_network_peer(&peer_id, PeerOrigin::IncomingConnection, vec![addr.clone()], 0.0, 25)
            .await?;
        db.add_network_peer(&peer_id, PeerOrigin::IncomingConnection, vec![], 0.0, 25)
            .await
            .expect_err("should fail adding second time");

        Ok(())
    }

    #[tokio::test]
    async fn test_should_return_none_on_non_existing_peer() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let unknown_peer = random_peer_id();

        let lookup = db.get_network_peer(&unknown_peer).await?;
        assert!(lookup.is_none(), "should return none");
        Ok(())
    }

    #[tokio::test]
    async fn test_update() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_id = random_peer_id();

        let first_addr: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
        let second_addr: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;

        db.add_network_peer(
            &peer_id,
            PeerOrigin::IncomingConnection,
            vec![first_addr.clone(), second_addr.clone()],
            0.0,
            25,
        )
        .await?;

        let mut updated = PeerStatus::new(peer_id, PeerOrigin::IncomingConnection, 0.2, 25);
        updated.last_seen = SystemTime::UNIX_EPOCH;
        updated.last_seen_latency = Duration::from_secs(2);
        updated.multiaddresses = vec![first_addr, second_addr];
        updated.backoff = 2.0;
        updated.ignored_until = None;
        feed_quality_samples(&mut updated);
        updated.quality = updated.quality as f32 as f64;

        // Before the update the stored entry still holds the values from the insert.
        let stored_before = db.get_network_peer(&peer_id).await?.expect("entry should exist");
        assert_ne!(updated, stored_before, "entries must not be equal");

        db.update_network_peer(updated.clone()).await?;

        let stored_after = db.get_network_peer(&peer_id).await?.expect("entry should exist");
        assert_eq!(updated, stored_after, "entries must be equal");

        Ok(())
    }

    #[tokio::test]
    async fn test_should_fail_to_update_non_existing_peer() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let unknown_peer = random_peer_id();

        let mut status = PeerStatus::new(unknown_peer, PeerOrigin::IncomingConnection, 0.2, 25);
        status.last_seen = SystemTime::UNIX_EPOCH;
        status.last_seen_latency = Duration::from_secs(2);
        status.backoff = 2.0;
        status.ignored_until = None;
        status.multiaddresses = vec![];
        feed_quality_samples(&mut status);

        db.update_network_peer(status)
            .await
            .expect_err("should fail updating non-existing peer");
        Ok(())
    }

    #[tokio::test]
    async fn test_get_multiple_should_return_all_peers() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let peers: Vec<PeerId> = std::iter::repeat_with(random_peer_id).take(10).collect();

        for peer in &peers {
            db.add_network_peer(peer, PeerOrigin::Initialization, vec![], 0.0, 25)
                .await?;
        }

        // The peers are requested in ascending `LastSeen` order and the
        // assertion below expects that order to coincide with insertion order.
        // NOTE(review): this relies on the `LastSeen` value assigned at insert
        // time; `test_add_get` reads a fresh peer back with `UNIX_EPOCH`, so
        // confirm the ordering among equal values is intended to be stable.
        let ids_from_db: Vec<PeerId> = db
            .get_network_peers(Default::default(), true)
            .await?
            .map(|status| status.id.1)
            .collect()
            .await;

        assert_eq!(peers.len(), ids_from_db.len(), "lengths must match");
        assert_eq!(peers, ids_from_db, "peer ids must match");

        Ok(())
    }

    #[tokio::test]
    async fn test_get_multiple_should_return_filtered_peers() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_count = 10;
        let peers: Vec<PeerId> = std::iter::repeat_with(random_peer_id).take(peer_count).collect();

        for (index, peer) in peers.iter().enumerate() {
            db.add_network_peer(peer, PeerOrigin::Initialization, vec![], 0.2, 25)
                .await?;

            // Only the second half of the peers gets updated quality samples
            // and a distinct `last_seen` value.
            if index < peer_count / 2 {
                continue;
            }

            let mut status = PeerStatus::new(*peer, PeerOrigin::IncomingConnection, 0.2, 25);
            status.last_seen = SystemTime::UNIX_EPOCH.add(Duration::from_secs(index as u64));
            status.last_seen_latency = Duration::from_secs(2);
            status.multiaddresses = vec![];
            status.heartbeats_sent = 3;
            status.heartbeats_succeeded = 4;
            status.backoff = 1.0;
            status.ignored_until = None;
            feed_quality_samples(&mut status);

            db.update_network_peer(status).await?;
        }

        let ids_from_db: Vec<PeerId> = db
            .get_network_peers(PeerSelector::default().with_quality_gte(0.501_f64), false)
            .await?
            .map(|status| status.id.1)
            .collect()
            .await;

        // Descending `LastSeen` order means the updated half comes back reversed.
        let expected_ids: Vec<PeerId> = peers.into_iter().skip(peer_count / 2).rev().collect();

        assert_eq!(peer_count / 2, ids_from_db.len(), "lengths must match");
        assert_eq!(expected_ids, ids_from_db, "peer ids must match");

        Ok(())
    }

    #[tokio::test]
    async fn test_should_update_stats_when_updating_peers() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let first_peer = random_peer_id();
        let second_peer = random_peer_id();

        let first_addr: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{first_peer}").parse()?;
        let second_addr: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{second_peer}").parse()?;

        // One freshly added peer counts as bad quality.
        db.add_network_peer(&first_peer, PeerOrigin::IncomingConnection, vec![first_addr], 0.0, 25)
            .await?;

        let stats = db.network_peer_stats(0.2).await?;
        assert_eq!(public_stats(0, 1), stats, "stats must be equal");

        // A second freshly added peer counts as bad quality as well.
        db.add_network_peer(&second_peer, PeerOrigin::IncomingConnection, vec![second_addr], 0.0, 25)
            .await?;

        let stats = db.network_peer_stats(0.2).await?;
        assert_eq!(public_stats(0, 2), stats, "stats must be equal");

        // Updating the first peer with the quality samples moves it to good quality.
        let mut status = PeerStatus::new(first_peer, PeerOrigin::IncomingConnection, 0.2, 25);
        status.last_seen = SystemTime::UNIX_EPOCH.add(Duration::from_secs(2));
        status.last_seen_latency = Duration::from_secs(2);
        status.multiaddresses = vec![];
        status.heartbeats_sent = 3;
        status.heartbeats_succeeded = 4;
        status.backoff = 1.0;
        status.ignored_until = None;
        feed_quality_samples(&mut status);

        db.update_network_peer(status).await?;

        let stats = db.network_peer_stats(0.2).await?;
        assert_eq!(public_stats(1, 1), stats, "stats must be equal");

        // Removing the good peer leaves only the bad one.
        db.remove_network_peer(&first_peer).await?;

        let stats = db.network_peer_stats(0.2).await?;
        assert_eq!(public_stats(0, 1), stats, "stats must be equal");

        Ok(())
    }
}