summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKatolaZ <katolaz@freaknet.org>2017-07-18 16:24:49 +0100
committerKatolaZ <katolaz@freaknet.org>2017-07-18 16:24:49 +0100
commit7df9111d9f24f12417cc4938c7e859d97b45680e (patch)
treea99c29afcfaf865976ca3c4e21bccceacd40a455
parent74f8a74ecf8284fd4182cabab797f17bb18e7032 (diff)
The pipeline "spooling -> routing -> file deletion" works
-rw-r--r--commits.go30
-rw-r--r--scorsh.go62
-rw-r--r--spooler.go29
-rw-r--r--types.go95
-rw-r--r--workers.go13
5 files changed, 145 insertions, 84 deletions
diff --git a/commits.go b/commits.go
index 46672cf..b81bf7a 100644
--- a/commits.go
+++ b/commits.go
@@ -4,6 +4,7 @@ import (
"fmt"
"github.com/KatolaZ/git2go"
"golang.org/x/crypto/openpgp"
+ "log"
"os"
"strings"
// "log"
@@ -23,15 +24,15 @@ func CommitToString(commit *git.Commit) string {
}
// FIXME: RETURN THE ENTITY PROVIDED BY THE CHECK, OR nil
-func check_signature(commit *git.Commit, keys []*openpgp.KeyRing) (signature, signed string, err error) {
+func check_signature(commit *git.Commit, keys *map[string]openpgp.KeyRing) (signature, signed string, err error) {
signature, signed, err = commit.ExtractSignature()
if err == nil {
- for _, keyring := range keys {
+ for _, keyring := range *keys {
_, err_sig :=
- openpgp.CheckArmoredDetachedSignature(*keyring, strings.NewReader(signed),
+ openpgp.CheckArmoredDetachedSignature(keyring, strings.NewReader(signed),
strings.NewReader(signature))
if err_sig == nil {
@@ -45,6 +46,14 @@ func check_signature(commit *git.Commit, keys []*openpgp.KeyRing) (signature, si
return "", "", err
}
+func find_scorsh_message(commit *git.Commit) (string, error) {
+
+ msg := commit.RawMessage()
+ debug.log("[find_scorsg_msg] found message:\n %s\n", msg)
+
+ return msg, nil
+}
+
// traverse all the commits between two references, looking for scorsh
// commands
// fixme: we don't have just one keyring here....
@@ -91,12 +100,17 @@ func walk_commits(msg SCORSHmsg, w *SCORSHworker) error {
// check if it can be verified by any of the keyrings associated
// with the scorsh-tag
- //signature, signed, err := check_signature(commit, &keyring)
+ // check if the commit contains a scorsh command
+
+ _, err = find_scorsh_message(commit)
+
+ //signature, signed, err := check_signature(commit, &w.Keys)
//_, _, err := check_signature(commit, w.keys)
- //if err != nil {
- // log.Printf("%s\n", SCORSHerr(SCORSH_ERR_SIGNATURE))
- //
- //}
+ if err != nil {
+ log.Printf("[worker: %s] %s\n", w.Name, SCORSHerr(SCORSH_ERR_SIGNATURE))
+ } else {
+
+ }
cur_commit = commit.Parent(0)
} else {
fmt.Printf("Commit %x not found!\n", cur_commit.Id())
diff --git a/scorsh.go b/scorsh.go
index aca3121..d36b646 100644
--- a/scorsh.go
+++ b/scorsh.go
@@ -62,32 +62,52 @@ func Master(master *SCORSHmaster) {
// master main loop:
var matching_workers []*SCORSHworker
- var push_msg SCORSHmsg
matching_workers = make([]*SCORSHworker, len(master.Workers))
log.Println("[master] Master started ")
+ debug.log("[master] StatusChan: %s\n", master.StatusChan)
for {
+ debug.log("[master] Receive loop...\n")
select {
- // - receive stuff from the spooler
- case push_msg = <-master.Spooler:
-
+ case push_msg := <-master.Spooler:
+ // here we manage the stuff we receive from the spooler
debug.log("[master] received message: %s\n", push_msg)
-
// - lookup the repos map for matching workers
matching_workers = FindMatchingWorkers(master, &push_msg)
- debug.log("[master] matching workers: %s\n", matching_workers)
-
- // add the message to PendingMsg
- //...
- // - dispatch the message to all the matching workers
- for _, w := range matching_workers {
- // increase the counter associated to the message
- w.MsgChan <- push_msg
+ debug.log("[master] matching workers: \n%s\n", matching_workers)
+
+ // add the message to WorkingMsg, if it's not a duplicate!
+ if _, ok := master.WorkingMsg[push_msg.Id]; ok {
+ log.Printf("[master] detected duplicate message %s \n", push_msg.Id)
+ } else {
+ master.WorkingMsg[push_msg.Id] = 0
+ // - dispatch the message to all the matching workers
+ for _, w := range matching_workers {
+ debug.log("[master] sending msg to worker: %s\n", w.Name)
+ // send the message to the worker
+ w.MsgChan <- push_msg
+ // increase the counter associated to the message
+ master.WorkingMsg[push_msg.Id] += 1
+ debug.log("[master] now WorkingMsg[%s] is: %d\n", push_msg.Id, master.WorkingMsg[push_msg.Id])
+ }
+ }
+ case done_msg := <-master.StatusChan:
+ // Here we manage a status message from a worker
+ debug.log("[master] received message from StatusChan: %s\n", done_msg)
+ if _, ok := master.WorkingMsg[done_msg.Id]; ok && master.WorkingMsg[done_msg.Id] > 0 {
+ master.WorkingMsg[done_msg.Id] -= 1
+ if master.WorkingMsg[done_msg.Id] == 0 {
+ delete(master.WorkingMsg, done_msg.Id)
+ master.Spooler <- done_msg
+ }
+ } else {
+ log.Printf("[master] received completion event for non-existing message name: %s\n", done_msg.Id)
}
}
}
+ debug.log("[master] Exiting the for loop, for some mysterious reason...\n")
}
func InitMaster() *SCORSHmaster {
@@ -96,9 +116,12 @@ func InitMaster() *SCORSHmaster {
master.Repos = make(map[string][]*SCORSHworker)
master.WorkingMsg = make(map[string]int)
- // This is the mutex-channel on which we receive acks from workers
- master.StatusChan = make(chan SCORSHmsg, 1)
- master.Spooler = make(chan SCORSHmsg, 1)
+ // This is the channel on which we receive acks from workers
+ master.StatusChan = make(chan SCORSHmsg)
+ // This is the channel on which we exchange messages with the spooler
+ master.Spooler = make(chan SCORSHmsg)
+
+ debug.log("[InitMaster] StatusChan: %s\n", master.StatusChan)
err_workers := StartWorkers(master)
if err_workers != nil {
@@ -111,17 +134,18 @@ func InitMaster() *SCORSHmaster {
log.Fatal("Error starting spooler: ", err_spooler)
}
return master
-
}
func main() {
+ var done chan int
+
flag.Parse()
master := InitMaster()
go Master(master)
- <-master.StatusChan
-
+ // wait indefinitely -- we should implement signal handling...
+ <-done
}
diff --git a/spooler.go b/spooler.go
index 8febf31..d2b31c2 100644
--- a/spooler.go
+++ b/spooler.go
@@ -6,6 +6,7 @@ import (
"github.com/go-yaml/yaml"
"io/ioutil"
"log"
+ "os"
// "time"
)
@@ -32,28 +33,40 @@ func parse_request(fname string, msg *SCORSHmsg) error {
return nil
}
-func spooler(watcher *fsnotify.Watcher, worker chan SCORSHmsg) {
+func spooler(watcher *fsnotify.Watcher, master chan SCORSHmsg) {
log.Println("Spooler started correctly")
- var msg *SCORSHmsg
- msg = new(SCORSHmsg)
-
for {
select {
case event := <-watcher.Events:
+ // Here we manage genuine events from fsnotify. We catch the
+ // "Write" event, which should happen only when the file is
+ // created
if event.Op == fsnotify.Write {
- //time.Sleep(1000 * time.Millisecond)
+ var msg SCORSHmsg
debug.log("[spooler] new file %s detected\n", event.Name)
- err := parse_request(event.Name, msg)
+ err := parse_request(event.Name, &msg)
if err != nil {
log.Printf("Invalid packet received. [%s]\n", err)
}
debug.log("[spooler] read message: %s\n", msg)
- worker <- *msg
+ msg.Path = event.Name
+ master <- msg
}
case err := <-watcher.Errors:
- log.Println("error:", err)
+ // here we manage event errors
+ log.Println("[spooler] error: ", err)
+ case msg := <-master:
+ // Here we receive messages from the master about files to be
+ // removed
+ log.Printf("[spooler] received deletion request for: %s\n", msg.Path)
+ err := os.Remove(msg.Path)
+ if err != nil {
+ log.Printf("[spooler] error removing file: %s\n", err)
+ } else {
+ log.Printf("[spooler] file %s successfully removed\n", msg.Path)
+ }
}
}
}
diff --git a/types.go b/types.go
index 50b278f..8ff9bc6 100644
--- a/types.go
+++ b/types.go
@@ -2,6 +2,7 @@ package main
import (
"bytes"
+ "fmt"
"golang.org/x/crypto/openpgp"
)
@@ -16,11 +17,12 @@ const (
// the SCORSHmsg type represents messages received from the spool and
// sent to workers
type SCORSHmsg struct {
- Name string `yaml:"m_id"`
+ Id string `yaml:"m_id"`
Repo string `yaml:"m_repo"`
Branch string `yaml:"m_branch"`
Old_rev string `yaml:"m_oldrev"`
New_rev string `yaml:"m_newrev"`
+ Path string
}
type SCORSHcmd struct {
@@ -28,7 +30,7 @@ type SCORSHcmd struct {
Hash string `yaml:"c_hash"`
}
-type SCORSHtag struct {
+type SCORSHtag_cfg struct {
Name string `yaml:"t_name"`
Keyrings []string `yaml:"t_keyrings"`
Commands []SCORSHcmd `yaml:"t_commands"`
@@ -36,13 +38,13 @@ type SCORSHtag struct {
// Configuration of a worker
type SCORSHworker_cfg struct {
- Name string `yaml:"w_name"`
- Repos []string `yaml:"w_repos"`
- Folder string `yaml:"w_folder"`
- Logfile string `yaml:"w_logfile"`
- Tagfile string `yaml:"w_tagfile"`
- Keyrings []string `yaml:"w_keyrings"`
- Tags []SCORSHtag `yaml:"w_tags"`
+ Name string `yaml:"w_name"`
+ Repos []string `yaml:"w_repos"`
+ Folder string `yaml:"w_folder"`
+ Logfile string `yaml:"w_logfile"`
+ Tagfile string `yaml:"w_tagfile"`
+ Keyrings []string `yaml:"w_keyrings"`
+ Tags []SCORSHtag_cfg `yaml:"w_tags"`
}
// State of a worker
@@ -82,39 +84,30 @@ type SCORSHmaster struct {
SCORSHmaster_state
}
+// client commands
+
+type SCORSHtag struct {
+ Tag string `yaml:"s_tag"`
+ Args []string `yaml:"s_args"`
+}
+
+type SCORSHclient_msg struct {
+ Tags []SCORSHtag `yaml:"scorsh"`
+}
+
+////////////////////////
+
func (cfg *SCORSHmaster) String() string {
var buff bytes.Buffer
- buff.WriteString("spooldir: ")
- buff.WriteString(cfg.Spooldir)
- buff.WriteString("\nlogfile: ")
- buff.WriteString(cfg.Logfile)
- buff.WriteString("\nlogprefix: ")
- buff.WriteString(cfg.LogPrefix)
- buff.WriteString("\nWorkers: \n")
+ fmt.Fprintf(&buff, "spooldir: %s\n", cfg.Spooldir)
+ fmt.Fprintf(&buff, "logfile: %s\n", cfg.Logfile)
+ fmt.Fprintf(&buff, "logprefix: %s\n", cfg.LogPrefix)
+ fmt.Fprintf(&buff, "Workers: \n")
for _, w := range cfg.Workers {
- buff.WriteString("---\n name: ")
- buff.WriteString(w.Name)
- buff.WriteString("\n repos: ")
- for _, r := range w.Repos {
- buff.WriteString("\n ")
- buff.WriteString(r)
- }
- buff.WriteString("\n folder: ")
- buff.WriteString(w.Folder)
- buff.WriteString("\n logfile: ")
- buff.WriteString(w.Logfile)
- buff.WriteString("\n tagfile: ")
- buff.WriteString(w.Tagfile)
- buff.WriteString("\n keyrings: ")
- for _, k := range w.Keyrings {
- buff.WriteString("\n ")
- buff.WriteString(k)
- }
- buff.WriteString("\n...\n")
-
+ fmt.Fprintf(&buff, "%s", &w)
}
return buff.String()
@@ -123,16 +116,26 @@ func (cfg *SCORSHmaster) String() string {
func (msg *SCORSHmsg) String() string {
var buff bytes.Buffer
- buff.WriteString("\nName: ")
- buff.WriteString(msg.Name)
- buff.WriteString("\nRepo: ")
- buff.WriteString(msg.Repo)
- buff.WriteString("\nBranch: ")
- buff.WriteString(msg.Branch)
- buff.WriteString("\nOld_rev: ")
- buff.WriteString(msg.Old_rev)
- buff.WriteString("\nNew_rev: ")
- buff.WriteString(msg.New_rev)
+ fmt.Fprintf(&buff, "Id: %s\n", msg.Id)
+ fmt.Fprintf(&buff, "Repo: %s\n", msg.Repo)
+ fmt.Fprintf(&buff, "Branch: %s\n", msg.Branch)
+ fmt.Fprintf(&buff, "Old_Rev: %s\n", msg.Old_rev)
+ fmt.Fprintf(&buff, "New_rev: %s\n", msg.New_rev)
+ fmt.Fprintf(&buff, "Path: %s\n", msg.Path)
+
return buff.String()
}
+
+func (w *SCORSHworker) String() string {
+
+ var buff bytes.Buffer
+ fmt.Fprintf(&buff, "Name: %s\n", w.Name)
+ fmt.Fprintf(&buff, "Repos: %s\n", w.Repos)
+ fmt.Fprintf(&buff, "Folder: %s\n", w.Folder)
+ fmt.Fprintf(&buff, "Logfile: %s\n", w.Logfile)
+ fmt.Fprintf(&buff, "Tagfile: %s\n", w.Tagfile)
+ fmt.Fprintf(&buff, "Keyrings: %s\n", w.Keyrings)
+
+ return buff.String()
+}
diff --git a/workers.go b/workers.go
index 33c6166..81281df 100644
--- a/workers.go
+++ b/workers.go
@@ -9,6 +9,7 @@ import (
"os"
"regexp"
"strings"
+ "time"
)
func (worker *SCORSHworker) Matches(repo, branch string) bool {
@@ -80,20 +81,26 @@ func Worker(w *SCORSHworker) {
var msg SCORSHmsg
log.Printf("[worker: %s] Started\n", w.Name)
+ debug.log("[worker: %s] MsgChan: %s\n", w.Name, w.MsgChan)
+ // notify that we have been started!
w.StatusChan <- msg
// This is the main worker loop
for {
select {
case msg = <-w.MsgChan:
- debug.log("[worker: %s] received message %s\n", w.Name, msg.Name)
+ debug.log("[worker: %s] received message %s\n", w.Name, msg.Id)
// process message
// err := walk_commits(msg, w)
// if err != nil {
// log.Printf("[worker: %s] error in walk_commits: %s", err)
// }
- log.Printf("[worker: %s] Received message: ", w.Name, msg)
+ debug.log("[worker: %s] Received message: %s", w.Name, msg)
+ debug.log("[worker: %s] StatusChan: %s\n", w.Name, w.StatusChan)
+ time.Sleep(1000 * time.Millisecond)
+ w.StatusChan <- msg
+ debug.log("[worker: %s] Sent message back: %s", w.Name, msg)
}
}
}
@@ -113,7 +120,7 @@ func StartWorkers(master *SCORSHmaster) error {
worker := &(master.Workers[w])
// Set the Status and Msg channels
worker.StatusChan = master.StatusChan
- worker.MsgChan = make(chan SCORSHmsg)
+ worker.MsgChan = make(chan SCORSHmsg, 10)
// Load worker keyrings
err := worker.LoadKeyrings()
if err != nil {