Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions spec/es_spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ def get_host_port
end

def get_client
Elasticsearch::Client.new(:hosts => [get_host_port]).tap do |client|
allow(client).to receive(:verify_elasticsearch).and_return(true) # bypass client side version checking
if elastic_ruby_v8_client_available?
Elasticsearch::Client.new(:hosts => [get_host_port])
else
Elasticsearch::Client.new(:hosts => [get_host_port]).tap do |client|
allow(client).to receive(:verify_elasticsearch).and_return(true) # bypass client side version checking
end
end
end

Expand Down
8 changes: 4 additions & 4 deletions spec/integration/outputs/delete_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,25 @@
it "should ignore non-monotonic external version updates" do
id = "ev2"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "index", "message" => "foo", "my_version" => 99)])
r = es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true)
r = es.get(generate_common_index_params('logstash-delete', id))
expect(r['_version']).to eq(99)
expect(r['_source']['message']).to eq('foo')

subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "delete", "message" => "foo", "my_version" => 98)])
r2 = es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true)
r2 = es.get(generate_common_index_params('logstash-delete', id))
expect(r2['_version']).to eq(99)
expect(r2['_source']['message']).to eq('foo')
end

it "should commit monotonic external version updates" do
id = "ev3"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "index", "message" => "foo", "my_version" => 99)])
r = es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true)
r = es.get(generate_common_index_params('logstash-delete', id))
expect(r['_version']).to eq(99)
expect(r['_source']['message']).to eq('foo')

subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "delete", "message" => "foo", "my_version" => 100)])
expect { es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true) }.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
expect { es.get(generate_common_index_params('logstash-delete', id)) }.to raise_error(get_expected_error_class)
end
end
end
17 changes: 11 additions & 6 deletions spec/integration/outputs/ilm_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@
it 'should not install the default policy' do
subject.register
sleep(1)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class)
end

it 'should not write the ILM settings into the template' do
Expand Down Expand Up @@ -287,7 +287,7 @@
end

it 'should install it if it is not present' do
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class)
subject.register
sleep(1)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.not_to raise_error
Expand Down Expand Up @@ -340,14 +340,14 @@
let (:policy) { small_max_doc_policy }

before do
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class)
put_policy(@es,ilm_policy_name, policy)
end

it 'should not install the default policy if it is not used' do
subject.register
sleep(1)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class)
end
end

Expand All @@ -357,14 +357,14 @@
let (:policy) { max_age_policy("1d") }

before do
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class)
put_policy(@es,ilm_policy_name, policy)
end

it 'should not install the default policy if it is not used' do
subject.register
sleep(1)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class)
end
end

Expand Down Expand Up @@ -532,3 +532,8 @@
end

end

def get_expected_error_class
return Elastic::Transport::Transport::Errors::NotFound if elastic_ruby_v8_client_available?
Elasticsearch::Transport::Transport::Errors::NotFound
end
14 changes: 7 additions & 7 deletions spec/integration/outputs/index_version_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@

it "should default to ES version" do
subject.multi_receive([LogStash::Event.new("my_id" => "123", "message" => "foo")])
r = es.get(:index => 'logstash-index', :type => doc_type, :id => "123", :refresh => true)
r = es.get(generate_common_index_params('logstash-index', '123'))
expect(r["_version"]).to eq(1)
expect(r["_source"]["message"]).to eq('foo')
subject.multi_receive([LogStash::Event.new("my_id" => "123", "message" => "foobar")])
r2 = es.get(:index => 'logstash-index', :type => doc_type, :id => "123", :refresh => true)
r2 = es.get(generate_common_index_params('logstash-index', '123'))
expect(r2["_version"]).to eq(2)
expect(r2["_source"]["message"]).to eq('foobar')
end
Expand All @@ -63,33 +63,33 @@
it "should respect the external version" do
id = "ev1"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "99", "message" => "foo")])
r = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
r = es.get(generate_common_index_params('logstash-index', id))
expect(r["_version"]).to eq(99)
expect(r["_source"]["message"]).to eq('foo')
end

it "should ignore non-monotonic external version updates" do
id = "ev2"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "99", "message" => "foo")])
r = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
r = es.get(generate_common_index_params('logstash-index', id))
expect(r["_version"]).to eq(99)
expect(r["_source"]["message"]).to eq('foo')

subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "98", "message" => "foo")])
r2 = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
r2 = es.get(generate_common_index_params('logstash-index', id))
expect(r2["_version"]).to eq(99)
expect(r2["_source"]["message"]).to eq('foo')
end

it "should commit monotonic external version updates" do
id = "ev3"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "99", "message" => "foo")])
r = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
r = es.get(generate_common_index_params('logstash-index', id))
expect(r["_version"]).to eq(99)
expect(r["_source"]["message"]).to eq('foo')

subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "100", "message" => "foo")])
r2 = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
r2 = es.get(generate_common_index_params('logstash-index', id))
expect(r2["_version"]).to eq(100)
expect(r2["_source"]["message"]).to eq('foo')
end
Expand Down
19 changes: 8 additions & 11 deletions spec/integration/outputs/painless_update_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,9 @@ def get_es_output( options={} )
@es.indices.delete_template(:name => "*")
# This can fail if there are no indexes, ignore failure.
@es.indices.delete(:index => "*") rescue nil
@es.index(
:index => 'logstash-update',
:type => doc_type,
:id => "123",
:body => { :message => 'Test', :counter => 1 }
)
params = generate_common_index_params('logstash-update', '123')
params[:body] = { :message => 'Test', :counter => 1 }
@es.index(params)
@es.indices.refresh
end

Expand All @@ -46,7 +43,7 @@ def get_es_output( options={} )
subject = get_es_output(plugin_parameters)
subject.register
subject.multi_receive([LogStash::Event.new("count" => 4 )])
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "123", :refresh => true)
r = @es.get(:index => 'logstash-update', :id => "123", :refresh => true)
expect(r["_source"]["counter"]).to eq(5)
end
end
Expand All @@ -57,15 +54,15 @@ def get_es_output( options={} )
subject = get_es_output({ 'document_id' => "456", 'upsert' => '{"message": "upsert message"}' })
subject.register
subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true)
r = @es.get(:index => 'logstash-update', :id => "456", :refresh => true)
expect(r["_source"]["message"]).to eq('upsert message')
end

it "should create new documents with event/doc as upsert" do
subject = get_es_output({ 'document_id' => "456", 'doc_as_upsert' => true })
subject.register
subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true)
r = @es.get(:index => 'logstash-update', :id => "456", :refresh => true)
expect(r["_source"]["message"]).to eq('sample message here')
end

Expand All @@ -82,7 +79,7 @@ def get_es_output( options={} )
subject.register

subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true)
r = @es.get(:index => 'logstash-update', :id => "456", :refresh => true)
expect(r["_source"]["message"]).to eq('upsert message')
end

Expand All @@ -91,7 +88,7 @@ def get_es_output( options={} )
subject.register
subject.multi_receive([LogStash::Event.new("counter" => 1)])
@es.indices.refresh
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true)
r = @es.get(:index => 'logstash-update', :id => "456", :refresh => true)
expect(r["_source"]["counter"]).to eq(1)
end
end
Expand Down
23 changes: 9 additions & 14 deletions spec/integration/outputs/unsupported_actions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,13 @@ def get_es_output( options={} )
# This can fail if there are no indexes, ignore failure.
@es.indices.delete(:index => "*") rescue nil
# index single doc for update purpose
@es.index(
:index => INDEX,
:type => doc_type,
:id => "2",
:body => { :message => 'Test to doc indexing', :counter => 1 }
)
@es.index(
:index => INDEX,
:type => doc_type,
:id => "3",
:body => { :message => 'Test to doc deletion', :counter => 2 }
)
params_index = generate_common_index_params(INDEX, '2')
params_index[:body] = { :message => 'Test to doc indexing', :counter => 1 }
@es.index(params_index)

params_delete = generate_common_index_params(INDEX, '3')
params_delete[:body] = { :message => 'Test to doc deletion', :counter => 2 }
@es.index(params_delete)
@es.indices.refresh
end

Expand All @@ -63,12 +58,12 @@ def get_es_output( options={} )
rejected_events = events.select { |event| !index_or_update.call(event) }

indexed_events.each do |event|
response = @es.get(:index => INDEX, :type => doc_type, :id => event.get("doc_id"), :refresh => true)
response = @es.get(generate_common_index_params(INDEX, event.get("doc_id")))
expect(response['_source']['message']).to eq(event.get("message"))
end

rejected_events.each do |event|
expect {@es.get(:index => INDEX, :type => doc_type, :id => event.get("doc_id"), :refresh => true)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
expect {@es.get(generate_common_index_params(INDEX, event.get("doc_id")))}.to raise_error(get_expected_error_class)
end
end
end
Expand Down
19 changes: 8 additions & 11 deletions spec/integration/outputs/update_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,9 @@ def get_es_output( options={} )
@es.indices.delete_template(:name => "*")
# This can fail if there are no indexes, ignore failure.
@es.indices.delete(:index => "*") rescue nil
@es.index(
:index => 'logstash-update',
:type => doc_type,
:id => "123",
:body => { :message => 'Test', :counter => 1 }
)
params = generate_common_index_params('logstash-update', '123')
params[:body] = { :message => 'Test', :counter => 1 }
@es.index(params)
@es.indices.refresh
end

Expand All @@ -40,14 +37,14 @@ def get_es_output( options={} )
subject = get_es_output({ 'document_id' => "456" } )
subject.register
subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
expect {@es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
expect {@es.get(generate_common_index_params('logstash-update', '456'))}.to raise_error(get_expected_error_class)
end

it "should update existing document" do
subject = get_es_output({ 'document_id' => "123" })
subject.register
subject.multi_receive([LogStash::Event.new("message" => "updated message here")])
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "123", :refresh => true)
r = @es.get(generate_common_index_params('logstash-update', '123'))
expect(r["_source"]["message"]).to eq('updated message here')
end

Expand All @@ -57,7 +54,7 @@ def get_es_output( options={} )
subject = get_es_output({ 'document_id' => "123" })
subject.register
subject.multi_receive([LogStash::Event.new("data" => "updated message here", "message" => "foo")])
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "123", :refresh => true)
r = @es.get(generate_common_index_params('logstash-update', '123'))
expect(r["_source"]["data"]).to eq('updated message here')
expect(r["_source"]["message"]).to eq('foo')
end
Expand Down Expand Up @@ -94,15 +91,15 @@ def get_es_output( options={} )
subject = get_es_output({ 'document_id' => "456", 'upsert' => '{"message": "upsert message"}' })
subject.register
subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true)
r = @es.get(generate_common_index_params('logstash-update', '456'))
expect(r["_source"]["message"]).to eq('upsert message')
end

it "should create new documents with event/doc as upsert" do
subject = get_es_output({ 'document_id' => "456", 'doc_as_upsert' => true })
subject.register
subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true)
r = @es.get(generate_common_index_params('logstash-update', '456'))
expect(r["_source"]["message"]).to eq('sample message here')
end

Expand Down
33 changes: 33 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,36 @@ module LogStash::Outputs::ElasticSearch::SpecHelper
RSpec.configure do |config|
config.include LogStash::Outputs::ElasticSearch::SpecHelper
end


def elastic_ruby_v8_client_available?
Elasticsearch::Transport
false
rescue NameError # NameError: uninitialized constant Elasticsearch::Transport if Elastic Ruby client is not available
true
end

def generate_common_index_params(index, doc_id)
params = {:index => index, :id => doc_id, :refresh => true}
params[:type] = doc_type unless elastic_ruby_v8_client_available?
params
end


COMMON_QUERY_PARAMS = [
:ignore, # Client specific parameters
:format, # Search, Cat, ...
:pretty, # Pretty-print the response
:human, # Return numeric values in human readable format
:filter_path # Filter the JSON response
]

# This method was removed from elasticsearch-ruby client v8
# Copied from elasticsearch-ruby v7 client to make it available
#
def __extract_params(arguments, params=[], options={})
result = arguments.select { |k,v| COMMON_QUERY_PARAMS.include?(k) || params.include?(k) }
result = Hash[result] unless result.is_a?(Hash) # Normalize Ruby 1.8 and Ruby 1.9 Hash#select behaviour
result = Hash[result.map { |k,v| v.is_a?(Array) ? [k, Utils.__listify(v, options)] : [k,v] }] # Listify Arrays
result
end
2 changes: 1 addition & 1 deletion spec/support/elasticsearch/api/actions/put_alias.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def put_alias(arguments={})
method = HTTP_PUT
path = Utils.__pathify Utils.__escape(arguments[:name])

params = Utils.__validate_and_extract_params arguments
params = __extract_params(arguments)
body = arguments[:body]
perform_request(method, path, params, body.to_json).body
end
Expand Down
Loading