Deploy the nginx + do the namespaces separation

This commit is contained in:
Peter Jausovec 2017-01-13 16:49:16 -08:00
Родитель ba354e1556
Коммит 889f909777
6 изменённых файлов: 457 добавлений и 83 удалений

Просмотреть файл

@ -135,8 +135,8 @@ class ACSClient(object):
response = method_to_call(
url, data, headers=headers, **kwargs)
if response.status_code > 400:
raise Exception('Call to "%s" failed with: %s', url, response.text)
# if response.status_code > 400:
# raise Exception('Call to "%s" failed with: %s', url, response.text)
return response
def get_request(self, path):

Просмотреть файл

@ -9,6 +9,7 @@ import yaml
import acsclient
from kubernetes import Kubernetes
import serviceparser
from ingress_controller import IngressController
class DockerComposeParser(object):
@ -29,6 +30,7 @@ class DockerComposeParser(object):
self.acs_client = acsclient.ACSClient(self.cluster_info)
self.kubernetes = Kubernetes(self.acs_client)
self.ingress_controller = IngressController(self.kubernetes)
def __enter__(self):
"""
@ -84,6 +86,7 @@ class DockerComposeParser(object):
Parses the docker-compose file and returns the initial marathon.json file
"""
all_deployments = []
needs_ingress_controller = False
for service_name, service_info in self.compose_data['services'].items():
service_parser = serviceparser.Parser(
self.group_info, self.registry_info, service_name, service_info)
@ -91,12 +94,18 @@ class DockerComposeParser(object):
service_json = service_parser.get_service_json()
ingress_json = service_parser.get_ingress_json()
all_deployments.append({
'deployment_json': deployment_json,
'service_json': service_json,
'ingress_json': ingress_json})
# Check if need to deploy ingress controller or not
if not needs_ingress_controller:
needs_ingress_controller = service_parser.needs_ingress_controller
return all_deployments
all_deployments.append({
'service_name': service_name,
'deployment': {'json': deployment_json},
'service': {'json': service_json},
'ingress': {'json': ingress_json}
})
return needs_ingress_controller, all_deployments
def _cleanup(self):
"""
@ -107,14 +116,26 @@ class DockerComposeParser(object):
return
try:
group_id = self.group_info.get_id()
logging.info('Removing "%s".', group_id)
# self.marathon_helper.delete_group(group_id)
namespace = self.group_info.get_namespace()
logging.info('Removing all resources from namespace "%s".', namespace)
self._delete_all(namespace)
except Exception as remove_exception:
raise remove_exception
finally:
self._shutdown()
def _create_namespace(self, group_info):
"""
Creates a new namespace
"""
labels = {
"group_id": group_info.get_id(include_version=False),
"group_version": group_info.version,
"name": group_info.get_namespace()
}
logging.info('Creating namespace "%s"', group_info.get_namespace())
self.kubernetes.create_namespace(group_info.get_namespace(), labels)
def _predeployment_check(self):
"""
Checks if services can be deployed and
@ -122,39 +143,34 @@ class DockerComposeParser(object):
False if this is the first deployment
"""
group_id = self.group_info.get_id(include_version=False)
namespaces = self.kubernetes.get_namespaces(
'group_id={}'.format(group_id))
group_version = self.group_info.get_version()
namespaces = self.kubernetes.get_namespaces('group_id={}'.format(group_id))
is_update = False
existing_namespace = None
if len(namespaces) > 1:
raise Exception('Another deployment is already in progress')
if len(namespaces) == 1:
# Make sure that the version we are trying to deploy
# is different from the version that's already deployed
namespaces_with_version = self.kubernetes.get_namespaces(
'group_id={}&group_version={}'.format(group_id, group_version))
if len(namespaces_with_version) > 0:
# There is one namespace with the group_id already deployed
# we need to check the version to see if it's an update
deployed_version = namespaces[0][
'metadata']['labels']['group_version']
if deployed_version == group_version:
raise Exception('App with the same version already deployed')
else:
# This version is not deployed yet, so we are doing an update
is_update = True
print 'NAMESPACES WITH VERSION: ', namespaces_with_version
# TODO: Return a tuple here with existing group_id and existing group_version
existing_namespace = namespaces[0]['metadata']['name']
return (is_update, deployed_version, existing_namespace)
if len(namespaces) == 0:
# Create a new namespace
labels = {"group_id": group_id, "group_version": group_version}
logging.info('Creating namespace "%s"', self.group_info.name)
self.kubernetes.create_namespace(self.group_info.name, labels)
return is_update
return (is_update, None, None)
def _deploy_registry_secret(self):
"""
Deploys the registry secret
"""
namespace = self.group_info.name
namespace = self.group_info.get_namespace()
# TODO: Could registry be global; otherwise we are going to deploy
# the secret on each service deployment because of a different
@ -167,29 +183,78 @@ class DockerComposeParser(object):
else:
logging.info('Registry secret already exists')
def _delete_all(self, namespace):
"""
Deletes all resources from the specified namespace
"""
self.kubernetes.delete_ingresses(namespace)
self.kubernetes.delete_services(namespace)
self.kubernetes.delete_deployments(namespace)
self.kubernetes.delete_replicasets(namespace)
self.kubernetes.delete_namespace(namespace)
def deploy(self):
"""
Deploys the services defined in docker-compose.yml file
"""
is_update = self._predeployment_check()
new_namespace = self.group_info.get_namespace()
is_update, _, existing_namespace = self._predeployment_check()
if is_update:
raise Exception('NOT SUPPORTED YET!')
# Create a new namespace - it's either a first deployment or an upgrade
self._create_namespace(self.group_info)
self.cleanup_needed = True
self._deploy_registry_secret()
needs_ingress_controller, all_deployments = self._parse_compose()
all_deployments = self._parse_compose()
namespace = self.group_info.name
if needs_ingress_controller:
# Deploy Ingress controller if it's not running yet
self.ingress_controller.deploy(wait_for_external_ip=True)
logging.info('NGINX Ingress Loadbalancer deployed')
else:
logging.info('Skipping NGINX Ingress Loadbalancer deployment')
for deployment_item in all_deployments:
service_json = deployment_item['service_json']
if service_json:
self.kubernetes.create_service(service_json, namespace)
if is_update:
for deployment_item in all_deployments:
service_name = deployment_item['service_name']
existing_replicas = self.kubernetes.get_replicas(
existing_namespace, service_name)
logging.info('Updating replicas for "%s" to "%s"',
service_name, existing_replicas)
deployment_json = json.loads(
deployment_item['deployment']['json'])
deployment_json['spec']['replicas'] = existing_replicas
deployment_json = deployment_item['deployment_json']
self.kubernetes.create_deployment(deployment_json, namespace)
# Create the deployment
self.kubernetes.create_deployment(
json.dumps(deployment_json), new_namespace, wait_for_complete=True)
ingress_json = deployment_item['ingress_json']
if ingress_json:
self.kubernetes.create_ingress(ingress_json, namespace)
# Create the service
service_json = deployment_item['service']['json']
if service_json:
self.kubernetes.create_service(service_json, new_namespace)
# Create ingress
ingress_json = deployment_item['ingress']['json']
if ingress_json:
self.kubernetes.create_ingress(ingress_json, new_namespace)
logging.info('Removing previous deployment')
self._delete_all(existing_namespace)
else:
for deployment_item in all_deployments:
service_json = deployment_item['service']['json']
if service_json:
self.kubernetes.create_service(service_json, new_namespace)
deployment_json = deployment_item['deployment']['json']
self.kubernetes.create_deployment(
deployment_json, new_namespace)
ingress_json = deployment_item['ingress']['json']
if ingress_json:
self.kubernetes.create_ingress(ingress_json, new_namespace)
if needs_ingress_controller:
logging.info(
'ExternalIP of NGINX Ingress Loadbalancer: "%s"', self.ingress_controller.get_external_ip())

Просмотреть файл

@ -4,16 +4,16 @@
"metadata": {
"name": "nginx-ingress-controller",
"labels": {
"k8s-app": "nginx-ingress-lb"
}
"k8s-app": "nginx-ingress-controller"
},
"namespace": "kube-system"
},
"spec": {
"replicas": 1,
"template": {
"metadata": {
"labels": {
"k8s-app": "nginx-ingress-lb",
"name": "nginx-ingress-lb"
"k8s-app": "nginx-ingress-controller"
}
},
"spec": {
@ -21,7 +21,7 @@
"containers": [
{
"image": "gcr.io/google_containers/nginx-ingress-controller:0.8.3",
"name": "nginx-ingress-lb",
"name": "nginx-ingress-controller",
"imagePullPolicy": "Always",
"readinessProbe": {
"httpGet": {
@ -39,6 +39,16 @@
"initialDelaySeconds": 10,
"timeoutSeconds": 1
},
"ports": [
{
"containerPort": 80,
"hostPort": 80
},
{
"containerPort": 443,
"hostPort": 443
}
],
"env": [
{
"name": "POD_NAME",
@ -57,16 +67,6 @@
}
}
],
"ports": [
{
"containerPort": 80,
"hostPort": 80
},
{
"containerPort": 443,
"hostPort": 443
}
],
"args": [
"/nginx-ingress-controller",
"--default-backend-service=$(POD_NAMESPACE)/default-http-backend"

Просмотреть файл

@ -0,0 +1,144 @@
import os
import json
import logging
import time
class IngressController(object):
"""
This class takes care of deploying Nginx Ingress load balancer
and default backend for the load balancer
Deploying this will expose an IP on Azure load balancer.
"""
# Sadly, it can take 5 minutes or more for
# Azure to open up a port on LB
external_ip_max_wait_time = 10 * 60
DEFAULT_NAMESPACE = 'default'
DEFAULT_BACKEND_DEPLOYMENT_FILE = 'ingress/default-backend.json'
DEFAULT_BACKEND_SERVICE_FILE = 'ingress/default-backend-svc.json'
NGINX_INGRESS_DEPLOYMENT_FILE = 'ingress/nginx-ingress-lb.json'
NGINX_INGRESS_SERVICE_FILE = 'ingress/nginx-ingress-lb-svc.json'
DEFAULT_BACKEND_NAME = 'default-http-backend'
NGINX_INGRESS_LB_NAME = 'nginx-ingress-lb'
def __init__(self, kubernetes):
self.kubernetes = kubernetes
def deploy(self, wait_for_external_ip=False):
"""
Deploys the default backend and Nginx Ingress load balacer
if needed
"""
start_timestamp = time.time()
logging.info('Deploying default backend')
self._ensure_default_backend()
logging.info('Deploying Nginx Ingress Load balancer')
self._ensure_nginx_ingress_lb()
if wait_for_external_ip:
self._wait_for_external_ip(start_timestamp)
def get_external_ip(self):
"""
Gets the ExternalIP where the Nginx loadbalacer is exposed on
"""
service = self.kubernetes.get_service(
IngressController.NGINX_INGRESS_LB_NAME)
external_ip = None
try:
external_ip = service['status']['loadBalancer']['ingress'][0]['ip']
except KeyError:
logging.debug('Error getting [status][loadBalancer][ingress]')
return None
return external_ip
def _wait_for_external_ip(self, start_timestamp):
"""
Waits for the external IP to become active
"""
ip_obtained = False
timeout_exceeded = False
logging.info('Waiting for ExternalIP')
while not ip_obtained:
if self._wait_time_exceeded(self.external_ip_max_wait_time, start_timestamp):
timeout_exceeded = True
break
external_ip = self.get_external_ip()
if external_ip:
ip_obtained = True
break
time.sleep(1)
if timeout_exceeded:
raise Exception('Timeout exceeded waiting for ExternalIP')
if ip_obtained:
logging.info('ExternalIP obtained')
def _ensure_default_backend(self):
"""
Ensures default backed deployment and
service are deployed
"""
self._ensure_service(IngressController.DEFAULT_BACKEND_NAME,
IngressController.DEFAULT_NAMESPACE,
IngressController.DEFAULT_BACKEND_SERVICE_FILE)
self._ensure_deployment(IngressController.DEFAULT_BACKEND_NAME,
IngressController.DEFAULT_NAMESPACE,
IngressController.DEFAULT_BACKEND_DEPLOYMENT_FILE)
def _ensure_nginx_ingress_lb(self):
"""
Ensures NGINX ingress loadbalancer deployment and
service are deployed
"""
self._ensure_service(IngressController.NGINX_INGRESS_LB_NAME,
IngressController.DEFAULT_NAMESPACE,
IngressController.NGINX_INGRESS_SERVICE_FILE)
self._ensure_deployment(IngressController.NGINX_INGRESS_LB_NAME,
IngressController.DEFAULT_NAMESPACE,
IngressController.NGINX_INGRESS_DEPLOYMENT_FILE)
def _ensure_service(self, name, namespace, json_file):
"""
Ensures service exists and if not it deploys it
"""
if not self.kubernetes.service_exists(name, namespace):
logging.info('Deploying "%s" service', name)
service_json = self._load_json_from_file(json_file)
self.kubernetes.create_service(
json.dumps(service_json), namespace)
def _ensure_deployment(self, name, namespace, json_file):
"""
Ensures deployment exists and if not it deploys it
"""
if not self.kubernetes.deployment_exists(name, namespace):
logging.info('Creating deployment "%s"', name)
service_json = self._load_json_from_file(json_file)
self.kubernetes.create_deployment(
json.dumps(service_json), namespace, wait_for_complete=True)
def _load_json_from_file(self, file_path):
"""
Gets json contents from a file
"""
full_path = os.path.join(os.getcwd(), file_path)
with open(full_path) as json_file:
data = json.load(json_file)
return data
def _wait_time_exceeded(self, max_wait, timestamp):
"""
Checks if the wait time was exceeded.
"""
return time.time() - timestamp > max_wait

Просмотреть файл

@ -1,9 +1,12 @@
import json
import time
import logging
class Kubernetes(object):
"""
Class used for working with Kubernetes API
"""
deployment_max_wait_time = 5 * 60
def __init__(self, acs_client):
self.acs_client = acs_client
@ -58,12 +61,58 @@ class Kubernetes(object):
return False
return True
def create_deployment(self, deployment_json, namespace):
def create_deployment(self, deployment_json, namespace, wait_for_complete=False):
"""
Creates a deployment on Kubernetes
"""
start_timestamp = time.time()
url = 'namespaces/{}/deployments'.format(namespace)
return self.post_request(url, post_data=deployment_json, endpoint=self._beta_endpoint())
response = self.post_request(
url, post_data=deployment_json, endpoint=self._beta_endpoint()).json()
if wait_for_complete:
print 'RESPONSE', response
self._wait_for_deployment_complete(
start_timestamp, namespace, response['metadata']['name'])
return response
def deployment_exists(self, name, namespace):
"""
Checks if deployment exists in a namespace or not
"""
response = self.get_deployment(namespace, name)
if self._has_failed(response):
return False
if response['kind'] is 'Deployment':
return True
print 'RESPONSE: ', response['kind']
raise ValueError('Unknown response kind: "%s"', response)
def delete_deployment(self, name, namespace):
"""
Deletes a deployment
"""
url = 'namespaces/{}/deployments/{}'.format(namespace, name)
response = self.delete_request(url)
return response
def delete_deployments(self, namespace):
"""
Deletes all deployments from a namespace
"""
url = 'namespaces/{}/deployments'.format(namespace)
response = self.delete_request(url, endpoint=self._beta_endpoint()).json()
return response
def delete_replicasets(self, namespace):
"""
Deletes all ReplicaSets in a namespace
"""
logging.debug('Deleting replicasets')
url = 'namespaces/{}/replicasets'.format(namespace)
return self.delete_request(url, endpoint=self._beta_endpoint())
def create_ingress(self, ingress_json, namespace):
"""
@ -72,6 +121,14 @@ class Kubernetes(object):
url = 'namespaces/{}/ingresses'.format(namespace)
return self.post_request(url, post_data=ingress_json, endpoint=self._beta_endpoint())
def delete_ingresses(self, namespace):
"""
Deletes all ingresses from a namespace
"""
logging.debug('Deleting ingresses')
url = 'namespaces/{}/ingresses'.format(namespace)
return self.delete_request(url, endpoint=self._beta_endpoint())
def create_service(self, service_json, namespace):
"""
Creates a service on Kubernetes
@ -79,6 +136,45 @@ class Kubernetes(object):
url = 'namespaces/{}/services'.format(namespace)
return self.post_request(url, post_data=service_json)
def get_service(self, name, namespace):
"""
Gets the service
"""
url = 'namespaces/{}/services/{}'.format(namespace, name)
return self.get_request(url).json()
def delete_service(self, name, namespace):
"""
Deletes a service in specified namespace
"""
url = 'namespaces/{}/services/{}'.format(namespace, name)
return self.delete_request(url)
def service_exists(self, name, namespace):
"""
Checks if service exists in the namespace
"""
url = 'namespaces/{}/services/{}'.format(namespace, name)
response = self.get_request(url).json()
if self._has_failed(response):
return False
if response['kind'] == 'Service':
return True
def delete_services(self, namespace):
"""
Deletes all service in specified namespace
"""
url = 'namespaces/{}/services'.format(namespace)
response = self.get_request(url).json()
all_services = response['items']
for service in all_services:
service_name = service['metadata']['name']
logging.debug('Deleting service "%s"', service_name)
self.delete_service(service_name, namespace)
def namespace_exists(self, label_selector):
"""
Checks if a namespace defined by the label_selector exists or not
@ -97,6 +193,14 @@ class Kubernetes(object):
'namespaces?labelSelector={}'.format(label_selector)).json()
return response['items']
def delete_namespace(self, name):
"""
Deletes a namespace
"""
response = self.delete_request(
'namespaces?labelSelector=name={}'.format(name)).json()
return response
def create_namespace(self, name, labels):
"""
Creates a new namespace
@ -111,3 +215,66 @@ class Kubernetes(object):
}
return self.post_request('namespaces', post_data=json.dumps(namespace_json)).json()
def get_deployment(self, namespace, deployment_name):
"""
Gets a specific deployment in a namespace
"""
response = self.get_request(
'namespaces/{}/deployments/{}'.format(
namespace, deployment_name), self._beta_endpoint())
return response.json()
def get_replicas(self, namespace, deployment_name):
"""
Gets the number of replicas for a deployment
"""
deployment = self.get_deployment(namespace, deployment_name)
return deployment['spec']['replicas']
def _wait_for_deployment_complete(self, start_timestamp, namespace, deployment_name):
deployment = ''
deployment_completed = False
timeout_exceeded = False
logging.info('Waiting for deployment "%s.%s" to complete', namespace, deployment_name)
while not deployment_completed:
if self._wait_time_exceeded(self.deployment_max_wait_time, start_timestamp):
timeout_exceeded = True
break
deployment = self.get_deployment(namespace, deployment_name)
status = deployment['status']
logging.info('Checking if deployment is completed ...')
if not status or 'observedGeneration' not in status or 'updatedReplicas' not in status:
time.sleep(1)
continue
if (status['observedGeneration'] >= deployment['metadata']['generation']) and\
(status['updatedReplicas'] == deployment['spec']['replicas']):
deployment_completed = True
break
time.sleep(1)
if timeout_exceeded:
raise Exception(
'Timeout exceeded waiting for deployment to complete')
if deployment_completed:
logging.info('Deployment completed')
def _wait_time_exceeded(self, max_wait, timestamp):
"""
Checks if the wait time was exceeded.
"""
return time.time() - timestamp > max_wait
def _has_failed(self, response):
"""
Checks if the response failed (404) or not
"""
if response['kind'] == 'Status':
if response['code'] == 404:
return True
return False

Просмотреть файл

@ -17,6 +17,7 @@ class Parser(object):
self.service_json = self._get_empty_service_json()
self.ingress_rules = []
self.service_added = False
self.needs_ingress_controller = False
self.port_parser = PortParser(self.service_info)
@ -47,8 +48,6 @@ class Parser(object):
"""
Adds a container port
"""
# TODO: Do we always grab the first container? Or do we need
# to pass in the name of the container to find the right one
if not 'ports' in self.deployment_json['spec']['template']['spec']['containers'][0]:
self.deployment_json['spec']['template'][
'spec']['containers'][0]['ports'] = []
@ -68,42 +67,38 @@ class Parser(object):
"""
Gets the ingress JSON for the service
"""
# TODO: should we have 1 ingress resource per docker-compose or one
# per service?
if self.ingress_rules:
return json.dumps({
"apiVersion": "extensions/v1beta1",
"kind": "Ingress",
"metadata": {
"name": "{}-ingress".format(self.group_info.get_namespace())
},
"labels":{
"group_name": self.group_info.name,
"group_qualifier": self.group_info.qualifier,
"group_id": self.group_info.get_id()
"name": self.service_name
},
"labels": self._get_default_labels(),
"spec": {
"rules": self.ingress_rules
}
})
return None
def _get_default_labels(self):
"""
Gets the default labels that should be on every resource
"""
return {
"service_name": self.service_name,
"version": self.group_info.version,
"group_id": self.group_info.get_id()
}
# TODO: This should return an object with everything that needs to be deployed
# e.g. deployment.json, service.json, ???
def get_deployment_json(self):
"""
Gets the deployment JSON for the service in docker-compose
"""
self._add_label('group_name', self.group_info.name)
self._add_label('group_qualifier', self.group_info.qualifier)
self._add_label('group_version', self.group_info.version)
self._add_label('group_id', self.group_info.get_id())
self._add_label('service_name', self.service_name)
self._add_image_pull_secret(self.registry_info.host)
# self.['id'] = '{}/{}'.format(self.group_name, self.service_name)
for key in self.service_info:
# We look for the method named _parse_{key} (for example: _parse_ports)
# and execute it to parse that key.
@ -121,6 +116,7 @@ class Parser(object):
host_name = vhost
service_port = vhosts[vhost]
self._add_ingress_rule(host_name, service_port, self.service_name)
self.needs_ingress_controller = True
return json.dumps(self.deployment_json)
@ -268,6 +264,12 @@ class Parser(object):
# label without a value
self._add_label(label, '')
def _get_versioned_name(self):
"""
Gets the versioned name for the service
"""
return "{}-{}".format(self.service_name, self.group_info.version)
def _get_empty_deployment_json(self):
deployment_json = {
"apiVersion": "extensions/v1beta1",
@ -279,8 +281,7 @@ class Parser(object):
"replicas": 1,
"template": {
"metadata": {
"labels": {
}
"labels": self._get_default_labels()
},
"spec": {
"containers": [{
@ -305,11 +306,8 @@ class Parser(object):
"name": self.service_name
},
"spec": {
"selector": {
"group_id": self.group_info.get_id(),
"service_name": self.service_name
},
"ports": [],
"selector": self._get_default_labels(),
"ports": []
}
}