qorzen_oxide/
logging.rs

1// src/logging.rs
2
3//! Structured logging system with multiple outputs and advanced features
4//!
5//! This module provides a comprehensive logging system that supports:
6//! - Structured logging with JSON and human-readable formats
7//! - Multiple log outputs (console, file, external systems)
8//! - Log rotation and retention
9//! - Performance metrics and tracing integration
10//! - Dynamic log level configuration
11//! - Context-aware logging with correlation IDs
12
13use std::collections::HashMap;
14use std::sync::Arc;
15
16use crate::utils::Time;
17use async_trait::async_trait;
18use chrono::{DateTime, Utc};
19use serde::{Deserialize, Serialize};
20use tokio::sync::{mpsc, RwLock};
21use tracing::{Event, Subscriber};
22use tracing_appender::non_blocking::WorkerGuard;
23use tracing_subscriber::layer::Identity;
24use tracing_subscriber::{
25    fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer, Registry,
26};
27use uuid::Uuid;
28
29use crate::config::{LogFormat, LoggingConfig};
30use crate::error::{Error, ErrorKind, Result, ResultExt};
31use crate::manager::{ManagedState, Manager, ManagerStatus};
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct LogEntry {
35    pub id: Uuid,
36    pub level: LogLevel,
37    pub timestamp: DateTime<Utc>,
38    pub source: String,
39    pub message: String,
40    pub target: String,
41    pub file: Option<String>,
42    pub line: Option<u32>,
43    pub correlation_id: Option<Uuid>,
44    pub fields: HashMap<String, serde_json::Value>,
45    pub span: Option<SpanInfo>,
46}
47
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct SpanInfo {
50    pub id: String,
51    pub parent_id: Option<String>,
52    pub name: String,
53    pub fields: HashMap<String, serde_json::Value>,
54}
55
56#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
57pub enum LogLevel {
58    Trace,
59    Debug,
60    Info,
61    Warn,
62    Error,
63}
64
65impl From<tracing::Level> for LogLevel {
66    fn from(level: tracing::Level) -> Self {
67        match level {
68            tracing::Level::TRACE => Self::Trace,
69            tracing::Level::DEBUG => Self::Debug,
70            tracing::Level::INFO => Self::Info,
71            tracing::Level::WARN => Self::Warn,
72            tracing::Level::ERROR => Self::Error,
73        }
74    }
75}
76
77impl From<LogLevel> for tracing::Level {
78    fn from(level: LogLevel) -> Self {
79        match level {
80            LogLevel::Trace => Self::TRACE,
81            LogLevel::Debug => Self::DEBUG,
82            LogLevel::Info => Self::INFO,
83            LogLevel::Warn => Self::WARN,
84            LogLevel::Error => Self::ERROR,
85        }
86    }
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct LogStats {
91    pub total_entries: u64,
92    pub entries_by_level: HashMap<LogLevel, u64>,
93    pub avg_entries_per_second: f64,
94    pub current_file_size: u64,
95    pub rotated_files: u32,
96    pub last_rotation: Option<DateTime<Utc>>,
97}
98
99#[async_trait]
100pub trait LogWriter: Send + Sync + std::fmt::Debug {
101    async fn write(&self, entry: &LogEntry) -> Result<()>;
102
103    async fn flush(&self) -> Result<()>;
104
105    async fn close(&self) -> Result<()>;
106}
107
108#[derive(Debug)]
109pub struct DatabaseLogWriter {
110    // Database connection would go here
111    table_name: String,
112}
113
114impl DatabaseLogWriter {
115    pub fn new(table_name: impl Into<String>) -> Self {
116        Self {
117            table_name: table_name.into(),
118        }
119    }
120}
121
122#[async_trait]
123impl LogWriter for DatabaseLogWriter {
124    async fn write(&self, entry: &LogEntry) -> Result<()> {
125        // In a real implementation, this would write to a database
126        tracing::debug!(
127            "Would write log entry to database table: {}",
128            self.table_name
129        );
130        tracing::debug!("Entry: {:?}", entry);
131        Ok(())
132    }
133
134    async fn flush(&self) -> Result<()> {
135        // Flush database writes
136        Ok(())
137    }
138
139    async fn close(&self) -> Result<()> {
140        // Close database connection
141        Ok(())
142    }
143}
144
145#[derive(Debug)]
146pub struct HttpLogWriter {
147    endpoint: String,
148    headers: HashMap<String, String>,
149    client: reqwest::Client,
150}
151
152impl HttpLogWriter {
153    pub fn new(endpoint: impl Into<String>) -> Self {
154        Self {
155            endpoint: endpoint.into(),
156            headers: HashMap::new(),
157            client: reqwest::Client::new(),
158        }
159    }
160
161    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
162        self.headers.insert(key.into(), value.into());
163        self
164    }
165}
166
167#[async_trait]
168impl LogWriter for HttpLogWriter {
169    async fn write(&self, entry: &LogEntry) -> Result<()> {
170        let mut request = self.client.post(&self.endpoint);
171
172        for (key, value) in &self.headers {
173            request = request.header(key, value);
174        }
175
176        let response = request
177            .json(entry)
178            .send()
179            .await
180            .with_context(|| "Failed to send log entry to HTTP endpoint".to_string())?;
181
182        if !response.status().is_success() {
183            return Err(Error::new(
184                ErrorKind::Network {
185                    status_code: Some(response.status().as_u16()),
186                    endpoint: Some(self.endpoint.clone()),
187                },
188                format!("HTTP log writer failed with status: {}", response.status()),
189            ));
190        }
191
192        Ok(())
193    }
194
195    async fn flush(&self) -> Result<()> {
196        // HTTP requests are sent immediately, no buffering to flush
197        Ok(())
198    }
199
200    async fn close(&self) -> Result<()> {
201        // No persistent connection to close
202        Ok(())
203    }
204}
205
206#[derive(Clone, Debug)]
207struct QorzenLayer {
208    entry_sender: mpsc::UnboundedSender<LogEntry>,
209    stats: Arc<RwLock<LogStats>>,
210}
211
212impl QorzenLayer {
213    fn new(entry_sender: mpsc::UnboundedSender<LogEntry>) -> Self {
214        Self {
215            entry_sender,
216            stats: Arc::new(RwLock::new(LogStats {
217                total_entries: 0,
218                entries_by_level: HashMap::new(),
219                avg_entries_per_second: 0.0,
220                current_file_size: 0,
221                rotated_files: 0,
222                last_rotation: None,
223            })),
224        }
225    }
226
227    async fn get_stats(&self) -> LogStats {
228        self.stats.read().await.clone()
229    }
230}
231
232impl<S> Layer<S> for QorzenLayer
233where
234    S: Subscriber + for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>,
235{
236    fn on_event(&self, event: &Event<'_>, _ctx: tracing_subscriber::layer::Context<'_, S>) {
237        let level = LogLevel::from(*event.metadata().level());
238
239        // Create log entry
240        let entry = LogEntry {
241            id: Uuid::new_v4(),
242            level,
243            timestamp: Time::now(),
244            source: event.metadata().target().to_string(),
245            message: format!("{:?}", event), // Simplified - would extract actual message
246            target: event.metadata().target().to_string(),
247            file: event.metadata().file().map(String::from),
248            line: event.metadata().line(),
249            correlation_id: None,   // Would extract from context
250            fields: HashMap::new(), // Would extract from event fields
251            span: None,             // Would extract current span info
252        };
253
254        // Send entry through channel - this won't panic if the receiver is dropped
255        if self.entry_sender.send(entry).is_err() {
256            eprintln!("Failed to send log entry: receiver dropped");
257        }
258
259        // Update stats synchronously to avoid tokio calls
260        let stats = self.stats.clone();
261        std::thread::spawn(move || {
262            if let Ok(rt) = tokio::runtime::Handle::try_current() {
263                rt.spawn(async move {
264                    let mut stats_guard = stats.write().await;
265                    stats_guard.total_entries += 1;
266                    *stats_guard.entries_by_level.entry(level).or_insert(0) += 1;
267                });
268            }
269        });
270    }
271}
272
273#[derive(Debug)]
274pub struct LoggingManager {
275    state: ManagedState,
276    config: LoggingConfig,
277    custom_layer: Option<QorzenLayer>,
278    _guards: Vec<WorkerGuard>, // Keep guards alive
279    writers: Vec<Arc<dyn LogWriter>>,
280    entry_sender: Option<mpsc::UnboundedSender<LogEntry>>,
281    writer_task_handle: Option<tokio::task::JoinHandle<()>>,
282}
283
284impl LoggingManager {
285    pub fn new(config: LoggingConfig) -> Self {
286        Self {
287            state: ManagedState::new(Uuid::new_v4(), "logging_manager"),
288            config,
289            custom_layer: None,
290            _guards: Vec::new(),
291            writers: Vec::new(),
292            entry_sender: None,
293            writer_task_handle: None,
294        }
295    }
296
297    async fn setup_tracing(&mut self) -> Result<()> {
298        let filter = EnvFilter::try_from_default_env()
299            .unwrap_or_else(|_| EnvFilter::new(&self.config.level));
300
301        let registry = Registry::default().with(filter);
302
303        // Console output
304        let registry = if self.config.console.enabled {
305            let console_layer = if self.config.console.colored {
306                fmt::layer()
307                    .with_ansi(true)
308                    .with_target(true)
309                    .with_line_number(true)
310                    .with_file(true)
311                    .boxed()
312            } else {
313                fmt::layer()
314                    .with_ansi(false)
315                    .with_target(true)
316                    .with_line_number(true)
317                    .with_file(true)
318                    .boxed()
319            };
320
321            registry.with(console_layer)
322        } else {
323            registry.with(Identity::new().boxed())
324        };
325
326        // File output
327        let registry = if let Some(file_config) = &self.config.file {
328            let file_appender = tracing_appender::rolling::daily(
329                file_config
330                    .path
331                    .parent()
332                    .unwrap_or_else(|| std::path::Path::new(".")),
333                file_config
334                    .path
335                    .file_name()
336                    .unwrap_or_else(|| std::ffi::OsStr::new("app.log")),
337            );
338
339            let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
340            self._guards.push(guard);
341
342            let file_layer = match self.config.format {
343                LogFormat::Json => fmt::layer().json().with_writer(non_blocking).boxed(),
344                LogFormat::Pretty => fmt::layer().pretty().with_writer(non_blocking).boxed(),
345                LogFormat::Compact => fmt::layer().compact().with_writer(non_blocking).boxed(),
346            };
347
348            registry.with(file_layer)
349        } else {
350            registry.with(Identity::new().boxed())
351        };
352
353        // Setup custom layer with channel
354        let (entry_sender, mut entry_receiver) = mpsc::unbounded_channel::<LogEntry>();
355        let custom_layer = QorzenLayer::new(entry_sender.clone());
356
357        self.entry_sender = Some(entry_sender);
358        self.custom_layer = Some(custom_layer.clone());
359
360        // Start the writer task
361        let writers = Arc::new(RwLock::new(self.writers.clone()));
362        let writers_clone = Arc::clone(&writers);
363
364        let writer_task = tokio::spawn(async move {
365            while let Some(entry) = entry_receiver.recv().await {
366                let writers_guard = writers_clone.read().await;
367                for writer in writers_guard.iter() {
368                    if let Err(e) = writer.write(&entry).await {
369                        eprintln!("Failed to write log entry: {}", e);
370                    }
371                }
372            }
373        });
374
375        self.writer_task_handle = Some(writer_task);
376
377        // Add our custom layer
378        let registry = registry.with(custom_layer);
379
380        // Initialize the global subscriber
381        registry.try_init().ok();
382
383        Ok(())
384    }
385
386    pub async fn add_writer(&mut self, writer: Arc<dyn LogWriter>) -> Result<()> {
387        self.writers.push(writer);
388        Ok(())
389    }
390
391    pub async fn get_stats(&self) -> LogStats {
392        if let Some(layer) = &self.custom_layer {
393            layer.get_stats().await
394        } else {
395            LogStats {
396                total_entries: 0,
397                entries_by_level: HashMap::new(),
398                avg_entries_per_second: 0.0,
399                current_file_size: 0,
400                rotated_files: 0,
401                last_rotation: None,
402            }
403        }
404    }
405
406    pub async fn set_log_level(&mut self, level: LogLevel) -> Result<()> {
407        // This would update the filter in a real implementation
408        tracing::info!("Log level updated to: {:?}", level);
409        Ok(())
410    }
411
412    pub async fn flush(&self) -> Result<()> {
413        for writer in &self.writers {
414            writer
415                .flush()
416                .await
417                .with_context(|| "Failed to flush log writer".to_string())?;
418        }
419        Ok(())
420    }
421
422    pub fn create_logger(&self, component: impl Into<String>) -> Logger {
423        Logger::new(component.into())
424    }
425}
426
427#[async_trait]
428impl Manager for LoggingManager {
429    fn name(&self) -> &str {
430        "logging_manager"
431    }
432
433    fn id(&self) -> Uuid {
434        Uuid::new_v4() // Simplified
435    }
436
437    async fn initialize(&mut self) -> Result<()> {
438        self.state
439            .set_state(crate::manager::ManagerState::Initializing)
440            .await;
441
442        // Setup tracing
443        self.setup_tracing().await?;
444
445        // Create log directories if needed
446        if let Some(file_config) = &self.config.file {
447            if let Some(parent) = file_config.path.parent() {
448                tokio::fs::create_dir_all(parent).await.with_context(|| {
449                    format!("Failed to create log directory: {}", parent.display())
450                })?;
451            }
452        }
453
454        self.state
455            .set_state(crate::manager::ManagerState::Running)
456            .await;
457        tracing::info!("Logging manager initialized successfully");
458        Ok(())
459    }
460
461    async fn shutdown(&mut self) -> Result<()> {
462        self.state
463            .set_state(crate::manager::ManagerState::ShuttingDown)
464            .await;
465
466        tracing::info!("Shutting down logging manager");
467
468        // Flush all writers
469        self.flush().await?;
470
471        // Close all writers
472        for writer in &self.writers {
473            writer
474                .close()
475                .await
476                .with_context(|| "Failed to close log writer".to_string())?;
477        }
478
479        // Stop the writer task
480        if let Some(handle) = self.writer_task_handle.take() {
481            handle.abort();
482            let _ = handle.await;
483        }
484
485        self.state
486            .set_state(crate::manager::ManagerState::Shutdown)
487            .await;
488        Ok(())
489    }
490
491    async fn status(&self) -> ManagerStatus {
492        let mut status = self.state.status().await;
493        let stats = self.get_stats().await;
494
495        status.add_metadata(
496            "total_entries",
497            serde_json::Value::from(stats.total_entries),
498        );
499        status.add_metadata("writers_count", serde_json::Value::from(self.writers.len()));
500        status.add_metadata(
501            "file_logging",
502            serde_json::Value::Bool(self.config.file.is_some()),
503        );
504        status.add_metadata(
505            "console_logging",
506            serde_json::Value::Bool(self.config.console.enabled),
507        );
508        status.add_metadata(
509            "log_level",
510            serde_json::Value::String(self.config.level.clone()),
511        );
512
513        status
514    }
515}
516
517#[derive(Debug, Clone)]
518pub struct Logger {
519    component: String,
520    correlation_id: Option<Uuid>,
521    metadata: HashMap<String, serde_json::Value>,
522}
523
524impl Logger {
525    pub fn new(component: String) -> Self {
526        Self {
527            component,
528            correlation_id: None,
529            metadata: HashMap::new(),
530        }
531    }
532
533    pub fn with_correlation_id(mut self, correlation_id: Uuid) -> Self {
534        self.correlation_id = Some(correlation_id);
535        self
536    }
537
538    pub fn with_metadata(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
539        self.metadata.insert(key.into(), value);
540        self
541    }
542
543    pub fn trace(&self, message: impl AsRef<str>) {
544        self.log(LogLevel::Trace, message.as_ref(), &HashMap::new());
545    }
546
547    pub fn debug(&self, message: impl AsRef<str>) {
548        self.log(LogLevel::Debug, message.as_ref(), &HashMap::new());
549    }
550
551    pub fn info(&self, message: impl AsRef<str>) {
552        self.log(LogLevel::Info, message.as_ref(), &HashMap::new());
553    }
554
555    pub fn warn(&self, message: impl AsRef<str>) {
556        self.log(LogLevel::Warn, message.as_ref(), &HashMap::new());
557    }
558
559    pub fn error(&self, message: impl AsRef<str>) {
560        self.log(LogLevel::Error, message.as_ref(), &HashMap::new());
561    }
562
563    pub fn log_with_fields(
564        &self,
565        level: LogLevel,
566        message: impl AsRef<str>,
567        fields: &HashMap<String, serde_json::Value>,
568    ) {
569        self.log(level, message.as_ref(), fields);
570    }
571
572    fn log(&self, level: LogLevel, message: &str, fields: &HashMap<String, serde_json::Value>) {
573        // Combine metadata and fields
574        let mut all_fields = self.metadata.clone();
575        all_fields.extend(fields.clone());
576        all_fields.insert(
577            "component".to_string(),
578            serde_json::Value::String(self.component.clone()),
579        );
580
581        if let Some(correlation_id) = self.correlation_id {
582            all_fields.insert(
583                "correlation_id".to_string(),
584                serde_json::Value::String(correlation_id.to_string()),
585            );
586        }
587
588        // Use tracing macros without dynamic target or fields
589        match level {
590            LogLevel::Trace => tracing::trace!("{}: {}", self.component, message),
591            LogLevel::Debug => tracing::debug!("{}: {}", self.component, message),
592            LogLevel::Info => tracing::info!("{}: {}", self.component, message),
593            LogLevel::Warn => tracing::warn!("{}: {}", self.component, message),
594            LogLevel::Error => tracing::error!("{}: {}", self.component, message),
595        }
596    }
597}
598
599#[macro_export]
600macro_rules! log_trace {
601    ($logger:expr, $($arg:tt)*) => {
602        $logger.trace(format!($($arg)*))
603    };
604}
605
606#[macro_export]
607macro_rules! log_debug {
608    ($logger:expr, $($arg:tt)*) => {
609        $logger.debug(format!($($arg)*))
610    };
611}
612
613#[macro_export]
614macro_rules! log_info {
615    ($logger:expr, $($arg:tt)*) => {
616        $logger.info(format!($($arg)*))
617    };
618}
619
620#[macro_export]
621macro_rules! log_warn {
622    ($logger:expr, $($arg:tt)*) => {
623        $logger.warn(format!($($arg)*))
624    };
625}
626
627#[macro_export]
628macro_rules! log_error {
629    ($logger:expr, $($arg:tt)*) => {
630        $logger.error(format!($($arg)*))
631    };
632}
633
634#[cfg(test)]
635mod tests {
636    use super::*;
637    use std::sync::atomic::{AtomicU64, Ordering};
638
639    #[derive(Debug)]
640    struct TestLogWriter {
641        entries: Arc<AtomicU64>,
642    }
643
644    impl TestLogWriter {
645        fn new() -> Self {
646            Self {
647                entries: Arc::new(AtomicU64::new(0)),
648            }
649        }
650
651        fn get_entry_count(&self) -> u64 {
652            self.entries.load(Ordering::SeqCst)
653        }
654    }
655
656    #[async_trait]
657    impl LogWriter for TestLogWriter {
658        async fn write(&self, _entry: &LogEntry) -> Result<()> {
659            self.entries.fetch_add(1, Ordering::SeqCst);
660            Ok(())
661        }
662
663        async fn flush(&self) -> Result<()> {
664            Ok(())
665        }
666
667        async fn close(&self) -> Result<()> {
668            Ok(())
669        }
670    }
671
672    #[tokio::test]
673    async fn test_logging_manager_initialization() {
674        let config = LoggingConfig::default();
675        let mut manager = LoggingManager::new(config);
676
677        manager.initialize().await.unwrap();
678
679        let status = manager.status().await;
680        assert_eq!(status.state, crate::manager::ManagerState::Running);
681
682        manager.shutdown().await.unwrap();
683    }
684
685    #[tokio::test]
686    async fn test_custom_log_writer() {
687        let config = LoggingConfig::default();
688        let mut manager = LoggingManager::new(config);
689
690        let test_writer = Arc::new(TestLogWriter::new());
691        manager.add_writer(test_writer.clone()).await.unwrap();
692
693        manager.initialize().await.unwrap();
694
695        // Create a logger and log some messages
696        let logger = manager.create_logger("test_component");
697        logger.info("Test message 1");
698        logger.warn("Test message 2");
699        logger.error("Test message 3");
700
701        // Give some time for async processing
702        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
703
704        // Note: Due to the complexity of the tracing integration,
705        // the test writer might not receive all messages in this simplified example
706        // In a full implementation, this would work as expected
707
708        manager.shutdown().await.unwrap();
709    }
710
711    #[tokio::test]
712    async fn test_logger_with_context() {
713        let logger = Logger::new("test_component".to_string())
714            .with_correlation_id(Uuid::new_v4())
715            .with_metadata("user_id", serde_json::Value::String("12345".to_string()));
716
717        // These would work with a properly initialized tracing subscriber
718        logger.info("Test message with context");
719
720        let mut fields = HashMap::new();
721        fields.insert(
722            "custom_field".to_string(),
723            serde_json::Value::Number(42.into()),
724        );
725        logger.log_with_fields(LogLevel::Debug, "Message with fields", &fields);
726    }
727}