Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ scalaVersion := "2.12.6"
val akkaVersion = "2.5.13"
val akkaHttpVersion = "10.0.9"
val logbackVersion = "1.2.3"
val doobieVersion = "0.5.2"

val databaseDependencies = Seq(
"org.tpolecat" %% "doobie-core" % doobieVersion,
"org.tpolecat" %% "doobie-postgres" % doobieVersion,
"org.tpolecat" %% "doobie-specs2" % doobieVersion,
"org.tpolecat" %% "doobie-hikari" % doobieVersion
)

libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
Expand All @@ -30,7 +38,7 @@ libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test,
"org.scalatest" %% "scalatest" % "3.0.5" % Test,
"com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion % Test
)
) ++ databaseDependencies

resolvers ++= Seq("Sonatype Releases" at "https://oss.sonatype.org/content/repositories/releases/",
"SonaType" at "https://oss.sonatype.org/content/groups/public",
Expand Down
8 changes: 8 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,13 @@
transactionsValidTime = 600000
mempoolCleaningTime = 30000
}

postgres {
host = ""
pass = ""
user = ""
read = false
write = false
}
}
}
31 changes: 31 additions & 0 deletions src/main/resources/data_model.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
CREATE TABLE accounts(
public_key TEXT PRIMARY KEY,
nonce BIGINT NOT NULL
);

CREATE TABLE accounts_data(
public_key TEXT REFERENCES accounts(public_key),
number_in_account INT NOT NULL,
data BYTEA,
PRIMARY KEY (public_key, number_in_account)
);

CREATE table blocks(
is_microblock BOOLEAN NOT NULL,
height BIGINT NOT NULL,
timestamp BIGINT NOT NULL,
previous_key_block_hash BYTEA,
current_block_hash TEXT PRIMARY KEY,
data BYTEA,
prev_microblock BYTEA
);

CREATE TABLE transactions(
public_key TEXT,
timestamp BIGINT NOT NULL,
nonce BIGINT NOT NULL,
signature BYTEA,
data BYTEA,
block_hash TEXT REFERENCES blocks(current_block_hash),
PRIMARY KEY (public_key, nonce, signature)
);
3 changes: 2 additions & 1 deletion src/main/scala/mvp2/actors/Accountant.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import mvp2.utils.Sha256
import mvp2.data.{KeyBlock, Transaction}
import scala.collection.immutable.HashMap

class Accountant extends CommonActor {
class Accountant(saveToPostgres: Boolean) extends CommonActor {

import Accountant.Account

Expand All @@ -27,6 +27,7 @@ class Accountant extends CommonActor {
}
accountsInfo = accountsInfo + (singleParty._1 -> account)
stateRoot = Sha256.toSha256(accountsInfo.toString)
if (saveToPostgres) context.actorSelection("/user/starter/pgWriter") ! account
}
}

Expand Down
8 changes: 7 additions & 1 deletion src/main/scala/mvp2/actors/Blockchainer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@ package mvp2.actors

import akka.actor.{ActorRef, ActorSelection, Props}
import akka.persistence.{PersistentActor, RecoveryCompleted}
import akka.util.ByteString
import com.typesafe.scalalogging.StrictLogging
import mvp2.actors.Planner.Period
import mvp2.data.InnerMessages.{CurrentBlockchainInfo, Get, TimeDelta}
import mvp2.data._
import mvp2.utils.EncodingUtils._
import mvp2.utils.Settings

class Blockchainer(settings: Settings) extends PersistentActor with StrictLogging {

var blockchain: Blockchain = Blockchain()
var currentDelta: Long = 0
var nextTurn: Period = Period(KeyBlock(), settings)
val accountant: ActorRef = context.actorOf(Props(classOf[Accountant]), "accountant")
val accountant: ActorRef = context.actorOf(Props(classOf[Accountant], settings.postgres.exists(_.write)), "accountant")
val networker: ActorRef = context.actorOf(Props(classOf[Networker], settings).withDispatcher("net-dispatcher")
.withMailbox("net-mailbox"), "networker")
val publisher: ActorRef = context.actorOf(Props(classOf[Publisher], settings), "publisher")
Expand All @@ -25,6 +27,9 @@ class Blockchainer(settings: Settings) extends PersistentActor with StrictLoggin
}

override def receiveCommand: Receive = {
case RecoveryCompleted if settings.postgres.exists(_.read) => publisher ! blockchain.chain.lastOption.getOrElse(
KeyBlock(0, System.currentTimeMillis(), ByteString.empty, List())
)
case keyBlock: KeyBlock =>
blockchain = Blockchain(keyBlock :: blockchain.chain)
informator ! CurrentBlockchainInfo(
Expand All @@ -36,6 +41,7 @@ class Blockchainer(settings: Settings) extends PersistentActor with StrictLoggin
s"Blockchain consists of ${blockchain.chain.size} blocks.")
planner ! keyBlock
publisher ! keyBlock
if (settings.postgres.exists(_.write)) context.actorSelection("/user/starter/pgWriter") ! keyBlock
case TimeDelta(delta: Long) => currentDelta = delta
case Get => sender ! blockchain
case period: Period =>
Expand Down
45 changes: 45 additions & 0 deletions src/main/scala/mvp2/actors/PgReader.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package mvp2.actors

import akka.actor.ActorSelection
import akka.persistence.RecoveryCompleted
import mvp2.data.Block
import mvp2.utils.DbService
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.control.NonFatal

class PgReader(dbService: DbService, batchSize: Int) extends CommonActor {

val blockchainer: ActorSelection = context.actorSelection("/user/starter/blockchainer")

dbService
.selectMaxHeightOpt()
.recoverWith {
case NonFatal(th) =>
blockchainer ! RecoveryCompleted
logger.warn("Failed to connect to postgres")
context.stop(self)
Future.failed(th)
}
.foreach(self ! _.getOrElse(0))

override def specialBehavior: Receive = {
case height: Int => (0 to height).sliding(batchSize, batchSize).foldLeft(Future.successful(List[Block]())) {
case (prevBlocks, slide) =>
val from: Int = slide.head
val to: Int = slide.last
logger.info(s"Trying to download blocks $from-$to from postgres")
prevBlocks.flatMap { retrieved =>
if (retrieved.nonEmpty)logger.info(s"Downloaded blocks ${retrieved.head.height}-${retrieved.last.height} from postgres")
retrieved.foreach(blockchainer ! _)
if (to == height) blockchainer ! RecoveryCompleted
dbService.selectBlocksInHeightRange(from, to)
}.recoverWith {
case NonFatal(th) =>
logger.warn("Failed to download blocks", th)
blockchainer ! RecoveryCompleted
Future.failed(th)
}
}
}
}
15 changes: 15 additions & 0 deletions src/main/scala/mvp2/actors/PgWriter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package mvp2.actors

import mvp2.actors.Accountant.Account
import mvp2.data.Block
import mvp2.utils.DbService

class PgWriter(dbService: DbService) extends CommonActor {

override def specialBehavior: Receive = {
case block: Block => dbService.writeBlock(block)
case account: Account => dbService.writeAccount(account)
}

override def postStop(): Unit = dbService.shutdown()
}
12 changes: 10 additions & 2 deletions src/main/scala/mvp2/actors/Starter.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package mvp2.actors

import akka.actor.Props
import mvp2.utils.Settings
import scala.language.postfixOps
import com.typesafe.config.ConfigFactory
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ArbitraryTypeReader._
import akka.actor.Props
import mvp2.utils.{DbService, Settings}

class Starter extends CommonActor {

Expand All @@ -28,5 +28,13 @@ class Starter extends CommonActor {
context.actorOf(Props(classOf[Zombie]), "zombie")
context.actorOf(Props(classOf[Informator], settings), "informator")
context.actorOf(Props(classOf[TimeProvider], settings), "timeProvider")
settings.postgres.foreach { pgSettings =>
if (pgSettings.read || pgSettings.write) {
val dbService = new DbService(pgSettings)
if (pgSettings.write) context.actorOf(Props(classOf[PgWriter], dbService), "pgWriter")
if (pgSettings.read)
context.actorOf(Props(classOf[PgReader], dbService, settings.postgres.flatMap(_.batchSize).getOrElse(5)), "pgReader")
}
}
}
}
Loading