links/
server.rs

1//! Links redirector server utilities, including:
2//! - Listeners, which listen for incoming network traffic
//! - Acceptors, which accept incoming connections and direct them to the handlers
4//! - Handlers, which handle requests, doing HTTP redirects or RPC calls
5//! - miscellaneous functions used by the server binary
6//!
7//! # Listeners
8//! A listener controls a network socket and calls its associated acceptor when
9//! a new connection is received. One listener exists for every socket that the
10//! links redirector server listens on. Listeners can be added and removed while
11//! the server is active.
12//!
13//! # Acceptors
14//! Acceptors are the bridge between listeners and handlers. They hold
15//! (references to) all necessary state used by themselves and the handlers. One
16//! acceptor exists for every combination of listener + encryption + handler
17//! type, e.g. an unencrypted TCP acceptor for HTTP, an encrypted TCP/TLS
18//! acceptor for RPC, etc. Acceptors are predefined for the lifetime of the
19//! redirector server, and initialized at server startup. Acceptors initially
20//! process incoming connections (doing handshakes and de/en-cryption if
21//! necessary), and then call their associated handler.
22//!
23//! # Handlers
24//! Handlers are responsible for all application logic. A handler is an async
25//! function called from an acceptor. There is one predefined handler for each
26//! kind of request: currently one external HTTP redirector, one HTTP to HTTPS
27//! redirector, and one RPC handler.
28
29use std::{
30	fmt::{Debug, Formatter, Result as FmtResult},
31	net::{IpAddr, Ipv6Addr, SocketAddr},
32	os::raw::c_int,
33	sync::Arc,
34	thread,
35};
36
37use hyper::{rt, server::conn::http2, service::service_fn, Request};
38use hyper_util::{
39	rt::{TokioExecutor, TokioIo},
40	server::conn::auto::Builder,
41	service::TowerToHyperService,
42};
43use links_id::Id;
44use links_normalized::{Link, Normalized};
45use parking_lot::Mutex;
46use socket2::{Domain, Protocol as SocketProtocol, Socket, Type};
47use strum::{Display as EnumDisplay, EnumString};
48use tokio::{
49	io::{AsyncRead, AsyncWrite, Error as IoError},
50	net::{TcpListener, TcpStream},
51	spawn,
52	task::JoinHandle,
53};
54use tokio_rustls::{rustls::ServerConfig, TlsAcceptor};
55use tonic::{
56	codegen::{CompressionEncoding, InterceptedService},
57	service::Routes,
58	transport::Server as RpcServer,
59};
60use tower::util::ServiceExt;
61use tracing::{debug, error, trace, warn};
62
63use crate::{
64	api::{self, Api, LinksServer},
65	certs::CertificateResolver,
66	config::{Config, ListenAddress},
67	redirector::{https_redirector, redirector},
68	stats::ExtraStatisticInfo,
69	store::{Current, Store},
70};
71
/// Number of incoming connections that can be kept in the TCP socket backlog of
/// a listener (see `listen`'s [linux man page] or [winsock docs] for details).
///
/// This value is passed to the socket's `listen` call when a listener is set
/// up.
///
/// [linux man page]: https://linux.die.net/man/2/listen
/// [winsock docs]: https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-listen
const LISTENER_TCP_BACKLOG_SIZE: c_int = 1024;
78
79/// A handler that does external HTTP redirects using information from the
80/// provided store. Extra information for statistics can be passed via
81/// `stat_info`.
82pub async fn http_handler(
83	stream: impl rt::Read + rt::Write + Send + Unpin + 'static,
84	store: Store,
85	config: &'static Config,
86	stat_info: ExtraStatisticInfo,
87) {
88	let redirector_service = service_fn(move |req: Request<_>| {
89		redirector(req, store.clone(), config.redirector(), stat_info.clone())
90	});
91
92	if let Err(err) = Builder::new(TokioExecutor::new())
93		.serve_connection(stream, redirector_service)
94		.await
95	{
96		error!(?err, "Error while handling HTTP connection");
97	}
98}
99
100/// A handler that redirects incoming requests to their original URL, but with
101/// the HTTPS scheme instead.
102///
103/// # Warning
104/// This function does not know the original URL scheme. If used as the handler
105/// for HTTPS requests, this might create a redirect loop.
106pub async fn http_to_https_handler(
107	stream: impl rt::Read + rt::Write + Send + Unpin + 'static,
108	config: &'static Config,
109) {
110	let redirector_service =
111		service_fn(move |req: Request<_>| https_redirector(req, config.redirector()));
112
113	if let Err(err) = Builder::new(TokioExecutor::new())
114		.serve_connection(stream, redirector_service)
115		.await
116	{
117		error!(?err, "Error while handling HTTP connection");
118	}
119}
120
121/// Handler processing RPC API calls.
122pub async fn rpc_handler(
123	stream: impl rt::Read + rt::Write + Send + Unpin + 'static,
124	service: Routes,
125) {
126	if let Err(rpc_err) = http2::Builder::new(TokioExecutor::new())
127		.serve_connection(
128			stream,
129			TowerToHyperService::new(
130				service.map_request(|req: Request<_>| req.map(tonic::body::boxed)),
131			),
132		)
133		.await
134	{
135		error!(?rpc_err, "Error while handling gRPC connection");
136	}
137}
138
/// A trait for defining links server acceptors.
///
/// The type parameter `S` is the type of the connection stream that the
/// acceptor receives (the listeners in this module use [`TcpStream`]).
///
/// For more info about acceptors in general, please see the [module-level
/// documentation][mod].
///
/// [mod]: crate::server
#[async_trait::async_trait]
pub trait Acceptor<S: AsyncRead + AsyncWrite + Send + Unpin + 'static>:
	Send + Sync + 'static
{
	/// Accept an incoming connection in `stream` from `remote_addr` to
	/// `local_addr`. This function should [spawn a task][spawn] to handle the
	/// request using this acceptor's associated handler, so that the caller
	/// (a listener's accept loop) is not held up while the connection is being
	/// served.
	///
	/// [spawn]: tokio::task
	async fn accept(&self, stream: S, local_addr: SocketAddr, remote_addr: SocketAddr);

	/// Get the [`Protocol`] that this acceptor processes
	fn protocol(&self) -> Protocol;
}
159
160/// An acceptor for plaintext (unencrypted) HTTP requests. Supports HTTP/1.0,
161/// HTTP/1.1, and HTTP/2 (in its rare unencrypted variety usually not found in
162/// any browsers).
163#[derive(Debug)]
164pub struct PlainHttpAcceptor {
165	config: &'static Config,
166	current_store: &'static Current,
167}
168
169impl PlainHttpAcceptor {
170	/// Create a new [`PlainHttpAcceptor`] with the provided [`Config`] and
171	/// [`Current`]
172	///
173	/// # Memory
174	/// This function leaks memory, and should therefore not be called an
175	/// unbounded number of times
176	pub fn new(config: &'static Config, current_store: &'static Current) -> &'static Self {
177		Box::leak(Box::new(Self {
178			config,
179			current_store,
180		}))
181	}
182}
183
184#[async_trait::async_trait]
185impl Acceptor<TcpStream> for PlainHttpAcceptor {
186	async fn accept(&self, stream: TcpStream, local_addr: SocketAddr, remote_addr: SocketAddr) {
187		let config = self.config;
188		let current_store = self.current_store;
189
190		spawn(async move {
191			trace!("New plain connection from {remote_addr} on {local_addr}");
192
193			if config.https_redirect() {
194				http_to_https_handler(TokioIo::new(stream), config).await;
195			} else {
196				http_handler(
197					TokioIo::new(stream),
198					current_store.get(),
199					config,
200					ExtraStatisticInfo::default(),
201				)
202				.await;
203			}
204		});
205	}
206
207	fn protocol(&self) -> Protocol {
208		Protocol::Http
209	}
210}
211
212/// An acceptor for TLS-encrypted HTTPS requests. Supports HTTP/1.0, HTTP/1.1,
213/// and HTTP/2.
214pub struct TlsHttpAcceptor {
215	config: &'static Config,
216	current_store: &'static Current,
217	tls_acceptor: TlsAcceptor,
218}
219
220impl TlsHttpAcceptor {
221	/// Create a new [`TlsHttpAcceptor`] with the provided [`Config`],
222	/// [`Current`], and a reference-counted (via [`Arc`])
223	/// [`CertificateResolver`]
224	///
225	/// # Memory
226	/// This function leaks memory, and should therefore not be called an
227	/// unbounded number of times
228	pub fn new(
229		config: &'static Config,
230		current_store: &'static Current,
231		cert_resolver: Arc<CertificateResolver>,
232	) -> &'static Self {
233		let mut server_config = ServerConfig::builder()
234			.with_no_client_auth()
235			.with_cert_resolver(cert_resolver);
236		server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
237
238		let server_config = Arc::new(server_config);
239		let tls_acceptor = TlsAcceptor::from(server_config);
240
241		Box::leak(Box::new(Self {
242			config,
243			current_store,
244			tls_acceptor,
245		}))
246	}
247}
248
249#[async_trait::async_trait]
250impl Acceptor<TcpStream> for TlsHttpAcceptor {
251	async fn accept(&self, stream: TcpStream, local_addr: SocketAddr, remote_addr: SocketAddr) {
252		let config = self.config;
253		let current_store = self.current_store;
254		let tls_acceptor = self.tls_acceptor.clone();
255
256		spawn(async move {
257			trace!("New TLS connection from {remote_addr} on {local_addr}");
258
259			match tls_acceptor.accept(stream).await {
260				Ok(stream) => {
261					let tls_conn = stream.get_ref().1;
262					let extra_info = ExtraStatisticInfo {
263						tls_sni: tls_conn.server_name().map(Arc::from),
264						tls_version: tls_conn.protocol_version(),
265						tls_cipher_suite: tls_conn.negotiated_cipher_suite(),
266					};
267
268					http_handler(
269						TokioIo::new(stream),
270						current_store.get(),
271						config,
272						extra_info,
273					)
274					.await;
275				}
276				Err(err) => warn!("Error accepting incoming TLS connection: {err:?}"),
277			}
278		});
279	}
280
281	fn protocol(&self) -> Protocol {
282		Protocol::Https
283	}
284}
285
286impl Debug for TlsHttpAcceptor {
287	fn fmt(&self, fmt: &mut Formatter<'_>) -> FmtResult {
288		#[derive(Debug)]
289		struct TlsAcceptor {}
290
291		fmt.debug_struct("TlsHttpAcceptor")
292			.field("config", self.config)
293			.field("current_store", self.current_store)
294			.field("tls_acceptor", &TlsAcceptor {})
295			.finish()
296	}
297}
298
299/// An acceptor for plaintext (unencrypted) RPC calls. Supports `gRPC` over
300/// unencrypted HTTP/2.
301#[derive(Debug)]
302pub struct PlainRpcAcceptor {
303	service: Mutex<Routes>,
304}
305
306impl PlainRpcAcceptor {
307	/// Create a new [`PlainRpcAcceptor`] with the provided [`Config`] and
308	/// [`Current`]
309	///
310	/// # Memory
311	/// This function leaks memory, and should therefore not be called an
312	/// unbounded number of times
313	pub fn new(config: &'static Config, current_store: &'static Current) -> &'static Self {
314		let service = RpcServer::builder()
315			.add_service(InterceptedService::new(
316				LinksServer::new(Api::new(current_store))
317					.send_compressed(CompressionEncoding::Gzip)
318					.accept_compressed(CompressionEncoding::Gzip),
319				api::get_auth_checker(config),
320			))
321			.into_service()
322			.prepare();
323
324		Box::leak(Box::new(Self {
325			service: Mutex::new(service),
326		}))
327	}
328}
329
330#[async_trait::async_trait]
331impl Acceptor<TcpStream> for PlainRpcAcceptor {
332	async fn accept(&self, stream: TcpStream, local_addr: SocketAddr, remote_addr: SocketAddr) {
333		let service = self.service.lock().clone();
334
335		spawn(async move {
336			trace!("New plain connection from {remote_addr} on {local_addr}");
337
338			rpc_handler(TokioIo::new(stream), service).await;
339		});
340	}
341
342	fn protocol(&self) -> Protocol {
343		Protocol::Grpc
344	}
345}
346
347/// An acceptor for TLS-encrypted RPC calls. Supports `gRPC` over
348/// HTTP/2 with HTTPS.
349pub struct TlsRpcAcceptor {
350	service: Arc<Mutex<Routes>>,
351	tls_acceptor: TlsAcceptor,
352}
353
354impl TlsRpcAcceptor {
355	/// Create a new [`TlsRpcAcceptor`] with the provided [`Config`],
356	/// [`Current`], and a reference-counted (via [`Arc`])
357	/// [`CertificateResolver`]
358	///
359	/// # Memory
360	/// This function leaks memory, and should therefore not be called an
361	/// unbounded number of times
362	pub fn new(
363		config: &'static Config,
364		current_store: &'static Current,
365		cert_resolver: Arc<CertificateResolver>,
366	) -> &'static Self {
367		let mut server_config = ServerConfig::builder()
368			.with_no_client_auth()
369			.with_cert_resolver(cert_resolver);
370		server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
371
372		let server_config = Arc::new(server_config);
373		let tls_acceptor = TlsAcceptor::from(server_config);
374
375		let service = RpcServer::builder()
376			.add_service(InterceptedService::new(
377				LinksServer::new(Api::new(current_store))
378					.send_compressed(CompressionEncoding::Gzip)
379					.accept_compressed(CompressionEncoding::Gzip),
380				api::get_auth_checker(config),
381			))
382			.into_service()
383			.prepare();
384
385		Box::leak(Box::new(Self {
386			service: Arc::new(Mutex::new(service)),
387			tls_acceptor,
388		}))
389	}
390}
391
392#[async_trait::async_trait]
393impl Acceptor<TcpStream> for TlsRpcAcceptor {
394	async fn accept(&self, stream: TcpStream, local_addr: SocketAddr, remote_addr: SocketAddr) {
395		let tls_acceptor = self.tls_acceptor.clone();
396		let service = self.service.lock().clone();
397
398		spawn(async move {
399			trace!("New TLS connection from {remote_addr} on {local_addr}");
400
401			match tls_acceptor.accept(stream).await {
402				Ok(stream) => rpc_handler(TokioIo::new(stream), service).await,
403				Err(err) => warn!("Error accepting incoming TLS connection: {err:?}"),
404			}
405		});
406	}
407
408	fn protocol(&self) -> Protocol {
409		Protocol::Grpcs
410	}
411}
412
413impl Debug for TlsRpcAcceptor {
414	fn fmt(&self, fmt: &mut Formatter<'_>) -> FmtResult {
415		#[derive(Debug)]
416		struct TlsAcceptor {}
417
418		fmt.debug_struct("TlsRpcAcceptor")
419			.field("service", &self.service)
420			.field("tls_acceptor", &TlsAcceptor {})
421			.finish()
422	}
423}
424
425/// The protocols that links redirector servers can listen on
426#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumString, EnumDisplay)]
427#[strum(serialize_all = "snake_case", ascii_case_insensitive)]
428pub enum Protocol {
429	/// HTTP/1.0, HTTP/1.1, and HTTP/2 (h2c) over TCP (unencrypted)
430	Http,
431	/// HTTP/1.0, HTTP/1.1, and HTTP/2 (h2) over TCP with TLS
432	Https,
433	/// gRPC over HTTP/2 (h2c) over TCP (unencrypted)
434	Grpc,
435	/// gRPC over HTTP/2 (h2) over TCP with TLS
436	Grpcs,
437}
438
439impl Protocol {
440	/// Default port for the `grpcs` protocol
441	pub const GRPCS_DEFAULT_PORT: u16 = 530;
442	/// Default port for the `grpc` protocol
443	pub const GRPC_DEFAULT_PORT: u16 = 50051;
444	/// Default port for the `https` protocol
445	pub const HTTPS_DEFAULT_PORT: u16 = 443;
446	/// Default port for the `http` protocol
447	pub const HTTP_DEFAULT_PORT: u16 = 80;
448
449	/// Get the default port for this [`Protocol`]
450	#[must_use]
451	pub const fn default_port(self) -> u16 {
452		match self {
453			Self::Http => Self::HTTP_DEFAULT_PORT,
454			Self::Https => Self::HTTPS_DEFAULT_PORT,
455			Self::Grpc => Self::GRPC_DEFAULT_PORT,
456			Self::Grpcs => Self::GRPCS_DEFAULT_PORT,
457		}
458	}
459}
460
/// A links redirector listener.
///
/// Listens for incoming network connections on a specified address using a
/// specified protocol in an async task in the background. On drop, the async
/// task is aborted in order to stop listening.
#[derive(Debug)]
pub struct Listener {
	/// The address this listener will listen on. No address indicates that this
	/// listener will accept all traffic on any address (IPv4 and IPv6),
	/// `0.0.0.0` means any IPv4 address (but not IPv6), `[::]` means any IPv6
	/// address (but not IPv4).
	pub addr: Option<IpAddr>,
	/// The port this listener will listen on. Currently, this is a TCP port,
	/// but may in the future also additionally indicate a UDP port.
	pub port: u16,
	/// The protocol of the acceptor/handler this listener uses to process
	/// requests
	pub proto: Protocol,
	/// Handle of the background task that accepts incoming connections. The
	/// task is aborted through this handle when the listener is dropped.
	handle: JoinHandle<()>,
}
481
482impl Listener {
483	/// Create a new [`Listener`] on the specified address, which will use the
484	/// specified acceptor to accept incoming connections. If no address is
485	/// specified, the listener will listen on all IPv4 and IPv6 addresses.
486	/// Address `0.0.0.0` can be used to listen on all IPv4 (but not IPv6)
487	/// addresses, and address `[::]` can be used to listen on all IPv6 (but not
488	/// IPv4) addresses. If the port is not specified, the protocol's default
489	/// port will be used (see [`Protocol`] for details).
490	///
491	/// **Note:**
492	/// Support for dual stack sockets (IPv4 and IPv6 in one socket, available
493	/// in links via an empty address) is not universal on all platforms (such
494	/// as some BSDs). On those platforms, an empty address and `[::]` will
495	/// behave the same, i.e. an empty address will only listen on IPv6, not
496	/// IPv4. To get the desired result (IPv4 *and* IPv6), you must use two
497	/// listeners, one listening on `0.0.0.0` and the other on `[::]`.
498	///
499	/// # Drop
500	/// When dropped, a listener will wait until its internal task is fully
501	/// cancelled, which can take some time to complete. Dropping a listener
502	/// should therefore be considered blocking, and only done in synchronous
503	/// contexts or via the [`spawn_blocking` function][spawn_blocking].
504	/// Additionally, because the `drop` function blocks its thread until the
505	/// async runtime completes the cancellation of the task in the background,
506	/// a listener requires more than one thread to drop, and can not
507	/// successfully be dropped inside of a single-threaded tokio runtime (the
508	/// entire program will block indefinitely).
509	///
510	/// [spawn_blocking]: fn@tokio::task::spawn_blocking
511	///
512	/// # Errors
513	/// This function returns an error if it can not set up the listening
514	/// socket.
515	#[allow(clippy::unused_async)] // TODO
516	pub async fn new(
517		addr: Option<IpAddr>,
518		port: Option<u16>,
519		acceptor: &'static impl Acceptor<TcpStream>,
520	) -> Result<Self, IoError> {
521		let proto = acceptor.protocol();
522		let port = port.unwrap_or_else(|| proto.default_port());
523		let socket_addr = (addr.unwrap_or(IpAddr::V6(Ipv6Addr::UNSPECIFIED)), port).into();
524
525		let socket = Socket::new(
526			Domain::for_address(socket_addr),
527			Type::STREAM,
528			Some(SocketProtocol::TCP),
529		)?;
530
531		// `SO_REUSEADDR` has different meanings across platforms:
532		// - On Windows, it allows multiple listeners per socket (which is very bad)
533		// - On Unix-like OSs, it allows a process to bind to a recently-closed socket
534		//   (which can occasionally speed up socket initialization)
535		socket.set_reuse_address(cfg!(unix))?;
536		// Set the socket into IPv6-only mode if the address is configured as IPv6 (even
537		// if it's `[::]`). This is done because the default depends on the OS and
538		// sometimes user configuration, and we want consistency across platforms.
539		if socket_addr.is_ipv6() {
540			socket.set_only_v6(addr.is_some())?;
541		}
542		// Required for Tokio to properly use async listeners
543		socket.set_nonblocking(true)?;
544		// Improves latency when sending responses
545		socket.set_nodelay(true)?;
546
547		socket.bind(&socket_addr.into())?;
548		socket.listen(LISTENER_TCP_BACKLOG_SIZE)?;
549		let listener = TcpListener::from_std(socket.into())?;
550
551		let handle = spawn(async move {
552			loop {
553				match listener.accept().await {
554					Ok((stream, remote_addr)) => {
555						acceptor.accept(stream, socket_addr, remote_addr).await;
556					}
557					Err(err) => {
558						warn!("Error accepting TCP connection on {socket_addr}: {err:?}");
559					}
560				}
561			}
562		});
563
564		debug!("Opened new listener on {}", ListenAddress {
565			protocol: proto,
566			address: addr,
567			port: Some(port),
568		});
569
570		Ok(Self {
571			addr,
572			port,
573			proto,
574			handle,
575		})
576	}
577
578	/// Get the [`ListenAddress`] of this listener
579	#[must_use]
580	pub const fn listen_address(&self) -> ListenAddress {
581		ListenAddress {
582			protocol: self.proto,
583			address: self.addr,
584			port: Some(self.port),
585		}
586	}
587}
588
589impl Drop for Listener {
590	/// Cancel the task responsible for listening
591	///
592	/// # Blocking
593	/// This functions blocks the current thread until the task is fully
594	/// aborted. Additionally, if used in the context of a single-threaded tokio
595	/// runtime, this function can completely block the entire program.
596	fn drop(&mut self) {
597		trace!("Closing listener on {}", self.listen_address());
598
599		self.handle.abort();
600
601		while !self.handle.is_finished() {
602			thread::yield_now();
603		}
604
605		debug!("Closed listener on {}", self.listen_address());
606	}
607}
608
609/// Set up the links store, optionally setting an example redirect
610/// (`example` -> `9dDbKpJP` -> `https://example.com/`).
611///
612/// # Errors
613/// This function returns an error if construction of the [`Store`] (using
614/// `Store::new`) fails or if the example redirect can not be set when
615/// requested.
616pub async fn store_setup(config: &Config, example_redirect: bool) -> Result<Store, anyhow::Error> {
617	let store = Store::new(config.store(), &config.store_config()).await?;
618
619	if example_redirect {
620		store
621			.set_redirect(Id::try_from(Id::MAX)?, Link::new("https://example.com/")?)
622			.await?;
623		store
624			.set_vanity(Normalized::new("example"), Id::try_from(Id::MAX)?)
625			.await?;
626	}
627
628	Ok(store)
629}
630
#[cfg(test)]
mod tests {
	use std::time::{Duration, Instant};

	use super::*;

	/// An [`Acceptor`] stand-in which claims to do HTTP, but in reality
	/// ignores every connection it is given
	#[derive(Debug, Copy, Clone)]
	struct UnAcceptor;

	#[async_trait::async_trait]
	impl Acceptor<TcpStream> for UnAcceptor {
		async fn accept(&self, _: TcpStream, _: SocketAddr, _: SocketAddr) {
			spawn(async {});
		}

		fn protocol(&self) -> Protocol {
			Protocol::Http
		}
	}

	/// A listener must drop quickly, and its address must be usable by a new
	/// listener right after the drop
	#[tokio::test(flavor = "multi_thread")]
	async fn listener_new_drop() {
		let addr = Some(IpAddr::from([127, 0, 0, 1]));
		let port = Some(8000);

		let first = Listener::new(addr, port, &UnAcceptor).await.unwrap();

		let start = Instant::now();
		drop(first);
		let duration = start.elapsed();

		// Binding the same address again only works if the first listener
		// has really been closed
		let _second = Listener::new(addr, port, &UnAcceptor).await.unwrap();

		assert!(
			dbg!(duration) < Duration::from_millis(if cfg!(debug_assertions) { 100 } else { 1 })
		);
	}

	/// The example vanity path must exist if (and only if) it was requested
	#[tokio::test]
	async fn fn_store_setup() {
		let with_example = store_setup(&Config::new(None), true).await.unwrap();
		let without_example = store_setup(&Config::new(None), false).await.unwrap();

		let found = with_example.get_vanity("example".into()).await.unwrap();
		assert_eq!(found, Some(Id::MAX.try_into().unwrap()));

		let missing = without_example.get_vanity("example".into()).await.unwrap();
		assert_eq!(missing, None);
	}
}