Chapter 4. Dealing with Persistence, State, and Clients

There is no serious enterprise-grade application without persistent data. And the same is true for microservices-based applications. Event sourcing and CQRS are fundamental concepts behind Lagom’s support for services that store information. To understand these concepts, it is recommended that you read Lagom’s CQRS and Event Sourcing documentation.

When using event sourcing, all changes are captured as domain events, which are immutable facts of things that have happened. In Lagom, an aggregate root is implemented as a PersistentEntity. An aggregate is a cluster of domain objects that can be treated as a single unit. An example may be a piece of cargo and its transport legs: they are separate objects, but it’s useful to treat the cargo (together with its transport legs) as a single aggregate. The aggregate can reply to queries for a specific identifier, but it cannot serve queries that span more than one aggregate. Therefore, you need to create another view of the data that is tailored to the queries that the service provides (see Figure 4-1). This separation of the write-side and the read-side of the persistent data is often referred to as the CQRS pattern.

Figure 4-1. Command query responsibility segregation
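To make the split concrete, here is a minimal plain-Java sketch. It is not Lagom code, and all names are hypothetical. The write side keeps an append-only list of events per aggregate identifier. A separate read side builds a view keyed by destination, which answers a query that no single aggregate can. For simplicity the read side is updated synchronously here; in Lagom it is updated asynchronously and is eventually consistent.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event: a cargo was registered for a destination.
record Registered(String cargoId, String destination) {}

// Write side: an append-only event list per aggregate identifier.
class WriteSide {
    private final Map<String, List<Registered>> eventsById = new HashMap<>();
    private final List<ReadSide> listeners = new ArrayList<>();

    void subscribe(ReadSide readSide) { listeners.add(readSide); }

    // Handle a "register" command for one aggregate and publish the event.
    void register(String cargoId, String destination) {
        Registered event = new Registered(cargoId, destination);
        eventsById.computeIfAbsent(cargoId, id -> new ArrayList<>()).add(event);
        listeners.forEach(l -> l.apply(event));
    }

    // The aggregate can only answer queries for its own identifier.
    List<Registered> history(String cargoId) {
        return eventsById.getOrDefault(cargoId, List.of());
    }
}

// Read side: a view shaped for a query that spans many aggregates.
class ReadSide {
    private final Map<String, List<String>> cargoByDestination = new HashMap<>();

    void apply(Registered event) {
        cargoByDestination
            .computeIfAbsent(event.destination(), d -> new ArrayList<>())
            .add(event.cargoId());
    }

    List<String> cargoTo(String destination) {
        return cargoByDestination.getOrDefault(destination, List.of());
    }
}
```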

To implement persistence in Lagom you have to implement a class that extends PersistentEntity<Command, Event, State>.

If you are familiar with JPA, you might think of a PersistentEntity as a mixture of a data access object (DAO) and a JPA @Entity. But there are important differences. A JPA entity is loaded from the database wherever it is needed, so there may be many Java object instances with the same entity identifier. In contrast, there is only one instance of a PersistentEntity with a given identifier. Also, with JPA you typically store only the current state, and the history of how that state was reached is not captured.
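The "one instance per identifier" rule can be illustrated with a small in-memory registry. This is a plain-Java sketch with hypothetical names (EntityRegistry, Counter), not how Lagom implements it, since Lagom distributes entities across a cluster. Within one JVM, ConcurrentHashMap.computeIfAbsent creates the instance for an identifier at most once, and every later lookup returns that same object.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical registry: at most one live entity object per identifier.
class EntityRegistry<E> {
    private final Map<String, E> live = new ConcurrentHashMap<>();
    private final Function<String, E> factory;

    EntityRegistry(Function<String, E> factory) { this.factory = factory; }

    E refFor(String id) {
        // computeIfAbsent runs atomically, so the factory is called once per id.
        return live.computeIfAbsent(id, factory);
    }
}

// Hypothetical entity type used only for the demonstration.
class Counter {
    final String id;
    int updates;

    Counter(String id) { this.id = id; }
}
```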

You interact with a PersistentEntity by sending command messages to it. Commands are processed sequentially, one at a time, for a specific entity instance. A command may result in state changes that are persisted as events, representing the effect of the command. The current state is not stored for every change, since it can be derived from the events. These events are only ever appended to storage; nothing is ever mutated, which allows for very high transaction rates and efficient replication.
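The following sketch shows the idea of an append-only journal from which the current state is derived. It uses plain Java with hypothetical event and state types, not Lagom's storage. Events are only ever added, and replay folds them in order into an immutable state.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical events for a cargo entity.
interface CargoEvent {}
record NameSet(String name) implements CargoEvent {}
record DestinationSet(String destination) implements CargoEvent {}

// Immutable state; applying an event returns a new instance.
record CargoSnapshot(String name, String destination) {
    static final CargoSnapshot EMPTY = new CargoSnapshot("", "");

    CargoSnapshot apply(CargoEvent event) {
        if (event instanceof NameSet n) return new CargoSnapshot(n.name(), destination);
        if (event instanceof DestinationSet d) return new CargoSnapshot(name, d.destination());
        throw new IllegalArgumentException("Unknown event " + event);
    }
}

// Append-only journal: events are added, never updated or deleted.
class Journal {
    private final List<CargoEvent> events = new ArrayList<>();

    void append(CargoEvent event) { events.add(event); }

    List<CargoEvent> readAll() { return Collections.unmodifiableList(events); }

    // The current state is not stored; it is derived by replaying all events.
    CargoSnapshot replay() {
        CargoSnapshot state = CargoSnapshot.EMPTY;
        for (CargoEvent e : events) {
            state = state.apply(e);
        }
        return state;
    }
}
```

If a destination changes twice, the journal keeps both DestinationSet events, so the history of how the state was reached survives. A table that stores only the current row loses that history.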

With all this knowledge, the remaining lines of code in the RegistrationServiceImpl should now be a little clearer. In Lagom, the way to send commands to a PersistentEntity is by using a PersistentEntityRef, which needs to be looked up via the PersistentEntityRegistry. This means that CargoEntity is the PersistentEntity and RegisterCargo is the command we want to send:

// Look up the CargoEntity for the given ID.
PersistentEntityRef<RegistrationCommand> ref =
        persistentEntityRegistry.refFor(CargoEntity.class,
                                        request.getId());
// Tell the entity to use the Cargo information in the request.
return ref.ask(RegisterCargo.of(request));
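ref.ask(...) does not block. It returns a CompletionStage that completes with the entity's reply. Here is a rough plain-Java analogy. It assumes one single-threaded executor per entity, and SequentialEntityRef and Reply are hypothetical names. Replies arrive asynchronously, but commands for the same entity are still handled strictly one at a time and in order.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical result of handling a command: the new state plus the reply.
record Reply<S, R>(S newState, R value) {}

// Hypothetical entity reference. A single-threaded executor acts as the
// entity's mailbox, so its commands run one at a time, in submission order.
class SequentialEntityRef<S> {
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private S state; // only touched on the mailbox thread

    SequentialEntityRef(S initial) { this.state = initial; }

    // "ask": enqueue a command and receive the reply asynchronously.
    <R> CompletionStage<R> ask(Function<S, Reply<S, R>> command) {
        return CompletableFuture.supplyAsync(() -> {
            Reply<S, R> reply = command.apply(state);
            state = reply.newState();
            return reply.value();
        }, mailbox);
    }

    void shutdown() { mailbox.shutdown(); }
}
```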

We made it all the way through the service implementation down to the CargoEntity. This is an event-sourced entity. Its state, CargoState, stores information about the registered cargo. It can also receive the commands defined in the RegistrationCommand interface and translate them into the events defined in the RegistrationEvent interface.

As mentioned before, this is the place where commands are translated into events. As such, the PersistentEntity has to define a behavior for every command it understands. A behavior is defined by registering command handlers and event handlers:

public class CargoEntity
       extends PersistentEntity<RegistrationCommand,
                                RegistrationEvent,
                                CargoState> {
    @Override
    public Behavior initialBehavior(
                Optional<CargoState> snapshotState) {

        // Create the BehaviorBuilder b from the initial state,
        // then register the command and event handlers on it.

        return b.build();
    }
}

If you look at the supported commands, you will find only one, RegisterCargo. It is sent from the UI when a user adds a new piece of cargo. By convention, the commands are defined as nested types of the RegistrationCommand interface, which makes it simple to get a complete picture of what commands an entity supports. Commands are also immutable objects. Here, the Immutables library generates the concrete RegisterCargo class from the annotated AbstractRegisterCargo interface:

public interface RegistrationCommand extends Jsonable {
    @Value.Immutable
    @ImmutableStyle
    @JsonDeserialize(as = RegisterCargo.class)
    public interface AbstractRegisterCargo extends
            RegistrationCommand,
            CompressedJsonable,
            PersistentEntity.ReplyType<Done> {
        @Value.Parameter
        Cargo getCargo();
    }
}

Commands get translated to events, and it’s the events that get persisted. Each event has an event handler registered for it, and an event handler simply applies an event to the current state. This happens when the event is first created, and again whenever the entity is loaded from the database: each event is replayed to re-create the state of the entity. The RegistrationEvent interface defines all the events supported by the CargoEntity. In our case, there is exactly one event, CargoRegistered:

public interface RegistrationEvent extends
                 Jsonable,
                 AggregateEvent<RegistrationEvent> {

    @Value.Immutable
    @ImmutableStyle
    @JsonDeserialize(as = CargoRegistered.class)
    interface AbstractCargoRegistered extends
              RegistrationEvent {
        @Override
        default AggregateEventTag<RegistrationEvent>
                aggregateTag() {
            return RegistrationEventTag.INSTANCE;
        }

        @Value.Parameter
        String getId();

        @Value.Parameter
        Cargo getCargo();
    }
}

This event is emitted when a RegisterCargo command is received. Events and commands are nothing more than immutable objects. Let’s add the different behaviors to handle the command and trigger events.

Behavior is defined using a behavior builder, which starts with a state. If the entity supports snapshotting, the passed-in snapshotState may contain a value that can be used. Snapshotting is an optimization that persists the state itself, so that many events are combined into one. Otherwise, the default state is a placeholder cargo whose id and other fields are empty strings:

@Override
  public Behavior initialBehavior(
                  Optional<CargoState> snapshotState) {

  BehaviorBuilder b = newBehaviorBuilder(
              snapshotState.orElse(
                CargoState.builder().cargo(
                        Cargo.builder()
                                .id("")
                                .description("")
                                .destination("")
                                .name("")
                                .owner("").build())
                        .timestamp(LocalDateTime.now()
                        ).build()));
            //...
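Recovery from a snapshot can be sketched in plain Java. Start from the snapshot if one exists, otherwise from the empty default, and replay only the events the snapshot does not already cover. The Recovery class and its sequence-number bookkeeping are assumptions for illustration, not Lagom's API.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical recovery routine: start from a snapshot when one exists,
// otherwise from an empty default, then replay the remaining events.
class Recovery {
    // Immutable state: the last registered cargo ID and how many events
    // have been folded into it so far.
    record State(String cargoId, int eventsApplied) {
        static State empty() { return new State("", 0); }

        State apply(String registeredCargoId) {
            return new State(registeredCargoId, eventsApplied + 1);
        }
    }

    // snapshotSeqNr: number of journal events already covered by the snapshot.
    static State recover(Optional<State> snapshot, int snapshotSeqNr, List<String> journal) {
        // orElseGet only builds the default when there is no snapshot.
        State state = snapshot.orElseGet(State::empty);
        int from = snapshot.isPresent() ? snapshotSeqNr : 0;
        for (int i = from; i < journal.size(); i++) {
            state = state.apply(journal.get(i));
        }
        return state;
    }
}
```

Both paths end in the same state, which is what makes snapshots a pure optimization. The sketch uses orElseGet, so the default is only built when no snapshot exists. By contrast, orElse, as used in the entity code, always evaluates its argument.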

The functions that process incoming commands are registered in the behavior using setCommandHandler of the BehaviorBuilder. We start with the initial RegisterCargo command. The command handler validates the command payload (in this case it only checks if the cargo has a name set) and emits the CargoRegistered event with the new payload. A command handler returns a persist directive that defines what event or events, if any, to persist. This example uses the thenPersist directive, which only stores a single event:

//...
b.setCommandHandler(RegisterCargo.class, (cmd, ctx) -> {
    if (cmd.getCargo().getName() == null ||
            cmd.getCargo().getName().equals("")) {
        ctx.invalidCommand("Name must be defined");
        return ctx.done();
    }

    final CargoRegistered cargoRegistered =
            CargoRegistered.builder()
                    .cargo(cmd.getCargo())
                    .id(entityId())
                    .build();
    return ctx.thenPersist(cargoRegistered,
            evt -> ctx.reply(Done.getInstance()));
});
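Without the Lagom types, the handler above is a function from a command to one of two outcomes: a rejection, or a list of events to persist. This plain-Java version makes that contract explicit and testable without a running entity. The record and class names are hypothetical.

```java
import java.util.List;

// Hypothetical command and event records.
record RegisterCargoCmd(String cargoId, String name) {}
record CargoRegisteredEvt(String cargoId, String name) {}

// Outcome of a command handler: an error, or the events to persist.
record HandlerResult(String error, List<CargoRegisteredEvt> events) {
    static HandlerResult invalid(String message) {
        return new HandlerResult(message, List.of());
    }

    static HandlerResult persist(CargoRegisteredEvt event) {
        return new HandlerResult(null, List.of(event));
    }

    boolean isValid() { return error == null; }
}

class RegisterCargoHandler {
    static HandlerResult handle(RegisterCargoCmd cmd) {
        // Validate first: nothing is persisted for an invalid command.
        if (cmd.name() == null || cmd.name().isEmpty()) {
            return HandlerResult.invalid("Name must be defined");
        }
        // A valid command is translated into exactly one event.
        return HandlerResult.persist(new CargoRegisteredEvt(cmd.cargoId(), cmd.name()));
    }
}
```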

When an event has been persisted successfully, the current state is updated by applying the event to it. The functions for updating the state are registered with the setEventHandler method of the BehaviorBuilder. Each event handler returns the new state. The state must be immutable, so you return a new instance of the state:

b.setEventHandler(CargoRegistered.class,
             // We simply update the current
             // state to use the new cargo payload
             // and update the timestamp
             evt -> state()
                     .withCargo(evt.getCargo())
                     .withTimestamp(LocalDateTime.now())
                     );
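Event handlers also run when the entity is replayed from the journal, so it can help to derive everything in the new state from the event itself. The sketch below is an alternative design, not the chapter's code. It uses a hypothetical state record with wither methods and takes the timestamp from the event instead of calling LocalDateTime.now() in the handler. As a result, replaying the same event always yields the same state.

```java
import java.time.LocalDateTime;

// Hypothetical immutable state with "wither" methods, similar in spirit
// to what the Immutables library generates for CargoState.
record CargoStateSketch(String cargoName, LocalDateTime timestamp) {
    CargoStateSketch withCargoName(String name) {
        return new CargoStateSketch(name, timestamp);
    }

    CargoStateSketch withTimestamp(LocalDateTime ts) {
        return new CargoStateSketch(cargoName, ts);
    }
}

// Hypothetical event that carries the time it occurred.
record CargoRegisteredSketch(String cargoName, LocalDateTime occurredAt) {}

class CargoEventHandler {
    // Returns a new state; the current state object is never modified.
    static CargoStateSketch apply(CargoStateSketch state, CargoRegisteredSketch evt) {
        return state.withCargoName(evt.cargoName()).withTimestamp(evt.occurredAt());
    }
}
```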
Note

Event handlers typically only update the state, but they may also change the behavior of the entity, in the sense that new functions for processing commands and events may be defined. Learn more about this in the PersistentEntity documentation.

We have successfully persisted an entity. Let’s finish the example and see how the registered cargo is displayed to the user. The getLiveRegistrations() service call subscribes to the topic that the register() service call created earlier and returns the received cargo as a stream:

@Override
   public ServiceCall<NotUsed, NotUsed, Source<Cargo, ?>>
    getLiveRegistrations() {
       return (id, req) -> {
           PubSubRef<Cargo> topic = topics.refFor(
           TopicId.of(Cargo.class, "topic"));
           return CompletableFuture.completedFuture(
           topic.subscriber()
           );
       };
   }
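The publish/subscribe interaction behind register() and getLiveRegistrations() can be approximated with the JDK's own Flow API. The LiveTopic class below is a hypothetical sketch built on java.util.concurrent.SubmissionPublisher, not Lagom's PubSubRef. It shows a property worth keeping in mind for live feeds: in this sketch, a subscriber only receives messages published after it subscribed.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical in-memory topic built on java.util.concurrent.Flow.
class LiveTopic implements AutoCloseable {
    private final SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

    // Like register(): publish to whoever is currently subscribed.
    void publish(String cargoName) { publisher.submit(cargoName); }

    // Like getLiveRegistrations(): subscribe and collect what arrives.
    // The returned future completes once the topic is closed and drained.
    CompletableFuture<Void> subscribe(List<String> sink) {
        return publisher.consume(sink::add);
    }

    @Override
    public void close() { publisher.close(); }
}
```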

To see the consumer side, you have to look into the front-end project and open the ReactJS application in main.jsx. The createCargoStream() function points to the API endpoint and the live cargo events are published to the cargoNodes function and rendered accordingly (see Figure 4-2).

Figure 4-2. Publishing cargo events to the UI

One last step in this example is to add a REST-based API that exposes all the persisted cargo to an external system. Persistent entities hold the state of individual entities, and to work with one you need to know its identifier. Reading all of them (a select *) is a different use case. It needs another view of the persisted data, tailored to the queries the service provides. Lagom supports both populating this read-side view of the data and building queries on the read-side.

We start with the service implementation again. The CassandraSession is injected in the constructor of the implementation class. CassandraSession provides several methods in different flavors for executing queries. All methods are nonblocking and they return a CompletionStage or a Source. The statements are expressed in Cassandra query language (CQL) syntax:

@Override
   public ServiceCall<NotUsed, NotUsed, PSequence<Cargo>>
   getAllRegistrations() {
       return (userId, req) -> {
           CompletionStage<PSequence<Cargo>>
           result = db.selectAll("SELECT cargoid,"
                        + "name, description, owner,"
                        + "destination FROM cargo")
                   .thenApply(rows -> {
                       List<Cargo> cargos =
                       rows.stream().map(row ->
                       Cargo.of(row.getString("cargoid"),
                               row.getString("name"),
                               row.getString("description"),
                               row.getString("owner"),
                               row.getString("destination")))
                               .collect(Collectors.toList());
                       return TreePVector.from(cargos);
                   });
           return result;
       };
   }

Before the query side actually works, we need to work out a way to transform the events generated by the persistent entity into database tables. This is done with a CassandraReadSideProcessor:

public class CargoEventProcessor extends
CassandraReadSideProcessor<RegistrationEvent> {

  @Override
  public AggregateEventTag<RegistrationEvent> aggregateTag() {
    return RegistrationEventTag.INSTANCE;
  }

  @Override
  public CompletionStage<Optional<UUID>>
         prepare(CassandraSession session) {
    // TODO prepare statements, fetch offset
    return noOffset();
  }

  @Override
  public EventHandlers
      defineEventHandlers(EventHandlersBuilder builder) {
    // TODO define event handlers
    return builder.build();
  }

}

To make the events available for read-side processing, the events must implement the aggregateTag method of the AggregateEvent interface to define which events belong together. Typically, you define this aggregateTag on the top-level event type of a PersistentEntity class. Note that this is also used to create read-side views that span multiple PersistentEntities:

public class RegistrationEventTag {
    public static final AggregateEventTag<RegistrationEvent>
    INSTANCE = AggregateEventTag.of(RegistrationEvent.class);
}
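Tags let a read-side processor consume events independently of entity identifiers. The following plain-Java sketch shows a processor-style query that selects every event carrying one tag, across several entities, in the order the events were written. TaggedJournal and TaggedEvent are hypothetical, and the tags are plain strings rather than AggregateEventTag instances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical journal entry: which entity wrote it and which tag it carries.
record TaggedEvent(String entityId, String tag, String payload) {}

class TaggedJournal {
    private final List<TaggedEvent> entries = new ArrayList<>();

    void append(TaggedEvent e) { entries.add(e); }

    // A processor follows one tag, not one entity, so it sees the events
    // of every entity that uses that tag, in journal order.
    List<TaggedEvent> eventsByTag(String tag) {
        return entries.stream()
                .filter(e -> e.tag().equals(tag))
                .collect(Collectors.toList());
    }
}
```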

Finally, the RegistrationEvent also needs to extend the AggregateEvent<E> interface. Now, we’re ready to implement the remaining methods of the CargoEventProcessor.

The prepare method has two jobs. First, it creates the tables and prepared statements the processor needs. Second, it determines where processing should resume, so that events that have already been handled are not processed again. Each event is associated with a unique offset, a time-based UUID. The offset is passed to the event handler for each event and should typically be stored, so that the prepare method can read it back with a select statement. You can use the CassandraSession to get the stored offset.

Composing all of the described asynchronous CompletionStage tasks for this example looks like this:

@Override
   public CompletionStage<Optional<UUID>>
          prepare(CassandraSession session) {
       return
          prepareCreateTables(session).thenCompose(a ->
          prepareWriteCargo(session).thenCompose(b ->
          prepareWriteOffset(session).thenCompose(c ->
          selectOffset(session))));
   }
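The same composition pattern can be tried without Cassandra. In this plain-Java sketch, each prepare step is an asynchronous CompletableFuture that records its name. thenCompose ensures that each step starts only after the previous one has completed. PrepareSketch and its step names are hypothetical. The chain is written flat rather than nested; both forms behave the same here because no step uses an earlier step's result.

```java
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-ins for the prepare steps; each one runs asynchronously
// and records its name so the execution order can be checked.
class PrepareSketch {
    final List<String> log = new CopyOnWriteArrayList<>();
    private final Optional<UUID> storedOffset;

    PrepareSketch(Optional<UUID> storedOffset) { this.storedOffset = storedOffset; }

    private CompletionStage<Void> step(String name) {
        return CompletableFuture.runAsync(() -> log.add(name));
    }

    CompletionStage<Optional<UUID>> prepare() {
        return step("createTables")
            .thenCompose(a -> step("prepareWriteCargo"))
            .thenCompose(b -> step("prepareWriteOffset"))
            // The last step yields the offset to resume from (empty if none).
            .thenCompose(c -> CompletableFuture.supplyAsync(() -> {
                log.add("selectOffset");
                return storedOffset;
            }));
    }
}
```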

Starting with the table preparation for the read-side is simple. Use the CassandraSession to create the two tables:

private CompletionStage<Done>
   prepareCreateTables(CassandraSession session) {
     return session.executeCreateTable(
       "CREATE TABLE IF NOT EXISTS cargo ("
       + "cargoId text, name text, description text,"
       + "owner text, destination text,"
       + "PRIMARY KEY (cargoId, destination))")
       .thenCompose(a -> session.executeCreateTable(
       "CREATE TABLE IF NOT EXISTS cargo_offset ("
         + "partition int, offset timeuuid, "
         + "PRIMARY KEY (partition))"));
     }

The same can be done with the prepared statements. This is the example for inserting new cargo into the cargo table:

private CompletionStage<Done>
   prepareWriteCargo(CassandraSession session) {
       return session
          .prepare("INSERT INTO cargo"
          + "(cargoId, name, description, "
          + "owner,destination) VALUES (?, ?,?,?,?)")
          .thenApply(ps -> {
           setWriteCargo(ps);
           return Done.getInstance();
       });
}

The last missing piece is the event handler. Whenever a CargoRegistered event is received, it should be persisted into the table. The events are processed by event handlers that are defined in the defineEventHandlers method, one handler for each event class. A handler is a BiFunction that takes the event and the offset as parameters. It returns a CompletionStage of a list of zero or more bound statements, which are executed before the next event is processed:

@Override
public EventHandlers
    defineEventHandlers(EventHandlersBuilder builder) {
      builder.setEventHandler(CargoRegistered.class,
      this::processCargoRegistered);
      return builder.build();
}

private CompletionStage<List<BoundStatement>>
processCargoRegistered(CargoRegistered event, UUID offset) {
        // bind the prepared statement
        BoundStatement bindWriteCargo = writeCargo.bind();
        // insert values into prepared statement
        bindWriteCargo.setString("cargoId",
                event.getCargo().getId());
        bindWriteCargo.setString("name",
                event.getCargo().getName());
        bindWriteCargo.setString("description",
        event.getCargo().getDescription());
        bindWriteCargo.setString("owner",
                event.getCargo().getOwner());
        bindWriteCargo.setString("destination",
                event.getCargo().getDestination());
        // bind the offset prepared statement
        BoundStatement bindWriteOffset =
                    writeOffset.bind(offset);
        return completedStatements(
               Arrays.asList(bindWriteCargo,
                    bindWriteOffset));
}

In this example, we add one row to the cargo table and update the stored offset for each CargoRegistered event.
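Storing the offset along with each row lets a processor resume after a restart without handling events twice. The sketch below uses in-memory maps instead of Cassandra tables and long offsets instead of time-based UUIDs. CargoProjection is a hypothetical name.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical read-side processor: in-memory maps stand in for the cargo
// and cargo_offset tables, and offsets are longs instead of timeuuids.
class CargoProjection {
    record Event(long offset, String cargoId, String destination) {}

    final Map<String, String> cargoTable = new HashMap<>(); // cargoId -> destination
    Optional<Long> storedOffset = Optional.empty();          // last processed offset
    int writes;                                              // rows written, for illustration

    // Like prepare() plus the event handler: resume after the stored offset.
    void catchUp(List<Event> journal) {
        long after = storedOffset.orElse(-1L);
        for (Event e : journal) {
            if (e.offset() <= after) {
                continue; // already handled before the "restart"
            }
            // Write the row and record the offset for the same event.
            cargoTable.put(e.cargoId(), e.destination());
            writes++;
            storedOffset = Optional.of(e.offset());
        }
    }
}
```

A Cassandra INSERT with an existing primary key overwrites the row. So if an event were ever applied twice, it would also be harmless for this particular table.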

Note

Because events are processed sequentially, one at a time, it is safe to keep state in variables of the enclosing class and update it from the event handlers. An example of such state could be the values needed to calculate a moving average.
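For example, a moving average can be kept in ordinary fields of the processor. In this plain-Java sketch, MovingAverage and the window size are hypothetical. It needs no locking only because events arrive one at a time. Such in-memory state is lost when the processor restarts, so it should be state that can be rebuilt.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical processor-local state: the average of the last `window` values.
class MovingAverage {
    private final int window;
    private final Deque<Double> values = new ArrayDeque<>();
    private double sum;

    MovingAverage(int window) { this.window = window; }

    // Called from an event handler; returns the updated average.
    double add(double value) {
        values.addLast(value);
        sum += value;
        if (values.size() > window) {
            sum -= values.removeFirst(); // drop the oldest value
        }
        return sum / values.size();
    }
}
```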

If executing the statements fails, the processor is restarted after a backoff delay. This delay increases exponentially in the case of repeated failures.
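Exponential backoff doubles the delay after each consecutive failure, up to a ceiling. The sketch below uses illustrative values (a 1-second minimum and a 30-second maximum), which are assumptions rather than Lagom's defaults. Production implementations often also add random jitter, so that many restarting processors do not retry in lockstep.

```java
import java.time.Duration;

// Hypothetical backoff policy: min * 2^attempt, capped at max.
class Backoff {
    private final Duration min;
    private final Duration max;

    Backoff(Duration min, Duration max) {
        this.min = min;
        this.max = max;
    }

    // Delay before restart number `attempt` (0-based).
    Duration delay(int attempt) {
        long millis = min.toMillis();
        // Stop doubling once the cap is reached, which also avoids overflow.
        for (int i = 0; i < attempt && millis < max.toMillis(); i++) {
            millis *= 2;
        }
        return Duration.ofMillis(Math.min(millis, max.toMillis()));
    }
}
```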

If you want to do something with the events other than updating tables in a database, there is another tool. You can get a stream of the persisted events with the eventStream method of the PersistentEntityRegistry.

You have already seen the service implementation that queries the database. Let’s try out the API endpoint and get a list of all the registered cargo in the system by calling it with curl:

curl http://localhost:9000/api/registration/all

[
   {
      "id":"522871",
      "name":"TEST",
      "description":"TEST",
      "owner":"TEST",
      "destination":"TEST"
   },
   {
      "id":"623410",
      "name":"SECOND",
      "description":"SECOND",
      "owner":"SECOND",
      "destination":"SECOND"
   }
]

Consuming Services

We’ve seen how to define service descriptors and implement them, but now we need to consume services. The service descriptor contains everything Lagom needs to know to invoke a service. Consequently, Lagom is able to implement service descriptor interfaces for you.

The first thing necessary to consume a service is to bind it, so that Lagom can provide an implementation for your application to use. We already did this for our own service with bindServices. Now let’s bind the registration-api as a client in the shipping-impl, so that the shipping-impl can validate a piece of cargo before it adds a leg:

public class ShippingServiceModule extends
      AbstractModule implements ServiceGuiceSupport {
    @Override
    protected void configure() {
        bindServices(serviceBinding(ShippingService.class,
        ShippingServiceImpl.class));
        bindClient(RegistrationService.class);
    }
}

Make sure to also add the dependency between both projects in the build.sbt file by adding .dependsOn(registrationApi) to the shipping-impl project.

Having bound the client, you can now have it injected into any Lagom component using the @Inject annotation. In this example it is injected into the ShippingServiceImpl:

public class ShippingServiceImpl implements ShippingService {
    private final PersistentEntityRegistry persistentEntityRegistry;
    private final RegistrationService registrationService;

    @Inject
    public ShippingServiceImpl(
            PersistentEntityRegistry persistentEntityRegistry,
            RegistrationService registrationService) {
        this.persistentEntityRegistry = persistentEntityRegistry;
        this.registrationService = registrationService;
        //...
    }
    //...
}

The service can be used to validate a cargo ID in the shipping-impl before adding a leg:

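@Override
public ServiceCall<String, Leg, Done> addLeg() {
    return (id, request) -> {
        // Look up the cargo in the registration service first.
        CompletionStage<Cargo> response =
            registrationService.getRegistration()
                .invoke(request.getCargoId(),
                        NotUsed.getInstance());
        PersistentEntityRef<ShippingCommand> itinerary =
            persistentEntityRegistry
                .refFor(ItineraryEntity.class, id);
        // Only add the leg once the lookup has completed;
        // if the lookup fails, the leg is not added.
        return response.thenCompose(cargo ->
            itinerary.ask(AddLeg.of(request)));
    };
}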
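The intent described above is about ordering: the leg should be added only after the registration lookup has completed successfully. This can be checked in isolation with a plain-Java sketch. An in-memory set stands in for the registration service and a list stands in for the itinerary entity. AddLegFlow and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical in-memory stand-ins for the registration client
// and the itinerary entity.
class AddLegFlow {
    private final Set<String> registeredCargo;
    final List<String> legs = new ArrayList<>();

    AddLegFlow(Set<String> registeredCargo) { this.registeredCargo = registeredCargo; }

    // Stand-in for the remote lookup: fails when the cargo ID is unknown.
    CompletionStage<String> lookup(String cargoId) {
        return registeredCargo.contains(cargoId)
            ? CompletableFuture.completedFuture(cargoId)
            : CompletableFuture.failedFuture(
                  new IllegalArgumentException("Unknown cargo " + cargoId));
    }

    // Stand-in for itinerary.ask(AddLeg.of(request)).
    private CompletionStage<Void> appendLeg(String cargoId, String leg) {
        legs.add(cargoId + ":" + leg);
        return CompletableFuture.completedFuture(null);
    }

    // thenCompose only runs appendLeg if the lookup succeeded; a failed
    // lookup propagates as the result and the leg is never added.
    CompletionStage<Void> addLeg(String cargoId, String leg) {
        return lookup(cargoId).thenCompose(cargo -> appendLeg(cargo, leg));
    }
}
```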

Lagom service clients use circuit breakers for all service calls by default. Circuit breakers are used and configured on the client side, but the granularity and configuration identifiers are defined by the service provider. By default, one circuit breaker instance is used for all calls (methods) to another service. You can set a unique circuit breaker identifier on each method to give every method its own circuit breaker instance, or use the same identifier on several related methods to group them. You can find more information about how to configure the circuit breaker in the Lagom documentation.
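To make the states of a circuit breaker concrete, here is a minimal hypothetical sketch in plain Java. It is not the implementation Lagom uses, and the threshold and timeout values are up to the caller. After maxFailures consecutive failures, the breaker opens and rejects calls immediately. Once resetTimeoutMillis has passed, it lets one trial call through and closes again if that call succeeds. The clock is injected, so the behavior can be tested without sleeping.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical minimal circuit breaker with CLOSED, OPEN and HALF_OPEN states.
class CircuitBreakerSketch {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int maxFailures;
    private final long resetTimeoutMillis;
    private final LongSupplier clock;
    private State state = State.CLOSED;
    private int failures;
    private long openedAt;

    CircuitBreakerSketch(int maxFailures, long resetTimeoutMillis, LongSupplier clock) {
        this.maxFailures = maxFailures;
        this.resetTimeoutMillis = resetTimeoutMillis;
        this.clock = clock;
    }

    synchronized <T> T call(Supplier<T> body) {
        if (state == State.OPEN) {
            if (clock.getAsLong() - openedAt < resetTimeoutMillis) {
                throw new IllegalStateException("Circuit open: call rejected");
            }
            state = State.HALF_OPEN; // let a single trial call through
        }
        try {
            T result = body.get();
            failures = 0;            // success resets the failure count
            state = State.CLOSED;
            return result;
        } catch (RuntimeException e) {
            failures++;
            if (state == State.HALF_OPEN || failures >= maxFailures) {
                state = State.OPEN;
                openedAt = clock.getAsLong();
            }
            throw e;
        }
    }

    synchronized State state() { return state; }
}
```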
