Skip to content

Commit

Permalink
Merge pull request #1013 from jmarin/institutions-cassandra
Browse files Browse the repository at this point in the history
Institutions cassandra
  • Loading branch information
schbetsy authored Jun 16, 2017
2 parents 29021da + 1172811 commit 0ef4f2a
Show file tree
Hide file tree
Showing 19 changed files with 442 additions and 9 deletions.
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,21 @@ run <full local path to sample file>
A sample file is located in the following folder: `hmda-platform/persistence/src/main/resources/demoInstitutions.csv`


* In order to support the read side, a local PostgreSQL server is needed. Assuming it runs on the default port, on the same machine as the API, the following environment variable needs to be set:
* In order to support the read side, a local PostgreSQL and Cassandra server are needed. Assuming it runs on the default port, on the same machine as the API, the following environment variable needs to be set:

```shell
export JDBC_URL='jdbc:postgresql://localhost/hmda?user=postgres&password=postgres'
```

where `hmda` is the name of the `PostgreSQL` database, owned by the default user with default password (`postgres`)

For Cassandra, the following environment variables need to be set (assuming Cassandra is running on a docker container as described above):

```shell
export CASSANDRA_CLUSTER_HOSTS=192.168.99.100
export CASSANDRA_CLUSTER_PORT=9042
```

**Note: if you are running the backend only through sbt, the database needs to be created manually in advance, see instructions [here](https://www.postgresql.org/docs/9.1/static/manage-ag-createdb.html)**

* The `HMDA Platform` is a distributed system that is meant to be run as a clustered application in production.
Expand Down
6 changes: 6 additions & 0 deletions api/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ akka {
cluster {
failure-detector.threshold = 12 //Increase value for AWS environments
metrics.enabled = off

retry-unsuccessful-join-after = 20s

//DON'T USE IN PRODUCTION!!!!!
auto-down-unreachable-after = 10s

}

extensions = ["de.heikoseeberger.constructr.ConstructrExtension"]
Expand Down
7 changes: 4 additions & 3 deletions api/src/main/scala/hmda/api/HmdaPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import hmda.persistence.HmdaSupervisor._
import hmda.query.HmdaQuerySupervisor._
import hmda.persistence.demo.DemoData
Expand All @@ -24,8 +23,7 @@ import hmda.query.DbConfiguration._
import hmda.query.projections.filing.HmdaFilingDBProjection._
import hmda.validation.ValidationStats
import hmda.api.HmdaConfig._

import scala.concurrent.ExecutionContext
import hmda.query.projections.institutions.InstitutionCassandraProjection

object HmdaPlatform {

Expand Down Expand Up @@ -78,6 +76,9 @@ object HmdaPlatform {
val institutionViewF = (querySupervisor ? FindActorByName(InstitutionView.name))
.mapTo[ActorRef]

val cassandraProjection = new InstitutionCassandraProjection
cassandraProjection.startUp()

// Start validation stats actor
system.actorOf(ValidationStats.props(), "validation-stats")

Expand Down
6 changes: 6 additions & 0 deletions api/src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,9 @@ db {
keepAliveConnection = true
}
}

cassandra {
host = "127.0.0.1"
port = 9142
keyspace = "hmda_query"
}
7 changes: 4 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ lazy val query = (project in file("query"))
oldStrategy(x)
},
parallelExecution in Test := false,
libraryDependencies ++= configDeps ++ akkaPersistenceDeps ++ slickDeps
fork in Test := false,
libraryDependencies ++= configDeps ++ akkaPersistenceDeps ++ slickDeps ++ Seq(cassandraDriver, cassandraUnit, alpakkaCassandra)
)
.dependsOn(modelJVM % "compile->compile;test->test")
.dependsOn(persistenceModel % "compile->compile;test->test")
Expand All @@ -186,8 +187,8 @@ lazy val api = (project in file("api"))
)
)
.dependsOn(persistenceModel % "compile->compile;test->test")
.dependsOn(query % "compile->compile")
.dependsOn(persistence % "compile->compile")
.dependsOn(query % "compile->compile;test->test")
.dependsOn(persistence % "compile->compile;test->test")


lazy val platformTest = (project in file("platform-test"))
Expand Down
16 changes: 15 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,17 @@ services:
volumes:
- ./target/scala-2.12/hmda.jar:/opt/hmda.jar
depends_on:
- cassandra
- zookeeper
- query_db
environment:
JDBC_URL: jdbc:postgresql://192.168.99.100:54321/hmda?user=postgres&password=postgres
ZOOKEEPER_HOST: zookeeper
ZOOKEEPER_PORT: 2181
CASSANDRA_CLUSTER_HOSTS: cassandra
CASSANDRA_CLUSTER_PORT: 9042
HMDA_IS_DEMO: 'true'
restart: always

ui:
build:
Expand Down Expand Up @@ -117,4 +122,13 @@ services:
zookeeper:
image: jplock/zookeeper
ports:
- '2181:2181'
- '2181:2181'
depends_on:
- cassandra

cassandra:
image: cassandra
ports:
- '9042:9042'
- '7000:7000'
- '7199:7199'
3 changes: 3 additions & 0 deletions persistence-model/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ cassandra-journal {
keyspace-autocreate-retries = 5
connect-retries = 10
connect-retry-delay = 5s
reconnect-max-delay = 60s
}

cassandra-snapshot-store {
Expand All @@ -86,11 +87,13 @@ cassandra-snapshot-store {
keyspace-autocreate-retries = 5
connect-retries = 10
connect-retry-delay = 5s
reconnect-max-delay = 60s
}

cassandra-query-journal {
refresh-interval = 5s
refresh-interval = ${?QUERY_JOURNAL_REFRESH_INTERVAL}
read-retries = 10
}

hmda {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ object HmdaQuery {
.map(_.event.asInstanceOf[Event])
}

def liveEvents(persistenceId: String)(implicit system: ActorSystem, materializer: ActorMaterializer): Source[Event, NotUsed] = {
readJournal(system).eventsByPersistenceId(persistenceId, 0L, Long.MaxValue)
.map(_.event.asInstanceOf[Event])
}

def eventsWithSequenceNumber(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long)(implicit system: ActorSystem, materializer: ActorMaterializer): Source[EventWithSeqNr, NotUsed] = {
readJournal(system)
.eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr)
Expand Down
4 changes: 3 additions & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,7 @@ object Dependencies {
val scalaCsv = "com.github.tototoshi" %% "scala-csv" % Version.scalaCsv
val constructr = "de.heikoseeberger" %% "constructr" % Version.constructrVersion
val constructrZookeeper = "com.lightbend.constructr" %% "constructr-coordination-zookeeper" % Version.constructrZookeeperVersion

val cassandraUnit = "org.cassandraunit" % "cassandra-unit" % Version.cassandraUnit
val alpakkaCassandra = "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % Version.alpakka
val cassandraDriver = "com.datastax.cassandra" % "cassandra-driver-core" % Version.cassandraDriver
}
3 changes: 3 additions & 0 deletions project/Version.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,7 @@ object Version {
val scalaCsv = "1.3.4"
val constructrVersion = "0.17.0"
val constructrZookeeperVersion = "0.3.3"
val cassandraUnit = "3.1.3.2"
val alpakka = "0.9"
val cassandraDriver = "3.2.0"
}
9 changes: 9 additions & 0 deletions query/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,12 @@ db {
}
}

cassandra {
host = "127.0.0.1"
host = ${CASSANDRA_CLUSTER_HOSTS}
port = 9042
port = ${CASSANDRA_CLUSTER_PORT}
keyspace = "hmda_query"
retries = 60
retry-interval = 1000
}
12 changes: 12 additions & 0 deletions query/src/main/scala/hmda/query/CassandraConfig.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package hmda.query

import com.typesafe.config.ConfigFactory

object CassandraConfig {
val config = ConfigFactory.load()
val cassandraHost = config.getString("cassandra.host")
val cassandraPort = config.getInt("cassandra.port")
val cassandraKeyspace = config.getString("cassandra.keyspace")
val numberOfRetries = config.getInt("cassandra.retries")
val retryInterval = config.getLong("cassandra.retry-interval")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package hmda.query.projections.institutions

import akka.NotUsed
import akka.actor.{ ActorSystem, Scheduler }
import akka.stream.ActorMaterializer
import akka.stream.alpakka.cassandra.scaladsl.CassandraSink
import akka.stream.scaladsl.Source
import hmda.persistence.messages.events.institutions.InstitutionEvents.{ InstitutionCreated, InstitutionModified }
import hmda.persistence.processing.HmdaQuery._
import hmda.query.model.institutions.InstitutionQuery
import hmda.query.repository.institutions.InstitutionCassandraRepository
import hmda.query.repository.institutions.InstitutionConverter._

import scala.concurrent.ExecutionContext

class InstitutionCassandraProjection extends InstitutionCassandraRepository {

def startUp(): Unit = {

createKeyspace()
createTable()

val source: Source[InstitutionQuery, NotUsed] = liveEvents("institutions").map {
case InstitutionCreated(i) => i
case InstitutionModified(i) => i
}

val sink = CassandraSink[InstitutionQuery](parallelism = 2, preparedStatement, statementBinder)

source.runWith(sink)

}

override implicit def materializer: ActorMaterializer = ActorMaterializer()

override implicit def system: ActorSystem = ActorSystem()

override implicit val ec: ExecutionContext = system.dispatcher

override implicit val scheduler: Scheduler = system.scheduler
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package hmda.query.repository

import akka.actor.Scheduler
import akka.{ Done, NotUsed }
import akka.stream.scaladsl.Source
import com.datastax.driver.core.policies.ExponentialReconnectionPolicy
import com.datastax.driver.core.{ Cluster, ResultSet, Row, Session }
import hmda.query.CassandraConfig._
import scala.annotation.tailrec
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Success, Try }
import org.slf4j.LoggerFactory

trait CassandraRepository[A] {

implicit val ec: ExecutionContext
implicit val scheduler: Scheduler

val log = LoggerFactory.getLogger("CassandraRepository")

val keyspace = "hmda_query"

@tailrec
private def retry[T](n: Int)(fn: => T): Try[T] = {
log.info("*********ATTEMPTING CONNECTION TO CASSANDRA QUERY CLUSTER********")
Try { fn } match {
case x: Success[T] => x
case _ if n > 1 =>
Thread.sleep(retryInterval)
retry(n - 1)(fn)
case fn => fn
}
}

implicit val session: Session = {
retry(numberOfRetries) {
Cluster
.builder
.addContactPoint(cassandraHost)
.withPort(cassandraPort)
.withReconnectionPolicy(new ExponentialReconnectionPolicy(100L, 200000L))
.build
.connect()
}.getOrElse(null)
}

def createKeyspace(): ResultSet = {
val query =
s"""
|CREATE KEYSPACE IF NOT EXISTS $keyspace WITH REPLICATION = {
| 'class': 'SimpleStrategy',
| 'replication_factor': '1'
|}
""".stripMargin

session.execute(query)
}
def dropKeyspace(): ResultSet = {
val query =
s"""
|DROP KEYSPACE $keyspace
""".stripMargin
session.execute(query)
}
def createTable(): ResultSet
def dropTable(): ResultSet
def insertData(source: Source[A, NotUsed]): Future[Done]
def readData(fetchSize: Int): Future[Seq[Row]]

}
Loading

0 comments on commit 0ef4f2a

Please sign in to comment.