forked from 0x4b53/amqp-rpc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bindings.go
79 lines (72 loc) · 2.5 KB
/
bindings.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package amqprpc
import (
"github.com/streadway/amqp"
)
// Exchanges are enteties where messages are published. This defines the available
// entities based on https://www.rabbitmq.com/tutorials/amqp-concepts.html.
const (
ExchangeDirect = "direct"
ExchangeFanout = "fanout"
ExchangeTopic = "topic"
ExchangeHeaders = "headers"
)
// HandlerBinding holds information about how an exchange and a queue should be
// declared and bound. If the ExchangeName is not defined (an empty string), the
// queue will not be bound to the exchange but assumed to use the default match.
type HandlerBinding struct {
QueueName string
ExchangeName string
ExchangeType string
RoutingKey string
BindHeaders amqp.Table
Handler HandlerFunc
}
// DirectBinding returns a HandlerBinding to use for direct exchanges where each
// routing key will be mapped to one handler.
func DirectBinding(routingKey string, handler HandlerFunc) HandlerBinding {
return HandlerBinding{
QueueName: routingKey,
ExchangeName: "amq.direct",
ExchangeType: ExchangeDirect,
RoutingKey: routingKey,
BindHeaders: amqp.Table{},
Handler: handler,
}
}
// FanoutBinding returns a HandlerBinding to use for fanout exchanges. These
// exchanges does not use the routing key. We do not use the default exchange
// (amq.fanout) since this would broadcast all messages everywhere.
func FanoutBinding(exchangeName string, handler HandlerFunc) HandlerBinding {
return HandlerBinding{
ExchangeName: exchangeName,
ExchangeType: ExchangeFanout,
RoutingKey: "",
BindHeaders: amqp.Table{},
Handler: handler,
}
}
// TopicBinding returns a HandlerBinding to use for topic exchanges. The default
// exchange (amq.topic) will be used. The topic is matched on the routing key.
func TopicBinding(queueName, routingKey string, handler HandlerFunc) HandlerBinding {
return HandlerBinding{
QueueName: queueName,
ExchangeName: "amq.topic",
ExchangeType: ExchangeTopic,
RoutingKey: routingKey,
BindHeaders: amqp.Table{},
Handler: handler,
}
}
// HeadersBinding returns a HandlerBinding to use for header exchanges that will
// match on specific headers. The heades are specified as an amqp.Table. The
// default exchange amq.match will be used.
func HeadersBinding(queueName string, headers amqp.Table, handler HandlerFunc) HandlerBinding {
return HandlerBinding{
QueueName: queueName,
ExchangeName: "amq.match",
ExchangeType: ExchangeHeaders,
RoutingKey: "",
BindHeaders: headers,
Handler: handler,
}
}