diff --git a/client/pom.xml b/client/pom.xml
index aebb8b4..017028b 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -66,7 +66,7 @@
com.xjeffrose
xio
- 0.12.0-SNAPSHOT
+ 0.11.2-SNAPSHOT
com.typesafe
diff --git a/client/src/main/java/com/xjeffrose/chicago/client/ChicagoAsyncClient.java b/client/src/main/java/com/xjeffrose/chicago/client/ChicagoAsyncClient.java
index 79f661c..3a2ff98 100644
--- a/client/src/main/java/com/xjeffrose/chicago/client/ChicagoAsyncClient.java
+++ b/client/src/main/java/com/xjeffrose/chicago/client/ChicagoAsyncClient.java
@@ -208,28 +208,13 @@ public ListenableFuture read(byte[] colFam, byte[] key) {
UUID id = UUID.randomUUID();
SettableFuture f = SettableFuture.create();
futureMap.put(id, f);
- Futures.addCallback(f, new FutureCallback() {
- @Override
- public void onSuccess(@Nullable byte[] bytes) {
- f.set(bytes);
- }
-
- @Override
- public void onFailure(Throwable throwable) {
- f.setException(throwable);
- }
- });
Futures.addCallback(connectionManager.write(nodes.get(0), new DefaultChicagoMessage(id, Op.READ, colFam, key, null)), new FutureCallback() {
@Override
- public void onSuccess(@Nullable Boolean aBoolean) {
-
- }
+ public void onSuccess(@Nullable Boolean aBoolean) {}
@Override
- public void onFailure(Throwable throwable) {
-
- }
+ public void onFailure(Throwable throwable) {}
});
workerLoop.schedule(() -> {
@@ -264,51 +249,35 @@ public ListenableFuture write(byte[] colFam, byte[] key, byte[] val) {
final List> futureList = new ArrayList<>();
final SettableFuture respFuture = SettableFuture.create();
final List nodes = getEffectiveNodes(colFam);
- if (nodes.size() < quorum) {
+ if (nodes.size() == 0) {
log.error("Unable to establish Quorum");
return null;
}
+
nodes.stream().forEach(xs -> {
UUID id = UUID.randomUUID();
SettableFuture f = SettableFuture.create();
futureMap.put(id, f);
futureList.add(f);
- Futures.addCallback(f, new FutureCallback() {
- @Override
- public void onSuccess(@Nullable byte[] bytes) {
- f.set(bytes);
- }
-
- @Override
- public void onFailure(Throwable throwable) {
- f.setException(throwable);
- }
- });
Futures.addCallback(connectionManager.write(xs, new DefaultChicagoMessage(id, Op.WRITE, colFam, key, val)), new FutureCallback() {
@Override
- public void onSuccess(@Nullable Boolean aBoolean) {
- futureList.add(f);
- }
+ public void onSuccess(@Nullable Boolean aBoolean) {}
@Override
public void onFailure(Throwable throwable) {
Futures.addCallback(connectionManager.write(xs, new DefaultChicagoMessage(id, Op.WRITE, colFam, key, val)), new FutureCallback() {
@Override
- public void onSuccess(@Nullable Boolean aBoolean) {
- futureList.add(f);
- }
+ public void onSuccess(@Nullable Boolean aBoolean) {}
@Override
- public void onFailure(Throwable throwable) {
-
- }
+ public void onFailure(Throwable throwable) {}
});
}
});
});
- Futures.addCallback(Futures.successfulAsList(futureList), new FutureCallback>() {
+ Futures.addCallback(Futures.allAsList(futureList), new FutureCallback>() {
@Override
public void onSuccess(@Nullable List bytes) {
respFuture.set(true);
@@ -340,22 +309,10 @@ public ListenableFuture tsWrite(byte[] topic, byte[] key, byte[] val) {
SettableFuture f = SettableFuture.create();
futureMap.put(id, f);
futureList.add(f);
- Futures.addCallback(f, new FutureCallback() {
- @Override
- public void onSuccess(@Nullable byte[] bytes) {
- f.set(bytes);
- }
-
- @Override
- public void onFailure(Throwable throwable) {
- f.setException(throwable);
- }
- });
Futures.addCallback(connectionManager.write(xs, new DefaultChicagoMessage(id, Op.TS_WRITE, topic, key, val)), new FutureCallback() {
@Override
- public void onSuccess(@Nullable Boolean aBoolean) {
- }
+ public void onSuccess(@Nullable Boolean aBoolean) {}
@Override
public void onFailure(Throwable throwable) {
@@ -378,7 +335,7 @@ public void onFailure(Throwable throwable) {
});
});
- Futures.addCallback(Futures.successfulAsList(futureList), new FutureCallback>() {
+ Futures.addCallback(Futures.allAsList(futureList), new FutureCallback>() {
@Override
public void onSuccess(@Nullable List bytes) {
respFuture.set(bytes.get(0));
@@ -404,37 +361,20 @@ public ListenableFuture stream(byte[] topic, byte[] offset) {
UUID id = UUID.randomUUID();
SettableFuture f = SettableFuture.create();
futureMap.put(id, f);
- Futures.addCallback(f, new FutureCallback() {
- @Override
- public void onSuccess(@Nullable byte[] bytes) {
- f.set(bytes);
- }
-
- @Override
- public void onFailure(Throwable throwable) {
- f.setException(throwable);
- }
- });
Futures.addCallback(connectionManager.write(nodes.get(0), new DefaultChicagoMessage(id, Op.STREAM, topic, null, offset)), new FutureCallback() {
@Override
- public void onSuccess(@Nullable Boolean aBoolean) {
-
- }
+ public void onSuccess(@Nullable Boolean aBoolean) {}
@Override
- public void onFailure(Throwable throwable) {
-
- }
+ public void onFailure(Throwable throwable) {}
});
workerLoop.schedule(() -> {
if (nodes.size() > 1) {
Futures.addCallback(connectionManager.write(nodes.get(1), new DefaultChicagoMessage(id, Op.STREAM, topic, null, offset)), new FutureCallback() {
@Override
- public void onSuccess(@Nullable Boolean aBoolean) {
-
- }
+ public void onSuccess(@Nullable Boolean aBoolean) {}
@Override
public void onFailure(Throwable throwable) {
@@ -461,17 +401,6 @@ public ListenableFuture deleteColFam(byte[] colFam) {
SettableFuture f = SettableFuture.create();
futureMap.put(id, f);
futureList.add(f);
- Futures.addCallback(f, new FutureCallback() {
- @Override
- public void onSuccess(@Nullable byte[] bytes) {
- f.set(bytes);
- }
-
- @Override
- public void onFailure(Throwable throwable) {
- f.setException(throwable);
- }
- });
Futures.addCallback(connectionManager.write(xs, new DefaultChicagoMessage(id, Op.DELETE, colFam, null, null)), new FutureCallback() {
@Override
@@ -488,9 +417,7 @@ public void onSuccess(@Nullable Boolean aBoolean) {
}
@Override
- public void onFailure(Throwable throwable) {
-
- }
+ public void onFailure(Throwable throwable) {}
});
}
});
diff --git a/cluster-test/pom.xml b/cluster-test/pom.xml
index aa0d8b1..9d27126 100644
--- a/cluster-test/pom.xml
+++ b/cluster-test/pom.xml
@@ -111,4 +111,18 @@
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.5.1
+
+ 1.8
+ 1.8
+
+
+
+
diff --git a/src/main/resources/reference.conf b/server/config/app1.conf
similarity index 50%
rename from src/main/resources/reference.conf
rename to server/config/app1.conf
index 582caa0..db38ebb 100644
--- a/src/main/resources/reference.conf
+++ b/server/config/app1.conf
@@ -1,11 +1,8 @@
chicago {
- applicationTemplate = ${xio.applicationTemplate} {
- name = "Chicago DB"
+ application = ${chicago.applicationTemplate} {
settings {
zookeeperCluster = "localhost:2181"
- bossThreads = 4
- workersThreads = 20
- dbPath = "/var/chicago"
+ dbPath = "/var/chicago/s1"
quorum = 3
compactionSize = 60GB
databaseMode = true
@@ -15,33 +12,33 @@ chicago {
]
}
servers {
- admin = ${xio.serverTemplate} {
- name = "Chicago Admin Server"
+ admin {
settings {
+ bindHost = 127.0.0.1
bindPort = 9991
}
}
- stats = ${xio.serverTemplate} {
- name = "Chicago Stats Server"
+ stats {
settings {
+ bindHost = 127.0.0.1
bindPort = 9001
}
}
- db = ${xio.serverTemplate} {
- name = "Chicago DB Server"
+ db {
settings {
+ bindHost = 127.0.0.1
bindPort = 12000
}
}
- election = ${xio.serverTemplate} {
- name = "Chicago Election Server"
+ election {
settings {
+ bindHost = 127.0.0.1
bindPort = 12001
}
}
- paxos = ${xio.serverTemplate} {
- name = "Chicago Paxos Server"
+ paxos {
settings {
+ bindHost = 127.0.0.1
bindPort = 12002
}
}
diff --git a/server/config/app2.conf b/server/config/app2.conf
new file mode 100644
index 0000000..9225748
--- /dev/null
+++ b/server/config/app2.conf
@@ -0,0 +1,47 @@
+chicago {
+ application = ${chicago.applicationTemplate} {
+ settings {
+ zookeeperCluster = "localhost:2181"
+ dbPath = "/var/chicago/s2"
+ quorum = 3
+ compactionSize = 60GB
+ databaseMode = true
+ encryptAtRest = true
+ witnessList = [
+ ""
+ ]
+ }
+ servers {
+ admin {
+ settings {
+ bindHost = 127.0.0.2
+ bindPort = 9991
+ }
+ }
+ stats {
+ settings {
+ bindHost = 127.0.0.2
+ bindPort = 9001
+ }
+ }
+ db {
+ settings {
+ bindHost = 127.0.0.2
+ bindPort = 12000
+ }
+ }
+ election {
+ settings {
+ bindHost = 127.0.0.2
+ bindPort = 12001
+ }
+ }
+ paxos {
+ settings {
+ bindHost = 127.0.0.2
+ bindPort = 12002
+ }
+ }
+ }
+ }
+}
diff --git a/server/config/app3.conf b/server/config/app3.conf
new file mode 100644
index 0000000..4ad016b
--- /dev/null
+++ b/server/config/app3.conf
@@ -0,0 +1,47 @@
+chicago {
+ application = ${chicago.applicationTemplate} {
+ settings {
+ zookeeperCluster = "localhost:2181"
+ dbPath = "/var/chicago/s3"
+ quorum = 3
+ compactionSize = 60GB
+ databaseMode = true
+ encryptAtRest = true
+ witnessList = [
+ ""
+ ]
+ }
+ servers {
+ admin {
+ settings {
+ bindHost = 127.0.0.3
+ bindPort = 9991
+ }
+ }
+ stats {
+ settings {
+ bindHost = 127.0.0.3
+ bindPort = 9001
+ }
+ }
+ db {
+ settings {
+ bindHost = 127.0.0.3
+ bindPort = 12000
+ }
+ }
+ election {
+ settings {
+ bindHost = 127.0.0.3
+ bindPort = 12001
+ }
+ }
+ paxos {
+ settings {
+ bindHost = 127.0.0.3
+ bindPort = 12002
+ }
+ }
+ }
+ }
+}
diff --git a/server/config/app4.conf b/server/config/app4.conf
new file mode 100644
index 0000000..9b80be8
--- /dev/null
+++ b/server/config/app4.conf
@@ -0,0 +1,47 @@
+chicago {
+ application = ${chicago.applicationTemplate} {
+ settings {
+ zookeeperCluster = "localhost:2181"
+ dbPath = "/var/chicago/s4"
+ quorum = 3
+ compactionSize = 60GB
+ databaseMode = true
+ encryptAtRest = true
+ witnessList = [
+ ""
+ ]
+ }
+ servers {
+ admin {
+ settings {
+ bindHost = 127.0.0.4
+ bindPort = 9991
+ }
+ }
+ stats {
+ settings {
+ bindHost = 127.0.0.4
+ bindPort = 9001
+ }
+ }
+ db {
+ settings {
+ bindHost = 127.0.0.4
+ bindPort = 12000
+ }
+ }
+ election {
+ settings {
+ bindHost = 127.0.0.4
+ bindPort = 12001
+ }
+ }
+ paxos {
+ settings {
+ bindHost = 127.0.0.4
+ bindPort = 12002
+ }
+ }
+ }
+ }
+}
diff --git a/server/src/main/java/com/xjeffrose/chicago/db/EncryptedStorageProvider.java b/server/src/main/java/com/xjeffrose/chicago/db/EncryptedStorageProvider.java
index 0b77731..a6088b6 100644
--- a/server/src/main/java/com/xjeffrose/chicago/db/EncryptedStorageProvider.java
+++ b/server/src/main/java/com/xjeffrose/chicago/db/EncryptedStorageProvider.java
@@ -48,7 +48,7 @@ private void configure() {
}
}
- byte[] encrypt(byte[] raw) {
+ synchronized byte[] encrypt(byte[] raw) {
try {
ByteArrayOutputStream os = new ByteArrayOutputStream();
CryptoOutputStream cos = new CryptoOutputStream(os, cipher, 4096, key, iv);
@@ -65,7 +65,7 @@ byte[] encrypt(byte[] raw) {
return null;
}
- byte[] decrypt(byte[] encryptedData) {
+ synchronized byte[] decrypt(byte[] encryptedData) {
try {
final byte[] decryptedData = new byte[encryptedData.length];
CryptoInputStream cis = null;
diff --git a/server/src/main/java/com/xjeffrose/chicago/server/ChiConfig.java b/server/src/main/java/com/xjeffrose/chicago/server/ChiConfig.java
index 70a9325..992f0e0 100644
--- a/server/src/main/java/com/xjeffrose/chicago/server/ChiConfig.java
+++ b/server/src/main/java/com/xjeffrose/chicago/server/ChiConfig.java
@@ -12,6 +12,7 @@
public class ChiConfig {
@Getter
private final String zkHosts;
+ @Getter
private Config conf;
// private Map channelStats;
@Getter
diff --git a/server/src/main/java/com/xjeffrose/chicago/server/ChicagoCluster.java b/server/src/main/java/com/xjeffrose/chicago/server/ChicagoCluster.java
new file mode 100644
index 0000000..3d662d0
--- /dev/null
+++ b/server/src/main/java/com/xjeffrose/chicago/server/ChicagoCluster.java
@@ -0,0 +1,29 @@
+package com.xjeffrose.chicago.server;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by smadan on 8/29/16.
+ */
+public class ChicagoCluster {
+ private static final Logger log = LoggerFactory.getLogger(Chicago.class.getName());
+
+ public static void main(String[] args) {
+ log.info("Starting Chicago, have a nice day");
+ for (int i = 1; i < 5; i++) {
+ Config settings = ConfigFactory.load("app"+i+".conf");
+ ChiConfig config = new ChiConfig(settings.getConfig("chicago.application"));
+
+ try {
+ ChicagoServer server = new ChicagoServer(config);
+ server.start();
+ } catch (Exception e) {
+ log.error("Error Starting Chicago", e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/server/src/main/java/com/xjeffrose/chicago/server/ChicagoServer.java b/server/src/main/java/com/xjeffrose/chicago/server/ChicagoServer.java
index f828f15..869650d 100755
--- a/server/src/main/java/com/xjeffrose/chicago/server/ChicagoServer.java
+++ b/server/src/main/java/com/xjeffrose/chicago/server/ChicagoServer.java
@@ -33,7 +33,7 @@ public ChicagoServer(ChiConfig config) {
zkClient = new ZkClient(config.getZkHosts(),true);
db = getStorageProvider(config);
nodeWatcher = new NodeWatcher(NODE_LIST_PATH, NODE_LOCK_PATH, config.getQuorum());
- dbRouter = new DBRouter(db);
+ dbRouter = new DBRouter(db,config.getConf());
}
private StorageProvider getStorageProvider(ChiConfig config) {
diff --git a/server/src/main/java/com/xjeffrose/chicago/server/DBRouter.java b/server/src/main/java/com/xjeffrose/chicago/server/DBRouter.java
index 354e0fc..6449585 100755
--- a/server/src/main/java/com/xjeffrose/chicago/server/DBRouter.java
+++ b/server/src/main/java/com/xjeffrose/chicago/server/DBRouter.java
@@ -1,5 +1,6 @@
package com.xjeffrose.chicago.server;
+import com.typesafe.config.Config;
import com.xjeffrose.chicago.db.DBManager;
import com.xjeffrose.chicago.db.StorageProvider;
import com.xjeffrose.xio.application.Application;
@@ -29,10 +30,11 @@ public class DBRouter implements Closeable {
private final Map q;
private final Map> sessionCoordinator;
private final Map qCount;
+ private final Config config;
private Application application;
- public DBRouter(StorageProvider db) {
+ public DBRouter(StorageProvider db, Config config) {
this.db = db;
this.manager = new DBManager(db);
this.handler = new ChicagoDBHandler(manager);
@@ -41,6 +43,7 @@ public DBRouter(StorageProvider db) {
this.sessionCoordinator = PlatformDependent.newConcurrentHashMap();
this.qCount = PlatformDependent.newConcurrentHashMap();
this.chicagoPaxosHandler = new ChicagoPaxosHandler(offset, q, sessionCoordinator, qCount);
+ this.config = config;
}
private ChicagoServerPipeline buildDbPipeline() {
@@ -65,7 +68,7 @@ public ChannelHandler getApplicationHandler() {
public void run() {
manager.startAsync().awaitRunning();
- application = new ApplicationBootstrap("chicago.application")
+ application = new ApplicationBootstrap(config)
.addServer("admin", (bs) -> bs.addToPipeline(new XioSslHttp1_1Pipeline()))
.addServer("stats", (bs) -> bs.addToPipeline(new XioSslHttp1_1Pipeline()))
.addServer("db", (bs) -> bs.addToPipeline(buildDbPipeline()))
diff --git a/server/src/main/resources/logback.xml b/server/src/main/resources/logback.xml
index 600aadf..f78f2b3 100644
--- a/server/src/main/resources/logback.xml
+++ b/server/src/main/resources/logback.xml
@@ -12,10 +12,13 @@
-
+
${DEV_HOME}/archived/chi.%d{yyyy-MM-dd}.%i.log
+ 30
+ 3GB
+ 100MB
100MB
@@ -34,11 +37,11 @@
-
+
-
+
diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml
deleted file mode 100644
index 600aadf..0000000
--- a/src/main/resources/logback.xml
+++ /dev/null
@@ -1,44 +0,0 @@
-
-
-
-
-
-
- ${DEV_HOME}/chi.log
-
-
- %d{yyyy-MM-dd HH:mm:ss} - %msg%n
-
-
-
-
-
- ${DEV_HOME}/archived/chi.%d{yyyy-MM-dd}.%i.log
-
-
- 100MB
-
-
-
-
-
-
-
-
- %-4relative [%thread] %-5level %logger{35} - %msg %n
-
-
-
-
-
-
-
-
-
-
-
-