1use std::time::Duration;
2
3use async_stream::stream;
4use async_trait::async_trait;
5use futures::{stream::BoxStream, TryStreamExt};
6use libp2p_identity::PeerId;
7use multiaddr::Multiaddr;
8use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder};
9use sea_query::{Condition, Expr, IntoCondition, Order};
10use sqlx::types::chrono::{self, DateTime, Utc};
11use tracing::{error, trace};
12
13use hopr_crypto_types::prelude::OffchainPublicKey;
14use hopr_db_api::{
15 errors::Result,
16 peers::{HoprDbPeersOperations, PeerOrigin, PeerSelector, PeerStatus, Stats},
17};
18use hopr_db_entity::network_peer;
19use hopr_primitive_types::prelude::*;
20
21use crate::{db::HoprDb, prelude::DbSqlError};
22
23const DB_BINCODE_CONFIGURATION: bincode::config::Configuration = bincode::config::standard()
24 .with_little_endian()
25 .with_variable_int_encoding();
26
27struct WrappedPeerSelector(PeerSelector);
28
29impl From<PeerSelector> for WrappedPeerSelector {
30 fn from(selector: PeerSelector) -> Self {
31 WrappedPeerSelector(selector)
32 }
33}
34
35impl IntoCondition for WrappedPeerSelector {
36 fn into_condition(self) -> Condition {
37 let mut ret = Expr::value(1);
38
39 if let Some(last_seen_l) = self.0.last_seen.0 {
40 ret = ret.and(network_peer::Column::LastSeen.gte(chrono::DateTime::<chrono::Utc>::from(last_seen_l)));
41 }
42
43 if let Some(last_seen_u) = self.0.last_seen.1 {
44 ret = ret.and(network_peer::Column::LastSeen.lte(chrono::DateTime::<chrono::Utc>::from(last_seen_u)));
45 }
46
47 if let Some(quality_l) = self.0.quality.0 {
48 ret = ret.and(network_peer::Column::Quality.gte(quality_l));
49 }
50
51 if let Some(quality_u) = self.0.quality.1 {
52 ret = ret.and(network_peer::Column::Quality.lte(quality_u));
53 }
54
55 ret.into_condition()
56 }
57}
58
59#[async_trait]
60impl HoprDbPeersOperations for HoprDb {
61 async fn add_network_peer(
62 &self,
63 peer: &PeerId,
64 origin: PeerOrigin,
65 mas: Vec<Multiaddr>,
66 backoff: f64,
67 quality_window: u32,
68 ) -> Result<()> {
69 let new_peer = hopr_db_entity::network_peer::ActiveModel {
70 packet_key: sea_orm::ActiveValue::Set(Vec::from(
71 OffchainPublicKey::try_from(peer)
72 .map_err(|_| crate::errors::DbSqlError::DecodingError)?
73 .as_ref(),
74 )),
75 multi_addresses: sea_orm::ActiveValue::Set(
76 mas.into_iter().map(|m| m.to_string()).collect::<Vec<String>>().into(),
77 ),
78 origin: sea_orm::ActiveValue::Set(origin as i8),
79 backoff: sea_orm::ActiveValue::Set(Some(backoff)),
80 quality_sma: sea_orm::ActiveValue::Set(Some(
81 bincode::serde::encode_to_vec(
82 SingleSumSMA::<f64>::new(quality_window as usize),
83 DB_BINCODE_CONFIGURATION,
84 )
85 .map_err(|_| crate::errors::DbSqlError::DecodingError)?,
86 )),
87 ..Default::default()
88 };
89
90 let _ = new_peer.insert(&self.peers_db).await.map_err(DbSqlError::from)?;
91
92 Ok(())
93 }
94
95 async fn remove_network_peer(&self, peer: &PeerId) -> Result<()> {
96 let res = hopr_db_entity::network_peer::Entity::delete_many()
97 .filter(
98 hopr_db_entity::network_peer::Column::PacketKey.eq(Vec::from(
99 OffchainPublicKey::try_from(peer)
100 .map_err(|_| crate::errors::DbSqlError::DecodingError)?
101 .as_ref(),
102 )),
103 )
104 .exec(&self.peers_db)
105 .await
106 .map_err(DbSqlError::from)?;
107
108 if res.rows_affected > 0 {
109 Ok(())
110 } else {
111 Err(
112 crate::errors::DbSqlError::LogicalError("peer cannot be removed because it does not exist".into())
113 .into(),
114 )
115 }
116 }
117
118 async fn update_network_peer(&self, new_status: PeerStatus) -> Result<()> {
119 let row = hopr_db_entity::network_peer::Entity::find()
120 .filter(hopr_db_entity::network_peer::Column::PacketKey.eq(Vec::from(new_status.id.0.as_ref())))
121 .one(&self.peers_db)
122 .await
123 .map_err(DbSqlError::from)?;
124
125 if let Some(model) = row {
126 let mut peer_data: hopr_db_entity::network_peer::ActiveModel = model.into();
127 peer_data.packet_key = sea_orm::ActiveValue::Set(Vec::from(new_status.id.0.as_ref()));
128 peer_data.multi_addresses = sea_orm::ActiveValue::Set(
129 new_status
130 .multiaddresses
131 .into_iter()
132 .map(|m| m.to_string())
133 .collect::<Vec<String>>()
134 .into(),
135 );
136 peer_data.origin = sea_orm::ActiveValue::Set(new_status.origin as i8);
137 peer_data.version = sea_orm::ActiveValue::Set(new_status.peer_version);
138 peer_data.last_seen = sea_orm::ActiveValue::Set(DateTime::<Utc>::from(new_status.last_seen));
139 peer_data.last_seen_latency = sea_orm::ActiveValue::Set(new_status.last_seen_latency.as_millis() as i32);
140 peer_data.ignored = sea_orm::ActiveValue::Set(new_status.ignored.map(DateTime::<Utc>::from));
141 peer_data.public = sea_orm::ActiveValue::Set(new_status.is_public);
142 peer_data.quality = sea_orm::ActiveValue::Set(new_status.quality);
143 peer_data.quality_sma = sea_orm::ActiveValue::Set(Some(
144 bincode::serde::encode_to_vec(&new_status.quality_avg, DB_BINCODE_CONFIGURATION)
145 .map_err(|e| crate::errors::DbSqlError::LogicalError(format!("cannot serialize sma: {e}")))?,
146 ));
147 peer_data.backoff = sea_orm::ActiveValue::Set(Some(new_status.backoff));
148 peer_data.heartbeats_sent = sea_orm::ActiveValue::Set(Some(new_status.heartbeats_sent as i32));
149 peer_data.heartbeats_successful = sea_orm::ActiveValue::Set(Some(new_status.heartbeats_succeeded as i32));
150
151 peer_data.update(&self.peers_db).await.map_err(DbSqlError::from)?;
152
153 Ok(())
154 } else {
155 Err(crate::errors::DbSqlError::LogicalError(format!(
156 "cannot update a non-existing peer '{}'",
157 new_status.id.1
158 ))
159 .into())
160 }
161 }
162
163 async fn get_network_peer(&self, peer: &PeerId) -> Result<Option<PeerStatus>> {
164 let row = hopr_db_entity::network_peer::Entity::find()
165 .filter(
166 hopr_db_entity::network_peer::Column::PacketKey.eq(Vec::from(
167 OffchainPublicKey::try_from(peer)
168 .map_err(|_| crate::errors::DbSqlError::DecodingError)?
169 .as_ref(),
170 )),
171 )
172 .one(&self.peers_db)
173 .await
174 .map_err(DbSqlError::from)?;
175
176 if let Some(model) = row {
177 let status: WrappedPeerStatus = model.try_into()?;
178 Ok(Some(status.0))
179 } else {
180 Ok(None)
181 }
182 }
183
184 async fn get_network_peers<'a>(
185 &'a self,
186 selector: PeerSelector,
187 sort_last_seen_asc: bool,
188 ) -> Result<BoxStream<'a, PeerStatus>> {
189 let selector: WrappedPeerSelector = selector.into();
190 let mut sub_stream = hopr_db_entity::network_peer::Entity::find()
191 .filter(selector)
193 .order_by(
194 network_peer::Column::LastSeen,
195 if sort_last_seen_asc { Order::Asc } else { Order::Desc },
196 )
197 .stream(&self.peers_db)
198 .await
199 .map_err(DbSqlError::from)?;
200
201 Ok(Box::pin(stream! {
202 loop {
203 match sub_stream.try_next().await {
204 Ok(Some(peer_row)) => {
205 trace!(?peer_row, "got db network row");
206 match WrappedPeerStatus::try_from(peer_row) {
207 Ok(peer_status) => yield peer_status.0,
208 Err(e) => error!(error = %e, "cannot map peer from row"),
209 }
210 },
211 Ok(None) => {
212 trace!("fetched all network results");
213 break;
214 }
215 Err(e) => {
216 error!(error = %e, "failed to retrieve next network row");
217 break;
218 }
219 }
220 }
221 }))
222 }
223
224 async fn network_peer_stats(&self, quality_threshold: f64) -> Result<Stats> {
225 Ok(Stats {
226 good_quality_public: hopr_db_entity::network_peer::Entity::find()
227 .filter(
228 sea_orm::Condition::all()
229 .add(hopr_db_entity::network_peer::Column::Public.eq(true))
230 .add(hopr_db_entity::network_peer::Column::Ignored.is_null())
231 .add(hopr_db_entity::network_peer::Column::Quality.gt(quality_threshold)),
232 )
233 .count(&self.peers_db)
234 .await
235 .map_err(DbSqlError::from)? as u32,
236 good_quality_non_public: hopr_db_entity::network_peer::Entity::find()
237 .filter(
238 sea_orm::Condition::all()
239 .add(hopr_db_entity::network_peer::Column::Public.eq(false))
240 .add(hopr_db_entity::network_peer::Column::Ignored.is_null())
241 .add(hopr_db_entity::network_peer::Column::Quality.gt(quality_threshold)),
242 )
243 .count(&self.peers_db)
244 .await
245 .map_err(DbSqlError::from)? as u32,
246 bad_quality_public: hopr_db_entity::network_peer::Entity::find()
247 .filter(
248 sea_orm::Condition::all()
249 .add(hopr_db_entity::network_peer::Column::Public.eq(true))
250 .add(hopr_db_entity::network_peer::Column::Ignored.is_null())
251 .add(hopr_db_entity::network_peer::Column::Quality.lte(quality_threshold)),
252 )
253 .count(&self.peers_db)
254 .await
255 .map_err(DbSqlError::from)? as u32,
256 bad_quality_non_public: hopr_db_entity::network_peer::Entity::find()
257 .filter(
258 sea_orm::Condition::all()
259 .add(hopr_db_entity::network_peer::Column::Public.eq(false))
260 .add(hopr_db_entity::network_peer::Column::Ignored.is_null())
261 .add(hopr_db_entity::network_peer::Column::Quality.lte(quality_threshold)),
262 )
263 .count(&self.peers_db)
264 .await
265 .map_err(DbSqlError::from)? as u32,
266 })
267 }
268}
269
270struct WrappedPeerStatus(PeerStatus);
271
272impl From<PeerStatus> for WrappedPeerStatus {
273 fn from(status: PeerStatus) -> Self {
274 WrappedPeerStatus(status)
275 }
276}
277
278impl TryFrom<hopr_db_entity::network_peer::Model> for WrappedPeerStatus {
279 type Error = crate::errors::DbSqlError;
280
281 fn try_from(value: hopr_db_entity::network_peer::Model) -> std::result::Result<Self, Self::Error> {
282 let key = OffchainPublicKey::try_from(value.packet_key.as_slice()).map_err(|_| Self::Error::DecodingError)?;
283 Ok(PeerStatus {
284 id: (key, key.into()),
285 origin: PeerOrigin::try_from(value.origin as u8).map_err(|_| Self::Error::DecodingError)?,
286 is_public: value.public,
287 last_seen: value.last_seen.into(),
288 last_seen_latency: Duration::from_millis(value.last_seen_latency as u64),
289 heartbeats_sent: value.heartbeats_sent.unwrap_or_default() as u64,
290 heartbeats_succeeded: value.heartbeats_successful.unwrap_or_default() as u64,
291 backoff: value.backoff.unwrap_or(1.0f64),
292 ignored: value.ignored.map(|v| v.into()),
293 peer_version: value.version,
294 multiaddresses: {
295 if let sea_orm::query::JsonValue::Array(mas) = value.multi_addresses {
296 mas.into_iter()
297 .filter_map(|s| {
298 if let sea_orm::query::JsonValue::String(s) = s {
299 Some(s)
300 } else {
301 None
302 }
303 })
304 .filter(|s| !s.trim().is_empty())
305 .map(Multiaddr::try_from)
306 .collect::<std::result::Result<Vec<_>, multiaddr::Error>>()
307 .map_err(|_| Self::Error::DecodingError)
308 } else {
309 Err(Self::Error::DecodingError)
310 }?
311 },
312 quality: value.quality,
313 quality_avg: bincode::serde::borrow_decode_from_slice(
314 value
315 .quality_sma
316 .ok_or_else(|| Self::Error::LogicalError("the SMA should always be present for every peer".into()))?
317 .as_slice(),
318 DB_BINCODE_CONFIGURATION,
319 )
320 .map(|(v, _bytes)| v)
321 .map_err(|_| Self::Error::DecodingError)?,
322 }
323 .into())
324 }
325}
326
327#[cfg(test)]
328mod tests {
329 use super::*;
330 use futures::StreamExt;
331 use hopr_crypto_types::keypairs::{ChainKeypair, Keypair, OffchainKeypair};
332 use libp2p_identity::PeerId;
333 use multiaddr::Multiaddr;
334 use std::ops::Add;
335 use std::time::{Duration, SystemTime};
336
337 #[async_std::test]
338 async fn test_add_get() -> anyhow::Result<()> {
339 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
340
341 let peer_id: PeerId = OffchainKeypair::random().public().into();
342 let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
343 let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;
344
345 db.add_network_peer(
346 &peer_id,
347 PeerOrigin::IncomingConnection,
348 vec![ma_1.clone(), ma_2.clone()],
349 0.0,
350 25,
351 )
352 .await?;
353
354 let peer_from_db = db.get_network_peer(&peer_id).await?.expect("peer must exist in the db");
355
356 let mut expected_peer = PeerStatus::new(peer_id, PeerOrigin::IncomingConnection, 0.0, 25);
357 expected_peer.last_seen = SystemTime::UNIX_EPOCH;
358 expected_peer.last_seen_latency = Duration::from_secs(0);
359 expected_peer.multiaddresses = vec![ma_1, ma_2];
360
361 assert_eq!(expected_peer, peer_from_db, "peer states must match");
362 Ok(())
363 }
364
365 #[async_std::test]
366 async fn test_should_remove_peer() -> anyhow::Result<()> {
367 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
368
369 let peer_id: PeerId = OffchainKeypair::random().public().into();
370 let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
371
372 db.add_network_peer(&peer_id, PeerOrigin::IncomingConnection, vec![ma_1.clone()], 0.0, 25)
373 .await?;
374 assert!(db.get_network_peer(&peer_id).await?.is_some(), "must have peer entry");
375
376 db.remove_network_peer(&peer_id).await?;
377 assert!(
378 db.get_network_peer(&peer_id).await?.is_none(),
379 "peer entry must be gone"
380 );
381
382 Ok(())
383 }
384
385 #[async_std::test]
386 async fn test_should_not_remove_non_existing_peer() -> anyhow::Result<()> {
387 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
388
389 let peer_id: PeerId = OffchainKeypair::random().public().into();
390
391 db.remove_network_peer(&peer_id)
392 .await
393 .expect_err("must not delete non-existent peer");
394
395 Ok(())
396 }
397
398 #[async_std::test]
399 async fn test_should_not_add_duplicate_peers() -> anyhow::Result<()> {
400 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
401
402 let peer_id: PeerId = OffchainKeypair::random().public().into();
403 let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
404
405 db.add_network_peer(&peer_id, PeerOrigin::IncomingConnection, vec![ma_1.clone()], 0.0, 25)
406 .await?;
407 db.add_network_peer(&peer_id, PeerOrigin::IncomingConnection, vec![], 0.0, 25)
408 .await
409 .expect_err("should fail adding second time");
410
411 Ok(())
412 }
413
414 #[async_std::test]
415 async fn test_should_return_none_on_non_existing_peer() -> anyhow::Result<()> {
416 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
417
418 let peer_id: PeerId = OffchainKeypair::random().public().into();
419
420 assert!(db.get_network_peer(&peer_id).await?.is_none(), "should return none");
421 Ok(())
422 }
423
424 #[async_std::test]
425 async fn test_update() -> anyhow::Result<()> {
426 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
427
428 let peer_id: PeerId = OffchainKeypair::random().public().into();
429
430 let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
431 let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;
432
433 db.add_network_peer(
434 &peer_id,
435 PeerOrigin::IncomingConnection,
436 vec![ma_1.clone(), ma_2.clone()],
437 0.0,
438 25,
439 )
440 .await?;
441
442 let mut peer_status = PeerStatus::new(peer_id, PeerOrigin::IncomingConnection, 0.2, 25);
443 peer_status.last_seen = SystemTime::UNIX_EPOCH;
444 peer_status.last_seen_latency = Duration::from_secs(2);
445 peer_status.multiaddresses = vec![ma_1, ma_2];
446 peer_status.backoff = 2.0;
447 peer_status.ignored = None;
448 peer_status.peer_version = Some("1.2.3".into());
449 for i in [0.1_f64, 0.4_64, 0.6_f64].into_iter() {
450 peer_status.update_quality(i);
451 }
452 peer_status.quality = peer_status.quality as f32 as f64;
453
454 let peer_status_from_db = db.get_network_peer(&peer_id).await?.expect("entry should exist");
455
456 assert_ne!(peer_status, peer_status_from_db, "entries must not be equal");
457
458 db.update_network_peer(peer_status.clone()).await?;
459
460 let peer_status_from_db = db.get_network_peer(&peer_id).await?.expect("entry should exist");
461
462 assert_eq!(peer_status, peer_status_from_db, "entries must be equal");
463
464 Ok(())
465 }
466
467 #[async_std::test]
468 async fn test_should_fail_to_update_non_existing_peer() -> anyhow::Result<()> {
469 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
470
471 let peer_id: PeerId = OffchainKeypair::random().public().into();
472
473 let mut peer_status = PeerStatus::new(peer_id, PeerOrigin::IncomingConnection, 0.2, 25);
474 peer_status.last_seen = SystemTime::UNIX_EPOCH;
475 peer_status.last_seen_latency = Duration::from_secs(2);
476 peer_status.backoff = 2.0;
477 peer_status.ignored = None;
478 peer_status.peer_version = Some("1.2.3".into());
479 peer_status.multiaddresses = vec![];
480 for i in [0.1_f64, 0.4_64, 0.6_f64].into_iter() {
481 peer_status.update_quality(i);
482 }
483
484 db.update_network_peer(peer_status)
485 .await
486 .expect_err("should fail updating non-existing peer");
487 Ok(())
488 }
489
490 #[async_std::test]
491 async fn test_get_multiple_should_return_all_peers() -> anyhow::Result<()> {
492 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
493
494 let peers = (0..10)
495 .map(|_| {
496 let peer_id: PeerId = OffchainKeypair::random().public().into();
497 peer_id
498 })
499 .collect::<Vec<_>>();
500
501 for peer in &peers {
502 db.add_network_peer(peer, PeerOrigin::Initialization, vec![], 0.0, 25)
503 .await?;
504 }
505
506 let peers_from_db: Vec<PeerId> = db
507 .get_network_peers(Default::default(), false)
508 .await?
509 .map(|s| s.id.1)
510 .collect()
511 .await;
512
513 assert_eq!(peers.len(), peers_from_db.len(), "lengths must match");
514 assert_eq!(peers, peers_from_db, "peer ids must match");
515
516 Ok(())
517 }
518
519 #[async_std::test]
520 async fn test_get_multiple_should_return_filtered_peers() -> anyhow::Result<()> {
521 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
522
523 let peer_count = 10;
524 let peers = (0..peer_count)
525 .map(|_| {
526 let peer_id: PeerId = OffchainKeypair::random().public().into();
527 peer_id
528 })
529 .collect::<Vec<_>>();
530
531 for (i, peer) in peers.iter().enumerate() {
532 db.add_network_peer(peer, PeerOrigin::Initialization, vec![], 0.2, 25)
533 .await?;
534 if i >= peer_count / 2 {
535 let mut peer_status = PeerStatus::new(*peer, PeerOrigin::IncomingConnection, 0.2, 25);
536 peer_status.last_seen = SystemTime::UNIX_EPOCH.add(Duration::from_secs(i as u64));
537 peer_status.last_seen_latency = Duration::from_secs(2);
538 peer_status.multiaddresses = vec![];
539 peer_status.heartbeats_sent = 3;
540 peer_status.heartbeats_succeeded = 4;
541 peer_status.backoff = 1.0;
542 peer_status.ignored = None;
543 peer_status.peer_version = Some("1.2.3".into());
544 for i in [0.1_f64, 0.4_64, 0.6_f64].into_iter() {
545 peer_status.update_quality(i);
546 }
547
548 db.update_network_peer(peer_status).await?;
549 }
550 }
551
552 let peers_from_db: Vec<PeerId> = db
553 .get_network_peers(PeerSelector::default().with_quality_gte(0.501_f64), false)
554 .await?
555 .map(|s| s.id.1)
556 .collect()
557 .await;
558
559 assert_eq!(peer_count / 2, peers_from_db.len(), "lengths must match");
560 assert_eq!(
561 peers.into_iter().skip(5).rev().collect::<Vec<_>>(),
562 peers_from_db,
563 "peer ids must match"
564 );
565
566 Ok(())
567 }
568
569 #[async_std::test]
570 async fn test_should_update_stats_when_updating_peers() -> anyhow::Result<()> {
571 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
572
573 let peer_id_1: PeerId = OffchainKeypair::random().public().into();
574 let peer_id_2: PeerId = OffchainKeypair::random().public().into();
575
576 let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id_1}").parse()?;
577 let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id_2}").parse()?;
578
579 db.add_network_peer(&peer_id_1, PeerOrigin::IncomingConnection, vec![ma_1], 0.0, 25)
580 .await?;
581
582 let stats = db.network_peer_stats(0.2).await?;
583 assert_eq!(
584 Stats {
585 good_quality_public: 0,
586 bad_quality_public: 1,
587 good_quality_non_public: 0,
588 bad_quality_non_public: 0,
589 },
590 stats,
591 "stats must be equal"
592 );
593
594 db.add_network_peer(&peer_id_2, PeerOrigin::IncomingConnection, vec![ma_2], 0.0, 25)
595 .await?;
596
597 let stats = db.network_peer_stats(0.2).await?;
598 assert_eq!(
599 Stats {
600 good_quality_public: 0,
601 bad_quality_public: 2,
602 good_quality_non_public: 0,
603 bad_quality_non_public: 0,
604 },
605 stats,
606 "stats must be equal"
607 );
608
609 let mut peer_status = PeerStatus::new(peer_id_1, PeerOrigin::IncomingConnection, 0.2, 25);
610 peer_status.last_seen = SystemTime::UNIX_EPOCH.add(Duration::from_secs(2));
611 peer_status.last_seen_latency = Duration::from_secs(2);
612 peer_status.multiaddresses = vec![];
613 peer_status.heartbeats_sent = 3;
614 peer_status.heartbeats_succeeded = 4;
615 peer_status.backoff = 1.0;
616 peer_status.ignored = None;
617 peer_status.peer_version = Some("1.2.3".into());
618 for i in [0.1_f64, 0.4_64, 0.6_f64].into_iter() {
619 peer_status.update_quality(i);
620 }
621
622 db.update_network_peer(peer_status).await?;
623
624 let stats = db.network_peer_stats(0.2).await?;
625 assert_eq!(
626 Stats {
627 good_quality_public: 1,
628 bad_quality_public: 1,
629 good_quality_non_public: 0,
630 bad_quality_non_public: 0,
631 },
632 stats,
633 "stats must be equal"
634 );
635
636 let mut peer_status = PeerStatus::new(peer_id_2, PeerOrigin::IncomingConnection, 0.2, 25);
637 peer_status.last_seen = SystemTime::UNIX_EPOCH.add(Duration::from_secs(2));
638 peer_status.last_seen_latency = Duration::from_secs(2);
639 peer_status.multiaddresses = vec![];
640 peer_status.is_public = false;
641 peer_status.heartbeats_sent = 3;
642 peer_status.heartbeats_succeeded = 4;
643 peer_status.backoff = 2.0;
644 peer_status.ignored = None;
645 peer_status.peer_version = Some("1.2.3".into());
646
647 db.update_network_peer(peer_status).await?;
648
649 let stats = db.network_peer_stats(0.2).await?;
650 assert_eq!(
651 Stats {
652 good_quality_public: 1,
653 bad_quality_public: 0,
654 good_quality_non_public: 0,
655 bad_quality_non_public: 1,
656 },
657 stats,
658 "stats must be equal"
659 );
660
661 db.remove_network_peer(&peer_id_1).await?;
662
663 let stats = db.network_peer_stats(0.2).await?;
664 assert_eq!(
665 Stats {
666 good_quality_public: 0,
667 bad_quality_public: 0,
668 good_quality_non_public: 0,
669 bad_quality_non_public: 1,
670 },
671 stats,
672 "stats must be equal"
673 );
674
675 Ok(())
676 }
677}