зеркало из https://github.com/mozilla/mig.git
[major] postgres migration, take 2
This commit is contained in:
Родитель
a75b8b6200
Коммит
ca8df4f485
|
@ -54,7 +54,7 @@ var LOGGINGCONF = mig.Logging{
|
|||
// location of the rabbitmq server
|
||||
var AMQPBROKER string = "amqp://guest:guest@localhost:5672/"
|
||||
|
||||
// firequency at which the agent sends heartbeat messages
|
||||
// frequency at which the agent sends heartbeat messages
|
||||
var HEARTBEATFREQ time.Duration = 300 * time.Second
|
||||
|
||||
// timeout after which a module run is killed
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
host = "http://localhost:12345"
|
||||
baseroute = "/api/v1"
|
||||
|
||||
[openpgp]
|
||||
[pgp]
|
||||
pubring = "/var/cache/mig/.gnupg/pubring.gpg"
|
||||
|
||||
[directories]
|
||||
|
|
|
@ -55,3 +55,4 @@
|
|||
|
||||
[pgp]
|
||||
keyid = "1E644752FB76B77245B1694E556CDD7B07E9D5D6"
|
||||
pubring = "/path/to/pubring.pgp"
|
||||
|
|
Двоичные данные
doc/.files/ER-diagram.png
Двоичные данные
doc/.files/ER-diagram.png
Двоичный файл не отображается.
До Ширина: | Высота: | Размер: 49 KiB После Ширина: | Высота: | Размер: 48 KiB |
|
@ -0,0 +1,172 @@
|
|||
#! /usr/bin/env bash
|
||||
[ ! -x $(which sudo) ] && echo "sudo isn't available, that won't work" && exit 1
|
||||
|
||||
|
||||
for user in "migadmin" "migapi" "migscheduler"; do
|
||||
pass=$(< /dev/urandom tr -dc _A-Z-a-z-0-9 | head -c${1:-32})
|
||||
sudo su postgres -c "psql -c 'CREATE ROLE $user;'" 1>/dev/null
|
||||
[ $? -ne 0 ] && echo "ERROR: user creation failed." && exit 123
|
||||
sudo su postgres -c "psql -c \"ALTER ROLE $user WITH NOSUPERUSER INHERIT NOCREATEROLE NOCREATEDB LOGIN PASSWORD '$pass';\"" 1>/dev/null
|
||||
[ $? -ne 0 ] && echo "ERROR: user creation failed." && exit 123
|
||||
echo "Created user $user with password '$pass'"
|
||||
done
|
||||
sudo su postgres -c "psql -c 'CREATE DATABASE mig OWNER migadmin;'" 1>/dev/null
|
||||
[ $? -ne 0 ] && echo "ERROR: database creation failed." && exit 123
|
||||
|
||||
createdbtemp=$(mktemp)
|
||||
cat > $createdbtemp << EOF
|
||||
CREATE TABLE actions (
|
||||
id numeric NOT NULL,
|
||||
name character varying(2048) NOT NULL,
|
||||
target character varying(2048) NOT NULL,
|
||||
description json,
|
||||
threat json,
|
||||
operations json,
|
||||
validfrom timestamp with time zone NOT NULL,
|
||||
expireafter timestamp with time zone NOT NULL,
|
||||
starttime timestamp with time zone,
|
||||
finishtime timestamp with time zone,
|
||||
lastupdatetime timestamp with time zone,
|
||||
status character varying(256),
|
||||
sentctr integer,
|
||||
returnedctr integer,
|
||||
donectr integer,
|
||||
cancelledctr integer,
|
||||
failedctr integer,
|
||||
timeoutctr integer,
|
||||
pgpsignatures json,
|
||||
syntaxversion integer
|
||||
);
|
||||
ALTER TABLE public.actions OWNER TO migadmin;
|
||||
ALTER TABLE ONLY actions
|
||||
ADD CONSTRAINT actions_pkey PRIMARY KEY (id);
|
||||
|
||||
CREATE TABLE agents (
|
||||
id numeric NOT NULL,
|
||||
name character varying(2048) NOT NULL,
|
||||
queueloc character varying(2048) NOT NULL,
|
||||
os character varying(2048) NOT NULL,
|
||||
version character varying(2048) NOT NULL,
|
||||
pid integer NOT NULL,
|
||||
starttime timestamp with time zone NOT NULL,
|
||||
destructiontime timestamp with time zone,
|
||||
heartbeattime timestamp with time zone NOT NULL,
|
||||
status character varying(255)
|
||||
);
|
||||
ALTER TABLE public.agents OWNER TO migadmin;
|
||||
ALTER TABLE ONLY agents
|
||||
ADD CONSTRAINT agents_pkey PRIMARY KEY (id);
|
||||
|
||||
CREATE TABLE agtmodreq (
|
||||
moduleid numeric NOT NULL,
|
||||
agentid numeric NOT NULL,
|
||||
minimumweight integer NOT NULL
|
||||
);
|
||||
ALTER TABLE public.agtmodreq OWNER TO migadmin;
|
||||
CREATE UNIQUE INDEX agtmodreq_moduleid_agentid_idx ON agtmodreq USING btree (moduleid, agentid);
|
||||
CREATE INDEX agtmodreq_agentid_idx ON agtmodreq USING btree (agentid);
|
||||
CREATE INDEX agtmodreq_moduleid_idx ON agtmodreq USING btree (moduleid);
|
||||
|
||||
CREATE TABLE commands (
|
||||
id numeric NOT NULL,
|
||||
actionid numeric NOT NULL,
|
||||
agentid numeric NOT NULL,
|
||||
status character varying(255) NOT NULL,
|
||||
results json,
|
||||
starttime timestamp with time zone NOT NULL,
|
||||
finishtime timestamp with time zone
|
||||
);
|
||||
ALTER TABLE public.commands OWNER TO migadmin;
|
||||
ALTER TABLE ONLY commands
|
||||
ADD CONSTRAINT commands_pkey PRIMARY KEY (id);
|
||||
|
||||
CREATE TABLE invagtmodperm (
|
||||
investigatorid bigint NOT NULL,
|
||||
agentid numeric NOT NULL,
|
||||
moduleid numeric NOT NULL,
|
||||
weight integer NOT NULL
|
||||
);
|
||||
ALTER TABLE public.invagtmodperm OWNER TO migadmin;
|
||||
CREATE UNIQUE INDEX invagtmodperm_investigatorid_agentid_moduleid_idx ON invagtmodperm USING btree (investigatorid, agentid, moduleid);
|
||||
CREATE INDEX invagtmodperm_agentid_idx ON invagtmodperm USING btree (agentid);
|
||||
CREATE INDEX invagtmodperm_investigatorid_idx ON invagtmodperm USING btree (investigatorid);
|
||||
CREATE INDEX invagtmodperm_moduleid_idx ON invagtmodperm USING btree (moduleid);
|
||||
|
||||
CREATE TABLE investigators (
|
||||
id bigserial NOT NULL,
|
||||
name character varying(1024) NOT NULL,
|
||||
pgpfingerprint character varying(128),
|
||||
publickey bytea
|
||||
);
|
||||
ALTER TABLE public.investigators OWNER TO migadmin;
|
||||
ALTER TABLE ONLY investigators
|
||||
ADD CONSTRAINT investigators_pkey PRIMARY KEY (id);
|
||||
CREATE UNIQUE INDEX investigators_pgpfingerprint_idx ON investigators USING btree (pgpfingerprint);
|
||||
|
||||
CREATE TABLE modules (
|
||||
id numeric NOT NULL,
|
||||
name character varying(256) NOT NULL
|
||||
);
|
||||
ALTER TABLE public.modules OWNER TO migadmin;
|
||||
ALTER TABLE ONLY modules
|
||||
ADD CONSTRAINT modules_pkey PRIMARY KEY (id);
|
||||
|
||||
CREATE TABLE signatures (
|
||||
actionid numeric NOT NULL,
|
||||
investigatorid bigint NOT NULL,
|
||||
pgpsignature character varying(4096) NOT NULL
|
||||
);
|
||||
ALTER TABLE public.signatures OWNER TO migadmin;
|
||||
CREATE UNIQUE INDEX signatures_actionid_investigatorid_idx ON signatures USING btree (actionid, investigatorid);
|
||||
CREATE INDEX signatures_actionid_idx ON signatures USING btree (actionid);
|
||||
CREATE INDEX signatures_investigatorid_idx ON signatures USING btree (investigatorid);
|
||||
|
||||
ALTER TABLE ONLY agtmodreq
|
||||
ADD CONSTRAINT agtmodreq_moduleid_fkey FOREIGN KEY (moduleid) REFERENCES modules(id);
|
||||
|
||||
ALTER TABLE ONLY commands
|
||||
ADD CONSTRAINT commands_actionid_fkey FOREIGN KEY (actionid) REFERENCES actions(id);
|
||||
|
||||
ALTER TABLE ONLY commands
|
||||
ADD CONSTRAINT commands_agentid_fkey FOREIGN KEY (agentid) REFERENCES agents(id);
|
||||
|
||||
ALTER TABLE ONLY invagtmodperm
|
||||
ADD CONSTRAINT invagtmodperm_agentid_fkey FOREIGN KEY (agentid) REFERENCES agents(id);
|
||||
|
||||
ALTER TABLE ONLY invagtmodperm
|
||||
ADD CONSTRAINT invagtmodperm_investigatorid_fkey FOREIGN KEY (investigatorid) REFERENCES investigators(id);
|
||||
|
||||
ALTER TABLE ONLY invagtmodperm
|
||||
ADD CONSTRAINT invagtmodperm_moduleid_fkey FOREIGN KEY (moduleid) REFERENCES modules(id);
|
||||
|
||||
ALTER TABLE ONLY signatures
|
||||
ADD CONSTRAINT signatures_actionid_fkey FOREIGN KEY (actionid) REFERENCES actions(id);
|
||||
|
||||
ALTER TABLE ONLY signatures
|
||||
ADD CONSTRAINT signatures_investigatorid_fkey FOREIGN KEY (investigatorid) REFERENCES investigators(id);
|
||||
EOF
|
||||
|
||||
chmod 777 $createdbtemp
|
||||
sudo su postgres -c "psql -d mig -f $createdbtemp" 1>/dev/null
|
||||
[ $? -ne 0 ] && echo "ERROR: tables creation failed." && exit 123
|
||||
rm "$createdbtemp"
|
||||
|
||||
granttmp=$(mktemp)
|
||||
cat > $granttmp << EOF
|
||||
GRANT ALL PRIVILEGES ON DATABASE mig TO migadmin;
|
||||
|
||||
\c mig
|
||||
|
||||
GRANT SELECT ON ALL TABLES IN SCHEMA public TO migscheduler;
|
||||
GRANT INSERT, UPDATE ON actions, commands, agents, signatures TO migscheduler;
|
||||
|
||||
GRANT SELECT ON ALL TABLES IN SCHEMA public TO migapi;
|
||||
GRANT INSERT ON actions, signatures TO migapi;
|
||||
EOF
|
||||
|
||||
chmod 777 $granttmp
|
||||
sudo su postgres -c "psql -f $granttmp" 1>/dev/null
|
||||
[ $? -ne 0 ] && echo "ERROR: grants failed." && exit 123
|
||||
rm "$granttmp"
|
||||
|
||||
echo "MIG Database created successfully."
|
28
doc/data.rst
28
doc/data.rst
|
@ -33,7 +33,12 @@ the scheduler, each action and command are stored individually in a text file in
|
|||
Postgresql database
|
||||
-------------------
|
||||
|
||||
Database structure
|
||||
Entity-Relationship Diagram
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. image:: .files/ER-diagram.png
|
||||
|
||||
Structure & Tables
|
||||
~~~~~~~~~~~~~~~~~~
|
||||
|
||||
The `actions` table contains the detail of each action ran by the MIG platform.
|
||||
|
@ -164,4 +169,23 @@ on an agent. This model allows for very fine grained permissions management.
|
|||
CREATE INDEX ON invagtmodperm (agentid);
|
||||
CREATE INDEX ON invagtmodperm (moduleid);
|
||||
|
||||
.. image:: .files/ER-diagram.png
|
||||
Database creation script
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. include:: .files/createdb.sh
|
||||
:code: bash
|
||||
|
||||
Adding Investigators
|
||||
~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
In the future, this will probably be automated via the API. But for now, and
|
||||
until we have a strong authentication mechanism for API calls, it must be done
|
||||
manually in the database.
|
||||
|
||||
Adapt the query below to add a new investigator.
|
||||
|
||||
.. code:: sql
|
||||
|
||||
INSERT INTO investigators (name, pgpfingerprint)
|
||||
VALUES ('Bob Kelso', 'E608......');
|
||||
|
||||
|
|
|
@ -68,7 +68,7 @@ type Action struct {
|
|||
LastUpdateTime time.Time `json:"lastupdatetime,omitempty"`
|
||||
CommandIDs []uint64 `json:"commandids,omitempty"`
|
||||
Counters counters `json:"counters,omitempty"`
|
||||
SyntaxVersion int `json:"syntaxversion,omitempty"`
|
||||
SyntaxVersion uint16 `json:"syntaxversion,omitempty"`
|
||||
}
|
||||
|
||||
// Some counters used to track the completion of an action
|
||||
|
|
|
@ -309,14 +309,14 @@ func createAction(respWriter http.ResponseWriter, request *http.Request) {
|
|||
|
||||
// Init action fields
|
||||
action.ID = mig.GenID()
|
||||
date0 := time.Date(11, time.January, 11, 11, 11, 11, 11, time.UTC)
|
||||
date0 := time.Date(9998, time.January, 11, 11, 11, 11, 11, time.UTC)
|
||||
action.StartTime = date0
|
||||
action.FinishTime = date0
|
||||
action.LastUpdateTime = date0
|
||||
action.Status = "init"
|
||||
|
||||
// load keyring and validate action
|
||||
keyring, err := os.Open(ctx.OpenPGP.PubRing)
|
||||
keyring, err := os.Open(ctx.PGP.PubRing)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -344,7 +344,7 @@ func createAction(respWriter http.ResponseWriter, request *http.Request) {
|
|||
}
|
||||
for _, sig := range action.PGPSignatures {
|
||||
// TODO: opening the keyring in a loop is really ugly. rewind!
|
||||
k, err := os.Open(ctx.OpenPGP.PubRing)
|
||||
k, err := os.Open(ctx.PGP.PubRing)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
|
|
@ -65,8 +65,8 @@ type Context struct {
|
|||
Ready, InFlight, Returned, Done string
|
||||
}
|
||||
}
|
||||
DB migdb.DB
|
||||
OpenPGP struct {
|
||||
DB migdb.DB
|
||||
PGP struct {
|
||||
PubRing string
|
||||
}
|
||||
Postgres struct {
|
||||
|
|
|
@ -150,16 +150,17 @@ func (db *DB) UpdateAction(a mig.Action) (err error) {
|
|||
|
||||
// InsertOrUpdateAction looks for an existing action in DB and update it,
|
||||
// or insert a new one if none is found
|
||||
func (db *DB) InsertOrUpdateAction(a mig.Action) (err error) {
|
||||
func (db *DB) InsertOrUpdateAction(a mig.Action) (inserted bool, err error) {
|
||||
var id uint64
|
||||
err = db.c.QueryRow(`SELECT id FROM actions WHERE id=$1`, a.ID).Scan(&id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error while retrieving action: '%v'", err)
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
return inserted, fmt.Errorf("Error while retrieving action: '%v'", err)
|
||||
}
|
||||
if err == sql.ErrNoRows {
|
||||
return db.InsertAction(a)
|
||||
inserted = true
|
||||
return inserted, db.InsertAction(a)
|
||||
} else {
|
||||
return db.UpdateAction(a)
|
||||
return inserted, db.UpdateAction(a)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -192,12 +193,13 @@ func (db *DB) InsertSignature(aid, iid uint64, sig string) (err error) {
|
|||
// FindInvestigatorByFingerprint searches the database for an investigator that
|
||||
// has a given fingerprint
|
||||
func (db *DB) InvestigatorByFingerprint(fp string) (iid uint64, err error) {
|
||||
err = db.c.QueryRow("SELECT id FROM investigators WHERE pgpfingerprint=$1", fp).Scan(&iid)
|
||||
err = db.c.QueryRow("SELECT id FROM investigators WHERE LOWER(pgpfingerprint)=LOWER($1)", fp).Scan(&iid)
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
err = fmt.Errorf("Error while finding investigator: '%v'", err)
|
||||
return
|
||||
}
|
||||
if err == sql.ErrNoRows {
|
||||
err = fmt.Errorf("InvestigatorByFingerprint: no investigator found for fingerprint '%s'", fp)
|
||||
return
|
||||
}
|
||||
return
|
||||
|
@ -367,11 +369,8 @@ func (db *DB) UpdateAgentHeartbeat(agt mig.Agent) (err error) {
|
|||
// if it exists, or insert it if it doesn't
|
||||
func (db *DB) InsertOrUpdateAgent(agt mig.Agent) (err error) {
|
||||
agent, err := db.AgentByQueueAndPID(agt.QueueLoc, agt.PID)
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
return fmt.Errorf("Error while searching for agent: '%v'", err)
|
||||
}
|
||||
if err == sql.ErrNoRows {
|
||||
agt.DestructionTime = time.Date(11, time.January, 11, 11, 11, 11, 11, time.UTC)
|
||||
if err != nil {
|
||||
agt.DestructionTime = time.Date(9998, time.January, 11, 11, 11, 11, 11, time.UTC)
|
||||
agt.Status = "heartbeating"
|
||||
// create a new agent
|
||||
return db.InsertAgent(agt)
|
||||
|
@ -439,12 +438,22 @@ func (db *DB) ActiveAgentsByTarget(target string, pointInTime time.Time) (agents
|
|||
return
|
||||
}
|
||||
|
||||
// MarkAgentUpgraded updated the status of an agent in the database
|
||||
func (db *DB) MarkAgentUpgraded(agent mig.Agent) (err error) {
|
||||
_, err = db.c.Exec(`UPDATE agents SET status='upgraded' WHERE id=$1`,
|
||||
agent.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to mark agent as upgraded in database: '%v'", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// MarkAgentDestroyed updated the status and destructiontime of an agent in the database
|
||||
func (db *DB) MarkAgentDestroyed(agent mig.Agent) (err error) {
|
||||
agent.Status = "destroyed"
|
||||
agent.DestructionTime = time.Now()
|
||||
_, err = db.c.Exec(`UPDATE agents
|
||||
SET heartbeattime=$1, status=$2 WHERE id=$3`,
|
||||
SET destructiontime=$1, status=$2 WHERE id=$3`,
|
||||
agent.DestructionTime, agent.Status, agent.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to mark agent as destroyed in database: '%v'", err)
|
||||
|
|
|
@ -96,7 +96,7 @@ type Context struct {
|
|||
Chan *amqp.Channel
|
||||
}
|
||||
PGP struct {
|
||||
KeyID string
|
||||
KeyID, PubRing string
|
||||
}
|
||||
Postgres struct {
|
||||
Host, User, Password, DBName, SSLMode string
|
||||
|
|
|
@ -108,7 +108,7 @@ func flyAction(ctx Context, a mig.Action, origin string) (err error) {
|
|||
panic(err)
|
||||
}
|
||||
a.Status = "inflight"
|
||||
err = ctx.DB.InsertOrUpdateAction(a)
|
||||
err = ctx.DB.UpdateAction(a)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
|
|
@ -1,99 +0,0 @@
|
|||
/* Mozilla InvestiGator Scheduler
|
||||
|
||||
Version: MPL 1.1/GPL 2.0/LGPL 2.1
|
||||
|
||||
The contents of this file are subject to the Mozilla Public License Version
|
||||
1.1 (the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.mozilla.org/MPL/
|
||||
|
||||
Software distributed under the License is distributed on an "AS IS" basis,
|
||||
WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
|
||||
for the specific language governing rights and limitations under the
|
||||
License.
|
||||
|
||||
The Initial Developer of the Original Code is
|
||||
Mozilla Corporation
|
||||
Portions created by the Initial Developer are Copyright (C) 2013
|
||||
the Initial Developer. All Rights Reserved.
|
||||
|
||||
Contributor(s):
|
||||
Julien Vehent jvehent@mozilla.com [:ulfr]
|
||||
|
||||
Alternatively, the contents of this file may be used under the terms of
|
||||
either the GNU General Public License Version 2 or later (the "GPL"), or
|
||||
the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
|
||||
in which case the provisions of the GPL or the LGPL are applicable instead
|
||||
of those above. If you wish to allow use of your version of this file only
|
||||
under the terms of either the GPL or the LGPL, and not to allow others to
|
||||
use your version of this file under the terms of the MPL, indicate your
|
||||
decision by deleting the provisions above and replace them with the notice
|
||||
and other provisions required by the GPL or the LGPL. If you do not delete
|
||||
the provisions above, a recipient may use your version of this file under
|
||||
the terms of any one of the MPL, the GPL or the LGPL.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"mig"
|
||||
"time"
|
||||
)
|
||||
|
||||
// inspectMultiAgents takes a number of actions when several agents are found
|
||||
// to be listening on the same queue. It will trigger an agentdestroy action
|
||||
// for agents that are flagged as upgraded, and log alerts for agents that
|
||||
// are not, such that an investigator can look at them.
|
||||
func inspectMultiAgents(queueLoc string, ctx Context) (err error) {
|
||||
defer func() {
|
||||
if e := recover(); e != nil {
|
||||
err = fmt.Errorf("inspectMultiAgents() -> %v", e)
|
||||
}
|
||||
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "leaving inspectMultiAgents()"}.Debug()
|
||||
}()
|
||||
agentsCount, agents, err := findDupAgents(queueLoc, ctx)
|
||||
if agentsCount < 2 {
|
||||
return
|
||||
}
|
||||
|
||||
destroyedAgents := 0
|
||||
leftAloneAgents := 0
|
||||
for _, agent := range agents {
|
||||
switch agent.Status {
|
||||
case "upgraded":
|
||||
// upgraded agents must die
|
||||
err = destroyAgent(agent, ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
destroyedAgents++
|
||||
case "destroyed":
|
||||
// if the agent has already been marked as destroyed, check if
|
||||
// that was done longer than 2 heartbeats ago. If it did, the
|
||||
// destruction failed, and we need to reissue a destruction order
|
||||
hbFreq, err := time.ParseDuration(ctx.Agent.HeartbeatFreq)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
twoHeartbeats := time.Now().Add(-hbFreq * 2)
|
||||
if agent.DestructionTime.Before(twoHeartbeats) {
|
||||
err = destroyAgent(agent, ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
destroyedAgents++
|
||||
} else {
|
||||
leftAloneAgents++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
remainingAgents := agentsCount - destroyedAgents - leftAloneAgents
|
||||
if remainingAgents > 1 {
|
||||
// there's still some agents left, raise errors for these
|
||||
desc := fmt.Sprintf("Found '%d' agents running on '%s'. Require manual inspection.", remainingAgents, queueLoc)
|
||||
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: desc}.Warning()
|
||||
}
|
||||
return
|
||||
}
|
|
@ -41,6 +41,7 @@ import (
|
|||
"flag"
|
||||
"fmt"
|
||||
"mig"
|
||||
"mig/pgp"
|
||||
"os"
|
||||
"runtime"
|
||||
"strings"
|
||||
|
@ -114,7 +115,7 @@ func main() {
|
|||
// move action to INVALID folder and log
|
||||
dest := fmt.Sprintf("%s/%d.json", ctx.Directories.Action.Invalid, time.Now().UTC().UnixNano())
|
||||
os.Rename(actionPath, dest)
|
||||
reason := fmt.Sprintf("%v. %s moved to %s", err, actionPath, dest)
|
||||
reason := fmt.Sprintf("%v. '%s' moved to '%s'", err, actionPath, dest)
|
||||
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: reason}.Warning()
|
||||
}
|
||||
}
|
||||
|
@ -344,10 +345,40 @@ func processNewAction(actionPath string, ctx Context) (err error) {
|
|||
return
|
||||
}
|
||||
action.Status = "preparing"
|
||||
err = ctx.DB.InsertOrUpdateAction(action)
|
||||
inserted, err := ctx.DB.InsertOrUpdateAction(action)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if inserted {
|
||||
// action was inserted, and not updated, so we need to insert
|
||||
// the signatures as well
|
||||
astr, err := action.String()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
for _, sig := range action.PGPSignatures {
|
||||
// TODO: opening the keyring in a loop is really ugly. rewind!
|
||||
k, err := os.Open(ctx.PGP.PubRing)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer k.Close()
|
||||
fp, err := pgp.GetFingerprintFromSignature(astr, sig, k)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
iid, err := ctx.DB.InvestigatorByFingerprint(fp)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = ctx.DB.InsertSignature(action.ID, iid, sig)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, ActionID: action.ID, Desc: "Action written to database"}.Debug()
|
||||
|
||||
// expand the action in one command per agent
|
||||
action.CommandIDs, err = prepareCommands(action, ctx)
|
||||
if err != nil {
|
||||
|
|
|
@ -79,20 +79,18 @@ func markUpgradedAgents(cmd mig.Command, ctx Context) (err error) {
|
|||
panic(err)
|
||||
}
|
||||
oldpid := reflect.ValueOf(resultMap["oldpid"])
|
||||
if oldpid.Int() < 2 {
|
||||
if oldpid.Float() < 2 || oldpid.Float() > 65535 {
|
||||
desc := fmt.Sprintf("Successfully found upgraded action on agent '%s', but with PID '%s'. That's not right...",
|
||||
cmd.Agent.Name, oldpid)
|
||||
ctx.Channels.Log <- mig.Log{Desc: desc}.Err()
|
||||
panic(desc)
|
||||
}
|
||||
|
||||
// update the agent's registration to mark it as upgraded
|
||||
agent, err := ctx.DB.AgentByQueueAndPID(cmd.Agent.QueueLoc, int(oldpid.Int()))
|
||||
agent, err := ctx.DB.AgentByQueueAndPID(cmd.Agent.QueueLoc, int(oldpid.Float()))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
agent.Status = "upgraded"
|
||||
err = ctx.DB.InsertOrUpdateAgent(agent)
|
||||
err = ctx.DB.MarkAgentUpgraded(agent)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -104,6 +102,66 @@ func markUpgradedAgents(cmd mig.Command, ctx Context) (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
// inspectMultiAgents takes a number of actions when several agents are found
|
||||
// to be listening on the same queue. It will trigger an agentdestroy action
|
||||
// for agents that are flagged as upgraded, and log alerts for agents that
|
||||
// are not, such that an investigator can look at them.
|
||||
func inspectMultiAgents(queueLoc string, ctx Context) (err error) {
|
||||
defer func() {
|
||||
if e := recover(); e != nil {
|
||||
err = fmt.Errorf("inspectMultiAgents() -> %v", e)
|
||||
}
|
||||
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "leaving inspectMultiAgents()"}.Debug()
|
||||
}()
|
||||
agentsCount, agents, err := findDupAgents(queueLoc, ctx)
|
||||
if agentsCount < 2 {
|
||||
return
|
||||
}
|
||||
destroyedAgents := 0
|
||||
leftAloneAgents := 0
|
||||
for _, agent := range agents {
|
||||
switch agent.Status {
|
||||
case "upgraded":
|
||||
// upgraded agents must die
|
||||
err = destroyAgent(agent, ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
destroyedAgents++
|
||||
desc := fmt.Sprintf("Agent '%s' with PID '%d' has been upgraded and will be destroyed.", agent.Name, agent.PID)
|
||||
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: desc}.Debug()
|
||||
case "destroyed":
|
||||
// if the agent has already been marked as destroyed, check if
|
||||
// that was done longer than 3 heartbeats ago. If it did, the
|
||||
// destruction failed, and we need to reissue a destruction order
|
||||
hbFreq, err := time.ParseDuration(ctx.Agent.HeartbeatFreq)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pointInTime := time.Now().Add(-hbFreq * 3)
|
||||
if agent.DestructionTime.Before(pointInTime) {
|
||||
err = destroyAgent(agent, ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
destroyedAgents++
|
||||
desc := fmt.Sprintf("Re-issuing destruction action for agent '%s' with PID '%d'.", agent.Name, agent.PID)
|
||||
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: desc}.Debug()
|
||||
} else {
|
||||
leftAloneAgents++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
remainingAgents := agentsCount - destroyedAgents - leftAloneAgents
|
||||
if remainingAgents > 1 {
|
||||
// there's still some agents left, raise errors for these
|
||||
desc := fmt.Sprintf("Found '%d' agents running on '%s'. Require manual inspection.", remainingAgents, queueLoc)
|
||||
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: desc}.Warning()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// destroyAgent issues an `agentdestroy` action targetted to a specific agent
|
||||
// and updates the status of the agent in the database
|
||||
func destroyAgent(agent mig.Agent, ctx Context) (err error) {
|
||||
|
@ -151,7 +209,7 @@ func destroyAgent(agent mig.Agent, ctx Context) (err error) {
|
|||
}
|
||||
|
||||
// write the action to the spool for scheduling
|
||||
dest := fmt.Sprintf("%s/%s", ctx.Directories.Action.New, mig.GenID())
|
||||
dest := fmt.Sprintf("%s/%d.json", ctx.Directories.Action.New, killAction.ID)
|
||||
err = safeWrite(ctx, dest, jsonAction)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
|
Загрузка…
Ссылка в новой задаче