File tree Expand file tree Collapse file tree
INF_TEST-ssla2n7kxevn6fh72-atsiotap-northeast-2amazonawscom8883
src/main/java/com/monitory/data Expand file tree Collapse file tree Original file line number Diff line number Diff line change 22
33import com .monitory .data .sources .MqttSource ;
44import com .monitory .data .transformations .TimeStampAssigner ;
5+ import com .monitory .data .utils .KafkaUtil ;
56import org .apache .flink .api .common .eventtime .WatermarkStrategy ;
67import org .apache .flink .configuration .Configuration ;
78import org .apache .flink .streaming .api .datastream .DataStream ;
89import org .apache .flink .streaming .api .environment .StreamExecutionEnvironment ;
910
10- public class FlinkApplication {
11+ public class FlinkSourceApplication {
1112 public static void main (String [] args ) throws Exception {
1213 // 1. Flink 환경 설정
1314 Configuration conf = new Configuration ();
@@ -16,14 +17,15 @@ public static void main (String [] args) throws Exception {
1617 // 2. 데이터 소스
1718 DataStream <String > sourceStream = env .fromSource (new MqttSource (), WatermarkStrategy .noWatermarks (), "MQTT-Source" );
1819
19- // 3. 데이터 처리: Time Stamp 출력과 Anomaly 감지
20+ // 3. 데이터 처리: Time Stamp 출력
2021 DataStream <String > transformedStream = sourceStream
2122 .map (new TimeStampAssigner ());
2223
23- // 4. 데이터 싱크: 콘솔에 출력
24+ // 4. 데이터 싱크: 콘솔에 출력 & kafka publish
25+ transformedStream .sinkTo (KafkaUtil .createKafkaSink ());
2426 transformedStream .print ();
2527
2628 // 5. 실행
27- env .execute ("Flink DataStream Example " );
29+ env .execute ("Flink to Kafka Produce " );
2830 }
2931}
Original file line number Diff line number Diff line change 1+ package com .monitory .data .config ;
2+
3+ import java .io .InputStream ;
4+ import java .util .Properties ;
5+
6+ public class KafkaConfig {
7+ private static final Properties properties = new Properties ();
8+
9+ static {
10+ try (InputStream input = KafkaConfig .class .getClassLoader ()
11+ .getResourceAsStream ("application.properties" )) {
12+ if (input == null ) {
13+ throw new RuntimeException ("❌ application.properties 파일을 찾을 수 없습니다." );
14+ }
15+ properties .load (input );
16+ } catch (Exception e ) {
17+ throw new RuntimeException ("❌ properties 파일 로딩 실패" , e );
18+ }
19+ }
20+
21+ public static String get (String key ) {
22+ return properties .getProperty (key );
23+ }
24+ }
Original file line number Diff line number Diff line change 1+ package com .monitory .data .utils ;
2+
3+ import com .fasterxml .jackson .databind .JsonNode ;
4+ import com .fasterxml .jackson .databind .ObjectMapper ;
5+ import com .monitory .data .config .KafkaConfig ;
6+ import org .apache .flink .api .common .serialization .SimpleStringSchema ;
7+ import org .apache .flink .connector .kafka .sink .KafkaRecordSerializationSchema ;
8+ import org .apache .flink .connector .kafka .sink .KafkaSink ;
9+
10+ public class KafkaUtil {
11+ private static final ObjectMapper mapper = new ObjectMapper ();
12+
13+ public static KafkaSink <String > createKafkaSink () {
14+ return KafkaSink .<String >builder ()
15+ .setBootstrapServers (KafkaConfig .get ("KAFKA_SERVER" ))
16+ .setRecordSerializer (
17+ KafkaRecordSerializationSchema .<String >builder ()
18+ .setValueSerializationSchema (new SimpleStringSchema ())
19+ .setTopicSelector ((element ) -> {
20+ try {
21+ JsonNode json = mapper .readTree (element );
22+
23+ String zoneId = json .path ("zoneId" ).asText (null );
24+ String equipId = json .path ("equipId" ).asText (null );
25+
26+ if (zoneId != null && equipId != null ) {
27+ return "EQUIPMENT" ;
28+ } else if (zoneId != null && equipId == null ) {
29+ return "ENVIRONMENT" ;
30+ } else {
31+ return "sensor.unknown_topic" ;
32+ }
33+
34+ } catch (Exception e ) {
35+ return "sensor.error_topic" ;
36+ }
37+ })
38+ .build ()
39+ )
40+ .build ();
41+ }
42+ }
You can’t perform that action at this time.
0 commit comments