hopr_db_sql/
peers.rs

1use std::time::Duration;
2
3use async_stream::stream;
4use async_trait::async_trait;
5use futures::{stream::BoxStream, TryStreamExt};
6use libp2p_identity::PeerId;
7use multiaddr::Multiaddr;
8use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder};
9use sea_query::{Condition, Expr, IntoCondition, Order};
10use sqlx::types::chrono::{self, DateTime, Utc};
11use tracing::{error, trace};
12
13use hopr_crypto_types::prelude::OffchainPublicKey;
14use hopr_db_api::{
15    errors::Result,
16    peers::{HoprDbPeersOperations, PeerOrigin, PeerSelector, PeerStatus, Stats},
17};
18use hopr_db_entity::network_peer;
19use hopr_primitive_types::prelude::*;
20
21use crate::{db::HoprDb, prelude::DbSqlError};
22
/// Bincode configuration used in this file to encode and decode the `quality_sma` blob of a
/// peer row: the standard configuration with little-endian byte order and variable-length
/// integer encoding.
const DB_BINCODE_CONFIGURATION: bincode::config::Configuration = bincode::config::standard()
    .with_little_endian()
    .with_variable_int_encoding();
26
27struct WrappedPeerSelector(PeerSelector);
28
29impl From<PeerSelector> for WrappedPeerSelector {
30    fn from(selector: PeerSelector) -> Self {
31        WrappedPeerSelector(selector)
32    }
33}
34
35impl IntoCondition for WrappedPeerSelector {
36    fn into_condition(self) -> Condition {
37        let mut ret = Expr::value(1);
38
39        if let Some(last_seen_l) = self.0.last_seen.0 {
40            ret = ret.and(network_peer::Column::LastSeen.gte(chrono::DateTime::<chrono::Utc>::from(last_seen_l)));
41        }
42
43        if let Some(last_seen_u) = self.0.last_seen.1 {
44            ret = ret.and(network_peer::Column::LastSeen.lte(chrono::DateTime::<chrono::Utc>::from(last_seen_u)));
45        }
46
47        if let Some(quality_l) = self.0.quality.0 {
48            ret = ret.and(network_peer::Column::Quality.gte(quality_l));
49        }
50
51        if let Some(quality_u) = self.0.quality.1 {
52            ret = ret.and(network_peer::Column::Quality.lte(quality_u));
53        }
54
55        ret.into_condition()
56    }
57}
58
59#[async_trait]
60impl HoprDbPeersOperations for HoprDb {
61    async fn add_network_peer(
62        &self,
63        peer: &PeerId,
64        origin: PeerOrigin,
65        mas: Vec<Multiaddr>,
66        backoff: f64,
67        quality_window: u32,
68    ) -> Result<()> {
69        let new_peer = hopr_db_entity::network_peer::ActiveModel {
70            packet_key: sea_orm::ActiveValue::Set(Vec::from(
71                OffchainPublicKey::try_from(peer)
72                    .map_err(|_| crate::errors::DbSqlError::DecodingError)?
73                    .as_ref(),
74            )),
75            multi_addresses: sea_orm::ActiveValue::Set(
76                mas.into_iter().map(|m| m.to_string()).collect::<Vec<String>>().into(),
77            ),
78            origin: sea_orm::ActiveValue::Set(origin as i8),
79            backoff: sea_orm::ActiveValue::Set(Some(backoff)),
80            quality_sma: sea_orm::ActiveValue::Set(Some(
81                bincode::serde::encode_to_vec(
82                    SingleSumSMA::<f64>::new(quality_window as usize),
83                    DB_BINCODE_CONFIGURATION,
84                )
85                .map_err(|_| crate::errors::DbSqlError::DecodingError)?,
86            )),
87            ..Default::default()
88        };
89
90        let _ = new_peer.insert(&self.peers_db).await.map_err(DbSqlError::from)?;
91
92        Ok(())
93    }
94
95    async fn remove_network_peer(&self, peer: &PeerId) -> Result<()> {
96        let res = hopr_db_entity::network_peer::Entity::delete_many()
97            .filter(
98                hopr_db_entity::network_peer::Column::PacketKey.eq(Vec::from(
99                    OffchainPublicKey::try_from(peer)
100                        .map_err(|_| crate::errors::DbSqlError::DecodingError)?
101                        .as_ref(),
102                )),
103            )
104            .exec(&self.peers_db)
105            .await
106            .map_err(DbSqlError::from)?;
107
108        if res.rows_affected > 0 {
109            Ok(())
110        } else {
111            Err(
112                crate::errors::DbSqlError::LogicalError("peer cannot be removed because it does not exist".into())
113                    .into(),
114            )
115        }
116    }
117
118    async fn update_network_peer(&self, new_status: PeerStatus) -> Result<()> {
119        let row = hopr_db_entity::network_peer::Entity::find()
120            .filter(hopr_db_entity::network_peer::Column::PacketKey.eq(Vec::from(new_status.id.0.as_ref())))
121            .one(&self.peers_db)
122            .await
123            .map_err(DbSqlError::from)?;
124
125        if let Some(model) = row {
126            let mut peer_data: hopr_db_entity::network_peer::ActiveModel = model.into();
127            peer_data.packet_key = sea_orm::ActiveValue::Set(Vec::from(new_status.id.0.as_ref()));
128            peer_data.multi_addresses = sea_orm::ActiveValue::Set(
129                new_status
130                    .multiaddresses
131                    .into_iter()
132                    .map(|m| m.to_string())
133                    .collect::<Vec<String>>()
134                    .into(),
135            );
136            peer_data.origin = sea_orm::ActiveValue::Set(new_status.origin as i8);
137            peer_data.version = sea_orm::ActiveValue::Set(new_status.peer_version);
138            peer_data.last_seen = sea_orm::ActiveValue::Set(DateTime::<Utc>::from(new_status.last_seen));
139            peer_data.last_seen_latency = sea_orm::ActiveValue::Set(new_status.last_seen_latency.as_millis() as i32);
140            peer_data.ignored = sea_orm::ActiveValue::Set(new_status.ignored.map(DateTime::<Utc>::from));
141            peer_data.public = sea_orm::ActiveValue::Set(new_status.is_public);
142            peer_data.quality = sea_orm::ActiveValue::Set(new_status.quality);
143            peer_data.quality_sma = sea_orm::ActiveValue::Set(Some(
144                bincode::serde::encode_to_vec(&new_status.quality_avg, DB_BINCODE_CONFIGURATION)
145                    .map_err(|e| crate::errors::DbSqlError::LogicalError(format!("cannot serialize sma: {e}")))?,
146            ));
147            peer_data.backoff = sea_orm::ActiveValue::Set(Some(new_status.backoff));
148            peer_data.heartbeats_sent = sea_orm::ActiveValue::Set(Some(new_status.heartbeats_sent as i32));
149            peer_data.heartbeats_successful = sea_orm::ActiveValue::Set(Some(new_status.heartbeats_succeeded as i32));
150
151            peer_data.update(&self.peers_db).await.map_err(DbSqlError::from)?;
152
153            Ok(())
154        } else {
155            Err(crate::errors::DbSqlError::LogicalError(format!(
156                "cannot update a non-existing peer '{}'",
157                new_status.id.1
158            ))
159            .into())
160        }
161    }
162
163    async fn get_network_peer(&self, peer: &PeerId) -> Result<Option<PeerStatus>> {
164        let row = hopr_db_entity::network_peer::Entity::find()
165            .filter(
166                hopr_db_entity::network_peer::Column::PacketKey.eq(Vec::from(
167                    OffchainPublicKey::try_from(peer)
168                        .map_err(|_| crate::errors::DbSqlError::DecodingError)?
169                        .as_ref(),
170                )),
171            )
172            .one(&self.peers_db)
173            .await
174            .map_err(DbSqlError::from)?;
175
176        if let Some(model) = row {
177            let status: WrappedPeerStatus = model.try_into()?;
178            Ok(Some(status.0))
179        } else {
180            Ok(None)
181        }
182    }
183
184    async fn get_network_peers<'a>(
185        &'a self,
186        selector: PeerSelector,
187        sort_last_seen_asc: bool,
188    ) -> Result<BoxStream<'a, PeerStatus>> {
189        let selector: WrappedPeerSelector = selector.into();
190        let mut sub_stream = hopr_db_entity::network_peer::Entity::find()
191            // .filter(hopr_db_entity::network_peer::Column::Ignored.is_not_null())
192            .filter(selector)
193            .order_by(
194                network_peer::Column::LastSeen,
195                if sort_last_seen_asc { Order::Asc } else { Order::Desc },
196            )
197            .stream(&self.peers_db)
198            .await
199            .map_err(DbSqlError::from)?;
200
201        Ok(Box::pin(stream! {
202            loop {
203                match sub_stream.try_next().await {
204                    Ok(Some(peer_row)) => {
205                        trace!(?peer_row, "got db network row");
206                        match WrappedPeerStatus::try_from(peer_row) {
207                            Ok(peer_status) => yield peer_status.0,
208                            Err(e) => error!(error = %e, "cannot map peer from row"),
209                        }
210                    },
211                    Ok(None) => {
212                        trace!("fetched all network results");
213                        break;
214                    }
215                    Err(e) => {
216                        error!(error = %e, "failed to retrieve next network row");
217                        break;
218                    }
219                }
220            }
221        }))
222    }
223
224    async fn network_peer_stats(&self, quality_threshold: f64) -> Result<Stats> {
225        Ok(Stats {
226            good_quality_public: hopr_db_entity::network_peer::Entity::find()
227                .filter(
228                    sea_orm::Condition::all()
229                        .add(hopr_db_entity::network_peer::Column::Public.eq(true))
230                        .add(hopr_db_entity::network_peer::Column::Ignored.is_null())
231                        .add(hopr_db_entity::network_peer::Column::Quality.gt(quality_threshold)),
232                )
233                .count(&self.peers_db)
234                .await
235                .map_err(DbSqlError::from)? as u32,
236            good_quality_non_public: hopr_db_entity::network_peer::Entity::find()
237                .filter(
238                    sea_orm::Condition::all()
239                        .add(hopr_db_entity::network_peer::Column::Public.eq(false))
240                        .add(hopr_db_entity::network_peer::Column::Ignored.is_null())
241                        .add(hopr_db_entity::network_peer::Column::Quality.gt(quality_threshold)),
242                )
243                .count(&self.peers_db)
244                .await
245                .map_err(DbSqlError::from)? as u32,
246            bad_quality_public: hopr_db_entity::network_peer::Entity::find()
247                .filter(
248                    sea_orm::Condition::all()
249                        .add(hopr_db_entity::network_peer::Column::Public.eq(true))
250                        .add(hopr_db_entity::network_peer::Column::Ignored.is_null())
251                        .add(hopr_db_entity::network_peer::Column::Quality.lte(quality_threshold)),
252                )
253                .count(&self.peers_db)
254                .await
255                .map_err(DbSqlError::from)? as u32,
256            bad_quality_non_public: hopr_db_entity::network_peer::Entity::find()
257                .filter(
258                    sea_orm::Condition::all()
259                        .add(hopr_db_entity::network_peer::Column::Public.eq(false))
260                        .add(hopr_db_entity::network_peer::Column::Ignored.is_null())
261                        .add(hopr_db_entity::network_peer::Column::Quality.lte(quality_threshold)),
262                )
263                .count(&self.peers_db)
264                .await
265                .map_err(DbSqlError::from)? as u32,
266        })
267    }
268}
269
270struct WrappedPeerStatus(PeerStatus);
271
272impl From<PeerStatus> for WrappedPeerStatus {
273    fn from(status: PeerStatus) -> Self {
274        WrappedPeerStatus(status)
275    }
276}
277
278impl TryFrom<hopr_db_entity::network_peer::Model> for WrappedPeerStatus {
279    type Error = crate::errors::DbSqlError;
280
281    fn try_from(value: hopr_db_entity::network_peer::Model) -> std::result::Result<Self, Self::Error> {
282        let key = OffchainPublicKey::try_from(value.packet_key.as_slice()).map_err(|_| Self::Error::DecodingError)?;
283        Ok(PeerStatus {
284            id: (key, key.into()),
285            origin: PeerOrigin::try_from(value.origin as u8).map_err(|_| Self::Error::DecodingError)?,
286            is_public: value.public,
287            last_seen: value.last_seen.into(),
288            last_seen_latency: Duration::from_millis(value.last_seen_latency as u64),
289            heartbeats_sent: value.heartbeats_sent.unwrap_or_default() as u64,
290            heartbeats_succeeded: value.heartbeats_successful.unwrap_or_default() as u64,
291            backoff: value.backoff.unwrap_or(1.0f64),
292            ignored: value.ignored.map(|v| v.into()),
293            peer_version: value.version,
294            multiaddresses: {
295                if let sea_orm::query::JsonValue::Array(mas) = value.multi_addresses {
296                    mas.into_iter()
297                        .filter_map(|s| {
298                            if let sea_orm::query::JsonValue::String(s) = s {
299                                Some(s)
300                            } else {
301                                None
302                            }
303                        })
304                        .filter(|s| !s.trim().is_empty())
305                        .map(Multiaddr::try_from)
306                        .collect::<std::result::Result<Vec<_>, multiaddr::Error>>()
307                        .map_err(|_| Self::Error::DecodingError)
308                } else {
309                    Err(Self::Error::DecodingError)
310                }?
311            },
312            quality: value.quality,
313            quality_avg: bincode::serde::borrow_decode_from_slice(
314                value
315                    .quality_sma
316                    .ok_or_else(|| Self::Error::LogicalError("the SMA should always be present for every peer".into()))?
317                    .as_slice(),
318                DB_BINCODE_CONFIGURATION,
319            )
320            .map(|(v, _bytes)| v)
321            .map_err(|_| Self::Error::DecodingError)?,
322        }
323        .into())
324    }
325}
326
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use hopr_crypto_types::keypairs::{ChainKeypair, Keypair, OffchainKeypair};
    use libp2p_identity::PeerId;
    use multiaddr::Multiaddr;
    use std::ops::Add;
    use std::time::{Duration, SystemTime};

    /// Quality samples recorded on a peer status by the tests, in this order.
    ///
    /// Kept in one place so the literals cannot drift apart: the tests previously spelled the
    /// middle sample as `0.4_64`, which Rust parses as `0.464` (the underscore is only a digit
    /// separator), not as `0.4_f64`.
    const QUALITY_SAMPLES: [f64; 3] = [0.1, 0.4, 0.6];

    /// Generates the peer id of a fresh random off-chain keypair.
    fn random_peer_id() -> PeerId {
        OffchainKeypair::random().public().into()
    }

    /// Builds a localhost TCP multiaddress on `port` that ends with the given peer id.
    fn local_multiaddr(port: u16, peer_id: &PeerId) -> anyhow::Result<Multiaddr> {
        Ok(format!("/ip4/127.0.0.1/tcp/{port}/p2p/{peer_id}").parse()?)
    }

    /// Feeds every entry of [`QUALITY_SAMPLES`] into `status`.
    fn record_quality_samples(status: &mut PeerStatus) {
        for sample in QUALITY_SAMPLES {
            status.update_quality(sample);
        }
    }

    #[async_std::test]
    async fn test_add_get() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_id = random_peer_id();
        let ma_1 = local_multiaddr(10000, &peer_id)?;
        let ma_2 = local_multiaddr(10002, &peer_id)?;

        db.add_network_peer(
            &peer_id,
            PeerOrigin::IncomingConnection,
            vec![ma_1.clone(), ma_2.clone()],
            0.0,
            25,
        )
        .await?;

        let peer_from_db = db.get_network_peer(&peer_id).await?.expect("peer must exist in the db");

        let mut expected_peer = PeerStatus::new(peer_id, PeerOrigin::IncomingConnection, 0.0, 25);
        expected_peer.last_seen = SystemTime::UNIX_EPOCH;
        expected_peer.last_seen_latency = Duration::from_secs(0);
        expected_peer.multiaddresses = vec![ma_1, ma_2];

        assert_eq!(expected_peer, peer_from_db, "peer states must match");
        Ok(())
    }

    #[async_std::test]
    async fn test_should_remove_peer() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_id = random_peer_id();
        let ma_1 = local_multiaddr(10000, &peer_id)?;

        db.add_network_peer(&peer_id, PeerOrigin::IncomingConnection, vec![ma_1.clone()], 0.0, 25)
            .await?;
        assert!(db.get_network_peer(&peer_id).await?.is_some(), "must have peer entry");

        db.remove_network_peer(&peer_id).await?;
        assert!(
            db.get_network_peer(&peer_id).await?.is_none(),
            "peer entry must be gone"
        );

        Ok(())
    }

    #[async_std::test]
    async fn test_should_not_remove_non_existing_peer() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_id = random_peer_id();

        db.remove_network_peer(&peer_id)
            .await
            .expect_err("must not delete non-existent peer");

        Ok(())
    }

    #[async_std::test]
    async fn test_should_not_add_duplicate_peers() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_id = random_peer_id();
        let ma_1 = local_multiaddr(10000, &peer_id)?;

        db.add_network_peer(&peer_id, PeerOrigin::IncomingConnection, vec![ma_1.clone()], 0.0, 25)
            .await?;
        db.add_network_peer(&peer_id, PeerOrigin::IncomingConnection, vec![], 0.0, 25)
            .await
            .expect_err("should fail adding second time");

        Ok(())
    }

    #[async_std::test]
    async fn test_should_return_none_on_non_existing_peer() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_id = random_peer_id();

        assert!(db.get_network_peer(&peer_id).await?.is_none(), "should return none");
        Ok(())
    }

    #[async_std::test]
    async fn test_update() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_id = random_peer_id();

        let ma_1 = local_multiaddr(10000, &peer_id)?;
        let ma_2 = local_multiaddr(10002, &peer_id)?;

        db.add_network_peer(
            &peer_id,
            PeerOrigin::IncomingConnection,
            vec![ma_1.clone(), ma_2.clone()],
            0.0,
            25,
        )
        .await?;

        let mut peer_status = PeerStatus::new(peer_id, PeerOrigin::IncomingConnection, 0.2, 25);
        peer_status.last_seen = SystemTime::UNIX_EPOCH;
        peer_status.last_seen_latency = Duration::from_secs(2);
        peer_status.multiaddresses = vec![ma_1, ma_2];
        peer_status.backoff = 2.0;
        peer_status.ignored = None;
        peer_status.peer_version = Some("1.2.3".into());
        record_quality_samples(&mut peer_status);
        // Round the quality through `f32` before comparing it with the value read back.
        peer_status.quality = peer_status.quality as f32 as f64;

        let peer_status_from_db = db.get_network_peer(&peer_id).await?.expect("entry should exist");

        assert_ne!(peer_status, peer_status_from_db, "entries must not be equal");

        db.update_network_peer(peer_status.clone()).await?;

        let peer_status_from_db = db.get_network_peer(&peer_id).await?.expect("entry should exist");

        assert_eq!(peer_status, peer_status_from_db, "entries must be equal");

        Ok(())
    }

    #[async_std::test]
    async fn test_should_fail_to_update_non_existing_peer() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_id = random_peer_id();

        let mut peer_status = PeerStatus::new(peer_id, PeerOrigin::IncomingConnection, 0.2, 25);
        peer_status.last_seen = SystemTime::UNIX_EPOCH;
        peer_status.last_seen_latency = Duration::from_secs(2);
        peer_status.backoff = 2.0;
        peer_status.ignored = None;
        peer_status.peer_version = Some("1.2.3".into());
        peer_status.multiaddresses = vec![];
        record_quality_samples(&mut peer_status);

        db.update_network_peer(peer_status)
            .await
            .expect_err("should fail updating non-existing peer");
        Ok(())
    }

    #[async_std::test]
    async fn test_get_multiple_should_return_all_peers() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let peers = (0..10).map(|_| random_peer_id()).collect::<Vec<_>>();

        for peer in &peers {
            db.add_network_peer(peer, PeerOrigin::Initialization, vec![], 0.0, 25)
                .await?;
        }

        let peers_from_db: Vec<PeerId> = db
            .get_network_peers(Default::default(), false)
            .await?
            .map(|s| s.id.1)
            .collect()
            .await;

        assert_eq!(peers.len(), peers_from_db.len(), "lengths must match");
        assert_eq!(peers, peers_from_db, "peer ids must match");

        Ok(())
    }

    #[async_std::test]
    async fn test_get_multiple_should_return_filtered_peers() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_count = 10;
        let peers = (0..peer_count).map(|_| random_peer_id()).collect::<Vec<_>>();

        for (i, peer) in peers.iter().enumerate() {
            db.add_network_peer(peer, PeerOrigin::Initialization, vec![], 0.2, 25)
                .await?;

            // Only the second half of the peers gets updated with quality samples and a
            // distinct `last_seen` timestamp.
            if i >= peer_count / 2 {
                let mut peer_status = PeerStatus::new(*peer, PeerOrigin::IncomingConnection, 0.2, 25);
                peer_status.last_seen = SystemTime::UNIX_EPOCH.add(Duration::from_secs(i as u64));
                peer_status.last_seen_latency = Duration::from_secs(2);
                peer_status.multiaddresses = vec![];
                peer_status.heartbeats_sent = 3;
                peer_status.heartbeats_succeeded = 4;
                peer_status.backoff = 1.0;
                peer_status.ignored = None;
                peer_status.peer_version = Some("1.2.3".into());
                record_quality_samples(&mut peer_status);

                db.update_network_peer(peer_status).await?;
            }
        }

        let peers_from_db: Vec<PeerId> = db
            .get_network_peers(PeerSelector::default().with_quality_gte(0.501_f64), false)
            .await?
            .map(|s| s.id.1)
            .collect()
            .await;

        assert_eq!(peer_count / 2, peers_from_db.len(), "lengths must match");
        // Results are requested in descending `last_seen` order, hence the reversal.
        assert_eq!(
            peers.into_iter().skip(peer_count / 2).rev().collect::<Vec<_>>(),
            peers_from_db,
            "peer ids must match"
        );

        Ok(())
    }

    #[async_std::test]
    async fn test_should_update_stats_when_updating_peers() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let peer_id_1 = random_peer_id();
        let peer_id_2 = random_peer_id();

        let ma_1 = local_multiaddr(10000, &peer_id_1)?;
        let ma_2 = local_multiaddr(10002, &peer_id_2)?;

        db.add_network_peer(&peer_id_1, PeerOrigin::IncomingConnection, vec![ma_1], 0.0, 25)
            .await?;

        let stats = db.network_peer_stats(0.2).await?;
        assert_eq!(
            Stats {
                good_quality_public: 0,
                bad_quality_public: 1,
                good_quality_non_public: 0,
                bad_quality_non_public: 0,
            },
            stats,
            "stats must be equal"
        );

        db.add_network_peer(&peer_id_2, PeerOrigin::IncomingConnection, vec![ma_2], 0.0, 25)
            .await?;

        let stats = db.network_peer_stats(0.2).await?;
        assert_eq!(
            Stats {
                good_quality_public: 0,
                bad_quality_public: 2,
                good_quality_non_public: 0,
                bad_quality_non_public: 0,
            },
            stats,
            "stats must be equal"
        );

        let mut peer_status = PeerStatus::new(peer_id_1, PeerOrigin::IncomingConnection, 0.2, 25);
        peer_status.last_seen = SystemTime::UNIX_EPOCH.add(Duration::from_secs(2));
        peer_status.last_seen_latency = Duration::from_secs(2);
        peer_status.multiaddresses = vec![];
        peer_status.heartbeats_sent = 3;
        peer_status.heartbeats_succeeded = 4;
        peer_status.backoff = 1.0;
        peer_status.ignored = None;
        peer_status.peer_version = Some("1.2.3".into());
        record_quality_samples(&mut peer_status);

        db.update_network_peer(peer_status).await?;

        let stats = db.network_peer_stats(0.2).await?;
        assert_eq!(
            Stats {
                good_quality_public: 1,
                bad_quality_public: 1,
                good_quality_non_public: 0,
                bad_quality_non_public: 0,
            },
            stats,
            "stats must be equal"
        );

        let mut peer_status = PeerStatus::new(peer_id_2, PeerOrigin::IncomingConnection, 0.2, 25);
        peer_status.last_seen = SystemTime::UNIX_EPOCH.add(Duration::from_secs(2));
        peer_status.last_seen_latency = Duration::from_secs(2);
        peer_status.multiaddresses = vec![];
        peer_status.is_public = false;
        peer_status.heartbeats_sent = 3;
        peer_status.heartbeats_succeeded = 4;
        peer_status.backoff = 2.0;
        peer_status.ignored = None;
        peer_status.peer_version = Some("1.2.3".into());

        db.update_network_peer(peer_status).await?;

        let stats = db.network_peer_stats(0.2).await?;
        assert_eq!(
            Stats {
                good_quality_public: 1,
                bad_quality_public: 0,
                good_quality_non_public: 0,
                bad_quality_non_public: 1,
            },
            stats,
            "stats must be equal"
        );

        db.remove_network_peer(&peer_id_1).await?;

        let stats = db.network_peer_stats(0.2).await?;
        assert_eq!(
            Stats {
                good_quality_public: 0,
                bad_quality_public: 0,
                good_quality_non_public: 0,
                bad_quality_non_public: 1,
            },
            stats,
            "stats must be equal"
        );

        Ok(())
    }
}