1use axum::{
2 extract::{Json, Query, State},
3 http::status::StatusCode,
4 response::IntoResponse,
5};
6use hopr_crypto_types::types::Hash;
7use serde::Deserialize;
8use serde_with::{serde_as, Bytes, DisplayFromStr, DurationMilliSeconds};
9use std::{sync::Arc, time::Duration};
10use tracing::error;
11use validator::Validate;
12
13use hopr_lib::{
14 errors::{HoprLibError, HoprStatusError},
15 AsUnixTimestamp, RoutingOptions, RESERVED_TAG_UPPER_LIMIT,
16};
17
18use crate::{
19 types::{HoprIdentifier, PeerOrAddress},
20 ApiError, ApiErrorStatus, InternalState, BASE_PATH,
21};
22
23#[derive(Debug, Default, Clone, Deserialize, utoipa::ToSchema, utoipa::IntoParams)]
24#[into_params(parameter_in = Query)]
25pub(crate) struct TagQueryRequest {
26 #[schema(required = false)]
27 tag: Option<u16>,
28}
29
30#[derive(Debug, Clone, serde::Serialize, Deserialize, utoipa::ToSchema)]
31pub(crate) struct SizeResponse {
32 size: usize,
33}
34
35#[serde_as]
36#[derive(Debug, Clone, PartialEq, Deserialize, validator::Validate, utoipa::ToSchema)]
37#[serde(rename_all = "camelCase")]
38#[schema(example = json!({
39 "body": "Test message",
40 "path": [
41 "12D3KooWR4uwjKCDCAY1xsEFB4esuWLF9Q5ijYvCjz5PNkTbnu33"
42 ],
43 "destination": "12D3KooWEDc1vGJevww48trVDDf6pr1f6N3F86sGJfQrKCyc8kJ1",
44 "tag": 2000
45 }))]
46#[schema(example = json!({
47 "body": "Test message",
48 "hops": 1,
49 "peerId": "12D3KooWEDc1vGJevww48trVDDf6pr1f6N3F86sGJfQrKCyc8kJ1",
50 "tag": 2000
51}))]
52pub(crate) struct SendMessageBodyRequest {
53 #[schema(minimum = 1024, maximum = 65535)]
55 tag: u16,
56 #[serde_as(as = "Bytes")]
58 body: Vec<u8>,
59 #[serde_as(as = "DisplayFromStr")]
61 #[schema(value_type = String)]
62 destination: PeerOrAddress,
63 #[serde_as(as = "Option<Vec<DisplayFromStr>>")]
64 #[validate(length(min = 0, max = 3))]
65 #[schema(value_type = Option<Vec<String>>)]
66 path: Option<Vec<PeerOrAddress>>,
67 #[schema(minimum = 0, maximum = 3)]
68 #[validate(range(min = 0, max = 3))]
69 hops: Option<u16>,
70}
71
72#[serde_as]
73#[derive(Debug, Clone, serde::Serialize, utoipa::ToSchema)]
74#[schema(example = json!({
75 "timestamp": 2147483647
76 }))]
77#[serde(rename_all = "camelCase")]
78pub(crate) struct SendMessageResponse {
79 #[serde_as(as = "DurationMilliSeconds<u64>")]
80 #[schema(value_type = u64)]
81 timestamp: std::time::Duration,
82 #[serde_as(as = "DisplayFromStr")]
83 #[schema(value_type = String)]
84 challenge: Hash,
85}
86
87#[serde_as]
88#[derive(Debug, Default, Clone, Deserialize, utoipa::ToSchema)]
89#[schema(example = json!({
90 "tag": 801,
91 "timestamp": 2147483647
92}))]
93pub(crate) struct GetMessageBodyRequest {
94 #[schema(required = false)]
96 tag: Option<u16>,
97 #[serde_as(as = "Option<DurationMilliSeconds<u64>>")]
99 #[schema(required = false, value_type = u64)]
100 timestamp: Option<std::time::Duration>,
101}
102
103#[utoipa::path(
108 post,
109 path = const_format::formatcp!("{BASE_PATH}/messages"),
110 request_body(
111 content = SendMessageBodyRequest,
112 description = "Body of a message to send",
113 content_type = "application/json"),
114 responses(
115 (status = 202, description = "The message was sent successfully, DOES NOT imply successful delivery.", body = SendMessageResponse),
116 (status = 401, description = "Invalid authorization token.", body = ApiError),
117 (status = 412, description = "The node is not ready."),
118 (status = 422, description = "Unknown failure", body = ApiError)
119 ),
120 security(
121 ("api_token" = []),
122 ("bearer_token" = [])
123 ),
124 tag = "Messages",
125 )]
126pub(super) async fn send_message(
127 State(state): State<Arc<InternalState>>,
128 Json(args): Json<SendMessageBodyRequest>,
129) -> Result<impl IntoResponse, impl IntoResponse> {
130 let hopr = state.hopr.clone();
131
132 args.validate().map_err(|e| {
133 (
134 StatusCode::UNPROCESSABLE_ENTITY,
135 ApiErrorStatus::UnknownFailure(e.to_string()),
136 )
137 .into_response()
138 })?;
139
140 let peer_id = match HoprIdentifier::new_with(args.destination, hopr.peer_resolver()).await {
141 Ok(destination) => destination.peer_id,
142 Err(e) => return Err(e.into_response()),
143 };
144
145 #[cfg(not(feature = "explicit-path"))]
146 if args.path.is_some() {
147 return Err((
148 StatusCode::UNPROCESSABLE_ENTITY,
149 ApiErrorStatus::InvalidPath("explicit paths are not allowed".into()),
150 )
151 .into_response());
152 }
153
154 let options = if let Some(intermediate_path) = args.path {
155 let peer_ids_future = intermediate_path
156 .into_iter()
157 .map(|address| HoprIdentifier::new_with(address, hopr.peer_resolver()))
158 .collect::<Vec<_>>();
159
160 let path = futures::future::try_join_all(peer_ids_future)
161 .await
162 .map_err(|e: ApiErrorStatus| {
163 (
164 StatusCode::UNPROCESSABLE_ENTITY,
165 ApiErrorStatus::UnknownFailure(format!("Failed to fulfill path: {e}")),
166 )
167 .into_response()
168 })?
169 .into_iter()
170 .map(|v| v.peer_id)
171 .collect::<Vec<_>>();
172
173 RoutingOptions::IntermediatePath(
174 path.try_into().map_err(|_| {
176 (
177 StatusCode::UNPROCESSABLE_ENTITY,
178 ApiErrorStatus::InvalidPath("Invalid number of hops".into()),
179 )
180 .into_response()
181 })?,
182 )
183 } else if let Some(hops) = args.hops {
184 RoutingOptions::Hops((hops as u8).try_into().map_err(|_| {
185 (
186 StatusCode::UNPROCESSABLE_ENTITY,
187 ApiErrorStatus::InvalidPath(format!(
188 "Number of hops cannot be larger than {}",
189 RoutingOptions::MAX_INTERMEDIATE_HOPS
190 )),
191 )
192 .into_response()
193 })?)
194 } else {
195 return Err((
196 StatusCode::UNPROCESSABLE_ENTITY,
197 ApiErrorStatus::InvalidPath("One of either hops or intermediate path must be specified".into()),
198 )
199 .into_response());
200 };
201
202 let timestamp = std::time::SystemTime::now().as_unix_timestamp();
203
204 match hopr
205 .send_message(args.body.into_boxed_slice(), peer_id, options, Some(args.tag))
206 .await
207 {
208 Ok(_) => Ok((
209 StatusCode::ACCEPTED,
210 Json(SendMessageResponse {
211 timestamp,
212 challenge: Hash::create(&[b"This value is useless and is present only for backwards compatibility"]),
213 }),
214 )
215 .into_response()),
216 Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(_, _))) => {
217 Err((StatusCode::PRECONDITION_FAILED, ApiErrorStatus::NotReady).into_response())
218 }
219 Err(e) => Err((StatusCode::UNPROCESSABLE_ENTITY, ApiErrorStatus::from(e)).into_response()),
220 }
221}
222
223#[utoipa::path(
225 delete,
226 path = const_format::formatcp!("{BASE_PATH}/messages"),
227 params(TagQueryRequest),
228 responses(
229 (status = 204, description = "Messages successfully deleted."),
230 (status = 400, description = "Bad request.", body = ApiError),
231 (status = 401, description = "Invalid authorization token.", body = ApiError),
232 ),
233 tag = "Messages",
234 security(
235 ("api_token" = []),
236 ("bearer_token" = [])
237 )
238 )]
239pub(super) async fn delete_messages(
240 Query(TagQueryRequest { tag }): Query<TagQueryRequest>,
241 State(state): State<Arc<InternalState>>,
242) -> impl IntoResponse {
243 if let Some(tag) = tag {
244 if tag < RESERVED_TAG_UPPER_LIMIT {
245 return (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidApplicationTag).into_response();
246 }
247 }
248
249 let inbox = state.inbox.clone();
250 inbox.write().await.pop_all(tag).await;
251 (StatusCode::NO_CONTENT, ()).into_response()
252}
253
254#[utoipa::path(
256 get,
257 path = const_format::formatcp!("{BASE_PATH}/messages/size"),
258 params(TagQueryRequest),
259 responses(
260 (status = 200, description = "Returns the message inbox size filtered by the given tag", body = SizeResponse),
261 (status = 400, description = "Bad request.", body = ApiError),
262 (status = 401, description = "Invalid authorization token.", body = ApiError),
263 ),
264 security(
265 ("api_token" = []),
266 ("bearer_token" = [])
267 ),
268 tag = "Messages"
269 )]
270pub(super) async fn size(
271 Query(TagQueryRequest { tag }): Query<TagQueryRequest>,
272 State(state): State<Arc<InternalState>>,
273) -> impl IntoResponse {
274 if let Some(tag) = tag {
275 if tag < RESERVED_TAG_UPPER_LIMIT {
276 return (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidApplicationTag).into_response();
277 }
278 }
279
280 let inbox = state.inbox.clone();
281 let size = inbox.read().await.size(tag).await;
282
283 (StatusCode::OK, Json(SizeResponse { size })).into_response()
284}
285
286#[serde_as]
287#[derive(Debug, Clone, serde::Serialize, utoipa::ToSchema)]
288#[schema(example = json!({
289 "body": "Test message 1",
290 "receivedAt": 1704453953073i64,
291 "tag": 2000
292 }))]
293#[serde(rename_all = "camelCase")]
294pub(crate) struct MessagePopResponse {
295 tag: u16,
296 body: String,
297 #[serde_as(as = "DurationMilliSeconds<u64>")]
298 #[schema(value_type = u64)]
299 received_at: std::time::Duration,
300}
301
302fn to_api_message(data: hopr_lib::ApplicationData, received_at: Duration) -> Result<MessagePopResponse, String> {
303 if let Some(tag) = data.application_tag {
304 match std::str::from_utf8(&data.plain_text) {
305 Ok(data_str) => Ok(MessagePopResponse {
306 tag,
307 body: data_str.into(),
308 received_at,
309 }),
310 Err(error) => Err(format!("Failed to deserialize data into string: {error}")),
311 }
312 } else {
313 Err("No application tag was present despite picking from a tagged inbox".into())
314 }
315}
316
317#[utoipa::path(
321 post,
322 path = const_format::formatcp!("{BASE_PATH}/messages/pop"),
323 request_body(
324 content = TagQueryRequest,
325 description = "Tag of message queue to pop from",
326 content_type = "application/json"
327 ),
328 responses(
329 (status = 200, description = "Message successfully extracted.", body = MessagePopResponse),
330 (status = 400, description = "Bad request.", body = ApiError),
331 (status = 401, description = "Invalid authorization token.", body = ApiError),
332 (status = 404, description = "The specified resource was not found."),
333 (status = 422, description = "Unknown failure", body = ApiError)
334 ),
335 security(
336 ("api_token" = []),
337 ("bearer_token" = [])
338 ),
339 tag = "Messages"
340 )]
341pub(super) async fn pop(
342 State(state): State<Arc<InternalState>>,
343 Json(TagQueryRequest { tag }): Json<TagQueryRequest>,
344) -> impl IntoResponse {
345 if let Some(tag) = tag {
346 if tag < RESERVED_TAG_UPPER_LIMIT {
347 return (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidApplicationTag).into_response();
348 }
349 }
350
351 let inbox = state.inbox.clone();
352 let inbox = inbox.write().await;
353 if let Some((data, ts)) = inbox.pop(tag).await {
354 match to_api_message(data, ts) {
355 Ok(message) => (StatusCode::OK, Json(message)).into_response(),
356 Err(e) => (StatusCode::UNPROCESSABLE_ENTITY, ApiErrorStatus::UnknownFailure(e)).into_response(),
357 }
358 } else {
359 (StatusCode::NOT_FOUND, ()).into_response()
360 }
361}
362
363#[derive(Debug, Clone, serde::Serialize, utoipa::ToSchema)]
364pub(crate) struct MessagePopAllResponse {
365 messages: Vec<MessagePopResponse>,
366}
367
368#[utoipa::path(
372 post,
373 path = const_format::formatcp!("{BASE_PATH}/messages/pop-all"),
374 request_body(
375 content = TagQueryRequest,
376 description = "Tag of message queue to pop from. When an empty object or an object with a `tag: 0` is provided, it lists and removes all the messages.",
377 content_type = "application/json"
378 ),
379 responses(
380 (status = 200, description = "All message successfully extracted.", body = MessagePopAllResponse),
381 (status = 400, description = "Bad request.", body = ApiError),
382 (status = 401, description = "Invalid authorization token.", body = ApiError),
383 (status = 404, description = "The specified resource was not found."),
384 (status = 422, description = "Unknown failure", body = ApiError)
385 ),
386 security(
387 ("api_token" = []),
388 ("bearer_token" = [])
389 ),
390 tag = "Messages"
391 )]
392pub(super) async fn pop_all(
393 State(state): State<Arc<InternalState>>,
394 Json(TagQueryRequest { tag }): Json<TagQueryRequest>,
395) -> impl IntoResponse {
396 if let Some(tag) = tag {
397 if tag < RESERVED_TAG_UPPER_LIMIT {
398 return (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidApplicationTag).into_response();
399 }
400 }
401
402 let inbox = state.inbox.clone();
403 let inbox = inbox.write().await;
404 let messages: Vec<MessagePopResponse> = inbox
405 .pop_all(tag)
406 .await
407 .into_iter()
408 .filter_map(|(data, ts)| match to_api_message(data, ts) {
409 Ok(msg) => Some(msg),
410 Err(e) => {
411 error!(error = %e, "failed to pop message");
412 None
413 }
414 })
415 .collect::<Vec<_>>();
416
417 (StatusCode::OK, Json(MessagePopAllResponse { messages })).into_response()
418}
419
420#[utoipa::path(
424 post,
425 path = const_format::formatcp!("{BASE_PATH}/messages/peek"),
426 request_body(
427 content = TagQueryRequest,
428 description = "Tag of message queue to peek from",
429 content_type = "application/json"
430 ),
431 responses(
432 (status = 200, description = "Message successfully peeked at.", body = MessagePopResponse),
433 (status = 400, description = "Bad request.", body = ApiError),
434 (status = 401, description = "Invalid authorization token.", body = ApiError),
435 (status = 404, description = "The specified resource was not found."),
436 (status = 422, description = "Unknown failure", body = ApiError)
437 ),
438 security(
439 ("api_token" = []),
440 ("bearer_token" = [])
441 ),
442 tag = "Messages"
443 )]
444pub(super) async fn peek(
445 State(state): State<Arc<InternalState>>,
446 Json(TagQueryRequest { tag }): Json<TagQueryRequest>,
447) -> impl IntoResponse {
448 if let Some(tag) = tag {
449 if tag < RESERVED_TAG_UPPER_LIMIT {
450 return (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidApplicationTag).into_response();
451 }
452 }
453
454 let inbox = state.inbox.clone();
455 let inbox = inbox.write().await;
456 if let Some((data, ts)) = inbox.peek(tag).await {
457 match to_api_message(data, ts) {
458 Ok(message) => (StatusCode::OK, Json(message)).into_response(),
459 Err(e) => (StatusCode::UNPROCESSABLE_ENTITY, ApiErrorStatus::UnknownFailure(e)).into_response(),
460 }
461 } else {
462 (StatusCode::NOT_FOUND, ()).into_response()
463 }
464}
465
466#[utoipa::path(
470 post,
471 path = const_format::formatcp!("{BASE_PATH}/messages/peek-all"),
472 request_body(
473 content = GetMessageBodyRequest,
474 description = "Tag of message queue and optionally a timestamp since from to start peeking. When an empty object or an object with a `tag: 0` is provided, it fetches all the messages.",
475 content_type = "application/json"
476 ),
477 responses(
478 (status = 200, description = "All messages successfully peeked at.", body = MessagePopAllResponse),
479 (status = 400, description = "Bad request.", body = ApiError),
480 (status = 401, description = "Invalid authorization token.", body = ApiError),
481 (status = 404, description = "The specified resource was not found."),
482 (status = 422, description = "Unknown failure", body = ApiError)
483 ),
484 security(
485 ("api_token" = []),
486 ("bearer_token" = [])
487 ),
488 tag = "Messages"
489 )]
490
491pub(super) async fn peek_all(
492 State(state): State<Arc<InternalState>>,
493 Json(GetMessageBodyRequest { tag, timestamp }): Json<GetMessageBodyRequest>,
494) -> impl IntoResponse {
495 if let Some(tag) = tag {
496 if tag < RESERVED_TAG_UPPER_LIMIT {
497 return (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidApplicationTag).into_response();
498 }
499 }
500
501 let inbox = state.inbox.clone();
502 let inbox = inbox.read().await;
503 let messages = inbox
504 .peek_all(tag, timestamp)
505 .await
506 .into_iter()
507 .filter_map(|(data, ts)| match to_api_message(data, ts) {
508 Ok(msg) => Some(msg),
509 Err(e) => {
510 error!(error = %e, "failed to peek message:");
511 None
512 }
513 })
514 .collect::<Vec<_>>();
515
516 (StatusCode::OK, Json(MessagePopAllResponse { messages })).into_response()
517}
518
519#[cfg(test)]
520mod tests {
521 use super::*;
522
523 use libp2p_identity::PeerId;
524 use serde_json::{from_value, json};
525
526 #[test]
527 fn send_message_accepts_bytes_in_body() -> anyhow::Result<()> {
528 let peer_id = PeerId::random();
529 let destination = PeerOrAddress::from(peer_id);
530 let test_sequence = b"wow, this actually works";
531
532 let json_value = json!({
533 "tag": 5,
534 "body": test_sequence.to_vec(),
535 "destination": peer_id,
536 });
537
538 let expected = SendMessageBodyRequest {
539 tag: 5,
540 body: test_sequence.to_vec(),
541 destination: destination,
542 path: None,
543 hops: None,
544 };
545
546 let actual: SendMessageBodyRequest = from_value(json_value)?;
547
548 assert_eq!(actual, expected);
549
550 Ok(())
551 }
552
553 #[test]
554 fn send_message_accepts_utf8_string_in_body() -> anyhow::Result<()> {
555 let peer_id = PeerId::random();
556 let destination = PeerOrAddress::from(peer_id);
557 let test_sequence = b"wow, this actually works";
558
559 let json_value = json!({
560 "tag": 5,
561 "destination": peer_id,
562 "body": String::from_utf8(test_sequence.to_vec())?,
563 });
564
565 let expected = SendMessageBodyRequest {
566 tag: 5,
567 body: test_sequence.to_vec(),
568 destination: destination,
569 path: None,
570 hops: None,
571 };
572
573 let actual: SendMessageBodyRequest = from_value(json_value)?;
574
575 assert_eq!(actual, expected);
576
577 Ok(())
578 }
579}