1use std::sync::Arc;
2
3use futures::{StreamExt, pin_mut};
4use hopr_api::{
5 HoprBalance, Multiaddr, OffchainPublicKey, PeerId,
6 chain::{ChainKeyOperations, WinningProbability},
7 graph::{EdgeCapacityUpdate, MeasurableEdge, NetworkGraphUpdate},
8 types::{
9 chain::chain_events::ChainEvent,
10 internal::prelude::ChannelStatus,
11 primitive::prelude::{Address, UnitaryFloatOps},
12 },
13};
14use hopr_transport::{NeighborTelemetry, PathTelemetry};
15use parking_lot::RwLock;
16
17#[allow(clippy::too_many_arguments)]
25pub(super) async fn process_chain_events<C, G>(
26 chain_reader: C,
27 graph_updater: G,
28 events: impl futures::Stream<Item = ChainEvent> + Send + 'static,
29 own_chain_addr: Address,
30 own_packet_key: OffchainPublicKey,
31 ticket_price: Arc<RwLock<HoprBalance>>,
32 win_probability: Arc<RwLock<WinningProbability>>,
33 mut peer_discovery_tx: Option<futures::channel::mpsc::Sender<(PeerId, Vec<Multiaddr>)>>,
34) where
35 C: ChainKeyOperations + Clone + Send + Sync + 'static,
36 G: NetworkGraphUpdate + Send + Sync + 'static,
37{
38 pin_mut!(events);
39 while let Some(chain_event) = events.next().await {
40 tracing::debug!(event = %chain_event, "processing chain event");
41 match chain_event {
42 ChainEvent::Announcement(account) => {
43 tracing::debug!(
44 account = %account.public_key,
45 "recording graph node for announced account"
46 );
47 graph_updater.record_node(account.public_key);
48 if let Some(ref mut tx) = peer_discovery_tx {
49 let peer_id: PeerId = account.public_key.into();
50 let multiaddrs = account.get_multiaddrs();
51 let _span = tracing::info_span!(
52 "peer_announcement",
53 peer = %peer_id,
54 multiaddresses = ?multiaddrs,
55 )
56 .entered();
57 if let Err(e) = tx.try_send((peer_id, multiaddrs.to_vec())) {
58 tracing::error!(%e, "peer-discovery channel full or closed; announcement dropped");
59 }
60 }
61 }
62 ChainEvent::ChannelOpened(channel)
63 | ChainEvent::ChannelClosureInitiated(channel)
64 | ChainEvent::ChannelClosed(channel)
65 | ChainEvent::ChannelBalanceIncreased(channel, _)
66 | ChainEvent::ChannelBalanceDecreased(channel, _) => {
67 let src_addr = channel.source;
68 let dst_addr = channel.destination;
69 let reader = chain_reader.clone();
70 let keys = hopr_utils::runtime::prelude::spawn_blocking(move || {
71 let resolve = |addr: Address| {
72 if addr == own_chain_addr {
73 return Ok(Some(own_packet_key));
74 }
75 reader.chain_key_to_packet_key(&addr).map_err(anyhow::Error::from)
76 };
77 resolve(src_addr).and_then(|src| resolve(dst_addr).map(|dst| src.zip(dst)))
78 })
79 .await
80 .map_err(anyhow::Error::from)
81 .flatten();
82
83 match keys {
84 Ok(Some((from, to))) => {
85 let capacity =
86 match channel.status {
87 ChannelStatus::Closed | ChannelStatus::PendingToClose(_) => None,
88 _ => ticket_price.read().div_f64(win_probability.read().as_f64()).ok().map(
89 |ticket_value| {
90 channel
91 .balance
92 .amount()
93 .checked_div(ticket_value.amount())
94 .map(|v| v.low_u128())
95 .unwrap_or(u128::MAX)
96 },
97 ),
98 };
99
100 tracing::debug!(
101 %channel, ?capacity,
102 "recording graph edge for channel capacity"
103 );
104 graph_updater.record_edge(MeasurableEdge::<NeighborTelemetry, PathTelemetry>::Capacity(
105 Box::new(EdgeCapacityUpdate {
106 capacity,
107 src: from,
108 dest: to,
109 }),
110 ));
111 }
112 Ok(None) => {
113 tracing::error!(
114 %channel,
115 "could not find packet keys for channel endpoints"
116 );
117 }
118 Err(error) => {
119 tracing::error!(
120 %error, %channel,
121 "failed to convert chain keys to packet keys"
122 );
123 }
124 }
125 }
126 ChainEvent::WinningProbabilityIncreased(prob) | ChainEvent::WinningProbabilityDecreased(prob) => {
127 tracing::debug!(%prob, "recording winning probability change");
128 *win_probability.write() = prob;
129 }
130 ChainEvent::TicketPriceChanged(price) => {
131 tracing::debug!(%price, "recording ticket price change");
132 *ticket_price.write() = price;
133 }
134 _ => {}
135 }
136 }
137}
138
139#[cfg(test)]
140mod tests {
141 use std::{
142 collections::HashMap,
143 sync::{Arc, Mutex},
144 time::SystemTime,
145 };
146
147 use anyhow::Context as _;
148 use hopr_api::{
149 HoprBalance, OffchainPublicKey,
150 chain::{ChainKeyOperations, HoprKeyIdent, KeyIdMapping, WinningProbability},
151 graph::{EdgeCapacityUpdate, MeasurableEdge, MeasurablePath, MeasurablePeer, NetworkGraphUpdate},
152 types::{
153 chain::chain_events::ChainEvent,
154 crypto::prelude::{ChainKeypair, Keypair, OffchainKeypair},
155 internal::prelude::{AccountEntry, AccountType, ChannelEntry, ChannelStatus},
156 primitive::prelude::Address,
157 },
158 };
159 use parking_lot::RwLock;
160
161 use super::process_chain_events;
162
    /// Error type of the stubbed chain-key lookup; carries a free-form message rendered as
    /// `stub: <message>`.
    #[derive(Debug, Clone, thiserror::Error)]
    #[error("stub: {0}")]
    struct StubError(String);
170
171 #[derive(Debug, Clone)]
172 struct NoopMapper;
173
174 impl KeyIdMapping<HoprKeyIdent, OffchainPublicKey> for NoopMapper {
175 fn map_key_to_id(&self, _key: &OffchainPublicKey) -> Option<HoprKeyIdent> {
176 None
177 }
178
179 fn map_id_to_public(&self, _id: &HoprKeyIdent) -> Option<OffchainPublicKey> {
180 None
181 }
182 }
183
184 #[derive(Debug, Clone)]
185 struct StubChainKeys {
186 keys: HashMap<Address, OffchainPublicKey>,
187 mapper: NoopMapper,
188 }
189
190 impl StubChainKeys {
191 fn new(pairs: impl IntoIterator<Item = (Address, OffchainPublicKey)>) -> Self {
192 Self {
193 keys: pairs.into_iter().collect(),
194 mapper: NoopMapper,
195 }
196 }
197 }
198
199 impl ChainKeyOperations for StubChainKeys {
200 type Error = StubError;
201 type Mapper = NoopMapper;
202
203 fn chain_key_to_packet_key(&self, chain: &Address) -> Result<Option<OffchainPublicKey>, Self::Error> {
204 Ok(self.keys.get(chain).copied())
205 }
206
207 fn packet_key_to_chain_key(&self, packet: &OffchainPublicKey) -> Result<Option<Address>, Self::Error> {
208 Ok(self.keys.iter().find_map(|(a, k)| (k == packet).then_some(*a)))
209 }
210
211 fn key_id_mapper_ref(&self) -> &Self::Mapper {
212 &self.mapper
213 }
214 }
215
    /// One call observed by `RecordingGraph`, kept in arrival order.
    #[derive(Debug, Clone)]
    enum GraphCall {
        /// A `record_node` call, reduced to the node's packet key.
        Node(OffchainPublicKey),
        /// A `record_edge` call that carried a capacity update.
        Edge(Box<EdgeCapacityUpdate>),
    }
221
222 #[derive(Debug, Clone, Default)]
223 struct RecordingGraph {
224 calls: Arc<Mutex<Vec<GraphCall>>>,
225 }
226
227 impl RecordingGraph {
228 fn recorded(&self) -> Vec<GraphCall> {
229 self.calls.lock().unwrap().clone()
230 }
231
232 fn edges(&self) -> Vec<EdgeCapacityUpdate> {
233 self.recorded()
234 .into_iter()
235 .filter_map(|c| if let GraphCall::Edge(e) = c { Some(*e) } else { None })
236 .collect()
237 }
238
239 fn nodes(&self) -> Vec<OffchainPublicKey> {
240 self.recorded()
241 .into_iter()
242 .filter_map(|c| if let GraphCall::Node(n) = c { Some(n) } else { None })
243 .collect()
244 }
245 }
246
247 impl NetworkGraphUpdate for RecordingGraph {
248 fn record_edge<N, P>(&self, update: MeasurableEdge<N, P>)
249 where
250 N: MeasurablePeer + Clone + Send + Sync + 'static,
251 P: MeasurablePath + Clone + Send + Sync + 'static,
252 {
253 if let MeasurableEdge::Capacity(cap) = update {
254 self.calls.lock().unwrap().push(GraphCall::Edge(cap));
255 }
256 }
257
258 fn record_node<N>(&self, update: N)
259 where
260 N: hopr_api::graph::MeasurableNode + Clone + Send + Sync + 'static,
261 {
262 self.calls.lock().unwrap().push(GraphCall::Node(update.into()));
263 }
264 }
265
266 fn make_keypairs() -> (OffchainKeypair, ChainKeypair) {
271 (OffchainKeypair::random(), ChainKeypair::random())
272 }
273
274 fn channel(src: Address, dst: Address, balance: u128, status: ChannelStatus) -> ChannelEntry {
275 ChannelEntry::builder()
276 .source(src)
277 .destination(dst)
278 .amount(balance)
279 .status(status)
280 .build()
281 .expect("valid channel")
282 }
283
284 fn account(key: OffchainPublicKey, addr: Address) -> AccountEntry {
285 use hopr_api::types::primitive::prelude::KeyIdent;
286 AccountEntry {
287 public_key: key,
288 chain_addr: addr,
289 entry_type: AccountType::NotAnnounced,
290 safe_address: None,
291 key_id: KeyIdent::default(),
292 }
293 }
294
295 async fn run(
296 events: Vec<ChainEvent>,
297 chain: StubChainKeys,
298 graph: RecordingGraph,
299 own_chain_addr: Address,
300 own_packet_key: OffchainPublicKey,
301 ticket_price: HoprBalance,
302 win_probability: WinningProbability,
303 ) {
304 let _ = run_with_peer_discovery(
305 events,
306 chain,
307 graph,
308 own_chain_addr,
309 own_packet_key,
310 ticket_price,
311 win_probability,
312 )
313 .await;
314 }
315
316 async fn run_with_peer_discovery(
317 events: Vec<ChainEvent>,
318 chain: StubChainKeys,
319 graph: RecordingGraph,
320 own_chain_addr: Address,
321 own_packet_key: OffchainPublicKey,
322 ticket_price: HoprBalance,
323 win_probability: WinningProbability,
324 ) -> Vec<(hopr_api::PeerId, Vec<hopr_api::Multiaddr>)> {
325 use futures::StreamExt;
326 let (tx, rx) = futures::channel::mpsc::channel(64);
327 process_chain_events(
328 chain,
329 graph,
330 futures::stream::iter(events),
331 own_chain_addr,
332 own_packet_key,
333 Arc::new(RwLock::new(ticket_price)),
334 Arc::new(RwLock::new(win_probability)),
335 Some(tx),
336 )
337 .await;
338 rx.collect().await
339 }
340
341 #[tokio::test]
346 async fn announcement_records_node() {
347 let (offchain, chain) = make_keypairs();
348 let addr = chain.public().to_address();
349 let graph = RecordingGraph::default();
350
351 run(
352 vec![ChainEvent::Announcement(account(*offchain.public(), addr))],
353 StubChainKeys::new([]),
354 graph.clone(),
355 addr,
356 *offchain.public(),
357 HoprBalance::from(10u64),
358 WinningProbability::ALWAYS,
359 )
360 .await;
361
362 assert_eq!(graph.nodes(), vec![*offchain.public()]);
363 assert!(graph.edges().is_empty());
364 }
365
366 #[tokio::test]
367 async fn announcement_should_forward_to_peer_discovery_when_tx_is_set() -> anyhow::Result<()> {
368 use std::str::FromStr;
369
370 use hopr_api::types::internal::prelude::AccountType;
371
372 let (offchain, chain) = make_keypairs();
373 let addr = chain.public().to_address();
374 let multiaddr = hopr_api::Multiaddr::from_str("/ip4/1.2.3.4/tcp/9000").context("parse multiaddr")?;
375 let entry = AccountEntry {
376 entry_type: AccountType::Announced(vec![multiaddr.clone()]),
377 ..account(*offchain.public(), addr)
378 };
379 let graph = RecordingGraph::default();
380
381 let received = run_with_peer_discovery(
382 vec![ChainEvent::Announcement(entry)],
383 StubChainKeys::new([]),
384 graph.clone(),
385 addr,
386 *offchain.public(),
387 HoprBalance::from(10u64),
388 WinningProbability::ALWAYS,
389 )
390 .await;
391
392 assert_eq!(received.len(), 1, "expected exactly one peer-discovery event");
393 let (peer_id, addrs) = &received[0];
394 assert_eq!(
395 *peer_id,
396 hopr_api::PeerId::from(*offchain.public()),
397 "peer id must match the announced account's public key"
398 );
399 assert_eq!(addrs, &vec![multiaddr], "multiaddrs must be forwarded unchanged");
400 assert_eq!(
401 graph.nodes(),
402 vec![*offchain.public()],
403 "graph must also record the node"
404 );
405 Ok(())
406 }
407
408 #[tokio::test]
409 async fn channel_opened_records_capacity() {
410 let (src_offchain, src_chain) = make_keypairs();
411 let (dst_offchain, dst_chain) = make_keypairs();
412 let src_addr = src_chain.public().to_address();
413 let dst_addr = dst_chain.public().to_address();
414
415 let graph = RecordingGraph::default();
416 let stub = StubChainKeys::new([(src_addr, *src_offchain.public()), (dst_addr, *dst_offchain.public())]);
417
418 run(
420 vec![ChainEvent::ChannelOpened(channel(
421 src_addr,
422 dst_addr,
423 100,
424 ChannelStatus::Open,
425 ))],
426 stub,
427 graph.clone(),
428 src_addr,
429 *src_offchain.public(),
430 HoprBalance::from(10u64),
431 WinningProbability::ALWAYS,
432 )
433 .await;
434
435 let edges = graph.edges();
436 assert_eq!(edges.len(), 1);
437 assert_eq!(edges[0].capacity, Some(10));
438 assert_eq!(edges[0].src, *src_offchain.public());
439 assert_eq!(edges[0].dest, *dst_offchain.public());
440 }
441
442 #[tokio::test]
443 async fn channel_balance_decreased_records_updated_capacity() {
444 let (src_offchain, src_chain) = make_keypairs();
445 let (dst_offchain, dst_chain) = make_keypairs();
446 let src_addr = src_chain.public().to_address();
447 let dst_addr = dst_chain.public().to_address();
448
449 let graph = RecordingGraph::default();
450 let stub = StubChainKeys::new([(src_addr, *src_offchain.public()), (dst_addr, *dst_offchain.public())]);
451
452 run(
454 vec![ChainEvent::ChannelBalanceDecreased(
455 channel(src_addr, dst_addr, 50, ChannelStatus::Open),
456 HoprBalance::from(50u64),
457 )],
458 stub,
459 graph.clone(),
460 src_addr,
461 *src_offchain.public(),
462 HoprBalance::from(10u64),
463 WinningProbability::ALWAYS,
464 )
465 .await;
466
467 let edges = graph.edges();
468 assert_eq!(edges.len(), 1);
469 assert_eq!(edges[0].capacity, Some(5));
470 }
471
472 #[tokio::test]
473 async fn channel_closed_records_capacity_none() {
474 let (src_offchain, src_chain) = make_keypairs();
475 let (dst_offchain, dst_chain) = make_keypairs();
476 let src_addr = src_chain.public().to_address();
477 let dst_addr = dst_chain.public().to_address();
478
479 let graph = RecordingGraph::default();
480 let stub = StubChainKeys::new([(src_addr, *src_offchain.public()), (dst_addr, *dst_offchain.public())]);
481
482 run(
483 vec![ChainEvent::ChannelClosed(channel(
484 src_addr,
485 dst_addr,
486 0,
487 ChannelStatus::Closed,
488 ))],
489 stub,
490 graph.clone(),
491 src_addr,
492 *src_offchain.public(),
493 HoprBalance::from(10u64),
494 WinningProbability::ALWAYS,
495 )
496 .await;
497
498 let edges = graph.edges();
499 assert_eq!(edges.len(), 1);
500 assert_eq!(edges[0].capacity, None);
501 }
502
503 #[tokio::test]
507 async fn channel_closure_initiated_records_capacity_none() {
508 let (src_offchain, src_chain) = make_keypairs();
509 let (dst_offchain, dst_chain) = make_keypairs();
510 let src_addr = src_chain.public().to_address();
511 let dst_addr = dst_chain.public().to_address();
512
513 let graph = RecordingGraph::default();
514 let stub = StubChainKeys::new([(src_addr, *src_offchain.public()), (dst_addr, *dst_offchain.public())]);
515
516 run(
517 vec![ChainEvent::ChannelClosureInitiated(channel(
518 src_addr,
519 dst_addr,
520 100,
521 ChannelStatus::PendingToClose(SystemTime::now()),
522 ))],
523 stub,
524 graph.clone(),
525 src_addr,
526 *src_offchain.public(),
527 HoprBalance::from(10u64),
528 WinningProbability::ALWAYS,
529 )
530 .await;
531
532 let edges = graph.edges();
533 assert_eq!(edges.len(), 1, "closure-initiated must emit a graph update");
534 assert_eq!(
535 edges[0].capacity, None,
536 "closure-initiated must zero out the capacity so routing stops using this edge"
537 );
538 }
539
540 #[tokio::test]
541 async fn ticket_price_change_affects_subsequent_capacity() {
542 let (src_offchain, src_chain) = make_keypairs();
543 let (dst_offchain, dst_chain) = make_keypairs();
544 let src_addr = src_chain.public().to_address();
545 let dst_addr = dst_chain.public().to_address();
546
547 let graph = RecordingGraph::default();
548 let stub = StubChainKeys::new([(src_addr, *src_offchain.public()), (dst_addr, *dst_offchain.public())]);
549
550 run(
552 vec![
553 ChainEvent::TicketPriceChanged(HoprBalance::from(20u64)),
554 ChainEvent::ChannelOpened(channel(src_addr, dst_addr, 200, ChannelStatus::Open)),
555 ],
556 stub,
557 graph.clone(),
558 src_addr,
559 *src_offchain.public(),
560 HoprBalance::from(10u64),
561 WinningProbability::ALWAYS,
562 )
563 .await;
564
565 let edges = graph.edges();
566 assert_eq!(edges.len(), 1);
567 assert_eq!(edges[0].capacity, Some(10));
568 }
569
570 #[tokio::test]
571 async fn win_probability_change_affects_subsequent_capacity() -> anyhow::Result<()> {
572 let (src_offchain, src_chain) = make_keypairs();
573 let (dst_offchain, dst_chain) = make_keypairs();
574 let src_addr = src_chain.public().to_address();
575 let dst_addr = dst_chain.public().to_address();
576
577 let graph = RecordingGraph::default();
578 let stub = StubChainKeys::new([(src_addr, *src_offchain.public()), (dst_addr, *dst_offchain.public())]);
579
580 let new_prob = WinningProbability::try_from_f64(0.5).context("0.5 is a valid winning probability")?;
582 run(
583 vec![
584 ChainEvent::WinningProbabilityDecreased(new_prob),
585 ChainEvent::ChannelOpened(channel(src_addr, dst_addr, 100, ChannelStatus::Open)),
586 ],
587 stub,
588 graph.clone(),
589 src_addr,
590 *src_offchain.public(),
591 HoprBalance::from(10u64),
592 WinningProbability::ALWAYS,
593 )
594 .await;
595
596 let edges = graph.edges();
597 assert_eq!(edges.len(), 1);
598 assert_eq!(edges[0].capacity, Some(5));
599 Ok(())
600 }
601
602 #[tokio::test]
603 async fn unknown_chain_key_produces_no_graph_update() {
604 let (src_offchain, src_chain) = make_keypairs();
605 let (_, dst_chain) = make_keypairs();
606 let src_addr = src_chain.public().to_address();
607 let dst_addr = dst_chain.public().to_address();
608
609 let graph = RecordingGraph::default();
610 let stub = StubChainKeys::new([(src_addr, *src_offchain.public())]);
612
613 run(
614 vec![ChainEvent::ChannelOpened(channel(
615 src_addr,
616 dst_addr,
617 100,
618 ChannelStatus::Open,
619 ))],
620 stub,
621 graph.clone(),
622 src_addr,
623 *src_offchain.public(),
624 HoprBalance::from(10u64),
625 WinningProbability::ALWAYS,
626 )
627 .await;
628
629 assert!(graph.edges().is_empty(), "unknown key must produce no graph update");
630 }
631
632 #[tokio::test]
633 async fn self_address_resolved_via_own_packet_key() {
634 let (own_offchain, own_chain) = make_keypairs();
635 let (dst_offchain, dst_chain) = make_keypairs();
636 let own_chain_addr = own_chain.public().to_address();
637 let dst_addr = dst_chain.public().to_address();
638
639 let graph = RecordingGraph::default();
640 let stub = StubChainKeys::new([(dst_addr, *dst_offchain.public())]);
642
643 run(
644 vec![ChainEvent::ChannelOpened(channel(
645 own_chain_addr,
646 dst_addr,
647 100,
648 ChannelStatus::Open,
649 ))],
650 stub,
651 graph.clone(),
652 own_chain_addr,
653 *own_offchain.public(),
654 HoprBalance::from(10u64),
655 WinningProbability::ALWAYS,
656 )
657 .await;
658
659 let edges = graph.edges();
660 assert_eq!(edges.len(), 1);
661 assert_eq!(edges[0].src, *own_offchain.public());
662 assert_eq!(edges[0].dest, *dst_offchain.public());
663 }
664}