Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 65 additions & 32 deletions spec/es_spec_helper.rb
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
require_relative './spec_helper'

require 'elasticsearch'
require_relative "support/elasticsearch/api/actions/delete_ilm_policy"
require_relative "support/elasticsearch/api/actions/get_alias"
require_relative "support/elasticsearch/api/actions/put_alias"
require_relative "support/elasticsearch/api/actions/get_ilm_policy"
require_relative "support/elasticsearch/api/actions/put_ilm_policy"

require 'json'
require 'cabin'

# remove this condition and support package once plugin starts consuming elasticsearch-ruby v8 client
# in elasticsearch-ruby v7, ILM APIs were in a separate xpack gem, now directly available
unless elastic_ruby_v8_client_available?
require_relative "support/elasticsearch/api/actions/delete_ilm_policy"
require_relative "support/elasticsearch/api/actions/get_alias"
require_relative "support/elasticsearch/api/actions/put_alias"
require_relative "support/elasticsearch/api/actions/get_ilm_policy"
require_relative "support/elasticsearch/api/actions/put_ilm_policy"
end

module ESHelper
def get_host_port
if ENV["INTEGRATION"] == "true"
Expand All @@ -20,8 +25,12 @@ def get_host_port
end

def get_client
Elasticsearch::Client.new(:hosts => [get_host_port]).tap do |client|
allow(client).to receive(:verify_elasticsearch).and_return(true) # bypass client side version checking
if elastic_ruby_v8_client_available?
Elasticsearch::Client.new(:hosts => [get_host_port])
else
Elasticsearch::Client.new(:hosts => [get_host_port]).tap do |client|
allow(client).to receive(:verify_elasticsearch).and_return(true) # bypass client side version checking
end
end
end

Expand Down Expand Up @@ -125,35 +134,59 @@ def get_cluster_settings(client)
client.cluster.get_settings
end

def get_policy(client, policy_name)
client.get_ilm_policy(name: policy_name)
end
# remove else condition once plugin starts consuming elasticsearch-ruby v8 client
if elastic_ruby_v8_client_available?
def get_policy(client, policy_name)
client.index_lifecycle_management.get_lifecycle(policy: policy_name)
end

def put_policy(client, policy_name, policy)
client.put_ilm_policy({:name => policy_name, :body=> policy})
end
def put_policy(client, policy_name, policy)
client.index_lifecycle_management.put_lifecycle({:policy => policy_name, :body=> policy})
end

def put_alias(client, the_alias, index)
body = {
"aliases" => {
index => {
"is_write_index"=> true
}
}
}
client.put_alias({name: the_alias, body: body})
end
def clean_ilm(client)
client.index_lifecycle_management.get_lifecycle.each_key { |key| client.index_lifecycle_management.delete_lifecycle(policy: key) if key =~ /logstash-policy/ }
end

def clean_ilm(client)
client.get_ilm_policy.each_key { |key| client.delete_ilm_policy(name: key) if key =~ /logstash-policy/ }
end
def supports_ilm?(client)
begin
client.index_lifecycle_management.get_lifecycle
true
rescue
false
end
end
else
def get_policy(client, policy_name)
client.get_ilm_policy(name: policy_name)
end

def put_policy(client, policy_name, policy)
client.put_ilm_policy({:name => policy_name, :body=> policy})
end

def clean_ilm(client)
client.get_ilm_policy.each_key { |key| client.delete_ilm_policy(name: key) if key =~ /logstash-policy/ }
end

def supports_ilm?(client)
begin
client.get_ilm_policy
true
rescue
false
def supports_ilm?(client)
begin
client.get_ilm_policy
true
rescue
false
end
end

def put_alias(client, the_alias, index)
body = {
"aliases" => {
index => {
"is_write_index"=> true
}
}
}
client.put_alias({name: the_alias, body: body})
end
end

Expand Down
8 changes: 4 additions & 4 deletions spec/integration/outputs/delete_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,25 @@
it "should ignore non-monotonic external version updates" do
id = "ev2"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "index", "message" => "foo", "my_version" => 99)])
r = es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true)
r = es.get(:index => 'logstash-delete', :id => id, :refresh => true)
expect(r['_version']).to eq(99)
expect(r['_source']['message']).to eq('foo')

subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "delete", "message" => "foo", "my_version" => 98)])
r2 = es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true)
r2 = es.get(:index => 'logstash-delete', :id => id, :refresh => true)
expect(r2['_version']).to eq(99)
expect(r2['_source']['message']).to eq('foo')
end

it "should commit monotonic external version updates" do
id = "ev3"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "index", "message" => "foo", "my_version" => 99)])
r = es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true)
r = es.get(:index => 'logstash-delete', :id => id, :refresh => true)
expect(r['_version']).to eq(99)
expect(r['_source']['message']).to eq('foo')

subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "delete", "message" => "foo", "my_version" => 100)])
expect { es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true) }.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
expect { es.get(:index => 'logstash-delete', :id => id, :refresh => true) }.to raise_error(get_expected_error_class)
end
end
end
30 changes: 18 additions & 12 deletions spec/integration/outputs/ilm_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@
it 'should not install the default policy' do
subject.register
sleep(1)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class)
end

it 'should not write the ILM settings into the template' do
Expand Down Expand Up @@ -282,12 +282,12 @@
subject.register
sleep(1)
expect(@es.indices.exists_alias(name: "logstash")).to be_truthy
expect(@es.get_alias(name: "logstash")).to include("logstash-000001")
expect(@es.indices.get_alias(name: "logstash")).to include("logstash-000001")
end
end

it 'should install it if it is not present' do
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class)
subject.register
sleep(1)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.not_to raise_error
Expand All @@ -298,7 +298,7 @@
subject.register
sleep(1)
expect(@es.indices.exists_alias(name: "logstash")).to be_truthy
expect(@es.get_alias(name: "logstash")).to include("logstash-#{todays_date}-000001")
expect(@es.indices.get_alias(name: "logstash")).to include("logstash-#{todays_date}-000001")
end

it 'should ingest into a single index' do
Expand Down Expand Up @@ -340,14 +340,14 @@
let (:policy) { small_max_doc_policy }

before do
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class)
put_policy(@es,ilm_policy_name, policy)
end

it 'should not install the default policy if it is not used' do
subject.register
sleep(1)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class)
end
end

Expand All @@ -357,14 +357,14 @@
let (:policy) { max_age_policy("1d") }

before do
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class)
put_policy(@es,ilm_policy_name, policy)
end

it 'should not install the default policy if it is not used' do
subject.register
sleep(1)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class)
end
end

Expand All @@ -374,7 +374,7 @@
subject.register
sleep(1)
expect(@es.indices.exists_alias(name: expected_index)).to be_truthy
expect(@es.get_alias(name: expected_index)).to include("#{expected_index}-#{todays_date}-000001")
expect(@es.indices.get_alias(name: expected_index)).to include("#{expected_index}-#{todays_date}-000001")
end

it 'should write the ILM settings into the template' do
Expand Down Expand Up @@ -443,17 +443,18 @@
subject.register
sleep(1)
expect(@es.indices.exists_alias(name: ilm_rollover_alias)).to be_truthy
expect(@es.get_alias(name: ilm_rollover_alias)).to include("#{ilm_rollover_alias}-#{todays_date}-000001")
expect(@es.indices.get_alias(name: ilm_rollover_alias)).to include("#{ilm_rollover_alias}-#{todays_date}-000001")
end

context 'when the custom rollover alias already exists' do
it 'should ignore the already exists error' do
expect(@es.indices.exists_alias(name: ilm_rollover_alias)).to be_falsey
put_alias(@es, "#{ilm_rollover_alias}-#{todays_date}-000001", ilm_rollover_alias)
@es.indices.create(index: "#{ilm_rollover_alias}-#{todays_date}-000001")
@es.indices.put_alias(name: ilm_rollover_alias, index: "#{ilm_rollover_alias}-#{todays_date}-000001")
expect(@es.indices.exists_alias(name: ilm_rollover_alias)).to be_truthy
subject.register
sleep(1)
expect(@es.get_alias(name: ilm_rollover_alias)).to include("#{ilm_rollover_alias}-#{todays_date}-000001")
expect(@es.indices.get_alias(name: ilm_rollover_alias)).to include("#{ilm_rollover_alias}-#{todays_date}-000001")
end

end
Expand Down Expand Up @@ -532,3 +533,8 @@
end

end

def get_expected_error_class
return Elastic::Transport::Transport::Errors::NotFound if elastic_ruby_v8_client_available?
Elasticsearch::Transport::Transport::Errors::NotFound
end
14 changes: 7 additions & 7 deletions spec/integration/outputs/index_version_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@

it "should default to ES version" do
subject.multi_receive([LogStash::Event.new("my_id" => "123", "message" => "foo")])
r = es.get(:index => 'logstash-index', :type => doc_type, :id => "123", :refresh => true)
r = es.get(:index => 'logstash-index', :id => '123', :refresh => true)
expect(r["_version"]).to eq(1)
expect(r["_source"]["message"]).to eq('foo')
subject.multi_receive([LogStash::Event.new("my_id" => "123", "message" => "foobar")])
r2 = es.get(:index => 'logstash-index', :type => doc_type, :id => "123", :refresh => true)
r2 = es.get(:index => 'logstash-index', :id => '123', :refresh => true)
expect(r2["_version"]).to eq(2)
expect(r2["_source"]["message"]).to eq('foobar')
end
Expand All @@ -63,33 +63,33 @@
it "should respect the external version" do
id = "ev1"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "99", "message" => "foo")])
r = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
r = es.get(:index => 'logstash-index', :id => id, :refresh => true)
expect(r["_version"]).to eq(99)
expect(r["_source"]["message"]).to eq('foo')
end

it "should ignore non-monotonic external version updates" do
id = "ev2"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "99", "message" => "foo")])
r = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
r = es.get(:index => 'logstash-index', :id => id, :refresh => true)
expect(r["_version"]).to eq(99)
expect(r["_source"]["message"]).to eq('foo')

subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "98", "message" => "foo")])
r2 = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
r2 = es.get(:index => 'logstash-index', :id => id, :refresh => true)
expect(r2["_version"]).to eq(99)
expect(r2["_source"]["message"]).to eq('foo')
end

it "should commit monotonic external version updates" do
id = "ev3"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "99", "message" => "foo")])
r = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
r = es.get(:index => 'logstash-index', :id => id, :refresh => true)
expect(r["_version"]).to eq(99)
expect(r["_source"]["message"]).to eq('foo')

subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "100", "message" => "foo")])
r2 = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
r2 = es.get(:index => 'logstash-index', :id => id, :refresh => true)
expect(r2["_version"]).to eq(100)
expect(r2["_source"]["message"]).to eq('foo')
end
Expand Down
21 changes: 11 additions & 10 deletions spec/integration/outputs/painless_update_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ def get_es_output( options={} )
# This can fail if there are no indexes, ignore failure.
@es.indices.delete(:index => "*") rescue nil
@es.index(
:index => 'logstash-update',
:type => doc_type,
:id => "123",
:body => { :message => 'Test', :counter => 1 }
)
{
:index => 'logstash-update',
:id => '123',
:body => { :message => 'Test', :counter => 1 },
:refresh => true
})
@es.indices.refresh
end

Expand All @@ -46,7 +47,7 @@ def get_es_output( options={} )
subject = get_es_output(plugin_parameters)
subject.register
subject.multi_receive([LogStash::Event.new("count" => 4 )])
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "123", :refresh => true)
r = @es.get(:index => 'logstash-update', :id => "123", :refresh => true)
expect(r["_source"]["counter"]).to eq(5)
end
end
Expand All @@ -57,15 +58,15 @@ def get_es_output( options={} )
subject = get_es_output({ 'document_id' => "456", 'upsert' => '{"message": "upsert message"}' })
subject.register
subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true)
r = @es.get(:index => 'logstash-update', :id => "456", :refresh => true)
expect(r["_source"]["message"]).to eq('upsert message')
end

it "should create new documents with event/doc as upsert" do
subject = get_es_output({ 'document_id' => "456", 'doc_as_upsert' => true })
subject.register
subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true)
r = @es.get(:index => 'logstash-update', :id => "456", :refresh => true)
expect(r["_source"]["message"]).to eq('sample message here')
end

Expand All @@ -82,7 +83,7 @@ def get_es_output( options={} )
subject.register

subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true)
r = @es.get(:index => 'logstash-update', :id => "456", :refresh => true)
expect(r["_source"]["message"]).to eq('upsert message')
end

Expand All @@ -91,7 +92,7 @@ def get_es_output( options={} )
subject.register
subject.multi_receive([LogStash::Event.new("counter" => 1)])
@es.indices.refresh
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true)
r = @es.get(:index => 'logstash-update', :id => "456", :refresh => true)
expect(r["_source"]["counter"]).to eq(1)
end
end
Expand Down
Loading