diff --git a/README.md b/README.md index ccfc48c361..2f9ae9435c 100644 --- a/README.md +++ b/README.md @@ -103,7 +103,7 @@ run A sample file is located in the following folder: `hmda-platform/persistence/src/main/resources/demoInstitutions.csv` -* In order to support the read side, a local PostgreSQL server is needed. Assuming it runs on the default port, on the same machine as the API, the following environment variable needs to be set: +* In order to support the read side, a local PostgreSQL and Cassandra server are needed. Assuming it runs on the default port, on the same machine as the API, the following environment variable needs to be set: ```shell export JDBC_URL='jdbc:postgresql://localhost/hmda?user=postgres&password=postgres' @@ -111,6 +111,13 @@ export JDBC_URL='jdbc:postgresql://localhost/hmda?user=postgres&password=postgre where `hmda` is the name of the `PostgreSQL` database, owned by the default user with default password (`postgres`) +For Cassandra, the following environment variables need to be set (assuming Cassandra is running on a docker container as described above): + +```shell +export CASSANDRA_CLUSTER_HOSTS=192.168.99.100 +export CASSANDRA_CLUSTER_PORT=9042 +``` + **Note: if you are running the backend only through sbt, the database needs to be created manually in advance, see instructions [here](https://www.postgresql.org/docs/9.1/static/manage-ag-createdb.html)** * The `HMDA Platform` is a distributed system that is meant to be run as a clustered application in production. diff --git a/api/src/main/resources/application.conf b/api/src/main/resources/application.conf index e56ca4523c..209f72ecc3 100644 --- a/api/src/main/resources/application.conf +++ b/api/src/main/resources/application.conf @@ -20,6 +20,12 @@ akka { cluster { failure-detector.threshold = 12 //Increase value for AWS environments metrics.enabled = off + + retry-unsuccessful-join-after = 20s + + //DON'T USE IN PRODUCTION!!!!! + auto-down-unreachable-after = 10s + } extensions = ["de.heikoseeberger.constructr.ConstructrExtension"] diff --git a/api/src/main/scala/hmda/api/HmdaPlatform.scala b/api/src/main/scala/hmda/api/HmdaPlatform.scala index 88240a743a..45db3f953d 100644 --- a/api/src/main/scala/hmda/api/HmdaPlatform.scala +++ b/api/src/main/scala/hmda/api/HmdaPlatform.scala @@ -7,7 +7,6 @@ import akka.pattern.ask import akka.util.Timeout import scala.concurrent.duration._ -import com.typesafe.config.ConfigFactory import hmda.persistence.HmdaSupervisor._ import hmda.query.HmdaQuerySupervisor._ import hmda.persistence.demo.DemoData @@ -24,8 +23,7 @@ import hmda.query.DbConfiguration._ import hmda.query.projections.filing.HmdaFilingDBProjection._ import hmda.validation.ValidationStats import hmda.api.HmdaConfig._ - -import scala.concurrent.ExecutionContext +import hmda.query.projections.institutions.InstitutionCassandraProjection object HmdaPlatform { @@ -78,6 +76,9 @@ object HmdaPlatform { val institutionViewF = (querySupervisor ? FindActorByName(InstitutionView.name)) .mapTo[ActorRef] + val cassandraProjection = new InstitutionCassandraProjection + cassandraProjection.startUp() + // Start validation stats actor system.actorOf(ValidationStats.props(), "validation-stats") diff --git a/api/src/test/resources/application.conf b/api/src/test/resources/application.conf index 70c0c0a4f0..f6f319beb0 100644 --- a/api/src/test/resources/application.conf +++ b/api/src/test/resources/application.conf @@ -39,3 +39,9 @@ db { keepAliveConnection = true } } + +cassandra { + host = "127.0.0.1" + port = 9142 + keyspace = "hmda_query" +} diff --git a/build.sbt b/build.sbt index 7840402536..f5baada0e4 100644 --- a/build.sbt +++ b/build.sbt @@ -162,7 +162,8 @@ lazy val query = (project in file("query")) oldStrategy(x) }, parallelExecution in Test := false, - libraryDependencies ++= configDeps ++ akkaPersistenceDeps ++ slickDeps + fork in Test := false, + libraryDependencies ++= configDeps ++ akkaPersistenceDeps ++ slickDeps ++ Seq(cassandraDriver, cassandraUnit, alpakkaCassandra) ) .dependsOn(modelJVM % "compile->compile;test->test") .dependsOn(persistenceModel % "compile->compile;test->test") @@ -186,8 +187,8 @@ lazy val api = (project in file("api")) ) ) .dependsOn(persistenceModel % "compile->compile;test->test") - .dependsOn(query % "compile->compile") - .dependsOn(persistence % "compile->compile") + .dependsOn(query % "compile->compile;test->test") + .dependsOn(persistence % "compile->compile;test->test") lazy val platformTest = (project in file("platform-test")) diff --git a/docker-compose.yml b/docker-compose.yml index 797c1641af..338e877d68 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,12 +10,17 @@ services: volumes: - ./target/scala-2.12/hmda.jar:/opt/hmda.jar depends_on: + - cassandra - zookeeper - query_db environment: JDBC_URL: jdbc:postgresql://192.168.99.100:54321/hmda?user=postgres&password=postgres ZOOKEEPER_HOST: zookeeper ZOOKEEPER_PORT: 2181 + CASSANDRA_CLUSTER_HOSTS: cassandra + CASSANDRA_CLUSTER_PORT: 9042 + HMDA_IS_DEMO: 'true' + restart: always ui: build: @@ -117,4 +122,13 @@ services: zookeeper: image: jplock/zookeeper ports: - - '2181:2181' \ No newline at end of file + - '2181:2181' + depends_on: + - cassandra + + cassandra: + image: cassandra + ports: + - '9042:9042' + - '7000:7000' + - '7199:7199' \ No newline at end of file diff --git a/persistence-model/src/main/resources/application.conf b/persistence-model/src/main/resources/application.conf index 2e63edcca1..c5e1dfe535 100644 --- a/persistence-model/src/main/resources/application.conf +++ b/persistence-model/src/main/resources/application.conf @@ -75,6 +75,7 @@ cassandra-journal { keyspace-autocreate-retries = 5 connect-retries = 10 connect-retry-delay = 5s + reconnect-max-delay = 60s } cassandra-snapshot-store { @@ -86,11 +87,13 @@ cassandra-snapshot-store { keyspace-autocreate-retries = 5 connect-retries = 10 connect-retry-delay = 5s + reconnect-max-delay = 60s } cassandra-query-journal { refresh-interval = 5s refresh-interval = ${?QUERY_JOURNAL_REFRESH_INTERVAL} + read-retries = 10 } hmda { diff --git a/persistence-model/src/main/scala/hmda/persistence/processing/HmdaQuery.scala b/persistence-model/src/main/scala/hmda/persistence/processing/HmdaQuery.scala index b1b2bd2b02..789e47470d 100644 --- a/persistence-model/src/main/scala/hmda/persistence/processing/HmdaQuery.scala +++ b/persistence-model/src/main/scala/hmda/persistence/processing/HmdaQuery.scala @@ -26,6 +26,11 @@ object HmdaQuery { .map(_.event.asInstanceOf[Event]) } + def liveEvents(persistenceId: String)(implicit system: ActorSystem, materializer: ActorMaterializer): Source[Event, NotUsed] = { + readJournal(system).eventsByPersistenceId(persistenceId, 0L, Long.MaxValue) + .map(_.event.asInstanceOf[Event]) + } + def eventsWithSequenceNumber(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long)(implicit system: ActorSystem, materializer: ActorMaterializer): Source[EventWithSeqNr, NotUsed] = { readJournal(system) .eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 0e72bc3d26..d504494668 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -35,5 +35,7 @@ object Dependencies { val scalaCsv = "com.github.tototoshi" %% "scala-csv" % Version.scalaCsv val constructr = "de.heikoseeberger" %% "constructr" % Version.constructrVersion val constructrZookeeper = "com.lightbend.constructr" %% "constructr-coordination-zookeeper" % Version.constructrZookeeperVersion - + val cassandraUnit = "org.cassandraunit" % "cassandra-unit" % Version.cassandraUnit + val alpakkaCassandra = "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % Version.alpakka + val cassandraDriver = "com.datastax.cassandra" % "cassandra-driver-core" % Version.cassandraDriver } diff --git a/project/Version.scala b/project/Version.scala index 235f303d8c..5ab6632e71 100644 --- a/project/Version.scala +++ b/project/Version.scala @@ -18,4 +18,7 @@ object Version { val scalaCsv = "1.3.4" val constructrVersion = "0.17.0" val constructrZookeeperVersion = "0.3.3" + val cassandraUnit = "3.1.3.2" + val alpakka = "0.9" + val cassandraDriver = "3.2.0" } diff --git a/query/src/main/resources/application.conf b/query/src/main/resources/application.conf index 258da48cd8..af98831292 100644 --- a/query/src/main/resources/application.conf +++ b/query/src/main/resources/application.conf @@ -31,3 +31,12 @@ db { } } +cassandra { + host = "127.0.0.1" + host = ${CASSANDRA_CLUSTER_HOSTS} + port = 9042 + port = ${CASSANDRA_CLUSTER_PORT} + keyspace = "hmda_query" + retries = 60 + retry-interval = 1000 +} diff --git a/query/src/main/scala/hmda/query/CassandraConfig.scala b/query/src/main/scala/hmda/query/CassandraConfig.scala new file mode 100644 index 0000000000..e53181e6aa --- /dev/null +++ b/query/src/main/scala/hmda/query/CassandraConfig.scala @@ -0,0 +1,12 @@ +package hmda.query + +import com.typesafe.config.ConfigFactory + +object CassandraConfig { + val config = ConfigFactory.load() + val cassandraHost = config.getString("cassandra.host") + val cassandraPort = config.getInt("cassandra.port") + val cassandraKeyspace = config.getString("cassandra.keyspace") + val numberOfRetries = config.getInt("cassandra.retries") + val retryInterval = config.getLong("cassandra.retry-interval") +} diff --git a/query/src/main/scala/hmda/query/projections/institutions/InstitutionCassandraProjection.scala b/query/src/main/scala/hmda/query/projections/institutions/InstitutionCassandraProjection.scala new file mode 100644 index 0000000000..c40407f696 --- /dev/null +++ b/query/src/main/scala/hmda/query/projections/institutions/InstitutionCassandraProjection.scala @@ -0,0 +1,41 @@ +package hmda.query.projections.institutions + +import akka.NotUsed +import akka.actor.{ ActorSystem, Scheduler } +import akka.stream.ActorMaterializer +import akka.stream.alpakka.cassandra.scaladsl.CassandraSink +import akka.stream.scaladsl.Source +import hmda.persistence.messages.events.institutions.InstitutionEvents.{ InstitutionCreated, InstitutionModified } +import hmda.persistence.processing.HmdaQuery._ +import hmda.query.model.institutions.InstitutionQuery +import hmda.query.repository.institutions.InstitutionCassandraRepository +import hmda.query.repository.institutions.InstitutionConverter._ + +import scala.concurrent.ExecutionContext + +class InstitutionCassandraProjection extends InstitutionCassandraRepository { + + def startUp(): Unit = { + + createKeyspace() + createTable() + + val source: Source[InstitutionQuery, NotUsed] = liveEvents("institutions").map { + case InstitutionCreated(i) => i + case InstitutionModified(i) => i + } + + val sink = CassandraSink[InstitutionQuery](parallelism = 2, preparedStatement, statementBinder) + + source.runWith(sink) + + } + + override implicit def materializer: ActorMaterializer = ActorMaterializer() + + override implicit def system: ActorSystem = ActorSystem() + + override implicit val ec: ExecutionContext = system.dispatcher + + override implicit val scheduler: Scheduler = system.scheduler +} diff --git a/query/src/main/scala/hmda/query/repository/CassandraRepository.scala b/query/src/main/scala/hmda/query/repository/CassandraRepository.scala new file mode 100644 index 0000000000..79d371137f --- /dev/null +++ b/query/src/main/scala/hmda/query/repository/CassandraRepository.scala @@ -0,0 +1,70 @@ +package hmda.query.repository + +import akka.actor.Scheduler +import akka.{ Done, NotUsed } +import akka.stream.scaladsl.Source +import com.datastax.driver.core.policies.ExponentialReconnectionPolicy +import com.datastax.driver.core.{ Cluster, ResultSet, Row, Session } +import hmda.query.CassandraConfig._ +import scala.annotation.tailrec +import scala.concurrent.{ ExecutionContext, Future } +import scala.util.{ Success, Try } +import org.slf4j.LoggerFactory + +trait CassandraRepository[A] { + + implicit val ec: ExecutionContext + implicit val scheduler: Scheduler + + val log = LoggerFactory.getLogger("CassandraRepository") + + val keyspace = "hmda_query" + + @tailrec + private def retry[T](n: Int)(fn: => T): Try[T] = { + log.info("*********ATTEMPTING CONNECTION TO CASSANDRA QUERY CLUSTER********") + Try { fn } match { + case x: Success[T] => x + case _ if n > 1 => + Thread.sleep(retryInterval) + retry(n - 1)(fn) + case fn => fn + } + } + + implicit val session: Session = { + retry(numberOfRetries) { + Cluster + .builder + .addContactPoint(cassandraHost) + .withPort(cassandraPort) + .withReconnectionPolicy(new ExponentialReconnectionPolicy(100L, 200000L)) + .build + .connect() + }.getOrElse(null) + } + + def createKeyspace(): ResultSet = { + val query = + s""" + |CREATE KEYSPACE IF NOT EXISTS $keyspace WITH REPLICATION = { + | 'class': 'SimpleStrategy', + | 'replication_factor': '1' + |} + """.stripMargin + + session.execute(query) + } + def dropKeyspace(): ResultSet = { + val query = + s""" + |DROP KEYSPACE $keyspace + """.stripMargin + session.execute(query) + } + def createTable(): ResultSet + def dropTable(): ResultSet + def insertData(source: Source[A, NotUsed]): Future[Done] + def readData(fetchSize: Int): Future[Seq[Row]] + +} diff --git a/query/src/main/scala/hmda/query/repository/institutions/InstitutionCassandraRepository.scala b/query/src/main/scala/hmda/query/repository/institutions/InstitutionCassandraRepository.scala new file mode 100644 index 0000000000..a94a9e9a91 --- /dev/null +++ b/query/src/main/scala/hmda/query/repository/institutions/InstitutionCassandraRepository.scala @@ -0,0 +1,141 @@ +package hmda.query.repository.institutions + +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.stream.alpakka.cassandra.scaladsl.CassandraSource +import akka.{ Done, NotUsed } +import akka.stream.alpakka.cassandra.scaladsl.CassandraSink +import akka.stream.scaladsl.{ Sink, Source } +import com.datastax.driver.core._ +import hmda.query.model.institutions.InstitutionQuery +import hmda.query.repository.CassandraRepository + +import scala.concurrent.{ ExecutionContext, Future } + +trait InstitutionCassandraRepository extends CassandraRepository[InstitutionQuery] { + + implicit def system: ActorSystem + implicit def materializer: ActorMaterializer + implicit val ec: ExecutionContext + + def preparedStatement(implicit session: Session): PreparedStatement = { + session.prepare(s"INSERT INTO $keyspace.institutions" + + "(id," + + "agency," + + "period," + + "activity_year," + + "respondent_id," + + "type," + + "cra," + + "email_1," + + "email_2," + + "email_3," + + "respondent_name," + + "respondent_state," + + "respondent_city," + + "respondent_fips," + + "hmda_filer," + + "parent_respondent_id," + + "parent_id_rssd," + + "parent_name," + + "parent_city," + + "parent_state," + + "assets," + + "other_lender_codes," + + "top_holder_id_rssd," + + "top_holder_name," + + "top_holder_city," + + "top_holder_state," + + "top_holder_country) " + + " VALUES " + + "(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") + } + + val statementBinder = (institution: InstitutionQuery, statement: PreparedStatement) => + statement.bind( + institution.id, + new Integer(institution.agency), + new Integer(institution.filingPeriod), + new Integer(institution.activityYear), + institution.respondentId, + institution.institutionType, + new java.lang.Boolean(institution.cra), + institution.emailDomain1, + institution.emailDomain2, + institution.emailDomain3, + institution.respondentName, + institution.respondentState, + institution.respondentCity, + institution.respondentFipsStateNumber, + new java.lang.Boolean(institution.hmdaFilerFlag), + institution.parentRespondentId, + new Integer(institution.parentIdRssd), + institution.parentName, + institution.parentCity, + institution.parentState, + new Integer(institution.assets), + new Integer(institution.otherLenderCode), + new Integer(institution.topHolderIdRssd), + institution.topHolderName, + institution.topHolderCity, + institution.topHolderState, + institution.topHolderCountry + ) + + override def createTable(): ResultSet = { + val query = + s""" + |CREATE TABLE IF NOT EXISTS $keyspace.institutions( + | id varchar PRIMARY KEY, + | agency int, + | period int, + | activity_year int, + | respondent_id varchar, + | type varchar, + | cra boolean, + | email_1 varchar, + | email_2 varchar, + | email_3 varchar, + | respondent_name varchar, + | respondent_state varchar, + | respondent_city varchar, + | respondent_fips varchar, + | hmda_filer boolean, + | parent_respondent_id varchar, + | parent_id_rssd int, + | parent_name varchar, + | parent_city varchar, + | parent_state varchar, + | assets int, + | other_lender_codes int, + | top_holder_id_rssd int, + | top_holder_name varchar, + | top_holder_city varchar, + | top_holder_state varchar, + | top_holder_country varchar + |); + """.stripMargin + + session.execute(query) + + } + + override def dropTable(): ResultSet = { + val query = s""" + |DROP TABLE IF EXISTS $keyspace.institutions; + """.stripMargin + + session.execute(query) + } + + override def insertData(source: Source[InstitutionQuery, NotUsed]): Future[Done] = { + val sink = CassandraSink[InstitutionQuery](parallelism = 2, preparedStatement, statementBinder) + source.runWith(sink) + } + + override def readData(fetchSize: Int): Future[Seq[Row]] = { + val statement = new SimpleStatement(s"SELECT * FROM $keyspace.institutions").setFetchSize(fetchSize) + CassandraSource(statement).runWith(Sink.seq).mapTo[Seq[Row]] + } + +} diff --git a/query/src/test/resources/application.conf b/query/src/test/resources/application.conf index 6d282e390a..09cd4f9983 100644 --- a/query/src/test/resources/application.conf +++ b/query/src/test/resources/application.conf @@ -25,3 +25,13 @@ hmda { isDemo = true } +akka.persistence.journal.plugin = "inmemory-journal" +akka.persistence.query.journal.id = "inmemory-read-journal" +akka.persistence.snapshot-store.plugin = "inmemory-snapshot-store" + +cassandra { + host = "127.0.0.1" + port = 9142 + keyspace = "hmda_query" +} + diff --git a/query/src/test/resources/simple.cql b/query/src/test/resources/simple.cql new file mode 100644 index 0000000000..3692341a1b --- /dev/null +++ b/query/src/test/resources/simple.cql @@ -0,0 +1,8 @@ +CREATE TABLE myTable( + id varchar, + value varchar, + PRIMARY KEY(id)); + +INSERT INTO myTable(id, value) values('myKey01','myValue01'); + +INSERT INTO myTable(id, value) values('myKey02','myValue02'); \ No newline at end of file diff --git a/query/src/test/scala/hmda/query/cassandra/CassandraSpec.scala b/query/src/test/scala/hmda/query/cassandra/CassandraSpec.scala new file mode 100644 index 0000000000..8fa1a507c9 --- /dev/null +++ b/query/src/test/scala/hmda/query/cassandra/CassandraSpec.scala @@ -0,0 +1,39 @@ +package hmda.query.cassandra + +import com.datastax.driver.core.{ Cluster, Session } +import org.scalatest.{ AsyncWordSpec, BeforeAndAfterAll, MustMatchers, WordSpec } +import org.cassandraunit.CQLDataLoader +import org.cassandraunit.dataset.cql.ClassPathCQLDataSet +import org.cassandraunit.utils.EmbeddedCassandraServerHelper + +import scala.concurrent.Future + +class CassandraSpec extends WordSpec with MustMatchers with BeforeAndAfterAll { + + var cluster: Cluster = _ + var session: Session = _ + + override def beforeAll(): Unit = { + EmbeddedCassandraServerHelper.startEmbeddedCassandra(60000L) + cluster = EmbeddedCassandraServerHelper.getCluster() + session = cluster.connect() + loadData() + } + + override def afterAll(): Unit = { + EmbeddedCassandraServerHelper.cleanEmbeddedCassandra() + } + + def loadData(): Unit = { + val dataLoader = new CQLDataLoader(session) + dataLoader.load(new ClassPathCQLDataSet("simple.cql", "hmda_query")) + } + + "Cassandra" must { + "Select from table" in { + val resultSet = session.execute("select * from myTable where id = 'myKey01'") + resultSet.iterator().next().getString("value") mustBe "myValue01" + } + } + +} diff --git a/query/src/test/scala/hmda/query/repository/institutions/InstitutionCassandraRepositorySpec.scala b/query/src/test/scala/hmda/query/repository/institutions/InstitutionCassandraRepositorySpec.scala new file mode 100644 index 0000000000..93e18bb5c2 --- /dev/null +++ b/query/src/test/scala/hmda/query/repository/institutions/InstitutionCassandraRepositorySpec.scala @@ -0,0 +1,55 @@ +package hmda.query.repository.institutions + +import akka.actor.{ ActorSystem, Scheduler } +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.Source +import com.datastax.driver.core.{ Cluster, Session } +import hmda.model.institution.{ Agency, InstitutionGenerators } +import org.cassandraunit.utils.EmbeddedCassandraServerHelper +import org.scalatest.{ BeforeAndAfterAll, MustMatchers, WordSpec } +import hmda.query.repository.institutions.InstitutionConverter._ + +import scala.concurrent.ExecutionContext + +class InstitutionCassandraRepositorySpec extends WordSpec with MustMatchers with BeforeAndAfterAll with InstitutionCassandraRepository { + + EmbeddedCassandraServerHelper.startEmbeddedCassandra(20000L) + val cluster: Cluster = EmbeddedCassandraServerHelper.getCluster + override val session: Session = cluster.connect() + + override def beforeAll(): Unit = { + createKeyspace() + } + + override def afterAll(): Unit = { + EmbeddedCassandraServerHelper.cleanEmbeddedCassandra() + } + + "Institutions in Cassandra" must { + "Drop the table if it exists, create it again and populate it with some data that can be read back" in { + dropTable() + createTable() + + val institutions = List( + toInstitutionQuery(InstitutionGenerators.sampleInstitution.copy(agency = Agency.CFPB)), + toInstitutionQuery(InstitutionGenerators.sampleInstitution.copy(agency = Agency.CFPB)), + toInstitutionQuery(InstitutionGenerators.sampleInstitution.copy(agency = Agency.CFPB)) + ) + + val source = Source.fromIterator(() => institutions.toIterator) + insertData(source) + val read = readData(20) + read.map { r => + r.map(x => x.getInt("agency") mustBe Agency.CFPB.value) + r.seq.size mustBe 3 + } + } + } + + override implicit def materializer: ActorMaterializer = ActorMaterializer() + + override implicit val ec: ExecutionContext = system.dispatcher + override implicit val scheduler: Scheduler = system.scheduler + + override implicit def system: ActorSystem = ActorSystem() +}