diff --git a/convoy/batch.py b/convoy/batch.py index 22b43e6..5cca802 100644 --- a/convoy/batch.py +++ b/convoy/batch.py @@ -50,6 +50,7 @@ from . import keyvault from . import settings from . import storage from . import util +from .version import __version__ # create logger logger = logging.getLogger(__name__) @@ -645,6 +646,17 @@ def resize_pool(batch_client, config, wait=False): """ pool = settings.pool_settings(config) _pool = batch_client.pool.get(pool.id) + # check pool metadata version + if util.is_none_or_empty(_pool.metadata): + logger.warning('pool version metadata not present') + else: + for md in _pool.metadata: + if (md.name == settings.get_metadata_version_name() and + md.value != __version__): + logger.warning( + 'pool version metadata mismatch: pool={} cli={}'.format( + md.value, __version__)) + break logger.info( ('Resizing pool {} to {} compute nodes [current_dedicated_nodes={} ' 'current_low_priority_nodes={}]').format( @@ -2209,6 +2221,12 @@ def add_jobs( uses_task_dependencies=uses_task_dependencies, job_preparation_task=jptask, job_release_task=jrtask, + metadata=[ + batchmodels.MetadataItem( + name=settings.get_metadata_version_name(), + value=__version__, + ), + ], ) lastjob = job.id logger.info('Adding job {} to pool {}'.format(job.id, pool.id)) @@ -2235,6 +2253,20 @@ def add_jobs( # job release requirement if multi_instance and auto_complete: raise + else: + # retrieve job and check for version consistency + _job = batch_client.job.get(job.id) + if util.is_none_or_empty(_job.metadata): + logger.warning('job version metadata not present') + else: + for md in _job.metadata: + if (md.name == settings.get_metadata_version_name() + and md.value != __version__): + logger.warning( + ('job version metadata mismatch: ' + 'job={} cli={}').format( + md.value, __version__)) + break else: raise del multi_instance diff --git a/convoy/fleet.py b/convoy/fleet.py index e132e78..867e9a2 100644 --- a/convoy/fleet.py +++ b/convoy/fleet.py @@ -969,6 +969,12 @@ def _add_pool( ], resource_files=[], ), + metadata=[ + batchmodels.MetadataItem( + name=settings.get_metadata_version_name(), + value=__version__, + ), + ], ) if util.is_not_empty(block_for_gr): pool.start_task.environment_settings.append( @@ -1308,6 +1314,17 @@ def _update_docker_images( if not force_ssh and pool.current_low_priority_nodes > 0: logger.debug('forcing update via SSH due to low priority nodes') force_ssh = True + # check pool metadata version + if util.is_none_or_empty(pool.metadata): + logger.warning('pool version metadata not present') + else: + for md in pool.metadata: + if (md.name == settings.get_metadata_version_name() and + md.value != __version__): + logger.warning( + 'pool version metadata mismatch: pool={} cli={}'.format( + md.value, __version__)) + break # create coordination command line # 1. log in again in case of cred expiry # 2. pull images with respect to registry diff --git a/convoy/settings.py b/convoy/settings.py index 60cc9b3..8fdd9a5 100644 --- a/convoy/settings.py +++ b/convoy/settings.py @@ -40,6 +40,7 @@ except ImportError: from . import util # global defines +_METADATA_VERSION_NAME = 'batch_shipyard_version' _GLUSTER_DEFAULT_VOLNAME = 'gv0' _GLUSTER_ON_COMPUTE_VOLUME = '.gluster/{}'.format(_GLUSTER_DEFAULT_VOLNAME) _TENSORBOARD_DOCKER_IMAGE = ( @@ -302,6 +303,15 @@ def _kv_read(conf, key, default=None): return ret +def get_metadata_version_name(): + # type: (None) -> str + """Get metadata version name + :rtype: str + :return: metadata version name + """ + return _METADATA_VERSION_NAME + + def get_tensorboard_docker_image(): # type: (None) -> Tuple[str, str] """Get tensorboard docker image