Virtual Airport Demo: Connecting DynamoDB to Lightstreamer

Last updated: August 2, 2024 | Originally published: January 30, 2023

We’re excited to announce the release of our newest demo. And no, this time it’s not a stock-list demo 😉

We switched things up a bit, and the new demo simulates a very simple departures board with a few rows showing real-time flight information to passengers of a hypothetical airport. The data are simulated in the back-end and retrieved from an Amazon DynamoDB data source. In short, we showcase our cool “virtual airport” where you can pretend to take a flight without ever leaving your home!

The live demo is available at https://demos.lightstreamer.com/DynamoDBDemo/

The primary purpose of this demo is to provide an example in which the Data Adapter is fed by an external source instead of internally simulating the data to be sent, as most of our other demos do. In this case, the source is a NoSQL database service in a cloud environment: DynamoDB.

DynamoDB is a fully managed NoSQL database service offered by Amazon Web Services (AWS).

Unlike traditional relational database management systems, DynamoDB does not use a fixed schema but supports key-value and document data structures. It offers automatic horizontal scaling; this means that the system can automatically handle an increase in workload and data size without any interruption to the user. DynamoDB also offers low latency for read and write operations, making it suitable for applications that require a high level of performance. In addition, DynamoDB supports geographic replication to ensure high availability of data in case of system failures or connectivity issues.

DynamoDB is used in many applications such as games, social media, e-commerce websites, IoT, and many other business applications. You can access DynamoDB through the AWS Management Console, the AWS SDKs for various programming languages, or directly through its API.

Overall, DynamoDB is a flexible and scalable solution for high-performance data management, and these features make it a data source that fits well with a Lightstreamer Data Adapter. For example, the adapter can use the AWS SDK for Java to interact with DynamoDB and retrieve the data. It then passes this data to the Lightstreamer server, which, in turn, pushes the updates to the connected clients.

The Demo Architecture

The overall architecture of the demo includes the following:

  • A web page using the Lightstreamer Web Client SDK to connect to the Lightstreamer server and subscribe to the flight information items.
  • A Lightstreamer server deployed on an AWS EC2 instance alongside the custom metadata and data adapters.
  • The adapters use the Java In-Process Adapter SDK and are written in Kotlin; in particular, the Data Adapter retrieves data from the DynamoDB data source through the AWS SDK for Java.
  • Two DynamoDB tables (DemoCurrentTimeData and DemoDeparturesData).
  • A simulator, also written in Kotlin, pushing data into the DynamoDB tables.

Client Details

The page uses the Lightstreamer Web Client SDK to handle communication with the Lightstreamer Server. It has a simple user interface that displays the real-time data received from the Lightstreamer Server in tabular form.

The demo includes the following client-side functionalities:

  • A Subscription containing only 1 item and 1 field, subscribed to in MERGE mode, which updates a DynaGrid displaying the simulated current time (oh yes, in this demo, the time runs a little faster than reality).
  • A Subscription with a single item subscribed to in COMMAND mode, which updates a DynaGrid displaying the current list and status of the next departing flights based on the simulated time.

Updates related to the COMMAND mode item come in the form of add, delete, or update messages and allow respectively adding a new row, removing an existing one, or updating some fields of an existing row of the displayed table. The changes are automatically applied by the Lightstreamer library to the graphical widget, which displays and keeps the grid updated.
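
To make the add, delete, and update semantics concrete, here is a minimal sketch in plain JavaScript of what a client-side table does with COMMAND mode updates. It is not the DynaGrid implementation: applyCommand and the sample flight rows are hypothetical names and values chosen for illustration, and only the "key" and "command" fields follow the COMMAND mode convention.

```javascript
// Hypothetical helper (not part of the Lightstreamer API): applies one
// COMMAND mode update to an in-memory table indexed by the "key" field.
function applyCommand(rows, update) {
  const { key, command, ...fields } = update;
  switch (command) {
    case "ADD":
      // A new row appears with the fields carried by the update.
      rows.set(key, { ...fields });
      break;
    case "UPDATE":
      // Only the fields carried by the update change; the others are kept.
      rows.set(key, { ...(rows.get(key) || {}), ...fields });
      break;
    case "DELETE":
      rows.delete(key);
      break;
    default:
      throw new Error("Unknown command: " + command);
  }
  return rows;
}

// Usage with made-up flight data.
const board = new Map();
applyCommand(board, { key: "LS101", command: "ADD", destination: "Milan", status: "Scheduled" });
applyCommand(board, { key: "LS202", command: "ADD", destination: "Rome", status: "Scheduled" });
applyCommand(board, { key: "LS101", command: "UPDATE", status: "Boarding" });
applyCommand(board, { key: "LS202", command: "DELETE" });
console.log([...board.entries()]);
```

In the demo, this bookkeeping is done by the DynaGrid, so the page never has to handle the commands itself.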

Below is the JavaScript code that sets up the DynaGrid and subscribes to the DepartureMonitor item in COMMAND mode.

// Grid bound to the page element with id "stocks".
let dynaGrid = new Ls.DynaGrid("stocks", true);

dynaGrid.setSort("key");
dynaGrid.setNodeTypes(["div", "span", "img", "a"]);
dynaGrid.setAutoCleanBehavior(true, false);
dynaGrid.addListener({
    onVisualUpdate: function(_key, info) {
        if (info == null) {
            // cleaning: nothing to style
            return;
        }

        // Highlight the updated cells in light green, then fall back to the "cold" color.
        const cold = "#dedede";
        info.setAttribute("lightgreen", cold, "backgroundColor");
    }
});

// fieldsList and lsClient are defined elsewhere in the page.
let subMonitor = new Ls.Subscription("COMMAND", "DepartureMonitor", fieldsList);
subMonitor.addListener(dynaGrid);
subMonitor.setRequestedSnapshot("yes");

lsClient.subscribe(subMonitor);

Adapters Details

The source code of the adapters is basically divided into two packages:

  • server, which implements the Lightstreamer in-process adapters based on the Java In-Process Adapter API. In particular:
    • DemoDepartureProvider.kt implements the DataProvider interface for the simulated flight information;
    • DemoDataProvider.kt implements the DataProvider interface for the current time of the simulation;
    • DemoMetadataProvider.kt implements the Metadata Adapter for the demo; it is a basic extension of MetadataProviderAdapter, which is the default implementation available with the Java In-Process Adapter library.
  • demo, which implements the operations with DynamoDB. In particular, the DynamoData.kt class is responsible for reading information from the DynamoDB tables and injecting it into the Lightstreamer server.

In DynamoDB, a stream is a flow of changes to items in a DynamoDB table. When you enable a stream on a table, DynamoDB captures information about every change to data items in the table. This information is written to a stream in the order that the changes were made.

A shard is a container for a group of stream records within a stream. Each stream record in a shard carries a sequence number that uniquely identifies it within the shard. The sequence number is assigned by DynamoDB and is unique across all shards in the stream.

The Data Adapter retrieves all available shards for the streams of the two tables involved in the demo, reads the flows, and converts the information about DynamoDB table changes into updates for Lightstreamer COMMAND mode.
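
As a rough illustration of that conversion, the sketch below maps a single stream record to a COMMAND mode update. It is written in JavaScript for brevity and is a simplification, not the adapter's actual Kotlin code. The record layout (eventName, dynamodb.Keys, dynamodb.NewImage) follows the JSON shape of DynamoDB Streams records; recordToUpdate, unwrap, the string type of the "row" key, and the sample values are assumptions made for the example.

```javascript
// Convert a DynamoDB attribute value ({S: "x"}, {N: "1"}, {BOOL: true}) to a string.
function unwrap(attr) {
  if (attr.S !== undefined) return attr.S;
  if (attr.N !== undefined) return attr.N;
  if (attr.BOOL !== undefined) return String(attr.BOOL);
  return JSON.stringify(attr); // other types are left in their JSON form
}

// Stream event names and the COMMAND mode commands they correspond to.
const COMMAND_BY_EVENT = { INSERT: "ADD", MODIFY: "UPDATE", REMOVE: "DELETE" };

// Hypothetical helper: builds a COMMAND mode update from one stream record.
function recordToUpdate(record, keyName) {
  const command = COMMAND_BY_EVENT[record.eventName];
  if (!command) throw new Error("Unexpected eventName: " + record.eventName);

  const fields = {};
  if (command !== "DELETE") {
    // ADD and UPDATE carry the new state of the table item.
    for (const [name, attr] of Object.entries(record.dynamodb.NewImage || {})) {
      fields[name] = unwrap(attr);
    }
  }
  return { ...fields, key: unwrap(record.dynamodb.Keys[keyName]), command };
}

// Usage with a made-up record.
const sampleRecord = {
  eventName: "MODIFY",
  dynamodb: {
    Keys: { row: { S: "3" } },
    NewImage: { row: { S: "3" }, flight: { S: "LS101" }, status: { S: "Boarding" } },
  },
};
console.log(recordToUpdate(sampleRecord, "row"));
```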

Below is a snippet of the Kotlin code of the DynamoData.kt class with the function that reads a shard flow and generates updates to be passed to the Data Adapter.

/**
 * Events received from the [shardId]
 */
private fun shardUpdateFlow(
    streamArn: String,
    shardId: String,
    tableName: String,
    key: String,
    attributes: List<String>
): Flow<Pair<String, Map<String, String>?>> =
    channelFlow {
        println("shardUpdateFlow $tableName $shardId")
        var currentShardIterator = dynamoDbStreamClient.getShardIterator { builder ->
            builder
                .streamArn(streamArn)
                .shardId(shardId)
                .shardIteratorType(ShardIteratorType.TRIM_HORIZON)
        }.await().shardIterator()

        var updateJob: Job? = null
        var streamInSync = false
        while (currentShardIterator != null) {
            val getRecordsResponse =
                dynamoDbStreamClient.getRecords { it.shardIterator(currentShardIterator) }.await()
            val records = getRecordsResponse.records()
            // println("getRecords: received ${records.size} records")
            if (records.isNotEmpty()) {
                streamInSync =
                    records.maxOf { it.dynamodb().approximateCreationDateTime() } >= Instant.now().minusSeconds(3)

                val previousStreamInSync = streamInSync
                val previousUpdateJob = updateJob
                updateJob = launch {
                    records
                        .map { it.dynamodb() }
                        .fold(emptyMap<String, StreamRecord>()) { map, streamRecord ->
                            // get the last event value for each key
                            map + mapOf(streamRecord.keys().getValue(key).unwrapToString() to streamRecord)
                        }.forEach { (row, streamRecord) ->
                            launch {
                                val value = if (previousStreamInSync && streamRecord.hasNewImage()) {
                                    // send the event value
                                    streamRecord.newImage()?.unwrapValuesToString()
                                } else {
                                    // send the actual value
                                    dynamoDbClient.getItem { builder ->
                                        builder
                                            .tableName(tableName)
                                            .key(mapOf(key to streamRecord.keys().getValue(key)))
                                            .attributesToGet(attributes)
                                    }.await()
                                        .takeIf { it.hasItem() }
                                        ?.item()?.unwrapValuesToString()
                                }

                                previousUpdateJob?.join()
                                send(row to value?.takeIf { it.isNotEmpty() })
                            }
                        }
                }

                if (!streamInSync) {
                    println("Receiving old data from $tableName stream " +
                            records.maxOf { it.dynamodb().approximateCreationDateTime() }
                    )
                }
            }

            currentShardIterator = getRecordsResponse.nextShardIterator()
 
        }
        println("Shard $tableName $shardId finished")
    }
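
Two ideas in this function are easy to miss: only the last record per key in a batch is forwarded, and a batch counts as "in sync" only if its newest record is at most 3 seconds old. The following JavaScript sketch isolates both. The names lastRecordPerKey and isInSync are hypothetical, and the records are simplified to plain objects with "key" and "createdAt" (milliseconds) fields.

```javascript
// Keep only the last record seen for each key, like the fold in the Kotlin code.
function lastRecordPerKey(records) {
  const latest = new Map();
  for (const record of records) {
    latest.set(record.key, record); // a later record replaces an earlier one
  }
  return latest;
}

// A batch is "in sync" when its newest record is recent enough.
// Assumption: an empty batch returns false here, whereas the Kotlin code
// simply skips empty batches and keeps the previous flag.
function isInSync(records, nowMs, maxLagMs = 3000) {
  if (records.length === 0) return false;
  const newest = Math.max(...records.map((r) => r.createdAt));
  return newest >= nowMs - maxLagMs;
}

// Usage with made-up records.
const batch = [
  { key: "1", createdAt: 1000, status: "Scheduled" },
  { key: "2", createdAt: 2000, status: "Scheduled" },
  { key: "1", createdAt: 3000, status: "Boarding" },
];
console.log(lastRecordPerKey(batch).get("1").status);
console.log(isInSync(batch, 5000), isInSync(batch, 7000));
```

When the batch is not in sync, the Kotlin code reads the current item from the table with getItem instead of trusting the image carried by the old stream record.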

The code below receives the table changes coming from the DynamoDB streams (through the shardUpdateFlow function shown above) and pushes them into the Lightstreamer server, which dispatches them to the web clients in the form of ADD, DELETE, or UPDATE messages, as required by the COMMAND subscription mode.

    private fun sendUpdates(itemName: String, newData: List<Map<String, String>>, oldData: List<Map<String, String>>?) {
        val oldKeys: Set<String> = oldData?.map { it.getValue("flight") }?.toSet().orEmpty()
        val newKeys: Set<String> = newData.map { it.getValue("flight") }.toSet()
        val keysToRemove = oldKeys - newKeys
        if (keysToRemove.size == oldKeys.size) {
            if (oldData != null) listener.clearSnapshot(itemName)
        } else {
            // remove old rows
            for (key in keysToRemove) {
                listener.update(itemName, mapOf("key" to key, "command" to "DELETE"), oldData == null)
            }
        }

        for (newLine in newData) {
            val flight = newLine.getValue("flight")
            val oldLine = oldData?.find { it["flight"] == flight }
            if (newLine != oldLine) {
                val command = if (oldLine == null) "ADD" else "UPDATE"
                val update = newLine + mapOf("key" to flight, "command" to command)
                listener.update(itemName, update, oldData == null)
            }
        }
        if (oldData == null) listener.endOfSnapshot(itemName)
    }
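
To follow the diffing logic without the Lightstreamer listener, here is a JavaScript rendering of the same steps that returns the list of actions instead of calling the listener. It is a sketch for illustration only: diffSnapshots, sameFields, and the action object shape are hypothetical, and the sample flights are made up.

```javascript
// Shallow comparison of two rows whose values are strings.
function sameFields(a, b) {
  const keys = Object.keys(a);
  return keys.length === Object.keys(b).length && keys.every((k) => a[k] === b[k]);
}

// Mirrors sendUpdates: compares the new list of rows with the previous one
// (null on the first call) and returns the actions to perform, in order.
function diffSnapshots(newData, oldData) {
  const actions = [];
  const isSnapshot = oldData === null;
  const oldRows = oldData || [];
  const oldKeys = new Set(oldRows.map((r) => r.flight));
  const newKeys = new Set(newData.map((r) => r.flight));
  const keysToRemove = [...oldKeys].filter((k) => !newKeys.has(k));

  if (keysToRemove.length === oldKeys.size) {
    // Every old row is gone: clear everything at once.
    if (!isSnapshot) actions.push({ type: "clearSnapshot" });
  } else {
    for (const key of keysToRemove) {
      actions.push({ type: "update", fields: { key, command: "DELETE" }, isSnapshot });
    }
  }

  for (const newLine of newData) {
    const oldLine = oldRows.find((r) => r.flight === newLine.flight);
    if (!oldLine || !sameFields(newLine, oldLine)) {
      const command = oldLine ? "UPDATE" : "ADD";
      actions.push({ type: "update", fields: { ...newLine, key: newLine.flight, command }, isSnapshot });
    }
  }

  if (isSnapshot) actions.push({ type: "endOfSnapshot" });
  return actions;
}

// Usage with made-up flights.
const before = [{ flight: "LS101", status: "Scheduled" }, { flight: "LS202", status: "Boarding" }];
const after = [{ flight: "LS101", status: "Boarding" }, { flight: "LS303", status: "Scheduled" }];
console.log(diffSnapshots(after, before));
```

Note that, as in the Kotlin code, a row is identified by its "flight" value, which becomes the COMMAND mode key.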

The reception of data from the streams is triggered by the subscribe function, which is invoked in the Data Adapter when a client subscribes to an item for the first time. Specifically, the DemoDepartureProvider.kt class contains this code:

    override fun subscribe(itemName: String, needsIterator: Boolean) {
        subscriptionJobs[itemName] = launch {
            var oldData: List<Map<String, String>>? = null
            departureStateFlow.collect { newData ->
                sendUpdates(itemName, newData, oldData)
                oldData = newData
            }
        }
    }
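
The pattern used here (collect every new state, remember the previous one, and cancel the job on unsubscription) can be sketched in JavaScript as follows. This is only an analogy under stated assumptions: StateHolder is a hypothetical stand-in for a Kotlin StateFlow, assuming departureStateFlow delivers its current value to each new collector, and the unsubscribe function is not shown in the original snippet.

```javascript
// Hypothetical stand-in for a StateFlow: holds a value and delivers it,
// plus every later value, to each collector.
class StateHolder {
  constructor(initial) {
    this.value = initial;
    this.collectors = new Set();
  }
  collect(fn) {
    this.collectors.add(fn);
    fn(this.value); // a new collector immediately receives the current state
    return () => this.collectors.delete(fn); // cancellation handle
  }
  set(value) {
    this.value = value;
    for (const fn of this.collectors) fn(value);
  }
}

const subscriptionJobs = new Map();

function subscribe(itemName, flow, sendUpdates) {
  let oldData = null; // null means that the snapshot has not been sent yet
  const cancel = flow.collect((newData) => {
    sendUpdates(itemName, newData, oldData);
    oldData = newData;
  });
  subscriptionJobs.set(itemName, cancel);
}

// Assumption: the adapter cancels the job when the item is unsubscribed.
function unsubscribe(itemName) {
  const cancel = subscriptionJobs.get(itemName);
  if (cancel) cancel();
  subscriptionJobs.delete(itemName);
}

// Usage: log each (new, old) pair handed to a placeholder sendUpdates.
const departures = new StateHolder(["LS101"]);
subscribe("DepartureMonitor", departures, (item, newData, oldData) => {
  console.log(item, newData, oldData);
});
departures.set(["LS101", "LS202"]);
unsubscribe("DepartureMonitor");
```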

The Simulator

The data shown by the demo are randomly generated by a simulator and written into two DynamoDB tables; the class implementing the simulator is DemoPublisher.kt.

The simulator generates flight data randomly, creating new flights, updating their data, and then deleting them once they depart. To be precise, each record in the DemoDeparturesData table contains the following fields: “row”, “destination”, “departure”, “flight”, “terminal”, “status”, and “airline”. The “row” field serves as the key of the record in the table and ranges from 1 to 9.

The DemoCurrentTimeData table, which holds the current time of the demo, contains only two fields, “key” and “value”, and is updated as if minutes were seconds.
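
One way to read "as if minutes were seconds" is that the simulated clock advances one minute for every real second. The sketch below works under that assumption. The factor of 60, the function names, and the start time are all hypothetical and are not taken from DemoPublisher.kt.

```javascript
// Assumption: one real second corresponds to one simulated minute.
const SPEEDUP = 60;

// Simulated time reached after a given amount of real elapsed time.
function simulatedTime(startMs, realElapsedMs) {
  return new Date(startMs + realElapsedMs * SPEEDUP);
}

// Format as HH:MM (UTC), the kind of value a departures board would show.
function formatHHMM(date) {
  const hh = String(date.getUTCHours()).padStart(2, "0");
  const mm = String(date.getUTCMinutes()).padStart(2, "0");
  return `${hh}:${mm}`;
}

// Usage: the simulation starts at 08:00; 90 real seconds later, 90 simulated minutes have passed.
const simulationStart = Date.UTC(2023, 0, 30, 8, 0);
console.log(formatHHMM(simulatedTime(simulationStart, 90 * 1000)));
```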

To Recap