Skip to content
This repository was archived by the owner on Aug 7, 2018. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
<dependency>
<groupId>com.xjeffrose</groupId>
<artifactId>xio</artifactId>
<version>0.12.0-SNAPSHOT</version>
<version>0.11.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,28 +208,13 @@ public ListenableFuture<byte[]> read(byte[] colFam, byte[] key) {
UUID id = UUID.randomUUID();
SettableFuture<byte[]> f = SettableFuture.create();
futureMap.put(id, f);
Futures.addCallback(f, new FutureCallback<byte[]>() {
@Override
public void onSuccess(@Nullable byte[] bytes) {
f.set(bytes);
}

@Override
public void onFailure(Throwable throwable) {
f.setException(throwable);
}
});

Futures.addCallback(connectionManager.write(nodes.get(0), new DefaultChicagoMessage(id, Op.READ, colFam, key, null)), new FutureCallback<Boolean>() {
@Override
public void onSuccess(@Nullable Boolean aBoolean) {

}
public void onSuccess(@Nullable Boolean aBoolean) {}

@Override
public void onFailure(Throwable throwable) {

}
public void onFailure(Throwable throwable) {}
});

workerLoop.schedule(() -> {
Expand Down Expand Up @@ -264,51 +249,35 @@ public ListenableFuture<Boolean> write(byte[] colFam, byte[] key, byte[] val) {
final List<SettableFuture<byte[]>> futureList = new ArrayList<>();
final SettableFuture<Boolean> respFuture = SettableFuture.create();
final List<String> nodes = getEffectiveNodes(colFam);
if (nodes.size() < quorum) {
if (nodes.size() == 0) {
log.error("Unable to establish Quorum");
return null;
}

nodes.stream().forEach(xs -> {
UUID id = UUID.randomUUID();
SettableFuture<byte[]> f = SettableFuture.create();
futureMap.put(id, f);
futureList.add(f);
Futures.addCallback(f, new FutureCallback<byte[]>() {
@Override
public void onSuccess(@Nullable byte[] bytes) {
f.set(bytes);
}

@Override
public void onFailure(Throwable throwable) {
f.setException(throwable);
}
});

Futures.addCallback(connectionManager.write(xs, new DefaultChicagoMessage(id, Op.WRITE, colFam, key, val)), new FutureCallback<Boolean>() {
@Override
public void onSuccess(@Nullable Boolean aBoolean) {
futureList.add(f);
}
public void onSuccess(@Nullable Boolean aBoolean) {}

@Override
public void onFailure(Throwable throwable) {
Futures.addCallback(connectionManager.write(xs, new DefaultChicagoMessage(id, Op.WRITE, colFam, key, val)), new FutureCallback<Boolean>() {
@Override
public void onSuccess(@Nullable Boolean aBoolean) {
futureList.add(f);
}
public void onSuccess(@Nullable Boolean aBoolean) {}

@Override
public void onFailure(Throwable throwable) {

}
public void onFailure(Throwable throwable) {}
});
}
});
});

Futures.addCallback(Futures.successfulAsList(futureList), new FutureCallback<List<byte[]>>() {
Futures.addCallback(Futures.allAsList(futureList), new FutureCallback<List<byte[]>>() {
@Override
public void onSuccess(@Nullable List<byte[]> bytes) {
respFuture.set(true);
Expand Down Expand Up @@ -340,22 +309,10 @@ public ListenableFuture<byte[]> tsWrite(byte[] topic, byte[] key, byte[] val) {
SettableFuture<byte[]> f = SettableFuture.create();
futureMap.put(id, f);
futureList.add(f);
Futures.addCallback(f, new FutureCallback<byte[]>() {
@Override
public void onSuccess(@Nullable byte[] bytes) {
f.set(bytes);
}

@Override
public void onFailure(Throwable throwable) {
f.setException(throwable);
}
});

Futures.addCallback(connectionManager.write(xs, new DefaultChicagoMessage(id, Op.TS_WRITE, topic, key, val)), new FutureCallback<Boolean>() {
@Override
public void onSuccess(@Nullable Boolean aBoolean) {
}
public void onSuccess(@Nullable Boolean aBoolean) {}

@Override
public void onFailure(Throwable throwable) {
Expand All @@ -378,7 +335,7 @@ public void onFailure(Throwable throwable) {
});
});

Futures.addCallback(Futures.successfulAsList(futureList), new FutureCallback<List<byte[]>>() {
Futures.addCallback(Futures.allAsList(futureList), new FutureCallback<List<byte[]>>() {
@Override
public void onSuccess(@Nullable List<byte[]> bytes) {
respFuture.set(bytes.get(0));
Expand All @@ -404,37 +361,20 @@ public ListenableFuture<byte[]> stream(byte[] topic, byte[] offset) {
UUID id = UUID.randomUUID();
SettableFuture<byte[]> f = SettableFuture.create();
futureMap.put(id, f);
Futures.addCallback(f, new FutureCallback<byte[]>() {
@Override
public void onSuccess(@Nullable byte[] bytes) {
f.set(bytes);
}

@Override
public void onFailure(Throwable throwable) {
f.setException(throwable);
}
});

Futures.addCallback(connectionManager.write(nodes.get(0), new DefaultChicagoMessage(id, Op.STREAM, topic, null, offset)), new FutureCallback<Boolean>() {
@Override
public void onSuccess(@Nullable Boolean aBoolean) {

}
public void onSuccess(@Nullable Boolean aBoolean) {}

@Override
public void onFailure(Throwable throwable) {

}
public void onFailure(Throwable throwable) {}
});

workerLoop.schedule(() -> {
if (nodes.size() > 1) {
Futures.addCallback(connectionManager.write(nodes.get(1), new DefaultChicagoMessage(id, Op.STREAM, topic, null, offset)), new FutureCallback<Boolean>() {
@Override
public void onSuccess(@Nullable Boolean aBoolean) {

}
public void onSuccess(@Nullable Boolean aBoolean) {}

@Override
public void onFailure(Throwable throwable) {
Expand All @@ -461,17 +401,6 @@ public ListenableFuture<Boolean> deleteColFam(byte[] colFam) {
SettableFuture<byte[]> f = SettableFuture.create();
futureMap.put(id, f);
futureList.add(f);
Futures.addCallback(f, new FutureCallback<byte[]>() {
@Override
public void onSuccess(@Nullable byte[] bytes) {
f.set(bytes);
}

@Override
public void onFailure(Throwable throwable) {
f.setException(throwable);
}
});

Futures.addCallback(connectionManager.write(xs, new DefaultChicagoMessage(id, Op.DELETE, colFam, null, null)), new FutureCallback<Boolean>() {
@Override
Expand All @@ -488,9 +417,7 @@ public void onSuccess(@Nullable Boolean aBoolean) {
}

@Override
public void onFailure(Throwable throwable) {

}
public void onFailure(Throwable throwable) {}
});
}
});
Expand Down
14 changes: 14 additions & 0 deletions cluster-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,18 @@
</dependency>

</dependencies>
<build>
<plugins>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
27 changes: 12 additions & 15 deletions src/main/resources/reference.conf → server/config/app1.conf
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
chicago {
applicationTemplate = ${xio.applicationTemplate} {
name = "Chicago DB"
application = ${chicago.applicationTemplate} {
settings {
zookeeperCluster = "localhost:2181"
bossThreads = 4
workersThreads = 20
dbPath = "/var/chicago"
dbPath = "/var/chicago/s1"
quorum = 3
compactionSize = 60GB
databaseMode = true
Expand All @@ -15,33 +12,33 @@ chicago {
]
}
servers {
admin = ${xio.serverTemplate} {
name = "Chicago Admin Server"
admin {
settings {
bindHost = 127.0.0.1
bindPort = 9991
}
}
stats = ${xio.serverTemplate} {
name = "Chicago Stats Server"
stats {
settings {
bindHost = 127.0.0.1
bindPort = 9001
}
}
db = ${xio.serverTemplate} {
name = "Chicago DB Server"
db {
settings {
bindHost = 127.0.0.1
bindPort = 12000
}
}
election = ${xio.serverTemplate} {
name = "Chicago Election Server"
election {
settings {
bindHost = 127.0.0.1
bindPort = 12001
}
}
paxos = ${xio.serverTemplate} {
name = "Chicago Paxos Server"
paxos {
settings {
bindHost = 127.0.0.1
bindPort = 12002
}
}
Expand Down
47 changes: 47 additions & 0 deletions server/config/app2.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
chicago {
application = ${chicago.applicationTemplate} {
settings {
zookeeperCluster = "localhost:2181"
dbPath = "/var/chicago/s2"
quorum = 3
compactionSize = 60GB
databaseMode = true
encryptAtRest = true
witnessList = [
""
]
}
servers {
admin {
settings {
bindHost = 127.0.0.2
bindPort = 9991
}
}
stats {
settings {
bindHost = 127.0.0.2
bindPort = 9001
}
}
db {
settings {
bindHost = 127.0.0.2
bindPort = 12000
}
}
election {
settings {
bindHost = 127.0.0.2
bindPort = 12001
}
}
paxos {
settings {
bindHost = 127.0.0.2
bindPort = 12002
}
}
}
}
}
47 changes: 47 additions & 0 deletions server/config/app3.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
chicago {
application = ${chicago.applicationTemplate} {
settings {
zookeeperCluster = "localhost:2181"
dbPath = "/var/chicago/s3"
quorum = 3
compactionSize = 60GB
databaseMode = true
encryptAtRest = true
witnessList = [
""
]
}
servers {
admin {
settings {
bindHost = 127.0.0.3
bindPort = 9991
}
}
stats {
settings {
bindHost = 127.0.0.3
bindPort = 9001
}
}
db {
settings {
bindHost = 127.0.0.3
bindPort = 12000
}
}
election {
settings {
bindHost = 127.0.0.3
bindPort = 12001
}
}
paxos {
settings {
bindHost = 127.0.0.3
bindPort = 12002
}
}
}
}
}
Loading