
Building advanced Beam pipelines in Scala with SCIO

Apache Beam is an open source, unified programming model with a set of language-specific SDKs for defining and executing data processing workflows. Scio (pronounced "shee-o") is a Scala API for Beam, developed by Spotify, for building both batch and streaming pipelines.

In this blog we will cover why Scio is worth using and walk through a few reference patterns.

Why Scio

Scio provides a high-level abstraction for developers and is preferred for the following reasons:

It strikes a balance between conciseness and performance: pipelines written in Scala are more concise than their Java equivalents while offering similar performance.

It eases migration for Scalding/Spark developers, since Scio's semantics are closer to those APIs than to the raw Beam API, avoiding a steep learning curve.

It enables access to a large ecosystem of Java infrastructure libraries (e.g. Hadoop, Avro, Parquet) and to high-level numerical processing libraries in Scala such as Algebird and Breeze.

It supports interactive exploration of data and code snippets using the Scio REPL.

Reference Patterns

Let us check out a few concepts along with examples:

1. Graph Composition

If you have a complex pipeline consisting of several transforms, a practical approach is to compose the logically related transforms into blocks. This makes the graph rendered in the Dataflow UI easier to manage and debug. Let us consider an example using the popular WordCount pipeline.

 Fig:  Word Count Pipeline without Graph Composition
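The original post rendered this pipeline as a screenshot. A minimal sketch of an uncomposed WordCount pipeline is shown below; the input/output argument names are illustrative, not taken from the original figure.

import com.spotify.scio._

object WordCount {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // Every step below shows up as its own top-level node in the Dataflow UI.
    sc.textFile(args("input"))
      .flatMap(_.split("\\W+").filter(_.nonEmpty))
      .countByValue
      .map { case (word, count) => s"$word: $count" }
      .saveAsTextFile(args("output"))

    sc.run().waitUntilFinish()
  }
}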

 Let us modify the code to group the related transforms into blocks:

Fig:  Word Count Pipeline with Graph Composition
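As a sketch of what the figure shows: Scio's transform method wraps logically related steps into named composite blocks, so the Dataflow UI renders one node per block (the block names below are illustrative).

sc.textFile(args("input"))
  .transform("Tokenize") {
    // Split each line into non-empty words
    _.flatMap(_.split("\\W+").filter(_.nonEmpty))
  }
  .transform("CountAndFormat") {
    // Count occurrences and format the output lines
    _.countByValue
      .map { case (word, count) => s"$word: $count" }
  }
  .saveAsTextFile(args("output"))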

2. Distributed Cache

Distributed Cache allows you to load data from a given URI on each worker and reuse it across all tasks (DoFns) executing on that worker. Common use cases include loading a serialized machine learning model from an object store like Google Cloud Storage to run predictions, loading lookup reference data, and so on.

Let us check out an example that loads lookup data from a CSV file on each worker during initialization and uses it to count the number of matching lookups for each input element.

Fig:  Example demonstrating Distributed Cache
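A minimal sketch of the idea using Scio's distCache, assuming a single-column CSV of lookup keys; the argument names and file layout are illustrative.

import com.spotify.scio._

object DistCacheExample {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // Loaded once per worker at initialization and shared by all DoFns on that worker.
    val lookup = sc.distCache(args("lookupCsv")) { file =>
      val src = scala.io.Source.fromFile(file)
      try src.getLines().map(_.split(",")(0)).toSet
      finally src.close()
    }

    sc.textFile(args("input"))
      // Count how many tokens of each element appear in the cached lookup set.
      .map(line => line.split("\\W+").count(lookup().contains))
      .saveAsTextFile(args("output"))

    sc.run().waitUntilFinish()
  }
}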

3. Scio Joins

Joins in Beam are expressed using CoGroupByKey, while Scio lets you express the common join types, such as inner, left outer and full outer joins, by flattening the CoGbkResult.

Hash joins (syntactic sugar over a Beam side input) can be used when one of the datasets is very small (up to ~1GB), by placing the smaller dataset on the right-hand side. Side inputs are small, in-memory data structures that are replicated to all workers, which avoids shuffling.

MultiJoin can be used to join up to 22 datasets. It is recommended that all datasets be ordered in descending size, because non-shuffle joins require the largest dataset to be on the left of any chain of operators.

Sparse joins can be used when the left collection (LHS) is much larger than the right collection (RHS), and the RHS cannot fit in memory but shares only a sparse intersection of keys with the LHS. Sparse joins are implemented by constructing a Bloom filter of keys from the right collection and splitting the left collection into two partitions. Only the partition whose keys are in the filter goes through the join; the remainder is either concatenated back in (outer join) or discarded (inner join). Sparse joins are especially useful for joining historical aggregates with incremental updates.

Skewed joins are a more appropriate choice when the left collection (LHS) is much larger and contains hot keys. A skewed join uses Count-Min Sketch, a probabilistic data structure, to estimate the frequency of keys in the LHS. The LHS is partitioned into hot and chill partitions: the hot partition is joined with the corresponding RHS keys using a hash join, the chill partition uses a regular join, and the two results are combined with a union.

Fig:  Example demonstrating Scio Joins
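A minimal sketch of a few of these join flavors on made-up key-value collections (the datasets and keys are purely illustrative; sparse and skewed joins follow the same keyed-SCollection pattern with their own operators):

import com.spotify.scio._
import com.spotify.scio.util.MultiJoin

object JoinExamples {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, _) = ContextAndArgs(cmdlineArgs)

    val orders    = sc.parallelize(Seq(("c1", 10.0), ("c2", 25.0), ("c1", 5.0)))
    val customers = sc.parallelize(Seq(("c1", "Alice"), ("c2", "Bob")))
    val regions   = sc.parallelize(Seq(("c1", "EMEA")))

    val inner  = orders.join(customers)                // inner join via CoGroupByKey
    val left   = orders.leftOuterJoin(regions)         // RHS values become Option[String]
    val hashed = orders.hashJoin(customers)            // small RHS shipped as a side input, no shuffle
    val multi  = MultiJoin(orders, customers, regions) // joins up to 22 datasets

    inner.debug()
    left.debug()
    hashed.debug()
    multi.debug()

    sc.run().waitUntilFinish()
  }
}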

Note that with the Beam Java SDK you can also take advantage of similar join abstractions via the Join Library extension.

4. Algebird Aggregators and Semigroups

Algebird is Twitter's abstract algebra library containing several reusable modules for parallel aggregation and approximation. An Algebird Aggregator or Semigroup can be used with the aggregate and sum transforms on SCollection[T], or with the aggregateByKey and sumByKey transforms on SCollection[(K, V)]. The example below illustrates computing a parallel aggregation over customer orders and composing the result into an OrderMetrics class.

Fig:  Example demonstrating Algebird Aggregators
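A minimal sketch assuming hypothetical Order and OrderMetrics case classes; the field names are illustrative rather than taken from the original figure.

import com.spotify.scio._
import com.twitter.algebird.Aggregator

case class Order(customerId: String, amount: Double, items: Int)
case class OrderMetrics(count: Long, totalAmount: Double, totalItems: Int)

object AggregatorExample {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, _) = ContextAndArgs(cmdlineArgs)

    val orders = sc.parallelize(Seq(
      Order("c1", 10.0, 2), Order("c2", 25.0, 1), Order("c1", 5.0, 4)
    ))

    // Compose three aggregators (count, sum of amounts, sum of items) so the
    // aggregation happens in a single pass, then present the combined result
    // as an OrderMetrics.
    val metricsAggregator = Aggregator.size
      .join(Aggregator.prepareMonoid((o: Order) => o.amount))
      .join(Aggregator.prepareMonoid((o: Order) => o.items))
      .andThenPresent { case ((count, totalAmount), totalItems) =>
        OrderMetrics(count, totalAmount, totalItems)
      }

    orders.aggregate(metricsAggregator).debug()

    sc.run().waitUntilFinish()
  }
}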

The code snippet below expands on the previous example and demonstrates a Semigroup that aggregates objects by combining their fields.

Fig:  Example demonstrating Algebird SemiGroup
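Continuing the sketch above, a hand-rolled Semigroup for the hypothetical OrderMetrics class lets sumByKey combine per-customer metrics field by field (Algebird can also derive such instances, but the manual definition keeps the example explicit).

import com.twitter.algebird.Semigroup

// Combine two OrderMetrics values field by field.
implicit val orderMetricsSemigroup: Semigroup[OrderMetrics] =
  Semigroup.from[OrderMetrics] { (a, b) =>
    OrderMetrics(a.count + b.count,
                 a.totalAmount + b.totalAmount,
                 a.totalItems + b.totalItems)
  }

orders
  .map(o => (o.customerId, OrderMetrics(1L, o.amount, o.items)))
  .sumByKey // combines values per key using the implicit Semigroup above
  .debug()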

5. GroupMap and GroupMapReduce

GroupMap can be used as a replacement for groupBy(key) + mapValues(_.map(func)), or for _.map(e => kv.of(keyfunc, valuefunc)) + groupBy(key).

Let us consider the example below, which calculates the length of each word for every type. Instead of grouping by type and then applying a length function, groupMap combines these operations by applying keyfunc and valuefunc in a single step.

 Fig:  Example demonstrating GroupMap
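A minimal sketch with made-up (type, word) pairs; the exact groupMap signature may vary slightly between Scio versions.

val words = sc.parallelize(Seq(
  ("fruit", "apple"), ("fruit", "banana"), ("animal", "cat")
))

// Equivalent to words.groupBy(_._1).mapValues(_.map(_._2.length)),
// but expressed as a single transform.
words
  .groupMap { case (kind, _) => kind } { case (_, word) => word.length }
  .debug() // e.g. ("fruit", Iterable(5, 6)), ("animal", Iterable(3))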

GroupMapReduce can be used to derive a key and apply an associative operation to the values associated with each key. The associative function is performed locally on each mapper, similar to a "combiner" in MapReduce (a.k.a. combiner lifting), before the results are sent to the reducer. This is equivalent to keyBy(keyfunc) + reduceByKey(reducefunc).

Let us consider the example below, which calculates the sum of the odd and even numbers in a given range. Individual values are combined on each worker, and the local results are then aggregated to produce the final result.

Fig:  Example demonstrating GroupMapReduce
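A minimal sketch over the range 1 to 100 (the range is illustrative, and the exact groupMapReduce signature may vary slightly between Scio versions); the reduce function must be associative so partial sums can be combined on each worker before the final merge.

sc.parallelize(1 to 100)
  // Key each number as "even" or "odd", then reduce the values per key by summing.
  // Equivalent to keyBy(...) + reduceByKey(_ + _).
  .groupMapReduce(n => if (n % 2 == 0) "even" else "odd")(_ + _)
  .debug() // ("even", 2550), ("odd", 2500)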

Conclusion

Thanks for reading; I hope you are now motivated to learn more about Scio. Beyond the patterns covered above, Scio offers several interesting features such as implicit coders for Scala case classes, chaining jobs using I/O taps, distinct count using HyperLogLog++, and writing sorted output to files. Several use-case-specific libraries, such as BigDiffy (comparison of large datasets) and Featran (ML feature engineering), were also built on top of Scio.

For Beam lovers with a Scala background, Scio is the perfect recipe for building complex distributed data pipelines.


Real-time Data Integration from Oracle to Google BigQuery Using Striim

Editor's note: In this guest blog, we have the pleasure of inviting Alok Pareek, Founder & EVP of Products at Striim, to share the latest experimental results from a performance study on real-time data integration from Oracle to Google Cloud BigQuery using Striim.

Relational databases like Oracle are designed to store data, but they aren’t well suited for supporting analytics at scale. Google Cloud BigQuery is a serverless, scalable cloud data warehouse that is ideal for analytics use cases. To ensure timely and accurate analytics, it is essential to be able to continuously move data streams to BigQuery with minimal latency. 

The best way to stream data from databases to BigQuery is through log-based Change Data Capture (CDC). Log-based CDC works by directly reading the transaction logs to collect DML operations, such as inserts, updates, and deletes. Unlike other CDC methods, log-based CDC provides a non-intrusive approach to streaming database changes that puts minimal load on the database.

Striim — a unified real-time data integration and streaming platform — comes with out-of-the-box log-based CDC readers that can move data from various databases (including Oracle) to BigQuery in real-time. Striim enables teams to act on data quickly, producing new insights, supporting optimal customer experiences, and driving innovation. 

In this blog post, we will outline experimental results cited in Striim’s recent white paper, Real-Time Data Integration from Oracle to Google BigQuery: A Performance Study. 

Building a Data Pipeline from Oracle to Google BigQuery with Striim: Components

We used the following components to build a data pipeline that moves data from an Oracle database to BigQuery in real time:

Oracle CDC Adapters

A Striim adapter is a process that connects the Striim platform to a specific type of external application or file. Adapters enable various data sources to be connected to target systems with streaming data pipelines for real-time data integration.

Striim comes with two Oracle CDC adapters to help manage different workloads.

LogMiner-based Oracle CDC Reader uses Oracle LogMiner to ingest database changes on the server side and replicate them to the streaming platform. This adapter is ideal for low and medium workloads.

The OJet adapter uses a high-performance log mining API to support high volumes of database changes on the source and replicate them in real time. This adapter is ideal for high-volume, high-throughput CDC workloads.

With two types of Oracle adapters to choose from, when is it advisable to use one over the other?

Our results show that if your DB workload profile is between 20GB and 80GB of CDC data per hour, the LogMiner-based Oracle CDC reader is a good choice. If you work with higher data volumes, the OJet adapter is the better option; currently, it's the fastest Oracle CDC reader available. Here's a table and chart showing the latency (read lag) for both adapters:

BigQuery Writer

Striim’s BigQuery Writer is designed to save time and storage; it takes advantage of partitioned tables on the target BigQuery system and supports partition pruning in its merge queries. 

Database Workload

For our experiment, we used a custom-built, high-scale database workload simulation. This workload, SwingerMultiOps, is based on Swingbench, a popular workload generator for Oracle databases. It's a multithreaded JDBC (Java Database Connectivity) application that generates concurrent DB sessions against the source database. We took the Order Entry (OE) schema of the Swingbench workload and, in SwingerMultiOps, kept adding tables until we reached a total of 50, each containing columns of varying data types.

Building the Data Pipeline: Steps

We built the data pipeline for our experiment following these steps:

1. Configure the source database and profile the workload

Striim's Oracle adapters connect to Oracle server instances to mine redo data, so it's important to have the source database instance tuned for optimal redo mining performance. Here's what you need to keep in mind about the configuration:

Profile the DB workload to measure the load it generates on the source database

Set redo log sizes to a reasonably large value, such as 2G per log group

For the OJet adapter, set the DB streams_pool_size to a large value so redo can be mined as quickly as possible

For an extremely high CDC data rate of around 150 GB/hour, set streams_pool_size to 4G

2. Configure the Oracle adapter

For both adapters, default settings are enough to get started. The only configuration required is to set the DB endpoints to read data from the source database. Based on your need, you can use Striim to perform any of the following:

Handle large transactions

Read and write data to a downstream database

Mine from a specific SCN or timestamp

Regardless of which Oracle adapter you choose, only one adapter is needed to collect all data streams from the source database, which avoids the overhead of running both adapters.

3. Configure the BigQuery Writer

Use the BigQuery Writer to configure how your data moves from the source database to BigQuery. For instance, you can set your writers to work with a specified dataset to move large amounts of data in parallel.

For performance improvement, you can use multiple BigQuery writers to integrate incoming data in parallel. Using a router ensures that events are distributed such that a single event isn’t sent to multiple writers.

Tuning the number of writers and their properties helps to ensure that data is moved from Oracle to BigQuery in real time. Since we're dealing with large volumes of incoming streams, we configured 20 BigQuery Writers in our experiment. There are many other BigQuery Writer properties that can help you move and control data; you can learn about them in detail here.

How to Execute the Striim App and Analyze Results

We used a Google BigQuery dataset to run our data integration infrastructure. We performed the following tasks to run our simulation and capture results for analysis:

Start the Striim app on the Striim server

Start monitoring our app components using the Tungsten Console by passing a simple script

Start the Database Workload

Capture all DB events in the Striim app, and let the app commit all incoming data to the BigQuery target

Analyze the app performance

The Striim UI image below shows our app running on the Striim server. From this UI, we can monitor the app throughput and latency in real time.

Results Analysis: Comparing the Performance of the Two Oracle Readers

At the end of the DB workload run, we analyzed the captured performance data. Details are tabulated below for each source adapter type.

The charts below show how the CDC reader lag varies with the input rate as the workload progresses on the DB server.

Lag chart for Oracle Reader:

Lag chart for OJet Reader:

Use Striim to Move Data in Real Time to Google Cloud BigQuery

This experiment showed how to use Striim to move large amounts of data in real time from Oracle to BigQuery. Striim offers two high-performance Oracle CDC readers to support data streaming from Oracle databases. We demonstrated that Striim’s OJet Oracle reader is optimal for larger workloads, as measured by read-lag, end-to-end lag, and CPU and memory utilization. For smaller workloads, Striim’s LogMiner-based Oracle reader offers excellent performance. For more in-depth information, please refer to the white paper or contact Striim directly.
