Limitless Scaling and Elasticity with Kubernetes Event Driven Autoscaling ( KEDA ) / Karpenter for an Event Sourcing Akka / Pekko Application – Apache Kafka

This blog is a detailed analysis of how to host an Akka / Pekko Finite State Machine that realises Event Sourcing via an Apache Kafka / Spring Boot application. It is part of a series of blogs explaining how to build an Event Sourcing application.

In this part of the series, we will explain how Akka / Pekko can scale horizontally without practical limits while staying elastic, with the help of Apache Kafka and, most importantly, Kubernetes Event Driven Autoscaling ( KEDA ), and specifically in AWS EKS with spot instances with the help of Karpenter.

  1. Introduction
  2. KEDA Configuration
  3. Karpenter Configuration
  4. Horizontal Pod Autoscaling
  5. Appendix
    1. KEDA Setup
    2. Karpenter Setup

Introduction

In the original blog, I used the following diagrams to explain how Akka / Pekko scales out and scales in.

As I previously mentioned, Akka / Pekko is a framework designed for modern IT. It has components that help with horizontal scaling, the most important being Cluster Sharding. In an Akka / Pekko System that uses Cluster Sharding, when an Actor is created, it is placed in a Shard depending on the unique Actor id.



An Akka / Pekko System that feels CPU or Memory pressure scales horizontally and gets extra Akka / Pekko Nodes. Each of these Nodes has a certain number of Shards attached to it (the number of Shards for a Cluster is a configuration parameter: if the parameter is 60 and you have 6 Akka / Pekko Nodes in your Cluster, every Node will have 10 Shards; if you then add another Node, some Nodes will have 9 and some 8 Shards). The system in the above picture starts with 4 Akka / Pekko Nodes and 12 Shards.



If the Akka / Pekko Cluster scales out and gets bigger ( gets additional Akka / Pekko Nodes ), some Shards are transferred to the new Nodes. In the above picture, Akka / Pekko grew from 4 Nodes to 5; the total number of Shards stayed the same, but they were redistributed to include the new Node. If the Cluster decides to scale down ( Akka / Pekko Nodes are removed ), the Nodes that are still running take over the Shards of the removed Nodes.
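To make the arithmetic of the redistribution concrete, here is a minimal Python sketch. It only computes the even split that a rebalance converges to; the function name ‘shards_per_node‘ is my own, and the real allocation strategy of Akka / Pekko moves Shards gradually, so treat this as an illustration and not as the framework's algorithm.

```python
def shards_per_node(total_shards: int, nodes: int) -> list[int]:
    """Spread shards as evenly as possible; node sizes differ by at most one."""
    base, extra = divmod(total_shards, nodes)
    # 'extra' nodes carry one shard more than the rest
    return [base + 1] * extra + [base] * (nodes - extra)


print(shards_per_node(12, 4))  # [3, 3, 3, 3]
print(shards_per_node(12, 5))  # [3, 3, 2, 2, 2]
print(shards_per_node(60, 7))  # [9, 9, 9, 9, 8, 8, 8]
```

The total number of Shards never changes, only their owners do, which is why the number of Shards should be chosen comfortably larger than the maximum number of Nodes you plan to run.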

Now, if this Scale-Out / Scale-In sounds like Kubernetes Autoscaling to you, yes, it is. Kubernetes and Akka / Pekko Cluster Sharding are a match made in heaven.

We also have another card up our sleeve: we are developing an Event Sourcing application with the help of Apache Kafka, which helps us a lot with Elasticity during unexpected Load Spikes. Normally, when such a Load Spike occurs and Kubernetes identifies that the Pods running our Akka / Pekko application are under distress from CPU / Memory usage, it starts additional Pods for our application, more Akka / Pekko Nodes join the Cluster and the application scales automatically. Unfortunately, this is a reactive and not a proactive action.

Until the Metrics Server of Kubernetes identifies the distress of our application, our clients may already have started to feel the problems (if your normal load is 1 000 000 messages/s and it goes to 10 000 000 messages/s, believe me, some really bad things will happen before the Kubernetes Cluster catches up with the scale-out).

Fortunately, this problem was identified and a framework was developed to counter it: Kubernetes Event Driven Autoscaling ( KEDA ). The difference is that, instead of monitoring CPU and Memory metrics, KEDA monitors the depth / lag of our Kafka Topics, that is, how many unprocessed messages we have in our topics. When too many messages are waiting for pick-up, KEDA starts additional Pods of our application. Combined with the Akka / Pekko Cluster’s capability to scale, this allows us to scale limitlessly.

I am explicitly speaking here about Apache Kafka but KEDA can also monitor other message brokers / systems like AWS SQS, Google Pub / Sub, Azure Event Hub, Apache Pulsar, etc (or 64 other Scalers).

Now attentive readers have probably noticed that I used the terms ‘limitless scaling and elasticity’ in this blog, so they are probably asking: even if we are using AWS EKS, MS Azure AKS or Google GKE, we have a finite amount of resources under the Node Pools of these Kubernetes Engines, so how could we scale limitlessly? Of course, we can attach our Kubernetes Engines to Autoscaling Groups in AWS, Virtual Machine Scale Sets in MS Azure and Managed Instance Groups in Google Cloud Platform to give more resources to our Node Pools, which sounds nice, doesn’t it?

Scaling this way is nice, but there is one caveat: none of the above mentioned methods are Kubernetes native processes. They are part of the virtualisation framework of the Cloud Provider, and the reaction time to a scale request from a Kubernetes Cluster, including provisioning / retries, can take minutes while our application is suffering.

Fortunately, at least AWS has an answer to this dilemma. They built the Karpenter Framework to manage the creation / scaling of Node Pools over the AWS Fleet API, which reduces the reaction time to milliseconds (the documentation says it will be available for more Cloud Providers in the future, but currently it is only available for AWS). Karpenter can also automatically find the virtual machine types that fit your workload best, and it offers the possibility to mix ‘on-demand‘ and ‘spot‘ instances, with all the cost saving possibilities of spot instances.

It can even arrange that, when your Node Pools are using Spot Instances and these are reclaimed by AWS, On-Demand Instances are allocated for your Node Pools instead. When Spot Instances are available again, it evicts the Pods from those Nodes, terminates the On-Demand Instances and allocates Spot Instances again.

KEDA Configuration

So how do we configure KEDA to scale our Akka / Pekko Application? The Snippet below is an example.

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: creditsm-kafka-topic-scaledobject # Kubernetes object names must be lowercase
  namespace: {{ .Release.Namespace }}
spec:
  scaleTargetRef:
    name: {{ include "fsm-pekko-4eyes-application.fullname" . }}    
  minReplicaCount: 5  # Baseline replicas; set to 0 if you don't want pods when the topic is empty
  maxReplicaCount: 10000
  pollingInterval: 30 # How frequently we should go for metrics (in seconds)
  cooldownPeriod:  10 # How many seconds to wait after the last active trigger before scaling back to 0
  advanced: # Optional. Section to specify advanced options
    restoreToOriginalReplicaCount: true
    horizontalPodAutoscalerConfig: 
      behavior:
        scaleDown:
          stabilizationWindowSeconds: 300
          policies:
          - type: Percent
            value: 100
            periodSeconds: 15
  triggers:
  - type: kafka
    metadata:
      bootstrapServers: {{ .Values.spring.kafka.bootstrapServers }}
      consumerGroup: pekko-fsm-foureyes
      topic: creditSM
      lagThreshold: '1000'
      activationLagThreshold: '3'
      excludePersistentLag: true
      partitionLimitation: '1-20'

This Custom Resource for the KEDA Operator should be deployed to the same Kubernetes Namespace as your application. Let’s look at the configuration options.

spec:
  scaleTargetRef:
    name: {{ include "fsm-pekko-4eyes-application.fullname" . }}    
  minReplicaCount: 5
  maxReplicaCount: 10000
  pollingInterval: 30 
  cooldownPeriod:  10

‘scaleTargetRef‘ tells the KEDA Operator which Kubernetes Object it should scale (the Object can be a ‘Deployment‘, a ‘StatefulSet‘ or a ‘Custom‘ resource); KEDA does this by creating a HPA ( Horizontal Pod Autoscaler ) Kubernetes Object in the background.

‘minReplicaCount‘ is set to 5 and ‘maxReplicaCount‘ to 10000. The latter number is whatever you define as limitless scaling: let’s say, for an Akka / Pekko System that has to deal with 1 billion Actors, 10000 replicas would be realistic; if you want to deal with 100 million Actors, then 200 would be realistic.

‘pollingInterval‘ decides how often the Trigger Condition for scaling is checked (in this case the Kafka Topic lag).

‘cooldownPeriod‘ is the time KEDA waits after the last trigger reported active before it scales the application back to zero (here 10s), so it only matters when ‘minReplicaCount‘ is 0. Scaling between 1 and N replicas is handled by the HPA, so if the Kafka Topic lag values are volatile, the place to prevent chaotic Scale Up / Scale Down decisions is the HPA ‘behavior‘ configuration under ‘advanced‘.

advanced: # Optional. Section to specify advanced options
  restoreToOriginalReplicaCount: true
  horizontalPodAutoscalerConfig: 
    behavior:
      scaleDown:
        stabilizationWindowSeconds: 300
        policies:
        - type: Percent
          value: 100
          periodSeconds: 15

You can even fine tune Scale Up / Down at the HPA level by using ‘advanced>horizontalPodAutoscalerConfig‘. This particular configuration states that, in the case of a ‘scaleDown‘, the HPA first waits for a stabilisation window of 300 seconds and may then terminate up to 100% of the Replicas within a period of 15 seconds. There are many more parameters to fine tune this behaviour.
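The effect of ‘stabilizationWindowSeconds‘ on a scale down can be sketched in a few lines of Python. This is a simplified model under my own assumptions (the class name ‘ScaleDownStabilizer‘ is hypothetical and the real HPA controller keeps more state): for a scale down, the HPA acts on the highest replica recommendation it has seen inside the window, so a short dip in the lag does not immediately remove Pods.

```python
from collections import deque


class ScaleDownStabilizer:
    """Simplified model: scale down only to the highest recommendation in the window."""

    def __init__(self, window_seconds: int):
        self.window_seconds = window_seconds
        self.history = deque()  # entries of (timestamp_seconds, recommended_replicas)

    def stabilized(self, now: float, recommended: int) -> int:
        self.history.append((now, recommended))
        # forget recommendations that are older than the stabilisation window
        while self.history[0][0] < now - self.window_seconds:
            self.history.popleft()
        return max(replicas for _, replicas in self.history)


stabilizer = ScaleDownStabilizer(window_seconds=300)
print(stabilizer.stabilized(0, 40))    # 40
print(stabilizer.stabilized(60, 10))   # 40, the dip is ignored inside the window
print(stabilizer.stabilized(301, 10))  # 10, the old peak has left the window
```

With a window of 300 seconds, the application keeps its replicas for five minutes after the last peak, which is usually what you want for a spiky Kafka Topic.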

triggers:
- type: kafka
  metadata:
    bootstrapServers: {{ .Values.spring.kafka.bootstrapServers }}
    consumerGroup: pekko-fsm-foureyes
    topic: creditSM
    lagThreshold: '1000'
    activationLagThreshold: '3'
    excludePersistentLag: true
    partitionLimitation: '1-20'

And finally the Trigger of the Scaled Object for Kafka, which tells KEDA which Kafka Cluster to connect to and which Topic to observe for lag (and naturally for which Consumer Group). ‘lagThreshold‘ is the target number of waiting Messages per replica: when the average lag per replica rises above it, KEDA triggers a Scale Up (and, of course, when the lag falls below it, a scale down).
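As a rough mental model of what ‘lagThreshold‘ does, the following Python sketch estimates the number of replicas the HPA will aim for. It is a simplification under my own assumptions: the function name and its parameters are hypothetical, I assume the scaler's default of not starting more consumers than there are partitions, and the real HPA additionally applies tolerances and the ‘behavior‘ policies.

```python
import math


def desired_replicas(total_lag: int, lag_threshold: int, partitions: int,
                     min_replicas: int, max_replicas: int) -> int:
    """Estimate the replica count for a given total consumer group lag."""
    wanted = math.ceil(total_lag / lag_threshold)
    # assumption: idle consumers are not allowed, so the partition count caps the replicas
    wanted = min(wanted, partitions)
    return max(min_replicas, min(wanted, max_replicas))


print(desired_replicas(12_500, 1000, partitions=20, min_replicas=5, max_replicas=10000))  # 13
print(desired_replicas(50_000, 1000, partitions=20, min_replicas=5, max_replicas=10000))  # 20
print(desired_replicas(0, 1000, partitions=20, min_replicas=5, max_replicas=10000))       # 5
```

The second call shows why the number of partitions of the Topic matters: if you really want thousands of replicas, the Topic needs at least as many partitions.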

Now, KEDA in combination with Karpenter has another really nice capability, but first, please don’t consider what we are explaining here only in the context of a Production System. Surely you have ‘test‘ and ‘staging‘ environments, and hopefully a really good CI / CD pipeline that dynamically creates environments to measure the quality of pull requests, feature branches, epic stories, etc., but you don’t want these environments to be constantly up. They should only run when somebody actually comes to test and use them (keeping an epic story environment up for months while nobody is using it, or even just between 00:00 and 06:00, would cost you lots of money with Cloud Providers).

So how about initialising these environments only when the first messages are produced to a Kafka Topic? The parameter ‘activationLagThreshold‘ is exactly for that. With the above configuration (and with ‘minReplicaCount‘ set to 0), a lag of more than 3 Messages in the Kafka Topic is the scale up signal for KEDA, and with Karpenter it also becomes the trigger for provisioning the Nodes of the Kubernetes Node Pool. Once the lag is back at 3 Messages or below and the ‘cooldownPeriod‘ has passed, KEDA scales the application back to zero, which is the signal for Karpenter to de-provision the Nodes. Consider the cost savings you will reach this way.
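The two phases of KEDA, activation ( 0 to 1 ) and scaling ( 1 to N ), can be sketched like this in Python. The function names are my own, the cooldown timing is left out, and, as far as I read the KEDA documentation, the activation requires the lag to be strictly greater than ‘activationLagThreshold‘, so take the comparison as my assumption.

```python
import math


def is_active(lag: int, activation_lag_threshold: int) -> bool:
    """Activation phase: the scaler becomes active once the lag exceeds the threshold."""
    return lag > activation_lag_threshold


def replicas_for_on_demand_environment(lag: int, activation_lag_threshold: int,
                                       lag_threshold: int) -> int:
    """Scale to zero while idle, otherwise let the lag decide (at least one replica)."""
    if not is_active(lag, activation_lag_threshold):
        return 0
    return max(1, math.ceil(lag / lag_threshold))


print(replicas_for_on_demand_environment(3, 3, 1000))     # 0, environment stays down
print(replicas_for_on_demand_environment(4, 3, 1000))     # 1, first Pod is started
print(replicas_for_on_demand_environment(2500, 3, 1000))  # 3
```

When the function returns 0, no Pod needs a Node any more, and this is the moment Karpenter can remove the Nodes.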

Two other interesting parameters: ‘excludePersistentLag‘ tells KEDA to ignore the lag of partitions whose committed offset has not changed between two measurements, for example because a message cannot be consumed, which prevents the startup of additional replicas that could not reduce the lag anyway and so prevents uncontrolled Scale-Up operations. ‘partitionLimitation‘ tells KEDA which partitions of the Kafka Topic to observe.
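To illustrate the idea behind ‘excludePersistentLag‘, here is a small Python sketch. The data layout and the function name ‘effective_lag‘ are hypothetical and the real scaler reads these offsets from Kafka itself; the sketch only shows that a partition whose committed offset did not move since the previous poll is left out of the sum.

```python
def effective_lag(partitions: dict[int, tuple[int, int]],
                  previous_committed: dict[int, int],
                  exclude_persistent_lag: bool = True) -> int:
    """partitions maps partition id -> (committed offset, latest offset)."""
    total = 0
    for partition_id, (committed, latest) in partitions.items():
        # a partition is 'stuck' if the consumer made no progress since the last poll
        stuck = previous_committed.get(partition_id) == committed
        if exclude_persistent_lag and stuck:
            continue
        total += max(latest - committed, 0)
    return total


current = {1: (100, 150), 2: (200, 900)}
previous = {1: 90, 2: 200}  # partition 2 made no progress since the last poll
print(effective_lag(current, previous))                                # 50
print(effective_lag(current, previous, exclude_persistent_lag=False))  # 750
```

Without the exclusion, the 700 stuck messages of partition 2 would keep asking for more replicas although more replicas would not help.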

Karpenter Configuration

Now let’s look at the Karpenter configuration.

apiVersion: karpenter.sh/v1beta1
kind: NodePool
metadata:
  name: pekko-fsm-nodepool
spec:
  template:
    spec:
      nodeClassRef:
        name: pekko-fsm-node-class
      requirements:
        - key: kubernetes.io/arch
          operator: In
          values: ["amd64"]
        - key: kubernetes.io/os
          operator: In
          values: ["linux"]
        - key: "topology.kubernetes.io/zone"
          operator: In
          values: ["eu-central-1a", "eu-central-1b", "eu-central-1c"]
        - key: karpenter.sh/capacity-type
          operator: In
          values: ["spot", "on-demand"]
        - key: karpenter.k8s.aws/instance-category
          operator: In
          values: ["m", "c", "r"]
        - key: "karpenter.k8s.aws/instance-cpu"
          operator: In
          values: ["4", "8"]
        - key: karpenter.k8s.aws/instance-generation
          operator: Gt
          values: ["2"]
  limits:
    cpu: "10000"
    memory: 10000Gi
  disruption:
    consolidationPolicy: WhenUnderutilized
    expireAfter: 168h # 7 * 24h = 168h

Now let’s look at what the above configuration does in detail.

- key: kubernetes.io/arch
  operator: In
  values: ["amd64"]
- key: kubernetes.io/os
  operator: In
  values: ["linux"]
- key: "topology.kubernetes.io/zone"
  operator: In
  values: ["eu-central-1a", "eu-central-1b", "eu-central-1c"]

In this snippet, we are telling Karpenter to look for ‘amd64‘, ‘linux‘ based virtual machines in the availability zones of ‘eu-central-1‘ via the AWS Fleet API.

- key: karpenter.sh/capacity-type
  operator: In
  values: ["spot", "on-demand"]
- key: karpenter.k8s.aws/instance-category
  operator: In
  values: ["m", "c", "r"]
- key: "karpenter.k8s.aws/instance-cpu"
  operator: In
  values: ["4", "8"]
- key: karpenter.k8s.aws/instance-generation
  operator: Gt
  values: ["2"]

This part tells Karpenter that we want our virtual machines from the spot pool or the on-demand pool. Karpenter will first try to get them from the spot pool; if that does not succeed, it will take a VM from the on-demand pool. Also, if an Instance receives a Spot Instance Interruption signal, Karpenter will first try to find a Spot instance with a different Instance Type and, if that does not succeed either, an on-demand instance.

We also define which instance categories we are interested in, so the Fleet API has a better chance to find a spot instance; in this case the categories ‘m‘, ‘c‘, ‘r‘, which cover instance types like ‘r5.2xlarge‘, ‘c5.2xlarge‘, ‘m5.xlarge‘, etc.

The next parameter is relevant to our Akka / Pekko application. It is possible to allocate Terabytes of memory to an Akka / Pekko Node, but it is not a clever idea to create a JVM greater than 64 GB, because Garbage Collection becomes really inefficient. In AWS, the memory size of a Virtual Machine is generally directly correlated to its number of CPUs, so I am telling Karpenter not to pick VMs with more than 8 CPUs, while VMs with 2 CPUs have too little memory.

The last one just tells Karpenter that the instance generation must be greater than 2.
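How these requirements narrow down the candidates can be sketched in Python. The candidate list is only a tiny hand-picked sample of EC2 instance types and the function ‘matches‘ is my own illustration of the ‘In‘ and ‘Gt‘ operators, not Karpenter's implementation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class InstanceType:
    name: str
    category: str
    generation: int
    vcpus: int


# a small sample, Karpenter itself evaluates the full EC2 catalogue
CANDIDATES = [
    InstanceType("m5.large", "m", 5, 2),
    InstanceType("m5.xlarge", "m", 5, 4),
    InstanceType("c5.2xlarge", "c", 5, 8),
    InstanceType("r5.2xlarge", "r", 5, 8),
    InstanceType("r5.4xlarge", "r", 5, 16),
    InstanceType("t3.xlarge", "t", 3, 4),
]


def matches(instance: InstanceType) -> bool:
    return (instance.category in {"m", "c", "r"}  # instance-category In [m, c, r]
            and instance.vcpus in {4, 8}          # instance-cpu In [4, 8]
            and instance.generation > 2)          # instance-generation Gt 2


print([i.name for i in CANDIDATES if matches(i)])
# ['m5.xlarge', 'c5.2xlarge', 'r5.2xlarge']
```

The wider this list stays after filtering, the better the chance that a spot instance is available, so it pays off not to be stricter than the application really needs.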

limits:
  cpu: "10000"
  memory: 10000Gi

These parameters tell Karpenter up to which limits it can scale the Node Pool. While I titled this blog with limitless scaling :), the limit here is 10000 CPUs, and of course you are also limited by the amount of CPUs available in the AWS availability zones. The same goes for the memory.

disruption:
  consolidationPolicy: WhenUnderutilized
  expireAfter: 168h # 7 * 24h = 168h

The final one is the most interesting part from my point of view. Do you remember I told you that Karpenter has to use ‘on-demand‘ instances when no spot instances are available or when they are interrupted? Karpenter always tries to save money for us. It constantly checks whether it can consolidate the Node Pool by moving the Workloads from underutilised Nodes to other Nodes and terminating the emptied ones. Secondly, with the given expiry configuration, after the expiry period it checks whether it can find cheaper instances for us, preferably from the spot instance pool with the ‘price-capacity-optimized‘ strategy. Now you have to decide how aggressive you want to be with this optimisation. I chose 1 week in this configuration, but if cost saving is that important for you, you can go down to 1 day or lower.

Finally, we need a NodeClass configuration to be able to configure AWS specific settings, like AMIs, Subnets, SecurityGroups, BlockDeviceMappings, etc.

apiVersion: karpenter.k8s.aws/v1beta1
kind: EC2NodeClass
metadata:
  name: pekko-fsm-node-class
spec:
  amiFamily: AL2 # Amazon Linux 2
  role: "KarpenterNodeRole-fsm-pekko"
  subnetSelectorTerms:
    - tags:
        karpenter.sh/discovery: "pekko-fsm" 
  securityGroupSelectorTerms:
    - tags:
        karpenter.sh/discovery: "pekko-fsm"

As you can see, KEDA and Karpenter provide unbelievable tools to scale limitlessly and efficiently.

Horizontal Pod Autoscaling

Now let’s look at how you can configure the HPA if you don’t have the possibility to use KEDA or Karpenter.

At several points in this blog series, I explicitly stated that the biggest advantage of Akka / Pekko, Kubernetes and Service Oriented design is the unlimited Horizontal Scaling and Elasticity capability. Kubernetes Autoscaling is what makes this an automated process without human interaction.

The configuration of Autoscaling via Helm Charts is a trivial task, as you can see in the file ‘fsm-akka-4eyes-application/helm/templates/hpa.yaml‘.

{{- if .Values.autoscaling.enabled }}
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: {{ include "fsm-pekko-4eyes-application.fullname" . }}
  labels:
    {{- include "fsm-pekko-4eyes-application.labels" . | nindent 4 }}
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: {{ include "fsm-pekko-4eyes-application.fullname" . }}
  minReplicas: {{ .Values.autoscaling.minReplicas }}
  maxReplicas: {{ .Values.autoscaling.maxReplicas }}
  metrics:
    {{- if .Values.autoscaling.targetCPUUtilizationPercentage }}
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: {{ .Values.autoscaling.targetCPUUtilizationPercentage }}
    {{- end }}
    {{- if .Values.autoscaling.targetMemoryUtilizationPercentage }}
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: {{ .Values.autoscaling.targetMemoryUtilizationPercentage }}
    {{- end }}
  behavior:
    scaleDown:
      stabilizationWindowSeconds: {{ .Values.autoscaling.scaleDown.stabilizationWindowSeconds }}
      policies:
        {{- toYaml .Values.autoscaling.scaleDown.policies | nindent 8 }}
      selectPolicy: {{ .Values.autoscaling.scaleDown.selectPolicy }}
    scaleUp:
      stabilizationWindowSeconds: {{ .Values.autoscaling.scaleUp.stabilizationWindowSeconds }}
      policies:
        {{- toYaml .Values.autoscaling.scaleUp.policies | nindent 8 }}
      selectPolicy: {{ .Values.autoscaling.scaleUp.selectPolicy }}
{{- end }}

As you can see, we are telling Kubernetes in which range it can increase or decrease the number of Pods and by which criteria. In this particular configuration, Kubernetes observes the average CPU utilisation of the Pods of the Deployment (relative to their CPU requests) and, when it exceeds 80 % (or a certain % of memory utilisation, if configured), it starts extra Pods.

Pod Scaling also has a mechanism to control how fast Pods are scaled up and down, as you can see in the ‘behavior‘ element above; more detailed documentation about the topic can be found at the following link.
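The rule the HPA uses to get from a utilisation value to a replica count is documented by Kubernetes as desiredReplicas = ceil( currentReplicas * currentMetricValue / desiredMetricValue ). The Python sketch below applies it to our 80 % target; the function name is my own and I assume the default tolerance of 10 %, within which the HPA does not act.

```python
import math


def hpa_desired_replicas(current_replicas: int, current_utilization: float,
                         target_utilization: float, tolerance: float = 0.1) -> int:
    """desired = ceil(current * currentMetric / targetMetric), skipped inside the tolerance."""
    ratio = current_utilization / target_utilization
    if abs(ratio - 1.0) <= tolerance:
        return current_replicas  # close enough to the target, no scaling
    return math.ceil(current_replicas * ratio)


print(hpa_desired_replicas(4, 120, 80))  # 6
print(hpa_desired_replicas(4, 84, 80))   # 4, inside the tolerance
print(hpa_desired_replicas(10, 40, 80))  # 5
```

Notice that the HPA can only react after the utilisation has already risen, which is exactly the reactive behaviour that KEDA avoids by looking at the Kafka lag.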

We can configure any of these values over ‘fsm-akka-4eyes-application/helm/values.yaml‘.

autoscaling:
  enabled: false
  minReplicas: 2
  maxReplicas: 100
  targetCPUUtilizationPercentage: 80
  # targetMemoryUtilizationPercentage: 80
  scaleDown:
    stabilizationWindowSeconds: 300
    policies:
      - type: Percent
        value: 10
        periodSeconds: 60
      - type: Pods
        value: 5
        periodSeconds: 60
    selectPolicy: Min
  scaleUp:
    stabilizationWindowSeconds: 0
    policies:
      - type: Percent
        value: 100
        periodSeconds: 15
      - type: Pods
        value: 10
        periodSeconds: 15
    selectPolicy: Max
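How the two policies of each direction are combined by ‘selectPolicy‘ can be sketched like this in Python. It is a simplified model under my own assumptions (the function name is hypothetical and the rounding details of the real controller are ignored): every policy yields a number of Pods that may change within its period, and ‘Min‘ or ‘Max‘ picks the smallest or largest of them.

```python
import math


def allowed_change(current_replicas: int, policies: list[dict], select_policy: str) -> int:
    """Number of Pods that may be added or removed within one period."""
    changes = []
    for policy in policies:
        if policy["type"] == "Percent":
            changes.append(math.ceil(current_replicas * policy["value"] / 100))
        else:  # type "Pods" is an absolute number
            changes.append(policy["value"])
    return min(changes) if select_policy == "Min" else max(changes)


scale_down = [{"type": "Percent", "value": 10}, {"type": "Pods", "value": 5}]
scale_up = [{"type": "Percent", "value": 100}, {"type": "Pods", "value": 10}]

print(allowed_change(100, scale_down, "Min"))  # 5 Pods may be removed per 60 seconds
print(allowed_change(4, scale_up, "Max"))      # 10 Pods may be added per 15 seconds
```

So the values above scale up aggressively and scale down carefully, which is a sensible default for an application that has to rebalance its Shards every time a Node leaves the Cluster.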

A final point about the subject: for the Kubernetes Autoscaling mechanism to function, your Kubernetes Environment needs a Metrics Server, so if you want to test Autoscaling in the context of this PoC, you have to install it as well. A word of caution though: I have a powerful computer with 64 GB Memory, but a constellation of Apache Cassandra, Apache Kafka, Elasticsearch, Nexus and Akka / Pekko brings it to its limits. I will explain how to install the Metrics Server in the Helm Deployment chapter, but I advise you to test autoscaling in AWS, Google Cloud, MS Azure or your on-premise dedicated Kubernetes cluster.

Appendix

KEDA Setup

Now let’s see how we can set up KEDA with Helm.

First, please add the KEDA chart repository to Helm.

helm repo add kedacore https://kedacore.github.io/charts
helm repo update

And then install it.

helm install keda kedacore/keda -f values.yaml --namespace keda --create-namespace

With the help of the following ‘values.yaml‘.

image:
  keda:
    repository: ghcr.io/kedacore/keda
  metricsApiServer:
    repository: ghcr.io/kedacore/keda-metrics-apiserver
  pullPolicy: Always

crds:
  install: true

operator:
  name: keda-operator
  replicaCount: 1
  affinity: {}

metricsServer:
  replicaCount: 1
  dnsPolicy: ClusterFirst
  useHostNetwork: false
  affinity: {}

rbac:
  create: true

serviceAccount:
  create: true
  name: keda-operator
  automountServiceAccountToken: true
  annotations: 
    eks.amazonaws.com/role-arn: arn:aws:iam::xxx-yyy-zzz-accountid:role/keda-role

podIdentity:
  aws:
    irsa:
      enabled: false
      audience: "sts.amazonaws.com"
      roleArn: ""
      stsRegionalEndpoints: "true"
      tokenExpiration: 86400

logging:
  operator:
    level: info
    format: console
    timeEncoding: rfc3339
  metricServer:
    level: 0

securityContext:
  operator:
    capabilities:
      drop:
      - ALL
    allowPrivilegeEscalation: false
    readOnlyRootFilesystem: true
    seccompProfile:
      type: RuntimeDefault
  metricServer:
    capabilities:
      drop:
      - ALL
    allowPrivilegeEscalation: false
    seccompProfile:
      type: RuntimeDefault

podSecurityContext:
  operator:
    runAsNonRoot: true
    runAsUser: 1001
    runAsGroup: 1001
    fsGroup: 1001
  metricServer:
    runAsNonRoot: true

service:
  type: ClusterIP
  portHttp: 80
  portHttpTarget: 8080
  portHttps: 443
  portHttpsTarget: 6443

resources:
  operator:
    limits:
      cpu: 1
      memory: 1000Mi
    requests:
      cpu: 100m
      memory: 100Mi
  metricServer:
    limits:
      cpu: 1
      memory: 1000Mi
    requests:
      cpu: 100m
      memory: 100Mi

http:
  timeout: 3000
  keepAlive:
    enabled: true

prometheus:
  metricServer:
    enabled: false
    port: 9022
    portName: metrics
    path: /metrics
    podMonitor:
      # Enables PodMonitor creation for the Prometheus Operator
      enabled: false
      interval:
      scrapeTimeout:
      namespace:
      additionalLabels: {}
      relabelings: []
  operator:
    enabled: false
    port: 8080
    podMonitor:
      # Enables PodMonitor creation for the Prometheus Operator
      enabled: false
      interval:
      scrapeTimeout:
      namespace:
      additionalLabels: {}
      relabelings: []
    prometheusRules:
      enabled: false
      namespace:
      additionalLabels: {}
      alerts:
        []

permissions:
  metricServer:
    restrict:
      secret: false
  operator:
    restrict:
      secret: false

Now, to connect KEDA with the AWS Security Token Service ( STS ), we have to create an IAM Role that the KEDA Operator can assume through its Service Account, so KEDA can access the AWS resources it needs while it manages the Horizontal Pod Autoscaler of our application.

aws iam create-role --role-name keda-role  --assume-role-policy-document file://role-mapping.json --description "keda role-description" --output text

The ‘role-mapping.json‘ below is the configuration for the IAM Role. For it, we also have to identify the OIDC Provider of our AWS EKS Cluster.

OIDC_PROVIDER=$(aws eks describe-cluster --name fsm-pekko --region eu-central-1 --query "cluster.identity.oidc.issuer" --output text | sed -e "s/^https:\/\///")

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Federated": "arn:aws:iam::xxx-yyy-zzz-accountid:oidc-provider/{{ OIDC_PROVIDER }}"
      },
      "Action": "sts:AssumeRoleWithWebIdentity",
      "Condition": {
        "StringEquals": {
          "{{ OIDC_PROVIDER }}:aud": "sts.amazonaws.com",
          "{{ OIDC_PROVIDER }}:sub": [
            "system:serviceaccount:keda:keda-operator",
            "system:serviceaccount:fsm-pekko:{{ include "fsm-pekko-4eyes-application.serviceAccountName" . }}"
          ]
        }
      }
    }
  ]
}
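Because the placeholders in this file are easy to get wrong by hand, a small Python sketch that renders the document may help. Everything in it is an assumption for illustration: the function name, the account id ‘111122223333‘, the issuer URL and the Service Account name ‘fsm-pekko-4eyes-application‘ are hypothetical placeholders that you have to replace with your own values.

```python
import json


def trust_policy(account_id: str, oidc_issuer_url: str,
                 service_accounts: list[tuple[str, str]]) -> dict:
    """Build the trust policy for a list of (namespace, service account name) pairs."""
    provider = oidc_issuer_url.removeprefix("https://")  # same job as the sed command
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {
                "Federated": f"arn:aws:iam::{account_id}:oidc-provider/{provider}"
            },
            "Action": "sts:AssumeRoleWithWebIdentity",
            "Condition": {"StringEquals": {
                f"{provider}:aud": "sts.amazonaws.com",
                f"{provider}:sub": [
                    f"system:serviceaccount:{namespace}:{name}"
                    for namespace, name in service_accounts
                ],
            }},
        }],
    }


policy = trust_policy(
    "111122223333",                                           # hypothetical account id
    "https://oidc.eks.eu-central-1.amazonaws.com/id/EXAMPLE",  # hypothetical issuer
    [("keda", "keda-operator"), ("fsm-pekko", "fsm-pekko-4eyes-application")],
)
print(json.dumps(policy, indent=2))
```

The printed JSON can be saved as ‘role-mapping.json‘ and passed to the ‘aws iam create-role‘ command.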

And finally, we have to annotate our Application’s Service Account with the IAM Role.

kubectl annotate serviceaccount -n fsm-pekko {{ include "fsm-pekko-4eyes-application.serviceAccountName" . }} eks.amazonaws.com/role-arn=arn:aws:iam::xxx-yyy-zzz-accountid:role/keda-role

Karpenter Setup

The Karpenter Setup is more complex. We will need several IAM Roles so Karpenter can interact with our AWS EKS Cluster, and AWS EventBridge and AWS SQS so it can receive Spot Instance interruption events.

PS: whenever you see a notation like ‘{{ fsm-pekko-interruption-queue.arn }}‘, you have to replace it with the actual ARN of your SQS Queue.

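aws iam create-role --role-name KarpenterNodeRole-fsm-pekko  --assume-role-policy-document file://karpenter-node-role.json --description "Karpenter Node Role" --output text

and the ‘karpenter-node-role.json‘ with the trust policy of the Karpenter Node Role.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "ec2.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

The managed policies are not part of the trust policy, so we attach them to the role separately.

aws iam attach-role-policy --role-name KarpenterNodeRole-fsm-pekko --policy-arn arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy
aws iam attach-role-policy --role-name KarpenterNodeRole-fsm-pekko --policy-arn arn:aws:iam::aws:policy/AmazonEKSWorkerNodePolicy
aws iam attach-role-policy --role-name KarpenterNodeRole-fsm-pekko --policy-arn arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly
aws iam attach-role-policy --role-name KarpenterNodeRole-fsm-pekko --policy-arn arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore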

Next, we have to create an SQS Queue to receive the Events, especially the Life Cycle Events of the Spot Instances.

aws sqs create-queue --queue-name fsm-pekko-interruption-queue --attributes file://fsm-pekko-queue.json

and the ‘fsm-pekko-queue.json‘:

{
  "MessageRetentionPeriod": "300",
  "Policy": "{\"Version\": \"2012-10-17\", \"Id\": \"EC2InterruptionPolicy\", \"Statement\": [{\"Sid\": \"fsm-pekko-interruption-queue\", \"Effect\": \"Allow\", \"Principal\": {\"Service\": [\"events.amazonaws.com\", \"sqs.amazonaws.com\"]}, \"Action\": [\"sqs:SendMessage\", \"sqs:ReceiveMessage\"], \"Resource\": \"{{ fsm-pekko-interruption-queue.arn }}\"}]}"
}
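The ‘Policy‘ attribute has to be a JSON document that is itself escaped into a JSON string, which is painful to write by hand. A minimal Python sketch, assuming a hypothetical queue ARN and a function name of my own, lets ‘json.dumps‘ do the escaping:

```python
import json


def queue_attributes(queue_arn: str) -> dict:
    """Build the attribute file for 'aws sqs create-queue --attributes file://...'."""
    policy = {
        "Version": "2012-10-17",
        "Id": "EC2InterruptionPolicy",
        "Statement": [{
            "Sid": "fsm-pekko-interruption-queue",
            "Effect": "Allow",
            "Principal": {"Service": ["events.amazonaws.com", "sqs.amazonaws.com"]},
            "Action": ["sqs:SendMessage", "sqs:ReceiveMessage"],
            "Resource": queue_arn,
        }],
    }
    # the policy is embedded as a string, json.dumps takes care of the escaping
    return {"MessageRetentionPeriod": "300", "Policy": json.dumps(policy)}


# hypothetical ARN, replace it with the ARN of your own queue
attributes = queue_attributes("arn:aws:sqs:eu-central-1:111122223333:fsm-pekko-interruption-queue")
print(json.dumps(attributes, indent=2))
```

The output can be written to ‘fsm-pekko-queue.json‘ as it is.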

We have to define which Events are going to be delivered to the SQS Queue. Each rule selects the Events, and the queue is then registered as the target of the rule.

AWS Event Rule for AWS SQS Queue from ‘aws.health’.

aws events put-rule --name "ScheduledChangeRule" --event-pattern "{\"source\":[\"aws.health\"],\"detail-type\":[\"AWS Health Event\"]}"
aws events put-targets --rule "ScheduledChangeRule" --targets "Id"="1","Arn"="{{ fsm-pekko-interruption-queue.arn }}"

AWS Event Rule for AWS SQS Queue from ‘aws.ec2’ for Spot Instance Interruption.

aws events put-rule --name "SpotInterruptionRule" --event-pattern "{\"source\":[\"aws.ec2\"],\"detail-type\":[\"EC2 Spot Instance Interruption Warning\"]}"
aws events put-targets --rule "SpotInterruptionRule" --targets "Id"="1","Arn"="{{ fsm-pekko-interruption-queue.arn }}"

AWS Event Rule for AWS SQS Queue from ‘aws.ec2’ for Instance Rebalance.

aws events put-rule --name "RebalanceRule" --event-pattern "{\"source\":[\"aws.ec2\"],\"detail-type\":[\"EC2 Instance Rebalance Recommendation\"]}"
aws events put-targets --rule "RebalanceRule" --targets "Id"="1","Arn"="{{ fsm-pekko-interruption-queue.arn }}"

AWS Event Rule for AWS SQS Queue from ‘aws.ec2’ for Instance State Change.

aws events put-rule --name "InstanceStateChangeRule" --event-pattern "{\"source\":[\"aws.ec2\"],\"detail-type\":[\"EC2 Instance State-change Notification\"]}"
aws events put-targets --rule "InstanceStateChangeRule" --targets "Id"="1","Arn"="{{ fsm-pekko-interruption-queue.arn }}"

We also need an IAM Policy for Karpenter Controller in AWS EKS.

aws iam create-policy --policy-name KarpenterControllerPolicy-fsm-pekko --policy-document file://policy.json

and the IAM Policy definition,

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "ec2:DescribeAvailabilityZones",
                "ec2:DescribeImages",
                "ec2:DescribeInstances",
                "ec2:DescribeInstanceTypeOfferings",
                "ec2:DescribeInstanceTypes",
                "ec2:DescribeLaunchTemplates",
                "ec2:DescribeSecurityGroups",
                "ec2:DescribeSpotPriceHistory",
                "ec2:DescribeSubnets",
                "pricing:GetProducts",
                "ssm:GetParameter",
                "ec2:CreateFleet",
                "ec2:CreateLaunchTemplate",
                "ec2:CreateTags",
                "ec2:DeleteLaunchTemplate",
                "ec2:RunInstances",
                "ec2:TerminateInstances"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "sqs:GetQueueAttributes",
                "sqs:GetQueueUrl",
                "sqs:ReceiveMessage",
                "sqs:DeleteMessage"
            ],
            "Resource": [
                "{{ fsm-pekko-interruption-queue.Arn }}"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "iam:PassRole"
            ],
            "Resource": [
                "arn:aws:iam::xxx_yyy_zzz_accountId:role/KarpenterNodeRole-fsm-pekko"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "eks:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:eks:eu-central-1:xxx_yyy_zzz_accountId:cluster/fsm-pekko"
            ]
        }
    ]
}

Now we have to create a Mapping from the IAM Role “KarpenterNodeRole-fsm-pekko” that we previously created to our EKS Cluster.

eksctl create iamidentitymapping --username system:node:{{EC2PrivateDNSName}} --cluster  fsm-pekko --arn "arn:aws:iam::xxx_yyy_zzz_accountId:role/KarpenterNodeRole-fsm-pekko" --group system:bootstrappers --group system:nodes 

Next comes the IAM Role for the Karpenter Controller. First we associate an IAM OIDC provider with our cluster.

eksctl utils associate-iam-oidc-provider --cluster fsm-pekko --approve

Then we create an IAM ServiceAccount role with the previously created “KarpenterControllerPolicy-fsm-pekko” attached, so Karpenter can launch instances.

eksctl create iamserviceaccount --cluster "fsm-pekko" --name karpenter --namespace karpenter --role-name "Karpenter-fsm-pekko" --attach-policy-arn "arn:aws:iam::xxx_yyy_zzz_accountId:policy/KarpenterControllerPolicy-fsm-pekko" --role-only --approve

We also have to create a Service Linked Role for the Spot Service at AWS.

aws iam create-service-linked-role --aws-service-name spot.amazonaws.com 2> /dev/null

And finally install Karpenter via Helm.

helm upgrade --install karpenter oci://public.ecr.aws/karpenter/karpenter \
  --namespace karpenter --create-namespace \
  --set serviceAccount.annotations."eks\.amazonaws\.com/role-arn"=arn:aws:iam::xxx_yyy_zzz_accountId:role/Karpenter-fsm-pekko \
  --set settings.aws.clusterName=fsm-pekko \
  --set settings.aws.defaultInstanceProfile=KarpenterNodeInstanceProfile-fsm-pekko \
  --set settings.aws.interruptionQueueName=fsm-pekko-interruption-queue \
  --set controller.resources.requests.cpu=1 \
  --set controller.resources.requests.memory=1Gi \
  --set controller.resources.limits.cpu=1 \
  --set controller.resources.limits.memory=1Gi \
  --wait

Code Generation from UML with XText / XTend for Akka / Pekko Finite State Machine ( FSM )

In this article you will find a detailed analysis of how to generate Scala / Java code for an Akka / Pekko Finite State Machine from a UML Diagram that models a State Machine, with the help of the Eclipse XText and XTend Frameworks. The UML Diagram will be modelled with the help of Eclipse Papyrus.

If you are not interested in Akka / Pekko FSM but you need an example of how to generate a State Machine from a UML Diagram (or any Domain Specific Language), especially as Java or Scala Code with Gradle, XText and XTend (there aren’t many examples on the internet about it), you can find the details in the ‘build.gradle‘ of the Github repository mentioned below.

The sources of this blog lie under the following Github Project: Akka Finite State Machine.

  1. XText / XTend
    1. XTend
    2. MWE2
    3. XText / UML
    4. XText Reader
    5. Gradle Configuration for XText

This blog is a part of a series demonstrate the full implementation of Proof of Concept application with Eventsourced principles using Akka / Pekko Framework, Apache Kafka, Apache Cassandra and Elasticsearch which you can find it here, to be able to understand whole context of what here explained, I advice to start reading from that blog but if you just need implementation detail of the topic mentioned in the title please continue to read.

From the following UML Diagram of a State Machine, we will create the necessary source code and configuration for the Akka / Pekko State Machine.

PS: Unfortunately WordPress reduces the image quality; if you want to see the images in a better resolution, please click on them.

XText / XTend

fsm-pekko-xtend

If you have stayed with me through this series of blogs, it must be clear by now that I rely heavily on code generation from UML Models, and that the magic happens with the help of Eclipse’s XText and XTend frameworks. Some of you may be irritated by this: we all like to work with IntelliJ, so you might not be thrilled about developing code generation with Eclipse.

To tell you the truth, if you are not experienced with XText or XTend, I strongly advise you to develop your code generation templates with Eclipse; its syntax and context help is irreplaceable for beginners. Personally, when I create or change templates I don’t even start Eclipse and just use the editors in IntelliJ.

And there is more good news: the creators of XText / XTend are probably not so happy with this situation either, and they are currently developing a TypeScript based successor, Langium. In my opinion it is not quite mature yet, but it is getting there, and there will most probably be a future version of this blog that uses Langium instead of XText / XTend.

If you are really committed to understanding what is happening in this chapter, you have to spend a little time in the XText / XTend documentation. The XText part is not really relevant for us (XText is extremely powerful for creating Domain Specific Languages): since UML is our DSL, XText only helps us to expose UML’s Ecore Metamodel to XTend so we can generate code.

To generate the Akka / Pekko Finite State Machine, once the UML Model is loaded via the XText MWE2 framework, we always start by asking for the list of all State Machines in the UML Model.

XTend

Let’s look at
‘fsm-pekko-xtend/org/salgar/pekko/fsm/xtend/uml/templates/StateMachineTemplate.xtend‘.

package org.salgar.pekko.fsm.xtend.uml.templates

import javax.inject.Inject
import org.eclipse.emf.ecore.resource.Resource
import org.eclipse.xtext.generator.IFileSystemAccess2
import org.eclipse.xtext.generator.AbstractGenerator
import org.eclipse.xtext.generator.IGeneratorContext

import org.eclipse.uml2.uml.Pseudostate
import org.eclipse.uml2.uml.State
import org.eclipse.uml2.uml.Transition
import org.eclipse.uml2.uml.Trigger
import org.eclipse.uml2.uml.Vertex

import java.util.ArrayList
import java.util.List

import org.salgar.pekko.fsm.xtend.uml.templates.facade.StateMachineFacadeTemplate
import org.salgar.pekko.fsm.xtend.uml.templates.facade.SlaveStateMachineFacadeTemplate

class StateMachineTemplate extends AbstractGenerator {
	@Inject extension Naming
	@Inject extension GlobalVariableHelper
	@Inject extension StateMachineHelper
	@Inject extension SubStateMachineHelper
	@Inject ScalaActionsTemplate scalaActionsTemplate
	@Inject ScalaGuardsTemplate scalaGuardsTemplate
	@Inject GuardianTemplate guardianTemplate
	@Inject ServiceLocatorTemplate serviceLocatorTemplate
	@Inject SlaveGuardianTemplate slaveGuardianTemplate
	@Inject SnapshotTemplate snapshotTemplate
	@Inject SnapshotAdapterTemplate snapshotAdapterTemplate
	@Inject StateMachineFacadeTemplate stateMachineFacadeTemplate
	@Inject SlaveStateMachineFacadeTemplate slaveStateMachineFacadeTemplate

	override doGenerate(Resource input, IFileSystemAccess2 fsa, IGeneratorContext context) {
		input.allContents.filter(org.eclipse.uml2.uml.StateMachine).filter(s | s.isActive).forEach[
			val content = generate(context)
			fsa.generateFile(packagePath+"/"+name+".scala", content)
			scalaActionsTemplate.doGenerate(it, fsa, context)
			scalaGuardsTemplate.doGenerate(it, fsa, context)

			if(!isAbstract) {
			    guardianTemplate.doGenerate(it, fsa)
			    serviceLocatorTemplate.doGenerate(it, fsa)
			    stateMachineFacadeTemplate.doGenerate(it, fsa)
			} else {
			    slaveGuardianTemplate.doGenerate(it, fsa)
			    slaveStateMachineFacadeTemplate.doGenerate(it, fsa)
			}
			snapshotTemplate.doGenerate(it, fsa)
			snapshotAdapterTemplate.doGenerate(it, fsa, context)

			//stateMachineFacadeImplTemplate.doGenerate(it, fsa)
		]
	}
    ...
}

This is the basic usage pattern of XTend: first we locate the objects we are interested in within the UML Model by giving the type we are looking for, in this case ‘org.eclipse.uml2.uml.StateMachine’, and then we generate code for them.

For example, the following template generates all the States found in the UML State Machine.

object «name» {
   sealed trait State {
      private[«nearestPackageName»] def controlObject: util.Map[java.lang.String, AnyRef]
   }
   «FOR state : allOwnedElements().filter(Pseudostate).sortWith([o1, o2 | o1.getName().compareTo(o2.getName())])»
        final case class «state.name.toUpperCase()»(controlObject: util.Map[java.lang.String, AnyRef]) extends State with CborSerializable
    «ENDFOR»
    «FOR state : allOwnedElements().filter(State).sortWith([o1, o2 | o1.getName().compareTo(o2.getName())])»
        «subStateRecursive(state, new ArrayList<Vertex>, context)»
    «ENDFOR»

Please pay attention to the special characters ‘«’ and ‘»’: everything between them is either a command or logic for the code generation, and the rest is just String template text that is copied to the output. The snippet above creates a Scala case class for every State in the UML State Machine. I will not go into more detail here because there are a lot of generation templates; please look at the code on GitHub, and if you have concrete problems understanding the XTend code, please write in the comments so I can improve this chapter.
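To make the expansion of such a loop more tangible, here is a minimal sketch in plain Java of what the first ‘«FOR ...»‘ block does conceptually: sort the names, upper-case them and emit one line of Scala text per name. The class ‘StateTemplateSketch‘, the method ‘renderStates‘ and the state names are hypothetical and only for illustration; the real work is done by the XTend template engine, not by code like this.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

// Hypothetical illustration of what a «FOR ...» template loop produces; not part of the project.
class StateTemplateSketch {

    // Sort the state names, then emit one Scala case class line per name.
    static String renderStates(List<String> stateNames) {
        return stateNames.stream()
                .sorted(Comparator.naturalOrder())
                // Locale.ROOT keeps the upper-casing independent of the machine's locale
                .map(name -> "final case class " + name.toUpperCase(Locale.ROOT)
                        + "(controlObject: util.Map[java.lang.String, AnyRef])"
                        + " extends State with CborSerializable")
                .collect(Collectors.joining("\n"));
    }

    public static void main(String[] args) {
        // Assumed example names; a real model supplies its own Pseudostates.
        System.out.println(renderStates(List.of("Initial", "Final")));
    }
}
```

The fixed text inside the string literals plays the role of the template text, while the stream operations correspond to what stands between ‘«’ and ‘»’.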

One of the interesting features of XTend is the extension mechanism: if you have functionality that you use again and again, instead of creating a base class and extending it to get access to this functionality, you can inject it as an Extension.

For example, below we use the ‘Naming‘ extension to walk over the UML Package and work out the Scala / Java package name and the directory in which the generated code has to be created.

class StateMachineTemplate extends AbstractGenerator {
	@Inject extension Naming
	@Inject extension GlobalVariableHelper
	@Inject extension StateMachineHelper
	@Inject extension SubStateMachineHelper
	@Inject ScalaActionsTemplate scalaActionsTemplate
	@Inject ScalaGuardsTemplate scalaGuardsTemplate
    
    ...

    def generate (org.eclipse.uml2.uml.StateMachine it, IGeneratorContext context) '''
        package «packageName»

        import java.util
        ...
     }
     
     ...
}

package org.salgar.pekko.fsm.xtend.uml.templates

import org.eclipse.uml2.uml.PackageableElement
import org.eclipse.uml2.uml.PrimitiveType
import org.eclipse.uml2.uml.Type
import org.eclipse.uml2.uml.Property

class Naming {

	/**
	 * Strip off Model name
	 */
	def packageName (PackageableElement it) {
		val qnNearestPck = it.nearestPackage.qualifiedName
		qnNearestPck.substring(qnNearestPck.indexOf("::")+2).replace('::','.')
	}

	def packagePath (PackageableElement it) {
		packageName.replace('.','/')
	}
    ...
}
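The two ‘Naming‘ methods are plain string manipulation, so they can be mirrored in Java to see what they return. This is only a sketch: the class ‘PackageNamingSketch‘ and the qualified name ‘Model::org::salgar::fsm‘ are assumptions for illustration, and the real extension receives a UML ‘PackageableElement‘ rather than a String.

```java
// Hypothetical Java mirror of the Naming extension's string handling; not part of the project.
class PackageNamingSketch {

    // Drops everything up to and including the first "::" (the model name)
    // and turns the remaining "::" separators into dots.
    static String packageName(String qualifiedName) {
        int firstSeparator = qualifiedName.indexOf("::");
        return qualifiedName.substring(firstSeparator + 2).replace("::", ".");
    }

    // Turns the package name into a relative directory path.
    static String packagePath(String qualifiedName) {
        return packageName(qualifiedName).replace('.', '/');
    }

    public static void main(String[] args) {
        String qualifiedName = "Model::org::salgar::fsm"; // assumed example value
        System.out.println(packageName(qualifiedName));   // org.salgar.fsm
        System.out.println(packagePath(qualifiedName));   // org/salgar/fsm
    }
}
```

Like the original, the sketch assumes that the qualified name always starts with a model name followed by ‘::‘; a name without that prefix would lose its first character.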

Since we use code generation so extensively, it is not feasible to visit every code generation template here. The only advice I can give: if you have to change any functionality in the generated Scala, Java, etc. code, first write the change as prototype code and then integrate it into the code generation template.

MWE2

Now let’s look at how we tell the build system, Gradle, to generate code. XText has the Modeling Workflow Engine (MWE2) component for this purpose. Its configuration is simple, as you can see in ‘fsm-pekko-4eyes-statemachine/src/main/mwe2/GenerateWorkflow.mwe2‘.

Workflow {
	bean = org.salgar.pekko.fsm.xtend.uml.engine.UmlGeneratorStandaloneSetup : setupScala {
        config = {
            javaGenPath = "build/generated-sources/main/scala"
            rootClass = "org.salgar.pekko.fsm.xtend.uml.templates.ScalaRoot"
        }
        doInit = true
    }
}

package org.salgar.pekko.fsm.xtend.uml.templates

import javax.inject.Inject
import org.eclipse.emf.ecore.resource.Resource
import org.eclipse.xtext.generator.IFileSystemAccess2
import org.eclipse.xtext.generator.AbstractGenerator
import org.eclipse.xtext.generator.IGeneratorContext

class ScalaRoot extends AbstractGenerator {
	@Inject StateMachineTemplate stateMachineTemplate
	// add more templates here

	override doGenerate(Resource input, IFileSystemAccess2 fsa, IGeneratorContext context) {
		stateMachineTemplate.doGenerate(input, fsa, context)
	}
}

As you can see, first we have to say which XTend Generator will generate the code. In this case it is the
fsm-pekko-xtend/org/salgar/pekko/fsm/xtend/uml/templates/ScalaRoot.xtend
class (since we are creating Scala classes), which must be based on ‘org.eclipse.xtext.generator.AbstractGenerator’; the ‘StateMachineTemplate’ that generates the Scala classes is injected into it.

Next we have to tell it which UML Model to use to create our System.

component =  org.salgar.pekko.fsm.xtext.reader.XTextReader : scalaReader  {
        useJavaClassPath = true
        uriFilter = org.eclipse.xtext.mwe.NameBasedFilter {
            regularExpression = '4eyes.*\\.uml'
        }

        register = setupScala
        loadResource = {
            slot = "modelSlot"
        }
    }

We use a regular expression filter here to locate all UML Models containing ‘4eyes‘ in the name. The artefact containing our UML Model must be in the dependency list of our project.
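If you want to check which file names such a pattern accepts, you can try it with ‘java.util.regex‘ directly. The sketch below uses hypothetical file names, and it deliberately shows both a full match and a partial match, because I have not verified here which of the two the ‘NameBasedFilter‘ applies; treat it as a way to test your own pattern, not as a description of the filter’s internals.

```java
import java.util.regex.Pattern;

// Hypothetical helper for experimenting with the pattern from the workflow; not part of the project.
class UmlNameFilterSketch {

    // Same expression as in the .mwe2 file: "4eyes", anything, then a literal ".uml"
    private static final Pattern UML_MODEL = Pattern.compile("4eyes.*\\.uml");

    // The whole name must match the expression.
    static boolean fullMatch(String fileName) {
        return UML_MODEL.matcher(fileName).matches();
    }

    // Some part of the name must match the expression.
    static boolean partialMatch(String fileName) {
        return UML_MODEL.matcher(fileName).find();
    }

    public static void main(String[] args) {
        // Assumed example names
        for (String name : new String[] {"4eyes-credit.uml", "my-4eyes.uml", "4eyes-credit.notation"}) {
            System.out.println(name + " full=" + fullMatch(name) + " partial=" + partialMatch(name));
        }
    }
}
```

With a full match, only names that begin with ‘4eyes‘ are accepted, so a model file with a prefix in front of ‘4eyes‘ would need a leading ‘.*‘ in the expression.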

The next part joins the code generator component and the UML Model.

	component = org.salgar.pekko.fsm.xtend.uml.generator.FsmPekkoGeneratorComponent : scalaGenerator {
		register = setupScala
        slot = 'modelSlot'
        globalVar = {
            name = "targetSourceStateSeperator" value = "_$$_"
        }
        globalVar = {
                name = "submachineSeperator" value = "_$_"
        }
        outlet = {
            path = "build/generated-sources/main/scala"
        }
        outlet = {
        	outletName = "gen-once"
            path = javaPath
        }
    }

The two are joined via ‘setupScala‘ and ‘modelSlot‘. Additionally, you can see that we configure Global Variables for the MWE2 Workflow, which tell the generation framework which special characters to use in Action / Guard class names and between the States of Sub / Nested State Machines (naturally only the special characters allowed by Scala / Java are usable), and finally the directory into which the code should be generated.
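As a rough illustration of the ‘submachineSeperator‘ variable, the following Java sketch joins the path of a nested State with ‘_$_‘ and checks that the result is still a legal Java identifier. The class and method names are hypothetical, and how the templates actually compose the names is defined in the XTend code in the repository; the example path is chosen so that it reproduces one of the State names you will find in the generated ‘CreditSM‘.

```java
import java.util.List;

// Hypothetical illustration of the separator's effect on generated names; not part of the project.
class StateNameSketch {

    // Value configured as the "submachineSeperator" global variable in the workflow
    static final String SUBMACHINE_SEPARATOR = "_$_";

    // Joins the names from the outermost State down to the nested State.
    static String nestedStateName(List<String> path) {
        return String.join(SUBMACHINE_SEPARATOR, path);
    }

    // Checks that every character is allowed in a Java identifier.
    static boolean isJavaIdentifier(String name) {
        if (name.isEmpty() || !Character.isJavaIdentifierStart(name.charAt(0))) {
            return false;
        }
        for (int i = 1; i < name.length(); i++) {
            if (!Character.isJavaIdentifierPart(name.charAt(i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        String name = nestedStateName(
                List.of("CREDIT_APPLICATION_SUBMITTED", "WAITING_APPROVAL", "INITIAL_FE"));
        System.out.println(name + " valid=" + isJavaIdentifier(name));
    }
}
```

A separator such as ‘-‘ would fail the identifier check, which is why the choice is limited to characters like ‘_‘ and ‘$‘.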

XText / UML

As I mentioned before, the only role of XText in this Proof of Concept is to let XTend access the Ecore objects of the UML definition from Eclipse. If you would like to create your own DSL to generate Akka / Pekko Finite State Machines, or everything else in this project, please check another blog of mine; with the information there you could easily do it.

The main magic happens in two classes,
‘fsm-pekko-xtend/org/salgar/akka/fsm/xtend/uml/engine/UmlGeneratorStandaloneSetup.java‘ and
‘fsm-pekko-xtend/org/salgar/akka/fsm/xtend/uml/engine/UMLGeneratorModule.java‘.

    public void register(Injector injector) {
        org.eclipse.xtext.resource.IResourceFactory resourceFactory = 
                injector.getInstance(org.eclipse.xtext.resource.IResourceFactory.class);
        org.eclipse.xtext.resource.IResourceServiceProvider serviceProvider = 
                injector.getInstance(org.eclipse.xtext.resource.IResourceServiceProvider.class);
        Resource.Factory.Registry.INSTANCE.getExtensionToFactoryMap().put("uml", resourceFactory);
        org.eclipse.xtext.resource.IResourceServiceProvider.Registry.INSTANCE.getExtensionToFactoryMap()
                .put("uml", serviceProvider);

        registerUMLPathMaps();
    }

Basically, ‘UmlGeneratorStandaloneSetup’ tells XText which file extension the UML Model uses (‘.uml’, what a surprise).

    protected void registerUMLPathMaps() {
        String umlResourcesPath = null;
        String p = "metamodels/UML.metamodel.uml";
        // getResource returns null when the UML metamodel is not on the classpath
        URL url = Thread.currentThread().getContextClassLoader().getResource(p);
        if (url == null) {
            throw new IllegalStateException("Missing required plugin 'org.eclipse.uml2.uml.resources' in classpath.");
        }
        URI uri = URI.createURI(url.toExternalForm());
        umlResourcesPath = uri.toString();
        final int mmIndex = umlResourcesPath.lastIndexOf("/metamodels");
        if (mmIndex < 0)
            throw new IllegalStateException("Missing required plugin 'org.eclipse.uml2.uml.resources' in classpath.");

        umlResourcesPath = umlResourcesPath.substring(0, mmIndex);

        // Map the UML pathmap:// URIs onto the directories located next to the metamodel
        URIConverter.URI_MAP.put(URI.createURI("pathmap://UML_PROFILES/"), URI.createURI(umlResourcesPath+"/profiles/"));
        URIConverter.URI_MAP.put(URI.createURI("pathmap://UML_METAMODELS/"), URI.createURI(umlResourcesPath+"/metamodels/"));
        URIConverter.URI_MAP.put(URI.createURI("pathmap://UML_LIBRARIES/"), URI.createURI(umlResourcesPath+"/libraries/"));
    }

It also registers UML’s Ecore Metamodel with XText.
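The core of that registration is a small piece of string handling: the location of the metamodel file is cut at ‘/metamodels‘ to get the base of the UML resources, and the ‘profiles‘, ‘metamodels‘ and ‘libraries‘ directories are resolved relative to that base. Here is a minimal Java sketch of this step; the class name and the jar location are assumptions for illustration only.

```java
// Hypothetical illustration of the base path calculation; not part of the project.
class UmlResourceBaseSketch {

    // Cuts the metamodel location at the last "/metamodels" segment.
    static String resourceBase(String metamodelLocation) {
        int index = metamodelLocation.lastIndexOf("/metamodels");
        if (index < 0) {
            throw new IllegalStateException("No '/metamodels' segment in " + metamodelLocation);
        }
        return metamodelLocation.substring(0, index);
    }

    public static void main(String[] args) {
        // Assumed example location inside a jar on the classpath
        String location = "jar:file:/libs/uml-resources.jar!/metamodels/UML.metamodel.uml";
        String base = resourceBase(location);
        System.out.println(base + "/profiles/");
        System.out.println(base + "/metamodels/");
        System.out.println(base + "/libraries/");
    }
}
```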

    @Override
    public Injector createInjectorAndDoEMFRegistration() {
        if(injector == null) {
            try {
                injector = Guice.createInjector(
                        new UMLGeneratorModule(
                                (Class<? extends IGenerator2>)Class.forName(
                              config.getRootClass())), 
                              getDynamicModule());
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
            register(injector);
        }
        return injector;
    }

The final point is what we already saw in the MWE2 configuration: the injection of the Generator class into the XText components.
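Resolving a class from a configured name, as done with ‘rootClass‘, can also be written with a checked cast so that a wrong configuration fails immediately with a clear message. The following is only a sketch of that idea under my own assumptions: ‘Generator‘ and ‘ScalaRootStub‘ are hypothetical stand-ins for the XText generator interface and the root template, and this is not the code used in the project.

```java
// Hypothetical illustration of loading a configured root class by name; not part of the project.
class RootClassLoadingSketch {

    /** Stand-in for the generator interface expected by the Guice module. */
    interface Generator {
        String describe();
    }

    /** Stand-in for a root template such as ScalaRoot. */
    static class ScalaRootStub implements Generator {
        @Override
        public String describe() {
            return "scala root";
        }
    }

    // asSubclass throws a ClassCastException if the class does not implement Generator,
    // instead of deferring the problem to the first use of an unchecked cast.
    static Class<? extends Generator> resolveRootClass(String className) {
        try {
            return Class.forName(className).asSubclass(Generator.class);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Configured rootClass not found: " + className, e);
        }
    }

    public static void main(String[] args) throws Exception {
        Class<? extends Generator> rootClass = resolveRootClass(ScalaRootStub.class.getName());
        Generator generator = rootClass.getDeclaredConstructor().newInstance();
        System.out.println(generator.describe());
    }
}
```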

‘UMLGeneratorModule‘ is responsible for organising the necessary injections for code generation and XTend.

XText Reader

In its normal setup, XText / XTend expects its model files to lie in the directory structure of the same project. Since we use our UML Models from different Workflows in different Projects, we want them to be available as Gradle dependencies, and XText / XTend must be able to access the contents of those dependencies. For this I had to develop a special XText Reader.

fsm-pekko-xtext-reader/org/salgar/pekko/fsm/xtext/reader/XTextReader.java

import org.apache.log4j.Logger;
import org.eclipse.xtext.mwe.Reader;

import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class XTextReader extends Reader {
    private static final Logger LOGGER = Logger.getLogger(XTextReader.class);

    @Override
    public void setUseJavaClassPath(boolean isUse) {
        if (isUse) {
            Set<String> classPathEntries = new HashSet<>();
            try {
                retrieveClassPathEntries(Thread.currentThread().getContextClassLoader(), classPathEntries);
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            } catch (NoSuchFieldException e) {
                e.printStackTrace();
            }
            List<String> tmp = new ArrayList<>(classPathEntries);

            for (String entry : tmp) {
                addPath(entry);
            }
        }
    }

    private Set<String> retrieveClassPathEntries(ClassLoader classLoader, final Set<String> classPathEntries) throws IllegalAccessException, NoSuchFieldException {
        final List<String> givenLoaderClassPathEntries = new ArrayList<String>();

        if (classLoader instanceof URLClassLoader) {
            URLClassLoader tmp = (URLClassLoader) classLoader;
            for(URL classPathURL : tmp.getURLs()) {
                String classPath = classPathURL.getPath().trim().toLowerCase();

                if (classPath.endsWith("/") || classPath.endsWith(".jar")) {
                    givenLoaderClassPathEntries.add(classPathURL.getFile());
                }
            }
            if(LOGGER.isDebugEnabled()) {
                LOGGER.debug("Classpath entries from thread context loader: {" + givenLoaderClassPathEntries.toString() + "}");
            }
            classPathEntries.addAll(givenLoaderClassPathEntries);
        }

        if (givenLoaderClassPathEntries.isEmpty()) {
            givenLoaderClassPathEntries.addAll(retrieveSystemClassPathEntries());

            LOGGER.debug("System classpath entries from thread context loader: {" + givenLoaderClassPathEntries + "}");

            classPathEntries.addAll(givenLoaderClassPathEntries);
            return classPathEntries;
        }
        if( classLoader.getParent() != null) {
            retrieveClassPathEntries(classLoader.getParent(), classPathEntries);
        }
        return classPathEntries;
    }

    private static List<String> retrieveSystemClassPathEntries() {
        List<String> paths = new ArrayList<>();
        String classPath = System.getProperty("java.class.path");
        String separator = System.getProperty("path.separator");
        String[] strings = classPath.split(separator);

        for(String path : strings) {
            paths.add(path);
        }

        return paths;
    }
}

This reader loads all the Jars from the classpath and adds them to the path of the XText MWE Reader, so that it can load all the necessary objects from the Gradle dependencies.
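The fallback to the ‘java.class.path‘ system property matters in practice: since Java 9 the application class loader is no longer a ‘URLClassLoader‘, so on a modern JVM the ‘instanceof‘ branch often finds nothing. The sketch below is a simplified variant of the same idea, not the reader’s exact logic; the class ‘ClassPathEntriesSketch‘ and its method names are hypothetical.

```java
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical, simplified variant of the classpath lookup; not part of the project.
class ClassPathEntriesSketch {

    // Splits a classpath string, quoting the separator so it is never read as a regex.
    static List<String> splitClassPath(String classPath, String separator) {
        List<String> entries = new ArrayList<>();
        for (String entry : classPath.split(Pattern.quote(separator))) {
            if (!entry.isEmpty()) {
                entries.add(entry);
            }
        }
        return entries;
    }

    // Walks up the class loader chain and collects the URLs of every URLClassLoader;
    // falls back to the java.class.path system property if none was found.
    static Set<String> collect(ClassLoader loader) {
        Set<String> entries = new LinkedHashSet<>();
        for (ClassLoader current = loader; current != null; current = current.getParent()) {
            if (current instanceof URLClassLoader) {
                for (URL url : ((URLClassLoader) current).getURLs()) {
                    entries.add(url.getFile());
                }
            }
        }
        if (entries.isEmpty()) {
            entries.addAll(splitClassPath(System.getProperty("java.class.path", ""), File.pathSeparator));
        }
        return entries;
    }

    public static void main(String[] args) {
        collect(Thread.currentThread().getContextClassLoader()).forEach(System.out::println);
    }
}
```

Running ‘main‘ prints the entries that such a reader would hand over to the MWE Reader on your JVM.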

    component =  org.salgar.pekko.fsm.xtext.reader.XTextReader : scalaReader  {
        useJavaClassPath = true
        uriFilter = org.eclipse.xtext.mwe.NameBasedFilter {
            regularExpression = '4eyes.*\\.uml'
        }

        register = setupScala
        loadResource = {
            slot = "modelSlot"
        }
    }

As the snippet above shows, we also have to use this specific reader in our MWE2 workflow.

Gradle Configuration for XText

Let’s look briefly at how to configure Gradle for code generation.

tasks.register('generateXtextLanguage', JavaExec) {
        def generatedFileDir = file('build/generated-sources')
        outputs.dir generatedFileDir
        mainClass = 'org.eclipse.emf.mwe2.launch.runtime.Mwe2Launcher'
        classpath = configurations.mwe2
        inputs.file('src/main/mwe2/GenerateWorkflow.mwe2').withPathSensitivity(PathSensitivity.RELATIVE)
        args += "src/main/mwe2/GenerateWorkflow.mwe2"
        args += "-p"
        args += "rootPath=/${projectDir}/.."
    }

    generateXtext.dependsOn(generateXtextLanguage)
    clean.dependsOn(cleanGenerateXtextLanguage)
    spotlessScala.dependsOn(generateXtextLanguage)
    compileJava.dependsOn(spotlessApply)
    compileScala.dependsOn(spotlessApply)

The configuration is actually quite simple. We just declare where the generated sources must be placed, the ‘Mwe2Launcher‘ class that runs the code generation workflow from
fsm-pekko-4eyes-statemachine/mwe2/GenerateWorkflow.mwe2
and the classpath with the dependencies necessary for code generation. We keep that classpath separate because we don’t want those dependencies to leak into the runtime classpath; after code generation they are no longer needed.

 configurations {
        mwe2 {
            extendsFrom implementation
        }
        aspectImplementation {
            extendsFrom implementation
            canBeResolved=true
        }
    }

    dependencies {
        mwe2 project(':fsm-pekko-xtext-reader')
        mwe2 project(':fsm-pekko-xtend')
        mwe2 project(':fsm-pekko-4eyes-uml-model')
        mwe2 'org.salgar.pekko.fsm:fsm-pekko-eclipse-dependencies:1.0.0-SNAPSHOT:repackaged'
    }

The final configuration identifies which ‘.mwe2‘ file to use and tells XTend which Gradle project directory we are in, so XTend can make the necessary path calculations when it creates the generated files.

This was a short explanation of how to integrate UML Models from tools like Eclipse Papyrus with XText and XTend.

If you want to inspect what the generated artefacts look like: it is normally not a good idea to commit generated code to Git, but for those who don’t want to build the whole project, I created a special branch in which you can see, for example, the generated Akka / Pekko Finite State Machine code.

fsm-pekko-4eyes-statemachine/org/salgar/fsm/pekko/foureyes/credit/CreditSM.scala

package org.salgar.fsm.pekko.foureyes.credit

import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.salgar.fsm.pekko.foureyes.credit.CreditSM._
import org.salgar.fsm.pekko.foureyes.credit.actions.SpringCreditSMActionsLocator
import org.salgar.fsm.pekko.foureyes.credit.guards.SpringCreditSMGuardsLocator
import org.salgar.pekko.fsm.api.UseCaseKey
import org.salgar.pekko.fsm.base.CborSerializable
import org.salgar.pekko.fsm.base.actors.BaseActor
import org.salgar.pekko.fsm.base.actors.BaseActor.InternalBaseMessage

import java.util

object CreditSM {
    sealed trait State {
        private[credit] def controlObject: util.Map[java.lang.String, AnyRef]
    }
    final case class INITIAL(controlObject: util.Map[java.lang.String, AnyRef]) extends State with CborSerializable
    final case class CREDIT_APPLICATION_SUBMITTED_$_INITIAL_CA(controlObject: util.Map[java.lang.String, AnyRef]) extends State with CborSerializable
    final case class CREDIT_APPLICATION_SUBMITTED_$_CREDIT_ACCEPTED(controlObject: util.Map[java.lang.String, AnyRef]) extends State with CborSerializable
    final case class CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_INITIAL_FE_SM(controlObject: util.Map[java.lang.String, AnyRef]) extends State with CborSerializable
    final case class CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_WAITING_MANAGER_APPROVAL(controlObject: util.Map[java.lang.String, AnyRef]) extends State with CborSerializable
    final case class CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_INITIAL_CSC(controlObject: util.Map[java.lang.String, AnyRef]) extends State with CborSerializable
    final case class CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_ADRRESCHECK_RESULT_RECEIVED(controlObject: util.Map[java.lang.String, AnyRef]) extends State with CborSerializable
    final case class CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_ADDRESSCHECK_RESULT_RECEIVED(controlObject: util.Map[java.lang.String, AnyRef]) extends State with CborSerializable
    final case class CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_FRAUDPREVENTION_RESULT_RECEIVED(controlObject: util.Map[java.lang.String, AnyRef]) extends State with CborSerializable
    final case class CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_RECEIVED(controlObject: util.Map[java.lang.String, AnyRef]) extends State with CborSerializable
    final case class CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_ADRESSCHECK_RESULT_RECEIVED(controlObject: util.Map[java.lang.String, AnyRef]) extends State with CborSerializable
    final case class CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_RESULT_RECEIVED(controlObject: util.Map[java.lang.String, AnyRef]) extends State with CborSerializable
    final case class CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_INITIAL_FE(controlObject: util.Map[java.lang.String, AnyRef]) extends State with CborSerializable
    final case class CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_WAITING_MANAGER_APPROVAL(controlObject: util.Map[java.lang.String, AnyRef]) extends State with CborSerializable
    final case class CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_FROM_SENIOR_MANAGER(controlObject: util.Map[java.lang.String, AnyRef]) extends State with CborSerializable
    final case class CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_INITIAL_CA_SM(controlObject: util.Map[java.lang.String, AnyRef]) extends State with CborSerializable
    final case class CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_WAITING_ANAYLIST_APPROVAL(controlObject: util.Map[java.lang.String, AnyRef]) extends State with CborSerializable
    final case class CREDIT_REJECTED(controlObject: util.Map[java.lang.String, AnyRef]) extends State with CborSerializable

    sealed trait CreditSMEvent {
        def useCaseKey: UseCaseKey
    }
    final case class onAcceptableScore(useCaseKey: UseCaseKey, payload: util.Map[String, AnyRef], replyTo: ActorRef[Response]) extends CreditSMEvent
    object onAcceptableScore {
        def apply(useCaseKey: UseCaseKey, payload: util.Map[String, AnyRef]) : onAcceptableScore = {
            onAcceptableScore(useCaseKey, payload, null)
        }
    }
    final case class onAccepted(useCaseKey: UseCaseKey, payload: util.Map[String, AnyRef], replyTo: ActorRef[Response]) extends CreditSMEvent
    object onAccepted {
        def apply(useCaseKey: UseCaseKey, payload: util.Map[String, AnyRef]) : onAccepted = {
            onAccepted(useCaseKey, payload, null)
        }
    }
    final case class onCustomerUpdated(useCaseKey: UseCaseKey, payload: util.Map[String, AnyRef], replyTo: ActorRef[Response]) extends CreditSMEvent
    object onCustomerUpdated {
        def apply(useCaseKey: UseCaseKey, payload: util.Map[String, AnyRef]) : onCustomerUpdated = {
            onCustomerUpdated(useCaseKey, payload, null)
        }
    }
    final case class onRejected(useCaseKey: UseCaseKey, payload: util.Map[String, AnyRef], replyTo: ActorRef[Response]) extends CreditSMEvent
    object onRejected {
        def apply(useCaseKey: UseCaseKey, payload: util.Map[String, AnyRef]) : onRejected = {
            onRejected(useCaseKey, payload, null)
        }
    }
    final case class onRelationshipManagerApproved(useCaseKey: UseCaseKey, payload: util.Map[String, AnyRef], replyTo: ActorRef[Response]) extends CreditSMEvent
    object onRelationshipManagerApproved {
        def apply(useCaseKey: UseCaseKey, payload: util.Map[String, AnyRef]) : onRelationshipManagerApproved = {
            onRelationshipManagerApproved(useCaseKey, payload, null)
        }
    }
    final case class onResultReceived(useCaseKey: UseCaseKey, payload: util.Map[String, AnyRef], replyTo: ActorRef[Response]) extends CreditSMEvent
    object onResultReceived {
        def apply(useCaseKey: UseCaseKey, payload: util.Map[String, AnyRef]) : onResultReceived = {
            onResultReceived(useCaseKey, payload, null)
        }
    }
    final case class onSalesManagerApproved(useCaseKey: UseCaseKey, payload: util.Map[String, AnyRef], replyTo: ActorRef[Response]) extends CreditSMEvent
    object onSalesManagerApproved {
        def apply(useCaseKey: UseCaseKey, payload: util.Map[String, AnyRef]) : onSalesManagerApproved = {
            onSalesManagerApproved(useCaseKey, payload, null)
        }
    }
    final case class onSubmit(useCaseKey: UseCaseKey, payload: util.Map[String, AnyRef], replyTo: ActorRef[Response]) extends CreditSMEvent
    object onSubmit {
        def apply(useCaseKey: UseCaseKey, payload: util.Map[String, AnyRef]) : onSubmit = {
            onSubmit(useCaseKey, payload, null)
        }
    }

    //Reporting
    sealed trait ReportEvent extends CreditSMEvent
    final case class onReport(useCaseKey: UseCaseKey, replyTo: ActorRef[Response])
        extends ReportEvent

    sealed trait Response
    final case class ReportResponse(state: State, payload: util.Map[String, AnyRef]) extends Response
    final case class AcknowledgeResponse() extends Response

    //internal protocol
    sealed trait InternalMessage extends CreditSMEvent
    final case class onAddCreditSMRelatedReference(useCaseKey: UseCaseKey, listing: Receptionist.Listing) extends InternalMessage with InternalBaseMessage

    sealed trait PersistEvent
    final case class PositiveResultPersistedEvent(@JsonDeserialize(as = classOf[util.Map[String, AnyRef]]) payload: util.Map[String, AnyRef]) extends PersistEvent with CborSerializable
    final case class CreditScoreAddressResultReceivedPersistEvent(@JsonDeserialize(as = classOf[util.Map[String, AnyRef]]) payload: util.Map[String, AnyRef]) extends PersistEvent with CborSerializable
    final case class CreditScoreNotEnoughPersisteEvent(@JsonDeserialize(as = classOf[util.Map[String, AnyRef]]) payload: util.Map[String, AnyRef]) extends PersistEvent with CborSerializable
    final case class CreditRejectedPersistEvent(@JsonDeserialize(as = classOf[util.Map[String, AnyRef]]) payload: util.Map[String, AnyRef]) extends PersistEvent with CborSerializable
    final case class WaitingCreditAnalystApprovalCreditAcceptedPersistEvent(@JsonDeserialize(as = classOf[util.Map[String, AnyRef]]) payload: util.Map[String, AnyRef]) extends PersistEvent with CborSerializable
    final case class AcceptableScorePersistedEvent(@JsonDeserialize(as = classOf[util.Map[String, AnyRef]]) payload: util.Map[String, AnyRef]) extends PersistEvent with CborSerializable
    final case class CreditInitialPersistEvent(@JsonDeserialize(as = classOf[util.Map[String, AnyRef]]) payload: util.Map[String, AnyRef]) extends PersistEvent with CborSerializable
    final case class FraudPreventionReceivedPersistEvent(@JsonDeserialize(as = classOf[util.Map[String, AnyRef]]) payload: util.Map[String, AnyRef]) extends PersistEvent with CborSerializable
    final case class RelationshipManagerApprovedPersistEvent(@JsonDeserialize(as = classOf[util.Map[String, AnyRef]]) payload: util.Map[String, AnyRef]) extends PersistEvent with CborSerializable
    final case class FraudPreventionCreditScoreReceivedPersistEvent(@JsonDeserialize(as = classOf[util.Map[String, AnyRef]]) payload: util.Map[String, AnyRef]) extends PersistEvent with CborSerializable
    final case class AddressCheckCreditScoreResultReceivedPersistEvent(@JsonDeserialize(as = classOf[util.Map[String, AnyRef]]) payload: util.Map[String, AnyRef]) extends PersistEvent with CborSerializable
    final case class CustomerUpdatedEvent(@JsonDeserialize(as = classOf[util.Map[String, AnyRef]]) payload: util.Map[String, AnyRef]) extends PersistEvent with CborSerializable
    final case class FraudPreventionFraudPersistEvent(@JsonDeserialize(as = classOf[util.Map[String, AnyRef]]) payload: util.Map[String, AnyRef]) extends PersistEvent with CborSerializable
    final case class PositiveResultReceivedPersistEvent(@JsonDeserialize(as = classOf[util.Map[String, AnyRef]]) payload: util.Map[String, AnyRef]) extends PersistEvent with CborSerializable
    final case class CreditScoreAddressCheckReceivedPersistEvent(@JsonDeserialize(as = classOf[util.Map[String, AnyRef]]) payload: util.Map[String, AnyRef]) extends PersistEvent with CborSerializable
    final case class WaitingManagerApprovalRelationshipManagerPersistEvent(@JsonDeserialize(as = classOf[util.Map[String, AnyRef]]) payload: util.Map[String, AnyRef]) extends PersistEvent with CborSerializable
    final case class CreditScoreToLowPersistEvent(@JsonDeserialize(as = classOf[util.Map[String, AnyRef]]) payload: util.Map[String, AnyRef]) extends PersistEvent with CborSerializable
    final case class AdressCheckFraudPreventionReceviedPersistEvent(@JsonDeserialize(as = classOf[util.Map[String, AnyRef]]) payload: util.Map[String, AnyRef]) extends PersistEvent with CborSerializable
    final case class CreditAcceptedPersistEvent(@JsonDeserialize(as = classOf[util.Map[String, AnyRef]]) payload: util.Map[String, AnyRef]) extends PersistEvent with CborSerializable
    final case class AdressCheckReceivedPersistEvent(@JsonDeserialize(as = classOf[util.Map[String, AnyRef]]) payload: util.Map[String, AnyRef]) extends PersistEvent with CborSerializable
    final case class CreditScoreReceivedPersistEvent(@JsonDeserialize(as = classOf[util.Map[String, AnyRef]]) payload: util.Map[String, AnyRef]) extends PersistEvent with CborSerializable
    final case class SalesManagerApprovalPersistEvent(@JsonDeserialize(as = classOf[util.Map[String, AnyRef]]) payload: util.Map[String, AnyRef]) extends PersistEvent with CborSerializable
    final case class CreditScoreFraudPreventionReceivedPersistEvent(@JsonDeserialize(as = classOf[util.Map[String, AnyRef]]) payload: util.Map[String, AnyRef]) extends PersistEvent with CborSerializable
    final case class FraudPreventionAdressCheckReceivedPersistEvent(@JsonDeserialize(as = classOf[util.Map[String, AnyRef]]) payload: util.Map[String, AnyRef]) extends PersistEvent with CborSerializable
    final case class WaitingManagerApprovalSalesManagerApprovedPersistEvent(@JsonDeserialize(as = classOf[util.Map[String, AnyRef]]) payload: util.Map[String, AnyRef]) extends PersistEvent with CborSerializable
    final case class CreditApplicationSubmittedPersistEvent(@JsonDeserialize(as = classOf[util.Map[String, AnyRef]]) payload: util.Map[String, AnyRef]) extends PersistEvent with CborSerializable
    final case class CreditAanlystApprovedPersistEvent(@JsonDeserialize(as = classOf[util.Map[String, AnyRef]]) payload: util.Map[String, AnyRef]) extends PersistEvent with CborSerializable
}

class CreditSM(
    ctx: ActorContext[CreditSMEvent],
    useCaseKey: String
) extends BaseActor[CreditSMEvent, onAddCreditSMRelatedReference, PersistEvent, State](
    TypeCase[onAddCreditSMRelatedReference],
    useCaseKey
) {
    import CreditSM._

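    // Adapter that turns Receptionist listings into onAddCreditSMRelatedReference commands for this actor.
    private val listingAdapter: ActorRef[Receptionist.Listing] = ctx.messageAdapter {
        listing => onAddCreditSMRelatedReference(() => useCaseKey, listing)
    }

    // Subscribe to listing changes for this state machine's service key.
    ctx.system.receptionist ! Receptionist.Subscribe(
        ServiceKey[CreditSMEvent]("creditsmService_" + useCaseKey),
        listingAdapter)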

    // Entry point for every command: report queries are answered directly from the current state,
    // everything else is delegated to the state-specific handler.
    def commandHandler(context: ActorContext[CreditSMEvent], cmd: CreditSMEvent, state: State): ReplyEffect[PersistEvent, State] =
        cmd match {
            case onReport(useCaseKey, replyTo) =>
                Effect.reply(replyTo)(ReportResponse(state, java.util.Collections.unmodifiableMap(state.controlObject)))

            case _ =>
                commandHandlerInternal(context, cmd, state)
        }

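    // Dispatches on the current state first, then on the command. Guards are evaluated in order and the
    // first one that holds selects the action; a command with no matching transition ends in Effect.unhandled.
    def commandHandlerInternal(context: ActorContext[CreditSMEvent], cmd: CreditSMEvent, state: State): ReplyEffect[PersistEvent, State] =
        state match {
            // Command handler for state INITIAL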
            case INITIAL(controlObject) =>
                base[CreditSMEvent](cmd, state) {
                    case onSubmit(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing INITIAL onSubmit payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_INITIAL_$$_CREDIT_APPLICATION_SUBMITTED_intial_onSubmitAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
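                        // Not reached for this unguarded transition: the `return` above has already left commandHandlerInternal.
                        Effect.unhandled.thenNoReply()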
                    }
            
                    case _ =>
                        Effect.unhandled.thenNoReply()
                }
            // Command handler for state CREDIT_APPLICATION_SUBMITTED_$_INITIAL_CA
            case CREDIT_APPLICATION_SUBMITTED_$_INITIAL_CA(controlObject) =>
                base[CreditSMEvent](cmd, state) {
                    case onCustomerUpdated(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_INITIAL_CA onCustomerUpdated payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_APPLICATION_SUBMITTED_customer_updatedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onRejected(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_INITIAL_CA onRejected payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_managerRejected_onRejectedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onResultReceived(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_INITIAL_CA onResultReceived payload: {}", payload.toString)
                        if(SpringCreditSMGuardsLocator.getInstance.creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_onResultReceived_isScoreTooLowGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_lowScore_onResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }

                        // Guard not satisfied: the command stays unhandled in this state.
                        Effect.unhandled.thenNoReply()
                    }
            
                    case _ =>
                        Effect.unhandled.thenNoReply()
                }
            // Command handler for state CREDIT_APPLICATION_SUBMITTED_$_CREDIT_ACCEPTED
            case CREDIT_APPLICATION_SUBMITTED_$_CREDIT_ACCEPTED(controlObject) =>
                base[CreditSMEvent](cmd, state) {
                    case onCustomerUpdated(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_CREDIT_ACCEPTED onCustomerUpdated payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_APPLICATION_SUBMITTED_customer_updatedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onRejected(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_CREDIT_ACCEPTED onRejected payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_managerRejected_onRejectedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onResultReceived(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_CREDIT_ACCEPTED onResultReceived payload: {}", payload.toString)
                        if(SpringCreditSMGuardsLocator.getInstance.creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_onResultReceived_isScoreTooLowGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_lowScore_onResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                    
                        Effect.unhandled.thenNoReply()
                    }
            
                    case _ =>
                        Effect.unhandled.thenNoReply()
                }
            // Command handler for state CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_INITIAL_FE_SM
            case CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_INITIAL_FE_SM(controlObject) =>
                base[CreditSMEvent](cmd, state) {
                    case onCustomerUpdated(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_INITIAL_FE_SM onCustomerUpdated payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_APPLICATION_SUBMITTED_customer_updatedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onRejected(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_INITIAL_FE_SM onRejected payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_managerRejected_onRejectedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onResultReceived(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_INITIAL_FE_SM onResultReceived payload: {}", payload.toString)
                        if(SpringCreditSMGuardsLocator.getInstance.creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_onResultReceived_isScoreTooLowGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_lowScore_onResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onSalesManagerApproved(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_INITIAL_FE_SM onSalesManagerApproved payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_RELATIONSHIP_MANAGER_APPROVED_$$_SALES_MANAGER_APPROVED_relationshipManagerApproved_onSalesManagerApprovedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
            
                    case _ =>
                        Effect.unhandled.thenNoReply()
                }
            // Command handler for state CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_WAITING_MANAGER_APPROVAL
            case CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_WAITING_MANAGER_APPROVAL(controlObject) =>
                base[CreditSMEvent](cmd, state) {
                    case onCustomerUpdated(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_WAITING_MANAGER_APPROVAL onCustomerUpdated payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_APPLICATION_SUBMITTED_customer_updatedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onRejected(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_WAITING_MANAGER_APPROVAL onRejected payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_managerRejected_onRejectedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onResultReceived(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_WAITING_MANAGER_APPROVAL onResultReceived payload: {}", payload.toString)
                        if(SpringCreditSMGuardsLocator.getInstance.creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_onResultReceived_isScoreTooLowGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_lowScore_onResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onSalesManagerApproved(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_WAITING_MANAGER_APPROVAL onSalesManagerApproved payload: {}", payload.toString)
                        if(SpringCreditSMGuardsLocator.getInstance.creditsm_WAITING_MANAGER_APPROVAL_$$_WAITING_MANAGER_APPROVAL_onSalesManagerApproved_salesManagerCreditAmountCriticalGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_WAITING_MANAGER_APPROVAL_$$_WAITING_MANAGER_APPROVAL_waitingForApproval_onSalesManagerApprovedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }

                        // Guard not satisfied: take the unguarded RELATIONSHIP_MANAGER_APPROVED -> SALES_MANAGER_APPROVED transition.
                        return SpringCreditSMActionsLocator.getInstance().creditsm_RELATIONSHIP_MANAGER_APPROVED_$$_SALES_MANAGER_APPROVED_relationshipManagerApproved_onSalesManagerApprovedAction.doAction(
                            context,
                            controlObject,
                            payload, replyTo)

                        // Not reached: the unguarded `return` above has already left commandHandlerInternal.
                        Effect.unhandled.thenNoReply()
                    }
            
                    case _ =>
                        Effect.unhandled.thenNoReply()
                }
            // Command handler for state CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_INITIAL_CSC
            case CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_INITIAL_CSC(controlObject) =>
                base[CreditSMEvent](cmd, state) {
                    case onCustomerUpdated(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_INITIAL_CSC onCustomerUpdated payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_APPLICATION_SUBMITTED_customer_updatedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onRejected(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_INITIAL_CSC onRejected payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_managerRejected_onRejectedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onResultReceived(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_INITIAL_CSC onResultReceived payload: {}", payload.toString)
                        if(SpringCreditSMGuardsLocator.getInstance.creditsm_INITIAL_CSC_$$_ADRRESCHECK_RESULT_RECEIVED_onResultReceived_isAdressCheckResultGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_INITIAL_CSC_$$_ADRRESCHECK_RESULT_RECEIVED_initial_AddressCheckResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                        else if(SpringCreditSMGuardsLocator.getInstance.creditsm_INITIAL_CSC_$$_FRAUDPREVENTION_RESULT_RECEIVED_onResultReceived_isFraudPreventionResultGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_INITIAL_CSC_$$_FRAUDPREVENTION_RESULT_RECEIVED_initial_FraudPreventionResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                        else if(SpringCreditSMGuardsLocator.getInstance.creditsm_INITIAL_CSC_$$_CREDITSCORE_RECEIVED_onResultReceived_isCreditScoreResultGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_INITIAL_CSC_$$_CREDITSCORE_RECEIVED_initial_creditScoreReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                        else if(SpringCreditSMGuardsLocator.getInstance.creditsm_SALES_MANAGER_APPROVED_$$_WAITING_APPROVAL_FROM_SENIOR_MANAGER_onResultReceived_isCreditScoreNotSufficientGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_SALES_MANAGER_APPROVED_$$_WAITING_APPROVAL_FROM_SENIOR_MANAGER_salesManagerApproved_onCreditScoreReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                        else if(SpringCreditSMGuardsLocator.getInstance.creditsm_SALES_MANAGER_APPROVED_$$_WAITING_CREDIT_ANALYST_APPROVAL_onResultReceived_isResultSufficientGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_SALES_MANAGER_APPROVED_$$_WAITING_CREDIT_ANALYST_APPROVAL_salesManagerApproved_onResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                        else if(SpringCreditSMGuardsLocator.getInstance.creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_onResultReceived_isScoreTooLowGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_lowScore_onResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                    
                        Effect.unhandled.thenNoReply()
                    }
            
                    case _ =>
                        Effect.unhandled.thenNoReply()
                }
            // Command handler for state CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_ADRRESCHECK_RESULT_RECEIVED
            case CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_ADRRESCHECK_RESULT_RECEIVED(controlObject) =>
                base[CreditSMEvent](cmd, state) {
                    case onCustomerUpdated(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_ADRRESCHECK_RESULT_RECEIVED onCustomerUpdated payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_APPLICATION_SUBMITTED_customer_updatedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onRejected(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_ADRRESCHECK_RESULT_RECEIVED onRejected payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_managerRejected_onRejectedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onResultReceived(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_ADRRESCHECK_RESULT_RECEIVED onResultReceived payload: {}", payload.toString)
                        if(SpringCreditSMGuardsLocator.getInstance.creditsm_ADRRESCHECK_RESULT_RECEIVED_$$_CREDITSCORE_ADDRESSCHECK_RESULT_RECEIVED_onResultReceived_isCreditScoreResult.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_ADRRESCHECK_RESULT_RECEIVED_$$_CREDITSCORE_ADDRESSCHECK_RESULT_RECEIVED_addressCheck_CredfitScore_onResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                        else if(SpringCreditSMGuardsLocator.getInstance.creditsm_ADRRESCHECK_RESULT_RECEIVED_$$_FRAUDPREVENTION_ADRESSCHECK_RESULT_RECEIVED_onResultReceived_isFraudPreventionResultGaurd.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_ADRRESCHECK_RESULT_RECEIVED_$$_FRAUDPREVENTION_ADRESSCHECK_RESULT_RECEIVED_addressCheckResultReceived_fraudPreventionResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                        else if(SpringCreditSMGuardsLocator.getInstance.creditsm_SALES_MANAGER_APPROVED_$$_WAITING_APPROVAL_FROM_SENIOR_MANAGER_onResultReceived_isCreditScoreNotSufficientGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_SALES_MANAGER_APPROVED_$$_WAITING_APPROVAL_FROM_SENIOR_MANAGER_salesManagerApproved_onCreditScoreReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                        else if(SpringCreditSMGuardsLocator.getInstance.creditsm_SALES_MANAGER_APPROVED_$$_WAITING_CREDIT_ANALYST_APPROVAL_onResultReceived_isResultSufficientGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_SALES_MANAGER_APPROVED_$$_WAITING_CREDIT_ANALYST_APPROVAL_salesManagerApproved_onResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                        else if(SpringCreditSMGuardsLocator.getInstance.creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_onResultReceived_isScoreTooLowGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_lowScore_onResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                    
                        Effect.unhandled.thenNoReply()
                    }
            
                    case _ =>
                        Effect.unhandled.thenNoReply()
                }
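            // Command handler for state CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_ADDRESSCHECK_RESULT_RECEIVED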
            case CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_ADDRESSCHECK_RESULT_RECEIVED(controlObject) =>
                base[CreditSMEvent](cmd, state) {
                    case onCustomerUpdated(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_ADDRESSCHECK_RESULT_RECEIVED onCustomerUpdated payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_APPLICATION_SUBMITTED_customer_updatedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onRejected(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_ADDRESSCHECK_RESULT_RECEIVED onRejected payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_managerRejected_onRejectedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onResultReceived(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_ADDRESSCHECK_RESULT_RECEIVED onResultReceived payload: {}", payload.toString)
                        if(SpringCreditSMGuardsLocator.getInstance.creditsm_SALES_MANAGER_APPROVED_$$_WAITING_APPROVAL_FROM_SENIOR_MANAGER_onResultReceived_isCreditScoreNotSufficientGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_SALES_MANAGER_APPROVED_$$_WAITING_APPROVAL_FROM_SENIOR_MANAGER_salesManagerApproved_onCreditScoreReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                        else if(SpringCreditSMGuardsLocator.getInstance.creditsm_SALES_MANAGER_APPROVED_$$_WAITING_CREDIT_ANALYST_APPROVAL_onResultReceived_isResultSufficientGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_SALES_MANAGER_APPROVED_$$_WAITING_CREDIT_ANALYST_APPROVAL_salesManagerApproved_onResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                        else if(SpringCreditSMGuardsLocator.getInstance.creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_onResultReceived_isScoreTooLowGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_lowScore_onResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                    
                        Effect.unhandled.thenNoReply()
                    }
            
                    case _ =>
                        Effect.unhandled.thenNoReply()
                }
            //State Command Handler
            case CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_FRAUDPREVENTION_RESULT_RECEIVED(controlObject) =>
                base[CreditSMEvent](cmd, state) {
                    case onCustomerUpdated(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_FRAUDPREVENTION_RESULT_RECEIVED onCustomerUpdated payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_APPLICATION_SUBMITTED_customer_updatedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onRejected(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_FRAUDPREVENTION_RESULT_RECEIVED onRejected payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_managerRejected_onRejectedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onResultReceived(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_FRAUDPREVENTION_RESULT_RECEIVED onResultReceived payload: {}", payload.toString)
                        if(SpringCreditSMGuardsLocator.getInstance.creditsm_SALES_MANAGER_APPROVED_$$_WAITING_APPROVAL_FROM_SENIOR_MANAGER_onResultReceived_isCreditScoreNotSufficientGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_SALES_MANAGER_APPROVED_$$_WAITING_APPROVAL_FROM_SENIOR_MANAGER_salesManagerApproved_onCreditScoreReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                        else if(SpringCreditSMGuardsLocator.getInstance.creditsm_SALES_MANAGER_APPROVED_$$_WAITING_CREDIT_ANALYST_APPROVAL_onResultReceived_isResultSufficientGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_SALES_MANAGER_APPROVED_$$_WAITING_CREDIT_ANALYST_APPROVAL_salesManagerApproved_onResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                        else if(SpringCreditSMGuardsLocator.getInstance.creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_onResultReceived_isScoreTooLowGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_lowScore_onResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                    
                        Effect.unhandled.thenNoReply()
                    }
            
                    case _ =>
                        Effect.unhandled.thenNoReply()
                }
            //State Command Handler
            case CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_RECEIVED(controlObject) =>
                base[CreditSMEvent](cmd, state) {
                    case onCustomerUpdated(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_RECEIVED onCustomerUpdated payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_APPLICATION_SUBMITTED_customer_updatedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onRejected(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_RECEIVED onRejected payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_managerRejected_onRejectedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onResultReceived(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_RECEIVED onResultReceived payload: {}", payload.toString)
                        if(SpringCreditSMGuardsLocator.getInstance.creditsm_CREDITSCORE_RECEIVED_$$_CREDITSCORE_ADDRESSCHECK_RESULT_RECEIVED_onResultReceived_isAddressCheckResult.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_CREDITSCORE_RECEIVED_$$_CREDITSCORE_ADDRESSCHECK_RESULT_RECEIVED_creditScore_addressCheck_onResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                        else if(SpringCreditSMGuardsLocator.getInstance.creditsm_CREDITSCORE_RECEIVED_$$_CREDITSCORE_FRAUDPREVENTION_RESULT_RECEIVED_onResultReceived_isFraudPreventionResultGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_CREDITSCORE_RECEIVED_$$_CREDITSCORE_FRAUDPREVENTION_RESULT_RECEIVED_creditScoreReceived_creditScore_fraudPreventionResultAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                        else if(SpringCreditSMGuardsLocator.getInstance.creditsm_SALES_MANAGER_APPROVED_$$_WAITING_APPROVAL_FROM_SENIOR_MANAGER_onResultReceived_isCreditScoreNotSufficientGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_SALES_MANAGER_APPROVED_$$_WAITING_APPROVAL_FROM_SENIOR_MANAGER_salesManagerApproved_onCreditScoreReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                        else if(SpringCreditSMGuardsLocator.getInstance.creditsm_SALES_MANAGER_APPROVED_$$_WAITING_CREDIT_ANALYST_APPROVAL_onResultReceived_isResultSufficientGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_SALES_MANAGER_APPROVED_$$_WAITING_CREDIT_ANALYST_APPROVAL_salesManagerApproved_onResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                        else if(SpringCreditSMGuardsLocator.getInstance.creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_onResultReceived_isScoreTooLowGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_lowScore_onResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                    
                        Effect.unhandled.thenNoReply()
                    }
            
                    case _ =>
                        Effect.unhandled.thenNoReply()
                }
            //State Command Handler
            case CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_ADRESSCHECK_RESULT_RECEIVED(controlObject) =>
                base[CreditSMEvent](cmd, state) {
                    case onCustomerUpdated(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_ADRESSCHECK_RESULT_RECEIVED onCustomerUpdated payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_APPLICATION_SUBMITTED_customer_updatedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onRejected(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_ADRESSCHECK_RESULT_RECEIVED onRejected payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_managerRejected_onRejectedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onResultReceived(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_ADRESSCHECK_RESULT_RECEIVED onResultReceived payload: {}", payload.toString)
                        if(SpringCreditSMGuardsLocator.getInstance.creditsm_SALES_MANAGER_APPROVED_$$_WAITING_APPROVAL_FROM_SENIOR_MANAGER_onResultReceived_isCreditScoreNotSufficientGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_SALES_MANAGER_APPROVED_$$_WAITING_APPROVAL_FROM_SENIOR_MANAGER_salesManagerApproved_onCreditScoreReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                        else if(SpringCreditSMGuardsLocator.getInstance.creditsm_SALES_MANAGER_APPROVED_$$_WAITING_CREDIT_ANALYST_APPROVAL_onResultReceived_isResultSufficientGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_SALES_MANAGER_APPROVED_$$_WAITING_CREDIT_ANALYST_APPROVAL_salesManagerApproved_onResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                        else if(SpringCreditSMGuardsLocator.getInstance.creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_onResultReceived_isScoreTooLowGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_lowScore_onResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                    
                        Effect.unhandled.thenNoReply()
                    }
            
                    case _ =>
                        Effect.unhandled.thenNoReply()
                }
            //State Command Handler
            case CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_RESULT_RECEIVED(controlObject) =>
                base[CreditSMEvent](cmd, state) {
                    case onCustomerUpdated(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_RESULT_RECEIVED onCustomerUpdated payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_APPLICATION_SUBMITTED_customer_updatedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onRejected(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_RESULT_RECEIVED onRejected payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_managerRejected_onRejectedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onResultReceived(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_RESULT_RECEIVED onResultReceived payload: {}", payload.toString)
                        if(SpringCreditSMGuardsLocator.getInstance.creditsm_FRAUDPREVENTION_RESULT_RECEIVED_$$_FRAUDPREVENTION_ADRESSCHECK_RESULT_RECEIVED_onResultReceived_isAddressCheckResultGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_FRAUDPREVENTION_RESULT_RECEIVED_$$_FRAUDPREVENTION_ADRESSCHECK_RESULT_RECEIVED_fraudPReventionResultReceived_fraudPReventionResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                        else if(SpringCreditSMGuardsLocator.getInstance.creditsm_FRAUDPREVENTION_RESULT_RECEIVED_$$_CREDITSCORE_FRAUDPREVENTION_RESULT_RECEIVED_onResultReceived_isCreditScoreResultGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_FRAUDPREVENTION_RESULT_RECEIVED_$$_CREDITSCORE_FRAUDPREVENTION_RESULT_RECEIVED_fraudPreventionResultReceived_creditScoreFraudPreventionResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                        else if(SpringCreditSMGuardsLocator.getInstance.creditsm_SALES_MANAGER_APPROVED_$$_WAITING_APPROVAL_FROM_SENIOR_MANAGER_onResultReceived_isCreditScoreNotSufficientGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_SALES_MANAGER_APPROVED_$$_WAITING_APPROVAL_FROM_SENIOR_MANAGER_salesManagerApproved_onCreditScoreReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                        else if(SpringCreditSMGuardsLocator.getInstance.creditsm_SALES_MANAGER_APPROVED_$$_WAITING_CREDIT_ANALYST_APPROVAL_onResultReceived_isResultSufficientGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_SALES_MANAGER_APPROVED_$$_WAITING_CREDIT_ANALYST_APPROVAL_salesManagerApproved_onResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                        else if(SpringCreditSMGuardsLocator.getInstance.creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_onResultReceived_isScoreTooLowGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_lowScore_onResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                    
                        Effect.unhandled.thenNoReply()
                    }
            
                    case _ =>
                        Effect.unhandled.thenNoReply()
                }
            //State Command Handler
            case CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_INITIAL_FE(controlObject) =>
                base[CreditSMEvent](cmd, state) {
                    case onCustomerUpdated(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_INITIAL_FE onCustomerUpdated payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_APPLICATION_SUBMITTED_customer_updatedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onRejected(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_INITIAL_FE onRejected payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_managerRejected_onRejectedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onRelationshipManagerApproved(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_INITIAL_FE onRelationshipManagerApproved payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_WAITING_APPROVAL_$$_RELATIONSHIP_MANAGER_APPROVED_waitingApproval_onRelationshipManagerApprovedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onResultReceived(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_INITIAL_FE onResultReceived payload: {}", payload.toString)
                        if(SpringCreditSMGuardsLocator.getInstance.creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_onResultReceived_isScoreTooLowGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_lowScore_onResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                    
                        Effect.unhandled.thenNoReply()
                    }
            
                    case _ =>
                        Effect.unhandled.thenNoReply()
                }
            //State Command Handler
            case CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_WAITING_MANAGER_APPROVAL(controlObject) =>
                base[CreditSMEvent](cmd, state) {
                    case onCustomerUpdated(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_WAITING_MANAGER_APPROVAL onCustomerUpdated payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_APPLICATION_SUBMITTED_customer_updatedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onRejected(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_WAITING_MANAGER_APPROVAL onRejected payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_managerRejected_onRejectedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onRelationshipManagerApproved(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_WAITING_MANAGER_APPROVAL onRelationshipManagerApproved payload: {}", payload.toString)
                        if(SpringCreditSMGuardsLocator.getInstance.creditsm_WAITING_MANAGER_APPROVAL_$$_WAITING_MANAGER_APPROVAL_onRelationshipManagerApproved_relationshipManagerCreditAmountCriticalGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_WAITING_MANAGER_APPROVAL_$$_WAITING_MANAGER_APPROVAL_waitingManagerApproval_onRelationshipManagerApprovedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_WAITING_APPROVAL_$$_RELATIONSHIP_MANAGER_APPROVED_waitingApproval_onRelationshipManagerApprovedAction.doAction(
                            context,
                            controlObject,
                            payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onResultReceived(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_WAITING_MANAGER_APPROVAL onResultReceived payload: {}", payload.toString)
                        if(SpringCreditSMGuardsLocator.getInstance.creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_onResultReceived_isScoreTooLowGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_lowScore_onResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                    
                        Effect.unhandled.thenNoReply()
                    }
            
                    case _ =>
                        Effect.unhandled.thenNoReply()
                }
            //State Command Handler
            case CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_FROM_SENIOR_MANAGER(controlObject) =>
                base[CreditSMEvent](cmd, state) {
                    case onAcceptableScore(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_FROM_SENIOR_MANAGER onAcceptableScore payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_WAITING_APPROVAL_FROM_SENIOR_MANAGER_$$_WAITING_CREDIT_ANALYST_APPROVAL_waitingApprovalFromSeniorOfficier_onAcceptableScoreAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onCustomerUpdated(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_FROM_SENIOR_MANAGER onCustomerUpdated payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_APPLICATION_SUBMITTED_customer_updatedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onRejected(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_FROM_SENIOR_MANAGER onRejected payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_managerRejected_onRejectedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onResultReceived(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_FROM_SENIOR_MANAGER onResultReceived payload: {}", payload.toString)
                        if(SpringCreditSMGuardsLocator.getInstance.creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_onResultReceived_isScoreTooLowGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            return SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_lowScore_onResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                    
                        Effect.unhandled.thenNoReply()
                    }
            
                    case _ =>
                        Effect.unhandled.thenNoReply()
                }
            //State Command Handler
            case CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_INITIAL_CA_SM(controlObject) =>
                base[CreditSMEvent](cmd, state) {
                    case onAccepted(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_INITIAL_CA_SM onAccepted payload: {}", payload.toString)
                    
                        return SpringCreditSMActionsLocator.getInstance().creditsm_WAITING_CREDIT_ANALYST_APPROVAL_$$_CREDIT_ACCEPTED_creditAnalyst_onAcceptedAction.doAction(
                                        context,
                                        controlObject,
                                        payload, replyTo)
                    
                        Effect.unhandled.thenNoReply()
                    }
                    case onCustomerUpdated(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_INITIAL_CA_SM onCustomerUpdated payload: {}", payload.toString)
                    
                        // Unconditional transition: the action's effect is the result of this handler.
                        SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_APPLICATION_SUBMITTED_customer_updatedAction.doAction(
                            context,
                            controlObject,
                            payload, replyTo)
                    }
                    case onRejected(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_INITIAL_CA_SM onRejected payload: {}", payload.toString)
                    
                        // Unconditional transition: the action's effect is the result of this handler.
                        SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_managerRejected_onRejectedAction.doAction(
                            context,
                            controlObject,
                            payload, replyTo)
                    }
                    case onResultReceived(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_INITIAL_CA_SM onResultReceived payload: {}", payload.toString)
                        // Guarded transition: run the action if the guard holds, otherwise leave the command unhandled.
                        if (SpringCreditSMGuardsLocator.getInstance.creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_onResultReceived_isScoreTooLowGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_lowScore_onResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        } else {
                            Effect.unhandled.thenNoReply()
                        }
                    }
            
                    case _ =>
                        Effect.unhandled.thenNoReply()
                }
            //State Command Handler
            case CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_WAITING_ANAYLIST_APPROVAL(controlObject) =>
                base[CreditSMEvent](cmd, state) {
                    case onAccepted(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_WAITING_ANAYLIST_APPROVAL onAccepted payload: {}", payload.toString)
                        // Guarded transition with a fallback: stay in the waiting state when the credit amount
                        // is critical, otherwise accept the credit.
                        if (SpringCreditSMGuardsLocator.getInstance.creditsm_WAITING_ANAYLIST_APPROVAL_$$_WAITING_ANAYLIST_APPROVAL_onAccepted_creditAnalystCreditAmountCritical.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            SpringCreditSMActionsLocator.getInstance().creditsm_WAITING_ANAYLIST_APPROVAL_$$_WAITING_ANAYLIST_APPROVAL_waitingAnalystApproval_onCreditAcceptedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        } else {
                            SpringCreditSMActionsLocator.getInstance().creditsm_WAITING_CREDIT_ANALYST_APPROVAL_$$_CREDIT_ACCEPTED_creditAnalyst_onAcceptedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        }
                    }
                    case onCustomerUpdated(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_WAITING_ANAYLIST_APPROVAL onCustomerUpdated payload: {}", payload.toString)
                    
                        // Unconditional transition: the action's effect is the result of this handler.
                        SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_APPLICATION_SUBMITTED_customer_updatedAction.doAction(
                            context,
                            controlObject,
                            payload, replyTo)
                    }
                    case onRejected(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_WAITING_ANAYLIST_APPROVAL onRejected payload: {}", payload.toString)
                    
                        // Unconditional transition: the action's effect is the result of this handler.
                        SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_managerRejected_onRejectedAction.doAction(
                            context,
                            controlObject,
                            payload, replyTo)
                    }
                    case onResultReceived(useCaseKey, payload, replyTo) => {
                        context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_WAITING_ANAYLIST_APPROVAL onResultReceived payload: {}", payload.toString)
                        // Guarded transition: run the action if the guard holds, otherwise leave the command unhandled.
                        if (SpringCreditSMGuardsLocator.getInstance.creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_onResultReceived_isScoreTooLowGuard.evaluate(
                            context,
                            controlObject,
                            payload)) {
                            SpringCreditSMActionsLocator.getInstance().creditsm_CREDIT_APPLICATION_SUBMITTED_$$_CREDIT_REJECTED_lowScore_onResultReceivedAction.doAction(
                                context,
                                controlObject,
                                payload, replyTo)
                        } else {
                            Effect.unhandled.thenNoReply()
                        }
                    }
            
                    case _ =>
                        Effect.unhandled.thenNoReply()
                }
            //State Command Handler
            case CREDIT_REJECTED(controlObject) =>
                base[CreditSMEvent](cmd, state) {
            
                    case _ =>
                        Effect.unhandled.thenNoReply()
                }
            case _ =>
                Effect.unhandled.thenNoReply()
        }

    def eventHandler(context: ActorContext[CreditSMEvent], state: State, event: PersistEvent): State = {
        state match {
            //1 Pseudo
            case _initial @ INITIAL(controlObject) =>
                event match {
                case CreditApplicationSubmittedPersistEvent(payload) =>
                    context.log.debug("Processing INITIAL CreditApplicationSubmittedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit5 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_WAITING_MANAGER_APPROVAL(_initial.controlObject)
            
                case unh @ _ => {
                    context.log.warn("This PersistEvent is not handled during INITIAL State {}", unh)
                    _initial
                  }
            }

            //1 Recursive
            //pseudo
            case _credit_application_submitted_initial_ca @ CREDIT_APPLICATION_SUBMITTED_$_INITIAL_CA(controlObject) =>
                event match {
                case CustomerUpdatedEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_INITIAL_CA CustomerUpdatedEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit6
                    _credit_application_submitted_initial_ca
                case CreditScoreToLowPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_INITIAL_CA CreditScoreToLowPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_initial_ca.controlObject)
                case CreditRejectedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_INITIAL_CA CreditRejectedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_initial_ca.controlObject)
            
                case unh @ _ => {
                    context.log.warn("This PersistEvent is not handled during CREDIT_APPLICATION_SUBMITTED_$_INITIAL_CA State {}", unh)
                    _credit_application_submitted_initial_ca
                  }
            }
            //recursive
            case _credit_application_submitted_credit_accepted @ CREDIT_APPLICATION_SUBMITTED_$_CREDIT_ACCEPTED(controlObject) =>
                event match {
                case CustomerUpdatedEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_CREDIT_ACCEPTED CustomerUpdatedEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit6
                    _credit_application_submitted_credit_accepted
                case CreditScoreToLowPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_CREDIT_ACCEPTED CreditScoreToLowPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_credit_accepted.controlObject)
                case CreditRejectedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_CREDIT_ACCEPTED CreditRejectedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_credit_accepted.controlObject)
            
                case unh @ _ => {
                    context.log.warn("This PersistEvent is not handled during CREDIT_APPLICATION_SUBMITTED_$_CREDIT_ACCEPTED State {}", unh)
                    _credit_application_submitted_credit_accepted
                  }
            }
            //recursive
            //pseudo
            case _credit_application_submitted_relationship_manager_approved_initial_fe_sm @ CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_INITIAL_FE_SM(controlObject) =>
                event match {
                case CustomerUpdatedEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_INITIAL_FE_SM CustomerUpdatedEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit6
                    _credit_application_submitted_relationship_manager_approved_initial_fe_sm
                case CreditScoreToLowPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_INITIAL_FE_SM CreditScoreToLowPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_relationship_manager_approved_initial_fe_sm.controlObject)
                case CreditRejectedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_INITIAL_FE_SM CreditRejectedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_relationship_manager_approved_initial_fe_sm.controlObject)
                case SalesManagerApprovalPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_INITIAL_FE_SM SalesManagerApprovalPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_INITIAL_CSC(_credit_application_submitted_relationship_manager_approved_initial_fe_sm.controlObject)
            
                case unh @ _ => {
                    context.log.warn("This PersistEvent is not handled during CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_INITIAL_FE_SM State {}", unh)
                    _credit_application_submitted_relationship_manager_approved_initial_fe_sm
                  }
            }
            //recursive
            case _credit_application_submitted_relationship_manager_approved_waiting_manager_approval @ CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_WAITING_MANAGER_APPROVAL(controlObject) =>
                event match {
                case CustomerUpdatedEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_WAITING_MANAGER_APPROVAL CustomerUpdatedEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit6
                    _credit_application_submitted_relationship_manager_approved_waiting_manager_approval
                case CreditScoreToLowPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_WAITING_MANAGER_APPROVAL CreditScoreToLowPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_relationship_manager_approved_waiting_manager_approval.controlObject)
                case CreditRejectedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_WAITING_MANAGER_APPROVAL CreditRejectedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_relationship_manager_approved_waiting_manager_approval.controlObject)
                case SalesManagerApprovalPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_WAITING_MANAGER_APPROVAL SalesManagerApprovalPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_INITIAL_CSC(_credit_application_submitted_relationship_manager_approved_waiting_manager_approval.controlObject)
                case WaitingManagerApprovalSalesManagerApprovedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_WAITING_MANAGER_APPROVAL WaitingManagerApprovalSalesManagerApprovedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit6
                    _credit_application_submitted_relationship_manager_approved_waiting_manager_approval
            
                case unh @ _ => {
                    context.log.warn("This PersistEvent is not handled during CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_WAITING_MANAGER_APPROVAL State {}", unh)
                    _credit_application_submitted_relationship_manager_approved_waiting_manager_approval
                  }
            }
            //recursive
            //pseudo
            case _credit_application_submitted_sales_manager_approved_initial_csc @ CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_INITIAL_CSC(controlObject) =>
                event match {
                case CustomerUpdatedEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_INITIAL_CSC CustomerUpdatedEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit6
                    _credit_application_submitted_sales_manager_approved_initial_csc
                case AdressCheckReceivedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_INITIAL_CSC AdressCheckReceivedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit4 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_ADRRESCHECK_RESULT_RECEIVED(_credit_application_submitted_sales_manager_approved_initial_csc.controlObject)
                case FraudPreventionReceivedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_INITIAL_CSC FraudPreventionReceivedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit4 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_RESULT_RECEIVED(_credit_application_submitted_sales_manager_approved_initial_csc.controlObject)
                case CreditScoreReceivedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_INITIAL_CSC CreditScoreReceivedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit4 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_RECEIVED(_credit_application_submitted_sales_manager_approved_initial_csc.controlObject)
                case CreditScoreToLowPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_INITIAL_CSC CreditScoreToLowPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_sales_manager_approved_initial_csc.controlObject)
                case CreditRejectedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_INITIAL_CSC CreditRejectedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_sales_manager_approved_initial_csc.controlObject)
                case CreditScoreNotEnoughPersisteEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_INITIAL_CSC CreditScoreNotEnoughPersisteEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_FROM_SENIOR_MANAGER(_credit_application_submitted_sales_manager_approved_initial_csc.controlObject)
                case PositiveResultPersistedEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_INITIAL_CSC PositiveResultPersistedEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_WAITING_ANAYLIST_APPROVAL(_credit_application_submitted_sales_manager_approved_initial_csc.controlObject)
            
                case unh @ _ => {
                    context.log.warn("This PersistEvent is not handled during CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_INITIAL_CSC State {}", unh)
                    _credit_application_submitted_sales_manager_approved_initial_csc
                  }
            }
            //recursive
            case _credit_application_submitted_sales_manager_approved_adrrescheck_result_received @ CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_ADRRESCHECK_RESULT_RECEIVED(controlObject) =>
                event match {
                case AdressCheckFraudPreventionReceviedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_ADRRESCHECK_RESULT_RECEIVED AdressCheckFraudPreventionReceviedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit4 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_ADRESSCHECK_RESULT_RECEIVED(_credit_application_submitted_sales_manager_approved_adrrescheck_result_received.controlObject)
                case AddressCheckCreditScoreResultReceivedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_ADRRESCHECK_RESULT_RECEIVED AddressCheckCreditScoreResultReceivedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit4 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_ADDRESSCHECK_RESULT_RECEIVED(_credit_application_submitted_sales_manager_approved_adrrescheck_result_received.controlObject)
                case CustomerUpdatedEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_ADRRESCHECK_RESULT_RECEIVED CustomerUpdatedEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit6
                    _credit_application_submitted_sales_manager_approved_adrrescheck_result_received
                case CreditScoreToLowPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_ADRRESCHECK_RESULT_RECEIVED CreditScoreToLowPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_sales_manager_approved_adrrescheck_result_received.controlObject)
                case CreditRejectedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_ADRRESCHECK_RESULT_RECEIVED CreditRejectedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_sales_manager_approved_adrrescheck_result_received.controlObject)
                case CreditScoreNotEnoughPersisteEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_ADRRESCHECK_RESULT_RECEIVED CreditScoreNotEnoughPersisteEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_FROM_SENIOR_MANAGER(_credit_application_submitted_sales_manager_approved_adrrescheck_result_received.controlObject)
                case PositiveResultPersistedEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_ADRRESCHECK_RESULT_RECEIVED PositiveResultPersistedEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_WAITING_ANAYLIST_APPROVAL(_credit_application_submitted_sales_manager_approved_adrrescheck_result_received.controlObject)
            
                case unh @ _ => {
                    context.log.warn("This PersistEvent is not handled during CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_ADRRESCHECK_RESULT_RECEIVED State {}", unh)
                    _credit_application_submitted_sales_manager_approved_adrrescheck_result_received
                  }
            }
            //recursive
            case _credit_application_submitted_sales_manager_approved_creditscore_addresscheck_result_received @ CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_ADDRESSCHECK_RESULT_RECEIVED(controlObject) =>
                event match {
                case CustomerUpdatedEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_ADDRESSCHECK_RESULT_RECEIVED CustomerUpdatedEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit6
                    _credit_application_submitted_sales_manager_approved_creditscore_addresscheck_result_received
                case CreditScoreToLowPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_ADDRESSCHECK_RESULT_RECEIVED CreditScoreToLowPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_sales_manager_approved_creditscore_addresscheck_result_received.controlObject)
                case CreditRejectedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_ADDRESSCHECK_RESULT_RECEIVED CreditRejectedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_sales_manager_approved_creditscore_addresscheck_result_received.controlObject)
                case CreditScoreNotEnoughPersisteEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_ADDRESSCHECK_RESULT_RECEIVED CreditScoreNotEnoughPersisteEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_FROM_SENIOR_MANAGER(_credit_application_submitted_sales_manager_approved_creditscore_addresscheck_result_received.controlObject)
                case PositiveResultPersistedEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_ADDRESSCHECK_RESULT_RECEIVED PositiveResultPersistedEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_WAITING_ANAYLIST_APPROVAL(_credit_application_submitted_sales_manager_approved_creditscore_addresscheck_result_received.controlObject)
            
                case unh @ _ => {
                    context.log.warn("This PersistEvent is not handled during CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_ADDRESSCHECK_RESULT_RECEIVED State {}", unh)
                    _credit_application_submitted_sales_manager_approved_creditscore_addresscheck_result_received
                  }
            }
            //recursive
            case _credit_application_submitted_sales_manager_approved_creditscore_fraudprevention_result_received @ CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_FRAUDPREVENTION_RESULT_RECEIVED(controlObject) =>
                event match {
                case CustomerUpdatedEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_FRAUDPREVENTION_RESULT_RECEIVED CustomerUpdatedEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit6
                    _credit_application_submitted_sales_manager_approved_creditscore_fraudprevention_result_received
                case CreditScoreToLowPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_FRAUDPREVENTION_RESULT_RECEIVED CreditScoreToLowPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_sales_manager_approved_creditscore_fraudprevention_result_received.controlObject)
                case CreditRejectedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_FRAUDPREVENTION_RESULT_RECEIVED CreditRejectedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_sales_manager_approved_creditscore_fraudprevention_result_received.controlObject)
                case CreditScoreNotEnoughPersisteEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_FRAUDPREVENTION_RESULT_RECEIVED CreditScoreNotEnoughPersisteEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_FROM_SENIOR_MANAGER(_credit_application_submitted_sales_manager_approved_creditscore_fraudprevention_result_received.controlObject)
                case PositiveResultPersistedEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_FRAUDPREVENTION_RESULT_RECEIVED PositiveResultPersistedEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_WAITING_ANAYLIST_APPROVAL(_credit_application_submitted_sales_manager_approved_creditscore_fraudprevention_result_received.controlObject)
            
                case unh @ _ => {
                    context.log.warn("This PersistEvent is not handled during CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_FRAUDPREVENTION_RESULT_RECEIVED State {}", unh)
                    _credit_application_submitted_sales_manager_approved_creditscore_fraudprevention_result_received
                  }
            }
            //recursive
            case _credit_application_submitted_sales_manager_approved_creditscore_received @ CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_RECEIVED(controlObject) =>
                event match {
                case CreditScoreFraudPreventionReceivedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_RECEIVED CreditScoreFraudPreventionReceivedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit4 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_FRAUDPREVENTION_RESULT_RECEIVED(_credit_application_submitted_sales_manager_approved_creditscore_received.controlObject)
                case CreditScoreAddressResultReceivedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_RECEIVED CreditScoreAddressResultReceivedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit4 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_ADDRESSCHECK_RESULT_RECEIVED(_credit_application_submitted_sales_manager_approved_creditscore_received.controlObject)
                case CustomerUpdatedEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_RECEIVED CustomerUpdatedEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit6
                    _credit_application_submitted_sales_manager_approved_creditscore_received
                case CreditScoreToLowPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_RECEIVED CreditScoreToLowPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_sales_manager_approved_creditscore_received.controlObject)
                case CreditRejectedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_RECEIVED CreditRejectedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_sales_manager_approved_creditscore_received.controlObject)
                case CreditScoreNotEnoughPersisteEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_RECEIVED CreditScoreNotEnoughPersisteEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_FROM_SENIOR_MANAGER(_credit_application_submitted_sales_manager_approved_creditscore_received.controlObject)
                case PositiveResultPersistedEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_RECEIVED PositiveResultPersistedEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_WAITING_ANAYLIST_APPROVAL(_credit_application_submitted_sales_manager_approved_creditscore_received.controlObject)
            
                case unh @ _ => {
                    context.log.warn("This PersistEvent is not handled during CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_RECEIVED State {}", unh)
                    _credit_application_submitted_sales_manager_approved_creditscore_received
                  }
            }
            //recursive
            case _credit_application_submitted_sales_manager_approved_fraudprevention_adresscheck_result_received @ CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_ADRESSCHECK_RESULT_RECEIVED(controlObject) =>
                event match {
                case CustomerUpdatedEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_ADRESSCHECK_RESULT_RECEIVED CustomerUpdatedEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit6
                    _credit_application_submitted_sales_manager_approved_fraudprevention_adresscheck_result_received
                case CreditScoreToLowPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_ADRESSCHECK_RESULT_RECEIVED CreditScoreToLowPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_sales_manager_approved_fraudprevention_adresscheck_result_received.controlObject)
                case CreditRejectedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_ADRESSCHECK_RESULT_RECEIVED CreditRejectedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_sales_manager_approved_fraudprevention_adresscheck_result_received.controlObject)
                case CreditScoreNotEnoughPersisteEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_ADRESSCHECK_RESULT_RECEIVED CreditScoreNotEnoughPersisteEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_FROM_SENIOR_MANAGER(_credit_application_submitted_sales_manager_approved_fraudprevention_adresscheck_result_received.controlObject)
                case PositiveResultPersistedEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_ADRESSCHECK_RESULT_RECEIVED PositiveResultPersistedEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_WAITING_ANAYLIST_APPROVAL(_credit_application_submitted_sales_manager_approved_fraudprevention_adresscheck_result_received.controlObject)
            
                case unh @ _ => {
                    context.log.warn("This PersistEvent is not handled during CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_ADRESSCHECK_RESULT_RECEIVED State {}", unh)
                    _credit_application_submitted_sales_manager_approved_fraudprevention_adresscheck_result_received
                  }
            }
            //recursive
            case _credit_application_submitted_sales_manager_approved_fraudprevention_result_received @ CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_RESULT_RECEIVED(controlObject) =>
                event match {
                case CustomerUpdatedEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_RESULT_RECEIVED CustomerUpdatedEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit6
                    _credit_application_submitted_sales_manager_approved_fraudprevention_result_received
                case FraudPreventionAdressCheckReceivedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_RESULT_RECEIVED FraudPreventionAdressCheckReceivedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit4 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_ADRESSCHECK_RESULT_RECEIVED(_credit_application_submitted_sales_manager_approved_fraudprevention_result_received.controlObject)
                case FraudPreventionCreditScoreReceivedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_RESULT_RECEIVED FraudPreventionCreditScoreReceivedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit4 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_CREDITSCORE_FRAUDPREVENTION_RESULT_RECEIVED(_credit_application_submitted_sales_manager_approved_fraudprevention_result_received.controlObject)
                case CreditScoreToLowPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_RESULT_RECEIVED CreditScoreToLowPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_sales_manager_approved_fraudprevention_result_received.controlObject)
                case CreditRejectedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_RESULT_RECEIVED CreditRejectedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_sales_manager_approved_fraudprevention_result_received.controlObject)
                case CreditScoreNotEnoughPersisteEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_RESULT_RECEIVED CreditScoreNotEnoughPersisteEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_FROM_SENIOR_MANAGER(_credit_application_submitted_sales_manager_approved_fraudprevention_result_received.controlObject)
                case PositiveResultPersistedEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_RESULT_RECEIVED PositiveResultPersistedEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_WAITING_ANAYLIST_APPROVAL(_credit_application_submitted_sales_manager_approved_fraudprevention_result_received.controlObject)
            
                case unh @ _ => {
                    context.log.warn("This PersistEvent is not handled during CREDIT_APPLICATION_SUBMITTED_$_SALES_MANAGER_APPROVED_$_FRAUDPREVENTION_RESULT_RECEIVED State {}", unh)
                    _credit_application_submitted_sales_manager_approved_fraudprevention_result_received
                  }
            }
            //recursive
            //pseudo
            case _credit_application_submitted_waiting_approval_initial_fe @ CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_INITIAL_FE(controlObject) =>
                event match {
                case CustomerUpdatedEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_INITIAL_FE CustomerUpdatedEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit6
                    _credit_application_submitted_waiting_approval_initial_fe
                case CreditScoreToLowPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_INITIAL_FE CreditScoreToLowPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_waiting_approval_initial_fe.controlObject)
                case CreditRejectedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_INITIAL_FE CreditRejectedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_waiting_approval_initial_fe.controlObject)
                case RelationshipManagerApprovedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_INITIAL_FE RelationshipManagerApprovedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_WAITING_MANAGER_APPROVAL(_credit_application_submitted_waiting_approval_initial_fe.controlObject)
            
                case unh @ _ => {
                    context.log.warn("This PersistEvent is not handled during CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_INITIAL_FE State {}", unh)
                    _credit_application_submitted_waiting_approval_initial_fe
                  }
            }
            //recursive
            case _credit_application_submitted_waiting_approval_waiting_manager_approval @ CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_WAITING_MANAGER_APPROVAL(controlObject) =>
                event match {
                case CustomerUpdatedEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_WAITING_MANAGER_APPROVAL CustomerUpdatedEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit6
                    _credit_application_submitted_waiting_approval_waiting_manager_approval
                case CreditScoreToLowPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_WAITING_MANAGER_APPROVAL CreditScoreToLowPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_waiting_approval_waiting_manager_approval.controlObject)
                case CreditRejectedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_WAITING_MANAGER_APPROVAL CreditRejectedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_waiting_approval_waiting_manager_approval.controlObject)
                case RelationshipManagerApprovedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_WAITING_MANAGER_APPROVAL RelationshipManagerApprovedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_RELATIONSHIP_MANAGER_APPROVED_$_WAITING_MANAGER_APPROVAL(_credit_application_submitted_waiting_approval_waiting_manager_approval.controlObject)
                case WaitingManagerApprovalRelationshipManagerPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_WAITING_MANAGER_APPROVAL WaitingManagerApprovalRelationshipManagerPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit6
                    _credit_application_submitted_waiting_approval_waiting_manager_approval
            
                case unh @ _ => {
                    context.log.warn("This PersistEvent is not handled during CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_$_WAITING_MANAGER_APPROVAL State {}", unh)
                    _credit_application_submitted_waiting_approval_waiting_manager_approval
                  }
            }
            //recursive
            case _credit_application_submitted_waiting_approval_from_senior_manager @ CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_FROM_SENIOR_MANAGER(controlObject) =>
                event match {
                case CustomerUpdatedEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_FROM_SENIOR_MANAGER CustomerUpdatedEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit6
                    _credit_application_submitted_waiting_approval_from_senior_manager
                case CreditScoreToLowPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_FROM_SENIOR_MANAGER CreditScoreToLowPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_waiting_approval_from_senior_manager.controlObject)
                case CreditRejectedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_FROM_SENIOR_MANAGER CreditRejectedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_waiting_approval_from_senior_manager.controlObject)
                case AcceptableScorePersistedEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_FROM_SENIOR_MANAGER AcceptableScorePersistedEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit4 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_WAITING_ANAYLIST_APPROVAL(_credit_application_submitted_waiting_approval_from_senior_manager.controlObject)
            
                case unh @ _ => {
                    context.log.warn("This PersistEvent is not handled during CREDIT_APPLICATION_SUBMITTED_$_WAITING_APPROVAL_FROM_SENIOR_MANAGER State {}", unh)
                    _credit_application_submitted_waiting_approval_from_senior_manager
                  }
            }
            //recursive
            //pseudo
            case _credit_application_submitted_waiting_credit_analyst_approval_initial_ca_sm @ CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_INITIAL_CA_SM(controlObject) =>
                event match {
                case CreditAcceptedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_INITIAL_CA_SM CreditAcceptedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_CREDIT_ACCEPTED(_credit_application_submitted_waiting_credit_analyst_approval_initial_ca_sm.controlObject)
                case CustomerUpdatedEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_INITIAL_CA_SM CustomerUpdatedEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit6
                    _credit_application_submitted_waiting_credit_analyst_approval_initial_ca_sm
                case CreditScoreToLowPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_INITIAL_CA_SM CreditScoreToLowPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_waiting_credit_analyst_approval_initial_ca_sm.controlObject)
                case CreditRejectedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_INITIAL_CA_SM CreditRejectedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_waiting_credit_analyst_approval_initial_ca_sm.controlObject)
            
                case unh @ _ => {
                    context.log.warn("This PersistEvent is not handled during CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_INITIAL_CA_SM State {}", unh)
                    _credit_application_submitted_waiting_credit_analyst_approval_initial_ca_sm
                  }
            }
            //recursive
            case _credit_application_submitted_waiting_credit_analyst_approval_waiting_anaylist_approval @ CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_WAITING_ANAYLIST_APPROVAL(controlObject) =>
                event match {
                case CreditAcceptedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_WAITING_ANAYLIST_APPROVAL CreditAcceptedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_APPLICATION_SUBMITTED_$_CREDIT_ACCEPTED(_credit_application_submitted_waiting_credit_analyst_approval_waiting_anaylist_approval.controlObject)
                case CustomerUpdatedEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_WAITING_ANAYLIST_APPROVAL CustomerUpdatedEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit6
                    _credit_application_submitted_waiting_credit_analyst_approval_waiting_anaylist_approval
                case CreditScoreToLowPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_WAITING_ANAYLIST_APPROVAL CreditScoreToLowPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_waiting_credit_analyst_approval_waiting_anaylist_approval.controlObject)
                case CreditRejectedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_WAITING_ANAYLIST_APPROVAL CreditRejectedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit1 from Master State
                    CREDIT_REJECTED(_credit_application_submitted_waiting_credit_analyst_approval_waiting_anaylist_approval.controlObject)
                case WaitingCreditAnalystApprovalCreditAcceptedPersistEvent(payload) =>
                    context.log.debug("Processing CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_WAITING_ANAYLIST_APPROVAL WaitingCreditAnalystApprovalCreditAcceptedPersistEvent payload: {}", payload)
                    controlObject.putAll(payload)
                    //Exit6
                    _credit_application_submitted_waiting_credit_analyst_approval_waiting_anaylist_approval
            
                case unh @ _ => {
                    context.log.warn("This PersistEvent is not handled during CREDIT_APPLICATION_SUBMITTED_$_WAITING_CREDIT_ANALYST_APPROVAL_$_WAITING_ANAYLIST_APPROVAL State {}", unh)
                    _credit_application_submitted_waiting_credit_analyst_approval_waiting_anaylist_approval
                  }
            }
            case _credit_rejected @ CREDIT_REJECTED(controlObject) =>
                event match {
            
                case unh @ _ => {
                    context.log.warn("This PersistEvent is not handled during CREDIT_REJECTED State {}", unh)
                    _credit_rejected
                  }
            }
        }
    }
}

Akka / Pekko configuration for Finite State Machine ( FSM )

This blog is a detailed analysis of how to configure the Akka / Pekko Framework for Cluster Sharding, Akka / Pekko Persistence and Serialisation, so that it can start persisted Finite State Machine ( FSM ) Actors that can run in a Kubernetes environment.

The sources for this blog can be found in the following GitHub project: Akka Finite State Machine

  1. Akka / Pekko System Configuration
    1. Settings
    2. Persistences
    3. Local Configuration
    4. Kubernetes Configuration
    5. Cluster Discovery / Bootstrap
    6. Health Check / Readiness / Liveness
    7. Kubernetes Cassandra
    8. Local / Kubernetes Profile

This blog is part of a series demonstrating the full implementation of a Proof of Concept application built on Event Sourcing principles with the Akka / Pekko Framework, Apache Kafka, Apache Cassandra and Elasticsearch, which you can find here. To understand the whole context of what is explained here, I advise you to start reading from that blog; if you only need the implementation details of the topic mentioned in the title, please continue reading.

We will use the following configuration to run the following UML State Machine.

PS: Unfortunately WordPress reduces the quality of the pictures; if you want to see the image in better quality, please click on it.

Akka / Pekko System Configuration

fsm-pekko-pekkosystem

This Proof of Concept is based on the Akka / Pekko Framework, so let’s look at how we configure it.

There are several systems that we have to configure: the Akka / Pekko Actor System, Cluster Sharding, Akka / Pekko Persistence, Serialisation, etc. The main functionality is concentrated in one class.

package org.salgar.fsm.pekko.pekkosystem

import com.fasterxml.jackson.core.JsonFactory
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.jsontype.BasicPolymorphicTypeValidator
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.BootstrapSetup
import org.apache.pekko.actor.setup.ActorSystemSetup
import org.apache.pekko.actor.typed.scaladsl.Behaviors
import org.apache.pekko.actor.typed.{ActorSystem, Behavior, Scheduler}
import org.apache.pekko.cluster.sharding.typed.scaladsl.ClusterSharding
import org.apache.pekko.serialization.jackson.{JacksonObjectMapperFactory, JacksonObjectMapperProviderSetup}
import org.apache.pekko.util.Timeout
import org.salgar.fsm.pekko.pekkosystem.ActorService.{customJacksonObjectMapperFactory, log, mainBehavior}
import org.salgar.fsm.pekko.pekkosystem.config.PekkoApplicationProperty
import org.slf4j.{Logger, LoggerFactory}
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component

import javax.annotation.{PostConstruct, PreDestroy}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt

object ActorService {
  private val log : Logger = LoggerFactory.getLogger(ActorService.getClass)

  val customJacksonObjectMapperFactory = new JacksonObjectMapperFactory {
    override def newObjectMapper(bindingName: String, jsonFactory: JsonFactory): ObjectMapper =
    {
      if(bindingName == "jackson-json" || bindingName == "jackson-cbor") {
        val mapper = new ObjectMapper(jsonFactory)
        val pvt =  BasicPolymorphicTypeValidator
          .builder()
          .allowIfBaseType(classOf[Object])
          .build()
        mapper.activateDefaultTyping(pvt)
      } else {
        super.newObjectMapper(bindingName, jsonFactory)
      }
    }
  }

  def mainBehavior: Behavior[NotUsed] =
    Behaviors.setup {
      context =>
        Behaviors.empty
    }

  implicit var timeout: Timeout = 30.seconds
}

@Component
class ActorService (
                     @Autowired pekkoApplicationProperty: PekkoApplicationProperty,
                     @Autowired additionalService: AdditionalService){
  private var _sharding: ClusterSharding = null
  private var _ec: ExecutionContext = null
  private var _scheduler : Scheduler = null
  private var _actorSystem: ActorSystem[NotUsed] = null

  def sharding() = _sharding

  def ec() = _ec

  def scheduler() = _scheduler

  def actorSystem() = _actorSystem

  @PostConstruct
  def init(): Unit = {
    log.info("We are initializing the Actor System!")

    val actorSystemSetup = ActorSystemSetup()
      .withSetup(JacksonObjectMapperProviderSetup(customJacksonObjectMapperFactory))
      .withSetup(BootstrapSetup(additionalService.configureFromFiles()))
    _actorSystem = ActorSystem(mainBehavior, pekkoApplicationProperty.getApplicationName(), actorSystemSetup)

    additionalService.config(actorSystem)

    _sharding = ClusterSharding(actorSystem)
    _ec = actorSystem.executionContext
    _scheduler = actorSystem.scheduler

    log.info("Initialization of the Actor System complete!")
  }

  @PreDestroy
  def destroy(): Unit = {
    // Stop the Actor System when the Spring context shuts down
    _actorSystem.terminate()
  }
}

This class is actually quite simple; the only interesting part is the Serialisation configuration. As I mentioned previously in the blog, for this Proof of Concept I used ‘jackson-cbor’ to serialise the messages for Cluster Sharding and the events persisted to the Journal and the Snapshot store.

Settings

The rest of the Akka / Pekko Framework is configured through the configuration file
fsm-pekko-pekkosystem/src/main/resources/application.conf.

pekko.persistence.cassandra.journal.keyspace = "foureyes_Journal"
pekko.persistence.cassandra.snapshot.keyspace = "foureyes_Snapshot"

pekko.persistence.cassandra.journal.jmx-reporting-enabled=off
pekko.persistence.cassandra.snapshot.jmx-reporting-enabled=off
cassandra-query-journal.events-by-tag.jmx-reporting-enabled=off

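pekko {
  # Exit the JVM once coordinated shutdown has completed
  # (full path: pekko.coordinated-shutdown.exit-jvm)
  coordinated-shutdown.exit-jvm = on

  actor {
    allow-java-serialization = off
    provider = "cluster"

    serialization-bindings {
      "org.salgar.pekko.fsm.base.CborSerializable" = jackson-cbor
    }
  }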

  cluster {
    shutdown-after-unsuccessful-join-seed-nodes = 60s
    sharding {
      remember-entities = on
      remember-entities-store = eventsourced
      journal-plugin-id = "pekko.persistence.cassandra.journal"
      snapshot-plugin-id = "pekko.persistence.cassandra.snapshot"
      verbose-debug-logging = off
    }
  }

  loggers = ["org.apache.pekko.event.slf4j.Slf4jLogger"]
  loglevel = "INFO"
  logging-filter = "org.apache.pekko.event.slf4j.Slf4jLoggingFilter"
  logger-startup-timeout = 120s

  persistence {
    journal {
      plugin = "pekko.persistence.cassandra.journal"
    }
    snapshot-store {
      plugin = "pekko.persistence.cassandra.snapshot"
    }
    cassandra {
      events-by-tag {
        pubsub-notification=on

        first-time-bucket = "20230521T00:00"
      }
      journal {
        keyspace-autocreate = true
        tables-autocreate = true
      }
      snapshot {
        keyspace-autocreate = true
        tables-autocreate = true
      }
    }
  }
}

datastax-java-driver {
  basic {
    load-balancing-policy.local-datacenter = "dc1"
  }
  advanced {
    auth-provider {
      class= PlainTextAuthProvider
      username: fsm-cassandra-superuser
    }
  }
}

Let’s now analyse the specific parts of the configuration in detail.

Persistences

pekko.persistence.cassandra.journal.keyspace = "foureyes_Journal"
pekko.persistence.cassandra.snapshot.keyspace = "foureyes_Snapshot"

The first configuration sets the names of the Apache Cassandra keyspaces used for the Journal and the Snapshots, ‘foureyes_Journal’ and ‘foureyes_Snapshot’. At least for development purposes, these keyspaces and their tables are created automatically when the Akka / Pekko System starts, if they do not exist yet (this behaviour is configurable and can be turned off, but since this is a Proof of Concept we are using this Quality of Life feature).
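As a minimal sketch of turning this behaviour off, the following Scala snippet layers a hypothetical production override on top of the development values using the Typesafe Config library, which Akka / Pekko already brings along. The setting names are the ones from the application.conf above; the object name and the idea of a separate production override are assumptions made only for illustration.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object AutocreateOverrideSketch {
  // Development values, as in the application.conf shown above
  val development: Config = ConfigFactory.parseString(
    """
      |pekko.persistence.cassandra {
      |  journal  { keyspace-autocreate = true, tables-autocreate = true }
      |  snapshot { keyspace-autocreate = true, tables-autocreate = true }
      |}
      |""".stripMargin)

  // Hypothetical production override: keyspaces and tables are expected
  // to exist already, so nothing is created at startup
  val production: Config = ConfigFactory.parseString(
    """
      |pekko.persistence.cassandra {
      |  journal  { keyspace-autocreate = false, tables-autocreate = false }
      |  snapshot { keyspace-autocreate = false, tables-autocreate = false }
      |}
      |""".stripMargin).withFallback(development)

  def main(args: Array[String]): Unit = {
    val key = "pekko.persistence.cassandra.journal.keyspace-autocreate"
    println(s"development: ${development.getBoolean(key)}")
    println(s"production:  ${production.getBoolean(key)}")
  }
}
```

Values in the override win over the fallback, so the development file can stay untouched while another environment switches the auto-creation off.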

pekko {
  actor {
    allow-java-serialization = off
    provider = "cluster"

    coordinated-shutdown.exit-jvm = on
    serialization-bindings {
      "org.salgar.pekko.fsm.base.CborSerializable" = jackson-cbor
    }
  }

The next thing we configure is the serialisation of the Akka / Pekko Actor system. First we make sure that Akka / Pekko is not using Java Serialisation, which is extremely slow and inefficient compared to formats like Protobuf, Avro or Jackson CBOR. Serialisation is a very important performance criterion because both Akka / Pekko Cluster Sharding and Akka / Pekko Persistence depend on it.

Then comes the serialisation binding, which tells Akka / Pekko that any class inheriting from ‘CborSerializable’ should be serialised with ‘jackson-cbor’. It is possible to configure other frameworks. For small to medium size projects ‘jackson-cbor’ is more than enough, but if you are dealing with a system that will have 50 million Actors you should really consider using Protobuf or Avro. I will show in future blogs how to configure Protobuf and Avro as well.
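To make the binding more concrete, here is a minimal sketch of what such a marker trait and a message using it might look like. In the project the trait lives in the package ‘org.salgar.pekko.fsm.base’; the ‘CreditApproved’ event, its fields and the ‘SerializationBindingSketch’ helper are hypothetical names invented only for this illustration.

```scala
// Sketch only: a marker trait without members, named like the class used in
// serialization-bindings (the real one lives in org.salgar.pekko.fsm.base).
trait CborSerializable

// Hypothetical event, invented for this example. Because it extends the marker
// trait, the binding in application.conf would select 'jackson-cbor' for it.
final case class CreditApproved(creditId: String, amount: BigDecimal) extends CborSerializable

object SerializationBindingSketch {
  // True when a message is covered by the CborSerializable binding.
  def usesCborBinding(message: Any): Boolean =
    message.isInstanceOf[CborSerializable]
}
```

Any message, event or state class that does not extend the marker trait has no binding, and with ‘allow-java-serialization = off’ Akka / Pekko would refuse to serialise it instead of silently falling back to Java Serialisation.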

Because we are using Cluster Sharding, we also have to set the Actor provider to ‘cluster’. Finally, with ‘coordinated-shutdown.exit-jvm’ we tell the Actor system to exit the JVM as the last step of its coordinated shutdown, so that a stopped Akka / Pekko Node also terminates its process.

cluster {
    shutdown-after-unsuccessful-join-seed-nodes = 60s
    sharding {
      remember-entities = on
      remember-entities-store = eventsourced
      journal-plugin-id = "pekko.persistence.cassandra.journal"
      snapshot-plugin-id = "pekko.persistence.cassandra.snapshot"
      verbose-debug-logging = off
    }
  }

The next part configures Cluster Sharding, as we previously discussed here. ‘distributed-data’ is the default mode for Cluster Sharding and Remember Entities, but as I also mentioned, ‘distributed-data’ performs well only up to about 100 000 Actors. Above this number you have to switch to ‘eventsourced’ mode. I configured the system to use ‘eventsourced’ mode out of the box so that the configuration is visible; if you prefer ‘distributed-data’, you have to comment out these lines.

The only other interesting parameter here is ‘shutdown-after-unsuccessful-join-seed-nodes’: if an Akka / Pekko Node can’t join the Cluster within 60 seconds, it shuts itself down. Especially in a Kubernetes environment, this signals Kubernetes that it has to restart the Pod, in the hope that it can join the Cluster this time.

persistence {
    journal {
      plugin = "pekko.persistence.cassandra.journal"
    }
    snapshot-store {
      plugin = "pekko.persistence.cassandra.snapshot"
    }
    cassandra {
      events-by-tag {
        pubsub-notification=on
        first-time-bucket = "20230521T00:00"
      }
      journal {
        keyspace-autocreate = true
        tables-autocreate = true
      }
      snapshot {
        keyspace-autocreate = true
        tables-autocreate = true
      }
    }
  }

The next configuration we look at is Akka / Pekko Persistence. First we have to configure which persistence plugins we want to use. In this PoC we are using Apache Cassandra, so naturally the plugins are ‘pekko.persistence.cassandra.journal’ for the Journal and ‘pekko.persistence.cassandra.snapshot’ for Snapshot persistence. One additional configuration is the automatic creation of the Cassandra keyspaces and tables. This is a convenience feature for development; I advise you not to use it in production and to keep full control over your database schemas there.

The final configuration in ‘application.conf’ is the connection information for the Apache Cassandra database.

datastax-java-driver {
  basic {
    load-balancing-policy.local-datacenter = "dc1"
  }
  advanced {
    auth-provider {
      class= PlainTextAuthProvider
      username: fsm-cassandra-superuser
    }
  }
}

This is the information necessary for an Apache Cassandra connection, but you might ask where the connection URL and password are. This PoC is designed to run both locally and in Kubernetes, and for this purpose we have two additional configuration files.

Local Configuration

First let’s look at the local configuration,
fsm-pekko-pekkosystem/src/main/resources/application-local.conf.

Akka / Pekko can combine several configuration files into one. ‘application.conf’ contains the base configuration, and with the following ‘include’ notation we combine it with ‘application-local.conf’.

include "application.conf"

pekko {
  fsm.numberOfShards = 30
  remote.artery {
    canonical {
      hostname = 127.0.0.1
      port = 2551
    }
  }

The local configuration is intended for development, so we can start our application with small-scale values: for example, 30 shards are enough for a local environment, and for networking we can naturally configure localhost values.
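To give a feeling for what the number of shards means, the following sketch maps an entity id to one of the shards with a hash and a modulo, which is the general idea behind hash-based shard allocation in Cluster Sharding. The object and function names are hypothetical, and which extractor this project actually uses is an assumption here, so treat it as an illustration and not as the real implementation.

```scala
object ShardMappingSketch {
  // Maps an entity id to a shard index in the range [0, numberOfShards).
  // The modulo is taken first, so math.abs never sees Int.MinValue.
  def shardFor(entityId: String, numberOfShards: Int): Int =
    math.abs(entityId.hashCode % numberOfShards)

  def main(args: Array[String]): Unit = {
    // Same value as pekko.fsm.numberOfShards in application-local.conf
    val numberOfShards = 30
    // The entity ids below are made up for the example
    Seq("credit-1", "credit-2", "credit-3").foreach { id =>
      println(s"$id -> shard ${shardFor(id, numberOfShards)}")
    }
  }
}
```

With 30 shards every id lands in one of 30 buckets, and the Cluster then distributes those buckets across the running Akka / Pekko Nodes.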

cluster {
    sharding {
      distributed-data {
        durable.lmdb.dir = "build/ddata-foureyes"
      }
    }
    seed-nodes = [
      ${SEED_NODES}
    ]
  }

Next we configure where the Distributed Data for Cluster Sharding is stored on disk. It is mainly necessary for ‘Remember Entities’ (the mechanism that recovers Entities / Actors when Akka / Pekko Nodes are added or removed, or when the Akka / Pekko System restarts).

‘Seed Nodes’ are the initial contact points of the Cluster: the addresses (Actor System name, IP and port) that a starting Akka / Pekko Node contacts in order to join the Akka / Pekko Cluster (while we are in local development mode there is only one seed). The HOCON configuration format used by Akka / Pekko can substitute values such as ‘${SEED_NODES}’ from environment variables, so we place ‘SEED_NODES’ in ‘gradle.properties’. This way we can have different values locally, in CI / CD environments and in production. Below is a snippet of my local ‘gradle.properties’.

K8SSANDRA_PASSWORD=XXXXXXXXXXXXXX
SEED_NODES=pekko://FourEyes@127.0.0.1:2551
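The substitution of ‘${SEED_NODES}’ can be tried out in isolation. The sketch below uses the Typesafe Config library that Akka / Pekko builds on. Instead of a real environment variable it supplies ‘SEED_NODES’ through a fallback configuration, so that it runs anywhere; that replacement, as well as the object and method names, are assumptions made for the example.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object SeedNodesSubstitutionSketch {
  // Resolves ${SEED_NODES} in a fragment shaped like application-local.conf.
  // Assumption: the value is passed in explicitly here; in the real setup it
  // would be looked up from the environment during resolution.
  def resolve(seedNode: String): Config = {
    val fragment = ConfigFactory.parseString(
      "pekko.cluster.seed-nodes = [ ${SEED_NODES} ]")
    val variables = ConfigFactory.parseString(
      "SEED_NODES = \"" + seedNode + "\"")
    // The substitution is only replaced once resolve() is called
    fragment.withFallback(variables).resolve()
  }

  def main(args: Array[String]): Unit = {
    val config = resolve("pekko://FourEyes@127.0.0.1:2551")
    println(config.getStringList("pekko.cluster.seed-nodes"))
  }
}
```

If the variable cannot be found anywhere, ‘resolve()’ fails with an exception, which is a useful early signal that an environment is missing one of its required values.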

datastax-java-driver {
  basic {
    contact-points = ["127.0.0.1:9042"]
  }
    advanced {
      auth-provider {
        password = ${K8SSANDRA_PASSWORD}
      }
    }
}

And finally, of course, we need the Apache Cassandra database configuration: the URL, in this case localhost since Apache Cassandra runs in a local Kubernetes / Docker, and the Apache Cassandra password. The value of the password is also provided via ‘gradle.properties’.

Kubernetes Configuration

Now we can look at the interesting part. Our application should also be able to run in Kubernetes, and it needs a different configuration for that,
fsm-pekko-pekkosystem/src/main/resources/application-kubernetes.conf.
Again we combine the Kubernetes configuration with the base one.

include "application.conf"

pekko {
  fsm.numberOfShards = ${NUMBER_OF_SHARDS}
  management {
    cluster.bootstrap {
      contact-point-discovery {
        discovery-method = kubernetes-api
        required-contact-point-nr = ${REQUIRED_CONTACT_POINT_NR}
      }
    }
    health-checks {
      akka-persistence-cassandra = "org.apache.pekko.persistence.cassandra.healthcheck.CassandraHealthCheck"
      cluster-membership = "org.apache.pekko.management.cluster.scaladsl.ClusterMembershipCheck"
    }
  }

Cluster Discovery / Bootstrap

The first change you see compared to the local configuration is that the number of shards is now configurable over the Helm Charts, since we can easily change the number of Akka / Pekko Nodes we have in Kubernetes. Secondly, the ‘seed-nodes’ configuration is not there: because we can change the number of Akka / Pekko Nodes over Helm, we can’t hard code the URLs of the seed nodes. Thankfully Akka / Pekko has its own way to discover Akka / Pekko Cluster Nodes in Kubernetes. For this purpose we set ‘discovery-method’ to ‘kubernetes-api’, together with an additional parameter, ‘required-contact-point-nr’, which decides how many Akka / Pekko Nodes must be discovered before the Akka / Pekko Cluster is formed.

Health Check / Readiness / Liveness

The next parameter is a speciality of Kubernetes. Kubernetes can restart Pods when the applications inside them fail, but it needs a means to find out that an application has failed. Health endpoints are this mechanism; in our example we have two, a Cassandra Health Check and a Cluster Membership Health Check.

cluster {
    shutdown-after-unsuccessful-join-seed-nodes = 60s
    sharding {
      distributed-data {
        durable.lmdb.dir = "var/lib/fsm_akka_4eyes_application"
      }
    }
  }

As you can see, the cluster configuration is different in Kubernetes: we tell the Akka / Pekko Pod to shut down (and Kubernetes will restart it) if it cannot join an Akka / Pekko Cluster within 60s, and naturally we use a different directory in Kubernetes to persist the distributed data.

[INFO ] [a.c.Cluster] [FourEyes-akka.actor.default-dispatcher-24] [Cluster(akka://FourEyes)] – Cluster Node [akka://FourEyes@10.42.0.78:25520] – Received InitJoinAck message from [Actor[akka://FourEyes@10.42.0.77:25520/system/cluster/core/daemon#-1925380896]] to [akka://FourEyes@10.42.0.78:25520]
[INFO ] [a.c.Cluster] [FourEyes-akka.actor.default-dispatcher-24] [Cluster(akka://FourEyes)] – Cluster Node [akka://FourEyes@10.42.0.78:25520] – Welcome from [akka://FourEyes@10.42.0.77:25520]

After successfully joining the Akka / Pekko Cluster you will see logs like the ones above. As you can see, Akka / Pekko Discovery searches the Kubernetes cluster for additional members of the Akka / Pekko Cluster and the Node joins the Cluster.

Kubernetes Cassandra

datastax-java-driver {
  basic {
    contact-points = [${CASSANDRA_CONTACT_POINT}":9042"]
  }
  advanced {
      auth-provider {
        password = ${K8SSANDRA_PASSWORD}
      }
    }
}

Naturally the configuration of the Apache Cassandra connection looks different in a Kubernetes environment. The ‘CASSANDRA_CONTACT_POINT’ variable is set over the Helm Chart and passed to our application (I will explain exactly how this is done when we look closely at the
fsm-pekko-helm-charts
project in this blog).

Local / Kubernetes Profile

Now, how does our application decide whether to use the local or the Kubernetes configuration? Spring Boot helps us here. The following dependency…

api 'org.springframework.cloud:spring-cloud-starter-kubernetes-fabric8-config'

detects whether our application runs in a Kubernetes environment. If it does, it automatically activates the ‘kubernetes’ Spring Profile.

With the help of this profile we can decide which beans to activate in the application context, as you can see in
fsm-pekko-pekkosystem/org/salgar/fsm/pekko/pekkosystem/config/ActorSystemConfig.scala.

package org.salgar.fsm.pekko.pekkosystem.config

import org.salgar.fsm.pekko.pekkosystem.{AdditionalService, KubernetesActorService, NeutralActorService}
import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.context.annotation.{Bean, Configuration, Primary, Profile}

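@Configuration
class ActorSystemConfig {
  // Only registered when the 'kubernetes' profile is active; @Primary makes it
  // win over the neutral bean below when both are present.
  @Profile(Array("kubernetes"))
  @Bean
  @Primary
  def kubernetesActorService() : AdditionalService = {
    new KubernetesActorService()
  }

  // Always registered; it is the only candidate when we run locally.
  @Bean
  def neutralActorService() : AdditionalService = {
    new NeutralActorService()
  }
}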

As you can see, if the ‘kubernetes’ profile is active we activate ‘KubernetesActorService’, otherwise ‘NeutralActorService’ is used, which handles the scenario in which we run our application locally.
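Stripped of the Spring annotations, the selection boils down to the following logic. This is a plain Scala sketch with hypothetical names (‘ProfileSelectionSketch’, ‘ServiceChoice’), not the Spring mechanism itself; it only mirrors which service and which configuration file end up being used for a given set of active profiles.

```scala
object ProfileSelectionSketch {
  // Hypothetical stand-in for the two AdditionalService implementations.
  final case class ServiceChoice(serviceName: String, configFile: String)

  val kubernetes: ServiceChoice =
    ServiceChoice("KubernetesActorService", "application-kubernetes.conf")
  val neutral: ServiceChoice =
    ServiceChoice("NeutralActorService", "application-local.conf")

  // Mirrors the effect of @Profile("kubernetes") combined with @Primary:
  // the Kubernetes service wins whenever its profile is active,
  // otherwise only the neutral service is available.
  def select(activeProfiles: Set[String]): ServiceChoice =
    if (activeProfiles.contains("kubernetes")) kubernetes else neutral

  def main(args: Array[String]): Unit = {
    println(select(Set("kubernetes")).configFile)
    println(select(Set.empty[String]).configFile)
  }
}
```

The advantage of leaving this decision to Spring profiles is that the same application artifact can be started locally and in Kubernetes without any code change or extra start parameter.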

fsm-pekko-pekkosystem/org/salgar/fsm/pekko/pekkosystem/KubernetesActorService.scala

package org.salgar.fsm.pekko.pekkosystem

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.typed.ActorSystem
import org.apache.pekko.management.cluster.bootstrap.ClusterBootstrap
import org.apache.pekko.management.scaladsl.PekkoManagement
import org.salgar.fsm.pekko.pekkosystem.KubernetesActorService.log
import org.slf4j.{Logger, LoggerFactory}

object KubernetesActorService {
  private val log : Logger = LoggerFactory.getLogger(KubernetesActorService.getClass)
}

class KubernetesActorService extends AdditionalService {
  override def config(actorSystem : ActorSystem[NotUsed]): Unit = {
    log.info("We are initializing with Kubernetes")
    PekkoManagement(actorSystem).start()
    ClusterBootstrap(actorSystem).start()
  }

  override def configureFromFiles(): Config = {
    ConfigFactory.load("application-kubernetes.conf")
  }
}

As you can see, all the systems that we mentioned before are started here: Akka / Pekko Management and Cluster Bootstrap are the libraries responsible for discovering Akka / Pekko Nodes in Kubernetes and dynamically building Akka / Pekko Clusters. This class also tells which Akka / Pekko configuration should be used, in this case ‘application-kubernetes.conf’.

And for the local development,

fsm-pekko-pekkosystem/org/salgar/fsm/pekko/pekkosystem/NeutralActorService.scala

package org.salgar.fsm.pekko.pekkosystem

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.typed.ActorSystem
import org.salgar.fsm.pekko.pekkosystem.NeutralActorService.log
import org.slf4j.{Logger, LoggerFactory}

object NeutralActorService {
  val log : Logger = LoggerFactory.getLogger(NeutralActorService.getClass)
}

class NeutralActorService extends AdditionalService {
  override def config(actorSystem : ActorSystem[NotUsed]): Unit = {
    log.info("We are initializing without Kubernetes")
  }

  override def configureFromFiles(): Config = {
    ConfigFactory.load("application-local.conf")
  }
}

you can see that none of the kubernetes components are started.

All these components are injected into the base Akka / Pekko System configuration
fsm-pekko-pekkosystem/org/salgar/fsm/pekko/pekkosystem/ActorService.scala

package org.salgar.fsm.pekko.pekkosystem

import com.fasterxml.jackson.core.JsonFactory
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.jsontype.BasicPolymorphicTypeValidator
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.BootstrapSetup
import org.apache.pekko.actor.setup.ActorSystemSetup
import org.apache.pekko.actor.typed.scaladsl.Behaviors
import org.apache.pekko.actor.typed.{ActorSystem, Behavior, Scheduler}
import org.apache.pekko.cluster.sharding.typed.scaladsl.ClusterSharding
import org.apache.pekko.serialization.jackson.{JacksonObjectMapperFactory, JacksonObjectMapperProviderSetup}
import org.apache.pekko.util.Timeout
import org.salgar.fsm.pekko.pekkosystem.ActorService.{customJacksonObjectMapperFactory, log, mainBehavior}
import org.salgar.fsm.pekko.pekkosystem.config.PekkoApplicationProperty
import org.slf4j.{Logger, LoggerFactory}
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component

import javax.annotation.{PostConstruct, PreDestroy}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt

object ActorService {
  private val log : Logger = LoggerFactory.getLogger(ActorService.getClass)

  val customJacksonObjectMapperFactory = new JacksonObjectMapperFactory {
    override def newObjectMapper(bindingName: String, jsonFactory: JsonFactory): ObjectMapper =
    {
      if(bindingName == "jackson-json" || bindingName == "jackson-cbor") {
        val mapper = new ObjectMapper(jsonFactory)
        val pvt =  BasicPolymorphicTypeValidator
          .builder()
          .allowIfBaseType(classOf[Object])
          .build()
        mapper.activateDefaultTyping(pvt)
      } else {
        super.newObjectMapper(bindingName, jsonFactory)
      }
    }
  }

  def mainBehavior: Behavior[NotUsed] =
    Behaviors.setup {
      context =>
        Behaviors.empty
    }

  implicit var timeout: Timeout = 30.seconds
}

The first part of the base configuration deals with serialisation. As you can see, a custom Jackson ObjectMapper factory is registered for the ‘jackson-cbor’ and ‘jackson-json’ bindings, so all the classes inheriting from ‘CborSerializable’ are serialised through it.

@Component
class ActorService (
                     @Autowired pekkoApplicationProperty: PekkoApplicationProperty,
                     @Autowired additionalService: AdditionalService){
  private var _sharding: ClusterSharding = null
  private var _ec: ExecutionContext = null
  private var _scheduler : Scheduler = null
  private var _actorSystem: ActorSystem[NotUsed] = null

  def sharding() = _sharding

  def ec() = _ec

  def scheduler() = _scheduler

  def actorSystem() = _actorSystem

  @PostConstruct
  def init(): Unit = {
    log.info("We are initializing the Actor System!")

    val actorSystemSetup = ActorSystemSetup()
      .withSetup(JacksonObjectMapperProviderSetup(customJacksonObjectMapperFactory))
      .withSetup(BootstrapSetup(additionalService.configureFromFiles()))
    _actorSystem = ActorSystem(mainBehavior, pekkoApplicationProperty.getApplicationName(), actorSystemSetup)

    additionalService.config(actorSystem)

    _sharding = ClusterSharding(actorSystem)
    _ec = actorSystem.executionContext
    _scheduler = actorSystem.scheduler

    log.info("Initialization of the Actor System complete!")
  }

  @PreDestroy
  def destroy(): Unit = {
    // Terminate the Actor System when the Spring context shuts down
    _actorSystem.terminate()
  }
}

As you can see, the base Akka / Pekko System is initialised here: the ActorSystemSetup with the fitting injected configuration files (local or Kubernetes), the additional systems for Kubernetes (Akka / Pekko Management, Cluster Bootstrap), Cluster Sharding, the Execution Context and the Scheduler.