06 Aug 2021

Subjects
Technical

The NLP pipelines at BenevolentAI process millions of scientific documents with billions of sentences in them. The machine learning algorithms often use a large proportion of the data for training and evaluation. Therefore, it’s crucial that we apply methods that allow this to be done efficiently.

Our machine learning driven target identification uses a range of models, often requiring different kinds and formats of the input data. This, combined with the data coming from various sources and in different shapes makes the preprocessing a key step in our pipeline. In our previous post we presented how we use Spark on k8s to scale our NLP processing. In this post, we look into using Spark as a way to speed up feature extraction and data preprocessing for ML models.

Data Preprocessing

Data preprocessing is an integral part of machine learning. In fact, often enough researchers spend more time in data related operations than in building the models. Before the data is fed into the model, it firstly needs to be properly cleaned, normalised and featurised. The quality and speed of the preprocessing can have an immense impact on the performance of the model, often exceeding the importance of model architecture itself. 

Depending on the use case, data preprocessing may involve a number of different steps, varying from handling missing values, to sampling and tokenising textual data. For each item in the dataset, such as a document, the series of steps that need to be performed are often the same and can be performed in parallel. To make this process fast, efficient and scalable, we heavily rely on Apache Spark.

Apache Spark is a framework that allows for quick data processing on large amounts of data across multiple computers (i.e. on a cluster). Spark is implemented in Scala and runs in JVM but has APIs for Java, Scala, Python, R and SparkSQL. Most ML software utilises Python so we mostly use Spark through the Python API. Spark provides a facility called User Defined Functions (UDF) to carry out non-JVM tasks for a DataFrame.

In contrast to other widely popular packages for data preprocessing such as Pandas, Spark runs operations in parallel on different nodes in the cluster.

Sentence preprocessing with BERT tokenizer using Spark

Machine learning models usually require a specific data format as input. One such model is BERT, which is a state-of-the-art language model that can be used for a range of NLP tasks. Below, we are going to show how we can use Spark to preprocess sentences, turning them from raw text to a format which can be fed directly into BERT.

First, let’s create a SparkSession. The way how you do this depends on the platform where Spark is run; you can find more details from the Spark deployment manual, or the code snippet below should work if you've installed the pyspark package from PyPi.

from pyspark.sql import SparkSession
spark = (
SparkSession
.builder
.appName('UDF Blog')
.master('local[*]')
.getOrCreate()
)

 

The next step is to create a DataFrame that contains the text we wish to tokenise. This can be read from data files using the spark.read module but in this case, we are creating a small sample dataset. The DataFrame contains a single column text and has three rows of type StringType.
 

from pyspark.sql.types import StringType, StructField, StructType
schema = StructType([
StructField('text', StringType())
])
df = spark.createDataFrame(
[
['The NLP pipelines at BenevolentAI process millions of scientific documents with billions of sentences in them.'],
['The machine learning algorithms often use a large proportion of the data for training and evaluation.'],
['Therefore, it’s crucial that we apply methods that allow this to be done efficiently.']
], schema=schema
)

Next, we’ll create the tokeniser using AutoTokeniser from the transformers package, that automatically downloads the tokeniser from the HuggingFace repository. It’s good practice to use Spark’s broadcast mechanism to pre-distribute large objects to executors.

from transformers import AutoTokenizer
tokeniser = AutoTokenizer.from_pretrained('bert-base-uncased')
broadcast_tokeniser = spark.sparkContext.broadcast(tokeniser)

 

To define a UDF, we need to define a normal Python function that takes a number of column value(s) as input and returns the output. The output schema needs to be specified using Spark’s data types.

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField
from pyspark.broadcast import Broadcast
from functools import partial
from typing import List
encode_schema = StructType([
StructField('input_ids', ArrayType(IntegerType())),
StructField('token_type_ids', ArrayType(IntegerType())),
StructField('attention_mask', ArrayType(IntegerType())),
])
def encode_row(tokeniser: Broadcast, text: str) -> List[int]:
encoded_text = tokeniser.value.encode_plus(text)
return (encoded_text['input_ids'], encoded_text['token_type_ids'], encoded_text['attention_mask'])
encode_row_udf = udf(partial(encode_row, broadcast_tokeniser), encode_schema)

A few things require explanations in the above code:

  • A UDF needs to declare the return type, since the data will be serialised from python to JVM and both sides need to know what type of data is expected.
  • The function encode_row requires the tokeniser instance for tokenisation. The tokeniser is not part of the DataFrame but we can provide the tokeniser using the functools.partial method that’s pretty handy.
  • Since we use Spark’s broadcast mechanism, the actual tokeniser needs to be unwrapped via .value attribute.
  • There are other ways to declare UDFs but we prefer this since it allows us to directly test the python function without Spark.

Finally, we are ready to run the code and produce our tokenised output

from pyspark.sql.functions import col
tokenised_df = (
df
.withColumn('input', encode_row_udf(col('text')))
)

The output has a schema like this

root
|-- text: string (nullable = true)
|-- input: struct (nullable = true)
| |-- input_ids: array (nullable = true)
| | |-- element: integer (containsNull = true)
| |-- token_type_ids: array (nullable = true)
| | |-- element: integer (containsNull = true)
| |-- attention_mask: array (nullable = true)
| | |-- element: integer (containsNull = true)

You can then choose to further process the tokenised output or write to disk. We prefer Avro format for saving features since it preserves float precision and allows checking against schemas easily. The tooling for Avro in different programming languages is also very good.
 

(
tokenised_df
.write.format('avro').save('/tmp/output') # See https://spark.apache.org/docs/latest/sql-data-sources-avro.html
)

Note that Spark reads, processes and writes data in parallel. Therefore, you will most of the time get multiple files at the output. Your code should be able to handle this, although machine learning frameworks like PyTorch and Tensorflow both have support for reading multiple input files. You can also write your data in many other formats like CSV, JSON or Parquet.

Next steps

After our data is properly preprocessed, the next step is to load the data into a data loader. This is framework specific, so for example in PyTorch, this would be a DataLoader object. Once the data is loaded, we can use it to train our model. Although Spark supports scheduling tasks on GPUs, we currently prefer using Pytorch Lightning to run out training and prediction tasks.

Conclusion

Use of Spark allows us to focus on using features that would otherwise be difficult to develop, particularly when the model is trained with a lot of data. We showed in this blog that Spark allows flexible use of Spark DataFrame API and Python. We don’t need to think about the reading/writing or data aggregations since we can rely on Spark doing them much better than any custom code we might have in Python. 

The biggest advantage of Spark is that it allows us to process a large amount of training data in a matter of minutes. Leveraging distributed computing and parallel processing, it can handle massive datasets, helping us avoid processing everything sequentially, for example by using a for loop.


Back to blog post and videos