From bc8ad4f1d178ef6cf8aeca68a22652035759dcba Mon Sep 17 00:00:00 2001 From: YERMLV Date: Fri, 2 Nov 2018 23:34:24 +0200 Subject: [PATCH 1/5] Added writing to pg --- build.sbt | 10 +- src/main/resources/application.conf | 8 ++ src/main/resources/data_model.sql | 30 ++++ src/main/scala/mvp2/actors/Accountant.scala | 3 +- src/main/scala/mvp2/actors/Blockchainer.scala | 5 +- src/main/scala/mvp2/actors/PgWriter.scala | 15 ++ src/main/scala/mvp2/actors/Starter.scala | 10 +- src/main/scala/mvp2/utils/DbService.scala | 131 ++++++++++++++++++ src/main/scala/mvp2/utils/Settings.scala | 7 +- 9 files changed, 211 insertions(+), 8 deletions(-) create mode 100644 src/main/resources/data_model.sql create mode 100644 src/main/scala/mvp2/actors/PgWriter.scala create mode 100644 src/main/scala/mvp2/utils/DbService.scala diff --git a/build.sbt b/build.sbt index 760b5c9..63df36a 100644 --- a/build.sbt +++ b/build.sbt @@ -6,6 +6,14 @@ scalaVersion := "2.12.6" val akkaVersion = "2.5.13" val akkaHttpVersion = "10.0.9" val logbackVersion = "1.2.3" +val doobieVersion = "0.5.2" + +val databaseDependencies = Seq( + "org.tpolecat" %% "doobie-core" % doobieVersion, + "org.tpolecat" %% "doobie-postgres" % doobieVersion, + "org.tpolecat" %% "doobie-specs2" % doobieVersion, + "org.tpolecat" %% "doobie-hikari" % doobieVersion +) libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-actor" % akkaVersion, @@ -29,7 +37,7 @@ libraryDependencies ++= Seq( "ch.qos.logback" % "logback-core" % logbackVersion, "com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test, "org.scalatest" %% "scalatest" % "3.0.5" % Test -) +) ++ databaseDependencies resolvers ++= Seq("Sonatype Releases" at "https://oss.sonatype.org/content/repositories/releases/", "SonaType" at "https://oss.sonatype.org/content/groups/public", diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 3f03d54..b999daf 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -9,5 +9,13 @@ httpPort = 9151 timeout = 10 } + + postgres { + host = "" + pass = "" + user = "" + read = false + write = false + } } } \ No newline at end of file diff --git a/src/main/resources/data_model.sql b/src/main/resources/data_model.sql new file mode 100644 index 0000000..7fd6592 --- /dev/null +++ b/src/main/resources/data_model.sql @@ -0,0 +1,30 @@ +CREATE TABLE accounts( + public_key TEXT PRIMARY KEY, + nonce BIGINT NOT NULL +); + +CREATE TABLE accounts_data( + public_key TEXT REFERENCES accounts(public_key), + number_in_account INT NOT NULL, + data BYTEA, + PRIMARY KEY (public_key, number_in_account) +); + +CREATE table blocks( + is_microblock BOOLEAN NOT NULL, + height BIGINT NOT NULL, + timestamp BIGINT NOT NULL, + previous_key_block_hash BYTEA, + current_block_hash TEXT PRIMARY KEY, + data BYTEA, + prev_microblock BYTEA +); + +CREATE TABLE transactions( + public_key TEXT, + nonce BIGINT NOT NULL, + signature BYTEA, + data BYTEA, + block_hash TEXT REFERENCES blocks(current_block_hash), + PRIMARY KEY (public_key, nonce, signature) +); \ No newline at end of file diff --git a/src/main/scala/mvp2/actors/Accountant.scala b/src/main/scala/mvp2/actors/Accountant.scala index 534e15e..3cf72d4 100644 --- a/src/main/scala/mvp2/actors/Accountant.scala +++ b/src/main/scala/mvp2/actors/Accountant.scala @@ -5,7 +5,7 @@ import mvp2.utils.Sha256 import mvp2.data.{KeyBlock, MicroBlock, Transaction} import scala.collection.immutable.HashMap -class Accountant extends CommonActor { +class Accountant(saveToPostgres: Boolean) extends CommonActor { import Accountant.Account @@ -29,6 +29,7 @@ class Accountant extends CommonActor { } accountsInfo = accountsInfo + (singleParty._1 -> account) stateRoot = Sha256.toSha256(accountsInfo.toString) + if (saveToPostgres) context.actorSelection("/user/starter/pgWriter") ! account } } diff --git a/src/main/scala/mvp2/actors/Blockchainer.scala b/src/main/scala/mvp2/actors/Blockchainer.scala index 9833b6a..44f3c11 100644 --- a/src/main/scala/mvp2/actors/Blockchainer.scala +++ b/src/main/scala/mvp2/actors/Blockchainer.scala @@ -8,12 +8,12 @@ import mvp2.data._ import mvp2.messages.Get import scala.collection.immutable.TreeMap -class Blockchainer extends PersistentActor with StrictLogging with Blockchain { +class Blockchainer(saveToPostgres: Boolean) extends PersistentActor with StrictLogging with Blockchain { var appendix: Appendix = Appendix(TreeMap()) val accountant: ActorSelection = context.system.actorSelection("/user/starter/blockchainer/accountant") - context.actorOf(Props(classOf[Accountant]), "accountant") + context.actorOf(Props(classOf[Accountant], saveToPostgres), "accountant") override def receiveRecover: Receive = { case keyBlock: KeyBlock => update(keyBlock) @@ -48,6 +48,7 @@ class Blockchainer extends PersistentActor with StrictLogging with Blockchain { appendix = appendix.copy(appendix.chain + (microBlock.height -> microBlock)) accountant ! microBlock } + if (saveToPostgres) context.actorSelection("/user/starter/pgWriter") ! block } } diff --git a/src/main/scala/mvp2/actors/PgWriter.scala b/src/main/scala/mvp2/actors/PgWriter.scala new file mode 100644 index 0000000..8f7073c --- /dev/null +++ b/src/main/scala/mvp2/actors/PgWriter.scala @@ -0,0 +1,15 @@ +package mvp2.actors + +import mvp2.actors.Accountant.Account +import mvp2.data.Block +import mvp2.utils.DbService + +class PgWriter(dbService: DbService) extends CommonActor { + + override def specialBehavior: Receive = { + case block: Block => dbService.writeBlock(block) + case account: Account => dbService.writeAccount(account) + } + + override def postStop(): Unit = dbService.shutdown() +} diff --git a/src/main/scala/mvp2/actors/Starter.scala b/src/main/scala/mvp2/actors/Starter.scala index ec2cf67..5435548 100644 --- a/src/main/scala/mvp2/actors/Starter.scala +++ b/src/main/scala/mvp2/actors/Starter.scala @@ -1,7 +1,7 @@ package mvp2.actors import akka.actor.{Actor, Props} -import mvp2.utils.Settings +import mvp2.utils.{DbService, Settings} import com.typesafe.scalalogging.StrictLogging import scala.language.postfixOps import com.typesafe.config.ConfigFactory @@ -30,7 +30,13 @@ class Starter extends Actor with StrictLogging { ) context.actorOf(Props(classOf[Networker], settings).withDispatcher("net-dispatcher") .withMailbox("net-mailbox"), "networker") - context.actorOf(Props(classOf[Blockchainer]), "blockchainer") + context.actorOf(Props(classOf[Blockchainer], settings.postgres.exists(_.write)), "blockchainer") context.actorOf(Props[Zombie]) + settings.postgres.foreach { pgSettings => + if (pgSettings.read || pgSettings.write) { + val dbService = new DbService(pgSettings) + if (pgSettings.write) context.actorOf(Props(classOf[PgWriter], dbService), "pgWriter") + } + } } } \ No newline at end of file diff --git a/src/main/scala/mvp2/utils/DbService.scala b/src/main/scala/mvp2/utils/DbService.scala new file mode 100644 index 0000000..a69ab31 --- /dev/null +++ b/src/main/scala/mvp2/utils/DbService.scala @@ -0,0 +1,131 @@ +package mvp2.utils + +import cats.effect.IO +import cats.implicits._ +import doobie.free.connection.ConnectionIO +import doobie.implicits._ +import doobie.hikari.HikariTransactor +import doobie.postgres.implicits._ +import com.zaxxer.hikari.HikariDataSource +import doobie.hikari.implicits._ +import doobie.util.update.Update +import mvp2.actors.Accountant.Account +import mvp2.data.{Block, KeyBlock, MicroBlock, Transaction} +import mvp2.utils.DbService.BlockDbVersion +import mvp2.utils.EncodingUtils._ +import scala.concurrent.Future + +class DbService(settings: PostgresSettings) { + + import Queries._ + + def writeBlock(block: Block): Future[Int] = runAsync(writeBlockQuery(block)) + def writeAccount(account: Account): Future[Int] = runAsync(writeAccountQuery(account)) + + private val dataSource = new HikariDataSource + dataSource.setJdbcUrl(s"${settings.host}?loggerLevel=OFF") + dataSource.setUsername(settings.user) + dataSource.setPassword(settings.pass) + dataSource.setMaximumPoolSize(1) + + private val pgTransactor: HikariTransactor[IO] = HikariTransactor[IO](dataSource) + + def shutdown(): Future[Unit] = pgTransactor.shutdown.unsafeToFuture + + private def runAsync[A](io: ConnectionIO[A]): Future[A] = + (for { + res <- io.transact(pgTransactor) + } yield res) + .unsafeToFuture() +} + +private object DbService { + case class BlockDbVersion(isMicroBlock: Boolean, + height: Long, + timestamp: Long, + prevKeyBlockHash: Array[Byte], + currentBlockHash: String, + data: Array[Byte], + prevMicroBlock: Option[Array[Byte]]) + + object BlockDbVersion { + def apply(block: Block): BlockDbVersion = block match { + case micro: MicroBlock => + BlockDbVersion( + true, + micro.height, + micro.timestamp, + micro.previousKeyBlockHash.toArray, + encode2Base64(micro.currentBlockHash), + micro.data.toArray, + Some(micro.previousMicroBlock.toArray) + ) + case key: KeyBlock => + BlockDbVersion( + false, + key.height, + key.timestamp, + key.previousKeyBlockHash.toArray, + encode2Base64(key.currentBlockHash), + key.data.toArray, + None + ) + } + } +} + +private object Queries { + + def writeAccountQuery(account: Account): ConnectionIO[Int] = for { + accW <- writeAccountInfoQuery(account) + dataW <- writeAccountDataQuery(account) + } yield accW + dataW + + def writeAccountInfoQuery(account: Account): ConnectionIO[Int] = { + val query: String = + """ + |INSERT INTO public.accounts (public_key, nonce) VALUES(?, ?) ON CONFLICT (public_key) DO NOTHING + """.stripMargin + Update[(String, Long)](query).run((encode2Base64(account.publicKey), account.nonce)) + } + + def writeAccountDataQuery(account: Account): ConnectionIO[Int] = { + val query: String = + """ + |INSERT INTO public.accounts_data (public_key, number_in_account, data) VALUES(?, ?, ?) ON CONFLICT (public_key, number_in_account) DO NOTHING + """.stripMargin + Update[(String, Int, Array[Byte])](query) + .updateMany(account.data.zipWithIndex.map(entry => (encode2Base64(account.publicKey), entry._2, entry._1.toArray))) + } + + def writeBlockQuery(block: Block): ConnectionIO[Int] = for { + blockW <- writeBlockInfoQuery(block) + txsW <- block match { + case key: KeyBlock => writeTransactionsQuery(key.transactions, block) + case micro: MicroBlock => writeTransactionsQuery(micro.transactions, block) + } + } yield blockW + txsW + + + def writeBlockInfoQuery(block: Block): ConnectionIO[Int] = { + val blockDto: BlockDbVersion = BlockDbVersion(block) + val query: String = + """ + |INSERT INTO public.blocks (is_microblock, height, timestamp, previous_key_block_hash, current_block_hash, + | data, prev_microblock) VALUES(?, ?, ?, ?, ?, ?, ?) + """.stripMargin + Update[BlockDbVersion](query).run(blockDto) + } + + def writeTransactionsQuery(transactions: List[Transaction], block: Block): ConnectionIO[Int] = { + val query: String = + """ + |INSERT INTO transactions(public_key, nonce, signature, data, block_hash) VALUES(?, ?, ?, ?, ?) + """.stripMargin + Update[(String, Long, Array[Byte], Array[Byte], String)](query) + .updateMany(transactions.map { tx => + (encode2Base64(tx.publicKey), tx.nonce, tx.signature.toArray, tx.data.toArray, encode2Base64(block.currentBlockHash)) + }) + } + +} \ No newline at end of file diff --git a/src/main/scala/mvp2/utils/Settings.scala b/src/main/scala/mvp2/utils/Settings.scala index 12d4689..acbfc34 100644 --- a/src/main/scala/mvp2/utils/Settings.scala +++ b/src/main/scala/mvp2/utils/Settings.scala @@ -5,7 +5,8 @@ case class Settings(port: Int, heartbeat: Int, apiSettings: ApiSettings, influx: Option[InfluxSettings], - testingSettings: Option[TestingSettings] + testingSettings: Option[TestingSettings], + postgres: Option[PostgresSettings] ) case class Node(host: String, port: Int) @@ -14,4 +15,6 @@ case class ApiSettings(httpHost: String, httpPort: Int, timeout: Int) case class InfluxSettings(host: String, port: Int, login: String, password: String) -case class TestingSettings(pingPong: Boolean) \ No newline at end of file +case class TestingSettings(pingPong: Boolean) + +case class PostgresSettings(host: String, pass: String, user: String, read: Boolean, write: Boolean) \ No newline at end of file From 6e28bc54efeb55745f87742ec91c0ac65ca68cd5 Mon Sep 17 00:00:00 2001 From: YERMLV Date: Tue, 6 Nov 2018 17:20:21 +0200 Subject: [PATCH 2/5] Added reading from pg --- src/main/scala/mvp2/actors/Blockchainer.scala | 7 +- src/main/scala/mvp2/actors/PgReader.scala | 45 ++++++++++++ src/main/scala/mvp2/actors/Starter.scala | 2 + src/main/scala/mvp2/utils/DbService.scala | 70 ++++++++++++++++--- src/main/scala/mvp2/utils/Settings.scala | 2 +- 5 files changed, 115 insertions(+), 11 deletions(-) create mode 100644 src/main/scala/mvp2/actors/PgReader.scala diff --git a/src/main/scala/mvp2/actors/Blockchainer.scala b/src/main/scala/mvp2/actors/Blockchainer.scala index 03eb9ae..794bc21 100644 --- a/src/main/scala/mvp2/actors/Blockchainer.scala +++ b/src/main/scala/mvp2/actors/Blockchainer.scala @@ -5,6 +5,7 @@ import akka.persistence.{PersistentActor, RecoveryCompleted} import akka.util.ByteString import com.typesafe.scalalogging.StrictLogging import mvp2.data._ +import mvp2.utils.EncodingUtils._ import mvp2.utils.Settings import mvp2.messages.{CurrentBlockchainInfo, Get} import scala.collection.immutable.TreeMap @@ -40,6 +41,9 @@ class Blockchainer(settings: Settings) extends PersistentActor with Blockchain w } override def receiveCommand: Receive = { + case RecoveryCompleted if settings.postgres.exists(_.read) => publisher ! lastKeyBlock.getOrElse( + KeyBlock(0, System.currentTimeMillis(), ByteString.empty, List()) + ) case block: Block => saveModifier(block) case Get => chain case _ => logger.info("Got something strange at Blockchainer!") @@ -51,13 +55,14 @@ class Blockchainer(settings: Settings) extends PersistentActor with Blockchain w logger.info(s"New keyBlock with height ${keyBlock.height} is received on blockchainer.") appendix.chain.foreach(block => persist(block._2) { x => - logger.info(s"Successfully saved block with id: ${x.currentBlockHash} and height ${x.height}!") + logger.info(s"Successfully saved block with id: ${encode2Base64(x.currentBlockHash)} and height ${x.height}!") }) update(appendix.chain) appendix = appendix.copy(TreeMap(keyBlock.height -> keyBlock)) accountant ! keyBlock case microBlock: MicroBlock => logger.info(s"KeyBlock is valid with height ${microBlock.height}.") + logger.info(s"Successfully saved microBlock with id: ${encode2Base64(microBlock.currentBlockHash)}!") appendix = appendix.copy(appendix.chain + (microBlock.height -> microBlock)) accountant ! microBlock } diff --git a/src/main/scala/mvp2/actors/PgReader.scala b/src/main/scala/mvp2/actors/PgReader.scala new file mode 100644 index 0000000..c693cd7 --- /dev/null +++ b/src/main/scala/mvp2/actors/PgReader.scala @@ -0,0 +1,45 @@ +package mvp2.actors + +import akka.actor.ActorSelection +import akka.persistence.RecoveryCompleted +import mvp2.data.Block +import mvp2.utils.DbService +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future +import scala.util.control.NonFatal + +class PgReader(dbService: DbService, batchSize: Int) extends CommonActor { + + val blockchainer: ActorSelection = context.actorSelection("/user/starter/blockchainer") + + dbService + .selectMaxHeightOpt() + .recoverWith { + case NonFatal(th) => + blockchainer ! RecoveryCompleted + logger.warn("Failed to connect to postgres") + context.stop(self) + Future.failed(th) + } + .foreach(self ! _.getOrElse(0)) + + override def specialBehavior: Receive = { + case height: Int => (0 to height).sliding(batchSize, batchSize).foldLeft(Future.successful(List[Block]())) { + case (prevBlocks, slide) => + val from: Int = slide.head + val to: Int = slide.last + logger.info(s"Trying to download blocks $from-$to from postgres") + prevBlocks.flatMap { retrieved => + if (retrieved.nonEmpty)logger.info(s"Downloaded blocks ${retrieved.head.height}-${retrieved.last.height} from postgres") + retrieved.foreach(blockchainer ! _) + if (to == height) blockchainer ! RecoveryCompleted + dbService.selectBlocksInHeightRange(from, to) + }.recoverWith { + case NonFatal(th) => + logger.warn("Failed to download blocks", th) + blockchainer ! RecoveryCompleted + Future.failed(th) + } + } + } +} diff --git a/src/main/scala/mvp2/actors/Starter.scala b/src/main/scala/mvp2/actors/Starter.scala index e106abe..5b7b658 100644 --- a/src/main/scala/mvp2/actors/Starter.scala +++ b/src/main/scala/mvp2/actors/Starter.scala @@ -35,6 +35,8 @@ class Starter extends CommonActor { if (pgSettings.read || pgSettings.write) { val dbService = new DbService(pgSettings) if (pgSettings.write) context.actorOf(Props(classOf[PgWriter], dbService), "pgWriter") + if (pgSettings.read) + context.actorOf(Props(classOf[PgReader], dbService, settings.postgres.flatMap(_.batchSize).getOrElse(5)), "pgReader") } } } diff --git a/src/main/scala/mvp2/utils/DbService.scala b/src/main/scala/mvp2/utils/DbService.scala index a69ab31..cabef2e 100644 --- a/src/main/scala/mvp2/utils/DbService.scala +++ b/src/main/scala/mvp2/utils/DbService.scala @@ -1,5 +1,6 @@ package mvp2.utils +import akka.util.ByteString import cats.effect.IO import cats.implicits._ import doobie.free.connection.ConnectionIO @@ -11,7 +12,7 @@ import doobie.hikari.implicits._ import doobie.util.update.Update import mvp2.actors.Accountant.Account import mvp2.data.{Block, KeyBlock, MicroBlock, Transaction} -import mvp2.utils.DbService.BlockDbVersion +import mvp2.utils.DbService.{BlockDbVersion, TransactionDbVersion} import mvp2.utils.EncodingUtils._ import scala.concurrent.Future @@ -21,6 +22,8 @@ class DbService(settings: PostgresSettings) { def writeBlock(block: Block): Future[Int] = runAsync(writeBlockQuery(block)) def writeAccount(account: Account): Future[Int] = runAsync(writeAccountQuery(account)) + def selectMaxHeightOpt(): Future[Option[Int]] = runAsync(selectMaxHeightOptQuery) + def selectBlocksInHeightRange(from: Int, to: Int): Future[List[Block]] = runAsync(selectBlocksByRangeQuery(from, to)) private val dataSource = new HikariDataSource dataSource.setJdbcUrl(s"${settings.host}?loggerLevel=OFF") @@ -72,19 +75,71 @@ private object DbService { ) } } + + case class TransactionDbVersion(publicKey: String, nonce: Long, signature: Array[Byte], data: Array[Byte], blockHash: String) { + def toTransaction: Transaction = Transaction(decodeFromBase64(publicKey), nonce, ByteString(signature), ByteString(data)) + } + object TransactionDbVersion { + def apply(txs: List[Transaction], block: Block): List[TransactionDbVersion] = txs.map { tx => + TransactionDbVersion( + encode2Base64(tx.publicKey), + tx.nonce, + tx.signature.toArray, + tx.data.toArray, + encode2Base64(block.currentBlockHash) + ) + } + } } private object Queries { + def selectBlocksByRangeQuery(from: Int, to: Int): ConnectionIO[List[Block]] = for { + infos <- selectBlocksInfosByRange(from, to) + txs <- selectTransactionsForBlocks(from, to) + } yield { + infos.map { dbBlock => + val relatedTxs: List[Transaction] = txs.filter(_.blockHash == dbBlock.currentBlockHash).map(_.toTransaction) + if (dbBlock.isMicroBlock) + MicroBlock( + dbBlock.height, + dbBlock.timestamp, + ByteString(dbBlock.prevKeyBlockHash), + ByteString(dbBlock.prevMicroBlock.getOrElse(Array.empty[Byte])), + ByteString(dbBlock.currentBlockHash), + relatedTxs, + ByteString(dbBlock.data) + ) + else + KeyBlock( + dbBlock.height, + dbBlock.timestamp, + ByteString(dbBlock.prevKeyBlockHash), + ByteString(dbBlock.currentBlockHash), + relatedTxs, + ByteString(dbBlock.data) + ) + } + } + + def selectBlocksInfosByRange(from: Int, to: Int): ConnectionIO[List[BlockDbVersion]] = + sql"SELECT * FROM public.blocks WHERE height >= $from AND height <= $to ORDER BY height ASC".query[BlockDbVersion].to[List] + + def selectTransactionsForBlocks(from: Int, to: Int): ConnectionIO[List[TransactionDbVersion]] = + sql"SELECT * FROM public.transactions WHERE block_hash IN (SELECT current_block_hash FROM public.blocks WHERE height >= $from AND height <= $to)" + .query[TransactionDbVersion].to[List] + + def selectMaxHeightOptQuery: ConnectionIO[Option[Int]] = sql"SELECT MAX(height) FROM public.blocks".query[Option[Int]].unique + def writeAccountQuery(account: Account): ConnectionIO[Int] = for { - accW <- writeAccountInfoQuery(account) + accW <- writeAccountInfoQuery(account) dataW <- writeAccountDataQuery(account) } yield accW + dataW def writeAccountInfoQuery(account: Account): ConnectionIO[Int] = { val query: String = """ - |INSERT INTO public.accounts (public_key, nonce) VALUES(?, ?) ON CONFLICT (public_key) DO NOTHING + |INSERT INTO public.accounts (public_key, nonce) VALUES(?, ?) ON CONFLICT (public_key) DO UPDATE SET nonce = EXCLUDED.nonce """.stripMargin Update[(String, Long)](query).run((encode2Base64(account.publicKey), account.nonce)) } @@ -100,7 +155,7 @@ private object Queries { def writeBlockQuery(block: Block): ConnectionIO[Int] = for { blockW <- writeBlockInfoQuery(block) - txsW <- block match { + txsW <- block match { case key: KeyBlock => writeTransactionsQuery(key.transactions, block) case micro: MicroBlock => writeTransactionsQuery(micro.transactions, block) } @@ -120,12 +175,9 @@ private object Queries { def writeTransactionsQuery(transactions: List[Transaction], block: Block): ConnectionIO[Int] = { val query: String = """ - |INSERT INTO transactions(public_key, nonce, signature, data, block_hash) VALUES(?, ?, ?, ?, ?) + |INSERT INTO public.transactions(public_key, nonce, signature, data, block_hash) VALUES(?, ?, ?, ?, ?) """.stripMargin - Update[(String, Long, Array[Byte], Array[Byte], String)](query) - .updateMany(transactions.map { tx => - (encode2Base64(tx.publicKey), tx.nonce, tx.signature.toArray, tx.data.toArray, encode2Base64(block.currentBlockHash)) - }) + Update[TransactionDbVersion](query).updateMany(TransactionDbVersion(transactions, block)) } } \ No newline at end of file diff --git a/src/main/scala/mvp2/utils/Settings.scala b/src/main/scala/mvp2/utils/Settings.scala index acbfc34..8de22a3 100644 --- a/src/main/scala/mvp2/utils/Settings.scala +++ b/src/main/scala/mvp2/utils/Settings.scala @@ -17,4 +17,4 @@ case class InfluxSettings(host: String, port: Int, login: String, password: Stri case class TestingSettings(pingPong: Boolean) -case class PostgresSettings(host: String, pass: String, user: String, read: Boolean, write: Boolean) \ No newline at end of file +case class PostgresSettings(host: String, pass: String, user: String, read: Boolean, write: Boolean, batchSize: Option[Int]) \ No newline at end of file From bf2ee468e1e170cf28f343a6cb75f892ec5c6fcd Mon Sep 17 00:00:00 2001 From: YERMLV Date: Tue, 6 Nov 2018 17:25:19 +0200 Subject: [PATCH 3/5] after merge fix --- src/main/scala/mvp2/actors/Blockchainer.scala | 2 +- src/main/scala/mvp2/actors/Starter.scala | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/scala/mvp2/actors/Blockchainer.scala b/src/main/scala/mvp2/actors/Blockchainer.scala index cc00e50..6b04fbe 100644 --- a/src/main/scala/mvp2/actors/Blockchainer.scala +++ b/src/main/scala/mvp2/actors/Blockchainer.scala @@ -16,7 +16,7 @@ import scala.concurrent.duration._ class Blockchainer(settings: Settings) extends PersistentActor with Blockchain with StrictLogging { var appendix: Appendix = Appendix(TreeMap()) - val accountant: ActorRef = context.actorOf(Props(classOf[Accountant]), "accountant") + val accountant: ActorRef = context.actorOf(Props(classOf[Accountant], settings.postgres.exists(_.write)), "accountant") val networker: ActorRef = context.actorOf(Props(classOf[Networker], settings).withDispatcher("net-dispatcher") .withMailbox("net-mailbox"), "networker") val publisher: ActorRef = context.actorOf(Props[Publisher], "publisher") diff --git a/src/main/scala/mvp2/actors/Starter.scala b/src/main/scala/mvp2/actors/Starter.scala index 257bc29..abd3ef4 100644 --- a/src/main/scala/mvp2/actors/Starter.scala +++ b/src/main/scala/mvp2/actors/Starter.scala @@ -31,7 +31,6 @@ class Starter extends CommonActor { context.actorOf(Props(classOf[ConsoleActor], settings), "cliActor") context.actorOf(Props(classOf[Informator], settings), "informator") context.actorOf(Props(classOf[Zombie]), "zombie") - context.actorOf(Props(classOf[ConsoleActor], settings), "cliActor") settings.postgres.foreach { pgSettings => if (pgSettings.read || pgSettings.write) { val dbService = new DbService(pgSettings) From b95d294a58d68f40e373804de0cb0af6bc613b87 Mon Sep 17 00:00:00 2001 From: YERMLV Date: Tue, 20 Nov 2018 14:51:40 +0200 Subject: [PATCH 4/5] after merge fix --- src/main/resources/data_model.sql | 1 + src/main/scala/mvp2/actors/Blockchainer.scala | 7 +++---- src/main/scala/mvp2/utils/DbService.scala | 19 ++++++++++--------- src/main/scala/mvp2/utils/Settings.scala | 2 +- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/src/main/resources/data_model.sql b/src/main/resources/data_model.sql index 7fd6592..8f44b08 100644 --- a/src/main/resources/data_model.sql +++ b/src/main/resources/data_model.sql @@ -22,6 +22,7 @@ CREATE table blocks( CREATE TABLE transactions( public_key TEXT, + timestamp BIGINT NOT NULL, nonce BIGINT NOT NULL, signature BYTEA, data BYTEA, diff --git a/src/main/scala/mvp2/actors/Blockchainer.scala b/src/main/scala/mvp2/actors/Blockchainer.scala index 6e6e62a..06727ca 100644 --- a/src/main/scala/mvp2/actors/Blockchainer.scala +++ b/src/main/scala/mvp2/actors/Blockchainer.scala @@ -2,12 +2,11 @@ package mvp2.actors import akka.actor.{ActorRef, ActorSelection, Props} import akka.persistence.{PersistentActor, RecoveryCompleted} +import akka.util.ByteString import com.typesafe.scalalogging.StrictLogging import mvp2.actors.Planner.Period import mvp2.data.InnerMessages.{CurrentBlockchainInfo, Get, TimeDelta} import mvp2.data._ -import mvp2.messages.CurrentBlockchainInfo -import mvp2.messages.Get import mvp2.utils.EncodingUtils._ import mvp2.utils.Settings @@ -28,7 +27,7 @@ class Blockchainer(settings: Settings) extends PersistentActor with StrictLoggin } override def receiveCommand: Receive = { - case RecoveryCompleted if settings.postgres.exists(_.read) => publisher ! lastKeyBlock.getOrElse( + case RecoveryCompleted if settings.postgres.exists(_.read) => publisher ! blockchain.chain.lastOption.getOrElse( KeyBlock(0, System.currentTimeMillis(), ByteString.empty, List()) ) case keyBlock: KeyBlock => @@ -42,7 +41,7 @@ class Blockchainer(settings: Settings) extends PersistentActor with StrictLoggin s"Blockchain consists of ${blockchain.chain.size} blocks.") planner ! keyBlock publisher ! keyBlock - if (settings.postgres.exists(_.write)) context.actorSelection("/user/starter/pgWriter") ! block + if (settings.postgres.exists(_.write)) context.actorSelection("/user/starter/pgWriter") ! keyBlock case TimeDelta(delta: Long) => currentDelta = delta case Get => sender ! blockchain case period: Period => diff --git a/src/main/scala/mvp2/utils/DbService.scala b/src/main/scala/mvp2/utils/DbService.scala index cabef2e..0cb3327 100644 --- a/src/main/scala/mvp2/utils/DbService.scala +++ b/src/main/scala/mvp2/utils/DbService.scala @@ -59,7 +59,7 @@ private object DbService { micro.height, micro.timestamp, micro.previousKeyBlockHash.toArray, - encode2Base64(micro.currentBlockHash), + encode2Base16(micro.currentBlockHash), micro.data.toArray, Some(micro.previousMicroBlock.toArray) ) @@ -69,24 +69,25 @@ private object DbService { key.height, key.timestamp, key.previousKeyBlockHash.toArray, - encode2Base64(key.currentBlockHash), + encode2Base16(key.currentBlockHash), key.data.toArray, None ) } } - case class TransactionDbVersion(publicKey: String, nonce: Long, signature: Array[Byte], data: Array[Byte], blockHash: String) { - def toTransaction: Transaction = Transaction(decodeFromBase64(publicKey), nonce, ByteString(signature), ByteString(data)) + case class TransactionDbVersion(publicKey: String, ts: Long, nonce: Long, signature: Array[Byte], data: Array[Byte], blockHash: String) { + def toTransaction: Transaction = Transaction(decodeFromBase16(publicKey), ts, nonce, ByteString(signature), ByteString(data)) } object TransactionDbVersion { def apply(txs: List[Transaction], block: Block): List[TransactionDbVersion] = txs.map { tx => TransactionDbVersion( - encode2Base64(tx.publicKey), + encode2Base16(tx.publicKey), + tx.timestamp, tx.nonce, tx.signature.toArray, tx.data.toArray, - encode2Base64(block.currentBlockHash) + encode2Base16(block.currentBlockHash) ) } } @@ -141,7 +142,7 @@ private object Queries { """ |INSERT INTO public.accounts (public_key, nonce) VALUES(?, ?) ON CONFLICT (public_key) DO UPDATE SET nonce = EXCLUDED.nonce """.stripMargin - Update[(String, Long)](query).run((encode2Base64(account.publicKey), account.nonce)) + Update[(String, Long)](query).run((encode2Base16(account.publicKey), account.nonce)) } def writeAccountDataQuery(account: Account): ConnectionIO[Int] = { @@ -150,7 +151,7 @@ private object Queries { |INSERT INTO public.accounts_data (public_key, number_in_account, data) VALUES(?, ?, ?) ON CONFLICT (public_key, number_in_account) DO NOTHING """.stripMargin Update[(String, Int, Array[Byte])](query) - .updateMany(account.data.zipWithIndex.map(entry => (encode2Base64(account.publicKey), entry._2, entry._1.toArray))) + .updateMany(account.data.zipWithIndex.map(entry => (encode2Base16(account.publicKey), entry._2, entry._1.toArray))) } def writeBlockQuery(block: Block): ConnectionIO[Int] = for { @@ -175,7 +176,7 @@ private object Queries { def writeTransactionsQuery(transactions: List[Transaction], block: Block): ConnectionIO[Int] = { val query: String = """ - |INSERT INTO public.transactions(public_key, nonce, signature, data, block_hash) VALUES(?, ?, ?, ?, ?) + |INSERT INTO public.transactions(public_key, timestamp, nonce, signature, data, block_hash) VALUES(?, ?, ?, ?, ?, ?) """.stripMargin Update[TransactionDbVersion](query).updateMany(TransactionDbVersion(transactions, block)) } diff --git a/src/main/scala/mvp2/utils/Settings.scala b/src/main/scala/mvp2/utils/Settings.scala index 978a06c..41c65d5 100644 --- a/src/main/scala/mvp2/utils/Settings.scala +++ b/src/main/scala/mvp2/utils/Settings.scala @@ -11,7 +11,7 @@ case class Settings(port: Int, ntp: NetworkTimeProviderSettings, influx: InfluxSettings, testingSettings: TestingSettings, - mempoolSetting: MempoolSetting + mempoolSetting: MempoolSetting, postgres: Option[PostgresSettings] ) From fc64cc12db811ab20d1f9c1630f3f588052ff474 Mon Sep 17 00:00:00 2001 From: YERMLV Date: Tue, 20 Nov 2018 15:11:55 +0200 Subject: [PATCH 5/5] Test fix --- src/test/scala/mvp2/actors/StateSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/scala/mvp2/actors/StateSpec.scala b/src/test/scala/mvp2/actors/StateSpec.scala index fba5609..051e964 100644 --- a/src/test/scala/mvp2/actors/StateSpec.scala +++ b/src/test/scala/mvp2/actors/StateSpec.scala @@ -9,7 +9,7 @@ class StateSpec extends TestKit(ActorSystem("StateTestSystem")) with PropSpecLik property("state applying blocks") { val numBlocks = 10 - val stateActor = system.actorOf(Props[Accountant], "Accountant") + val stateActor = system.actorOf(Props(classOf[Accountant], false), "Accountant") val sampleBlockChain = DummyTestBlockGenerator.generateChain(numBlocks) sampleBlockChain.foreach(b => stateActor ! b) Thread.sleep(1000)