summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKatolaZ <katolaz@freaknet.org>2017-07-13 07:55:44 +0100
committerKatolaZ <katolaz@freaknet.org>2017-07-13 07:55:44 +0100
commit00c61083d7139f19b8d99dfc7ac6d7e22c4f9a78 (patch)
tree942623175a650be1c98694e0f2e05640df49d9df
parentb2b083e0597d3277f5dc075f4b17c89de51a77d0 (diff)
master and worker initialisation (draft)
-rw-r--r--config.go23
-rw-r--r--scorsh.cfg12
-rw-r--r--scorsh.go31
-rw-r--r--spooler.go2
-rw-r--r--types.go31
-rw-r--r--worker_config.cfg28
-rw-r--r--workers.go128
7 files changed, 206 insertions, 49 deletions
diff --git a/config.go b/config.go
index 15e234b..e64a35c 100644
--- a/config.go
+++ b/config.go
@@ -20,9 +20,11 @@ func ReadGlobalConfig(fname string) *SCORSHmaster {
log.Fatal("Error while reading file: ", err)
}
+
var cfg *SCORSHmaster
- cfg = new(SCORSHmaster)
+ cfg = new(SCORSHmaster)
+
// Unmarshal the YAML configuration file into a SCORSHcfg structure
err = yaml.Unmarshal(data, cfg)
if err != nil {
@@ -30,27 +32,29 @@ func ReadGlobalConfig(fname string) *SCORSHmaster {
}
fmt.Printf("%s", cfg)
-
+
// If the user has not set a spooldir, crash loudly
if cfg.Spooldir == "" {
log.Fatal("No spooldir defined in ", fname, ". Exiting\n")
}
// Check if the user has set a custom logprefix
- if cfg.LogPrefix != "" {
- log.SetPrefix(cfg.LogPrefix)
- }
// Check if the user wants to redirect the logs to a file
if cfg.Logfile != "" {
- f, err := os.Open(cfg.Logfile)
+ log.Printf("Opening log file: %s\n", cfg.Logfile)
+ f, err := os.OpenFile(cfg.Logfile, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
log.SetOutput(io.Writer(f))
} else {
- log.Printf("Error opening logfile: \n", err)
+ log.Fatal("Error opening logfile: ", cfg.Logfile, err)
}
}
+ if cfg.LogPrefix != "" {
+ log.SetPrefix(cfg.LogPrefix)
+ }
+
// If we got so far, then there is some sort of config in cfg
log.Printf("Successfully read config from %s\n", fname)
@@ -58,7 +62,10 @@ func ReadGlobalConfig(fname string) *SCORSHmaster {
}
-func (cfg *SCORSHmaster_cfg) String() string {
+
+
+
+func (cfg *SCORSHmaster) String() string {
var buff bytes.Buffer
diff --git a/scorsh.cfg b/scorsh.cfg
index a1320a3..01e8def 100644
--- a/scorsh.cfg
+++ b/scorsh.cfg
@@ -1,12 +1,6 @@
-#
-# This is a typical scorsh configuration. We declare here the list of
-# workers, with the corresponding repo/branches regular expressions
-# and the associated folder
-#
-
---
-s_spooldir: "/var/spool/scorsh"
-s_logfile: "/var/log/scorsh/scorsh.log"
+s_spooldir: "./spool"
+s_logfile: "./scorsh.log"
s_logprefix: "[scorsh]"
s_workers:
@@ -50,4 +44,4 @@ s_workers:
],
}
]
-... \ No newline at end of file
+...
diff --git a/scorsh.go b/scorsh.go
index 1d345b4..dceb6d7 100644
--- a/scorsh.go
+++ b/scorsh.go
@@ -61,27 +61,48 @@ func Master(master *SCORSHmaster) {
case push_msg = <- master.Spooler:
// - lookup the repos map for matching workers
matching_workers = FindMatchingWorkers(master, &push_msg)
+ // add the message to PendingMsg
+ //...
// - dispatch the message to all the matching workers
for _, w := range matching_workers {
- w.Chan <- push_msg
+ // increase the counter associated to the message
+ w.MsgChan <- push_msg
}
}
}
}
-func main() {
-
- flag.Parse()
+func InitMaster() *SCORSHmaster {
master := ReadGlobalConfig(*conf_file)
-
+
+ master.Repos = make(map[string][]*SCORSHworker)
+ master.WorkingMsg = make(map[string]int)
+ // This is the mutex-channel on which we receive acks from workers
+ master.StatusChan = make(chan SCORSHmsg, 1)
+
err_workers := StartWorkers(master)
if err_workers != nil {
log.Fatal("Error starting workers: ", err_workers)
+ } else {
+ log.Println("Workers started correctly")
}
err_spooler := StartSpooler(master)
if err_spooler != nil {
log.Fatal("Error starting spooler: ", err_spooler)
+ } else {
+ log.Println("Spooler started correctly")
}
+ return master
+
+}
+
+
+func main() {
+
+ flag.Parse()
+
+ master := InitMaster()
+
go Master(master)
}
diff --git a/spooler.go b/spooler.go
index 8d7bdc9..e11b980 100644
--- a/spooler.go
+++ b/spooler.go
@@ -16,6 +16,8 @@ func parse_request(fname string) (SCORSHmsg, error) {
log.Printf("Unable to open file: %s\n", fname)
return ret, SCORSHerr(SCORSH_ERR_NO_FILE)
}
+
+ // FIXME: Fill in the ret structure
return ret, nil
diff --git a/types.go b/types.go
index b14a103..3ccb6cc 100644
--- a/types.go
+++ b/types.go
@@ -15,22 +15,22 @@ const (
// the SCORSHmsg type represents messages received from the spool and
// sent to workers
type SCORSHmsg struct {
+ name string
repo string
branch string
old_rev string
new_rev string
}
-
type SCORSHcmd struct {
- URL string
- hash string
+ URL string `yaml:"c_url"`
+ Hash string `yaml:"c_hash"`
}
type SCORSHtag struct {
- TagName string
- Keyrings []string
- Commands []SCORSHcmd
+ Name string `yaml:"t_name"`
+ Keyrings []string `yaml:"t_keyrings"`
+ Commands []SCORSHcmd `yaml:"t_commands"`
}
// Configuration of a worker
@@ -45,16 +45,17 @@ type SCORSHworker_cfg struct {
// State of a worker
type SCORSHworker_state struct {
- Tags map[string]SCORSHtag
- Keys map[string]openpgp.KeyRing
- Chan chan SCORSHmsg
+ Tags []SCORSHtag `yaml:"w_tags"`
+ Keys map[string]openpgp.KeyRing
+ MsgChan chan SCORSHmsg
+ StatusChan chan SCORSHmsg
}
// The type SCORSHworker represents the configuration and state of a
// worker
type SCORSHworker struct {
- SCORSHworker_cfg
- SCORSHworker_state
+ SCORSHworker_cfg `yaml:",inline"`
+ SCORSHworker_state `yaml:",inline"`
}
// Configuration of the master
@@ -67,13 +68,15 @@ type SCORSHmaster_cfg struct {
// State of the master
type SCORSHmaster_state struct {
- Spooler chan SCORSHmsg
- Repos map[string][]*SCORSHworker
+ Spooler chan SCORSHmsg
+ StatusChan chan SCORSHmsg
+ Repos map[string][]*SCORSHworker
+ WorkingMsg map[string]int
}
// The type SCORSHmaster represents the configuration and state of the
// master
type SCORSHmaster struct {
- SCORSHmaster_cfg
+ SCORSHmaster_cfg `yaml:",inline"`
SCORSHmaster_state
}
diff --git a/worker_config.cfg b/worker_config.cfg
index 5173b6f..a156ac8 100644
--- a/worker_config.cfg
+++ b/worker_config.cfg
@@ -9,11 +9,11 @@
---
w_tags:
- [
{
t_name: "BUILD",
- t_keyrings: ["build_keyring.asc", "general_keyring.asc"],
- t_commands: [
+ {
+ t_keyrings: ["build_keyring.asc", "general_keyring.asc"],
+ t_commands: [
{
c_url: "file:///home/user/bin/script.sh $1 $2",
c_hash: "12da324fb76s924acbce"
@@ -21,17 +21,19 @@ w_tags:
{
c_url: "http://my.server.net/call.pl?branch=$1"
}
- ]
- },
+ ]
+ }
{
t_name: "PUBLISH",
- t_keyrings: ["web_developers.asc"],
- t_commands: [
- {
- c_url: "file:///usr/local/bin/publish.py $repo $branch",
- c_hash: "3234567898765432345678"
- }
- ]
+ {
+ t_keyrings: ["web_developers.asc"],
+ t_commands: [
+ {
+ c_url: "file:///usr/local/bin/publish.py $repo $branch",
+ c_hash: "3234567898765432345678"
+ }
+ ]
+ }
}
- ]
+ }
... \ No newline at end of file
diff --git a/workers.go b/workers.go
new file mode 100644
index 0000000..d5462c1
--- /dev/null
+++ b/workers.go
@@ -0,0 +1,128 @@
+package main
+
+import (
+ "fmt"
+ "github.com/go-yaml/yaml"
+ "golang.org/x/crypto/openpgp"
+ "io/ioutil"
+ "log"
+ "os"
+ "regexp"
+ "strings"
+)
+
+func (worker *SCORSHworker) Matches(repo, branch string) bool {
+
+ for _, r := range worker.Repos {
+ parts := strings.SplitN(r, ":", 2)
+ repo_pattern := parts[0]
+ branch_pattern := parts[1]
+ repo_match, _ := regexp.MatchString(repo_pattern, repo)
+ branch_match, _ := regexp.MatchString(branch_pattern, branch)
+ if repo_match && branch_match {
+ return true
+ }
+ }
+ return false
+}
+
+func (w *SCORSHworker) LoadKeyrings() error {
+
+ w.Keys = make(map[string]openpgp.KeyRing, len(w.Keyrings))
+
+ // Open the keyring files
+ for _, keyring := range w.Keyrings {
+ f, err_file := os.Open(keyring)
+
+ if err_file != nil {
+ log.Printf("[worker] cannot open keyring:", err_file)
+ f.Close()
+ return fmt.Errorf("Unable to open keyring: ", err_file)
+ }
+
+ // load the keyring
+ kr, err_key := openpgp.ReadArmoredKeyRing(f)
+
+ if err_key != nil {
+ log.Printf("[worker] cannot load keyring: ", err_key)
+ f.Close()
+ return fmt.Errorf("Unable to load keyring: ", err_key)
+ }
+ w.Keys[keyring] = kr
+ f.Close()
+ }
+ return nil
+}
+
+// Still to be implemented
+func (w *SCORSHworker) LoadTags() error {
+
+ w_tags, err := ioutil.ReadFile(w.Tagfile)
+ if err != nil{
+ log.Printf("[worker:%s] Cannot read worker config: ", w.Name, err)
+ return err
+ }
+
+ err = yaml.Unmarshal(w_tags, w.Tags)
+
+ if err != nil {
+ log.Printf("[worker:%s] Error while reading tags: ", w.Name, err)
+ return err
+ }
+
+
+ return nil
+}
+
+// FIXME--- STILL UNDER HEAVY WORK...
+func SCORSHWorker(w *SCORSHworker) {
+
+
+ // This is the main worker loop
+ for {
+ select {
+ case msg := <-w.MsgChan:
+ // process message
+ err := walk_commits(msg, w)
+ if err != nil {
+ log.Printf("[worker: %s] error in walk_commits: %s", err)
+ }
+ }
+ }
+}
+
+// StartWorkers starts all the workers specified in a given
+// configuration and fills in the SCORSHmaster struct
+func StartWorkers(master *SCORSHmaster) error {
+
+ num_workers := len(master.Workers)
+
+ // We should now start each worker
+
+ for w:=1; w<num_workers; w++ {
+
+ worker := & (master.Workers[w])
+ // Set the Status and Msg channels
+ worker.StatusChan = master.StatusChan
+ worker.MsgChan = make(chan SCORSHmsg)
+ // Load worker keyrings
+ err := worker.LoadKeyrings()
+ if err != nil {
+ log.Printf("[worker: %s] Unable to load keyrings (Exiting): %s\n", worker.Name, err)
+ close(worker.MsgChan)
+ return err
+ }
+ // Load worker tags from worker.Tagfile
+ err = worker.LoadTags()
+ if err != nil {
+ log.Printf("[worker: %s] Unable to load tags (Exiting): %s\n", worker.Name, err)
+ close(worker.MsgChan)
+ return err
+ }
+ // Add the repos definitions to the map master.Repos
+ for _, repo_name := range worker.Repos {
+ master.Repos[repo_name] = append(master.Repos[repo_name], worker)
+ }
+ }
+ return nil
+}