Merge pull request #4 from mozilla/agentupgrade

Agent upgrade protocol
This commit is contained in:
Julien Vehent 2014-04-17 14:11:15 -04:00
Родитель 586342378e 20ad452bdf
Коммит 5bef4af1c5
22 изменённых файлов: 1128 добавлений и 75 удалений

Просмотреть файл

@ -28,6 +28,7 @@ GOCFLAGS :=
MKDIR := mkdir
INSTALL := install
all: mig-agent mig-scheduler mig-action-generator mig-action-verifier
mig-agent:
@ -81,15 +82,28 @@ rpm-agent: mig-agent
# Bonus FPM options
# --rpm-digest sha512 --rpm-sign
rm -fr tmp
$(INSTALL) -D -m 0755 $(BINDIR)/mig-agent tmp/sbin/mig-agent
$(INSTALL) -D -m 0755 $(BINDIR)/mig-agent tmp/sbin/mig-agent-$(BUILDREV)
$(MKDIR) -p tmp/var/cache/mig
# Agent auto install startup scripts, so we just need to execute it once as priviligied user
echo -en "#!/bin/sh\n/sbin/mig-agent" > tmp/agent_install.sh
# Agent auto install startup scripts, so we just need to execute it once as priviliged user
echo -en "#!/bin/sh\nrm /sbin/mig-agent\nln -s /sbin/mig-agent-$(BUILDREV) /sbin/mig-agent\n/sbin/mig-agent" > tmp/agent_install.sh
chmod 0755 tmp/agent_install.sh
fpm -C tmp -n mig-agent --license GPL --vendor mozilla --description "Mozilla InvestiGator Agent" \
--url https://github.com/mozilla/mig --after-install tmp/agent_install.sh \
-s dir -t rpm .
deb-agent: mig-agent
# Bonus FPM options
# --rpm-digest sha512 --rpm-sign
rm -fr tmp
$(INSTALL) -D -m 0755 $(BINDIR)/mig-agent tmp/sbin/mig-agent-$(BUILDREV)
$(MKDIR) -p tmp/var/cache/mig
# Agent auto install startup scripts, so we just need to execute it once as priviliged user
echo -en "#!/bin/sh\nrm /sbin/mig-agent\nln -s /sbin/mig-agent-$(BUILDREV) /sbin/mig-agent\n/sbin/mig-agent" > tmp/agent_install.sh
chmod 0755 tmp/agent_install.sh
fpm -C tmp -n mig-agent --license GPL --vendor mozilla --description "Mozilla InvestiGator Agent" \
--url https://github.com/mozilla/mig --after-install tmp/agent_install.sh \
-s dir -t deb .
rpm-scheduler: mig-scheduler
rm -rf tmp
$(INSTALL) -D -m 0755 $(BINDIR)/mig-scheduler tmp/sbin/mig-scheduler

Просмотреть файл

@ -76,7 +76,7 @@ var AGENTACL = [...]string{
}
}`,
`{
"pidkill": {
"agentdestroy": {
"minimumweight": 1,
"investigators": {
"MIG Scheduler": {

Просмотреть файл

@ -2,6 +2,7 @@
[agent]
timeout = "20m"
heartbeatfreq = "5m"
whitelist = "/var/cache/mig/agents_whitelist.txt"
[collector]
@ -47,3 +48,6 @@
; host = "localhost"
; port = 514
; protocol = "udp"
[pgp]
keyid = "1E644752FB76B77245B1694E556CDD7B07E9D5D6"

Просмотреть файл

@ -298,6 +298,105 @@ Agent registration process
Agent upgrade process
---------------------
MIG supports upgrading agents in the wild. The upgrade protocol is designed with
security in mind. The flow diagram below presents a high-level view:
::
Investigator Scheduler Agent NewAgent FileServer
+-----------+ +-------+ +---+ +------+ +--------+
| | | | |
| 1.initiate | | | |
|------------------>| | | |
| | 2.send command | | |
| |------------------>| 3.verify | |
| | |--------+ | |
| | | | | |
| | | | | |
| | |<-------+ | |
| | | | |
| | | 4.download | |
| | |-------------------------------------->|
| | | | |
| | | 5.checksum | |
| | |--------+ | |
| | | | | |
| | | | | |
| | |<-------+ | |
| | | | |
| | | 6.exec | |
| | |------------------>| |
| | 7.return own PID | | |
| |<------------------| | |
| | | | |
| |------+ 8.mark | | |
| | | agent as | | |
| | | upgraded | | |
| |<-----+ | | |
| | | | |
| | 9.register | | |
| |<--------------------------------------| |
| | | | |
| |------+10.find dup | | |
| | |agents in | | |
| | |registrations | |
| |<-----+ | | |
| | | | |
| | 11.send command to kill PID old agt| |
| |-------------------------------------->| |
| | | | |
| | 12.acknowledge | | |
| |<--------------------------------------| |
All upgrade operations are initiated by an investigator (1). The upgrade is
triggered by an action to the upgrade module with the following parameters:
.. code:: json
"Operations": [
{
"Module": "upgrade",
"Parameters": {
"linux/amd64": {
"to_version": "16eb58b-201404021544",
"location": "http://localhost/mig/bin/linux/amd64/mig-agent",
"checksum": "31fccc576635a29e0a27bbf7416d4f32a0ebaee892475e14708641c0a3620b03"
}
}
}
],
* Each OS family and architecture have their own parameters (ex: "linux/amd64",
"darwin/amd64", "windows/386", ...). Then, in each OS/Arch group, we have:
* to_version is the version an agent should upgrade to
* location points to a HTTPS address that contains the agent binary
* checksum is a SHA256 hash of the agent binary to be verified after download
The parameters above are signed using a standard PGP action signature.
The upgrade action is forwarded to agents (2) like any other action. The action
signature is verified by the agent (3), and the upgrade module is called. The
module downloads the new binary (4), verifies the version and checksum (5) and
installs itself on the system.
Assuming everything checks in, the old agent executes the binary of the new
agent (6). At that point, two agents are running on the same machine, and the
rest of the protocol is designed to shut down the old agent, and clean up.
After executing the new agent, the old agent returns a successful result to the
scheduler, and includes its own PID in the results.
The new agent starts by registering with the scheduler (7). This tells the
scheduler that two agents are running on the same node, and one of them must
terminate. The scheduler sends a kill action to both agents with the PID of the
old agent (8). The kill action may be executed twice, but that doesn't matter.
When the scheduler receives the kill results (9), it sends a new action to check
for `mig-agent` processes (10). Only one should be found in the results (11),
and if that is the case, the scheduler tells the agent to remove the binary of
the old agent (12). When the agent returns (13), the upgrade protocol is done.
If the PID of the old agent lingers on the system, an error is logged for the
investigator to decide what to do next. The scheduler does not attempt to clean
up the situation.
Agent command execution flow
----------------------------

Просмотреть файл

@ -157,6 +157,67 @@ notification routine, and delete old files after a grace period.
; and invalid actions are kept
deleteafter = "72h"
PGP
~~~
The scheduler uses a PGP key to sign agent destruction actions during the agent
upgrade protocol. Therefore, when deployed a scheduler, a key must be generated
with the command `gpg --gen-key`.
The fingerprint of the key must then be added in two places:
1. In the scheduler configuration file `mig-scheduler.cfg`.
First, obtain the fingerprint using the `gpg` command line.
.. code:: bash
$ gpg --fingerprint --with-colons 'MIG scheduler stage1 (NOT PRODUCTION)' |grep '^fpr'|cut -f 10 -d ':'
1E644752FB76B77245B1694E556CDD7B07E9D5D6
Then add the fingerprint in the scheduler configuration file.
::
[pgp]
keyid = "1E644752FB76B77245B1694E556CDD7B07E9D5D6"
2. In the ACL of the agent configuration file `conf/mig-agent-conf.go`:
::
var AGENTACL = [...]string{
`{
"agentdestroy": {
"minimumweight": 1,
"investigators": {
"MIG Scheduler": {
"fingerprint": "1E644752FB76B77245B1694E556CDD7B07E9D5D6",
"weight": 1
}
}
}
}`,
}
And add the public PGP key of the scheduler as well:
::
// PGP public keys that are authorized to sign actions
var PUBLICPGPKEYS = [...]string{
`
-----BEGIN PGP PUBLIC KEY BLOCK-----
Version: GnuPG v1. Name: MIG Scheduler
mQENBFF/69EBCADe79sqUKJHXTMW3tahbXPdQAnpFWXChjI9tOGbgxmse1eEGjPZ
QPFOPgu3O3iij6UOVh+LOkqccjJ8gZVLYMJzUQC+2RJ3jvXhti8xZ1hs2iEr65Rj
zUklHVZguf2Zv2X9Er8rnlW5xzplsVXNWnVvMDXyzx0ufC00dDbCwahLQnv6Vqq8
BdUCSrvo/r7oAims8SyWE+ZObC+rw7u01Sut0ctnYrvklaM10+zkwGNOTszrduUy
.....
`
}
RabbitMQ Configuration
----------------------

Просмотреть файл

@ -500,4 +500,19 @@ heartbeat.
database123.example.com linux.database123.example.com.55tjdn0fsrdaf Wed Feb 12 2014 15:49:43 GMT+0000 (UTC)
firewall55.example.net linux.firewall55.example.net.55ub9eh81igbi Wed Feb 12 2014 15:48:29 GMT+0000 (UTC)
List active agents by version
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
We list the agents in the **registrations** collections and print the details,
sorted by version number.
.. code:: javascript
> var agents = db.registrations.find({ heartbeatts: {$gt: new Date(ISODate().getTime() - 1000 * 60 * 10)}}).sort({ version: 1});
> agents.forEach( function(agent) { print(agent.version, "\t", agent.name); } )
8e73e69-201404171134 agt1
8e73e69-201404171239 agt58
...
See MongoDB reference documentation for a full explanation of the query language.

Просмотреть файл

@ -0,0 +1,19 @@
{
"Name": "Kill and remove an agent at the end of the Upgrade protocol",
"Description": {
"Author": "Julien Vehent",
"Email": "jvehent@mozilla.com",
"Revision": 201404151300
},
"Target": "linux",
"Operations": [
{
"Module": "agentdestroy",
"Parameters": {
"pid": 28803,
"version": "16eb58b-201404021544"
}
}
],
"SyntaxVersion": 1
}

Просмотреть файл

@ -89,24 +89,40 @@
}
}
},
{
"module": "connected",
"parameters": {
"check for connected IPs": [
"98.143.145.80",
"96.46.4.237"
]
}
},
{
"module": "upgrade",
"parameters": {
"to_version": "b9536d2-201403031435",
"location": "https://download.mig.example.net/mig-agent-b9536d2-201403031435",
"checksum": "c59d4eaeac728671c635ff645014e2afa935bebffdb5fbd207ffdeab"
}
},
{
"module": "process",
"parameters": {
"look for running process that belongs to rootkit (not implemented)": [
"look for running process that belongs to rootkits": [
"/usr/libexec/rootkitd",
"/opt/rootkit/stealth_dangerous"
]
}
},
{
"module": "connected_ip",
"module": "agentdestroy",
"parameters": {
"check for connected IPs (not implemented)": [
"98.143.145.80",
"96.46.4.237"
]
"pid": 12345,
"version": "b9536d2-201403031435"
}
}
],
"syntaxversion": 1
}

Просмотреть файл

@ -0,0 +1,22 @@
{
"Name": "Upgrade a specific agent",
"Description": {
"Author": "Julien Vehent",
"Email": "jvehent@mozilla.com",
"Revision": 201404021600
},
"Target": "linux",
"Operations": [
{
"Module": "upgrade",
"Parameters": {
"linux/amd64": {
"to_version": "16eb58b-201404021544",
"location": "http://localhost/mig/bin/linux/amd64/mig-agent",
"checksum": "31fccc576635a29e0a27bbf7416d4f32a0ebaee892475e14708641c0a3620b03"
}
}
}
],
"SyntaxVersion": 1
}

Просмотреть файл

@ -44,15 +44,15 @@ type ACL []Permission
type Permission map[string]struct {
MinimumWeight int
Investigators map[string]struct{
Investigators map[string]struct {
Fingerprint string
Weight int
Weight int
}
}
// verifyPermission controls that the PGP keys, identified by their fingerprints, that
// signed an operation are sufficient to allow this operation to run
func verifyPermission(operation operation, permName string, perm Permission, fingerprints []string) (err error) {
func verifyPermission(operation Operation, permName string, perm Permission, fingerprints []string) (err error) {
if perm[permName].MinimumWeight < 1 {
return fmt.Errorf("Invalid permission '%s'. Must require at least 1 signature, has %d",
permName, perm[permName].MinimumWeight)

Просмотреть файл

@ -78,18 +78,18 @@ type Action struct {
ID uint64 `json:"id"`
Name string `json:"name"`
Target string `json:"target"`
Description description `json:"description"`
Threat threat `json:"threat"`
Description Description `json:"description"`
Threat Threat `json:"threat"`
ValidFrom time.Time `json:"validfrom"`
ExpireAfter time.Time `json:"expireafter"`
Operations []operation `json:"operations"`
Operations []Operation `json:"operations"`
PGPSignatures []string `json:"pgpsignatures"`
SyntaxVersion int `json:"syntaxversion"`
}
// a description is a simple object that contains detail about the
// action's author, and it's revision.
type description struct {
type Description struct {
Author string `json:"author"`
Email string `json:"email"`
URL string `json:"url"`
@ -98,7 +98,7 @@ type description struct {
// a threat provides the investigator with an idea of how dangerous
// a the compromission might be, if the indicators return positive
type threat struct {
type Threat struct {
Level string `json:"level"`
Family string `json:"family"`
}
@ -106,7 +106,7 @@ type threat struct {
// an operation is an object that map to an agent module.
// the parameters of the operation are passed to the module as argument,
// and thus their format depend on the module itself.
type operation struct {
type Operation struct {
Module string `json:"module"`
Parameters interface{} `json:"parameters"`
}

Просмотреть файл

@ -45,8 +45,10 @@ import (
"fmt"
"github.com/streadway/amqp"
"mig"
"mig/modules/agentdestroy"
"mig/modules/connected"
"mig/modules/filechecker"
"mig/modules/upgrade"
"os"
"os/exec"
"strings"
@ -76,8 +78,14 @@ func main() {
var mode = flag.String("m", "agent", "Module to run (eg. agent, filechecker).")
var file = flag.String("i", "/path/to/file", "Load action from file")
var foreground = flag.Bool("f", false, "Agent will run in background by default. Except if this flag is set, or if LOGGING.Mode is stdout. All other modules run in foreground by default.")
var showversion = flag.Bool("V", false, "Print Agent version and exit.")
flag.Parse()
if *showversion {
fmt.Println(version)
os.Exit(0)
}
// run the agent, and exit when done
if *mode == "agent" && *file == "/path/to/file" {
err := runAgent(*foreground)
@ -124,6 +132,12 @@ func runModuleDirectly(mode string, args []byte) (err error) {
case "filechecker":
fmt.Println(filechecker.Run(args))
os.Exit(0)
case "agentdestroy":
fmt.Println(agentdestroy.Run(args))
os.Exit(0)
case "upgrade":
fmt.Println(upgrade.Run(args))
os.Exit(0)
default:
fmt.Println("Module", mode, "is not implemented")
}
@ -267,7 +281,7 @@ func parseCommands(ctx Context, msg []byte) (err error) {
// pass the module operation object to the proper channel
switch operation.Module {
case "connected", "filechecker":
case "connected", "filechecker", "upgrade", "agentdestroy":
// send the operation to the module
ctx.Channels.RunAgentCommand <- currentOp
opsCounter++
@ -435,6 +449,7 @@ func keepAliveAgent(ctx Context) (err error) {
Name: ctx.Agent.Hostname,
OS: ctx.Agent.OS,
Version: version,
PID: os.Getpid(),
QueueLoc: ctx.Agent.QueueLoc,
StartTime: time.Now(),
}

Просмотреть файл

@ -331,7 +331,7 @@ func initMQ(orig_ctx Context) (ctx Context, err error) {
var dialConfig amqp.Config
dialConfig.Heartbeat = 2 * ctx.Sleeper
dialConfig.Dial = func(network, addr string) (net.Conn, error) {
return net.DialTimeout(network, addr, 2 * ctx.Sleeper)
return net.DialTimeout(network, addr, 2*ctx.Sleeper)
}
if ctx.MQ.UseTLS {

Просмотреть файл

@ -1,43 +0,0 @@
/* Mozilla InvestiGator Fetcher
Version: MPL 1.1/GPL 2.0/LGPL 2.1
The contents of this file are subject to the Mozilla Public License Version
1.1 (the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.mozilla.org/MPL/
Software distributed under the License is distributed on an "AS IS" basis,
WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
for the specific language governing rights and limitations under the
License.
The Initial Developer of the Original Code is
Mozilla Corporation
Portions created by the Initial Developer are Copyright (C) 2013
the Initial Developer. All Rights Reserved.
Contributor(s):
Julien Vehent jvehent@mozilla.com [:ulfr]
Alternatively, the contents of this file may be used under the terms of
either the GNU General Public License Version 2 or later (the "GPL"), or
the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
in which case the provisions of the GPL or the LGPL are applicable instead
of those above. If you wish to allow use of your version of this file only
under the terms of either the GPL or the LGPL, and not to allow others to
use your version of this file under the terms of the MPL, indicate your
decision by deleting the provisions above and replace them with the notice
and other provisions required by the GPL or the LGPL. If you do not delete
the provisions above, a recipient may use your version of this file under
the terms of any one of the MPL, the GPL or the LGPL.
*/
package main
import(
"io"
"net/http"
"code.google.com/p/go.net/websocket"
)

Просмотреть файл

@ -0,0 +1,133 @@
/* Kill an agent and remove its binary
Version: MPL 1.1/GPL 2.0/LGPL 2.1
The contents of this file are subject to the Mozilla Public License Version
1.1 (the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.mozilla.org/MPL/
Software distributed under the License is distributed on an "AS IS" basis,
WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
for the specific language governing rights and limitations under the
License.
The Initial Developer of the Original Code is
Mozilla Corporation
Portions created by the Initial Developer are Copyright (C) 2014
the Initial Developer. All Rights Reserved.
Contributor(s):
Julien Vehent jvehent@mozilla.com [:ulfr]
Alternatively, the contents of this file may be used under the terms of
either the GNU General Public License Version 2 or later (the "GPL"), or
the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
in which case the provisions of the GPL or the LGPL are applicable instead
of those above. If you wish to allow use of your version of this file only
under the terms of either the GPL or the LGPL, and not to allow others to
use your version of this file under the terms of the MPL, indicate your
decision by deleting the provisions above and replace them with the notice
and other provisions required by the GPL or the LGPL. If you do not delete
the provisions above, a recipient may use your version of this file under
the terms of any one of the MPL, the GPL or the LGPL.
*/
package agentdestroy
import (
"encoding/json"
"fmt"
"os"
"regexp"
"runtime"
)
type Parameters struct {
PID int `json:"pid"`
Version string `json:"version"`
}
func NewParameters() (p Parameters) {
return
}
type Results struct {
Success bool `json:"success"`
Error string `json:"error,omitempty"`
}
func (p Parameters) Validate() (err error) {
if p.PID < 2 || p.PID > 65535 {
return fmt.Errorf("PID '%s' is not in the range [2:65535]", p.PID)
}
versionre := regexp.MustCompile(`^[a-z0-9]{7}-[0-9]{12}$`)
if !versionre.MatchString(p.Version) {
return fmt.Errorf("Parameter 'bin_version' with value '%s' is invalid. Expecting version.", p.Version)
}
return
}
func Run(Args []byte) string {
params := NewParameters()
var results Results
err := json.Unmarshal(Args, &params)
if err != nil {
panic(err)
}
err = params.Validate()
if err != nil {
panic(err)
}
// Refuse to suicide
if params.PID == os.Getppid() {
results.Success = false
results.Error = fmt.Sprintf("PID '%d' is mine. Refusing to suicide.", params.PID)
return buildResults(results)
}
// First kill the agent PID
results.Success = true
process, err := os.FindProcess(params.PID)
if err != nil {
results.Error = fmt.Sprintf("PID '%d' not found. Returned error '%v'", params.PID, err)
results.Success = false
return buildResults(results)
} else {
err = process.Kill()
if err != nil {
results.Error = fmt.Sprintf("PID '%d' not killed. Returned error '%v'", params.PID, err)
results.Success = false
return buildResults(results)
}
}
// Then remove the agent binary
var binary string
switch runtime.GOOS {
case "linux", "darwin", "freebsd", "openbsd", "netbsd":
binary = fmt.Sprintf("/sbin/mig-agent-%s", params.Version)
case "windows":
binary = fmt.Sprintf("C:/Windows/mig-agent-%s.exe", params.Version)
default:
results.Error = fmt.Sprintf("'%s' isn't a supported OS", runtime.GOOS)
results.Success = false
return buildResults(results)
}
err = os.Remove(binary)
if err != nil {
panic(err)
}
return buildResults(results)
}
func buildResults(results Results) (jsonResults string) {
jsonOutput, err := json.Marshal(results)
if err != nil {
panic(err)
}
return string(jsonOutput[:])
}

Просмотреть файл

@ -0,0 +1,317 @@
/* Upgrade a MIG agent
Version: MPL 1.1/GPL 2.0/LGPL 2.1
The contents of this file are subject to the Mozilla Public License Version
1.1 (the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.mozilla.org/MPL/
Software distributed under the License is distributed on an "AS IS" basis,
WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
for the specific language governing rights and limitations under the
License.
The Initial Developer of the Original Code is
Mozilla Corporation
Portions created by the Initial Developer are Copyright (C) 2014
the Initial Developer. All Rights Reserved.
Contributor(s):
Julien Vehent jvehent@mozilla.com [:ulfr]
Alternatively, the contents of this file may be used under the terms of
either the GNU General Public License Version 2 or later (the "GPL"), or
the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
in which case the provisions of the GPL or the LGPL are applicable instead
of those above. If you wish to allow use of your version of this file only
under the terms of either the GPL or the LGPL, and not to allow others to
use your version of this file under the terms of the MPL, indicate your
decision by deleting the provisions above and replace them with the notice
and other provisions required by the GPL or the LGPL. If you do not delete
the provisions above, a recipient may use your version of this file under
the terms of any one of the MPL, the GPL or the LGPL.
*/
package upgrade
import (
"crypto/sha256"
"encoding/json"
"fmt"
"hash"
"io"
"io/ioutil"
"net/http"
"os"
"os/exec"
"regexp"
"runtime"
"time"
)
type Parameters struct {
Elements map[string]map[string]string `json:"elements"`
}
func NewParameters() (p Parameters) {
return
}
type Results struct {
Success bool `json:"success"`
OldPID int `json:"oldpid"`
Error string `json:"error,omitempty"`
Statistics Statistics `json:"statistics,omitempty"`
}
func (p Parameters) Validate() (err error) {
versionre := regexp.MustCompile(`^[a-z0-9]{7}-[0-9]{12}$`)
locre := regexp.MustCompile(`^https?://`)
checksumre := regexp.MustCompile(`^[a-zA-Z0-9]{64}$`)
for k, el := range p.Elements {
if !versionre.MatchString(el["to_version"]) {
return fmt.Errorf("In %s, parameter 'to_version' with value '%s' is invalid. Expecting version.", k, el["to_version"])
}
if !locre.MatchString(el["location"]) {
return fmt.Errorf("In %s, parameter 'location' with value '%s' is invalid. Expecting URL.", k, el["location"])
}
if !checksumre.MatchString(el["checksum"]) {
return fmt.Errorf("In %s, parameter 'checksum' with value '%s' is invalid. Expecting SHA256 checksum.", k, el["checksum"])
}
}
return
}
var stats Statistics
type Statistics struct {
DownloadTime string `json:"downloadtime"`
DownloadSize int64 `json:"downloadsize"`
}
func Run(Args []byte) string {
p := NewParameters()
err := json.Unmarshal(Args, &p.Elements)
if err != nil {
panic(err)
}
err = p.Validate()
if err != nil {
return buildResults(p, fmt.Sprintf("%v", err))
}
// Extract the parameters that apply to this OS and Arch
key := fmt.Sprintf("%s/%s", runtime.GOOS, runtime.GOARCH)
el, ok := p.Elements[key]
if !ok {
return buildResults(p, fmt.Sprintf("No parameter found for %s", key))
}
// Verify that the version we're told to upgrade to isn't the current one
cversion, err := getCurrentVersion()
if err != nil {
return buildResults(p, fmt.Sprintf("%v", err))
}
fmt.Println(cversion)
if cversion == el["to_version"] {
return buildResults(p, fmt.Sprintf("Agent is already running version '%s'", cversion))
}
// Download new agent binary from provided location
binfd, err := downloadBinary(el["location"])
if err != nil {
return buildResults(p, fmt.Sprintf("%v", err))
}
// Verify checksum of the binary
err = verifyChecksum(binfd, el["checksum"])
if err != nil {
return buildResults(p, fmt.Sprintf("%v", err))
}
// grab the path before closing the file descriptor
binPath := binfd.Name()
err = binfd.Close()
if err != nil {
return buildResults(p, fmt.Sprintf("%v", err))
}
// Dry run of the binary to verify that the version is correct
// but also that it can run without error
err = verifyVersion(binPath, el["to_version"])
if err != nil {
return buildResults(p, fmt.Sprintf("%v", err))
}
// Move the binary of the new agent from tmp, to the correct destination
agentBinPath, err := moveBinary(binPath, el["to_version"])
if err != nil {
return buildResults(p, fmt.Sprintf("%v", err))
}
// Launch the new agent and exit the module
_, err = exec.Command(agentBinPath).Output()
if err != nil {
return buildResults(p, fmt.Sprintf("%v", err))
}
return buildResults(p, "")
}
// Run the agent binary to obtain the current version
func getCurrentVersion() (cversion string, err error) {
cdir, err := os.Getwd()
if err != nil {
panic(err)
}
bin := cdir + "/" + os.Args[0]
out, err := exec.Command(bin, "-V").Output()
cversion = string(out[:len(out)-1])
return
}
// downloadBinary retrieves the data from a location and saves it to a temp file
func downloadBinary(loc string) (tmpfd *os.File, err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("downloadBinary() -> %v", e)
}
}()
tmpfd, err = ioutil.TempFile("", "")
if err != nil {
panic(err)
}
start := time.Now()
resp, err := http.Get(loc)
if err != nil {
panic(err)
}
stats.DownloadSize, err = io.Copy(tmpfd, resp.Body)
stats.DownloadTime = time.Since(start).String()
resp.Body.Close()
return
}
// verifyChecksum computes the hash of a file and compares it
// to a checksum. If comparison fails, it returns an error.
func verifyChecksum(fd *os.File, checksum string) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("verifyChecksum() -> %v", e)
}
}()
var h hash.Hash
h = sha256.New()
buf := make([]byte, 4096)
var offset int64 = 0
for {
block, err := fd.ReadAt(buf, offset)
if err != nil && err != io.EOF {
panic(err)
}
if block == 0 {
break
}
h.Write(buf[:block])
offset += int64(block)
}
hexhash := fmt.Sprintf("%x", h.Sum(nil))
if hexhash != checksum {
return fmt.Errorf("Checksum validation failed. Got '%s', Expected '%s'.",
hexhash, checksum)
}
return
}
// verifyVersion runs a binary and compares the returned version
func verifyVersion(binPath, expectedVersion string) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("verifyVersion() -> %v", e)
}
}()
os.Chmod(binPath, 0750)
out, err := exec.Command(binPath, "-V").Output()
if err != nil {
panic(err)
}
binVersion := string(out[:len(out)-1])
if binVersion != expectedVersion {
return fmt.Errorf("Version mismatch. Got '%s', Expected '%s'.",
binVersion, expectedVersion)
}
return
}
func moveBinary(binPath, version string) (linkloc string, err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("moveBinary() -> %v", e)
}
}()
var target string
switch runtime.GOOS {
case "linux", "darwin", "freebsd", "openbsd", "netbsd":
target = fmt.Sprintf("/sbin/mig-agent-%s", version)
linkloc = "/sbin/mig-agent"
case "windows":
target = fmt.Sprintf("C:/Windows/mig-agent-%s.exe", version)
linkloc = "C:/Windows/mig-agent"
default:
err = fmt.Errorf("'%s' isn't a supported OS", runtime.GOOS)
return
}
// copy the file (rename may not work if we're crossing partitions)
srcfd, err := os.Open(binPath)
if err != nil {
panic(err)
}
dstfd, err := os.Create(target)
if err != nil {
panic(err)
}
_, err = io.Copy(dstfd, srcfd)
if err != nil {
panic(err)
}
srcfd.Close()
dstfd.Close()
err = os.Remove(binPath)
if err != nil {
panic(err)
}
err = os.Chmod(target, 0750)
if err != nil {
panic(err)
}
// don't fail on removal of existing link, it may not exist
os.Remove(linkloc)
// create a symlink
err = os.Symlink(target, linkloc)
if err != nil {
panic(err)
}
return
}
// buildResults transforms the ConnectedIPs map into a Results
// map that is serialized in JSON and returned as a string
func buildResults(params Parameters, errors string) string {
var results Results
results.OldPID = os.Getppid()
if errors != "" {
results.Error = errors
} else {
results.Success = true
}
results.Statistics = stats
jsonOutput, err := json.Marshal(results)
if err != nil {
panic(err)
}
return string(jsonOutput[:])
}

Просмотреть файл

@ -58,13 +58,14 @@ type Context struct {
OpID uint64 // ID of the current operation, used for tracking
Agent struct {
// configuration
TimeOut, Whitelist string
TimeOut, HeartbeatFreq, Whitelist string
}
Channels struct {
// internal
Terminate chan error
Log chan mig.Log
NewAction, ActionDone, CommandReady, UpdateCommand, CommandReturned, CommandDone chan string
DetectDupAgents chan string
}
Collector struct {
Freq, DeleteAfter string
@ -103,6 +104,9 @@ type Context struct {
conn *amqp.Connection
Chan *amqp.Channel
}
PGP struct {
KeyID string
}
Stats struct {
}
Logging mig.Logging
@ -345,6 +349,7 @@ func initChannels(orig_ctx Context) (ctx Context, err error) {
ctx.Channels.UpdateCommand = make(chan string, 599)
ctx.Channels.CommandReturned = make(chan string, 271)
ctx.Channels.CommandDone = make(chan string, 641)
ctx.Channels.DetectDupAgents = make(chan string, 29)
ctx.Channels.Log = make(chan mig.Log, 601)
ctx.Channels.Log <- mig.Log{Desc: "leaving initChannels()"}.Debug()
return

Просмотреть файл

@ -44,6 +44,11 @@ import (
"time"
)
// agentRegId is used to retrieve the internal mongodb ID of an agen'ts registration
type agentRegId struct {
Id bson.ObjectId `bson:"_id,omitempty"`
}
// pickUpAliveAgents lists agents that have recent keepalive in the
// database, and start listening queues for them
func pickUpAliveAgents(ctx Context) (err error) {
@ -172,17 +177,42 @@ func getKeepAlives(msg amqp.Delivery, ctx Context) (err error) {
// try to find an existing entry to update, or create a new one
// and save registration in database
_, err = ctx.DB.Col.Reg.Upsert(
// search string
bson.M{"name": ka.Name, "os": ka.OS, "queueloc": ka.QueueLoc},
// update string
bson.M{"name": ka.Name, "os": ka.OS, "queueloc": ka.QueueLoc,
"heartbeatts": ka.HeartBeatTS, "starttime": ka.StartTime})
var ids []agentRegId
iter := ctx.DB.Col.Reg.Find(bson.M{"name": ka.Name, "os": ka.OS, "queueloc": ka.QueueLoc, "pid": ka.PID, "version": ka.Version}).Iter()
err = iter.All(&ids)
if err != nil {
panic(err)
}
switch {
case len(ids) == 0:
// no registration for this agent in database, create one
err = ctx.DB.Col.Reg.Insert(bson.M{"name": ka.Name, "os": ka.OS, "queueloc": ka.QueueLoc,
"pid": ka.PID, "version": ka.Version, "heartbeatts": ka.HeartBeatTS, "starttime": ka.StartTime})
if err != nil {
panic(err)
}
case len(ids) == 1:
// update existing registration for this agent
mgoId := ids[0].Id
err := ctx.DB.Col.Reg.Update(bson.M{"_id": mgoId}, bson.M{"$set": bson.M{"heartbeatts": ka.HeartBeatTS}})
if err != nil {
panic(err)
}
case len(ids) > 1:
// more than one registration is a problem !
desc := fmt.Sprintf("%d agents match this registration in database. That's a problem!", len(ids))
panic(desc)
}
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("getKeepAlives() KeepAlive for Agent '%s' updated in DB", ka.Name)}.Debug()
// If multiple agents are listening on the same queue, alert the cleanup routine
agtCnt, _, err := findDupAgents(ka.QueueLoc, ctx)
if err != nil {
panic(err)
}
if agtCnt > 1 {
ctx.Channels.DetectDupAgents <- ka.QueueLoc
}
return
}
@ -241,3 +271,30 @@ func startAgentListener(reg mig.KeepAlive, ctx Context) (err error) {
return
}
// findDupAgents counts agents that are listening on a given queue and
// have sent a heartbeat in recent times, to detect systems that are running
// two or more agents
func findDupAgents(queueLoc string, ctx Context) (count int, agents []mig.KeepAlive, err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("findDupAgents() -> %v", e)
}
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "leaving findDupAgents()"}.Debug()
}()
// retrieve agents that have sent in heartbeat in twice their heartbeat time
period, err := time.ParseDuration(ctx.Agent.HeartbeatFreq)
if err != nil {
panic(err)
}
since := time.Now().Add(-period * 2)
iter := ctx.DB.Col.Reg.Find(
bson.M{"heartbeatts": bson.M{"$gte": since}, "queueloc": queueLoc},
).Iter()
agents = []mig.KeepAlive{}
err = iter.All(&agents)
if err != nil {
panic(err)
}
return len(agents), agents, err
}

Просмотреть файл

@ -0,0 +1,99 @@
/* Mozilla InvestiGator Scheduler
Version: MPL 1.1/GPL 2.0/LGPL 2.1
The contents of this file are subject to the Mozilla Public License Version
1.1 (the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.mozilla.org/MPL/
Software distributed under the License is distributed on an "AS IS" basis,
WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
for the specific language governing rights and limitations under the
License.
The Initial Developer of the Original Code is
Mozilla Corporation
Portions created by the Initial Developer are Copyright (C) 2013
the Initial Developer. All Rights Reserved.
Contributor(s):
Julien Vehent jvehent@mozilla.com [:ulfr]
Alternatively, the contents of this file may be used under the terms of
either the GNU General Public License Version 2 or later (the "GPL"), or
the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
in which case the provisions of the GPL or the LGPL are applicable instead
of those above. If you wish to allow use of your version of this file only
under the terms of either the GPL or the LGPL, and not to allow others to
use your version of this file under the terms of the MPL, indicate your
decision by deleting the provisions above and replace them with the notice
and other provisions required by the GPL or the LGPL. If you do not delete
the provisions above, a recipient may use your version of this file under
the terms of any one of the MPL, the GPL or the LGPL.
*/
package main
import (
"fmt"
"mig"
"time"
)
// inspectMultiAgents takes a number of actions when several agents are found
// to be listening on the same queue. It will trigger an agentdestroy action
// for agents that are flagged as upgraded, and log alerts for agents that
// are not, such that an investigator can look at them.
func inspectMultiAgents(queueLoc string, ctx Context) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("inspectMultiAgents() -> %v", e)
}
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "leaving inspectMultiAgents()"}.Debug()
}()
agentsCount, agents, err := findDupAgents(queueLoc, ctx)
if agentsCount < 2 {
return
}
destroyedAgents := 0
leftAloneAgents := 0
for _, agent := range agents {
switch agent.Status {
case "upgraded":
// upgraded agents must die
err = destroyAgent(agent, ctx)
if err != nil {
panic(err)
}
destroyedAgents++
case "destroyed":
// if the agent has already been marked as destroyed, check if
// that was done longer than 2 heartbeats ago. If it did, the
// destruction failed, and we need to reissue a destruction order
hbFreq, err := time.ParseDuration(ctx.Agent.HeartbeatFreq)
if err != nil {
panic(err)
}
twoHeartbeats := time.Now().Add(-hbFreq * 2)
if agent.DestructionTime.Before(twoHeartbeats) {
err = destroyAgent(agent, ctx)
if err != nil {
panic(err)
}
destroyedAgents++
} else {
leftAloneAgents++
}
}
}
remainingAgents := agentsCount - destroyedAgents - leftAloneAgents
if remainingAgents > 1 {
// there's still some agents left, raise errors for these
desc := fmt.Sprintf("Found '%d' agents running on '%s'. Require manual inspection.", remainingAgents, queueLoc)
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: desc}.Warning()
}
return
}

Просмотреть файл

@ -53,7 +53,7 @@ import (
var version string
// the list of active agents is shared globally
// TODO: wrap this around mutexes for safety
// TODO: make this a database thing to work with scheduler clusters
var activeAgentsList []string
// main initializes the mongodb connection, the directory watchers and the
@ -196,6 +196,18 @@ func main() {
}()
ctx.Channels.Log <- mig.Log{Desc: "spoolInspection() routine started"}
// launch the routine that handles multi agents on same queue
go func() {
for queueLoc := range ctx.Channels.DetectDupAgents {
ctx.OpID = mig.GenID()
err = inspectMultiAgents(queueLoc, ctx)
if err != nil {
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("%v", err)}.Err()
}
}
}()
ctx.Channels.Log <- mig.Log{Desc: "inspectMultiAgents() routine started"}
// won't exit until this chan received something
exitReason := <-ctx.Channels.Terminate
fmt.Fprintf(os.Stderr, "Scheduler is shutting down. Reason: %s", exitReason)
@ -391,13 +403,25 @@ func prepareCommands(action mig.Action, ctx Context) (cmdIDs []uint64, err error
panic("0 targets found in database")
}
// loop over the list of targets and create a command for each
// loop over the list of targets and create one command for each agent queue
var targetedAgents []string
for _, target := range targets {
skip := false
for _, q := range targetedAgents {
if q == target.QueueLoc {
// if already done this agent, skip it
skip = true
}
}
if skip {
continue
}
cmdid, err := createCommand(ctx, action, target)
if err != nil {
panic(err)
}
cmdIDs = append(cmdIDs, cmdid)
targetedAgents = append(targetedAgents, target.QueueLoc)
}
return
}
@ -653,6 +677,15 @@ func updateAction(cmdPath string, ctx Context) (err error) {
panic(err)
}
// in case the action is related to upgrading agents, do stuff
if cmd.Status == "succeeded" {
// this can fail for many reason, do not panic on err return
err = markUpgradedAgents(cmd, ctx)
if err != nil {
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, ActionID: ea.Action.ID, CommandID: cmd.ID, Desc: fmt.Sprintf("%v", err)}.Err()
}
}
// remove the command from the spool
os.Remove(cmdPath)

Просмотреть файл

@ -0,0 +1,185 @@
/* Mozilla InvestiGator Scheduler
Version: MPL 1.1/GPL 2.0/LGPL 2.1
The contents of this file are subject to the Mozilla Public License Version
1.1 (the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.mozilla.org/MPL/
Software distributed under the License is distributed on an "AS IS" basis,
WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
for the specific language governing rights and limitations under the
License.
The Initial Developer of the Original Code is
Mozilla Corporation
Portions created by the Initial Developer are Copyright (C) 2013
the Initial Developer. All Rights Reserved.
Contributor(s):
Julien Vehent jvehent@mozilla.com [:ulfr]
Alternatively, the contents of this file may be used under the terms of
either the GNU General Public License Version 2 or later (the "GPL"), or
the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
in which case the provisions of the GPL or the LGPL are applicable instead
of those above. If you wish to allow use of your version of this file only
under the terms of either the GPL or the LGPL, and not to allow others to
use your version of this file under the terms of the MPL, indicate your
decision by deleting the provisions above and replace them with the notice
and other provisions required by the GPL or the LGPL. If you do not delete
the provisions above, a recipient may use your version of this file under
the terms of any one of the MPL, the GPL or the LGPL.
*/
package main
import (
"encoding/json"
"fmt"
"labix.org/v2/mgo/bson"
"mig"
"mig/pgp/sign"
"reflect"
"time"
)
// Check the action that was processed, and if it's related to upgrading agents
// extract the command results, grab the PID of the agents that was upgraded,
// and mark the agent registration in the database as 'upgraded'
func markUpgradedAgents(cmd mig.Command, ctx Context) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("markUpgradedAgents() -> %v", e)
}
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, ActionID: cmd.Action.ID, CommandID: cmd.ID, Desc: "leaving markUpgradedAgents()"}.Debug()
}()
for _, operation := range cmd.Action.Operations {
if operation.Module == "upgrade" {
for _, result := range cmd.Results {
reflection := reflect.ValueOf(result)
resultMap := reflection.Interface().(map[string]interface{})
_, ok := resultMap["success"]
if !ok {
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, ActionID: cmd.Action.ID, CommandID: cmd.ID,
Desc: "Invalid operation results format. Missing 'success' key."}.Err()
panic(err)
}
success := reflect.ValueOf(resultMap["success"])
if !success.Bool() {
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, ActionID: cmd.Action.ID, CommandID: cmd.ID,
Desc: "Upgrade operation failed. Agent not marked."}.Err()
panic(err)
}
_, ok = resultMap["oldpid"]
if !ok {
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, ActionID: cmd.Action.ID, CommandID: cmd.ID,
Desc: "Invalid operation results format. Missing 'oldpid' key."}.Err()
panic(err)
}
oldpid := reflect.ValueOf(resultMap["oldpid"])
if oldpid.Float() < 2 {
desc := fmt.Sprintf("Successfully found upgraded action on agent '%s', but with PID '%s'. That's not right...",
cmd.AgentName, oldpid)
ctx.Channels.Log <- mig.Log{Desc: desc}.Err()
panic(desc)
}
// update the agent's registration to mark it as upgraded
var ids []agentRegId
iter := ctx.DB.Col.Reg.Find(bson.M{"queueloc": cmd.AgentQueueLoc, "pid": oldpid.Float()}).Iter()
err = iter.All(&ids)
if err != nil {
panic(err)
}
if len(ids) == 0 {
panic("No agent found in database")
}
mgoId := ids[0].Id
err := ctx.DB.Col.Reg.Update(bson.M{"_id": mgoId}, bson.M{"$set": bson.M{"status": "upgraded"}})
if err != nil {
panic(err)
}
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, ActionID: cmd.Action.ID, CommandID: cmd.ID,
Desc: fmt.Sprintf("Agent '%s' marked as upgraded", cmd.AgentName)}.Info()
}
}
}
return
}
// destroyAgent issues an `agentdestroy` action targetted to a specific agent
// and updates the status of the agent in the database
func destroyAgent(agent mig.KeepAlive, ctx Context) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("destroyAgent() -> %v", e)
}
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "leaving destroyAgent()"}.Debug()
}()
// generate an `agentdestroy` action for this agent
killAction := mig.Action{
ID: mig.GenID(),
Name: fmt.Sprintf("Destroy agent %s", agent.Name),
Target: agent.QueueLoc,
ValidFrom: time.Now().Add(-60 * time.Second).UTC(),
ExpireAfter: time.Now().Add(30 * time.Minute).UTC(),
SyntaxVersion: 1,
}
var opparams struct {
PID int `json:"pid"`
Version string `json:"version"`
}
opparams.PID = agent.PID
opparams.Version = agent.Version
killOperation := mig.Operation{
Module: "agentdestroy",
Parameters: opparams,
}
killAction.Operations = append(killAction.Operations, killOperation)
// sign the action with the scheduler PGP key
str, err := killAction.String()
if err != nil {
panic(err)
}
pgpsig, err := sign.Sign(str, ctx.PGP.KeyID)
if err != nil {
panic(err)
}
killAction.PGPSignatures = append(killAction.PGPSignatures, pgpsig)
var jsonAction []byte
jsonAction, err = json.Marshal(killAction)
if err != nil {
panic(err)
}
// write the action to the spool for scheduling
dest := fmt.Sprintf("%s/%s", ctx.Directories.Action.New, mig.GenID())
err = safeWrite(ctx, dest, jsonAction)
if err != nil {
panic(err)
}
// mark the agent as `destroyed` in the database
var ids []agentRegId
iter := ctx.DB.Col.Reg.Find(bson.M{"queueloc": agent.QueueLoc, "pid": agent.PID}).Iter()
err = iter.All(&ids)
if err != nil {
panic(err)
}
if len(ids) == 0 {
panic("No agent found in database")
}
mgoId := ids[0].Id
err = ctx.DB.Col.Reg.Update(bson.M{"_id": mgoId},
bson.M{"$set": bson.M{"status": "destroyed", "destructiontime": time.Now().UTC()}})
if err != nil {
panic(err)
}
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("Requested destruction of agent '%s' with PID '%d'", agent.Name, agent.PID)}.Info()
return
}

Просмотреть файл

@ -40,8 +40,10 @@ import (
)
type KeepAlive struct {
Name, QueueLoc, OS, Version string
StartTime, HeartBeatTS time.Time
Name, QueueLoc, OS, Version string
PID int
StartTime, DestructionTime, HeartBeatTS time.Time
Status string
}
type Binding struct {