Infinite random feeders for Gatling

In a previous article, I talked about using pseudo-random values in performance tests. In this article, I will present how infinite, deterministic Gatling feeders can be created with Scala lazy streams.

Recap on feeders and random generators

Gatling’s documentation mentions that a feeder is essentially an Iterator[Map[String, T]]. In the rest of this article, I will use the more concrete Iterator[Map[String, Any]] type, which still allows any kind of value. In other words, any iterator of maps can be used as a Gatling feeder.

In the article about random values, I pointed to the java.util.Random constructor that takes a seed parameter. With a given seed, an instance of Random generates a predictable sequence of pseudo-random values: the same seed always yields the same sequence.
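Here is a minimal sketch of that property, using scala.util.Random, which wraps java.util.Random and also accepts a seed. Two generators built with the same seed produce exactly the same values. The helper name firstValues and the seed 42 are arbitrary choices for illustration.

```scala
import scala.util.Random

// Hypothetical helper: draws the first `n` integers from a generator
// created with the given seed.
def firstValues(seed: Long, n: Int): List[Int] = {
  val random = new Random(seed)
  List.fill(n)(random.nextInt())
}

// Two independent generators, same seed...
val run1 = firstValues(42L, 5)
val run2 = firstValues(42L, 5)
// ...so run1 and run2 contain the same five integers.
```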

Infinite streams

The typical feeder example looks like the code below. It has the expected type, but its random source is not predictable: every run generates different e-mail addresses.

import scala.util.Random

val feeder = Iterator.continually(
  Map("email" -> (Random.alphanumeric.take(20).mkString + "@foo.com"))
)

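Another option is the scala.collection.immutable.Stream class, a lazy list whose elements, apart from the first one, are only computed when they are accessed. (Since Scala 2.13, Stream is deprecated in favour of LazyList, which can be used the same way here.) The code above can be rewritten as follows: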

val feeder = Stream.continually(
  Map("email" -> (Random.alphanumeric.take(20).mkString + "@foo.com"))
).iterator
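What makes an infinite Stream usable as a feeder is laziness: an element is only built when someone asks for it. The sketch below makes that visible with a counter; the generated variable is a hypothetical addition, used purely to observe how many maps were actually created.

```scala
import scala.util.Random

// Hypothetical counter, only there to observe how many maps get built.
var generated = 0

val feeder: Iterator[Map[String, Any]] = Stream.continually {
  generated += 1
  Map("email" -> (Random.alphanumeric.take(20).mkString + "@foo.com"))
}.iterator

// Pull three records, as Gatling would for three virtual users.
// Only those three maps are built, even though the stream is infinite.
val firstThree = feeder.take(3).toList
```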

So far, nothing fancy. But we can do better: for instance, we can generate a unique integer identifier and associate it with an e-mail address. This produces a series of user ids, starting at 1, each paired with an e-mail address.

val feeder = Stream.from(1).map(id => Map(
  "id" -> id,
  "email" -> (Random.alphanumeric.take(20).mkString + "@foo.com"))
).iterator
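To make that feeder predictable, it should be enough to draw the characters from a seeded generator instead of the global Random object. A minimal sketch, where the function name newUserFeeder and the seed 1 are illustrative choices:

```scala
import scala.util.Random

// Infinite, deterministic feeder of (id, email) pairs: the same seed
// always yields the same sequence of e-mail addresses.
def newUserFeeder(random: Random): Iterator[Map[String, Any]] =
  Stream.from(1).map(id => Map(
    "id" -> id,
    "email" -> (random.alphanumeric.take(20).mkString + "@foo.com")
  )).iterator

// Two feeders created with the same seed emit the same records.
val first = newUserFeeder(new Random(1)).take(5).toList
val second = newUserFeeder(new Random(1)).take(5).toList
```

Passing the generator as a parameter, rather than creating it inside the function, also lets the caller decide which seed a given test run should use.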

It works, but these examples are far too simple. Let’s move on to a more realistic one.

An IoT data model

CREATE TABLE iot.sensors_data (
  sensor_id text,
  metric_type text,
  measurement_date date,
  measurement_time time,
  value bigint,
  PRIMARY KEY ((sensor_id, metric_type, measurement_date), measurement_time)
)

Let’s consider an IoT use case in which sensors send their readings to a server every second. Each sensor has a unique identifier and can send several metrics (A, B and C), with one new value per second per metric. In Cassandra, these readings could be stored in the table listed above.

To simulate one measurement in Gatling, we can use a Map containing the data of one row, as follows:

Map(
  "sensor_id" -> "abcdef",
  "metric_type" -> "A",
  "measurement_date" -> "2020-01-01",
  "measurement_time" -> "00:00:01",
  "value" -> 42
)

How can we create an infinite stream of readings for 5 different sensors?

Step one: generate 3 readings

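Note: in the rest of this article, the following function is used to inspect the produced values. A Scala Map with more than four entries does not keep insertion order, so the columns are printed in a different order from the one in which they were declared.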

def debug(stream: Stream[Map[String, Any]]) =
  stream.map(_.values.mkString(",")).foreach(println)

We can create a function that generates the three metrics A, B and C for a given sensor at a given time.

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import scala.util.Random

val metricTypes = Stream("A", "B", "C")

val isoDate = DateTimeFormatter.ISO_DATE
val isoTime = DateTimeFormatter.ISO_TIME

def newSensorMetrics(sensorId: String,
                     measurementDateTime: LocalDateTime,
                     random: Random): Stream[Map[String, Any]] =
  metricTypes.map(metricType => Map(
    "sensor_id" -> sensorId,
    "metric_type" -> metricType,
    "measurement_date" -> isoDate.format(measurementDateTime),
    "measurement_time" -> isoTime.format(measurementDateTime),
    "value" -> random.nextInt
  ))

For a given sensor and a given date, this function produces one map per metric type, i.e. as many maps as there are elements in metricTypes. We can easily check the data it generates:

scala> debug(newSensorMetrics("abcdef", LocalDateTime.of(2019, 12, 29, 18, 51, 59), new Random()))
18:51:59,2019-12-29,abcdef,A,994309426
18:51:59,2019-12-29,abcdef,B,94083573
18:51:59,2019-12-29,abcdef,C,1420415181
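The listing above uses an unseeded new Random(), so its values change on every run. Passing a seeded generator instead should make the output reproducible. The sketch below repeats the definitions so that it is self-contained; the seed 42 and the timestamp are arbitrary.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import scala.util.Random

val metricTypes = Stream("A", "B", "C")
val isoDate = DateTimeFormatter.ISO_DATE
val isoTime = DateTimeFormatter.ISO_TIME

// Same function as above: one map per metric type.
def newSensorMetrics(sensorId: String,
                     measurementDateTime: LocalDateTime,
                     random: Random): Stream[Map[String, Any]] =
  metricTypes.map(metricType => Map(
    "sensor_id" -> sensorId,
    "metric_type" -> metricType,
    "measurement_date" -> isoDate.format(measurementDateTime),
    "measurement_time" -> isoTime.format(measurementDateTime),
    "value" -> random.nextInt()
  ))

val at = LocalDateTime.of(2019, 12, 29, 18, 51, 59)

// Two independent generators sharing the same seed...
val run1 = newSensorMetrics("abcdef", at, new Random(42)).toList
val run2 = newSensorMetrics("abcdef", at, new Random(42)).toList
// ...produce exactly the same three maps, values included.
```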

Step two: flatMap everything


We can now build on that function. Let’s define a function that generates the readings of 5 different sensors. With 3 metrics per sensor, it should produce 15 maps. Its definition is short, since it reuses the previous function.

val sensorIds = Stream("room-1", "room-2", "room-3", "room-4", "basement")

def newMetricsAt(measurementDateTime: LocalDateTime,
                 random: Random): Stream[Map[String, Any]] =
  sensorIds.flatMap(sensorId => newSensorMetrics(sensorId, measurementDateTime, random))

Let’s check its output. As expected, it creates values for metrics A, B and C for each of the 5 sensors.

scala> debug(newMetricsAt(LocalDateTime.of(2019, 12, 29, 18, 51, 59), new Random()))
18:51:59,2019-12-29,room-1,A,-225317240
18:51:59,2019-12-29,room-1,B,-1407073398
18:51:59,2019-12-29,room-1,C,1353350072
18:51:59,2019-12-29,room-2,A,1391219556
18:51:59,2019-12-29,room-2,B,-1186475735
18:51:59,2019-12-29,room-2,C,-1528496533
18:51:59,2019-12-29,room-3,A,820521014
18:51:59,2019-12-29,room-3,B,155042503
18:51:59,2019-12-29,room-3,C,4863426
18:51:59,2019-12-29,room-4,A,1366254389
18:51:59,2019-12-29,room-4,B,1687308010
18:51:59,2019-12-29,room-4,C,554733033
18:51:59,2019-12-29,basement,A,1903022513
18:51:59,2019-12-29,basement,B,-1904888787
18:51:59,2019-12-29,basement,C,-939754900
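Since flatMap concatenates the three-element stream produced for each sensor, the result should always contain sensorIds.size × metricTypes.size maps, grouped by sensor. A quick self-contained check of that claim, with a seeded generator (the seed 7 is arbitrary):

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import scala.util.Random

val metricTypes = Stream("A", "B", "C")
val sensorIds = Stream("room-1", "room-2", "room-3", "room-4", "basement")
val isoDate = DateTimeFormatter.ISO_DATE
val isoTime = DateTimeFormatter.ISO_TIME

// One map per metric type, for a single sensor at a single instant.
def newSensorMetrics(sensorId: String,
                     measurementDateTime: LocalDateTime,
                     random: Random): Stream[Map[String, Any]] =
  metricTypes.map(metricType => Map(
    "sensor_id" -> sensorId,
    "metric_type" -> metricType,
    "measurement_date" -> isoDate.format(measurementDateTime),
    "measurement_time" -> isoTime.format(measurementDateTime),
    "value" -> random.nextInt()
  ))

// flatMap appends the per-sensor streams one after the other.
def newMetricsAt(measurementDateTime: LocalDateTime,
                 random: Random): Stream[Map[String, Any]] =
  sensorIds.flatMap(sensorId => newSensorMetrics(sensorId, measurementDateTime, random))

val readings = newMetricsAt(LocalDateTime.of(2019, 12, 29, 18, 51, 59), new Random(7)).toList
```

The sensors appear in the same order as in sensorIds, each one immediately followed by its three metrics.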

Finally, we only need to call that new function for every second, and call it a day. To do so, we reuse the idea presented earlier and leverage Stream.from(0). Each element of that stream is the number of seconds elapsed since 2020-01-01 at midnight (UTC), which is then converted to a LocalDateTime. This reference point is arbitrary; any other starting point would work as well.

import java.time.{LocalDateTime, ZoneOffset}

val baseEpochSecond = LocalDateTime.of(2020, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC)

def newMetricsFeeder(random: Random): Stream[Map[String, Any]] = {
  Stream.from(0)
    .map(x => LocalDateTime.ofEpochSecond(baseEpochSecond + x, 0, ZoneOffset.UTC))
    .flatMap(measurementDateTime => newMetricsAt(measurementDateTime, random))
}
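Converting an offset in seconds with LocalDateTime.ofEpochSecond is one option. Another one, swapped in here purely for comparison, is to start from the reference date and add one second at a time with Stream.iterate. The sketch below checks that both approaches yield the same timestamps; the names byOffset and byIteration are illustrative.

```scala
import java.time.{LocalDateTime, ZoneOffset}

val base = LocalDateTime.of(2020, 1, 1, 0, 0, 0)
val baseEpochSecond = base.toEpochSecond(ZoneOffset.UTC)

// Approach used in the article: seconds elapsed since the reference point.
val byOffset: Stream[LocalDateTime] =
  Stream.from(0).map(x => LocalDateTime.ofEpochSecond(baseEpochSecond + x, 0, ZoneOffset.UTC))

// Alternative: each timestamp is the previous one plus one second.
val byIteration: Stream[LocalDateTime] =
  Stream.iterate(base)(_.plusSeconds(1))
```

Stream.iterate avoids the epoch conversion, but each element is derived from the previous one, whereas the offset-based version computes every timestamp independently from its index.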

Let’s check the first 30 maps produced by that new function. The first 15 correspond to readings taken at midnight, and the next 15 to readings taken at 00:00:01.

scala> debug(newMetricsFeeder(new Random()).take(30))
00:00:00,2020-01-01,room-1,A,-853286656
00:00:00,2020-01-01,room-1,B,919370010
00:00:00,2020-01-01,room-1,C,178811566
00:00:00,2020-01-01,room-2,A,1508459530
00:00:00,2020-01-01,room-2,B,1676289209
00:00:00,2020-01-01,room-2,C,-741724980
00:00:00,2020-01-01,room-3,A,139680150
00:00:00,2020-01-01,room-3,B,653904810
00:00:00,2020-01-01,room-3,C,-72138997
00:00:00,2020-01-01,room-4,A,71036715
00:00:00,2020-01-01,room-4,B,1664416044
00:00:00,2020-01-01,room-4,C,1597468231
00:00:00,2020-01-01,basement,A,-1453698797
00:00:00,2020-01-01,basement,B,-1731220294
00:00:00,2020-01-01,basement,C,-1653835053
00:00:01,2020-01-01,room-1,A,635621957
00:00:01,2020-01-01,room-1,B,-299580046
00:00:01,2020-01-01,room-1,C,1538466835
00:00:01,2020-01-01,room-2,A,-2092402454
00:00:01,2020-01-01,room-2,B,1597417387
00:00:01,2020-01-01,room-2,C,833558919
00:00:01,2020-01-01,room-3,A,1260013834
00:00:01,2020-01-01,room-3,B,1891732764
00:00:01,2020-01-01,room-3,C,1860549092
00:00:01,2020-01-01,room-4,A,2051048928
00:00:01,2020-01-01,room-4,B,556152339
00:00:01,2020-01-01,room-4,C,-337711114
00:00:01,2020-01-01,basement,A,-792602173
00:00:01,2020-01-01,basement,B,-1781777940
00:00:01,2020-01-01,basement,C,117395424

Note the call to take(30) in the listing above. Without it, debug would try to print every element of an infinite stream, and would never terminate.
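Gatling does not print the stream: it pulls records one at a time from the feeder's iterator as virtual users need them. The sketch below mimics that with next(), and checks the ordering described above, where the 16th record is the first reading at 00:00:01. It repeats the three functions so that it is self-contained, with an arbitrary seed of 1.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter
import scala.util.Random

val metricTypes = Stream("A", "B", "C")
val sensorIds = Stream("room-1", "room-2", "room-3", "room-4", "basement")
val isoDate = DateTimeFormatter.ISO_DATE
val isoTime = DateTimeFormatter.ISO_TIME
val baseEpochSecond = LocalDateTime.of(2020, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC)

def newSensorMetrics(sensorId: String,
                     measurementDateTime: LocalDateTime,
                     random: Random): Stream[Map[String, Any]] =
  metricTypes.map(metricType => Map(
    "sensor_id" -> sensorId,
    "metric_type" -> metricType,
    "measurement_date" -> isoDate.format(measurementDateTime),
    "measurement_time" -> isoTime.format(measurementDateTime),
    "value" -> random.nextInt()
  ))

def newMetricsAt(measurementDateTime: LocalDateTime,
                 random: Random): Stream[Map[String, Any]] =
  sensorIds.flatMap(sensorId => newSensorMetrics(sensorId, measurementDateTime, random))

def newMetricsFeeder(random: Random): Stream[Map[String, Any]] =
  Stream.from(0)
    .map(x => LocalDateTime.ofEpochSecond(baseEpochSecond + x, 0, ZoneOffset.UTC))
    .flatMap(measurementDateTime => newMetricsAt(measurementDateTime, random))

// Keep only the iterator, as a Gatling feeder would.
val feeder: Iterator[Map[String, Any]] = newMetricsFeeder(new Random(1)).iterator

// The first 15 records are the readings taken at midnight...
val midnight = List.fill(15)(feeder.next())
// ...and the 16th is the first reading taken one second later.
val sixteenth = feeder.next()
```

Keeping only the iterator, and no reference to the stream itself, is probably preferable for long runs: a Stream memoizes the elements it has already computed, so holding on to its head keeps every generated record reachable.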

Some code golf

In this article, I split the work into three distinct functions, each of them simple, if not simplistic. But the same feeder can also be defined in a single block of code, using Scala’s for-comprehensions.

The code below produces the same result. It is deliberately not the shortest possible version, for the sake of readability.

The maps are generated in the same order as before. The for-comprehension behaves like nested loops in which the last generator varies fastest, i.e. it traverses the (date, sensor, metric) combinations depth-first. In order of priority: the metrics of a given sensor at a given time are generated together; then the readings of all sensors at a given time; and finally, the readings for each successive second, starting from January 1st.

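import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter
import scala.util.Random

val baseEpochSecond = LocalDateTime.of(2020, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC)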

val dateStream = Stream.from(0).map(x => LocalDateTime.ofEpochSecond(x + baseEpochSecond, 0, ZoneOffset.UTC))
val sensorStream = Stream("room-1", "room-2", "room-3", "room-4", "basement")
val metricStream = Stream("A", "B", "C")

val random = new Random(1)

val feeder: Iterator[Map[String, Any]] = (for {
  measurementDateTime <- dateStream
  sensorId <- sensorStream
  metricType <- metricStream
} yield Map(
  "sensor_id" -> sensorId,
  "metric_type" -> metricType,
  "measurement_date" -> DateTimeFormatter.ISO_DATE.format(measurementDateTime),
  "measurement_time" -> DateTimeFormatter.ISO_TIME.format(measurementDateTime),
  "value" -> random.nextInt
)).iterator
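The claim that both versions produce the same result can be checked directly: with the same seed, the for-comprehension and the three-function version should emit identical maps, because both desugar to the same chain of map and flatMap calls and therefore draw random values in the same order. A self-contained sketch, where golfFeeder and dateStream are illustrative names and the seed 1 matches the listing above:

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter
import scala.util.Random

val metricTypes = Stream("A", "B", "C")
val sensorIds = Stream("room-1", "room-2", "room-3", "room-4", "basement")
val isoDate = DateTimeFormatter.ISO_DATE
val isoTime = DateTimeFormatter.ISO_TIME
val baseEpochSecond = LocalDateTime.of(2020, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC)

// A def rather than a val, so each feeder gets a fresh stream of dates.
def dateStream: Stream[LocalDateTime] =
  Stream.from(0).map(x => LocalDateTime.ofEpochSecond(baseEpochSecond + x, 0, ZoneOffset.UTC))

// Three-function version.
def newSensorMetrics(sensorId: String, measurementDateTime: LocalDateTime,
                     random: Random): Stream[Map[String, Any]] =
  metricTypes.map(metricType => Map(
    "sensor_id" -> sensorId,
    "metric_type" -> metricType,
    "measurement_date" -> isoDate.format(measurementDateTime),
    "measurement_time" -> isoTime.format(measurementDateTime),
    "value" -> random.nextInt()
  ))

def newMetricsAt(measurementDateTime: LocalDateTime, random: Random): Stream[Map[String, Any]] =
  sensorIds.flatMap(sensorId => newSensorMetrics(sensorId, measurementDateTime, random))

def newMetricsFeeder(random: Random): Stream[Map[String, Any]] =
  dateStream.flatMap(measurementDateTime => newMetricsAt(measurementDateTime, random))

// For-comprehension version, wrapped in a function so it can be built twice.
def golfFeeder(random: Random): Stream[Map[String, Any]] =
  for {
    measurementDateTime <- dateStream
    sensorId <- sensorIds
    metricType <- metricTypes
  } yield Map(
    "sensor_id" -> sensorId,
    "metric_type" -> metricType,
    "measurement_date" -> isoDate.format(measurementDateTime),
    "measurement_time" -> isoTime.format(measurementDateTime),
    "value" -> random.nextInt()
  )

// Four seconds' worth of readings from each version, same seed.
val threeFunctions = newMetricsFeeder(new Random(1)).take(60).toList
val golf = golfFeeder(new Random(1)).take(60).toList
```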

Conclusion

In the previous article, I emphasized the operational benefits of using pseudo-random values. Combined with lazy streams, this technique becomes even more useful: the generated data set can be infinite, and yet remain fully reproducible.

Since a Gatling feeder is just an iterator, we can generate data in an infinite, random and yet predictable fashion. Whatever the complexity of the data to generate, and the order in which it should be traversed (depth-first or breadth-first), there is usually a way to write it as an infinite feeder.