diff --git a/build.sbt b/build.sbt index 6ffb5c1..e3adf34 100644 --- a/build.sbt +++ b/build.sbt @@ -6,6 +6,14 @@ scalaVersion := "2.12.6" val akkaVersion = "2.5.13" val akkaHttpVersion = "10.0.9" val logbackVersion = "1.2.3" +val doobieVersion = "0.5.2" + +val databaseDependencies = Seq( + "org.tpolecat" %% "doobie-core" % doobieVersion, + "org.tpolecat" %% "doobie-postgres" % doobieVersion, + "org.tpolecat" %% "doobie-specs2" % doobieVersion, + "org.tpolecat" %% "doobie-hikari" % doobieVersion +) libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-actor" % akkaVersion, @@ -30,7 +38,7 @@ libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test, "org.scalatest" %% "scalatest" % "3.0.5" % Test, "com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion % Test -) +) ++ databaseDependencies resolvers ++= Seq("Sonatype Releases" at "https://oss.sonatype.org/content/repositories/releases/", "SonaType" at "https://oss.sonatype.org/content/groups/public", diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 922be25..444958d 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -31,5 +31,13 @@ transactionsValidTime = 600000 mempoolCleaningTime = 30000 } + + postgres { + host = "" + pass = "" + user = "" + read = false + write = false + } } } \ No newline at end of file diff --git a/src/main/resources/data_model.sql b/src/main/resources/data_model.sql new file mode 100644 index 0000000..8f44b08 --- /dev/null +++ b/src/main/resources/data_model.sql @@ -0,0 +1,31 @@ +CREATE TABLE accounts( + public_key TEXT PRIMARY KEY, + nonce BIGINT NOT NULL +); + +CREATE TABLE accounts_data( + public_key TEXT REFERENCES accounts(public_key), + number_in_account INT NOT NULL, + data BYTEA, + PRIMARY KEY (public_key, number_in_account) +); + +CREATE table blocks( + is_microblock BOOLEAN NOT NULL, + height BIGINT NOT NULL, + timestamp BIGINT NOT NULL, + previous_key_block_hash BYTEA, + current_block_hash TEXT PRIMARY KEY, + data BYTEA, + prev_microblock BYTEA +); + +CREATE TABLE transactions( + public_key TEXT, + timestamp BIGINT NOT NULL, + nonce BIGINT NOT NULL, + signature BYTEA, + data BYTEA, + block_hash TEXT REFERENCES blocks(current_block_hash), + PRIMARY KEY (public_key, nonce, signature) +); \ No newline at end of file diff --git a/src/main/scala/mvp2/actors/Accountant.scala b/src/main/scala/mvp2/actors/Accountant.scala index 306018a..0ac3e84 100644 --- a/src/main/scala/mvp2/actors/Accountant.scala +++ b/src/main/scala/mvp2/actors/Accountant.scala @@ -5,7 +5,7 @@ import mvp2.utils.Sha256 import mvp2.data.{KeyBlock, Transaction} import scala.collection.immutable.HashMap -class Accountant extends CommonActor { +class Accountant(saveToPostgres: Boolean) extends CommonActor { import Accountant.Account @@ -27,6 +27,7 @@ class Accountant extends CommonActor { } accountsInfo = accountsInfo + (singleParty._1 -> account) stateRoot = Sha256.toSha256(accountsInfo.toString) + if (saveToPostgres) context.actorSelection("/user/starter/pgWriter") ! account } } diff --git a/src/main/scala/mvp2/actors/Blockchainer.scala b/src/main/scala/mvp2/actors/Blockchainer.scala index 48393e5..06727ca 100644 --- a/src/main/scala/mvp2/actors/Blockchainer.scala +++ b/src/main/scala/mvp2/actors/Blockchainer.scala @@ -2,10 +2,12 @@ package mvp2.actors import akka.actor.{ActorRef, ActorSelection, Props} import akka.persistence.{PersistentActor, RecoveryCompleted} +import akka.util.ByteString import com.typesafe.scalalogging.StrictLogging import mvp2.actors.Planner.Period import mvp2.data.InnerMessages.{CurrentBlockchainInfo, Get, TimeDelta} import mvp2.data._ +import mvp2.utils.EncodingUtils._ import mvp2.utils.Settings class Blockchainer(settings: Settings) extends PersistentActor with StrictLogging { @@ -13,7 +15,7 @@ class Blockchainer(settings: Settings) extends PersistentActor with StrictLoggin var blockchain: Blockchain = Blockchain() var currentDelta: Long = 0 var nextTurn: Period = Period(KeyBlock(), settings) - val accountant: ActorRef = context.actorOf(Props(classOf[Accountant]), "accountant") + val accountant: ActorRef = context.actorOf(Props(classOf[Accountant], settings.postgres.exists(_.write)), "accountant") val networker: ActorRef = context.actorOf(Props(classOf[Networker], settings).withDispatcher("net-dispatcher") .withMailbox("net-mailbox"), "networker") val publisher: ActorRef = context.actorOf(Props(classOf[Publisher], settings), "publisher") @@ -25,6 +27,9 @@ class Blockchainer(settings: Settings) extends PersistentActor with StrictLoggin } override def receiveCommand: Receive = { + case RecoveryCompleted if settings.postgres.exists(_.read) => publisher ! blockchain.chain.lastOption.getOrElse( + KeyBlock(0, System.currentTimeMillis(), ByteString.empty, List()) + ) case keyBlock: KeyBlock => blockchain = Blockchain(keyBlock :: blockchain.chain) informator ! CurrentBlockchainInfo( @@ -36,6 +41,7 @@ class Blockchainer(settings: Settings) extends PersistentActor with StrictLoggin s"Blockchain consists of ${blockchain.chain.size} blocks.") planner ! keyBlock publisher ! keyBlock + if (settings.postgres.exists(_.write)) context.actorSelection("/user/starter/pgWriter") ! keyBlock case TimeDelta(delta: Long) => currentDelta = delta case Get => sender ! blockchain case period: Period => diff --git a/src/main/scala/mvp2/actors/PgReader.scala b/src/main/scala/mvp2/actors/PgReader.scala new file mode 100644 index 0000000..c693cd7 --- /dev/null +++ b/src/main/scala/mvp2/actors/PgReader.scala @@ -0,0 +1,45 @@ +package mvp2.actors + +import akka.actor.ActorSelection +import akka.persistence.RecoveryCompleted +import mvp2.data.Block +import mvp2.utils.DbService +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future +import scala.util.control.NonFatal + +class PgReader(dbService: DbService, batchSize: Int) extends CommonActor { + + val blockchainer: ActorSelection = context.actorSelection("/user/starter/blockchainer") + + dbService + .selectMaxHeightOpt() + .recoverWith { + case NonFatal(th) => + blockchainer ! RecoveryCompleted + logger.warn("Failed to connect to postgres") + context.stop(self) + Future.failed(th) + } + .foreach(self ! _.getOrElse(0)) + + override def specialBehavior: Receive = { + case height: Int => (0 to height).sliding(batchSize, batchSize).foldLeft(Future.successful(List[Block]())) { + case (prevBlocks, slide) => + val from: Int = slide.head + val to: Int = slide.last + logger.info(s"Trying to download blocks $from-$to from postgres") + prevBlocks.flatMap { retrieved => + if (retrieved.nonEmpty)logger.info(s"Downloaded blocks ${retrieved.head.height}-${retrieved.last.height} from postgres") + retrieved.foreach(blockchainer ! _) + if (to == height) blockchainer ! RecoveryCompleted + dbService.selectBlocksInHeightRange(from, to) + }.recoverWith { + case NonFatal(th) => + logger.warn("Failed to download blocks", th) + blockchainer ! RecoveryCompleted + Future.failed(th) + } + } + } +} diff --git a/src/main/scala/mvp2/actors/PgWriter.scala b/src/main/scala/mvp2/actors/PgWriter.scala new file mode 100644 index 0000000..8f7073c --- /dev/null +++ b/src/main/scala/mvp2/actors/PgWriter.scala @@ -0,0 +1,15 @@ +package mvp2.actors + +import mvp2.actors.Accountant.Account +import mvp2.data.Block +import mvp2.utils.DbService + +class PgWriter(dbService: DbService) extends CommonActor { + + override def specialBehavior: Receive = { + case block: Block => dbService.writeBlock(block) + case account: Account => dbService.writeAccount(account) + } + + override def postStop(): Unit = dbService.shutdown() +} diff --git a/src/main/scala/mvp2/actors/Starter.scala b/src/main/scala/mvp2/actors/Starter.scala index 87d9db8..003e926 100644 --- a/src/main/scala/mvp2/actors/Starter.scala +++ b/src/main/scala/mvp2/actors/Starter.scala @@ -1,11 +1,11 @@ package mvp2.actors -import akka.actor.Props -import mvp2.utils.Settings import scala.language.postfixOps import com.typesafe.config.ConfigFactory import net.ceedubs.ficus.Ficus._ import net.ceedubs.ficus.readers.ArbitraryTypeReader._ +import akka.actor.Props +import mvp2.utils.{DbService, Settings} class Starter extends CommonActor { @@ -28,5 +28,13 @@ class Starter extends CommonActor { context.actorOf(Props(classOf[Zombie]), "zombie") context.actorOf(Props(classOf[Informator], settings), "informator") context.actorOf(Props(classOf[TimeProvider], settings), "timeProvider") + settings.postgres.foreach { pgSettings => + if (pgSettings.read || pgSettings.write) { + val dbService = new DbService(pgSettings) + if (pgSettings.write) context.actorOf(Props(classOf[PgWriter], dbService), "pgWriter") + if (pgSettings.read) + context.actorOf(Props(classOf[PgReader], dbService, settings.postgres.flatMap(_.batchSize).getOrElse(5)), "pgReader") + } + } } } \ No newline at end of file diff --git a/src/main/scala/mvp2/utils/DbService.scala b/src/main/scala/mvp2/utils/DbService.scala new file mode 100644 index 0000000..0cb3327 --- /dev/null +++ b/src/main/scala/mvp2/utils/DbService.scala @@ -0,0 +1,184 @@ +package mvp2.utils + +import akka.util.ByteString +import cats.effect.IO +import cats.implicits._ +import doobie.free.connection.ConnectionIO +import doobie.implicits._ +import doobie.hikari.HikariTransactor +import doobie.postgres.implicits._ +import com.zaxxer.hikari.HikariDataSource +import doobie.hikari.implicits._ +import doobie.util.update.Update +import mvp2.actors.Accountant.Account +import mvp2.data.{Block, KeyBlock, MicroBlock, Transaction} +import mvp2.utils.DbService.{BlockDbVersion, TransactionDbVersion} +import mvp2.utils.EncodingUtils._ +import scala.concurrent.Future + +class DbService(settings: PostgresSettings) { + + import Queries._ + + def writeBlock(block: Block): Future[Int] = runAsync(writeBlockQuery(block)) + def writeAccount(account: Account): Future[Int] = runAsync(writeAccountQuery(account)) + def selectMaxHeightOpt(): Future[Option[Int]] = runAsync(selectMaxHeightOptQuery) + def selectBlocksInHeightRange(from: Int, to: Int): Future[List[Block]] = runAsync(selectBlocksByRangeQuery(from, to)) + + private val dataSource = new HikariDataSource + dataSource.setJdbcUrl(s"${settings.host}?loggerLevel=OFF") + dataSource.setUsername(settings.user) + dataSource.setPassword(settings.pass) + dataSource.setMaximumPoolSize(1) + + private val pgTransactor: HikariTransactor[IO] = HikariTransactor[IO](dataSource) + + def shutdown(): Future[Unit] = pgTransactor.shutdown.unsafeToFuture + + private def runAsync[A](io: ConnectionIO[A]): Future[A] = + (for { + res <- io.transact(pgTransactor) + } yield res) + .unsafeToFuture() +} + +private object DbService { + case class BlockDbVersion(isMicroBlock: Boolean, + height: Long, + timestamp: Long, + prevKeyBlockHash: Array[Byte], + currentBlockHash: String, + data: Array[Byte], + prevMicroBlock: Option[Array[Byte]]) + + object BlockDbVersion { + def apply(block: Block): BlockDbVersion = block match { + case micro: MicroBlock => + BlockDbVersion( + true, + micro.height, + micro.timestamp, + micro.previousKeyBlockHash.toArray, + encode2Base16(micro.currentBlockHash), + micro.data.toArray, + Some(micro.previousMicroBlock.toArray) + ) + case key: KeyBlock => + BlockDbVersion( + false, + key.height, + key.timestamp, + key.previousKeyBlockHash.toArray, + encode2Base16(key.currentBlockHash), + key.data.toArray, + None + ) + } + } + + case class TransactionDbVersion(publicKey: String, ts: Long, nonce: Long, signature: Array[Byte], data: Array[Byte], blockHash: String) { + def toTransaction: Transaction = Transaction(decodeFromBase16(publicKey), ts, nonce, ByteString(signature), ByteString(data)) + } + object TransactionDbVersion { + def apply(txs: List[Transaction], block: Block): List[TransactionDbVersion] = txs.map { tx => + TransactionDbVersion( + encode2Base16(tx.publicKey), + tx.timestamp, + tx.nonce, + tx.signature.toArray, + tx.data.toArray, + encode2Base16(block.currentBlockHash) + ) + } + } +} + +private object Queries { + + def selectBlocksByRangeQuery(from: Int, to: Int): ConnectionIO[List[Block]] = for { + infos <- selectBlocksInfosByRange(from, to) + txs <- selectTransactionsForBlocks(from, to) + } yield { + infos.map { dbBlock => + val relatedTxs: List[Transaction] = txs.filter(_.blockHash == dbBlock.currentBlockHash).map(_.toTransaction) + if (dbBlock.isMicroBlock) + MicroBlock( + dbBlock.height, + dbBlock.timestamp, + ByteString(dbBlock.prevKeyBlockHash), + ByteString(dbBlock.prevMicroBlock.getOrElse(Array.empty[Byte])), + ByteString(dbBlock.currentBlockHash), + relatedTxs, + ByteString(dbBlock.data) + ) + else + KeyBlock( + dbBlock.height, + dbBlock.timestamp, + ByteString(dbBlock.prevKeyBlockHash), + ByteString(dbBlock.currentBlockHash), + relatedTxs, + ByteString(dbBlock.data) + ) + } + } + + def selectBlocksInfosByRange(from: Int, to: Int): ConnectionIO[List[BlockDbVersion]] = + sql"SELECT * FROM public.blocks WHERE height >= $from AND height <= $to ORDER BY height ASC".query[BlockDbVersion].to[List] + + def selectTransactionsForBlocks(from: Int, to: Int): ConnectionIO[List[TransactionDbVersion]] = + sql"SELECT * FROM public.transactions WHERE block_hash IN (SELECT current_block_hash FROM public.blocks WHERE height >= $from AND height <= $to)" + .query[TransactionDbVersion].to[List] + + def selectMaxHeightOptQuery: ConnectionIO[Option[Int]] = sql"SELECT MAX(height) FROM public.blocks".query[Option[Int]].unique + + def writeAccountQuery(account: Account): ConnectionIO[Int] = for { + accW <- writeAccountInfoQuery(account) + dataW <- writeAccountDataQuery(account) + } yield accW + dataW + + def writeAccountInfoQuery(account: Account): ConnectionIO[Int] = { + val query: String = + """ + |INSERT INTO public.accounts (public_key, nonce) VALUES(?, ?) ON CONFLICT (public_key) DO UPDATE SET nonce = EXCLUDED.nonce + """.stripMargin + Update[(String, Long)](query).run((encode2Base16(account.publicKey), account.nonce)) + } + + def writeAccountDataQuery(account: Account): ConnectionIO[Int] = { + val query: String = + """ + |INSERT INTO public.accounts_data (public_key, number_in_account, data) VALUES(?, ?, ?) ON CONFLICT (public_key, number_in_account) DO NOTHING + """.stripMargin + Update[(String, Int, Array[Byte])](query) + .updateMany(account.data.zipWithIndex.map(entry => (encode2Base16(account.publicKey), entry._2, entry._1.toArray))) + } + + def writeBlockQuery(block: Block): ConnectionIO[Int] = for { + blockW <- writeBlockInfoQuery(block) + txsW <- block match { + case key: KeyBlock => writeTransactionsQuery(key.transactions, block) + case micro: MicroBlock => writeTransactionsQuery(micro.transactions, block) + } + } yield blockW + txsW + + + def writeBlockInfoQuery(block: Block): ConnectionIO[Int] = { + val blockDto: BlockDbVersion = BlockDbVersion(block) + val query: String = + """ + |INSERT INTO public.blocks (is_microblock, height, timestamp, previous_key_block_hash, current_block_hash, + | data, prev_microblock) VALUES(?, ?, ?, ?, ?, ?, ?) + """.stripMargin + Update[BlockDbVersion](query).run(blockDto) + } + + def writeTransactionsQuery(transactions: List[Transaction], block: Block): ConnectionIO[Int] = { + val query: String = + """ + |INSERT INTO public.transactions(public_key, timestamp, nonce, signature, data, block_hash) VALUES(?, ?, ?, ?, ?, ?) + """.stripMargin + Update[TransactionDbVersion](query).updateMany(TransactionDbVersion(transactions, block)) + } + +} \ No newline at end of file diff --git a/src/main/scala/mvp2/utils/Settings.scala b/src/main/scala/mvp2/utils/Settings.scala index 9b62fe8..41c65d5 100644 --- a/src/main/scala/mvp2/utils/Settings.scala +++ b/src/main/scala/mvp2/utils/Settings.scala @@ -11,7 +11,8 @@ case class Settings(port: Int, ntp: NetworkTimeProviderSettings, influx: InfluxSettings, testingSettings: TestingSettings, - mempoolSetting: MempoolSetting + mempoolSetting: MempoolSetting, + postgres: Option[PostgresSettings] ) case class Node(host: String, port: Int) @@ -24,4 +25,6 @@ case class NetworkTimeProviderSettings(server: String, updateEvery: Int, timeout case class MempoolSetting(transactionsValidTime: Long, mempoolCleaningTime: Long) -case class TestingSettings(messagesTime: Boolean, iteratorsSyncTime: Int) \ No newline at end of file +case class TestingSettings(messagesTime: Boolean, iteratorsSyncTime: Int) + +case class PostgresSettings(host: String, pass: String, user: String, read: Boolean, write: Boolean, batchSize: Option[Int]) \ No newline at end of file diff --git a/src/test/scala/mvp2/actors/StateSpec.scala b/src/test/scala/mvp2/actors/StateSpec.scala index fba5609..051e964 100644 --- a/src/test/scala/mvp2/actors/StateSpec.scala +++ b/src/test/scala/mvp2/actors/StateSpec.scala @@ -9,7 +9,7 @@ class StateSpec extends TestKit(ActorSystem("StateTestSystem")) with PropSpecLik property("state applying blocks") { val numBlocks = 10 - val stateActor = system.actorOf(Props[Accountant], "Accountant") + val stateActor = system.actorOf(Props(classOf[Accountant], false), "Accountant") val sampleBlockChain = DummyTestBlockGenerator.generateChain(numBlocks) sampleBlockChain.foreach(b => stateActor ! b) Thread.sleep(1000)