July 22, 2015

Redis mass import

The story goes like this: I follow Peter Lawrey on Twitter. If you don’t know him, it’s either because you’re better at Java than he is and thus haven’t had to read anything Java-related on StackOverflow in the past decade, or because you’re just not programming, in which case you won’t care about what I’m going to write. OK, not even one paragraph in and I’m already digressing, let’s get back on track.

While browsing his blog, Vanilla Java, I saw an article from May 26, 2015 comparing the performance of redis and OpenHFT’s SharedHashMap. I was surprised by its final result concerning redis writes, which for him topped out at 10K updates/sec using one thread and 100K updates/sec using multi-threading, with OpenHFT being 300x faster.

I’m not questioning the stellar performance of OpenHFT, I’m just skeptical about the poor redis numbers. I’m a big, big fan of redis: I use it daily, whenever and wherever I can, it’s my Swiss Army knife, it just rocks out of the box. Now, I know that benchmarking is hard, and people benchmarking things are almost always wrong in some way, but I’ll take my chances. Is anyone actually reading me? Let’s get started :)

Testing redis-cli

First we’ll generate an input file for redis-cli, which is able to read its input from stdin when given the -x argument. I’ll do that with Python, just for fun. The script below generates a succession of 1 million SET random-uuid random-uuid commands, which should create enough keys for testing purposes.

from __future__ import print_function
import uuid
sample = []
for _ in xrange(1000000):
  sample.append("SET %s %s" % (uuid.uuid4(), uuid.uuid4()))

print("\n".join(sample), file=open('sample.input', 'w+'))

https://gist.github.com/agrison/b35f6d6e403320fb1344

OK, let’s start with a plain cat sample.input | redis-cli -x > /dev/null and see how it performs. It’s definitely not the way to mass import data, and it took about 50 seconds.

$ time cat sample.input | redis-cli -x > /dev/null
real 0m50.275s
user 0m10.891s
sys  0m11.465s

Which is about 1 000 000 / 50.275 = 19 890 updates/sec. Now what about 20 redis-cli processes, each handling 50 000 rows?

$ split -l 50000 sample.input
$ redis-mass-import() {
    for f in `ls x*`; do
        cat $f | redis-cli -x > /dev/null &
    done;
    wait;
}
$ time redis-mass-import > /dev/null
real 0m21.283s
user 0m18.204s
sys 0m16.725s

Which is about 1 000 000 / 21.283 ≈ 46 986 updates/sec. It’s better, but still far from the 100K announced by Peter. Maybe by adding more processes?

$ rm x*
$ split -l 10000 sample.input
$ time redis-mass-import > /dev/null
real 0m23.210s
user 0m21.652s
sys 0m18.729s

Nope :).

Now testing redis-cli with the redis protocol

I’ll start by modifying the Python script so that it generates a succession of SET commands encoded in the redis protocol (RESP).

from __future__ import print_function
import uuid, sys

def redis_protocol(cmd):
  proto = "*%d\r\n" % len(cmd.split(' '))
  for arg in cmd.split(' '):
    proto += "$%d\r\n%s\r\n" % (len(arg), arg)
  return proto

sample = ""
for _ in xrange(1000000):
  sample += redis_protocol("SET %s %s" % (uuid.uuid4(), uuid.uuid4()))

print(sample, file=open('sample2.input', 'w+'))

https://gist.github.com/agrison/37d55becc579b4d840fc
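
To make the format concrete, here is what the helper returns for a small, hypothetical SET foo bar command: *3 announces an array of three elements, and each $n line gives the length in bytes of the argument that follows.

>>> redis_protocol("SET foo bar")
'*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n'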

And this time we’ll use redis-cli with its --pipe option:

$ time cat sample2.input | redis-cli --pipe
All data transferred. Waiting for the last reply...
...
real 0m3.505s
user 0m0.264s
sys 0m0.149s

Which is about 1 000 000 / 3.505 = 285 306 updates/sec. That’s about 2,8x the 100K figure Peter reported. Not bad, right?

OK, so let’s test that with 20 concurrent processes, just to see if redis-server can handle even more.

Since each SET command encoded in the redis protocol takes 7 lines, we can split the file every 350 000 lines (that is, 50 000 commands per file) in order to get 20 files covering the 1 000 000 commands:

$ split -l 350000 sample2.input
$ ls -lSh x*
-rw-r--r-- 1 alex staff 4,7M 22 jul 14:48 xaa
-rw-r--r-- 1 alex staff 4,7M 22 jul 14:49 xab
...
-rw-r--r-- 1 alex staff 4,7M 22 jul 14:49 xas
-rw-r--r-- 1 alex staff 4,7M 22 jul 14:49 xat
$ redis-mass-import() {
    for f in `ls x*`; do
        cat $f | redis-cli --pipe &
    done;
    wait;
}
$ time redis-mass-import
All data transferred. Waiting for the last reply...
...
real 0m3.601s
user 0m0.267s
sys 0m0.156s

Hmm, it does not really change anything. The problem here is that bash job control only gives us multiple processes, not real multi-threading.

Using redis over a Unix socket, we can push it down to 2.734 seconds:

$ time cat sample2.input | redis-cli -s /tmp/redis.sock --pipe
All data transferred. Waiting for the last reply...
...
real 0m2.734s
user 0m0.268s
sys 0m0.262s

We’re now at 365 764 updates/sec.

From Java using Jedis

OpenHFT is written in Java, so I’m not going to write C code for my tests. Instead I’ll use a simple Spring Boot app with Jedis, a connection pool and redis pipelining to see how it performs.

Let’s write some Java code for that; using Jedis and Spring Boot it’s as easy as this:

package me.grison.redis.foo;

import org.springframework.boot.*;
import org.springframework.boot.autoconfigure.*;
import org.springframework.context.annotation.*;
import redis.clients.jedis.*;

import java.util.UUID;

@ComponentScan
@Configuration
@SpringBootApplication
@EnableAutoConfiguration
public class RedisMassImport {
	@Bean
	JedisPool jedisPool() {
		JedisPoolConfig poolConfig = new JedisPoolConfig();
		poolConfig.setTestOnBorrow(true);
		poolConfig.setTestWhileIdle(true);
		return new JedisPool(poolConfig, "localhost", 6379, 30);
	}

	@Bean
	CommandLineRunner init(JedisPool pool) {
		return args -> {
			int numKeys = 1_000_000;
			try (Jedis jedis = pool.getResource()) {
				// prepare UUIDs
				String[] uuids = new String[numKeys];
				for (int i = 0; i < numKeys; ++i)
					uuids[i] = UUID.randomUUID().toString();

				long time = System.nanoTime();
				Pipeline p = jedis.pipelined();
				for (int i = 0; i < numKeys; ++i)
					p.set(uuids[i], uuids[i]);
				p.sync();
				long endTime = System.nanoTime();
				System.out.println("\tTook " + (endTime - time) + "ns to sync the pipeline.");
			}
		};
	}


	public static void main(String[] args) {
		SpringApplication.run(RedisMassImport.class, args);
	}
}

https://gist.github.com/agrison/d4ea24b3687084bc85f9

Running this application outputs

Took 4750988000ns to sync the pipeline.

That means we threw 210 482 updates/sec at redis. Not bad for a first try!

Now let’s add some multi-threading to see how far we can go:

package me.grison.redis.foo;

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Pipeline;

import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@ComponentScan
@Configuration
@SpringBootApplication
@EnableAutoConfiguration
public class RedisMassImport {
	@Bean
	JedisPool jedisPool() {
		JedisPoolConfig poolConfig = new JedisPoolConfig();
		poolConfig.setTestOnBorrow(true);
		poolConfig.setTestWhileIdle(true);
		return new JedisPool(poolConfig, "localhost", 6379, 30);
	}

	@Bean
	CommandLineRunner init(JedisPool pool) {
		return args -> {
			int numKeys = 1_000_000;
			int availableProcessors = Runtime.getRuntime().availableProcessors() * 2;
			ExecutorService exec = Executors.newFixedThreadPool(availableProcessors);

			// pre-compute UUIDs
			String[] uuids = new String[numKeys];
			for (int i = 0; i < numKeys; ++i)
				uuids[i] = UUID.randomUUID().toString();

			// split
			final int portion = numKeys / availableProcessors;
			long time = System.nanoTime();

			for (int i = 0; i < availableProcessors; ++i) {
				final int threadId = i;
				exec.submit(() -> {
					try (Jedis jedis = pool.getResource()) {
						// throw the data at redis
						Pipeline p = jedis.pipelined();
						for (int j = threadId * portion; j < (threadId + 1) * portion; ++j)
							p.set(uuids[j], uuids[j]);
						p.sync();
					}
				});
			}
			exec.shutdown();
			try {
				exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);

				long endTime = System.nanoTime();
				System.out.println("\tTook " + (endTime - time) + "ns to sync the pipeline.");
			} catch (InterruptedException e) {
				System.err.println("Execution interrupted: " + e.getMessage());
			}
		};
	}


	public static void main(String[] args) {
		SpringApplication.run(RedisMassImport.class, args);
	}
}

https://gist.github.com/agrison/3a4fdf090d8817819f7a

Took 3035156000ns to sync the pipeline.

So we threw 329 472 updates/sec at redis using multi-threading, with two threads per core available on my computer. That’s faster than redis-cli --pipe over TCP.

I’m pretty sure we could improve on that if Jedis supported Unix sockets instead of plain TCP connections.

Using Go and redigo to see how it performs over Unix sockets

TLDR: it performs the same as Java & Jedis.

First let’s try the naive way, with only one thread:

package main

import (
	"fmt"
	"log"
	"net"
	"runtime"
	"time"

	"github.com/garyburd/redigo/redis"
	"github.com/twinj/uuid"
)

const numKeys = 1000000
var uuids [numKeys]string
var client redis.Conn

func prepareUUids() {
	for i := 0; i < numKeys; i++ {
		uuids[i] = uuid.NewV4().String()
	}
}

func createClient() {
	c, err := net.Dial("unix", "/tmp/redis.sock")
	if err != nil {
		log.Fatal(err)
		return
	}

	client = redis.NewConn(c, 10*time.Second, 10*time.Second)
}

func massImport() {
	for i := 0; i < numKeys; i++ {
		client.Send("SET", uuids[i], uuids[i])
	}

	client.Flush()
}

func closeClient() {
	client.Close()
}

func main() {
	runtime.GOMAXPROCS(8)

	prepareUUids()
	createClient()
	start := time.Now()
	massImport()
	elapsed := time.Since(start)
	fmt.Println("Took ", elapsed)
	closeClient()
}

https://gist.github.com/agrison/82e8db5ec30dda4f9d49

$ go build foo.go && ./foo
Took 3.564170316s

Now let’s add some goroutines and a connection pool:

package main

import (
	"fmt"
	"log"
	"net"
	"runtime"
	"sync"
	"time"

	"github.com/garyburd/redigo/redis"
	"github.com/twinj/uuid"
)

const numKeys = 1000000
const routines = 16
const portion = numKeys / routines
var uuids [numKeys]string
var wg sync.WaitGroup
var pool *redis.Pool

func prepareUUids() {
	for i := 0; i < numKeys; i++ {
		uuids[i] = uuid.NewV4().String()
	}
}

func createPool() {
	pool = &redis.Pool{
		MaxIdle:     16,
		MaxActive:   16,
		IdleTimeout: 60 * time.Second,
		Dial: func() (redis.Conn, error) {
			c, err := net.Dial("unix", "/tmp/redis.sock")
			if err != nil {
				log.Fatal(err)
				return nil, err
			}
			return redis.NewConn(c, 10*time.Second, 10*time.Second), nil
		},
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
			_, err := c.Do("PING")
			return err
		},
	}
}

func massImport() {
	wg.Add(routines)
	for i := 0; i < routines; i++ {
		go importRoutine(i, pool.Get())
	}

	wg.Wait()
}

func importRoutine(t int, client redis.Conn) {
	defer client.Flush()
	defer wg.Done()
	for i := t * portion; i < (t + 1) * portion; i++ {
		client.Send("SET", uuids[i], uuids[i])
	}
}

func closePool() {
	pool.Close()
}

func main() {
	runtime.GOMAXPROCS(16)

	prepareUUids()
	createPool()
	start := time.Now()
	massImport()
	elapsed := time.Since(start)
	fmt.Println("Took ", elapsed)
	closePool()
}

https://gist.github.com/agrison/e46916b9b67a6dbe2bb3

$ go build foomt.go && ./foomt
Took 3.034901581s

Which is roughly 329 500 updates/sec. We were at 329 472 using Java, so we’re on par. I thought that using a Unix socket and Go would improve the performance further, but hey, we’re already at 330K updates/sec on a MacBook Pro, which is pretty neat!

What about OpenHFT?

To be honest, I only knew OpenHFT by name, so I implemented a sample app using the simplest technique I could find; it may even be a bad use of the library on my side:

package me.grison.openhft.foo;

import net.openhft.collections.SharedHashMap;
import net.openhft.collections.SharedHashMapBuilder;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

import java.io.File;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@ComponentScan
@Configuration
@SpringBootApplication
@EnableAutoConfiguration
public class OpenHFTMassImport {
	@Bean
	CommandLineRunner init() {
		return args -> {
			SharedHashMap<String, String> map = new SharedHashMapBuilder()
					.create(File.createTempFile("mass-import", "foo"), String.class, String.class);
			int numKeys = 1_000_000;
			int availableProcessors = Runtime.getRuntime().availableProcessors() * 4;
			ExecutorService exec = Executors.newFixedThreadPool(availableProcessors);

			// pre-compute UUIDs
			String[] uuids = new String[numKeys];
			for (int i = 0; i < numKeys; ++i)
				uuids[i] = UUID.randomUUID().toString();

			// split
			final int portion = numKeys / availableProcessors;
			long time = System.nanoTime();

			for (int i = 0; i < availableProcessors; ++i) {
				final int threadId = i;
				exec.submit(() -> {
					// throw the data at OpenHFT
					for (int j = threadId * portion; j < (threadId + 1) * portion; ++j)
						map.put(uuids[j], uuids[j]);
				});
			}
			exec.shutdown();
			try {
				exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);

				long endTime = System.nanoTime();
				System.out.println("\tTook " + (endTime - time) + "ns to execute.");
			} catch (InterruptedException e) {
				System.err.println("Execution interrupted: " + e.getMessage());
			}
		};
	}


	public static void main(String[] args) {
		SpringApplication.run(OpenHFTMassImport.class, args);
	}
}

https://gist.github.com/agrison/b51465fee8d46f675fff

The interesting results come from the outputs of two executions using multi-threading and another one using a single thread. The multi-threaded version does not outperform the multi-threaded redis import on my computer: the OpenHFT implementation keeps waiting for locks to be released in order to do its work. On the other hand, the single-threaded version took 1,0936 second, which means 914 350 updates/sec. We’re nearly at a million.
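
For reference, the single-threaded run is essentially the body of the CommandLineRunner above with the executor removed; a minimal sketch (not necessarily verbatim the code behind the numbers above) looks like this:

SharedHashMap<String, String> map = new SharedHashMapBuilder()
		.create(File.createTempFile("mass-import", "foo"), String.class, String.class);

// pre-compute the UUIDs so that only the puts are measured
String[] uuids = new String[1_000_000];
for (int i = 0; i < uuids.length; ++i)
	uuids[i] = UUID.randomUUID().toString();

long time = System.nanoTime();
for (int i = 0; i < uuids.length; ++i)
	map.put(uuids[i], uuids[i]);
System.out.println("\tTook " + (System.nanoTime() - time) + "ns to execute.");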

For reference, using a plain HashMap with no multi-threading takes 0,87212 sec, which is more than a million updates/sec, and using a Hashtable with multi-threading takes 0,39202 sec, which means 2,55 million updates/sec. OpenHFT is really damn fast considering everything it has to offer beyond being a simple in-memory map.
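
That HashMap baseline is nothing more than a timed loop of puts; a minimal, self-contained sketch (the class name HashMapBaseline is just for illustration) would be:

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class HashMapBaseline {
	public static void main(String[] args) {
		int numKeys = 1_000_000;
		// pre-compute the UUIDs so that only the map inserts are measured
		String[] uuids = new String[numKeys];
		for (int i = 0; i < numKeys; ++i)
			uuids[i] = UUID.randomUUID().toString();

		Map<String, String> map = new HashMap<>(2 * numKeys);
		long time = System.nanoTime();
		for (int i = 0; i < numKeys; ++i)
			map.put(uuids[i], uuids[i]);
		System.out.println("\tTook " + (System.nanoTime() - time) + "ns to fill the HashMap.");
	}
}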

Obviously, like I said earlier, benchmarks are benchmarks, and doing them correctly is a difficult task that takes time. Besides, I’m fairly confident I did not measure everything entirely properly :D.

My 2 cents :)

Alexandre Grison - //grison.me - @algrison