Micro Service Transaction Management with State Machines

Lots of us are in the process of migrating our 30-year-old Monoliths to a modern Micro Service architecture, but this endeavour naturally presents new challenges, especially on the topic of Transaction Management. A sound Micro Service design foresees usage of the Bounded Context concept, meaning a Micro Service should have sovereignty over the data it manages and should have its own dedicated SQL or NoSQL data storage. This brings a dilemma: if several Micro Services collaborate to realise a Use Case, how are we going to guarantee data consistency across those Micro Services?

Life under the Monolith was easy: we just started an ACID Transaction over multiple tables in a SQL database. We will not have this luxury with Micro Services, and if you are planning to use two-phase commits over several databases to solve this dilemma, I strongly advise against it; it will not scale under load and will cause more problems than it solves.

So what do I propose as a solution? First of all, I advise you to go with an Event Sourced architecture and target an Eventually Consistent system. Two tools that will support you in such an endeavour are Camunda and Apache Pulsar, with the use of the Saga Pattern.

Now if you read my previous blog, you might know that I proposed a similar system with the Pekko / Akka Finite State Machine and Apache Kafka; I will speak in a further chapter about the advantages and disadvantages of both systems.

My main motivation to build a similar system with Camunda is that it is easier to start up a new system with these principles with Camunda, and I believe it will deliver good numbers, like 10000 transactions / second. But if you require numbers like 100000 t/s or 1000000 t/s, you would probably do better with the Pekko / Akka Finite State Machine solution, and if you have already modelled your processes in Camunda, it would not be difficult to convert those to UML State Machines and migrate to the Pekko / Akka solution.

The second interesting choice in this blog is the use of Apache Pulsar instead of Apache Kafka. There is lots of material on the internet about how to operate Apache Kafka with Camunda but not so much about Apache Pulsar, so this is also a feasibility study to show that it is totally viable.

Apache Pulsar is an up-and-coming Message Broker, and people always ask me what the difference between Apache Kafka and Apache Pulsar is. I always say it is the same distinction as between Maven and Gradle. When Maven was first introduced, it was revolutionary and changed build systems completely, but as time went on, people discerned that some aspects of it were cumbersome; then people came up with the ideas that formed Gradle, which followed the same principles but felt more lightweight. In my opinion it is the same feeling with Apache Pulsar: when I used Apache Kafka in several projects, I always thought they could do this and that better, and now I have a feeling Apache Pulsar is the answer to those points. Both have nearly the same performance, as you can see from the following benchmarks. Apache Pulsar is gaining popularity against Apache Kafka, will not disappear that easily and, as I said, has some additional nice Quality of Life features like multi tenancy, so I would like to show some of its capabilities here.

PS: I would like to apologise for the quality of some of the images; unfortunately WordPress reduces image quality during upload. For the images where you really have to see the details, I placed links to externally hosted versions, so please click those images and you will be forwarded to the high definition versions.

Now, in your first iterations of a Micro Service design, you probably landed on something similar to the following picture.

From the outside it seems like a nice solution, but some of you might have that foul feeling that this level of interdependency between the Micro Services is not a good thing. We learned over the years that loosely coupled systems are the way to go, and the above picture does not look anything like loosely coupled: nearly every Micro Service has to know about the existence of the other Micro Services. These are not only design problems but operational problems; the above system communicates via REST Services in a synchronous manner, so if any of the Micro Services experiences availability problems, the other ones will not be able to realise their responsibilities, which is a huge weakness. It also suffers from the Transaction Management problems that this blog claims to have a solution for.

Now consider the following solution, which could be a better one.

This design avoids the extreme interdependency of the Micro Services, adheres better to the concepts of self containment and loose coupling and introduces an orchestration level, but it still suffers from synchronous communication over REST and poses the same availability challenges and the same Transaction Management problems.

My third proposed design evolution would be the adoption of Event Sourced principles with the help of a Message Broker.

With this design, the Orchestration level gets its own Data Storage to keep the information controlled by its Bounded Context in near proximity to itself. You are probably asking yourself: if the master of Products is the Product Service, of Orders the Order Service and of Customers the Customer Service, how will the Orchestration Layer keep the data in its store consistent with the owners of that data? This is the point where Event Sourced principles come into play: the Product Service, Order Service and Customer Service will, for every change happening to the objects they are responsible for, send Events to the Orchestration Layer over the Message Broker to report those changes. The Orchestration Layer should also send Events to these services for the changes it is responsible for. This is where Transaction Management over multiple Micro Services, the Camunda State Machine and Eventual Consistency come into the picture, and this blog will show how to do it.

One final point before we switch to the implementation details: notice that we got rid of the synchronous REST protocol for inter Micro Service communication. With a Message Broker and asynchronous communication, our system can operate even during a short outage of any Micro Service, while we have data storage at the orchestration layer and eventual consistency to deal with the other consequences of such an outage. This design also gives us better possibilities for elasticity during a load spike; our use cases will not crash while we are scaling up our Micro Services, because inter Micro Service communication is buffered in the Message Broker while a Micro Service needs 30-40s to start up and scale.

  1. How Camunda will help us in such a scenario
    1. Business Case Definition
  2. How Apache Pulsar will help in such a scenario
  3. Overview
    1. Positive Scenario
    2. Transaction Rollback
    3. Optimisations
  4. Infrastructure Setup
    1. Camunda
      1. Camunda vs Pekko / Akka Behavioural Finite State Machine
    2. Apache Pulsar
      1. Why choose Apache Pulsar
    3. Proof of Concept Installation
  5. Implementation Details
    1. The Workflow
      1. Positive Scenario
      2. Transaction Rollback
      3. Optimisations
  6. Conclusion

How Camunda will help us in such a scenario

The following picture is from the Camunda Modeller.

orchestration/src/main/resources/saga.bpmn

Above you see the Camunda Model that will implement the Saga Pattern; let’s first look at the Business Case we will try to accomplish with it.

Business Case Definition

Our aim is to create a new Order for a new Customer.

  • We will send an Event to the Customer Service to create a Customer for us, while we store the Customer in the Order Service local storage under the concepts of the Bounded Context; as a response the Customer Service will send us the result of the Customer creation, success or not, and if successful the ID of the newly created Customer.
  • In parallel, we will calculate / validate the product selection for the Order at the Product Options Service, which will return the result and whether the operation was successful or not.
  • In parallel, we will reserve the ordered items in the Inventory Service and report whether that operation was successful or not.
  • Finally, in parallel, we will create the invoice at the Invoice Service and report whether that operation was successful or not.

If any of these operations fails, we can’t complete our Order Processing. In our Monolith design, or in the existence of a single database, all these use cases can realise their operations under one single Transaction; if any of them fails, all we have to do is roll back the transaction. But what should we do in our Micro Service design? This is the place the Saga Pattern comes into play: in the case of a failure of one of these operations, Camunda will start the compensating actions, like reverting the Customer creation, reverting the Product reservations in the Inventory and cancelling the created Invoices. Now these are not Atomic Operations but Eventually Consistent ones, meaning reverting the Customer, reverting the Product reservation in the Inventory and reverting the creation of the Invoice will take 1 or 2s (maybe more in edge cases) before we can see the consistent state.
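To give a taste of what such a compensating action looks like in code, here is a minimal sketch of a Camunda Job Worker that could be bound to a compensation task in the model; the ‘cancelCustomer‘ task type, the ‘cancelCustomer‘ port method and the ‘customerId‘ variable are hypothetical stand-ins, the real model and ports appear later in this blog.

@Component
@RequiredArgsConstructor
@Slf4j
public class CancelCustomerWorker {
    private final CustomerOutboundPort customerOutboundPort;

    // bound to a hypothetical compensation task in the BPMN model
    @JobWorker(type = "cancelCustomer")
    public void cancelCustomer(final ActivatedJob job) {
        Map<String, Object> variables = job.getVariablesAsMap();
        String customerId = (String) variables.get("customerId");

        log.info("Compensating: reverting creation of Customer [{}]", customerId);
        // sends a command Event over Apache Pulsar; the Customer Service will
        // process it and the system becomes consistent eventually
        customerOutboundPort.cancelCustomer(customerId);
    }
}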

We also need safeguards in case any of these rollback operations fails (we can of course also build retry mechanisms in Camunda); we have to be sure, for example, that none of the Products stays reserved indefinitely in our Inventory Service, because at the end of the day we want to sell these products. If a rollback fails, a human interaction should solve the problem.
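The retry mechanism mentioned above can be steered from the Job Worker itself; the following is a minimal sketch, assuming a hypothetical ‘revertInventoryReservation‘ task type, where we disable auto completion and decrement the retry counter until Camunda raises an incident that a human can pick up.

@Component
@Slf4j
public class RevertInventoryReservationWorker {
    @JobWorker(type = "revertInventoryReservation", autoComplete = false)
    public void revertReservation(final JobClient client, final ActivatedJob job) {
        try {
            // business logic that reverts the reservation would go here
            client.newCompleteCommand(job.getKey()).send().join();
        } catch (Exception e) {
            // when retries reach 0, Camunda creates an incident,
            // which becomes visible for manual resolution
            client.newFailCommand(job.getKey())
                    .retries(job.getRetries() - 1)
                    .errorMessage(e.getMessage())
                    .send()
                    .join();
        }
    }
}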

Fortunately Camunda has the User Tasks concept to deal with such scenarios; a stuck operation in the automation will be visible in the TaskList application of Camunda, or we can build custom web applications with popular SPA frameworks like Angular, React, Vue, Svelte, etc. with the use of the Task API, which I will display in further blogs.

These compensation actions can be much more complex than a simple database transaction rollback. For example, while reverting the reservation of a Product in the Inventory, the Camunda orchestration can also send a Notification to a Customer who inquired about the same Product previously for another Order and was rejected; now that the Product reservation is reversed, that Customer will be informed that the Product is available again. That would be a clever use of eventual consistency.

How Apache Pulsar will help in such a scenario

As you could see in the above diagrams, we will use Apache Pulsar to organise the communication between the Micro Services, which means we will always receive Events in order (first in, first out) and we will never lose Events thanks to Apache Pulsar’s persistence. That way we can always recreate the correct state in the Micro Services, if necessary by replaying the Events.
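To illustrate the replay idea, the Pulsar client lets us rewind a subscription programmatically; here is a minimal sketch with the native Java client, assuming an already built PulsarClient and an illustrative subscription name.

void replayOrderEvents(PulsarClient pulsarClient) throws PulsarClientException {
    Consumer<byte[]> consumer = pulsarClient
            .newConsumer()
            .topic("persistent://sagatenant/sagans/orderOutboundTopic")
            .subscriptionName("order-replay") // illustrative subscription
            .subscribe();

    // rewind to the earliest retained message and re-consume
    // all persisted Events in their original order
    consumer.seek(MessageId.earliest);
}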

The following operations will always organise our communication between Camunda and the Micro Services via Apache Pulsar.

The most important element in Camunda for this purpose is the ‘Send Task‘ component to communicate with Apache Pulsar; then we have two options to react to the responses coming back from Apache Pulsar, the ‘Event Gateway‘ or ‘Receive Tasks‘.

Overview

We can observe this Proof of Concept under the three stages of the Workflow that we will implement, which I categorise as Positive Scenario, Transaction Rollback and Optimisations.

Positive Scenario

The positive scenario deals with the successful completion of our Workflow.

Over the Camunda Tasks we will trigger our Spring Boot Services in parallel by linking the tasks to Java Job Workers.

@Component
@RequiredArgsConstructor
@Slf4j
public class ReserveProductInInventoryWorker {
    private final InventoryOutboundPort inventoryOutboundPort;
    private final ObjectMapper objectMapper = new ObjectMapper();

    @JobWorker(type = "reserveProductInInventory")
    @SneakyThrows
    public void reserveProductInInventory(final ActivatedJob job) {
        log.info("Reserving Products in Inventory");

        Map<String, Object> existingVariables = job.getVariablesAsMap();
        String json = (String) existingVariables.get("order");
        Order order = objectMapper.readValue(json, Order.class);

        String orderId = (String) existingVariables
                .get(PayloadVariableConstants.ORDER_ID_VARIABLE);

        inventoryOutboundPort.reserveProductInInventory(orderId, order.getOrderItems());
    }
}

And we can even track the execution of our Workflow over the Camunda Operate component.

When all the tasks report their states as success, we will trigger Order Completion and reach the end state.

Transaction Rollback

Naturally most of you are interested to see how the Transaction Rollback process works; in the case that any of the tasks reports a failure, it will trigger the Transaction Rollback process,

which will revert the steps that we realised in our Spring Boot Micro Services. Naturally our purpose is to realise these steps in a fully automated fashion, but we can’t ignore Murphy’s law that anything that can go wrong will go wrong, so we will have manual steps as Camunda User Tasks to take corrective action and to reach the desired state if everything else fails.

For this purpose we are using Camunda’s User Tasks,

which we can also track in Camunda’s TaskList component and take the necessary corrective actions.

Here you are seeing Camunda’s TaskList component; it is really good for general use, and you can even configure some custom HTML forms in your BPMN Diagram to take manual corrective actions. But if you need an application with your Enterprise Look & Feel, you can program your own version by using the Task API, which I will demonstrate in a further blog.

Optimisations

In the first iteration of our Workflow, I tried to keep things simple so you can understand the core concepts better, but some things are far from ideal. For example, we are waiting for the completion of all Micro Service processes before deciding whether the process was successful or not, but what happens if creating the Invoice takes 2 hours? Are we going to keep a Product reserved for 2 hours even though we know the Order process will fail? If we already know a Customer creation failed and we can’t complete the Order process, the Rollback should start immediately; blocking the Product for 2 hours unnecessarily would be bad for our business.

So I modified the Workflow to be able to roll back the Transaction from several branches of the process.

We are optimising our Business Case, but the Workflow does not look nice, does it? It seems to have lots of redundant components and it is quite hard to read. Fortunately Camunda has a ‘Call Activity‘ element that allows us to model the repetitive parts separately and make our model much simpler and more elegant.

Now we have two models, one for managing the whole Business Case,

and a second one to manage the Rollback process.

And we can link them with the help of the ‘Call Activity‘ component.

Now let’s look at the details of how we can set up the environment and develop this Proof of Concept.

Infrastructure Setup

Camunda

The infrastructure setup of Camunda in Kubernetes was extremely simple; I used the Camunda Helm Chart in my Helm Umbrella Chart.

helm/camunda/Chart.yaml

apiVersion: v2
name: first-camunda
description: A Helm chart for Kubernetes
type: application
version: 0.1.0
appVersion: "1.16.0"
dependencies:
  - name: camunda-platform
    version: 9.3.0
    repository: https://helm.camunda.io

And the ‘values.yaml‘ for a basic installation in Kubernetes for my Proof of Concept application.

camunda-platform:
  global:
    identity:
      auth:
        enabled: false

  identity:
    enabled: false
    resources:
      limits:
        cpu: "1"
      requests:
        cpu: "0.2"

  optimize:
    enabled: false
    resources:
      limits:
        cpu: "1"
      requests:
        cpu: "0.2"

  zeebe:
    clusterSize: 1
    partitionCount: 1
    replicationFactor: 1
    pvcSize: 10Gi
    resources:
      requests:
        cpu: "0.2"

  zeebe-gateway:
    replicas: 1
    resources:
      limits:
        cpu: "1"
      requests:
        cpu: "0.2"

  connectors:
    enabled: true
    inbound:
      mode: disabled
    resources:
      limits:
        cpu: "1"
      requests:
        cpu: "0.2"

  elasticsearch:
    clusterHealthCheckParams: "wait_for_status=yellow&timeout=1s"
    master:
      replicaCount: 1
      persistence:
        size: 15Gi

I had to reduce the resource numbers drastically to be able to run it alongside my Apache Pulsar installation; if you have more resources you can naturally turn those up.

This is in no way a Production Setup; you have to pay much more attention to several additional points, which you can find as a list here.

The following command will let you install the Camunda Platform from the Helm Chart directory.

helm upgrade --install camunda-platform . -n camunda --create-namespace

Camunda vs Pekko / Akka Behavioural Finite State Machine

As I previously mentioned, I wrote another blog to demonstrate how to build a very similar system with the Pekko / Akka Behavioural State Machine. With Camunda, it is much easier to start such a concept, especially with the available tooling from Operate and TaskList, and the Camunda Modeller is a much more user friendly tool than Eclipse Papyrus (but you can represent every BPMN Workflow as a UML State Machine if you want to, so you can use Eclipse Papyrus).

The Pekko / Akka solution becomes much more attractive if you have to deal with extreme load; Pekko / Akka is better suited for it. For Process Monitoring you also have Elasticsearch there, but you have to build your own UI for it or use Kibana.

But there is one clear advantage for the Pekko / Akka solution: if you started a process with Version 1 of a State Machine in Production but, let’s say after a Software Release Cycle of your enterprise, you have to continue with Version 2 of the process as explained in this blog article, you can release this with the Pekko / Akka solution but not with Camunda at the moment. The Camunda documentation mentions that they will support this in the future, but currently it is not there; when this feature is implemented for Camunda 8, this point will be moot.

Apache Pulsar

Fortunately Apache Pulsar has a really modern installation system with a Kubernetes Operator; first we have to install the Operator components.

curl -sL https://github.com/operator-framework/operator-lifecycle-manager/releases/download/v0.25.0/install.sh | bash -s v0.25.0

Then the Pulsar Operator (at the moment there is an Open Source version and a Demo License dependent one; I will display here how to install / use the Open Source version, and if that disappears one day, you can get a Demo License here).

kubectl create -f https://raw.githubusercontent.com/streamnative/charts/master/examples/pulsar-operators/olm-subscription.yaml

Finally we want to install our Tenants, Namespaces and Topics via Operators; for this purpose we have to install the Apache Pulsar Resource Operator.

First we have to add the StreamNative Helm Repo to our repository list.

helm repo add streamnative https://charts.streamnative.io

And the installation of the Operator.

helm -n operators install pulsar-resource-operator streamnative/pulsar-resources-operator --create-namespace

Now we first have to create our Apache Pulsar Cluster with the following configurations.

helm/pulsar/templates/pulsar.yaml

{{- if .Values.broker.enabled -}}
apiVersion: pulsar.streamnative.io/v1alpha1
kind: PulsarBroker
metadata:
  name: brokers
  namespace: pulsar
spec:
  image: "apachepulsar/pulsar-all@sha256:bcd07cb37874e9364d2660111771148327896a4a788de4d8750a7d78ac9b50a3"
  pod:
    resources:
      requests:
        cpu: 200m
        memory: 512Mi
    securityContext:
      runAsNonRoot: true
  replicas: 2
  zkServers: zookeepers-zk:2181
{{- end -}}

I am using a special image for my Notebook’s ARM Processor; if you don’t have one, you can ignore this.

helm/pulsar/templates/bookkeeper.yaml

{{- if .Values.bookie.enabled -}}
apiVersion: bookkeeper.streamnative.io/v1alpha1
kind: BookKeeperCluster
metadata:
  name: bookies
  namespace: pulsar
spec:
  image: "apachepulsar/pulsar-all@sha256:bcd07cb37874e9364d2660111771148327896a4a788de4d8750a7d78ac9b50a3"
  replicas: 3
  pod:
    resources:
      requests:
        cpu: 200m
        memory: 512Mi
    securityContext:
      runAsNonRoot: true
  config:
    custom:
      journalWriteData: "true"
      journalSyncData: "true"
  storage:
    journal:
      numDirsPerVolume: 1
      numVolumes: 1
      volumeClaimTemplate:
        accessModes:
          - ReadWriteOnce
        resources:
          requests:
            storage: 8Gi
    ledger:
      numDirsPerVolume: 1
      numVolumes: 1
      volumeClaimTemplate:
        accessModes:
          - ReadWriteOnce
        resources:
          requests:
            storage: 16Gi
    reclaimPolicy: Delete
  zkServers: zookeepers-zk:2181
{{- end -}}

helm/pulsar/templates/zookeeper.yaml

{{- if .Values.zookeeper.enabled -}}
apiVersion: zookeeper.streamnative.io/v1alpha1
kind: ZooKeeperCluster
metadata:
  name: zookeepers
  namespace: pulsar
spec:
  image: "apachepulsar/pulsar-all@sha256:bcd07cb37874e9364d2660111771148327896a4a788de4d8750a7d78ac9b50a3"
  pod:
    resources:
      requests:
        cpu: 50m
        memory: 256Mi
    securityContext:
      runAsNonRoot: true
  persistence:
    data:
      accessModes:
        - ReadWriteOnce
      resources:
        requests:
          storage: 8Gi
    dataLog:
      accessModes:
        - ReadWriteOnce
      resources:
        requests:
          storage: 2Gi
    reclaimPolicy: Delete
  replicas: 3
{{- end -}}

helm/pulsar/templates/proxy.yaml

{{- if .Values.proxy.enabled -}}
apiVersion: pulsar.streamnative.io/v1alpha1
kind: PulsarProxy
metadata:
  name: proxys
  namespace: pulsar
spec:
  image: "apachepulsar/pulsar-all@sha256:bcd07cb37874e9364d2660111771148327896a4a788de4d8750a7d78ac9b50a3"
  pod:
    resources:
      requests:
        cpu: 200m
        memory: 512Mi
    securityContext:
      runAsNonRoot: true
  brokerAddress: brokers-broker
  replicas: 2
  config:
    custom:
      authorizationEnabled: "false"
      PULSAR_PREFIX_webSocketServiceEnabled: "true"
    tls:
      enabled: false
  webSocketServiceEnabled: false
  dnsNames:
    []
  issuerRef:
    name: ""
{{- end -}}

We can install our Pulsar Cluster with the following command from the helm/pulsar directory…

helm install pulsar . -n pulsar -f values-pulsar.yaml

Next we have to create a PulsarConnection to be able to realise administrative tasks like creating the Tenants, Namespaces and PulsarTopics that are necessary for our Proof of Concept.

helm/pulsar/templates/connection.yaml

{{- if .Values.connection.enabled -}}
apiVersion: resource.streamnative.io/v1alpha1
kind: PulsarConnection
metadata:
  name: {{ .Release.Namespace }}-pulsar-connection
  namespace: {{ .Release.Namespace }}
spec:
  adminServiceURL: "http://proxys-proxy.pulsar.svc.cluster.local:8080"
{{- end -}}

and install it.

helm install pulsar . -n pulsar -f values-pulsar-connection.yaml

Finally you can see the Tenant, Namespace and Topic creations.

helm/pulsar/templates/tenant.yaml

{{- if .Values.resources.enabled -}}
apiVersion: resource.streamnative.io/v1alpha1
kind: PulsarTenant
metadata:
  name: {{ .Release.Namespace }}-saga-pulsar-tenant
  namespace: {{ .Release.Namespace }}
spec:
  name: sagatenant
  connectionRef:
    name: {{ .Release.Namespace }}-pulsar-connection
  adminRoles:
    - admin
    - ops
{{- end -}}

helm/pulsar/templates/namespace.yaml

{{- if .Values.resources.enabled -}}
apiVersion: resource.streamnative.io/v1alpha1
kind: PulsarNamespace
metadata:
  name: {{ .Release.Namespace }}-saga-pulsar-namespace
  namespace: {{ .Release.Namespace }}
spec:
  name: sagatenant/sagans
  connectionRef:
    name: {{ .Release.Namespace }}-pulsar-connection
{{- end -}}

And some topics…

helm/pulsar/templates/order-inbound-topic.yaml

{{- if .Values.resources.enabled -}}
apiVersion: resource.streamnative.io/v1alpha1
kind: PulsarTopic
metadata:
  name: {{ .Release.Namespace }}-pulsar-order-inbound-topic
  namespace: pulsar
spec:
  name: persistent://sagatenant/sagans/orderInboundTopic
  connectionRef:
    name: {{ .Release.Namespace }}-pulsar-connection
{{- end -}}

helm/pulsar/templates/order-outbound-topic.yaml

{{- if .Values.resources.enabled -}}
apiVersion: resource.streamnative.io/v1alpha1
kind: PulsarTopic
metadata:
  name: {{ .Release.Namespace }}-pulsar-order-outbound-topic
  namespace: pulsar
spec:
  name: persistent://sagatenant/sagans/orderOutboundTopic
  connectionRef:
    name: {{ .Release.Namespace }}-pulsar-connection
{{- end -}}

helm/pulsar/templates/customer-inbound-topic.yaml

{{- if .Values.resources.enabled -}}
apiVersion: resource.streamnative.io/v1alpha1
kind: PulsarTopic
metadata:
  name: {{ .Release.Namespace }}-pulsar-customer-inbound-topic
  namespace: pulsar
spec:
  name: persistent://sagatenant/sagans/customerInboundTopic
  connectionRef:
    name: {{ .Release.Namespace }}-pulsar-connection
{{- end -}}

helm/pulsar/templates/customer-outbound-topic.yaml

{{- if .Values.resources.enabled -}}
apiVersion: resource.streamnative.io/v1alpha1
kind: PulsarTopic
metadata:
  name: {{ .Release.Namespace }}-pulsar-customer-outbound-topic
  namespace: pulsar
spec:
  name: persistent://sagatenant/sagans/customerOutboundTopic
  connectionRef:
    name: {{ .Release.Namespace }}-pulsar-connection
{{- end -}}

and install those.

helm install pulsar . -n pulsar -f values-resources.yaml

Why choose Apache Pulsar

I have used Apache Kafka over the years in several projects; it is clearly an industry standard at the moment and it really performs well, but here and there, there are also annoying things.

  • One of those is the lack of real multi tenancy support in Apache Kafka, one aspect being Security, the other being Organisational. Apache Kafka is not a platform you will install for a small project; because of the extreme loads it can deal with, that would be overkill, so the installation probably occurs at the Organisation level and several Departments / Projects must use it together, which poses challenges. Let’s say you need a ‘customerTopic‘ for ProjectA and a ‘customerTopic‘ for ProjectB; since Apache Kafka has no tenancy mechanism to manage this scenario, you have to do ugly things like creating ‘customerProjectATopic‘ and ‘customerProjectBTopic‘.

With Apache Pulsar’s Tenant / Namespace concepts, you have much better possibilities to organise things, and it is not only organisation, you can also configure much more fine granular Security (see the sketch after this list).

  • Another one is that Apache Kafka only supports its own protocol; if you need a message broker that also has to support AMQP or JMS, you have to install / maintain additional Brokers. The following is an extensive list of the Ecosystem which allows Apache Pulsar to be the only Message Broker in your enterprise, so you can get rid of the other installations.
  • The Apache Kafka protocol has some problems when it is working inside a Kubernetes Cluster and tries to communicate with Consumers / Producers outside of the Cluster (or any network topology where NAT translation occurs); Apache Pulsar has the PulsarProxy component that prevents this sort of problem.
  • Consumer Subscriptions are another thing that Apache Pulsar does differently; you have many options for different scenarios, for example Exclusive, Failover, Shared, Key_Shared, Sticky, and you can read about the differences in the above link.
  • Apache Pulsar has the ability to scale the Message Processing and Message Persistence parts separately (mainly because Message Processing is handled by a different component and Message Persistence by Apache BookKeeper), which means, let’s say, you have an Apache Pulsar Cluster dealing with AMQP traffic but you are not that interested in persistence; you can then let the Message Processing part scale but not the Persistence.
  • One final thing, which is not spoken about too much: while Apache Kafka is accepted as a super reliable message broker, there is a small window with a possibility of Data Loss because of how Kafka operates, which is explained here in detail. To quote: “Kafka’s inability to journal data makes it vulnerable to data loss in the event of a machine failure or power outage. This is a significant weakness”. People are not aware of this, and granted, how many times do you get a complete machine failure or power outage? But the risk is there.

Now, are these points game changers? No, but if these are the little things that annoyed you in Apache Kafka and you wished for a better solution, these small Quality of Life nuances are what made Apache Pulsar my favourite Message Broker lately.
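To make the multi tenancy point from the list above concrete, here is a minimal sketch of how the same logical topic name can live isolated under two tenants with the Pulsar Java client; the ‘projectA‘ / ‘projectB‘ tenants and the ‘crm‘ namespace are made up for illustration, and the PulsarClient is assumed to be already built.

// the full topic name follows persistent://tenant/namespace/topic,
// so both projects can keep their own 'customerTopic'
Producer<byte[]> projectAProducer = pulsarClient
        .newProducer()
        .topic("persistent://projectA/crm/customerTopic")
        .create();

Producer<byte[]> projectBProducer = pulsarClient
        .newProducer()
        .topic("persistent://projectB/crm/customerTopic")
        .create();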

For an extensive list of the differences between Apache Kafka and Apache Pulsar, you can read this blog entry.

Proof of Concept Installation

Including the Orchestration Service, we have 6 Services to install; for this purpose we will again use a Helm Umbrella Chart.

helm/saga/Chart.yaml

apiVersion: v2
name: saga
description: A Helm chart for Kubernetes
type: application
version: 0.1.0
appVersion: "1.16.0"
dependencies:
  - name: customer-service
    version: 0.1.0
    condition: customer-service.enabled
  - name: inventory-service
    version: 0.1.0
    condition: inventory-service.enabled
  - name: invoice-service
    version: 0.1.0
    condition: invoice-service.enabled
  - name: orchestration
    version: 0.1.0
    condition: orchestration.enabled
  - name: order-service
    version: 0.1.0
    condition: order-service.enabled
  - name: product-options-service
    version: 0.1.0
    condition: product-options-service.enabled

The Service Helm Charts are quite basic; the only relevant configurations are the Image and some environment settings.

helm/saga/values.yaml

# Default values for orchestration.
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.

replicaCount: 1

image:
  repository: k3d-saga.registry:5555/saga/orchestration
  pullPolicy: Always
  # Overrides the image tag whose default is the chart appVersion.
  tag: "0.0.1-SNAPSHOT"


service:
  type: ClusterIP
  port: 8080

resources:
  limits:
    cpu: 1
    memory: 512Mi
  requests:
    cpu: 0.1
    memory: 128Mi

jdkOptions: "-XshowSettings:vm -XX:+AlwaysActAsServerClassMachine -XX:TieredStopAtLevel=1 -noverify --add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/sun.net=ALL-UNNAMED"

And we will install those into our Kubernetes Cluster…

helm install saga . -n saga --create-namespace

Implementation Details

To develop a Proof of Concept for the above mentioned topics, we will create the following Spring Boot Services, which you can find in the following Github Repository.

  • Orchestration Service
  • Order Service
  • Customer Service
  • Product Options Service
  • Inventory Service
  • Invoice Service

The Orchestration Service will realise the Camunda State Machine; for this, Camunda has a Spring Boot Starter project, ‘io.camunda:spring-zeebe-starter‘, which makes the integration with Camunda super easy. For the Apache Pulsar integration, Spring Boot again has a Starter project, ‘org.springframework.boot:spring-boot-starter-pulsar‘, and Spring Pulsar à la Spring Kafka, which I will use in every Micro Service except the Orchestration Service. For some reason ‘spring-zeebe-starter‘ and ‘spring-boot-starter-pulsar‘ don’t like each other, and the Job Workers from Camunda stay stuck if ‘spring-boot-starter-pulsar‘ is in the classpath; for that reason, I had to use the native Apache Pulsar client in the Orchestration Service, but I will further investigate the problem and change the implementation when I find the cause.

In a dedicated chapter above, I also explained the installation of the infrastructure components, Camunda and Apache Pulsar, into the Kubernetes environment.

I also used the principles of Hexagonal Architecture in the Micro Services; the Port and Adapter classes might help you understand the Proof of Concept Services better.

So let’s look at the implementation in detail.

The Workflow

Positive Scenario

First we need the necessary dependencies in the Gradle ‘build.gradle‘.

dependencies {
        implementation project(':order-api'),
                project(':customer-api'),
                project(':product-options-api'),
                project(':inventory-api'),
                project(':invoice-api')
        implementation 'org.springframework.boot:spring-boot-starter-webflux'
        implementation 'org.springframework.boot:spring-boot-starter-actuator'
        implementation 'io.camunda:spring-zeebe-starter:8.4.0'
        api 'org.apache.pulsar:pulsar-client:3.0.0'
        implementation 'com.fasterxml.jackson.core:jackson-databind'

        implementation 'org.mapstruct:mapstruct:1.5.5.Final'
        compileOnly 'org.projectlombok:lombok'
        annotationProcessor 'org.projectlombok:lombok'
        annotationProcessor 'org.mapstruct:mapstruct-processor:1.5.5.Final'
        annotationProcessor 'no.entur.mapstruct.spi:protobuf-spi-impl:1.44'
}

These dependencies give us the possibility to configure the following Spring Boot starting point.

camunda-state-machine/src/main/java/org/salgar/camunda/OrchestrationApplication.java

package org.salgar.camunda;

import io.camunda.zeebe.spring.client.annotation.Deployment;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@Deployment(resources =  {"classpath*:*.bpmn", "classpath*:*.dmn"})
public class OrchestrationApplication {
    public static void main(String[] args) {
        SpringApplication.run(OrchestrationApplication.class, args);
    }
}

Note especially the ‘@Deployment‘ annotation, which will load all Model Files and Decision Models (and yes, we can load those from dependencies, so they can be developed by other teams and we can reuse them) under the Orchestration Service and deploy them to Camunda (Camunda versions these files; if you upload newer versions, they will be visible as V1, V2, …, VN). In our case, those files are under ‘src/main/resources‘.

Then we need an entry point to start our Workflow; for this purpose, I implemented a REST Service to interact with the Proof of Concept.

orchestration/src/main/java/org/salgar/camunda/adapter/RestAdapter.java

@RestController
@RequiredArgsConstructor
@Slf4j
public class RestAdapter implements RestPort {
    private final ZeebeClient zeebeClient;
    private final ObjectMapper objectMapper = new ObjectMapper();

    @PostMapping("/orchestration/trigger")
    public void trigger() {
        triggerWorkflow();
    }

    @Override
    @SneakyThrows
    public void triggerWorkflow(@RequestBody Order order) {
        HashMap<String, Object> variables = new HashMap<String, Object>();
        
        String json = objectMapper.writeValueAsString(order);
        variables.put("order", json);

        log.info("Triggering process");
        ProcessInstanceEvent event = zeebeClient
                .newCreateInstanceCommand()
                .bpmnProcessId("create-order")
                .latestVersion()
                .variables(variables)
                .send()
                .join();

        log.info(
                "Started instance for processDefinitionKey={}, bpmnProcessId={}, version={} with processInstanceKey={}",
                event.getProcessDefinitionKey(),
                event.getBpmnProcessId(),
                event.getVersion(),
                event.getProcessInstanceKey());

    }

As you can see, this will start the latest version of the ‘create-order‘ Workflow (if you have multiple versions of the Workflow, you can start any specific version).
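If you ever need to pin a specific version instead of the latest one, the Zeebe client supports that directly; here is a minimal sketch of the changed call, where the version number 2 is purely illustrative and the rest stays as in the code above.

ProcessInstanceEvent event = zeebeClient
        .newCreateInstanceCommand()
        .bpmnProcessId("create-order")
        .version(2) // start exactly version 2 instead of latestVersion()
        .variables(variables)
        .send()
        .join();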

Spring Boot and Camunda need some configuration information to work with each other, which you can find in ‘application.yml‘.

spring.application.name: "saga"

zeebe:
  client:
    security:
      plaintext: "true"
    broker:
      gateway-address: "camunda-platform-zeebe-gateway.camunda:26500"

As you can see, we have to configure the Zeebe Gateway gRPC port to connect to Camunda. Since we are building a Proof of Concept, we will not set up security, but normally Camunda has a really good integration with OAuth2.

Now that we have started our Workflow, the first thing we will do is send an Event to the Order Service via Apache Pulsar.

Camunda has the concept of Job Workers, which integrate Model Elements with Java code.

orchestration/src/main/java/org/salgar/camunda/core/workers/CreateOrderWorker.java

@Component
@RequiredArgsConstructor
@Slf4j
public class CreateOrderWorker {
    private final OrderOutboundPort orderOutboundPort;
    private final ObjectMapper objectMapper = new ObjectMapper();

    @JobWorker(type = "createOrder")
    @SneakyThrows
    public Map<String, Object> createOrder(final ActivatedJob job) {
        log.info("Creating Order");
        String correlationId = UUID.randomUUID().toString();

        Map<String, Object> existingVariables = job.getVariablesAsMap();

        String json = (String) existingVariables.get("order");
        Order order = objectMapper.readValue(json, Order.class);

        Map<String, Object> variables = new HashMap<>();
        variables.put("correlationId", correlationId);

        orderOutboundPort.createOrder(correlationId, order);

        return variables;
    }
}

The ‘@JobWorker‘ annotation tells Camunda which model element this Java code is bound to.

And we send the Event to Apache Pulsar with the help of the ‘OrderOutboundPort‘ and ‘OrderOutboundAdapter‘.

orchestration/src/main/java/org/salgar/camunda/adapter/OrderOutboundAdapter.java

@Component
@RequiredArgsConstructor
@Slf4j
public class OrderOutboundAdapter implements OrderOutboundPort {
    private final Producer<org.salgar.camunda.order.command.OrderCommand> pulsarProducer;
    private final OrchestrationOrder2Order orchestrationOrder2Order;

    @Override
    @SneakyThrows
    public void createOrder(String correlationId, Order order) {
        log.info("Creating Order: [{}]", order);
        log.info("Key: [{}]", correlationId);

        log.info("Mapping");
        org.salgar.camunda.order.model.protobuf.Order orderExternal =
                orchestrationOrder2Order.map(order);

        log.info("Creating Command");
        OrderCommand.Builder builder = OrderCommand.newBuilder();
        builder.setCommand(CommandConstants.CREATE_ORDER);
        builder.putPayload(
                PayloadVariableConstants.CREATE_ORDER_VARIABLE,
                Any.pack(orderExternal)
        );

        pulsarProducer
                .newMessage()
                .eventTime(LocalDateTime.now().toInstant(ZoneOffset.UTC).toEpochMilli())
                .key(correlationId)
                .value(builder.build())
                .send();

        log.info("Create Order sent: [{}]", orderExternal);
    }
}

As I previously mentioned, I had to use the native client implementation of Apache Pulsar in the Orchestration Service instead of Spring Pulsar, so we have to configure the ‘org.apache.pulsar.client.api.Producer‘ in PulsarConfig.

orchestration/src/main/java/org/salgar/camunda/core/config/PulsarConfig.java

@Configuration
@Slf4j
public class PulsarConfig {
    @Bean
    @ConfigurationProperties("spring.pulsar")
    public PulsarProperties pulsarProperties() {
        return new PulsarProperties();
    }

    @Bean
    @ConfigurationProperties("spring.pulsar.producer.order")
    public PulsarProperties.Producer orderProducerProperties() {
        return new PulsarProperties.Producer();
    }

    @Bean
    @ConfigurationProperties("spring.pulsar.consumer.order")
    public PulsarProperties.Consumer orderConsumerProperties() {
        return new PulsarProperties.Consumer();
    }

    @Bean
    @SneakyThrows
    public Producer<org.salgar.camunda.order.command.OrderCommand> pulsarOrderProducer(
            PulsarClient pulsarClient,
            PulsarProperties.Producer orderProducerProperties) {
        return pulsarClient
                .newProducer(ProtobufSchema.of(org.salgar.camunda.order.command.OrderCommand.class))
                .topic(orderProducerProperties.getTopicName())
                .create();
    }

    @Bean
    @SneakyThrows
    public Consumer<OrderResponse> pulsarOrderConsumer(
            PulsarClient pulsarClient,
            PulsarProperties.Consumer orderConsumerProperties,
            OrderInboundPort orderInboundPort) {

        final MessageListener<OrderResponse> messageListener = (consumer, msg) -> {
            try {
                log.info("We received the Message from Pulsar: " + msg.getValue());
                orderInboundPort.processOrderResponse(msg);

                consumer.acknowledge(msg);

            } catch(PulsarClientException pce) {
                log.error(pce.getMessage(), pce);
            }
        };

        // subscribe with the configured topic and subscription,
        // wiring in the message listener defined above
        return pulsarClient
                .newConsumer(ProtobufSchema.of(org.salgar.camunda.order.response.OrderResponse.class))
                .topic(orderConsumerProperties.getTopics().stream().findFirst().orElse(""))
                .subscriptionName(orderConsumerProperties.getSubscription().getName())
                .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
                .subscriptionType(SubscriptionType.Exclusive)
                .messageListener(messageListener)
                .subscribe();
    }
}

As you can see, we are using the Command Pattern to communicate with the Order Service (‘org.salgar.camunda.order.command.OrderCommand‘), we are registering the Protobuf Schema with Apache Pulsar and we are setting the Apache Pulsar Topic that we will send the Event to. In the Orchestration Service we could not use Spring Pulsar, but we could still use its configuration property classes to get the connection information and the topics for Apache Pulsar, so we don’t have to code these classes ourselves.

orchestration/src/main/resources/application.yml

spring:
  pulsar:
    client:
      service-url: "pulsar://proxys-proxy.pulsar:6650"
    producer:
      order:
        topic-name: "persistent://sagatenant/sagans/orderInboundTopic"

    consumer:
      order:
        topics: "persistent://sagatenant/sagans/orderOutboundTopic"
        subscription:
          name: "orchestration-order"

Order Service:

On the Order Service side, we will use Spring Pulsar to receive the Event.

First, to be able to use Pulsar in the Order Service, we need ‘org.springframework.boot:spring-boot-starter-pulsar‘ in the classpath and the following Spring Boot entry point.

order-service/src/main/java/org/salgar/camunda/order/service/OrderServiceApplication.java

@EnablePulsar
@SpringBootApplication
public class OrderServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(OrderServiceApplication.class, args);
    }
}

And a PulsarListener from Spring Pulsar.

order-service/src/main/java/org/salgar/camunda/order/service/adapter/OrderInboundAdapter.java

@Component
@Slf4j
public class OrderInboundAdapter implements OrderInboundPort {
    @Override
    @PulsarListener(
            subscriptionName = "order-inbound-subscription",
            topics = "${spring.pulsar.consumer.order.topics}"
    )
    public void processOrderEvent(Message<OrderCommand> command) {
        log.info(
                "We received the command: [{}] with payload: [{}]",
                command.getValue().getCommand(),
                command.getValue().getPayloadMap());

        if(CommandConstants.CREATE_ORDER.equals(command.getValue().getCommand())) {
            log.info("Creating Order");
            //Execute Business Logic
        } else if(CommandConstants.COMPLETE_ORDER.equals(command.getValue().getCommand())) {
            log.info("Completing Order");
            //Execute Business Logic
        } else {
            log.info("Unknown command: [{}]", command.getValue().getCommand());
        }
    }
}

As you can see, we evaluate the Command for the Order Creation and execute our business logic. Now we have to return a response to the Orchestration Service. Since this is a Proof of Concept and I want to simulate as many scenarios as possible, I need a means to steer the Spring Boot Services; to enable that, I will trigger the responses over REST Services depending on what I am trying to achieve. For example, the next code snippet shows the successful completion of the Order Creation.

order-service/src/main/java/org/salgar/camunda/order/service/adapter/RestAdapter.java

@RestController
@RequiredArgsConstructor
@Slf4j
public class RestAdapter implements RestPort {
    private final OrderOutboundPort orderOutboundPort;

    @PostMapping("/order/response")
    public void prepare(ServerWebExchange exchange) {
        log.info("Preparing response");
        String correlationId = exchange.getRequest().getQueryParams().getFirst("correlationId");
        // default missing query parameters to empty strings
        if (correlationId == null) { correlationId = ""; }

        String response = exchange.getRequest().getQueryParams().getFirst("response");
        if (response == null) { response = ""; }

        prepareOrderServiceResponse(correlationId, response);
    }

    @Override
    public void prepareOrderServiceResponse(String correlationId, String response) {
        String orderResponse = null;
        if("pending".equals(response)) {
            orderResponse = ResponseConstants.ORDER_PENDING;
            prepareOrderPending(correlationId, orderResponse);
        } else if("failed".equals(response)) {
            orderResponse = ResponseConstants.ORDER_FAILED;
            prepareOrderFailed(correlationId, orderResponse);
        } else if("complete".equals(response)) {
            orderResponse = ResponseConstants.ORDER_COMPLETE;
            prepareOrderComplete(correlationId, orderResponse);
        } else {
            log.info("Unknown response: [{}]", response);
        }
    }

    private void prepareOrderPending(String correlationId, String orderResponse) {
        String orderId = UUID.randomUUID().toString();
        Order.Builder orderBuilder = Order.newBuilder();
        orderBuilder.setOrderId(orderId);
        OrderResponse.Builder builder = OrderResponse.newBuilder();
        builder
                .setResponse(orderResponse)
                .putPayload(
                        ORDER_ID_VARIABLE,
                        Any.pack(orderBuilder.build())
                );

        orderOutboundPort.deliverOrderResponse(correlationId, builder.build());
    }

    private void prepareOrderFailed(String correlationId, String orderResponse) {
        ...
    }

    private void prepareOrderComplete(String correlationId, String orderResponse) {
        ...
    }
}

For the ‘ORDER_PENDING‘ state, we prepare the OrderResponse object; please pay attention that we are also sending the correlationId back to the Orchestration Service, in this case the ‘orderId‘, to be able to aggregate Request and Response in the Orchestration Service, which is a critical element for the ‘Event Based Gateway‘, ‘Receive Task‘ and ‘Message Intermediate Catch Event‘.

And we will send the response back to the Orchestration Service with Apache Pulsar.

order-service/src/main/java/org/salgar/camunda/order/service/adapter/OrderOutboundAdapter.java

@Component
@RequiredArgsConstructor
@Slf4j
public class OrderOutboundAdapter implements OrderOutboundPort {
    private final PulsarTemplate<OrderResponse> pulsarTemplate;
    private final PulsarProperties.Producer orderOutboundProperties;

    @Override
    @SneakyThrows
    public void deliverOrderResponse(
            String correlationId,
            OrderResponse orderResponse) {

        log.info("Sending OrderResponse: [{}]", orderResponse);
        pulsarTemplate
                .newMessage(orderResponse)
                .withMessageCustomizer(mc -> {
                    mc.key(correlationId);
                })
                .withTopic(orderOutboundProperties.getTopicName())
                .send();
    }
}

To send the Event, the Order Service uses Spring Pulsar’s PulsarTemplate (à la KafkaTemplate); we use the correlationId as the Message Key and send to the Topic configured in ‘application.yml‘.

spring.application.name: "order-service"

spring:
  pulsar:
    client:
      service-url: "pulsar://proxys-proxy.pulsar:6650"
    defaults:
      type-mappings:
        - message-type: org.salgar.camunda.order.command.OrderCommand
          schema-info:
            schema-type: PROTOBUF
        - message-type: org.salgar.camunda.order.response.OrderResponse
          schema-info:
            schema-type: PROTOBUF
    consumer:
      order:
        topics: "persistent://sagatenant/sagans/orderInboundTopic"
    producer:
      order:
        topic-name: "persistent://sagatenant/sagans/orderOutboundTopic"

Also please pay attention to the ‘message-type‘ element; this is where we tell Pulsar that the payload is serialised with Protobuf and that it should use Protobuf for the schema validations.

Orchestration Service:

Now it is time for the Order Service to return the Response Event and for the Camunda State Machine to progress. Naturally the message should be received in a Pulsar Consumer, so let’s check how the Pulsar native Consumer works.

orchestration/src/main/java/org/salgar/camunda/core/config/PulsarConfig.java

    @Bean
    @SneakyThrows
    public Consumer<OrderResponse> pulsarOrderConsumer(
            PulsarClient pulsarClient,
            PulsarProperties.Consumer orderConsumerProperties,
            OrderInboundPort orderInboundPort) {

        final MessageListener<OrderResponse> messageListener = (consumer, msg) -> {
            try {
                log.info("We received the Message from Pulsar: " + msg.getValue());
                orderInboundPort.processOrderResponse(msg);

                consumer.acknowledge(msg);

            } catch(PulsarClientException pce) {
                log.error(pce.getMessage(), pce);
            }
        };

        return pulsarClient
                .newConsumer(ProtobufSchema.of(org.salgar.camunda.order.response.OrderResponse.class))
                .topic(orderConsumerProperties.getTopics().stream().findFirst().orElse(""))
                .subscriptionName(orderConsumerProperties.getSubscription().getName())
                .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
                .subscriptionType(SubscriptionType.Exclusive)
                .messageListener(messageListener)
                .subscribe();
    }

We have to configure a MessageListener to tell Apache Pulsar which Port interface to call. Lots of things may seem familiar to people with Apache Kafka experience, the only major differences being how the Schema configuration is realised for Protobuf and the subscription type ‘SubscriptionType.Exclusive‘, which is something specific to Apache Pulsar and which I explained in detail in the Apache Pulsar chapter.
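As a small illustration of those subscription types: if we later wanted to run several instances of the Orchestration Service, we could switch this consumer to ‘Key_Shared‘, which distributes messages over the instances while still delivering all messages with the same key (our ‘correlationId‘) to the same instance. A minimal sketch of the changed builder call, everything else staying as above:

return pulsarClient
        .newConsumer(ProtobufSchema.of(org.salgar.camunda.order.response.OrderResponse.class))
        .topic(orderConsumerProperties.getTopics().stream().findFirst().orElse(""))
        .subscriptionName(orderConsumerProperties.getSubscription().getName())
        // Key_Shared instead of Exclusive: horizontal scaling with per-key ordering
        .subscriptionType(SubscriptionType.Key_Shared)
        .messageListener(messageListener)
        .subscribe();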

Let’s see now what happens in the ‘OrderInboundAdapter‘.

orchestration/src/main/java/org/salgar/camunda/adapter/OrderInboundAdapter.java

@Component
@Slf4j
public class OrderInboundAdapter extends AbstractInboundAdapter implements OrderInboundPort {
    public OrderInboundAdapter(ZeebeClient zeebeClient) {
        super(zeebeClient);
    }

    @Override
    @SneakyThrows
    public void processOrderResponse(Message<OrderResponse> message) {
        if(ResponseConstants.ORDER_PENDING.equals(message.getValue().getResponse())) {
            Map<String, Object> variables = new HashMap<>();
            Order order =  message
                    .getValue()
                    .getPayloadMap()
                    .get(PayloadVariableConstants.ORDER_ID_VARIABLE).unpack(Order.class);
            variables.put(
                    PayloadVariableConstants.ORDER_ID_VARIABLE,
                    order.getOrderId());

            processZeebeMessage(
                    message.getKey(),
                    ZeebeMessageConstants.ORDER_PENDING_MESSAGE,
                    variables);
        } else if(ResponseConstants.ORDER_FAILED.equals(message.getValue().getResponse())) {
            ...
        } else if(ResponseConstants.ORDER_COMPLETE.equals(message.getValue().getResponse())) {
            ...
        } else {
            log.info("Unknown command: [{}]", message.getValue().getResponse());
        }
    }
}

And there is a special mechanism in Camunda to aggregate ‘Send Tasks‘, ‘Receive Tasks‘ and ‘Message Intermediate Catch Events‘; those correlate over the ‘correlationId‘.

orchestration/src/main/java/org/salgar/camunda/core/adapter/AbstractInboundAdapter.java

@RequiredArgsConstructor
public abstract class AbstractInboundAdapter {
    private final ZeebeClient zeebeClient;

    protected void processZeebeMessage(
            String correlationId,
            String zeebeMessage,
            Map<String, Object> variables) {
        zeebeClient
                .newPublishMessageCommand()
                .messageName(zeebeMessage)
                .correlationKey(correlationId)
                .variables(variables)
                .send()
                .join();
    }
}

This will publish the ‘OrderPendingMessage‘ message for the given ‘correlationId‘.

Now we have completed the first steps in our Camunda State Machine. I have to point out a really nice feature in Camunda: over Camunda’s Operate component you can follow exactly where you are in the execution of your Workflow.

You can even see the values of the variables in your State Machine.

The blue arrows in the diagram show the path that the Workflow has walked; with the arrival of the Order Pending Event, the State Machine continued with the Parallel Gateway and the Camunda Job Workers sent the Events in parallel to the Customer Service, Product Options Service, Inventory Service and Invoice Service.

Customer Service:

orchestration/src/main/java/org/salgar/camunda/core/workers/CreateCustomerWorker.java

@Component
@RequiredArgsConstructor
@Slf4j
public class CreateCustomerWorker {
    private final CustomerOutboundPort customerOutboundPort;
    private final ObjectMapper objectMapper = new ObjectMapper();

    @JobWorker(type = "createCustomer")
    @SneakyThrows
    public void createCustomer(final ActivatedJob job) {
        log.info("Creating Customer");

        Map<String, Object> existingVariables = job.getVariablesAsMap();

        String json = (String) existingVariables.get("order");
        Order order = objectMapper.readValue(json, Order.class);

        String orderId = (String) existingVariables.get(PayloadVariableConstants.ORDER_ID_VARIABLE);

        Customer customer = order.getCustomer();

        customerOutboundPort.createCustomer(orderId, customer);
    }
}

As you can see, the Camunda Job Worker converts its domain model to the domain model of the Customer Service and sends the Event via Apache Pulsar over the CustomerOutboundAdapter.

orchestration/src/main/java/org/salgar/camunda/adapter/CustomerOutboundAdapter.java

@Component
@RequiredArgsConstructor
@Slf4j
public class CustomerOutboundAdapter implements CustomerOutboundPort {
    private final Producer<org.salgar.camunda.customer.command.CustomerCommand> pulsarCustomerProducer;
    private final OrchestrationCustomer2Customer orchestrationCustomer2Customer;
    @Override
    @SneakyThrows
    public void createCustomer(String correlationId, Customer customer) {
        log.info("Creating Customer: [{}]", customer);
        log.info("Key: [{}]", correlationId);

        log.info("Mapping");
        org.salgar.camunda.customer.model.protobuf.Customer customerExternal =
                orchestrationCustomer2Customer.map(customer);

        log.info("Creating Command");
        CustomerCommand.Builder builder = CustomerCommand.newBuilder();
        builder.setCommand(CommandConstants.CREATE_CUSTOMER);
        builder.putPayload(
                PayloadVariableConstants.CREATE_CUSTOMER_VARIABLE,
                Any.pack(customerExternal));


        pulsarCustomerProducer
                .newMessage()
                .eventTime(LocalDateTime.now().toInstant(ZoneOffset.UTC).toEpochMilli())
                .key(correlationId)
                .value(builder.build())
                .send();

        log.info("Create Command sent: [{}]", customerExternal);
    }
    ...
}

And you can see from the logs that the Event is received by the Customer Service.

2024-04-01T07:13:03.114Z  INFO 1 --- [customer-service] [ntainer#0-0-C-1] o.s.c.c.s.a.CustomerInboundAdapter       : We received the command: [CreateCustomer] with key:[5192d754-d73e-498a-9502-b0d41762b2f2] payload: [{createCustomerVariable=type_url: "type.googleapis.com/org.salgar.camunda.customer.model.protobuf.Customer"
value: "\022\003Doe\032\004John\"0\n\rheaven street\022\00317a\032\vheaven city\"\00598765*\006heaven"
}]
2024-04-01T07:13:03.307Z  INFO 1 --- [customer-service] [ntainer#0-0-C-1] o.s.c.c.s.a.CustomerInboundAdapter       : Creating Customer
2024-04-01T07:13:07.882Z  INFO 1 --- [customer-service] [ulsar-timer-6-1] o.a.p.c.impl.ConsumerStatsRecorderImpl   : [persistent://sagatenant/sagans/customerInboundTopic] [customer-inbound-subscription] [4ce30] Prefetched messages: 0 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0

Product Options Service:

orchestration/src/main/java/org/salgar/camunda/core/workers/CalculateProductOptionsWorker.java

@Component
@RequiredArgsConstructor
@Slf4j
public class CalculateProductOptionsWorker {
    private final ProductOptionsOutboundPort productOptionsOutboundPort;
    private final ObjectMapper objectMapper = new ObjectMapper();

    @JobWorker(type = "calculateProductOptions")
    @SneakyThrows
    public void calculateProductOptions(final ActivatedJob job) {
        log.info("Calculate Product Options");

        Map<String, Object> existingVariables = job.getVariablesAsMap();

        String json = (String) existingVariables.get("order");
        Order order = objectMapper.readValue(json, Order.class);

        String orderId = (String) existingVariables.get(PayloadVariableConstants.ORDER_ID_VARIABLE);

        productOptionsOutboundPort.calculateOptions(orderId, order.getOrderItems());
    }
}

The Camunda Job Worker converts its domain model to the domain model of the Product Options Service and sends the Command Event via Apache Pulsar through the ProductOptionsOutboundAdapter.

orchestration/src/main/java/org/salgar/camunda/adapter/ProductOptionsOutboundAdapter.java

@Component
@RequiredArgsConstructor
@Slf4j
public class ProductOptionsOutboundAdapter implements ProductOptionsOutboundPort {
    private final Producer<ProductOptionsCommand> pulsarProductOptionsProducer;
    private final OrchestrationProduct2Product orchestrationProduct2Product;

    @Override
    @SneakyThrows
    public void calculateOptions(String correlationId, List<OrderItem> orderItems) {
        log.info("Calculate Product Options: [{}]", orderItems);
        log.info("Key: [{}]", correlationId);

        log.info("Mapping");
        OrderItems.Builder orderItemsBuilder = OrderItems.newBuilder();
        for (OrderItem orderItem : orderItems) {
            orderItemsBuilder.addProducts(orchestrationProduct2Product.map(orderItem));
        }
        OrderItems orderItemsExternal = orderItemsBuilder.build();

        log.info("Creating Command");
        ProductOptionsCommand.Builder builder = ProductOptionsCommand.newBuilder();
        builder.setCommand(CommandConstants.CALCULATE_PRODUCT_OPTIONS);
        builder.putPayload(
                PayloadVariableConstants.PRODUCTS_TO_CALCULATE,
                Any.pack(orderItemsExternal));


        pulsarProductOptionsProducer
                .newMessage()
                .eventTime(LocalDateTime.now().toInstant(ZoneOffset.UTC).toEpochMilli())
                .key(correlationId)
                .value(builder.build())
                .send();

        log.info("Create Command sent: [{}]", orderItemsExternal);
    }
}

The logs show that the Command Event is received by the Product Options Service.

2024-04-01T07:13:03.208Z  INFO 1 --- [product-options-service] [ntainer#0-0-C-1] o.s.c.p.o.a.ProductOptionsInboundAdapter : We received the command: [CalculateProductOptions] with payload: [{products_to_calculate=type_url: "type.googleapis.com/org.salgar.camunda.product.options.model.protobuf.OrderItems"
value: "\nA\b\001\022=\n$fa08adca-172b-4918-9129-437d8cb41d77\022\fBest product\031\315\314\314\314\314\334\203@"
}]
2024-04-01T07:13:03.359Z  INFO 1 --- [product-options-service] [ntainer#0-0-C-1] o.s.c.p.o.a.ProductOptionsInboundAdapter : Calculating Options and caching those!
2024-04-01T07:13:07.076Z  INFO 1 --- [product-options-service] [ulsar-timer-6-1] o.a.p.c.impl.ConsumerStatsRecorderImpl   : [persistent://sagatenant/sagans/productOptionsInboundTopic] [product-options-inbound-subscription] [11311] Prefetched messages: 0 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0

Inventory Service:

orchestration/src/main/java/org/salgar/camunda/core/workers/ReserveProductInInventoryWorker.java

@Component
@RequiredArgsConstructor
@Slf4j
public class ReserveProductInInventoryWorker {
    private final InventoryOutboundPort inventoryOutboundPort;
    private final ObjectMapper objectMapper = new ObjectMapper();

    @JobWorker(type = "reserveProductInInventory")
    @SneakyThrows
    public void reserveProductInInventory(final ActivatedJob job) {
        log.info("Reserving Products in Inventory");

        Map<String, Object> existingVariables = job.getVariablesAsMap();
        String json = (String) existingVariables.get("order");
        Order order = objectMapper.readValue(json, Order.class);

        String orderId = (String) existingVariables.get(PayloadVariableConstants.ORDER_ID_VARIABLE);

        inventoryOutboundPort.reserveProductInInventory(orderId, order.getOrderItems());
    }
}

The Camunda Job Worker converts its domain model to the domain model of the Inventory Service and sends the Command Event via Apache Pulsar through the InventoryOutboundAdapter.

orchestration/src/main/java/org/salgar/camunda/adapter/InventoryOutboundAdapter.java

    @Override
    @SneakyThrows
    public void reserveProductInInventory(String correlationId, List<OrderItem> orderItems) {
        log.info("Reserving Product in Inventory: [{}]", orderItems);
        log.info("Key: [{}]", correlationId);

        log.info("Mapping");
        OrderItems.Builder orderItemsBuilder = OrderItems.newBuilder();
        for (OrderItem orderItem : orderItems) {
            orderItemsBuilder.addOrderItem(orchestrationInventory2Inventory.map(orderItem));
        }
        OrderItems orderItemsExternal = orderItemsBuilder.build();

        log.info("Creating Command");
        InventoryCommand.Builder builder = InventoryCommand.newBuilder();
        builder.setCommand(InventoryCommandConstants.RESERVE_PRODUCT);
        builder.putPayload(
                PayloadVariableConstants.ORDER_ITEMS,
                Any.pack(orderItemsExternal)
        );

        pulsarInventoryProducer
                .newMessage()
                .eventTime(LocalDateTime.now().toInstant(ZoneOffset.UTC).toEpochMilli())
                .key(correlationId)
                .value(builder.build())
                .send();

        log.info("Reserve Product Command sent: [{}]", orderItemsExternal);
    }

And you can see from the logs that the Command Event is received by the Inventory Service.

2024-04-01T07:13:03.209Z  INFO 1 --- [inventory-service] [ntainer#0-0-C-1] o.s.c.i.s.a.InventoryInboundAdapter      : We received the command: [ReserveProduct] with key: [5192d754-d73e-498a-9502-b0d41762b2f2] and payload: [{order_items=type_url: "type.googleapis.com/org.salgar.camunda.product.options.model.protobuf.OrderItems"
value: "\nA\b\001\022=\n$fa08adca-172b-4918-9129-437d8cb41d77\022\fBest product\031\315\314\314\314\314\334\203@"
}]
2024-04-01T07:13:03.335Z  INFO 1 --- [inventory-service] [ntainer#0-0-C-1] o.s.c.i.s.a.InventoryInboundAdapter      : Reserving Product
2024-04-01T07:14:00.647Z  INFO 1 --- [inventory-service] [ulsar-timer-6-1] o.a.p.c.impl.ConsumerStatsRecorderImpl   : [persistent://sagatenant/sagans/inventoryInboundTopic] [inventory-inbound-subscription] [84347] Prefetched messages: 0 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0

Invoice Service:

orchestration/src/main/java/org/salgar/camunda/core/workers/CreateInvoiceWorker.java

@Component
@RequiredArgsConstructor
@Slf4j
public class CreateInvoiceWorker {
    private final InvoiceOutboundPort invoiceOutboundPort;
    private final ObjectMapper objectMapper = new ObjectMapper();

    @JobWorker(type = "createInvoice")
    @SneakyThrows
    public void createInvoice(final ActivatedJob job) {
        log.info("Creating Invoice");

        Map<String, Object> existingVariables = job.getVariablesAsMap();
        String json = (String) existingVariables.get("order");
        Order order = objectMapper.readValue(json, Order.class);

        String orderId = (String) existingVariables.get(PayloadVariableConstants.ORDER_ID_VARIABLE);

        invoiceOutboundPort.createInvoice(
                orderId,
                order.getCustomer(),
                order.getBankInformation(),
                order.getOrderItems());
    }
}

The Camunda Job Worker converts its domain model to the domain model of the Invoice Service and sends the Command Event via Apache Pulsar through the InvoiceOutboundAdapter.

orchestration/src/main/java/org/salgar/camunda/adapter/InvoiceOutboundAdapter.java

    @Override
    @SneakyThrows
    public void createInvoice(
            String correlationId,
            Customer customer,
            BankInformation bankInformation,
            List<OrderItem> orderItems) {
        log.info("Creating Invoice: [{}]", orderItems);
        log.info("Key: [{}]", correlationId);

        log.info("Mapping");
        org.salgar.camunda.invoice.model.protobuf.Customer customerExternal =
                orchestrationInvoice2Invoice.map(customer);

        org.salgar.camunda.invoice.model.protobuf.BankInformation bankInformationExternal =
                orchestrationInvoice2Invoice.map(bankInformation);

        OrderItems.Builder orderItemsBuilder = OrderItems.newBuilder();
        for (OrderItem orderItem : orderItems) {
            orderItemsBuilder.addOrderItem(orchestrationInvoice2Invoice.map(orderItem));
        }
        OrderItems orderItemsExternal = orderItemsBuilder.build();

        log.info("Creating Command");
        InvoiceCommand.Builder builder = InvoiceCommand.newBuilder();
        builder.setCommand(InvoiceCommandConstants.CREATE_INVOICE);
        builder.putPayload(
                PayloadVariableConstants.CUSTOMER,
                Any.pack(customerExternal)
        );
        builder.putPayload(
                PayloadVariableConstants.BANK_INFORMATION,
                Any.pack(bankInformationExternal)
        );
        builder.putPayload(
                PayloadVariableConstants.ORDER_ITEMS,
                Any.pack(orderItemsExternal)
        );

        InvoiceCommand invoiceCommand = builder.build();

        pulsarInvoiceProducer
                .newMessage()
                .eventTime(LocalDateTime.now().toInstant(ZoneOffset.UTC).toEpochMilli())
                .key(correlationId)
                .value(invoiceCommand)
                .send();

        log.info("Create Invoice Command sent: [{}]", invoiceCommand);
    }

The logs show that the Command Event is received by the Invoice Service.

2024-04-01T07:13:02.997Z  INFO 1 --- [invoice-service] [ntainer#0-0-C-1] o.s.c.i.s.adapter.InvoiceInboundAdapter  : We received the command: [CreateInvoice] with payload: [{Customer=type_url: "type.googleapis.com/org.salgar.camunda.invoice.model.protobuf.Customer"
value: "\022\003Doe\032\004John\"0\n\rheaven street\022\00317a\032\vheaven city\"\00598765*\006heaven"
, BankInformation=type_url: "type.googleapis.com/org.salgar.camunda.invoice.model.protobuf.BankInformation"
value: "\n\022PT1234987612349876\022\t987654321"
, OrderItems=type_url: "type.googleapis.com/org.salgar.camunda.invoice.model.protobuf.OrderItems"
value: "\nA\b\001\022=\n$fa08adca-172b-4918-9129-437d8cb41d77\022\fBest product\031\315\314\314\314\314\334\203@"
}]
2024-04-01T07:13:03.142Z  INFO 1 --- [invoice-service] [ntainer#0-0-C-1] o.s.c.i.s.adapter.InvoiceInboundAdapter  : Creating Invoice
2024-04-01T07:13:08.613Z  INFO 1 --- [invoice-service] [ulsar-timer-6-1] o.a.p.c.impl.ConsumerStatsRecorderImpl   : [persistent://sagatenant/sagans/invoiceInboundTopic] [invoice-inbound-subscription] [78880] Prefetched messages: 0 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0

Now that the State Machine has triggered every Service's Business Case in parallel, those Services will send their Response Events to continue the Workflow. To prove that the State Machine does not depend on the return order of the Service Response Events, the Inventory Service will report its Response Event first.

Because I want to control from the outside how the State Machine proceeds, I will use REST Endpoints to simulate successful Business Case completions; I have already prepared REST calls for this purpose.

Inventory Service:

POST http://localhost:60003/inventory/response?correlationId=5192d754-d73e-498a-9502-b0d41762b2f2&response=reserve

The REST call triggers the completion of the Business Case, and the Inventory Service sends the Response Event over Apache Pulsar.

inventory-service/src/main/java/org/salgar/camunda/inventory/service/adapter/RestAdapter.java

@RestController
@RequiredArgsConstructor
@Slf4j
public class RestAdapter implements RestPort {
    private final InventoryOutboundAdapter inventoryOutboundAdapter;
    private final InventoryReservationFacade inventoryReservationFacade;

    @PostMapping("/inventory/response")
    public void prepare(ServerWebExchange exchange) {
        log.info("Preparing response");
        String correlationId = exchange.getRequest().getQueryParams().getFirst("correlationId");
        if (correlationId==null) {correlationId = "";}

        String response = exchange.getRequest().getQueryParams().getFirst("response");
        if (response==null) {response = "";}

        processInventoryResponse(correlationId, response);
    }

    @Override
    public void processInventoryResponse(String correlationId, String response) {
        String customerResponse = null;
        if("reserve".equals(response)) {
            customerResponse = ResponseConstants.PRODUCT_RESERVED;
            prepareInventoryCreated(correlationId, customerResponse);
        } else if("fail".equals(response)) {
            customerResponse = ResponseConstants.PRODUCT_RESERVED;
            prepareInventoryCreatedFailed(correlationId, customerResponse);
        } else if("revert".equals(response)) {
            prepareInventoryReservationCanceled(correlationId);
        } else {
            log.info("Unknown response: [{}]", response);
        }
    }

    private void prepareInventoryCreated(String correlationId, String inventoryResponse) {
        InventoryResponse.Builder builder = InventoryResponse.newBuilder();
        builder.setResponse(inventoryResponse);
        builder.putPayload(
                PayloadVariableConstants.PRODUCT_RESERVED,
                Any.pack(ProductReserved.newBuilder().setProductReserved(true).build())
        );

        inventoryOutboundAdapter.deliverInventoryResponse(correlationId, builder.build());
    }
}

inventory-service/src/main/java/org/salgar/camunda/inventory/service/adapter/InventoryOutboundAdapter.java

@Component
@RequiredArgsConstructor
@Slf4j
public class InventoryOutboundAdapter implements InventoryOutboundPort {
    private final PulsarTemplate<InventoryResponse> pulsarTemplate;
    private final PulsarProperties.Producer inventoryOutboundProperties;

    @Override
    @SneakyThrows
    public void deliverInventoryResponse(String correlationId, InventoryResponse inventoryResponse) {
        log.info("Sending InventoryResponse: [{}]", inventoryResponse);
        pulsarTemplate
                .newMessage(inventoryResponse)
                .withMessageCustomizer(mc -> {
                    mc.key(correlationId);
                })
                .withTopic(inventoryOutboundProperties.getTopicName())
                .send();
    }
}
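
On the Service side, Spring for Apache Pulsar drives the PulsarTemplate and the PulsarProperties from the application configuration. The following is a minimal sketch of the relevant application.yml, assuming default Spring Boot property names and the topic names visible in the logs (the outbound topic name is an assumption, and the real repository may bind these properties differently):

spring:
  pulsar:
    client:
      service-url: pulsar://localhost:6650
    producer:
      topic-name: persistent://sagatenant/sagans/inventoryOutboundTopic
    consumer:
      inventory:
        topics: persistent://sagatenant/sagans/inventoryInboundTopic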

And the Orchestration Service proceeds in the State Machine through the Receive Task.

Orchestration Service:

And in the InventoryInboundAdapter in the Orchestration Service:

orchestration/src/main/java/org/salgar/camunda/adapter/InventoryInboundAdapter.java

@Component
@Slf4j
public class InventoryInboundAdapter extends AbstractInboundAdapter implements InventoryInboundPort {
    public InventoryInboundAdapter(ZeebeClient zeebeClient) {
        super(zeebeClient);
    }

    @Override
    @SneakyThrows
    public void processInventoryResponse(Message<InventoryResponse> message) {
        if(ResponseConstants.PRODUCT_RESERVED.equals(message.getValue().getResponse())) {
            ProductReserved productReserved = message
                    .getValue()
                    .getPayloadMap()
                    .get(PayloadVariableConstants.PRODUCT_RESERVED).unpack(ProductReserved.class);

            Map<String, Object> variables = new HashMap<>();
            variables.put(
                    PayloadVariableConstants.PRODUCT_RESERVED,
                    productReserved.getProductReserved()
            );
            processZeebeMessage(
                    message.getKey(),
                    ZeebeMessageConstants.PRODUCT_RESERVED_MESSAGE,
                    variables
            );
        } else if(ResponseConstants.PRODUCT_RESERVATION_CANCELED.equals(message.getValue().getResponse())) {
           ...
        } else {
            log.info("Unknown command: [{}]", message.getValue().getResponse());
        }
    }
}
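
Both inbound adapters delegate to processZeebeMessage from AbstractInboundAdapter, which is not listed in this blog. The following is a minimal sketch, assuming it simply publishes a Zeebe message and uses the Pulsar message key as correlation key (the repository version may add error handling):

public abstract class AbstractInboundAdapter {
    protected final ZeebeClient zeebeClient;

    protected AbstractInboundAdapter(ZeebeClient zeebeClient) {
        this.zeebeClient = zeebeClient;
    }

    protected void processZeebeMessage(
            String correlationKey,
            String messageName,
            Map<String, Object> variables) {
        // Correlates the Response Event with the Receive Task waiting in the
        // process instance; the correlation key is the orderId we used as the
        // Pulsar message key
        zeebeClient
                .newPublishMessageCommand()
                .messageName(messageName)
                .correlationKey(correlationKey)
                .variables(variables)
                .send()
                .join();
    }
}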

The responses from the other three Services follow the same principles, so I will not show them in detail. When those three return their Responses, our State Machine execution will look like the following.

The magic happening behind the scenes: every Service reports its success or failure in a variable that is kept in the Camunda Variables, which we can see in Operate.

These variables are checked in the Exclusive Gateway.
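
In Camunda 8 such a gateway condition is a FEEL expression over these variables. A sketch of what the 'yes' branch condition could look like, with hypothetical variable names for the four Services (only the boolean nature of the variables is taken from the code above):

=customerCreated and productOptionsCalculated and productReserved and invoiceCreated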

Because the condition evaluates to 'true' with these variables, the State Machine now triggers the Order Completion.

This will trigger the following Job Worker.

orchestration/src/main/java/org/salgar/camunda/core/workers/CompleteOrderWorker.java

@Component
@RequiredArgsConstructor
@Slf4j
public class CompleteOrderWorker {
    private final OrderOutboundPort orderOutboundPort;

    @JobWorker(type = "completeOrder")
    public void completeOrder(final ActivatedJob job) {
        log.info("Completing Order");

        Map<String, Object> existingVariables = job.getVariablesAsMap();
        String orderId = (String) existingVariables.get(PayloadVariableConstants.ORDER_ID_VARIABLE);

        orderOutboundPort.completeOrder(orderId);
    }
}

This will send the Event to Order Service.

orchestration/src/main/java/org/salgar/camunda/adapter/OrderOutboundAdapter.java

    @Override
    @SneakyThrows
    public void completeOrder(String correlationId) {
        log.info("Completing Order");
        log.info("Key: [{}]", correlationId);

        log.info("Creating Command");
        OrderCommand.Builder builder = OrderCommand.newBuilder();
        builder.setCommand(CommandConstants.COMPLETE_ORDER);

        OrderCommand orderCommand = builder.build();

        pulsarProducer
                .newMessage()
                .eventTime(LocalDateTime.now().toInstant(ZoneOffset.UTC).toEpochMilli())
                .key(correlationId)
                .value(orderCommand)
                .send();

        log.info("Complete Order sent: [{}]", orderCommand);
    }

When the Order Service completes the final step of the Business Case, our State Machine will reach the End State; again we will trigger the completion of the Task from the outside.

Order Service:

POST http://localhost:60006/order/response?correlationId=5192d754-d73e-498a-9502-b0d41762b2f2&response=complete

And voilà, our State Machine reaches the Final State.

The above REST call triggers the Apache Pulsar Producer to send the Response Event to the Orchestration Service.

order-service/src/main/java/org/salgar/camunda/order/service/adapter/RestAdapter.java

@RestController
@RequiredArgsConstructor
@Slf4j
public class RestAdapter implements RestPort {
    private final OrderOutboundPort orderOutboundPort;

    @PostMapping("/order/response")
    public void prepare(ServerWebExchange exchange) {
        log.info("Preparing response");
        String correlationId = exchange.getRequest().getQueryParams().getFirst("correlationId");
        if (correlationId==null) {correlationId = "";}

        String response = exchange.getRequest().getQueryParams().getFirst("response");
        if (response==null) {response = "";}

        prepareOrderServiceResponse(correlationId, response);
    }

    @Override
    public void prepareOrderServiceResponse(String correlationId, String response) {
        String orderResponse = null;
        if("pending".equals(response)) {
            ...
        } else if("failed".equals(response)) {
            ...
        } else if("complete".equals(response)) {
            orderResponse = ResponseConstants.ORDER_COMPLETE;
            prepareOrderComplete(correlationId, orderResponse);
        } else {
            log.info("Unknown response: [{}]", response);
        }
    }
    
    ...

    private void prepareOrderComplete(String correlationId, String orderResponse) {
        OrderResponse.Builder builder = OrderResponse.newBuilder();
        builder.setResponse(orderResponse);

        orderOutboundPort.deliverOrderResponse(correlationId, builder.build());
    }
}

which sends the Response Event over the OrderOutboundAdapter via Apache Pulsar.

order-service/src/main/java/org/salgar/camunda/order/service/adapter/OrderOutboundAdapter.java

@Component
@RequiredArgsConstructor
@Slf4j
public class OrderOutboundAdapter implements OrderOutboundPort {
    private final PulsarTemplate<OrderResponse> pulsarTemplate;
    private final PulsarProperties.Producer orderOutboundProperties;

    @Override
    @SneakyThrows
    public void deliverOrderResponse(
            String correlationId,
            OrderResponse orderResponse) {

        log.info("Sending OrderResponse: [{}]", orderResponse);
        pulsarTemplate
                .newMessage(orderResponse)
                .withMessageCustomizer(mc -> {
                    mc.key(correlationId);
                })
                .withTopic(orderOutboundProperties.getTopicName())
                .send();
    }
}

Orchestration Service:

And the Orchestration Service's OrderInboundAdapter will receive the Response Event from Apache Pulsar and set the Camunda State Machine to the correct State.

orchestration/src/main/java/org/salgar/camunda/adapter/OrderInboundAdapter.java

@Component
@Slf4j
public class OrderInboundAdapter extends AbstractInboundAdapter implements OrderInboundPort {
    public OrderInboundAdapter(ZeebeClient zeebeClient) {
        super(zeebeClient);
    }

    @Override
    @SneakyThrows
    public void processOrderResponse(Message<OrderResponse> message) {
        if(ResponseConstants.ORDER_PENDING.equals(message.getValue().getResponse())) {
            ...
        } else if(ResponseConstants.ORDER_FAILED.equals(message.getValue().getResponse())) {
            ...
        } else if(ResponseConstants.ORDER_COMPLETE.equals(message.getValue().getResponse())) {
            Map<String, Object> variables = new HashMap<>();
            processZeebeMessage(
                    message.getKey(),
                    ZeebeMessageConstants.ORDER_COMPLETE_MESSAGE,
                    variables);
        } else {
            log.info("Unknown command: [{}]", message.getValue().getResponse());
        }
    }
}

This completes the positive Scenario for our Use Case.

Transaction Rollback

Now comes the part that is probably the reason why you are reading this blog. Imagine our Order is in the Pending State and we send Command Events to the Customer Service, Product Options Service, Inventory Service and Invoice Service. Product Options Calculation, Product Reservation in the Inventory and Invoice Creation report positive results, but the Customer Creation failed.

Without a Customer, we can’t complete an Order. In a classical Monolith this would be a Transaction Rollback, but we can’t do that in our Micro Service design; we have to take compensating actions. Because we already reserved a Product in the Inventory and created an Invoice, we have to revert these actions.

Because the condition on the ‘yes‘ branch is not fulfilled, the Exclusive Gateway in Camunda will follow the Default Flow, the ‘no’ branch, and execute the reverting tasks for compensation. So I will bring the Camunda State Machine to a stage that simulates a failed Customer Creation and continue the explanation from there.

Customer Service:

I simulate the Customer Service failure with the following REST call.

POST http://localhost:60001/customer/response?correlationId=f5243fec-d647-4702-8ee3-e8d6331491f4&response=fail

As you can see, the Variable representing the Customer Creation success reports ‘false‘.

The Revert Task will try to revert, for example, the Product Reservation in the Inventory. To make things more interesting, I created an additional task that notifies any customer who previously inquired about the item while it was blocked, telling them that the Product is available again.

Naturally, everything would be perfect if the automated reverting process worked, but we can’t take the chance that the Product stays reserved if the automated process fails. As a precaution against such a case, I used Camunda’s User Task, which will notify the System Admins that a Product is stuck in the Reserved State and they have to manually unreserve it. Camunda has a component, TaskList, to display these Tasks to System Admins, but I will show you in a further blog how we can write a Web Application with a popular SPA Framework like Angular, React, Vue or Svelte, using the Task API to manage these Tasks.

Now let’s look at how the Product Reservation is reverted: by going down the rollback branch, Camunda triggers the Send Task.

orchestration/src/main/java/org/salgar/camunda/core/workers/RevertProductReservationWorker.java

@Component
@RequiredArgsConstructor
@Slf4j
public class RevertProductReservationWorker {
    private final InventoryOutboundPort inventoryOutboundPort;
    private final ObjectMapper objectMapper = new ObjectMapper();

    @JobWorker(type = "revertProductReservation")
    @SneakyThrows
    public void revertProductReservation(final ActivatedJob job) {
        log.info("Revering Product Reservation");

        Map<String, Object> existingVariables = job.getVariablesAsMap();
        String orderId = (String) existingVariables.get(PayloadVariableConstants.ORDER_ID_VARIABLE);

        String json = (String) existingVariables.get("order");
        Order order = objectMapper.readValue(json, Order.class);

        inventoryOutboundPort.revertProductReservation(orderId, order.getOrderItems());
    }
}

The ‘RevertProductReservationWorker‘ will send the Command Event to the Inventory Service via the ‘InventoryOutboundAdapter‘ and Apache Pulsar.

orchestration/src/main/java/org/salgar/camunda/adapter/InventoryOutboundAdapter.java

    @Override
    @SneakyThrows
    public void revertProductReservation(String correlationId, List<OrderItem> orderItems) {
        log.info("Reverting Product Reservation in Inventory: [{}]", orderItems);
        log.info("Key: [{}]", correlationId);

        log.info("Mapping");
        OrderItems.Builder orderItemsBuilder = OrderItems.newBuilder();
        for (OrderItem orderItem : orderItems) {
            orderItemsBuilder.addOrderItem(orchestrationInventory2Inventory.map(orderItem));
        }
        OrderItems orderItemsExternal = orderItemsBuilder.build();

        log.info("Creating Command");
        InventoryCommand.Builder builder = InventoryCommand.newBuilder();
        builder.setCommand(InventoryCommandConstants.REVERT_PRODUCT_RESERVATION);
        builder.putPayload(
                PayloadVariableConstants.ORDER_ITEMS,
                Any.pack(orderItemsExternal)
        );

        pulsarInventoryProducer
                .newMessage()
                .eventTime(LocalDateTime.now().toInstant(ZoneOffset.UTC).toEpochMilli())
                .key(correlationId)
                .value(builder.build())
                .send();

        log.info("Reverting Product Reservation Command sent: [{}]", orderItemsExternal);
    }

Inventory Service:

This Command Event will be received by the Inventory Service’s PulsarListener.

inventory-service/src/main/java/org/salgar/camunda/inventory/service/adapter/InventoryInboundAdapter.java

@Component
@RequiredArgsConstructor
@Slf4j
public class InventoryInboundAdapter implements InventoryInboundPort {
    private final InventoryReservationFacade inventoryReservationFacade;
    @Override
    @PulsarListener(
            subscriptionName = "inventory-inbound-subscription",
            topics = "${spring.pulsar.consumer.inventory.topics}"
    )
    public void processInventoryCommand(Message<InventoryCommand> command) {
        try {
            processInventoryCommandInternal(command);
        } catch (Throwable t) {
            log.error(t.getMessage(), t);
        }
    }

    public void processInventoryCommandInternal(Message<InventoryCommand> command) throws InvalidProtocolBufferException {
        log.info(
                "We received the command: [{}] with key: [{}] and payload: [{}]",
                command.getValue().getCommand(),
                command.getKey(),
                command.getValue().getPayloadMap());

        if(InventoryCommandConstants.RESERVE_PRODUCT.equals(command.getValue().getCommand())) {
            ...
        } else if(InventoryCommandConstants.REVERT_PRODUCT_RESERVATION.equals(command.getValue().getCommand())) {
            log.info("Reverting Product Reservation");
            String orderId = command.getKey();
            inventoryReservationFacade.revertProductReservation(orderId);
        } else if(InventoryCommandConstants.NOTIFY_CUSTOMER_FOR_INTERESTED_PRODUCTS.equals(command.getValue().getCommand())) {
            ...
        } else {
            log.info("Unknown command: [{}]", command.getValue().getCommand());
        }
    }
}

I will signal over the REST Adapter that the Revert Process has completed successfully.

POST http://localhost:60003/inventory/response?correlationId=f5243fec-d647-4702-8ee3-e8d6331491f4&response=revert

This will prepare the Response Event, setting the process variables that signal the success of the Reversion Process, and send the Event over Apache Pulsar.

inventory-service/src/main/java/org/salgar/camunda/inventory/service/adapter/RestAdapter.java

@RestController
@RequiredArgsConstructor
@Slf4j
public class RestAdapter implements RestPort {
    private final InventoryOutboundAdapter inventoryOutboundAdapter;
    private final InventoryReservationFacade inventoryReservationFacade;

    @PostMapping("/inventory/response")
    public void prepare(ServerWebExchange exchange) {
        log.info("Preparing response");
        String correlationId = exchange.getRequest().getQueryParams().getFirst("correlationId");
        if (correlationId==null) {correlationId = "";}

        String response = exchange.getRequest().getQueryParams().getFirst("response");
        if (response==null) {response = "";}

        processInventoryResponse(correlationId, response);
    }

    @Override
    public void processInventoryResponse(String correlationId, String response) {
        String customerResponse = null;
        if("reserve".equals(response)) {
            ...
        } else if("fail".equals(response)) {
            ...
        } else if("revert".equals(response)) {
            prepareInventoryReservationCanceled(correlationId);
        } else {
            log.info("Unknown response: [{}]", response);
        }
    }

    private void prepareInventoryReservationCanceled(String correlationId) {
        inventoryReservationFacade.revertProductReservation(correlationId);
    }
}

inventory-service/src/main/java/org/salgar/camunda/inventory/service/core/facades/InventoryReservationFacade.java

@Component
@RequiredArgsConstructor
public class InventoryReservationFacade {
    private final InventoryMemory inventoryMemory;
    private final InventoryOutboundPort inventoryOutboundPort;

    ...

    public void revertProductReservation(String orderId) {
        inventoryMemory.revertReservation(orderId);

        InventoryResponse.Builder builder = InventoryResponse.newBuilder();
        builder.setResponse(PRODUCT_RESERVATION_CANCELED);
        builder.putPayload(
                PayloadVariableConstants.PRODUCT_RESERVATION_REVERT_SUCCESSFUL,
                Any.pack(
                        ProductReservationRevertSuccessful
                                .newBuilder()
                                .setProductReservationRevertSuccessful(true)
                                .build()
                )
        );

        inventoryOutboundPort.deliverInventoryResponse(orderId, builder.build());
    }
}
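
The InventoryMemory used above is not listed in the blog. The following is a minimal sketch, assuming it is nothing more than an in-memory store keyed by the orderId (the reserveProducts method and the value type are assumptions; only revertReservation(orderId) is taken from the code above, and a real implementation would use a database):

@Component
public class InventoryMemory {
    // orderId -> reserved product ids; purely in-memory for this Proof of Concept
    private final Map<String, List<String>> reservations = new ConcurrentHashMap<>();

    public void reserveProducts(String orderId, List<String> productIds) {
        reservations.put(orderId, productIds);
    }

    public void revertReservation(String orderId) {
        // Dropping the entry makes the reserved Products available again
        reservations.remove(orderId);
    }
}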

inventory-service/src/main/java/org/salgar/camunda/inventory/service/adapter/InventoryOutboundAdapter.java

@Component
@RequiredArgsConstructor
@Slf4j
public class InventoryOutboundAdapter implements InventoryOutboundPort {
    private final PulsarTemplate<InventoryResponse> pulsarTemplate;
    private final PulsarProperties.Producer inventoryOutboundProperties;

    @Override
    @SneakyThrows
    public void deliverInventoryResponse(String correlationId, InventoryResponse inventoryResponse) {
        log.info("Sending InventoryResponse: [{}]", inventoryResponse);
        pulsarTemplate
                .newMessage(inventoryResponse)
                .withMessageCustomizer(mc -> {
                    mc.key(correlationId);
                })
                .withTopic(inventoryOutboundProperties.getTopicName())
                .send();
    }
}

This progresses our State Machine. Because the Reversion process was successful, Camunda also sends the Notification to the Customer about the availability of the Product, which we can see in the logs of the Inventory Service.

2024-04-01T08:22:40.152Z  INFO 1 --- [inventory-service] [ntainer#0-0-C-1] o.s.c.i.s.a.InventoryInboundAdapter      : We received the command: [notifyCustomerForProduct] with key: [f5243fec-d647-4702-8ee3-e8d6331491f4] and payload: [{}]
2024-04-01T08:22:40.153Z  INFO 1 --- [inventory-service] [ntainer#0-0-C-1] o.s.c.i.s.a.InventoryInboundAdapter      : Notified Interested Customers for Product
2024-04-01T08:23:32.106Z  INFO 1 --- [inventory-service] [ulsar-timer-6-1] o.a.p.c.impl.ConsumerStatsRecorderImpl   : [persistent://sagatenant/sagans/inventoryInboundTopic] [inventory-inbound-subscription] [7639a] Prefetched messages: 0 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0

As we can observe, the State Machine successfully reached the Final State.

As a final point to observe, I would like to show you how the Camunda User Task works. For this purpose I will let the Customer Reversion process fail, so we can see how we can interact with Camunda TaskList.

As you can see, the Customer Reversion failed and Camunda created a User Task. Now let’s see how this is presented in Camunda TaskList. (One small point here: it is possible to build a simple HTML Form in Camunda TaskList to interact with this User Task and thereby with the Workflow; I didn’t follow this in the solution, but it is possible.)

As you can see, we get the User Task in TaskList with all the Process Variables, so we can react to the problem and take corrective actions.

Finally, when the Invoice Cancellation successfully completes, the State Machine will reach the Final State.

Optimisations

Now let’s think about what we can do better. I tried to keep the State Machine simple for the first version so you can understand the concepts better, but now consider the scenario in which Customer Creation fails and we want to revert the Product Reservation as quickly as possible. All four activities run in parallel, and let’s say Invoice Creation takes 2 hours; we don’t want to wait 2 hours to revert the Product Reservation. So we can change our State Machine to something like the following.

Our intentions are good, but that does not look nice, does it? There is a lot of redundancy in the above Model and it is hard to understand. Fortunately, Camunda has a component that is really similar to Nested State Machines / Sub State Machines in UML State Machine concepts: ‘Call Activities‘. By extracting the ‘Rollback‘ process to another Model to be used as a Sub Process, we can greatly simplify our design and get rid of the clutter you see above. (There is an additional branch in GitHub where these optimisations are implemented.)

orchestration/src/main/resources/saga_optimised.bpmn

orchestration/src/main/resources/saga_reversion_process.bpmn

We have divided our model into two parts: one to deal with our Business Case and one for the Rollback Process, and we are using the ‘Call Activity’ component of Camunda to re-use the Rollback Process.

There are two critical configurations for the ‘Call Activity’ component.

First, we have to define the name of the process we want to call, in this case ‘rollback-process‘. Second, because this Process can be called from multiple paths (for Customer Creation, Product Options, Product Reservation, Invoice Creation) and we have ‘Receive Tasks‘ in the Rollback Process, we have to make the Message names on the Receive Tasks unique for each entry point.
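
In the BPMN XML this boils down to a zeebe:calledElement extension on the Call Activity. A sketch of what this could look like in saga_optimised.bpmn, with assumed element ids (only the processId ‘rollback-process‘ is taken from the model above):

<bpmn:callActivity id="Activity_Rollback" name="Rollback">
  <bpmn:extensionElements>
    <zeebe:calledElement processId="rollback-process" propagateAllChildVariables="true" />
  </bpmn:extensionElements>
</bpmn:callActivity>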

How the State Machine progresses is not so different from the initial design; let’s observe the positive case.

The difference really shows in the rollback scenarios; let’s look at what happens when Customer Creation fails.

With this design we want to start the Rollback Process earlier, so we need additional checks.

We check explicitly whether the Customer Creation was successful or not.

The Rollback Process can be started from several branches in our Use Case, and we have to make sure that it is not executed multiple times; for this purpose we check whether the ‘rollbackRunning‘ variable is set or not.
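
A sketch of how such a guard could look as a FEEL condition, with assumed variable names (only the ‘rollbackRunning‘ flag is taken from the model above):

=customerCreated = false and (rollbackRunning = null or rollbackRunning = false)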

Now let’s see how things work in the new Sub Process.

It progresses really nicely.

I guess this concludes our Proof of Concept, showing Transaction Management for Micro Services with the help of the Saga Pattern, Camunda and Apache Pulsar.

Conclusion

I hope I could convince you that Camunda is a viable solution for Transaction Management over multiple Micro Services with the help of the Saga Pattern, especially if you are coming from a Monolith and are trying to implement concepts like Bounded Context and Event Sourcing in your new design.

I also strongly advise you, if you are planning to change your Enterprise Message Broker, to give Apache Pulsar a chance, because I strongly believe it is a better version of Apache Kafka. I know I will not convince many people who are already working with Apache Kafka to migrate to Apache Pulsar, but if you are planning to introduce a new broker in your Enterprise, please consider it.