package main import ( "flag" "fmt" "log" ) // manage debugging messages const debug debugging = true type debugging bool func (d debugging) log(format string, args ...interface{}) { if d { log.Printf(format, args...) } } /////////// var confFile = flag.String("c", "./scorsh.cfg", "Configuration file for SCORSH") // SCORSHerr converts numeric error values in the corresponding // description string func SCORSHerr(err int) error { var errStr string switch err { case errNoFile: errStr = "Invalid file name" case errKeyring: errStr = "Invalid keyring" case errNoRepo: errStr = "Invalid repository" case errNoCommit: errStr = "Invalid commit ID" case errSignature: errStr = "Invalid signature" default: errStr = "Generic Error" } return fmt.Errorf("%s", errStr) } func findMatchingWorkers(master *master, msg *spoolMsg) []*worker { var ret []*worker for idx, w := range master.Workers { if w.Matches(msg.Repo, msg.Branch) { debug.log("--- Worker: %s matches %s:%s\n", w.Name, msg.Repo, msg.Branch) ret = append(ret, &(master.Workers[idx])) } } return ret } func runMaster(master *master) { // master main loop: log.Println("[master] Master started ") debug.log("[master] StatusChan: %s\n", master.StatusChan) for { debug.log("[master] Receive loop...\n") select { case pushMsg := <-master.Spooler: // here we manage the stuff we receive from the spooler debug.log("[master] received message: %s\n", pushMsg) // - lookup the repos map for matching workers matchingWorkers := findMatchingWorkers(master, &pushMsg) debug.log("[master] matching workers: \n%s\n", matchingWorkers) // add the message to WorkingMsg, if it's not a duplicate! if _, ok := master.WorkingMsg[pushMsg.ID]; ok { log.Printf("[master] detected duplicate message %s \n", pushMsg.ID) } else { master.WorkingMsg[pushMsg.ID] = 0 // - dispatch the message to all the matching workers for _, w := range matchingWorkers { debug.log("[master] sending msg to worker: %s\n", w.Name) // send the message to the worker w.MsgChan <- pushMsg // increase the counter associated to the message master.WorkingMsg[pushMsg.ID]++ debug.log("[master] now WorkingMsg[%s] is: %d\n", pushMsg.ID, master.WorkingMsg[pushMsg.ID]) } } case doneMsg := <-master.StatusChan: // Here we manage a status message from a worker debug.log("[master] received message from StatusChan: %s\n", doneMsg) if _, ok := master.WorkingMsg[doneMsg.ID]; ok && master.WorkingMsg[doneMsg.ID] > 0 { master.WorkingMsg[doneMsg.ID]-- if master.WorkingMsg[doneMsg.ID] == 0 { delete(master.WorkingMsg, doneMsg.ID) master.Spooler <- doneMsg } } else { log.Printf("[master] received completion event for non-existing message name: %s\n", doneMsg.ID) } } } debug.log("[master] Exiting the for loop, for some mysterious reason...\n") } func initMaster() *master { master := readGlobalConfig(*confFile) master.Repos = make(map[string][]*worker) master.WorkingMsg = make(map[string]int) // This is the channel on which we receive acks from workers master.StatusChan = make(chan spoolMsg) // This is the channel on which we exchange messages with the spooler master.Spooler = make(chan spoolMsg) debug.log("[InitMaster] StatusChan: %s\n", master.StatusChan) errWorkers := startWorkers(master) if errWorkers != nil { log.Fatal("Error starting workers: ", errWorkers) } else { log.Println("Workers started correctly") } errSpooler := startSpooler(master) if errSpooler != nil { log.Fatal("Error starting spooler: ", errSpooler) } return master } func main() { var done chan int flag.Parse() master := initMaster() go runMaster(master) // wait indefinitely -- we should implement signal handling... <-done }