1use std::time::Duration;
2
3use async_stream::stream;
4use async_trait::async_trait;
5use futures::{TryStreamExt, stream::BoxStream};
6use hopr_crypto_types::prelude::OffchainPublicKey;
7use hopr_db_api::{
8 errors::Result,
9 peers::{HoprDbPeersOperations, PeerOrigin, PeerSelector, PeerStatus, Stats},
10};
11use hopr_db_entity::network_peer;
12use hopr_primitive_types::prelude::*;
13use libp2p_identity::PeerId;
14use multiaddr::Multiaddr;
15use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder};
16use sea_query::{Condition, Expr, IntoCondition, Order};
17use sqlx::types::chrono::{self, DateTime, Utc};
18use tracing::{error, trace};
19
20use crate::{db::HoprDb, prelude::DbSqlError};
21
22const DB_BINCODE_CONFIGURATION: bincode::config::Configuration = bincode::config::standard()
23 .with_little_endian()
24 .with_variable_int_encoding();
25
26struct WrappedPeerSelector(PeerSelector);
27
28impl From<PeerSelector> for WrappedPeerSelector {
29 fn from(selector: PeerSelector) -> Self {
30 WrappedPeerSelector(selector)
31 }
32}
33
34impl IntoCondition for WrappedPeerSelector {
35 fn into_condition(self) -> Condition {
36 let mut ret = Expr::value(1);
37
38 if let Some(last_seen_l) = self.0.last_seen.0 {
39 ret = ret.and(network_peer::Column::LastSeen.gte(chrono::DateTime::<chrono::Utc>::from(last_seen_l)));
40 }
41
42 if let Some(last_seen_u) = self.0.last_seen.1 {
43 ret = ret.and(network_peer::Column::LastSeen.lte(chrono::DateTime::<chrono::Utc>::from(last_seen_u)));
44 }
45
46 if let Some(quality_l) = self.0.quality.0 {
47 ret = ret.and(network_peer::Column::Quality.gte(quality_l));
48 }
49
50 if let Some(quality_u) = self.0.quality.1 {
51 ret = ret.and(network_peer::Column::Quality.lte(quality_u));
52 }
53
54 ret.into_condition()
55 }
56}
57
58#[async_trait]
59impl HoprDbPeersOperations for HoprDb {
60 async fn add_network_peer(
61 &self,
62 peer: &PeerId,
63 origin: PeerOrigin,
64 mas: Vec<Multiaddr>,
65 backoff: f64,
66 quality_window: u32,
67 ) -> Result<()> {
68 let new_peer = hopr_db_entity::network_peer::ActiveModel {
69 packet_key: sea_orm::ActiveValue::Set(Vec::from(
70 OffchainPublicKey::try_from(peer)
71 .map_err(|_| crate::errors::DbSqlError::DecodingError)?
72 .as_ref(),
73 )),
74 multi_addresses: sea_orm::ActiveValue::Set(
75 mas.into_iter().map(|m| m.to_string()).collect::<Vec<String>>().into(),
76 ),
77 origin: sea_orm::ActiveValue::Set(origin as i8),
78 backoff: sea_orm::ActiveValue::Set(Some(backoff)),
79 quality_sma: sea_orm::ActiveValue::Set(Some(
80 bincode::serde::encode_to_vec(
81 SingleSumSMA::<f64>::new(quality_window as usize),
82 DB_BINCODE_CONFIGURATION,
83 )
84 .map_err(|_| crate::errors::DbSqlError::DecodingError)?,
85 )),
86 ..Default::default()
87 };
88
89 let _ = new_peer.insert(&self.peers_db).await.map_err(DbSqlError::from)?;
90
91 Ok(())
92 }
93
94 async fn remove_network_peer(&self, peer: &PeerId) -> Result<()> {
95 let res = hopr_db_entity::network_peer::Entity::delete_many()
96 .filter(
97 hopr_db_entity::network_peer::Column::PacketKey.eq(Vec::from(
98 OffchainPublicKey::try_from(peer)
99 .map_err(|_| crate::errors::DbSqlError::DecodingError)?
100 .as_ref(),
101 )),
102 )
103 .exec(&self.peers_db)
104 .await
105 .map_err(DbSqlError::from)?;
106
107 if res.rows_affected > 0 {
108 Ok(())
109 } else {
110 Err(
111 crate::errors::DbSqlError::LogicalError("peer cannot be removed because it does not exist".into())
112 .into(),
113 )
114 }
115 }
116
117 async fn update_network_peer(&self, new_status: PeerStatus) -> Result<()> {
118 let row = hopr_db_entity::network_peer::Entity::find()
119 .filter(hopr_db_entity::network_peer::Column::PacketKey.eq(Vec::from(new_status.id.0.as_ref())))
120 .one(&self.peers_db)
121 .await
122 .map_err(DbSqlError::from)?;
123
124 if let Some(model) = row {
125 let mut peer_data: hopr_db_entity::network_peer::ActiveModel = model.into();
126 peer_data.packet_key = sea_orm::ActiveValue::Set(Vec::from(new_status.id.0.as_ref()));
127 peer_data.multi_addresses = sea_orm::ActiveValue::Set(
128 new_status
129 .multiaddresses
130 .into_iter()
131 .map(|m| m.to_string())
132 .collect::<Vec<String>>()
133 .into(),
134 );
135 peer_data.origin = sea_orm::ActiveValue::Set(new_status.origin as i8);
136 peer_data.version = sea_orm::ActiveValue::Set(new_status.peer_version);
137 peer_data.last_seen = sea_orm::ActiveValue::Set(DateTime::<Utc>::from(new_status.last_seen));
138 peer_data.last_seen_latency = sea_orm::ActiveValue::Set(new_status.last_seen_latency.as_millis() as i32);
139 peer_data.ignored = sea_orm::ActiveValue::Set(new_status.ignored.map(DateTime::<Utc>::from));
140 peer_data.public = sea_orm::ActiveValue::Set(new_status.is_public);
141 peer_data.quality = sea_orm::ActiveValue::Set(new_status.quality);
142 peer_data.quality_sma = sea_orm::ActiveValue::Set(Some(
143 bincode::serde::encode_to_vec(&new_status.quality_avg, DB_BINCODE_CONFIGURATION)
144 .map_err(|e| crate::errors::DbSqlError::LogicalError(format!("cannot serialize sma: {e}")))?,
145 ));
146 peer_data.backoff = sea_orm::ActiveValue::Set(Some(new_status.backoff));
147 peer_data.heartbeats_sent = sea_orm::ActiveValue::Set(Some(new_status.heartbeats_sent as i32));
148 peer_data.heartbeats_successful = sea_orm::ActiveValue::Set(Some(new_status.heartbeats_succeeded as i32));
149
150 peer_data.update(&self.peers_db).await.map_err(DbSqlError::from)?;
151
152 Ok(())
153 } else {
154 Err(crate::errors::DbSqlError::LogicalError(format!(
155 "cannot update a non-existing peer '{}'",
156 new_status.id.1
157 ))
158 .into())
159 }
160 }
161
162 async fn get_network_peer(&self, peer: &PeerId) -> Result<Option<PeerStatus>> {
163 let row = hopr_db_entity::network_peer::Entity::find()
164 .filter(
165 hopr_db_entity::network_peer::Column::PacketKey.eq(Vec::from(
166 OffchainPublicKey::try_from(peer)
167 .map_err(|_| crate::errors::DbSqlError::DecodingError)?
168 .as_ref(),
169 )),
170 )
171 .one(&self.peers_db)
172 .await
173 .map_err(DbSqlError::from)?;
174
175 if let Some(model) = row {
176 let status: WrappedPeerStatus = model.try_into()?;
177 Ok(Some(status.0))
178 } else {
179 Ok(None)
180 }
181 }
182
183 async fn get_network_peers<'a>(
184 &'a self,
185 selector: PeerSelector,
186 sort_last_seen_asc: bool,
187 ) -> Result<BoxStream<'a, PeerStatus>> {
188 let selector: WrappedPeerSelector = selector.into();
189 let mut sub_stream = hopr_db_entity::network_peer::Entity::find()
190 .filter(selector)
192 .order_by(
193 network_peer::Column::LastSeen,
194 if sort_last_seen_asc { Order::Asc } else { Order::Desc },
195 )
196 .stream(&self.peers_db)
197 .await
198 .map_err(DbSqlError::from)?;
199
200 Ok(Box::pin(stream! {
201 loop {
202 match sub_stream.try_next().await {
203 Ok(Some(peer_row)) => {
204 trace!(?peer_row, "got db network row");
205 match WrappedPeerStatus::try_from(peer_row) {
206 Ok(peer_status) => yield peer_status.0,
207 Err(e) => error!(error = %e, "cannot map peer from row"),
208 }
209 },
210 Ok(None) => {
211 trace!("fetched all network results");
212 break;
213 }
214 Err(e) => {
215 error!(error = %e, "failed to retrieve next network row");
216 break;
217 }
218 }
219 }
220 }))
221 }
222
223 async fn network_peer_stats(&self, quality_threshold: f64) -> Result<Stats> {
224 Ok(Stats {
225 good_quality_public: hopr_db_entity::network_peer::Entity::find()
226 .filter(
227 sea_orm::Condition::all()
228 .add(hopr_db_entity::network_peer::Column::Public.eq(true))
229 .add(hopr_db_entity::network_peer::Column::Ignored.is_null())
230 .add(hopr_db_entity::network_peer::Column::Quality.gt(quality_threshold)),
231 )
232 .count(&self.peers_db)
233 .await
234 .map_err(DbSqlError::from)? as u32,
235 good_quality_non_public: hopr_db_entity::network_peer::Entity::find()
236 .filter(
237 sea_orm::Condition::all()
238 .add(hopr_db_entity::network_peer::Column::Public.eq(false))
239 .add(hopr_db_entity::network_peer::Column::Ignored.is_null())
240 .add(hopr_db_entity::network_peer::Column::Quality.gt(quality_threshold)),
241 )
242 .count(&self.peers_db)
243 .await
244 .map_err(DbSqlError::from)? as u32,
245 bad_quality_public: hopr_db_entity::network_peer::Entity::find()
246 .filter(
247 sea_orm::Condition::all()
248 .add(hopr_db_entity::network_peer::Column::Public.eq(true))
249 .add(hopr_db_entity::network_peer::Column::Ignored.is_null())
250 .add(hopr_db_entity::network_peer::Column::Quality.lte(quality_threshold)),
251 )
252 .count(&self.peers_db)
253 .await
254 .map_err(DbSqlError::from)? as u32,
255 bad_quality_non_public: hopr_db_entity::network_peer::Entity::find()
256 .filter(
257 sea_orm::Condition::all()
258 .add(hopr_db_entity::network_peer::Column::Public.eq(false))
259 .add(hopr_db_entity::network_peer::Column::Ignored.is_null())
260 .add(hopr_db_entity::network_peer::Column::Quality.lte(quality_threshold)),
261 )
262 .count(&self.peers_db)
263 .await
264 .map_err(DbSqlError::from)? as u32,
265 })
266 }
267}
268
269struct WrappedPeerStatus(PeerStatus);
270
271impl From<PeerStatus> for WrappedPeerStatus {
272 fn from(status: PeerStatus) -> Self {
273 WrappedPeerStatus(status)
274 }
275}
276
277impl TryFrom<hopr_db_entity::network_peer::Model> for WrappedPeerStatus {
278 type Error = crate::errors::DbSqlError;
279
280 fn try_from(value: hopr_db_entity::network_peer::Model) -> std::result::Result<Self, Self::Error> {
281 let key = OffchainPublicKey::try_from(value.packet_key.as_slice()).map_err(|_| Self::Error::DecodingError)?;
282 Ok(PeerStatus {
283 id: (key, key.into()),
284 origin: PeerOrigin::try_from(value.origin as u8).map_err(|_| Self::Error::DecodingError)?,
285 is_public: value.public,
286 last_seen: value.last_seen.into(),
287 last_seen_latency: Duration::from_millis(value.last_seen_latency as u64),
288 heartbeats_sent: value.heartbeats_sent.unwrap_or_default() as u64,
289 heartbeats_succeeded: value.heartbeats_successful.unwrap_or_default() as u64,
290 backoff: value.backoff.unwrap_or(1.0f64),
291 ignored: value.ignored.map(|v| v.into()),
292 peer_version: value.version,
293 multiaddresses: {
294 if let sea_orm::query::JsonValue::Array(mas) = value.multi_addresses {
295 mas.into_iter()
296 .filter_map(|s| {
297 if let sea_orm::query::JsonValue::String(s) = s {
298 Some(s)
299 } else {
300 None
301 }
302 })
303 .filter(|s| !s.trim().is_empty())
304 .map(Multiaddr::try_from)
305 .collect::<std::result::Result<Vec<_>, multiaddr::Error>>()
306 .map_err(|_| Self::Error::DecodingError)
307 } else {
308 Err(Self::Error::DecodingError)
309 }?
310 },
311 quality: value.quality,
312 quality_avg: bincode::serde::borrow_decode_from_slice(
313 value
314 .quality_sma
315 .ok_or_else(|| Self::Error::LogicalError("the SMA should always be present for every peer".into()))?
316 .as_slice(),
317 DB_BINCODE_CONFIGURATION,
318 )
319 .map(|(v, _bytes)| v)
320 .map_err(|_| Self::Error::DecodingError)?,
321 }
322 .into())
323 }
324}
325
326#[cfg(test)]
327mod tests {
328 use std::{
329 ops::Add,
330 time::{Duration, SystemTime},
331 };
332
333 use futures::StreamExt;
334 use hopr_crypto_types::keypairs::{ChainKeypair, Keypair, OffchainKeypair};
335 use libp2p_identity::PeerId;
336 use multiaddr::Multiaddr;
337
338 use super::*;
339
340 #[tokio::test]
341 async fn test_add_get() -> anyhow::Result<()> {
342 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
343
344 let peer_id: PeerId = OffchainKeypair::random().public().into();
345 let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
346 let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;
347
348 db.add_network_peer(
349 &peer_id,
350 PeerOrigin::IncomingConnection,
351 vec![ma_1.clone(), ma_2.clone()],
352 0.0,
353 25,
354 )
355 .await?;
356
357 let peer_from_db = db.get_network_peer(&peer_id).await?.expect("peer must exist in the db");
358
359 let mut expected_peer = PeerStatus::new(peer_id, PeerOrigin::IncomingConnection, 0.0, 25);
360 expected_peer.last_seen = SystemTime::UNIX_EPOCH;
361 expected_peer.last_seen_latency = Duration::from_secs(0);
362 expected_peer.multiaddresses = vec![ma_1, ma_2];
363
364 assert_eq!(expected_peer, peer_from_db, "peer states must match");
365 Ok(())
366 }
367
368 #[tokio::test]
369 async fn test_should_remove_peer() -> anyhow::Result<()> {
370 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
371
372 let peer_id: PeerId = OffchainKeypair::random().public().into();
373 let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
374
375 db.add_network_peer(&peer_id, PeerOrigin::IncomingConnection, vec![ma_1.clone()], 0.0, 25)
376 .await?;
377 assert!(db.get_network_peer(&peer_id).await?.is_some(), "must have peer entry");
378
379 db.remove_network_peer(&peer_id).await?;
380 assert!(
381 db.get_network_peer(&peer_id).await?.is_none(),
382 "peer entry must be gone"
383 );
384
385 Ok(())
386 }
387
388 #[tokio::test]
389 async fn test_should_not_remove_non_existing_peer() -> anyhow::Result<()> {
390 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
391
392 let peer_id: PeerId = OffchainKeypair::random().public().into();
393
394 db.remove_network_peer(&peer_id)
395 .await
396 .expect_err("must not delete non-existent peer");
397
398 Ok(())
399 }
400
401 #[tokio::test]
402 async fn test_should_not_add_duplicate_peers() -> anyhow::Result<()> {
403 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
404
405 let peer_id: PeerId = OffchainKeypair::random().public().into();
406 let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
407
408 db.add_network_peer(&peer_id, PeerOrigin::IncomingConnection, vec![ma_1.clone()], 0.0, 25)
409 .await?;
410 db.add_network_peer(&peer_id, PeerOrigin::IncomingConnection, vec![], 0.0, 25)
411 .await
412 .expect_err("should fail adding second time");
413
414 Ok(())
415 }
416
417 #[tokio::test]
418 async fn test_should_return_none_on_non_existing_peer() -> anyhow::Result<()> {
419 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
420
421 let peer_id: PeerId = OffchainKeypair::random().public().into();
422
423 assert!(db.get_network_peer(&peer_id).await?.is_none(), "should return none");
424 Ok(())
425 }
426
427 #[tokio::test]
428 async fn test_update() -> anyhow::Result<()> {
429 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
430
431 let peer_id: PeerId = OffchainKeypair::random().public().into();
432
433 let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
434 let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;
435
436 db.add_network_peer(
437 &peer_id,
438 PeerOrigin::IncomingConnection,
439 vec![ma_1.clone(), ma_2.clone()],
440 0.0,
441 25,
442 )
443 .await?;
444
445 let mut peer_status = PeerStatus::new(peer_id, PeerOrigin::IncomingConnection, 0.2, 25);
446 peer_status.last_seen = SystemTime::UNIX_EPOCH;
447 peer_status.last_seen_latency = Duration::from_secs(2);
448 peer_status.multiaddresses = vec![ma_1, ma_2];
449 peer_status.backoff = 2.0;
450 peer_status.ignored = None;
451 peer_status.peer_version = Some("1.2.3".into());
452 for i in [0.1_f64, 0.4_f64, 0.6_f64].into_iter() {
453 peer_status.update_quality(i);
454 }
455 peer_status.quality = peer_status.quality as f32 as f64;
456
457 let peer_status_from_db = db.get_network_peer(&peer_id).await?.expect("entry should exist");
458
459 assert_ne!(peer_status, peer_status_from_db, "entries must not be equal");
460
461 db.update_network_peer(peer_status.clone()).await?;
462
463 let peer_status_from_db = db.get_network_peer(&peer_id).await?.expect("entry should exist");
464
465 assert_eq!(peer_status, peer_status_from_db, "entries must be equal");
466
467 Ok(())
468 }
469
470 #[tokio::test]
471 async fn test_should_fail_to_update_non_existing_peer() -> anyhow::Result<()> {
472 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
473
474 let peer_id: PeerId = OffchainKeypair::random().public().into();
475
476 let mut peer_status = PeerStatus::new(peer_id, PeerOrigin::IncomingConnection, 0.2, 25);
477 peer_status.last_seen = SystemTime::UNIX_EPOCH;
478 peer_status.last_seen_latency = Duration::from_secs(2);
479 peer_status.backoff = 2.0;
480 peer_status.ignored = None;
481 peer_status.peer_version = Some("1.2.3".into());
482 peer_status.multiaddresses = vec![];
483 for i in [0.1_f64, 0.4_f64, 0.6_f64].into_iter() {
484 peer_status.update_quality(i);
485 }
486
487 db.update_network_peer(peer_status)
488 .await
489 .expect_err("should fail updating non-existing peer");
490 Ok(())
491 }
492
493 #[tokio::test]
494 async fn test_get_multiple_should_return_all_peers() -> anyhow::Result<()> {
495 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
496
497 let peers = (0..10)
498 .map(|_| {
499 let peer_id: PeerId = OffchainKeypair::random().public().into();
500 peer_id
501 })
502 .collect::<Vec<_>>();
503
504 for peer in &peers {
505 db.add_network_peer(peer, PeerOrigin::Initialization, vec![], 0.0, 25)
506 .await?;
507 }
508
509 let peers_from_db: Vec<PeerId> = db
510 .get_network_peers(Default::default(), false)
511 .await?
512 .map(|s| s.id.1)
513 .collect()
514 .await;
515
516 assert_eq!(peers.len(), peers_from_db.len(), "lengths must match");
517 assert_eq!(peers, peers_from_db, "peer ids must match");
518
519 Ok(())
520 }
521
522 #[tokio::test]
523 async fn test_get_multiple_should_return_filtered_peers() -> anyhow::Result<()> {
524 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
525
526 let peer_count = 10;
527 let peers = (0..peer_count)
528 .map(|_| {
529 let peer_id: PeerId = OffchainKeypair::random().public().into();
530 peer_id
531 })
532 .collect::<Vec<_>>();
533
534 for (i, peer) in peers.iter().enumerate() {
535 db.add_network_peer(peer, PeerOrigin::Initialization, vec![], 0.2, 25)
536 .await?;
537 if i >= peer_count / 2 {
538 let mut peer_status = PeerStatus::new(*peer, PeerOrigin::IncomingConnection, 0.2, 25);
539 peer_status.last_seen = SystemTime::UNIX_EPOCH.add(Duration::from_secs(i as u64));
540 peer_status.last_seen_latency = Duration::from_secs(2);
541 peer_status.multiaddresses = vec![];
542 peer_status.heartbeats_sent = 3;
543 peer_status.heartbeats_succeeded = 4;
544 peer_status.backoff = 1.0;
545 peer_status.ignored = None;
546 peer_status.peer_version = Some("1.2.3".into());
547 for i in [0.1_f64, 0.4_f64, 0.6_f64].into_iter() {
548 peer_status.update_quality(i);
549 }
550
551 db.update_network_peer(peer_status).await?;
552 }
553 }
554
555 let peers_from_db: Vec<PeerId> = db
556 .get_network_peers(PeerSelector::default().with_quality_gte(0.501_f64), false)
557 .await?
558 .map(|s| s.id.1)
559 .collect()
560 .await;
561
562 assert_eq!(peer_count / 2, peers_from_db.len(), "lengths must match");
563 assert_eq!(
564 peers.into_iter().skip(5).rev().collect::<Vec<_>>(),
565 peers_from_db,
566 "peer ids must match"
567 );
568
569 Ok(())
570 }
571
572 #[tokio::test]
573 async fn test_should_update_stats_when_updating_peers() -> anyhow::Result<()> {
574 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
575
576 let peer_id_1: PeerId = OffchainKeypair::random().public().into();
577 let peer_id_2: PeerId = OffchainKeypair::random().public().into();
578
579 let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id_1}").parse()?;
580 let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id_2}").parse()?;
581
582 db.add_network_peer(&peer_id_1, PeerOrigin::IncomingConnection, vec![ma_1], 0.0, 25)
583 .await?;
584
585 let stats = db.network_peer_stats(0.2).await?;
586 assert_eq!(
587 Stats {
588 good_quality_public: 0,
589 bad_quality_public: 1,
590 good_quality_non_public: 0,
591 bad_quality_non_public: 0,
592 },
593 stats,
594 "stats must be equal"
595 );
596
597 db.add_network_peer(&peer_id_2, PeerOrigin::IncomingConnection, vec![ma_2], 0.0, 25)
598 .await?;
599
600 let stats = db.network_peer_stats(0.2).await?;
601 assert_eq!(
602 Stats {
603 good_quality_public: 0,
604 bad_quality_public: 2,
605 good_quality_non_public: 0,
606 bad_quality_non_public: 0,
607 },
608 stats,
609 "stats must be equal"
610 );
611
612 let mut peer_status = PeerStatus::new(peer_id_1, PeerOrigin::IncomingConnection, 0.2, 25);
613 peer_status.last_seen = SystemTime::UNIX_EPOCH.add(Duration::from_secs(2));
614 peer_status.last_seen_latency = Duration::from_secs(2);
615 peer_status.multiaddresses = vec![];
616 peer_status.heartbeats_sent = 3;
617 peer_status.heartbeats_succeeded = 4;
618 peer_status.backoff = 1.0;
619 peer_status.ignored = None;
620 peer_status.peer_version = Some("1.2.3".into());
621 for i in [0.1_f64, 0.4_f64, 0.6_f64].into_iter() {
622 peer_status.update_quality(i);
623 }
624
625 db.update_network_peer(peer_status).await?;
626
627 let stats = db.network_peer_stats(0.2).await?;
628 assert_eq!(
629 Stats {
630 good_quality_public: 1,
631 bad_quality_public: 1,
632 good_quality_non_public: 0,
633 bad_quality_non_public: 0,
634 },
635 stats,
636 "stats must be equal"
637 );
638
639 let mut peer_status = PeerStatus::new(peer_id_2, PeerOrigin::IncomingConnection, 0.2, 25);
640 peer_status.last_seen = SystemTime::UNIX_EPOCH.add(Duration::from_secs(2));
641 peer_status.last_seen_latency = Duration::from_secs(2);
642 peer_status.multiaddresses = vec![];
643 peer_status.is_public = false;
644 peer_status.heartbeats_sent = 3;
645 peer_status.heartbeats_succeeded = 4;
646 peer_status.backoff = 2.0;
647 peer_status.ignored = None;
648 peer_status.peer_version = Some("1.2.3".into());
649
650 db.update_network_peer(peer_status).await?;
651
652 let stats = db.network_peer_stats(0.2).await?;
653 assert_eq!(
654 Stats {
655 good_quality_public: 1,
656 bad_quality_public: 0,
657 good_quality_non_public: 0,
658 bad_quality_non_public: 1,
659 },
660 stats,
661 "stats must be equal"
662 );
663
664 db.remove_network_peer(&peer_id_1).await?;
665
666 let stats = db.network_peer_stats(0.2).await?;
667 assert_eq!(
668 Stats {
669 good_quality_public: 0,
670 bad_quality_public: 0,
671 good_quality_non_public: 0,
672 bad_quality_non_public: 1,
673 },
674 stats,
675 "stats must be equal"
676 );
677
678 Ok(())
679 }
680}