Real-Time Data Warehouse

Problem Statement: In a classic corporate environment, a data warehouse stores data extracted from operational databases and external data sources. It is mostly used for analysis and reporting on historical data for specific business needs, usually in conjunction with visual business intelligence (BI) tools. Data is loaded into the warehouse from operational databases or other sources, typically at regular intervals by a batch ETL (extract, transform, load) process and, in some cases, in real time.

Alternative Solutions

Relational (SQL) Database and Star Schema

A typical approach to building a data warehouse is to use a relational database with a star schema. A star schema uses two types of tables:

  • FACT tables, which record measures about particular events or states, such as a customer ordering a particular product, usage of a particular service, a customer contacting the help desk, or a sensor reading.
  • DIMENSION tables, which describe the entities a FACT record relates to, such as the customer, the product, or the date and location at which an event occurred.

FACT tables contain measurements as well as references (foreign keys) to DIMENSION tables. The data model is therefore normalized, yet the overall structure remains relatively simple.

Figure 1: Star schema

The star schema (see Figure 1) allows aggregation queries to be formulated and executed, such as counting or summing a particular measure and slicing it along a particular set of dimensions. For example, a typical analytical query could involve computing net sales of a particular product and slicing it by region.
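To make the star-schema query concrete, here is a minimal in-memory sketch of "net sales of a product, sliced by region", evaluated the way a relational engine commonly would, with a hash join. The tables, column names (idProduct, idRegion, netSales), and data are hypothetical illustrations, not part of any Clusterpoint or SQL API.

```javascript
// Hypothetical star schema: one FACT table and two DIMENSION tables.
// FACT rows hold a measure (netSales) plus foreign keys into the dimensions.
const dProduct = [
  { id: 1, name: "Router" },
  { id: 2, name: "Modem" },
];
const dRegion = [
  { id: 10, name: "North" },
  { id: 20, name: "South" },
];
const fSales = [
  { idProduct: 1, idRegion: 10, netSales: 120 },
  { idProduct: 1, idRegion: 20, netSales: 80 },
  { idProduct: 2, idRegion: 10, netSales: 50 },
  { idProduct: 1, idRegion: 10, netSales: 30 },
];

// Roughly: SELECT region, SUM(netSales) ... WHERE product = ? GROUP BY region
function netSalesByRegion(productName) {
  // Build side of the hash join: hash tables keyed by dimension id.
  const productById = new Map(dProduct.map((p) => [p.id, p]));
  const regionById = new Map(dRegion.map((r) => [r.id, r]));
  const totals = new Map();
  // Probe side: scan the FACT table once, join, filter, and aggregate.
  for (const f of fSales) {
    if (productById.get(f.idProduct).name !== productName) continue;
    const region = regionById.get(f.idRegion).name;
    totals.set(region, (totals.get(region) || 0) + f.netSales);
  }
  return totals;
}

console.log(netSalesByRegion("Router")); // North => 150, South => 80
```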

Pain points of this relational (SQL) solution include:

  • Because of architectural constraints, relational databases have scaling limitations. Performance degrades as data grows, complex queries cannot be executed in sub-second time, and the result is often a setup in which reports can only be generated daily.
  • Relational databases do not support extended, unstructured data formats, such as text documents or hierarchical and repeated structured objects (for example, JSON objects).

NoSQL Technology

With their easily distributed architectures, NoSQL technologies are well suited to solving scalability issues by scaling horizontally. There are two possible NoSQL approaches to the scalability problem: a more traditional NoSQL database, or a MapReduce-based compute cluster. With MapReduce, for example on Hadoop, SQL-like queries are often run through a technology such as Apache Hive.

The main pain point here is the data model. Typical NoSQL technologies require the star schema used in relational databases to be denormalized further, which significantly expands the volume of data.
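The expansion comes from copying DIMENSION attributes into every FACT record. A back-of-the-envelope sketch of that arithmetic follows; every row count and byte size in it is a hypothetical placeholder chosen for illustration, and it simplifies by keeping the FACT row size unchanged after flattening.

```javascript
// Star schema: each DIMENSION row is stored once; FACT rows carry only keys.
function starSchemaBytes(factRows, factRowBytes, dims) {
  const dimBytes = dims.reduce((sum, d) => sum + d.rows * d.rowBytes, 0);
  return factRows * factRowBytes + dimBytes;
}

// Fully denormalized table: every FACT row embeds a copy of each
// referenced DIMENSION row.
function denormalizedBytes(factRows, factRowBytes, dims) {
  const copiedPerFact = dims.reduce((sum, d) => sum + d.rowBytes, 0);
  return factRows * (factRowBytes + copiedPerFact);
}

// Hypothetical sizes: 1 billion 100-byte FACT rows, two dimensions.
const dims = [
  { name: "customer", rows: 1e6, rowBytes: 500 },
  { name: "product", rows: 1e4, rowBytes: 300 },
];
const star = starSchemaBytes(1e9, 100, dims);
const flat = denormalizedBytes(1e9, 100, dims);
console.log((flat / star).toFixed(1)); // 9.0 (denormalized is about 9x larger)
```

Because dimensions are small relative to the FACT table, the star schema pays for them almost nothing, while the flattened layout pays for them once per FACT row.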

Clusterpoint Data Warehouse Solution

Clusterpoint is a SuperSQL database, which means that, functionally, it is a superset of SQL. It supports joins and ACID-compliant transactions and can efficiently reference data in one table from another. Along with that, it has important NoSQL characteristics: horizontal scalability and support for unstructured data and hierarchical data models.

Thus, for a data warehouse, we propose a hybrid model in which FACT tables are sharded (distributed among computational nodes) and DIMENSION tables are replicated to every computational node. This model lets Clusterpoint parallelize computation effectively and join FACT and DIMENSION data efficiently, because each node can perform its joins locally (see Figure 2).
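The hybrid layout can be sketched as a scatter-gather aggregation: FACT rows are split across nodes, each node holds its own copy of the DIMENSION table, every node joins and aggregates locally, and a coordinator merges the small partial results. This is an illustrative model only; the node count, the modulo sharding rule, and the data are hypothetical and do not describe Clusterpoint's actual internals.

```javascript
const NODES = 3; // hypothetical cluster size

// DIMENSION table; ids equal array positions in this sketch.
const dProduct = [{ id: 0, name: "A" }, { id: 1, name: "B" }];
// FACT table: 12 rows alternating between products 0 and 1.
const facts = Array.from({ length: 12 }, (_, i) => ({
  id: i,
  idProduct: i % 2,
  amount: i,
}));

// Shard FACT rows by primary key; give each node its own DIMENSION replica.
const nodes = Array.from({ length: NODES }, () => ({
  facts: [],
  dProduct: dProduct.slice(),
}));
for (const f of facts) nodes[f.id % NODES].facts.push(f);

// Each node joins against its local replica: no network traffic per row.
function localAggregate(node) {
  const partial = new Map();
  for (const f of node.facts) {
    const name = node.dProduct[f.idProduct].name;
    partial.set(name, (partial.get(name) || 0) + f.amount);
  }
  return partial;
}

// The coordinator only merges one small partial sum per group per node.
function mergePartials(partials) {
  const total = new Map();
  for (const p of partials) {
    for (const [k, v] of p) total.set(k, (total.get(k) || 0) + v);
  }
  return total;
}

const result = mergePartials(nodes.map(localAggregate));
console.log(result); // A => 30 (0+2+...+10), B => 36 (1+3+...+11)
```

Replicating dimensions costs some extra storage, but since DIMENSION tables are much smaller than FACT tables, that cost is modest compared with shipping FACT rows across the network for every join.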

Example Deployments

Imagine a setup in which each computational node holds 1 billion FACT records and roughly 10 million DIMENSION records.

                                 Single machine   10-server cluster   30-server cluster
Number of FACT records            1,000,000,000      10,000,000,000      30,000,000,000
Number of DIMENSION records          10,000,000         100,000,000         300,000,000
FACT records per CPU core            83,333,333          83,333,333          83,333,333
Raw bytes per record                      2,000               2,000               2,000
Raw data size (on HDD), TB                 2.02                20.2                60.6
Index bytes per record                      200                 200                 200
Index size (on SSD), TB                   0.202                2.02                6.06

Specification of each machine in the cluster:

Processor   2 x 6-core CPUs (12 cores)
HDD         2 TB
SSD         2 x 256 GB
RAM         128 GB
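The deployment figures follow from simple arithmetic on record counts and per-record sizes. The sketch below reproduces them, assuming decimal terabytes (1 TB = 10^12 bytes), 12 cores per machine, and that both FACT and DIMENSION records count toward raw and index storage, as the table's totals imply.

```javascript
const FACT_PER_NODE = 1e9;
const DIM_PER_NODE = 1e7;
const RAW_BYTES_PER_RECORD = 2000;
const INDEX_BYTES_PER_RECORD = 200;
const CORES_PER_NODE = 2 * 6; // 2 x 6-core CPUs
const TB = 1e12; // decimal terabyte

function sizing(servers) {
  const fact = FACT_PER_NODE * servers;
  const dim = DIM_PER_NODE * servers;
  const records = fact + dim; // FACT and DIMENSION records are both stored
  return {
    factPerCore: Math.floor(fact / (servers * CORES_PER_NODE)),
    rawTB: (records * RAW_BYTES_PER_RECORD) / TB,
    indexTB: (records * INDEX_BYTES_PER_RECORD) / TB,
  };
}

for (const n of [1, 10, 30]) console.log(n, sizing(n));
```

The per-node index size (about 0.2 TB) fits within the 2 x 256 GB of SSD per machine, which is what allows aggregation queries to run from index storage rather than from the HDDs.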

Indexing Schemes

Clusterpoint's most efficient indexing scheme is the directly addressable value array index. Every document version, whether newly inserted or replacing an existing document with the same primary key, is assigned a local document id. Local doc ids are simply a sequence of increasing integers, so a directly addressable index is an array of values addressed by doc id. This scheme makes Clusterpoint query execution extremely fast, because it is optimized for index access from RAM and SSDs.

Directly addressable value array indexes are created for values that are used in aggregation functions, filtering, or joins between FACT and DIMENSION tables.
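A minimal sketch of the idea described above, assuming dense integer doc ids starting at 0 and a single numeric field stored in a typed array. The class and method names are hypothetical; the sketch also omits invalidating superseded document versions, which a real engine must handle.

```javascript
class ValueArrayIndex {
  constructor(capacity) {
    this.values = new Float64Array(capacity); // one slot per local doc id
    this.nextDocId = 0;
  }

  // Inserting a document version assigns the next local doc id.
  append(value) {
    if (this.nextDocId >= this.values.length) {
      throw new RangeError("index capacity exceeded");
    }
    const docId = this.nextDocId++;
    this.values[docId] = value;
    return docId;
  }

  // Lookup is a single array access: no hashing, no tree traversal.
  get(docId) {
    return this.values[docId];
  }

  // Aggregation is a sequential scan over contiguous memory.
  sum() {
    let s = 0;
    for (let i = 0; i < this.nextDocId; i++) s += this.values[i];
    return s;
  }
}

const amountCharged = new ValueArrayIndex(4);
const id0 = amountCharged.append(10.5);
const id1 = amountCharged.append(4.5);
console.log(id0, id1, amountCharged.get(id1), amountCharged.sum()); // 0 1 4.5 15
```

Contiguous, fixed-width storage is what makes this layout cache- and SSD-friendly: scans read memory sequentially, and point lookups need no search.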

Finally, the raw data itself is stored on HDDs, an inexpensive storage medium. Raw data is not used by the aggregation queries of typical data warehouse workloads. Instead, raw records are used for persistence, for recovery and replication when a single machine in the cluster fails, and for rebuilding indexes after schema changes.

Example Queries

The execution times below are based on the 10-server cluster from the example deployment described above (10 billion FACT records, 120 CPU cores in total).
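Each query estimate in this section follows the same simple cost model: divide the processed records evenly across the 120 cores, then multiply by the CPU time per record (the inverse of the records-per-core-per-second rate quoted for that query). The sketch below encodes that model; treating it as the basis of the figures is an inference from the numbers, and it assumes perfect parallelism and ignores network overhead.

```javascript
const CORES = 10 * 12; // 10-server cluster, 12 cores per server

// Estimated wall-clock seconds for a query, assuming all cores work in
// parallel on equal shares of the records.
function estimateSeconds(elements, recordsPerCorePerSecond) {
  const perCore = Math.round(elements / CORES);
  const secondsPerElement = 1 / recordsPerCorePerSecond;
  return perCore * secondsPerElement;
}

console.log(estimateSeconds(1e10, 20e6).toFixed(3)); // broad aggregation: 4.167
console.log(estimateSeconds(1e5, 20e6)); // narrow aggregation: about 0.00004
console.log(estimateSeconds(1e10, 80e6).toFixed(3)); // narrow with join: 1.042
console.log(estimateSeconds(2e7, 5e5).toFixed(3)); // computational: 0.333
```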

A typical data warehouse workload consists mostly of analytical queries that aggregate over a large number of FACT records and filter or categorize them based on values in DIMENSION tables.

As an example, consider the following tables:

  • D_CUSTOMER, a table containing customer information
  • D_PRODUCT, a table containing product information
  • F_MEASURE, a FACT table recording customer-specific measures, such as purchases, inquiries, or product usage statistics

Broad Aggregation

In a broad aggregation, we aggregate over all (or a large proportion of) the records in the FACT table.

An example query might look like this:


SELECT SUM(F_MEASURE.AMOUNT_CHARGED), D_PRODUCT.NAME
FROM F_MEASURE INNER JOIN D_PRODUCT ON F_MEASURE.ID_PRODUCT = D_PRODUCT.ID
GROUP BY D_PRODUCT.NAME

Query analysis.

In this case, Clusterpoint processes every record in the FACT table and performs two operations: first an inner join with the D_PRODUCT table, then the aggregation. Relational databases commonly use a hash join in this situation. Clusterpoint instead takes advantage of its directly addressable array index: the foreign key referencing D_PRODUCT is stored as the local doc id of the referenced D_PRODUCT record, and that doc id is used directly as the array index. Joining the tables is therefore as cheap as a single main-memory dereference, and Clusterpoint can process up to 20 million records per CPU core per second (0.05 µs per record).

Number of elements                     10,000,000,000
Number of elements per core                83,333,333
CPU-core time per element, µs                    0.05
Query execution time, sec                       4.167
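The join-by-dereference described in the query analysis above can be sketched with plain typed arrays. Following the text, the sketch assumes F_MEASURE.ID_PRODUCT holds the local doc id of the referenced D_PRODUCT record; the arrays and data are hypothetical stand-ins for Clusterpoint's indexes.

```javascript
// D_PRODUCT.NAME, addressed by D_PRODUCT local doc id.
const productName = ["Router", "Modem", "Switch"];
// F_MEASURE columns, addressed by F_MEASURE local doc id.
const factIdProduct = new Uint32Array([0, 2, 0, 1, 2, 0]); // ID_PRODUCT as doc ids
const factAmount = new Float64Array([10, 5, 20, 7, 5, 1]); // AMOUNT_CHARGED

// SELECT SUM(AMOUNT_CHARGED), D_PRODUCT.NAME ... GROUP BY D_PRODUCT.NAME
function sumByProductName() {
  // One accumulator per D_PRODUCT doc id, so grouping is direct addressing too.
  const sums = new Float64Array(productName.length);
  for (let i = 0; i < factIdProduct.length; i++) {
    sums[factIdProduct[i]] += factAmount[i]; // the "join" is one array access
  }
  // Fold per-doc-id sums into per-name groups (names may repeat in general).
  const byName = new Map();
  productName.forEach((name, docId) => {
    byName.set(name, (byName.get(name) || 0) + sums[docId]);
  });
  return byName;
}

console.log(sumByProductName()); // Router => 31, Modem => 7, Switch => 10
```

Compared with the hash join, there is no hash computation and no probe chain per FACT row; the inner loop touches only sequential FACT columns and one indexed accumulator.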

Narrowly Constrained Aggregation

In this type of query, we aggregate over a constrained set of records. The FACT record set that serves as input to the aggregation is typically on the order of hundreds of thousands of records.

An example query might look like this:


SELECT SUM(F_MEASURE.AMOUNT_CHARGED), D_CUSTOMER.CUSTOMER_SEGMENT
FROM F_MEASURE INNER JOIN D_CUSTOMER ON F_MEASURE.ID_CUSTOMER = D_CUSTOMER.ID
WHERE F_MEASURE.SERVICE_START = 'True'
GROUP BY D_CUSTOMER.CUSTOMER_SEGMENT

Query analysis.

In this case, Clusterpoint performs an exact match on an index created on the SERVICE_START field. The exact-match index contains the list of local doc ids that match the criterion, so Clusterpoint spends computational cycles only on records that contribute to the result set. The time spent processing items is negligible, so the total query time is dominated by the network overhead of communication from the client to the processing nodes and back.

Number of elements                            100,000
Number of elements per core                       833
CPU-core time per element, µs                    0.05
Query execution time, sec                    0.000042
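An exact-match index of the kind described in the query analysis above can be modeled as a map from each field value to the list of local doc ids having that value (a posting list). The sketch below is a hypothetical illustration of that structure, not Clusterpoint's on-disk format; it shows why only matching FACT records cost any CPU time.

```javascript
// Hypothetical F_MEASURE columns, addressed by local doc id.
const serviceStart = ["True", "False", "True", "False", "False", "True"];
const idCustomer = [0, 1, 1, 2, 0, 2]; // ID_CUSTOMER as D_CUSTOMER doc ids
const amount = [10, 99, 20, 99, 99, 30]; // AMOUNT_CHARGED
// D_CUSTOMER.CUSTOMER_SEGMENT, addressed by doc id.
const segment = ["retail", "business", "retail"];

// Exact-match index: value -> ascending list of doc ids.
function buildExactMatchIndex(column) {
  const index = new Map();
  column.forEach((value, docId) => {
    if (!index.has(value)) index.set(value, []);
    index.get(value).push(docId); // doc ids arrive in increasing order
  });
  return index;
}

// WHERE SERVICE_START = 'True' GROUP BY CUSTOMER_SEGMENT
function sumBySegmentWhereServiceStart(index) {
  const totals = new Map();
  // Only doc ids in the posting list are visited.
  for (const docId of index.get("True") || []) {
    const seg = segment[idCustomer[docId]];
    totals.set(seg, (totals.get(seg) || 0) + amount[docId]);
  }
  return totals;
}

console.log(sumBySegmentWhereServiceStart(buildExactMatchIndex(serviceStart)));
// retail => 40, business => 20
```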

Narrowly Constrained Aggregation with Join

In this case we also aggregate over a constrained set of records, again on the order of hundreds of thousands. Unlike the previous case, the constraint is put on the joined DIMENSION table instead of the FACT table.

An example query might look like this:


SELECT SUM(F_MEASURE.AMOUNT_CHARGED), D_CUSTOMER.CUSTOMER_SEGMENT
FROM F_MEASURE INNER JOIN D_CUSTOMER ON F_MEASURE.ID_CUSTOMER = D_CUSTOMER.ID
WHERE D_CUSTOMER.AGE = 35
GROUP BY D_CUSTOMER.CUSTOMER_SEGMENT

Query analysis.

In this case, Clusterpoint has to perform the join between the two tables even though the result set is small: it filters the entire FACT table against a hash set of the ids of DIMENSION records that match the query parameters. A lookup in the hash set is relatively inexpensive, so Clusterpoint can process up to 80 million records per CPU core per second (0.0125 µs per record).

Number of elements                     10,000,000,000
Number of elements per core                83,333,333
CPU-core time per element, µs                  0.0125
Query execution time, sec                    1.041667
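The hash-set filtering described in the query analysis above has two phases: collect the matching DIMENSION doc ids into a set, then scan every FACT row and keep those whose foreign key is in the set. The arrays and data below are hypothetical; the sketch only illustrates the technique.

```javascript
// Hypothetical D_CUSTOMER columns, addressed by doc id.
const customerAge = [35, 42, 35, 29];
const customerSegment = ["retail", "retail", "business", "business"];
// Hypothetical F_MEASURE columns, addressed by doc id.
const factCustomer = [0, 1, 2, 3, 0, 2, 1]; // ID_CUSTOMER as doc ids
const factAmount = [5, 100, 7, 100, 3, 1, 100]; // AMOUNT_CHARGED

// WHERE D_CUSTOMER.AGE = ? GROUP BY D_CUSTOMER.CUSTOMER_SEGMENT
function sumBySegmentForAge(age) {
  // Phase 1: small set built from the (replicated) DIMENSION table.
  const matching = new Set();
  customerAge.forEach((a, docId) => {
    if (a === age) matching.add(docId);
  });
  // Phase 2: every FACT row is checked, but each check is one set lookup.
  const totals = new Map();
  for (let i = 0; i < factCustomer.length; i++) {
    const c = factCustomer[i];
    if (!matching.has(c)) continue;
    const seg = customerSegment[c];
    totals.set(seg, (totals.get(seg) || 0) + factAmount[i]);
  }
  return totals;
}

console.log(sumBySegmentForAge(35)); // retail => 8, business => 8
```

Unlike the previous query, the work here is proportional to the whole FACT table, which is why its estimate is in seconds rather than microseconds even though the result set is small.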

Computational Queries

Here, instead of standard SQL, we execute a JS/SQL query that performs a custom JavaScript calculation on each record.

An example query, which counts the number of customer actions in each cell of a geographical grid, might look like this:


// x0, y0 (grid origin) and x_step, y_step (cell size) are assumed to be
// defined elsewhere in the query scope.
function grid_cell(x, y) {
  return Math.floor((x - x0) / x_step) + " " + Math.floor((y - y0) / y_step);
}

SELECT grid_cell(D_CUSTOMER.GPS_X, D_CUSTOMER.GPS_Y) AS cell, COUNT(*)
FROM F_MEASURE INNER JOIN D_CUSTOMER ON F_MEASURE.ID_CUSTOMER = D_CUSTOMER.ID
WHERE F_MEASURE.DATE = '$today'
GROUP BY cell

Query analysis.

For this type of query, the Clusterpoint query optimizer cannot use indexes directly to find the constrained record set or the group key for aggregation. The V8 JavaScript engine is invoked on each record, and Clusterpoint can process 500,000 records per second per CPU core (2 µs per record). However, the query also restricts the records to a single day's worth, which cuts down the search space.

Number of elements                         20,000,000
Number of elements per core                   166,667
CPU-core time per element, µs                       2
Query execution time, sec                       0.333
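For readers unfamiliar with JS/SQL, the following plain-JavaScript sketch computes what the grid query expresses: filter FACT records to one day, look up each customer's coordinates, map them to a grid cell with a custom function, and count per cell. The grid origin and cell size (x0, y0, xStep, yStep), the records, and the dates are all hypothetical, and this is not how Clusterpoint executes the query internally.

```javascript
// Hypothetical grid parameters: origin and cell size.
const x0 = 0, y0 = 0, xStep = 10, yStep = 10;

function gridCell(x, y) {
  return Math.floor((x - x0) / xStep) + " " + Math.floor((y - y0) / yStep);
}

// Hypothetical D_CUSTOMER coordinates (by doc id) and F_MEASURE rows.
const customers = [
  { gpsX: 3, gpsY: 4 },
  { gpsX: 15, gpsY: 2 },
  { gpsX: 7, gpsY: 9 },
];
const measures = [
  { idCustomer: 0, date: "2016-02-15" },
  { idCustomer: 1, date: "2016-02-15" },
  { idCustomer: 2, date: "2016-02-15" },
  { idCustomer: 1, date: "2016-02-14" },
];

// WHERE F_MEASURE.DATE = today GROUP BY cell, with COUNT(*)
function countByCell(today) {
  const counts = new Map();
  for (const m of measures) {
    if (m.date !== today) continue; // the date filter shrinks the input first
    const c = customers[m.idCustomer];
    const cell = gridCell(c.gpsX, c.gpsY); // custom code runs once per record
    counts.set(cell, (counts.get(cell) || 0) + 1);
  }
  return counts;
}

console.log(countByCell("2016-02-15")); // "0 0" => 2, "1 0" => 1
```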

Extended Use Cases

The query examples above represent classical data warehouse workloads. However, Clusterpoint’s extensible data model also allows for advanced computational capabilities.

Unstructured Data - Text Search & Analysis

When Clusterpoint is used as a data warehouse platform, textual/unstructured data can be mixed with tabular/structured data. For example, some DIMENSIONs might contain textual information, such as product descriptions or legal documents associated with the sales events captured in the FACT tables. This makes it possible to analyze events connected with products that were identified through advanced text retrieval or text analysis.

Geospatial

Clusterpoint is well suited to geospatial data: it builds two-dimensional indexes and provides built-in functions to locate objects within specific geographic shapes and to perform distance-based calculations.

Extrapolation & Predictive Analytics

Clusterpoint extends the SQL query language with JavaScript scripting capabilities. This allows arbitrarily complex computations, including predictive or regression models, to run inside the database, right next to the data. Queries can cover not only historical data but also augment it with forecasts and evaluate different what-if scenarios.

Conclusion

Clusterpoint technology, together with a data model that distributes FACT tables among computational nodes and replicates DIMENSION tables to every node, enables effective real-time processing of data warehouse workloads for deployments handling up to a billion FACT records per commodity-hardware computational node.
