Request Pipelineedit
Every request is executed in the context of a RequestPipeline
when using the
default ITransport implementation.
var settings = TestClient.GlobalDefaultSettings;
When calling Request()
or RequestAsync()
on an ITransport
,
the whole coordination of the request is deferred to a new instance in a using
block.
var pipeline = new RequestPipeline( settings, DateTimeProvider.Default, new MemoryStreamFactory(), new SearchRequestParameters()); pipeline.GetType().Should().Implement<IDisposable>();
An ITransport
does not instantiate a RequestPipeline
directly; it uses a pluggable IRequestPipelineFactory
to create it
var requestPipelineFactory = new RequestPipelineFactory(); var requestPipeline = requestPipelineFactory.Create( settings, DateTimeProvider.Default,new MemoryStreamFactory(), new SearchRequestParameters()); requestPipeline.Should().BeOfType<RequestPipeline>(); requestPipeline.GetType().Should().Implement<IDisposable>();
You can pass your own IRequestPipeline
implementation to the Transport when instantiating a client,
allowing you to have requests executed on your own custom request pipeline
var transport = new Transport<ConnectionSettings>( settings, requestPipelineFactory, DateTimeProvider.Default, new MemoryStreamFactory());
var pool = setupPool(new[] { TestClient.CreateUri(), TestClient.CreateUri(9201) }); var settings = new ConnectionSettings(pool, connection ?? new InMemoryConnection()); settings = settingsSelector?.Invoke(settings) ?? settings;
Pipeline Behavioredit
Sniffing on First usageedit
var singleNodePipeline = CreatePipeline(uris => new SingleNodeConnectionPool(uris.First())); var staticPipeline = CreatePipeline(uris => new StaticConnectionPool(uris)); var sniffingPipeline = CreatePipeline(uris => new SniffingConnectionPool(uris));
Here we have setup three pipelines using three different connection pools. Let’s see how they behave on first usage
singleNodePipeline.FirstPoolUsageNeedsSniffing.Should().BeFalse(); staticPipeline.FirstPoolUsageNeedsSniffing.Should().BeFalse(); sniffingPipeline.FirstPoolUsageNeedsSniffing.Should().BeTrue();
We can see that only the cluster that supports reseeding will opt in to FirstPoolUsageNeedsSniffing()
;
You can however disable reseeding/sniffing on ConnectionSettings
sniffingPipeline = CreatePipeline(uris => new SniffingConnectionPool(uris), s => s.SniffOnStartup(false)); sniffingPipeline.FirstPoolUsageNeedsSniffing.Should().BeFalse();
Wait for first Sniffedit
All threads wait for the sniff on startup to finish, waiting the request timeout period. A
SemaphoreSlim
is used to block threads until the sniff finishes and waiting threads release the SemaphoreSlim
appropriately.
var response = new { cluster_name = "elasticsearch", nodes = new { node1 = new { name = "Node Name 1", transport_address = "127.0.0.1:9300", host = "127.0.0.1", ip = "127.0.01", version = "5.0.0-alpha3", build = "e455fd0", http_address = "127.0.0.1:9200", settings = new JObject { {"client.type", "node"}, {"cluster.name", "elasticsearch"}, {"config.ignore_system_properties", "true"}, {"name", "Node Name 1"}, {"path.home", "c:\\elasticsearch\\elasticsearch"}, {"path.logs", "c:/ elasticsearch/logs"} } } } }; var responseBody = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(response)); var inMemoryConnection = new WaitingInMemoryConnection( TimeSpan.FromSeconds(1), responseBody); var sniffingPipeline = CreatePipeline( uris => new SniffingConnectionPool(uris), connection: inMemoryConnection, settingsSelector: s => s.RequestTimeout(TimeSpan.FromSeconds(2))); var semaphoreSlim = new SemaphoreSlim(1, 1);
start three tasks that will initiate a sniff on startup. The first task will successfully
sniff on startup with the remaining two waiting tasks exiting without exception and releasing
the SemaphoreSlim
.
var task1 = System.Threading.Tasks.Task.Run(() => sniffingPipeline.FirstPoolUsage(semaphoreSlim)); var task2 = System.Threading.Tasks.Task.Run(() => sniffingPipeline.FirstPoolUsage(semaphoreSlim)); var task3 = System.Threading.Tasks.Task.Run(() => sniffingPipeline.FirstPoolUsage(semaphoreSlim)); var exception = Record.Exception(() => System.Threading.Tasks.Task.WaitAll(task1, task2, task3)); exception.Should().BeNull();
Sniffing on Connection Failureedit
var singleNodePipeline = CreatePipeline(uris => new SingleNodeConnectionPool(uris.First())); var staticPipeline = CreatePipeline(uris => new StaticConnectionPool(uris)); var sniffingPipeline = CreatePipeline(uris => new SniffingConnectionPool(uris)); singleNodePipeline.SniffsOnConnectionFailure.Should().BeFalse(); staticPipeline.SniffsOnConnectionFailure.Should().BeFalse(); sniffingPipeline.SniffsOnConnectionFailure.Should().BeTrue();
Only the cluster that supports reseeding will opt in to SniffsOnConnectionFailure() You can however disable this on ConnectionSettings
sniffingPipeline = CreatePipeline(uris => new SniffingConnectionPool(uris), s => s.SniffOnConnectionFault(false)); sniffingPipeline.SniffsOnConnectionFailure.Should().BeFalse();
Sniffing on Stale clusteredit
var dateTime = new TestableDateTimeProvider(); var singleNodePipeline = CreatePipeline(uris => new SingleNodeConnectionPool(uris.First(), dateTime), dateTimeProvider: dateTime); var staticPipeline = CreatePipeline(uris => new StaticConnectionPool(uris, dateTimeProvider: dateTime), dateTimeProvider: dateTime); var sniffingPipeline = CreatePipeline(uris => new SniffingConnectionPool(uris, dateTimeProvider: dateTime), dateTimeProvider: dateTime); singleNodePipeline.SniffsOnStaleCluster.Should().BeFalse(); staticPipeline.SniffsOnStaleCluster.Should().BeFalse(); sniffingPipeline.SniffsOnStaleCluster.Should().BeTrue(); singleNodePipeline.StaleClusterState.Should().BeFalse(); staticPipeline.StaleClusterState.Should().BeFalse(); sniffingPipeline.StaleClusterState.Should().BeFalse();
go one hour into the future
dateTime.ChangeTime(d => d.Add(TimeSpan.FromHours(2)));
connection pools that do not support reseeding never go stale
singleNodePipeline.StaleClusterState.Should().BeFalse(); staticPipeline.StaleClusterState.Should().BeFalse();
the sniffing connection pool supports reseeding so the pipeline will signal the state is out of date
sniffingPipeline.StaleClusterState.Should().BeTrue();
Retrying requestsedit
A request pipeline also checks whether the overall time across multiple retries exceeds the request timeout. See the max retry documentation for more details, here we assert that our request pipeline exposes this propertly
var dateTime = new TestableDateTimeProvider(); var singleNodePipeline = CreatePipeline(uris => new SingleNodeConnectionPool(uris.First(), dateTime), dateTimeProvider: dateTime); var staticPipeline = CreatePipeline(uris => new StaticConnectionPool(uris, dateTimeProvider: dateTime), dateTimeProvider: dateTime); var sniffingPipeline = CreatePipeline(uris => new SniffingConnectionPool(uris, dateTimeProvider: dateTime), dateTimeProvider: dateTime); singleNodePipeline.IsTakingTooLong.Should().BeFalse(); staticPipeline.IsTakingTooLong.Should().BeFalse(); sniffingPipeline.IsTakingTooLong.Should().BeFalse();
go one hour into the future
dateTime.ChangeTime(d => d.Add(TimeSpan.FromHours(2)));
connection pools that do not support reseeding never go stale
singleNodePipeline.IsTakingTooLong.Should().BeTrue(); staticPipeline.IsTakingTooLong.Should().BeTrue();
the sniffing connection pool supports reseeding so the pipeline will signal the state is out of date
sniffingPipeline.IsTakingTooLong.Should().BeTrue();
request pipeline exposes the DateTime it started, here we assert it started 2 hours in the past
(dateTime.Now() - singleNodePipeline.StartedOn).Should().BePositive().And.BeCloseTo(TimeSpan.FromHours(2)); (dateTime.Now() - staticPipeline.StartedOn).Should().BePositive().And.BeCloseTo(TimeSpan.FromHours(2)); (dateTime.Now() - sniffingPipeline.StartedOn).Should().BePositive().And.BeCloseTo(TimeSpan.FromHours(2));
var dateTime = new TestableDateTimeProvider(); var sniffingPipeline = CreatePipeline(uris => new SniffingConnectionPool(uris, dateTimeProvider: dateTime), dateTimeProvider: dateTime) as RequestPipeline; sniffingPipeline.SniffPath.Should().Be("_nodes/_all/settings?flat_settings&timeout=2s");