-
Notifications
You must be signed in to change notification settings - Fork 0
/
workers.go
66 lines (54 loc) · 1.5 KB
/
workers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package main
import (
"fmt"
"log"
"os"
"strconv"
"github.com/KyungWonPark/C2/internal/calc"
)
type ringBuffer []struct {
isEmpty bool
data [][600]float32
}
func load(fileList []string, buffer ringBuffer, bufferCh chan<- int, workerConfig *calc.Config) {
bufferIndex := 0
dataDir := os.Getenv("DATA") + "/fMRI-Smoothed/"
for i := 0; i < len(fileList); i++ {
if !buffer[bufferIndex].isEmpty {
i = i - 1
bufferIndex = (bufferIndex + 1) % workerConfig.NumBufferSize
continue
}
path := dataDir + fileList[i]
fmt.Printf("PUSH: %s into ring buffer: %s\n", fileList[i], strconv.Itoa(bufferIndex))
doSampling(path, buffer[bufferIndex].data, workerConfig)
buffer[bufferIndex].isEmpty = false
bufferCh <- bufferIndex
bufferIndex = (bufferIndex + 1) % workerConfig.NumBufferSize
}
close(bufferCh)
return
}
func compute(buffer ringBuffer, bufferCh <-chan int, matBuffer [][13362]float32, workerConfig *calc.Config) {
stats := make([]calc.LinStatEle, 13362)
for {
bufferIndex, ok := <-bufferCh
if ok {
if buffer[bufferIndex].isEmpty {
log.Fatal("Ring Buffer: Something is wrong!")
}
timeSeries := buffer[bufferIndex].data
// z-score
calc.DoZScoring(timeSeries, workerConfig)
// sigmoid
calc.DoSigmoid(timeSeries, stats, workerConfig)
// pearson & accumulation
calc.DoPearson(timeSeries, stats, matBuffer)
fmt.Printf("POP: popping from ring buffer: %s\n", strconv.Itoa(bufferIndex))
buffer[bufferIndex].isEmpty = true
} else {
break
}
}
return
}