Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions lib/logstash/filters/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,6 @@ def register
test_connection!
validate_es_for_esql_support! if @query_type == "esql"
setup_serverless
if get_client.es_transport_client_type == "elasticsearch_transport"
require_relative "elasticsearch/patches/_elasticsearch_transport_http_manticore"
end
end # def register

def filter(event)
Expand Down
21 changes: 3 additions & 18 deletions lib/logstash/filters/elasticsearch/client.rb
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
# encoding: utf-8
require "elasticsearch"
require "base64"

require "elasticsearch"
require "elastic/transport/transport/http/manticore"

module LogStash
module Filters
class ElasticsearchClient

attr_reader :client
attr_reader :es_transport_client_type

BUILD_FLAVOR_SERVERLESS = 'serverless'.freeze
DEFAULT_EAV_HEADER = { "Elastic-Api-Version" => "2023-10-31" }.freeze
Expand Down Expand Up @@ -44,7 +43,7 @@ def initialize(logger, hosts, options = {})

client_options = {
hosts: hosts,
transport_class: get_transport_client_class,
transport_class: ::Elastic::Transport::Transport::HTTP::Manticore,
transport_options: transport_options,
ssl: ssl_options,
retry_on_failure: options[:retry_on_failure],
Expand Down Expand Up @@ -112,20 +111,6 @@ def base64?(string)
rescue ArgumentError
false
end

def get_transport_client_class
# LS-core includes `elasticsearch` gem. The gem is composed of two separate gems: `elasticsearch-api` and `elasticsearch-transport`
# And now `elasticsearch-transport` is old, instead we have `elastic-transport`.
# LS-core updated `elasticsearch` > 8: https://github.com/elastic/logstash/pull/17161
# Following source bits are for the compatibility to support both `elasticsearch-transport` and `elastic-transport` gems
require "elasticsearch/transport/transport/http/manticore"
es_transport_client_type = "elasticsearch_transport"
::Elasticsearch::Transport::Transport::HTTP::Manticore
rescue ::LoadError
require "elastic/transport/transport/http/manticore"
es_transport_client_type = "elastic_transport"
::Elastic::Transport::Transport::HTTP::Manticore
end
end
end
end

This file was deleted.

2 changes: 1 addition & 1 deletion logstash-filter-elasticsearch.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Gem::Specification.new do |s|

# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency 'elasticsearch', ">= 7.14.9", '< 9'
s.add_runtime_dependency 'elasticsearch', '>= 8', '< 10'
s.add_runtime_dependency 'manticore', ">= 0.7.1"
s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~> 1.3'
s.add_runtime_dependency 'logstash-mixin-ca_trusted_fingerprint_support', '~> 1.0'
Expand Down
30 changes: 0 additions & 30 deletions spec/filters/elasticsearch_dsl_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,6 @@

before(:each) do
allow(LogStash::Filters::ElasticsearchClient).to receive(:new).and_return(client)
if defined?(Elastic::Transport)
allow(client).to receive(:es_transport_client_type).and_return('elastic_transport')
else
allow(client).to receive(:es_transport_client_type).and_return('elasticsearch_transport')
end
allow(client).to receive(:search).and_return(response)
allow(plugin).to receive(:test_connection!)
allow(plugin).to receive(:setup_serverless)
Expand Down Expand Up @@ -123,26 +118,6 @@
end
end

context 'when Elasticsearch 7.x gives us a totals object instead of an integer' do
Comment thread
alexcams marked this conversation as resolved.
let(:plugin_config) do
{
"hosts" => ["localhost:9200"],
"query" => "response: 404",
"fields" => { "response" => "code" },
"result_size" => 10
}
end

let(:response) do
LogStash::Json.load(File.read(File.join(File.dirname(__FILE__), "fixtures", "elasticsearch_7.x_hits_total_as_object.json")))
end

it "should enhance the current event with new data" do
plugin.filter(event)
expect(event.get("[@metadata][total_hits]")).to eq(13476)
end
end

context "if something wrong happen during connection" do

before(:each) do
Expand Down Expand Up @@ -307,11 +282,6 @@

before do
allow(plugin).to receive(:get_client).and_return(client_double)
if defined?(Elastic::Transport)
allow(client_double).to receive(:es_transport_client_type).and_return('elastic_transport')
else
allow(client_double).to receive(:es_transport_client_type).and_return('elasticsearch_transport')
end
allow(client_double).to receive(:client).and_return(transport_double)
end

Expand Down
45 changes: 8 additions & 37 deletions spec/filters/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,9 @@
allow(filter_client).to receive(:serverless?).and_return(true)
allow(filter_client).to receive(:client).and_return(es_client)

if defined?(Elastic::Transport)
allow(es_client).to receive(:info)
.with(a_hash_including(
:headers => LogStash::Filters::ElasticsearchClient::DEFAULT_EAV_HEADER))
.and_raise(Elastic::Transport::Transport::Errors::BadRequest.new)
else
allow(es_client).to receive(:info)
.with(a_hash_including(
:headers => LogStash::Filters::ElasticsearchClient::DEFAULT_EAV_HEADER))
.and_raise(Elasticsearch::Transport::Transport::Errors::BadRequest.new)
end
allow(es_client).to receive(:info)
.with(a_hash_including(:headers => LogStash::Filters::ElasticsearchClient::DEFAULT_EAV_HEADER))
.and_raise(Elastic::Transport::Transport::Errors::BadRequest.new)
end

it "raises an exception when Elastic Api Version is not supported" do
Expand Down Expand Up @@ -119,15 +111,15 @@ def initialize()
"cluster_name": "docker-cluster",
"cluster_uuid": "DyR1hN03QvuCWXRy3jtb0g",
"version": {
"number": "7.13.1",
"number": "8.0.0",
"build_flavor": "default",
"build_type": "docker",
"build_hash": "9a7758028e4ea59bcab41c12004603c5a7dd84a9",
"build_date": "2021-05-28T17:40:59.346932922Z",
"build_snapshot": false,
"lucene_version": "8.8.2",
"minimum_wire_compatibility_version": "6.8.0",
"minimum_index_compatibility_version": "6.0.0-beta1"
"lucene_version": "9.0.0",
"minimum_wire_compatibility_version": "7.17.0",
"minimum_index_compatibility_version": "7.0.0"
},
"tagline": "You Know, for Search"
}
Expand Down Expand Up @@ -188,17 +180,6 @@ def wait_receive_request
end
let(:plugin) { described_class.new(config) }
let(:event) { LogStash::Event.new({}) }

# elasticsearch-ruby 7.17.9 initialize two user agent headers, `user-agent` and `User-Agent`
# hence, fail this header size test case
xit "client should sent the expect user-agent" do
plugin.register

request = webserver.wait_receive_request

expect(request.header['user-agent'].size).to eq(1)
expect(request.header['user-agent'][0]).to match(/logstash\/\d*\.\d*\.\d* \(OS=.*; JVM=.*\) logstash-filter-elasticsearch\/\d*\.\d*\.\d*/)
end
end
end

Expand Down Expand Up @@ -226,12 +207,7 @@ def wait_receive_request
# this spec is a safeguard to trigger an assessment of thread-safety should
# we choose a different transport adapter in the future.
transport_class = extract_transport(client).options.fetch(:transport_class)
if defined?(Elastic::Transport)
allow(client).to receive(:es_transport_client_type).and_return("elastic_transport")
expect(transport_class).to equal ::Elastic::Transport::Transport::HTTP::Manticore
else
expect(transport_class).to equal ::Elasticsearch::Transport::Transport::HTTP::Manticore
end
expect(transport_class).to equal ::Elastic::Transport::Transport::HTTP::Manticore
end

it 'uses a client with sufficient connection pool size' do
Expand Down Expand Up @@ -560,11 +536,6 @@ def wait_receive_request

before(:each) do
allow(LogStash::Filters::ElasticsearchClient).to receive(:new).and_return(client)
if defined?(Elastic::Transport)
allow(client).to receive(:es_transport_client_type).and_return('elastic_transport')
else
allow(client).to receive(:es_transport_client_type).and_return('elasticsearch_transport')
end
allow(plugin).to receive(:test_connection!)
allow(plugin).to receive(:setup_serverless)
plugin.register
Expand Down
2 changes: 1 addition & 1 deletion spec/filters/elasticsearch_ssl_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
before do
allow(es_client_double).to receive(:close)
allow(es_client_double).to receive(:ping).with(any_args).and_return(double("pong").as_null_object)
allow(es_client_double).to receive(:info).with(any_args).and_return({"version" => {"number" => "7.5.0", "build_flavor" => "default"},
allow(es_client_double).to receive(:info).with(any_args).and_return({"version" => {"number" => "8.0.0", "build_flavor" => "default"},
"tagline" => "You Know, for Search"})
allow(Elasticsearch::Client).to receive(:new).and_return(es_client_double)
end
Expand Down
70 changes: 0 additions & 70 deletions spec/filters/fixtures/elasticsearch_7.x_hits_total_as_object.json

This file was deleted.

10 changes: 1 addition & 9 deletions spec/filters/integration/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,7 @@
end

it "fails to register plugin" do
expect { plugin.register }.to raise_error elastic_ruby_v8_client_available? ?
Elastic::Transport::Transport::Errors::Unauthorized :
Elasticsearch::Transport::Transport::Errors::Unauthorized
expect { plugin.register }.to raise_error Elastic::Transport::Transport::Errors::Unauthorized
end

end if ELASTIC_SECURITY_ENABLED
Expand Down Expand Up @@ -152,10 +150,4 @@
end
end
end
def elastic_ruby_v8_client_available?
Elasticsearch::Transport
false
rescue NameError # NameError: uninitialized constant Elasticsearch::Transport if Elastic Ruby client is not available
true
end
end
Loading