Self-optimizing bulk import algorithm
During a professional project, I had to import massive amounts of data into CouchDB, a NoSQL database. The source was a big XML file of approximately 7 GB containing 2.5 million records of heterogeneous data (from a few bytes to many megabytes each).
In this article, we'll go through the successive improvements that made the import more efficient.
Before starting, I should point out a few constraints we had. First, the script had to be written in Python for consistency with our IT stack. Second, we had to wait for every write to the database to be acknowledged before continuing, as losing any record was not acceptable.
With a first, naive approach, the import took about 8 hours. That first draft read the XML file, converted each record into a Python object, inserted it into the database, and waited for the database to acknowledge the write before inserting the next record.
One obvious problem was that the program read the whole file at once instead of streaming it, and consumed monstrous amounts of RAM, up to 32 GB. That's because parsing the file in one go with the standard Python library turns every XML element into a Python object held in memory at the same time, which put massive pressure on the garbage collector and on memory in general.
The first improvement was to stream the XML file so that memory pressure stayed acceptable. Another easy improvement was to group records into chunks of 1000 before writing them to the database. Unfortunately, this was not enough: sometimes 1000 records would be saved quickly (a few milliseconds) and at other times very slowly (5 seconds or more).
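A minimal sketch of those two fixes, assuming each record is a <record> element sitting directly under the root element (the tag name, the dict conversion and the helper names iter_records and chunked are hypothetical; the real schema isn't shown here). It streams the file with the standard library's xml.etree.ElementTree.iterparse, clears the root after each record so memory stays flat, and groups the records into fixed chunks of 1000.

```python
import io
import xml.etree.ElementTree as ET
from typing import Iterable, Iterator, List


def iter_records(source, tag: str = "record") -> Iterator[dict]:
    """Stream records one at a time instead of building the whole tree."""
    context = ET.iterparse(source, events=("start", "end"))
    _, root = next(context)  # the first event is the root element's "start"
    for event, elem in context:
        if event == "end" and elem.tag == tag:
            # Hypothetical conversion: tag, attributes and text of the record.
            yield {"tag": elem.tag, **elem.attrib, "text": elem.text}
            # Assumption: records are direct children of the root, so clearing
            # the root drops every record already processed.
            root.clear()


def chunked(items: Iterable[dict], size: int = 1000) -> Iterator[List[dict]]:
    """Group items into lists of `size`; the last list may be shorter."""
    chunk: List[dict] = []
    for item in items:
        chunk.append(item)
        if len(chunk) == size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk  # never drop the final, partial chunk
```

For example, a document with 2500 records yields chunks of 1000, 1000 and 500 records.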
Refining the script
As it turns out, this was because the records were heterogeneous: some were small, and thousands of them could easily be written to disk at once, while others were huge, and sending a big chunk of those was too much work for the database and the disk to handle in one go.
I should also mention that CouchDB, the database we used, has a "slow" HTTP API (compared to some other databases) but provides a special bulk import endpoint (_bulk_docs) designed for exactly this situation. Unfortunately, the payload size and the number of elements greatly influence how long each call takes (with "bad" bulks, I saw write times increase by a factor of at least 3).
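CouchDB's bulk endpoint is POST /{db}/_bulk_docs with a JSON body of the form {"docs": [...]}. A sketch of a client for it using only the standard library follows; the server URL, database name, helper names and the choice to raise on any rejected document are assumptions for illustration, not the original script. One detail matters given the no-lost-records constraint: _bulk_docs reports success or failure per document, so a successful HTTP status alone does not prove every record was stored.

```python
import json
import urllib.request
from typing import List


def build_bulk_docs_request(base_url: str, db: str, docs: List[dict]) -> urllib.request.Request:
    """Prepare a POST to CouchDB's /{db}/_bulk_docs endpoint."""
    body = json.dumps({"docs": docs}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/{db}/_bulk_docs",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def failed_docs(response_body: bytes) -> List[dict]:
    """_bulk_docs answers with one status object per document; keep the failures."""
    results = json.loads(response_body)
    return [r for r in results if "error" in r]


def bulk_import(base_url: str, db: str, docs: List[dict], timeout: float = 300) -> None:
    """Hypothetical helper: send one chunk and fail loudly if any document is rejected."""
    req = build_bulk_docs_request(base_url, db, docs)
    # urlopen raises urllib.error.HTTPError on non-2xx responses.
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        failures = failed_docs(resp.read())
    if failures:
        raise RuntimeError(f"{len(failures)} documents were rejected: {failures[:3]}")
```

Only bulk_import touches the network; the other two helpers can be checked offline.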
In my preliminary research, I stumbled on a script that benchmarks CouchDB bulk imports to find an optimal number of records, but its payloads were small and of constant size, which was not my case. So even though the script gave some insight into a good average number of elements per bulk import, it told me nothing about my dataset. My question was therefore the following: what is the optimal number of records to bulk together on each insert to maximize import throughput over the whole duration of the import?
Here is the solution I came up with, in pseudocode:
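One way to write this question down (my own notation, not from the original script): the import of N records is split into K chunks of sizes n_1, ..., n_K, and t_k is the time the database takes to acknowledge chunk k, which depends on both its size and its content.

$$
T = \sum_{k=1}^{K} t_k(n_k), \qquad \sum_{k=1}^{K} n_k = N, \qquad \text{throughput} = \frac{N}{T}
$$

Since N is fixed, maximizing throughput is the same as minimizing the total time T, and the ratio n_k / t_k is the throughput of a single chunk.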
```
acc = list()                                # item accumulator
max_items = 1                               # optimal item quantity
prev_time_used = time.duration.minutes(60)  # time used by previous chunk, set high at first
while elem = read_xml_element():
    acc.append(elem)
    if len(acc) > max_items:
        start = now()
        couchdb.bulk_import(acc)
        time_used = now() - start
        if time_used <= prev_time_used:
            max_items = len(acc) + 10
        else:
            max_items = 10
        prev_time_used = time_used          # save the time used
        acc = list()                        # clear list
if acc:
    couchdb.bulk_import(acc)                # flush the last, partial chunk
```
With this script, we managed to import the whole XML file in approximately 4 hours.
It works by using an accumulator with a dynamic flushing size. The chunk size max_items is increased or decreased depending on the time the previous chunk took to insert (CouchDB was configured to return only once the data had actually been written to disk). If an insert took no longer than the previous one, the next chunk is allowed about 10 more items; if it took longer, the chunk size drops back to 10, since a smaller chunk should take less time to insert.
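The pseudocode can be turned into a runnable Python sketch. Here the XML reader and the CouchDB call are passed in as an iterable and a hypothetical bulk_import callable, and the clock is injectable so the loop can be exercised with simulated write times; the function name adaptive_import and the returned list of chunk sizes are additions for illustration. The 60-minute starting value is expressed in seconds.

```python
import time
from typing import Callable, Iterable, List


def adaptive_import(
    elements: Iterable,
    bulk_import: Callable[[List], None],
    clock: Callable[[], float] = time.monotonic,
) -> List[int]:
    """First version of the adaptive chunking loop; returns the chunk sizes sent."""
    acc: List = []                 # item accumulator
    max_items = 1                  # current target chunk size
    prev_time_used = 60 * 60.0     # previous chunk's write time in seconds, high at first
    sent: List[int] = []
    for elem in elements:
        acc.append(elem)
        if len(acc) > max_items:
            start = clock()
            bulk_import(acc)       # hypothetical: must block until the write is acknowledged
            time_used = clock() - start
            if time_used <= prev_time_used:
                max_items = len(acc) + 10   # not slower: grow the next chunk
            else:
                max_items = 10              # slower: fall back to a small chunk
            prev_time_used = time_used
            sent.append(len(acc))
            acc = []
    if acc:                        # flush the last, partial chunk
        bulk_import(acc)
        sent.append(len(acc))
    return sent
```

With a simulated write cost proportional to chunk size, the sketch keeps alternating between chunks of 11 and 22 items, a concrete view of the local-optimum issue discussed next.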
This version has a few weaknesses. It is very sensitive to jitter in the timings (other load on the CPU or other disk writes affect them). Also, once it reaches a local optimum, it stays there instead of trying to push more items into the accumulator. I should also mention that our dataset is heterogeneous, but records of similar size come in runs. For example, our XML has 40 000 big records, then 20 000 medium records, then 100 000 small records, then 2 000 big records, etc.
To address the local optimum problem, I started saving the ratio max_items / time_used (records written per unit of time) in a list that kept the 20 best ratios along with their max_items. From that list I built two groups: best_ratios, taken from the top 3 entries, and second_best_ratios, taken from the next 3. I used the average avg(best_ratios.max_items) as the basis for the new max_items and added a random value between 0 and the absolute value of avg(best_ratios.max_items) - avg(second_best_ratios.max_items) to increase the number of elements in the accumulator. If there were not enough ratios in the list yet, I would instead increase the number of elements by random(500, 1000). That prevented the script from settling in a local optimum and pushed it to keep trying larger chunks. Averaging over two groups of 3 ratios also made it better at escaping local optima and less sensitive to timing jitter caused by external factors. It performed better than the previous version of the script (the import took approximately 3 hours), but the mechanics are still quite crude, and I haven't had the opportunity to improve them further yet.
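A minimal sketch of this ratio-based variant, under stated assumptions: the class name RatioTuner and the constants TOP_N and GROUP are hypothetical; "not enough ratios" is taken to mean fewer than six entries (two full groups of 3); the stored item count is the size of the chunk actually written; and the random steps come from an injectable random.Random so runs can be reproduced.

```python
import random
from typing import List, Optional, Tuple

TOP_N = 20  # how many (ratio, item count) pairs to remember
GROUP = 3   # size of the "best" and "second best" groups


class RatioTuner:
    """Sketch of ratio-based chunk sizing: keep the best throughputs seen so far."""

    def __init__(self, start_items: int = 1, rng: Optional[random.Random] = None):
        self.max_items = start_items
        self.best: List[Tuple[float, int]] = []  # (ratio, item count), best first
        self.rng = rng or random.Random()

    def record(self, n_items: int, time_used: float) -> int:
        """Store the throughput of the last chunk and return the next max_items."""
        ratio = n_items / time_used if time_used > 0 else float("inf")
        self.best.append((ratio, n_items))
        self.best.sort(key=lambda pair: pair[0], reverse=True)
        del self.best[TOP_N:]  # keep only the TOP_N best ratios

        if len(self.best) < 2 * GROUP:
            # Assumption: not enough history yet, so grow by a random step.
            self.max_items += self.rng.randint(500, 1000)
        else:
            best_avg = sum(n for _, n in self.best[:GROUP]) / GROUP
            second_avg = sum(n for _, n in self.best[GROUP:2 * GROUP]) / GROUP
            spread = abs(best_avg - second_avg)
            # Start from the best group's average and explore upwards by a random amount.
            self.max_items = int(best_avg + self.rng.uniform(0, spread))
        return self.max_items
```

In the earlier pseudocode, the if/else on time_used would be replaced by max_items = tuner.record(len(acc), time_used). Comparing ratios rather than raw times means a bigger chunk is no longer penalized just for taking longer, only for being slower per record.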
The current version is still rather amateurish but works well. I didn't have the opportunity to plot variations of the algorithm or its configuration to see which performed best; I chose the values by running the script for a few minutes each time on a small dataset and tweaking them here and there. There's probably more room for improvement, and if you have suggestions, I'd be happy to hear them in the comments below. Happy coding and optimising!