Spring Boot / Docker ( JiB ) / Helm configuration / Tests for Akka / Pekko Finite State Machine ( FSM )

This blog is the detail analysis of an implementation of how to host Akka / Pekko Finite State Machine in a Spring Boot application, which this Netflix blog thinks also that is great idea, so you can use Dependency Injection, Spring Data Elasticsearch, Spring Cloud Kubernetes, Spring Profile, SpringTest support features of Spring Boot, as part of series of blogs explaining how to build an Event Sourcing application with the help of Akka / Pekko Finite State Machine.

Additionally I will explain here the configuration of the Google JiB tool to build Docker images, which saves Maven / Gradle users from hard to maintain cryptic Dockerfiles, gives them the possibility of daemon-less building of Docker Images of the Spring Boot applications and how it is optimises Docker Images in layers to speed up your development cycle which can observe with awesome Dive tool.

And finally how to deploy this Spring Boot application with the help of the Helm Charts.

The sources of this blog lies under following Github Project Akka / Pekko Finite State Machine.

  1. Proof of Concept Application
    1. Docker / JiB
      1. Image Layers
      2. Gradle Configuration
      3. Native Image
    2. Kubernetes Configuration
      1. Helm
      2. Image Configuration
      3. Readiness / Liveness Check
      4. Environment Configuration
      5. Autoscaling
      6. Gradle Configuration
    3. Application Initialisation
    4. Proof of Concep Tests
      1. Positive Test
      2. Unhandled Transition Test
      3. Long Running Workflow / Persistance Recovery Test
      4. Four Eyes Workflow Test

This blog is a part of a series demonstrate the full implementation of Proof of Concept application with Event Sourcing principles using Akka Framework, Apache Kafka, Apache Cassandra and Elasticsearch which you can find it here, to be able to understand whole context of what here explained, I advice to start reading from that blog but if you just need implementation detail of the topic mentioned in the title please continue to read.

PS: I like to apologise for the quality of the some images, unfortunately WordPress reduces the image quality during the upload. For some images that you have to really see the details, I placed links to externally hosted images, so please click the images and you will be forwarded to these high definition images.

Proof of Concept Application

fsm-pekko-4eyes-application

This project does not contain that much functionality, it only contains two classes, one for starting the Spring Boot application and one configuring Spring Context but all the integration tests are here and they are proofs that all the ideas in this blog functions, so we will look them closely.

Additionally under this project, there are here the Helm Charts for the deployment to Kubernetes and the Docker Image creation configurations but I will not go in details here about this topic but I will have further in blog a dedicated chapter / blog about it.

Docker / JiB

Our application is designed to run in Cloud / Kubernetes so we have to create a Docker Image from it. To achieve this goal, we can use the classical approach and use Dockerfile’s. My Problem with as Java / Scala developers, we have to learn another Syntax / Grammar. The developers of the Google had the same idea and developed JiB for Gradle and Maven with following missions statement. ‘Jib builds optimised Docker and OCI images for your Java applications without a Docker daemon – and without deep mastery of Docker best-practices‘.

Image Layers

It has additional advantages like you don’t need a docker daemon on your build machine and more important of all it can do real good optimisation that reduces your build times, by separating your application into multiple layers, splitting dependencies from classes. Now you don’t have to wait for Docker to rebuild your entire Java application – just deploy the layers that changed.

There is another extraordinary tool called Dive that show how the Docker Image Layers build and what are the advantages JiB brings with.

For ex, following show us JiB is clever enough to build a Layer for our Dependencies with a concrete version number, the reason for it, we will not change the version number for Akka / Pekko or Spring not every commit / build (may be every six months or may be years but defintetly not every commit), it makes no sense to build this Image Layer for every checkin in our in Continuous Build environment or even for local development, as you can see Layer size is 150MB which will costs lots time if we build 30 times per day. This is something you get for free in JiB, with Dockerfiles you can make this yourself but I can tell you this will cost you lots of Lines of Code.

Second picture shows us, JiB is even clever enough to pack our direct project dependencies (the code that belongs to us but provided as -SNAPSHOT dependencies) to another layer with the assumption that our source code in the local project has a much bigger possibility to be changed then our project dependencies, so this should layer should not build every time.

It even packs our application configuration data to another layer.

And finally another layer for our code.

Couldn’t we achieve the same goal by writing our own Dockerfiles, sure but why should we waste time by teaching everybody in our project the syntax of Dockerfiles and dealing with it complexities while JiB is already doing it for us.

Gradle Configuration

Lets see how we configure JiB in Gradle.

   jib {
        from {
            image = 'azul/zulu-openjdk:17.0.2-17.32.13'
            platforms {
                platform {
                    architecture = 'amd64'
                    os = 'linux'
                }
                platform {
                    architecture = 'arm64'
                    os = 'linux'
                }
            }
            auth {
                username = "${props.DOCKER_HUB_USER}"
                password = "${props.DOCKER_HUB_PASSWORD}"
            }
        }
        to {
            image = "fsm-pekko.registry:5555/fsmpekko/${project.name}:${project.version}"
            auth {
                username = "${props.DOCKER_UPLOAD_USER}"
                password = "${props.DOCKER_UPLOAD_PASSWORD}"
            }
        }
        extraDirectories {
            permissions = [
                    'var/lib/fsm_pekko_4eyes_application': '644'
            ]
        }
        allowInsecureRegistries = true
    }
    tasks.build.dependsOn tasks.jib

Most important configurations are base image configuration, I am using Zulu JDKs as base image ‘azul/zulu-openjdk:17.0.2‘ (mainly because I am using a Mac Notebook with M1 chip and it is one of the JDK that currently perform good with M1) which I hear real good comments about it and the second configuration is the Image name, the version tag of the image (based on the Gradle Project version) and destination Docker Registry that we will install the image.

You might see many more configuration parameters for JiB in the documentation but only one interesting for us here is the ‘extraDirectories‘ if we want extra directory and its permission. To create this directory you should create this directory physically under ‘fsm-akka-4eyes-application’ with the following naming convention.

so our Akka / Pekko Cluster can write it’s Cluster information to this directory if necessary, we also let the JiB give ‘644’ chmod permissions to this directory.

And voila. with so few configurations and hustle you will get your layered Docker image.

Native Image

With Spring Boot 3.X and Google JiB it is really easy to create ‘ native-image’ with GraalVM as I explained extensively in this blog so I will not explain here, please check the link to learn reach unbelievable low start up times with Spring Boot..

Kubernetes Configuration

Helm

As I mentioned multiple times in this Blog our mechanism to deploy our Proof of Concept to Cloud/Kubernetes would be via Helm Charts, for that we have to package our application via Helm and we to create a structure for it.

Thankfully ‘helm init‘ command prepare a template that fulfil the %90 of our requirements and you just have to fill the blanks. Industry ‘Best Practice‘ is to keep your Helm Chart definitions in the same repository of Scala / Java Code, so you don’t have to jump between the repositories and the fact that you developers knows best how to configure your application, then this Helm Chart will be packaged with the help of an Gradle Plugin and deployed to Helm Registry, so it can be referenced from an Umbrella Helm Chart managing your whole system (Application and Infrastructure components).

If we look to the ‘Chart.yaml’.

apiVersion: v2
name: fsm-pekko-foureyes-application
description: A Helm chart for Kubernetes

# A chart can be either an 'application' or a 'library' chart.
#
# Application charts are a collection of templates that can be packaged into versioned archives
# to be deployed.
#
# Library charts provide useful utilities or functions for the chart developer. They're included as
# a dependency of application charts to inject those utilities and functions into the rendering
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
type: application

# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 1.0.0

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "${appVersion}"

In which we are defining the Helm Chart Name ‘fsm-pekko-foureyes-application‘, Version ‘1.0.0‘ and appVersion, Helm Chart two Version concept, one is the Version of the Helm Chart, which in theory should not change, as long the configuration of Helm Chart changes and one Application Version, which most of the people relates to the Version of the Docker Images. So as long as only your Docker Image version changes, you should not increment the version of the Helm Chart with Image changes but only with Helm configuration changes and use the ‘appVersion‘ for Image changes.

While this would be to nice to be true, there is one problem with when you reference another Helm Chart as dependency, the Helm dependency resolution mechanism respects only ‘version‘, so if you are going to use umbrella charts like me, you might have to change the ‘version‘ when Docker Image version changes otherwise not.

Now if you look to the above snippet, I am using a variable notation for ‘appVersion: “${appVersion}”‘, the Gradle Plugin that we use ‘org.unbroken-dome.helm‘ has nice feature called ‘filtering‘ which help us replace some values in Helm file, which you will see in Gradle Configuration chapter.

Image Configuration

# Default values for fsm-pekko-4eyes-application.
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.

replicaCount: 2

image:
  repository: "k3d-${imageRepository}"
  pullPolicy: Always
  # Overrides the image tag whose default is the chart appVersion.
  tag: "${imageTag}"

And the interesting parts of the ‘valus.yaml’, while we want to test our application in Kubernetes with Akka Cluster Sharding, we are starting two instances of our application (this is the place you upscale your application if you need more instances, in further chapters I will show how you can configure the Horizontal Pod Autoscaling over Helm). Next point, is telling the Helm Chart which Docker Image to use (which is previously.build with JiB and deployed to our Docker Registry in our k3d instance)., while we are working with a SNAPSHOT Version for this reason ‘pullPolicy’ is ‘Always’. Finally ‘tag’ defines which Docker Image tag that we want to use, while this is a development version, we have SNAPSHOT version and not a concrete Version.

We are again using filtering feature of ‘org.unbroken-dome.helm‘ Gradle Plugin and the values of ‘repository: “k3d-${imageRepository}”‘ and ‘tag: “${imageTag}”‘ will be replaced.

resources:
  limits:
    cpu: 1
    memory: 1Gi
  requests:
    cpu: 1
    memory: 512Mi

This configuration block, tells Kubernetes to how much CPU time and memory it should allocate for this pod.

Readiness / Liveness Check

As you can remember we discussed in ‘fsm-pekko-akkasystem‘ chapter hte topic of HealthCheck / Readiness / Liveness, this is the mechanism decides for Kubernetes an application is successfully started and the Pod initialisation process is complete and it is ready accept network traffic.

If the Liveness return false after certain period of the time, Kubernetes will try to restart and restart the Pod until certain number of tries before it switch to complete Error mode. This is the reason you don’t need humans observing system 24 / 7 to restart the applications, it would occur automatically.

Second check Readiness decides the Pod should receive network traffic, imagine you have 5 Akka Nodes, if for some reason Node 5 receives can’t process requests because of some internal problem but still receiving Network request, Kubernetes is clever enough not to direct requests to Node 5, with the knowledge that can’t be anyway processed. At this point thanks to Akka Cluster Sharding, the other 4 Akka Nodes will take the load and process. If Akka Node will be healthy again and Liveness Check returns positive results, Node 5 will be again included Cluster Sharding and can process requests. If the Liveness Check does not return positive after a while the Pod will be started new.

Below the configuration of the Helm Chart for Readiness / Liveness Check.

          ports:
            - name: management
              containerPort: 8558
              protocol: TCP
            - name: http
              containerPort: {{ .Values.service.port }}
              protocol: TCP
          livenessProbe:
            httpGet:
              path: /alive
              port: management
            initialDelaySeconds: 120
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /ready
              port: management
            initialDelaySeconds: 120
            periodSeconds: 30

Environment Configuration

pekko:
  fsm:
    numberOfShards: 30

cassandraContactPoint:
  cassandraReleaseName: fsm-cassandra
  cassandraNamespace: fsmpekko

spring:
  kafka:
    bootstrapServers: "fsm-pekko-kafka-headless:9092"
            - name: KUBERNETES_NAMESPACE
              value: {{ .Release.Namespace }}
            - name: REQUIRED_CONTACT_POINT_NR
              value: "{{ .Values.replicaCount }}"
            - name: CASSANDRA_CONTACT_POINT
              value: "{{ .Values.cassandraContactPoint.cassandraReleaseName }}-dc1-all-pods-service.{{ .Values.cassandraContactPoint.cassandraNamespace }}.svc.cluster.local"
            - name: NUMBER_OF_SHARDS
              value: "{{ .Values.pekko.fsm.numberOfShards }}"
            - name: SPRING_KAFKA_BOOTSTRAP_SERVERS
              value: "{{ .Values.spring.kafka.bootstrapServers }}"
            - name: K8SSANDRA_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: fsm-cassandra-superuser
                  key: password

This last parameter is the application configuration for the Spring Boot and Akka System.

pekko {
  fsm.numberOfShards = 30
  remote.artery {
    canonical {
      hostname = 127.0.0.1
      port = 2551
    }
  }
  ...
}

How many Cluster Shard that we want to have in Kubernetes.

datastax-java-driver {
  basic {
    contact-points = ["127.0.0.1:9042"]
  }
    advanced {
      auth-provider {
        password = ${K8SSANDRA_PASSWORD}
      }
    }
}

which Apache Cassandra to connect

spring:
  cloud.kubernetes:
    config:
      enabled: false
    secrets:
      enabled: false
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      autoOffsetReset: earliest
      group-id: "pekko-fsm-foureyes"

And for the Spring Boot configuration.

Finally the Umbrella Helm Chart controlling the deployment all Infrastructure Components and the Applications in the ‘fsm-akka-helm-charts project.

apiVersion: v2
name: fsm-pekko-helm-charts
description: A Helm chart for Kubernetes
type: application
version: 0.1.0
appVersion: "1.0.0-SNAPSHOT"
dependencies:
  - name: k8ssandra
    version: 1.5.0-20220218064357-f72a8cb2
    condition: k8ssandra.enabled
    repository: https://helm.k8ssandra.io/stable
    tags:
      - infrastructure
  - name: traefik
    version: 10.14.2
    condition: traefik.enabled
    repository: https://helm.traefik.io/traefik
    tags:
      - infrastructure
  - name: fsmpekkokafka
    version: 1.0.0
    condition: fsmpekkokafka.enabled
    tags:
      - infrastructure
  - name: elasticsearch
    version: 7.16.3
    condition: elasticsearch.enabled
    repository: https://helm.elastic.co/
    tags:
      - infrastructure
  - name: nexus-repository-manager
    version: 38.0.0
    condition: nexus.enabled
    repository: https://sonatype.github.io/helm3-charts/
    tags:
      - infrastructure
  - name: metrics-server
    version: 3.8.2
    condition: metrics-server.enabled
    repository: https://kubernetes-sigs.github.io/metrics-server/
    tags:
      - infrastructure
  - name: fsm-pekko-4eyes-application
    version: 1.1.2
    condition: fsm-pekko-4eyes-application.enabled
    repository: http://localhost:57198/repository/fsm-akka-helm/
    tags:
      - application

As you can see, we are installing Apache Cassandra, Traefik, Aapache Kafka, Elasticsearch, Nexus and finally our Akka / Pekko Application ‘fsm-akka-4eyes-application’ with the help of this Helm Chart.

Autoscaling

This Topic become too big to cover on this blog entry, so I created a dedicated blog for it.

Limitless Scaling and Elasticity with Kubernetes Event Driven Autoscaling ( KEDA ) for an Event Sourcing Akka / Pekko Application – Apache Kafka

If you want to learn more about it, please check that one.

Gradle Configuration

To be able use ‘fsm-akka-4eyes-application‘ Helm Chart from the Umbrella Helm Chart, we have to upload it to a Helm Repository (for our PoC I choose Nexus and you can see here how to setup) and to package / upload our Helm Chart we will use a Gradle Plugin.

project(':fsm-pekko-4eyes-application') {
    apply plugin: 'org.springframework.boot'
    apply plugin: 'java'
    apply plugin: 'io.freefair.lombok'
    apply plugin: 'groovy'
    apply plugin: 'com.google.cloud.tools.jib'
    apply plugin: 'org.unbroken-dome.helm'
    apply plugin: 'org.unbroken-dome.helm-publish'

    ...
     helm {
        charts {
            foureyes {
                publish = true
                chartName = 'fsm-pekko-4eyes-application'
                chartVersion = "${project.version}"
                sourceDir = file('helm')
                filtering {
                    values.put 'imageRepository', jib.to.image
                    values.put 'imageTag', jib.to.tags.first()
                    values.put 'appVersion', jib.to.tags.first()
                }
            }
        }
        repositories {
            fsmpekko {
                url 'http://localhost:57198//repository/fsm-pekko-helm/'
                credentials {
                    username = "${props.HELM_USER}"
                    password = "${props.HELM_PASSWORD}"
                }
            }
        }
        publishing {
            repositories {
                nexus {
                    url = uri('http://localhost:57198/')
                    repository = 'fsm-pekko-helm'
                    apiVersion = 'v1'
                    credentials {
                        username = "${props.HELM_USER}"
                        password = "${props.HELM_PASSWORD}"
                    }
                }
            }
        }
    }
}

For Helm definition we have to give a name to our chart, naturally our Gradle project name and a Version, our Gradle project version, then the things gets interesting, remeber the filtering feature of the Helm plugin I mentioned, we can here replace the values of ‘appVersion’, ‘Docker Image’ and ‘Docker Tag’, please pay attention that the values for these replacements supplied in collaboration with JiB plugin.

The rest of the configuration is about how publish to Nexus Repository and authentication information etc.

One thing I like to point out, for every Helm Chart Publishing you have to increase the Version of the Chart for every upload, Nexus does not accept upload if the Version already exist in Nexus Helm Repository (or you can delete the Version from Nexus).

You can upload Helm Charts to a Helm repository with following command.

gradle :fsm-pekko-4eyes-application:helmPublish

The commands how to install these Helm Charts to Kubernetes would be dealt in another chapter.

Application Initialisation

Now that we dealt with how we package our application in Docker and deliver with Helm, let’s look how we initialise it with Spring.

fsm-pekko-4eyes-application/src/main/java/org/salgar/fsm/pekko/foureyes/FSMAkka4EyesApplication.java

package org.salgar.fsm.pekko.foureyes;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.salgar.fsm.pekko.foureyes.addresscheck.actions.config.AdressCheckSMActionConfiguration;
import org.salgar.fsm.pekko.foureyes.addresscheck.guards.config.AdressCheckSMGuardConfiguration;
import org.salgar.fsm.pekko.foureyes.credit.actions.config.CreditSMActionConfiguration;
import org.salgar.fsm.pekko.foureyes.credit.guards.config.CreditSMGuardConfiguration;
import org.salgar.fsm.pekko.foureyes.creditscore.actions.config.CreditScoreSMActionConfiguration;
import org.salgar.fsm.pekko.foureyes.creditscore.guards.config.CreditScoreSMGuardConfiguration;
import org.salgar.fsm.pekko.foureyes.fraudprevention.actions.config.FraudPreventionSMActionConfiguration;
import org.salgar.fsm.pekko.foureyes.fraudprevention.actions.guards.config.FraudPreventionSMGuardConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Import;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;

@Import({
        AdressCheckSMActionConfiguration.class,
        AdressCheckSMGuardConfiguration.class,
        CreditSMActionConfiguration.class,
        CreditSMGuardConfiguration.class,
        CreditScoreSMActionConfiguration.class,
        CreditScoreSMGuardConfiguration.class,
        FraudPreventionSMActionConfiguration.class,
        FraudPreventionSMGuardConfiguration.class
})
@EnableElasticsearchRepositories("org.salgar.fsm.pekko.elasticsearch")
@RequiredArgsConstructor
@Slf4j
@SpringBootApplication(scanBasePackages = {"org.salgar.pekko",
        "org.salgar.fsm"})
public class FSMPekko4EyesApplication {
    public static void main(String[] args) {
        log.info("Starting!");
        new SpringApplicationBuilder(FSMPekko4EyesApplication.class)
                .registerShutdownHook(true)
                .run( args);
    }
}

As you can see there is nothing fancy about the Spring Boot configuration, we define in ‘@Import‘ statement which Spring Context configuration classes should be inspected and with ‘scanBasePackages‘ parameters tells Spring Boot which packages should be scanned for Component Scan.

Next class is the ‘Starter’…

fsm-pekko-4eyes-application/src/main/java/org/salgar/fsm/pekko/foureyes/Starter.java

package org.salgar.fsm.pekko.foureyes;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.salgar.fsm.pekko.elasticsearch.OffsetFacade;
import org.salgar.fsm.pekko.foureyes.addresscheck.kafka.stream.AdressCheckSMStreamConfig;
import org.salgar.fsm.pekko.foureyes.addresscheck.protobuf.AdressCheckSMCommand;
import org.salgar.fsm.pekko.foureyes.credit.kafka.config.TopicProperties;
import org.salgar.fsm.pekko.foureyes.credit.kafka.facade.AskFacade;
import org.salgar.fsm.pekko.foureyes.credit.kafka.stream.CreditSMStreamConfig;
import org.salgar.fsm.pekko.foureyes.credit.protobuf.CreditSMCommand;
import org.salgar.fsm.pekko.foureyes.creditscore.kafka.stream.CreditScoreSMStreamConfig;
import org.salgar.fsm.pekko.foureyes.creditscore.kafka.stream.MultiTenantCreditScoreSMStreamConfig;
import org.salgar.fsm.pekko.foureyes.creditscore.protobuf.CreditScoreSMCommand;
import org.salgar.fsm.pekko.foureyes.creditscore.protobuf.MultiTenantCreditScoreSMCommand;
import org.salgar.fsm.pekko.foureyes.fraudprevention.kafka.stream.FraudPreventionSMStreamConfig;
import org.salgar.fsm.pekko.foureyes.fraudprevention.protobuf.FraudPreventionSMCommand;
import org.salgar.fsm.pekko.foureyes.projections.CreditSMProjection;
import org.salgar.fsm.pekko.foureyes.projections.CreditSMProjectionHandler;
import org.salgar.fsm.pekko.kafka.config.ConsumerConfig;
import org.salgar.fsm.pekko.pekkosystem.ActorService;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
@Slf4j
public class Starter {
    private final ActorService actorService;
    private final TopicProperties topicProperties;
    private final AskFacade askFacade;
    private final CreditSMProjectionHandler creditSMProjectionHandler;
    private final OffsetFacade offsetFacade;
    private final ConsumerConfig<String, CreditSMCommand> creditSMConsumerConfig;
    private final ConsumerConfig<String, CreditScoreSMCommand> creditScoreSMConsumerConfig;
    private final ConsumerConfig<String, MultiTenantCreditScoreSMCommand> multiTenantCreditScoreSMConsumerConfig;
    private final ConsumerConfig<String, AdressCheckSMCommand> adressCheckSMConsumerConfig;
    private final ConsumerConfig<String, FraudPreventionSMCommand> fraudPreventionSMConsumerConfig;

    @EventListener(ApplicationReadyEvent.class)
    public void initialised(ApplicationReadyEvent applicationReadyEvent) {
        log.info("FSM Pekko 4eyes Initialised!");

        CreditSMStreamConfig.apply(
                creditSMConsumerConfig,
                actorService,
                topicProperties,
                askFacade);
        
        ...

        CreditSMProjection.init(
                actorService.actorSystem(),
                creditSMProjectionHandler,
                offsetFacade);
    }
}

This class get over Spring Dependency Injection all components necessary to start Akka Alpakka Kafka Stream / Pekko Kafka Connectors to be able receives Events for our Akka Finite State Machine from Apache Kafka. For starting Kafka Stream, we are waiting ‘@EventListener(ApplicationReadyEvent)‘ which will signal that all Spring Components are initialised and our application is ready to accept Events.

Now let’s look to the integration tests, which are the proof that this Proof of Concept works.

Proof of Concep Tests

Until now we discussed the theory of Event Sourcing with Apache Kafka and Akka / Pekko Finite State Machine, now it is time to look all these theories functions or not

Positive Test

Our first candidate is the ‘fsm-pekko-4eyes-application/src/test/java/org/salgar/fsm/pekko/foureyes/creditsm/InitialTest.java‘ Test.

package org.salgar.fsm.pekko.foureyes.creditsm;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.salgar.fsm.pekko.foureyes.FSMPekko4EyesApplication;
import org.salgar.fsm.pekko.foureyes.credit.facade.CreditSMFacade;
import org.salgar.fsm.pekko.foureyes.credit.model.Address;
import org.salgar.fsm.pekko.foureyes.credit.model.CreditApplication;
import org.salgar.fsm.pekko.foureyes.credit.model.CreditTenantScoreResult;
import org.salgar.fsm.pekko.foureyes.credit.model.CreditTenants;
import org.salgar.fsm.pekko.foureyes.credit.model.Customer;
import org.salgar.fsm.pekko.foureyes.elasticsearch.CreditSMRepository;
import org.salgar.fsm.pekko.foureyes.model.CreditSmEs;
import org.salgar.fsm.pekko.foureyes.usecasekey.CreditUseCaseKeyStrategy;
import org.salgar.fsm.pekko.foureyes.variables.PayloadVariableConstants;
import org.salgar.pekko.fsm.foureyes.cra.kafka.CustomerRelationshipAdapter;
import org.salgar.pekko.fsm.foureyes.cra.model.CRMCustomer;
import org.salgar.pekko.fsm.foureyes.creditscore.AddressCheckService;
import org.salgar.pekko.fsm.foureyes.creditscore.CreditScoreService;
import org.salgar.pekko.fsm.foureyes.faudprevention.FraudPreventionService;
import org.salgar.pekko.fsm.foureyes.notifier.NotifierService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ActiveProfiles;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.salgar.fsm.pekko.foureyes.credit.CreditSM.CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_WAITING_MANAGER_APPROVAL;
import static org.salgar.fsm.pekko.foureyes.credit.CreditSM.CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_ADRRESCHECK_RESULT_RECEIVED;
import static org.salgar.fsm.pekko.foureyes.credit.CreditSM.CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_ADDRESSCHECK_RESULT_RECEIVED;
import static org.salgar.fsm.pekko.foureyes.credit.CreditSM.CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_FRAUDPREVENTION_RESULT_RECEIVED;
import static org.salgar.fsm.pekko.foureyes.credit.CreditSM.CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_RECEIVED;
import static org.salgar.fsm.pekko.foureyes.credit.CreditSM.CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_INITIAL_CSC;
import static org.salgar.fsm.pekko.foureyes.credit.CreditSM.CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_WAITING_MANAGER_APPROVAL;
import static org.salgar.fsm.pekko.foureyes.credit.CreditSM.CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_FROM_SENIOR_MANAGER;
import static org.salgar.fsm.pekko.foureyes.credit.CreditSM.CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_WAITING_ANAYLIST_APPROVAL;
import static org.salgar.fsm.pekko.foureyes.credit.CreditSM.CREDIT_REJECTED;
import static org.salgar.fsm.pekko.foureyes.credit.CreditSM.ReportResponse;
import static org.salgar.fsm.pekko.foureyes.credit.CreditSM.Response;
import static org.salgar.fsm.pekko.foureyes.slaves.SlaveStatemachineConstants.ADDRESS_CHECK_SM;
import static org.salgar.fsm.pekko.foureyes.slaves.SlaveStatemachineConstants.CUSTOMER_SCORE_SM;
import static org.salgar.fsm.pekko.foureyes.slaves.SlaveStatemachineConstants.FRAUD_PREVENTION_SM;
import static org.salgar.fsm.pekko.foureyes.slaves.SlaveStatemachineConstants.SOURCE_SLAVE_SM_TAG;
import static org.salgar.pekko.fsm.foureyes.notifier.NotificationHelper.CREDIT_ANALYST_NOTIFICATION_LIST;
import static org.salgar.pekko.fsm.foureyes.notifier.NotificationHelper.RELATIONSHIP_MANAGER_NOTIFICATION_LIST;
import static org.salgar.pekko.fsm.foureyes.notifier.NotificationHelper.SALES_MANAGER_NOTIFICATION_LIST;
import static org.salgar.pekko.fsm.foureyes.notifier.NotificationHelper.SENIOR_SALES_MANAGER_NOTIFICATION_LIST;

@EnableElasticsearchRepositories("org.salgar.fsm.pekko.foureyes.elasticsearch")
@ActiveProfiles({"itest"})
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
@EmbeddedKafka(
        partitions = 1,
        topics = {"creditSM", "creditScoreSM", "addressCheckSM", "fraudPreventionSM", "multiTenantScreditScoreSM"},
        brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
@Slf4j
@SpringBootTest(classes = { FSMPekko4EyesApplication.class })
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class InitialTest {
    private final static long WAIT_TIME_BETWEEN_STEPS = TimeUnit.MILLISECONDS.toMillis(500);
    private final static long WAIT_TIME_ELASTIC = TimeUnit.SECONDS.toMillis(15);
    final List<String> relationShipNotificationList = Arrays.asList("relationshipmanager1@example.com", "relationshipmanager2@example.com");
    final List<String> salesManagerNotificationList = Arrays.asList("salesmanager1@example.com", "salesmanager2@example.com");
    final List<String> creditAnalystNotificationList = List.of("creditanalyst@example.com");
    final List<String> seniorSalesManagerNotificationList = List.of("seniorSalesManagert@example.com");

Naturally we have to first setup our tests, interesting part here is how we are configuring the @EmbeddedKafka for test environment with the necessary Topics and URL / Port that Apache Kafka should Kafka listens along with some configuration that is necessary from mocking of Notifier Service.

Our initial test would test a full workflow from submitting a credit request to it’s positive resolution. First we have to prepare payload objects of our first Event.

    @Test
    @SneakyThrows
    public void creditAcceptedTest() {
        final String creditUuid = UUID.randomUUID().toString();
        final Customer customer =
                new Customer(
                        "John",
                        "Doe",
                        "123456789X",
                        new Address(
                                "muster strasse 1",
                                "11A",
                                "city1",
                                "country1"
                        ),
                        "customer1@test.info");
        final List<Customer> creditTenants = new ArrayList<>();
        creditTenants.add(customer);
        Map<String, Object> payload = preparePayload(creditUuid, 100000.0, creditTenants);

        creditSMFacade.submit(payload);

        Thread.sleep(WAIT_TIME_BETWEEN_STEPS);
        ...

After this part of test runs, this is what we see in logs.

[CreditSMGuardian$] [akka://FourEyes/system/sharding/creditsm/18/8f9a1dc8-c0b7-4ad1-b675-ef03deee5d91] – Processing INITIAL onSubmit payload: {creditUuid=8f9a1dc8-c0b7-4ad1-b675-ef03deee5d91, creditTenants=[Customer(firstName=John, lastName=Doe, personalId=123456789X, address=Address(street=muster strasse 1, houseNo=11A, city=city1, country=country1))]}
[CreditSM$onSubmit], resulting effect: [Persist(CreditSM$CreditApplicationSubmittedPersistEvent)], side effects: [1]

As you can see Akka Finite State Machine is doing what we are expecting, but lets check that with assertions.

        Thread.sleep(WAIT_TIME_BETWEEN_STEPS);

        Future<Response> futureCreditSMState = creditSMFacade.currentState(payload);
        ReportResponse report =
                (ReportResponse) Await.result(futureCreditSMState, Duration.create(20, TimeUnit.SECONDS));

        assertNotNull(report);
        assertThat(report.state(), instanceOf(CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_WAITING_MANAGER_APPROVAL.class));
        assertEquals(((List<Customer>)report.state().controlObject().get(PayloadVariableConstants.CREDIT_TENANTS)).get(0), customer);
        verify(notifierService, times(1)).notify(eq(relationShipNotificationList), anyString());

        payload = preparePayload(creditUuid, creditTenants);

If you look to the UML Diagram,

as you can see with ‘onSubmit‘ event the State Machine will transition to State (please remember that WAIT_APPROVAL state has a Nested Submachine that is the reason State name looks that complex).

That is exactly the assertion checks…

CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_WAITING_MANAGER_APPROVAL

We also validate the NotifierService called from Transition Action.

INITIAL_$$_CREDIT_APPLICATION_SUBMITTED_intial_onSubmit_ActionImpl.java

@RequiredArgsConstructor
public class INITIAL_$$_CREDIT_APPLICATION_SUBMITTED_intial_onSubmit_ActionImpl
    extends INITIAL_$$_CREDIT_APPLICATION_SUBMITTED_intial_onSubmit_Action {
    private final CustomerRelationshipAdapter customerRelationshipAdapter;
    private final NotifierService notifierService;

    @Override
    protected Map<String, Object> processCustomAction(ActorContext<CreditSM.CreditSMEvent> actorContext,
                                                        Map<String, Object> controlObject,
                                                        Map<String, Object> payload) {

        List<String> notificationList = notifierService.calculateRecipientList(RELATIONSHIP_MANAGER_NOTIFICATION_LIST);
        notifierService.notify(notificationList, "Credit Tenants applied for Credit. Please check!");
        controlObject.put(RELATIONSHIP_MANAGERS, notificationList);

        String creditId = (String) payload.get(CreditUseCaseKeyStrategy.CREDIT_UUID);
        CreditApplication creditApplication = (CreditApplication) payload.get(CREDIT_APPLICATION);

        controlObject.put(CreditUseCaseKeyStrategy.CREDIT_UUID, creditId);
        controlObject.put(CREDIT_AMOUNT, creditApplication.getCreditAmount());
        controlObject.put(CREDIT_TENANTS, creditApplication.getCreditTenants().getCreditTenants());

        for (Customer customer: creditApplication.getCreditTenants().getCreditTenants()) {
            CRMCustomer crmCustomer =
                    new CRMCustomer(
                            customer.getFirstName(),
                            customer.getLastName());
            customerRelationshipAdapter.transferCustomerCreation(crmCustomer);
        }

        return Collections.emptyMap();
    }
}
verify(notifierService, times(1)).notify(eq(relationShipNotificationList), anyString());

Now we are in WAITING_APPROVAL State and we are waiting the approval of the Relationship Manager.

In our test now we have to prepare test events.

        verify(notifierService, times(1)).notify(eq(relationShipNotificationList), anyString());

        payload = preparePayload(creditUuid, creditTenants);

        creditSMFacade.relationshipManagerApproved(payload);

        Thread.sleep(WAIT_TIME_BETWEEN_STEPS);

And let’s see how Akka Finite State Machine reacts to this Event on logs.

[a.p.t.i.EventSourcedBehaviorImpl] [FourEyes-akka.actor.default-dispatcher-18] [akka://FourEyes/system/sharding/creditsm/18/8f9a1dc8-c0b7-4ad1-b675-ef03deee5d91] – Handled command [org.salgar.fsm.akka.foureyes.credit.CreditSM$onRelationshipManagerApproved], resulting effect: [Persist(org.salgar.fsm.akka.foureyes.credit.CreditSM$RelationshipManagerApprovedPersistEvent)], side effects: [1]

And also validate the State of State Machine with our assertions.

        Thread.sleep(WAIT_TIME_BETWEEN_STEPS);

        futureCreditSMState = creditSMFacade.currentState(payload);

        report =
                (ReportResponse) Await.result(futureCreditSMState, Duration.create(20, TimeUnit.SECONDS));

        assertNotNull(report);
        assertThat(report.state(), instanceOf(CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_WAITING_MANAGER_APPROVAL.class));
        verify(notifierService, times(1)).notify(eq(salesManagerNotificationList), anyString());

        payload = preparePayload(creditUuid, creditTenants);

Our State Machine changed to State.

CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_WAITING_MANAGER_APPROVAL

And Transition Action calls the Notification Service to alert Sales Manager while its reaction is expected.

WAITING_APPROVAL_$$_RELATIONSHIP_MANAGER_APPROVED_waitingApproval_onRelationshipManagerApproved_ActionImpl.java

@RequiredArgsConstructor
public class WAITING_APPROVAL_$$_RELATIONSHIP_MANAGER_APPROVED_waitingApproval_onRelationshipManagerApproved_ActionImpl
    extends WAITING_APPROVAL_$$_RELATIONSHIP_MANAGER_APPROVED_waitingApproval_onRelationshipManagerApproved_Action {
    private final NotifierService notifierService;

    @Override
    protected Map<String, Object> processCustomAction(ActorContext<CreditSM.CreditSMEvent> actorContext,
                                                        Map<String, Object> controlObject,
                                                        Map<String, Object> payload) {
        List<String> notificationList = notifierService.calculateRecipientList(SALES_MANAGER_NOTIFICATION_LIST);
        notifierService.notify(notificationList,
                "Relationship Manager Approved the Credit, Sales Manager you should proceed. Please check!");
        controlObject.put(SALES_MANAGERS, notificationList);

        return payload;
    }
}
verify(notifierService, times(1)).notify(eq(salesManagerNotificationList), anyString());

Now that Relationship Manager Approved, we need the Approval of the Sales Manager to be able to continue with our workflow.

Test Event for this.

        verify(notifierService, times(1)).notify(eq(salesManagerNotificationList), anyString());

        payload = preparePayload(creditUuid, creditTenants);
        creditSMFacade.salesManagerApproved(payload);

        Thread.sleep(WAIT_TIME_BETWEEN_STEPS);

And what we see in logs.

[a.p.t.i.EventSourcedBehaviorImpl] [FourEyes-akka.actor.default-dispatcher-18] [akka://FourEyes/system/sharding/creditsm/18/8f9a1dc8-c0b7-4ad1-b675-ef03deee5d91] – Handled command [org.salgar.fsm.akka.foureyes.credit.CreditSM$onSalesManagerApproved], resulting effect: [Persist(org.salgar.fsm.akka.foureyes.credit.CreditSM$SalesManagerApprovalPersistEvent)], side effects: [1]

As usual we assert that State Machine Transition to State we are expecting.

        futureCreditSMState = creditSMFacade.currentState(payload);

        report =
                (ReportResponse) Await.result(futureCreditSMState, Duration.create(20, TimeUnit.SECONDS));

        assertNotNull(report);
        assertThat(report.state(), instanceOf(CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_INITIAL_CSC.class));

        Thread.sleep(WAIT_TIME_BETWEEN_STEPS);

but it will not be as straightforward as you expect like the previous States. Approval of the Sales Manager also triggers our inquiries to our Partner Systems about this Credit Approval. To handle this complexity ‘SALES_MANAGER_APPROVED’ has a Nested / Sub State Machine (do you remember our discussion about State Explosion and managing complexity by dividing and conquering?).

So State Machine would be in

CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_INITIAL_CSC

We also verify our calls toward Partner Systems are also triggered.

@RequiredArgsConstructor
public class RELATIONSHIP_MANAGER_APPROVED_$$_SALES_MANAGER_APPROVED_relationshipManagerApproved_onSalesManagerApproved_ActionImpl
    extends RELATIONSHIP_MANAGER_APPROVED_$$_SALES_MANAGER_APPROVED_relationshipManagerApproved_onSalesManagerApproved_Action {
    private final MultiTenantCreditScoreSMFacade multiTenantScreditScoreSMFacade;
    private final FraudPreventionSMFacade fraudPreventionSMFacade;
    private final AdressCheckSMFacade adressCheckSMFacade;

    @Override
    protected Map<String, Object> processCustomAction(ActorContext<CreditSM.CreditSMEvent> actorContext,
                                                        Map<String, Object> controlObject,
                                                        Map<String, Object> payload) {
        if(payload != null) {
            String useCaseKey = CreditSMServiceLocator.getInstance().useCaseKeyStrategy().getKey(payload);

            Map<String, Object> creditScorePayload = new HashMap<>(payload);
            multiTenantScreditScoreSMFacade.startMultiTenantCreditScoreResearch(
                    () -> useCaseKey,
                    creditScorePayload
            );

            Map<String, Object> fraudPreventionPayload = new HashMap<>(payload);
            fraudPreventionSMFacade.startFraudPreventionEvaluation(
                    () -> useCaseKey,
                    fraudPreventionPayload
            );

            Map<String, Object> addressCheckPayload = new HashMap<>(payload);
            adressCheckSMFacade.startAdressCheckResearch(
                    () -> useCaseKey,
                    addressCheckPayload
            );
        }

        return payload;
    }
}
verify(creditScoreServiceMockBean).calculateCreditScore(anyString(), anyString(), anyString()); verify(fraudPreventionServiceMockBean).reportFraudPrevention(anyString(), anyString(), anyString());       verify(addressCheckServiceMockBean).addressExist(anyString(), anyString(), anyString(), anyString());

And logs confirms the interaction with our Slave State Machines.

[a.p.t.i.EventSourcedBehaviorImpl] [FourEyes-akka.actor.default-dispatcher-45] [akka://FourEyes/system/sharding/multitenantcreditscoresm/18/8f9a1dc8-c0b7-4ad1-b675-ef03deee5d91] – Handled command [org.salgar.fsm.akka.foureyes.creditscore.MultiTenantCreditScoreSM $ onStartMultiTenantCreditScoreResearch], resulting effect: [Persist(org.salgar.fsm.akka.foureyes.creditscore.MultiTenantCreditScoreSM $ StartMultiTenantResearchPersistEvent)], side effects: [1]
[a.p.t.i.EventSourcedBehaviorImpl] [FourEyes-akka.actor.default-dispatcher-44] [akka://FourEyes/system/sharding/fraudpreventionsm/18/8f9a1dc8-c0b7-4ad1-b675-ef03deee5d91] – Handled command [org.salgar.fsm.akka.foureyes.fraudprevention.FraudPreventionSM $ onStartFraudPreventionEvaluation], resulting effect: [Persist(org.salgar.fsm.akka.foureyes.fraudprevention.FraudPreventionSM $ FraudPreventionPersistEvemt)], side effects: [1]
[a.p.t.i.EventSourcedBehaviorImpl] [FourEyes-akka.actor.default-dispatcher-46] [akka://FourEyes/system/sharding/adresschecksm/18/8f9a1dc8-c0b7-4ad1-b675-ef03deee5d91] – Handled command [org.salgar.fsm.akka.foureyes.addresscheck.AdressCheckSM$ onStartAdressCheckResearch], resulting effect: [Persist(org.salgar.fsm.akka.foureyes.addresscheck.AdressCheckSM $ StartAdressCheckPersistEvent)], side effects: [1]

Now to progress with our test, we should receive responses from our Partner Systems, in which order we receive the responses is not relevant for State Machine. This test will receives first the response from CustomerScore Slave State Machine but in other tests under this project you simulate the responses arrives in different order, you can observe these tests yourself to convince yourselves.

        Thread.sleep(WAIT_TIME_BETWEEN_STEPS);

        payload = preparePayload(creditUuid, creditTenants);
        Map<String, CreditTenantScoreResult> creditTenantScoreResultMap = new HashMap<>();
        creditTenantScoreResultMap.put(
                customer.getPersonalId(),
                new CreditTenantScoreResult(customer.getPersonalId(), 73.72));
        payload.put(PayloadVariableConstants.CREDIT_SCORE_TENANT_RESULTS, creditTenantScoreResultMap);
        payload.put(SOURCE_SLAVE_SM_TAG, CUSTOMER_SCORE_SM);
        creditSMFacade.resultReceived(payload);

        Thread.sleep(WAIT_TIME_BETWEEN_STEPS);

This will trigger following Transition.

With this transition we see first time use of a Guard Condition, ‘isCreditScoreResultGuard‘.

package org.salgar.fsm.pekko.foureyes.credit.guards;

import org.apache.pekko.actor.typed.scaladsl.ActorContext;
import org.salgar.fsm.pekko.foureyes.credit.CreditSM;

import java.util.Map;

import static org.salgar.fsm.pekko.foureyes.slaves.SlaveStatemachineConstants.CUSTOMER_SCORE_SM;
import static org.salgar.fsm.pekko.foureyes.slaves.SlaveStatemachineConstants.SOURCE_SLAVE_SM_TAG;

public class INITIAL_CSC_$$_CREDITSCORE_RECEIVED_isCreditScoreResultGuard_GuardImpl
    implements INITIAL_CSC_$$_CREDITSCORE_RECEIVED_isCreditScoreResultGuard_Guard {
    @Override
    public boolean evaluate(
            ActorContext<CreditSM.CreditSMEvent> actorContext,
            Map<String, Object> controlObject,
            Map<String, Object> payload) {
        actorContext.log().debug("Evaluating INITIAL_CSC initial_creditScoreReceived Guard");

        String slaveTag = (String) payload.get(SOURCE_SLAVE_SM_TAG);
        if(CUSTOMER_SCORE_SM.equals(slaveTag)) {
            return true;
        }
        return false;
    }
}

As you can see, we are checking ‘onResultReceived‘ Event is originated from Customer Score Slave State Machine or not, when we were simulating the response in the test, we placed this flag in the Event payload, the flag ‘SOURCE_SLAVE_SM_TAG’.

package org.salgar.fsm.pekko.foureyes.credit.actions;

import org.apache.pekko.actor.typed.scaladsl.ActorContext;
import org.salgar.fsm.pekko.foureyes.credit.CreditSM;
import org.salgar.fsm.pekko.foureyes.credit.model.CreditTenantScoreResult;

import java.util.HashMap;
import java.util.Map;

import static org.salgar.fsm.pekko.foureyes.variables.PayloadVariableConstants.CREDIT_SCORE_TENANT_RESULTS;

public class INITIAL_CSC_$$_CREDITSCORE_RECEIVED_initial_creditScoreReceived_ActionImpl
    extends INITIAL_CSC_$$_CREDITSCORE_RECEIVED_initial_creditScoreReceived_Action {

    @Override
    protected Map<String, Object> processCustomAction(ActorContext<CreditSM.CreditSMEvent> actorContext,
                                                        Map<String, Object> controlObject,
                                                        Map<String, Object> payload) {
        Map<String, Object> modifiedPayload = new HashMap<>();

        Map<String, CreditTenantScoreResult> creditTenantScoreResultMap =
                (Map<String, CreditTenantScoreResult>) payload.get(CREDIT_SCORE_TENANT_RESULTS);
        modifiedPayload.put(CREDIT_SCORE_TENANT_RESULTS, creditTenantScoreResultMap);

        return modifiedPayload;
    }
}

and the Action places the reported the Customer Score in the Master State Machine control object to be able to access later on.

futureCreditSMState = creditSMFacade.currentState(payload);
        report =
                (ReportResponse) Await.result(futureCreditSMState, Duration.create(20, TimeUnit.SECONDS));

        assertNotNull(report);
        assertThat(report.state(), instanceOf(CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_RECEIVED.class));
        Map<String, CreditTenantScoreResult> map =
                (Map<String, CreditTenantScoreResult>) report
                        .state()
                        .controlObject()
                        .get(PayloadVariableConstants.CREDIT_SCORE_TENANT_RESULTS);
        assertEquals(73.72, map.get(customer.getPersonalId()).getCreditScore());

        Thread.sleep(WAIT_TIME_BETWEEN_STEPS);

Naturally also we assert that the returned values appears in Master State Machine and State Machine switched to State ‘

CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_RECEIVED

We can also verify from logs that we received the ‘onResultReceived’ Event and evaluation of Guards and Actions.

[o.s.f.a.f.c.CreditSMGuardian$] [FourEyes-akka.actor.default-dispatcher-20] [akka://FourEyes/system/sharding/creditsm/4/9fac857e-69de-45f1-8ed6-7eb641d3d61b] – Processing CREDIT_APPLICATION_SUBMITTED_SALES_MANAGER_APPROVED_INITIAL_CSC onResultReceived payload: {creditScoreTenantResults={123456789X=CreditTenantScoreResult(personalId=123456789X, creditScore=73.72)}, sourceSlaveSMTag=customerScoreSM, creditUuid=9fac857e-69de-45f1-8ed6-7eb641d3d61b, creditTenants=[Customer(firstName=John, lastName=Doe, personalId=123456789X, address=Address(street=muster strasse 1, houseNo=11A, city=city1, country=country1))]}
[o.s.f.a.f.c.CreditSMGuardian$] [FourEyes-akka.actor.default-dispatcher-20] [akka://FourEyes/system/sharding/creditsm/4/9fac857e-69de-45f1-8ed6-7eb641d3d61b] – Evaluating INITIAL_CSC initial_AddressCheckResultReceived Guard
[o.s.f.a.f.c.CreditSMGuardian$] [FourEyes-akka.actor.default-dispatcher-20] [akka://FourEyes/system/sharding/creditsm/4/9fac857e-69de-45f1-8ed6-7eb641d3d61b] – Evaluating INITIAL_CSC initial_FraudPreventionResultReceived Guard
[o.s.f.a.f.c.CreditSMGuardian$] [FourEyes-akka.actor.default-dispatcher-20] [akka://FourEyes/system/sharding/creditsm/4/9fac857e-69de-45f1-8ed6-7eb641d3d61b] – Evaluating INITIAL_CSC initial_creditScoreReceived Guard
[o.s.f.a.f.c.CreditSMGuardian$] [FourEyes-akka.actor.default-dispatcher-20] [akka://FourEyes/system/sharding/creditsm/4/9fac857e-69de-45f1-8ed6-7eb641d3d61b] – Executing INITIAL_CSC initial_creditScoreReceived Action
[a.p.t.i.EventSourcedBehaviorImpl] [FourEyes-akka.actor.default-dispatcher-20] [akka://FourEyes/system/sharding/creditsm/4/9fac857e-69de-45f1-8ed6-7eb641d3d61b] – Handled command [org.salgar.fsm.akka.foureyes.credit.CreditSM$onResultReceived], resulting effect: [Persist(org.salgar.fsm.akka.foureyes.credit.CreditSM$CreditScoreReceivedPersistEvent)], side effects: [1]
[o.s.f.a.f.c.CreditSMGuardian$] [FourEyes-akka.actor.default-dispatcher-20] [akka://FourEyes/system/sharding/creditsm/4/9fac857e-69de-45f1-8ed6-7eb641d3d61b] – Processing CREDIT_APPLICATION_SUBMITTED_SALES_MANAGER_APPROVED_INITIAL_CSC CreditScoreReceivedPersistEvent payload: {creditScoreTenantResults={123456789X=CreditTenantScoreResult(personalId=123456789X, creditScore=73.72)}, creditUuid=9fac857e-69de-45f1-8ed6-7eb641d3d61b, creditTenants=[Customer(firstName=John, lastName=Doe, personalId=123456789X, address=Address(street=muster strasse 1, houseNo=11A, city=city1, country=country1))]}

Now we will simulate in our Test, the response from Fraud Prevention partner system.

        Thread.sleep(WAIT_TIME_BETWEEN_STEPS);

        payload = preparePayload(creditUuid, creditTenants);
        payload.put(PayloadVariableConstants.FRAUD_PREVENTION_RESULT, true);
        payload.put(SOURCE_SLAVE_SM_TAG, FRAUD_PREVENTION_SM);
        creditSMFacade.resultReceived(payload);

        Thread.sleep(WAIT_TIME_BETWEEN_STEPS);

This triggers following Transition.

This transition will check the following Guard condition ‘isFraudPreventionResultGuard’.

package org.salgar.fsm.pekko.foureyes.credit.guards;

import org.apache.pekko.actor.typed.scaladsl.ActorContext;
import org.salgar.fsm.pekko.foureyes.credit.CreditSM;

import java.util.Map;

import static org.salgar.fsm.pekko.foureyes.slaves.SlaveStatemachineConstants.FRAUD_PREVENTION_SM;
import static org.salgar.fsm.pekko.foureyes.slaves.SlaveStatemachineConstants.SOURCE_SLAVE_SM_TAG;

public class CREDITSCORE_RECEIVED_$$_CREDITSCORE_FRAUDPREVENTION_RESULT_RECEIVED_isFraudPreventionResultGuard_GuardImpl
    implements CREDITSCORE_RECEIVED_$$_CREDITSCORE_FRAUDPREVENTION_RESULT_RECEIVED_isFraudPreventionResultGuard_Guard {
    @Override
    public boolean evaluate(
            ActorContext<CreditSM.CreditSMEvent> actorContext,
            Map<String, Object> controlObject,
            Map<String, Object> payload) {
        actorContext
                .log()
                .debug("Evaluating CREDITSCORE_RECEIVED creditScoreReceived_creditScore_fraudPreventionResult Guard");

        String slaveTag = (String) payload.get(SOURCE_SLAVE_SM_TAG);
        if(FRAUD_PREVENTION_SM.equals(slaveTag)) {
            return true;
        }
        return false;
    }
}

As you can see, the Guard condition checks that the event originated from Address Check Slave State Machine.

package org.salgar.fsm.pekko.foureyes.credit.actions;

import org.apache.pekko.actor.typed.scaladsl.ActorContext;
import org.salgar.fsm.pekko.foureyes.credit.CreditSM;
import org.salgar.fsm.pekko.foureyes.variables.PayloadVariableConstants;

import java.util.HashMap;
import java.util.Map;

public class CREDITSCORE_RECEIVED_$$_CREDITSCORE_FRAUDPREVENTION_RESULT_RECEIVED_creditScoreReceived_creditScore_fraudPreventionResult_ActionImpl
    extends CREDITSCORE_RECEIVED_$$_CREDITSCORE_FRAUDPREVENTION_RESULT_RECEIVED_creditScoreReceived_creditScore_fraudPreventionResult_Action {

    @Override
    protected Map<String, Object> processCustomAction(ActorContext<CreditSM.CreditSMEvent> actorContext,
                                                        Map<String, Object> controlObject,
                                                        Map<String, Object> payload) {

        Map<String, Object> modifiedPayload = new HashMap<>();
        Boolean fraudPreventionResult = (Boolean) payload.get(PayloadVariableConstants.FRAUD_PREVENTION_RESULT);
        modifiedPayload.put(PayloadVariableConstants.FRAUD_PREVENTION_RESULT, fraudPreventionResult);

        return modifiedPayload;
    }
}

And the Action places the response values into the Control Object of the Master State Machine.

Thread.sleep(WAIT_TIME_BETWEEN_STEPS);

        futureCreditSMState = creditSMFacade.currentState(payload);
        report =
                (ReportResponse) Await.result(futureCreditSMState, Duration.create(20, TimeUnit.SECONDS));

        assertNotNull(report);
        assertThat(report.state(),
                instanceOf(CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_FRAUDPREVENTION_RESULT_RECEIVED.class));
        assertEquals(true, report.state().controlObject().get(PayloadVariableConstants.FRAUD_PREVENTION_RESULT));

        Thread.sleep(WAIT_TIME_BETWEEN_STEPS);

And off course we will use assertions to check our State Machine correct state

CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_FRAUDPREVENTION_RESULT_RECEIVED

[o.s.f.a.s.f.StateMachineFacade] [FourEyes-akka.actor.default-dispatcher-22] [akka://FourEyes/system/creditSMGuardian] – We are processing onResultReceived(payload): {fraudPreventionResult=true, sourceSlaveSMTag=fraudPreventionSM, creditUuid=9fac857e-69de-45f1-8ed6-7eb641d3d61b, creditTenants=[Customer(firstName=John, lastName=Doe, personalId=123456789X, address=Address(street=muster strasse 1, houseNo=11A, city=city1, country=country1))]}
[o.s.f.a.f.c.CreditSMGuardian$] [FourEyes-akka.actor.default-dispatcher-22] [akka://FourEyes/system/sharding/creditsm/4/9fac857e-69de-45f1-8ed6-7eb641d3d61b] – Processing CREDIT_APPLICATION_SUBMITTED_SALES_MANAGER_APPROVED_CREDITSCORE_RECEIVED onResultReceived payload: {fraudPreventionResult=true, sourceSlaveSMTag=fraudPreventionSM, creditUuid=9fac857e-69de-45f1-8ed6-7eb641d3d61b, creditTenants=[Customer(firstName=John, lastName=Doe, personalId=123456789X, address=Address(street=muster strasse 1, houseNo=11A, city=city1, country=country1))]}
[o.s.f.a.f.c.CreditSMGuardian$] [FourEyes-akka.actor.default-dispatcher-22] [akka://FourEyes/system/sharding/creditsm/4/9fac857e-69de-45f1-8ed6-7eb641d3d61b] – Evaluating CREDITSCORE_RECEIVED creditScore_addressCheck_onResultReceived Guard
[o.s.f.a.f.c.CreditSMGuardian$] [FourEyes-akka.actor.default-dispatcher-22] [akka://FourEyes/system/sharding/creditsm/4/9fac857e-69de-45f1-8ed6-7eb641d3d61b] – Evaluating CREDITSCORE_RECEIVED creditScoreReceived_creditScore_fraudPreventionResult Guard
[o.s.f.a.f.c.CreditSMGuardian$] [FourEyes-akka.actor.default-dispatcher-22] [akka://FourEyes/system/sharding/creditsm/4/9fac857e-69de-45f1-8ed6-7eb641d3d61b] – Executing CREDITSCORE_RECEIVED creditScoreReceived_creditScore_fraudPreventionResult Action
[a.p.t.i.EventSourcedBehaviorImpl] [FourEyes-akka.actor.default-dispatcher-22] [akka://FourEyes/system/sharding/creditsm/4/9fac857e-69de-45f1-8ed6-7eb641d3d61b] – Handled command [org.salgar.fsm.akka.foureyes.credit.CreditSM$onResultReceived], resulting effect: [Persist(org.salgar.fsm.akka.foureyes.credit.CreditSM$CreditScoreFraudPreventionReceivedPersistEvent)], side effects: [1]


Now we will simulate in our Test, the response from Address Check partner system.

        Thread.sleep(WAIT_TIME_BETWEEN_STEPS);

        payload = preparePayload(creditUuid, creditTenants);
        payload.put(PayloadVariableConstants.ADDRESS_CHECK_RESULT, true);
        payload.put(SOURCE_SLAVE_SM_TAG, ADDRESS_CHECK_SM);
        creditSMFacade.resultReceived(payload);

        Thread.sleep(WAIT_TIME_BETWEEN_STEPS);

This will trigger the following Transition.

Now if you are paying attention, this Transition looks quite different then the previous, while we are using the wonderful advantages Nested Submachines, while we previous two States in the Submachine handled but final State change will return us to the original State Machine.

package org.salgar.fsm.pekko.foureyes.credit.guards;

import org.apache.pekko.actor.typed.scaladsl.ActorContext;
import org.salgar.fsm.pekko.foureyes.credit.CreditSM;
import org.salgar.fsm.pekko.foureyes.credit.model.CreditTenantScoreResult;
import org.salgar.fsm.pekko.foureyes.variables.PayloadVariableConstants;

import java.util.Map;

import static org.salgar.fsm.pekko.foureyes.slaves.SlaveStatemachineConstants.ADDRESS_CHECK_SM;
import static org.salgar.fsm.pekko.foureyes.slaves.SlaveStatemachineConstants.CUSTOMER_SCORE_SM;
import static org.salgar.fsm.pekko.foureyes.slaves.SlaveStatemachineConstants.FRAUD_PREVENTION_SM;
import static org.salgar.fsm.pekko.foureyes.slaves.SlaveStatemachineConstants.SOURCE_SLAVE_SM_TAG;

public class SALES_MANAGER_APPROVED_$$_WAITING_CREDIT_ANALYST_APPROVAL_isResultSufficientGuard_GuardImpl
    implements SALES_MANAGER_APPROVED_$$_WAITING_CREDIT_ANALYST_APPROVAL_isResultSufficientGuard_Guard {
    @Override
    public boolean evaluate(
            ActorContext<CreditSM.CreditSMEvent> actorContext,
            Map<String, Object> controlObject,
            Map<String, Object> payload) {
        actorContext.log().debug("Evaluating SALES_MANAGER_APPROVED salesManagerApproved_onResultReceived Guard");

        String slaveSM = (String) payload.get(SOURCE_SLAVE_SM_TAG);

        if(CUSTOMER_SCORE_SM.equals(slaveSM)) {
            return checkResultCreditScore(
                    actorContext,
                    controlObject,
                    payload
            );
        } else if(FRAUD_PREVENTION_SM.equals(slaveSM)) {
            return checkResultFraudPrevention(
                    actorContext,
                    controlObject,
                    payload
            );
        } else if(ADDRESS_CHECK_SM.equals(slaveSM)) {
            return checkResultFromAddressCheck(
                    actorContext,
                    controlObject,
                    payload
            );
        }

        return false;
    }
    ...
}

while this Guard ‘isResultSufficientGuard‘ is not in the Submachine it must deal with the fact it can be an answer from any of our the Partner Systems, additionally it should also check the informations delivered from our Partner Systems, to decide we can continue with our workflow.

There are two other Transitions that are reacting ‘onResultReceived‘.

One is dealing the scenario, Credit Score for an application is not quite enough, a human interaction must happen, a Senior Manager should decide to continue with credit process or reject the credit application.

Or the case, the results from Partner Systems dictates, the Credit Application must be flat out rejected .

These additional scenarios will be dealt in additional tests.

package org.salgar.fsm.pekko.foureyes.credit.actions;

import lombok.RequiredArgsConstructor;
import org.apache.pekko.actor.typed.scaladsl.ActorContext;
import org.salgar.fsm.pekko.foureyes.credit.CreditSM;
import org.salgar.fsm.pekko.foureyes.credit.model.CreditTenantScoreResult;
import org.salgar.pekko.fsm.foureyes.notifier.NotifierService;

import java.util.List;
import java.util.Map;

import static org.salgar.fsm.pekko.foureyes.slaves.SlaveStatemachineConstants.ADDRESS_CHECK_SM;
import static org.salgar.fsm.pekko.foureyes.slaves.SlaveStatemachineConstants.CUSTOMER_SCORE_SM;
import static org.salgar.fsm.pekko.foureyes.slaves.SlaveStatemachineConstants.FRAUD_PREVENTION_SM;
import static org.salgar.fsm.pekko.foureyes.slaves.SlaveStatemachineConstants.SOURCE_SLAVE_SM_TAG;
import static org.salgar.fsm.pekko.foureyes.variables.PayloadVariableConstants.ADDRESS_CHECK_RESULT;
import static org.salgar.fsm.pekko.foureyes.variables.PayloadVariableConstants.CREDIT_ANALYSTS;
import static org.salgar.fsm.pekko.foureyes.variables.PayloadVariableConstants.CREDIT_SCORE_TENANT_RESULTS;
import static org.salgar.fsm.pekko.foureyes.variables.PayloadVariableConstants.FRAUD_PREVENTION_RESULT;
import static org.salgar.pekko.fsm.foureyes.notifier.NotificationHelper.CREDIT_ANALYST_NOTIFICATION_LIST;

@RequiredArgsConstructor
public class SALES_MANAGER_APPROVED_$$_WAITING_CREDIT_ANALYST_APPROVAL_salesManagerApproved_onResultReceived_ActionImpl
    extends SALES_MANAGER_APPROVED_$$_WAITING_CREDIT_ANALYST_APPROVAL_salesManagerApproved_onResultReceived_Action {
    private final NotifierService notifierService;

    @Override
    protected Map<String, Object> processCustomAction(ActorContext<CreditSM.CreditSMEvent> actorContext,
                                                        Map<String, Object> controlObject,
                                                        Map<String, Object> payload) {

        String slaveSM = (String) payload.get(SOURCE_SLAVE_SM_TAG);
        if(CUSTOMER_SCORE_SM.equals(slaveSM)) {
            processCreditScoreResult(
                    actorContext,
                    controlObject,
                    payload
            );
        } else if(FRAUD_PREVENTION_SM.equals(slaveSM)) {
            processFraudPreventionResult(
                    actorContext,
                    controlObject,
                    payload
            );
        } else if(ADDRESS_CHECK_SM.equals(slaveSM)) {
            processAddressCheckResult(
                    actorContext,
                    controlObject,
                    payload
            );
        }
        List<String> notificationList = notifierService.calculateRecipientList(CREDIT_ANALYST_NOTIFICATION_LIST);
        notifierService
                .notify(notificationList, "Sales Manager Approved the Credit and Partner Systems delivered results"
                + " you should proceed. Please check!");

        controlObject.put(CREDIT_ANALYSTS, notificationList);

        return payload;
    }
    ...
}

Now that we have the results, we place the results in the Control Object in the Master State Machine and notify next person to let the workflow to continue.

Thread.sleep(WAIT_TIME_BETWEEN_STEPS);

        futureCreditSMState = creditSMFacade.currentState(payload);

        report =
                (ReportResponse) Await.result(futureCreditSMState, Duration.create(20, TimeUnit.SECONDS));

        assertNotNull(report);
        assertThat(report.state(), instanceOf(CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_WAITING_ANAYLIST_APPROVAL.class));
        assertEquals(true, report.state().controlObject().get(PayloadVariableConstants.ADDRESS_CHECK_RESULT));
        verify(notifierService, times(1)).notify(eq(creditAnalystNotificationList), anyString());

        Thread.sleep(WAIT_TIME_BETWEEN_STEPS);

And our typical assertions that we are in the correct State.

CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_WAITING_ANAYLIST_APPROVAL

And the notification service is also called.

[o.s.f.a.s.f.StateMachineFacade] [FourEyes-akka.actor.default-dispatcher-3] [akka://FourEyes/system/creditSMGuardian] – We are processing onResultReceived(payload): {sourceSlaveSMTag=addressCheckSM, creditUuid=9fac857e-69de-45f1-8ed6-7eb641d3d61b, creditTenants=[Customer(firstName=John, lastName=Doe, personalId=123456789X, address=Address(street=muster strasse 1, houseNo=11A, city=city1, country=country1))], addressCheckResult=true}
[o.s.f.a.f.c.CreditSMGuardian$] [FourEyes-akka.actor.default-dispatcher-5] [akka://FourEyes/system/sharding/creditsm/4/9fac857e-69de-45f1-8ed6-7eb641d3d61b] – Processing CREDIT_APPLICATION_SUBMITTED_ $ _SALES_MANAGER_APPROVED_ $ _CREDITSCORE_FRAUDPREVENTION_RESULT_RECEIVED onResultReceived payload: {sourceSlaveSMTag=addressCheckSM, creditUuid=9fac857e-69de-45f1-8ed6-7eb641d3d61b, creditTenants=[Customer(firstName=John, lastName=Doe, personalId=123456789X, address=Address(street=muster strasse 1, houseNo=11A, city=city1, country=country1))], addressCheckResult=true}
[o.s.f.a.f.c.CreditSMGuardian$] [FourEyes-akka.actor.default-dispatcher-5] [akka://FourEyes/system/sharding/creditsm/4/9fac857e-69de-45f1-8ed6-7eb641d3d61b] – Evaluating SALES_MANAGER_APPROVED salesManagerApproved_onCreditScoreReceived Guard
[2022-03-14 10:31:51,747] [DEBUG] [o.s.f.a.f.c.CreditSMGuardian$] [FourEyes-akka.actor.default-dispatcher-5] [akka://FourEyes/system/sharding/creditsm/4/9fac857e-69de-45f1-8ed6-7eb641d3d61b] – Evaluating SALES_MANAGER_APPROVED salesManagerApproved_onResultReceived Guard
[o.s.f.a.f.c.CreditSMGuardian$] [FourEyes-akka.actor.default-dispatcher-5] [akka://FourEyes/system/sharding/creditsm/4/9fac857e-69de-45f1-8ed6-7eb641d3d61b] – Executing SALES_MANAGER_APPROVED salesManagerApproved_onResultReceived Action
[a.p.t.i.EventSourcedBehaviorImpl] [FourEyes-akka.actor.default-dispatcher-5] [akka://FourEyes/system/sharding/creditsm/4/9fac857e-69de-45f1-8ed6-7eb641d3d61b] – Handled command [org.salgar.fsm.akka.foureyes.credit.CreditSM$onResultReceived], resulting effect: [Persist(org.salgar.fsm.akka.foureyes.credit.CreditSM$PositiveResultPersistedEvent)], side effects: [1]
[o.s.f.a.f.c.CreditSMGuardian$] [FourEyes-akka.actor.default-dispatcher-5] [akka://FourEyes/system/sharding/creditsm/4/9fac857e-69de-45f1-8ed6-7eb641d3d61b] – Processing CREDIT_APPLICATION_SUBMITTED_ $ _SALES_MANAGER_APPROVED_ $ _CREDITSCORE_FRAUDPREVENTION_RESULT_RECEIVED PositiveResultPersistedEvent payload: {fraudPreventionResult=true, creditScoreTenantResults={123456789X=CreditTenantScoreResult(personalId=123456789X, creditScore=73.72)}, sourceSlaveSMTag=addressCheckSM, creditUuid=9fac857e-69de-45f1-8ed6-7eb641d3d61b, creditTenants=[Customer(firstName=John, lastName=Doe, personalId=123456789X, address=Address(street=muster strasse 1, houseNo=11A, city=city1, country=country1))], addressCheckResult=true}

And finally, we will complete this test by accepting the credit application.

        Thread.sleep(WAIT_TIME_BETWEEN_STEPS);

        payload = preparePayload(creditUuid, creditTenants);
        creditSMFacade.accepted(payload);

        Thread.sleep(15000L);

This will trigger the following Transition.

And the Action for it.

package org.salgar.fsm.pekko.foureyes.credit.actions;

import lombok.RequiredArgsConstructor;
import org.apache.pekko.actor.typed.scaladsl.ActorContext;
import org.salgar.fsm.pekko.foureyes.credit.CreditSM;
import org.salgar.fsm.pekko.foureyes.credit.model.Customer;
import org.salgar.fsm.pekko.foureyes.variables.PayloadVariableConstants;
import org.salgar.pekko.fsm.foureyes.notifier.NotifierService;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

@RequiredArgsConstructor
public class WAITING_CREDIT_ANALYST_APPROVAL_$$_CREDIT_ACCEPTED_creditAnalyst_onAccepted_ActionImpl
    extends WAITING_CREDIT_ANALYST_APPROVAL_$$_CREDIT_ACCEPTED_creditAnalyst_onAccepted_Action {
    private final NotifierService notifierService;

    @Override
    protected Map<String, Object> processCustomAction(ActorContext<CreditSM.CreditSMEvent> actorContext,
                                                        Map<String, Object> controlObject,
                                                        Map<String, Object> payload) {
        List<Customer> customers = (List<Customer>) controlObject.get(PayloadVariableConstants.CREDIT_TENANTS);
        List<String> emails = new ArrayList<>();
        for (Customer customer : customers) {

            emails.add(customer.getEmail());
        }
        notifierService.notify(emails, "Your credit application is accepted!");
        return payload;
    }
}

Which notify the Customer who applied for the Credit and while this is Final State of the State Machine, this will also terminate the Actor.

package org.salgar.fsm.pekko.foureyes.credit.actions;

import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.scaladsl.ActorContext;
import org.apache.pekko.persistence.typed.scaladsl.Effect;
import org.apache.pekko.persistence.typed.scaladsl.EffectBuilder;
import org.apache.pekko.persistence.typed.scaladsl.ReplyEffect;
import org.salgar.pekko.fsm.api.action.Action;
import org.salgar.pekko.fsm.api.action.Action;
import org.salgar.fsm.pekko.foureyes.credit.CreditSM;

import java.util.Map;

public abstract class WAITING_CREDIT_ANALYST_APPROVAL_$$_CREDIT_ACCEPTED_creditAnalyst_onAccepted_Action
     implements Action<CreditSM.CreditSMEvent, CreditSM.PersistEvent, CreditSM.State, CreditSM.Response> {

    @Override
    public ReplyEffect<CreditSM.PersistEvent, CreditSM.State> doAction(
            ActorContext<CreditSM.CreditSMEvent> actorContext,
            Map<String, Object> controlObject,
            Map<String, Object> payload,
            ActorRef<CreditSM.Response> replyTo) throws InterruptedException {
        actorContext.log().debug("Executing WAITING_CREDIT_ANALYST_APPROVAL creditAnalyst_onAccepted Action");

        Map<String, Object> persistPayload = processCustomAction(actorContext, controlObject, payload);

        return processPersist(controlObject, persistPayload, replyTo);
    }

    protected abstract Map<String, Object> processCustomAction(ActorContext<CreditSM.CreditSMEvent> actorContext,
                                                               Map<String, Object> controlObject,
                                                               Map<String, Object> payload);


    protected ReplyEffect<CreditSM.PersistEvent, CreditSM.State> processPersist(
                                                                    Map<String, Object> controlObject,
                                                                    Map<String, Object> persistPayload,
                                                                    ActorRef<CreditSM.Response> replyTo) {
        controlObject.putAll(persistPayload);
        EffectBuilder<CreditSM.PersistEvent, CreditSM.State> effectBuilder =
                        Effect
                        .persist(new CreditSM.CreditAcceptedPersistEvent(controlObject));

        ReplyEffect<CreditSM.PersistEvent, CreditSM.State> replyEffect;
        if(replyTo == null) {
            replyEffect = effectBuilder.thenStop().thenNoReply();
        } else {
            replyEffect= effectBuilder.thenStop().thenReply(replyTo, (state) -> new CreditSM.AcknowledgeResponse());
        }

        return replyEffect;
    }
}

And the assertions but please pay the attention for the difference previous one, as I just explained ‘CREDIT_ACCEPTED‘ is final state and it terminates / stops the CreditSM Actor so if we use our standard ‘reportState’ assertion will fail because the Actor was probably stopped. So we will make our assertion over the Elasticsearch, for this purpose we have to create Spring Data Elasticsearch Repository for CreditSM.

fsm-pekko-4eyes-application/src/test/java/org/salgar/fsm/pekko/foureyes/elasticsearch/CreditSMRepository.java

package org.salgar.fsm.pekko.foureyes.elasticsearch;

import org.salgar.fsm.pekko.foureyes.model.CreditSmEs;
import org.springframework.data.repository.CrudRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface CreditSMRepository extends CrudRepository<CreditSmEs, String> {
}
        Thread.sleep(15000L);

        Optional<CreditSmEs> creditSmEs = creditSMRepository.findById(creditUuid);

        assertNotNull(creditSmEs);
        assertEquals("CREDIT_ACCEPTED", creditSmEs.get().getState());
        verify(notifierService, times(1)).notify(eq(List.of(customer.getEmail())), anyString());

        Thread.sleep(WAIT_TIME_BETWEEN_STEPS);

[o.s.f.a.s.f.StateMachineFacade] [FourEyes-akka.actor.default-dispatcher-5] [akka://FourEyes/system/creditSMGuardian] – We are processing onAccepted(payload): {creditUuid=9fac857e-69de-45f1-8ed6-7eb641d3d61b, creditTenants=[Customer(firstName=John, lastName=Doe, personalId=123456789X, address=Address(street=muster strasse 1, houseNo=11A, city=city1, country=country1))]}
[o.s.f.a.f.c.CreditSMGuardian$] [FourEyes-akka.actor.default-dispatcher-5] [akka://FourEyes/system/sharding/creditsm/4/9fac857e-69de-45f1-8ed6-7eb641d3d61b] – Processing CREDIT_APPLICATION_SUBMITTED_WAITING_CREDIT_ANALYST_APPROVAL onAccepted payload: {creditUuid=9fac857e-69de-45f1-8ed6-7eb641d3d61b, creditTenants=[Customer(firstName=John, lastName=Doe, personalId=123456789X, address=Address(street=muster strasse 1, houseNo=11A, city=city1, country=country1))]}
[o.s.f.a.f.c.CreditSMGuardian$] [FourEyes-akka.actor.default-dispatcher-5] [akka://FourEyes/system/sharding/creditsm/4/9fac857e-69de-45f1-8ed6-7eb641d3d61b] – Executing CREDIT_APPLICATION_SUBMITTED manager_onAccepted Action
[a.p.t.i.EventSourcedBehaviorImpl] [FourEyes-akka.actor.default-dispatcher-5] [akka://FourEyes/system/sharding/creditsm/4/9fac857e-69de-45f1-8ed6-7eb641d3d61b] – Handled command [org.salgar.fsm.akka.foureyes.credit.CreditSM$onAccepted], resulting effect: [Persist(org.salgar.fsm.akka.foureyes.credit.CreditSM$CreditAcceptedPersistEvent)], side effects: [1]
[o.s.f.a.f.c.CreditSMGuardian$] [FourEyes-akka.actor.default-dispatcher-5] [akka://FourEyes/system/sharding/creditsm/4/9fac857e-69de-45f1-8ed6-7eb641d3d61b] – Processing CREDIT_APPLICATION_SUBMITTED_WAITING_CREDIT_ANALYST_APPROVAL CreditAcceptedPersistEvent payload: {fraudPreventionResult=true, creditScoreTenantResults={123456789X=CreditTenantScoreResult(personalId=123456789X, creditScore=73.72)}, sourceSlaveSMTag=addressCheckSM, creditUuid=9fac857e-69de-45f1-8ed6-7eb641d3d61b, creditTenants=[Customer(firstName=John, lastName=Doe, personalId=123456789X, address=Address(street=muster strasse 1, houseNo=11A, city=city1, country=country1))], addressCheckResult=true}

Unhandled Transition Test

Now we completed the analysis of our first test, which was observing a positive test for completing a successful workflow. If you remember our initial discussion, I presented as one of the biggest advantage of State Machine the iterative approach, now either we are in the initial phase of our project, our design not complete or we made a mistake, we oversaw a State or a Transition. Let’s observe in the following Test, how our System would behave in the case of an missing transition information.

The test is nearly the same as previous one, the problem / bug that we would simulate, would be the event that Sales Manager approved and our State Machine progressed further in the workflow for accepting the results from our Partner Systems, but the client of our State Machine (may be a Web Interface) had send the Sales Manager Approved twice (which can happen easily in any Web Application or Messaging System).

report =
        (ReportResponse) Await.result(futureCreditSMState, Duration.create(20, TimeUnit.SECONDS));

assertNotNull(report);
assertThat(report.state(), instanceOf(CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_WAITING_MANAGER_APPROVAL.class));
verify(notifierService, times(1)).notify(eq(salesManagerNotificationList), anyString());

payload = preparePayload(creditUuid, creditTenants);
creditSMFacade.salesManagerApproved(payload);

Thread.sleep(WAIT_TIME_BETWEEN_STEPS);

...

payload = preparePayload(creditUuid, creditTenants);
payload.put(PayloadVariableConstants.ADDRESS_CHECK_RESULT, true);
payload.put(SOURCE_SLAVE_SM_TAG, ADDRESS_CHECK_SM);
creditSMFacade.resultReceived(payload);

Thread.sleep(WAIT_TIME_BETWEEN_STEPS);

futureCreditSMState = creditSMFacade.currentState(payload);

report =
        (ReportResponse) Await.result(futureCreditSMState, Duration.create(20, TimeUnit.SECONDS));

assertNotNull(report);
assertNotNull(report);
assertThat(
        report.state(), 
        instanceOf(
                CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_WAITING_ANAYLIST_APPROVAL.class));
assertEquals(true, report.state().controlObject().get(PayloadVariableConstants.ADDRESS_CHECK_RESULT));
verify(notifierService, times(1)).notify(eq(creditAnalystNotificationList), anyString());

assertEquals(true, report.state().controlObject().get(PayloadVariableConstants.ADDRESS_CHECK_RESULT));
verify(notifierService, times(1)).notify(eq(creditAnalystNotificationList), anyString());

payload = preparePayload(creditUuid, creditTenants);
creditSMFacade.salesManagerApproved(payload);

If you check the UML State Machine, ‘WAITING_CREDIT_ANALYST_APPROVAL‘ State has no Transition for Trigger ‘onSalesManagerApproved’.

And State Machines does exactly what we expect, report the problem.

[o.s.f.a.s.f.StateMachineFacade] [FourEyes-akka.actor.default-dispatcher-18] [akka://FourEyes/system/creditSMGuardian] – We are processing onSalesManagerApproved(payload): {creditUuid=01e82520-cb35-41a2-9f27-1b5b4bbb4445, creditTenants=[Customer(firstName=John, lastName=Doe, personalId=123456789X, address=Address(street=muster strasse 1, houseNo=11A, city=city1, country=country1), email=customer1@test.info)]}
[WARN ] [o.s.a.f.a.FSMAspect] [FourEyes-akka.actor.default-dispatcher-18] [] – Unhandled transition! call(public akka.persistence.typed.scaladsl.EffectBuilder akka.persistence.typed.scaladsl.Effect..unhandled()) event: onSalesManagerApproved(org.salgar.fsm.akka.foureyes.credit.CreditSMGuardian$,{creditUuid=01e82520-cb35-41a2-9f27-1b5b4bbb4445, creditTenants=[Customer(firstName=John, lastName=Doe, personalId=123456789X, address=Address(street=muster strasse 1, houseNo=11A, city=city1, country=country1), email=customer1@test.info)]},null) state: CREDIT_APPLICATION_SUBMITTED_WAITING_CREDIT_ANALYST_APPROVAL ( {fraudPreventionResult=true, salesManagerNotificationList=[salesmanager1@example.com, salesmanager2@example.com], creditScoreTenantResults={123456789X=CreditTenantScoreResult(personalId=123456789X, creditScore=73.72)}, sourceSlaveSMTag=addressCheckSM, creditUuid=01e82520-cb35-41a2-9f27-1b5b4bbb4445, creditTenants=[Customer(firstName=John, lastName=Doe, personalId=123456789X, address=Address(street=muster strasse 1, houseNo=11A, city=city1, country=country1), email=customer1@test.info)], addressCheckResult=true})
java.lang.RuntimeException: Unhandled transition!
at org.salgar.fsm.akka.foureyes.credit.CreditSM.unhandled_aroundBody53$advice(CreditSM.scala:27)
at org.salgar.fsm.akka.foureyes.credit.CreditSM.$anonfun$commandHandlerInternal$14(CreditSM.scala:1128)
at org.salgar.akka.fsm.base.actors.BaseActor.base(BaseActor.scala:34)
at org.salgar.fsm.akka.foureyes.credit.CreditSM.commandHandlerInternal( CreditSM.scala:1074)
at org.salgar.fsm.akka.foureyes.credit.CreditSM.commandHandler(CreditSM.scala:239)
at org.salgar.fsm.akka.foureyes.credit.CreditSMGuardian$.$anonfun$prepare$3( CreditSMGuardian.scala:211 )
at akka.persistence.typed.internal.Running$HandlingCommands.onCommand( Running.scala:260 )

You can see State Machine reports exactly where the unexpected Event is received, Event type, what was the Payload of the Event and current State of the State Machine and content of the Control Object.

What you see here, is the biggest advantage of a State Machine, if this is a Use Case overseen by you, you have to teach this Use Case to the State Machine or if it is a bug, that is something you have to fix for next iteration.

Now think how you use to program this with ‘IF Salads‘, your program will just do something here, now it might be correct thing or a colossal bug, what it think. it fits best. Not with the State Machine, it will stop here and ask you to this teach to it, how exactly it should behave here.

Long Running Workflow / Persistance Recovery Test

If you remember the previous chapters of this blog, I mentioned one of the biggest advantages of the Akka / Pekko Framework was its capability to persist long running workflows. You have seen in the Positive Test scenario, our Test executed in 10s but this is not how the real word works, isn’t it? May be Sales Manager’s research to approve the Credit takes the next 10 days and he/she could not give its approval directly.

So what should happen, should we kept the CreditSM Actor then days in memory, which might not be possible because we might run into a resource bottle neck and we might have to stop and unload some Actors from the Memory until another event arrives for this Actor or may be our Kubernetes Cluster observes reducing loads and decides to auto downscale number of Pods for our application or in these 10 days, we might release a new version our application so we have to restart it so the Actor would not be in the memory.

Following test would simulate these scenarios, first part of the test will bring the State Machine to a certain state and stop it, shutdown the Java Virtual Machine. Then the second part of the Test will take over and recover CreditSM Actor and continue with the workflow(while first part of the test, create a unique id for the Credit Application and second part must recover with this unique id, this id is unique per day, if you want to run this test twice in one day, you have to delete the Cassandra Database for it).

Of course first we mock our Credit Applicants.

fsm-pekko-4eyes-application/src/test/java/org/salgar/fsm/pekko/foureyes/creditsm/RecoveryPreTest.java

    @SneakyThrows
    @Test
    public void prepareForRecoveryOnlyOneCreditScoreEventReceivedTest() {
        final String creditUuid = "0e679b7f-806f-41db-bd8e-21f3f62aa3e4_"
                + DateTimeFormatter.ofPattern("dd_MM_yyyy").format(LocalDate.now());
        final Customer customer1 =
                new Customer(
                        "John",
                        "Doe",
                        "123456789X",
                        new Address(
                                "muster strasse 1",
                                "11A",
                                "city1",
                                "country1"
                        ),
                        "customer1@test.info");
        final Customer customer2 =
                new Customer(
                        "Max",
                        "Musterman",
                        "Z987654321",
                        new Address(
                                "muster strasse 1",
                                "11A",
                                "city1",
                                "country1"
                        ),
                        "customer2@test.info");

        List<Customer> creditTenants = new ArrayList<>();
        creditTenants.add(customer1);
        creditTenants.add(customer2);

        Map<String, Object> payload = preparePayload(creditUuid, 100000.0, creditTenants);

And we prepare the Mocks of our Partner Systems.

        /* Mock Preparation */
        doAnswer(invocation -> {
            if(invocation.getArgument(2).equals(customer2.getPersonalId())) {
                log.info("Sending Credit Score Result customer2");

                Thread.sleep(5000L);

                Map<String, Object> creditScorePayload = new HashMap<>();
                creditScorePayload.put(PayloadVariableConstants.CREDIT_SCORE_RESULT, 97.45);
                creditScoreSMFacade.resultReceived(
                        () -> creditUuid + "_" + customer2.getPersonalId(),
                        creditScorePayload);
            } else {
                String personalId = invocation.getArgument(2);
                log.warn("Unkown customer: {}", personalId);
            }

            return null;
        }).when(creditScoreServiceMockBean).calculateCreditScore(any(), any(), any());

        doAnswer(invocation -> {
            log.info("Sending Fraud Prevention Result");

            Thread.sleep(1000L);

            Map<String, Object> fraudPreventionResultPayload = new HashMap<>();
            fraudPreventionResultPayload.put(PayloadVariableConstants.FRAUD_PREVENTION_RESULT, true);
            fraudPreventionSMFacade.result(
                    () -> creditUuid,
                    fraudPreventionResultPayload);

            return null;
        }).when(fraudPreventionServiceMockBean).reportFraudPrevention(
                any(), any(), any());

        doAnswer(invocation -> {
            log.info("Sending Address Check Result");

            Thread.sleep(2000L);

            Map<String, Object> addressCheckResultPayload = new HashMap<>();
            addressCheckResultPayload.put(PayloadVariableConstants.ADDRESS_CHECK_RESULT, true);
            adressCheckSMFacade.result(
                    () -> creditUuid,
                    addressCheckResultPayload);

            return null;
        }).when(addressCheckServiceMockBean).addressExist(
                any(),
                any(),
                any(),
                any());


        /* Test Start */

        creditSMFacade.submit(payload);

        Thread.sleep(WAIT_TIME_BETWEEN_STEPS);

Then we will start sending our Events, you already know how this mechanism works, so I will jump to the last Event just before we stop our Test.

        payload = preparePayload(creditUuid, creditTenants);
        creditSMFacade.salesManagerApproved(payload);

        Thread.sleep(20000);
        verify(notifierService, atLeastOnce()).notify(eq(salesManagerNotificationList), anyString());

        futureCreditSMState = creditSMFacade.currentState(payload);

        report =
                (CreditSM.ReportResponse) Await.result(futureCreditSMState, Duration.create(20, TimeUnit.SECONDS));

        assertNotNull(report);
        assertThat(
                report.state(),
                instanceOf(CreditSM.
                        CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_ADRESSCHECK_RESULT_RECEIVED
                        .class));

Now if you look to Mock configuration above you will see something really interesting, we have two Credit Tenants for the Credit Application, normally we should have two Credit Scores but we simulate here that before the second Credit Score result received a system shutdown will occur (while we work with Apache Kafka after the restart Event will still be there), to prove that State Machine can deal with any scenario after Akka / Pekko recovery..

Before Test shutdowns, we check that we are at the correct State.

Application stops when Fraud Prevention, Address Check and one of the Credit Score Result Received, so we will not be in State ‘WAITING_CREDIT_ANALYST_APPROVAL‘ but in ‘FRAUDPREVENTION_ADDRESSCHECK_RESULT_RECEIVED‘.

Now lets continue the Test after the Recovery of the CreditSM Actor.

fsm-pekko-4eyes-application/src/test/java/org/salgar/fsm/pekko/foureyes/creditsm/RecoveryPostTest.java

    @SneakyThrows
    @Test
    public void recoveryOnlyOneCreditScoreEventReceivedTest() {
        final String creditUuid = "0e679b7f-806f-41db-bd8e-21f3f62aa3e4_"
                + DateTimeFormatter.ofPattern("dd_MM_yyyy").format(LocalDate.now());
        /* Test Start */
        Future<CreditSM.Response> futureCreditSMState = creditSMFacade.currentState(payload);
        CreditSM.ReportResponse report =
                (CreditSM.ReportResponse) Await
                        .result(futureCreditSMState, Duration.create(20, TimeUnit.SECONDS));

        assertNotNull(report);
        assertThat(report.state(),
            instanceOf(
                CreditSM.CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_ADRESSCHECK_RESULT_RECEIVED.class));

First we check the after the recovery, we are at correct State ‘FRAUDPREVENTION_ADDRESSCHECK_RESULT_RECEIVED’.That proves that after restart, Akka recovered CreditSM Actor.

Now we have to simulate missing Credit Score Result Event (if you are asking should Apache Kafka do this, this module does not deal with Kafka but here additional Integration Tests dealing with Kafka under module ‘fsm-pekko-4eyes-kafka‘).

        Customer customer =  creditTenants.get(0);
        log.info("Sending Credit Score Result for Customer: {}", customer);

        Map<String, Object> creditScorePayload = new HashMap<>();
        creditScorePayload.put(PayloadVariableConstants.CREDIT_SCORE_RESULT, 83.45);
        creditScoreSMFacade.resultReceived(
                () -> creditUuid + "_" + customer.getPersonalId(),
                creditScorePayload);

This will bring us to following State.

And naturally we will assert that we reached the correct State.

        Thread.sleep(15000L);

        Optional<CreditSmEs> creditSmEs = creditSMRepository.findById(creditUuid);
        assertNotNull(creditSmEs);
        assertEquals(
                "WAITING_CREDIT_ANALYST_APPROVAL",
                creditSmEs.get().getState()
        );

And the logs confirms the recovery action and the processing of the Event.


[a.p.t.i.EventSourcedBehaviorImpl] [FourEyes-akka.actor.default-dispatcher-43] [akka://FourEyes/system/sharding/creditsm/4/0e679b7f-806f-41db-bd8e-21f3f62aa3e4_17_03_2022] – Loaded snapshot with metadata [SnapshotMetadata(creditsm0e679b7f-806f-41db-bd8e-21f3f62aa3e4_17_03_2022, 2, 1647550133852, None)]
[a.p.t.i.EventSourcedBehaviorImpl] [FourEyes-akka.actor.default-dispatcher-43] [akka://FourEyes/system/sharding/creditsm/4/0e679b7f-806f-41db-bd8e-21f3f62aa3e4_17_03_2022] – Snapshot recovered from 2 Map() VersionVector()
[a.p.t.i.EventSourcedBehaviorImpl] [FourEyes-akka.actor.default-dispatcher-43] [akka://FourEyes/system/sharding/creditsm/4/0e679b7f-806f-41db-bd8e-21f3f62aa3e4_17_03_2022] – Replaying events: from: 3, to: 9223372036854775807
[o.s.f.a.f.c.CreditSMGuardian$] [FourEyes-akka.actor.default-dispatcher-41] [akka://FourEyes/system/sharding/creditsm/4/0e679b7f-806f-41db-bd8e-21f3f62aa3e4_17_03_2022] – Processing CREDIT_APPLICATION_SUBMITTED_ $ _RELATIONSHIP_MANAGER_APPROVED SalesManagerApprovalPersistEvent payload: {creditUuid=0e679b7f-806f-41db-bd8e-21f3f62aa3e4_17_03_2022, creditTenants=[Customer(firstName=John, lastName=Doe, personalId=123456789X, address=Address(street=muster strasse 1, houseNo=11A, city=city1, country=country1), email=customer1@test.info), Customer(firstName=Max, lastName=Musterman, personalId=Z987654321, address=Address(street=muster strasse 1, houseNo=11A, city=city1, country=country1), email=customer2@test.info)]}
[o.s.f.a.f.c.CreditSMGuardian$] [FourEyes-akka.actor.default-dispatcher-41] [akka://FourEyes/system/sharding/creditsm/4/0e679b7f-806f-41db-bd8e-21f3f62aa3e4_17_03_2022] – Processing CREDIT_APPLICATION_SUBMITTED_ $ _SALES_MANAGER_APPROVED_ $ _INITIAL_CSC FraudPreventionReceivedPersistEvent payload: { fraudPreventionResult=true, creditUuid=0e679b7f-806f-41db-bd8e-21f3f62aa3e4_17_03_2022, creditTenants=[Customer(firstName=John, lastName=Doe, personalId=123456789X, address=Address(street=muster strasse 1, houseNo=11A, city=city1, country=country1), email=customer1@test.info), Customer(firstName=Max, lastName=Musterman, personalId=Z987654321, address=Address(street=muster strasse 1, houseNo=11A, city=city1, country=country1), email=customer2@test.info)]}
[o.s.f.a.f.c.CreditSMGuardian$] [FourEyes-akka.actor.default-dispatcher-41] [akka://FourEyes/system/sharding/creditsm/4/0e679b7f-806f-41db-bd8e-21f3f62aa3e4_17_03_2022] – Processing CREDIT_APPLICATION_SUBMITTED_ $ _SALES_MANAGER_APPROVED_ $ _FRAUDPREVENTION_RESULT_RECEIVED FraudPreventionAdressCheckReceivedPersistEvent payload: {fraudPreventionResult=true, creditUuid=0e679b7f-806f-41db-bd8e-21f3f62aa3e4_17_03_2022, creditTenants=[Customer(firstName=John, lastName=Doe, personalId=123456789X, address=Address(street=muster strasse 1, houseNo=11A, city=city1, country=country1), email=customer1@test.info), Customer(firstName=Max, lastName=Musterman, personalId=Z987654321, address=Address(street=muster strasse 1, houseNo=11A, city=city1, country=country1), email=customer2@test.info)], addressCheckResult=true}
[a.p.t.i.EventSourcedBehaviorImpl] [FourEyes-akka.actor.default-dispatcher-41] [akka://FourEyes/system/sharding/creditsm/4/0e679b7f-806f-41db-bd8e-21f3f62aa3e4_17_03_2022] – Recovery successful, recovered until sequenceNr: [5]
[a.p.t.i.EventSourcedBehaviorImpl] [FourEyes-akka.actor.default-dispatcher-41] [akka://FourEyes/system/sharding/creditsm/4/0e679b7f-806f-41db-bd8e-21f3f62aa3e4_17_03_2022] – Returning recovery permit, reason: replay completed successfully
[a.p.t.i.EventSourcedBehaviorImpl] [FourEyes-akka.actor.default-dispatcher-41] [akka://FourEyes/system/sharding/creditsm/4/0e679b7f-806f-41db-bd8e-21f3f62aa3e4_17_03_2022] – Recovery for persistenceId [PersistenceId(creditsm0e679b7f-806f-41db-bd8e-21f3f62aa3e4_17_03_2022)] took 139.8 ms
[o.s.f.a.f.c.RecoveryPostTest] [Test worker] [] – Sending Credit Score Result for Customer: Customer(firstName=John, lastName=Doe, personalId=123456789X, address=Address(street=muster strasse 1, houseNo=11A, city=city1, country=country1), email=customer1@test.info)
[o.s.f.a.s.f.StateMachineFacade] [FourEyes-akka.actor.default-dispatcher-21] [akka://FourEyes/system/creditScoreSMGuardian] – We are processing onResultReceived(payload): {creditScoreResult=83.45}
[a.p.t.i.EventSourcedBehaviorImpl] [FourEyes-akka.actor.default-dispatcher-21] [akka://FourEyes/system/sharding/creditscoresm/18/0e679b7f-806f-41db-bd8e-21f3f62aa3e4_17_03_2022_123456789X] – Initializing snapshot recovery: Recovery( SnapshotSelectionCriteria ( 9223372036854775807,9223372036854775807,0,0),9223372036854775807,9223372036854775807 )
[a.p.t.i.EventSourcedBehaviorImpl] [FourEyes-akka.actor.default-dispatcher-20] [akka://FourEyes/system/sharding/creditscoresm/18/0e679b7f-806f-41db-bd8e-21f3f62aa3e4_17_03_2022_123456789X] – Snapshot recovered from 0 Map() VersionVector()
[a.p.t.i.EventSourcedBehaviorImpl] [FourEyes-akka.actor.default-dispatcher-20] [akka://FourEyes/system/sharding/creditscoresm/18/0e679b7f-806f-41db-bd8e-21f3f62aa3e4_17_03_2022_123456789X] – Replaying events: from: 1, to: 9223372036854775807
[o.s.f.a.f.c.CreditScoreSMGuardian$] [FourEyes-akka.actor.default-dispatcher-29] [akka://FourEyes/system/sharding/creditscoresm/18/0e679b7f-806f-41db-bd8e-21f3f62aa3e4_17_03_2022_123456789X] – Processing INITIAL StartCreditScoreResearchPersistEvent payload: {creditUuid=0e679b7f-806f-41db-bd8e-21f3f62aa3e4_17_03_2022, customer=Customer(firstName=John, lastName=Doe, personalId=123456789X, address=Address(street=muster strasse 1, houseNo=11A, city=city1, country=country1), email=customer1@test.info)}
[a.p.t.i.EventSourcedBehaviorImpl] [FourEyes-akka.actor.default-dispatcher-29] [akka://FourEyes/system/sharding/creditscoresm/18/0e679b7f-806f-41db-bd8e-21f3f62aa3e4_17_03_2022_123456789X] – Recovery successful, recovered until sequenceNr: [1]
[a.p.t.i.EventSourcedBehaviorImpl] [FourEyes-akka.actor.default-dispatcher-29] [akka://FourEyes/system/sharding/creditscoresm/18/0e679b7f-806f-41db-bd8e-21f3f62aa3e4_17_03_2022_123456789X] – Returning recovery permit, reason: replay completed successfully
[a.p.t.i.EventSourcedBehaviorImpl] [FourEyes-akka.actor.default-dispatcher-29] [akka://FourEyes/system/sharding/creditscoresm/18/0e679b7f-806f-41db-bd8e-21f3f62aa3e4_17_03_2022_123456789X] – Recovery for persistenceId [PersistenceId(creditscoresm0e679b7f-806f-41db-bd8e-21f3f62aa3e4_17_03_2022_123456789X)] took 24.55 ms
[o.s.f.a.f.c.CreditScoreSMGuardian$] [FourEyes-akka.actor.default-dispatcher-29] [akka://FourEyes/system/sharding/creditscoresm/18/0e679b7f-806f-41db-bd8e-21f3f62aa3e4_17_03_2022_123456789X] – Processing WAITING_RESULT onResultReceived payload: {creditScoreResult=83.45}
[o.s.f.a.f.c.CreditSMGuardian$] [FourEyes-akka.actor.default-dispatcher-21] [akka://FourEyes/system/sharding/creditsm/4/0e679b7f-806f-41db-bd8e-21f3f62aa3e4_17_03_2022] – Processing CREDIT_APPLICATION_SUBMITTED_ $ _SALES_MANAGER_APPROVED_ $ _FRAUDPREVENTION_ADRESSCHECK_RESULT_RECEIVED onResultReceived payload: {creditScoreTenantResults={Z987654321=CreditTenantScoreResult(personalId=Z987654321, creditScore=97.45), 123456789X=CreditTenantScoreResult(personalId=123456789X, creditScore=83.45)}, sourceSlaveSMTag=customerScoreSM, creditUuid=0e679b7f-806f-41db-bd8e-21f3f62aa3e4_17_03_2022}

Four Eyes Workflow Test

This whole Proof of Concept application is called ‘Four Eyes Credit Approval’ but we didn’t see until now that much about this ‘Four Eyes‘ principle. The State Machine that we modelled, for the credit amounts more than 2 000 000, State Machine will switch to Four Eyes mode, which means every decision must be approved by two person.

The states

WAITING_APPROVAL
RELATIONSHIP_MANAGER_APPROVED
SALES_MANAGER_APPROVED
WAITING_CREDIT_ANALYST_APPROVAL

have Nested Sub State Machines ( Submachine ). These Submachine will for amounts less than 2 000 000 as money amount would accept only one of the available managers approval, for higher amounts these Sub State Machines and Guard Conditions will not accept approvals unless all of the Managers approves.

Also normal workflow only requires the Approval of a Senior Manager, if the Credit Score from the applicants are in a certain range, if the credit amount is more than 2 000 000 approval of the Senior Manager would be mandatory.

You can see this Test ‘creditAmountHighCreditAcceptedTest‘ n the following Java Test Class.

Major difference to the previous tests being, two different Managers should send the Approval Events for the Workflow to proceed.

Future<Response> futureCreditSMState = creditSMFacade.currentState(payload);
ReportResponse report =
        (ReportResponse) Await.result(futureCreditSMState, Duration.create(20, TimeUnit.SECONDS));

assertNotNull(report);
assertThat(
        report.state(),
        instanceOf(
                CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_WAITING_MANAGER_APPROVAL.class));
assertEquals(((List<Customer>)report.state().controlObject().get(PayloadVariableConstants.CREDIT_TENANTS)).get(0), customer);
verify(notifierService, times(1)).notify(eq(relationShipNotificationList), anyString());

payload = preparePayload(creditUuid, creditTenants);

creditSMFacade.relationshipManagerApproved(payload);

Thread.sleep(WAIT_TIME_BETWEEN_STEPS);

futureCreditSMState = creditSMFacade.currentState(payload);
report =
        (ReportResponse) Await.result(futureCreditSMState, Duration.create(20, TimeUnit.SECONDS));

assertNotNull(report);
assertThat(
        report.state(),
        instanceOf(
                CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_WAITING_MANAGER_APPROVAL.class));
assertEquals(((List<Customer>)report.state().controlObject().get(PayloadVariableConstants.CREDIT_TENANTS)).get(0), customer);
verify(notifierService, times(1)).notify(eq(relationShipNotificationList), anyString());

payload = preparePayload(creditUuid, creditTenants);

creditSMFacade.relationshipManagerApproved(payload);

Thread.sleep(WAIT_TIME_BETWEEN_STEPS);

futureCreditSMState = creditSMFacade.currentState(payload);

report =
        (ReportResponse) Await.result(futureCreditSMState, Duration.create(20, TimeUnit.SECONDS));

assertNotNull(report);
assertThat(
        report.state(),
        instanceOf(
                CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_WAITING_MANAGER_APPROVAL.class));
verify(notifierService, times(1)).notify(eq(salesManagerNotificationList), anyString());

payload = preparePayload(creditUuid, creditTenants);
creditSMFacade.salesManagerApproved(payload);

As you can see, now we need two ‘relationshipManagerApproved‘ Event to reach

CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_WAITING_MANAGER_APPROVAL

State from

CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_WAITING_MANAGER_APPROVAL

State.

Different from the our default process, because of the required amount, we will also need the the approval of the Senior Manager.

As you can see we need now the Senior Manager approval to be able continue with the process.

Now the final Event from Credit Analyst, accepting the Credit Application will bring the test to a successful end.

futureCreditSMState = creditSMFacade.currentState(payload);

report =
        (ReportResponse) Await.result(futureCreditSMState, Duration.create(20, TimeUnit.SECONDS));

assertNotNull(report);
assertThat(
        report.state(), 
        instanceOf(
                CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_FROM_SENIOR_MANAGER.class));
assertEquals(true, report.state().controlObject().get(PayloadVariableConstants.ADDRESS_CHECK_RESULT));
verify(notifierService, times(1)).notify(eq(seniorSalesManagerNotificationList), anyString());

Thread.sleep(WAIT_TIME_BETWEEN_STEPS);

payload = preparePayload(creditUuid, creditTenants);
creditSMFacade.acceptableScore(payload);

Thread.sleep(WAIT_TIME_BETWEEN_STEPS);

futureCreditSMState = creditSMFacade.currentState(payload);

report =
        (ReportResponse) Await.result(futureCreditSMState, Duration.create(20, TimeUnit.SECONDS));

assertNotNull(report);
assertThat(
        report.state(),
        instanceOf(
                CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_WAITING_ANAYLIST_APPROVAL.class));
verify(notifierService, times(1)).notify(eq(creditAnalystNotificationList), anyString());

Thread.sleep(WAIT_TIME_BETWEEN_STEPS);

payload = preparePayload(creditUuid, creditTenants);
creditSMFacade.accepted(payload);

Thread.sleep(15000L);

Optional<CreditSmEs> creditSmEs = creditSMRepository.findById(creditUuid);

assertNotNull(creditSmEs);
assertEquals("CREDIT_ACCEPTED", creditSmEs.get().getState());
verify(notifierService, times(1)).notify(eq(List.of(customer.getEmail())), anyString());

And the Test result.

4 thoughts on “Spring Boot / Docker ( JiB ) / Helm configuration / Tests for Akka / Pekko Finite State Machine ( FSM )

  1. Pingback: A Model Driven Event Sourced Cloud Ready Application with Akka Finite State Machine using Kafka, Cassandra and Elasticsearch | Mehmet Salgar's Blog

  2. Pingback: Event / Schema Evolution for Akka Finite State Machine ( FSM ) | Mehmet Salgar's Blog

  3. Pingback: Ideal CI / CD Pipeline for your Kubernetes Deployments | Mehmet Salgar's Blog

  4. Pingback: Akka / Pekko configuration for Finite State Machine ( FSM ) | Mehmet Salgar's Blog

Leave a comment