Merge pull request #77 from mozilla/exchangerework

MQ Exchanges rework
This commit is contained in:
Julien Vehent 2015-08-22 10:36:44 -04:00
Родитель ad750ae3b4 e062d41b33
Коммит 668ca47998
18 изменённых файлов: 222 добавлений и 183 удалений

Просмотреть файл

@ -0,0 +1,17 @@
[mq]
host = "localhost"
port = 5672
user = "worker"
pass = "secretpassphrase"
vhost = "mig"
usetls = false
cacert = "/etc/mig/certs/ca.crt"
tlscert = "/etc/mig/certs/worker.crt"
tlskey = "/etc/mig/certs/worker.key"
timeout = "10s"
[logging]
mode = "stdout" ; stdout | file | syslog
level = "debug"
;host = "localhost"
;port = 514
;protocol = "udp"

Просмотреть файл

@ -59,7 +59,7 @@
port = 5672
user = "guest"
pass = "guest"
vhost = "/"
vhost = "mig"
; TLS options
; usetls = true

Просмотреть файл

@ -528,20 +528,20 @@ And add the public PGP key of the scheduler as well:
RabbitMQ Configuration
----------------------
All communications between scheduler and agents rely on RabbitMQ's AMQP
All communications between schedulers and agents rely on RabbitMQ's AMQP
protocol. While MIG does not rely on the security of RabbitMQ to pass orders to
agents, an attacker that gains control to the message broker would be able to
listen to all message, or shut down MIG entirely. To prevent this, RabbitMQ must
provide a reasonable amount of protection, at two levels:
listen to all commands passed to the agents (not results). To prevent this,
RabbitMQ must provide a reasonable amount of protection, at two levels:
* All communications on the public internet are authenticated using client and
server certificates. Since all agents share a single client certificate, this
provides minimal security, and should only be used to make it harder for
attackers to establish an AMQP connection with rabbitmq.
* A given agent can listen and write to its own queue, and no other. We
accomplish this by adding a random number to the queue ID, which is generated
by an agent, and hard to guess by another agent.
* Agents can only listen on their own queue and write to the `toschedulers`
exchange. This is accomplished by randomizing the agent queue name and adding
tight Access Control rules to RabbitMQ.
Note that, even if a random agent manages to connect to the relay, the scheduler
will accept its registration only if it is present in the scheduler's whitelist.
@ -553,27 +553,13 @@ Install the RabbitMQ server from your distribution's packaging system. If your
distribution does not provide a RabbitMQ package, install `erlang` from yum or
apt, and then install RabbitMQ using the packages from rabbitmq.com
RabbitMQ Permissions
~~~~~~~~~~~~~~~~~~~~
0. If you want more than 1024 clients, you may have to increase the max number
of file descriptors that rabbitmq is allowed to hold. On linux, increase
`nofile` in `/etc/security/limits.conf` as follow:
.. code:: bash
rabbitmq - nofile 102400
Then, make sure than `pam_limits.so` is included in `/etc/pam.d/common-session`:
.. code:: bash
session required pam_limits.so
RabbitMQ Configuration
~~~~~~~~~~~~~~~~~~~~~~
1. On the rabbitmq server, create users:
* **admin**, with the tag 'administrator'
* **scheduler** , **agent**, **api** and **worker** with no tag
* **scheduler** , **agent** and **worker** with no tag
All users should have strong passwords. The scheduler password goes into the
configuration file `conf/mig-scheduler.cfg`, in `[mq] password`. The agent
@ -589,8 +575,6 @@ string. The admin password is, of course, for yourself.
sudo rabbitmqctl add_user agent SomeRandomPassword
sudo rabbitmqctl add_user api SomeRandomPassword
sudo rabbitmqctl add_user worker SomeRandomPassword
You can list the users with the following command:
@ -615,63 +599,55 @@ On fresh installation, rabbitmq comes with a `guest` user that as password
3. Create permissions for the scheduler user. The scheduler is allowed to:
- CONFIGURE:
- create the exchanges `mig` and `migevent`
- create and delete any queues under `migevent.*` and `mig.agt.*`
- declare the exchanges `toagents`, `toschedulers` and `toworkers`
- declare and delete queues under `mig.agt.*`
- WRITE:
- publish into the exchanges `mig` and `migevent`
- bind to the queues `mig.agt.heartbeats` and `mig.agt.results`
- bind to any queue under `migevent.*`
- publish into the exchanges `toagents` and `toworkers`
- consume from queues `mig.agt.heartbeats` and `mig.agt.results`
- READ:
- bind to the `mig` and `migevent` exchanges
- read from the queues `mig.agt.heartbeats`, `mig.agt.results`
- read from any queue under `migevent.*`
- declare the exchanges `toagents`, `toschedulers` and `toworkers`
- consume from queues `mig.agt.heartbeats` and `mig.agt.results` bound
to the `toschedulers` exchange
.. code:: bash
sudo rabbitmqctl set_permissions -p mig scheduler \
'^mig(|(event|\.agt)(|\..*))$' \
'^mig(|event(|\..*)|\.(agt\.(heartbeats|results)))$' \
'^mig(|event(|\..*)|\.(agt\.(heartbeats|results)))$'
'^(toagents|toschedulers|toworkers|mig\.agt\..*)$' \
'^(toagents|toworkers|mig\.agt\.(heartbeats|results))$' \
'^(toagents|toschedulers|toworkers|mig\.agt\.(heartbeats|results))$'
4. Create permissions for the agent use. The agent is allowed to:
- CONFIGURE:
- create any queue under `mig.agt.(linux|windows|darwin).*`
- create any queue under `mig.agt.*`
- WRITE:
- publish to the `mig` exchange
- bind to any queue under `mig.agt.(linux|windows|darwin).*`
- publish to the `toschedulers` exchange
- consume from queues under `mig.agt.*`
- READ:
- bind to the `mig` exchange
- read from any queue under `mig.agt.(linux|windows|darwin).*`
- consume from queues under `mig.agt.*` bound to the `toagents`
exchange
.. code:: bash
sudo rabbitmqctl set_permissions -p mig agent \
'^mig\.agt\.(linux|windows|darwin)\..*$' \
'^mig(|\.agt\.(linux|windows|darwin)\..*)$' \
'^mig(|\.agt\.(linux|windows|darwin)\..*)$'
'^mig\.agt\..*$' \
'^(toschedulers|mig\.agt\..*)$' \
'^(toagents|mig\.agt\..*)$'
5. Create permissions for the event workers and api users. The workers and api are allowed to:
5. Create permissions for the event workers. The workers are allowed to:
- CONFIGURE:
- create any queue under `migevent.*`
- declare queues under `migevent.*`
- WRITE:
- write into the exchange `migevent`
- bind to any queue under `migevent.*`
- consume from queues under `migevent.*`
- READ:
- bind to the `migevent` exchange
- read from any queue under `migevent.*`
- consume from queues under `migevent.*` bound to the `toworkers`
exchange
.. code:: bash
sudo rabbitmqctl set_permissions -p mig worker \
'^migevent\..*$' \
'^migevent(|\..*)$' \
'^migevent(|\..*)$'
sudo rabbitmqctl set_permissions -p mig api \
'^migevent\..*$' \
'^migevent(|\..*)$' \
'^migevent(|\..*)$'
'^(toworkers|migevent\..*)$'
6. Start the scheduler, it shouldn't return any ACCESS error. You can also list
the permissions with the command:
@ -680,12 +656,9 @@ On fresh installation, rabbitmq comes with a `guest` user that as password
$ sudo rabbitmqctl list_permissions -p mig | column -t
Listing permissions in vhost "mig" ...
USER------+CONFIGURE---------------------------------+WRITE-------------------------------------------------+READ-------------------------------------------------+
admin .* .* .*
scheduler ^mig(|(event|\\.agt)(|\\..*))$ ^mig(|event(|\\..*)|\\.(agt\\.(heartbeats|results)))$ ^mig(|event(|\\..*)|\\.(agt\\.(heartbeats|results)))$
agent ^mig\\.agt\\.(linux|windows|darwin)\\..*$ ^mig(|\\.agt\\.(linux|windows|darwin)\\..*)$ ^mig(|\\.agt\\.(linux|windows|darwin)\\..*)$
api ^migevent\\..*$ ^migevent(|\\..*)$ ^migevent(|\\..*)$
worker ^migevent\\..*$ ^migevent(|\\..*)$ ^migevent(|\\..*)$
agent ^mig\\.agt\\..*$ ^(toschedulers|mig\\.agt\\..*)$ ^(toagents|mig\\.agt\\..*)$
scheduler ^(toagents|toschedulers|toworkers|mig\\.agt\\..*)$ ^(toagents|toworkers|mig\\.agt\\.(heartbeats|results))$ ^(toagents|toschedulers|toworkers|mig\\.agt\\.(heartbeats|results))$
worker ^migevent\\..*$ ^migevent(|\\..*)$ ^(toworkers|migevent\\..*)$
RabbitMQ TLS configuration
~~~~~~~~~~~~~~~~~~~~~~~~~~
@ -819,6 +792,24 @@ the scheduler. The agents will restart themselves.
(remove the `echo` in the command above, it's there as a safety for copy/paste
people).
Supporting more than 1024 connections
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
If you want more than 1024 clients, you may have to increase the max number of
file descriptors that rabbitmq is allowed to hold. On linux, increase `nofile`
in `/etc/security/limits.conf` as follow:
.. code:: bash
rabbitmq - nofile 102400
Then, make sure than `pam_limits.so` is included in `/etc/pam.d/common-session`:
.. code:: bash
session required pam_limits.so
Serving AMQPS on port 443
~~~~~~~~~~~~~~~~~~~~~~~~~

Просмотреть файл

@ -43,11 +43,12 @@
<li><a href="#rabbitmq-configuration">5   RabbitMQ Configuration</a>
<ul class="auto-toc">
<li><a href="#installation">5.1   Installation</a></li>
<li><a href="#rabbitmq-permissions">5.2   RabbitMQ Permissions</a></li>
<li><a href="#id3">5.2   RabbitMQ Configuration</a></li>
<li><a href="#rabbitmq-tls-configuration">5.3   RabbitMQ TLS configuration</a></li>
<li><a href="#queues-mirroring">5.4   Queues mirroring</a></li>
<li><a href="#cluster-management">5.5   Cluster management</a></li>
<li><a href="#serving-amqps-on-port-443">5.6   Serving AMQPS on port 443</a></li>
<li><a href="#supporting-more-than-1024-connections">5.6   Supporting more than 1024 connections</a></li>
<li><a href="#serving-amqps-on-port-443">5.7   Serving AMQPS on port 443</a></li>
</ul>
</li>
<li><a href="#api-configuration">6   API configuration</a>
@ -357,29 +358,23 @@ BdUCSrvo/r7oAims8SyWE+ZObC+rw7u01Sut0ctnYrvklaM10+zkwGNOTszrduUy
</section>
<section id="rabbitmq-configuration">
<h2>5   RabbitMQ Configuration</h2>
<p>All communications between scheduler and agents rely on RabbitMQ's AMQP protocol. While MIG does not rely on the security of RabbitMQ to pass orders to agents, an attacker that gains control to the message broker would be able to listen to all message, or shut down MIG entirely. To prevent this, RabbitMQ must provide a reasonable amount of protection, at two levels:</p>
<p>All communications between schedulers and agents rely on RabbitMQ's AMQP protocol. While MIG does not rely on the security of RabbitMQ to pass orders to agents, an attacker that gains control to the message broker would be able to listen to all commands passed to the agents (not results). To prevent this, RabbitMQ must provide a reasonable amount of protection, at two levels:</p>
<ul>
<li>All communications on the public internet are authenticated using client and server certificates. Since all agents share a single client certificate, this provides minimal security, and should only be used to make it harder for attackers to establish an AMQP connection with rabbitmq.</li>
<li>A given agent can listen and write to its own queue, and no other. We accomplish this by adding a random number to the queue ID, which is generated by an agent, and hard to guess by another agent.</li>
<li>Agents can only listen on their own queue and write to the <cite>toschedulers</cite> exchange. This is accomplished by randomizing the agent queue name and adding tight Access Control rules to RabbitMQ.</li>
</ul>
<p>Note that, even if a random agent manages to connect to the relay, the scheduler will accept its registration only if it is present in the scheduler's whitelist.</p>
<section id="installation">
<h3>5.1   Installation</h3>
<p>Install the RabbitMQ server from your distribution's packaging system. If your distribution does not provide a RabbitMQ package, install <cite>erlang</cite> from yum or apt, and then install RabbitMQ using the packages from rabbitmq.com</p>
</section>
<section id="rabbitmq-permissions">
<h3>5.2   RabbitMQ Permissions</h3>
<ol type="1">
<li>If you want more than 1024 clients, you may have to increase the max number of file descriptors that rabbitmq is allowed to hold. On linux, increase <cite>nofile</cite> in <cite>/etc/security/limits.conf</cite> as follow:</li>
</ol>
<pre class="code bash">rabbitmq - nofile 102400</pre>
<p>Then, make sure than <cite>pam_limits.so</cite> is included in <cite>/etc/pam.d/common-session</cite>:</p>
<pre class="code bash">session required pam_limits.so</pre>
<section id="id3">
<h3>5.2   RabbitMQ Configuration</h3>
<ol type="1">
<li>On the rabbitmq server, create users:
<ul>
<li><strong>admin</strong>, with the tag 'administrator'</li>
<li><strong>scheduler</strong> , <strong>agent</strong>, <strong>api</strong> and <strong>worker</strong> with no tag</li>
<li><strong>scheduler</strong> , <strong>agent</strong> and <strong>worker</strong> with no tag</li>
</ul>
</li>
</ol>
@ -391,8 +386,6 @@ sudo rabbitmqctl add_user scheduler SomeRandomPassword
sudo rabbitmqctl add_user agent SomeRandomPassword
sudo rabbitmqctl add_user api SomeRandomPassword
sudo rabbitmqctl add_user worker SomeRandomPassword</pre>
<p>You can list the users with the following command:</p>
<pre class="code bash">sudo rabbitmqctl list_users</pre>
@ -414,8 +407,8 @@ sudo rabbitmqctl list_vhosts</pre>
<dt>CONFIGURE:</dt>
<dd>
<ul>
<li>create the exchanges <cite>mig</cite> and <cite>migevent</cite></li>
<li>create and delete any queues under <cite>migevent.*</cite> and <cite>mig.agt.*</cite></li>
<li>declare the exchanges <cite>toagents</cite>, <cite>toschedulers</cite> and <cite>toworkers</cite></li>
<li>declare and delete queues under <cite>mig.agt.*</cite></li>
</ul>
</dd>
</dl>
@ -425,9 +418,8 @@ sudo rabbitmqctl list_vhosts</pre>
<dt>WRITE:</dt>
<dd>
<ul>
<li>publish into the exchanges <cite>mig</cite> and <cite>migevent</cite></li>
<li>bind to the queues <cite>mig.agt.heartbeats</cite> and <cite>mig.agt.results</cite></li>
<li>bind to any queue under <cite>migevent.*</cite></li>
<li>publish into the exchanges <cite>toagents</cite> and <cite>toworkers</cite></li>
<li>consume from queues <cite>mig.agt.heartbeats</cite> and <cite>mig.agt.results</cite></li>
</ul>
</dd>
</dl>
@ -437,9 +429,8 @@ sudo rabbitmqctl list_vhosts</pre>
<dt>READ:</dt>
<dd>
<ul>
<li>bind to the <cite>mig</cite> and <cite>migevent</cite> exchanges</li>
<li>read from the queues <cite>mig.agt.heartbeats</cite>, <cite>mig.agt.results</cite></li>
<li>read from any queue under <cite>migevent.*</cite></li>
<li>declare the exchanges <cite>toagents</cite>, <cite>toschedulers</cite> and <cite>toworkers</cite></li>
<li>consume from queues <cite>mig.agt.heartbeats</cite> and <cite>mig.agt.results</cite> bound to the <cite>toschedulers</cite> exchange</li>
</ul>
</dd>
</dl>
@ -450,9 +441,9 @@ sudo rabbitmqctl list_vhosts</pre>
</li>
</ol>
<pre class="code bash">sudo rabbitmqctl set_permissions -p mig scheduler <span class="se">\
</span> <span class="s1">'^mig(|(event|\.agt)(|\..*))$'</span> <span class="se">\
</span> <span class="s1">'^mig(|event(|\..*)|\.(agt\.(heartbeats|results)))$'</span> <span class="se">\
</span> <span class="s1">'^mig(|event(|\..*)|\.(agt\.(heartbeats|results)))$'</span></pre>
</span> <span class="s1">'^(toagents|toschedulers|toworkers|mig\.agt\..*)$'</span> <span class="se">\
</span> <span class="s1">'^(toagents|toworkers|mig\.agt\.(heartbeats|results))$'</span> <span class="se">\
</span> <span class="s1">'^(toagents|toschedulers|toworkers|mig\.agt\.(heartbeats|results))$'</span></pre>
<ol start="4" type="1">
<li>
<dl>
@ -464,7 +455,7 @@ sudo rabbitmqctl list_vhosts</pre>
<dt>CONFIGURE:</dt>
<dd>
<ul>
<li>create any queue under <cite>mig.agt.(linux|windows|darwin).*</cite></li>
<li>create any queue under <cite>mig.agt.*</cite></li>
</ul>
</dd>
</dl>
@ -474,8 +465,8 @@ sudo rabbitmqctl list_vhosts</pre>
<dt>WRITE:</dt>
<dd>
<ul>
<li>publish to the <cite>mig</cite> exchange</li>
<li>bind to any queue under <cite>mig.agt.(linux|windows|darwin).*</cite></li>
<li>publish to the <cite>toschedulers</cite> exchange</li>
<li>consume from queues under <cite>mig.agt.*</cite></li>
</ul>
</dd>
</dl>
@ -485,8 +476,7 @@ sudo rabbitmqctl list_vhosts</pre>
<dt>READ:</dt>
<dd>
<ul>
<li>bind to the <cite>mig</cite> exchange</li>
<li>read from any queue under <cite>mig.agt.(linux|windows|darwin).*</cite></li>
<li>consume from queues under <cite>mig.agt.*</cite> bound to the <cite>toagents</cite> exchange</li>
</ul>
</dd>
</dl>
@ -497,13 +487,13 @@ sudo rabbitmqctl list_vhosts</pre>
</li>
</ol>
<pre class="code bash">sudo rabbitmqctl set_permissions -p mig agent <span class="se">\
</span> <span class="s1">'^mig\.agt\.(linux|windows|darwin)\..*$'</span> <span class="se">\
</span> <span class="s1">'^mig(|\.agt\.(linux|windows|darwin)\..*)$'</span> <span class="se">\
</span> <span class="s1">'^mig(|\.agt\.(linux|windows|darwin)\..*)$'</span></pre>
</span> <span class="s1">'^mig\.agt\..*$'</span> <span class="se">\
</span> <span class="s1">'^(toschedulers|mig\.agt\..*)$'</span> <span class="se">\
</span> <span class="s1">'^(toagents|mig\.agt\..*)$'</span></pre>
<ol start="5" type="1">
<li>
<dl>
<dt>Create permissions for the event workers and api users. The workers and api are allowed to:</dt>
<dt>Create permissions for the event workers. The workers are allowed to:</dt>
<dd>
<ul>
<li>
@ -511,7 +501,7 @@ sudo rabbitmqctl list_vhosts</pre>
<dt>CONFIGURE:</dt>
<dd>
<ul>
<li>create any queue under <cite>migevent.*</cite></li>
<li>declare queues under <cite>migevent.*</cite></li>
</ul>
</dd>
</dl>
@ -521,8 +511,7 @@ sudo rabbitmqctl list_vhosts</pre>
<dt>WRITE:</dt>
<dd>
<ul>
<li>write into the exchange <cite>migevent</cite></li>
<li>bind to any queue under <cite>migevent.*</cite></li>
<li>consume from queues under <cite>migevent.*</cite></li>
</ul>
</dd>
</dl>
@ -532,8 +521,7 @@ sudo rabbitmqctl list_vhosts</pre>
<dt>READ:</dt>
<dd>
<ul>
<li>bind to the <cite>migevent</cite> exchange</li>
<li>read from any queue under <cite>migevent.*</cite></li>
<li>consume from queues under <cite>migevent.*</cite> bound to the <cite>toworkers</cite> exchange</li>
</ul>
</dd>
</dl>
@ -546,23 +534,15 @@ sudo rabbitmqctl list_vhosts</pre>
<pre class="code bash">sudo rabbitmqctl set_permissions -p mig worker <span class="se">\
</span><span class="s1">'^migevent\..*$'</span> <span class="se">\
</span><span class="s1">'^migevent(|\..*)$'</span> <span class="se">\
</span><span class="s1">'^migevent(|\..*)$'</span>
sudo rabbitmqctl set_permissions -p mig api <span class="se">\
</span><span class="s1">'^migevent\..*$'</span> <span class="se">\
</span><span class="s1">'^migevent(|\..*)$'</span> <span class="se">\
</span><span class="s1">'^migevent(|\..*)$'</span></pre>
</span><span class="s1">'^(toworkers|migevent\..*)$'</span></pre>
<ol start="6" type="1">
<li>Start the scheduler, it shouldn't return any ACCESS error. You can also list the permissions with the command:</li>
</ol>
<pre class="code bash"><span class="nv">$ </span>sudo rabbitmqctl list_permissions -p mig | column -t
Listing permissions in vhost <span class="s2">"mig"</span> ...
USER------+CONFIGURE---------------------------------+WRITE-------------------------------------------------+READ-------------------------------------------------+
admin .* .* .*
scheduler ^mig<span class="o">(</span>|<span class="o">(</span>event|<span class="se">\\</span>.agt<span class="o">)(</span>|<span class="se">\\</span>..*<span class="o">))</span><span class="nv">$ </span> ^mig<span class="o">(</span>|event<span class="o">(</span>|<span class="se">\\</span>..*<span class="o">)</span>|<span class="se">\\</span>.<span class="o">(</span>agt<span class="se">\\</span>.<span class="o">(</span>heartbeats|results<span class="o">)))</span><span class="nv">$ </span> ^mig<span class="o">(</span>|event<span class="o">(</span>|<span class="se">\\</span>..*<span class="o">)</span>|<span class="se">\\</span>.<span class="o">(</span>agt<span class="se">\\</span>.<span class="o">(</span>heartbeats|results<span class="o">)))</span><span class="err">$</span>
agent ^mig<span class="se">\\</span>.agt<span class="se">\\</span>.<span class="o">(</span>linux|windows|darwin<span class="o">)</span><span class="se">\\</span>..*<span class="nv">$ </span> ^mig<span class="o">(</span>|<span class="se">\\</span>.agt<span class="se">\\</span>.<span class="o">(</span>linux|windows|darwin<span class="o">)</span><span class="se">\\</span>..*<span class="o">)</span><span class="nv">$ </span> ^mig<span class="o">(</span>|<span class="se">\\</span>.agt<span class="se">\\</span>.<span class="o">(</span>linux|windows|darwin<span class="o">)</span><span class="se">\\</span>..*<span class="o">)</span><span class="err">$</span>
api ^migevent<span class="se">\\</span>..*<span class="nv">$ </span> ^migevent<span class="o">(</span>|<span class="se">\\</span>..*<span class="o">)</span><span class="nv">$ </span> ^migevent<span class="o">(</span>|<span class="se">\\</span>..*<span class="o">)</span><span class="err">$</span>
worker ^migevent<span class="se">\\</span>..*<span class="nv">$ </span> ^migevent<span class="o">(</span>|<span class="se">\\</span>..*<span class="o">)</span><span class="nv">$ </span> ^migevent<span class="o">(</span>|<span class="se">\\</span>..*<span class="o">)</span><span class="err">$</span></pre>
agent ^mig<span class="se">\\</span>.agt<span class="se">\\</span>..*<span class="nv">$ </span> ^<span class="o">(</span>toschedulers|mig<span class="se">\\</span>.agt<span class="se">\\</span>..*<span class="o">)</span><span class="nv">$ </span> ^<span class="o">(</span>toagents|mig<span class="se">\\</span>.agt<span class="se">\\</span>..*<span class="o">)</span><span class="err">$</span>
scheduler ^<span class="o">(</span>toagents|toschedulers|toworkers|mig<span class="se">\\</span>.agt<span class="se">\\</span>..*<span class="o">)</span><span class="nv">$ </span> ^<span class="o">(</span>toagents|toworkers|mig<span class="se">\\</span>.agt<span class="se">\\</span>.<span class="o">(</span>heartbeats|results<span class="o">))</span><span class="nv">$ </span> ^<span class="o">(</span>toagents|toschedulers|toworkers|mig<span class="se">\\</span>.agt<span class="se">\\</span>.<span class="o">(</span>heartbeats|results<span class="o">))</span><span class="err">$</span>
worker ^migevent<span class="se">\\</span>..*<span class="nv">$ </span> ^migevent<span class="o">(</span>|<span class="se">\\</span>..*<span class="o">)</span><span class="nv">$ </span> ^<span class="o">(</span>toworkers|migevent<span class="se">\\</span>..*<span class="o">)</span><span class="err">$</span></pre>
</section>
<section id="rabbitmq-tls-configuration">
<h3>5.3   RabbitMQ TLS configuration</h3>
@ -641,8 +621,15 @@ worker ^migevent<span class="se">\\</span>..*<span class="nv">$ </span>
<span class="k">done</span></pre>
<p>(remove the <cite>echo</cite> in the command above, it's there as a safety for copy/paste people).</p>
</section>
<section id="supporting-more-than-1024-connections">
<h3>5.6   Supporting more than 1024 connections</h3>
<p>If you want more than 1024 clients, you may have to increase the max number of file descriptors that rabbitmq is allowed to hold. On linux, increase <cite>nofile</cite> in <cite>/etc/security/limits.conf</cite> as follow:</p>
<pre class="code bash">rabbitmq - nofile 102400</pre>
<p>Then, make sure than <cite>pam_limits.so</cite> is included in <cite>/etc/pam.d/common-session</cite>:</p>
<pre class="code bash">session required pam_limits.so</pre>
</section>
<section id="serving-amqps-on-port-443">
<h3>5.6   Serving AMQPS on port 443</h3>
<h3>5.7   Serving AMQPS on port 443</h3>
<p>To prevent yours agents from getting blocked by firewalls, it may be a good idea to use port 443 for connections between agents and rabbitmq. However, rabbitmq is not designed to run on a privileged port. The solution, then, is to use iptables to redirect the port on the rabbitmq server.</p>
<pre class="code bash">iptables -t nat -A PREROUTING -i eth0 -p tcp --dport 443 -j REDIRECT --to-port 5671 -m comment --comment <span class="s2">"Serve RabbitMQ on HTTPS port"</span></pre>
</section>

Просмотреть файл

@ -654,7 +654,7 @@ func sendResults(ctx Context, result mig.Command) (err error) {
panic(err)
}
err = publish(ctx, "mig", "mig.agt.results", body)
err = publish(ctx, mig.Mq_Ex_ToSchedulers, mig.Mq_Q_Results, body)
if err != nil {
panic(err)
}
@ -688,8 +688,8 @@ func heartbeat(ctx Context) (err error) {
}
desc := fmt.Sprintf("heartbeat '%s'", body)
ctx.Channels.Log <- mig.Log{Desc: desc}.Debug()
publish(ctx, "mig", "mig.agt.heartbeats", body)
// write the heartbeat to disk
publish(ctx, mig.Mq_Ex_ToSchedulers, mig.Mq_Q_Heartbeat, body)
// update the local heartbeat file
err = ioutil.WriteFile(ctx.Agent.RunDir+"mig-agent.ok", []byte(time.Now().String()), 644)
if err != nil {
ctx.Channels.Log <- mig.Log{Desc: "Failed to write mig-agent.ok to disk"}.Err()

Просмотреть файл

@ -17,12 +17,14 @@ import (
"github.com/kardianos/osext"
"github.com/streadway/amqp"
"io/ioutil"
mrand "math/rand"
"mig"
"net"
"net/http"
"os"
"os/signal"
"runtime"
"strconv"
"strings"
"time"
)
@ -277,7 +279,12 @@ func createIDFile(ctx Context) (id []byte, err error) {
}
}()
// generate an ID
sid := mig.GenB32ID()
r := mrand.New(mrand.NewSource(time.Now().UnixNano()))
sid := strconv.FormatUint(uint64(r.Int63()), 36)
sid += strconv.FormatUint(uint64(r.Int63()), 36)
sid += strconv.FormatUint(uint64(r.Int63()), 36)
sid += strconv.FormatUint(uint64(r.Int63()), 36)
// check that the storage DIR exist, and that it's a dir
tdir, err := os.Open(ctx.Agent.RunDir)
defer tdir.Close()
@ -482,10 +489,10 @@ func initMQ(orig_ctx Context, try_proxy bool, proxy string) (ctx Context, err er
}
err = ctx.MQ.Chan.QueueBind(ctx.MQ.Bind.Queue, // Queue name
ctx.MQ.Bind.Key, // Routing key name
"mig", // Exchange name
false, // is noWait
nil) // AMQP args
ctx.MQ.Bind.Key, // Routing key name
mig.Mq_Ex_ToAgents, // Exchange name
false, // is noWait
nil) // AMQP args
if err != nil {
panic(err)
}

20
src/mig/constants.go Normal file
Просмотреть файл

@ -0,0 +1,20 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
// Contributor: Julien Vehent jvehent@mozilla.com [:ulfr]
package mig
const (
// rabbitmq exchanges and common queues
Mq_Ex_ToAgents = "toagents"
Mq_Ex_ToSchedulers = "toschedulers"
Mq_Ex_ToWorkers = "toworkers"
Mq_Q_Heartbeat = "mig.agt.heartbeats"
Mq_Q_Results = "mig.agt.results"
// event queues
Ev_Q_Agt_Auth_Fail = "agent.authentication.failure"
Ev_Q_Agt_New = "agent.new"
Ev_Q_Cmd_Res = "command.results"
)

Просмотреть файл

@ -1,12 +0,0 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
// Contributor: Julien Vehent jvehent@mozilla.com [:ulfr]
package event
const (
Q_Agt_Auth_Fail = "agent.authentication.failure"
Q_Agt_New = "agent.new"
Q_Cmd_Res = "command.results"
)

Просмотреть файл

@ -9,7 +9,6 @@ import (
"encoding/json"
"fmt"
"mig"
"mig/event"
"time"
"github.com/streadway/amqp"
@ -24,12 +23,12 @@ func startHeartbeatsListener(ctx Context) (heartbeatChan <-chan amqp.Delivery, e
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "leaving startHeartbeatsListener()"}.Debug()
}()
_, err = ctx.MQ.Chan.QueueDeclare("mig.agt.heartbeats", true, false, false, false, nil)
_, err = ctx.MQ.Chan.QueueDeclare(mig.Mq_Q_Heartbeat, true, false, false, false, nil)
if err != nil {
panic(err)
}
err = ctx.MQ.Chan.QueueBind("mig.agt.heartbeats", "mig.agt.heartbeats", "mig", false, nil)
err = ctx.MQ.Chan.QueueBind(mig.Mq_Q_Heartbeat, mig.Mq_Q_Heartbeat, mig.Mq_Ex_ToSchedulers, false, nil)
if err != nil {
panic(err)
}
@ -39,7 +38,7 @@ func startHeartbeatsListener(ctx Context) (heartbeatChan <-chan amqp.Delivery, e
panic(err)
}
heartbeatChan, err = ctx.MQ.Chan.Consume("mig.agt.heartbeats", "", true, false, false, false, nil)
heartbeatChan, err = ctx.MQ.Chan.Consume(mig.Mq_Q_Heartbeat, "", true, false, false, false, nil)
if err != nil {
panic(err)
}
@ -102,7 +101,7 @@ func getHeartbeats(msg amqp.Delivery, ctx Context) (err error) {
desc := fmt.Sprintf("getHeartbeats(): Agent '%s' is not authorized", agt.QueueLoc)
ctx.Channels.Log <- mig.Log{Desc: desc}.Warning()
// send an event to notify workers of the failed agent auth
err = sendEvent(event.Q_Agt_Auth_Fail, msg.Body, ctx)
err = sendEvent(mig.Ev_Q_Agt_Auth_Fail, msg.Body, ctx)
if err != nil {
panic(err)
}
@ -123,9 +122,9 @@ func getHeartbeats(msg amqp.Delivery, ctx Context) (err error) {
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("Heartbeat DB insertion failed with error '%v' for agent '%s'", err, agt.Name)}.Err()
}
// notify the agt.new event queue
err = sendEvent(event.Q_Agt_New, msg.Body, ctx)
err = sendEvent(mig.Ev_Q_Agt_New, msg.Body, ctx)
if err != nil {
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("Failed to send migevent to %s: %v", err, event.Q_Agt_New)}.Err()
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("Failed to send migevent to %s: %v", err, mig.Ev_Q_Agt_New)}.Err()
}
} else {
// the agent exists in database. reuse the existing ID, and keep the status if it was
@ -162,12 +161,12 @@ func startResultsListener(ctx Context) (resultsChan <-chan amqp.Delivery, err er
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "leaving startResultsListener()"}.Debug()
}()
_, err = ctx.MQ.Chan.QueueDeclare("mig.agt.results", true, false, false, false, nil)
_, err = ctx.MQ.Chan.QueueDeclare(mig.Mq_Q_Results, true, false, false, false, nil)
if err != nil {
panic(err)
}
err = ctx.MQ.Chan.QueueBind("mig.agt.results", "mig.agt.results", "mig", false, nil)
err = ctx.MQ.Chan.QueueBind(mig.Mq_Q_Results, mig.Mq_Q_Results, mig.Mq_Ex_ToSchedulers, false, nil)
if err != nil {
panic(err)
}
@ -177,7 +176,7 @@ func startResultsListener(ctx Context) (resultsChan <-chan amqp.Delivery, err er
panic(err)
}
resultsChan, err = ctx.MQ.Chan.Consume("mig.agt.results", "", true, false, false, false, nil)
resultsChan, err = ctx.MQ.Chan.Consume(mig.Mq_Q_Results, "", true, false, false, false, nil)
if err != nil {
panic(err)
}

Просмотреть файл

@ -287,13 +287,18 @@ func initRelay(orig_ctx Context) (ctx Context, err error) {
if err != nil {
panic(err)
}
// declare the "mig" exchange used for communication with the agents
err = ctx.MQ.Chan.ExchangeDeclare("mig", "direct", true, false, false, false, nil)
// declare the "toagents" exchange used for communication from schedulers to agents
err = ctx.MQ.Chan.ExchangeDeclare(mig.Mq_Ex_ToAgents, "direct", true, false, false, false, nil)
if err != nil {
panic(err)
}
// declare the "toschedulers" exchange used for communication from agents to schedulers
err = ctx.MQ.Chan.ExchangeDeclare(mig.Mq_Ex_ToSchedulers, "direct", true, false, false, false, nil)
if err != nil {
panic(err)
}
// declare the "migevent" exchange used for communication between the platform components
err = ctx.MQ.Chan.ExchangeDeclare("migevent", "topic", true, false, false, false, nil)
err = ctx.MQ.Chan.ExchangeDeclare(mig.Mq_Ex_ToWorkers, "topic", true, false, false, false, nil)
if err != nil {
panic(err)
}

Просмотреть файл

@ -21,7 +21,7 @@ func sendEvent(key string, body []byte, ctx Context) error {
Expiration: "6000000", // events expire after 100 minutes if not consumed
Body: body,
}
err := ctx.MQ.Chan.Publish("migevent", key, false, false, msg)
err := ctx.MQ.Chan.Publish(mig.Mq_Ex_ToWorkers, key, false, false, msg)
if err != nil {
err = fmt.Errorf("event publication failed. err='%v', key='%s', body='%s'", err, key, msg)
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("%v", err)}.Err()

Просмотреть файл

@ -9,7 +9,6 @@ import (
"errors"
"fmt"
"mig"
"mig/event"
"os"
"time"
)
@ -186,7 +185,7 @@ func startRoutines(ctx Context) {
continue
}
// publish an event in the command results queue
err = sendEvent(event.Q_Cmd_Res, delivery.Body, ctx)
err = sendEvent(mig.Ev_Q_Cmd_Res, delivery.Body, ctx)
if err != nil {
panic(err)
}

Просмотреть файл

@ -21,8 +21,6 @@ import (
// build version
var version string
// main initializes the mongodb connection, the directory watchers and the
// AMQP ctx.MQ.Chan. It also launches the goroutines.
func main() {
cpus := runtime.NumCPU()
runtime.GOMAXPROCS(cpus * 2)
@ -220,7 +218,7 @@ func sendCommands(cmds []mig.Command, ctx Context) (err error) {
}
agtQueue := fmt.Sprintf("mig.agt.%s", cmd.Agent.QueueLoc)
go func() {
err = ctx.MQ.Chan.Publish("mig", agtQueue, true, false, msg)
err = ctx.MQ.Chan.Publish(mig.Mq_Ex_ToAgents, agtQueue, true, false, msg)
if err != nil {
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, ActionID: cmd.Action.ID, CommandID: cmd.ID, Desc: "publishing failed to queue" + agtQueue}.Err()
} else {

Просмотреть файл

@ -12,7 +12,6 @@ import (
"fmt"
"github.com/jvehent/gozdef"
"mig"
"mig/event"
"mig/workers"
"os"
"os/exec"
@ -56,7 +55,7 @@ func main() {
// bind to the MIG even queue
workerQueue := "migevent.worker." + workerName
consumerChan, err := workers.InitMqWithConsumer(conf.Mq, workerQueue, event.Q_Agt_New)
consumerChan, err := workers.InitMqWithConsumer(conf.Mq, workerQueue, mig.Ev_Q_Agt_New)
if err != nil {
panic(err)
}
@ -67,7 +66,7 @@ func main() {
panic(err)
}
mig.ProcessLog(logctx, mig.Log{Desc: "worker started, consuming queue " + workerQueue + " from key " + event.Q_Agt_New})
mig.ProcessLog(logctx, mig.Log{Desc: "worker started, consuming queue " + workerQueue + " from key " + mig.Ev_Q_Agt_New})
for event := range consumerChan {
var agt mig.Agent
err = json.Unmarshal(event.Body, &agt)

Просмотреть файл

@ -9,7 +9,7 @@ import (
"code.google.com/p/gcfg"
"flag"
"fmt"
"mig/event"
"mig"
"mig/workers"
"os"
)
@ -17,7 +17,8 @@ import (
const workerName = "agent_verif"
type Config struct {
Mq workers.MqConf
Mq workers.MqConf
Logging mig.Logging
}
func main() {
@ -29,22 +30,26 @@ func main() {
fmt.Fprintf(os.Stderr, "%s - a worker verifying agents that fail to authenticate\n", os.Args[0])
flag.PrintDefaults()
}
var configPath = flag.String("c", "/etc/mig/agent_verif_worker.cfg", "Load configuration from file")
var configPath = flag.String("c", "/etc/mig/agent-verif-worker.cfg", "Load configuration from file")
flag.Parse()
err = gcfg.ReadFileInto(&conf, *configPath)
if err != nil {
panic(err)
}
// set a binding to route events from event.Q_Agt_Auth_Fail into the queue named after the worker
// and return a channel that consumes the queue
workerQueue := "migevent.worker." + workerName
consumerChan, err := workers.InitMqWithConsumer(conf.Mq, workerQueue, event.Q_Agt_Auth_Fail)
logctx, err := mig.InitLogger(conf.Logging, workerName)
if err != nil {
panic(err)
}
fmt.Println("started worker", workerName, "consuming queue", workerQueue, "from key", event.Q_Agt_Auth_Fail)
// set a binding to route events from mig.Ev_Q_Agt_Auth_Fail into the queue named after the worker
// and return a channel that consumes the queue
workerQueue := "migevent.worker." + workerName
consumerChan, err := workers.InitMqWithConsumer(conf.Mq, workerQueue, mig.Ev_Q_Agt_Auth_Fail)
if err != nil {
panic(err)
}
fmt.Println("started worker", workerName, "consuming queue", workerQueue, "from key", mig.Ev_Q_Agt_Auth_Fail)
for event := range consumerChan {
fmt.Printf("%s\n", event.Body)
mig.ProcessLog(logctx, mig.Log{Desc: fmt.Sprintf("unverified agent '%s'", event.Body)})
}
return
}

Просмотреть файл

@ -12,7 +12,6 @@ import (
"fmt"
"github.com/jvehent/gozdef"
"mig"
"mig/event"
"mig/modules"
"mig/modules/file"
"mig/workers"
@ -60,7 +59,7 @@ func main() {
// bind to the MIG even queue
workerQueue := "migevent.worker." + workerName
consumerChan, err := workers.InitMqWithConsumer(conf.Mq, workerQueue, event.Q_Cmd_Res)
consumerChan, err := workers.InitMqWithConsumer(conf.Mq, workerQueue, mig.Ev_Q_Cmd_Res)
if err != nil {
panic(err)
}
@ -71,7 +70,7 @@ func main() {
panic(err)
}
mig.ProcessLog(logctx, mig.Log{Desc: "worker started, consuming queue " + workerQueue + " from key " + event.Q_Cmd_Res})
mig.ProcessLog(logctx, mig.Log{Desc: "worker started, consuming queue " + workerQueue + " from key " + mig.Ev_Q_Cmd_Res})
tFamRe := regexp.MustCompile("(?i)^compliance$")
for event := range consumerChan {
var cmd mig.Command

Просмотреть файл

@ -12,6 +12,7 @@ import (
"fmt"
"github.com/streadway/amqp"
"io/ioutil"
"mig"
"net"
"time"
)
@ -107,7 +108,7 @@ func InitMqWithConsumer(conf MqConf, name, key string) (consumer <-chan amqp.Del
if err != nil {
panic(err)
}
err = amqpChan.QueueBind(name, key, "migevent", false, nil)
err = amqpChan.QueueBind(name, key, mig.Mq_Ex_ToWorkers, false, nil)
if err != nil {
panic(err)
}

Просмотреть файл

@ -98,6 +98,12 @@ sudo cp bin/linux/amd64/mig-api /usr/local/bin/ || fail
sudo chown mig /usr/local/bin/mig-api || fail
sudo chmod 550 /usr/local/bin/mig-api || fail
echo -e "\n---- Building MIG Worker\n"
make worker-agent-verif || fail
sudo cp bin/linux/amd64/mig_agent_verif_worker /usr/local/bin/ || fail
sudo chown mig /usr/local/bin/mig_agent_verif_worker || fail
sudo chmod 550 /usr/local/bin/mig_agent_verif_worker || fail
echo -e "\n---- Building MIG Clients\n"
make mig-console || fail
sudo cp bin/linux/amd64/mig-console /usr/local/bin/ || fail
@ -139,21 +145,31 @@ mqpass=$(cat /dev/urandom | tr -dc _A-Z-a-z-0-9 | head -c${1:-32})
sudo rabbitmqctl delete_user admin
sudo rabbitmqctl add_user admin $mqpass || fail
sudo rabbitmqctl set_user_tags admin administrator || fail
sudo rabbitmqctl delete_user scheduler
sudo rabbitmqctl add_user scheduler $mqpass || fail
sudo rabbitmqctl delete_user agent
sudo rabbitmqctl add_user agent $mqpass || fail
sudo rabbitmqctl delete_vhost mig
sudo rabbitmqctl add_vhost mig || fail
sudo rabbitmqctl list_vhosts || fail
sudo rabbitmqctl delete_user scheduler
sudo rabbitmqctl add_user scheduler $mqpass || fail
sudo rabbitmqctl set_permissions -p mig scheduler \
'^mig(|\.(heartbeat|sched\..*))' \
'^mig.*' \
'^mig(|\.(heartbeat|sched\..*))' || fail
'^(toagents|toschedulers|toworkers|mig\.agt\..*)$' \
'^(toagents|toworkers|mig\.agt\.(heartbeats|results))$' \
'^(toagents|toschedulers|toworkers|mig\.agt\.(heartbeats|results))$' || fail
sudo rabbitmqctl delete_user agent
sudo rabbitmqctl add_user agent $mqpass || fail
sudo rabbitmqctl set_permissions -p mig agent \
"^mig\.agt\.*" \
"^mig*" \
"^mig(|\.agt\..*)" || fail
'^mig\.agt\..*$' \
'^(toschedulers|mig\.agt\..*)$' \
'^(toagents|mig\.agt\..*)$' || fail
sudo rabbitmqctl delete_user worker
sudo rabbitmqctl add_user worker $mqpass || fail
sudo rabbitmqctl set_permissions -p mig worker \
'^migevent\..*$' \
'^migevent(|\..*)$' \
'^(toworkers|migevent\..*)$'
echo -e "\n---- Creating Scheduler configuration\n"
cp conf/scheduler.cfg.inc /tmp/scheduler.cfg
@ -162,7 +178,6 @@ sed -i "s/freq = \"87s\"/freq = \"3s\"/" /tmp/scheduler.cfg || fail
sed -i "s/password = \"123456\"/password = \"$dbpass\"/" /tmp/scheduler.cfg || fail
sed -i "s/user = \"guest\"/user = \"scheduler\"/" /tmp/scheduler.cfg || fail
sed -i "s/pass = \"guest\"/pass = \"$mqpass\"/" /tmp/scheduler.cfg || fail
sed -i "s|vhost = \"/\"|vhost = \"mig\"|" /tmp/scheduler.cfg || fail
sudo mv /tmp/scheduler.cfg /etc/mig/scheduler.cfg || fail
sudo chown mig /etc/mig/scheduler.cfg || fail
sudo chmod 750 /etc/mig/scheduler.cfg || fail
@ -176,10 +191,19 @@ sudo chown mig /etc/mig/api.cfg || fail
sudo chmod 750 /etc/mig/api.cfg || fail
echo OK
echo -e "\n---- Creating Worker configuration\n"
cp conf/agent-verif-worker.cfg.inc /tmp/agent-verif-worker.cfg
sed -i "s/pass = \"secretpassphrase\"/pass = \"$mqpass\"/" /tmp/agent-verif-worker.cfg || fail
sudo mv /tmp/agent-verif-worker.cfg /etc/mig/agent-verif-worker.cfg || fail
sudo chown mig /etc/mig/agent-verif-worker.cfg || fail
sudo chmod 750 /etc/mig/agent-verif-worker.cfg || fail
echo OK
echo -e "\n---- Starting Scheduler and API in TMUX under mig user\n"
sudo su mig -c "/usr/bin/tmux new-session -s 'mig' -d"
sudo su mig -c "/usr/bin/tmux new-window -t 'mig' -n '0' '/usr/local/bin/mig-scheduler'"
sudo su mig -c "/usr/bin/tmux new-window -t 'mig' -n '0' '/usr/local/bin/mig-api'"
sudo su mig -c "/usr/bin/tmux new-window -t 'mig' -n '0' '/usr/local/bin/mig_agent_verif_worker'"
echo OK
# Unset proxy related environment variables from this point on, since we want to ensure we are