links/
api.rs

1//! This module contains the gRPC-based low-level links API, responsible for
2//! allowing outside services access to the links store.
3
4use links_id::Id;
5use links_normalized::{Link, Normalized};
6use rpc::links_server::Links;
7pub use rpc::{
8	links_client::LinksClient, links_server::LinksServer, GetRedirectRequest, GetRedirectResponse,
9	GetStatisticsRequest, GetVanityRequest, GetVanityResponse, RemRedirectRequest,
10	RemRedirectResponse, RemStatisticsRequest, RemVanityRequest, RemVanityResponse,
11	SetRedirectRequest, SetRedirectResponse, SetVanityRequest, SetVanityResponse,
12};
13use rpc_wrapper::rpc;
14use tokio::time::Instant;
15pub use tonic::{Code, Request, Response, Status};
16use tracing::{info, instrument, trace};
17
18use crate::{
19	config::Config,
20	stats::StatisticDescription,
21	store::{Current, Store},
22};
23
24/// A wrapper around the generated tonic code. Contains the `rpc` module with
25/// all of the actual functionality. This is necessary to allow
26/// `clippy::pedantic` on the generated code.
27mod rpc_wrapper {
28	tonic::include_proto!("links");
29}
30
31/// Get a function that checks authentication/authorization of an incoming grpc
32/// API call.
33///
34/// The incoming request is checked for the `auth` metadata value, which should
35/// be a shared secret string value, that is simply compared to the one
36/// configured. **It is critical that this value is kept secret and never
37/// exposed publicly!**
38///
39/// # Errors
40/// Returns the `UNAUTHENTICATED` status code if the token is not provided or
41/// is invalid.
42pub fn get_auth_checker(
43	config: &'static Config,
44) -> impl Fn(Request<()>) -> Result<Request<()>, Status> + Clone {
45	move |req: Request<()>| -> Result<Request<()>, Status> {
46		let token = if let Some(token) = req.metadata().get("auth") {
47			token.as_encoded_bytes()
48		} else {
49			trace!("no auth token to check");
50			return Err(Status::new(Code::Unauthenticated, "no auth token provided"));
51		};
52
53		let secret = config.token();
54
55		trace!("checking auth token {token:?}, secret is {secret:?}");
56
57		if secret.as_bytes() == token {
58			trace!("auth token is valid");
59			Ok(req)
60		} else {
61			trace!("auth token is not valid");
62			Err(Status::new(Code::Unauthenticated, "auth token is invalid"))
63		}
64	}
65}
66
67/// The grpc API implementation. Contains a reference to the store on which all
68/// operations are performed. Implements all RPC calls from `links.proto`.
69#[derive(Debug)]
70pub struct Api {
71	store: &'static Current,
72}
73
74impl Api {
75	/// Create a new API instance. This instance will operate on the `store`
76	/// provided, and provide access to that store via gRPC.
77	#[instrument(level = "info", skip_all, fields(store = store.backend_name()))]
78	pub fn new(store: &'static Current) -> Self {
79		Self { store }
80	}
81
82	/// Get a reference to this API's store.
83	#[instrument(level = "trace", skip_all)]
84	pub fn store(&self) -> Store {
85		self.store.get()
86	}
87}
88
89#[tonic::async_trait]
90impl Links for Api {
91	#[instrument(level = "info", name = "rpc_get_redirect", skip_all, fields(store = %self.store.backend_name()))]
92	async fn get_redirect(
93		&self,
94		req: Request<rpc::GetRedirectRequest>,
95	) -> Result<Response<rpc::GetRedirectResponse>, Status> {
96		let time = Instant::now();
97		let store = self.store();
98
99		let Ok(id) = Id::try_from(req.into_inner().id) else {
100			return Err(Status::new(Code::InvalidArgument, "id is invalid"));
101		};
102
103		let Ok(link) = store.get_redirect(id).await else {
104			return Err(Status::new(Code::Internal, "store operation failed"));
105		};
106
107		let res = Ok(Response::new(rpc::GetRedirectResponse {
108			link: link.map(Link::into_string),
109		}));
110
111		let time = time.elapsed();
112		info!(
113			time_ns = %time.as_nanos(),
114			success = %res.is_ok(),
115			"rpc processed in {:.6} seconds",
116			time.as_secs_f64()
117		);
118
119		res
120	}
121
122	#[instrument(level = "info", name = "rpc_set_redirect", skip_all, fields(store = %self.store.backend_name()))]
123	async fn set_redirect(
124		&self,
125		req: Request<rpc::SetRedirectRequest>,
126	) -> Result<Response<rpc::SetRedirectResponse>, Status> {
127		let time = Instant::now();
128		let store = self.store();
129
130		let rpc::SetRedirectRequest { id, link } = req.into_inner();
131
132		let Ok(id) = Id::try_from(id) else {
133			return Err(Status::new(Code::InvalidArgument, "id is invalid"));
134		};
135
136		let Ok(link) = Link::new(&link) else {
137			return Err(Status::new(Code::InvalidArgument, "link is invalid"));
138		};
139
140		let Ok(link) = store.set_redirect(id, link).await else {
141			return Err(Status::new(Code::Internal, "store operation failed"));
142		};
143
144		let res = Ok(Response::new(rpc::SetRedirectResponse {
145			link: link.map(Link::into_string),
146		}));
147
148		let time = time.elapsed();
149		info!(
150			time_ns = %time.as_nanos(),
151			success = %res.is_ok(),
152			"rpc processed in {:.6} seconds",
153			time.as_secs_f64()
154		);
155
156		res
157	}
158
159	#[instrument(level = "info", name = "rpc_rem_redirect", skip_all, fields(store = %self.store.backend_name()))]
160	async fn rem_redirect(
161		&self,
162		req: Request<rpc::RemRedirectRequest>,
163	) -> Result<Response<rpc::RemRedirectResponse>, Status> {
164		let time = Instant::now();
165		let store = self.store();
166
167		let Ok(id) = Id::try_from(req.into_inner().id) else {
168			return Err(Status::new(Code::InvalidArgument, "id is invalid"));
169		};
170
171		let Ok(link) = store.rem_redirect(id).await else {
172			return Err(Status::new(Code::Internal, "store operation failed"));
173		};
174
175		let res = Ok(Response::new(rpc::RemRedirectResponse {
176			link: link.map(Link::into_string),
177		}));
178
179		let time = time.elapsed();
180		info!(
181			time_ns = %time.as_nanos(),
182			success = %res.is_ok(),
183			"rpc processed in {:.6} seconds",
184			time.as_secs_f64()
185		);
186
187		res
188	}
189
190	#[instrument(level = "info", name = "rpc_get_vanity", skip_all, fields(store = %self.store.backend_name()))]
191	async fn get_vanity(
192		&self,
193		req: Request<rpc::GetVanityRequest>,
194	) -> Result<Response<rpc::GetVanityResponse>, Status> {
195		let time = Instant::now();
196		let store = self.store();
197
198		let vanity = Normalized::new(&req.into_inner().vanity);
199
200		let Ok(id) = store.get_vanity(vanity).await else {
201			return Err(Status::new(Code::Internal, "store operation failed"));
202		};
203
204		let res = Ok(Response::new(rpc::GetVanityResponse {
205			id: id.map(|id| id.to_string()),
206		}));
207
208		let time = time.elapsed();
209		info!(
210			time_ns = %time.as_nanos(),
211			success = %res.is_ok(),
212			"rpc processed in {:.6} seconds",
213			time.as_secs_f64()
214		);
215
216		res
217	}
218
219	#[instrument(level = "info", name = "rpc_set_vanity", skip_all, fields(store = %self.store.backend_name()))]
220	async fn set_vanity(
221		&self,
222		req: Request<rpc::SetVanityRequest>,
223	) -> Result<Response<rpc::SetVanityResponse>, Status> {
224		let time = Instant::now();
225		let store = self.store();
226
227		let rpc::SetVanityRequest { vanity, id } = req.into_inner();
228
229		let vanity = Normalized::new(&vanity);
230
231		let Ok(id) = Id::try_from(id) else {
232			return Err(Status::new(Code::InvalidArgument, "id is invalid"));
233		};
234
235		let Ok(id) = store.set_vanity(vanity, id).await else {
236			return Err(Status::new(Code::Internal, "store operation failed"));
237		};
238
239		let res = Ok(Response::new(rpc::SetVanityResponse {
240			id: id.map(|id| id.to_string()),
241		}));
242
243		let time = time.elapsed();
244		info!(
245			time_ns = %time.as_nanos(),
246			success = %res.is_ok(),
247			"rpc processed in {:.6} seconds",
248			time.as_secs_f64()
249		);
250
251		res
252	}
253
254	#[instrument(level = "info", name = "rpc_rem_vanity", skip_all, fields(store = %self.store.backend_name()))]
255	async fn rem_vanity(
256		&self,
257		req: Request<rpc::RemVanityRequest>,
258	) -> Result<Response<rpc::RemVanityResponse>, Status> {
259		let time = Instant::now();
260		let store = self.store();
261
262		let vanity = Normalized::new(&req.into_inner().vanity);
263
264		let Ok(id) = store.rem_vanity(vanity).await else {
265			return Err(Status::new(Code::Internal, "store operation failed"));
266		};
267
268		let res = Ok(Response::new(rpc::RemVanityResponse {
269			id: id.map(|id| id.to_string()),
270		}));
271
272		let time = time.elapsed();
273		info!(
274			time_ns = %time.as_nanos(),
275			success = %res.is_ok(),
276			"rpc processed in {:.6} seconds",
277			time.as_secs_f64()
278		);
279
280		res
281	}
282
283	#[instrument(level = "info", name = "rpc_get_statistics", skip_all, fields(store = %self.store.backend_name()))]
284	async fn get_statistics(
285		&self,
286		req: Request<rpc::GetStatisticsRequest>,
287	) -> Result<Response<rpc::GetStatisticsResponse>, Status> {
288		let time = Instant::now();
289		let store = self.store();
290
291		let rpc::GetStatisticsRequest {
292			link,
293			r#type: stat_type,
294			data,
295			time: stat_time,
296		} = req.into_inner();
297
298		let stat_desc = match (
299			link.map(TryInto::try_into).transpose(),
300			stat_type.map(|s| s.as_str().try_into()).transpose(),
301			data.map(TryInto::try_into).transpose(),
302			stat_time.map(|t| t.as_str().try_into()).transpose(),
303		) {
304			(Ok(link), Ok(stat_type), Ok(data), Ok(time)) => StatisticDescription {
305				link,
306				stat_type,
307				data,
308				time,
309			},
310			_ => {
311				return Err(Status::new(
312					Code::InvalidArgument,
313					"one of the supplied arguments is invalid",
314				))
315			}
316		};
317
318		let Ok(stats) = store.get_statistics(stat_desc).await else {
319			return Err(Status::new(Code::Internal, "store operation failed"));
320		};
321
322		let statistics = stats
323			.map(|(s, v)| rpc::StatisticWithValue {
324				link: s.link.to_string(),
325				r#type: s.stat_type.to_string(),
326				data: s.data.to_string(),
327				time: s.time.to_string(),
328				value: v.get(),
329			})
330			.collect();
331
332		let res = Ok(Response::new(rpc::GetStatisticsResponse { statistics }));
333
334		let time = time.elapsed();
335		info!(
336			time_ns = %time.as_nanos(),
337			success = %res.is_ok(),
338			"rpc processed in {:.6} seconds",
339			time.as_secs_f64()
340		);
341
342		res
343	}
344
345	#[instrument(level = "info", name = "rpc_rem_statistics", skip_all, fields(store = %self.store.backend_name()))]
346	async fn rem_statistics(
347		&self,
348		req: Request<rpc::RemStatisticsRequest>,
349	) -> Result<Response<rpc::RemStatisticsResponse>, Status> {
350		let time = Instant::now();
351		let store = self.store();
352
353		let rpc::RemStatisticsRequest {
354			link,
355			r#type: stat_type,
356			data,
357			time: stat_time,
358		} = req.into_inner();
359
360		let stat_desc = match (
361			link.map(TryInto::try_into).transpose(),
362			stat_type.map(|s| s.as_str().try_into()).transpose(),
363			data.map(TryInto::try_into).transpose(),
364			stat_time.map(|t| t.as_str().try_into()).transpose(),
365		) {
366			(Ok(link), Ok(stat_type), Ok(data), Ok(time)) => StatisticDescription {
367				link,
368				stat_type,
369				data,
370				time,
371			},
372			_ => {
373				return Err(Status::new(
374					Code::InvalidArgument,
375					"one of the supplied arguments is invalid",
376				))
377			}
378		};
379
380		let Ok(stats) = store.rem_statistics(stat_desc).await else {
381			return Err(Status::new(Code::Internal, "store operation failed"));
382		};
383
384		let statistics = stats
385			.map(|(s, v)| rpc::StatisticWithValue {
386				link: s.link.to_string(),
387				r#type: s.stat_type.to_string(),
388				data: s.data.to_string(),
389				time: s.time.to_string(),
390				value: v.get(),
391			})
392			.collect();
393
394		let res = Ok(Response::new(rpc::RemStatisticsResponse { statistics }));
395
396		let time = time.elapsed();
397		info!(
398			time_ns = %time.as_nanos(),
399			success = %res.is_ok(),
400			"rpc processed in {:.6} seconds",
401			time.as_secs_f64()
402		);
403
404		res
405	}
406}