summaryrefslogtreecommitdiff
path: root/scorsh.go
diff options
context:
space:
mode:
Diffstat (limited to 'scorsh.go')
-rw-r--r--scorsh.go62
1 files changed, 43 insertions, 19 deletions
diff --git a/scorsh.go b/scorsh.go
index aca3121..d36b646 100644
--- a/scorsh.go
+++ b/scorsh.go
@@ -62,32 +62,52 @@ func Master(master *SCORSHmaster) {
// master main loop:
var matching_workers []*SCORSHworker
- var push_msg SCORSHmsg
matching_workers = make([]*SCORSHworker, len(master.Workers))
log.Println("[master] Master started ")
+ debug.log("[master] StatusChan: %s\n", master.StatusChan)
for {
+ debug.log("[master] Receive loop...\n")
select {
- // - receive stuff from the spooler
- case push_msg = <-master.Spooler:
-
+ case push_msg := <-master.Spooler:
+ // here we manage the stuff we receive from the spooler
debug.log("[master] received message: %s\n", push_msg)
-
// - lookup the repos map for matching workers
matching_workers = FindMatchingWorkers(master, &push_msg)
- debug.log("[master] matching workers: %s\n", matching_workers)
-
- // add the message to PendingMsg
- //...
- // - dispatch the message to all the matching workers
- for _, w := range matching_workers {
- // increase the counter associated to the message
- w.MsgChan <- push_msg
+ debug.log("[master] matching workers: \n%s\n", matching_workers)
+
+ // add the message to WorkingMsg, if it's not a duplicate!
+ if _, ok := master.WorkingMsg[push_msg.Id]; ok {
+ log.Printf("[master] detected duplicate message %s \n", push_msg.Id)
+ } else {
+ master.WorkingMsg[push_msg.Id] = 0
+ // - dispatch the message to all the matching workers
+ for _, w := range matching_workers {
+ debug.log("[master] sending msg to worker: %s\n", w.Name)
+ // send the message to the worker
+ w.MsgChan <- push_msg
+ // increase the counter associated to the message
+ master.WorkingMsg[push_msg.Id] += 1
+ debug.log("[master] now WorkingMsg[%s] is: %d\n", push_msg.Id, master.WorkingMsg[push_msg.Id])
+ }
+ }
+ case done_msg := <-master.StatusChan:
+ // Here we manage a status message from a worker
+ debug.log("[master] received message from StatusChan: %s\n", done_msg)
+ if _, ok := master.WorkingMsg[done_msg.Id]; ok && master.WorkingMsg[done_msg.Id] > 0 {
+ master.WorkingMsg[done_msg.Id] -= 1
+ if master.WorkingMsg[done_msg.Id] == 0 {
+ delete(master.WorkingMsg, done_msg.Id)
+ master.Spooler <- done_msg
+ }
+ } else {
+ log.Printf("[master] received completion event for non-existing message name: %s\n", done_msg.Id)
}
}
}
+ debug.log("[master] Exiting the for loop, for some mysterious reason...\n")
}
func InitMaster() *SCORSHmaster {
@@ -96,9 +116,12 @@ func InitMaster() *SCORSHmaster {
master.Repos = make(map[string][]*SCORSHworker)
master.WorkingMsg = make(map[string]int)
- // This is the mutex-channel on which we receive acks from workers
- master.StatusChan = make(chan SCORSHmsg, 1)
- master.Spooler = make(chan SCORSHmsg, 1)
+ // This is the channel on which we receive acks from workers
+ master.StatusChan = make(chan SCORSHmsg)
+ // This is the channel on which we exchange messages with the spooler
+ master.Spooler = make(chan SCORSHmsg)
+
+ debug.log("[InitMaster] StatusChan: %s\n", master.StatusChan)
err_workers := StartWorkers(master)
if err_workers != nil {
@@ -111,17 +134,18 @@ func InitMaster() *SCORSHmaster {
log.Fatal("Error starting spooler: ", err_spooler)
}
return master
-
}
func main() {
+ var done chan int
+
flag.Parse()
master := InitMaster()
go Master(master)
- <-master.StatusChan
-
+ // wait indefinitely -- we should implement signal handling...
+ <-done
}