use a single index in elasticsearch for both document types.

WARNING: Must rebuild search index!

Starting with this change, the name of the search index used by the
service has changed.  The new search index must be created before
deploying this version of the application.  There are two ways of doing
this:

* Offline (recommended)
Follow [these instructions](https://github.com/edx/cs_comments_service/wiki/Search-Indexes#offline-procedure) to perform an offline rebuild.

* Online
1. Deploy this code to a host which is not serving HTTP requests, and
run `rake search:rebuild`
2. When the rebuild finishes, deploy the updated code on live servers.
3. run `rake search:catchup[MINUTES]` where minutes is the amount of
time it took to run rebuild in step 1.
4. run `rake search:prune`
This commit is contained in:
jsa 2014-05-22 09:44:30 -04:00
Родитель d5523cc70e
Коммит ead2f5e399
7 изменённых файлов: 146 добавлений и 104 удалений

160
Rakefile
Просмотреть файл

@ -161,7 +161,6 @@ namespace :db do
"votes" => {"count" => 0, "down" => [], "down_count" => 0, "point" => 0, "up" => [], "up_count" => []}}
coll.insert(doc)
end
binding.pry
Tire.index('comment_threads').delete
CommentThread.create_elasticsearch_index
Tire.index('comment_threads') { import CommentThread.all }
@ -234,22 +233,33 @@ end
namespace :search do
def get_es_index
# we are using the same index for two types, which is against the
# grain of Tire's design. This is why this method works for both
# comment_threads and comments.
CommentThread.tire.index
end
def get_number_of_primary_shards(index_name)
res = Tire::Configuration.client.get "#{Tire::Configuration.url}/#{index_name}/_status"
status = JSON.parse res.body
status["indices"].first[1]["shards"].size
end
def create_index_for_class(klass)
def create_es_index
# create the new index with a unique name
new_index = Tire.index klass.tire.index.name << '_' << Time.now.strftime('%Y%m%d%H%M%S')
new_index = Tire.index "#{Content::ES_INDEX_NAME}_#{Time.now.strftime('%Y%m%d%H%M%S')}"
new_index.create
LOG.info "configuring new index: #{new_index.name}"
# apply the proper mapping and settings to the new index
new_index.create :mappings => klass.tire.mapping_to_hash, :settings => klass.tire.settings
[CommentThread, Comment].each do |klass|
LOG.info "applying index mappings for #{klass.name}"
klass.put_search_index_mapping new_index
end
new_index
end
def import_from_cursor(cursor, index, opts)
Mongoid.identity_map_enabled = true
tot = cursor.count
cnt = 0
t = Time.now
@ -260,6 +270,7 @@ namespace :search do
LOG.info "#{index.name}: imported #{cnt} of #{tot} (#{pct_complete}% complete after #{elapsed_secs} seconds)"
end
cnt += documents.length
Mongoid::IdentityMap.clear
sleep opts[:sleep_time]
documents
end
@ -292,15 +303,12 @@ namespace :search do
true
end
def do_reindex (classname, opts, in_place=false)
def do_reindex (opts, in_place=false)
# get a reference to the model class (and make sure it's a model class with tire hooks)
klass = CommentService.const_get(classname)
raise RuntimeError unless klass.instance_of? Class
raise RuntimeError unless klass.respond_to? "tire"
start_time = Time.now
# create the new index with a unique name
new_index = create_index_for_class(klass)
new_index = create_es_index
# unless the user is forcing a rebuild, or the index does not yet exist, we
# can do a Tire api reindex which is much faster than reimporting documents
# from mongo.
@ -309,7 +317,7 @@ namespace :search do
# for the model class when the app loaded if one did not already exist. However,
# it won't create an alias, which is what our app uses. So if the index exists
# but not the alias, we know that it's auto-created.
old_index = klass.tire.index
old_index = get_es_index
alias_name = old_index.name
alias_ = Tire::Alias.find alias_name
if alias_.nil? then
@ -317,61 +325,54 @@ namespace :search do
# the alias doesn't exist, so we know the index was auto-created.
# We will delete it and replace it with an alias.
raise RuntimeError, 'Cannot reindex in-place, no valid source index' if in_place
LOG.warn "deleting auto-created index to make room for the alias"
old_index.delete
# NOTE on the small chance that another process re-auto-creates the index
# we just deleted before we have a chance to create the alias, this next
# call will fail.
move_alias_to(alias_name, new_index)
move_alias_to(Content::ES_INDEX_NAME, new_index)
end
op = in_place ? "reindex" : "(re)build index for"
LOG.info "preparing to #{op} CommentService::#{classname}"
op = in_place ? "reindex" : "(re)build index"
LOG.info "preparing to #{op}"
# ensure there's no identity mapping or caching going on while we do this
Mongoid.identity_map_enabled = false
Mongoid.unit_of_work(disable: :all) do
if in_place then
# reindex, moving source documents directly from old index to new
LOG.info "copying documents from original index (this may take a while!)"
old_index.reindex new_index.name
LOG.info "done copying!"
else
# fetch all the documents ever, up til start_time
cursor = klass.where(:updated_at.lte => start_time)
# import them to the new index
import_from_cursor(cursor, new_index, opts)
end
# move the alias if necessary
did_alias_move = move_alias_to(klass.tire.index.name, new_index)
if did_alias_move then
# Reimport any source documents that got updated since start_time,
# while the alias still pointed to the old index.
# Elasticsearch understands our document ids, so re-indexing the same
# document won't create duplicates.
LOG.info "importing any documents that changed between #{start_time} and now"
cursor = klass.where(:created_at.gte => start_time).union.where(:updated_at.gte => start_time)
import_from_cursor(cursor, new_index, opts)
end
if in_place then
# reindex, moving source documents directly from old index to new
LOG.info "copying documents from original index (this may take a while!)"
old_index.reindex new_index.name
LOG.info "done copying!"
else
# fetch all the documents ever, up til start_time
cursor = Content.where(:_type.in => ["Comment", "CommentThread"], :updated_at.lte => start_time)
# import them to the new index
import_from_cursor(cursor, new_index, opts)
end
# move the alias if necessary
did_alias_move = move_alias_to(Content::ES_INDEX_NAME, new_index)
if did_alias_move then
# Reimport any source documents that got updated since start_time,
# while the alias still pointed to the old index.
# Elasticsearch understands our document ids, so re-indexing the same
# document won't create duplicates.
LOG.info "importing any documents that changed between #{start_time} and now"
cursor = Content.where(:_type.in => ["Comment", "CommentThread"], :updated_at.gte => start_time)
import_from_cursor(cursor, new_index, opts)
end
end
desc "Copies contents of MongoDB into Elasticsearch if updated in the last N minutes."
task :catchup, [:minutes, :batch_size, :sleep_time] => :environment do |t, args|
opts = batch_opts args
[CommentThread, Comment].each do |klass|
the_index = klass.tire.index
alias_ = Tire::Alias.find the_index.name
# this check makes sure we are working with the index to which
# the desired model's alias presently points.
raise RuntimeError, "could not find live index for #{klass.name}" if alias_.nil?
start_time = Time.now - (args[:minutes].to_i * 60)
cursor = klass.where(:updated_at.gte => start_time)
import_from_cursor(cursor, the_index, opts)
end
the_index = get_es_index
alias_ = Tire::Alias.find the_index.name
# this check makes sure we are working with the index to which
# the desired model's alias presently points.
raise RuntimeError, "could not find live index" if alias_.nil?
start_time = Time.now - (args[:minutes].to_i * 60)
cursor = Content.where(:_type.in => ["Comment", "CommentThread"], :updated_at.gte => start_time)
import_from_cursor(cursor, the_index, opts)
end
def batch_opts(args)
@ -383,59 +384,46 @@ namespace :search do
desc "Removes any data from Elasticsearch that no longer exists in MongoDB."
task :prune, [:batch_size, :sleep_time] => :environment do |t, args|
opts = batch_opts args
the_index = get_es_index
puts "pruning #{the_index.name}"
alias_ = Tire::Alias.find the_index.name
raise RuntimeError, "could not find live index" if alias_.nil?
scan_size = opts[:batch_size] / get_number_of_primary_shards(the_index.name)
cnt = 0
[CommentThread, Comment].each do |klass|
cnt = 0
the_index = klass.tire.index
puts "pruning #{the_index.name}"
alias_ = Tire::Alias.find the_index.name
doc_type = klass.document_type
# this check makes sure we are working with the index to which
# the desired model's alias presently points.
raise RuntimeError, "could not find live index for #{klass.name}" if alias_.nil?
scan_size = opts[:batch_size] / get_number_of_primary_shards(the_index.name)
search = Tire::Search::Scan.new the_index.name, size: scan_size
search = Tire::Search::Scan.new the_index.name, {size: scan_size, type: doc_type}
search.each do |results|
es_ids = results.map(&:id)
mongo_ids = klass.where(:id.in => es_ids).map {|d| d.id.to_s}
to_delete = es_ids - mongo_ids
if to_delete.size > 0
cnt += to_delete.size
puts "deleting #{to_delete.size} orphaned documents from elasticsearch"
the_index.bulk_delete (to_delete).map {|v| {"type" => klass.document_type, "id" => v}}
puts "deleting #{to_delete.size} orphaned #{doc_type} documents from elasticsearch"
the_index.bulk_delete (to_delete).map {|v| {"type" => doc_type, "id" => v}}
end
puts "#{the_index.name}: processed #{search.seen} of #{search.total}"
puts "#{the_index.name}/#{doc_type}: processed #{search.seen} of #{search.total}"
sleep opts[:sleep_time]
end
puts "done pruning #{the_index.name}, deleted a total of #{cnt} orphaned documents"
end
puts "done pruning #{the_index.name}, deleted a total of #{cnt} orphaned documents"
end
desc "Generate a new physical index, copy data from MongoDB, and bring online."
task :rebuild, [:classname, :batch_size, :sleep_time] => :environment do |t, args|
opts = batch_opts args
do_reindex(args[:classname], opts)
desc "Rebuild the content index from MongoDB data."
task :rebuild, [:batch_size, :sleep_time] => :environment do |t, args|
do_reindex(batch_opts(args))
end
desc "Perform a rebuild on both CommentThread and Comment, using the same options."
task :rebuild_all, [:batch_size, :sleep_time] => :environment do |t, args|
opts = batch_opts args
[CommentThread, Comment].each { |klass| do_reindex(klass.name, opts) }
desc "Rebuild the content index from already-indexed data (in place)."
task :reindex, [:batch_size, :sleep_time] => :environment do |t, args|
do_reindex(batch_opts(args), true)
end
desc "Generate a new physical index, copy data from the existing index, and bring online."
task :reindex, [:classname, :batch_size, :sleep_time] => :environment do |t, args|
opts = batch_opts args
do_reindex(args[:classname], opts, true)
end
desc "Perform a reindex on both CommentThread and Comment, using the same options."
task :reindex_all , [:batch_size, :sleep_time] => :environment do |t, args|
opts = batch_opts args
[CommentThread, Comment].each { |klass| do_reindex(klass.name, opts, true) }
end
desc "Generate new, empty physical indexes, without bringing them online."
task :create_indexes => :environment do
[CommentThread, Comment].each { |klass| create_index_for_class(klass) }
desc "Generate a new, empty physical index, without bringing it online."
task :create_index => :environment do
create_es_index
end
end

5
app.rb
Просмотреть файл

@ -53,6 +53,7 @@ CommentService.config = YAML.load(application_yaml).with_indifferent_access
Tire.configure do
url CommentService.config[:elasticsearch_server]
logger STDERR if ENV["ENABLE_ELASTICSEARCH_DEBUGGING"]
end
Mongoid.load!("config/mongoid.yml", environment)
@ -75,6 +76,10 @@ Dir[File.dirname(__FILE__) + '/lib/**/*.rb'].each {|file| require file}
Dir[File.dirname(__FILE__) + '/models/*.rb'].each {|file| require file}
Dir[File.dirname(__FILE__) + '/presenters/*.rb'].each {|file| require file}
# Ensure elasticsearch index mappings exist.
Comment.put_search_index_mapping
CommentThread.put_search_index_mapping
# Comment out observers until notifications are actually set up properly.
#Dir[File.dirname(__FILE__) + '/models/observers/*.rb'].each {|file| require file}
#Mongoid.observers = PostReplyObserver, PostTopicObserver, AtUserObserver

Просмотреть файл

@ -30,9 +30,14 @@ class Comment < Content
include Tire::Model::Search
include Tire::Model::Callbacks
index_name Content::ES_INDEX_NAME
mapping do
indexes :body, type: :string, analyzer: :english, stored: true, term_vector: :with_positions_offsets
indexes :course_id, type: :string, index: :not_analyzed, included_in_all: false
indexes :comment_thread_id, type: :string, index: :not_analyzed, included_in_all: false, as: 'comment_thread_id'
indexes :commentable_id, type: :string, index: :not_analyzed, included_in_all: false, as: 'commentable_id'
indexes :group_id, type: :string, index: :not_analyzed, included_in_all: false, as: 'group_id'
indexes :created_at, type: :date, included_in_all: false
indexes :updated_at, type: :date, included_in_all: false
end
@ -111,6 +116,19 @@ class Comment < Content
t.commentable_id
end
end
rescue Mongoid::Errors::DocumentNotFound
nil
end
def group_id
if self.comment_thread_id
t = CommentThread.find self.comment_thread_id
if t
t.group_id
end
end
rescue Mongoid::Errors::DocumentNotFound
nil
end
def self.by_date_range_and_thread_ids from_when, to_when, thread_ids

Просмотреть файл

@ -26,6 +26,8 @@ class CommentThread < Content
include Tire::Model::Search
include Tire::Model::Callbacks
index_name Content::ES_INDEX_NAME
mapping do
indexes :title, type: :string, analyzer: :english, boost: 5.0, stored: true, term_vector: :with_positions_offsets
indexes :body, type: :string, analyzer: :english, stored: true, term_vector: :with_positions_offsets
@ -97,10 +99,11 @@ class CommentThread < Content
#so first, find the comment threads associated with comments that hit the query
search = Tire::Search::Search.new 'comment_threads'
search = Tire::Search::Search.new Content::ES_INDEX_NAME
search.query {|query| query.match [:title, :body], params["text"]} if params["text"]
search.highlight({title: { number_of_fragments: 0 } } , {body: { number_of_fragments: 0 } }, options: { tag: "<highlight>" })
search.filter(:type, value: 'comment_thread')
search.filter(:term, commentable_id: params["commentable_id"]) if params["commentable_id"]
search.filter(:terms, commentable_id: params["commentable_ids"]) if params["commentable_ids"]
search.filter(:term, course_id: params["course_id"]) if params["course_id"]
@ -117,8 +120,9 @@ class CommentThread < Content
#again, b/c there is no relationship in ordinality, we cannot paginate if it's a text query
results = search.results
search = Tire::Search::Search.new 'comments'
search = Tire::Search::Search.new Content::ES_INDEX_NAME
search.query {|query| query.match :body, params["text"]} if params["text"]
search.filter(:type, value: 'comment')
search.filter(:term, course_id: params["course_id"]) if params["course_id"]
search.size CommentService.config["max_deep_search_comment_count"].to_i
@ -151,7 +155,8 @@ class CommentThread < Content
end
#now run one more search to harvest the threads and filter by group
search = Tire::Search::Search.new 'comment_threads'
search = Tire::Search::Search.new Content::ES_INDEX_NAME
search.filter(:type, value: 'comment_thread')
search.filter(:terms, :thread_id => thread_ids)
search.filter(:terms, commentable_id: params["commentable_ids"]) if params["commentable_ids"]
search.filter(:term, course_id: params["course_id"]) if params["course_id"]

Просмотреть файл

@ -16,6 +16,16 @@ class Content
index({comment_thread_id: 1, endorsed: 1}, {sparse: true})
index({commentable_id: 1}, {sparse: true, background: true})
ES_INDEX_NAME = 'content'
def self.put_search_index_mapping(idx=nil)
idx ||= self.tire.index
success = idx.mapping(self.tire.document_type, {:properties => self.tire.mapping})
unless success
logger.warn "WARNING! could not apply search index mapping for #{self.name}"
end
end
before_save :set_username
def set_username
# avoid having to look this attribute up later, since it does not change
@ -29,7 +39,7 @@ class Content
(anonymous || anonymous_to_peers) ? attr_when_anonymous : author.send(attr)
end
end
def self.flagged
#return an array of flagged content
holder = []

Просмотреть файл

@ -9,8 +9,8 @@ describe "app" do
let(:author) { create_test_user(42) }
describe "GET /api/v1/search/threads" do
it "returns the correct values for total_results and num_pages", :focus => true do
course_id = "test_course_id"
it "returns the correct values for total_results and num_pages" do
course_id = "test/course/id"
for i in 1..100 do
text = "all"
text += " half" if i % 2 == 0
@ -24,8 +24,7 @@ describe "app" do
end
# Elasticsearch does not necessarily make newly indexed content
# available immediately, so we must explicitly refresh the index
CommentThread.tire.index.refresh
Comment.tire.index.refresh
refresh_es_index
test_text = lambda do |text, expected_total_results, expected_num_pages|
get "/api/v1/search/threads", course_id: course_id, text: text, per_page: "10"
@ -46,12 +45,12 @@ describe "app" do
# Elasticsearch may not be able to handle searching for non-ASCII text,
# so prepend the text with an ASCII term we can search for.
search_term = "artichoke"
course_id = "unicode_course"
course_id = "unicode/course"
thread = make_thread(author, "#{search_term} #{text}", course_id, "unicode_commentable")
make_comment(author, thread, text)
# Elasticsearch does not necessarily make newly indexed content
# available immediately, so we must explicitly refresh the index
CommentThread.tire.index.refresh
refresh_es_index
get "/api/v1/search/threads", course_id: course_id, text: search_term
last_response.should be_ok
result = parse(last_response.body)["collection"]

Просмотреть файл

@ -28,6 +28,25 @@ def set_api_key_header
current_session.header "X-Edx-Api-Key", TEST_API_KEY
end
def delete_es_index
Tire.index Content::ES_INDEX_NAME do delete end
end
def create_es_index
new_index = Tire.index Content::ES_INDEX_NAME
new_index.create
[CommentThread, Comment].each do |klass|
klass.put_search_index_mapping
end
end
def refresh_es_index
# we are using the same index for two types, which is against the
# grain of Tire's design. This is why this method works for both
# comment_threads and comments.
CommentThread.tire.index.refresh
end
RSpec.configure do |config|
config.include Rack::Test::Methods
config.treat_symbols_as_metadata_keys_with_true_values = true
@ -36,10 +55,8 @@ RSpec.configure do |config|
config.before(:each) do
Mongoid::IdentityMap.clear
DatabaseCleaner.clean
[CommentThread, Comment].each do |class_|
class_.tire.index.delete
class_.create_elasticsearch_index
end
delete_es_index
create_es_index
end
end
@ -59,8 +76,8 @@ def init_without_subscriptions
[Comment, CommentThread, User, Notification, Subscription, Activity, Delayed::Backend::Mongoid::Job].each(&:delete_all).each(&:remove_indexes).each(&:create_indexes)
Content.mongo_session[:blocked_hash].drop
Tire.index 'comment_threads' do delete end
CommentThread.create_elasticsearch_index
delete_es_index
create_es_index
commentable = Commentable.new("question_1")
@ -140,8 +157,8 @@ end
def init_with_subscriptions
[Comment, CommentThread, User, Notification, Subscription, Activity, Delayed::Backend::Mongoid::Job].each(&:delete_all).each(&:remove_indexes).each(&:create_indexes)
Tire.index 'comment_threads' do delete end
CommentThread.create_elasticsearch_index
delete_es_index
create_es_index
user1 = create_test_user(1)
user2 = create_test_user(2)