1use std::sync::{Arc, OnceLock};
2
3use hopr_crypto_types::types::Hash;
4use hopr_db_sql::api::tickets::{AggregationPrerequisites, HoprDbTicketOperations};
5use hopr_transport_p2p::swarm::{TicketAggregationRequestType, TicketAggregationResponseType};
6use hopr_transport_protocol::{
7 errors::ProtocolError,
8 ticket_aggregation::processor::{AwaitingAggregator, TicketAggregationActions, TicketAggregatorTrait},
9};
10
11#[derive(Debug, Clone)]
12pub struct TicketAggregatorProxy<Db>
13where
14 Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
15{
16 db: Db,
17 maybe_writer: Arc<OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>>,
18 agg_timeout: std::time::Duration,
19}
20
21impl<Db> TicketAggregatorProxy<Db>
22where
23 Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
24{
25 pub fn new(
26 db: Db,
27 maybe_writer: Arc<
28 OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>,
29 >,
30 agg_timeout: std::time::Duration,
31 ) -> Self {
32 Self {
33 db,
34 maybe_writer,
35 agg_timeout,
36 }
37 }
38}
39
40#[async_trait::async_trait]
41impl<Db> TicketAggregatorTrait for TicketAggregatorProxy<Db>
42where
43 Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
44{
45 async fn aggregate_tickets(
46 &self,
47 channel: &Hash,
48 prerequisites: AggregationPrerequisites,
49 ) -> hopr_transport_protocol::errors::Result<()> {
50 if let Some(writer) = self.maybe_writer.clone().get() {
51 AwaitingAggregator::new(self.db.clone(), writer.clone(), self.agg_timeout)
52 .aggregate_tickets(channel, prerequisites)
53 .await
54 } else {
55 Err(ProtocolError::TransportError(
56 "Ticket aggregation writer not available, the object was not yet initialized".to_string(),
57 ))
58 }
59 }
60}