1use async_trait::async_trait;
15use futures::StreamExt;
16use hopr_chain_types::actions::Action;
17use hopr_crypto_types::{keypairs::OffchainKeypair, prelude::Keypair};
18use hopr_db_sql::prelude::HoprDbAccountOperations;
19use hopr_internal_types::prelude::*;
20use hopr_primitive_types::prelude::*;
21use multiaddr::Multiaddr;
22use tracing::info;
23
24use crate::{
25    ChainActions,
26    action_queue::PendingAction,
27    errors::{
28        ChainActionsError::{AlreadyAnnounced, InvalidArguments},
29        Result,
30    },
31};
32
/// On-chain operations a node carries out on its own behalf: withdrawing funds,
/// announcing itself and registering its Safe.
///
/// On success every method returns a [`PendingAction`]. The tests in this file await that
/// value a second time to obtain the confirmation of the action (transaction hash, the
/// executed action and an optional chain event).
#[async_trait]
pub trait NodeActions {
    /// Withdraws `amount` of the currency `C` to the `recipient` address.
    async fn withdraw<C: Currency + Send>(&self, recipient: Address, amount: Balance<C>) -> Result<PendingAction>;

    /// Announces this node on-chain using the given `multiaddrs` and `offchain_key`.
    ///
    /// The `ChainActions` implementation in this file announces only the first entry of
    /// `multiaddrs`.
    async fn announce(&self, multiaddrs: &[Multiaddr], offchain_key: &OffchainKeypair) -> Result<PendingAction>;

    /// Registers the Safe at `safe_address` on behalf of this node.
    async fn register_safe_by_node(&self, safe_address: Address) -> Result<PendingAction>;
}
46
47#[async_trait]
48impl<Db: Sync> NodeActions for ChainActions<Db> {
49    #[tracing::instrument(level = "debug", skip(self))]
50    async fn withdraw<C: Currency + Send>(&self, recipient: Address, amount: Balance<C>) -> Result<PendingAction> {
51        if !amount.is_zero() {
52            if C::is::<XDai>() {
53                info!(%amount, %recipient, "initiating native withdrawal");
54                self.tx_sender
55                    .send(Action::WithdrawNative(recipient, amount.amount().into()))
56                    .await
57            } else if C::is::<WxHOPR>() {
58                info!(%amount, %recipient, "initiating token withdrawal");
59                self.tx_sender
60                    .send(Action::Withdraw(recipient, amount.amount().into()))
61                    .await
62            } else {
63                Err(InvalidArguments("invalid currency".into()))
64            }
65        } else {
66            Err(InvalidArguments("cannot withdraw zero amount".into()))
67        }
68    }
69
70    #[tracing::instrument(level = "debug", skip(self))]
71    async fn announce(&self, multiaddrs: &[Multiaddr], offchain_key: &OffchainKeypair) -> Result<PendingAction> {
72        let announcement_data = AnnouncementData::new(
74            multiaddrs[0].clone(),
75            Some(KeyBinding::new(self.self_address(), offchain_key)),
76        )?;
77
78        let count_announced = self
79            .index_db
80            .stream_accounts(true)
81            .await?
82            .filter(|account| {
83                futures::future::ready(
84                    &account.public_key == offchain_key.public()
85                        && account
86                            .get_multiaddr()
87                            .is_some_and(|ma| decapsulate_multiaddress(ma).eq(announcement_data.multiaddress())),
88                )
89            })
90            .count()
91            .await;
92
93        if count_announced == 0 {
94            info!(%announcement_data, "initiating announcement");
95            self.tx_sender.send(Action::Announce(announcement_data)).await
96        } else {
97            Err(AlreadyAnnounced)
98        }
99    }
100
101    #[tracing::instrument(level = "debug", skip(self))]
102    async fn register_safe_by_node(&self, safe_address: Address) -> Result<PendingAction> {
103        info!(%safe_address, "initiating safe address registration");
104        self.tx_sender.send(Action::RegisterSafe(safe_address)).await
105    }
106}
107
108#[cfg(test)]
109mod tests {
110    use std::str::FromStr;
111
112    use futures::FutureExt;
113    use hex_literal::hex;
114    use hopr_chain_types::{
115        actions::Action,
116        chain_events::{ChainEventType, SignificantChainEvent},
117    };
118    use hopr_crypto_random::random_bytes;
119    use hopr_crypto_types::prelude::*;
120    use hopr_db_node::HoprNodeDb;
121    use hopr_db_sql::{
122        HoprIndexerDb, accounts::HoprDbAccountOperations, info::HoprDbInfoOperations, prelude::DomainSeparator,
123    };
124    use hopr_internal_types::prelude::*;
125    use hopr_primitive_types::prelude::*;
126    use multiaddr::Multiaddr;
127
128    use crate::{
129        ChainActions,
130        action_queue::{ActionQueue, MockTransactionExecutor},
131        action_state::MockActionState,
132        errors::ChainActionsError,
133        node::NodeActions,
134    };
135
    // Fixed key material shared by all tests in this module.
    lazy_static::lazy_static! {
        // Chain keypair used to create the databases and the `ChainActions` instance under test.
        static ref ALICE_KP: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be constructible");
        // Second chain keypair; only used to derive the `BOB` address.
        static ref BOB_KP: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be constructible");
        // Chain address derived from `ALICE_KP`.
        static ref ALICE: Address = ALICE_KP.public().to_address();
        // Chain address derived from `BOB_KP`; the withdrawal recipient in the tests.
        static ref BOB: Address = BOB_KP.public().to_address();
        // Off-chain keypair passed to `announce` in the announcement tests.
        static ref ALICE_OFFCHAIN: OffchainKeypair = OffchainKeypair::from_secret(&hex!("e0bf93e9c916104da00b1850adc4608bd7e9087bbd3f805451f4556aa6b3fd6e")).expect("lazy static keypair should be constructible");
    }
143
144    #[tokio::test]
145    async fn test_announce() -> anyhow::Result<()> {
146        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
147        let announce_multiaddr = Multiaddr::from_str("/ip4/1.2.3.4/tcp/9009")?;
148
149        let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
150        let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
151        db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
152            .await?;
153
154        let ma = announce_multiaddr.clone();
155        let pubkey_clone = *ALICE_OFFCHAIN.public();
156        let mut tx_exec = MockTransactionExecutor::new();
157        tx_exec
158            .expect_announce()
159            .once()
160            .withf(move |ad| {
161                let kb = ad.key_binding.clone().expect("key binding must be present");
162                ma.eq(ad.multiaddress()) && kb.packet_key == pubkey_clone && kb.chain_key == *ALICE
163            })
164            .returning(move |_| Ok(random_hash));
165
166        let ma = announce_multiaddr.clone();
167        let pk = *ALICE_OFFCHAIN.public();
168        let mut indexer_action_tracker = MockActionState::new();
169        indexer_action_tracker
170            .expect_register_expectation()
171            .once()
172            .returning(move |_| {
173                Ok(futures::future::ok(SignificantChainEvent {
174                    tx_hash: random_hash,
175                    event_type: ChainEventType::Announcement {
176                        peer: pk.into(),
177                        multiaddresses: vec![ma.clone()],
178                        address: *ALICE,
179                    },
180                })
181                .boxed())
182            });
183
184        let tx_queue = ActionQueue::new(node_db.clone(), indexer_action_tracker, tx_exec, Default::default());
185        let tx_sender = tx_queue.new_sender();
186        tokio::task::spawn(async move {
187            tx_queue.start().await;
188        });
189
190        let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_sender.clone());
191        let tx_res = actions.announce(&[announce_multiaddr], &ALICE_OFFCHAIN).await?.await?;
192
193        assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
194        assert!(matches!(tx_res.action, Action::Announce(_)), "must be announce action");
195        assert!(
196            matches!(tx_res.event, Some(ChainEventType::Announcement { .. })),
197            "must correspond to announcement chain event"
198        );
199
200        Ok(())
201    }
202
203    #[tokio::test]
204    async fn test_announce_should_not_allow_reannouncing_with_same_multiaddress() -> anyhow::Result<()> {
205        let announce_multiaddr = Multiaddr::from_str("/ip4/1.2.3.4/tcp/9009")?;
206
207        let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
208        let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
209        db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
210            .await?;
211
212        db.insert_account(
213            None,
214            AccountEntry {
215                public_key: *ALICE_OFFCHAIN.public(),
216                chain_addr: *ALICE,
217                entry_type: AccountType::Announced {
218                    multiaddr: announce_multiaddr.clone(),
219                    updated_block: 0,
220                },
221                published_at: 1,
222            },
223        )
224        .await?;
225
226        let tx_queue = ActionQueue::new(
227            node_db.clone(),
228            MockActionState::new(),
229            MockTransactionExecutor::new(),
230            Default::default(),
231        );
232        let tx_sender = tx_queue.new_sender();
233
234        let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_sender.clone());
235
236        let res = actions.announce(&[announce_multiaddr], &ALICE_OFFCHAIN).await;
237        assert!(
238            matches!(res, Err(ChainActionsError::AlreadyAnnounced)),
239            "must not be able to re-announce with same address"
240        );
241
242        Ok(())
243    }
244
245    #[tokio::test]
246    async fn test_withdraw() -> anyhow::Result<()> {
247        let stake = HoprBalance::from(10_u32);
248        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
249
250        let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
251        let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
252        db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
253            .await?;
254
255        let mut tx_exec = MockTransactionExecutor::new();
256        tx_exec
257            .expect_withdraw()
258            .times(1)
259            .withf(move |dst, balance| *BOB == *dst && stake.eq(balance))
260            .returning(move |_, _| Ok(random_hash));
261
262        let mut indexer_action_tracker = MockActionState::new();
263        indexer_action_tracker.expect_register_expectation().never();
264
265        let tx_queue = ActionQueue::new(node_db.clone(), indexer_action_tracker, tx_exec, Default::default());
266        let tx_sender = tx_queue.new_sender();
267        tokio::task::spawn(async move {
268            tx_queue.start().await;
269        });
270
271        let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_sender.clone());
272
273        let tx_res = actions.withdraw(*BOB, stake).await?.await?;
274
275        assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
276        assert!(
277            matches!(tx_res.action, Action::Withdraw(_, _)),
278            "must be withdraw action"
279        );
280        assert!(
281            tx_res.event.is_none(),
282            "withdraw tx must not connect to any chain event"
283        );
284
285        Ok(())
286    }
287
288    #[tokio::test]
289    async fn test_should_not_withdraw_zero_amount() -> anyhow::Result<()> {
290        let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
291        let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
292        db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
293            .await?;
294
295        let tx_queue = ActionQueue::new(
296            node_db.clone(),
297            MockActionState::new(),
298            MockTransactionExecutor::new(),
299            Default::default(),
300        );
301        let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_queue.new_sender());
302
303        assert!(
304            matches!(
305                actions
306                    .withdraw::<WxHOPR>(*BOB, 0_u32.into())
307                    .await
308                    .err()
309                    .expect("must be error"),
310                ChainActionsError::InvalidArguments(_)
311            ),
312            "should not allow to withdraw 0"
313        );
314
315        Ok(())
316    }
317}