Wednesday, November 1, 2017

TCP gotchas

When I first learned about TCP, I found a few things surprising. Here's a short list:

Server socket has an "accept" but not a "reject" method

Connection setup is handled by the operating system (OS). The application has no way of examining the client before the connection is established. If you want to ban connections from a certain IP, you can only use a firewall or close the connection immediately after accept.
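For example, banning a hypothetical address range could look more or less like this (the port number and the banned prefix are made up for illustration):
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class AcceptThenReject {
    public static void main(String[] args) throws IOException {
        try (ServerSocket server = new ServerSocket(8080)) {
            while (true) {
                Socket client = server.accept(); // the OS has already completed the handshake here
                if (client.getInetAddress().getHostAddress().startsWith("10.0.")) {
                    client.close();              // the closest thing to "reject" TCP offers
                    continue;
                }
                // ... handle the accepted client ...
                client.close();
            }
        }
    }
}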

Blocking "send" does not block until the data is delivered to the other end

The send operation just copies the data into an OS buffer for transmission. If the OS has sufficient free buffer space, send returns immediately; it only blocks when the OS buffer is full.
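In Java terms, a sketch of what this means in practice (the host and payload size here are arbitrary):
import java.io.OutputStream;
import java.net.Socket;

public class BufferedSend {
    public static void main(String[] args) throws Exception {
        try (Socket socket = new Socket("example.com", 80)) {
            OutputStream out = socket.getOutputStream();
            // write() returns as soon as the bytes fit into the OS send buffer;
            // none of them may have reached the other machine yet
            out.write(new byte[64 * 1024]);
            out.flush(); // even flush() gives no delivery guarantee
        }
    }
}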

"ACK" packet does not mean that the application on the other end successfully received the message

ACKs are sent by the receiving operating system when it stores the message in its internal buffer. The receiving application may read the data at a later point, or not at all.

TCP does not guarantee that a broken connection will raise an error

If you send some data and then close a connection, both operations may succeed even if no data is actually delivered to the other end, for example because the machine on the other end lost its network connection. And conversely, if the sending machine crashes, the receiving machine may never notice.

As far as reliability is concerned, TCP guarantees only the following:
  • No data will be delivered out of order
  • Everything you send will be delivered at most once
  • If the OS can tell that an operation cannot succeed, it will return an error (like when you send to a connection that is already known to be broken)
Any further guarantees, such as knowing that the peer application actually received and processed a message, are the responsibility of the application.
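One common approach is an application-level acknowledgment: the sender treats a message as delivered only after the receiving application explicitly confirms it. A minimal sketch of the sending side (the host, port, framing and ACK byte are all made up for illustration):
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class AckingSender {
    private static final byte APP_ACK = 0x06;

    public static void main(String[] args) throws Exception {
        try (Socket socket = new Socket("example.com", 9000)) {
            DataOutputStream out = new DataOutputStream(socket.getOutputStream());
            DataInputStream in = new DataInputStream(socket.getInputStream());

            byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
            out.writeInt(payload.length); // length-prefix the message
            out.write(payload);
            out.flush();                  // at this point the data is merely in an OS buffer

            byte ack = in.readByte();     // block until the peer application confirms
            if (ack != APP_ACK) {
                throw new IllegalStateException("peer did not acknowledge the message");
            }
        }
    }
}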

Wednesday, July 26, 2017

Range of Random.nextGaussian, continued

In the first part I described the theoretical range of values returned by nextGaussian; in this part I will describe the method I used to find the actual minimum and maximum values.

The simplest way would be to brute-force all 2^48 possible seed values. This would take a few years on my computer, so I had to do better than that.

As explained in the first part, in order to maximize the values of nextGaussian, the values of v1 and v2 need to be as close to zero as possible. For that, the value returned by nextDouble needs to be close to 0.5. So, how do we make the random number generator return the values we want?

nextDouble calls next twice: first to get the top 26 bits of the result, then again to get the next 27 bits. If we can get the first call to return exactly 1 << 25, the resulting double will be very close to 0.5.
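For reference, this is roughly how the Java 8 nextDouble puts those two next calls together (paraphrased from the source; DOUBLE_UNIT is 1.0 / (1L << 53)):
public double nextDouble() {
    return (((long) next(26) << 27) + next(27)) * DOUBLE_UNIT;
}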

A call to next first updates the seed value used by Random, then returns the top N bits of the new seed. So, we know the desired seed value after the call to next. In order to find the seed value before the call, we can use the simple reversing algorithm found here:
private static final long multiplier = 0x5DEECE66DL;
private static final long addend = 0xBL;

private static long reverseSeed(long seed) {
    // reverse the addend from the seed
    seed -= addend;
    long result = 0;
    // iterate through the seed's bits
    for (int i = 0; i < 48; i++) {
        long mask = 1L << i;
        // find the next bit
        long bit = seed & mask;
        // add it to the result
        result |= bit;
        if (bit == mask) {
            // if the bit was 1, subtract its effects from the seed
            seed -= multiplier << i;
        }
    }
    return result;
} 
Then, we also need to counter the initial scrambling that happens in the setSeed operation:
private static long unscramble(long seed) {
    return seed ^ multiplier;
}
 And we're ready to start hacking:
long seedRange = 0x2000000L;
Random rand = new Random();
double minSoFar = 0, maxSoFar = 0;
for (long seed = -seedRange; seed < seedRange; seed++) {
    // target a post-next(26) seed just above 2^47, so the first nextDouble is very close to 0.5
    rand.setSeed(unscramble(reverseSeed(0x800000000000L + seed)));
    double d = rand.nextGaussian();
    if (minSoFar > d) {
        minSoFar = d;
        System.out.println("Seed: " + seed + " min: " + d);
    }
    if (maxSoFar < d) {
        maxSoFar = d;
        System.out.println("Seed: " + seed + " max: " + d);
    }

    // the second call returns the cached nextNextGaussian value
    d = rand.nextGaussian();
    if (minSoFar > d) {
        minSoFar = d;
        System.out.println("Seed: " + seed + " second min: " + d);
    }
    if (maxSoFar < d) {
        maxSoFar = d;
        System.out.println("Seed: " + seed + " second max: " + d);
    }
}
The lowest value found was -7.844680087923773 (on the second call to nextGaussian, with seed = 994892).
The highest value found was 7.995084298635286 (on the first call to nextGaussian, with seed = 14005843).
These are the actual minimum and maximum values returned by the Oracle Java 8 implementation of nextGaussian.

Range of Random.nextGaussian()

Recently I had a look at the source code of a language detection library. The library internally uses Random.nextGaussian to determine its behavior, but uses it in such a way that any value lower than -10 would result in incorrect calculations. I was curious if getting such a value is even possible.


The implementation of nextGaussian in Java 8 is as follows:
 private double nextNextGaussian;
 private boolean haveNextNextGaussian = false;

 public double nextGaussian() {
   if (haveNextNextGaussian) {
     haveNextNextGaussian = false;
     return nextNextGaussian;
   } else {
     double v1, v2, s;
     do {
       v1 = 2 * nextDouble() - 1;   // between -1.0 and 1.0
       v2 = 2 * nextDouble() - 1;   // between -1.0 and 1.0
       s = v1 * v1 + v2 * v2;
     } while (s >= 1 || s == 0);
     double multiplier = StrictMath.sqrt(-2 * StrictMath.log(s)/s);
     nextNextGaussian = v2 * multiplier;
     haveNextNextGaussian = true;
     return v1 * multiplier;
   }
 }
The transform used to generate the values reaches its extremes when v1 and v2 are both as close to zero as possible, without both being equal to zero (that would make s zero and be rejected by the loop).

A quick check of nextDouble indicates that it generates only multiples of 1 / (double)(1L << 53). This means that nextGaussian will return its extreme values for v1 = +/- 2 / (double)(1L << 53) and v2 = 0.

For these values nextGaussian would return +/- 12.00727336061225.
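As a quick sanity check, plugging these extreme values of v1 and v2 directly into the formula from the source above reproduces that bound:
double v1 = 2 / (double) (1L << 53);  // the smallest possible non-zero |v1|
double v2 = 0;
double s = v1 * v1 + v2 * v2;
double multiplier = StrictMath.sqrt(-2 * StrictMath.log(s) / s);
System.out.println(v1 * multiplier);  // prints roughly 12.00727336061225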

This bound does not mean that it is actually possible to get these values: the sequence of values returned by nextDouble is fully determined by the 48-bit seed, so the required pair of v1 and v2 may never occur. But it does mean that nextGaussian (in its Oracle Java 8 implementation) will never return a value outside of the range between -12.00727336061225 and 12.00727336061225.

Friday, April 28, 2017

HBase and big key/value pairs

In my previous post I mentioned that HBase does not perform well when retrieving large key/value pairs (known as cells). I decided to look under the surface, and here I will present some of my findings.

My goal is to create an HTTP file server serving data from HBase, similar to HBase REST, but with embedded business logic. I'd like to start sending the response as soon as possible, and to send data at the highest rate the hardware permits.

Surface


The first apparent problem with retrieving large cells is related to the HBase client API. The getter for a cell's value returns a byte array, which means the entire cell value has to be buffered by the API before the application can access any of it. If the client intends to process the data as a stream, this is highly inefficient: streaming can only start once the whole value has arrived.
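To make this concrete, a typical read with the client API looks more or less like this (the table and column names are made up):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class GetWholeCell {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("files"))) {
            Result result = table.get(new Get(Bytes.toBytes("file-id-123")));
            // the whole value is materialized as one byte[] before the application can touch any of it
            byte[] value = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("content"));
            System.out.println("Got " + value.length + " bytes");
        }
    }
}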

Next I measured the time between sending the request and receiving the first byte of the response. For optimal performance this should be independent of cell size, but I found that this is not the case. That's because of the second, less apparent problem: the server buffers the entire response in memory before sending it to the client. The data has to be read from the underlying storage first, and only then is it sent out. As a result, the time to first byte is proportional to the cell size.

Internals

Under the hood HBase uses an RPC implementation for client-server communication. This means that the server sends data to the client only in response to a client call, and only one response per call is allowed. Scans are implemented as a series of "give me more" requests sent to the server. In the current version these are sent when the client finishes processing the current batch, though HBASE-13071 will implement a background prefetch, retrieving the next batch while the current one is still being processed by the application.
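Roughly what that looks like on the client side (a fragment building on the previous snippet; the caching value is arbitrary and the row handling is left out):
static void scanAll(Table table) throws IOException {
    Scan scan = new Scan();
    scan.setCaching(100);                // rows fetched per "give me more" round trip
    try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result row : scanner) {     // a new RPC is issued whenever the current batch runs out
            // ... application logic for one row ...
        }
    }
}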

Concurrent RPC calls are multiplexed over the same socket connection. With larger messages this can create a head-of-line blocking problem. It is mitigated to some extent by buffering the entire response in memory: usually the server is able to use the entire available bandwidth, so the throughput is the same as (or better than) it would be with dedicated connections. However, in certain cases the multiplexing can reduce overall throughput, for example when the client runs multiple threads sending memory-mapped files to the server. Multiplexing also forces the client to read each response as fast as possible in order to allow other responses to arrive.

The wire protocol is implemented using Google's Protobuf library, which is not designed to handle large messages. Protobuf does not allow access to part of a message before all of it is deserialized. This means that stream-reading the server responses on the client side would require either a protocol modification or a custom-coded library.

Friday, April 21, 2017

Hadoop as a general purpose file repository

Starting point

I'm looking for a replacement for a file storage system; the current implementation stores the data on a RAID array. Every time we hit the array's size limit, we replace the drives with larger ones and copy the data over. Now that the data grows by a few terabytes every year, these replacements are becoming costly. We need a system that reduces the operating cost of file storage.

Hadoop

The Hadoop distributed file system (HDFS) can scale out: instead of replacing the current hardware, we just add more machines, and the new storage becomes immediately available for use. It also runs on commodity hardware, meaning it does not require vendor-specific hardware and can run on generic server machines. Hadoop also has the ability to run MapReduce jobs on the stored content, and can be deployed on both the Amazon and Microsoft clouds. Hadoop is free software, but at least two companies provide commercial support. What's not to like? Well, there's just one thing.

Small file problem

HDFS is capable of scaling up to thousands of terabytes, with just one catch. Simplifying a little, the list of files has to fit in the memory of a single machine, the name node. The name node may require up to 1 GB of memory for every million files, so storing hundreds of millions of files would require powerful hardware and careful tuning. In addition, the name node metadata has to be loaded from disk on every restart, which takes time proportional to its size, and hence to the number of files.
There's no real solution to the small file problem. Workarounds are available:
  • Use multiple independent name nodes sharing the same data nodes (i.e. machines physically hosting the data on their disks). This requires some partitioning logic on the client side, so that the client knows which name node to check for a particular file.
  • Coalesce multiple files into a single Hadoop file, using HAR, sequence files or HBase. Each one of these has its limitations and won't suit every use case. I'll present them shortly.
What happens if you have too many files? The name node process starts using excessive amounts of memory. High memory usage may decrease its responsiveness, and in extreme cases will cause it to quit completely.

Hadoop archive (HAR) files

HAR files are archives containing a small filesystem. The files and directories in a HAR archive are readable using the same interfaces as regular Hadoop files. HAR files are created by running a MapReduce job over a set of files and directories in HDFS. Once created, the archives are immutable.
For our use case HAR files were not a good fit, for a few reasons. First, we add files one at a time, so we would need to run the archiving process on a schedule. We also support random file lookup by ID, so we would need to update the location of every file once the archiving process completes. Second, HAR files are immutable, and we update files quite a lot, so we would need another process to periodically purge outdated file revisions by unpacking and repacking the archives. Both are doable, but require additional coding. We decided to explore the other alternatives first.

Sequence files

Sequence files store data as a list of key/value pairs. It is possible to append new values to the end of an existing sequence file, so new data could be stored immediately in its final location. The main drawback of sequence files is that they have to be read sequentially. In other words, to locate a document by ID in a sequence file we would potentially need to read the whole file. Since random access times matter for our use case, sequence files were not an option.
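To illustrate why lookup by ID scales with the file size, here is a minimal sketch of such a lookup (the path and the key/value types are assumptions):
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequentialLookup {
    static byte[] find(Configuration conf, String wantedId) throws IOException {
        try (SequenceFile.Reader reader = new SequenceFile.Reader(
                conf, SequenceFile.Reader.file(new Path("/archive/files.seq")))) {
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            while (reader.next(key, value)) {      // sequential scan from the beginning
                if (key.toString().equals(wantedId)) {
                    return value.copyBytes();      // found, possibly after reading most of the file
                }
            }
        }
        return null;                               // not found
    }
}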

HBase

HBase is a key/value store that uses Hadoop for data storage. Like sequence files, HBase stores data as a list of key/value pairs. Like HAR files, HBase indexes its content, allowing fast random access. It also handles all insert/update/delete operations transparently to the user, repacking its store files behind the scenes. HBase makes good use of HDFS by creating large files and appending to them whenever it makes sense, so the small files problem usually does not apply. Configuring HBase for optimal performance is not a trivial task, but when done well, HBase is also very fast, with one exception: data read operations are non-streaming, so retrieving a value takes time proportional to its size. With a 1 GBps network adapter I got an average response time of 100 ms for a 1 MB file, but an entire second for a 30 MB file.

HBase/HDFS hybrid

With all of the above in mind, a hybrid setup, with small files stored in HBase and large files stored in HDFS, may or may not be a good fit, depending on the workload: storing small files in HBase avoids the small files problem, while storing large ones in HDFS avoids the latency increase. The appropriate cutoff size depends on the data being stored, with a lower cutoff demanding more expensive name node hardware and a higher one resulting in worse latency for mid-sized files.
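A rough sketch of what the write-side routing could look like (the 10 MB cutoff, the column layout and the HDFS path are all made up for illustration):
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HybridStore {
    private static final long CUTOFF_BYTES = 10L * 1024 * 1024;

    static void store(String id, byte[] content, Table hbaseTable, FileSystem hdfs) throws IOException {
        if (content.length <= CUTOFF_BYTES) {
            // small file: a single HBase cell, indexed by ID, no extra name node entry
            Put put = new Put(Bytes.toBytes(id));
            put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("content"), content);
            hbaseTable.put(put);
        } else {
            // large file: its own HDFS file, so it never becomes one huge cell
            try (FSDataOutputStream out = hdfs.create(new Path("/files/" + id))) {
                out.write(content);
            }
        }
    }
}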