Dynamic Consistency Boundary in Marten, Part 2: Implementation with plain Marten
Series: Dynamic Consistency Boundary in Marten
- 1 Dynamic Consistency Boundary in Marten, Part 1: The aggregate trap
- 2 Dynamic Consistency Boundary in Marten, Part 2: Implementation with plain Marten
- 3 Dynamic Consistency Boundary in Marten, Part 3: Less ceremony with Wolverine
- 4 Dynamic Consistency Boundary in Marten, Part 4: Production considerations
Part 1 covered why classic aggregate-based event sourcing cannot defend invariants that span entities, and named DCB as the answer. This part builds the coupon redemption command from that post using Marten’s DCB API.
The implementation here is deliberately verbose. Marten ships an integration with Wolverine that collapses most of the boilerplate, but seeing the cycle written out by hand first makes the Wolverine version in Part 3 obvious.
The DCB API surface in one table
| API | Purpose |
|---|---|
opts.Events.RegisterTagType<T>(name) | Declare a strong-typed tag wrapper |
[BoundaryAggregate] | Mark a class as an identity-less aggregate built from a tag query |
EventTagQuery | Compose a query of tag X or tag Y or ... |
session.Events.FetchForWritingByTags<T>(query) | Load matching events, project into T, remember the read point |
DcbConcurrencyException | Raised at commit if the consistency check fails |
That is the full surface area we will use through the rest of this post.
Configure Marten and register tag types
A tag is a small wrapper record around a primitive. Marten uses tags to index events for the DCB queries.
public record CouponCode(string Value);
public record CustomerId(Guid Value);
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddMarten(opts =>
{
opts.Connection(builder.Configuration.GetConnectionString("Postgres")!);
opts.Events.RegisterTagType<CouponCode>("coupon")
.ForAggregate<CouponRedemptionGuard>();
opts.Events.RegisterTagType<CustomerId>("customer")
.ForAggregate<CouponRedemptionGuard>();
// Faster reads/writes for DCB queries. The default is TagTables; HStore
// is opt-in and needs the Postgres `hstore` extension.
opts.Events.DcbStorageMode = DcbStorageMode.HStore;
// opts.Events.DcbStorageMode = DcbStorageMode.TagTables; // default
});
Three things going on here:
- The second argument to
RegisterTagTypeis the short tag name written alongside each event. It becomes part of the event store schema, so pick it once and do not change it casually. Part 4 covers what happens if you do. - The
.ForAggregate<CouponRedemptionGuard>()chain links each tag to the boundary aggregate that should project events carrying that tag. Without it, Marten cannot resolve the aggregator whenFetchForWritingByTags<CouponRedemptionGuard>(...)runs. The aggregate type itself is defined a couple of sections down - Marten resolves it via the type argument, so the forward reference is fine. DcbStorageMode.HStoreuses an HSTORE-backed storage layout for tags, which is faster for tag-heavy workloads. It requires the Postgreshstoreextension, which all the major managed Postgres providers support.
Define events and the decision model
public record CouponDefined(string Code, int MaxTotalUses, int MaxPerCustomer);
public record CouponRedeemed(string Code, Guid CustomerId, decimal OrderTotal);
The decision model is a class with Apply methods (the projection from events to in-memory state) and a CanRedeem predicate.
[BoundaryAggregate]
public class CouponRedemptionGuard
{
public string Code { get; private set; } = "";
public int MaxTotalUses { get; private set; }
public int MaxPerCustomer { get; private set; }
public int TotalUses { get; private set; }
public Dictionary<Guid, int> UsesByCustomer { get; } = new();
public void Apply(CouponDefined e)
{
Code = e.Code;
MaxTotalUses = e.MaxTotalUses;
MaxPerCustomer = e.MaxPerCustomer;
}
public void Apply(CouponRedeemed e)
{
TotalUses++;
UsesByCustomer.TryGetValue(e.CustomerId, out var count);
UsesByCustomer[e.CustomerId] = count + 1;
}
public RedeemDecision CanRedeem(Guid customerId)
{
if (MaxTotalUses == 0) return RedeemDecision.Reject("Coupon does not exist");
if (TotalUses >= MaxTotalUses) return RedeemDecision.Reject("Total-use cap reached");
UsesByCustomer.TryGetValue(customerId, out var count);
if (count >= MaxPerCustomer) return RedeemDecision.Reject("Per-customer cap reached");
return RedeemDecision.Accept();
}
}
public readonly record struct RedeemDecision(bool Allowed, string? Reason)
{
public static RedeemDecision Accept() => new(true, null);
public static RedeemDecision Reject(string reason) => new(false, reason);
}
The class has no Id property and is not tied to a single stream. The [BoundaryAggregate] attribute tells Marten this is built on the fly from whichever events the tag query returns.
The redemption command
This is the function the series has been building toward. The interesting work happens in the call to FetchForWritingByTags, the catch block for DcbConcurrencyException, and the bounded retry loop around them.
public async Task<RedeemResult> RedeemAsync(
string code, Guid customerId, decimal orderTotal, CancellationToken ct = default)
{
var couponTag = new CouponCode(code);
var customerTag = new CustomerId(customerId);
for (var attempt = 0; attempt < MaxRetries; attempt++)
{
await using var session = _store.LightweightSession();
// The query defines the consistency boundary for this command.
var query = new EventTagQuery()
.Or<CouponCode>(couponTag)
.Or<CustomerId>(customerTag);
var boundary = await session.Events
.FetchForWritingByTags<CouponRedemptionGuard>(query, ct);
var guard = boundary.Aggregate;
if (guard is null || guard.MaxTotalUses == 0)
return new RedeemResult(RedeemOutcome.NotFound, $"Coupon {code} not defined");
var decision = guard.CanRedeem(customerId);
if (!decision.Allowed)
return new RedeemResult(RedeemOutcome.Rejected, decision.Reason);
var redeemed = session.Events.BuildEvent(
new CouponRedeemed(code, customerId, orderTotal));
redeemed.WithTag(couponTag, customerTag);
boundary.AppendOne(redeemed);
try
{
await session.SaveChangesAsync(ct);
return new RedeemResult(RedeemOutcome.Accepted);
}
catch (ConcurrencyException)
{
// Two cases land here:
// - DcbConcurrencyException: another event matching our query
// appeared past our read-point. The DCB check rejected us.
// - EventStreamUnexpectedMaxEventIdException: Marten's normal
// per-stream optimistic check rejected us. This can fire under
// DCB too because boundary aggregate events route to a single
// stream identified by the first tag that has .ForAggregate<>(),
// so concurrent appends for the same aggregate hit version
// conflicts before the DCB check even runs.
// Both extend JasperFx.ConcurrencyException. Either way the answer
// is the same: re-read, re-decide, re-write. The small delay is
// just backoff to spread out a thundering herd - on Marten 9.4.0+
// it is no longer required for correctness (the DCB check now
// serializes same-tag appends, so the retry already sees the
// committed state). On 9.3.x it WAS load-bearing - immediate
// retries raced the old non-locking EXISTS check.
await Task.Delay(50, ct);
}
}
return new RedeemResult(RedeemOutcome.Rejected, "Too many concurrent attempts");
}
Walking through what happens:
- The two
.Or<TTag>(...)clauses build the query. We are saying this decision depends on events tagged with this coupon or this customer. FetchForWritingByTags<CouponRedemptionGuard>(query, ct)reads matching events, projects them intoCouponRedemptionGuard, and remembers which global sequence number we read up to.CanRedeemis a plain in-memory predicate against the projected state. No SQL, no clever filtering, just a regular method.- We build a new
CouponRedeemedevent, tag it with the coupon and the customer, and queue it for append viaboundary.AppendOne(...). SaveChangesAsyncruns the insert in a single Postgres transaction along with anEXISTScheck: did any event match the query past our read point? If yes, the transaction fails withDcbConcurrencyException. If no, the insert commits.- On conflict, the loop re-reads. The next attempt either accepts (still under the cap) or rejects correctly with the new state of the world.
That is the entire DCB cycle. Everything else in this method is plumbing.
Two concurrent requests
To see how this defends a race, consider two parallel requests from the same customer:
| Request A | Request B |
|---|---|
FetchForWritingByTags sees 1 prior redemption | FetchForWritingByTags sees 1 prior redemption |
CanRedeem returns true | CanRedeem returns true |
| Append, commit. Accepted. | Append, commit, conflict, DcbConcurrencyException, retry. On retry, CanRedeem returns false. Rejected. |
The store catches the race that an in-memory decision cannot.
Proving the behaviour with a test
The companion sample uses Testcontainers to spin up a real Postgres for each test class. No docker compose up step is required, only a running Docker daemon.
public class ConcurrentRedemptionTests : IAsyncLifetime
{
private readonly PostgreSqlContainer _postgres = new PostgreSqlBuilder()
.WithImage("postgres:16")
.WithDatabase("dcb_sample")
.WithUsername("dcb").WithPassword("dcb")
.Build();
private IDocumentStore _store = null!;
private CouponRedeemer _redeemer = null!;
public async Task InitializeAsync()
{
await _postgres.StartAsync();
_store = DocumentStore.For(opts =>
{
opts.Connection(_postgres.GetConnectionString());
opts.Events.RegisterTagType<CouponCode>("coupon")
.ForAggregate<CouponRedemptionGuard>();
opts.Events.RegisterTagType<CustomerId>("customer")
.ForAggregate<CouponRedemptionGuard>();
opts.Events.DcbStorageMode = DcbStorageMode.HStore;
});
_redeemer = new CouponRedeemer(_store);
}
public async Task DisposeAsync()
{
_store.Dispose();
await _postgres.DisposeAsync();
}
[Fact]
public async Task Total_use_cap_is_respected_under_concurrency()
{
// 5 total uses, 1 per customer. 50 distinct customers race.
await _redeemer.DefineCouponAsync("FLASH5", maxTotal: 5, maxPerCustomer: 1);
var customers = Enumerable.Range(0, 50).Select(_ => Guid.NewGuid()).ToArray();
var results = await Task.WhenAll(
customers.Select(id => _redeemer.RedeemAsync("FLASH5", id, 9.99m)));
var accepted = results.Count(r => r.Outcome == RedeemOutcome.Accepted);
accepted.ShouldBeLessThanOrEqualTo(5);
accepted.ShouldBeGreaterThan(0);
}
}
The assertion is worth looking at twice. Fifty distinct customers, each on a different notional stream, all redeeming the same coupon with a total cap of 5. No aggregate-based design can defend this rule because the rule depends on events spread across fifty customer streams. With DCB, the cap is defended.
As of Marten 9.4.0 that cap is exact. At SaveChangesAsync the DCB check is backed by a real serializing constraint: a side table parallel to mt_events that turns the check into a row-level write conflict. Two transactions appending under the same tag now contend on that row at READ COMMITTED - the first to commit wins, and the second’s check finds the row already bumped and throws DcbConcurrencyException. No SERIALIZABLE, no advisory lock; just one extra row touched per save. The losing writer hits the catch, re-reads the now-committed world, sees the cap is full, and rejects cleanly. So exactly five land - never six.
That is also why the Task.Delay(50, ct) in the catch is no longer load-bearing. The serializing constraint, not the cooldown, is what makes the retry observe the committed state, so the delay is now just backoff to spread out a thundering herd.
NOTE
This is a 9.4.0 change. On Marten 9.3.x the check ran as a plain non-locking SELECT EXISTS(...) separate from the insert, so two truly-simultaneous writers could each pass it before either committed and the cap could briefly sit at N+1 - a soft cap with bounded slack. The cooldown above was load-bearing then, dodging the visibility window on the retry path. Marten issue #4591 tracked the race and 9.4.0 closed it with the side-table constraint; this sample targets 9.5.0. Part 4 covers the mechanism, the upgrade (it’s a schema change), and what exactness costs on a hot tag.
The test asserts accepted <= 5 rather than accepted == 5 mainly defensively - the assertion that actually matters is we never accept past the cap, which is the invariant DCB now defends exactly. (In practice, with fifty contenders and a cap of five, you’ll see exactly five.)
If you swap FetchForWritingByTags for QueryByTagsAsync (which has no consistency check), the same test will let twenty, thirty, sometimes all fifty through. The contrast is what DCB is actually buying you.
What we have
We can now write a command that:
- Declares its own consistency scope in code as a tag query.
- Uses plain in-memory C# for the decision.
- Relies on the event store to atomically defend that scope at commit.
- Retries cleanly on concurrency conflict.
The cost is the retry loop and the boilerplate around FetchForWritingByTags, SaveChangesAsync, and catch DcbConcurrencyException. Workable for one command, noisy if there are a hundred. Part 3 collapses that boilerplate using Wolverine’s DCB handlers.
Series: Dynamic Consistency Boundary in Marten
- 1 Dynamic Consistency Boundary in Marten, Part 1: The aggregate trap
- 2 Dynamic Consistency Boundary in Marten, Part 2: Implementation with plain Marten
- 3 Dynamic Consistency Boundary in Marten, Part 3: Less ceremony with Wolverine
- 4 Dynamic Consistency Boundary in Marten, Part 4: Production considerations