Skip to main content

hopr_lib/builder/
chain_wiring.rs

1use std::sync::Arc;
2
3use futures::{StreamExt, pin_mut};
4use hopr_api::{
5    HoprBalance, Multiaddr, OffchainPublicKey, PeerId,
6    chain::{ChainKeyOperations, WinningProbability},
7    graph::{EdgeCapacityUpdate, MeasurableEdge, NetworkGraphUpdate},
8    types::{
9        chain::chain_events::ChainEvent,
10        internal::prelude::ChannelStatus,
11        primitive::prelude::{Address, UnitaryFloatOps},
12    },
13};
14use hopr_transport::{NeighborTelemetry, PathTelemetry};
15use parking_lot::RwLock;
16
17/// Processes chain events and records them as graph updates.
18///
19/// Drives the chain-to-graph edge of the topology pipeline: converts incoming on-chain
20/// `ChainEvent`s into [`NetworkGraphUpdate`] calls so the routing graph stays current.
21/// When `peer_discovery_tx` is `Some`, each [`ChainEvent::Announcement`] is also forwarded
22/// to the p2p network layer so it can initiate connections to newly discovered peers.
23/// Runs until the supplied `events` stream terminates.
24#[allow(clippy::too_many_arguments)]
25pub(super) async fn process_chain_events<C, G>(
26    chain_reader: C,
27    graph_updater: G,
28    events: impl futures::Stream<Item = ChainEvent> + Send + 'static,
29    own_chain_addr: Address,
30    own_packet_key: OffchainPublicKey,
31    ticket_price: Arc<RwLock<HoprBalance>>,
32    win_probability: Arc<RwLock<WinningProbability>>,
33    mut peer_discovery_tx: Option<futures::channel::mpsc::Sender<(PeerId, Vec<Multiaddr>)>>,
34) where
35    C: ChainKeyOperations + Clone + Send + Sync + 'static,
36    G: NetworkGraphUpdate + Send + Sync + 'static,
37{
38    pin_mut!(events);
39    while let Some(chain_event) = events.next().await {
40        tracing::debug!(event = %chain_event, "processing chain event");
41        match chain_event {
42            ChainEvent::Announcement(account) => {
43                tracing::debug!(
44                    account = %account.public_key,
45                    "recording graph node for announced account"
46                );
47                graph_updater.record_node(account.public_key);
48                if let Some(ref mut tx) = peer_discovery_tx {
49                    let peer_id: PeerId = account.public_key.into();
50                    let multiaddrs = account.get_multiaddrs();
51                    let _span = tracing::info_span!(
52                        "peer_announcement",
53                        peer = %peer_id,
54                        multiaddresses = ?multiaddrs,
55                    )
56                    .entered();
57                    if let Err(e) = tx.try_send((peer_id, multiaddrs.to_vec())) {
58                        tracing::error!(%e, "peer-discovery channel full or closed; announcement dropped");
59                    }
60                }
61            }
62            ChainEvent::ChannelOpened(channel)
63            | ChainEvent::ChannelClosureInitiated(channel)
64            | ChainEvent::ChannelClosed(channel)
65            | ChainEvent::ChannelBalanceIncreased(channel, _)
66            | ChainEvent::ChannelBalanceDecreased(channel, _) => {
67                let src_addr = channel.source;
68                let dst_addr = channel.destination;
69                let reader = chain_reader.clone();
70                let keys = hopr_utils::runtime::prelude::spawn_blocking(move || {
71                    let resolve = |addr: Address| {
72                        if addr == own_chain_addr {
73                            return Ok(Some(own_packet_key));
74                        }
75                        reader.chain_key_to_packet_key(&addr).map_err(anyhow::Error::from)
76                    };
77                    resolve(src_addr).and_then(|src| resolve(dst_addr).map(|dst| src.zip(dst)))
78                })
79                .await
80                .map_err(anyhow::Error::from)
81                .flatten();
82
83                match keys {
84                    Ok(Some((from, to))) => {
85                        let capacity =
86                            match channel.status {
87                                ChannelStatus::Closed | ChannelStatus::PendingToClose(_) => None,
88                                _ => ticket_price.read().div_f64(win_probability.read().as_f64()).ok().map(
89                                    |ticket_value| {
90                                        channel
91                                            .balance
92                                            .amount()
93                                            .checked_div(ticket_value.amount())
94                                            .map(|v| v.low_u128())
95                                            .unwrap_or(u128::MAX)
96                                    },
97                                ),
98                            };
99
100                        tracing::debug!(
101                            %channel, ?capacity,
102                            "recording graph edge for channel capacity"
103                        );
104                        graph_updater.record_edge(MeasurableEdge::<NeighborTelemetry, PathTelemetry>::Capacity(
105                            Box::new(EdgeCapacityUpdate {
106                                capacity,
107                                src: from,
108                                dest: to,
109                            }),
110                        ));
111                    }
112                    Ok(None) => {
113                        tracing::error!(
114                            %channel,
115                            "could not find packet keys for channel endpoints"
116                        );
117                    }
118                    Err(error) => {
119                        tracing::error!(
120                            %error, %channel,
121                            "failed to convert chain keys to packet keys"
122                        );
123                    }
124                }
125            }
126            ChainEvent::WinningProbabilityIncreased(prob) | ChainEvent::WinningProbabilityDecreased(prob) => {
127                tracing::debug!(%prob, "recording winning probability change");
128                *win_probability.write() = prob;
129            }
130            ChainEvent::TicketPriceChanged(price) => {
131                tracing::debug!(%price, "recording ticket price change");
132                *ticket_price.write() = price;
133            }
134            _ => {}
135        }
136    }
137}
138
139#[cfg(test)]
140mod tests {
141    use std::{
142        collections::HashMap,
143        sync::{Arc, Mutex},
144        time::SystemTime,
145    };
146
147    use anyhow::Context as _;
148    use hopr_api::{
149        HoprBalance, OffchainPublicKey,
150        chain::{ChainKeyOperations, HoprKeyIdent, KeyIdMapping, WinningProbability},
151        graph::{EdgeCapacityUpdate, MeasurableEdge, MeasurablePath, MeasurablePeer, NetworkGraphUpdate},
152        types::{
153            chain::chain_events::ChainEvent,
154            crypto::prelude::{ChainKeypair, Keypair, OffchainKeypair},
155            internal::prelude::{AccountEntry, AccountType, ChannelEntry, ChannelStatus},
156            primitive::prelude::Address,
157        },
158    };
159    use parking_lot::RwLock;
160
161    use super::process_chain_events;
162
163    // ---------------------------------------------------------------------------
164    // Stubs
165    // ---------------------------------------------------------------------------
166
167    #[derive(Debug, Clone, thiserror::Error)]
168    #[error("stub: {0}")]
169    struct StubError(String);
170
171    #[derive(Debug, Clone)]
172    struct NoopMapper;
173
174    impl KeyIdMapping<HoprKeyIdent, OffchainPublicKey> for NoopMapper {
175        fn map_key_to_id(&self, _key: &OffchainPublicKey) -> Option<HoprKeyIdent> {
176            None
177        }
178
179        fn map_id_to_public(&self, _id: &HoprKeyIdent) -> Option<OffchainPublicKey> {
180            None
181        }
182    }
183
184    #[derive(Debug, Clone)]
185    struct StubChainKeys {
186        keys: HashMap<Address, OffchainPublicKey>,
187        mapper: NoopMapper,
188    }
189
190    impl StubChainKeys {
191        fn new(pairs: impl IntoIterator<Item = (Address, OffchainPublicKey)>) -> Self {
192            Self {
193                keys: pairs.into_iter().collect(),
194                mapper: NoopMapper,
195            }
196        }
197    }
198
199    impl ChainKeyOperations for StubChainKeys {
200        type Error = StubError;
201        type Mapper = NoopMapper;
202
203        fn chain_key_to_packet_key(&self, chain: &Address) -> Result<Option<OffchainPublicKey>, Self::Error> {
204            Ok(self.keys.get(chain).copied())
205        }
206
207        fn packet_key_to_chain_key(&self, packet: &OffchainPublicKey) -> Result<Option<Address>, Self::Error> {
208            Ok(self.keys.iter().find_map(|(a, k)| (k == packet).then_some(*a)))
209        }
210
211        fn key_id_mapper_ref(&self) -> &Self::Mapper {
212            &self.mapper
213        }
214    }
215
216    #[derive(Debug, Clone)]
217    enum GraphCall {
218        Node(OffchainPublicKey),
219        Edge(Box<EdgeCapacityUpdate>),
220    }
221
222    #[derive(Debug, Clone, Default)]
223    struct RecordingGraph {
224        calls: Arc<Mutex<Vec<GraphCall>>>,
225    }
226
227    impl RecordingGraph {
228        fn recorded(&self) -> Vec<GraphCall> {
229            self.calls.lock().unwrap().clone()
230        }
231
232        fn edges(&self) -> Vec<EdgeCapacityUpdate> {
233            self.recorded()
234                .into_iter()
235                .filter_map(|c| if let GraphCall::Edge(e) = c { Some(*e) } else { None })
236                .collect()
237        }
238
239        fn nodes(&self) -> Vec<OffchainPublicKey> {
240            self.recorded()
241                .into_iter()
242                .filter_map(|c| if let GraphCall::Node(n) = c { Some(n) } else { None })
243                .collect()
244        }
245    }
246
247    impl NetworkGraphUpdate for RecordingGraph {
248        fn record_edge<N, P>(&self, update: MeasurableEdge<N, P>)
249        where
250            N: MeasurablePeer + Clone + Send + Sync + 'static,
251            P: MeasurablePath + Clone + Send + Sync + 'static,
252        {
253            if let MeasurableEdge::Capacity(cap) = update {
254                self.calls.lock().unwrap().push(GraphCall::Edge(cap));
255            }
256        }
257
258        fn record_node<N>(&self, update: N)
259        where
260            N: hopr_api::graph::MeasurableNode + Clone + Send + Sync + 'static,
261        {
262            self.calls.lock().unwrap().push(GraphCall::Node(update.into()));
263        }
264    }
265
266    // ---------------------------------------------------------------------------
267    // Helpers
268    // ---------------------------------------------------------------------------
269
270    fn make_keypairs() -> (OffchainKeypair, ChainKeypair) {
271        (OffchainKeypair::random(), ChainKeypair::random())
272    }
273
274    fn channel(src: Address, dst: Address, balance: u128, status: ChannelStatus) -> ChannelEntry {
275        ChannelEntry::builder()
276            .source(src)
277            .destination(dst)
278            .amount(balance)
279            .status(status)
280            .build()
281            .expect("valid channel")
282    }
283
284    fn account(key: OffchainPublicKey, addr: Address) -> AccountEntry {
285        use hopr_api::types::primitive::prelude::KeyIdent;
286        AccountEntry {
287            public_key: key,
288            chain_addr: addr,
289            entry_type: AccountType::NotAnnounced,
290            safe_address: None,
291            key_id: KeyIdent::default(),
292        }
293    }
294
295    async fn run(
296        events: Vec<ChainEvent>,
297        chain: StubChainKeys,
298        graph: RecordingGraph,
299        own_chain_addr: Address,
300        own_packet_key: OffchainPublicKey,
301        ticket_price: HoprBalance,
302        win_probability: WinningProbability,
303    ) {
304        let _ = run_with_peer_discovery(
305            events,
306            chain,
307            graph,
308            own_chain_addr,
309            own_packet_key,
310            ticket_price,
311            win_probability,
312        )
313        .await;
314    }
315
316    async fn run_with_peer_discovery(
317        events: Vec<ChainEvent>,
318        chain: StubChainKeys,
319        graph: RecordingGraph,
320        own_chain_addr: Address,
321        own_packet_key: OffchainPublicKey,
322        ticket_price: HoprBalance,
323        win_probability: WinningProbability,
324    ) -> Vec<(hopr_api::PeerId, Vec<hopr_api::Multiaddr>)> {
325        use futures::StreamExt;
326        let (tx, rx) = futures::channel::mpsc::channel(64);
327        process_chain_events(
328            chain,
329            graph,
330            futures::stream::iter(events),
331            own_chain_addr,
332            own_packet_key,
333            Arc::new(RwLock::new(ticket_price)),
334            Arc::new(RwLock::new(win_probability)),
335            Some(tx),
336        )
337        .await;
338        rx.collect().await
339    }
340
341    // ---------------------------------------------------------------------------
342    // Tests
343    // ---------------------------------------------------------------------------
344
345    #[tokio::test]
346    async fn announcement_records_node() {
347        let (offchain, chain) = make_keypairs();
348        let addr = chain.public().to_address();
349        let graph = RecordingGraph::default();
350
351        run(
352            vec![ChainEvent::Announcement(account(*offchain.public(), addr))],
353            StubChainKeys::new([]),
354            graph.clone(),
355            addr,
356            *offchain.public(),
357            HoprBalance::from(10u64),
358            WinningProbability::ALWAYS,
359        )
360        .await;
361
362        assert_eq!(graph.nodes(), vec![*offchain.public()]);
363        assert!(graph.edges().is_empty());
364    }
365
366    #[tokio::test]
367    async fn announcement_should_forward_to_peer_discovery_when_tx_is_set() -> anyhow::Result<()> {
368        use std::str::FromStr;
369
370        use hopr_api::types::internal::prelude::AccountType;
371
372        let (offchain, chain) = make_keypairs();
373        let addr = chain.public().to_address();
374        let multiaddr = hopr_api::Multiaddr::from_str("/ip4/1.2.3.4/tcp/9000").context("parse multiaddr")?;
375        let entry = AccountEntry {
376            entry_type: AccountType::Announced(vec![multiaddr.clone()]),
377            ..account(*offchain.public(), addr)
378        };
379        let graph = RecordingGraph::default();
380
381        let received = run_with_peer_discovery(
382            vec![ChainEvent::Announcement(entry)],
383            StubChainKeys::new([]),
384            graph.clone(),
385            addr,
386            *offchain.public(),
387            HoprBalance::from(10u64),
388            WinningProbability::ALWAYS,
389        )
390        .await;
391
392        assert_eq!(received.len(), 1, "expected exactly one peer-discovery event");
393        let (peer_id, addrs) = &received[0];
394        assert_eq!(
395            *peer_id,
396            hopr_api::PeerId::from(*offchain.public()),
397            "peer id must match the announced account's public key"
398        );
399        assert_eq!(addrs, &vec![multiaddr], "multiaddrs must be forwarded unchanged");
400        assert_eq!(
401            graph.nodes(),
402            vec![*offchain.public()],
403            "graph must also record the node"
404        );
405        Ok(())
406    }
407
408    #[tokio::test]
409    async fn channel_opened_records_capacity() {
410        let (src_offchain, src_chain) = make_keypairs();
411        let (dst_offchain, dst_chain) = make_keypairs();
412        let src_addr = src_chain.public().to_address();
413        let dst_addr = dst_chain.public().to_address();
414
415        let graph = RecordingGraph::default();
416        let stub = StubChainKeys::new([(src_addr, *src_offchain.public()), (dst_addr, *dst_offchain.public())]);
417
418        // price=10, win_prob=1.0, balance=100 → capacity = 100/(10/1.0) = 10
419        run(
420            vec![ChainEvent::ChannelOpened(channel(
421                src_addr,
422                dst_addr,
423                100,
424                ChannelStatus::Open,
425            ))],
426            stub,
427            graph.clone(),
428            src_addr,
429            *src_offchain.public(),
430            HoprBalance::from(10u64),
431            WinningProbability::ALWAYS,
432        )
433        .await;
434
435        let edges = graph.edges();
436        assert_eq!(edges.len(), 1);
437        assert_eq!(edges[0].capacity, Some(10));
438        assert_eq!(edges[0].src, *src_offchain.public());
439        assert_eq!(edges[0].dest, *dst_offchain.public());
440    }
441
442    #[tokio::test]
443    async fn channel_balance_decreased_records_updated_capacity() {
444        let (src_offchain, src_chain) = make_keypairs();
445        let (dst_offchain, dst_chain) = make_keypairs();
446        let src_addr = src_chain.public().to_address();
447        let dst_addr = dst_chain.public().to_address();
448
449        let graph = RecordingGraph::default();
450        let stub = StubChainKeys::new([(src_addr, *src_offchain.public()), (dst_addr, *dst_offchain.public())]);
451
452        // price=10, win_prob=1.0, balance=50 after decrease → capacity = 50/10 = 5
453        run(
454            vec![ChainEvent::ChannelBalanceDecreased(
455                channel(src_addr, dst_addr, 50, ChannelStatus::Open),
456                HoprBalance::from(50u64),
457            )],
458            stub,
459            graph.clone(),
460            src_addr,
461            *src_offchain.public(),
462            HoprBalance::from(10u64),
463            WinningProbability::ALWAYS,
464        )
465        .await;
466
467        let edges = graph.edges();
468        assert_eq!(edges.len(), 1);
469        assert_eq!(edges[0].capacity, Some(5));
470    }
471
472    #[tokio::test]
473    async fn channel_closed_records_capacity_none() {
474        let (src_offchain, src_chain) = make_keypairs();
475        let (dst_offchain, dst_chain) = make_keypairs();
476        let src_addr = src_chain.public().to_address();
477        let dst_addr = dst_chain.public().to_address();
478
479        let graph = RecordingGraph::default();
480        let stub = StubChainKeys::new([(src_addr, *src_offchain.public()), (dst_addr, *dst_offchain.public())]);
481
482        run(
483            vec![ChainEvent::ChannelClosed(channel(
484                src_addr,
485                dst_addr,
486                0,
487                ChannelStatus::Closed,
488            ))],
489            stub,
490            graph.clone(),
491            src_addr,
492            *src_offchain.public(),
493            HoprBalance::from(10u64),
494            WinningProbability::ALWAYS,
495        )
496        .await;
497
498        let edges = graph.edges();
499        assert_eq!(edges.len(), 1);
500        assert_eq!(edges[0].capacity, None);
501    }
502
503    /// Regression test: before the fix, ChannelClosureInitiated was a no-op and the
504    /// graph kept the prior `Some(N)` capacity for the channel lifetime of the close
505    /// timeout window, allowing routing to keep picking the dying edge.
506    #[tokio::test]
507    async fn channel_closure_initiated_records_capacity_none() {
508        let (src_offchain, src_chain) = make_keypairs();
509        let (dst_offchain, dst_chain) = make_keypairs();
510        let src_addr = src_chain.public().to_address();
511        let dst_addr = dst_chain.public().to_address();
512
513        let graph = RecordingGraph::default();
514        let stub = StubChainKeys::new([(src_addr, *src_offchain.public()), (dst_addr, *dst_offchain.public())]);
515
516        run(
517            vec![ChainEvent::ChannelClosureInitiated(channel(
518                src_addr,
519                dst_addr,
520                100,
521                ChannelStatus::PendingToClose(SystemTime::now()),
522            ))],
523            stub,
524            graph.clone(),
525            src_addr,
526            *src_offchain.public(),
527            HoprBalance::from(10u64),
528            WinningProbability::ALWAYS,
529        )
530        .await;
531
532        let edges = graph.edges();
533        assert_eq!(edges.len(), 1, "closure-initiated must emit a graph update");
534        assert_eq!(
535            edges[0].capacity, None,
536            "closure-initiated must zero out the capacity so routing stops using this edge"
537        );
538    }
539
540    #[tokio::test]
541    async fn ticket_price_change_affects_subsequent_capacity() {
542        let (src_offchain, src_chain) = make_keypairs();
543        let (dst_offchain, dst_chain) = make_keypairs();
544        let src_addr = src_chain.public().to_address();
545        let dst_addr = dst_chain.public().to_address();
546
547        let graph = RecordingGraph::default();
548        let stub = StubChainKeys::new([(src_addr, *src_offchain.public()), (dst_addr, *dst_offchain.public())]);
549
550        // initial price=10; after price change to 20, balance=200 → 200/(20/1.0) = 10
551        run(
552            vec![
553                ChainEvent::TicketPriceChanged(HoprBalance::from(20u64)),
554                ChainEvent::ChannelOpened(channel(src_addr, dst_addr, 200, ChannelStatus::Open)),
555            ],
556            stub,
557            graph.clone(),
558            src_addr,
559            *src_offchain.public(),
560            HoprBalance::from(10u64),
561            WinningProbability::ALWAYS,
562        )
563        .await;
564
565        let edges = graph.edges();
566        assert_eq!(edges.len(), 1);
567        assert_eq!(edges[0].capacity, Some(10));
568    }
569
570    #[tokio::test]
571    async fn win_probability_change_affects_subsequent_capacity() -> anyhow::Result<()> {
572        let (src_offchain, src_chain) = make_keypairs();
573        let (dst_offchain, dst_chain) = make_keypairs();
574        let src_addr = src_chain.public().to_address();
575        let dst_addr = dst_chain.public().to_address();
576
577        let graph = RecordingGraph::default();
578        let stub = StubChainKeys::new([(src_addr, *src_offchain.public()), (dst_addr, *dst_offchain.public())]);
579
580        // initial win_prob=1.0; after decrease to 0.5, balance=100, price=10 → 100/(10/0.5) = 5
581        let new_prob = WinningProbability::try_from_f64(0.5).context("0.5 is a valid winning probability")?;
582        run(
583            vec![
584                ChainEvent::WinningProbabilityDecreased(new_prob),
585                ChainEvent::ChannelOpened(channel(src_addr, dst_addr, 100, ChannelStatus::Open)),
586            ],
587            stub,
588            graph.clone(),
589            src_addr,
590            *src_offchain.public(),
591            HoprBalance::from(10u64),
592            WinningProbability::ALWAYS,
593        )
594        .await;
595
596        let edges = graph.edges();
597        assert_eq!(edges.len(), 1);
598        assert_eq!(edges[0].capacity, Some(5));
599        Ok(())
600    }
601
602    #[tokio::test]
603    async fn unknown_chain_key_produces_no_graph_update() {
604        let (src_offchain, src_chain) = make_keypairs();
605        let (_, dst_chain) = make_keypairs();
606        let src_addr = src_chain.public().to_address();
607        let dst_addr = dst_chain.public().to_address();
608
609        let graph = RecordingGraph::default();
610        // dst is NOT in the stub map → chain_key_to_packet_key returns None for dst
611        let stub = StubChainKeys::new([(src_addr, *src_offchain.public())]);
612
613        run(
614            vec![ChainEvent::ChannelOpened(channel(
615                src_addr,
616                dst_addr,
617                100,
618                ChannelStatus::Open,
619            ))],
620            stub,
621            graph.clone(),
622            src_addr,
623            *src_offchain.public(),
624            HoprBalance::from(10u64),
625            WinningProbability::ALWAYS,
626        )
627        .await;
628
629        assert!(graph.edges().is_empty(), "unknown key must produce no graph update");
630    }
631
632    #[tokio::test]
633    async fn self_address_resolved_via_own_packet_key() {
634        let (own_offchain, own_chain) = make_keypairs();
635        let (dst_offchain, dst_chain) = make_keypairs();
636        let own_chain_addr = own_chain.public().to_address();
637        let dst_addr = dst_chain.public().to_address();
638
639        let graph = RecordingGraph::default();
640        // own_chain_addr not in stub — must be resolved via own_packet_key
641        let stub = StubChainKeys::new([(dst_addr, *dst_offchain.public())]);
642
643        run(
644            vec![ChainEvent::ChannelOpened(channel(
645                own_chain_addr,
646                dst_addr,
647                100,
648                ChannelStatus::Open,
649            ))],
650            stub,
651            graph.clone(),
652            own_chain_addr,
653            *own_offchain.public(),
654            HoprBalance::from(10u64),
655            WinningProbability::ALWAYS,
656        )
657        .await;
658
659        let edges = graph.edges();
660        assert_eq!(edges.len(), 1);
661        assert_eq!(edges[0].src, *own_offchain.public());
662        assert_eq!(edges[0].dest, *dst_offchain.public());
663    }
664}