hopr_transport/
proxy.rs

1use std::sync::{Arc, OnceLock};
2
3use hopr_crypto_types::types::Hash;
4use hopr_db_sql::api::tickets::{AggregationPrerequisites, HoprDbTicketOperations};
5use hopr_transport_p2p::swarm::{TicketAggregationRequestType, TicketAggregationResponseType};
6use hopr_transport_protocol::{
7    errors::ProtocolError,
8    ticket_aggregation::processor::{AwaitingAggregator, TicketAggregationActions, TicketAggregatorTrait},
9};
10
11#[derive(Debug, Clone)]
12pub struct TicketAggregatorProxy<Db>
13where
14    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
15{
16    db: Db,
17    maybe_writer: Arc<OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>>,
18    agg_timeout: std::time::Duration,
19}
20
21impl<Db> TicketAggregatorProxy<Db>
22where
23    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
24{
25    pub fn new(
26        db: Db,
27        maybe_writer: Arc<
28            OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>,
29        >,
30        agg_timeout: std::time::Duration,
31    ) -> Self {
32        Self {
33            db,
34            maybe_writer,
35            agg_timeout,
36        }
37    }
38}
39
40#[async_trait::async_trait]
41impl<Db> TicketAggregatorTrait for TicketAggregatorProxy<Db>
42where
43    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
44{
45    async fn aggregate_tickets(
46        &self,
47        channel: &Hash,
48        prerequisites: AggregationPrerequisites,
49    ) -> hopr_transport_protocol::errors::Result<()> {
50        if let Some(writer) = self.maybe_writer.clone().get() {
51            AwaitingAggregator::new(self.db.clone(), writer.clone(), self.agg_timeout)
52                .aggregate_tickets(channel, prerequisites)
53                .await
54        } else {
55            Err(ProtocolError::TransportError(
56                "Ticket aggregation writer not available, the object was not yet initialized".to_string(),
57            ))
58        }
59    }
60}