Skip to content

Commit

Permalink
Create a simple test case to fix the Pravega file listing/append issue.
Browse files Browse the repository at this point in the history
Signed-off-by: Pascal Spörri <psp@zurich.ibm.com>
  • Loading branch information
pspoerri committed Apr 10, 2024
1 parent 4926861 commit 9272d9e
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 10 deletions.
10 changes: 8 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,16 @@ val hadoopVersion = sys.env.getOrElse("HADOOP_VERSION", "3.3.4")

libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-common" % hadoopVersion % "provided",
"com.ibm.geds" % "geds" % gedsApiVersion from "file://"+gedsInstallPath+"/java/geds-"+gedsApiVersion+".jar",
"junit" % "junit" % "4.13.2" % Test, // TRAVIS_SCALA_WORKAROUND_REMOVE_LINE
"com.ibm.geds" % "geds" % gedsApiVersion from "file://"+gedsInstallPath+"/java/geds-"+gedsApiVersion+".jar"
)

libraryDependencies ++= (if (scalaBinaryVersion.value == "2.12") Seq(
"org.scalatest" %% "scalatest" % "3.2.2" % Test,
"org.junit.jupiter" % "junit-jupiter-engine" % "5.10.2" % Test,
"net.aichler" % "jupiter-interface" % JupiterKeys.jupiterVersion.value % Test
)
else Seq())

javacOptions ++= Seq("-source", "1.8", "-target", "1.8")
javaOptions ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M", "-XX:+CMSClassUnloadingEnabled")
scalacOptions ++= Seq("-deprecation", "-unchecked")
Expand Down
1 change: 1 addition & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
// SPDX-License-Identifier: Apache-2.0
//
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0") // https://github.com/sbt/sbt-assembly (MIT)
addSbtPlugin("net.aichler" % "sbt-jupiter-interface" % "0.11.1")
26 changes: 18 additions & 8 deletions src/main/java/com/ibm/geds/hdfs/GEDSHadoopFileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,26 @@ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
return new FSDataInputStream(new BufferedFSInputStream(new GEDSInputStream(file), bufferSize));
}

private String computeGEDSPath(Path f) {
private String computeGEDSPath(Path f, Boolean stripped) {
stripped = false;
try {
String s = Path.getPathWithoutSchemeAndAuthority(f).toString();
if (s.startsWith("/")) {
s = s.substring(1);
stripped = true;
}
System.out.println(s);
return s;
} catch(Exception e) {
} catch (Exception e) {
return "";
}
}

private String computeGEDSPath(Path f) {
Boolean stripped = false;
return computeGEDSPath(f, stripped);
}

@Override
public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize,
short replication, long blockSize, Progressable progress) throws IOException {
Expand Down Expand Up @@ -109,16 +117,18 @@ public boolean delete(Path f, boolean recursive) throws IOException {

@Override
public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
String path = computeGEDSPath(f);
if (!path.endsWith("/")) {
Boolean strippedPrefix = false;
String path = computeGEDSPath(f, strippedPrefix);
String prefix = strippedPrefix ? "/" : "";
if (!path.endsWith("/") && !path.equals("")) {
path = path + "/";
}
GEDSFileStatus[] st = geds.listAsFolder(bucket, path);
FileStatus[] response = new FileStatus[st.length];
for (int i = 0; i < st.length; i++) {
GEDSFileStatus s = st[i];
response[i] = new FileStatus(s.size, s.isDirectory, 1, blockSize, 0,
new Path(s.key).makeQualified(getUri(), workingDirectory));
new Path(prefix + s.key).makeQualified(getUri(), workingDirectory));
}
return response;
}
Expand All @@ -140,8 +150,8 @@ public boolean mkdirs(Path f, FsPermission permission) throws IOException {

@Override
public FileStatus getFileStatus(Path f) throws IOException {
GEDSFileStatus st = geds.status(bucket, computeGEDSPath(f));
return new FileStatus(st.size, st.isDirectory, 1, blockSize, 0,
new Path(st.key).makeQualified(getUri(), workingDirectory));
String path = computeGEDSPath(f);
GEDSFileStatus st = geds.status(bucket, path);
return new FileStatus(st.size, st.isDirectory, 1, blockSize, 0, f);
}
}
95 changes: 95 additions & 0 deletions src/test/scala/com/ibm/geds/hdfs/TestGEDSHDFS.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package com.ibm.geds.hdfs

import org.junit.jupiter.api._
import org.junit.jupiter.api.Assertions._

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs._

class TestGEDSHDFS {

val fs = TestGEDSHDFS.fs

@Test
def createFilePrefix(): Unit = {
val stream = fs.create(new Path("/test/createFilePrefix/file"))
stream.close()
var ls = fs.listStatus(new Path("/test/createFilePrefix"))
assertEquals(1, ls.length)
println(ls(0).getPath().getName())
assertEquals("file", ls(0).getPath().getName())

ls = fs.listStatus(new Path("/test/createFilePrefix/"))
assertEquals(1, ls.length)
println(ls(0).getPath().getName())
assertEquals("file", ls(0).getPath().getName())
}

@Test
def createFile(): Unit = {
val stream = fs.create(new Path("test/createFile/file"))
stream.close()
var ls = fs.listStatus(new Path("test/createFile"))
assertEquals(1, ls.length)
println(ls(0).getPath().getName())
assertEquals("file", ls(0).getPath().getName())
ls = fs.listStatus(new Path("test/createFile/"))
assertEquals(1, ls.length)
println(ls(0).getPath().getName())
assertEquals("file", ls(0).getPath().getName())
}

@Test
def append(): Unit = {
val path = new Path("test/append")
var pos = 0
// Create empty
var stream = fs.create(path)
stream.close()
var fstatus = fs.getFileStatus(path)
assertEquals(pos, fstatus.getLen())
assertFalse(fstatus.isDirectory())

// Append + write 1 byte
stream = fs.append(path)
stream.write(1)
pos += 1
assertEquals(pos, stream.getPos())
stream.flush()
stream.close()

// Ensure 1 byte got written
fstatus = fs.getFileStatus(path)
assertEquals(pos, fstatus.getLen())

// Append + write 1 byte
stream = fs.append(path)
stream.writeByte(1)
pos += 1
assertEquals(pos, stream.getPos())
stream.close()

// Ensure 1 byte got written (total length: 2)
fstatus = fs.getFileStatus(path)
assertEquals(pos, fstatus.getLen())
assertFalse(fstatus.isDirectory())
}
}

object TestGEDSHDFS {
var fs: FileSystem = _;

@BeforeAll
def setup(): Unit = {
val conf = new Configuration()
conf.set("fs.default.name", "geds")
conf.set("fs.default.fs", "geds")
conf.set("fs.geds.impl", "com.ibm.geds.hdfs.GEDSHadoopFileSystem")
conf.set("fs.geds.path", "/tmp/geds")
conf.set("fs.geds.metadataserver", "localhost:4381")
fs = FileSystem.get(new java.net.URI("geds://test"), conf)
System.out.println(
"@BeforeAll - executes once before all test methods in this class"
)
}
}

0 comments on commit 9272d9e

Please sign in to comment.