Merge changes I919927bd,I1f233298

* changes:
  [hm] [cc] [dea] cleanup after crashed droplet if it is flapping.
  [cc] [bulk] [fix] ensure valid json is returned by bulk api
This commit is contained in:
Bob Nugmanov 2012-04-19 23:54:34 +00:00 коммит произвёл Gerrit Code Review
Родитель 031019654f e22a72545f
Коммит ef10242fcd
6 изменённых файлов: 55 добавлений и 24 удалений

Просмотреть файл

@ -1,14 +1,12 @@
class BulkController < ApplicationController
skip_before_filter :fetch_user_from_token
before_filter :authenticate_bulk_api
#the password is randomly generated at startup and is
#discoverable through NATS.request('cloudcontroller.bulk.credentials')
# the password is randomly generated at startup and is
# discoverable through NATS.request('cloudcontroller.bulk.credentials')
DEFAULT_BATCH_SIZE = 200
def users
render_results_and_token_for_model(User)
end
@ -18,7 +16,6 @@ class BulkController < ApplicationController
end
private
def authenticate_bulk_api
authenticate_or_request_with_http_basic do |user, pass|
if user==AppConfig[:bulk_api][:auth][:user] &&
@ -29,21 +26,31 @@ class BulkController < ApplicationController
false
end
end
end
def render_results_and_token_for_model(model)
results = retrieve_results(model)
update_token(results)
render :json => { :results => hash_by_id(results), :bulk_token => bulk_token }
render :json => { :results => hash_by_id(model, results), :bulk_token => bulk_token }
end
def retrieve_results(model)
model.where(where_clause).limit(batch_size).to_a
CloudController.logger.debug("Params: #{params}")
CloudController.logger.debug("Retrieving bulk results for bulk_token: #{bulk_token}")
CloudController.logger.debug("WHERE-clause: #{where_clause}")
model.where(where_clause).order('id').limit(batch_size).to_a
end
def hash_by_id arr
arr.inject({}) { |hash, elem| hash[elem.id] = elem; hash }
def hash_by_id(model, arr)
arr.inject({}) { |hash, elem| hash[elem.id] = hashify(model,elem); hash }
end
def hashify(model, record)
model.column_names.inject(Hash.new) { |hash, col|
hash[col] = record.send(col)
hash
}
end
def update_token(results)
@ -55,18 +62,21 @@ class BulkController < ApplicationController
end
def sanitize_atom(atom)
unless atom =~ /^\w+$/
atom = atom.to_s
unless atom =~ /^[a-zA-Z0-9_]+$/
CloudController.logger.error("invalid atom #{atom} in bulk_token #{bulk_token}")
raise CloudError.new(CloudError::BAD_REQUEST, "bad atom #{atom} in bulk_api token")
end
atom
end
def batch_size
@batch_size||=params['batch_size'] ? json_param('batch_size').to_i : DEFAULT_BATCH_SIZE
end
def bulk_token
@bulk_token||=params['bulk_token'] ? params['bulk_token'] : {}
@bulk_token = Yajl::Parser.parse(@bulk_token) if @bulk_token.kind_of? String
@bulk_token
end
end

Просмотреть файл

@ -174,6 +174,8 @@ class AppManager
end
CloudController.logger.debug("[HealthManager] Starting #{indices.length} missing instances for app: #{app.id}")
# FIXME - Check capacity
message[:flapping] = true if payload[:flapping]
indices.each { |i| start_instance(message, i) }
when /STOP/i
# If HM detects older versions, let's clean up here versus suppressing

Просмотреть файл

@ -114,6 +114,19 @@ describe "bulk_api" do
results.should_not be_nil
end
it "returns results that are valid json" do
get_apps
results.each { |res|
res.should be_a_kind_of Array
res.size.should == 2
res.last.should be_a_kind_of Hash
res.last["id"].should_not be_nil
res.last["framework"].should == 'sinatra'
}
end
it "respects the batch_size parameter" do
[3,5].each { |size|
get_apps :batch_size=>size

Просмотреть файл

@ -547,6 +547,7 @@ module DEA
framework = message_json['framework']
debug = message_json['debug']
console = message_json['console']
flapping = message_json['flapping']
# Limits processing
mem = DEFAULT_APP_MEM
@ -601,6 +602,7 @@ module DEA
:start => Time.now,
:state_timestamp => Time.now.to_i,
:log_id => "(name=%s app_id=%s instance=%s index=%s)" % [name, droplet_id, instance_id, instance_index],
:flapping => flapping ? true : false
}
instances = @droplets[droplet_id] || {}
@ -1295,15 +1297,16 @@ module DEA
# Drop usage and resource tracking regardless of state
remove_instance_resources(instance)
@usage.delete(instance[:pid]) if instance[:pid]
# clean up the in memory instance and directory only if the instance didn't crash
if instance[:state] != :CRASHED
# clean up the in memory instance and directory only if
# the instance didn't crash or when it was marked as flapping
if instance[:state] != :CRASHED || instance[:flapping]
if droplet = @droplets[instance[:droplet_id]]
droplet.delete(instance[:instance_id])
@droplets.delete(instance[:droplet_id]) if droplet.empty?
schedule_snapshot
end
unless @disable_dir_cleanup
@logger.debug("#{instance[:name]}: Cleaning up dir #{instance[:dir]}")
@logger.debug("#{instance[:name]}: Cleaning up dir #{instance[:dir]}#{instance[:flapping]?' (flapping)':''}")
EM.system("rm -rf #{instance[:dir]}")
end
# Rechown crashed application directory using uid and gid of DEA

Просмотреть файл

@ -318,7 +318,7 @@ class HealthManager
end
if index_entry[:state] == FLAPPING && !restart_pending?(app_id, index) && now - index_entry[:last_action] > @restart_timeout
delay_or_giveup_restart_of_flapping_instance(app_id, index, index_entry)
delay_or_giveup_restart_of_flapping_instance(app_id, index, index_entry, true)
end
if index_entry[:state] == DOWN && now - index_entry[:last_action] > @restart_timeout
@ -527,12 +527,13 @@ class HealthManager
droplet_entry # return the droplet that we changed. This allows the spec tests to ensure the behaviour is correct.
end
def delay_or_giveup_restart_of_flapping_instance(droplet_id, index, index_entry)
def delay_or_giveup_restart_of_flapping_instance(droplet_id, index, index_entry, giveup_quietly = false)
index_entry[:last_action] = now #regardless of whether real action is omitted or delayed, a decision timestamp is needed
if @giveup_crash_number > 0 && index_entry[:crashes] > @giveup_crash_number
@logger.info("giving up on flapping instance (app_id=#{droplet_id}, index=#{index}). Number of crashes: #{index_entry[:crashes]}.")
@logger.info("given up on flapping instance (app_id=#{droplet_id}, index=#{index}). " +
"Number of crashes: #{index_entry[:crashes]}.") unless giveup_quietly
else
@pending_restart[droplet_id] ||= {}
@pending_restart[droplet_id][index] = true
@ -542,7 +543,7 @@ class HealthManager
@logger.info("delayed-restarting flapping instance (app_id=#{droplet_id}, index=#{index}). Delay: #{restart_delay}. Number of crashes: #{index_entry[:crashes]}.")
EM.add_timer(restart_delay) do
index_entry[:last_action] = now
start_instances(droplet_id, [index], false)
start_instances(droplet_id, [index], false, true)
end
end
end
@ -744,7 +745,7 @@ class HealthManager
entry_updated
end
def start_instances(droplet_id, indices, high_priority = false)
def start_instances(droplet_id, indices, high_priority = false, flapping = false)
droplet_entry = @droplets[droplet_id]
if droplet_entry.nil?
@ -760,6 +761,8 @@ class HealthManager
:indices => indices
}
start_message[:flapping] = true if flapping
if queue_requests?
queue_request(start_message, high_priority)
else

Просмотреть файл

@ -134,14 +134,14 @@ describe HealthManager do
}
end
def make_restart_message
{
def make_restart_message(options = {})
m = {
'droplet' => @app.id,
'op' => 'START',
'last_updated' => @app.last_updated.to_i,
'version' => "#{@app.staged_package_hash}-#{@app.run_count}",
'indices' => [0]
}
}.merge(options)
end
def get_live_index(droplet_entry,index)
@ -239,7 +239,7 @@ describe HealthManager do
def ensure_flapping_delayed_restart(delay)
in_em_with_fiber do |f|
should_publish_to_nats "cloudcontrollers.hm.requests", make_restart_message
should_publish_to_nats "cloudcontrollers.hm.requests", make_restart_message('flapping' => true)
@hm.process_heartbeat_message(make_heartbeat_message([0], "RUNNING").to_json)
droplet_entry = @hm.process_exited_message(make_crashed_message.to_json)