-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathS3SinkUtil.java
More file actions
47 lines (43 loc) · 2.11 KB
/
S3SinkUtil.java
File metadata and controls
47 lines (43 loc) · 2.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.monitory.data.utils;
import com.monitory.data.sinks.BucketJson;
import com.monitory.data.sinks.S3BucketAssigner;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
public class S3SinkUtil {
private static final BucketAssigner<BucketJson, String> s3BucketAssigner = new S3BucketAssigner();
public static FileSink<BucketJson> createS3Sink(String s3Bucket) {
OutputFileConfig outputFileConfig = OutputFileConfig.builder()
.withPartPrefix("equip")
.withPartSuffix(".json")
.build();
return FileSink
.forRowFormat(
new Path("s3a://" + s3Bucket + "/EQUIPMENT"),
new SimpleStringEncoder<BucketJson>("UTF-8"){
@Override
public void encode(BucketJson record, OutputStream stream) throws IOException {
stream.write((record.getJson() + "\n").getBytes(StandardCharsets.UTF_8));
}
}
)
.withBucketAssigner(s3BucketAssigner)
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofHours(1))
.withInactivityInterval(Duration.ofMinutes(5))
.withMaxPartSize(MemorySize.ofMebiBytes(512))
.build()
)
.withOutputFileConfig(outputFileConfig)
.build();
}
}