From 73faa4f687c8d1f8ac68f72f2c58a976610a0b96 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Pascal=20Spo=CC=88rri?=
Date: Fri, 5 Apr 2024 09:32:51 +0200
Subject: [PATCH] Create a simple test case to fix the Pravega file
 listing/append issue.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Pascal Spörri
---
 build.sbt                                     | 10 +-
 project/plugins.sbt                           |  1 +
 .../ibm/geds/hdfs/GEDSHadoopFileSystem.java   | 15 ++-
 .../com/ibm/geds/hdfs/TestGEDSHDFS.scala      | 95 +++++++++++++++++++
 4 files changed, 113 insertions(+), 8 deletions(-)
 create mode 100644 src/test/scala/com/ibm/geds/hdfs/TestGEDSHDFS.scala

diff --git a/build.sbt b/build.sbt
index b1bde41..fa944bb 100644
--- a/build.sbt
+++ b/build.sbt
@@ -15,10 +15,16 @@ val hadoopVersion = sys.env.getOrElse("HADOOP_VERSION", "3.3.4")
 
 libraryDependencies ++= Seq(
   "org.apache.hadoop" % "hadoop-common" % hadoopVersion % "provided",
-  "com.ibm.geds" % "geds" % gedsApiVersion from "file://"+gedsInstallPath+"/java/geds-"+gedsApiVersion+".jar",
-  "junit" % "junit" % "4.13.2" % Test, // TRAVIS_SCALA_WORKAROUND_REMOVE_LINE
+  "com.ibm.geds" % "geds" % gedsApiVersion from "file://"+gedsInstallPath+"/java/geds-"+gedsApiVersion+".jar"
 )
 
+libraryDependencies ++= (if (scalaBinaryVersion.value == "2.12") Seq(
+  "org.scalatest" %% "scalatest" % "3.2.2" % Test,
+  "org.junit.jupiter" % "junit-jupiter-engine" % "5.10.2" % Test,
+  "net.aichler" % "jupiter-interface" % JupiterKeys.jupiterVersion.value % Test
+)
+else Seq())
+
 javacOptions ++= Seq("-source", "1.8", "-target", "1.8")
 javaOptions ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M", "-XX:+CMSClassUnloadingEnabled")
 scalacOptions ++= Seq("-deprecation", "-unchecked")
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 5a52039..1eab6fe 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -3,3 +3,4 @@
 // SPDX-License-Identifier: Apache-2.0
 //
 addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0") // https://github.com/sbt/sbt-assembly (MIT)
+addSbtPlugin("net.aichler" % "sbt-jupiter-interface" % "0.11.1")
diff --git a/src/main/java/com/ibm/geds/hdfs/GEDSHadoopFileSystem.java b/src/main/java/com/ibm/geds/hdfs/GEDSHadoopFileSystem.java
index 39d9262..0fc5c6f 100644
--- a/src/main/java/com/ibm/geds/hdfs/GEDSHadoopFileSystem.java
+++ b/src/main/java/com/ibm/geds/hdfs/GEDSHadoopFileSystem.java
@@ -73,7 +73,7 @@ private String computeGEDSPath(Path f) {
                 s = s.substring(1);
             }
             return s;
-        } catch(Exception e) {
+        } catch (Exception e) {
             return "";
         }
     }
@@ -110,7 +110,10 @@ public boolean delete(Path f, boolean recursive) throws IOException {
     @Override
     public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
         String path = computeGEDSPath(f);
-        if (!path.endsWith("/")) {
+        // computeGEDSPath() strips the leading "/"; restore it on the listed keys
+        // when the caller passed an absolute path.
+        String prefix = f.isAbsolute() ? "/" : "";
+        if (!path.endsWith("/") && !path.isEmpty()) {
             path = path + "/";
         }
         GEDSFileStatus[] st = geds.listAsFolder(bucket, path);
@@ -118,7 +121,7 @@ public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException
         for (int i = 0; i < st.length; i++) {
             GEDSFileStatus s = st[i];
             response[i] = new FileStatus(s.size, s.isDirectory, 1, blockSize, 0,
-                    new Path(s.key).makeQualified(getUri(), workingDirectory));
+                    new Path(prefix + s.key).makeQualified(getUri(), workingDirectory));
         }
         return response;
     }
@@ -140,8 +143,8 @@ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
 
     @Override
     public FileStatus getFileStatus(Path f) throws IOException {
-        GEDSFileStatus st = geds.status(bucket, computeGEDSPath(f));
-        return new FileStatus(st.size, st.isDirectory, 1, blockSize, 0,
-                new Path(st.key).makeQualified(getUri(), workingDirectory));
+        String path = computeGEDSPath(f);
+        GEDSFileStatus st = geds.status(bucket, path);
+        return new FileStatus(st.size, st.isDirectory, 1, blockSize, 0, f);
     }
 }
diff --git a/src/test/scala/com/ibm/geds/hdfs/TestGEDSHDFS.scala b/src/test/scala/com/ibm/geds/hdfs/TestGEDSHDFS.scala
new file mode 100644
index 0000000..1661579
--- /dev/null
+++ b/src/test/scala/com/ibm/geds/hdfs/TestGEDSHDFS.scala
@@ -0,0 +1,95 @@
+package com.ibm.geds.hdfs
+
+import org.junit.jupiter.api._
+import org.junit.jupiter.api.Assertions._
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs._
+
+class TestGEDSHDFS {
+
+  val fs = TestGEDSHDFS.fs
+
+  @Test
+  def createFilePrefix(): Unit = {
+    val stream = fs.create(new Path("/test/createFilePrefix/file"))
+    stream.close()
+    var ls = fs.listStatus(new Path("/test/createFilePrefix"))
+    assertEquals(1, ls.length)
+    println(ls(0).getPath().getName())
+    assertEquals("file", ls(0).getPath().getName())
+
+    ls = fs.listStatus(new Path("/test/createFilePrefix/"))
+    assertEquals(1, ls.length)
+    println(ls(0).getPath().getName())
+    assertEquals("file", ls(0).getPath().getName())
+  }
+
+  @Test
+  def createFile(): Unit = {
+    val stream = fs.create(new Path("test/createFile/file"))
+    stream.close()
+    var ls = fs.listStatus(new Path("test/createFile"))
+    assertEquals(1, ls.length)
+    println(ls(0).getPath().getName())
+    assertEquals("file", ls(0).getPath().getName())
+    ls = fs.listStatus(new Path("test/createFile/"))
+    assertEquals(1, ls.length)
+    println(ls(0).getPath().getName())
+    assertEquals("file", ls(0).getPath().getName())
+  }
+
+  @Test
+  def append(): Unit = {
+    val path = new Path("test/append")
+    var pos = 0
+    // Create empty
+    var stream = fs.create(path)
+    stream.close()
+    var fstatus = fs.getFileStatus(path)
+    assertEquals(pos, fstatus.getLen())
+    assertFalse(fstatus.isDirectory())
+
+    // Append + write 1 byte
+    stream = fs.append(path)
+    stream.write(1)
+    pos += 1
+    assertEquals(pos, stream.getPos())
+    stream.flush()
+    stream.close()
+
+    // Ensure 1 byte got written
+    fstatus = fs.getFileStatus(path)
+    assertEquals(pos, fstatus.getLen())
+
+    // Append + write 1 byte
+    stream = fs.append(path)
+    stream.writeByte(1)
+    pos += 1
+    assertEquals(pos, stream.getPos())
+    stream.close()
+
+    // Ensure 1 byte got written (total length: 2)
+    fstatus = fs.getFileStatus(path)
+    assertEquals(pos, fstatus.getLen())
+    assertFalse(fstatus.isDirectory())
+  }
+}
+
+object TestGEDSHDFS {
+  var fs: FileSystem = _;
+
+  @BeforeAll
+  def setup(): Unit = {
+    val conf = new Configuration()
+    conf.set("fs.default.name", "geds")
+    conf.set("fs.default.fs", "geds")
+    conf.set("fs.geds.impl", "com.ibm.geds.hdfs.GEDSHadoopFileSystem")
+    conf.set("fs.geds.path", "/tmp/geds")
+    conf.set("fs.geds.metadataserver", "localhost:4381")
+    fs = FileSystem.get(new java.net.URI("geds://test"), conf)
+    System.out.println(
+      "@BeforeAll - executes once before all test methods in this class"
+    )
+  }
+}