From 7df9111d9f24f12417cc4938c7e859d97b45680e Mon Sep 17 00:00:00 2001 From: KatolaZ Date: Tue, 18 Jul 2017 16:24:49 +0100 Subject: The pipeline "spooling -> routing -> file deletion" works --- scorsh.go | 62 +++++++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 43 insertions(+), 19 deletions(-) (limited to 'scorsh.go') diff --git a/scorsh.go b/scorsh.go index aca3121..d36b646 100644 --- a/scorsh.go +++ b/scorsh.go @@ -62,32 +62,52 @@ func Master(master *SCORSHmaster) { // master main loop: var matching_workers []*SCORSHworker - var push_msg SCORSHmsg matching_workers = make([]*SCORSHworker, len(master.Workers)) log.Println("[master] Master started ") + debug.log("[master] StatusChan: %s\n", master.StatusChan) for { + debug.log("[master] Receive loop...\n") select { - // - receive stuff from the spooler - case push_msg = <-master.Spooler: - + case push_msg := <-master.Spooler: + // here we manage the stuff we receive from the spooler debug.log("[master] received message: %s\n", push_msg) - // - lookup the repos map for matching workers matching_workers = FindMatchingWorkers(master, &push_msg) - debug.log("[master] matching workers: %s\n", matching_workers) - - // add the message to PendingMsg - //... - // - dispatch the message to all the matching workers - for _, w := range matching_workers { - // increase the counter associated to the message - w.MsgChan <- push_msg + debug.log("[master] matching workers: \n%s\n", matching_workers) + + // add the message to WorkingMsg, if it's not a duplicate! + if _, ok := master.WorkingMsg[push_msg.Id]; ok { + log.Printf("[master] detected duplicate message %s \n", push_msg.Id) + } else { + master.WorkingMsg[push_msg.Id] = 0 + // - dispatch the message to all the matching workers + for _, w := range matching_workers { + debug.log("[master] sending msg to worker: %s\n", w.Name) + // send the message to the worker + w.MsgChan <- push_msg + // increase the counter associated to the message + master.WorkingMsg[push_msg.Id] += 1 + debug.log("[master] now WorkingMsg[%s] is: %d\n", push_msg.Id, master.WorkingMsg[push_msg.Id]) + } + } + case done_msg := <-master.StatusChan: + // Here we manage a status message from a worker + debug.log("[master] received message from StatusChan: %s\n", done_msg) + if _, ok := master.WorkingMsg[done_msg.Id]; ok && master.WorkingMsg[done_msg.Id] > 0 { + master.WorkingMsg[done_msg.Id] -= 1 + if master.WorkingMsg[done_msg.Id] == 0 { + delete(master.WorkingMsg, done_msg.Id) + master.Spooler <- done_msg + } + } else { + log.Printf("[master] received completion event for non-existing message name: %s\n", done_msg.Id) } } } + debug.log("[master] Exiting the for loop, for some mysterious reason...\n") } func InitMaster() *SCORSHmaster { @@ -96,9 +116,12 @@ func InitMaster() *SCORSHmaster { master.Repos = make(map[string][]*SCORSHworker) master.WorkingMsg = make(map[string]int) - // This is the mutex-channel on which we receive acks from workers - master.StatusChan = make(chan SCORSHmsg, 1) - master.Spooler = make(chan SCORSHmsg, 1) + // This is the channel on which we receive acks from workers + master.StatusChan = make(chan SCORSHmsg) + // This is the channel on which we exchange messages with the spooler + master.Spooler = make(chan SCORSHmsg) + + debug.log("[InitMaster] StatusChan: %s\n", master.StatusChan) err_workers := StartWorkers(master) if err_workers != nil { @@ -111,17 +134,18 @@ func InitMaster() *SCORSHmaster { log.Fatal("Error starting spooler: ", err_spooler) } return master - } func main() { + var done chan int + flag.Parse() master := InitMaster() go Master(master) - <-master.StatusChan - + // wait indefinitely -- we should implement signal handling... + <-done } -- cgit v1.2.3