-
Notifications
You must be signed in to change notification settings - Fork 645
/
ReferenceSpec.scala
142 lines (112 loc) · 3.91 KB
/
ReferenceSpec.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.scaladsl
import akka.actor.ActorSystem
import akka.stream.alpakka.reference._
import akka.stream.alpakka.reference.scaladsl.Reference
import akka.stream.alpakka.testkit.scaladsl.LogCapturing
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.testkit.TestKit
import akka.util.ByteString
import akka.{Done, NotUsed}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import scala.collection.immutable
import scala.concurrent.Future
/**
 * Compile-checks and runs the Scala DSL of the reference connector
 * (settings, source, flow and resource handling).
 *
 * Append "Spec" to every Scala test suite.
 */
class ReferenceSpec extends AnyWordSpec with BeforeAndAfterAll with ScalaFutures with Matchers with LogCapturing {

  // Implicit so that the streams below can be run via `runWith`; shut down in `afterAll`.
  implicit val system: ActorSystem = ActorSystem("ReferenceSpec")

  final val ClientId = "test-client-id"

  "reference connector" should {

    /**
     * Type annotations not generally needed on local variables.
     * However it allows to check if the types are really what we want.
     */
    "compile settings" in {
      val providedAuth: Authentication.Provided =
        Authentication.Provided().withVerifier(_ => true)

      val noAuth: Authentication.None =
        Authentication.None

      val settings: SourceSettings = SourceSettings(ClientId)

      // Only checks that both authentication flavours are accepted; results are not inspected.
      settings.withAuthentication(providedAuth)
      settings.withAuthentication(noAuth)
    }

    "compile source" in {
      // NOTE(review): the `// #source` comments look like documentation snippet
      // markers - keep them exactly as they are.
      // #source
      val settings: SourceSettings = SourceSettings(ClientId)

      val source: Source[ReferenceReadResult, Future[Done]] =
        Reference.source(settings)
      // #source

      source
    }

    "compile flow" in {
      // NOTE(review): the `// #flow` comments look like documentation snippet
      // markers - keep them exactly as they are.
      // #flow
      val flow: Flow[ReferenceWriteMessage, ReferenceWriteResult, NotUsed] =
        Reference.flow()
      // #flow

      flow
    }

    "run source" in {
      val source = Reference.source(SourceSettings(ClientId))

      // Only the first emitted element is inspected.
      val msg = source.runWith(Sink.head).futureValue

      msg.data should contain theSameElementsAs Seq(ByteString("one"))
    }

    "run flow" in {
      val flow = Reference.flow()

      // Three write messages; only the first one carries metrics.
      val source = Source(
        immutable.Seq(
          ReferenceWriteMessage()
            .withData(immutable.Seq(ByteString("one")))
            .withMetrics(Map("rps" -> 20L, "rpm" -> 30L)),
          ReferenceWriteMessage().withData(
            immutable.Seq(
              ByteString("two"),
              ByteString("three"),
              ByteString("four")
            )
          ),
          ReferenceWriteMessage().withData(
            immutable.Seq(
              ByteString("five"),
              ByteString("six"),
              ByteString("seven")
            )
          )
        )
      )

      val result = source.via(flow).runWith(Sink.seq).futureValue

      // Every data element that went in must come out again (order is not asserted).
      result.flatMap(_.message.data) should contain theSameElementsAs Seq(
        "one",
        "two",
        "three",
        "four",
        "five",
        "six",
        "seven"
      ).map(ByteString.apply)

      // The first result is expected to expose a "total" metric of 50 (20 + 30 from the input metrics).
      result.head.metrics.get("total") should contain(50L)
    }

    "resolve resource from application config" in {
      val result = Source
        .single(ReferenceWriteMessage().withData(immutable.Seq(ByteString("one"))))
        .via(Reference.flowWithResource())
        .runWith(Sink.seq)

      result.futureValue.flatMap(_.message.data).map(_.utf8String) shouldBe Seq("one default msg")
    }

    "use resource from attributes" in {
      val resource = Resource(ResourceSettings("attributes msg"))

      val result = Source
        .single(ReferenceWriteMessage().withData(immutable.Seq(ByteString("one"))))
        .via(Reference.flowWithResource().withAttributes(ReferenceAttributes.resource(resource)))
        .runWith(Sink.seq)

      result.futureValue.flatMap(_.message.data).map(_.utf8String) shouldBe Seq("one attributes msg")
    }
  }

  /**
   * Shuts the actor system down after all tests have run.
   *
   * `super.afterAll()` is called first so that `afterAll` hooks of the mixed-in
   * stackable traits are not skipped; the `finally` makes sure the actor system
   * is shut down even if one of those hooks throws.
   */
  override def afterAll(): Unit =
    try super.afterAll()
    finally TestKit.shutdownActorSystem(system)
}