ironpost_core/
pipeline.rs

1//! 파이프라인 trait — 모듈 생명주기 및 확장 포인트 정의
2//!
3//! [`Pipeline`] trait은 모든 모듈이 구현하는 생명주기 인터페이스입니다.
4//! [`Detector`], [`LogParser`], [`PolicyEnforcer`] trait은 플러그인 확장 포인트입니다.
5
6use std::fmt;
7use std::future::Future;
8use std::pin::Pin;
9
10use serde::{Deserialize, Serialize};
11
12use crate::error::IronpostError;
13
14/// dyn-compatible Future 타입 별칭
15pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
16use crate::types::{Alert, LogEntry};
17
18/// 모든 파이프라인 모듈이 구현하는 생명주기 trait
19///
20/// `ironpost-daemon`에서 각 모듈을 시작/정지하고 상태를 확인하는 데 사용됩니다.
21///
22/// # 구현 예시
23/// ```ignore
24/// struct EbpfPipeline { /* ... */ }
25///
26/// impl Pipeline for EbpfPipeline {
27///     async fn start(&mut self) -> Result<(), IronpostError> {
28///         // XDP 프로그램 로드, 링 버퍼 설정 등
29///         Ok(())
30///     }
31///
32///     async fn stop(&mut self) -> Result<(), IronpostError> {
33///         // 리소스 정리, 프로그램 언로드
34///         Ok(())
35///     }
36///
37///     async fn health_check(&self) -> HealthStatus {
38///         HealthStatus::Healthy
39///     }
40/// }
41/// ```
42pub trait Pipeline: Send + Sync {
43    /// 모듈을 시작합니다.
44    ///
45    /// 리소스 초기화, 워커 스폰, 채널 연결 등을 수행합니다.
46    ///
47    /// # Errors
48    ///
49    /// 이미 실행 중인 경우 `PipelineError::AlreadyRunning`을 반환합니다.
50    fn start(&mut self) -> impl std::future::Future<Output = Result<(), IronpostError>> + Send;
51
52    /// 모듈을 정지합니다.
53    ///
54    /// Graceful shutdown을 수행합니다.
55    /// 진행 중인 작업을 완료하고 리소스를 정리합니다.
56    ///
57    /// # Errors
58    ///
59    /// 정지 과정에서 문제가 발생하면 에러를 반환합니다.
60    fn stop(&mut self) -> impl std::future::Future<Output = Result<(), IronpostError>> + Send;
61
62    /// 모듈의 현재 상태를 확인합니다.
63    ///
64    /// 주기적으로 호출되어 모듈의 건강 상태를 모니터링합니다.
65    fn health_check(&self) -> impl std::future::Future<Output = HealthStatus> + Send;
66}
67
68/// dyn-compatible 파이프라인 trait
69///
70/// `Pipeline` trait은 RPITIT를 사용하므로 `dyn Pipeline`이 불가합니다.
71/// `DynPipeline`은 `BoxFuture`를 반환하여 `Vec<Box<dyn DynPipeline>>`으로
72/// 모듈을 동적 관리할 수 있게 합니다.
73///
74/// # 구현 예시
75/// ```ignore
76/// // Pipeline을 구현한 타입은 blanket impl으로 자동으로 DynPipeline도 구현됩니다.
77/// let modules: Vec<Box<dyn DynPipeline>> = vec![
78///     Box::new(ebpf_pipeline),
79///     Box::new(log_pipeline),
80/// ];
81/// ```
82pub trait DynPipeline: Send + Sync {
83    /// 모듈을 시작합니다.
84    fn start(&mut self) -> BoxFuture<'_, Result<(), IronpostError>>;
85
86    /// 모듈을 정지합니다.
87    fn stop(&mut self) -> BoxFuture<'_, Result<(), IronpostError>>;
88
89    /// 모듈의 현재 상태를 확인합니다.
90    fn health_check(&self) -> BoxFuture<'_, HealthStatus>;
91}
92
93impl<T: Pipeline> DynPipeline for T {
94    fn start(&mut self) -> BoxFuture<'_, Result<(), IronpostError>> {
95        Box::pin(Pipeline::start(self))
96    }
97
98    fn stop(&mut self) -> BoxFuture<'_, Result<(), IronpostError>> {
99        Box::pin(Pipeline::stop(self))
100    }
101
102    fn health_check(&self) -> BoxFuture<'_, HealthStatus> {
103        Box::pin(Pipeline::health_check(self))
104    }
105}
106
107/// 모듈 헬스 상태
108///
109/// 각 모듈의 현재 운영 상태를 나타냅니다.
110#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
111pub enum HealthStatus {
112    /// 정상 동작 중
113    Healthy,
114    /// 성능 저하 또는 부분적 장애 (서비스는 계속 동작)
115    Degraded(String),
116    /// 비정상 — 서비스 불가 상태
117    Unhealthy(String),
118}
119
120impl HealthStatus {
121    /// 정상 상태인지 확인합니다.
122    pub fn is_healthy(&self) -> bool {
123        matches!(self, Self::Healthy)
124    }
125
126    /// 비정상 상태인지 확인합니다.
127    pub fn is_unhealthy(&self) -> bool {
128        matches!(self, Self::Unhealthy(_))
129    }
130}
131
132impl fmt::Display for HealthStatus {
133    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134        match self {
135            Self::Healthy => write!(f, "healthy"),
136            Self::Degraded(reason) => write!(f, "degraded: {reason}"),
137            Self::Unhealthy(reason) => write!(f, "unhealthy: {reason}"),
138        }
139    }
140}
141
142/// 탐지 로직을 구현하는 trait
143///
144/// 새로운 탐지 규칙을 추가하려면 이 trait을 구현합니다.
145/// `ironpost-daemon`에서 빌더 패턴으로 등록하여 사용합니다.
146///
147/// # 구현 예시
148/// ```ignore
149/// struct BruteForceDetector;
150///
151/// impl Detector for BruteForceDetector {
152///     fn name(&self) -> &str { "brute_force" }
153///
154///     fn detect(&self, entry: &LogEntry) -> Result<Option<Alert>, IronpostError> {
155///         if entry.message.contains("Failed password") {
156///             Ok(Some(Alert { /* ... */ }))
157///         } else {
158///             Ok(None)
159///         }
160///     }
161/// }
162/// ```
163pub trait Detector: Send + Sync {
164    /// 탐지기 이름
165    fn name(&self) -> &str;
166
167    /// 로그 엔트리를 분석하여 알림 생성 여부를 결정합니다.
168    ///
169    /// 탐지 규칙에 매칭되면 `Ok(Some(Alert))`을,
170    /// 매칭되지 않으면 `Ok(None)`을 반환합니다.
171    fn detect(&self, entry: &LogEntry) -> Result<Option<Alert>, IronpostError>;
172}
173
174/// 로그 파서 trait
175///
176/// 새로운 로그 형식을 지원하려면 이 trait을 구현합니다.
177/// Syslog, CEF, JSON 등 다양한 형식의 파서를 추가할 수 있습니다.
178///
179/// # 구현 예시
180/// ```ignore
181/// struct SyslogParser;
182///
183/// impl LogParser for SyslogParser {
184///     fn format_name(&self) -> &str { "syslog" }
185///
186///     fn parse(&self, raw: &[u8]) -> Result<LogEntry, IronpostError> {
187///         // syslog 형식 파싱 로직
188///         todo!()
189///     }
190/// }
191/// ```
192pub trait LogParser: Send + Sync {
193    /// 지원하는 로그 형식 이름
194    fn format_name(&self) -> &str;
195
196    /// 원시 바이트를 로그 엔트리로 파싱합니다.
197    fn parse(&self, raw: &[u8]) -> Result<LogEntry, IronpostError>;
198}
199
200/// 격리 정책을 구현하는 trait
201///
202/// 보안 알림에 대한 자동 대응 정책을 정의합니다.
203/// 알림의 심각도와 유형에 따라 격리 여부를 결정합니다.
204pub trait PolicyEnforcer: Send + Sync {
205    /// 정책 이름
206    fn name(&self) -> &str;
207
208    /// 알림에 대해 격리 실행 여부를 결정하고, 필요 시 격리를 수행합니다.
209    ///
210    /// 격리를 수행했으면 `Ok(true)`, 수행하지 않았으면 `Ok(false)`를 반환합니다.
211    fn enforce(&self, alert: &Alert) -> Result<bool, IronpostError>;
212}
213
214#[cfg(test)]
215mod tests {
216    use super::*;
217
218    #[test]
219    fn health_status_healthy() {
220        let status = HealthStatus::Healthy;
221        assert!(status.is_healthy());
222        assert!(!status.is_unhealthy());
223        assert_eq!(status.to_string(), "healthy");
224    }
225
226    #[test]
227    fn health_status_degraded() {
228        let status = HealthStatus::Degraded("high latency".to_owned());
229        assert!(!status.is_healthy());
230        assert!(!status.is_unhealthy());
231        assert!(status.to_string().contains("high latency"));
232    }
233
234    #[test]
235    fn health_status_unhealthy() {
236        let status = HealthStatus::Unhealthy("connection lost".to_owned());
237        assert!(!status.is_healthy());
238        assert!(status.is_unhealthy());
239        assert!(status.to_string().contains("connection lost"));
240    }
241
242    #[test]
243    fn health_status_equality() {
244        assert_eq!(HealthStatus::Healthy, HealthStatus::Healthy);
245        assert_ne!(
246            HealthStatus::Healthy,
247            HealthStatus::Degraded("reason".to_owned())
248        );
249    }
250
251    #[test]
252    fn health_status_serialize_deserialize() {
253        let status = HealthStatus::Degraded("slow".to_owned());
254        let json = serde_json::to_string(&status).unwrap();
255        let deserialized: HealthStatus = serde_json::from_str(&json).unwrap();
256        assert_eq!(status, deserialized);
257    }
258
259    // Pipeline trait의 구현 테스트를 위한 mock
260    struct MockPipeline {
261        running: bool,
262    }
263
264    impl MockPipeline {
265        fn new() -> Self {
266            Self { running: false }
267        }
268    }
269
270    impl Pipeline for MockPipeline {
271        async fn start(&mut self) -> Result<(), IronpostError> {
272            if self.running {
273                return Err(crate::error::PipelineError::AlreadyRunning.into());
274            }
275            self.running = true;
276            Ok(())
277        }
278
279        async fn stop(&mut self) -> Result<(), IronpostError> {
280            if !self.running {
281                return Err(crate::error::PipelineError::NotRunning.into());
282            }
283            self.running = false;
284            Ok(())
285        }
286
287        async fn health_check(&self) -> HealthStatus {
288            if self.running {
289                HealthStatus::Healthy
290            } else {
291                HealthStatus::Unhealthy("not running".to_owned())
292            }
293        }
294    }
295
296    #[tokio::test]
297    async fn mock_pipeline_lifecycle() {
298        let mut pipeline = MockPipeline::new();
299
300        // 시작 전 상태 확인
301        assert!(Pipeline::health_check(&pipeline).await.is_unhealthy());
302
303        // 시작
304        Pipeline::start(&mut pipeline).await.unwrap();
305        assert!(Pipeline::health_check(&pipeline).await.is_healthy());
306
307        // 중복 시작 시 에러
308        let err = Pipeline::start(&mut pipeline).await;
309        assert!(err.is_err());
310
311        // 정지
312        Pipeline::stop(&mut pipeline).await.unwrap();
313        assert!(Pipeline::health_check(&pipeline).await.is_unhealthy());
314
315        // 중복 정지 시 에러
316        let err = Pipeline::stop(&mut pipeline).await;
317        assert!(err.is_err());
318    }
319
320    #[tokio::test]
321    async fn dyn_pipeline_can_be_boxed() {
322        let mut pipeline: Box<dyn DynPipeline> = Box::new(MockPipeline::new());
323
324        assert!(pipeline.health_check().await.is_unhealthy());
325        pipeline.start().await.unwrap();
326        assert!(pipeline.health_check().await.is_healthy());
327        pipeline.stop().await.unwrap();
328        assert!(pipeline.health_check().await.is_unhealthy());
329    }
330
331    // Detector trait mock 테스트
332    struct AlwaysAlertDetector;
333
334    impl Detector for AlwaysAlertDetector {
335        fn name(&self) -> &str {
336            "always-alert"
337        }
338
339        fn detect(&self, entry: &LogEntry) -> Result<Option<Alert>, IronpostError> {
340            Ok(Some(Alert {
341                id: "test-alert".to_owned(),
342                title: format!("Alert for: {}", entry.message),
343                description: String::new(),
344                severity: crate::types::Severity::Medium,
345                rule_name: self.name().to_owned(),
346                source_ip: None,
347                target_ip: None,
348                created_at: std::time::SystemTime::now(),
349            }))
350        }
351    }
352
353    #[test]
354    fn detector_produces_alert() {
355        let detector = AlwaysAlertDetector;
356        assert_eq!(detector.name(), "always-alert");
357
358        let entry = LogEntry {
359            source: "test".to_owned(),
360            timestamp: std::time::SystemTime::now(),
361            hostname: "localhost".to_owned(),
362            process: "test".to_owned(),
363            message: "suspicious activity".to_owned(),
364            severity: crate::types::Severity::Info,
365            fields: vec![],
366        };
367
368        let result = detector.detect(&entry).unwrap();
369        assert!(result.is_some());
370        let alert = result.unwrap();
371        assert!(alert.title.contains("suspicious activity"));
372    }
373
374    // LogParser trait mock 테스트
375    struct SimpleParser;
376
377    impl LogParser for SimpleParser {
378        fn format_name(&self) -> &str {
379            "simple"
380        }
381
382        fn parse(&self, raw: &[u8]) -> Result<LogEntry, IronpostError> {
383            let message = String::from_utf8_lossy(raw).into_owned();
384            Ok(LogEntry {
385                source: "simple-parser".to_owned(),
386                timestamp: std::time::SystemTime::now(),
387                hostname: "unknown".to_owned(),
388                process: "unknown".to_owned(),
389                message,
390                severity: crate::types::Severity::Info,
391                fields: vec![],
392            })
393        }
394    }
395
396    #[test]
397    fn log_parser_parses_raw_bytes() {
398        let parser = SimpleParser;
399        assert_eq!(parser.format_name(), "simple");
400
401        let entry = parser.parse(b"hello world").unwrap();
402        assert_eq!(entry.message, "hello world");
403    }
404}