forked from myzhan/boomer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client_czmq.go
78 lines (68 loc) · 1.78 KB
/
client_czmq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// +build goczmq
package boomer
import (
"fmt"
"log"
"github.com/zeromq/goczmq"
)
// czmqSocketClient is the goczmq-backed client. It holds one socket per
// direction: pushConn is written to by sendMessage and pullConn is read from
// by recv.
type czmqSocketClient struct {
	pushConn, pullConn *goczmq.Sock
}
// newClient builds the client selected by the package-level rpc setting.
//
// "zeromq" yields a client from newZmqClient and "socket" one from
// newSocketClient, both pointed at masterHost/masterPort. Any other value
// terminates the process through log.Fatal. A banner describing the master
// address is logged before the client is returned.
func newClient() client {
	log.Println("Boomer is built with goczmq support.")

	var (
		conn   client
		banner string
	)

	switch rpc {
	case "zeromq":
		conn = newZmqClient(masterHost, masterPort)
		// The zeromq transport uses two ports: masterPort and masterPort+1.
		banner = fmt.Sprintf("Boomer is connected to master(%s:%d|%d) press Ctrl+c to quit.", masterHost, masterPort, masterPort+1)
	case "socket":
		conn = newSocketClient(masterHost, masterPort)
		banner = fmt.Sprintf("Boomer is connected to master(%s:%d) press Ctrl+c to quit.", masterHost, masterPort)
	default:
		log.Fatal("Unknown rpc type:", rpc)
	}

	log.Println(banner)
	return conn
}
// newZmqClient creates the push and pull sockets for talking to the master
// and starts the goroutines that service them.
//
// The push socket targets masterHost:masterPort and the pull socket targets
// masterHost:masterPort+1. Failure to create either socket terminates the
// process through log.Fatalf. The recv and send goroutines started here loop
// forever; this function provides no way to stop them.
func newZmqClient(masterHost string, masterPort int) *czmqSocketClient {
	pusher, err := goczmq.NewPush(fmt.Sprintf("tcp://%s:%d", masterHost, masterPort))
	if err != nil {
		log.Fatalf("Failed to create zeromq pusher, %s", err)
	}

	// The leading '>' is czmq endpoint syntax for "connect"; per the goczmq
	// documentation NewPull binds by default, so the prefix is needed here.
	puller, err := goczmq.NewPull(fmt.Sprintf(">tcp://%s:%d", masterHost, masterPort+1))
	if err != nil {
		log.Fatalf("Failed to create zeromq puller, %s", err)
	}
	log.Println("ZMQ sockets connected")

	zc := &czmqSocketClient{pushConn: pusher, pullConn: puller}
	go zc.recv()
	go zc.send()
	return zc
}
// recv loops forever, reading frames from the pull socket, decoding each one
// with newMessageFromBytes and forwarding the result on fromMaster.
//
// A failed read is logged and skipped, so the buffer returned alongside an
// error is never handed to the decoder or published on fromMaster.
func (c *czmqSocketClient) recv() {
	for {
		// The second result is the frame's flag value, which is not used here.
		// NOTE(review): presumably each master message is a single frame — confirm.
		frame, _, err := c.pullConn.RecvFrame()
		if err != nil {
			log.Printf("Error reading from zeromq puller, %s", err)
			continue
		}
		fromMaster <- newMessageFromBytes(frame)
	}
}
// send loops forever, taking messages off toMaster and passing each one to
// sendMessage. After a message whose Type is "quit" has been sent, it signals
// disconnectedFromMaster with true and keeps looping.
func (c *czmqSocketClient) send() {
	for {
		outgoing := <-toMaster
		c.sendMessage(outgoing)
		if outgoing.Type != "quit" {
			continue
		}
		disconnectedFromMaster <- true
	}
}
// sendMessage serializes msg and writes it to the push socket.
//
// The method has no error result, so a failed send is logged instead of being
// silently dropped.
func (c *czmqSocketClient) sendMessage(msg *message) {
	// A flags value of 0 requests no special frame flags.
	if err := c.pushConn.SendFrame(msg.serialize(), 0); err != nil {
		log.Printf("Error sending to zeromq pusher, %s", err)
	}
}