зеркало из https://github.com/mozilla/mig.git
Merge pull request #407 from ameihm0912/remove-worker
remove remaining worker related code
This commit is contained in:
Коммит
d39d7435f8
5
Makefile
5
Makefile
|
@ -161,9 +161,6 @@ mig-cmd: create-bindir
|
|||
mig-agent-search: create-bindir
|
||||
$(GO) build $(GOOPTS) -o $(BINDIR)/mig-agent-search $(GOLDFLAGS) mig.ninja/mig/client/mig-agent-search
|
||||
|
||||
worker-agent-verif: create-bindir
|
||||
$(GO) build $(GOOPTS) -o $(BINDIR)/mig-worker-agent-verif $(GOLDFLAGS) mig.ninja/mig/workers/mig-worker-agent-verif
|
||||
|
||||
runner-compliance: create-bindir
|
||||
$(GO) build $(GOOPTS) -o $(BINDIR)/runner-compliance $(GOLDFLAGS) mig.ninja/mig/runner-plugins/runner-compliance
|
||||
|
||||
|
@ -319,7 +316,6 @@ test: test-modules
|
|||
$(GO) test mig.ninja/mig/mig-loader/...
|
||||
$(GO) test mig.ninja/mig/client/...
|
||||
$(GO) test mig.ninja/mig/database/...
|
||||
$(GO) test mig.ninja/mig/workers/...
|
||||
$(GO) test mig.ninja/mig
|
||||
|
||||
test-modules:
|
||||
|
@ -350,7 +346,6 @@ vet:
|
|||
$(GO) vet mig.ninja/mig/client/...
|
||||
$(GO) vet mig.ninja/mig/modules/...
|
||||
$(GO) vet mig.ninja/mig/database/...
|
||||
$(GO) vet mig.ninja/mig/workers/...
|
||||
$(GO) vet mig.ninja/mig
|
||||
|
||||
clean: clean-agent
|
||||
|
|
|
@ -10,7 +10,6 @@ const (
|
|||
// rabbitmq exchanges and common queues
|
||||
Mq_Ex_ToAgents = "toagents"
|
||||
Mq_Ex_ToSchedulers = "toschedulers"
|
||||
Mq_Ex_ToWorkers = "toworkers"
|
||||
Mq_Q_Heartbeat = "mig.agt.heartbeats"
|
||||
Mq_Q_Results = "mig.agt.results"
|
||||
|
||||
|
|
|
@ -104,11 +104,6 @@ func getHeartbeats(msg amqp.Delivery, ctx Context) (err error) {
|
|||
if !ok {
|
||||
desc := fmt.Sprintf("getHeartbeats(): Agent '%s' is not authorized", agt.QueueLoc)
|
||||
ctx.Channels.Log <- mig.Log{Desc: desc}.Warning()
|
||||
// send an event to notify workers of the failed agent auth
|
||||
err = sendEvent(mig.Ev_Q_Agt_Auth_Fail, msg.Body, ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
// agent authorization failed so we drop this heartbeat and return
|
||||
return
|
||||
}
|
||||
|
@ -126,11 +121,6 @@ func getHeartbeats(msg amqp.Delivery, ctx Context) (err error) {
|
|||
if err != nil {
|
||||
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("Heartbeat DB insertion failed with error '%v' for agent '%s'", err, agt.Name)}.Err()
|
||||
}
|
||||
// notify the agt.new event queue
|
||||
err = sendEvent(mig.Ev_Q_Agt_New, msg.Body, ctx)
|
||||
if err != nil {
|
||||
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("Failed to send migevent to %s: %v", err, mig.Ev_Q_Agt_New)}.Err()
|
||||
}
|
||||
} else {
|
||||
// the agent exists in database. reuse the existing ID, and keep the status if it was
|
||||
// previously set to destroyed, otherwise set status to online
|
||||
|
|
|
@ -299,11 +299,6 @@ func initRelay(orig_ctx Context) (ctx Context, err error) {
|
|||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
// declare the "migevent" exchange used for communication between the platform components
|
||||
err = ctx.MQ.Chan.ExchangeDeclare(mig.Mq_Ex_ToWorkers, "topic", true, false, false, false, nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
ctx.Channels.Log <- mig.Log{Desc: "AMQP connection opened"}
|
||||
return
|
||||
}
|
||||
|
|
|
@ -1,32 +0,0 @@
|
|||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
//
|
||||
// Contributor: Julien Vehent jvehent@mozilla.com [:ulfr]
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/streadway/amqp"
|
||||
"mig.ninja/mig"
|
||||
)
|
||||
|
||||
// sendEvent publishes a message to the miginternal rabbitmq exchange
|
||||
func sendEvent(key string, body []byte, ctx Context) error {
|
||||
msg := amqp.Publishing{
|
||||
DeliveryMode: amqp.Persistent,
|
||||
Timestamp: time.Now(),
|
||||
ContentType: "text/plain",
|
||||
Expiration: "6000000", // events expire after 100 minutes if not consumed
|
||||
Body: body,
|
||||
}
|
||||
err := ctx.MQ.Chan.Publish(mig.Mq_Ex_ToWorkers, key, false, false, msg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("event publication failed. err='%v', key='%s', body='%s'", err, key, msg)
|
||||
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("%v", err)}.Err()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -206,11 +206,6 @@ func startRoutines(ctx Context) {
|
|||
}.Err()
|
||||
continue
|
||||
}
|
||||
// publish an event in the command results queue
|
||||
err = sendEvent(mig.Ev_Q_Cmd_Res, delivery.Body, ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
ctx.Channels.Log <- mig.Log{Desc: "agents results listener routine started"}
|
||||
|
@ -278,22 +273,6 @@ func startRoutines(ctx Context) {
|
|||
}()
|
||||
ctx.Channels.Log <- mig.Log{Desc: "killDupAgents() routine started"}
|
||||
|
||||
// launch the routine that heartbeats the relays and terminates if connection is lost
|
||||
go func() {
|
||||
hostname, _ := os.Hostname()
|
||||
hbmsg := fmt.Sprintf("host='%s' pid='%d'", hostname, os.Getpid())
|
||||
for {
|
||||
ctx.OpID = mig.GenID()
|
||||
err = sendEvent(mig.Ev_Q_Sched_Hb, []byte(hbmsg+time.Now().UTC().String()), ctx)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("relay heartbeating failed with error '%v'", err)
|
||||
ctx.Channels.Terminate <- err
|
||||
}
|
||||
time.Sleep(60 * time.Second)
|
||||
}
|
||||
}()
|
||||
ctx.Channels.Log <- mig.Log{Desc: "relay heartbeating routine started"}
|
||||
|
||||
// block here until a terminate message is received
|
||||
exitReason := <-ctx.Channels.Terminate
|
||||
time.Sleep(time.Second)
|
||||
|
|
|
@ -1,62 +0,0 @@
|
|||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
//
|
||||
// Contributor: Julien Vehent jvehent@mozilla.com [:ulfr]
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"gopkg.in/gcfg.v1"
|
||||
"mig.ninja/mig"
|
||||
"mig.ninja/mig/workers"
|
||||
)
|
||||
|
||||
const workerName = "agent_verif"
|
||||
|
||||
type Config struct {
|
||||
Mq workers.MqConf
|
||||
Logging mig.Logging
|
||||
}
|
||||
|
||||
func main() {
|
||||
var (
|
||||
err error
|
||||
conf Config
|
||||
)
|
||||
flag.Usage = func() {
|
||||
fmt.Fprintf(os.Stderr, "%s - a worker verifying agents that fail to authenticate\n", os.Args[0])
|
||||
flag.PrintDefaults()
|
||||
}
|
||||
var configPath = flag.String("c", "/etc/mig/agent-verif-worker.cfg", "Load configuration from file")
|
||||
var showversion = flag.Bool("V", false, "Show build version and exit")
|
||||
flag.Parse()
|
||||
if *showversion {
|
||||
fmt.Println(mig.Version)
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
err = gcfg.ReadFileInto(&conf, *configPath)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
logctx, err := mig.InitLogger(conf.Logging, workerName)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
// set a binding to route events from mig.Ev_Q_Agt_Auth_Fail into the queue named after the worker
|
||||
// and return a channel that consumes the queue
|
||||
workerQueue := "migevent.worker." + workerName
|
||||
consumerChan, err := workers.InitMqWithConsumer(conf.Mq, workerQueue, mig.Ev_Q_Agt_Auth_Fail)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
fmt.Println("started worker", workerName, "consuming queue", workerQueue, "from key", mig.Ev_Q_Agt_Auth_Fail)
|
||||
for event := range consumerChan {
|
||||
mig.ProcessLog(logctx, mig.Log{Desc: fmt.Sprintf("unverified agent '%s'", event.Body)})
|
||||
}
|
||||
return
|
||||
}
|
|
@ -1,124 +0,0 @@
|
|||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
//
|
||||
// Contributor: Julien Vehent jvehent@mozilla.com [:ulfr]
|
||||
package workers /* import "mig.ninja/mig/workers" */
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"fmt"
|
||||
"github.com/streadway/amqp"
|
||||
"io/ioutil"
|
||||
"mig.ninja/mig"
|
||||
"net"
|
||||
"time"
|
||||
)
|
||||
|
||||
type MqConf struct {
|
||||
Host, User, Pass, Vhost string
|
||||
Port int
|
||||
UseTLS bool
|
||||
TLScert, TLSkey, CAcert string
|
||||
Timeout string
|
||||
}
|
||||
|
||||
func InitMQ(conf MqConf) (amqpChan *amqp.Channel, err error) {
|
||||
defer func() {
|
||||
if e := recover(); e != nil {
|
||||
err = fmt.Errorf("worker.initMQ() -> %v", e)
|
||||
}
|
||||
}()
|
||||
// create an AMQP configuration with a 10min heartbeat and timeout
|
||||
// dialing address use format "<scheme>://<user>:<pass>@<host>:<port><vhost>"
|
||||
var scheme, user, pass, host, port, vhost string
|
||||
if conf.UseTLS {
|
||||
scheme = "amqps"
|
||||
} else {
|
||||
scheme = "amqp"
|
||||
}
|
||||
if conf.User == "" {
|
||||
panic("MQ User is missing")
|
||||
}
|
||||
user = conf.User
|
||||
if conf.Pass == "" {
|
||||
panic("MQ Pass is missing")
|
||||
}
|
||||
pass = conf.Pass
|
||||
if conf.Host == "" {
|
||||
panic("MQ Host is missing")
|
||||
}
|
||||
host = conf.Host
|
||||
if conf.Port < 1 {
|
||||
panic("MQ Port is missing")
|
||||
}
|
||||
port = fmt.Sprintf("%d", conf.Port)
|
||||
vhost = conf.Vhost
|
||||
dialaddr := scheme + "://" + user + ":" + pass + "@" + host + ":" + port + "/" + vhost
|
||||
|
||||
timeout, _ := time.ParseDuration(conf.Timeout)
|
||||
var dialConfig amqp.Config
|
||||
dialConfig.Heartbeat = timeout
|
||||
dialConfig.Dial = func(network, addr string) (net.Conn, error) {
|
||||
return net.DialTimeout(network, addr, timeout)
|
||||
}
|
||||
if conf.UseTLS {
|
||||
// import the client certificates
|
||||
cert, err := tls.LoadX509KeyPair(conf.TLScert, conf.TLSkey)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
// import the ca cert
|
||||
data, err := ioutil.ReadFile(conf.CAcert)
|
||||
ca := x509.NewCertPool()
|
||||
if ok := ca.AppendCertsFromPEM(data); !ok {
|
||||
panic("failed to import CA Certificate")
|
||||
}
|
||||
TLSconfig := tls.Config{Certificates: []tls.Certificate{cert},
|
||||
RootCAs: ca,
|
||||
InsecureSkipVerify: false,
|
||||
Rand: rand.Reader}
|
||||
dialConfig.TLSClientConfig = &TLSconfig
|
||||
}
|
||||
// Setup the AMQP broker connection
|
||||
amqpConn, err := amqp.DialConfig(dialaddr, dialConfig)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
amqpChan, err = amqpConn.Channel()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func InitMqWithConsumer(conf MqConf, name, key string) (consumer <-chan amqp.Delivery, err error) {
|
||||
defer func() {
|
||||
if e := recover(); e != nil {
|
||||
err = fmt.Errorf("worker.InitMqWithConsumer() -> %v", e)
|
||||
}
|
||||
}()
|
||||
amqpChan, err := InitMQ(conf)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_, err = amqpChan.QueueDeclare(name, true, false, false, false, nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = amqpChan.QueueBind(name, key, mig.Mq_Ex_ToWorkers, false, nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = amqpChan.Qos(0, 0, false)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
consumer, err = amqpChan.Consume(name, "", true, false, false, false, nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return
|
||||
}
|
Загрузка…
Ссылка в новой задаче