hoprd_api/
messages.rs

1use axum::{
2    extract::{Json, Query, State},
3    http::status::StatusCode,
4    response::IntoResponse,
5};
6use hopr_crypto_types::types::Hash;
7use serde::Deserialize;
8use serde_with::{serde_as, Bytes, DisplayFromStr, DurationMilliSeconds};
9use std::{sync::Arc, time::Duration};
10use tracing::error;
11use validator::Validate;
12
13use hopr_lib::{
14    errors::{HoprLibError, HoprStatusError},
15    AsUnixTimestamp, RoutingOptions, RESERVED_TAG_UPPER_LIMIT,
16};
17
18use crate::{
19    types::{HoprIdentifier, PeerOrAddress},
20    ApiError, ApiErrorStatus, InternalState, BASE_PATH,
21};
22
23#[derive(Debug, Default, Clone, Deserialize, utoipa::ToSchema, utoipa::IntoParams)]
24#[into_params(parameter_in = Query)]
25pub(crate) struct TagQueryRequest {
26    #[schema(required = false)]
27    tag: Option<u16>,
28}
29
30#[derive(Debug, Clone, serde::Serialize, Deserialize, utoipa::ToSchema)]
31pub(crate) struct SizeResponse {
32    size: usize,
33}
34
35#[serde_as]
36#[derive(Debug, Clone, PartialEq, Deserialize, validator::Validate, utoipa::ToSchema)]
37#[serde(rename_all = "camelCase")]
38#[schema(example = json!({
39        "body": "Test message",
40        "path": [
41            "12D3KooWR4uwjKCDCAY1xsEFB4esuWLF9Q5ijYvCjz5PNkTbnu33"
42        ],
43        "destination": "12D3KooWEDc1vGJevww48trVDDf6pr1f6N3F86sGJfQrKCyc8kJ1",
44        "tag": 2000
45    }))]
46#[schema(example = json!({
47    "body": "Test message",
48    "hops": 1,
49    "peerId": "12D3KooWEDc1vGJevww48trVDDf6pr1f6N3F86sGJfQrKCyc8kJ1",
50    "tag": 2000
51}))]
52pub(crate) struct SendMessageBodyRequest {
53    /// The message tag used to filter messages based on application, must be from range <1024,65535>
54    #[schema(minimum = 1024, maximum = 65535)]
55    tag: u16,
56    /// Message to be transmitted over the network
57    #[serde_as(as = "Bytes")]
58    body: Vec<u8>,
59    /// The recipient HOPR PeerId or address
60    #[serde_as(as = "DisplayFromStr")]
61    #[schema(value_type = String)]
62    destination: PeerOrAddress,
63    #[serde_as(as = "Option<Vec<DisplayFromStr>>")]
64    #[validate(length(min = 0, max = 3))]
65    #[schema(value_type = Option<Vec<String>>)]
66    path: Option<Vec<PeerOrAddress>>,
67    #[schema(minimum = 0, maximum = 3)]
68    #[validate(range(min = 0, max = 3))]
69    hops: Option<u16>,
70}
71
72#[serde_as]
73#[derive(Debug, Clone, serde::Serialize, utoipa::ToSchema)]
74#[schema(example = json!({
75        "timestamp": 2147483647
76    }))]
77#[serde(rename_all = "camelCase")]
78pub(crate) struct SendMessageResponse {
79    #[serde_as(as = "DurationMilliSeconds<u64>")]
80    #[schema(value_type = u64)]
81    timestamp: std::time::Duration,
82    #[serde_as(as = "DisplayFromStr")]
83    #[schema(value_type = String)]
84    challenge: Hash,
85}
86
87#[serde_as]
88#[derive(Debug, Default, Clone, Deserialize, utoipa::ToSchema)]
89#[schema(example = json!({
90    "tag": 801,
91    "timestamp": 2147483647
92}))]
93pub(crate) struct GetMessageBodyRequest {
94    /// The message tag used to filter messages based on application
95    #[schema(required = false)]
96    tag: Option<u16>,
97    /// Timestamp to filter messages received after this timestamp
98    #[serde_as(as = "Option<DurationMilliSeconds<u64>>")]
99    #[schema(required = false, value_type = u64)]
100    timestamp: Option<std::time::Duration>,
101}
102
103/// Send a message to another peer using the given path.
104///
105/// The message can be sent either over a specified path or using a specified
106/// number of HOPS, if no path is given.
107#[utoipa::path(
108        post,
109        path = const_format::formatcp!("{BASE_PATH}/messages"),
110        request_body(
111            content = SendMessageBodyRequest,
112            description = "Body of a message to send",
113            content_type = "application/json"),
114        responses(
115            (status = 202, description = "The message was sent successfully, DOES NOT imply successful delivery.", body = SendMessageResponse),
116            (status = 401, description = "Invalid authorization token.", body = ApiError),
117            (status = 412, description = "The node is not ready."),
118            (status = 422, description = "Unknown failure", body = ApiError)
119        ),
120        security(
121            ("api_token" = []),
122            ("bearer_token" = [])
123        ),
124        tag = "Messages",
125    )]
126pub(super) async fn send_message(
127    State(state): State<Arc<InternalState>>,
128    Json(args): Json<SendMessageBodyRequest>,
129) -> Result<impl IntoResponse, impl IntoResponse> {
130    let hopr = state.hopr.clone();
131
132    args.validate().map_err(|e| {
133        (
134            StatusCode::UNPROCESSABLE_ENTITY,
135            ApiErrorStatus::UnknownFailure(e.to_string()),
136        )
137            .into_response()
138    })?;
139
140    let peer_id = match HoprIdentifier::new_with(args.destination, hopr.peer_resolver()).await {
141        Ok(destination) => destination.peer_id,
142        Err(e) => return Err(e.into_response()),
143    };
144
145    #[cfg(not(feature = "explicit-path"))]
146    if args.path.is_some() {
147        return Err((
148            StatusCode::UNPROCESSABLE_ENTITY,
149            ApiErrorStatus::InvalidPath("explicit paths are not allowed".into()),
150        )
151            .into_response());
152    }
153
154    let options = if let Some(intermediate_path) = args.path {
155        let peer_ids_future = intermediate_path
156            .into_iter()
157            .map(|address| HoprIdentifier::new_with(address, hopr.peer_resolver()))
158            .collect::<Vec<_>>();
159
160        let path = futures::future::try_join_all(peer_ids_future)
161            .await
162            .map_err(|e: ApiErrorStatus| {
163                (
164                    StatusCode::UNPROCESSABLE_ENTITY,
165                    ApiErrorStatus::UnknownFailure(format!("Failed to fulfill path: {e}")),
166                )
167                    .into_response()
168            })?
169            .into_iter()
170            .map(|v| v.peer_id)
171            .collect::<Vec<_>>();
172
173        RoutingOptions::IntermediatePath(
174            // get a vec of peer ids from the intermediate path
175            path.try_into().map_err(|_| {
176                (
177                    StatusCode::UNPROCESSABLE_ENTITY,
178                    ApiErrorStatus::InvalidPath("Invalid number of hops".into()),
179                )
180                    .into_response()
181            })?,
182        )
183    } else if let Some(hops) = args.hops {
184        RoutingOptions::Hops((hops as u8).try_into().map_err(|_| {
185            (
186                StatusCode::UNPROCESSABLE_ENTITY,
187                ApiErrorStatus::InvalidPath(format!(
188                    "Number of hops cannot be larger than {}",
189                    RoutingOptions::MAX_INTERMEDIATE_HOPS
190                )),
191            )
192                .into_response()
193        })?)
194    } else {
195        return Err((
196            StatusCode::UNPROCESSABLE_ENTITY,
197            ApiErrorStatus::InvalidPath("One of either hops or intermediate path must be specified".into()),
198        )
199            .into_response());
200    };
201
202    let timestamp = std::time::SystemTime::now().as_unix_timestamp();
203
204    match hopr
205        .send_message(args.body.into_boxed_slice(), peer_id, options, Some(args.tag))
206        .await
207    {
208        Ok(_) => Ok((
209            StatusCode::ACCEPTED,
210            Json(SendMessageResponse {
211                timestamp,
212                challenge: Hash::create(&[b"This value is useless and is present only for backwards compatibility"]),
213            }),
214        )
215            .into_response()),
216        Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(_, _))) => {
217            Err((StatusCode::PRECONDITION_FAILED, ApiErrorStatus::NotReady).into_response())
218        }
219        Err(e) => Err((StatusCode::UNPROCESSABLE_ENTITY, ApiErrorStatus::from(e)).into_response()),
220    }
221}
222
223/// Delete messages from nodes message inbox.
224#[utoipa::path(
225        delete,
226        path = const_format::formatcp!("{BASE_PATH}/messages"),
227        params(TagQueryRequest),
228        responses(
229            (status = 204, description = "Messages successfully deleted."),
230            (status = 400, description = "Bad request.", body = ApiError),
231            (status = 401, description = "Invalid authorization token.", body = ApiError),
232        ),
233        tag = "Messages",
234        security(
235            ("api_token" = []),
236            ("bearer_token" = [])
237        )
238    )]
239pub(super) async fn delete_messages(
240    Query(TagQueryRequest { tag }): Query<TagQueryRequest>,
241    State(state): State<Arc<InternalState>>,
242) -> impl IntoResponse {
243    if let Some(tag) = tag {
244        if tag < RESERVED_TAG_UPPER_LIMIT {
245            return (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidApplicationTag).into_response();
246        }
247    }
248
249    let inbox = state.inbox.clone();
250    inbox.write().await.pop_all(tag).await;
251    (StatusCode::NO_CONTENT, ()).into_response()
252}
253
254/// Get size of filtered message inbox for a specific tag
255#[utoipa::path(
256        get,
257        path = const_format::formatcp!("{BASE_PATH}/messages/size"),
258        params(TagQueryRequest),
259        responses(
260            (status = 200, description = "Returns the message inbox size filtered by the given tag", body = SizeResponse),
261            (status = 400, description = "Bad request.", body = ApiError),
262            (status = 401, description = "Invalid authorization token.", body = ApiError),
263        ),
264        security(
265            ("api_token" = []),
266            ("bearer_token" = [])
267        ),
268        tag = "Messages"
269    )]
270pub(super) async fn size(
271    Query(TagQueryRequest { tag }): Query<TagQueryRequest>,
272    State(state): State<Arc<InternalState>>,
273) -> impl IntoResponse {
274    if let Some(tag) = tag {
275        if tag < RESERVED_TAG_UPPER_LIMIT {
276            return (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidApplicationTag).into_response();
277        }
278    }
279
280    let inbox = state.inbox.clone();
281    let size = inbox.read().await.size(tag).await;
282
283    (StatusCode::OK, Json(SizeResponse { size })).into_response()
284}
285
286#[serde_as]
287#[derive(Debug, Clone, serde::Serialize, utoipa::ToSchema)]
288#[schema(example = json!({
289        "body": "Test message 1",
290        "receivedAt": 1704453953073i64,
291        "tag": 2000
292    }))]
293#[serde(rename_all = "camelCase")]
294pub(crate) struct MessagePopResponse {
295    tag: u16,
296    body: String,
297    #[serde_as(as = "DurationMilliSeconds<u64>")]
298    #[schema(value_type = u64)]
299    received_at: std::time::Duration,
300}
301
302fn to_api_message(data: hopr_lib::ApplicationData, received_at: Duration) -> Result<MessagePopResponse, String> {
303    if let Some(tag) = data.application_tag {
304        match std::str::from_utf8(&data.plain_text) {
305            Ok(data_str) => Ok(MessagePopResponse {
306                tag,
307                body: data_str.into(),
308                received_at,
309            }),
310            Err(error) => Err(format!("Failed to deserialize data into string: {error}")),
311        }
312    } else {
313        Err("No application tag was present despite picking from a tagged inbox".into())
314    }
315}
316
317/// Get the oldest message currently present in the nodes message inbox.
318///
319/// The message is removed from the inbox.
320#[utoipa::path(
321        post,
322        path = const_format::formatcp!("{BASE_PATH}/messages/pop"),
323        request_body(
324            content = TagQueryRequest,
325            description = "Tag of message queue to pop from",
326            content_type = "application/json"
327        ),
328        responses(
329            (status = 200, description = "Message successfully extracted.", body = MessagePopResponse),
330            (status = 400, description = "Bad request.", body = ApiError),
331            (status = 401, description = "Invalid authorization token.", body = ApiError),
332            (status = 404, description = "The specified resource was not found."),
333            (status = 422, description = "Unknown failure", body = ApiError)
334        ),
335        security(
336            ("api_token" = []),
337            ("bearer_token" = [])
338        ),
339        tag = "Messages"
340    )]
341pub(super) async fn pop(
342    State(state): State<Arc<InternalState>>,
343    Json(TagQueryRequest { tag }): Json<TagQueryRequest>,
344) -> impl IntoResponse {
345    if let Some(tag) = tag {
346        if tag < RESERVED_TAG_UPPER_LIMIT {
347            return (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidApplicationTag).into_response();
348        }
349    }
350
351    let inbox = state.inbox.clone();
352    let inbox = inbox.write().await;
353    if let Some((data, ts)) = inbox.pop(tag).await {
354        match to_api_message(data, ts) {
355            Ok(message) => (StatusCode::OK, Json(message)).into_response(),
356            Err(e) => (StatusCode::UNPROCESSABLE_ENTITY, ApiErrorStatus::UnknownFailure(e)).into_response(),
357        }
358    } else {
359        (StatusCode::NOT_FOUND, ()).into_response()
360    }
361}
362
363#[derive(Debug, Clone, serde::Serialize, utoipa::ToSchema)]
364pub(crate) struct MessagePopAllResponse {
365    messages: Vec<MessagePopResponse>,
366}
367
368/// Get the list of messages currently present in the nodes message inbox.
369///
370/// The messages are removed from the inbox.
371#[utoipa::path(
372        post,
373        path = const_format::formatcp!("{BASE_PATH}/messages/pop-all"),
374        request_body(
375            content = TagQueryRequest,
376            description = "Tag of message queue to pop from. When an empty object or an object with a `tag: 0` is provided, it lists and removes all the messages.",
377            content_type = "application/json"
378        ),
379        responses(
380            (status = 200, description = "All message successfully extracted.", body = MessagePopAllResponse),
381            (status = 400, description = "Bad request.", body = ApiError),
382            (status = 401, description = "Invalid authorization token.", body = ApiError),
383            (status = 404, description = "The specified resource was not found."),
384            (status = 422, description = "Unknown failure", body = ApiError)
385        ),
386        security(
387            ("api_token" = []),
388            ("bearer_token" = [])
389        ),
390        tag = "Messages"
391    )]
392pub(super) async fn pop_all(
393    State(state): State<Arc<InternalState>>,
394    Json(TagQueryRequest { tag }): Json<TagQueryRequest>,
395) -> impl IntoResponse {
396    if let Some(tag) = tag {
397        if tag < RESERVED_TAG_UPPER_LIMIT {
398            return (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidApplicationTag).into_response();
399        }
400    }
401
402    let inbox = state.inbox.clone();
403    let inbox = inbox.write().await;
404    let messages: Vec<MessagePopResponse> = inbox
405        .pop_all(tag)
406        .await
407        .into_iter()
408        .filter_map(|(data, ts)| match to_api_message(data, ts) {
409            Ok(msg) => Some(msg),
410            Err(e) => {
411                error!(error = %e, "failed to pop message");
412                None
413            }
414        })
415        .collect::<Vec<_>>();
416
417    (StatusCode::OK, Json(MessagePopAllResponse { messages })).into_response()
418}
419
420/// Peek the oldest message currently present in the nodes message inbox.
421///
422/// The message is not removed from the inbox.
423#[utoipa::path(
424        post,
425        path = const_format::formatcp!("{BASE_PATH}/messages/peek"),
426        request_body(
427            content = TagQueryRequest,
428            description = "Tag of message queue to peek from",
429            content_type = "application/json"
430        ),
431        responses(
432            (status = 200, description = "Message successfully peeked at.", body = MessagePopResponse),
433            (status = 400, description = "Bad request.", body = ApiError),
434            (status = 401, description = "Invalid authorization token.", body = ApiError),
435            (status = 404, description = "The specified resource was not found."),
436            (status = 422, description = "Unknown failure", body = ApiError)
437        ),
438        security(
439            ("api_token" = []),
440            ("bearer_token" = [])
441        ),
442        tag = "Messages"
443    )]
444pub(super) async fn peek(
445    State(state): State<Arc<InternalState>>,
446    Json(TagQueryRequest { tag }): Json<TagQueryRequest>,
447) -> impl IntoResponse {
448    if let Some(tag) = tag {
449        if tag < RESERVED_TAG_UPPER_LIMIT {
450            return (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidApplicationTag).into_response();
451        }
452    }
453
454    let inbox = state.inbox.clone();
455    let inbox = inbox.write().await;
456    if let Some((data, ts)) = inbox.peek(tag).await {
457        match to_api_message(data, ts) {
458            Ok(message) => (StatusCode::OK, Json(message)).into_response(),
459            Err(e) => (StatusCode::UNPROCESSABLE_ENTITY, ApiErrorStatus::UnknownFailure(e)).into_response(),
460        }
461    } else {
462        (StatusCode::NOT_FOUND, ()).into_response()
463    }
464}
465
466/// Peek the list of messages currently present in the nodes message inbox, filtered by tag,
467/// and optionally by timestamp (epoch in milliseconds).
468/// The messages are not removed from the inbox.
469#[utoipa::path(
470        post,
471        path = const_format::formatcp!("{BASE_PATH}/messages/peek-all"),
472        request_body(
473            content = GetMessageBodyRequest,
474            description = "Tag of message queue and optionally a timestamp since from to start peeking. When an empty object or an object with a `tag: 0` is provided, it fetches all the messages.",
475            content_type = "application/json"
476        ),
477        responses(
478            (status = 200, description = "All messages successfully peeked at.", body = MessagePopAllResponse),
479            (status = 400, description = "Bad request.", body = ApiError),
480            (status = 401, description = "Invalid authorization token.", body = ApiError),
481            (status = 404, description = "The specified resource was not found."),
482            (status = 422, description = "Unknown failure", body = ApiError)
483        ),
484        security(
485            ("api_token" = []),
486            ("bearer_token" = [])
487        ),
488        tag = "Messages"
489    )]
490
491pub(super) async fn peek_all(
492    State(state): State<Arc<InternalState>>,
493    Json(GetMessageBodyRequest { tag, timestamp }): Json<GetMessageBodyRequest>,
494) -> impl IntoResponse {
495    if let Some(tag) = tag {
496        if tag < RESERVED_TAG_UPPER_LIMIT {
497            return (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidApplicationTag).into_response();
498        }
499    }
500
501    let inbox = state.inbox.clone();
502    let inbox = inbox.read().await;
503    let messages = inbox
504        .peek_all(tag, timestamp)
505        .await
506        .into_iter()
507        .filter_map(|(data, ts)| match to_api_message(data, ts) {
508            Ok(msg) => Some(msg),
509            Err(e) => {
510                error!(error = %e, "failed to peek message:");
511                None
512            }
513        })
514        .collect::<Vec<_>>();
515
516    (StatusCode::OK, Json(MessagePopAllResponse { messages })).into_response()
517}
518
#[cfg(test)]
mod tests {
    use super::*;

    use libp2p_identity::PeerId;
    use serde_json::{from_value, json};

    /// Payload shared by both deserialization tests.
    const TEST_SEQUENCE: &[u8] = b"wow, this actually works";

    /// Request that both JSON encodings of the body are expected to produce.
    fn expected_request(peer_id: PeerId) -> SendMessageBodyRequest {
        SendMessageBodyRequest {
            tag: 5,
            body: TEST_SEQUENCE.to_vec(),
            destination: PeerOrAddress::from(peer_id),
            path: None,
            hops: None,
        }
    }

    #[test]
    fn send_message_accepts_bytes_in_body() -> anyhow::Result<()> {
        let peer_id = PeerId::random();

        // The body is supplied as a JSON array of byte values.
        let actual: SendMessageBodyRequest = from_value(json!({
            "tag": 5,
            "body": TEST_SEQUENCE.to_vec(),
            "destination": peer_id,
        }))?;

        assert_eq!(actual, expected_request(peer_id));

        Ok(())
    }

    #[test]
    fn send_message_accepts_utf8_string_in_body() -> anyhow::Result<()> {
        let peer_id = PeerId::random();

        // The body is supplied as a JSON string.
        let actual: SendMessageBodyRequest = from_value(json!({
            "tag": 5,
            "destination": peer_id,
            "body": String::from_utf8(TEST_SEQUENCE.to_vec())?,
        }))?;

        assert_eq!(actual, expected_request(peer_id));

        Ok(())
    }
}