use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, RwLock};
use tracing::{Event, Subscriber};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::layer::Identity;
use tracing_subscriber::{
    fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer, Registry,
};
use uuid::Uuid;

use crate::config::{LogFormat, LoggingConfig};
use crate::error::{Error, ErrorKind, Result, ResultExt};
use crate::manager::{ManagedState, Manager, ManagerStatus};
use crate::utils::Time;

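/// A single structured log record produced by the custom tracing layer and
/// fanned out to every registered [`LogWriter`].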
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    pub id: Uuid,
    pub level: LogLevel,
    pub timestamp: DateTime<Utc>,
    pub source: String,
    pub message: String,
    pub target: String,
    pub file: Option<String>,
    pub line: Option<u32>,
    pub correlation_id: Option<Uuid>,
    pub fields: HashMap<String, serde_json::Value>,
    pub span: Option<SpanInfo>,
}

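/// Metadata about the tracing span that was active when an entry was recorded.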
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpanInfo {
    pub id: String,
    pub parent_id: Option<String>,
    pub name: String,
    pub fields: HashMap<String, serde_json::Value>,
}

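/// Log severity, ordered from least (`Trace`) to most (`Error`) severe.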
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum LogLevel {
    Trace,
    Debug,
    Info,
    Warn,
    Error,
}

impl From<tracing::Level> for LogLevel {
    fn from(level: tracing::Level) -> Self {
        match level {
            tracing::Level::TRACE => Self::Trace,
            tracing::Level::DEBUG => Self::Debug,
            tracing::Level::INFO => Self::Info,
            tracing::Level::WARN => Self::Warn,
            tracing::Level::ERROR => Self::Error,
        }
    }
}

impl From<LogLevel> for tracing::Level {
    fn from(level: LogLevel) -> Self {
        match level {
            LogLevel::Trace => Self::TRACE,
            LogLevel::Debug => Self::DEBUG,
            LogLevel::Info => Self::INFO,
            LogLevel::Warn => Self::WARN,
            LogLevel::Error => Self::ERROR,
        }
    }
}

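/// Aggregate logging statistics maintained by the custom layer.
///
/// Only `total_entries` and `entries_by_level` are currently updated; the
/// remaining fields are placeholders for file-rotation metrics.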
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogStats {
    pub total_entries: u64,
    pub entries_by_level: HashMap<LogLevel, u64>,
    pub avg_entries_per_second: f64,
    pub current_file_size: u64,
    pub rotated_files: u32,
    pub last_rotation: Option<DateTime<Utc>>,
}

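/// Destination for structured log entries.
///
/// Implementors receive every [`LogEntry`] captured by the custom layer.
/// Writers must be registered with [`LoggingManager::add_writer`] before the
/// manager is initialized; entries are delivered asynchronously on a
/// background task.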
#[async_trait]
pub trait LogWriter: Send + Sync + std::fmt::Debug {
    /// Persist a single log entry.
    async fn write(&self, entry: &LogEntry) -> Result<()>;

    /// Flush any buffered entries.
    async fn flush(&self) -> Result<()>;

    /// Flush and release any resources held by the writer.
    async fn close(&self) -> Result<()>;
}

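/// Placeholder writer that would persist entries to a database table.
///
/// The current implementation only logs what it would do; wiring it to a real
/// database connection is left to the integrating application.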
#[derive(Debug)]
pub struct DatabaseLogWriter {
    table_name: String,
}

impl DatabaseLogWriter {
    pub fn new(table_name: impl Into<String>) -> Self {
        Self {
            table_name: table_name.into(),
        }
    }
}

#[async_trait]
impl LogWriter for DatabaseLogWriter {
    async fn write(&self, entry: &LogEntry) -> Result<()> {
        // Stub: a real implementation would insert the entry into the database.
        // Note that emitting `tracing` events from inside a writer feeds them
        // back through the custom layer, so a production writer should avoid it.
        tracing::debug!(
            "Would write log entry to database table: {}",
            self.table_name
        );
        tracing::debug!("Entry: {:?}", entry);
        Ok(())
    }

    async fn flush(&self) -> Result<()> {
        Ok(())
    }

    async fn close(&self) -> Result<()> {
        Ok(())
    }
}

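/// Sends each log entry as JSON to an HTTP endpoint via `POST`.
///
/// A minimal usage sketch (the endpoint and header values are illustrative):
///
/// ```ignore
/// let writer = HttpLogWriter::new("https://logs.example.com/ingest")
///     .with_header("Authorization", "Bearer <token>");
/// ```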
#[derive(Debug)]
pub struct HttpLogWriter {
    endpoint: String,
    headers: HashMap<String, String>,
    client: reqwest::Client,
}

impl HttpLogWriter {
    pub fn new(endpoint: impl Into<String>) -> Self {
        Self {
            endpoint: endpoint.into(),
            headers: HashMap::new(),
            client: reqwest::Client::new(),
        }
    }

    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.headers.insert(key.into(), value.into());
        self
    }
}

#[async_trait]
impl LogWriter for HttpLogWriter {
    async fn write(&self, entry: &LogEntry) -> Result<()> {
        let mut request = self.client.post(&self.endpoint);

        for (key, value) in &self.headers {
            request = request.header(key, value);
        }

        let response = request
            .json(entry)
            .send()
            .await
            .with_context(|| "Failed to send log entry to HTTP endpoint".to_string())?;

        if !response.status().is_success() {
            return Err(Error::new(
                ErrorKind::Network {
                    status_code: Some(response.status().as_u16()),
                    endpoint: Some(self.endpoint.clone()),
                },
                format!("HTTP log writer failed with status: {}", response.status()),
            ));
        }

        Ok(())
    }

    async fn flush(&self) -> Result<()> {
        Ok(())
    }

    async fn close(&self) -> Result<()> {
        Ok(())
    }
}

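/// Custom `tracing_subscriber` layer that converts events into [`LogEntry`]
/// values, forwards them to the writer task, and tracks aggregate statistics.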
#[derive(Clone, Debug)]
struct QorzenLayer {
    entry_sender: mpsc::UnboundedSender<LogEntry>,
    stats: Arc<RwLock<LogStats>>,
}

impl QorzenLayer {
    fn new(entry_sender: mpsc::UnboundedSender<LogEntry>) -> Self {
        Self {
            entry_sender,
            stats: Arc::new(RwLock::new(LogStats {
                total_entries: 0,
                entries_by_level: HashMap::new(),
                avg_entries_per_second: 0.0,
                current_file_size: 0,
                rotated_files: 0,
                last_rotation: None,
            })),
        }
    }

    async fn get_stats(&self) -> LogStats {
        self.stats.read().await.clone()
    }
}

impl<S> Layer<S> for QorzenLayer
where
    S: Subscriber + for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>,
{
    fn on_event(&self, event: &Event<'_>, _ctx: tracing_subscriber::layer::Context<'_, S>) {
        let level = LogLevel::from(*event.metadata().level());

        let entry = LogEntry {
            id: Uuid::new_v4(),
            level,
            timestamp: Time::now(),
            source: event.metadata().target().to_string(),
            // Placeholder: the event's `Debug` output stands in for the message;
            // extracting the `message` field properly would require a field visitor.
            message: format!("{:?}", event),
            target: event.metadata().target().to_string(),
            file: event.metadata().file().map(String::from),
            line: event.metadata().line(),
            correlation_id: None,
            fields: HashMap::new(),
            span: None,
        };

        if self.entry_sender.send(entry).is_err() {
            eprintln!("Failed to send log entry: receiver dropped");
        }

        // Update statistics on the current Tokio runtime, if one is available.
        // (Spawning a separate OS thread here would lose the runtime context,
        // so `Handle::try_current()` is checked directly on this thread.)
        if let Ok(handle) = tokio::runtime::Handle::try_current() {
            let stats = self.stats.clone();
            handle.spawn(async move {
                let mut stats_guard = stats.write().await;
                stats_guard.total_entries += 1;
                *stats_guard.entries_by_level.entry(level).or_insert(0) += 1;
            });
        }
    }
}

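/// Central logging manager.
///
/// Owns the tracing subscriber stack (console, optional file, and the custom
/// layer), the registered [`LogWriter`]s, and the background task that fans
/// entries out to them.
///
/// A minimal usage sketch (the endpoint is illustrative; error handling elided):
///
/// ```ignore
/// let mut manager = LoggingManager::new(LoggingConfig::default());
/// manager
///     .add_writer(Arc::new(HttpLogWriter::new("https://logs.example.com/ingest")))
///     .await?;
/// manager.initialize().await?;
/// let logger = manager.create_logger("my_component");
/// logger.info("hello");
/// manager.shutdown().await?;
/// ```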
#[derive(Debug)]
pub struct LoggingManager {
    id: Uuid,
    state: ManagedState,
    config: LoggingConfig,
    custom_layer: Option<QorzenLayer>,
    _guards: Vec<WorkerGuard>,
    writers: Vec<Arc<dyn LogWriter>>,
    entry_sender: Option<mpsc::UnboundedSender<LogEntry>>,
    writer_task_handle: Option<tokio::task::JoinHandle<()>>,
}

impl LoggingManager {
    pub fn new(config: LoggingConfig) -> Self {
        let id = Uuid::new_v4();
        Self {
            id,
            state: ManagedState::new(id, "logging_manager"),
            config,
            custom_layer: None,
            _guards: Vec::new(),
            writers: Vec::new(),
            entry_sender: None,
            writer_task_handle: None,
        }
    }

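    /// Builds the tracing subscriber stack: env filter, optional console and
    /// file layers, plus the custom layer that feeds registered writers.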
    async fn setup_tracing(&mut self) -> Result<()> {
        let filter = EnvFilter::try_from_default_env()
            .unwrap_or_else(|_| EnvFilter::new(&self.config.level));

        let registry = Registry::default().with(filter);

        let registry = if self.config.console.enabled {
            let console_layer = if self.config.console.colored {
                fmt::layer()
                    .with_ansi(true)
                    .with_target(true)
                    .with_line_number(true)
                    .with_file(true)
                    .boxed()
            } else {
                fmt::layer()
                    .with_ansi(false)
                    .with_target(true)
                    .with_line_number(true)
                    .with_file(true)
                    .boxed()
            };

            registry.with(console_layer)
        } else {
            registry.with(Identity::new().boxed())
        };

        let registry = if let Some(file_config) = &self.config.file {
            let file_appender = tracing_appender::rolling::daily(
                file_config
                    .path
                    .parent()
                    .unwrap_or_else(|| std::path::Path::new(".")),
                file_config
                    .path
                    .file_name()
                    .unwrap_or_else(|| std::ffi::OsStr::new("app.log")),
            );

            let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
            self._guards.push(guard);

            let file_layer = match self.config.format {
                LogFormat::Json => fmt::layer().json().with_writer(non_blocking).boxed(),
                LogFormat::Pretty => fmt::layer().pretty().with_writer(non_blocking).boxed(),
                LogFormat::Compact => fmt::layer().compact().with_writer(non_blocking).boxed(),
            };

            registry.with(file_layer)
        } else {
            registry.with(Identity::new().boxed())
        };

        let (entry_sender, mut entry_receiver) = mpsc::unbounded_channel::<LogEntry>();
        let custom_layer = QorzenLayer::new(entry_sender.clone());

        self.entry_sender = Some(entry_sender);
        self.custom_layer = Some(custom_layer.clone());

        // Snapshot the writers registered so far; writers added after
        // initialization are not picked up by the background task.
        let writers = Arc::new(RwLock::new(self.writers.clone()));
        let writers_clone = Arc::clone(&writers);

        let writer_task = tokio::spawn(async move {
            while let Some(entry) = entry_receiver.recv().await {
                let writers_guard = writers_clone.read().await;
                for writer in writers_guard.iter() {
                    if let Err(e) = writer.write(&entry).await {
                        eprintln!("Failed to write log entry: {}", e);
                    }
                }
            }
        });

        self.writer_task_handle = Some(writer_task);

        let registry = registry.with(custom_layer);

        // A global subscriber may already be installed (e.g. in tests), so
        // initialization errors are deliberately ignored.
        registry.try_init().ok();

        Ok(())
    }

    /// Registers a writer for structured log entries.
    ///
    /// Writers must be added before [`Self::initialize`]; the background
    /// writer task only sees the writers present when tracing is set up.
    pub async fn add_writer(&mut self, writer: Arc<dyn LogWriter>) -> Result<()> {
        self.writers.push(writer);
        Ok(())
    }

    pub async fn get_stats(&self) -> LogStats {
        if let Some(layer) = &self.custom_layer {
            layer.get_stats().await
        } else {
            LogStats {
                total_entries: 0,
                entries_by_level: HashMap::new(),
                avg_entries_per_second: 0.0,
                current_file_size: 0,
                rotated_files: 0,
                last_rotation: None,
            }
        }
    }

    /// Records the requested level change. Note: the active `EnvFilter` is not
    /// rebuilt here; dynamic level changes would require a reload handle.
    pub async fn set_log_level(&mut self, level: LogLevel) -> Result<()> {
        tracing::info!("Log level updated to: {:?}", level);
        Ok(())
    }

    pub async fn flush(&self) -> Result<()> {
        for writer in &self.writers {
            writer
                .flush()
                .await
                .with_context(|| "Failed to flush log writer".to_string())?;
        }
        Ok(())
    }

    pub fn create_logger(&self, component: impl Into<String>) -> Logger {
        Logger::new(component.into())
    }
}

#[async_trait]
impl Manager for LoggingManager {
    fn name(&self) -> &str {
        "logging_manager"
    }

    fn id(&self) -> Uuid {
        self.id
    }

    async fn initialize(&mut self) -> Result<()> {
        self.state
            .set_state(crate::manager::ManagerState::Initializing)
            .await;

        // Make sure the log directory exists before the file appender is set up.
        if let Some(file_config) = &self.config.file {
            if let Some(parent) = file_config.path.parent() {
                tokio::fs::create_dir_all(parent).await.with_context(|| {
                    format!("Failed to create log directory: {}", parent.display())
                })?;
            }
        }

        self.setup_tracing().await?;

        self.state
            .set_state(crate::manager::ManagerState::Running)
            .await;
        tracing::info!("Logging manager initialized successfully");
        Ok(())
    }

    async fn shutdown(&mut self) -> Result<()> {
        self.state
            .set_state(crate::manager::ManagerState::ShuttingDown)
            .await;

        tracing::info!("Shutting down logging manager");

        self.flush().await?;

        for writer in &self.writers {
            writer
                .close()
                .await
                .with_context(|| "Failed to close log writer".to_string())?;
        }

        if let Some(handle) = self.writer_task_handle.take() {
            handle.abort();
            let _ = handle.await;
        }

        self.state
            .set_state(crate::manager::ManagerState::Shutdown)
            .await;
        Ok(())
    }

    async fn status(&self) -> ManagerStatus {
        let mut status = self.state.status().await;
        let stats = self.get_stats().await;

        status.add_metadata(
            "total_entries",
            serde_json::Value::from(stats.total_entries),
        );
        status.add_metadata("writers_count", serde_json::Value::from(self.writers.len()));
        status.add_metadata(
            "file_logging",
            serde_json::Value::Bool(self.config.file.is_some()),
        );
        status.add_metadata(
            "console_logging",
            serde_json::Value::Bool(self.config.console.enabled),
        );
        status.add_metadata(
            "log_level",
            serde_json::Value::String(self.config.level.clone()),
        );

        status
    }
}

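/// Lightweight, component-scoped logging handle returned by
/// [`LoggingManager::create_logger`].
///
/// A usage sketch (the metadata key and values are illustrative):
///
/// ```ignore
/// let logger = Logger::new("billing".to_string())
///     .with_correlation_id(Uuid::new_v4())
///     .with_metadata("user_id", serde_json::json!("12345"));
/// logger.info("invoice created");
/// ```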
#[derive(Debug, Clone)]
pub struct Logger {
    component: String,
    correlation_id: Option<Uuid>,
    metadata: HashMap<String, serde_json::Value>,
}

impl Logger {
    pub fn new(component: String) -> Self {
        Self {
            component,
            correlation_id: None,
            metadata: HashMap::new(),
        }
    }

    pub fn with_correlation_id(mut self, correlation_id: Uuid) -> Self {
        self.correlation_id = Some(correlation_id);
        self
    }

    pub fn with_metadata(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
        self.metadata.insert(key.into(), value);
        self
    }

    pub fn trace(&self, message: impl AsRef<str>) {
        self.log(LogLevel::Trace, message.as_ref(), &HashMap::new());
    }

    pub fn debug(&self, message: impl AsRef<str>) {
        self.log(LogLevel::Debug, message.as_ref(), &HashMap::new());
    }

    pub fn info(&self, message: impl AsRef<str>) {
        self.log(LogLevel::Info, message.as_ref(), &HashMap::new());
    }

    pub fn warn(&self, message: impl AsRef<str>) {
        self.log(LogLevel::Warn, message.as_ref(), &HashMap::new());
    }

    pub fn error(&self, message: impl AsRef<str>) {
        self.log(LogLevel::Error, message.as_ref(), &HashMap::new());
    }

    pub fn log_with_fields(
        &self,
        level: LogLevel,
        message: impl AsRef<str>,
        fields: &HashMap<String, serde_json::Value>,
    ) {
        self.log(level, message.as_ref(), fields);
    }

    fn log(&self, level: LogLevel, message: &str, fields: &HashMap<String, serde_json::Value>) {
        let mut all_fields = self.metadata.clone();
        all_fields.extend(fields.clone());
        all_fields.insert(
            "component".to_string(),
            serde_json::Value::String(self.component.clone()),
        );

        if let Some(correlation_id) = self.correlation_id {
            all_fields.insert(
                "correlation_id".to_string(),
                serde_json::Value::String(correlation_id.to_string()),
            );
        }

        // Attach the accumulated fields so subscribers can see them.
        match level {
            LogLevel::Trace => tracing::trace!(fields = ?all_fields, "{}: {}", self.component, message),
            LogLevel::Debug => tracing::debug!(fields = ?all_fields, "{}: {}", self.component, message),
            LogLevel::Info => tracing::info!(fields = ?all_fields, "{}: {}", self.component, message),
            LogLevel::Warn => tracing::warn!(fields = ?all_fields, "{}: {}", self.component, message),
            LogLevel::Error => tracing::error!(fields = ?all_fields, "{}: {}", self.component, message),
        }
    }
}

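/// Convenience macros that forward `format!`-style arguments to a [`Logger`].
///
/// For example (illustrative):
///
/// ```ignore
/// log_info!(logger, "processed {} items", count);
/// ```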
#[macro_export]
macro_rules! log_trace {
    ($logger:expr, $($arg:tt)*) => {
        $logger.trace(format!($($arg)*))
    };
}

#[macro_export]
macro_rules! log_debug {
    ($logger:expr, $($arg:tt)*) => {
        $logger.debug(format!($($arg)*))
    };
}

#[macro_export]
macro_rules! log_info {
    ($logger:expr, $($arg:tt)*) => {
        $logger.info(format!($($arg)*))
    };
}

#[macro_export]
macro_rules! log_warn {
    ($logger:expr, $($arg:tt)*) => {
        $logger.warn(format!($($arg)*))
    };
}

#[macro_export]
macro_rules! log_error {
    ($logger:expr, $($arg:tt)*) => {
        $logger.error(format!($($arg)*))
    };
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};

    #[derive(Debug)]
    struct TestLogWriter {
        entries: Arc<AtomicU64>,
    }

    impl TestLogWriter {
        fn new() -> Self {
            Self {
                entries: Arc::new(AtomicU64::new(0)),
            }
        }

        fn get_entry_count(&self) -> u64 {
            self.entries.load(Ordering::SeqCst)
        }
    }

    #[async_trait]
    impl LogWriter for TestLogWriter {
        async fn write(&self, _entry: &LogEntry) -> Result<()> {
            self.entries.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        async fn flush(&self) -> Result<()> {
            Ok(())
        }

        async fn close(&self) -> Result<()> {
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_logging_manager_initialization() {
        let config = LoggingConfig::default();
        let mut manager = LoggingManager::new(config);

        manager.initialize().await.unwrap();

        let status = manager.status().await;
        assert_eq!(status.state, crate::manager::ManagerState::Running);

        manager.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_custom_log_writer() {
        let config = LoggingConfig::default();
        let mut manager = LoggingManager::new(config);

        // Writers must be registered before `initialize` so the background
        // writer task picks them up.
        let test_writer = Arc::new(TestLogWriter::new());
        manager.add_writer(test_writer.clone()).await.unwrap();

        manager.initialize().await.unwrap();

        let logger = manager.create_logger("test_component");
        logger.info("Test message 1");
        logger.warn("Test message 2");
        logger.error("Test message 3");

        // Give the asynchronous writer task a moment to drain the channel.
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        // Delivery is asynchronous and the global subscriber is shared across
        // tests, so the exact entry count is not asserted here.
        let _ = test_writer.get_entry_count();

        manager.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_logger_with_context() {
        let logger = Logger::new("test_component".to_string())
            .with_correlation_id(Uuid::new_v4())
            .with_metadata("user_id", serde_json::Value::String("12345".to_string()));

        logger.info("Test message with context");

        let mut fields = HashMap::new();
        fields.insert(
            "custom_field".to_string(),
            serde_json::Value::Number(42.into()),
        );
        logger.log_with_fields(LogLevel::Debug, "Message with fields", &fields);
    }
}