generated from Fac2Real/.github
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathFlinkSourceApplication.java
More file actions
120 lines (101 loc) · 5.72 KB
/
FlinkSourceApplication.java
File metadata and controls
120 lines (101 loc) · 5.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package com.monitory.data;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.monitory.data.sinks.BucketJson;
import com.monitory.data.sinks.S3WindowFunction;
import com.monitory.data.transformations.EnvironmentDangerLevelAssigner;
import com.monitory.data.transformations.FaultyAssigner;
import com.monitory.data.transformations.WearableDangerLevelAssigner;
import com.monitory.data.utils.KinesisSourceUtil;
import com.monitory.data.utils.S3SinkUtil;
import org.apache.flink.connector.file.sink.FileSink;
import com.monitory.data.transformations.TimeStampAssigner;
import com.monitory.data.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import java.time.Duration;
/**
 * Flink streaming job: consumes sensor JSON from Kinesis, assigns event-time
 * watermarks from the payload's "utc_ingestion_time" field, labels each record
 * (danger levels, faults), publishes everything to Kafka, and additionally
 * aggregates EQUIPMENT-category records per (zoneId, equipId) into hourly
 * windows that are written to S3.
 */
public class FlinkSourceApplication {

    // Single shared mapper: ObjectMapper is thread-safe after configuration, and a
    // static field is re-initialized per task manager at class load instead of
    // being serialized inside every lambda closure.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static void main(String[] args) throws Exception {
        // 1. Flink environment with a Prometheus metrics reporter on port 9249.
        Configuration conf = new Configuration();
        conf.setString("metrics.reporters", "prom");
        conf.setString("metrics.reporter.prom.factory.class", "org.apache.flink.metrics.prometheus.PrometheusReporterFactory");
        conf.setString("metrics.reporter.prom.port", "9249");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        // 1-1. Checkpointing: every 60 s, 30 s timeout, >= 500 ms between checkpoints.
        env.enableCheckpointing(60_000);
        env.getCheckpointConfig().setCheckpointTimeout(30_000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.setParallelism(4);

        // 2. Source: Kinesis stream with a 5 s bounded-out-of-orderness watermark.
        // Event time is taken from "utc_ingestion_time"; unparseable payloads or a
        // missing field fall back to the Kinesis record timestamp.
        FlinkKinesisConsumer<String> kinesisConsumer = KinesisSourceUtil.createKinesisSource();
        DataStream<String> sourceStream = env.addSource(kinesisConsumer, "Kinesis-Streams-Source")
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTimestamp) -> {
                                    try {
                                        // path() never returns null, so an absent field
                                        // yields the default instead of an NPE.
                                        JsonNode node = MAPPER.readTree(element);
                                        return node.path("utc_ingestion_time").asLong(recordTimestamp);
                                    } catch (Exception e) {
                                        return recordTimestamp;
                                    }
                                })
                                .withIdleness(Duration.ofMinutes(1))
                )
                .returns(TypeInformation.of(String.class));

        // 3. Transformations: stamp processing time, then assign environment danger
        // level, fault flag, and wearable danger level.
        DataStream<String> labelTransformedStream = sourceStream
                .map(new TimeStampAssigner())
                .map(new EnvironmentDangerLevelAssigner())
                .map(new FaultyAssigner())
                .map(new WearableDangerLevelAssigner());

        // 4-1. Sinks: publish every labeled record to Kafka and echo to stdout.
        labelTransformedStream.sinkTo(KafkaUtil.createKafkaSink());
        labelTransformedStream.print();

        // 4-2. Keep only EQUIPMENT-category records for the S3 branch; malformed
        // JSON is silently dropped here (it already reached Kafka above).
        DataStream<String> equipmentStream = labelTransformedStream
                .filter(json -> {
                    try {
                        JsonNode node = MAPPER.readTree(json);
                        return "EQUIPMENT".equalsIgnoreCase(node.path("category").asText(""));
                    } catch (Exception e) {
                        return false;
                    }
                });

        // 4-3. Aggregate per (zoneId, equipId) over 1-hour processing-time windows.
        DataStream<String> aggregatedStream = equipmentStream
                .keyBy(json -> {
                    JsonNode node = MAPPER.readTree(json);
                    // path() + asText(default): JsonNode.get returns null for a
                    // missing field, so get("zoneId").asText("unknown") would NPE
                    // and fail the whole job on one malformed record.
                    String zoneId = node.path("zoneId").asText("unknown");
                    String equipId = node.path("equipId").asText("unknown");
                    return zoneId + "|" + equipId;
                })
                .window(TumblingProcessingTimeWindows.of(Time.hours(1)))
                .apply(new S3WindowFunction());

        // 4-4 / 4-5. Split each window result on the first '|' into a BucketJson
        // (assumes S3WindowFunction emits "<path>|<payload>" — TODO confirm) and
        // write it to the S3 bucket.
        FileSink<BucketJson> s3Sink = S3SinkUtil.createS3Sink("monitory-bucket");
        DataStream<BucketJson> bucketJsonStream = aggregatedStream
                .map(element -> {
                    String[] parts = element.split("\\|", 2);
                    // Guard against output lacking a '|' separator so one malformed
                    // element cannot kill the sink task with an index error.
                    return parts.length == 2
                            ? new BucketJson(parts[0], parts[1])
                            : new BucketJson("unknown", element);
                });

        // 4-6. Attach the S3 sink.
        bucketJsonStream.sinkTo(s3Sink);

        // 5. Launch the job.
        env.execute("Flink to Kafka Produce");
    }
}