Data Engineering Fundamentals

Reference

Chapter 3 - Designing Machine Learning Systems by Chip Huyen

Introduction

The rise of ML in recent years is tightly coupled with the rise of big data.

Large data systems, even without ML, are complex. If you haven’t spent years and years working with them, it’s easy to get lost in acronyms. These systems raise many challenges and offer many possible solutions. Industry standards, if there are any, evolve quickly as new tools come out and the needs of the industry expand, creating a dynamic and ever-changing environment. If you look into the data stack for different tech companies, it might seem like each is doing its own thing.

Knowing how to collect, process, store, and retrieve an ever-growing amount of data is essential to people who want to build ML systems in production.

Data Sources

An ML system can work with data from many different sources. They have different characteristics, can be used for different purposes, and require different processing methods. Understanding the sources your data comes from can help you use your data more efficiently.

User input data

One source is user input data, data explicitly input by users. User input can be text, images, videos, uploaded files, etc.

If it’s even remotely possible for users to input wrong data, they are going to do it. As a result, user input data can be easily malformatted. User input data requires more heavy-duty checking and processing.

On top of that, users also have little patience. In most cases, when we input data, we expect to get results back immediately. Therefore, user input data tends to require fast processing.

System generated data

This is the data generated by different components of your systems, including various types of logs and system outputs such as model predictions.

These types of logs provide visibility into how the system is doing. The main purpose of this visibility is for debugging and potentially improving the application.

Because logs are system generated, they are much less likely to be malformatted the way user input data is. Overall, logs don’t need to be processed as soon as they arrive, the way you would want to process user input data.

Note: The system also generates data to record users’ behaviors, such as clicking, choosing a suggestion, scrolling, zooming, ignoring a pop-up, or spending an unusual amount of time on certain pages. Even though this is system-generated data, it’s still considered part of user data and might be subject to privacy regulations.

Internal databases

There are also internal databases, generated by various services and enterprise applications in a company. These databases manage their assets such as inventory, customer relationship, users, and more. This kind of data can be used by ML models directly or by various components of an ML system.

Third-party data

First-party data is the data that your company already collects about your users or customers. Second-party data is the data collected by another company on their own customers that they make available to you, though you’ll probably have to pay for it. Third-party data companies collect data on the public who aren’t their direct customers.

Data Formats

Once you have data, you might want to store it (or “persist” it, in technical terms). Since your data comes from multiple sources with different access patterns, storing your data isn’t always straightforward and, for some cases, can be costly. It’s important to think about how the data will be used in the future so that the format you use will make sense.

Here are some of the questions you might want to consider:

  • How do I store multimodal data, e.g., a sample that might contain both images and texts?
  • Where do I store my data so that it’s cheap and still fast to access?
  • How do I store complex models so that they can be loaded and run correctly on different hardware?

The process of converting a data structure or object state into a format that can be stored or transmitted and reconstructed later is data serialization.

Format | Binary/Text | Human-readable | Example use cases
JSON | Text | Yes | Everywhere
CSV | Text | Yes | Everywhere
Parquet | Binary | No | Hadoop, Amazon Redshift
Avro | Binary primary | No | Hadoop
Protobuf | Binary primary | No | Google, TensorFlow (TFRecord)
Pickle | Binary | No | Python, PyTorch serialization

JSON

JSON, JavaScript Object Notation, is everywhere. Even though it was derived from JavaScript, it’s language-independent—most modern programming languages can generate and parse JSON. It’s human-readable. Its key-value pair paradigm is simple but powerful, capable of handling data of different levels of structuredness. For example, your data can be stored in a structured format like the following:

{
  "firstName": "Boatie",
  "lastName": "McBoatFace",
  "isVibing": true,
  "age": 12,
  "address": {
    "streetAddress": "12 Ocean Drive",
    "city": "Port Royal",
    "postalCode": "10021-3100"
  }
}

Once you’ve committed the data in your JSON files to a schema, it’s pretty painful to retrospectively change the schema. Also, JSON files are text files, which means they take up a lot of space.

Row-Major Versus Column-Major Format

The two formats that are common and represent two distinct paradigms are CSV and Parquet. CSV (comma-separated values) is row-major, which means consecutive elements in a row are stored next to each other in memory. Parquet is column-major, which means consecutive elements in a column are stored next to each other.

Column-major formats allow flexible column-based reads, especially if your data is large with thousands, if not millions, of features. Consider if you have data about ride-sharing transactions that has 1,000 features but you only want 4 features: time, location, distance, price. With column-major formats, you can read the four columns corresponding to these four features directly. However, with row-major formats, if you don’t know the sizes of the rows, you will have to read in all columns then filter down to these four columns. Even if you know the sizes of the rows, it can still be slow as you’ll have to jump around the memory, unable to take advantage of caching.

Row-major formats allow faster data writes. Consider the situation when you have to keep adding new individual examples to your data. For each individual example, it’d be much faster to write it to a file where your data is already in a row-major format.
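As a rough sketch of the difference, here is how reading just those four columns might look with pandas; the file and column names are made up for illustration, and Parquet support assumes pyarrow or fastparquet is installed:

import pandas as pd

# Hypothetical file and column names.
cols = ["time", "location", "distance", "price"]

# Column-major (Parquet): the reader can pull just these four columns off disk.
rides = pd.read_parquet("rides.parquet", columns=cols)

# Row-major (CSV): usecols drops the other columns, but the whole file is still
# scanned row by row, so every row's 1,000 features pass through the reader.
rides = pd.read_csv("rides.csv", usecols=cols)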

Numpy versus Pandas

One subtle point that a lot of people don’t pay attention to, which leads to misuses of pandas, is that this library is built around the columnar format.
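As a small illustration of what this means in practice (the DataFrame here is random data, and the exact speed difference will vary by machine):

import numpy as np
import pandas as pd

df = pd.DataFrame(np.random.rand(100_000, 4), columns=["a", "b", "c", "d"])

# Column access is cheap: each column is stored as its own contiguous block.
col = df["a"]

# Row access fights the columnar layout: each df.iloc[i] has to gather one value
# from every column's block, so looping over rows this way is much slower.
rows = [df.iloc[i] for i in range(1_000)]

# A NumPy ndarray, by contrast, is row-major (C order) by default,
# so accessing arr[i] (a row) is the cheap operation there.
arr = df.to_numpy()
first_row = arr[0]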

Text Versus Binary Format

Text files are files that are in plain text, which usually means they are human-readable. Binary files are the catchall that refers to all nontext files.

Binary files are more compact. Here’s a simple example to show how binary files can save space compared to text files. Consider that you want to store the number 1000000. If you store it in a text file, it’ll require 7 characters, and if each character is 1 byte, it’ll require 7 bytes. If you store it in a binary file as int32, it’ll take only 32 bits or 4 bytes.
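A quick way to check this in Python (a minimal sketch; the int32 is written here with the struct module):

import struct

# "1000000" as text: 7 ASCII characters, 1 byte each.
as_text = b"1000000"

# 1,000,000 as a binary int32: always 4 bytes.
as_int32 = struct.pack("<i", 1_000_000)

print(len(as_text), len(as_int32))   # 7 4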

As an illustration, consider a file interviews.csv, which is a CSV file (text format) of 17,654 rows and 10 columns. When I converted it to a binary format (Parquet), the file size went from 14 MB to 6 MB.
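The conversion itself is a one-liner with pandas; this sketch assumes pyarrow or fastparquet is installed for Parquet support:

import pandas as pd

df = pd.read_csv("interviews.csv")
df.to_parquet("interviews.parquet")   # typically much smaller than the CSV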

AWS recommends using the Parquet format because “the Parquet format is up to 2x faster to unload and consumes up to 6x less storage in Amazon S3, compared to text formats.”

Data Models

Data models describe how data is represented. Consider cars in the real world. In a database, a car can be described using its make, its model, its year, its color, and its price. These attributes make up a data model for cars. Alternatively, you can also describe a car using its owner, its license plate, and its history of registered addresses. This is another data model for cars.

How you choose to represent data not only affects the way your systems are built, but also the problems your systems can solve. For example, the way you represent cars in the first data model makes it easier for people looking to buy cars, whereas the second data model makes it easier for police officers to track down criminals.

Relational model

In this model, data is organized into relations; each relation is a set of tuples. A table is an accepted visual representation of a relation, and each row of a table makes up a tuple.

Relations are unordered. You can shuffle the order of the rows or the order of the columns in a relation and it’s still the same relation. Data following the relational model is usually stored in file formats like CSV or Parquet.

Once you’ve put data in your databases, you’ll want a way to retrieve it. The language that you can use to specify the data that you want from a database is called a query language. The most popular query language for relational databases today is SQL.

The most important thing to note about SQL is that it’s a declarative language, as opposed to Python, which is an imperative language. In the imperative paradigm, you specify the steps needed for an action and the computer executes these steps to return the outputs. In the declarative paradigm, you specify the outputs you want, and the computer figures out the steps needed to get you the queried outputs.
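A toy example of the same result computed both ways, using Python’s built-in sqlite3 module (the table and data are made up for illustration):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE rides (city TEXT, month TEXT, price REAL)")
conn.executemany(
    "INSERT INTO rides VALUES (?, ?, ?)",
    [("SF", "Sep", 10.0), ("NYC", "Sep", 25.0), ("SF", "Oct", 30.0)],
)

# Declarative (SQL): describe WHAT you want; the engine figures out the steps.
sf_sep = conn.execute(
    "SELECT price FROM rides WHERE city = 'SF' AND month = 'Sep'"
).fetchall()

# Imperative (Python): spell out the steps that produce the same result.
sf_sep_imperative = []
for city, month, price in conn.execute("SELECT city, month, price FROM rides"):
    if city == "SF" and month == "Sep":
        sf_sep_imperative.append((price,))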

With certain added features, SQL can be Turing-complete, which means that, in theory, SQL can be used to solve any computation problem (without making any guarantee about the time or memory required). However, in practice, it’s not always easy to write a query to solve a specific task, and it’s not always feasible or tractable to execute a query.

Figuring out how to execute an arbitrary query is the hard part, which is the job of query optimizers. A query optimizer examines all possible ways to execute a query and finds the fastest way to do so. It’s possible to use ML to improve query optimizers based on learning from incoming queries. Query optimization is one of the most challenging problems in database systems.

NoSQL

The relational data model has been able to generalize to a lot of use cases, from ecommerce to finance to social networks. However, for certain use cases, this model can be restrictive. For example, it demands that your data follows a strict schema, and schema management is painful. In a survey by Couchbase in 2014, frustration with schema management was the #1 reason for the adoption of their nonrelational database.

The latest movement against the relational data model is NoSQL. Originally started as a hashtag for a meetup to discuss nonrelational databases, NoSQL has been retroactively reinterpreted as Not Only SQL, as many NoSQL data systems also support relational models.

Document Model

A document is often a single continuous string, encoded as JSON, XML, or a binary format like BSON (Binary JSON). All documents in a document database are assumed to be encoded in the same format. Each document has a unique key that represents that document, which can be used to retrieve it.

A collection of documents could be considered analogous to a table in a relational database, and a document analogous to a row. In fact, you can convert each row of a relation into a document and get a collection of documents this way.

Because the document model doesn’t enforce a schema, it’s often referred to as schemaless. This is misleading because, as discussed previously, data stored in documents will be read later. The application that reads the documents usually assumes some kind of structure of the documents. Document databases just shift the responsibility of assuming structures from the application that writes the data to the application that reads the data.
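A small sketch of what that shift looks like in practice; the documents and field names here are made up:

import json

# Two documents from the same (hypothetical) collection, with different shapes:
# nothing forced the writers to agree on a schema.
raw_docs = [
    '{"id": 1, "name": "Boatie McBoatFace", "city": "Port Royal"}',
    '{"id": 2, "name": "Alex", "cities": ["Lisbon", "Berlin"]}',
]

# The reading application ends up encoding the structure the writer never enforced.
for raw in raw_docs:
    doc = json.loads(raw)
    city = doc.get("city") or (doc.get("cities") or ["unknown"])[0]
    print(doc["id"], doc["name"], city)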

Graph Model

A database that uses graph structures to store its data is called a graph database. If in document databases, the content of each document is the priority, then in graph databases, the relationships between data items are the priority.

Because the relationships are modeled explicitly in graph models, it’s faster to retrieve data based on relationships. Consider an example of a graph database in the figure below. The data from this example could potentially come from a simple social network. In this graph, nodes can be of different data types: person, city, country, company, etc.

[Figure: an example graph with nodes of different types (person, city, country, company, etc.) connected by labeled edges such as “born_in” and “within”.]

Imagine you want to find everyone who was born in the USA. Given this graph, you can start from the node USA and traverse the graph following the edges “within” and “born_in” to find all the nodes of the type “person.” Now, imagine that instead of using the graph model to represent this data, we use the relational model. There’d be no easy way to write an SQL query to find everyone who was born in the USA, especially given that there is an unknown number of hops between country and person: there are three hops between Zhenzhong Xu and USA while there are only two hops between Chloe He and USA. Similarly, there’d be no easy way to express this type of query with a document database.
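To make the traversal concrete, here is a toy in-memory version of this query; the intermediate place names are invented, but the node types and the “born_in”/“within” edges follow the example above:

from collections import deque

# Node -> type. Place names other than USA are made up for illustration.
node_type = {
    "USA": "country", "State X": "state", "City Y": "city", "City Z": "city",
    "Zhenzhong Xu": "person", "Chloe He": "person",
}

# (source, edge label, target) triples.
edges = [
    ("Zhenzhong Xu", "born_in", "City Y"),
    ("City Y", "within", "State X"),
    ("State X", "within", "USA"),     # three hops from Zhenzhong Xu to USA
    ("Chloe He", "born_in", "City Z"),
    ("City Z", "within", "USA"),      # two hops from Chloe He to USA
]

# Index edges by target so we can walk them backward, starting from USA.
incoming = {}
for src, label, dst in edges:
    incoming.setdefault(dst, []).append(src)

# Breadth-first traversal: every "person" node we reach was born somewhere in the USA.
queue, seen, people = deque(["USA"]), {"USA"}, []
while queue:
    node = queue.popleft()
    if node_type[node] == "person":
        people.append(node)
    for src in incoming.get(node, []):
        if src not in seen:
            seen.add(src)
            queue.append(src)

print(people)   # ['Chloe He', 'Zhenzhong Xu']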

Structured Versus Unstructured Data

Structured data follows a predefined data model, also known as a data schema. The disadvantage of structured data is that you have to commit your data to a predefined schema. If your schema changes, you’ll have to retrospectively update all your data, often causing mysterious bugs in the process.

Because business requirements change over time, committing to a predefined data schema can become too restricting. Or you might have data from multiple data sources that are beyond your control, and it’s impossible to make them follow the same schema. This is where unstructured data becomes appealing. Unstructured data doesn’t adhere to a predefined data schema. It’s usually text but can also be numbers, dates, images, audio, etc. For example, a text file of logs generated by your ML model is unstructured data.

A repository for storing structured data is called a data warehouse. A repository for storing unstructured data is called a data lake. Data lakes are usually used to store raw data before processing. Data warehouses are used to store data that has been processed into formats ready to be used.

Structured data | Unstructured data
Schema clearly defined | Data doesn’t have to follow a schema
Easy to search and analyze | Fast arrival
Can only handle data with a specific schema | Can handle data from any source
Schema changes will cause a lot of trouble | No need to worry about schema changes (yet), as the worry is shifted to the downstream applications that use this data
Stored in data warehouses | Stored in data lakes

Data Storage Engines and Processing

Data formats and data models specify the interface for how users can store and retrieve data. Storage engines, also known as databases, are the implementation of how data is stored and retrieved on machines. It’s useful to understand different types of databases as your team or your adjacent team might need to select a database appropriate for your application.

Transactional and Analytical Processing

Traditionally, a transaction refers to the action of buying or selling something. In the digital world, a transaction refers to any kind of action: tweeting, ordering a ride through a ride-sharing service, uploading a new model, watching a YouTube video, and so on. Though these transactions involve different types of data, they are inserted as they are generated, occasionally updated when something changes, and deleted when they are no longer needed. This type of processing is known as online transaction processing (OLTP).

Because these transactions often involve users, they need to be processed fast (low latency) so that they don’t keep users waiting. The processing method needs to have high availability—that is, the processing system needs to be available any time a user wants to make a transaction.

Transactional databases are designed to process online transactions and satisfy the low latency, high availability requirements. When people hear transactional databases, they usually think of ACID (atomicity, consistency, isolation, durability).

However, transactional databases don’t necessarily need to be ACID, and some developers find ACID to be too restrictive. According to Martin Kleppmann, “systems that do not meet the ACID criteria are sometimes called BASE, which stands for Basically Available, Soft state, and Eventual consistency.”

Because each transaction is often processed as a unit separately from other transactions, transactional databases are often row-major. This also means that transactional databases might not be efficient for questions such as “What’s the average price for all the rides in September in San Francisco?” Analytical databases are designed for this purpose. They are efficient with queries that allow you to look at data from different viewpoints. We call this type of processing online analytical processing (OLAP).
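To show the shape of the two workloads side by side (sqlite here is just a stand-in: it is a row-major transactional store, not an analytical database):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE rides (city TEXT, month TEXT, price REAL)")

# OLTP-style work: many small, row-at-a-time writes as transactions happen.
conn.execute("INSERT INTO rides VALUES ('San Francisco', 'September', 12.5)")
conn.execute("INSERT INTO rides VALUES ('San Francisco', 'September', 20.0)")

# OLAP-style work: scan many rows to answer an analytical question, e.g.
# "What's the average price for all the rides in September in San Francisco?"
avg_price = conn.execute(
    "SELECT AVG(price) FROM rides "
    "WHERE city = 'San Francisco' AND month = 'September'"
).fetchone()[0]
print(avg_price)   # 16.25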

However, as of 2021, both the terms OLTP and OLAP have started to become outdated because, in the traditional OLTP or OLAP paradigms, storage and processing are tightly coupled: how data is stored is also how data is processed. An interesting paradigm of the last decade has been to decouple storage from processing (also known as compute), as adopted by many data vendors including Google’s BigQuery, Snowflake, IBM, and Teradata. In this paradigm, the data can be stored in one place, with a processing layer on top that can be optimized for different types of queries.

ETL: Extract, Transform, and Load

In the early days of the relational data model, data was mostly structured. When data is extracted from different sources, it’s first transformed into the desired format before being loaded into the target destination such as a database or a data warehouse. This process is called ETL, which stands for extract, transform, and load.

In the extracting phase, you need to validate your data and reject the data that doesn’t meet your requirements. Transform is the meaty part of the process, where most of the data processing is done. You might want to join data from multiple sources and clean it. You might want to standardize the value ranges and apply operations such as transposing, deduplicating, sorting, aggregating, deriving new features, and further data validation. Load is deciding how and how often to load your transformed data into the target destination, which can be a file, a database, or a data warehouse.

The idea of ETL sounds simple but powerful, and it’s the underlying structure of the data layer at many organizations.
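A minimal sketch of what a small ETL job can look like with pandas; the file names, columns, and validation rule are hypothetical:

import pandas as pd

# Extract: pull raw data from the sources.
orders = pd.read_csv("raw/orders.csv")
users = pd.read_csv("raw/users.csv")

# Transform: join, deduplicate, validate, and derive new columns.
df = orders.merge(users, on="user_id", how="left")
df = df.drop_duplicates()
df = df[df["price"] >= 0]                              # reject rows that fail validation
df["price_per_km"] = df["price"] / df["distance_km"]   # hypothetical derived feature

# Load: write the transformed data into the target destination.
df.to_parquet("warehouse/orders_clean.parquet")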

When the internet first became ubiquitous and hardware became much more powerful, collecting data suddenly became much easier, and the amount of data grew rapidly.

Finding it difficult to keep data structured, some companies had this idea: “Why not just store all data in a data lake so we don’t have to deal with schema changes? Whichever application needs data can just pull out raw data from there and process it.” This process of loading data into storage first then processing it later is sometimes called ELT (extract, load, transform). However, as data keeps on growing, this idea becomes less attractive. It’s inefficient to search through a massive amount of raw data for the data that you want.

As companies weigh the pros and cons of storing structured data versus storing unstructured data, vendors evolve to offer hybrid solutions that combine the flexibility of data lakes and the data management aspect of data warehouses. For example, Databricks and Snowflake both provide data lakehouse solutions.

Modes of Dataflow

When data is passed from one process to another, we say that the data flows from one process to another, which gives us a dataflow. There are three main modes of dataflow:

  • Data passing through databases
  • Data passing through services using requests, such as those provided by REST and RPC APIs (e.g., POST/GET requests)
  • Data passing through a real-time transport like Apache Kafka and Amazon Kinesis

Data Passing Through Databases

For example, to pass data from process A to process B, process A can write that data into a database, and process B simply reads from that database.
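A toy version of this mode, with sqlite standing in for the shared database and a hypothetical predictions table:

import sqlite3

# Both processes point at the same database (some shared location both can reach).
conn = sqlite3.connect("shared.db")
conn.execute("CREATE TABLE IF NOT EXISTS predictions (user_id INTEGER, score REAL)")

# Process A: write its output into the database.
conn.execute("INSERT INTO predictions VALUES (42, 0.87)")
conn.commit()

# Process B (running elsewhere): periodically read whatever A has written.
for user_id, score in conn.execute("SELECT user_id, score FROM predictions"):
    print(user_id, score)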

This mode, however, doesn’t always work, for two reasons:

  • It requires that both processes be able to access the same database, which might be infeasible, especially if the two processes are run by two different companies.
  • It requires both processes to go through the database, and reading/writing from databases can be slow, making it unsuitable for applications with strict latency requirements.

Data Passing Through Services

To pass data from process B to process A, process A first sends a request to process B that specifies the data A needs, and B returns the requested data through the same network. Because processes communicate through requests, we say that this is request-driven.

This mode of data passing is tightly coupled with the service-oriented architecture. A service is a process that can be accessed remotely, e.g., through a network. Two services in communication with each other can be run by different companies in different applications. Two services in communication with each other can also be parts of the same application.

The most popular styles of requests used for passing data through networks are REST (representational state transfer) and RPC (remote procedure call). REST seems to be the predominant style for public APIs. The main focus of RPC frameworks is on requests between services owned by the same organization, typically within the same data center.
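As a sketch of request-driven data passing with the Python requests library (the URLs, endpoints, and payloads here are hypothetical):

import requests

# Process A asks service B for the data it needs; B returns it in the response body.
resp = requests.get("http://service-b.internal/drivers", params={"city": "SF"})
resp.raise_for_status()
drivers = resp.json()

# Sending data to B instead, as a POST request.
requests.post("http://service-b.internal/rides", json={"rider_id": 42, "city": "SF"})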

Data Passing Through Real-Time Transport

Request-driven data passing is synchronous: the target service has to listen to the request for the request to go through, and if that service is down, the request simply fails. A broker in the middle removes this coupling: instead of having services request data directly from each other and creating a web of complex interservice data passing, each service only has to communicate with the broker. Whichever service wants data can check with the broker instead.

Technically, a database can be a broker, but reading from and writing to databases is too slow for applications with strict latency requirements. Instead, we use in-memory storage to broker data. Real-time transports can be thought of as in-memory storage for data passing among services.

A piece of data broadcast to a real-time transport is called an event. This architecture is, therefore, also called event-driven. A real-time transport is sometimes called an event bus.

Request-driven architecture works well for systems that rely more on logic than on data. Event-driven architecture works better for systems that are data-heavy.

The two most common types of real-time transports are:

PubSub model: Any service can publish to different topics in a real-time transport, and any service that subscribes to a topic can read all the events in that topic. The services that produce data don’t care about which services consume their data. Pubsub solutions often have a retention policy: data will be retained in the real-time transport for a certain period of time (e.g., seven days) before being deleted or moved to permanent storage (like Amazon S3).

Message queue model: In a message queue model, an event often has intended consumers (an event with intended consumers is called a message), and the message queue is responsible for getting the message to the right consumers.

Examples of pubsub solutions are Apache Kafka and Amazon Kinesis. Examples of message queues are Apache RocketMQ and RabbitMQ. Both paradigms have gained a lot of traction in the last few years.
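To make the pubsub idea concrete, here is a toy in-memory broker; real systems like Kafka or Kinesis add partitioning, persistence, consumer offsets, and retention, none of which is modeled here:

from collections import defaultdict

class ToyPubSub:
    # A toy in-memory broker: just topics, publishers, and subscribers.
    def __init__(self):
        self.topics = defaultdict(list)        # topic -> retained events
        self.subscribers = defaultdict(list)   # topic -> subscriber callbacks

    def publish(self, topic, event):
        self.topics[topic].append(event)       # producers don't know who consumes
        for callback in self.subscribers[topic]:
            callback(event)

    def subscribe(self, topic, callback):
        self.subscribers[topic].append(callback)

broker = ToyPubSub()
broker.subscribe("ride_requests", lambda e: print("pricing service saw:", e))
broker.subscribe("ride_requests", lambda e: print("ETA service saw:", e))
broker.publish("ride_requests", {"rider_id": 42, "city": "SF"})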

Batch Processing Versus Stream Processing

Once your data arrives in data storage engines like databases, data lakes, or data warehouses, it becomes historical data. This is opposed to streaming data (data that is still streaming in). Historical data is often processed in batch jobs—jobs that are kicked off periodically.

When data is processed in batch jobs, we refer to it as batch processing. Batch processing has been a research subject for many decades, and companies have come up with distributed systems like MapReduce and Spark to process batch data efficiently.

When you have data in real-time transports like Apache Kafka and Amazon Kinesis, we say that you have streaming data. Stream processing refers to doing computation on streaming data. Computation on streaming data can also be kicked off periodically, but the periods are usually much shorter than the periods for batch jobs (e.g., every five minutes instead of every day). Computation on streaming data can also be kicked off whenever the need arises. For example, whenever a user requests a ride, you process your data stream to see what drivers are currently available.

Stream processing, when done right, can give low latency because you can process data as soon as data is generated, without having to first write it into databases.

Because batch processing happens much less frequently than stream processing, in ML, batch processing is usually used to compute features that change less often, such as drivers’ ratings.

Stream processing is used to compute features that change quickly, such as how many drivers are available right now, how many rides have been requested in the last minute, how many rides will be finished in the next two minutes, the median price of the last 10 rides in this area, etc.
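A toy version of one such streaming feature, “how many rides have been requested in the last minute,” using a sliding window over event timestamps (a real system would compute this inside a stream processing engine):

import time
from collections import deque

WINDOW_SECONDS = 60
events = deque()          # timestamps of ride-request events seen so far

def record_ride_request(ts):
    events.append(ts)

def rides_in_last_minute(now):
    # Drop events that have fallen out of the window, then count what's left.
    while events and events[0] < now - WINDOW_SECONDS:
        events.popleft()
    return len(events)

now = time.time()
for seconds_ago in (90, 45, 30, 5):      # hypothetical request times
    record_ride_request(now - seconds_ago)
print(rides_in_last_minute(now))         # 3 (the 90-seconds-ago event falls outside the window)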

To do computation on data streams, you need a stream computation engine (the way Spark and MapReduce are batch computation engines). For ML systems that leverage streaming features, the streaming computation is rarely simple. The stream feature extraction logic can require complex queries with join and aggregation along different dimensions. To extract these features requires efficient stream processing engines. For this purpose, you might want to look into tools like Apache Flink, KSQL, and Spark Streaming.

Stream processing is more difficult because the data amount is unbounded and the data comes in at variable rates and speeds. It’s easier to make a stream processor do batch processing than to make a batch processor do stream processing.

Apache Flink’s core maintainers have been arguing for years that batch processing is a special case of stream processing.

This post is licensed under CC BY 4.0 by the author.