diff --git a/konduit-serving-pipeline/pom.xml b/konduit-serving-pipeline/pom.xml index 6f99957c7..dc0c7dd52 100644 --- a/konduit-serving-pipeline/pom.xml +++ b/konduit-serving-pipeline/pom.xml @@ -16,13 +16,17 @@ 1.8 1.8 - + + + oss.sonatype.org + https://oss.sonatype.org/content/repositories/snapshots/ + + @@ -36,6 +40,22 @@ konduit-serving-annotation ${project.version} + + com.jcabi + jcabi-aspects + 0.22.6 + + + javax.validation + validation-api + + + + + org.apache.commons + commons-csv + 1.4 + + + + + + ai.konduit.serving + konduit-serving-annotation + ${project.version} + + + org.apache.hadoop + hadoop-core + 2.4.1-mapr-1408 + + + org.apache.hadoop + hadoop-hdfs + 2.4.1-mapr-1408 + + + + + org.nd4j + jackson + ${dl4j.version} + + + + org.projectlombok + lombok + + + + org.slf4j + slf4j-api + ${slf4j.version} + + + + + junit + junit + test + + + + org.apache.hadoop + hadoop-minicluster + test + 3.2.1 + + + + ch.qos.logback + logback-classic + ${logback.version} + test + + + + ai.konduit.serving + konduit-serving-pipeline + ${project.version} + + + + ai.konduit.serving + konduit-serving-common-tests + test + ${project.version} + + + + + + + maven-resources-plugin + 2.6 + + + copy-resources + process-resources + + copy-resources + + + ${basedir}/target/resources + + + resources + + + + + + + + + + \ No newline at end of file diff --git a/konduit-serving-protocol/konduit-serving-protocol-hdfs/src/main/java/handlers/HdfsHandler.java b/konduit-serving-protocol/konduit-serving-protocol-hdfs/src/main/java/handlers/HdfsHandler.java new file mode 100644 index 000000000..8fc6be441 --- /dev/null +++ b/konduit-serving-protocol/konduit-serving-protocol-hdfs/src/main/java/handlers/HdfsHandler.java @@ -0,0 +1,55 @@ +/* + * ****************************************************************************** + * * Copyright (c) 2020 Konduit K.K. + * * + * * This program and the accompanying materials are made available under the + * * terms of the Apache License, Version 2.0 which is available at + * * https://www.apache.org/licenses/LICENSE-2.0. + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * * License for the specific language governing permissions and limitations + * * under the License. + * * + * * SPDX-License-Identifier: Apache-2.0 + * ***************************************************************************** + */ + +package handlers; + +import ai.konduit.serving.pipeline.api.protocol.Credentials; +import lombok.extern.slf4j.Slf4j; +import providers.HdfsAccessProvider; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.URLConnection; +import java.net.URLStreamHandler; + +@Slf4j +public class HdfsHandler extends URLStreamHandler { + + protected URLConnection openConnection(URL url) throws IOException { + + return new URLConnection(url) { + + @Override + public InputStream getInputStream() throws IOException { + HdfsAccessProvider accessProvider = new HdfsAccessProvider(); + try { + return accessProvider.connect(url, accessProvider.getCredentials()); + } catch (URISyntaxException e) { + log.error("Failed connection to " + url, e); + throw new IOException(e); + } + } + + @Override + public void connect() throws IOException { } + + }; + } +} \ No newline at end of file diff --git a/konduit-serving-protocol/konduit-serving-protocol-hdfs/src/main/java/providers/HdfsAccessProvider.java b/konduit-serving-protocol/konduit-serving-protocol-hdfs/src/main/java/providers/HdfsAccessProvider.java new file mode 100644 index 000000000..0f118c7e0 --- /dev/null +++ b/konduit-serving-protocol/konduit-serving-protocol-hdfs/src/main/java/providers/HdfsAccessProvider.java @@ -0,0 +1,53 @@ +/* + * ****************************************************************************** + * * Copyright (c) 2020 Konduit K.K. + * * + * * This program and the accompanying materials are made available under the + * * terms of the Apache License, Version 2.0 which is available at + * * https://www.apache.org/licenses/LICENSE-2.0. + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * * License for the specific language governing permissions and limitations + * * under the License. + * * + * * SPDX-License-Identifier: Apache-2.0 + * ***************************************************************************** + */ +package providers; + +import ai.konduit.serving.pipeline.api.protocol.Credentials; +import ai.konduit.serving.pipeline.api.protocol.URLAccessProvider; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URISyntaxException; +import java.net.URL; + +public class HdfsAccessProvider implements URLAccessProvider { + + static { + URL.setURLStreamHandlerFactory(new HdfsStreamHandlerFactory()); + } + + @Override + public Credentials getCredentials() { + String accessKey = System.getenv("HADOOP_ACCESS_KEY"); + String secretKey = System.getenv("HADOOP_SECRET_KEY"); + return new Credentials(accessKey, secretKey); + } + + @Override + public InputStream connect(URL url, Credentials credentials) throws IOException, URISyntaxException { + Configuration conf = new Configuration(); + InputStream retVal = null; + + try (org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get(url.toURI(),conf)) { + retVal = fs.open(new Path(url.toURI())); + } + return retVal; + } +} diff --git a/konduit-serving-protocol/konduit-serving-protocol-hdfs/src/main/java/providers/HdfsStreamHandlerFactory.java b/konduit-serving-protocol/konduit-serving-protocol-hdfs/src/main/java/providers/HdfsStreamHandlerFactory.java new file mode 100644 index 000000000..eccaa4d0b --- /dev/null +++ b/konduit-serving-protocol/konduit-serving-protocol-hdfs/src/main/java/providers/HdfsStreamHandlerFactory.java @@ -0,0 +1,37 @@ +/* + * ****************************************************************************** + * * Copyright (c) 2020 Konduit K.K. + * * + * * This program and the accompanying materials are made available under the + * * terms of the Apache License, Version 2.0 which is available at + * * https://www.apache.org/licenses/LICENSE-2.0. + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * * License for the specific language governing permissions and limitations + * * under the License. + * * + * * SPDX-License-Identifier: Apache-2.0 + * ***************************************************************************** + */ + +package providers; + +import handlers.HdfsHandler; +import org.apache.hadoop.fs.FsUrlStreamHandlerFactory; + +import java.net.URLStreamHandler; +import java.net.URLStreamHandlerFactory; + +public class HdfsStreamHandlerFactory implements URLStreamHandlerFactory { + + @Override + public URLStreamHandler createURLStreamHandler(String protocol) { + if ("hdfs".equals(protocol)) { + return new FsUrlStreamHandlerFactory().createURLStreamHandler("hdfs"); + } + return null; + } + +} diff --git a/konduit-serving-protocol/konduit-serving-protocol-hdfs/src/test/java/TestURL.java b/konduit-serving-protocol/konduit-serving-protocol-hdfs/src/test/java/TestURL.java new file mode 100644 index 000000000..d8845dbf7 --- /dev/null +++ b/konduit-serving-protocol/konduit-serving-protocol-hdfs/src/test/java/TestURL.java @@ -0,0 +1,62 @@ +import ai.konduit.serving.common.test.BaseHttpUriTest; +import ai.konduit.serving.pipeline.api.protocol.Credentials; +import lombok.val; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import providers.HdfsAccessProvider; +import providers.HdfsStreamHandlerFactory; + +import java.io.*; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.URLStreamHandlerFactory; + +import static junit.framework.TestCase.assertTrue; + +public class TestURL extends BaseHttpUriTest { + + @Rule + public TemporaryFolder testDir = new TemporaryFolder(); + + private File hdfsPath; + + @Before + public void setUp() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.set("fs.defaultFS", "hdfs://localhost"); + hdfsPath = new File(testDir + File.separator + "hadoop" + File.separator + "hdfs"); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsPath.getAbsolutePath()); + MiniDFSCluster miniDFSCluster = new MiniDFSCluster.Builder(conf) + .nameNodePort(1234) + .nameNodeHttpPort(12341) + .numDataNodes(1) + .format(true) + .racks(null) + .build(); + miniDFSCluster.waitActive(); + } + + @Test + public void testHdfsAccess() throws IOException, URISyntaxException { + URL url = new URL("hdfs://localhost:1234/user/root/config.json"); + HdfsAccessProvider accessProvider = new HdfsAccessProvider(); + InputStream input = accessProvider.connect(url, new Credentials(System.getenv("HDFS_ACCESS_KEY"), + System.getenv("HDFS_SECRET_KEY"))); + File targetFile = new File(hdfsPath, "config.json"); + FileUtils.copyInputStreamToFile(input, targetFile); + assertTrue(targetFile.exists()); + } + + @Override + public URLStreamHandlerFactory streamHandler() { + return new HdfsStreamHandlerFactory(); + } +} diff --git a/konduit-serving-protocol/konduit-serving-protocol-s3/pom.xml b/konduit-serving-protocol/konduit-serving-protocol-s3/pom.xml new file mode 100644 index 000000000..050ee7150 --- /dev/null +++ b/konduit-serving-protocol/konduit-serving-protocol-s3/pom.xml @@ -0,0 +1,159 @@ + + + + konduit-serving + ai.konduit.serving + 0.1.0-SNAPSHOT + + 4.0.0 + + konduit-serving-protocol-s3 + + + UTF-8 + 1.8 + 1.8 + + + + + oss.sonatype.org + https://oss.sonatype.org/content/repositories/snapshots/ + + + + + + + ai.konduit.serving + konduit-serving-annotation + ${project.version} + + + com.jcabi + jcabi-aspects + 0.22.6 + + + javax.validation + validation-api + + + + + org.apache.commons + commons-csv + 1.4 + + + + net.java.dev.jets3t + jets3t + 0.9.4 + + + javax.activation + activation + + + org.apache.httpcomponents + httpclient + + + org.apache.httpcomponents + httpcore + + + commons-logging + commons-logging + + + + + + + org.nd4j + jackson + ${dl4j.version} + + + + org.projectlombok + lombok + + + + org.slf4j + slf4j-api + ${slf4j.version} + + + + + junit + junit + test + + + + ch.qos.logback + logback-classic + ${logback.version} + test + + + + ai.konduit.serving + konduit-serving-pipeline + ${project.version} + + + + ai.konduit.serving + konduit-serving-common-tests + test + ${project.version} + + + + + + + maven-resources-plugin + 2.6 + + + copy-resources + process-resources + + copy-resources + + + ${basedir}/target/resources + + + resources + + + + + + + + + + \ No newline at end of file diff --git a/konduit-serving-protocol/konduit-serving-protocol-s3/src/main/java/ai/konduit/serving/protocols/handlers/S3Handler.java b/konduit-serving-protocol/konduit-serving-protocol-s3/src/main/java/ai/konduit/serving/protocols/handlers/S3Handler.java new file mode 100644 index 000000000..441e018b6 --- /dev/null +++ b/konduit-serving-protocol/konduit-serving-protocol-s3/src/main/java/ai/konduit/serving/protocols/handlers/S3Handler.java @@ -0,0 +1,61 @@ +/* + * ****************************************************************************** + * * Copyright (c) 2020 Konduit K.K. + * * + * * This program and the accompanying materials are made available under the + * * terms of the Apache License, Version 2.0 which is available at + * * https://www.apache.org/licenses/LICENSE-2.0. + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * * License for the specific language governing permissions and limitations + * * under the License. + * * + * * SPDX-License-Identifier: Apache-2.0 + * ***************************************************************************** + */ + +package ai.konduit.serving.protocols.handlers; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.net.URLConnection; +import java.net.URLStreamHandler; + +import ai.konduit.serving.pipeline.api.protocol.Credentials; +import ai.konduit.serving.protocols.providers.S3AccessProvider; +import org.jets3t.service.ServiceException; +import org.jets3t.service.impl.rest.httpclient.RestS3Service; +import org.jets3t.service.model.S3Object; +import org.jets3t.service.security.AWSCredentials; + +public class S3Handler extends URLStreamHandler { + + protected URLConnection openConnection(URL url) throws IOException { + + return new URLConnection(url) { + + @Override + public InputStream getInputStream() throws IOException { + + String accessKey = null; + String secretKey = null; + + if (url.getUserInfo() != null) { + String[] credentials = url.getUserInfo().split("[:]"); + accessKey = credentials[0]; + secretKey = credentials[1]; + } + + S3AccessProvider accessProvider = new S3AccessProvider(); + return accessProvider.connect(url, new Credentials(accessKey, secretKey)); + } + + @Override + public void connect() throws IOException { } + + }; + } +} \ No newline at end of file diff --git a/konduit-serving-protocol/konduit-serving-protocol-s3/src/main/java/ai/konduit/serving/protocols/providers/S3AccessProvider.java b/konduit-serving-protocol/konduit-serving-protocol-s3/src/main/java/ai/konduit/serving/protocols/providers/S3AccessProvider.java new file mode 100644 index 000000000..f867f153e --- /dev/null +++ b/konduit-serving-protocol/konduit-serving-protocol-s3/src/main/java/ai/konduit/serving/protocols/providers/S3AccessProvider.java @@ -0,0 +1,58 @@ +/* + * ****************************************************************************** + * * Copyright (c) 2020 Konduit K.K. + * * + * * This program and the accompanying materials are made available under the + * * terms of the Apache License, Version 2.0 which is available at + * * https://www.apache.org/licenses/LICENSE-2.0. + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * * License for the specific language governing permissions and limitations + * * under the License. + * * + * * SPDX-License-Identifier: Apache-2.0 + * ***************************************************************************** + */ +package ai.konduit.serving.protocols.providers; + +import ai.konduit.serving.pipeline.api.protocol.Credentials; +import ai.konduit.serving.pipeline.api.protocol.URLAccessProvider; +import org.jets3t.service.ServiceException; +import org.jets3t.service.impl.rest.httpclient.RestS3Service; +import org.jets3t.service.model.S3Object; +import org.jets3t.service.security.AWSCredentials; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; + +public class S3AccessProvider implements URLAccessProvider { + + static { + URL.setURLStreamHandlerFactory(new S3StreamHandlerFactory()); + } + + @Override + public Credentials getCredentials() { + String accessKey = System.getenv("S3_ACCESS_KEY"); + String secretKey = System.getenv("S3_SECRET_KEY"); + return new Credentials(accessKey, secretKey); + } + + @Override + public InputStream connect(URL url, Credentials credentials) throws IOException { + String bucket = url.getHost().substring(0, url.getHost().indexOf(".")); + String key = url.getPath().substring(1); + + try { + RestS3Service s3Service = new RestS3Service( + new AWSCredentials(credentials.getAccessKey(), credentials.getSecretKey())); + S3Object s3obj = s3Service.getObject(bucket, key); + return s3obj.getDataInputStream(); + } catch (ServiceException e) { + throw new IOException(e); + } + } +} diff --git a/konduit-serving-protocol/konduit-serving-protocol-s3/src/main/java/ai/konduit/serving/protocols/providers/S3StreamHandlerFactory.java b/konduit-serving-protocol/konduit-serving-protocol-s3/src/main/java/ai/konduit/serving/protocols/providers/S3StreamHandlerFactory.java new file mode 100644 index 000000000..bafcd1af1 --- /dev/null +++ b/konduit-serving-protocol/konduit-serving-protocol-s3/src/main/java/ai/konduit/serving/protocols/providers/S3StreamHandlerFactory.java @@ -0,0 +1,36 @@ +/* + * ****************************************************************************** + * * Copyright (c) 2020 Konduit K.K. + * * + * * This program and the accompanying materials are made available under the + * * terms of the Apache License, Version 2.0 which is available at + * * https://www.apache.org/licenses/LICENSE-2.0. + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * * License for the specific language governing permissions and limitations + * * under the License. + * * + * * SPDX-License-Identifier: Apache-2.0 + * ***************************************************************************** + */ + +package ai.konduit.serving.protocols.providers; + +import ai.konduit.serving.protocols.handlers.S3Handler; + +import java.net.URLStreamHandler; +import java.net.URLStreamHandlerFactory; + +public class S3StreamHandlerFactory implements URLStreamHandlerFactory { + + @Override + public URLStreamHandler createURLStreamHandler(String protocol) { + if ("s3".equals(protocol)) { + return new S3Handler(); + } + return null; + } + +} diff --git a/konduit-serving-protocol/konduit-serving-protocol-s3/src/test/java/ai/konduit/serving/protocols/test/TestURL.java b/konduit-serving-protocol/konduit-serving-protocol-s3/src/test/java/ai/konduit/serving/protocols/test/TestURL.java new file mode 100644 index 000000000..c1991031c --- /dev/null +++ b/konduit-serving-protocol/konduit-serving-protocol-s3/src/test/java/ai/konduit/serving/protocols/test/TestURL.java @@ -0,0 +1,31 @@ +package ai.konduit.serving.protocols.test; + +import ai.konduit.serving.common.test.BaseHttpUriTest; +import ai.konduit.serving.protocols.providers.S3StreamHandlerFactory; +import org.apache.commons.io.IOUtils; +import org.junit.Test; + +import java.io.*; +import java.net.URL; +import java.net.URLConnection; +import java.net.URLStreamHandlerFactory; + +public class TestURL extends BaseHttpUriTest { + + @Test + public void testS3() throws IOException { + URL url = new URL("s3://:@.s3.amazonaws.com/"); + + URLConnection conn = url.openConnection(); + try (InputStream is = conn.getInputStream()) { + Reader reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); + String received = IOUtils.toString(reader); + System.out.println(received); + } + } + + @Override + public URLStreamHandlerFactory streamHandler() { + return new S3StreamHandlerFactory(); + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 762eafaa4..25cde510c 100644 --- a/pom.xml +++ b/pom.xml @@ -52,6 +52,8 @@ konduit-serving-metrics konduit-serving-clients konduit-serving-endpoint + konduit-serving-protocol + konduit-serving-protocol-hdfs konduit-serving