ironpost_core/
pipeline.rs1use std::fmt;
7use std::future::Future;
8use std::pin::Pin;
9
10use serde::{Deserialize, Serialize};
11
12use crate::error::IronpostError;
13
/// Heap-allocated, pinned, type-erased future that is `Send` and may borrow
/// data for the lifetime `'a`.
///
/// This is the return type of the [`DynPipeline`] methods, which lets that
/// trait be used as a trait object.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
16use crate::types::{Alert, LogEntry};
17
18pub trait Pipeline: Send + Sync {
43 fn start(&mut self) -> impl std::future::Future<Output = Result<(), IronpostError>> + Send;
51
52 fn stop(&mut self) -> impl std::future::Future<Output = Result<(), IronpostError>> + Send;
61
62 fn health_check(&self) -> impl std::future::Future<Output = HealthStatus> + Send;
66}
67
68pub trait DynPipeline: Send + Sync {
83 fn start(&mut self) -> BoxFuture<'_, Result<(), IronpostError>>;
85
86 fn stop(&mut self) -> BoxFuture<'_, Result<(), IronpostError>>;
88
89 fn health_check(&self) -> BoxFuture<'_, HealthStatus>;
91}
92
93impl<T: Pipeline> DynPipeline for T {
94 fn start(&mut self) -> BoxFuture<'_, Result<(), IronpostError>> {
95 Box::pin(Pipeline::start(self))
96 }
97
98 fn stop(&mut self) -> BoxFuture<'_, Result<(), IronpostError>> {
99 Box::pin(Pipeline::stop(self))
100 }
101
102 fn health_check(&self) -> BoxFuture<'_, HealthStatus> {
103 Box::pin(Pipeline::health_check(self))
104 }
105}
106
107#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
111pub enum HealthStatus {
112 Healthy,
114 Degraded(String),
116 Unhealthy(String),
118}
119
120impl HealthStatus {
121 pub fn is_healthy(&self) -> bool {
123 matches!(self, Self::Healthy)
124 }
125
126 pub fn is_unhealthy(&self) -> bool {
128 matches!(self, Self::Unhealthy(_))
129 }
130}
131
132impl fmt::Display for HealthStatus {
133 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134 match self {
135 Self::Healthy => write!(f, "healthy"),
136 Self::Degraded(reason) => write!(f, "degraded: {reason}"),
137 Self::Unhealthy(reason) => write!(f, "unhealthy: {reason}"),
138 }
139 }
140}
141
142pub trait Detector: Send + Sync {
164 fn name(&self) -> &str;
166
167 fn detect(&self, entry: &LogEntry) -> Result<Option<Alert>, IronpostError>;
172}
173
174pub trait LogParser: Send + Sync {
193 fn format_name(&self) -> &str;
195
196 fn parse(&self, raw: &[u8]) -> Result<LogEntry, IronpostError>;
198}
199
200pub trait PolicyEnforcer: Send + Sync {
205 fn name(&self) -> &str;
207
208 fn enforce(&self, alert: &Alert) -> Result<bool, IronpostError>;
212}
213
#[cfg(test)]
mod tests {
    //! Unit tests for [`HealthStatus`] and for minimal mock implementations
    //! of the traits declared in this module.

    use super::*;
    use crate::error::PipelineError;
    use crate::types::Severity;
    use std::time::SystemTime;

    /// Builds a minimal [`LogEntry`] whose message is `message`.
    fn sample_entry(message: &str) -> LogEntry {
        LogEntry {
            source: "test".to_owned(),
            timestamp: SystemTime::now(),
            hostname: "localhost".to_owned(),
            process: "test".to_owned(),
            message: message.to_owned(),
            severity: Severity::Info,
            fields: vec![],
        }
    }

    /// Builds a minimal [`Alert`] whose title is `title`.
    fn sample_alert(title: &str) -> Alert {
        Alert {
            id: "test-alert".to_owned(),
            title: title.to_owned(),
            description: String::new(),
            severity: Severity::Medium,
            rule_name: "test-rule".to_owned(),
            source_ip: None,
            target_ip: None,
            created_at: SystemTime::now(),
        }
    }

    #[test]
    fn health_status_healthy() {
        let status = HealthStatus::Healthy;
        assert!(status.is_healthy());
        assert!(!status.is_unhealthy());
        assert_eq!(status.to_string(), "healthy");
    }

    #[test]
    fn health_status_degraded() {
        let status = HealthStatus::Degraded("high latency".to_owned());
        assert!(!status.is_healthy());
        assert!(!status.is_unhealthy());
        // Pin the exact rendering, not just the presence of the reason.
        assert_eq!(status.to_string(), "degraded: high latency");
    }

    #[test]
    fn health_status_unhealthy() {
        let status = HealthStatus::Unhealthy("connection lost".to_owned());
        assert!(!status.is_healthy());
        assert!(status.is_unhealthy());
        assert_eq!(status.to_string(), "unhealthy: connection lost");
    }

    #[test]
    fn health_status_equality() {
        assert_eq!(HealthStatus::Healthy, HealthStatus::Healthy);
        assert_ne!(
            HealthStatus::Healthy,
            HealthStatus::Degraded("reason".to_owned())
        );
        // Same reason, different variant: still not equal.
        assert_ne!(
            HealthStatus::Degraded("reason".to_owned()),
            HealthStatus::Unhealthy("reason".to_owned())
        );
    }

    #[test]
    fn health_status_serialize_deserialize() {
        // Round-trip every variant, not only `Degraded`.
        let cases = [
            HealthStatus::Healthy,
            HealthStatus::Degraded("slow".to_owned()),
            HealthStatus::Unhealthy("down".to_owned()),
        ];
        for status in cases {
            let json = serde_json::to_string(&status).expect("HealthStatus serializes to JSON");
            let decoded: HealthStatus =
                serde_json::from_str(&json).expect("serialized HealthStatus deserializes");
            assert_eq!(status, decoded);
        }
    }

    /// Pipeline mock that only tracks whether it has been started.
    struct MockPipeline {
        running: bool,
    }

    impl MockPipeline {
        fn new() -> Self {
            Self { running: false }
        }
    }

    impl Pipeline for MockPipeline {
        async fn start(&mut self) -> Result<(), IronpostError> {
            if self.running {
                return Err(PipelineError::AlreadyRunning.into());
            }
            self.running = true;
            Ok(())
        }

        async fn stop(&mut self) -> Result<(), IronpostError> {
            if !self.running {
                return Err(PipelineError::NotRunning.into());
            }
            self.running = false;
            Ok(())
        }

        async fn health_check(&self) -> HealthStatus {
            if self.running {
                HealthStatus::Healthy
            } else {
                HealthStatus::Unhealthy("not running".to_owned())
            }
        }
    }

    #[tokio::test]
    async fn mock_pipeline_lifecycle() {
        // `MockPipeline` also implements `DynPipeline` through the blanket
        // impl, so the calls are qualified to select `Pipeline`.
        let mut pipeline = MockPipeline::new();

        assert!(Pipeline::health_check(&pipeline).await.is_unhealthy());

        Pipeline::start(&mut pipeline)
            .await
            .expect("first start succeeds");
        assert!(Pipeline::health_check(&pipeline).await.is_healthy());

        // Starting twice is rejected.
        assert!(Pipeline::start(&mut pipeline).await.is_err());

        Pipeline::stop(&mut pipeline)
            .await
            .expect("stop after start succeeds");
        assert!(Pipeline::health_check(&pipeline).await.is_unhealthy());

        // Stopping twice is rejected.
        assert!(Pipeline::stop(&mut pipeline).await.is_err());
    }

    #[tokio::test]
    async fn dyn_pipeline_can_be_boxed() {
        let mut pipeline: Box<dyn DynPipeline> = Box::new(MockPipeline::new());

        assert!(pipeline.health_check().await.is_unhealthy());
        pipeline.start().await.expect("start succeeds");
        assert!(pipeline.health_check().await.is_healthy());
        pipeline.stop().await.expect("stop succeeds");
        assert!(pipeline.health_check().await.is_unhealthy());
    }

    #[tokio::test]
    async fn dyn_pipeline_propagates_errors() {
        let mut pipeline: Box<dyn DynPipeline> = Box::new(MockPipeline::new());

        // Errors from the underlying `Pipeline` must surface through the
        // boxed futures unchanged.
        assert!(pipeline.stop().await.is_err());
        pipeline.start().await.expect("start succeeds");
        assert!(pipeline.start().await.is_err());
    }

    /// Detector mock that raises an alert for every entry.
    struct AlwaysAlertDetector;

    impl Detector for AlwaysAlertDetector {
        fn name(&self) -> &str {
            "always-alert"
        }

        fn detect(&self, entry: &LogEntry) -> Result<Option<Alert>, IronpostError> {
            Ok(Some(Alert {
                id: "test-alert".to_owned(),
                title: format!("Alert for: {}", entry.message),
                description: String::new(),
                severity: Severity::Medium,
                rule_name: self.name().to_owned(),
                source_ip: None,
                target_ip: None,
                created_at: SystemTime::now(),
            }))
        }
    }

    #[test]
    fn detector_produces_alert() {
        let detector = AlwaysAlertDetector;
        assert_eq!(detector.name(), "always-alert");

        let entry = sample_entry("suspicious activity");
        let alert = detector
            .detect(&entry)
            .expect("detection does not fail")
            .expect("detector alerts on every entry");

        assert_eq!(alert.title, "Alert for: suspicious activity");
        assert_eq!(alert.rule_name, "always-alert");
    }

    /// Parser mock that stores the lossily decoded input as the message.
    struct SimpleParser;

    impl LogParser for SimpleParser {
        fn format_name(&self) -> &str {
            "simple"
        }

        fn parse(&self, raw: &[u8]) -> Result<LogEntry, IronpostError> {
            let message = String::from_utf8_lossy(raw).into_owned();
            Ok(LogEntry {
                source: "simple-parser".to_owned(),
                timestamp: SystemTime::now(),
                hostname: "unknown".to_owned(),
                process: "unknown".to_owned(),
                message,
                severity: Severity::Info,
                fields: vec![],
            })
        }
    }

    #[test]
    fn log_parser_parses_raw_bytes() {
        let parser = SimpleParser;
        assert_eq!(parser.format_name(), "simple");

        let entry = parser.parse(b"hello world").expect("parsing does not fail");
        assert_eq!(entry.message, "hello world");
    }

    /// Enforcer mock that reports `true` only for alerts with a title.
    struct TitledAlertEnforcer;

    impl PolicyEnforcer for TitledAlertEnforcer {
        fn name(&self) -> &str {
            "titled-alert"
        }

        fn enforce(&self, alert: &Alert) -> Result<bool, IronpostError> {
            Ok(!alert.title.is_empty())
        }
    }

    #[test]
    fn policy_enforcer_reports_decision() {
        let enforcer: Box<dyn PolicyEnforcer> = Box::new(TitledAlertEnforcer);
        assert_eq!(enforcer.name(), "titled-alert");

        assert!(enforcer
            .enforce(&sample_alert("port scan"))
            .expect("enforcement does not fail"));
        assert!(!enforcer
            .enforce(&sample_alert(""))
            .expect("enforcement does not fail"));
    }
}