1use std::time::Duration;
2
3use async_stream::stream;
4use async_trait::async_trait;
5use futures::{TryStreamExt, stream::BoxStream};
6use hopr_api::{db::*, *};
7use hopr_crypto_types::prelude::OffchainPublicKey;
8use hopr_db_entity::network_peer;
9use hopr_primitive_types::prelude::*;
10use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder};
11use sea_query::{Condition, Expr, IntoCondition, Order};
12use sqlx::types::chrono::{self, DateTime, Utc};
13use tracing::{error, trace};
14
15use crate::{db::HoprNodeDb, errors::NodeDbError};
16
/// Bincode configuration shared by every place in this file that encodes or decodes the
/// `quality_sma` column: the standard configuration with little-endian byte order and
/// variable-length integer encoding selected explicitly.
///
/// NOTE(review): blobs already stored in the database were presumably written with this
/// configuration, so changing it would likely make them undecodable — confirm before editing.
const DB_BINCODE_CONFIGURATION: bincode::config::Configuration = bincode::config::standard()
    .with_little_endian()
    .with_variable_int_encoding();
20
21struct WrappedPeerSelector(PeerSelector);
22
23impl From<PeerSelector> for WrappedPeerSelector {
24 fn from(selector: PeerSelector) -> Self {
25 WrappedPeerSelector(selector)
26 }
27}
28
29impl IntoCondition for WrappedPeerSelector {
30 fn into_condition(self) -> Condition {
31 let mut ret = Expr::value(1);
32
33 if let Some(last_seen_l) = self.0.last_seen.0 {
34 ret = ret.and(network_peer::Column::LastSeen.gte(chrono::DateTime::<chrono::Utc>::from(last_seen_l)));
35 }
36
37 if let Some(last_seen_u) = self.0.last_seen.1 {
38 ret = ret.and(network_peer::Column::LastSeen.lte(chrono::DateTime::<chrono::Utc>::from(last_seen_u)));
39 }
40
41 if let Some(quality_l) = self.0.quality.0 {
42 ret = ret.and(network_peer::Column::Quality.gte(quality_l));
43 }
44
45 if let Some(quality_u) = self.0.quality.1 {
46 ret = ret.and(network_peer::Column::Quality.lte(quality_u));
47 }
48
49 ret.into_condition()
50 }
51}
52
53#[async_trait]
54impl HoprDbPeersOperations for HoprNodeDb {
55 type Error = NodeDbError;
56
57 #[tracing::instrument(
58 skip(self),
59 name = "HoprDbPeersOperations::add_network_peer",
60 level = "trace",
61 err,
62 ret
63 )]
64 async fn add_network_peer(
65 &self,
66 peer: &PeerId,
67 origin: PeerOrigin,
68 mas: Vec<Multiaddr>,
69 backoff: f64,
70 quality_window: u32,
71 ) -> Result<(), NodeDbError> {
72 let peer = *peer;
73 let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer)).await?;
75
76 let new_peer = hopr_db_entity::network_peer::ActiveModel {
77 packet_key: sea_orm::ActiveValue::Set(Vec::from(pubkey.as_ref())),
78 multi_addresses: sea_orm::ActiveValue::Set(
79 mas.into_iter().map(|m| m.to_string()).collect::<Vec<String>>().into(),
80 ),
81 origin: sea_orm::ActiveValue::Set(origin as i8),
82 backoff: sea_orm::ActiveValue::Set(Some(backoff)),
83 quality_sma: sea_orm::ActiveValue::Set(Some(
84 bincode::serde::encode_to_vec(
85 SingleSumSMA::<f64>::new(quality_window as usize),
86 DB_BINCODE_CONFIGURATION,
87 )
88 .map_err(|e| NodeDbError::LogicalError(format!("cannot serialize sma: {e}")))?,
89 )),
90 ..Default::default()
91 };
92
93 let _ = new_peer.insert(&self.peers_db).await?;
94
95 Ok(())
96 }
97
98 #[tracing::instrument(
99 skip(self),
100 name = "HoprDbPeersOperations::remove_network_peer",
101 level = "trace",
102 err,
103 ret
104 )]
105 async fn remove_network_peer(&self, peer: &PeerId) -> Result<(), NodeDbError> {
106 let peer = *peer;
107 let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer)).await?;
109
110 let res = hopr_db_entity::network_peer::Entity::delete_many()
111 .filter(hopr_db_entity::network_peer::Column::PacketKey.eq(Vec::from(pubkey.as_ref())))
112 .exec(&self.peers_db)
113 .await?;
114
115 if res.rows_affected > 0 {
116 Ok(())
117 } else {
118 Err(NodeDbError::LogicalError(
119 "peer cannot be removed because it does not exist".into(),
120 ))
121 }
122 }
123
124 #[tracing::instrument(
125 skip(self),
126 name = "HoprDbPeersOperations::update_network_peer",
127 level = "trace",
128 err,
129 ret
130 )]
131 async fn update_network_peer(&self, new_status: PeerStatus) -> Result<(), NodeDbError> {
132 let row = hopr_db_entity::network_peer::Entity::find()
133 .filter(hopr_db_entity::network_peer::Column::PacketKey.eq(Vec::from(new_status.id.0.as_ref())))
134 .one(&self.peers_db)
135 .await?;
136
137 if let Some(model) = row {
138 let mut peer_data: hopr_db_entity::network_peer::ActiveModel = model.into();
139 peer_data.packet_key = sea_orm::ActiveValue::Set(Vec::from(new_status.id.0.as_ref()));
140 peer_data.multi_addresses = sea_orm::ActiveValue::Set(
141 new_status
142 .multiaddresses
143 .into_iter()
144 .map(|m| m.to_string())
145 .collect::<Vec<String>>()
146 .into(),
147 );
148 peer_data.origin = sea_orm::ActiveValue::Set(new_status.origin as i8);
149 peer_data.last_seen = sea_orm::ActiveValue::Set(DateTime::<Utc>::from(new_status.last_seen));
150 peer_data.last_seen_latency = sea_orm::ActiveValue::Set(new_status.last_seen_latency.as_millis() as i32);
151 peer_data.ignored_until = sea_orm::ActiveValue::Set(new_status.ignored_until.map(DateTime::<Utc>::from));
152 peer_data.quality = sea_orm::ActiveValue::Set(new_status.quality);
153 peer_data.quality_sma = sea_orm::ActiveValue::Set(Some(
154 bincode::serde::encode_to_vec(&new_status.quality_avg, DB_BINCODE_CONFIGURATION)
155 .map_err(|e| NodeDbError::LogicalError(format!("cannot serialize sma: {e}")))?,
156 ));
157 peer_data.backoff = sea_orm::ActiveValue::Set(Some(new_status.backoff));
158 peer_data.heartbeats_sent = sea_orm::ActiveValue::Set(Some(new_status.heartbeats_sent as i32));
159 peer_data.heartbeats_successful = sea_orm::ActiveValue::Set(Some(new_status.heartbeats_succeeded as i32));
160
161 peer_data.update(&self.peers_db).await?;
162
163 Ok(())
164 } else {
165 Err(NodeDbError::LogicalError(format!(
166 "cannot update a non-existing peer '{}'",
167 new_status.id.1
168 )))
169 }
170 }
171
172 #[tracing::instrument(
173 skip(self),
174 name = "HoprDbPeersOperations::get_network_peer",
175 level = "trace",
176 err,
177 ret
178 )]
179 async fn get_network_peer(&self, peer: &PeerId) -> Result<Option<PeerStatus>, NodeDbError> {
180 let peer = *peer;
181 let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer)).await?;
183 let row = hopr_db_entity::network_peer::Entity::find()
184 .filter(hopr_db_entity::network_peer::Column::PacketKey.eq(Vec::from(pubkey.as_ref())))
185 .one(&self.peers_db)
186 .await?;
187
188 if let Some(model) = row {
189 let status: WrappedPeerStatus = model.try_into()?;
190 Ok(Some(status.0))
191 } else {
192 Ok(None)
193 }
194 }
195
196 #[tracing::instrument(skip(self), name = "HoprDbPeersOperations::get_network_peers", level = "trace", err)]
197 async fn get_network_peers<'a>(
198 &'a self,
199 selector: PeerSelector,
200 sort_last_seen_asc: bool,
201 ) -> Result<BoxStream<'a, PeerStatus>, NodeDbError> {
202 let selector: WrappedPeerSelector = selector.into();
203 let mut sub_stream = hopr_db_entity::network_peer::Entity::find()
204 .filter(selector)
205 .order_by(
206 network_peer::Column::LastSeen,
207 if sort_last_seen_asc { Order::Asc } else { Order::Desc },
208 )
209 .stream(&self.peers_db)
210 .await?;
211
212 Ok(Box::pin(stream! {
213 loop {
214 match sub_stream.try_next().await {
215 Ok(Some(peer_row)) => {
216 trace!(?peer_row, "got db network row");
217 match WrappedPeerStatus::try_from(peer_row) {
218 Ok(peer_status) => yield peer_status.0,
219 Err(e) => error!(error = %e, "cannot map peer from row"),
220 }
221 },
222 Ok(None) => {
223 trace!("fetched all network results");
224 break;
225 }
226 Err(e) => {
227 error!(error = %e, "failed to retrieve next network row");
228 break;
229 }
230 }
231 }
232 }))
233 }
234
235 #[tracing::instrument(
236 skip(self),
237 name = "HoprDbPeersOperations::network_peer_stats",
238 level = "trace",
239 err,
240 ret
241 )]
242 async fn network_peer_stats(&self, quality_threshold: f64) -> Result<Stats, NodeDbError> {
243 Ok(Stats {
244 good_quality_public: hopr_db_entity::network_peer::Entity::find()
245 .filter(
246 sea_orm::Condition::all()
247 .add(hopr_db_entity::network_peer::Column::IgnoredUntil.is_null())
248 .add(hopr_db_entity::network_peer::Column::Quality.gt(quality_threshold)),
249 )
250 .count(&self.peers_db)
251 .await? as u32,
252 good_quality_non_public: 0u32, bad_quality_public: hopr_db_entity::network_peer::Entity::find()
254 .filter(
255 sea_orm::Condition::all()
256 .add(hopr_db_entity::network_peer::Column::IgnoredUntil.is_null())
257 .add(hopr_db_entity::network_peer::Column::Quality.lte(quality_threshold)),
258 )
259 .count(&self.peers_db)
260 .await? as u32,
261 bad_quality_non_public: 0u32, })
263 }
264}
265
266struct WrappedPeerStatus(PeerStatus);
267
268impl From<PeerStatus> for WrappedPeerStatus {
269 fn from(status: PeerStatus) -> Self {
270 WrappedPeerStatus(status)
271 }
272}
273
274impl TryFrom<hopr_db_entity::network_peer::Model> for WrappedPeerStatus {
275 type Error = NodeDbError;
276
277 fn try_from(value: hopr_db_entity::network_peer::Model) -> std::result::Result<Self, Self::Error> {
278 let key = OffchainPublicKey::try_from(value.packet_key.as_slice())?;
279 Ok(PeerStatus {
280 id: (key, key.into()),
281 origin: PeerOrigin::from_repr(value.origin as u8)
282 .ok_or_else(|| Self::Error::LogicalError("invalid origin".into()))?,
283 last_seen: value.last_seen.into(),
284 last_seen_latency: Duration::from_millis(value.last_seen_latency.max(0) as u64),
285 heartbeats_sent: value.heartbeats_sent.unwrap_or_default() as u64,
286 heartbeats_succeeded: value.heartbeats_successful.unwrap_or_default() as u64,
287 backoff: value.backoff.unwrap_or(1.0f64),
288 ignored_until: value.ignored_until.map(|v| v.into()),
289 multiaddresses: {
290 if let sea_orm::query::JsonValue::Array(mas) = value.multi_addresses {
291 mas.into_iter()
292 .filter_map(|s| {
293 if let sea_orm::query::JsonValue::String(s) = s {
294 Some(s)
295 } else {
296 None
297 }
298 })
299 .filter(|s| !s.trim().is_empty())
300 .map(|m| {
301 Multiaddr::try_from(m)
302 .map_err(|m| Self::Error::LogicalError(format!("invalid multi-address: {m}")))
303 })
304 .collect::<Result<Vec<_>, NodeDbError>>()
305 } else {
306 Err(Self::Error::LogicalError("invalid multi-addresses".into()))
307 }?
308 },
309 quality: value.quality,
310 quality_avg: bincode::serde::borrow_decode_from_slice(
311 value
312 .quality_sma
313 .ok_or_else(|| Self::Error::LogicalError("the SMA should always be present for every peer".into()))?
314 .as_slice(),
315 DB_BINCODE_CONFIGURATION,
316 )
317 .map(|(v, _bytes)| v)
318 .map_err(|_| Self::Error::LogicalError("cannot deserialize SMA".into()))?,
319 }
320 .into())
321 }
322}
323
#[cfg(test)]
mod tests {
    use std::time::{Duration, SystemTime};

    use futures::StreamExt;
    use hopr_crypto_types::keypairs::{ChainKeypair, Keypair, OffchainKeypair};

    use super::*;

    /// Creates an empty in-memory database backed by a random chain keypair.
    async fn fresh_db() -> anyhow::Result<HoprNodeDb> {
        Ok(HoprNodeDb::new_in_memory(ChainKeypair::random()).await?)
    }

    /// Derives a peer id from a newly generated random offchain keypair.
    fn random_peer_id() -> PeerId {
        OffchainKeypair::random().public().into()
    }

    /// Builds the loopback TCP multiaddress for `port` that ends in the `/p2p/` component of `peer`.
    fn loopback_addr(port: u16, peer: &PeerId) -> anyhow::Result<Multiaddr> {
        Ok(format!("/ip4/127.0.0.1/tcp/{port}/p2p/{peer}").parse()?)
    }

    /// Feeds the fixed sequence of quality samples shared by several tests into `status`.
    fn feed_quality_samples(status: &mut PeerStatus) {
        for sample in [0.1_f64, 0.4_f64, 0.6_f64] {
            status.update_quality(sample);
        }
    }

    /// Expected statistics with the given public counters and both non-public counters at zero.
    fn public_stats(good: u32, bad: u32) -> Stats {
        Stats {
            good_quality_public: good,
            bad_quality_public: bad,
            good_quality_non_public: 0,
            bad_quality_non_public: 0,
        }
    }

    /// A freshly added peer can be read back with the defaults written by `add_network_peer`.
    #[tokio::test]
    async fn test_add_get() -> anyhow::Result<()> {
        let db = fresh_db().await?;

        let peer = random_peer_id();
        let addrs = vec![loopback_addr(10000, &peer)?, loopback_addr(10002, &peer)?];

        db.add_network_peer(&peer, PeerOrigin::IncomingConnection, addrs.clone(), 0.0, 25)
            .await?;

        let stored = db.get_network_peer(&peer).await?.expect("peer must exist in the db");

        let expected = {
            let mut status = PeerStatus::new(peer, PeerOrigin::IncomingConnection, 0.0, 25);
            status.last_seen = SystemTime::UNIX_EPOCH;
            status.last_seen_latency = Duration::from_secs(0);
            status.multiaddresses = addrs;
            status
        };

        assert_eq!(expected, stored, "peer states must match");
        Ok(())
    }

    /// Removing an existing peer makes subsequent lookups return `None`.
    #[tokio::test]
    async fn test_should_remove_peer() -> anyhow::Result<()> {
        let db = fresh_db().await?;

        let peer = random_peer_id();
        let addr = loopback_addr(10000, &peer)?;

        db.add_network_peer(&peer, PeerOrigin::IncomingConnection, vec![addr.clone()], 0.0, 25)
            .await?;
        let before = db.get_network_peer(&peer).await?;
        assert!(before.is_some(), "must have peer entry");

        db.remove_network_peer(&peer).await?;
        let after = db.get_network_peer(&peer).await?;
        assert!(after.is_none(), "peer entry must be gone");

        Ok(())
    }

    /// Removing a peer that was never added is reported as an error.
    #[tokio::test]
    async fn test_should_not_remove_non_existing_peer() -> anyhow::Result<()> {
        let db = fresh_db().await?;

        let unknown_peer = random_peer_id();

        db.remove_network_peer(&unknown_peer)
            .await
            .expect_err("must not delete non-existent peer");

        Ok(())
    }

    /// Adding the same peer twice fails on the second attempt.
    #[tokio::test]
    async fn test_should_not_add_duplicate_peers() -> anyhow::Result<()> {
        let db = fresh_db().await?;

        let peer = random_peer_id();
        let addr = loopback_addr(10000, &peer)?;

        db.add_network_peer(&peer, PeerOrigin::IncomingConnection, vec![addr.clone()], 0.0, 25)
            .await?;
        db.add_network_peer(&peer, PeerOrigin::IncomingConnection, vec![], 0.0, 25)
            .await
            .expect_err("should fail adding second time");

        Ok(())
    }

    /// Looking up an unknown peer yields `None` rather than an error.
    #[tokio::test]
    async fn test_should_return_none_on_non_existing_peer() -> anyhow::Result<()> {
        let db = fresh_db().await?;

        let unknown_peer = random_peer_id();
        let found = db.get_network_peer(&unknown_peer).await?;

        assert!(found.is_none(), "should return none");
        Ok(())
    }

    /// An updated status round-trips through the database unchanged.
    #[tokio::test]
    async fn test_update() -> anyhow::Result<()> {
        let db = fresh_db().await?;

        let peer = random_peer_id();
        let addrs = vec![loopback_addr(10000, &peer)?, loopback_addr(10002, &peer)?];

        db.add_network_peer(&peer, PeerOrigin::IncomingConnection, addrs.clone(), 0.0, 25)
            .await?;

        let mut updated = PeerStatus::new(peer, PeerOrigin::IncomingConnection, 0.2, 25);
        updated.last_seen = SystemTime::UNIX_EPOCH;
        updated.last_seen_latency = Duration::from_secs(2);
        updated.multiaddresses = addrs;
        updated.backoff = 2.0;
        updated.ignored_until = None;
        feed_quality_samples(&mut updated);
        // Round-trip the quality through `f32` before comparing with the stored value.
        updated.quality = updated.quality as f32 as f64;

        let stored_before = db.get_network_peer(&peer).await?.expect("entry should exist");

        assert_ne!(updated, stored_before, "entries must not be equal");

        db.update_network_peer(updated.clone()).await?;

        let stored_after = db.get_network_peer(&peer).await?.expect("entry should exist");

        assert_eq!(updated, stored_after, "entries must be equal");

        Ok(())
    }

    /// Updating a peer that was never added is reported as an error.
    #[tokio::test]
    async fn test_should_fail_to_update_non_existing_peer() -> anyhow::Result<()> {
        let db = fresh_db().await?;

        let unknown_peer = random_peer_id();

        let mut status = PeerStatus::new(unknown_peer, PeerOrigin::IncomingConnection, 0.2, 25);
        status.last_seen = SystemTime::UNIX_EPOCH;
        status.last_seen_latency = Duration::from_secs(2);
        status.backoff = 2.0;
        status.ignored_until = None;
        status.multiaddresses = vec![];
        feed_quality_samples(&mut status);

        db.update_network_peer(status)
            .await
            .expect_err("should fail updating non-existing peer");
        Ok(())
    }

    /// Listing with the default selector returns every inserted peer.
    #[tokio::test]
    async fn test_get_multiple_should_return_all_peers() -> anyhow::Result<()> {
        let db = fresh_db().await?;

        let inserted: Vec<PeerId> = std::iter::repeat_with(random_peer_id).take(10).collect();

        for peer in inserted.iter() {
            db.add_network_peer(peer, PeerOrigin::Initialization, vec![], 0.0, 25)
                .await?;
        }

        let listing = db.get_network_peers(PeerSelector::default(), true).await?;
        let listed: Vec<PeerId> = listing.map(|status| status.id.1).collect().await;

        assert_eq!(inserted.len(), listed.len(), "lengths must match");
        assert_eq!(inserted, listed, "peer ids must match");

        Ok(())
    }

    /// Listing with a quality lower bound returns only the updated peers, newest first.
    #[tokio::test]
    async fn test_get_multiple_should_return_filtered_peers() -> anyhow::Result<()> {
        let db = fresh_db().await?;

        let peer_count = 10;
        let inserted: Vec<PeerId> = std::iter::repeat_with(random_peer_id).take(peer_count).collect();

        for (index, peer) in inserted.iter().enumerate() {
            db.add_network_peer(peer, PeerOrigin::Initialization, vec![], 0.2, 25)
                .await?;

            // Only the second half of the peers receives quality samples and a `last_seen` value.
            if index < peer_count / 2 {
                continue;
            }

            let mut status = PeerStatus::new(*peer, PeerOrigin::IncomingConnection, 0.2, 25);
            status.last_seen = SystemTime::UNIX_EPOCH + Duration::from_secs(index as u64);
            status.last_seen_latency = Duration::from_secs(2);
            status.multiaddresses = vec![];
            status.heartbeats_sent = 3;
            status.heartbeats_succeeded = 4;
            status.backoff = 1.0;
            status.ignored_until = None;
            feed_quality_samples(&mut status);

            db.update_network_peer(status).await?;
        }

        let selector = PeerSelector::default().with_quality_gte(0.501_f64);
        let listing = db.get_network_peers(selector, false).await?;
        let listed: Vec<PeerId> = listing.map(|status| status.id.1).collect().await;

        assert_eq!(peer_count / 2, listed.len(), "lengths must match");

        let expected: Vec<PeerId> = inserted.into_iter().skip(5).rev().collect();
        assert_eq!(expected, listed, "peer ids must match");

        Ok(())
    }

    /// Statistics follow additions, updates and removals of peers.
    #[tokio::test]
    async fn test_should_update_stats_when_updating_peers() -> anyhow::Result<()> {
        let db = fresh_db().await?;

        let first_peer = random_peer_id();
        let second_peer = random_peer_id();

        let first_addr = loopback_addr(10000, &first_peer)?;
        let second_addr = loopback_addr(10002, &second_peer)?;

        db.add_network_peer(&first_peer, PeerOrigin::IncomingConnection, vec![first_addr], 0.0, 25)
            .await?;

        let stats = db.network_peer_stats(0.2).await?;
        assert_eq!(public_stats(0, 1), stats, "stats must be equal");

        db.add_network_peer(&second_peer, PeerOrigin::IncomingConnection, vec![second_addr], 0.0, 25)
            .await?;

        let stats = db.network_peer_stats(0.2).await?;
        assert_eq!(public_stats(0, 2), stats, "stats must be equal");

        let mut status = PeerStatus::new(first_peer, PeerOrigin::IncomingConnection, 0.2, 25);
        status.last_seen = SystemTime::UNIX_EPOCH + Duration::from_secs(2);
        status.last_seen_latency = Duration::from_secs(2);
        status.multiaddresses = vec![];
        status.heartbeats_sent = 3;
        status.heartbeats_succeeded = 4;
        status.backoff = 1.0;
        status.ignored_until = None;
        feed_quality_samples(&mut status);

        db.update_network_peer(status).await?;

        let stats = db.network_peer_stats(0.2).await?;
        assert_eq!(public_stats(1, 1), stats, "stats must be equal");

        db.remove_network_peer(&first_peer).await?;

        let stats = db.network_peer_stats(0.2).await?;
        assert_eq!(public_stats(0, 1), stats, "stats must be equal");

        Ok(())
    }
}