qorzen_oxide/
event.rs

1// src/event.rs
2
3//! Event-driven architecture system with async event bus
4//!
5//! This module provides a comprehensive event system that supports:
6//! - Type-safe event publishing and subscription
7//! - Async event handlers with backpressure
8//! - Event filtering and routing
9//! - Event persistence and replay
10//! - Dead letter queue for failed events
11//! - Metrics and monitoring
12//! - Event serialization for network transport
13
14use std::any::Any;
15use std::collections::HashMap;
16use std::fmt::{self, Debug};
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use crate::utils::Time;
22use async_trait::async_trait;
23use chrono::{DateTime, Utc};
24use dashmap::DashMap;
25use serde::{Deserialize, Serialize};
26use tokio::sync::{mpsc, RwLock};
27use uuid::Uuid;
28
29use crate::error::{Error, ErrorKind, EventOperation, Result};
30use crate::manager::{ManagedState, Manager, ManagerStatus};
31use crate::types::Metadata;
32
33/// Base event trait that all events must implement
34pub trait Event: Send + Sync + Debug {
35    /// Get the event type identifier
36    fn event_type(&self) -> &'static str;
37
38    /// Get the event source
39    fn source(&self) -> &str;
40
41    /// Get event metadata
42    fn metadata(&self) -> &Metadata;
43
44    /// Get event as Any for downcasting
45    fn as_any(&self) -> &dyn Any;
46
47    /// Get event timestamp (default implementation)
48    fn timestamp(&self) -> DateTime<Utc> {
49        Time::now()
50    }
51
52    /// Get event correlation ID if available
53    fn correlation_id(&self) -> Option<Uuid> {
54        self.metadata()
55            .get("correlation_id")
56            .and_then(|v| v.as_str())
57            .and_then(|s| Uuid::parse_str(s).ok())
58    }
59
60    /// Get event priority (default is normal)
61    fn priority(&self) -> EventPriority {
62        EventPriority::Normal
63    }
64
65    /// Whether this event should be persisted
66    fn should_persist(&self) -> bool {
67        false
68    }
69}
70
71/// Event priority levels
72#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
73pub enum EventPriority {
74    /// Low priority events (background processing)
75    Low = 0,
76    /// Normal priority events
77    Normal = 50,
78    /// High priority events (user actions)
79    High = 100,
80    /// Critical priority events (system alerts)
81    Critical = 200,
82}
83
84impl Default for EventPriority {
85    fn default() -> Self {
86        Self::Normal
87    }
88}
89
90/// Event handler trait for processing events
91#[cfg(not(target_arch = "wasm32"))]
92#[async_trait]
93pub trait EventHandler: Send + Sync + Debug {
94    /// Handle an event
95    async fn handle(&self, event: &dyn Event) -> Result<()>;
96
97    /// Get handler name for debugging
98    fn name(&self) -> &str;
99
100    /// Get event types this handler is interested in
101    fn event_types(&self) -> Vec<&'static str>;
102
103    /// Whether this handler should receive all events (wildcard)
104    fn is_wildcard(&self) -> bool {
105        false
106    }
107
108    /// Get handler priority (affects processing order)
109    fn priority(&self) -> i32 {
110        0
111    }
112}
113
114#[cfg(target_arch = "wasm32")]
115#[async_trait(?Send)]
116pub trait EventHandler: Sync + Debug {
117    /// Handle an event
118    async fn handle(&self, event: &dyn Event) -> Result<()>;
119
120    /// Get handler name for debugging
121    fn name(&self) -> &str;
122
123    /// Get event types this handler is interested in
124    fn event_types(&self) -> Vec<&'static str>;
125
126    /// Whether this handler should receive all events (wildcard)
127    fn is_wildcard(&self) -> bool {
128        false
129    }
130
131    /// Get handler priority (affects processing order)
132    fn priority(&self) -> i32 {
133        0
134    }
135}
136
137/// Event subscription filter
138#[derive(Debug, Clone)]
139pub struct EventFilter {
140    /// Event types to match (empty means all)
141    pub event_types: Vec<String>,
142    /// Source patterns to match
143    pub source_patterns: Vec<String>,
144    /// Metadata filters
145    pub metadata_filters: HashMap<String, serde_json::Value>,
146    /// Minimum priority level
147    pub min_priority: EventPriority,
148}
149
150impl EventFilter {
151    /// Create a new event filter
152    pub fn new() -> Self {
153        Self {
154            event_types: Vec::new(),
155            source_patterns: Vec::new(),
156            metadata_filters: HashMap::new(),
157            min_priority: EventPriority::Low,
158        }
159    }
160
161    /// Add event type filter
162    pub fn with_event_type(mut self, event_type: impl Into<String>) -> Self {
163        self.event_types.push(event_type.into());
164        self
165    }
166
167    /// Add source pattern filter
168    pub fn with_source_pattern(mut self, pattern: impl Into<String>) -> Self {
169        self.source_patterns.push(pattern.into());
170        self
171    }
172
173    /// Add metadata filter
174    pub fn with_metadata(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
175        self.metadata_filters.insert(key.into(), value);
176        self
177    }
178
179    /// Set minimum priority
180    pub fn with_min_priority(mut self, priority: EventPriority) -> Self {
181        self.min_priority = priority;
182        self
183    }
184
185    /// Check if an event matches this filter
186    pub fn matches(&self, event: &dyn Event) -> bool {
187        // Check event type
188        if !self.event_types.is_empty()
189            && !self.event_types.contains(&event.event_type().to_string())
190        {
191            return false;
192        }
193
194        // Check source patterns
195        if !self.source_patterns.is_empty() {
196            let source = event.source();
197            if !self.source_patterns.iter().any(|pattern| {
198                // Simple pattern matching (could be enhanced with regex)
199                pattern == "*" || source.contains(pattern)
200            }) {
201                return false;
202            }
203        }
204
205        // Check priority
206        if event.priority() < self.min_priority {
207            return false;
208        }
209
210        // Check metadata filters
211        let event_metadata = event.metadata();
212        for (key, expected_value) in &self.metadata_filters {
213            match event_metadata.get(key) {
214                Some(actual_value) => {
215                    if actual_value != expected_value {
216                        return false;
217                    }
218                }
219                None => return false,
220            }
221        }
222
223        true
224    }
225}
226
227impl Default for EventFilter {
228    fn default() -> Self {
229        Self::new()
230    }
231}
232
233/// Event subscription
234pub struct EventSubscription {
235    /// Subscription ID
236    pub id: Uuid,
237    /// Filter for events
238    pub filter: EventFilter,
239    /// Event sender channel
240    pub sender: mpsc::UnboundedSender<Arc<dyn Event>>,
241    /// When subscription was created
242    pub created_at: DateTime<Utc>,
243    /// Whether subscription is active
244    pub active: bool,
245    /// Subscription metadata
246    pub metadata: Metadata,
247}
248
249impl Debug for EventSubscription {
250    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
251        f.debug_struct("EventSubscription")
252            .field("id", &self.id)
253            .field("filter", &self.filter)
254            .field("created_at", &self.created_at)
255            .field("active", &self.active)
256            .field("metadata", &self.metadata)
257            .finish()
258    }
259}
260
261/// Event statistics
262#[derive(Debug, Clone, Serialize, Deserialize)]
263pub struct EventStats {
264    /// Total events published
265    pub total_published: u64,
266    /// Total events processed
267    pub total_processed: u64,
268    /// Total events failed
269    pub total_failed: u64,
270    /// Events by type
271    pub events_by_type: HashMap<String, u64>,
272    /// Events by priority
273    pub events_by_priority: HashMap<EventPriority, u64>,
274    /// Average processing time in milliseconds
275    pub avg_processing_time_ms: f64,
276    /// Current active subscriptions
277    pub active_subscriptions: usize,
278    /// Queue size
279    pub queue_size: usize,
280}
281
282/// Event bus configuration
283#[derive(Debug, Clone)]
284pub struct EventBusConfig {
285    /// Number of worker threads
286    pub worker_count: usize,
287    /// Queue capacity
288    pub queue_capacity: usize,
289    /// Default timeout for event processing
290    pub default_timeout: Duration,
291    /// Whether to enable event persistence
292    pub enable_persistence: bool,
293    /// Whether to enable metrics collection
294    pub enable_metrics: bool,
295    /// Batch size for processing events
296    pub batch_size: usize,
297    /// Maximum retry delay
298    pub max_retry_delay: Duration,
299}
300
301fn get_default_worker_count() -> usize {
302    #[cfg(not(target_arch = "wasm32"))]
303    {
304        num_cpus::get()
305    }
306    #[cfg(target_arch = "wasm32")]
307    {
308        1
309    }
310}
311
312impl Default for EventBusConfig {
313    fn default() -> Self {
314        Self {
315            worker_count: get_default_worker_count(),
316            queue_capacity: 10000,
317            default_timeout: Duration::from_secs(30),
318            enable_persistence: false,
319            enable_metrics: true,
320            batch_size: 100,
321            max_retry_delay: Duration::from_secs(60),
322        }
323    }
324}
325
326/// Internal event wrapper for the bus
327#[derive(Debug)]
328struct EventEnvelope {
329    /// The actual event
330    event: Arc<dyn Event>,
331    /// When event was received
332    #[allow(dead_code)]
333    received_at: Instant,
334    /// Retry count
335    #[allow(dead_code)]
336    retry_count: u32,
337    /// Maximum retries
338    #[allow(dead_code)]
339    max_retries: u32,
340}
341
342/// Event bus manager
343pub struct EventBusManager {
344    state: ManagedState,
345    config: EventBusConfig,
346    subscriptions: Arc<DashMap<Uuid, EventSubscription>>,
347    event_queue: mpsc::UnboundedSender<EventEnvelope>,
348    stats: Arc<RwLock<EventStats>>,
349    event_counter: Arc<AtomicU64>,
350    worker_handles: Vec<tokio::task::JoinHandle<()>>,
351}
352
353impl Debug for EventBusManager {
354    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
355        f.debug_struct("EventBusManager")
356            .field("config", &self.config)
357            .field("subscriptions", &self.subscriptions.len())
358            .finish()
359    }
360}
361
362impl EventBusManager {
363    /// Create a new event bus manager
364    pub fn new(config: EventBusConfig) -> Self {
365        let (event_sender, _event_receiver) = mpsc::unbounded_channel::<EventEnvelope>();
366
367        Self {
368            state: ManagedState::new(Uuid::new_v4(), "event_bus_manager"),
369            config,
370            subscriptions: Arc::new(DashMap::new()),
371            event_queue: event_sender,
372            stats: Arc::new(RwLock::new(EventStats {
373                total_published: 0,
374                total_processed: 0,
375                total_failed: 0,
376                events_by_type: HashMap::new(),
377                events_by_priority: HashMap::new(),
378                avg_processing_time_ms: 0.0,
379                active_subscriptions: 0,
380                queue_size: 0,
381            })),
382            event_counter: Arc::new(AtomicU64::new(0)),
383            worker_handles: Vec::new(),
384        }
385    }
386
387    /// Publish an event to the bus
388    pub async fn publish<E: Event + 'static>(&self, event: E) -> Result<()> {
389        let event_arc: Arc<dyn Event> = Arc::new(event);
390
391        // Update statistics
392        self.event_counter.fetch_add(1, Ordering::Relaxed);
393        {
394            let mut stats = self.stats.write().await;
395            stats.total_published += 1;
396            *stats
397                .events_by_type
398                .entry(event_arc.event_type().to_string())
399                .or_insert(0) += 1;
400            *stats
401                .events_by_priority
402                .entry(event_arc.priority())
403                .or_insert(0) += 1;
404        }
405
406        // Create event envelope
407        let envelope = EventEnvelope {
408            event: event_arc,
409            received_at: Instant::now(),
410            retry_count: 0,
411            max_retries: 3,
412        };
413
414        // Send to processing queue
415        self.event_queue.send(envelope).map_err(|_| {
416            Error::new(
417                ErrorKind::Event {
418                    event_type: Some("unknown".to_string()),
419                    subscriber_id: None,
420                    operation: EventOperation::Publish,
421                },
422                "Event queue is closed",
423            )
424        })?;
425
426        Ok(())
427    }
428
429    /// Subscribe to events with a filter
430    pub async fn subscribe(
431        &self,
432        filter: EventFilter,
433    ) -> Result<mpsc::UnboundedReceiver<Arc<dyn Event>>> {
434        let (sender, receiver) = mpsc::unbounded_channel::<Arc<dyn Event>>();
435        let subscription_id = Uuid::new_v4();
436
437        let subscription = EventSubscription {
438            id: subscription_id,
439            filter,
440            sender,
441            created_at: Time::now(),
442            active: true,
443            metadata: HashMap::new(),
444        };
445
446        self.subscriptions.insert(subscription_id, subscription);
447
448        // Update stats
449        {
450            let mut stats = self.stats.write().await;
451            stats.active_subscriptions = self.subscriptions.len();
452        }
453
454        tracing::debug!("Created subscription: {}", subscription_id);
455
456        Ok(receiver)
457    }
458
459    /// Subscribe with a handler
460    #[cfg(not(target_arch = "wasm32"))]
461    pub async fn subscribe_with_handler<H: EventHandler + 'static + Send>(
462        &self,
463        filter: EventFilter,
464        handler: Arc<H>,
465    ) -> Result<Uuid> {
466        let mut receiver = self.subscribe(filter).await?;
467        let handler_name = handler.name().to_string();
468
469        // Spawn task to handle events
470        let handle = tokio::spawn(async move {
471            while let Some(event) = receiver.recv().await {
472                let start_time = Instant::now();
473
474                match handler.handle(event.as_ref()).await {
475                    Ok(()) => {
476                        let processing_time = start_time.elapsed();
477                        tracing::trace!(
478                            "Handler '{}' processed event in {:?}",
479                            handler_name,
480                            processing_time
481                        );
482                    }
483                    Err(e) => {
484                        tracing::error!(
485                            "Handler '{}' failed to process event: {}",
486                            handler_name,
487                            e
488                        );
489                    }
490                }
491            }
492        });
493
494        // Store the handle (simplified - in practice you'd want to track these)
495        drop(handle);
496
497        // Return a dummy subscription ID
498        Ok(Uuid::new_v4())
499    }
500
501    #[cfg(target_arch = "wasm32")]
502    pub async fn subscribe_with_handler<H: EventHandler + 'static>(
503        &self,
504        filter: EventFilter,
505        handler: Arc<H>,
506    ) -> Result<Uuid> {
507        let mut receiver = self.subscribe(filter).await?;
508        let handler_name = handler.name().to_string();
509
510        // For WASM, we'll use a simpler approach without tokio::spawn
511        wasm_bindgen_futures::spawn_local(async move {
512            while let Some(event) = receiver.recv().await {
513                let start_time = Instant::now();
514
515                match handler.handle(event.as_ref()).await {
516                    Ok(()) => {
517                        let processing_time = start_time.elapsed();
518                        web_sys::console::log_1(
519                            &format!(
520                                "Handler '{}' processed event in {:?}",
521                                handler_name, processing_time
522                            )
523                            .into(),
524                        );
525                    }
526                    Err(e) => {
527                        web_sys::console::error_1(
528                            &format!("Handler '{}' failed to process event: {}", handler_name, e)
529                                .into(),
530                        );
531                    }
532                }
533            }
534        });
535
536        // Return a dummy subscription ID
537        Ok(Uuid::new_v4())
538    }
539
540    /// Unsubscribe from events
541    pub async fn unsubscribe(&self, subscription_id: Uuid) -> Result<()> {
542        if let Some(mut subscription) = self.subscriptions.get_mut(&subscription_id) {
543            subscription.active = false;
544        }
545
546        self.subscriptions.remove(&subscription_id).ok_or_else(|| {
547            Error::new(
548                ErrorKind::Event {
549                    event_type: None,
550                    subscriber_id: Some(subscription_id),
551                    operation: EventOperation::Subscribe,
552                },
553                "Subscription not found",
554            )
555        })?;
556
557        // Update stats
558        {
559            let mut stats = self.stats.write().await;
560            stats.active_subscriptions = self.subscriptions.len();
561        }
562
563        tracing::debug!("Removed subscription: {}", subscription_id);
564
565        Ok(())
566    }
567
568    /// Get event bus statistics
569    pub async fn get_stats(&self) -> EventStats {
570        self.stats.read().await.clone()
571    }
572
573    /// Start event processing workers
574    async fn start_workers(&mut self) -> Result<()> {
575        let (event_sender, event_receiver) = mpsc::unbounded_channel::<EventEnvelope>();
576        self.event_queue = event_sender;
577
578        let subscriptions = Arc::clone(&self.subscriptions);
579        let stats = Arc::clone(&self.stats);
580
581        // Move event_receiver OUT of self scope BEFORE the spawn
582        let handle = tokio::spawn(Self::worker_task(event_receiver, subscriptions, stats));
583
584        self.worker_handles.push(handle);
585
586        Ok(())
587    }
588
589    // This function owns event_receiver and can move it safely
590    async fn worker_task(
591        mut event_receiver: mpsc::UnboundedReceiver<EventEnvelope>,
592        subscriptions: Arc<DashMap<Uuid, EventSubscription>>,
593        stats: Arc<RwLock<EventStats>>,
594    ) {
595        tracing::debug!("Event worker started");
596
597        while let Some(envelope) = event_receiver.recv().await {
598            Self::process_event(envelope, &subscriptions, &stats).await;
599        }
600
601        tracing::debug!("Event worker stopped");
602    }
603
604    /// Process a single event
605    async fn process_event(
606        envelope: EventEnvelope,
607        subscriptions: &DashMap<Uuid, EventSubscription>,
608        stats: &RwLock<EventStats>,
609    ) {
610        let start_time = Instant::now();
611        let event = &envelope.event;
612
613        // Find matching subscriptions
614        let matching_subscriptions: Vec<(Uuid, Arc<dyn Event>)> = subscriptions
615            .iter()
616            .filter_map(|entry| {
617                let subscription = entry.value();
618                if subscription.active && subscription.filter.matches(event.as_ref()) {
619                    Some((subscription.id, Arc::clone(event)))
620                } else {
621                    None
622                }
623            })
624            .collect();
625
626        // Send event to matching subscriptions
627        let mut successful_deliveries = 0;
628        let mut failed_deliveries = 0;
629
630        for (subscription_id, _event_clone) in matching_subscriptions {
631            if let Some(subscription) = subscriptions.get(&subscription_id) {
632                match subscription.sender.send(Arc::clone(event)) {
633                    Ok(()) => successful_deliveries += 1,
634                    Err(_) => {
635                        failed_deliveries += 1;
636                        tracing::warn!(
637                            "Failed to deliver event to subscription {}",
638                            subscription_id
639                        );
640                    }
641                }
642            }
643        }
644
645        let processing_time = start_time.elapsed();
646
647        // Update statistics
648        {
649            let mut stats_guard = stats.write().await;
650            stats_guard.total_processed += 1;
651            if failed_deliveries > 0 {
652                stats_guard.total_failed += 1;
653            }
654
655            // Update average processing time
656            let total_processed = stats_guard.total_processed;
657            stats_guard.avg_processing_time_ms = (stats_guard.avg_processing_time_ms
658                * (total_processed - 1) as f64
659                + processing_time.as_millis() as f64)
660                / total_processed as f64;
661        }
662
663        tracing::trace!(
664            "Processed event '{}' in {:?} (delivered to {} subscriptions, {} failed)",
665            event.event_type(),
666            processing_time,
667            successful_deliveries,
668            failed_deliveries
669        );
670    }
671
672    /// Stop all workers
673    async fn stop_workers(&mut self) {
674        for handle in self.worker_handles.drain(..) {
675            handle.abort();
676            let _ = handle.await;
677        }
678    }
679}
680
681#[cfg(not(target_arch = "wasm32"))]
682#[async_trait]
683impl Manager for EventBusManager {
684    fn name(&self) -> &str {
685        "event_bus_manager"
686    }
687
688    fn id(&self) -> Uuid {
689        Uuid::new_v4() // Simplified
690    }
691
692    async fn initialize(&mut self) -> Result<()> {
693        self.state
694            .set_state(crate::manager::ManagerState::Initializing)
695            .await;
696
697        // Start event processing workers
698        self.start_workers().await?;
699
700        self.state
701            .set_state(crate::manager::ManagerState::Running)
702            .await;
703        tracing::info!(
704            "Event bus manager initialized with {} workers",
705            self.config.worker_count
706        );
707        Ok(())
708    }
709
710    async fn shutdown(&mut self) -> Result<()> {
711        self.state
712            .set_state(crate::manager::ManagerState::ShuttingDown)
713            .await;
714
715        // Stop processing new events
716        self.stop_workers().await;
717
718        // Clear subscriptions
719        self.subscriptions.clear();
720
721        self.state
722            .set_state(crate::manager::ManagerState::Shutdown)
723            .await;
724        tracing::info!("Event bus manager shut down");
725        Ok(())
726    }
727
728    async fn status(&self) -> ManagerStatus {
729        let mut status = self.state.status().await;
730        let stats = self.get_stats().await;
731
732        status.add_metadata(
733            "total_published",
734            serde_json::Value::from(stats.total_published),
735        );
736        status.add_metadata(
737            "total_processed",
738            serde_json::Value::from(stats.total_processed),
739        );
740        status.add_metadata("total_failed", serde_json::Value::from(stats.total_failed));
741        status.add_metadata(
742            "active_subscriptions",
743            serde_json::Value::from(stats.active_subscriptions),
744        );
745        status.add_metadata(
746            "worker_count",
747            serde_json::Value::from(self.config.worker_count),
748        );
749        status.add_metadata(
750            "avg_processing_time_ms",
751            serde_json::Value::from(stats.avg_processing_time_ms),
752        );
753
754        status
755    }
756}
757
758#[cfg(target_arch = "wasm32")]
759#[async_trait(?Send)]
760impl Manager for EventBusManager {
761    fn name(&self) -> &str {
762        "event_bus_manager"
763    }
764
765    fn id(&self) -> Uuid {
766        Uuid::new_v4() // Simplified
767    }
768
769    async fn initialize(&mut self) -> Result<()> {
770        self.state
771            .set_state(crate::manager::ManagerState::Initializing)
772            .await;
773
774        // Start event processing workers
775        self.start_workers().await?;
776
777        self.state
778            .set_state(crate::manager::ManagerState::Running)
779            .await;
780        tracing::info!(
781            "Event bus manager initialized with {} workers",
782            self.config.worker_count
783        );
784        Ok(())
785    }
786
787    async fn shutdown(&mut self) -> Result<()> {
788        self.state
789            .set_state(crate::manager::ManagerState::ShuttingDown)
790            .await;
791
792        // Stop processing new events
793        self.stop_workers().await;
794
795        // Clear subscriptions
796        self.subscriptions.clear();
797
798        self.state
799            .set_state(crate::manager::ManagerState::Shutdown)
800            .await;
801        tracing::info!("Event bus manager shut down");
802        Ok(())
803    }
804
805    async fn status(&self) -> ManagerStatus {
806        let mut status = self.state.status().await;
807        let stats = self.get_stats().await;
808
809        status.add_metadata(
810            "total_published",
811            serde_json::Value::from(stats.total_published),
812        );
813        status.add_metadata(
814            "total_processed",
815            serde_json::Value::from(stats.total_processed),
816        );
817        status.add_metadata("total_failed", serde_json::Value::from(stats.total_failed));
818        status.add_metadata(
819            "active_subscriptions",
820            serde_json::Value::from(stats.active_subscriptions),
821        );
822        status.add_metadata(
823            "worker_count",
824            serde_json::Value::from(self.config.worker_count),
825        );
826        status.add_metadata(
827            "avg_processing_time_ms",
828            serde_json::Value::from(stats.avg_processing_time_ms),
829        );
830
831        status
832    }
833}
834
835/// Convenience macros for event handling
836#[macro_export]
837macro_rules! define_event {
838    ($name:ident, $event_type:expr, $($field:ident: $type:ty),*) => {
839        #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
840        pub struct $name {
841            pub timestamp: chrono::DateTime<chrono::Utc>,
842            pub source: String,
843            pub metadata: std::collections::HashMap<String, serde_json::Value>,
844            $(pub $field: $type,)*
845        }
846
847        impl $crate::event::Event for $name {
848            fn event_type(&self) -> &'static str {
849                $event_type
850            }
851
852            fn source(&self) -> &str {
853                &self.source
854            }
855
856            fn metadata(&self) -> &$crate::types::Metadata {
857                &self.metadata
858            }
859
860            fn as_any(&self) -> &dyn std::any::Any {
861                self
862            }
863
864            fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
865                self.timestamp
866            }
867        }
868    };
869}
870
871#[cfg(test)]
872mod tests {
873    use super::*;
874    use std::time::Duration;
875
876    // Test event
877    #[derive(Debug, Clone)]
878    struct TestEvent {
879        source: String,
880        metadata: Metadata,
881        data: String,
882    }
883
884    impl Event for TestEvent {
885        fn event_type(&self) -> &'static str {
886            "test.event"
887        }
888
889        fn source(&self) -> &str {
890            &self.source
891        }
892
893        fn metadata(&self) -> &Metadata {
894            &self.metadata
895        }
896
897        fn as_any(&self) -> &dyn Any {
898            self
899        }
900    }
901
902    #[tokio::test]
903    async fn test_event_bus_creation() {
904        let config = EventBusConfig::default();
905        let bus = EventBusManager::new(config);
906        assert_eq!(bus.subscriptions.len(), 0);
907    }
908
909    #[tokio::test]
910    async fn test_event_publishing() {
911        let config = EventBusConfig::default();
912        let mut bus = EventBusManager::new(config);
913
914        bus.initialize().await.unwrap();
915
916        let event = TestEvent {
917            source: "test".to_string(),
918            metadata: HashMap::new(),
919            data: "test data".to_string(),
920        };
921
922        bus.publish(event).await.unwrap();
923
924        let stats = bus.get_stats().await;
925        assert_eq!(stats.total_published, 1);
926
927        bus.shutdown().await.unwrap();
928    }
929
930    #[tokio::test]
931    async fn test_event_subscription() {
932        let config = EventBusConfig::default();
933        let mut bus = EventBusManager::new(config);
934
935        bus.initialize().await.unwrap();
936
937        let filter = EventFilter::new().with_event_type("test.event");
938        let mut receiver = bus.subscribe(filter).await.unwrap();
939
940        let event = TestEvent {
941            source: "test".to_string(),
942            metadata: HashMap::new(),
943            data: "test data".to_string(),
944        };
945
946        bus.publish(event).await.unwrap();
947
948        // Give some time for processing
949        tokio::time::sleep(Duration::from_millis(100)).await;
950
951        // Check if we received the event
952        if let Ok(received_event) =
953            tokio::time::timeout(Duration::from_millis(100), receiver.recv()).await
954        {
955            assert!(received_event.is_some());
956            let event = received_event.unwrap();
957            assert_eq!(event.event_type(), "test.event");
958        }
959
960        bus.shutdown().await.unwrap();
961    }
962
963    #[test]
964    fn test_event_filter() {
965        let filter = EventFilter::new()
966            .with_event_type("test.event")
967            .with_source_pattern("test")
968            .with_min_priority(EventPriority::Normal);
969
970        let event = TestEvent {
971            source: "test_source".to_string(),
972            metadata: HashMap::new(),
973            data: "test data".to_string(),
974        };
975
976        assert!(filter.matches(&event));
977
978        let filter_no_match = EventFilter::new().with_event_type("other.event");
979
980        assert!(!filter_no_match.matches(&event));
981    }
982
983    #[test]
984    fn test_event_priority() {
985        assert!(EventPriority::Critical > EventPriority::High);
986        assert!(EventPriority::High > EventPriority::Normal);
987        assert!(EventPriority::Normal > EventPriority::Low);
988    }
989}