final class RaceTrackerCatchUpHook implements CatchUpHookInterface

internal  
 

We had some race conditions in projections, where {DoctrineCheckpointStorage} was not working properly.

We saw non-deterministic errors when running the tests, unfortunately only on Linux and not on OSX: on OSX, forking a new subprocess in {\Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\SubprocessProjectionCatchUpTrigger} is WAY slower than on Linux, and thus the race conditions that appear when two projector instances of the same class run concurrently do not happen (or are far less likely).

Our Goal: Detect if/when a Projection runs twice at the same time.

The system must ENSURE that a given projection NEVER runs concurrently; so this is the case we need to detect.

This means, the following is the behavior we want to have:

Process A         acquireLock(  |[  ) applyEvent()   releaseLock(  ]  )
Process B             acquireLock(  |                                [  ) applyEvent()   releaseLock(  ]  )

That is, Process B waits inside acquireLock() until the lock is released (i.e. Process A has finished), and then continues.

A WRONG and UNDEFINED behavior looks as follows:

Process A         acquireLock(  |[  )  applyEvent()  releaseLock(  ]  )
Process B             acquireLock(  | [ )  applyEvent()  releaseLock(  ]  )
                                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
                                       During this time period, two
                                       processes run the projection
                                       concurrently.

Legend for the flow diagrams above

--> time   |            [                             ]
           ^            ^                             ^
   try to acquire     lock acquired                  lock released
   the lock

Implementation Idea: Race Detector with Redis

We implement a custom CatchUpHook (this class {\Neos\ContentRepository\BehavioralTests\ProjectionRaceConditionTester\RaceTrackerCatchUpHook}) which is notified during the projection run.

When {\Neos\ContentRepository\BehavioralTests\ProjectionRaceConditionTester\RaceTrackerCatchUpHook::onBeforeEvent()} is called, we know that we are inside applyEvent() in the diagram above, and thus that the lock HAS been acquired. When {\Neos\ContentRepository\BehavioralTests\ProjectionRaceConditionTester\RaceTrackerCatchUpHook::onBeforeBatchCompleted()} is called, we know the lock will be released directly afterwards.

We track these timings across processes in a single Redis Stream. Because Redis is single-threaded, we can be sure that we observe the correct, total order of interleavings across multiple processes inside the single trace.
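The recording side can be sketched as follows. This is a minimal illustration, not the actual implementation: `InMemoryTraceStream` is a hypothetical in-memory stand-in for the Redis Stream (a real stream is shared across processes, this array is not), `TraceRecorder` is a hypothetical simplification of the hook, and the entry shape (`pid`, `type`) and the type names `transactionOpen` / `transactionClose` are assumptions.

```php
<?php
declare(strict_types=1);

/** Hypothetical in-memory stand-in for a Redis Stream: append-only and totally ordered. */
final class InMemoryTraceStream
{
    /** @var list<array{pid: int, type: string}> */
    private array $entries = [];

    public function append(int $pid, string $type): void
    {
        $this->entries[] = ['pid' => $pid, 'type' => $type];
    }

    /** @return list<array{pid: int, type: string}> */
    public function entries(): array
    {
        return $this->entries;
    }
}

/** Hypothetical recorder mirroring the two hook calls that matter for lock tracking. */
final class TraceRecorder
{
    public function __construct(
        private InMemoryTraceStream $stream,
        private int $pid,
    ) {
    }

    // Corresponds to onBeforeEvent(): the lock is known to be held at this point.
    public function onBeforeEvent(): void
    {
        $this->stream->append($this->pid, 'transactionOpen');
    }

    // Corresponds to onBeforeBatchCompleted(): the lock is about to be released.
    public function onBeforeBatchCompleted(): void
    {
        $this->stream->append($this->pid, 'transactionClose');
    }
}
```

In a real test run, each process would pass its own PID (for example from `getmypid()`) and append to one shared stream, so the order of entries reflects the actual interleaving.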

Race Detector Algorithm

We go through the stream sequentially and continuously track for which PIDs a transaction is currently open.

When a transaction is open for more than one PID, we know that we found a race.

This algorithm is implemented in {\Neos\ContentRepository\BehavioralTests\ProjectionRaceConditionTester\Dto\TraceEntries::findProjectionConcurrencyViolations()}.
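A minimal sketch of the idea, assuming a hypothetical trace entry shape of `['pid' => int, 'type' => 'transactionOpen'|'transactionClose']`; the function name and the shape of the result are illustrative and not the actual `TraceEntries` API:

```php
<?php
declare(strict_types=1);

/**
 * Scans a trace in stream order and reports every point at which
 * more than one PID has an open transaction.
 *
 * @param list<array{pid: int, type: string}> $trace hypothetical entry shape
 * @return list<array{index: int, openPids: list<int>}>
 */
function findConcurrencyViolations(array $trace): array
{
    $openPids = [];    // PID => true for every PID with a currently open transaction
    $violations = [];

    foreach ($trace as $index => $entry) {
        if ($entry['type'] === 'transactionOpen') {
            // Opening twice for the same PID is harmless: the key is simply set again.
            $openPids[$entry['pid']] = true;
            if (count($openPids) > 1) {
                $violations[] = ['index' => $index, 'openPids' => array_keys($openPids)];
            }
        } elseif ($entry['type'] === 'transactionClose') {
            unset($openPids[$entry['pid']]);
        }
    }

    return $violations;
}
```

Because the set is keyed by PID, several consecutive open entries from the same process (one per event in a batch) do not count as a race; only an open entry from a second PID does.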

Duplicate Processing Algorithm

At the same time, an Event should never be processed multiple times by the same Projector. We additionally detect this by remembering the sequence numbers of seen events: if a sequence number has already been seen, we know this is an error. This is implemented in {\Neos\ContentRepository\BehavioralTests\ProjectionRaceConditionTester\Dto\TraceEntries::findDoubleProcessingOfEvents()}.
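The duplicate check can be sketched in a few lines. The function name and the plain list of integers as input are assumptions made for illustration; the actual method operates on trace entries:

```php
<?php
declare(strict_types=1);

/**
 * Returns every sequence number that shows up again after it was already seen.
 * A number processed three times is reported twice (once per repeat).
 *
 * @param list<int> $sequenceNumbers numbers in the order one projector processed them
 * @return list<int>
 */
function findDoubleProcessedSequenceNumbers(array $sequenceNumbers): array
{
    $seen = [];        // sequence number => true
    $duplicates = [];

    foreach ($sequenceNumbers as $sequenceNumber) {
        if (isset($seen[$sequenceNumber])) {
            $duplicates[] = $sequenceNumber;
        }
        $seen[$sequenceNumber] = true;
    }

    return $duplicates;
}
```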

Properties

protected array $configuration

Methods

void
onBeforeCatchUp()

This hook is called at the beginning of {ProjectionInterface::catchUpProjection()}; BEFORE the Database Lock is acquired (by {CheckpointStorageInterface::acquireLock()}).

void
onBeforeEvent(EventInterface $eventInstance, EventEnvelope $eventEnvelope)

This hook is called for every event during the catchup process, before the projection is updated. Thus, this hook runs AFTER the database lock is acquired.

void
onAfterEvent(EventInterface $eventInstance, EventEnvelope $eventEnvelope)

This hook is called for every event during the catchup process, after the projection is updated. Thus, this hook runs AFTER the database lock is acquired.

void
onBeforeBatchCompleted()

This hook is called directly before the database lock is RELEASED in {CheckpointStorageInterface::updateAndReleaseLock()}.

void
onAfterCatchUp()

This hook is called at the END of {ProjectionInterface::catchUpProjection()}, directly before exiting the method.
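Taken together, the hook descriptions above imply a fixed call order around the lock. The following sketch only logs that order. `simulateCatchUp()` is a hypothetical helper, the batch structure is an assumption, and `acquireLock` / `updateAndReleaseLock` are log labels for the corresponding {CheckpointStorageInterface} calls rather than real invocations:

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical, simplified catch-up loop that records the order of calls.
 *
 * @param list<list<string>> $batches events grouped into batches (names are placeholders)
 * @return list<string> calls in the order they happen
 */
function simulateCatchUp(array $batches): array
{
    $log = ['onBeforeCatchUp'];            // before the lock is acquired

    foreach ($batches as $batch) {
        $log[] = 'acquireLock';
        foreach ($batch as $event) {       // all of this happens while the lock is held
            $log[] = "onBeforeEvent($event)";
            $log[] = "apply($event)";
            $log[] = "onAfterEvent($event)";
        }
        $log[] = 'onBeforeBatchCompleted'; // directly before the lock is released
        $log[] = 'updateAndReleaseLock';
    }

    $log[] = 'onAfterCatchUp';             // the lock has already been released
    return $log;
}
```

Passing an empty batch, as in `simulateCatchUp([['e1'], []])`, shows how onBeforeBatchCompleted() can fire without any event having been seen in between.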

Details

void onBeforeCatchUp()

This hook is called at the beginning of {ProjectionInterface::catchUpProjection()}; BEFORE the Database Lock is acquired (by {CheckpointStorageInterface::acquireLock()}).

Return Value

void

void onBeforeEvent(EventInterface $eventInstance, EventEnvelope $eventEnvelope)

This hook is called for every event during the catchup process, before the projection is updated. Thus, this hook runs AFTER the database lock is acquired.

Parameters

EventInterface $eventInstance
EventEnvelope $eventEnvelope

Return Value

void

void onAfterEvent(EventInterface $eventInstance, EventEnvelope $eventEnvelope)

This hook is called for every event during the catchup process, after the projection is updated. Thus, this hook runs AFTER the database lock is acquired.

Parameters

EventInterface $eventInstance
EventEnvelope $eventEnvelope

Return Value

void

void onBeforeBatchCompleted()

This hook is called directly before the database lock is RELEASED in {CheckpointStorageInterface::updateAndReleaseLock()}.

It can happen that this method is called multiple times, even without having seen Events in the meantime.

If more events need to be processed, the database lock is acquired again immediately after it is released.

Return Value

void

void onAfterCatchUp()

This hook is called at the END of {ProjectionInterface::catchUpProjection()}, directly before exiting the method.

At this point, the Database Lock has already been released.

Return Value

void