hopr_db_node/
peers.rs

1use std::time::Duration;
2
3use async_stream::stream;
4use async_trait::async_trait;
5use futures::{TryStreamExt, stream::BoxStream};
6use hopr_api::{db::*, *};
7use hopr_crypto_types::prelude::OffchainPublicKey;
8use hopr_db_entity::network_peer;
9use hopr_primitive_types::prelude::*;
10use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder};
11use sea_query::{Condition, Expr, IntoCondition, Order};
12use sqlx::types::chrono::{self, DateTime, Utc};
13use tracing::{error, trace};
14
15use crate::{db::HoprNodeDb, errors::NodeDbError};
16
17const DB_BINCODE_CONFIGURATION: bincode::config::Configuration = bincode::config::standard()
18    .with_little_endian()
19    .with_variable_int_encoding();
20
21struct WrappedPeerSelector(PeerSelector);
22
23impl From<PeerSelector> for WrappedPeerSelector {
24    fn from(selector: PeerSelector) -> Self {
25        WrappedPeerSelector(selector)
26    }
27}
28
29impl IntoCondition for WrappedPeerSelector {
30    fn into_condition(self) -> Condition {
31        let mut ret = Expr::value(1);
32
33        if let Some(last_seen_l) = self.0.last_seen.0 {
34            ret = ret.and(network_peer::Column::LastSeen.gte(chrono::DateTime::<chrono::Utc>::from(last_seen_l)));
35        }
36
37        if let Some(last_seen_u) = self.0.last_seen.1 {
38            ret = ret.and(network_peer::Column::LastSeen.lte(chrono::DateTime::<chrono::Utc>::from(last_seen_u)));
39        }
40
41        if let Some(quality_l) = self.0.quality.0 {
42            ret = ret.and(network_peer::Column::Quality.gte(quality_l));
43        }
44
45        if let Some(quality_u) = self.0.quality.1 {
46            ret = ret.and(network_peer::Column::Quality.lte(quality_u));
47        }
48
49        ret.into_condition()
50    }
51}
52
53#[async_trait]
54impl HoprDbPeersOperations for HoprNodeDb {
55    type Error = NodeDbError;
56
57    #[tracing::instrument(
58        skip(self),
59        name = "HoprDbPeersOperations::add_network_peer",
60        level = "trace",
61        err,
62        ret
63    )]
64    async fn add_network_peer(
65        &self,
66        peer: &PeerId,
67        origin: PeerOrigin,
68        mas: Vec<Multiaddr>,
69        backoff: f64,
70        quality_window: u32,
71    ) -> Result<(), NodeDbError> {
72        let peer = *peer;
73        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
74        let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer)).await?;
75
76        let new_peer = hopr_db_entity::network_peer::ActiveModel {
77            packet_key: sea_orm::ActiveValue::Set(Vec::from(pubkey.as_ref())),
78            multi_addresses: sea_orm::ActiveValue::Set(
79                mas.into_iter().map(|m| m.to_string()).collect::<Vec<String>>().into(),
80            ),
81            origin: sea_orm::ActiveValue::Set(origin as i8),
82            backoff: sea_orm::ActiveValue::Set(Some(backoff)),
83            quality_sma: sea_orm::ActiveValue::Set(Some(
84                bincode::serde::encode_to_vec(
85                    SingleSumSMA::<f64>::new(quality_window as usize),
86                    DB_BINCODE_CONFIGURATION,
87                )
88                .map_err(|e| NodeDbError::LogicalError(format!("cannot serialize sma: {e}")))?,
89            )),
90            ..Default::default()
91        };
92
93        let _ = new_peer.insert(&self.peers_db).await?;
94
95        Ok(())
96    }
97
98    #[tracing::instrument(
99        skip(self),
100        name = "HoprDbPeersOperations::remove_network_peer",
101        level = "trace",
102        err,
103        ret
104    )]
105    async fn remove_network_peer(&self, peer: &PeerId) -> Result<(), NodeDbError> {
106        let peer = *peer;
107        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
108        let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer)).await?;
109
110        let res = hopr_db_entity::network_peer::Entity::delete_many()
111            .filter(hopr_db_entity::network_peer::Column::PacketKey.eq(Vec::from(pubkey.as_ref())))
112            .exec(&self.peers_db)
113            .await?;
114
115        if res.rows_affected > 0 {
116            Ok(())
117        } else {
118            Err(NodeDbError::LogicalError(
119                "peer cannot be removed because it does not exist".into(),
120            ))
121        }
122    }
123
124    #[tracing::instrument(
125        skip(self),
126        name = "HoprDbPeersOperations::update_network_peer",
127        level = "trace",
128        err,
129        ret
130    )]
131    async fn update_network_peer(&self, new_status: PeerStatus) -> Result<(), NodeDbError> {
132        let row = hopr_db_entity::network_peer::Entity::find()
133            .filter(hopr_db_entity::network_peer::Column::PacketKey.eq(Vec::from(new_status.id.0.as_ref())))
134            .one(&self.peers_db)
135            .await?;
136
137        if let Some(model) = row {
138            let mut peer_data: hopr_db_entity::network_peer::ActiveModel = model.into();
139            peer_data.packet_key = sea_orm::ActiveValue::Set(Vec::from(new_status.id.0.as_ref()));
140            peer_data.multi_addresses = sea_orm::ActiveValue::Set(
141                new_status
142                    .multiaddresses
143                    .into_iter()
144                    .map(|m| m.to_string())
145                    .collect::<Vec<String>>()
146                    .into(),
147            );
148            peer_data.origin = sea_orm::ActiveValue::Set(new_status.origin as i8);
149            peer_data.last_seen = sea_orm::ActiveValue::Set(DateTime::<Utc>::from(new_status.last_seen));
150            peer_data.last_seen_latency = sea_orm::ActiveValue::Set(new_status.last_seen_latency.as_millis() as i32);
151            peer_data.ignored_until = sea_orm::ActiveValue::Set(new_status.ignored_until.map(DateTime::<Utc>::from));
152            peer_data.quality = sea_orm::ActiveValue::Set(new_status.quality);
153            peer_data.quality_sma = sea_orm::ActiveValue::Set(Some(
154                bincode::serde::encode_to_vec(&new_status.quality_avg, DB_BINCODE_CONFIGURATION)
155                    .map_err(|e| NodeDbError::LogicalError(format!("cannot serialize sma: {e}")))?,
156            ));
157            peer_data.backoff = sea_orm::ActiveValue::Set(Some(new_status.backoff));
158            peer_data.heartbeats_sent = sea_orm::ActiveValue::Set(Some(new_status.heartbeats_sent as i32));
159            peer_data.heartbeats_successful = sea_orm::ActiveValue::Set(Some(new_status.heartbeats_succeeded as i32));
160
161            peer_data.update(&self.peers_db).await?;
162
163            Ok(())
164        } else {
165            Err(NodeDbError::LogicalError(format!(
166                "cannot update a non-existing peer '{}'",
167                new_status.id.1
168            )))
169        }
170    }
171
172    #[tracing::instrument(
173        skip(self),
174        name = "HoprDbPeersOperations::get_network_peer",
175        level = "trace",
176        err,
177        ret
178    )]
179    async fn get_network_peer(&self, peer: &PeerId) -> Result<Option<PeerStatus>, NodeDbError> {
180        let peer = *peer;
181        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
182        let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer)).await?;
183        let row = hopr_db_entity::network_peer::Entity::find()
184            .filter(hopr_db_entity::network_peer::Column::PacketKey.eq(Vec::from(pubkey.as_ref())))
185            .one(&self.peers_db)
186            .await?;
187
188        if let Some(model) = row {
189            let status: WrappedPeerStatus = model.try_into()?;
190            Ok(Some(status.0))
191        } else {
192            Ok(None)
193        }
194    }
195
196    #[tracing::instrument(skip(self), name = "HoprDbPeersOperations::get_network_peers", level = "trace", err)]
197    async fn get_network_peers<'a>(
198        &'a self,
199        selector: PeerSelector,
200        sort_last_seen_asc: bool,
201    ) -> Result<BoxStream<'a, PeerStatus>, NodeDbError> {
202        let selector: WrappedPeerSelector = selector.into();
203        let mut sub_stream = hopr_db_entity::network_peer::Entity::find()
204            .filter(selector)
205            .order_by(
206                network_peer::Column::LastSeen,
207                if sort_last_seen_asc { Order::Asc } else { Order::Desc },
208            )
209            .stream(&self.peers_db)
210            .await?;
211
212        Ok(Box::pin(stream! {
213            loop {
214                match sub_stream.try_next().await {
215                    Ok(Some(peer_row)) => {
216                        trace!(?peer_row, "got db network row");
217                        match WrappedPeerStatus::try_from(peer_row) {
218                            Ok(peer_status) => yield peer_status.0,
219                            Err(e) => error!(error = %e, "cannot map peer from row"),
220                        }
221                    },
222                    Ok(None) => {
223                        trace!("fetched all network results");
224                        break;
225                    }
226                    Err(e) => {
227                        error!(error = %e, "failed to retrieve next network row");
228                        break;
229                    }
230                }
231            }
232        }))
233    }
234
235    #[tracing::instrument(
236        skip(self),
237        name = "HoprDbPeersOperations::network_peer_stats",
238        level = "trace",
239        err,
240        ret
241    )]
242    async fn network_peer_stats(&self, quality_threshold: f64) -> Result<Stats, NodeDbError> {
243        Ok(Stats {
244            good_quality_public: hopr_db_entity::network_peer::Entity::find()
245                .filter(
246                    sea_orm::Condition::all()
247                        .add(hopr_db_entity::network_peer::Column::IgnoredUntil.is_null())
248                        .add(hopr_db_entity::network_peer::Column::Quality.gt(quality_threshold)),
249                )
250                .count(&self.peers_db)
251                .await? as u32,
252            good_quality_non_public: 0u32, // TODO: Only public peers supported in v3
253            bad_quality_public: hopr_db_entity::network_peer::Entity::find()
254                .filter(
255                    sea_orm::Condition::all()
256                        .add(hopr_db_entity::network_peer::Column::IgnoredUntil.is_null())
257                        .add(hopr_db_entity::network_peer::Column::Quality.lte(quality_threshold)),
258                )
259                .count(&self.peers_db)
260                .await? as u32,
261            bad_quality_non_public: 0u32, // TODO: Only public peers supported in v3
262        })
263    }
264}
265
266struct WrappedPeerStatus(PeerStatus);
267
268impl From<PeerStatus> for WrappedPeerStatus {
269    fn from(status: PeerStatus) -> Self {
270        WrappedPeerStatus(status)
271    }
272}
273
274impl TryFrom<hopr_db_entity::network_peer::Model> for WrappedPeerStatus {
275    type Error = NodeDbError;
276
277    fn try_from(value: hopr_db_entity::network_peer::Model) -> std::result::Result<Self, Self::Error> {
278        let key = OffchainPublicKey::try_from(value.packet_key.as_slice())?;
279        Ok(PeerStatus {
280            id: (key, key.into()),
281            origin: PeerOrigin::from_repr(value.origin as u8)
282                .ok_or_else(|| Self::Error::LogicalError("invalid origin".into()))?,
283            last_seen: value.last_seen.into(),
284            last_seen_latency: Duration::from_millis(value.last_seen_latency.max(0) as u64),
285            heartbeats_sent: value.heartbeats_sent.unwrap_or_default() as u64,
286            heartbeats_succeeded: value.heartbeats_successful.unwrap_or_default() as u64,
287            backoff: value.backoff.unwrap_or(1.0f64),
288            ignored_until: value.ignored_until.map(|v| v.into()),
289            multiaddresses: {
290                if let sea_orm::query::JsonValue::Array(mas) = value.multi_addresses {
291                    mas.into_iter()
292                        .filter_map(|s| {
293                            if let sea_orm::query::JsonValue::String(s) = s {
294                                Some(s)
295                            } else {
296                                None
297                            }
298                        })
299                        .filter(|s| !s.trim().is_empty())
300                        .map(|m| {
301                            Multiaddr::try_from(m)
302                                .map_err(|m| Self::Error::LogicalError(format!("invalid multi-address: {m}")))
303                        })
304                        .collect::<Result<Vec<_>, NodeDbError>>()
305                } else {
306                    Err(Self::Error::LogicalError("invalid multi-addresses".into()))
307                }?
308            },
309            quality: value.quality,
310            quality_avg: bincode::serde::borrow_decode_from_slice(
311                value
312                    .quality_sma
313                    .ok_or_else(|| Self::Error::LogicalError("the SMA should always be present for every peer".into()))?
314                    .as_slice(),
315                DB_BINCODE_CONFIGURATION,
316            )
317            .map(|(v, _bytes)| v)
318            .map_err(|_| Self::Error::LogicalError("cannot deserialize SMA".into()))?,
319        }
320        .into())
321    }
322}
323
#[cfg(test)]
mod tests {
    use std::{
        ops::Add,
        time::{Duration, SystemTime},
    };

    use futures::StreamExt;
    use hopr_crypto_types::keypairs::{ChainKeypair, Keypair, OffchainKeypair};

    use super::*;

    /// Quality measurements fed, in this order, into a peer status by the tests.
    const QUALITY_SAMPLES: [f64; 3] = [0.1, 0.4, 0.6];

    /// Generates the peer ID of a freshly generated random off-chain keypair.
    fn random_peer_id() -> PeerId {
        OffchainKeypair::random().public().into()
    }

    /// Builds the localhost TCP multiaddress of `peer_id` on the given `port`.
    fn localhost_multiaddr(port: u16, peer_id: &PeerId) -> anyhow::Result<Multiaddr> {
        Ok(format!("/ip4/127.0.0.1/tcp/{port}/p2p/{peer_id}").parse()?)
    }

    /// Applies all [`QUALITY_SAMPLES`] to the given status.
    fn apply_quality_samples(status: &mut PeerStatus) {
        for sample in QUALITY_SAMPLES {
            status.update_quality(sample);
        }
    }

    /// Builds the expected statistics with the given public counters and no
    /// non-public peers.
    fn public_stats(good: u32, bad: u32) -> Stats {
        Stats {
            good_quality_public: good,
            bad_quality_public: bad,
            good_quality_non_public: 0,
            bad_quality_non_public: 0,
        }
    }

    #[tokio::test]
    async fn test_add_get() -> anyhow::Result<()> {
        let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_id = random_peer_id();
        let addresses = vec![
            localhost_multiaddr(10000, &peer_id)?,
            localhost_multiaddr(10002, &peer_id)?,
        ];

        db.add_network_peer(&peer_id, PeerOrigin::IncomingConnection, addresses.clone(), 0.0, 25)
            .await?;

        let stored = db.get_network_peer(&peer_id).await?.expect("peer must exist in the db");

        let mut expected = PeerStatus::new(peer_id, PeerOrigin::IncomingConnection, 0.0, 25);
        expected.last_seen = SystemTime::UNIX_EPOCH;
        expected.last_seen_latency = Duration::from_secs(0);
        expected.multiaddresses = addresses;

        assert_eq!(expected, stored, "peer states must match");
        Ok(())
    }

    #[tokio::test]
    async fn test_should_remove_peer() -> anyhow::Result<()> {
        let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_id = random_peer_id();
        let address = localhost_multiaddr(10000, &peer_id)?;

        db.add_network_peer(&peer_id, PeerOrigin::IncomingConnection, vec![address], 0.0, 25)
            .await?;
        let before_removal = db.get_network_peer(&peer_id).await?;
        assert!(before_removal.is_some(), "must have peer entry");

        db.remove_network_peer(&peer_id).await?;
        let after_removal = db.get_network_peer(&peer_id).await?;
        assert!(after_removal.is_none(), "peer entry must be gone");

        Ok(())
    }

    #[tokio::test]
    async fn test_should_not_remove_non_existing_peer() -> anyhow::Result<()> {
        let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;

        let unknown_peer = random_peer_id();

        db.remove_network_peer(&unknown_peer)
            .await
            .expect_err("must not delete non-existent peer");

        Ok(())
    }

    #[tokio::test]
    async fn test_should_not_add_duplicate_peers() -> anyhow::Result<()> {
        let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_id = random_peer_id();
        let address = localhost_multiaddr(10000, &peer_id)?;

        db.add_network_peer(&peer_id, PeerOrigin::IncomingConnection, vec![address], 0.0, 25)
            .await?;

        let second_attempt = db
            .add_network_peer(&peer_id, PeerOrigin::IncomingConnection, vec![], 0.0, 25)
            .await;
        second_attempt.expect_err("should fail adding second time");

        Ok(())
    }

    #[tokio::test]
    async fn test_should_return_none_on_non_existing_peer() -> anyhow::Result<()> {
        let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;

        let unknown_peer = random_peer_id();
        let lookup = db.get_network_peer(&unknown_peer).await?;

        assert!(lookup.is_none(), "should return none");
        Ok(())
    }

    #[tokio::test]
    async fn test_update() -> anyhow::Result<()> {
        let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_id = random_peer_id();
        let addresses = vec![
            localhost_multiaddr(10000, &peer_id)?,
            localhost_multiaddr(10002, &peer_id)?,
        ];

        db.add_network_peer(&peer_id, PeerOrigin::IncomingConnection, addresses.clone(), 0.0, 25)
            .await?;

        let mut updated = PeerStatus::new(peer_id, PeerOrigin::IncomingConnection, 0.2, 25);
        updated.last_seen = SystemTime::UNIX_EPOCH;
        updated.last_seen_latency = Duration::from_secs(2);
        updated.multiaddresses = addresses;
        updated.backoff = 2.0;
        updated.ignored_until = None;
        apply_quality_samples(&mut updated);
        // Round-trip the quality through `f32` before comparing with the stored value.
        updated.quality = updated.quality as f32 as f64;

        let stored_before = db.get_network_peer(&peer_id).await?.expect("entry should exist");
        assert_ne!(updated, stored_before, "entries must not be equal");

        db.update_network_peer(updated.clone()).await?;

        let stored_after = db.get_network_peer(&peer_id).await?.expect("entry should exist");
        assert_eq!(updated, stored_after, "entries must be equal");

        Ok(())
    }

    #[tokio::test]
    async fn test_should_fail_to_update_non_existing_peer() -> anyhow::Result<()> {
        let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;

        let unknown_peer = random_peer_id();

        let mut status = PeerStatus::new(unknown_peer, PeerOrigin::IncomingConnection, 0.2, 25);
        status.last_seen = SystemTime::UNIX_EPOCH;
        status.last_seen_latency = Duration::from_secs(2);
        status.backoff = 2.0;
        status.ignored_until = None;
        status.multiaddresses = vec![];
        apply_quality_samples(&mut status);

        db.update_network_peer(status)
            .await
            .expect_err("should fail updating non-existing peer");
        Ok(())
    }

    #[tokio::test]
    async fn test_get_multiple_should_return_all_peers() -> anyhow::Result<()> {
        let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;

        let peers: Vec<PeerId> = std::iter::repeat_with(random_peer_id).take(10).collect();

        for peer in &peers {
            db.add_network_peer(peer, PeerOrigin::Initialization, vec![], 0.0, 25)
                .await?;
        }

        // The peers are requested in ascending `LastSeen` order and compared against
        // the insertion order of the `peers` vector.
        // NOTE(review): this relies on the ascending order matching the insertion
        // order; confirm which `LastSeen` value a freshly added row receives.
        let peers_from_db = db
            .get_network_peers(Default::default(), true)
            .await?
            .map(|status| status.id.1)
            .collect::<Vec<PeerId>>()
            .await;

        assert_eq!(peers.len(), peers_from_db.len(), "lengths must match");
        assert_eq!(peers, peers_from_db, "peer ids must match");

        Ok(())
    }

    #[tokio::test]
    async fn test_get_multiple_should_return_filtered_peers() -> anyhow::Result<()> {
        let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_count = 10;
        let peers: Vec<PeerId> = std::iter::repeat_with(random_peer_id).take(peer_count).collect();

        for (i, peer) in peers.iter().enumerate() {
            db.add_network_peer(peer, PeerOrigin::Initialization, vec![], 0.2, 25)
                .await?;

            // Only the second half of the peers receives a status update.
            if i < peer_count / 2 {
                continue;
            }

            let mut status = PeerStatus::new(*peer, PeerOrigin::IncomingConnection, 0.2, 25);
            status.last_seen = SystemTime::UNIX_EPOCH.add(Duration::from_secs(i as u64));
            status.last_seen_latency = Duration::from_secs(2);
            status.multiaddresses = vec![];
            status.heartbeats_sent = 3;
            status.heartbeats_succeeded = 4;
            status.backoff = 1.0;
            status.ignored_until = None;
            apply_quality_samples(&mut status);

            db.update_network_peer(status).await?;
        }

        let peers_from_db = db
            .get_network_peers(PeerSelector::default().with_quality_gte(0.501_f64), false)
            .await?
            .map(|status| status.id.1)
            .collect::<Vec<PeerId>>()
            .await;

        // Descending `LastSeen` order means the updated half comes back reversed.
        let expected: Vec<PeerId> = peers.iter().skip(peer_count / 2).rev().copied().collect();

        assert_eq!(peer_count / 2, peers_from_db.len(), "lengths must match");
        assert_eq!(expected, peers_from_db, "peer ids must match");

        Ok(())
    }

    #[tokio::test]
    async fn test_should_update_stats_when_updating_peers() -> anyhow::Result<()> {
        let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_id_1 = random_peer_id();
        let peer_id_2 = random_peer_id();

        let ma_1 = localhost_multiaddr(10000, &peer_id_1)?;
        let ma_2 = localhost_multiaddr(10002, &peer_id_2)?;

        // One freshly added peer counts as a single bad-quality peer.
        db.add_network_peer(&peer_id_1, PeerOrigin::IncomingConnection, vec![ma_1], 0.0, 25)
            .await?;
        assert_eq!(
            public_stats(0, 1),
            db.network_peer_stats(0.2).await?,
            "stats must be equal"
        );

        // A second freshly added peer raises the bad-quality count.
        db.add_network_peer(&peer_id_2, PeerOrigin::IncomingConnection, vec![ma_2], 0.0, 25)
            .await?;
        assert_eq!(
            public_stats(0, 2),
            db.network_peer_stats(0.2).await?,
            "stats must be equal"
        );

        let mut status = PeerStatus::new(peer_id_1, PeerOrigin::IncomingConnection, 0.2, 25);
        status.last_seen = SystemTime::UNIX_EPOCH.add(Duration::from_secs(2));
        status.last_seen_latency = Duration::from_secs(2);
        status.multiaddresses = vec![];
        status.heartbeats_sent = 3;
        status.heartbeats_succeeded = 4;
        status.backoff = 1.0;
        status.ignored_until = None;
        apply_quality_samples(&mut status);

        // Updating the first peer with the quality samples moves it to the good bucket.
        db.update_network_peer(status).await?;
        assert_eq!(
            public_stats(1, 1),
            db.network_peer_stats(0.2).await?,
            "stats must be equal"
        );

        // Removing the first peer leaves only the second, bad-quality peer.
        db.remove_network_peer(&peer_id_1).await?;
        assert_eq!(
            public_stats(0, 1),
            db.network_peer_stats(0.2).await?,
            "stats must be equal"
        );

        Ok(())
    }
}