From 7df9111d9f24f12417cc4938c7e859d97b45680e Mon Sep 17 00:00:00 2001 From: KatolaZ Date: Tue, 18 Jul 2017 16:24:49 +0100 Subject: The pipeline "spooling -> routing -> file deletion" works --- commits.go | 30 ++++++++++++++------ scorsh.go | 62 +++++++++++++++++++++++++++------------- spooler.go | 29 +++++++++++++------ types.go | 95 ++++++++++++++++++++++++++++++++------------------------------ workers.go | 13 +++++++-- 5 files changed, 145 insertions(+), 84 deletions(-) diff --git a/commits.go b/commits.go index 46672cf..b81bf7a 100644 --- a/commits.go +++ b/commits.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/KatolaZ/git2go" "golang.org/x/crypto/openpgp" + "log" "os" "strings" // "log" @@ -23,15 +24,15 @@ func CommitToString(commit *git.Commit) string { } // FIXME: RETURN THE ENTITY PROVIDED BY THE CHECK, OR nil -func check_signature(commit *git.Commit, keys []*openpgp.KeyRing) (signature, signed string, err error) { +func check_signature(commit *git.Commit, keys *map[string]openpgp.KeyRing) (signature, signed string, err error) { signature, signed, err = commit.ExtractSignature() if err == nil { - for _, keyring := range keys { + for _, keyring := range *keys { _, err_sig := - openpgp.CheckArmoredDetachedSignature(*keyring, strings.NewReader(signed), + openpgp.CheckArmoredDetachedSignature(keyring, strings.NewReader(signed), strings.NewReader(signature)) if err_sig == nil { @@ -45,6 +46,14 @@ func check_signature(commit *git.Commit, keys []*openpgp.KeyRing) (signature, si return "", "", err } +func find_scorsh_message(commit *git.Commit) (string, error) { + + msg := commit.RawMessage() + debug.log("[find_scorsg_msg] found message:\n %s\n", msg) + + return msg, nil +} + // traverse all the commits between two references, looking for scorsh // commands // fixme: we don't have just one keyring here.... @@ -91,12 +100,17 @@ func walk_commits(msg SCORSHmsg, w *SCORSHworker) error { // check if it can be verified by any of the keyrings associated // with the scorsh-tag - //signature, signed, err := check_signature(commit, &keyring) + // check if the commit contains a scorsh command + + _, err = find_scorsh_message(commit) + + //signature, signed, err := check_signature(commit, &w.Keys) //_, _, err := check_signature(commit, w.keys) - //if err != nil { - // log.Printf("%s\n", SCORSHerr(SCORSH_ERR_SIGNATURE)) - // - //} + if err != nil { + log.Printf("[worker: %s] %s\n", w.Name, SCORSHerr(SCORSH_ERR_SIGNATURE)) + } else { + + } cur_commit = commit.Parent(0) } else { fmt.Printf("Commit %x not found!\n", cur_commit.Id()) diff --git a/scorsh.go b/scorsh.go index aca3121..d36b646 100644 --- a/scorsh.go +++ b/scorsh.go @@ -62,32 +62,52 @@ func Master(master *SCORSHmaster) { // master main loop: var matching_workers []*SCORSHworker - var push_msg SCORSHmsg matching_workers = make([]*SCORSHworker, len(master.Workers)) log.Println("[master] Master started ") + debug.log("[master] StatusChan: %s\n", master.StatusChan) for { + debug.log("[master] Receive loop...\n") select { - // - receive stuff from the spooler - case push_msg = <-master.Spooler: - + case push_msg := <-master.Spooler: + // here we manage the stuff we receive from the spooler debug.log("[master] received message: %s\n", push_msg) - // - lookup the repos map for matching workers matching_workers = FindMatchingWorkers(master, &push_msg) - debug.log("[master] matching workers: %s\n", matching_workers) - - // add the message to PendingMsg - //... - // - dispatch the message to all the matching workers - for _, w := range matching_workers { - // increase the counter associated to the message - w.MsgChan <- push_msg + debug.log("[master] matching workers: \n%s\n", matching_workers) + + // add the message to WorkingMsg, if it's not a duplicate! + if _, ok := master.WorkingMsg[push_msg.Id]; ok { + log.Printf("[master] detected duplicate message %s \n", push_msg.Id) + } else { + master.WorkingMsg[push_msg.Id] = 0 + // - dispatch the message to all the matching workers + for _, w := range matching_workers { + debug.log("[master] sending msg to worker: %s\n", w.Name) + // send the message to the worker + w.MsgChan <- push_msg + // increase the counter associated to the message + master.WorkingMsg[push_msg.Id] += 1 + debug.log("[master] now WorkingMsg[%s] is: %d\n", push_msg.Id, master.WorkingMsg[push_msg.Id]) + } + } + case done_msg := <-master.StatusChan: + // Here we manage a status message from a worker + debug.log("[master] received message from StatusChan: %s\n", done_msg) + if _, ok := master.WorkingMsg[done_msg.Id]; ok && master.WorkingMsg[done_msg.Id] > 0 { + master.WorkingMsg[done_msg.Id] -= 1 + if master.WorkingMsg[done_msg.Id] == 0 { + delete(master.WorkingMsg, done_msg.Id) + master.Spooler <- done_msg + } + } else { + log.Printf("[master] received completion event for non-existing message name: %s\n", done_msg.Id) } } } + debug.log("[master] Exiting the for loop, for some mysterious reason...\n") } func InitMaster() *SCORSHmaster { @@ -96,9 +116,12 @@ func InitMaster() *SCORSHmaster { master.Repos = make(map[string][]*SCORSHworker) master.WorkingMsg = make(map[string]int) - // This is the mutex-channel on which we receive acks from workers - master.StatusChan = make(chan SCORSHmsg, 1) - master.Spooler = make(chan SCORSHmsg, 1) + // This is the channel on which we receive acks from workers + master.StatusChan = make(chan SCORSHmsg) + // This is the channel on which we exchange messages with the spooler + master.Spooler = make(chan SCORSHmsg) + + debug.log("[InitMaster] StatusChan: %s\n", master.StatusChan) err_workers := StartWorkers(master) if err_workers != nil { @@ -111,17 +134,18 @@ func InitMaster() *SCORSHmaster { log.Fatal("Error starting spooler: ", err_spooler) } return master - } func main() { + var done chan int + flag.Parse() master := InitMaster() go Master(master) - <-master.StatusChan - + // wait indefinitely -- we should implement signal handling... + <-done } diff --git a/spooler.go b/spooler.go index 8febf31..d2b31c2 100644 --- a/spooler.go +++ b/spooler.go @@ -6,6 +6,7 @@ import ( "github.com/go-yaml/yaml" "io/ioutil" "log" + "os" // "time" ) @@ -32,28 +33,40 @@ func parse_request(fname string, msg *SCORSHmsg) error { return nil } -func spooler(watcher *fsnotify.Watcher, worker chan SCORSHmsg) { +func spooler(watcher *fsnotify.Watcher, master chan SCORSHmsg) { log.Println("Spooler started correctly") - var msg *SCORSHmsg - msg = new(SCORSHmsg) - for { select { case event := <-watcher.Events: + // Here we manage genuine events from fsnotify. We catch the + // "Write" event, which should happen only when the file is + // created if event.Op == fsnotify.Write { - //time.Sleep(1000 * time.Millisecond) + var msg SCORSHmsg debug.log("[spooler] new file %s detected\n", event.Name) - err := parse_request(event.Name, msg) + err := parse_request(event.Name, &msg) if err != nil { log.Printf("Invalid packet received. [%s]\n", err) } debug.log("[spooler] read message: %s\n", msg) - worker <- *msg + msg.Path = event.Name + master <- msg } case err := <-watcher.Errors: - log.Println("error:", err) + // here we manage event errors + log.Println("[spooler] error: ", err) + case msg := <-master: + // Here we receive messages from the master about files to be + // removed + log.Printf("[spooler] received deletion request for: %s\n", msg.Path) + err := os.Remove(msg.Path) + if err != nil { + log.Printf("[spooler] error removing file: %s\n", err) + } else { + log.Printf("[spooler] file %s successfully removed\n", msg.Path) + } } } } diff --git a/types.go b/types.go index 50b278f..8ff9bc6 100644 --- a/types.go +++ b/types.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "fmt" "golang.org/x/crypto/openpgp" ) @@ -16,11 +17,12 @@ const ( // the SCORSHmsg type represents messages received from the spool and // sent to workers type SCORSHmsg struct { - Name string `yaml:"m_id"` + Id string `yaml:"m_id"` Repo string `yaml:"m_repo"` Branch string `yaml:"m_branch"` Old_rev string `yaml:"m_oldrev"` New_rev string `yaml:"m_newrev"` + Path string } type SCORSHcmd struct { @@ -28,7 +30,7 @@ type SCORSHcmd struct { Hash string `yaml:"c_hash"` } -type SCORSHtag struct { +type SCORSHtag_cfg struct { Name string `yaml:"t_name"` Keyrings []string `yaml:"t_keyrings"` Commands []SCORSHcmd `yaml:"t_commands"` @@ -36,13 +38,13 @@ type SCORSHtag struct { // Configuration of a worker type SCORSHworker_cfg struct { - Name string `yaml:"w_name"` - Repos []string `yaml:"w_repos"` - Folder string `yaml:"w_folder"` - Logfile string `yaml:"w_logfile"` - Tagfile string `yaml:"w_tagfile"` - Keyrings []string `yaml:"w_keyrings"` - Tags []SCORSHtag `yaml:"w_tags"` + Name string `yaml:"w_name"` + Repos []string `yaml:"w_repos"` + Folder string `yaml:"w_folder"` + Logfile string `yaml:"w_logfile"` + Tagfile string `yaml:"w_tagfile"` + Keyrings []string `yaml:"w_keyrings"` + Tags []SCORSHtag_cfg `yaml:"w_tags"` } // State of a worker @@ -82,39 +84,30 @@ type SCORSHmaster struct { SCORSHmaster_state } +// client commands + +type SCORSHtag struct { + Tag string `yaml:"s_tag"` + Args []string `yaml:"s_args"` +} + +type SCORSHclient_msg struct { + Tags []SCORSHtag `yaml:"scorsh"` +} + +//////////////////////// + func (cfg *SCORSHmaster) String() string { var buff bytes.Buffer - buff.WriteString("spooldir: ") - buff.WriteString(cfg.Spooldir) - buff.WriteString("\nlogfile: ") - buff.WriteString(cfg.Logfile) - buff.WriteString("\nlogprefix: ") - buff.WriteString(cfg.LogPrefix) - buff.WriteString("\nWorkers: \n") + fmt.Fprintf(&buff, "spooldir: %s\n", cfg.Spooldir) + fmt.Fprintf(&buff, "logfile: %s\n", cfg.Logfile) + fmt.Fprintf(&buff, "logprefix: %s\n", cfg.LogPrefix) + fmt.Fprintf(&buff, "Workers: \n") for _, w := range cfg.Workers { - buff.WriteString("---\n name: ") - buff.WriteString(w.Name) - buff.WriteString("\n repos: ") - for _, r := range w.Repos { - buff.WriteString("\n ") - buff.WriteString(r) - } - buff.WriteString("\n folder: ") - buff.WriteString(w.Folder) - buff.WriteString("\n logfile: ") - buff.WriteString(w.Logfile) - buff.WriteString("\n tagfile: ") - buff.WriteString(w.Tagfile) - buff.WriteString("\n keyrings: ") - for _, k := range w.Keyrings { - buff.WriteString("\n ") - buff.WriteString(k) - } - buff.WriteString("\n...\n") - + fmt.Fprintf(&buff, "%s", &w) } return buff.String() @@ -123,16 +116,26 @@ func (cfg *SCORSHmaster) String() string { func (msg *SCORSHmsg) String() string { var buff bytes.Buffer - buff.WriteString("\nName: ") - buff.WriteString(msg.Name) - buff.WriteString("\nRepo: ") - buff.WriteString(msg.Repo) - buff.WriteString("\nBranch: ") - buff.WriteString(msg.Branch) - buff.WriteString("\nOld_rev: ") - buff.WriteString(msg.Old_rev) - buff.WriteString("\nNew_rev: ") - buff.WriteString(msg.New_rev) + fmt.Fprintf(&buff, "Id: %s\n", msg.Id) + fmt.Fprintf(&buff, "Repo: %s\n", msg.Repo) + fmt.Fprintf(&buff, "Branch: %s\n", msg.Branch) + fmt.Fprintf(&buff, "Old_Rev: %s\n", msg.Old_rev) + fmt.Fprintf(&buff, "New_rev: %s\n", msg.New_rev) + fmt.Fprintf(&buff, "Path: %s\n", msg.Path) + return buff.String() } + +func (w *SCORSHworker) String() string { + + var buff bytes.Buffer + fmt.Fprintf(&buff, "Name: %s\n", w.Name) + fmt.Fprintf(&buff, "Repos: %s\n", w.Repos) + fmt.Fprintf(&buff, "Folder: %s\n", w.Folder) + fmt.Fprintf(&buff, "Logfile: %s\n", w.Logfile) + fmt.Fprintf(&buff, "Tagfile: %s\n", w.Tagfile) + fmt.Fprintf(&buff, "Keyrings: %s\n", w.Keyrings) + + return buff.String() +} diff --git a/workers.go b/workers.go index 33c6166..81281df 100644 --- a/workers.go +++ b/workers.go @@ -9,6 +9,7 @@ import ( "os" "regexp" "strings" + "time" ) func (worker *SCORSHworker) Matches(repo, branch string) bool { @@ -80,20 +81,26 @@ func Worker(w *SCORSHworker) { var msg SCORSHmsg log.Printf("[worker: %s] Started\n", w.Name) + debug.log("[worker: %s] MsgChan: %s\n", w.Name, w.MsgChan) + // notify that we have been started! w.StatusChan <- msg // This is the main worker loop for { select { case msg = <-w.MsgChan: - debug.log("[worker: %s] received message %s\n", w.Name, msg.Name) + debug.log("[worker: %s] received message %s\n", w.Name, msg.Id) // process message // err := walk_commits(msg, w) // if err != nil { // log.Printf("[worker: %s] error in walk_commits: %s", err) // } - log.Printf("[worker: %s] Received message: ", w.Name, msg) + debug.log("[worker: %s] Received message: %s", w.Name, msg) + debug.log("[worker: %s] StatusChan: %s\n", w.Name, w.StatusChan) + time.Sleep(1000 * time.Millisecond) + w.StatusChan <- msg + debug.log("[worker: %s] Sent message back: %s", w.Name, msg) } } } @@ -113,7 +120,7 @@ func StartWorkers(master *SCORSHmaster) error { worker := &(master.Workers[w]) // Set the Status and Msg channels worker.StatusChan = master.StatusChan - worker.MsgChan = make(chan SCORSHmsg) + worker.MsgChan = make(chan SCORSHmsg, 10) // Load worker keyrings err := worker.LoadKeyrings() if err != nil { -- cgit v1.2.3