hopr_db_sql/
peers.rs

1use std::time::Duration;
2
3use async_stream::stream;
4use async_trait::async_trait;
5use futures::{TryStreamExt, stream::BoxStream};
6use hopr_crypto_types::prelude::OffchainPublicKey;
7use hopr_db_api::{
8    errors::Result,
9    peers::{HoprDbPeersOperations, PeerOrigin, PeerSelector, PeerStatus, Stats},
10};
11use hopr_db_entity::network_peer;
12use hopr_primitive_types::prelude::*;
13use libp2p_identity::PeerId;
14use multiaddr::Multiaddr;
15use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder};
16use sea_query::{Condition, Expr, IntoCondition, Order};
17use sqlx::types::chrono::{self, DateTime, Utc};
18use tracing::{error, trace};
19
20use crate::{db::HoprDb, prelude::DbSqlError};
21
/// Bincode configuration shared by every (de)serialization of the quality SMA blob
/// in this module: little-endian byte order with variable-length integer encoding.
///
/// The same configuration is used for both writing and reading the `quality_sma` column,
/// so the two settings must only ever be changed together with the stored data.
const DB_BINCODE_CONFIGURATION: bincode::config::Configuration = bincode::config::standard()
    .with_little_endian()
    .with_variable_int_encoding();
25
/// Module-local newtype around [`PeerSelector`].
///
/// Both `PeerSelector` and `IntoCondition` are imported from other crates, so the
/// wrapper is what allows this module to implement the latter for the former.
struct WrappedPeerSelector(PeerSelector);
27
28impl From<PeerSelector> for WrappedPeerSelector {
29    fn from(selector: PeerSelector) -> Self {
30        WrappedPeerSelector(selector)
31    }
32}
33
34impl IntoCondition for WrappedPeerSelector {
35    fn into_condition(self) -> Condition {
36        let mut ret = Expr::value(1);
37
38        if let Some(last_seen_l) = self.0.last_seen.0 {
39            ret = ret.and(network_peer::Column::LastSeen.gte(chrono::DateTime::<chrono::Utc>::from(last_seen_l)));
40        }
41
42        if let Some(last_seen_u) = self.0.last_seen.1 {
43            ret = ret.and(network_peer::Column::LastSeen.lte(chrono::DateTime::<chrono::Utc>::from(last_seen_u)));
44        }
45
46        if let Some(quality_l) = self.0.quality.0 {
47            ret = ret.and(network_peer::Column::Quality.gte(quality_l));
48        }
49
50        if let Some(quality_u) = self.0.quality.1 {
51            ret = ret.and(network_peer::Column::Quality.lte(quality_u));
52        }
53
54        ret.into_condition()
55    }
56}
57
58#[async_trait]
59impl HoprDbPeersOperations for HoprDb {
60    async fn add_network_peer(
61        &self,
62        peer: &PeerId,
63        origin: PeerOrigin,
64        mas: Vec<Multiaddr>,
65        backoff: f64,
66        quality_window: u32,
67    ) -> Result<()> {
68        let new_peer = hopr_db_entity::network_peer::ActiveModel {
69            packet_key: sea_orm::ActiveValue::Set(Vec::from(
70                OffchainPublicKey::try_from(peer)
71                    .map_err(|_| crate::errors::DbSqlError::DecodingError)?
72                    .as_ref(),
73            )),
74            multi_addresses: sea_orm::ActiveValue::Set(
75                mas.into_iter().map(|m| m.to_string()).collect::<Vec<String>>().into(),
76            ),
77            origin: sea_orm::ActiveValue::Set(origin as i8),
78            backoff: sea_orm::ActiveValue::Set(Some(backoff)),
79            quality_sma: sea_orm::ActiveValue::Set(Some(
80                bincode::serde::encode_to_vec(
81                    SingleSumSMA::<f64>::new(quality_window as usize),
82                    DB_BINCODE_CONFIGURATION,
83                )
84                .map_err(|_| crate::errors::DbSqlError::DecodingError)?,
85            )),
86            ..Default::default()
87        };
88
89        let _ = new_peer.insert(&self.peers_db).await.map_err(DbSqlError::from)?;
90
91        Ok(())
92    }
93
94    async fn remove_network_peer(&self, peer: &PeerId) -> Result<()> {
95        let res = hopr_db_entity::network_peer::Entity::delete_many()
96            .filter(
97                hopr_db_entity::network_peer::Column::PacketKey.eq(Vec::from(
98                    OffchainPublicKey::try_from(peer)
99                        .map_err(|_| crate::errors::DbSqlError::DecodingError)?
100                        .as_ref(),
101                )),
102            )
103            .exec(&self.peers_db)
104            .await
105            .map_err(DbSqlError::from)?;
106
107        if res.rows_affected > 0 {
108            Ok(())
109        } else {
110            Err(
111                crate::errors::DbSqlError::LogicalError("peer cannot be removed because it does not exist".into())
112                    .into(),
113            )
114        }
115    }
116
117    async fn update_network_peer(&self, new_status: PeerStatus) -> Result<()> {
118        let row = hopr_db_entity::network_peer::Entity::find()
119            .filter(hopr_db_entity::network_peer::Column::PacketKey.eq(Vec::from(new_status.id.0.as_ref())))
120            .one(&self.peers_db)
121            .await
122            .map_err(DbSqlError::from)?;
123
124        if let Some(model) = row {
125            let mut peer_data: hopr_db_entity::network_peer::ActiveModel = model.into();
126            peer_data.packet_key = sea_orm::ActiveValue::Set(Vec::from(new_status.id.0.as_ref()));
127            peer_data.multi_addresses = sea_orm::ActiveValue::Set(
128                new_status
129                    .multiaddresses
130                    .into_iter()
131                    .map(|m| m.to_string())
132                    .collect::<Vec<String>>()
133                    .into(),
134            );
135            peer_data.origin = sea_orm::ActiveValue::Set(new_status.origin as i8);
136            peer_data.version = sea_orm::ActiveValue::Set(new_status.peer_version);
137            peer_data.last_seen = sea_orm::ActiveValue::Set(DateTime::<Utc>::from(new_status.last_seen));
138            peer_data.last_seen_latency = sea_orm::ActiveValue::Set(new_status.last_seen_latency.as_millis() as i32);
139            peer_data.ignored = sea_orm::ActiveValue::Set(new_status.ignored.map(DateTime::<Utc>::from));
140            peer_data.public = sea_orm::ActiveValue::Set(new_status.is_public);
141            peer_data.quality = sea_orm::ActiveValue::Set(new_status.quality);
142            peer_data.quality_sma = sea_orm::ActiveValue::Set(Some(
143                bincode::serde::encode_to_vec(&new_status.quality_avg, DB_BINCODE_CONFIGURATION)
144                    .map_err(|e| crate::errors::DbSqlError::LogicalError(format!("cannot serialize sma: {e}")))?,
145            ));
146            peer_data.backoff = sea_orm::ActiveValue::Set(Some(new_status.backoff));
147            peer_data.heartbeats_sent = sea_orm::ActiveValue::Set(Some(new_status.heartbeats_sent as i32));
148            peer_data.heartbeats_successful = sea_orm::ActiveValue::Set(Some(new_status.heartbeats_succeeded as i32));
149
150            peer_data.update(&self.peers_db).await.map_err(DbSqlError::from)?;
151
152            Ok(())
153        } else {
154            Err(crate::errors::DbSqlError::LogicalError(format!(
155                "cannot update a non-existing peer '{}'",
156                new_status.id.1
157            ))
158            .into())
159        }
160    }
161
162    async fn get_network_peer(&self, peer: &PeerId) -> Result<Option<PeerStatus>> {
163        let row = hopr_db_entity::network_peer::Entity::find()
164            .filter(
165                hopr_db_entity::network_peer::Column::PacketKey.eq(Vec::from(
166                    OffchainPublicKey::try_from(peer)
167                        .map_err(|_| crate::errors::DbSqlError::DecodingError)?
168                        .as_ref(),
169                )),
170            )
171            .one(&self.peers_db)
172            .await
173            .map_err(DbSqlError::from)?;
174
175        if let Some(model) = row {
176            let status: WrappedPeerStatus = model.try_into()?;
177            Ok(Some(status.0))
178        } else {
179            Ok(None)
180        }
181    }
182
183    async fn get_network_peers<'a>(
184        &'a self,
185        selector: PeerSelector,
186        sort_last_seen_asc: bool,
187    ) -> Result<BoxStream<'a, PeerStatus>> {
188        let selector: WrappedPeerSelector = selector.into();
189        let mut sub_stream = hopr_db_entity::network_peer::Entity::find()
190            // .filter(hopr_db_entity::network_peer::Column::Ignored.is_not_null())
191            .filter(selector)
192            .order_by(
193                network_peer::Column::LastSeen,
194                if sort_last_seen_asc { Order::Asc } else { Order::Desc },
195            )
196            .stream(&self.peers_db)
197            .await
198            .map_err(DbSqlError::from)?;
199
200        Ok(Box::pin(stream! {
201            loop {
202                match sub_stream.try_next().await {
203                    Ok(Some(peer_row)) => {
204                        trace!(?peer_row, "got db network row");
205                        match WrappedPeerStatus::try_from(peer_row) {
206                            Ok(peer_status) => yield peer_status.0,
207                            Err(e) => error!(error = %e, "cannot map peer from row"),
208                        }
209                    },
210                    Ok(None) => {
211                        trace!("fetched all network results");
212                        break;
213                    }
214                    Err(e) => {
215                        error!(error = %e, "failed to retrieve next network row");
216                        break;
217                    }
218                }
219            }
220        }))
221    }
222
223    async fn network_peer_stats(&self, quality_threshold: f64) -> Result<Stats> {
224        Ok(Stats {
225            good_quality_public: hopr_db_entity::network_peer::Entity::find()
226                .filter(
227                    sea_orm::Condition::all()
228                        .add(hopr_db_entity::network_peer::Column::Public.eq(true))
229                        .add(hopr_db_entity::network_peer::Column::Ignored.is_null())
230                        .add(hopr_db_entity::network_peer::Column::Quality.gt(quality_threshold)),
231                )
232                .count(&self.peers_db)
233                .await
234                .map_err(DbSqlError::from)? as u32,
235            good_quality_non_public: hopr_db_entity::network_peer::Entity::find()
236                .filter(
237                    sea_orm::Condition::all()
238                        .add(hopr_db_entity::network_peer::Column::Public.eq(false))
239                        .add(hopr_db_entity::network_peer::Column::Ignored.is_null())
240                        .add(hopr_db_entity::network_peer::Column::Quality.gt(quality_threshold)),
241                )
242                .count(&self.peers_db)
243                .await
244                .map_err(DbSqlError::from)? as u32,
245            bad_quality_public: hopr_db_entity::network_peer::Entity::find()
246                .filter(
247                    sea_orm::Condition::all()
248                        .add(hopr_db_entity::network_peer::Column::Public.eq(true))
249                        .add(hopr_db_entity::network_peer::Column::Ignored.is_null())
250                        .add(hopr_db_entity::network_peer::Column::Quality.lte(quality_threshold)),
251                )
252                .count(&self.peers_db)
253                .await
254                .map_err(DbSqlError::from)? as u32,
255            bad_quality_non_public: hopr_db_entity::network_peer::Entity::find()
256                .filter(
257                    sea_orm::Condition::all()
258                        .add(hopr_db_entity::network_peer::Column::Public.eq(false))
259                        .add(hopr_db_entity::network_peer::Column::Ignored.is_null())
260                        .add(hopr_db_entity::network_peer::Column::Quality.lte(quality_threshold)),
261                )
262                .count(&self.peers_db)
263                .await
264                .map_err(DbSqlError::from)? as u32,
265        })
266    }
267}
268
/// Module-local newtype around [`PeerStatus`].
///
/// It is the target of the fallible conversion from a `network_peer` database row;
/// the inner status is taken out via `.0` once the conversion has succeeded.
struct WrappedPeerStatus(PeerStatus);
270
271impl From<PeerStatus> for WrappedPeerStatus {
272    fn from(status: PeerStatus) -> Self {
273        WrappedPeerStatus(status)
274    }
275}
276
277impl TryFrom<hopr_db_entity::network_peer::Model> for WrappedPeerStatus {
278    type Error = crate::errors::DbSqlError;
279
280    fn try_from(value: hopr_db_entity::network_peer::Model) -> std::result::Result<Self, Self::Error> {
281        let key = OffchainPublicKey::try_from(value.packet_key.as_slice()).map_err(|_| Self::Error::DecodingError)?;
282        Ok(PeerStatus {
283            id: (key, key.into()),
284            origin: PeerOrigin::try_from(value.origin as u8).map_err(|_| Self::Error::DecodingError)?,
285            is_public: value.public,
286            last_seen: value.last_seen.into(),
287            last_seen_latency: Duration::from_millis(value.last_seen_latency as u64),
288            heartbeats_sent: value.heartbeats_sent.unwrap_or_default() as u64,
289            heartbeats_succeeded: value.heartbeats_successful.unwrap_or_default() as u64,
290            backoff: value.backoff.unwrap_or(1.0f64),
291            ignored: value.ignored.map(|v| v.into()),
292            peer_version: value.version,
293            multiaddresses: {
294                if let sea_orm::query::JsonValue::Array(mas) = value.multi_addresses {
295                    mas.into_iter()
296                        .filter_map(|s| {
297                            if let sea_orm::query::JsonValue::String(s) = s {
298                                Some(s)
299                            } else {
300                                None
301                            }
302                        })
303                        .filter(|s| !s.trim().is_empty())
304                        .map(Multiaddr::try_from)
305                        .collect::<std::result::Result<Vec<_>, multiaddr::Error>>()
306                        .map_err(|_| Self::Error::DecodingError)
307                } else {
308                    Err(Self::Error::DecodingError)
309                }?
310            },
311            quality: value.quality,
312            quality_avg: bincode::serde::borrow_decode_from_slice(
313                value
314                    .quality_sma
315                    .ok_or_else(|| Self::Error::LogicalError("the SMA should always be present for every peer".into()))?
316                    .as_slice(),
317                DB_BINCODE_CONFIGURATION,
318            )
319            .map(|(v, _bytes)| v)
320            .map_err(|_| Self::Error::DecodingError)?,
321        }
322        .into())
323    }
324}
325
/// Tests of the peer storage operations, each running against a fresh in-memory database.
#[cfg(test)]
mod tests {
    use std::{
        collections::HashSet,
        ops::Add,
        time::{Duration, SystemTime},
    };

    use futures::StreamExt;
    use hopr_crypto_types::keypairs::{ChainKeypair, Keypair, OffchainKeypair};
    use libp2p_identity::PeerId;
    use multiaddr::Multiaddr;

    use super::*;

    /// Generates the peer id of a freshly generated random off-chain keypair.
    fn random_peer_id() -> PeerId {
        OffchainKeypair::random().public().into()
    }

    /// Builds a loopback TCP multiaddress on `port` that ends with `peer_id`.
    fn local_multiaddr(port: u16, peer_id: &PeerId) -> anyhow::Result<Multiaddr> {
        Ok(format!("/ip4/127.0.0.1/tcp/{port}/p2p/{peer_id}").parse()?)
    }

    /// Feeds the fixed sample series used throughout these tests into the peer's quality.
    fn feed_quality_samples(status: &mut PeerStatus) {
        for sample in [0.1_f64, 0.4_f64, 0.6_f64] {
            status.update_quality(sample);
        }
    }

    /// Builds the status of a peer last seen `last_seen_secs` after the UNIX epoch,
    /// with no multiaddresses and the heartbeat/backoff fixture values shared by several tests.
    fn observed_status(peer_id: PeerId, last_seen_secs: u64) -> PeerStatus {
        let mut status = PeerStatus::new(peer_id, PeerOrigin::IncomingConnection, 0.2, 25);
        status.last_seen = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_seen_secs));
        status.last_seen_latency = Duration::from_secs(2);
        status.multiaddresses = vec![];
        status.heartbeats_sent = 3;
        status.heartbeats_succeeded = 4;
        status.backoff = 1.0;
        status.ignored = None;
        status.peer_version = Some("1.2.3".into());
        status
    }

    #[tokio::test]
    async fn test_add_get() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_id = random_peer_id();
        let ma_1 = local_multiaddr(10000, &peer_id)?;
        let ma_2 = local_multiaddr(10002, &peer_id)?;

        db.add_network_peer(
            &peer_id,
            PeerOrigin::IncomingConnection,
            vec![ma_1.clone(), ma_2.clone()],
            0.0,
            25,
        )
        .await?;

        let peer_from_db = db.get_network_peer(&peer_id).await?.expect("peer must exist in the db");

        let mut expected_peer = PeerStatus::new(peer_id, PeerOrigin::IncomingConnection, 0.0, 25);
        expected_peer.last_seen = SystemTime::UNIX_EPOCH;
        expected_peer.last_seen_latency = Duration::from_secs(0);
        expected_peer.multiaddresses = vec![ma_1, ma_2];

        assert_eq!(expected_peer, peer_from_db, "peer states must match");
        Ok(())
    }

    #[tokio::test]
    async fn test_should_remove_peer() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_id = random_peer_id();
        let ma_1 = local_multiaddr(10000, &peer_id)?;

        db.add_network_peer(&peer_id, PeerOrigin::IncomingConnection, vec![ma_1], 0.0, 25)
            .await?;
        assert!(db.get_network_peer(&peer_id).await?.is_some(), "must have peer entry");

        db.remove_network_peer(&peer_id).await?;
        assert!(
            db.get_network_peer(&peer_id).await?.is_none(),
            "peer entry must be gone"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_should_not_remove_non_existing_peer() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_id = random_peer_id();

        db.remove_network_peer(&peer_id)
            .await
            .expect_err("must not delete non-existent peer");

        Ok(())
    }

    #[tokio::test]
    async fn test_should_not_add_duplicate_peers() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_id = random_peer_id();
        let ma_1 = local_multiaddr(10000, &peer_id)?;

        db.add_network_peer(&peer_id, PeerOrigin::IncomingConnection, vec![ma_1], 0.0, 25)
            .await?;
        db.add_network_peer(&peer_id, PeerOrigin::IncomingConnection, vec![], 0.0, 25)
            .await
            .expect_err("should fail adding second time");

        Ok(())
    }

    #[tokio::test]
    async fn test_should_return_none_on_non_existing_peer() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_id = random_peer_id();

        assert!(db.get_network_peer(&peer_id).await?.is_none(), "should return none");
        Ok(())
    }

    #[tokio::test]
    async fn test_update() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_id = random_peer_id();
        let ma_1 = local_multiaddr(10000, &peer_id)?;
        let ma_2 = local_multiaddr(10002, &peer_id)?;

        db.add_network_peer(
            &peer_id,
            PeerOrigin::IncomingConnection,
            vec![ma_1.clone(), ma_2.clone()],
            0.0,
            25,
        )
        .await?;

        let mut peer_status = PeerStatus::new(peer_id, PeerOrigin::IncomingConnection, 0.2, 25);
        peer_status.last_seen = SystemTime::UNIX_EPOCH;
        peer_status.last_seen_latency = Duration::from_secs(2);
        peer_status.multiaddresses = vec![ma_1, ma_2];
        peer_status.backoff = 2.0;
        peer_status.ignored = None;
        peer_status.peer_version = Some("1.2.3".into());
        feed_quality_samples(&mut peer_status);
        // Round the quality through `f32` before comparing it with the value read back from the database.
        peer_status.quality = peer_status.quality as f32 as f64;

        let peer_status_from_db = db.get_network_peer(&peer_id).await?.expect("entry should exist");

        assert_ne!(peer_status, peer_status_from_db, "entries must not be equal");

        db.update_network_peer(peer_status.clone()).await?;

        let peer_status_from_db = db.get_network_peer(&peer_id).await?.expect("entry should exist");

        assert_eq!(peer_status, peer_status_from_db, "entries must be equal");

        Ok(())
    }

    #[tokio::test]
    async fn test_should_fail_to_update_non_existing_peer() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_id = random_peer_id();

        let mut peer_status = PeerStatus::new(peer_id, PeerOrigin::IncomingConnection, 0.2, 25);
        peer_status.last_seen = SystemTime::UNIX_EPOCH;
        peer_status.last_seen_latency = Duration::from_secs(2);
        peer_status.backoff = 2.0;
        peer_status.ignored = None;
        peer_status.peer_version = Some("1.2.3".into());
        peer_status.multiaddresses = vec![];
        feed_quality_samples(&mut peer_status);

        db.update_network_peer(peer_status)
            .await
            .expect_err("should fail updating non-existing peer");
        Ok(())
    }

    #[tokio::test]
    async fn test_get_multiple_should_return_all_peers() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let peers = (0..10).map(|_| random_peer_id()).collect::<Vec<_>>();

        for peer in &peers {
            db.add_network_peer(peer, PeerOrigin::Initialization, vec![], 0.0, 25)
                .await?;
        }

        let peers_from_db: Vec<PeerId> = db
            .get_network_peers(Default::default(), false)
            .await?
            .map(|s| s.id.1)
            .collect()
            .await;

        assert_eq!(peers.len(), peers_from_db.len(), "lengths must match");

        // All rows share the same last-seen timestamp, so the relative order produced by
        // the `ORDER BY last_seen` query is unspecified; compare the contents as sets.
        let expected: HashSet<PeerId> = peers.into_iter().collect();
        let actual: HashSet<PeerId> = peers_from_db.into_iter().collect();
        assert_eq!(expected, actual, "peer ids must match");

        Ok(())
    }

    #[tokio::test]
    async fn test_get_multiple_should_return_filtered_peers() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_count = 10;
        let peers = (0..peer_count).map(|_| random_peer_id()).collect::<Vec<_>>();

        for (i, peer) in peers.iter().enumerate() {
            db.add_network_peer(peer, PeerOrigin::Initialization, vec![], 0.2, 25)
                .await?;

            // Only the second half of the peers gets quality samples and a distinct timestamp.
            if i >= peer_count / 2 {
                let mut peer_status = observed_status(*peer, i as u64);
                feed_quality_samples(&mut peer_status);

                db.update_network_peer(peer_status).await?;
            }
        }

        let peers_from_db: Vec<PeerId> = db
            .get_network_peers(PeerSelector::default().with_quality_gte(0.501_f64), false)
            .await?
            .map(|s| s.id.1)
            .collect()
            .await;

        assert_eq!(peer_count / 2, peers_from_db.len(), "lengths must match");
        // Descending last-seen order means the most recently updated peer comes first.
        assert_eq!(
            peers.into_iter().skip(peer_count / 2).rev().collect::<Vec<_>>(),
            peers_from_db,
            "peer ids must match"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_should_update_stats_when_updating_peers() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_id_1 = random_peer_id();
        let peer_id_2 = random_peer_id();

        let ma_1 = local_multiaddr(10000, &peer_id_1)?;
        let ma_2 = local_multiaddr(10002, &peer_id_2)?;

        db.add_network_peer(&peer_id_1, PeerOrigin::IncomingConnection, vec![ma_1], 0.0, 25)
            .await?;

        let stats = db.network_peer_stats(0.2).await?;
        assert_eq!(
            Stats {
                good_quality_public: 0,
                bad_quality_public: 1,
                good_quality_non_public: 0,
                bad_quality_non_public: 0,
            },
            stats,
            "stats must be equal"
        );

        db.add_network_peer(&peer_id_2, PeerOrigin::IncomingConnection, vec![ma_2], 0.0, 25)
            .await?;

        let stats = db.network_peer_stats(0.2).await?;
        assert_eq!(
            Stats {
                good_quality_public: 0,
                bad_quality_public: 2,
                good_quality_non_public: 0,
                bad_quality_non_public: 0,
            },
            stats,
            "stats must be equal"
        );

        // The first peer receives quality samples.
        let mut peer_status = observed_status(peer_id_1, 2);
        feed_quality_samples(&mut peer_status);

        db.update_network_peer(peer_status).await?;

        let stats = db.network_peer_stats(0.2).await?;
        assert_eq!(
            Stats {
                good_quality_public: 1,
                bad_quality_public: 1,
                good_quality_non_public: 0,
                bad_quality_non_public: 0,
            },
            stats,
            "stats must be equal"
        );

        // The second peer receives no samples and is marked as non-public.
        let mut peer_status = observed_status(peer_id_2, 2);
        peer_status.is_public = false;
        peer_status.backoff = 2.0;

        db.update_network_peer(peer_status).await?;

        let stats = db.network_peer_stats(0.2).await?;
        assert_eq!(
            Stats {
                good_quality_public: 1,
                bad_quality_public: 0,
                good_quality_non_public: 0,
                bad_quality_non_public: 1,
            },
            stats,
            "stats must be equal"
        );

        db.remove_network_peer(&peer_id_1).await?;

        let stats = db.network_peer_stats(0.2).await?;
        assert_eq!(
            Stats {
                good_quality_public: 0,
                bad_quality_public: 0,
                good_quality_non_public: 0,
                bad_quality_non_public: 1,
            },
            stats,
            "stats must be equal"
        );

        Ok(())
    }
}