Building Analytics Stack with Apache Arrow

This article kicks off a series about our Analytics Stack (FlexQuery) and the Longbow engine powering it.

Have you ever implemented a bunch of data services interacting with clients and communicating with each other? Has maintenance become a huge burden? Do you have to write boilerplate too often? Is performance a concern? Or, have you ever needed to embed a custom data service into a third-party platform?

Imagine you implement a nice prediction algorithm and seamlessly embed it into a BI platform so end users can consume it. Sci-fi? Well, for us it’s not!

Building and maintaining data services, especially for analytics platforms, is often challenging – from performance issues to complex integrations with third-party platforms. To address these, we created a framework designed to significantly boost developer efficiency and ease the integration of data services, leveraging cutting-edge technologies like Apache Arrow for maximal performance and reliability. We call it Longbow and it powers the heart of our Analytics Stack – FlexQuery.

Our journey, led by Lubomir (lupko) Slivka, aims to revolutionize GoodData’s analytics offerings, transforming our traditional BI platform into a robust Analytics Lake. This transformation was motivated by the need to modernize our stack, taking full advantage of open-source technologies and modern architectural principles to better integrate with cloud platforms.

Building Data Services is Hard!

Imagine you have decided to build a stack powering an analytics (BI) platform. You need to implement various data services for basic analytics use cases such as pivoting, machine learning, or caching.

All services share common needs, like:

  • Config management
  • Routing, Load balancing
  • Deployment flexibility, horizontal scaling
  • Multi-tenancy, resource limits

Ideally, you would like to clone a skeleton data service and implement only the service logic, e.g. pivoting, without handling any of the above. Ultimately, you would like to allow third parties to implement and embed their custom services. That is the direction we at GoodData are currently heading, and we want to offer it to external audiences.

Our Initial Motivation – GoodData 2.0

At GoodData, we build a Business Intelligence and Analytics platform. The heart of our platform is a set of services we call the Analytics Lake – a set of components and services responsible for figuring out what to compute, querying the data sources, post-processing and possibly caching the results, and finally serving the results to consumers.

We built the first version of our platform more than fifteen years ago with the technology available at that time, combined with large amounts of our own proprietary code.

As we embarked on building our new GoodData Cloud solution (you may call it GoodData 2.0), we decided to rebuild and refresh our somewhat-outdated stack so that it takes advantage of available open-source technologies and modern architectural approaches, and fits well into public cloud platforms.

Very early in our new journey, we discovered the Apache Arrow project and realized that it provides a very strong foundation that we could use when building our new stack. Later on, we got excited about the DuckDB database and partnered with MotherDuck.

We also realized that there is demand from our prospects to embed custom services, which is why, from the very beginning, we had this use case in mind while designing the architecture.

Anatomy of an Analytics Lake

GoodData strongly believes in semantic modeling for BI and analytics – and our new stack adopted this mindset as well. The semantic model layer acts as an indirection between two key components:

  • The logical model used for creating and consuming analytics solutions.
  • The physical model of the actual data that analytics are derived from.

We often compare this to Object-Relational Mapping (ORM) in programming – where the object model is mapped to a physical relational model.

To provide this indirection, our stack includes a specialized query engine, which is built upon the Apache Calcite project. This engine plays a crucial role in translating the semantic model and user requests into a physical execution plan.

However, how does one realize such a physical execution plan? Well, that is where our brand-new engine, Longbow, comes into play. It powers the Analytics Stack we call FlexQuery, which is responsible for:

  • Data source querying and pre-aggregations: Executing queries and preparing data.
  • Data post-processing: Enhancing data through pivoting, sorting, merging, or applying ML.
  • Caching: Storing results and pre-aggregations to reduce latency.

And this is where the Apache Arrow and other projects in the open-source data and analytics ecosystem come into play.

GoodData’s FlexQuery (powered by Longbow) in the context of the whole BI platform.

Why we chose Apache Arrow

Usually, when we mention ‘Arrow’ to someone, they think of the data format that is perfect for analytics because it is columnar and in-memory.

In fact, the Apache Arrow project is much bigger and more robust than ‘just’ the data format itself and offers, among other things:

  • Efficient I/O operations for file systems like S3 or GCS.
  • Converters between various formats, including CSV and Parquet.
  • JDBC result set converters.
  • Streaming computation capabilities with the Acero engine.
  • Flight RPC for creating data services.
  • Arrow Database Connectivity (ADBC) for standardized database interactions.

So in a nutshell, going with Arrow in our analytics stack allows us to streamline lower-level technicalities, enabling us to focus on delivering our added value.
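To make that concrete, here is a minimal sketch of the kind of lower-level work Arrow takes off our hands – format conversion and object-store I/O. It uses the standard pyarrow API, not our internal code, and the file and bucket names are made up:

import pyarrow.csv as pa_csv
import pyarrow.parquet as pq
from pyarrow import fs

# Convert a local CSV file into Parquet via an in-memory Arrow table.
table = pa_csv.read_csv("orders.csv")
pq.write_table(table, "orders.parquet")

# The same I/O primitives work against object stores such as S3.
s3 = fs.S3FileSystem(region="us-east-1")
with s3.open_input_file("my-bucket/tpch/orders.parquet") as f:
    remote_table = pq.read_table(f)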

Things get even more interesting with the integration of other open-source technologies that further enrich our stack’s capabilities:

  • Pandas: Utilized for data processing and exporting (with native Arrow data support). We are also exploring alternatives like Polars.
  • DuckDB: An in-process OLAP database for SQL queries on pre-aggregations.
  • turbodbc: Provides efficient Arrow format result sets.

All of them integrate very efficiently with Arrow and can work with Arrow data almost seamlessly, with little friction.
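As a small illustration of that friction-free hand-off (standard pandas, DuckDB, and pyarrow APIs with made-up sample data, not our service code), the same Arrow table can be handed to pandas and queried by DuckDB with minimal copying in between:

import duckdb
import pyarrow as pa

# A small Arrow table created in memory (hypothetical sample data).
orders = pa.table({
    "o_orderpriority": ["1-URGENT", "2-HIGH", "1-URGENT"],
    "o_orderdate": ["1993-07-01", "1993-08-15", "1993-09-30"],
})

# pandas consumes Arrow data directly.
df = orders.to_pandas()

# DuckDB queries the Arrow table in place and returns Arrow again.
con = duckdb.connect()
con.register("orders", orders)
result = con.execute(
    "SELECT o_orderpriority, count(*) AS order_count "
    "FROM orders GROUP BY o_orderpriority"
).arrow()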

What we set out to build

Our decision to develop our stack around Arrow Flight RPC led us to build it as a suite of data services tailored to specific needs:

  • Data Querying Services: For accessing various data sources.
  • Post-Processing Services: Utilizing dataframe operations to refine queried results.
  • Caching Services: For storing both results and intermediate data.

We expected the future addition of services for storing pre-aggregations and utilizing SQL for queries on cached or pre-aggregated data.

Arrow Flight RPC allows our analytics stack’s ‘physical’ components to expose a consistent API for higher-level components, such as the query engine and orchestrators, enhancing the functionality and value provided.
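Here is a rough sketch of what that consistent surface looks like from a client’s point of view, using the stock pyarrow.flight client. The endpoint address and command payload are illustrative assumptions, not our actual protocol:

import pyarrow.flight as flight

# Connect to a (hypothetical) Flight RPC data service.
client = flight.FlightClient("grpc://localhost:16004")

# Describe the request as a command descriptor...
descriptor = flight.FlightDescriptor.for_command(
    b'{"type": "parquet-file", "path": "tpch/orders.parquet"}'
)
info = client.get_flight_info(descriptor)

# ...then stream the Arrow result back from whichever endpoint serves it.
for endpoint in info.endpoints:
    reader = client.do_get(endpoint.ticket)
    table = reader.read_all()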

The idea is that if we focus on creating well-granulated services with well-defined responsibilities, then, despite the initial cost, it will pay off in reusability and flexibility. This design principle supports the introduction of new product features through the orchestration layer’s interaction with various data services.

As our design evolved, we saw opportunities to boost system cohesion through Arrow Flight RPC, enabling services to cooperate on advanced functions like transparent caching.

This flexibility also means that you have more ways to design services. Let’s showcase this with two approaches to caching:

  1. Orchestrating calls: You orchestrate data service calls (e.g., pivoting) with a cache service call. In most cases, you would chain a cache service call, because in analytics most data services do the heavy lifting, so it makes sense to cache most results.
  2. Reusable service template: You can also define the caching in the template, so when a developer calls a data service (e.g., an SQL execution), the caching is managed transparently under the hood – if the result is already cached, it is reused; otherwise, the data source is queried.

Obviously, in the case of caching, we opted for the second option. The beauty of the FlexQuery ecosystem is its flexibility: you can design it either way based on the use case, and if you change your mind, you can effortlessly migrate from one approach to the other.
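To make the two approaches tangible, here is a deliberately simplified pseudocode sketch. The helper names (cache_service, pivot_service, and so on) are hypothetical and are not the actual Longbow/FlexQuery API:

# Approach 1: the caller explicitly orchestrates the cache call.
def pivot_with_explicit_cache(request, pivot_service, cache_service):
    cached = cache_service.get(request.cache_key)
    if cached is not None:
        return cached
    result = pivot_service.run(request)   # the heavy lifting
    cache_service.put(request.cache_key, result)
    return result


# Approach 2: caching lives in the reusable service template, so every
# service built from it gets transparent caching for free.
class CachingServiceTemplate:
    def __init__(self, cache_service):
        self._cache = cache_service

    def run(self, request):
        raise NotImplementedError  # implemented by the concrete data service

    def handle(self, request):
        cached = self._cache.get(request.cache_key)
        if cached is not None:
            return cached
        result = self.run(request)
        self._cache.put(request.cache_key, result)
        return result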

So in the end, we set out on the path to build a cohesive system of data services that implement the Arrow Flight RPC and provide foundational capabilities in our stack.

From Developers to Developers

Developer velocity is very important. You can provide the most sophisticated platform to developers, but if it takes months to onboard and weeks to build a simple new data service, it is useless.

That’s why we focused on this aspect and provided a skeleton module, which developers can clone to develop only the logic of the data service itself, without any boilerplate.

Now, imagine you need to execute custom (analytics) SQL queries on top of multiple data sources. Sounds hard? That’s why we integrated the DuckDB in-process OLAP engine into our data service framework.

What does our data service look like? Each service within our FlexQuery ecosystem is designed to be versatile, accommodating various data storage scenarios. Whether your data is in CSV files, in databases, or already cached, the service is equipped to manage it efficiently.

Let’s illustrate our commitment to facilitating straightforward and efficient development processes within our ecosystem.

To do that, let’s look at the process of executing the well-known TPC-H query #4:

tpch_q4 = """
select o_orderpriority, count(*) as order_count
from orders
where
    o_orderdate >= date '1993-07-01'
    and o_orderdate < date '1993-07-01' + interval '3' month
    -- not a part of TPC-H, but demonstrates multi-tenancy
    and o_tenant = 'tenant1'
    and exists (
        select 1 from lineitem
        where l_orderkey = o_orderkey and l_commitdate < l_receiptdate
    )
group by o_orderpriority order by o_orderpriority
"""

And now the execution:

import quiver_core.api as qc
from orjson import orjson
from quiver_connector import ConnectorQuery
from quiver_sql_query.service.sql_query import SqlQuery, TableData

tpch4_pqtrim = SqlQuery(
    sql=tpch_q4,
    tables=(
        TableData(
            table_name="orders",
            data=ConnectorQuery(
                payload=orjson.dumps(
                    {
                        "type": "parquet-file", "path": "tpch/orders.parquet",
                        "columns": ["o_orderpriority", "o_orderdate", "o_orderkey"],
                    }
                ),
                sink_method=qc.SinkToFlightPath(
                    flight_path="org1/tenant1/tpch/datasets/orders", skip_if_exists=True
                ),
            ).to_flight_descriptor(ds_id="my-files"),
        ),
        TableData(
            table_name="lineitem",
            data=ConnectorQuery(
                payload=orjson.dumps(
                    {
                        "type": "parquet-file", "path": "tpch/lineitem.parquet",
                        "columns": ["l_orderkey", "l_commitdate", "l_receiptdate"],
                    }
                ),
            ).to_flight_descriptor(ds_id="my-files"),
        ),
    ),
    sink_method=qc.SinkToFlightPath(
        flight_path="org1/tenant1/tpch/reports/4.sql", skip_if_exists=True
    ),
)

q = qc.QuiverClient("grpc://localhost:16004")

with q.flight_descriptor(tpch4_pqtrim.to_flight_descriptor()) as stream:
    result = stream.read_all()
In this scenario, we execute a query using two Parquet files from the ‘my-files’ data source, which references an AWS S3 bucket. This setup demonstrates the flexibility of our system, as developers can designate a distinct data source for each TableData.

The beauty of it boils down to three points:

  • Decoupled Data Connectors: The TableData connectors are independent of the SQL execution service, allowing you to “hot-swap” data sources.
  • Efficient Caching: Both TableData and the query results can be stored in FlexQuery’s caching system (FlexCache – Arrow format). The system intelligently reuses the cache when available, eliminating the need for manual developer intervention.
  • Dynamic Data Handling: The system can trim columns on the fly and supports predicate pushdown for optimization (see the sketch below).
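For the column-trimming and pushdown idea specifically, here is a small sketch of what it looks like at the Arrow level, using the stock pyarrow.dataset API rather than FlexQuery internals; the file path and filter are illustrative:

import pyarrow.dataset as ds

# Scan only the columns the query needs and push a predicate down to the scan.
dataset = ds.dataset("tpch/orders.parquet", format="parquet")
trimmed = dataset.to_table(
    columns=["o_orderpriority", "o_orderdate", "o_orderkey"],
    filter=ds.field("o_orderkey") > 0,
)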

And now let’s review how such a data service is implemented. The entry point of each service looks like this:

def create_task(
    self, cmd_envelope: qc.FlightCmdEnvelope, headers: qm.FlightRpcHeaders
) -> Task[SqlQuery]:
    sql_query = SqlQuery.from_bytes(cmd_envelope.cmd.payload)
    return SqlQueryTask(internal_api=self, cmd=sql_query)

The entry point returns an instance of a class whose run method implements the heavy lifting.
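As a rough illustration only (the actual base class and method signatures are internal to Longbow, so the shape below is an assumption), such a task might look something like this:

class SqlQueryTask(Task[SqlQuery]):
    def __init__(self, internal_api, cmd: SqlQuery):
        self._api = internal_api
        self._cmd = cmd

    def run(self) -> None:
        # The heavy lifting happens here: gather the TableData inputs,
        # run the SQL on DuckDB, and sink the result (see the snippets below).
        ...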

For the sake of simplicity, let’s just acquire the DuckDB instance in a service. Interested in creating it from scratch? Watch GoodData channels; we are going to publish more detailed articles!

acquire_start = time.perf_counter()
instance = self._api.acquire_duckdb_instance(
    timeout=self._task_config.acquire_duckdb_timeout
)
acquire_duration = time.perf_counter() - acquire_start
self._logger.debug("duckdb_acquired", duration=acquire_duration)
SqlQueryModuleMetrics.ENGINE_ACQUIRE_DURATION.observe(acquire_duration)

There is an API for acquiring DuckDB instances – easy. We also provide a system of metrics for good maintainability (at GoodData, we publish them to Prometheus).
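If you want a feel for what such a metric looks like, here is a minimal stand-alone sketch using the standard prometheus_client library. The metric name and the stubbed acquisition function are assumptions for illustration, not our internal definitions:

import time
from prometheus_client import Histogram, start_http_server

# Hypothetical metric mirroring ENGINE_ACQUIRE_DURATION from the snippet above.
ENGINE_ACQUIRE_DURATION = Histogram(
    "sql_query_engine_acquire_duration_seconds",
    "Time spent acquiring a DuckDB instance",
)

def acquire_duckdb_instance():
    time.sleep(0.01)  # stand-in for the real acquisition call

start_http_server(9102)  # expose /metrics for Prometheus to scrape
with ENGINE_ACQUIRE_DURATION.time():
    acquire_duckdb_instance()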

But how is TableData loaded into DuckDB?


# Ask the connector service for the table's Flight data (polling until ready).
poll_result = self._api.flight_io.poll_flight_info(
    descriptor=self._descriptor,
    timeout=self._task_config.gather_poll_timeout,
    check_cancel_fun=_check_cancel_fun,
    options=self._call_options,
    debug_name=debug_name,
)

# Stream the Arrow data; 'pushdown' (built earlier, not shown) carries the
# column trimming and predicate pushdown mentioned above.
flight_data = poll_result.do_get(pushdown)

with flight_data as reader:
    cursor = instance.cursor()
    try:
        # DuckDB creates the table directly from the Arrow stream reader.
        result = cursor.execute(
            f'CREATE TABLE "{self._table_name}" AS SELECT * FROM reader'
        ).fetchone()
    finally:
        cursor.close()
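The SELECT * FROM reader part may look surprising: it relies on DuckDB’s replacement scans, which let SQL reference an Arrow stream by the name of the Python variable holding it. Here is a stand-alone illustration of the same pattern with plain pyarrow and DuckDB (the sample data is made up):

import duckdb
import pyarrow as pa

# DuckDB's replacement scans resolve 'reader' to this local Arrow reader.
reader = pa.table({"l_orderkey": [1, 2, 3]}).to_reader()

con = duckdb.connect()
con.execute('CREATE TABLE "lineitem" AS SELECT * FROM reader')
print(con.execute("SELECT count(*) FROM lineitem").fetchone())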

Have you noticed any mention of starting or handling a server in this chapter? Load balancing? Scaling? No? That’s the beauty of FlexQuery. Internal developers focus on the logic of the new data service they want to embed. External developers just declare complex requests, even chaining multiple services together, and benefit from the results.

Want to learn more?

As we mentioned in the introduction, this is the first part of a series of articles, where we take you on a journey of how we built our new analytics stack on top of Apache Arrow and what we learned about it in the process.

Other parts of the series are about putting Longbow into the context of FlexQuery, details about the flexible storage and caching, and, last but not least, how well DuckDB quacks with Apache Arrow!

As you can see in this article, we are opening our platform to an external audience. We not only use (and contribute to) state-of-the-art open-source projects, but we also want to allow external developers to deploy their services into our platform. Ultimately, we are thinking about open-sourcing the whole analytics stack. Would you be interested in that? Let us know – your opinion matters!

If you’d like to discuss our analytics stack (or anything else), feel free to join our Slack community!

Want to see how well it all works in practice? You can try the GoodData free trial! Or if you’d like to try our new experimental features enabled by this approach (AI, machine learning, and much more), feel free to sign up for our Labs Environment.
