1use std::time::Duration;
2
3use async_stream::stream;
4use async_trait::async_trait;
5use futures::{TryStreamExt, stream::BoxStream};
6use hopr_crypto_types::prelude::OffchainPublicKey;
7use hopr_db_api::{
8 errors::Result,
9 peers::{HoprDbPeersOperations, PeerOrigin, PeerSelector, PeerStatus, Stats},
10};
11use hopr_db_entity::network_peer;
12use hopr_primitive_types::prelude::*;
13use libp2p_identity::PeerId;
14use multiaddr::Multiaddr;
15use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder};
16use sea_query::{Condition, Expr, IntoCondition, Order};
17use sqlx::types::chrono::{self, DateTime, Utc};
18use tracing::{error, trace};
19
20use crate::{db::HoprDb, prelude::DbSqlError};
21
22const DB_BINCODE_CONFIGURATION: bincode::config::Configuration = bincode::config::standard()
23 .with_little_endian()
24 .with_variable_int_encoding();
25
26struct WrappedPeerSelector(PeerSelector);
27
28impl From<PeerSelector> for WrappedPeerSelector {
29 fn from(selector: PeerSelector) -> Self {
30 WrappedPeerSelector(selector)
31 }
32}
33
34impl IntoCondition for WrappedPeerSelector {
35 fn into_condition(self) -> Condition {
36 let mut ret = Expr::value(1);
37
38 if let Some(last_seen_l) = self.0.last_seen.0 {
39 ret = ret.and(network_peer::Column::LastSeen.gte(chrono::DateTime::<chrono::Utc>::from(last_seen_l)));
40 }
41
42 if let Some(last_seen_u) = self.0.last_seen.1 {
43 ret = ret.and(network_peer::Column::LastSeen.lte(chrono::DateTime::<chrono::Utc>::from(last_seen_u)));
44 }
45
46 if let Some(quality_l) = self.0.quality.0 {
47 ret = ret.and(network_peer::Column::Quality.gte(quality_l));
48 }
49
50 if let Some(quality_u) = self.0.quality.1 {
51 ret = ret.and(network_peer::Column::Quality.lte(quality_u));
52 }
53
54 ret.into_condition()
55 }
56}
57
58#[async_trait]
59impl HoprDbPeersOperations for HoprDb {
60 #[tracing::instrument(
61 skip(self),
62 name = "HoprDbPeersOperations::add_network_peer",
63 level = "trace",
64 err,
65 ret
66 )]
67 async fn add_network_peer(
68 &self,
69 peer: &PeerId,
70 origin: PeerOrigin,
71 mas: Vec<Multiaddr>,
72 backoff: f64,
73 quality_window: u32,
74 ) -> Result<()> {
75 let peer = *peer;
76 let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer))
78 .await
79 .map_err(|_| crate::errors::DbSqlError::DecodingError)?;
80
81 let new_peer = hopr_db_entity::network_peer::ActiveModel {
82 packet_key: sea_orm::ActiveValue::Set(Vec::from(pubkey.as_ref())),
83 multi_addresses: sea_orm::ActiveValue::Set(
84 mas.into_iter().map(|m| m.to_string()).collect::<Vec<String>>().into(),
85 ),
86 origin: sea_orm::ActiveValue::Set(origin as i8),
87 backoff: sea_orm::ActiveValue::Set(Some(backoff)),
88 quality_sma: sea_orm::ActiveValue::Set(Some(
89 bincode::serde::encode_to_vec(
90 SingleSumSMA::<f64>::new(quality_window as usize),
91 DB_BINCODE_CONFIGURATION,
92 )
93 .map_err(|_| crate::errors::DbSqlError::DecodingError)?,
94 )),
95 ..Default::default()
96 };
97
98 let _ = new_peer.insert(&self.peers_db).await.map_err(DbSqlError::from)?;
99
100 Ok(())
101 }
102
103 #[tracing::instrument(
104 skip(self),
105 name = "HoprDbPeersOperations::remove_network_peer",
106 level = "trace",
107 err,
108 ret
109 )]
110 async fn remove_network_peer(&self, peer: &PeerId) -> Result<()> {
111 let peer = *peer;
112 let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer))
114 .await
115 .map_err(|_| crate::errors::DbSqlError::DecodingError)?;
116
117 let res = hopr_db_entity::network_peer::Entity::delete_many()
118 .filter(hopr_db_entity::network_peer::Column::PacketKey.eq(Vec::from(pubkey.as_ref())))
119 .exec(&self.peers_db)
120 .await
121 .map_err(DbSqlError::from)?;
122
123 if res.rows_affected > 0 {
124 Ok(())
125 } else {
126 Err(
127 crate::errors::DbSqlError::LogicalError("peer cannot be removed because it does not exist".into())
128 .into(),
129 )
130 }
131 }
132
133 #[tracing::instrument(
134 skip(self),
135 name = "HoprDbPeersOperations::update_network_peer",
136 level = "trace",
137 err,
138 ret
139 )]
140 async fn update_network_peer(&self, new_status: PeerStatus) -> Result<()> {
141 let row = hopr_db_entity::network_peer::Entity::find()
142 .filter(hopr_db_entity::network_peer::Column::PacketKey.eq(Vec::from(new_status.id.0.as_ref())))
143 .one(&self.peers_db)
144 .await
145 .map_err(DbSqlError::from)?;
146
147 if let Some(model) = row {
148 let mut peer_data: hopr_db_entity::network_peer::ActiveModel = model.into();
149 peer_data.packet_key = sea_orm::ActiveValue::Set(Vec::from(new_status.id.0.as_ref()));
150 peer_data.multi_addresses = sea_orm::ActiveValue::Set(
151 new_status
152 .multiaddresses
153 .into_iter()
154 .map(|m| m.to_string())
155 .collect::<Vec<String>>()
156 .into(),
157 );
158 peer_data.origin = sea_orm::ActiveValue::Set(new_status.origin as i8);
159 peer_data.last_seen = sea_orm::ActiveValue::Set(DateTime::<Utc>::from(new_status.last_seen));
160 peer_data.last_seen_latency = sea_orm::ActiveValue::Set(new_status.last_seen_latency.as_millis() as i32);
161 peer_data.ignored_until = sea_orm::ActiveValue::Set(new_status.ignored_until.map(DateTime::<Utc>::from));
162 peer_data.quality = sea_orm::ActiveValue::Set(new_status.quality);
163 peer_data.quality_sma = sea_orm::ActiveValue::Set(Some(
164 bincode::serde::encode_to_vec(&new_status.quality_avg, DB_BINCODE_CONFIGURATION)
165 .map_err(|e| crate::errors::DbSqlError::LogicalError(format!("cannot serialize sma: {e}")))?,
166 ));
167 peer_data.backoff = sea_orm::ActiveValue::Set(Some(new_status.backoff));
168 peer_data.heartbeats_sent = sea_orm::ActiveValue::Set(Some(new_status.heartbeats_sent as i32));
169 peer_data.heartbeats_successful = sea_orm::ActiveValue::Set(Some(new_status.heartbeats_succeeded as i32));
170
171 peer_data.update(&self.peers_db).await.map_err(DbSqlError::from)?;
172
173 Ok(())
174 } else {
175 Err(crate::errors::DbSqlError::LogicalError(format!(
176 "cannot update a non-existing peer '{}'",
177 new_status.id.1
178 ))
179 .into())
180 }
181 }
182
183 #[tracing::instrument(
184 skip(self),
185 name = "HoprDbPeersOperations::get_network_peer",
186 level = "trace",
187 err,
188 ret
189 )]
190 async fn get_network_peer(&self, peer: &PeerId) -> Result<Option<PeerStatus>> {
191 let peer = *peer;
192 let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer))
194 .await
195 .map_err(|_| crate::errors::DbSqlError::DecodingError)?;
196 let row = hopr_db_entity::network_peer::Entity::find()
197 .filter(hopr_db_entity::network_peer::Column::PacketKey.eq(Vec::from(pubkey.as_ref())))
198 .one(&self.peers_db)
199 .await
200 .map_err(DbSqlError::from)?;
201
202 if let Some(model) = row {
203 let status: WrappedPeerStatus = model.try_into()?;
204 Ok(Some(status.0))
205 } else {
206 Ok(None)
207 }
208 }
209
210 #[tracing::instrument(skip(self), name = "HoprDbPeersOperations::get_network_peers", level = "trace", err)]
211 async fn get_network_peers<'a>(
212 &'a self,
213 selector: PeerSelector,
214 sort_last_seen_asc: bool,
215 ) -> Result<BoxStream<'a, PeerStatus>> {
216 let selector: WrappedPeerSelector = selector.into();
217 let mut sub_stream = hopr_db_entity::network_peer::Entity::find()
218 .filter(selector)
219 .order_by(
220 network_peer::Column::LastSeen,
221 if sort_last_seen_asc { Order::Asc } else { Order::Desc },
222 )
223 .stream(&self.peers_db)
224 .await
225 .map_err(DbSqlError::from)?;
226
227 Ok(Box::pin(stream! {
228 loop {
229 match sub_stream.try_next().await {
230 Ok(Some(peer_row)) => {
231 trace!(?peer_row, "got db network row");
232 match WrappedPeerStatus::try_from(peer_row) {
233 Ok(peer_status) => yield peer_status.0,
234 Err(e) => error!(error = %e, "cannot map peer from row"),
235 }
236 },
237 Ok(None) => {
238 trace!("fetched all network results");
239 break;
240 }
241 Err(e) => {
242 error!(error = %e, "failed to retrieve next network row");
243 break;
244 }
245 }
246 }
247 }))
248 }
249
250 #[tracing::instrument(
251 skip(self),
252 name = "HoprDbPeersOperations::network_peer_stats",
253 level = "trace",
254 err,
255 ret
256 )]
257 async fn network_peer_stats(&self, quality_threshold: f64) -> Result<Stats> {
258 Ok(Stats {
259 good_quality_public: hopr_db_entity::network_peer::Entity::find()
260 .filter(
261 sea_orm::Condition::all()
262 .add(hopr_db_entity::network_peer::Column::IgnoredUntil.is_null())
263 .add(hopr_db_entity::network_peer::Column::Quality.gt(quality_threshold)),
264 )
265 .count(&self.peers_db)
266 .await
267 .map_err(DbSqlError::from)? as u32,
268 good_quality_non_public: 0u32, bad_quality_public: hopr_db_entity::network_peer::Entity::find()
270 .filter(
271 sea_orm::Condition::all()
272 .add(hopr_db_entity::network_peer::Column::IgnoredUntil.is_null())
273 .add(hopr_db_entity::network_peer::Column::Quality.lte(quality_threshold)),
274 )
275 .count(&self.peers_db)
276 .await
277 .map_err(DbSqlError::from)? as u32,
278 bad_quality_non_public: 0u32, })
280 }
281}
282
283struct WrappedPeerStatus(PeerStatus);
284
285impl From<PeerStatus> for WrappedPeerStatus {
286 fn from(status: PeerStatus) -> Self {
287 WrappedPeerStatus(status)
288 }
289}
290
291impl TryFrom<hopr_db_entity::network_peer::Model> for WrappedPeerStatus {
292 type Error = crate::errors::DbSqlError;
293
294 fn try_from(value: hopr_db_entity::network_peer::Model) -> std::result::Result<Self, Self::Error> {
295 let key = OffchainPublicKey::try_from(value.packet_key.as_slice()).map_err(|_| Self::Error::DecodingError)?;
296 Ok(PeerStatus {
297 id: (key, key.into()),
298 origin: PeerOrigin::try_from(value.origin as u8).map_err(|_| Self::Error::DecodingError)?,
299 last_seen: value.last_seen.into(),
300 last_seen_latency: Duration::from_millis(value.last_seen_latency as u64),
301 heartbeats_sent: value.heartbeats_sent.unwrap_or_default() as u64,
302 heartbeats_succeeded: value.heartbeats_successful.unwrap_or_default() as u64,
303 backoff: value.backoff.unwrap_or(1.0f64),
304 ignored_until: value.ignored_until.map(|v| v.into()),
305 multiaddresses: {
306 if let sea_orm::query::JsonValue::Array(mas) = value.multi_addresses {
307 mas.into_iter()
308 .filter_map(|s| {
309 if let sea_orm::query::JsonValue::String(s) = s {
310 Some(s)
311 } else {
312 None
313 }
314 })
315 .filter(|s| !s.trim().is_empty())
316 .map(Multiaddr::try_from)
317 .collect::<std::result::Result<Vec<_>, multiaddr::Error>>()
318 .map_err(|_| Self::Error::DecodingError)
319 } else {
320 Err(Self::Error::DecodingError)
321 }?
322 },
323 quality: value.quality,
324 quality_avg: bincode::serde::borrow_decode_from_slice(
325 value
326 .quality_sma
327 .ok_or_else(|| Self::Error::LogicalError("the SMA should always be present for every peer".into()))?
328 .as_slice(),
329 DB_BINCODE_CONFIGURATION,
330 )
331 .map(|(v, _bytes)| v)
332 .map_err(|_| Self::Error::DecodingError)?,
333 }
334 .into())
335 }
336}
337
338#[cfg(test)]
339mod tests {
340 use std::{
341 ops::Add,
342 time::{Duration, SystemTime},
343 };
344
345 use futures::StreamExt;
346 use hopr_crypto_types::keypairs::{ChainKeypair, Keypair, OffchainKeypair};
347 use libp2p_identity::PeerId;
348 use multiaddr::Multiaddr;
349
350 use super::*;
351
352 #[tokio::test]
353 async fn test_add_get() -> anyhow::Result<()> {
354 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
355
356 let peer_id: PeerId = OffchainKeypair::random().public().into();
357 let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
358 let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;
359
360 db.add_network_peer(
361 &peer_id,
362 PeerOrigin::IncomingConnection,
363 vec![ma_1.clone(), ma_2.clone()],
364 0.0,
365 25,
366 )
367 .await?;
368
369 let peer_from_db = db.get_network_peer(&peer_id).await?.expect("peer must exist in the db");
370
371 let mut expected_peer = PeerStatus::new(peer_id, PeerOrigin::IncomingConnection, 0.0, 25);
372 expected_peer.last_seen = SystemTime::UNIX_EPOCH;
373 expected_peer.last_seen_latency = Duration::from_secs(0);
374 expected_peer.multiaddresses = vec![ma_1, ma_2];
375
376 assert_eq!(expected_peer, peer_from_db, "peer states must match");
377 Ok(())
378 }
379
380 #[tokio::test]
381 async fn test_should_remove_peer() -> anyhow::Result<()> {
382 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
383
384 let peer_id: PeerId = OffchainKeypair::random().public().into();
385 let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
386
387 db.add_network_peer(&peer_id, PeerOrigin::IncomingConnection, vec![ma_1.clone()], 0.0, 25)
388 .await?;
389 assert!(db.get_network_peer(&peer_id).await?.is_some(), "must have peer entry");
390
391 db.remove_network_peer(&peer_id).await?;
392 assert!(
393 db.get_network_peer(&peer_id).await?.is_none(),
394 "peer entry must be gone"
395 );
396
397 Ok(())
398 }
399
400 #[tokio::test]
401 async fn test_should_not_remove_non_existing_peer() -> anyhow::Result<()> {
402 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
403
404 let peer_id: PeerId = OffchainKeypair::random().public().into();
405
406 db.remove_network_peer(&peer_id)
407 .await
408 .expect_err("must not delete non-existent peer");
409
410 Ok(())
411 }
412
413 #[tokio::test]
414 async fn test_should_not_add_duplicate_peers() -> anyhow::Result<()> {
415 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
416
417 let peer_id: PeerId = OffchainKeypair::random().public().into();
418 let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
419
420 db.add_network_peer(&peer_id, PeerOrigin::IncomingConnection, vec![ma_1.clone()], 0.0, 25)
421 .await?;
422 db.add_network_peer(&peer_id, PeerOrigin::IncomingConnection, vec![], 0.0, 25)
423 .await
424 .expect_err("should fail adding second time");
425
426 Ok(())
427 }
428
429 #[tokio::test]
430 async fn test_should_return_none_on_non_existing_peer() -> anyhow::Result<()> {
431 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
432
433 let peer_id: PeerId = OffchainKeypair::random().public().into();
434
435 assert!(db.get_network_peer(&peer_id).await?.is_none(), "should return none");
436 Ok(())
437 }
438
439 #[tokio::test]
440 async fn test_update() -> anyhow::Result<()> {
441 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
442
443 let peer_id: PeerId = OffchainKeypair::random().public().into();
444
445 let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
446 let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;
447
448 db.add_network_peer(
449 &peer_id,
450 PeerOrigin::IncomingConnection,
451 vec![ma_1.clone(), ma_2.clone()],
452 0.0,
453 25,
454 )
455 .await?;
456
457 let mut peer_status = PeerStatus::new(peer_id, PeerOrigin::IncomingConnection, 0.2, 25);
458 peer_status.last_seen = SystemTime::UNIX_EPOCH;
459 peer_status.last_seen_latency = Duration::from_secs(2);
460 peer_status.multiaddresses = vec![ma_1, ma_2];
461 peer_status.backoff = 2.0;
462 peer_status.ignored_until = None;
463 for i in [0.1_f64, 0.4_f64, 0.6_f64].into_iter() {
464 peer_status.update_quality(i);
465 }
466 peer_status.quality = peer_status.quality as f32 as f64;
467
468 let peer_status_from_db = db.get_network_peer(&peer_id).await?.expect("entry should exist");
469
470 assert_ne!(peer_status, peer_status_from_db, "entries must not be equal");
471
472 db.update_network_peer(peer_status.clone()).await?;
473
474 let peer_status_from_db = db.get_network_peer(&peer_id).await?.expect("entry should exist");
475
476 assert_eq!(peer_status, peer_status_from_db, "entries must be equal");
477
478 Ok(())
479 }
480
481 #[tokio::test]
482 async fn test_should_fail_to_update_non_existing_peer() -> anyhow::Result<()> {
483 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
484
485 let peer_id: PeerId = OffchainKeypair::random().public().into();
486
487 let mut peer_status = PeerStatus::new(peer_id, PeerOrigin::IncomingConnection, 0.2, 25);
488 peer_status.last_seen = SystemTime::UNIX_EPOCH;
489 peer_status.last_seen_latency = Duration::from_secs(2);
490 peer_status.backoff = 2.0;
491 peer_status.ignored_until = None;
492 peer_status.multiaddresses = vec![];
493 for i in [0.1_f64, 0.4_f64, 0.6_f64].into_iter() {
494 peer_status.update_quality(i);
495 }
496
497 db.update_network_peer(peer_status)
498 .await
499 .expect_err("should fail updating non-existing peer");
500 Ok(())
501 }
502
503 #[tokio::test]
504 async fn test_get_multiple_should_return_all_peers() -> anyhow::Result<()> {
505 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
506
507 let peers = (0..10)
508 .map(|_| {
509 let peer_id: PeerId = OffchainKeypair::random().public().into();
510 peer_id
511 })
512 .collect::<Vec<_>>();
513
514 for peer in &peers {
515 db.add_network_peer(peer, PeerOrigin::Initialization, vec![], 0.0, 25)
516 .await?;
517 }
518
519 let peers_from_db: Vec<PeerId> = db
524 .get_network_peers(Default::default(), true)
525 .await?
526 .map(|s| s.id.1)
527 .collect()
528 .await;
529
530 assert_eq!(peers.len(), peers_from_db.len(), "lengths must match");
531 assert_eq!(peers, peers_from_db, "peer ids must match");
532
533 Ok(())
534 }
535
536 #[tokio::test]
537 async fn test_get_multiple_should_return_filtered_peers() -> anyhow::Result<()> {
538 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
539
540 let peer_count = 10;
541 let peers = (0..peer_count)
542 .map(|_| {
543 let peer_id: PeerId = OffchainKeypair::random().public().into();
544 peer_id
545 })
546 .collect::<Vec<_>>();
547
548 for (i, peer) in peers.iter().enumerate() {
549 db.add_network_peer(peer, PeerOrigin::Initialization, vec![], 0.2, 25)
550 .await?;
551 if i >= peer_count / 2 {
552 let mut peer_status = PeerStatus::new(*peer, PeerOrigin::IncomingConnection, 0.2, 25);
553 peer_status.last_seen = SystemTime::UNIX_EPOCH.add(Duration::from_secs(i as u64));
554 peer_status.last_seen_latency = Duration::from_secs(2);
555 peer_status.multiaddresses = vec![];
556 peer_status.heartbeats_sent = 3;
557 peer_status.heartbeats_succeeded = 4;
558 peer_status.backoff = 1.0;
559 peer_status.ignored_until = None;
560 for i in [0.1_f64, 0.4_f64, 0.6_f64].into_iter() {
561 peer_status.update_quality(i);
562 }
563
564 db.update_network_peer(peer_status).await?;
565 }
566 }
567
568 let peers_from_db: Vec<PeerId> = db
569 .get_network_peers(PeerSelector::default().with_quality_gte(0.501_f64), false)
570 .await?
571 .map(|s| s.id.1)
572 .collect()
573 .await;
574
575 assert_eq!(peer_count / 2, peers_from_db.len(), "lengths must match");
576 assert_eq!(
577 peers.into_iter().skip(5).rev().collect::<Vec<_>>(),
578 peers_from_db,
579 "peer ids must match"
580 );
581
582 Ok(())
583 }
584
585 #[tokio::test]
586 async fn test_should_update_stats_when_updating_peers() -> anyhow::Result<()> {
587 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
588
589 let peer_id_1: PeerId = OffchainKeypair::random().public().into();
590 let peer_id_2: PeerId = OffchainKeypair::random().public().into();
591
592 let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id_1}").parse()?;
593 let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id_2}").parse()?;
594
595 db.add_network_peer(&peer_id_1, PeerOrigin::IncomingConnection, vec![ma_1], 0.0, 25)
596 .await?;
597
598 let stats = db.network_peer_stats(0.2).await?;
599 assert_eq!(
600 Stats {
601 good_quality_public: 0,
602 bad_quality_public: 1,
603 good_quality_non_public: 0,
604 bad_quality_non_public: 0,
605 },
606 stats,
607 "stats must be equal"
608 );
609
610 db.add_network_peer(&peer_id_2, PeerOrigin::IncomingConnection, vec![ma_2], 0.0, 25)
611 .await?;
612
613 let stats = db.network_peer_stats(0.2).await?;
614 assert_eq!(
615 Stats {
616 good_quality_public: 0,
617 bad_quality_public: 2,
618 good_quality_non_public: 0,
619 bad_quality_non_public: 0,
620 },
621 stats,
622 "stats must be equal"
623 );
624
625 let mut peer_status = PeerStatus::new(peer_id_1, PeerOrigin::IncomingConnection, 0.2, 25);
626 peer_status.last_seen = SystemTime::UNIX_EPOCH.add(Duration::from_secs(2));
627 peer_status.last_seen_latency = Duration::from_secs(2);
628 peer_status.multiaddresses = vec![];
629 peer_status.heartbeats_sent = 3;
630 peer_status.heartbeats_succeeded = 4;
631 peer_status.backoff = 1.0;
632 peer_status.ignored_until = None;
633 for i in [0.1_f64, 0.4_f64, 0.6_f64].into_iter() {
634 peer_status.update_quality(i);
635 }
636
637 db.update_network_peer(peer_status).await?;
638
639 let stats = db.network_peer_stats(0.2).await?;
640 assert_eq!(
641 Stats {
642 good_quality_public: 1,
643 bad_quality_public: 1,
644 good_quality_non_public: 0,
645 bad_quality_non_public: 0,
646 },
647 stats,
648 "stats must be equal"
649 );
650
651 db.remove_network_peer(&peer_id_1).await?;
652
653 let stats = db.network_peer_stats(0.2).await?;
654 assert_eq!(
655 Stats {
656 good_quality_public: 0,
657 bad_quality_public: 1,
658 good_quality_non_public: 0,
659 bad_quality_non_public: 0,
660 },
661 stats,
662 "stats must be equal"
663 );
664
665 Ok(())
666 }
667}