Use RunInference for Generative AI


This notebook shows how to use the Apache Beam RunInference transform for generative AI tasks. It uses a large language model (LLM) from the Hugging Face Model Hub.

This notebook demonstrates the following steps:

  • Load and save a model from the Hugging Face Model Hub.
  • Use the PyTorch model handler for RunInference.

For more information about using RunInference, see Get started with AI/ML pipelines in the Apache Beam documentation.

Install the Apache Beam SDK and dependencies

Use the following code to install the Apache Beam Python SDK, PyTorch, and Transformers.

pip install apache_beam[gcp]==2.48.0
pip install torch
pip install transformers
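If you want to confirm that the installation succeeded before continuing, a quick version check is enough. The following snippet is an optional sketch; the exact torch and transformers versions depend on what pip resolved.

# Optional check: print the installed versions of the packages used in this notebook.
import apache_beam
import torch
import transformers

print("apache_beam:", apache_beam.__version__)
print("torch:", torch.__version__)
print("transformers:", transformers.__version__)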

Use the following code to import the dependencies.

import os

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.pytorch_inference import make_tensor_model_fn
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
import torch
from transformers import AutoConfig
from transformers import AutoModelForSeq2SeqLM
from transformers import AutoTokenizer
from transformers.tokenization_utils import PreTrainedTokenizer

MAX_RESPONSE_TOKENS = 256

model_name = "google/flan-t5-small"
state_dict_path = "saved_model"

Download and save the model

This notebook uses the Hugging Face auto classes to load the model into memory. The model's state dictionary is then saved to the path defined previously.

model = AutoModelForSeq2SeqLM.from_pretrained(
    model_name, torch_dtype=torch.bfloat16
)

# Create the parent directory for the state dict file if one is specified.
directory = os.path.dirname(state_dict_path)
if directory:
    os.makedirs(directory, exist_ok=True)

# Save only the model weights; the model handler rebuilds the model from its config.
torch.save(model.state_dict(), state_dict_path)
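To confirm that the saved weights can be restored, you can rebuild the model from its config and load the state dictionary, which mirrors what the model handler does when the pipeline starts. This is an optional sketch and is not required for the pipeline; strict=False is used here only to keep the check tolerant of tied or shared weights.

# Optional sketch: rebuild the model from its config and reload the saved weights.
config = AutoConfig.from_pretrained(model_name)
reloaded_model = AutoModelForSeq2SeqLM.from_config(config)
reloaded_model.load_state_dict(torch.load(state_dict_path), strict=False)
reloaded_model.eval()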

Define utility functions

The input and output for the google/flan-t5-small model are token tensors. These utility functions convert input text to token tensors and decode the model's output tensors back to text.

def to_tensors(input_text: str, tokenizer) -> torch.Tensor:
    """Encodes input text into token tensors.

    Args:
        input_text: Input text for the LLM model.
        tokenizer: Tokenizer for the LLM model.

    Returns: Tokenized input tokens.
    """
    return tokenizer(input_text, return_tensors="pt").input_ids[0]


def from_tensors(result: PredictionResult, tokenizer) -> str:
    """Decodes output token tensors into text.

    Args:
        result: Prediction results from the RunInference transform.
        tokenizer: Tokenizer for the LLM model.

    Returns: The model's response as text.
    """
    output_tokens = result.inference
    return tokenizer.decode(output_tokens, skip_special_tokens=True)
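As a quick sanity check, you can run to_tensors on a prompt and decode the token IDs straight back with the tokenizer. This optional sketch loads the tokenizer early just for the demonstration; the next cell loads it again for the pipeline.

# Optional sketch: tokenize a prompt and decode it back to text.
tokenizer = AutoTokenizer.from_pretrained(model_name)

ids = to_tensors("translate English to Spanish: Hello.", tokenizer)
print(ids)                                              # 1-D tensor of token IDs
print(tokenizer.decode(ids, skip_special_tokens=True))  # decodes back to (approximately) the prompt text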
# Load the tokenizer.
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Create an instance of the PyTorch model handler.
model_handler = PytorchModelHandlerTensor(
    state_dict_path=state_dict_path,
    model_class=AutoModelForSeq2SeqLM.from_config,
    model_params={"config": AutoConfig.from_pretrained(model_name)},
    inference_fn=make_tensor_model_fn("generate"),
)
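If your elements are (key, value) pairs, for example a prompt ID paired with prompt text, you can wrap the handler in a KeyedModelHandler so the key is carried through inference. This is an optional variation and is not used in the pipeline below.

# Optional variation: wrap the handler so keyed elements pass through RunInference.
from apache_beam.ml.inference.base import KeyedModelHandler

keyed_model_handler = KeyedModelHandler(model_handler)

With the keyed handler, RunInference emits (key, PredictionResult) tuples, so from_tensors would need a small adjustment to unpack the key.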

Run the pipeline

The pipeline creates a single example prompt, converts it to token tensors, runs inference with the model handler, decodes the response, and prints it.

example = ["translate English to Spanish: We are in New York City."]

pipeline = beam.Pipeline(options=PipelineOptions(
    save_main_session=True, pickle_library="cloudpickle"))

with pipeline as p:
  _ = (
      p
      | "Create Examples" >> beam.Create(example)
      | "To tensors" >> beam.Map(to_tensors, tokenizer)
      | "RunInference" >> RunInference(
          model_handler,
          inference_args={"max_new_tokens": MAX_RESPONSE_TOKENS},
      )
      | "From tensors" >> beam.Map(from_tensors, tokenizer)
      | "Print" >> beam.Map(print)
  )
 Estamos en Nueva York City.
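The same pipeline works for more than one prompt, because RunInference batches elements for you. The sketch below is a hypothetical variation that feeds several prompts and writes the responses to text files instead of printing them; it reuses the handler and utility functions defined above, and the "responses" output prefix is a placeholder.

# Optional variation: run several prompts and write responses to text files.
examples = [
    "translate English to Spanish: We are in New York City.",
    "translate English to German: The weather is nice today.",
    "summarize: Apache Beam is a unified model for batch and streaming data processing.",
]

with beam.Pipeline(options=PipelineOptions(
    save_main_session=True, pickle_library="cloudpickle")) as p:
  _ = (
      p
      | "Create Examples" >> beam.Create(examples)
      | "To tensors" >> beam.Map(to_tensors, tokenizer)
      | "RunInference" >> RunInference(
          model_handler,
          inference_args={"max_new_tokens": MAX_RESPONSE_TOKENS},
      )
      | "From tensors" >> beam.Map(from_tensors, tokenizer)
      | "Write" >> beam.io.WriteToText("responses")  # placeholder output prefix
  )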