Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ initialize := {
assert(current == required, s"Unsupported build JDK: java.specification.version $current != $required")
}

// PROJECT DEFINITIONS
// *****************************************************************************
// PROJECTS
// *****************************************************************************
lazy val root = (project in file("."))
.enablePlugins(BuildInfoPlugin, SbtMultiJvm, JavaServerAppPackaging)
.configs(MultiJvm)
Expand All @@ -53,11 +55,13 @@ lazy val root = (project in file("."))
buildInfoKeys := Seq[BuildInfoKey](name, version, scalaVersion, sbtVersion, git.gitHeadCommit, git.gitCurrentBranch),
buildInfoOptions += BuildInfoOption.ToJson
)
.settings(versionWithGit)
.settings(git.useGitDescribe := true)
.settings(
versionWithGit,
git.useGitDescribe := true
)
.settings(configAnnotationSettings)
.aggregate(core, httpApi, storageInMem, storageRocksDB)
.dependsOn(core, httpApi, storageInMem, storageRocksDB)
.aggregate(core, httpApi, storageInMem, storageRocksDB, splitBrainResolver)
.dependsOn(core, httpApi, storageInMem, storageRocksDB, splitBrainResolver)

lazy val core = (project in file("justin-core"))
.disablePlugins(RevolverPlugin)
Expand Down Expand Up @@ -132,11 +136,28 @@ lazy val storageRocksDB = (project in file("justin-storage-rocksdb"))
)
.dependsOn(storageApi)

// ALIASES
lazy val splitBrainResolver = (project in file("justin-split-brain-resolver"))
.enablePlugins(SbtMultiJvm)
.configs(MultiJvm)
.disablePlugins(RevolverPlugin)
.settings(
resolvers += Resolver.bintrayRepo("tanukkii007", "maven")
)
.settings(
name := "justin-split-brain-resolver",
scalaVersion := Version.scala,
libraryDependencies ++= Dependencies.splitBrainResolver
)

// *****************************************************************************
// Aliases
// *****************************************************************************
addCommandAlias("compileAll", ";compile;test:compile;multi-jvm:compile")
addCommandAlias("testAll", ";test:test;multi-jvm:test")

// SETTINGS
// *****************************************************************************
// Settings
// *****************************************************************************
lazy val commonSettings = Def.settings(
compileSettings
)
Expand Down
11 changes: 11 additions & 0 deletions justin-split-brain-resolver/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
akka.cluster.downing-provider-class = "tanukki.akka.cluster.autodown.MajorityLeaderAutoDowning"

custom-downing {
stable-after = 20s

majority-leader-auto-downing {
majority-member-role = ""
down-if-in-minority = true
shutdown-actor-system-on-resolution = true
}
}
28 changes: 15 additions & 13 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ object Version {
val configAnnotation = "0.3.7"
val macroParadise = "2.1.1"
val rocksDB = "5.5.1"
val akkaClusterDowning = "0.0.9"
}

object Library {
Expand Down Expand Up @@ -52,26 +53,27 @@ object Library {
// storage
val rocksdb = "org.rocksdb" % "rocksdbjni" % Version.rocksDB
val kryo = "com.esotericsoftware" % "kryo" % Version.kryo % "provided"

// split-brain resolver
val akkaClusterDowning = "com.github.TanUkkii007" %% "akka-cluster-custom-downing" % Version.akkaClusterDowning
}

object Dependencies {
import Library._

private val genericTest = Seq(scalactic, scalatest % "test")

private val genericTest = Seq(scalactic, scalatest % "test")
private val akkaCommon = Seq(akkaActor, akkaSfl4j, akkaTestkit, akkaKryo, akkaStream)
private val akkaHttpCommon = Seq(akkaHttp, akkaHttpSprayJson, akkaHttpTestkit)
private val akkaClusterCommon = Seq(akkaRemote, akkaMultiNodeTestkit % "multi-jvm", akkaCluster, akkaClusterMetrics, akkaClusterTools, kamonSigar, akkaClusterManager)

val core = akkaCommon ++ akkaClusterCommon ++ genericTest ++ Seq(scalacheck % "test", logback, scalaLogging) ++ Seq(akkaHttpSprayJson)
val ring = genericTest
val vectorClocks = genericTest
val httpApi = akkaCommon ++ akkaHttpCommon ++ genericTest

val storageApi = genericTest
val storageInMem = genericTest
val storageLogDBExperimental = genericTest
val storageRocksDB = Seq(rocksdb, rocksdb % "test", kryo) ++ genericTest

val root = core ++ httpApi ++ storageApi
// projects
val core: Seq[ModuleID] = akkaCommon ++ akkaClusterCommon ++ genericTest ++ Seq(scalacheck % "test", logback, scalaLogging) ++ Seq(akkaHttpSprayJson)
val ring: Seq[ModuleID] = genericTest
val vectorClocks: Seq[ModuleID] = genericTest
val httpApi: Seq[ModuleID] = akkaCommon ++ akkaHttpCommon ++ genericTest
val storageApi: Seq[ModuleID] = genericTest
val storageInMem: Seq[ModuleID] = genericTest
val storageRocksDB: Seq[ModuleID] = Seq(rocksdb, rocksdb % "test", kryo) ++ genericTest
val splitBrainResolver: Seq[ModuleID] = Seq(akkaClusterDowning) ++ genericTest
val root: Seq[ModuleID] = core ++ httpApi ++ storageApi
}
48 changes: 47 additions & 1 deletion src/multi-jvm/scala/justin/db/MultiNodeClusterSpec.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package justin.db

import java.util.concurrent.ConcurrentHashMap

import akka.actor.Address
import akka.cluster.Cluster
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeSpec
import com.typesafe.config.{Config, ConfigFactory}
import org.scalatest.Suite

import scala.concurrent.duration.{FiniteDuration, _}

object MultiNodeClusterSpec {

val commonBaseConfig: Config = ConfigFactory.parseString(
Expand All @@ -20,10 +26,50 @@ object MultiNodeClusterSpec {

trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeSpec ⇒

private val cachedAddresses = new ConcurrentHashMap[RoleName, Address]

def initialParticipants: Int = roles.size

/**
* Get the cluster node to use.
*/
def cluster: Cluster = Cluster(system)

def initialParticipants: Int = roles.size
/**
* Lookup the Address for the role.
*
* Implicit conversion from RoleName to Address.
*
* It is cached, which has the implication that stopping
* and then restarting a role (jvm) with another address is not
* supported.
*/
implicit def address(role: RoleName): Address = {
cachedAddresses.get(role) match {
case null ⇒
val address = node(role).address
cachedAddresses.put(role, address)
address
case address ⇒ address
}
}

/**
* Wait until the expected number of members has status Up has been reached.
* Also asserts that nodes in the 'canNotBePartOfMemberRing' are *not* part of the cluster ring.
*/
def awaitMembersUp(numberOfMembers: Int, canNotBePartOfMemberRing: Set[Address] = Set(), timeout: FiniteDuration = 10.seconds): Unit = {
awaitAssert(
a = {
cluster.state.members.size shouldBe numberOfMembers

println("cluster state: " + cluster.state)

cluster.state.members.map(_.address) intersect canNotBePartOfMemberRing shouldBe empty
},
max = timeout,
interval = 2.seconds
)
()
}
}
87 changes: 87 additions & 0 deletions src/multi-jvm/scala/justin/db/SplitBrainResolverSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package justin.db

import akka.remote.testconductor.RoleName
import akka.remote.testkit.{MultiNodeConfig, MultiNodeSpec}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import akka.testkit.TestDuration

final class SplitBrainResolverConfig extends MultiNodeConfig {
val first: RoleName = role("first")
val second: RoleName = role("second")
val third: RoleName = role("third")
val fourth: RoleName = role("fourth")
val fifth: RoleName = role("fifth")

private[this] val allRoles = List(first, second, third, fourth, fifth)
private[this] val clusterName = "SplitBrainResolverSpec"

private[this] def commonNodeConfig(id: Int) = ConfigFactory.parseString(
s"""
|justin.system = $clusterName
|justin.kubernetes-hostname = s"justindb-$id"
|justin.http.port = ${9000 + id}
|akka.cluster.role.storagenode.min-nr-of-members = ${allRoles.size}
|akka.cluster.http.management.port = ${19999 + id}
|akka.cluster.seed-nodes.0 = "akka.trttl.gremlin.tcp://$clusterName@localhost:25551"
|akka.remote.netty.tcp.port = ${25551 + id}
|akka.remote.netty.tcp.hostname = "localhost"
|akka.remote.netty.tcp.applied-adapters = [trttl, gremlin]
|akka.remote.artery.advanced.test-mode = on
""".stripMargin
)

commonConfig(MultiNodeClusterSpec.commonBaseConfig.withFallback(JustinDBConfig.init.config))

allRoles.zipWithIndex.foreach { case (roleName, id) =>
nodeConfig(roleName)(commonNodeConfig(id))
}
}

final class SplitBrainResolverSpecMultiJvmNode1 extends SplitBrainResolverSpec
final class SplitBrainResolverSpecMultiJvmNode2 extends SplitBrainResolverSpec
final class SplitBrainResolverSpecMultiJvmNode3 extends SplitBrainResolverSpec
final class SplitBrainResolverSpecMultiJvmNode4 extends SplitBrainResolverSpec
final class SplitBrainResolverSpecMultiJvmNode5 extends SplitBrainResolverSpec

abstract class SplitBrainResolverSpec(config: SplitBrainResolverConfig)
extends MultiNodeSpec(config)
with MultiNodeClusterSpec {
import config._

def this() = this(new SplitBrainResolverConfig())

"The majority leader in a 5 node cluster" must {

"be able to DOWN a 'last' node that is UNREACHABLE" in within(150.seconds) {
val config = new JustinDBConfig(system.settings.config)
val justinDB = JustinDB.init(config)(system)

enterBarrier("justindb-cluster-up")

val fifthAddress = address(fifth)

runOn(first) {
// kill 'fifth' node
testConductor.exit(fifth, 0).await
enterBarrier("down-fifth-node")

// --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE ---

awaitMembersUp(numberOfMembers = 4, canNotBePartOfMemberRing = Set(fifthAddress), 30.seconds.dilated)
}

runOn(fifth) {
enterBarrier("down-fifth-node")
}

runOn(second, third, fourth) {
enterBarrier("down-fifth-node")

awaitMembersUp(numberOfMembers = 4, canNotBePartOfMemberRing = Set(fifthAddress), 30.seconds.dilated)
}

enterBarrier("await-completion-1")
}
}
}