From 82f9e0b74160a312bcfe39297aa1550456670fd7 Mon Sep 17 00:00:00 2001 From: roetlich Date: Wed, 25 Nov 2020 15:59:05 +0100 Subject: [PATCH] Add a failing concurrency test Add a test to AcceptanceTests.AppendStream that fails for postgres but passes for all other DBs. This is to reproduce a bug with concurrent appends in postgres, see: https://github.com/SQLStreamStore/SQLStreamStore/issues/478 Co-authored-by: Rasmus Larsson --- .../AcceptanceTests.AppendStream.cs | 45 +++++++++++++------ 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/tests/SqlStreamStore.AcceptanceTests/AcceptanceTests.AppendStream.cs b/tests/SqlStreamStore.AcceptanceTests/AcceptanceTests.AppendStream.cs index 49b41f47c..42e099100 100644 --- a/tests/SqlStreamStore.AcceptanceTests/AcceptanceTests.AppendStream.cs +++ b/tests/SqlStreamStore.AcceptanceTests/AcceptanceTests.AppendStream.cs @@ -6,6 +6,7 @@ using Shouldly; using SqlStreamStore.Streams; using Xunit; + using System.Linq; public partial class AcceptanceTests { @@ -28,7 +29,7 @@ await Store.AppendToStream( [Fact, Trait("Category", "AppendStream")] public async Task - When_append_stream_second_time_with_no_stream_expected_and_same_messages_then_should_then_should_be_idempotent() + When_append_stream_second_time_with_no_stream_expected_and_same_messages_then_should_be_idempotent() { // Idempotency const string streamId = "stream-1"; @@ -42,7 +43,7 @@ await Store } [Fact, Trait("Category", "AppendStream")] - public async Task When_append_stream_second_time_with_no_stream_expected_and_same_messages_then_should_then_should_have_expected_result() + public async Task When_append_stream_second_time_with_no_stream_expected_and_same_messages_then_should_have_expected_result() { // Idempotency const string streamId = "stream-1"; @@ -665,7 +666,7 @@ await Store exception.ShouldBeOfType( ErrorMessages.AppendFailedWrongExpectedVersion(streamId, 10)); } - + [Theory, Trait("Category", "AppendStream")] [InlineData("stream/id")] [InlineData("stream%id")] @@ -676,7 +677,7 @@ public async Task When_append_to_stream_with_url_encodable_characters_and_expect result.CurrentVersion.ShouldBe(2); result.CurrentPosition.ShouldBeGreaterThanOrEqualTo(Fixture.MinPosition + 2L); } - + [Theory, Trait("Category", "AppendStream")] [InlineData("stream/id")] [InlineData("stream%id")] @@ -699,7 +700,7 @@ public async Task When_append_to_stream_with_url_encodable_characters_and_expect result.CurrentVersion.ShouldBe(2); result.CurrentPosition.ShouldBeGreaterThanOrEqualTo(Fixture.MinPosition + 2L); } - + [Theory, Trait("Category", "AppendStream")] [InlineData("stream/id")] [InlineData("stream%id")] @@ -713,40 +714,56 @@ public async Task When_append_to_stream_with_url_encodable_characters_and_expect } [Fact, Trait("Category", "AppendStream")] - public async Task When_append_stream_concurrently_with_no_stream_expected_and_same_messages_then_should_then_should_have_expected_result() + public async Task When_append_stream_concurrently_with_no_stream_expected_and_same_messages_then_should_have_expected_result() { // Idempotency const string streamId = "stream-1"; - + var messages = CreateNewStreamMessages(1, 2); var tasks = new List>(); - for(var index = 0; index < 10; index++) + for (var index = 0; index < 10; index++) { tasks.Add(Store.AppendToStream(streamId, ExpectedVersion.NoStream, messages)); } - + var results = await Task.WhenAll(tasks); Assert.All(results, result => result.CurrentVersion.ShouldBe(1)); Assert.All(results, result => result.CurrentPosition.ShouldBe(results[0].CurrentPosition)); } - + [Fact, Trait("Category", "AppendStream")] - public async Task When_append_to_different_streams_concurrently_with_no_stream_expected_and_same_messages_then_should_then_should_have_expected_result() + public async Task When_append_to_different_streams_concurrently_with_no_stream_expected_and_same_messages_then_should_have_expected_result() { // Idempotency const string streamPrefix = "stream-"; - + var messages = CreateNewStreamMessages(1, 2); var tasks = new List>(); - for(var index = 0; index < 10; index++) + for (var index = 0; index < 10; index++) { tasks.Add(Store.AppendToStream(streamPrefix + index, ExpectedVersion.NoStream, messages)); } - + var results = await Task.WhenAll(tasks); Assert.All(results, result => result.CurrentVersion.ShouldBe(1)); } + + [Fact, Trait("Category", "AppendStream")] + public async Task When_append_to_same_stream_concurrently_with_expected_version_any_and_different_messages_then_should_not_throw() + { + const string streamName = "stream"; + + var messages = CreateNewStreamMessages(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + var tasks = new List>(); + foreach (var message in messages) + { + tasks.Add(Store.AppendToStream(streamName, ExpectedVersion.Any, new []{message})); + } + + var results = await Task.WhenAll(tasks); + results.Select(r => r.CurrentVersion).Max().ShouldBe(9); + } } }