1use std::any::Any;
15use std::collections::HashMap;
16use std::fmt::{self, Debug};
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use crate::utils::Time;
22use async_trait::async_trait;
23use chrono::{DateTime, Utc};
24use dashmap::DashMap;
25use serde::{Deserialize, Serialize};
26use tokio::sync::{mpsc, RwLock};
27use uuid::Uuid;
28
29use crate::error::{Error, ErrorKind, EventOperation, Result};
30use crate::manager::{ManagedState, Manager, ManagerStatus};
31use crate::types::Metadata;
32
33pub trait Event: Send + Sync + Debug {
35 fn event_type(&self) -> &'static str;
37
38 fn source(&self) -> &str;
40
41 fn metadata(&self) -> &Metadata;
43
44 fn as_any(&self) -> &dyn Any;
46
47 fn timestamp(&self) -> DateTime<Utc> {
49 Time::now()
50 }
51
52 fn correlation_id(&self) -> Option<Uuid> {
54 self.metadata()
55 .get("correlation_id")
56 .and_then(|v| v.as_str())
57 .and_then(|s| Uuid::parse_str(s).ok())
58 }
59
60 fn priority(&self) -> EventPriority {
62 EventPriority::Normal
63 }
64
65 fn should_persist(&self) -> bool {
67 false
68 }
69}
70
71#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
73pub enum EventPriority {
74 Low = 0,
76 Normal = 50,
78 High = 100,
80 Critical = 200,
82}
83
84impl Default for EventPriority {
85 fn default() -> Self {
86 Self::Normal
87 }
88}
89
90#[cfg(not(target_arch = "wasm32"))]
92#[async_trait]
93pub trait EventHandler: Send + Sync + Debug {
94 async fn handle(&self, event: &dyn Event) -> Result<()>;
96
97 fn name(&self) -> &str;
99
100 fn event_types(&self) -> Vec<&'static str>;
102
103 fn is_wildcard(&self) -> bool {
105 false
106 }
107
108 fn priority(&self) -> i32 {
110 0
111 }
112}
113
/// Asynchronous event handler (wasm32 targets).
///
/// Mirrors the non-wasm trait but drops the `Send` requirement
/// (`#[async_trait(?Send)]`), since handlers are driven through
/// `wasm_bindgen_futures::spawn_local`.
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
pub trait EventHandler: Sync + Debug {
    /// Processes a single event. An `Err` is logged by the task driving the
    /// handler and does not stop that task.
    async fn handle(&self, event: &dyn Event) -> Result<()>;

    /// Handler name, used in log messages.
    fn name(&self) -> &str;

    /// Event types this handler declares interest in.
    ///
    /// NOTE(review): not consulted by the bus code visible here; delivery is
    /// decided by the `EventFilter` passed at subscription time.
    fn event_types(&self) -> Vec<&'static str>;

    /// Whether the handler declares itself a wildcard handler; defaults to
    /// `false`.
    fn is_wildcard(&self) -> bool {
        false
    }

    /// Handler priority; defaults to `0`.
    fn priority(&self) -> i32 {
        0
    }
}
136
137#[derive(Debug, Clone)]
139pub struct EventFilter {
140 pub event_types: Vec<String>,
142 pub source_patterns: Vec<String>,
144 pub metadata_filters: HashMap<String, serde_json::Value>,
146 pub min_priority: EventPriority,
148}
149
150impl EventFilter {
151 pub fn new() -> Self {
153 Self {
154 event_types: Vec::new(),
155 source_patterns: Vec::new(),
156 metadata_filters: HashMap::new(),
157 min_priority: EventPriority::Low,
158 }
159 }
160
161 pub fn with_event_type(mut self, event_type: impl Into<String>) -> Self {
163 self.event_types.push(event_type.into());
164 self
165 }
166
167 pub fn with_source_pattern(mut self, pattern: impl Into<String>) -> Self {
169 self.source_patterns.push(pattern.into());
170 self
171 }
172
173 pub fn with_metadata(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
175 self.metadata_filters.insert(key.into(), value);
176 self
177 }
178
179 pub fn with_min_priority(mut self, priority: EventPriority) -> Self {
181 self.min_priority = priority;
182 self
183 }
184
185 pub fn matches(&self, event: &dyn Event) -> bool {
187 if !self.event_types.is_empty()
189 && !self.event_types.contains(&event.event_type().to_string())
190 {
191 return false;
192 }
193
194 if !self.source_patterns.is_empty() {
196 let source = event.source();
197 if !self.source_patterns.iter().any(|pattern| {
198 pattern == "*" || source.contains(pattern)
200 }) {
201 return false;
202 }
203 }
204
205 if event.priority() < self.min_priority {
207 return false;
208 }
209
210 let event_metadata = event.metadata();
212 for (key, expected_value) in &self.metadata_filters {
213 match event_metadata.get(key) {
214 Some(actual_value) => {
215 if actual_value != expected_value {
216 return false;
217 }
218 }
219 None => return false,
220 }
221 }
222
223 true
224 }
225}
226
227impl Default for EventFilter {
228 fn default() -> Self {
229 Self::new()
230 }
231}
232
233pub struct EventSubscription {
235 pub id: Uuid,
237 pub filter: EventFilter,
239 pub sender: mpsc::UnboundedSender<Arc<dyn Event>>,
241 pub created_at: DateTime<Utc>,
243 pub active: bool,
245 pub metadata: Metadata,
247}
248
249impl Debug for EventSubscription {
250 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
251 f.debug_struct("EventSubscription")
252 .field("id", &self.id)
253 .field("filter", &self.filter)
254 .field("created_at", &self.created_at)
255 .field("active", &self.active)
256 .field("metadata", &self.metadata)
257 .finish()
258 }
259}
260
261#[derive(Debug, Clone, Serialize, Deserialize)]
263pub struct EventStats {
264 pub total_published: u64,
266 pub total_processed: u64,
268 pub total_failed: u64,
270 pub events_by_type: HashMap<String, u64>,
272 pub events_by_priority: HashMap<EventPriority, u64>,
274 pub avg_processing_time_ms: f64,
276 pub active_subscriptions: usize,
278 pub queue_size: usize,
280}
281
/// Configuration for `EventBusManager`.
///
/// NOTE(review): in the code visible here only `worker_count` is read (for
/// logging and status reporting); the other fields are stored but not
/// consulted. Their descriptions below are inferred from the field names and
/// should be confirmed.
#[derive(Debug, Clone)]
pub struct EventBusConfig {
    /// Configured number of workers.
    pub worker_count: usize,
    /// Presumably the intended capacity of the event queue.
    pub queue_capacity: usize,
    /// Presumably the default timeout for event operations.
    pub default_timeout: Duration,
    /// Presumably toggles event persistence.
    pub enable_persistence: bool,
    /// Presumably toggles metrics collection.
    pub enable_metrics: bool,
    /// Presumably the number of events handled per batch.
    pub batch_size: usize,
    /// Presumably the upper bound for the delay between retries.
    pub max_retry_delay: Duration,
}
300
301fn get_default_worker_count() -> usize {
302 #[cfg(not(target_arch = "wasm32"))]
303 {
304 num_cpus::get()
305 }
306 #[cfg(target_arch = "wasm32")]
307 {
308 1
309 }
310}
311
312impl Default for EventBusConfig {
313 fn default() -> Self {
314 Self {
315 worker_count: get_default_worker_count(),
316 queue_capacity: 10000,
317 default_timeout: Duration::from_secs(30),
318 enable_persistence: false,
319 enable_metrics: true,
320 batch_size: 100,
321 max_retry_delay: Duration::from_secs(60),
322 }
323 }
324}
325
326#[derive(Debug)]
328struct EventEnvelope {
329 event: Arc<dyn Event>,
331 #[allow(dead_code)]
333 received_at: Instant,
334 #[allow(dead_code)]
336 retry_count: u32,
337 #[allow(dead_code)]
339 max_retries: u32,
340}
341
342pub struct EventBusManager {
344 state: ManagedState,
345 config: EventBusConfig,
346 subscriptions: Arc<DashMap<Uuid, EventSubscription>>,
347 event_queue: mpsc::UnboundedSender<EventEnvelope>,
348 stats: Arc<RwLock<EventStats>>,
349 event_counter: Arc<AtomicU64>,
350 worker_handles: Vec<tokio::task::JoinHandle<()>>,
351}
352
353impl Debug for EventBusManager {
354 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
355 f.debug_struct("EventBusManager")
356 .field("config", &self.config)
357 .field("subscriptions", &self.subscriptions.len())
358 .finish()
359 }
360}
361
362impl EventBusManager {
363 pub fn new(config: EventBusConfig) -> Self {
365 let (event_sender, _event_receiver) = mpsc::unbounded_channel::<EventEnvelope>();
366
367 Self {
368 state: ManagedState::new(Uuid::new_v4(), "event_bus_manager"),
369 config,
370 subscriptions: Arc::new(DashMap::new()),
371 event_queue: event_sender,
372 stats: Arc::new(RwLock::new(EventStats {
373 total_published: 0,
374 total_processed: 0,
375 total_failed: 0,
376 events_by_type: HashMap::new(),
377 events_by_priority: HashMap::new(),
378 avg_processing_time_ms: 0.0,
379 active_subscriptions: 0,
380 queue_size: 0,
381 })),
382 event_counter: Arc::new(AtomicU64::new(0)),
383 worker_handles: Vec::new(),
384 }
385 }
386
387 pub async fn publish<E: Event + 'static>(&self, event: E) -> Result<()> {
389 let event_arc: Arc<dyn Event> = Arc::new(event);
390
391 self.event_counter.fetch_add(1, Ordering::Relaxed);
393 {
394 let mut stats = self.stats.write().await;
395 stats.total_published += 1;
396 *stats
397 .events_by_type
398 .entry(event_arc.event_type().to_string())
399 .or_insert(0) += 1;
400 *stats
401 .events_by_priority
402 .entry(event_arc.priority())
403 .or_insert(0) += 1;
404 }
405
406 let envelope = EventEnvelope {
408 event: event_arc,
409 received_at: Instant::now(),
410 retry_count: 0,
411 max_retries: 3,
412 };
413
414 self.event_queue.send(envelope).map_err(|_| {
416 Error::new(
417 ErrorKind::Event {
418 event_type: Some("unknown".to_string()),
419 subscriber_id: None,
420 operation: EventOperation::Publish,
421 },
422 "Event queue is closed",
423 )
424 })?;
425
426 Ok(())
427 }
428
429 pub async fn subscribe(
431 &self,
432 filter: EventFilter,
433 ) -> Result<mpsc::UnboundedReceiver<Arc<dyn Event>>> {
434 let (sender, receiver) = mpsc::unbounded_channel::<Arc<dyn Event>>();
435 let subscription_id = Uuid::new_v4();
436
437 let subscription = EventSubscription {
438 id: subscription_id,
439 filter,
440 sender,
441 created_at: Time::now(),
442 active: true,
443 metadata: HashMap::new(),
444 };
445
446 self.subscriptions.insert(subscription_id, subscription);
447
448 {
450 let mut stats = self.stats.write().await;
451 stats.active_subscriptions = self.subscriptions.len();
452 }
453
454 tracing::debug!("Created subscription: {}", subscription_id);
455
456 Ok(receiver)
457 }
458
459 #[cfg(not(target_arch = "wasm32"))]
461 pub async fn subscribe_with_handler<H: EventHandler + 'static + Send>(
462 &self,
463 filter: EventFilter,
464 handler: Arc<H>,
465 ) -> Result<Uuid> {
466 let mut receiver = self.subscribe(filter).await?;
467 let handler_name = handler.name().to_string();
468
469 let handle = tokio::spawn(async move {
471 while let Some(event) = receiver.recv().await {
472 let start_time = Instant::now();
473
474 match handler.handle(event.as_ref()).await {
475 Ok(()) => {
476 let processing_time = start_time.elapsed();
477 tracing::trace!(
478 "Handler '{}' processed event in {:?}",
479 handler_name,
480 processing_time
481 );
482 }
483 Err(e) => {
484 tracing::error!(
485 "Handler '{}' failed to process event: {}",
486 handler_name,
487 e
488 );
489 }
490 }
491 }
492 });
493
494 drop(handle);
496
497 Ok(Uuid::new_v4())
499 }
500
501 #[cfg(target_arch = "wasm32")]
502 pub async fn subscribe_with_handler<H: EventHandler + 'static>(
503 &self,
504 filter: EventFilter,
505 handler: Arc<H>,
506 ) -> Result<Uuid> {
507 let mut receiver = self.subscribe(filter).await?;
508 let handler_name = handler.name().to_string();
509
510 wasm_bindgen_futures::spawn_local(async move {
512 while let Some(event) = receiver.recv().await {
513 let start_time = Instant::now();
514
515 match handler.handle(event.as_ref()).await {
516 Ok(()) => {
517 let processing_time = start_time.elapsed();
518 web_sys::console::log_1(
519 &format!(
520 "Handler '{}' processed event in {:?}",
521 handler_name, processing_time
522 )
523 .into(),
524 );
525 }
526 Err(e) => {
527 web_sys::console::error_1(
528 &format!("Handler '{}' failed to process event: {}", handler_name, e)
529 .into(),
530 );
531 }
532 }
533 }
534 });
535
536 Ok(Uuid::new_v4())
538 }
539
540 pub async fn unsubscribe(&self, subscription_id: Uuid) -> Result<()> {
542 if let Some(mut subscription) = self.subscriptions.get_mut(&subscription_id) {
543 subscription.active = false;
544 }
545
546 self.subscriptions.remove(&subscription_id).ok_or_else(|| {
547 Error::new(
548 ErrorKind::Event {
549 event_type: None,
550 subscriber_id: Some(subscription_id),
551 operation: EventOperation::Subscribe,
552 },
553 "Subscription not found",
554 )
555 })?;
556
557 {
559 let mut stats = self.stats.write().await;
560 stats.active_subscriptions = self.subscriptions.len();
561 }
562
563 tracing::debug!("Removed subscription: {}", subscription_id);
564
565 Ok(())
566 }
567
568 pub async fn get_stats(&self) -> EventStats {
570 self.stats.read().await.clone()
571 }
572
573 async fn start_workers(&mut self) -> Result<()> {
575 let (event_sender, event_receiver) = mpsc::unbounded_channel::<EventEnvelope>();
576 self.event_queue = event_sender;
577
578 let subscriptions = Arc::clone(&self.subscriptions);
579 let stats = Arc::clone(&self.stats);
580
581 let handle = tokio::spawn(Self::worker_task(event_receiver, subscriptions, stats));
583
584 self.worker_handles.push(handle);
585
586 Ok(())
587 }
588
589 async fn worker_task(
591 mut event_receiver: mpsc::UnboundedReceiver<EventEnvelope>,
592 subscriptions: Arc<DashMap<Uuid, EventSubscription>>,
593 stats: Arc<RwLock<EventStats>>,
594 ) {
595 tracing::debug!("Event worker started");
596
597 while let Some(envelope) = event_receiver.recv().await {
598 Self::process_event(envelope, &subscriptions, &stats).await;
599 }
600
601 tracing::debug!("Event worker stopped");
602 }
603
604 async fn process_event(
606 envelope: EventEnvelope,
607 subscriptions: &DashMap<Uuid, EventSubscription>,
608 stats: &RwLock<EventStats>,
609 ) {
610 let start_time = Instant::now();
611 let event = &envelope.event;
612
613 let matching_subscriptions: Vec<(Uuid, Arc<dyn Event>)> = subscriptions
615 .iter()
616 .filter_map(|entry| {
617 let subscription = entry.value();
618 if subscription.active && subscription.filter.matches(event.as_ref()) {
619 Some((subscription.id, Arc::clone(event)))
620 } else {
621 None
622 }
623 })
624 .collect();
625
626 let mut successful_deliveries = 0;
628 let mut failed_deliveries = 0;
629
630 for (subscription_id, _event_clone) in matching_subscriptions {
631 if let Some(subscription) = subscriptions.get(&subscription_id) {
632 match subscription.sender.send(Arc::clone(event)) {
633 Ok(()) => successful_deliveries += 1,
634 Err(_) => {
635 failed_deliveries += 1;
636 tracing::warn!(
637 "Failed to deliver event to subscription {}",
638 subscription_id
639 );
640 }
641 }
642 }
643 }
644
645 let processing_time = start_time.elapsed();
646
647 {
649 let mut stats_guard = stats.write().await;
650 stats_guard.total_processed += 1;
651 if failed_deliveries > 0 {
652 stats_guard.total_failed += 1;
653 }
654
655 let total_processed = stats_guard.total_processed;
657 stats_guard.avg_processing_time_ms = (stats_guard.avg_processing_time_ms
658 * (total_processed - 1) as f64
659 + processing_time.as_millis() as f64)
660 / total_processed as f64;
661 }
662
663 tracing::trace!(
664 "Processed event '{}' in {:?} (delivered to {} subscriptions, {} failed)",
665 event.event_type(),
666 processing_time,
667 successful_deliveries,
668 failed_deliveries
669 );
670 }
671
672 async fn stop_workers(&mut self) {
674 for handle in self.worker_handles.drain(..) {
675 handle.abort();
676 let _ = handle.await;
677 }
678 }
679}
680
681#[cfg(not(target_arch = "wasm32"))]
682#[async_trait]
683impl Manager for EventBusManager {
684 fn name(&self) -> &str {
685 "event_bus_manager"
686 }
687
688 fn id(&self) -> Uuid {
689 Uuid::new_v4() }
691
692 async fn initialize(&mut self) -> Result<()> {
693 self.state
694 .set_state(crate::manager::ManagerState::Initializing)
695 .await;
696
697 self.start_workers().await?;
699
700 self.state
701 .set_state(crate::manager::ManagerState::Running)
702 .await;
703 tracing::info!(
704 "Event bus manager initialized with {} workers",
705 self.config.worker_count
706 );
707 Ok(())
708 }
709
710 async fn shutdown(&mut self) -> Result<()> {
711 self.state
712 .set_state(crate::manager::ManagerState::ShuttingDown)
713 .await;
714
715 self.stop_workers().await;
717
718 self.subscriptions.clear();
720
721 self.state
722 .set_state(crate::manager::ManagerState::Shutdown)
723 .await;
724 tracing::info!("Event bus manager shut down");
725 Ok(())
726 }
727
728 async fn status(&self) -> ManagerStatus {
729 let mut status = self.state.status().await;
730 let stats = self.get_stats().await;
731
732 status.add_metadata(
733 "total_published",
734 serde_json::Value::from(stats.total_published),
735 );
736 status.add_metadata(
737 "total_processed",
738 serde_json::Value::from(stats.total_processed),
739 );
740 status.add_metadata("total_failed", serde_json::Value::from(stats.total_failed));
741 status.add_metadata(
742 "active_subscriptions",
743 serde_json::Value::from(stats.active_subscriptions),
744 );
745 status.add_metadata(
746 "worker_count",
747 serde_json::Value::from(self.config.worker_count),
748 );
749 status.add_metadata(
750 "avg_processing_time_ms",
751 serde_json::Value::from(stats.avg_processing_time_ms),
752 );
753
754 status
755 }
756}
757
/// `Manager` lifecycle integration for the event bus (wasm32 targets).
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
impl Manager for EventBusManager {
    /// Fixed manager name.
    fn name(&self) -> &str {
        "event_bus_manager"
    }

    /// NOTE(review): generates a new random UUID on every call, so the
    /// returned id is neither stable nor related to the id given to
    /// `ManagedState::new`. Left as-is because the accessor for the stored
    /// id is not visible from this file.
    fn id(&self) -> Uuid {
        Uuid::new_v4()
    }

    /// Starts the worker and moves the state from `Initializing` to
    /// `Running`.
    async fn initialize(&mut self) -> Result<()> {
        use crate::manager::ManagerState;

        self.state.set_state(ManagerState::Initializing).await;

        self.start_workers().await?;

        self.state.set_state(ManagerState::Running).await;
        tracing::info!(
            "Event bus manager initialized with {} workers",
            self.config.worker_count
        );
        Ok(())
    }

    /// Stops the workers, drops every subscription and moves the state from
    /// `ShuttingDown` to `Shutdown`.
    async fn shutdown(&mut self) -> Result<()> {
        use crate::manager::ManagerState;

        self.state.set_state(ManagerState::ShuttingDown).await;

        self.stop_workers().await;
        self.subscriptions.clear();

        self.state.set_state(ManagerState::Shutdown).await;
        tracing::info!("Event bus manager shut down");
        Ok(())
    }

    /// Returns the managed state's status, extended with bus statistics.
    async fn status(&self) -> ManagerStatus {
        let mut status = self.state.status().await;
        let stats = self.get_stats().await;

        let entries = vec![
            (
                "total_published",
                serde_json::Value::from(stats.total_published),
            ),
            (
                "total_processed",
                serde_json::Value::from(stats.total_processed),
            ),
            ("total_failed", serde_json::Value::from(stats.total_failed)),
            (
                "active_subscriptions",
                serde_json::Value::from(stats.active_subscriptions),
            ),
            (
                "worker_count",
                serde_json::Value::from(self.config.worker_count),
            ),
            (
                "avg_processing_time_ms",
                serde_json::Value::from(stats.avg_processing_time_ms),
            ),
        ];
        for (key, value) in entries {
            status.add_metadata(key, value);
        }

        status
    }
}
834
/// Defines an event struct and implements `Event` for it.
///
/// The generated struct always has the public fields `timestamp`, `source`
/// and `metadata`, followed by the extra fields listed in the invocation.
/// `Event::timestamp` is overridden to return the stored `timestamp`.
///
/// The field list may be empty and a trailing comma is accepted:
///
/// ```ignore
/// define_event!(UserCreated, "user.created", user_id: u64, name: String);
/// define_event!(Heartbeat, "system.heartbeat");
/// ```
#[macro_export]
macro_rules! define_event {
    ($name:ident, $event_type:expr $(, $field:ident : $type:ty)* $(,)?) => {
        #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
        pub struct $name {
            pub timestamp: chrono::DateTime<chrono::Utc>,
            pub source: String,
            pub metadata: std::collections::HashMap<String, serde_json::Value>,
            $(pub $field: $type,)*
        }

        impl $crate::event::Event for $name {
            fn event_type(&self) -> &'static str {
                $event_type
            }

            fn source(&self) -> &str {
                &self.source
            }

            fn metadata(&self) -> &$crate::types::Metadata {
                &self.metadata
            }

            fn as_any(&self) -> &dyn std::any::Any {
                self
            }

            fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
                self.timestamp
            }
        }
    };
}
870
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Minimal event used by the tests below.
    #[derive(Debug, Clone)]
    struct TestEvent {
        source: String,
        metadata: Metadata,
        // Payload kept to resemble a real event; the tests never read it.
        #[allow(dead_code)]
        data: String,
    }

    impl Event for TestEvent {
        fn event_type(&self) -> &'static str {
            "test.event"
        }

        fn source(&self) -> &str {
            &self.source
        }

        fn metadata(&self) -> &Metadata {
            &self.metadata
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    /// Builds a `TestEvent` with the given source and no metadata.
    fn test_event(source: &str) -> TestEvent {
        TestEvent {
            source: source.to_string(),
            metadata: HashMap::new(),
            data: "test data".to_string(),
        }
    }

    #[tokio::test]
    async fn test_event_bus_creation() {
        let config = EventBusConfig::default();
        let bus = EventBusManager::new(config);
        assert_eq!(bus.subscriptions.len(), 0);
    }

    #[tokio::test]
    async fn test_event_publishing() {
        let config = EventBusConfig::default();
        let mut bus = EventBusManager::new(config);

        bus.initialize().await.unwrap();

        bus.publish(test_event("test")).await.unwrap();

        let stats = bus.get_stats().await;
        assert_eq!(stats.total_published, 1);

        bus.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_event_subscription() {
        let config = EventBusConfig::default();
        let mut bus = EventBusManager::new(config);

        bus.initialize().await.unwrap();

        let filter = EventFilter::new().with_event_type("test.event");
        let mut receiver = bus.subscribe(filter).await.unwrap();

        bus.publish(test_event("test")).await.unwrap();

        // Waiting on the channel (with a generous bound) replaces a fixed
        // sleep, and a missing delivery now fails the test instead of being
        // skipped silently.
        let received = tokio::time::timeout(Duration::from_secs(1), receiver.recv())
            .await
            .expect("timed out waiting for the published event")
            .expect("subscription channel closed before the event was delivered");
        assert_eq!(received.event_type(), "test.event");

        bus.shutdown().await.unwrap();
    }

    #[test]
    fn test_event_filter() {
        let filter = EventFilter::new()
            .with_event_type("test.event")
            .with_source_pattern("test")
            .with_min_priority(EventPriority::Normal);

        let event = test_event("test_source");

        assert!(filter.matches(&event));

        let filter_no_match = EventFilter::new().with_event_type("other.event");

        assert!(!filter_no_match.matches(&event));
    }

    #[test]
    fn test_event_priority() {
        assert!(EventPriority::Critical > EventPriority::High);
        assert!(EventPriority::High > EventPriority::Normal);
        assert!(EventPriority::Normal > EventPriority::Low);
    }
}