External memory algorithms are indispensable for big data computing. Computations cannot actually be performed directly in external memory. The external memory model of computation refers to handling a task by loading and processing only one segment of data at a time, so that the whole task is completed over multiple passes. Intermediate data sets generated during the process are buffered to disk if they are too big to fit in memory.
Generally the external memory device is the hard disk. The principles for optimizing computations that use the hard disk are: minimize hard disk traffic, even at the cost of more CPU work; retrieve data sequentially, because a hard disk can only be accessed in batches (a sector or more at a time); and set a reasonable number of threads for parallel retrieval, because every switch between reading positions costs seek time.
Parallel strategy based on data segmentation
Text files are commonly seen data sources in external memory computations. When imported into memory, their content has to be parsed into the corresponding data types before it can be computed on. Parsing is slow, sometimes so slow that the CPU time exceeds the time spent accessing the hard disk. Multithreaded parallel processing can effectively speed it up.
To use the parallel strategy, we need to divide a file into segments, each of which is assigned to a thread. In a text file, one line corresponds to one record. Cutting the file at exact byte offsets will probably split a single line across two segments, causing computational errors. Segmenting by line count instead requires scanning from the very beginning of the file to find where each segment starts, which brings no performance gain at all.
esProc performs byte-based segmentation with a “skip head line and complement tail line” strategy: each segment gives its incomplete first line away to the previous segment and reads past its own end to complete its last line. This guarantees that every segment consists of complete rows and that no row is lost or duplicated. Used with the above-mentioned esProc parallel processing mechanism, the strategy lets programmers write a parallel program conveniently.
|1||=file("data.txt")||The source file|
|2||fork 4||=A1.cursor@t(amount;A2:4)||Divide the file into 4 segments and create 4 cursors for parallel processing|
|3||=B2.groups(;sum(amount):a)||Traverse each cursor to calculate its total amount|
|4||=A2.conj().sum(a)||Concatenate the results of the threads to get the final result|
In the above code, data.txt is divided into 4 segments, and 4 threads each traverse the cursor of one segment to sum up the amount field. The results returned by the threads are then concatenated and summed to get the total amount.
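The “skip head line and complement tail line” rule can be sketched outside esProc as follows. This is a minimal Python illustration, not esProc's implementation: the function names `read_segment` and `parallel_sum` are hypothetical, the file is assumed to hold one integer amount per line with no title row, and the threads only show the structure (Python threads will not speed up CPU-bound parsing).

```python
import os
from concurrent.futures import ThreadPoolExecutor


def read_segment(path, index, count):
    """Return the complete lines owned by segment `index` of `count` (0-based)."""
    size = os.path.getsize(path)
    start = size * index // count
    end = size * (index + 1) // count
    lines = []
    with open(path, "rb") as f:
        if start > 0:
            # Skip head line: step back one byte and read up to the next newline.
            # If the byte before `start` is a newline, nothing useful is skipped.
            f.seek(start - 1)
            f.readline()
        # Complement tail line: a line that starts before `end` is read whole,
        # even when it runs past the byte boundary of this segment.
        while f.tell() < end:
            line = f.readline()
            if not line:
                break
            lines.append(line.rstrip(b"\r\n").decode("utf-8"))
    return lines


def parallel_sum(path, count=4):
    """Sum one integer per line, with one thread per byte-based segment."""
    def segment_total(index):
        return sum(int(line) for line in read_segment(path, index, count) if line)

    with ThreadPoolExecutor(max_workers=count) as pool:
        return sum(pool.map(segment_total, range(count)))
```

A line belongs to the segment in which its first byte falls, so every line is read exactly once no matter where the byte boundaries land.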
Sometimes more time is spent parsing a text file than calculating on the data. As long as the parsing is done in parallel, the calculation itself need not be. If the order of the records does not affect the result (as with the above sum operation, which is independent of order), we can also use the esProc built-in parallel option to write the program more easily.
|1||=file("data.txt").cursor@tm(amount)||Declare a cursor with parallel retrieval|
|2||=A1.groups(;sum(amount))._1||Traverse the cursor to sum up the amount field|
In the above code, multithreaded parallel processing starts automatically during data retrieval with the cursor, but the aggregation of the amount field is done sequentially.
In external memory computations, the number of parallel threads is limited not only by the number of CPUs but also by the hard disk. If an HDD is used, too many threads make the disk spend a very long time seeking between tracks. Reducing the seeks requires a bigger cache for each thread, which in turn uses more memory. esProc Server allows system setup for balancing these factors to achieve the optimal performance in real-world scenarios.
esProc enables dynamic workload assignment among parallel threads. A thread that has finished its own work is assigned an unprocessed segment, if there is any. Since it’s hard to predict how long each segment will take, dynamic assignment makes more efficient use of the machine’s computing power.
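Dynamic assignment of this kind is commonly built on a shared work queue. The sketch below is a generic Python illustration under that assumption; `run_dynamic` and its parameters are hypothetical names, and nothing here describes how esProc schedules threads internally.

```python
import queue
import threading


def run_dynamic(segments, worker_count, process):
    """Process every segment; an idle worker takes the next unprocessed one."""
    pending = queue.Queue()
    for segment in segments:
        pending.put(segment)
    results = []
    lock = threading.Lock()

    def worker():
        while True:
            try:
                segment = pending.get_nowait()
            except queue.Empty:
                return  # no unprocessed segment is left
            value = process(segment)
            with lock:
                results.append(value)

    threads = [threading.Thread(target=worker) for _ in range(worker_count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results  # in completion order, not in segment order
```

Creating more segments than threads is what makes this useful: a thread that draws a quick segment simply comes back for another one instead of sitting idle.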
It’s not as easy to segment database tables as it is to segment text files. So database tables are generally not suited to segmented handling, and data-intensive computational tasks are better left to the database. Some computation-intensive tasks, however, are really difficult to handle in the database, and the data has to be retrieved out. In this latter case, we can in principle use segment-based parallel processing.
To use segment-based parallel processing, one approach is to create sub-tables. Each thread handles the data of several sub-tables, and one sub-table cannot be split among several threads. With this approach, we need to create many sub-tables so that the workload can be spread almost evenly across the threads. But it’s a bad idea to have too many tables in the database. Therefore we usually create the same number of sub-tables as the number of threads. This is rigid, because the number of parallel threads is then fixed in advance.
Another approach is to use WHERE conditions to segment the table. This requires creating an index first; without one, each WHERE statement triggers a traversal of the whole table and performance won’t improve. Sometimes performance remains poor even with an index, because it also depends on whether the records are stored physically in the same consecutive order as they appear in the index.
It’s very slow to retrieve data out of Oracle and some other databases through their JDBC drivers. In this case JDBC becomes a bottleneck for computations that need to retrieve data out of the database. If the database is not heavily loaded, we can segment the table and retrieve the segments in parallel, making up for the performance lost through JDBC.
|1||fork 4||=connect(db)||4 threads, each with its own database connection|
|2||=B1.query@x("select * from T where part=?",A2)||Each thread retrieves its own segment|
|3||=A1.conj()||Concatenate the results|
Field tests show that parallel retrieval can boost performance several times over when the database is lightly loaded.
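The same pattern, one connection per thread and one WHERE-selected segment per connection, can be sketched in Python. This is only an illustration under stated assumptions: it uses the standard-library `sqlite3` module in place of a JDBC connection, the table `T` with its `part` column mirrors the grid code above, and `build_demo_db`, `fetch_part` and `parallel_fetch` are hypothetical helper names. It shows the structure and makes no claim about speedup.

```python
import sqlite3
from concurrent.futures import ThreadPoolExecutor


def build_demo_db(path, rows=100, parts=4):
    """Create a small table T whose `part` column pre-assigns each row to a segment."""
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE T (id INTEGER PRIMARY KEY, part INTEGER, amount INTEGER)")
    conn.execute("CREATE INDEX idx_part ON T(part)")  # lets WHERE part=? avoid a full scan
    conn.executemany(
        "INSERT INTO T VALUES (?, ?, ?)",
        [(i, i % parts + 1, i * 10) for i in range(1, rows + 1)],
    )
    conn.commit()
    conn.close()


def fetch_part(db_path, part):
    # Each thread opens and closes its own connection.
    conn = sqlite3.connect(db_path)
    try:
        return conn.execute(
            "SELECT id, part, amount FROM T WHERE part = ? ORDER BY id", (part,)
        ).fetchall()
    finally:
        conn.close()


def parallel_fetch(db_path, parts):
    with ThreadPoolExecutor(max_workers=len(parts)) as pool:
        chunks = list(pool.map(lambda p: fetch_part(db_path, p), parts))
    # Concatenate the per-thread results in segment order.
    return [row for chunk in chunks for row in chunk]
```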
In addition to text files, esProc supports binary files stored in its own format.
The esProc-format binary files already store the data types, so no parsing is needed during retrieval and performance is much better than with text files. Moreover, esProc provides a compressed format for binary files, which generally reduces disk usage by between one-third and one-half compared with text files holding the same data. Data retrieval is thus faster still. This does not mean that a higher compression ratio always brings better performance. Decompression takes CPU time, and an algorithm with a higher compression ratio takes more of it. esProc uses a very simple compression algorithm that achieves the above compression ratio while demanding almost no extra CPU time.
If the text data will be used repeatedly, the binary file format will be advantageous. The conversion from a text file to a binary file is simple:
|1||=file("data.txt").cursor@t()||Define a cursor of the text file|
|2||=file("data.bin").export@z(A1)||Export the data as a segmented binary file|
The esProc-format binary files can be segmented too. The code is almost the same as that for segmenting a text file:
|2||fork 4||=A1.cursor@b(amount;A2:4)||Except for @b, which means reading a binary file, the parameters are the same|
|3||=B2.groups(;sum(amount):a)||The syntax for subsequent operations is exactly the same|
Based on binary files, esProc provides a columnar storage scheme, which stores each column as a separate file. As most computations need only a small number of columns, only a small amount of data has to be retrieved under this scheme, significantly reducing hard disk access time. This is the strength of columnar storage.
|1||=file("data.txt").cursor@t()||Original text file|
|2||=10.(file("col"/~/".bin"))||Generate files for 10 columns|
|3||>A2.export@z(A1)||Write the original file as multiple segmented columnar files|
|1||=file([1,3].("col"/~/".bin"))||Use the first and the third columns|
|2||=A1.cursor()||Define a cursor over the columnar files|
|3||=A2.groups(;sum(col1+col3):all)._1||Calculate the sum using the cursor|
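The idea of one file per column can be sketched in Python as follows. This is an illustration only and says nothing about the esProc file format: the helper names are hypothetical, every value is assumed to be a 64-bit integer, and a whole column is read into memory at once, which a real cursor would avoid.

```python
import os
import struct


def write_columns(rows, directory, column_count):
    """Store column i of `rows` in its own file col<i>.bin (numbered from 1)."""
    files = [
        open(os.path.join(directory, f"col{i + 1}.bin"), "wb")
        for i in range(column_count)
    ]
    try:
        for row in rows:
            for f, value in zip(files, row):
                f.write(struct.pack("<q", value))  # fixed-width 8-byte integer
    finally:
        for f in files:
            f.close()


def read_column(directory, number):
    """Read one column file back as a list of integers."""
    with open(os.path.join(directory, f"col{number}.bin"), "rb") as f:
        data = f.read()
    return [item[0] for item in struct.iter_unpack("<q", data)]


def sum_columns(directory, numbers):
    """Sum the chosen columns row by row; the other column files are never opened."""
    columns = [read_column(directory, n) for n in numbers]
    return sum(sum(values) for values in zip(*columns))
```

Calling `sum_columns(directory, [1, 3])` touches only `col1.bin` and `col3.bin`, which is where the saving in disk traffic comes from.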
We can process esProc columnar files through segment-based parallel processing. The code is written in the same way as for a file stored row-wise. The esProc file format makes sure that a satisfactory compression ratio is achieved while the multiple columnar files are segmented in alignment with one another.
|1||=file([1,3].("col"/~/".bin"))||Define an array of columnar files|
|2||fork 4||=A1.cursor(amount;,A2:4)||Same code as for parallel processing of a row-wise file with cursors|
|3||=B2.groups(;sum(amount):a)||The syntax for subsequent operations is exactly the same|
Columnar storage is not always workable. When many columns are retrieved from an HDD, there is a serious conflict between seek time and buffer capacity: reading many files at once either causes frequent seeks or requires a large buffer for each file. Adding parallel threads on top of multiple columnar files multiplies the number of concurrent read positions and makes the problem worse. The scheme is therefore unsuitable for an ordinary HDD and should be used with SSDs or in an environment where a high-concurrency disk array is available.
Let’s review the esProc approach to problem-solving: provide the necessary basic support, but do not impose a framework on programmers. For example, esProc supports columnar storage but does not automatically organize data that way; it allows columnar storage and row storage to coexist and leaves it to programmers to decide whether columnar storage is used and how much of the data it covers.
Exploiting data orderliness
In business practice, data objects often have a great many fields (hundreds or even thousands) and appear as wide tables in the database. Since the fields differ in data volume and frequency of use, they are usually classified and stored in multiple physical tables that share the same primary key. When certain fields are referenced, the tables holding them are retrieved and JOINed.
Database vendors generally design their products to create indexes on primary keys automatically, which is effectively equivalent to sorting those tables by the primary key. With the data ordered this way, the ordinary JOIN becomes a much simpler MERGE on ordered data.
For file handling, esProc provides a similar approach: several sorted cursors are JOINed with a relatively simple merge operation. The approach also applies to intersection, union, difference and other operations on ordered sets.
To benefit from this, original data that will be reused should first be sorted and stored, so that succeeding computations perform better. We should take full advantage of this feature during data preparation.
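A merge join over two key-ordered streams can be sketched in Python as below. This is a generic illustration of the technique, not esProc's implementation; `merge_join` is a hypothetical name, and both inputs are assumed to be sorted by a key that is unique on each side, as with tables sharing a primary key.

```python
def merge_join(left, right, key):
    """Inner-join two iterables sorted by a unique `key`, reading each once."""
    right_iter = iter(right)
    r = next(right_iter, None)
    for l in left:
        # Advance the right side until its key catches up with the left key.
        while r is not None and key(r) < key(l):
            r = next(right_iter, None)
        if r is None:
            return  # right side exhausted, no further matches are possible
        if key(r) == key(l):
            yield l, r
```

Each input is traversed once and only the current record of each side is held in memory, which is why the merge is so much cheaper than a general JOIN on unordered data.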
In practice we can use the ordered merge operation to prepare data. Because the existing data is already ordered, we just need to sort the new data and merge it with the existing data, without having to re-sort the historical data.
If there’s a great volume of new data, it’s unnecessary to merge all of it into the historical data each time. Instead, we can sort each batch of new data as its own segment and merge it with the ordered historical data when needed. The merge operation is cheap and affects overall performance only slightly as long as there are not many segments to merge.
Besides data that has to be prepared beforehand, some data is inherently ordered. For instance, the results of a group and aggregate operation returned by esProc are ordered by the key values by default, and the overall grouping result obtained by merging such ordered sub-result-sets is still an ordered set, which can be further handled with the merge operation.
Like columnar files, ordered files can only be merged sequentially and cannot each be segmented independently at read time. To process them in parallel, we should therefore first split them into segments manually, as with columnar storage. The workload can then be balanced across the threads in the same way.
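The following Python sketch shows the idea with the standard-library `heapq.merge`. The name `merge_batches` is hypothetical; the history is assumed to be already sorted, and each new batch is assumed small enough to sort in memory.

```python
import heapq


def merge_batches(history, *new_batches, key=None):
    """Lazily merge sorted `history` with unsorted new batches, without re-sorting history."""
    sorted_batches = [sorted(batch, key=key) for batch in new_batches]
    # heapq.merge streams its inputs and assumes each one is already sorted.
    return heapq.merge(history, *sorted_batches, key=key)
```

The cost of the merge grows with the number of inputs being merged, which matches the remark that performance suffers only slightly while the segments are few.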
Analyzing user behavior (or computing account statistics) is a common data processing task. The computations for a single user (account) can be extremely complicated, but cross-user computations are rare. It’s very difficult to perform these complex computations in SQL inside the database, so normally the data is retrieved out of the database for processing.
In most cases, the data of one user isn’t too big and can be loaded into memory entirely for complex computations. But the data of all users is huge and cannot be loaded into memory all at once. What is needed is a mechanism that retrieves and handles the data of one user at a time.
By fetching only one group of data from an ordered cursor each time, esProc makes this type of computation convenient to implement.
|1||=file("user.dat").cursor@b()||The source file sorted by userid|
|2||for A1;id||…||Fetch data from the cursor in a loop, retrieving one group of records with the same id each time|
|3||…||Process the current group of data|
We can use this approach on any ordered source data from which a cursor can be generated, whether it comes from a database or a file.
When the data comes from a database, it’s more efficient to sort it first and fetch it group by group in this way than to filter it with a WHERE statement for each group. Without an index, every WHERE statement searches the whole table, which is far less efficient than sorting once. With an index, which means the data is already sorted, every WHERE statement still has to perform a separate lookup for its data.
With an esProc-format binary file, grouped computation over sorted data can also use segment-based parallel processing. If every group is marked when the binary file is generated, esProc will not split the data of one group across two segments during file segmentation, so each thread handles complete groups.
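In Python, the same group-at-a-time traversal over a key-ordered stream can be sketched with `itertools.groupby`. The function name `groups_by_key` is hypothetical; the input is assumed to be already sorted by the grouping key, since `groupby` only combines adjacent records.

```python
import itertools


def groups_by_key(records, key):
    """Yield (key value, list of records) for each run of equal keys in a sorted stream."""
    for value, group in itertools.groupby(records, key=key):
        # Only the current group is materialized in memory.
        yield value, list(group)
```

Because `records` can be any iterator, for example one reading a file line by line, memory use is bounded by the largest single group rather than by the whole data set.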
|1||=file("user.txt").cursor@t()||Cursor of the raw data|
|2||=A1.sortx(id)||Sort the cursor by id and return a cursor too|
|3||>file("user.dat").export@z(A2;id)||Export the cursor data as a binary file segmented by id|
Below is a piece of code for segmental parallel processing based on the already sorted data:
|1||=file("user.dat")||A data file sorted by id|
|2||fork 4||=A1.cursor@b(;A2:4)||Records of the same id are never separated during segmentation|
|3||for B2;id||…||The syntax for the succeeding computations remains the same|
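One way to picture group-aligned segmentation is to start from evenly spaced cut points and push each one forward to the next group boundary. The Python sketch below does this on an in-memory list of sorted keys. It is an illustration under that assumption only: `aligned_boundaries` is a hypothetical name, and esProc achieves the effect through marks written into the file at export time, not through this scan.

```python
def aligned_boundaries(keys, count):
    """Split sorted `keys` into `count` index ranges that never cut a group in two."""
    n = len(keys)
    bounds = [0]
    for i in range(1, count):
        pos = max(n * i // count, bounds[-1])
        # Move the cut forward until it sits between two different keys.
        while 0 < pos < n and keys[pos] == keys[pos - 1]:
            pos += 1
        bounds.append(pos)
    bounds.append(n)
    # Each (start, stop) pair is a half-open range; a range may be empty.
    return list(zip(bounds[:-1], bounds[1:]))
```

A large group can leave a neighboring range short or empty, which is one reason dynamic assignment with more segments than threads helps balance the load.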
We can also handle a database table with this strategy of segment-based parallel processing, but the data can only be sorted after a WHERE statement has segmented it, and a suitable index should be in place for that.
Use of indexes
Like a database, esProc provides indexing for binary files, so that desired records can be selected swiftly from a big data set by specified key values.
|1||=file("user.dat")||Original data file|
|3||>A1.index(A2,id)||Generate the index by id|
|4||=A1.icursor(A2,id>10&&id<20)||Search for desired data using the indexed file and return the result as a cursor|
As with a database table, we can create multiple independent index files for one set of data. Unlike a database, however, esProc has no concept of tables. The index file and the source file are independent of each other, and the link between them is maintained by the programmer. A change to the source file does not automatically trigger a change to the index file, which has to be re-created or, when data has only been appended, extended. This design is in line with the above-mentioned esProc way of solving problems: provide basic class libraries, but do not dictate how they are used.
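A separately maintained index of this kind can be sketched in Python as a sorted list of keys with their record positions, searched by bisection. This is a conceptual illustration, not the esProc index format: the class name `FileIndex` is hypothetical, and positions in an in-memory list stand in for byte offsets in a data file.

```python
import bisect


class FileIndex:
    """A sorted list of (key, position) pairs, kept separately from the records."""

    def __init__(self, records, key):
        pairs = sorted((key(r), pos) for pos, r in enumerate(records))
        self.keys = [k for k, _ in pairs]
        self.positions = [p for _, p in pairs]

    def between(self, low, high):
        """Positions of records with low < key < high, in key order."""
        start = bisect.bisect_right(self.keys, low)
        stop = bisect.bisect_left(self.keys, high)
        return self.positions[start:stop]
```

The index is a snapshot: records appended after it was built are invisible to `between` until the index is rebuilt or extended, which mirrors the maintenance responsibility described above.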