scuffle_bootstrap/
global.rs

1//! Global state for the application.
2//!
3//! # [`Global`] vs. [`GlobalWithoutConfig`]
4//!
5//! [`Global`] has a set of functions that are called at different stages of the
6//! application lifecycle. To use [`Global`], your application is expected to
7//! have a config type implementing [`ConfigParser`]. If your application does
8//! not have a config, consider using the [`GlobalWithoutConfig`] trait which is
9//! a simplified version of [`Global`].
10
11use std::sync::Arc;
12
13use crate::config::{ConfigParser, EmptyConfig};
14
15fn default_runtime_builder() -> tokio::runtime::Builder {
16    let worker_threads = std::env::var("TOKIO_WORKER_THREADS")
17        .unwrap_or_default()
18        .parse::<usize>()
19        .ok()
20        .or_else(|| std::thread::available_parallelism().ok().map(|p| p.get()));
21
22    let mut builder = if let Some(1) = worker_threads {
23        tokio::runtime::Builder::new_current_thread()
24    } else {
25        tokio::runtime::Builder::new_multi_thread()
26    };
27
28    if let Some(worker_threads) = worker_threads {
29        builder.worker_threads(worker_threads);
30    }
31
32    if let Ok(max_blocking_threads) = std::env::var("TOKIO_MAX_BLOCKING_THREADS")
33        .unwrap_or_default()
34        .parse::<usize>()
35    {
36        builder.max_blocking_threads(max_blocking_threads);
37    }
38
39    if !std::env::var("TOKIO_DISABLE_TIME")
40        .unwrap_or_default()
41        .parse::<bool>()
42        .ok()
43        .unwrap_or(false)
44    {
45        builder.enable_time();
46    }
47
48    if !std::env::var("TOKIO_DISABLE_IO")
49        .unwrap_or_default()
50        .parse::<bool>()
51        .ok()
52        .unwrap_or(false)
53    {
54        builder.enable_io();
55    }
56
57    if let Ok(thread_stack_size) = std::env::var("TOKIO_THREAD_STACK_SIZE").unwrap_or_default().parse::<usize>() {
58        builder.thread_stack_size(thread_stack_size);
59    }
60
61    if let Ok(global_queue_interval) = std::env::var("TOKIO_GLOBAL_QUEUE_INTERVAL")
62        .unwrap_or_default()
63        .parse::<u32>()
64    {
65        builder.global_queue_interval(global_queue_interval);
66    }
67
68    if let Ok(event_interval) = std::env::var("TOKIO_EVENT_INTERVAL").unwrap_or_default().parse::<u32>() {
69        builder.event_interval(event_interval);
70    }
71
72    if let Ok(max_io_events_per_tick) = std::env::var("TOKIO_MAX_IO_EVENTS_PER_TICK")
73        .unwrap_or_default()
74        .parse::<usize>()
75    {
76        builder.max_io_events_per_tick(max_io_events_per_tick);
77    }
78
79    builder
80}
81
82/// This trait is implemented for the global type of your application.
83/// It is intended to be used to store any global state of your application.
84/// When using the [`main!`](crate::main) macro, one instance of this type will
85/// be made available to all services.
86///
87/// Using this trait requires a config type implementing [`ConfigParser`].
88/// If your application does not have a config, consider using the
89/// [`GlobalWithoutConfig`] trait.
90///
91/// # See Also
92///
93/// - [`GlobalWithoutConfig`]
94/// - [`Service`](crate::Service)
95/// - [`main`](crate::main)
96pub trait Global: Send + Sync + 'static {
97    /// The config type for the global.
98    ///
99    /// This type is expected to implement [`ConfigParser`].
100    type Config: ConfigParser + Send + 'static;
101
102    /// Pre-initialization.
103    ///
104    /// Called before initializing the tokio runtime and loading the config.
105    /// Returning an error from this function will cause the process to
106    /// immediately exit without calling [`on_exit`](Global::on_exit) first.
107    #[inline(always)]
108    fn pre_init() -> anyhow::Result<()> {
109        Ok(())
110    }
111
112    /// Builds the tokio runtime for the process.
113    ///
114    /// If not overridden, a default runtime builder is used to build the
115    /// runtime. It uses the following environment variables:
116    /// - `TOKIO_WORKER_THREADS`: Number of worker threads to use. If 1, a
117    ///   current thread runtime is used.
118    ///
119    ///   See [`tokio::runtime::Builder::worker_threads`] for details.
120    /// - `TOKIO_MAX_BLOCKING_THREADS`: Maximum number of blocking threads.
121    ///
122    ///   See [`tokio::runtime::Builder::max_blocking_threads`] for details.
123    /// - `TOKIO_DISABLE_TIME`: If `true` disables time.
124    ///
125    ///   See [`tokio::runtime::Builder::enable_time`] for details.
126    /// - `TOKIO_DISABLE_IO`: If `true` disables IO.
127    ///
128    ///   See [`tokio::runtime::Builder::enable_io`] for details.
129    /// - `TOKIO_THREAD_STACK_SIZE`: Thread stack size.
130    ///
131    ///   See [`tokio::runtime::Builder::thread_stack_size`] for details.
132    /// - `TOKIO_GLOBAL_QUEUE_INTERVAL`: Global queue interval.
133    ///
134    ///   See [`tokio::runtime::Builder::global_queue_interval`] for details.
135    /// - `TOKIO_EVENT_INTERVAL`: Event interval.
136    ///
137    ///   See [`tokio::runtime::Builder::event_interval`] for details.
138    /// - `TOKIO_MAX_IO_EVENTS_PER_TICK`: Maximum IO events per tick.
139    ///
140    ///   See [`tokio::runtime::Builder::max_io_events_per_tick`] for details.
141    #[inline(always)]
142    fn tokio_runtime() -> tokio::runtime::Runtime {
143        default_runtime_builder().build().expect("runtime build")
144    }
145
146    /// Initialize the global.
147    ///
148    /// Called to initialize the global.
149    /// Returning an error from this function will cause the process to
150    /// immediately exit without calling [`on_exit`](Global::on_exit) first.
151    fn init(config: Self::Config) -> impl std::future::Future<Output = anyhow::Result<Arc<Self>>> + Send;
152
153    /// Called right before all services start.
154    ///
155    /// Returning an error from this function will prevent any service from
156    /// starting and [`on_exit`](Global::on_exit) will be called with the result
157    /// of this function.
158    #[inline(always)]
159    fn on_services_start(self: &Arc<Self>) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
160        std::future::ready(Ok(()))
161    }
162
163    /// Called after a service exits.
164    ///
165    /// `name` is the name of the service that exited and `result` is the result
166    /// the service exited with. Returning an error from this function will
167    /// stop all currently running services and [`on_exit`](Global::on_exit)
168    /// will be called with the result of this function.
169    #[inline(always)]
170    fn on_service_exit(
171        self: &Arc<Self>,
172        name: &'static str,
173        result: anyhow::Result<()>,
174    ) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
175        let _ = name;
176        std::future::ready(result)
177    }
178
179    /// Called after the shutdown is complete, right before exiting the
180    /// process.
181    ///
182    /// `result` is [`Err`](anyhow::Result) when the process exits due to an
183    /// error in one of the services or handler functions, otherwise `Ok(())`.
184    #[inline(always)]
185    fn on_exit(
186        self: &Arc<Self>,
187        result: anyhow::Result<()>,
188    ) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
189        std::future::ready(result)
190    }
191}
192
193/// Simplified version of [`Global`].
194///
195/// Implementing this trait will automatically implement [`Global`] for your
196/// type. This trait is intended to be used when you don't need a config for
197/// your global.
198///
199/// Refer to [`Global`] for details.
200pub trait GlobalWithoutConfig: Send + Sync + 'static {
201    /// Builds the tokio runtime for the process.
202    ///
203    /// If not overridden, a default runtime builder is used to build the
204    /// runtime. It uses the following environment variables:
205    /// - `TOKIO_WORKER_THREADS`: Number of worker threads to use. If 1, a
206    ///   current thread runtime is used.
207    ///
208    ///   See [`tokio::runtime::Builder::worker_threads`] for details.
209    /// - `TOKIO_MAX_BLOCKING_THREADS`: Maximum number of blocking threads.
210    ///
211    ///   See [`tokio::runtime::Builder::max_blocking_threads`] for details.
212    /// - `TOKIO_DISABLE_TIME`: If `true` disables time.
213    ///
214    ///   See [`tokio::runtime::Builder::enable_time`] for details.
215    /// - `TOKIO_DISABLE_IO`: If `true` disables IO.
216    ///
217    ///   See [`tokio::runtime::Builder::enable_io`] for details.
218    /// - `TOKIO_THREAD_STACK_SIZE`: Thread stack size.
219    ///
220    ///   See [`tokio::runtime::Builder::thread_stack_size`] for details.
221    /// - `TOKIO_GLOBAL_QUEUE_INTERVAL`: Global queue interval.
222    ///
223    ///   See [`tokio::runtime::Builder::global_queue_interval`] for details.
224    /// - `TOKIO_EVENT_INTERVAL`: Event interval.
225    ///
226    ///   See [`tokio::runtime::Builder::event_interval`] for details.
227    /// - `TOKIO_MAX_IO_EVENTS_PER_TICK`: Maximum IO events per tick.
228    ///
229    ///   See [`tokio::runtime::Builder::max_io_events_per_tick`] for details.
230    #[inline(always)]
231    fn tokio_runtime() -> tokio::runtime::Runtime {
232        default_runtime_builder().build().expect("runtime build")
233    }
234
235    /// Initialize the global.
236    ///
237    /// Called to initialize the global.
238    /// Returning an error from this function will cause the process to
239    /// immediately exit without calling [`on_exit`](Global::on_exit) first.
240    fn init() -> impl std::future::Future<Output = anyhow::Result<Arc<Self>>> + Send;
241
242    /// Called right before all services start.
243    ///
244    /// Returning an error from this function will prevent any service from
245    /// starting and [`on_exit`](Global::on_exit) will be called with the result
246    /// of this function.
247    #[inline(always)]
248    fn on_services_start(self: &Arc<Self>) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
249        std::future::ready(Ok(()))
250    }
251
252    /// Called after a service exits.
253    ///
254    /// `name` is the name of the service that exited and `result` is the result
255    /// the service exited with. Returning an error from this function will
256    /// stop all currently running services and [`on_exit`](Global::on_exit)
257    /// will be called with the result of this function.
258    #[inline(always)]
259    fn on_service_exit(
260        self: &Arc<Self>,
261        name: &'static str,
262        result: anyhow::Result<()>,
263    ) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
264        let _ = name;
265        std::future::ready(result)
266    }
267
268    /// Called after the shutdown is complete, right before exiting the
269    /// process.
270    ///
271    /// `result` is [`Err`](anyhow::Result) when the process exits due to an
272    /// error in one of the services or handler functions, otherwise `Ok(())`.
273    #[inline(always)]
274    fn on_exit(
275        self: &Arc<Self>,
276        result: anyhow::Result<()>,
277    ) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
278        std::future::ready(result)
279    }
280}
281
282impl<T: GlobalWithoutConfig> Global for T {
283    type Config = EmptyConfig;
284
285    #[inline(always)]
286    fn tokio_runtime() -> tokio::runtime::Runtime {
287        <T as GlobalWithoutConfig>::tokio_runtime()
288    }
289
290    #[inline(always)]
291    fn init(_: Self::Config) -> impl std::future::Future<Output = anyhow::Result<Arc<Self>>> + Send {
292        <T as GlobalWithoutConfig>::init()
293    }
294
295    #[inline(always)]
296    fn on_services_start(self: &Arc<Self>) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
297        <T as GlobalWithoutConfig>::on_services_start(self)
298    }
299
300    #[inline(always)]
301    fn on_service_exit(
302        self: &Arc<Self>,
303        name: &'static str,
304        result: anyhow::Result<()>,
305    ) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
306        <T as GlobalWithoutConfig>::on_service_exit(self, name, result)
307    }
308
309    #[inline(always)]
310    fn on_exit(
311        self: &Arc<Self>,
312        result: anyhow::Result<()>,
313    ) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
314        <T as GlobalWithoutConfig>::on_exit(self, result)
315    }
316}
317
#[cfg_attr(all(test, coverage_nightly), coverage(off))]
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::thread;

    use super::{Global, GlobalWithoutConfig};
    use crate::EmptyConfig;

    /// Minimal [`Global`] implementation relying on every default method.
    struct TestGlobal;

    impl Global for TestGlobal {
        type Config = ();

        async fn init(_config: Self::Config) -> anyhow::Result<std::sync::Arc<Self>> {
            Ok(Arc::new(Self))
        }
    }

    /// Exercises the default method implementations of [`Global`].
    #[tokio::test]
    async fn default_global() {
        // Build the default runtime on a separate thread (presumably so that
        // it is not created and dropped inside the test's own runtime). The
        // thread is joined so the build is guaranteed to have run, and a
        // panic in it fails the test instead of being silently lost.
        thread::spawn(|| {
            // To get the coverage
            TestGlobal::tokio_runtime();
        })
        .join()
        .expect("building the default runtime panicked");

        assert!(matches!(TestGlobal::pre_init(), Ok(())));
        let global = TestGlobal::init(()).await.unwrap();
        assert!(matches!(global.on_services_start().await, Ok(())));

        assert!(matches!(global.on_exit(Ok(())).await, Ok(())));
        assert!(global.on_exit(Err(anyhow::anyhow!("error"))).await.is_err());

        assert!(matches!(global.on_service_exit("test", Ok(())).await, Ok(())));
        assert!(global.on_service_exit("test", Err(anyhow::anyhow!("error"))).await.is_err());
    }

    /// Minimal [`GlobalWithoutConfig`] implementation relying on every
    /// default method.
    struct TestGlobalWithoutConfig;

    impl GlobalWithoutConfig for TestGlobalWithoutConfig {
        async fn init() -> anyhow::Result<std::sync::Arc<Self>> {
            Ok(Arc::new(Self))
        }
    }

    /// Exercises the blanket [`Global`] implementation for
    /// [`GlobalWithoutConfig`] types.
    #[tokio::test]
    async fn default_global_no_config() {
        // Joined for the same reason as in `default_global`: the build must
        // actually run and a panic in it must fail the test.
        thread::spawn(|| {
            // To get the coverage
            <TestGlobalWithoutConfig as Global>::tokio_runtime();
        })
        .join()
        .expect("building the default runtime panicked");

        assert!(matches!(TestGlobalWithoutConfig::pre_init(), Ok(())));
        <TestGlobalWithoutConfig as Global>::init(EmptyConfig).await.unwrap();
        let global = <TestGlobalWithoutConfig as GlobalWithoutConfig>::init().await.unwrap();
        assert!(matches!(Global::on_services_start(&global).await, Ok(())));

        assert!(matches!(Global::on_exit(&global, Ok(())).await, Ok(())));
        assert!(Global::on_exit(&global, Err(anyhow::anyhow!("error"))).await.is_err());

        assert!(matches!(Global::on_service_exit(&global, "test", Ok(())).await, Ok(())));
        assert!(
            Global::on_service_exit(&global, "test", Err(anyhow::anyhow!("error")))
                .await
                .is_err()
        );
    }
}