View on TensorFlow.org | Run in Google Colab | View source on GitHub | Download notebook |
Overview
This tutorial shows how to use the BigQuery TensorFlow reader to train a neural network with the Keras Sequential API.
Dataset
This tutorial uses the United States Census Income Dataset provided by the UC Irvine Machine Learning Repository. This dataset contains information about people from a 1994 Census database, including age, education, marital status, occupation, and whether they make more than $50,000 a year.
Setup
Set up your GCP project
The following steps are required, regardless of your notebook environment.
- Select or create a GCP project.
- Make sure that billing is enabled for your project.
- Enable the BigQuery Storage API
- Enter your project ID in the cell below. Then run the cell to make sure the Cloud SDK uses the right project for all the commands in this notebook.
Install required Packages, and restart runtime
try:
  # Use the Colab's preinstalled TensorFlow 2.x
  %tensorflow_version 2.x
except:
  pass
pip install fastavro
pip install tensorflow-io==0.9.0
pip install google-cloud-bigquery-storage
Authenticate
from google.colab import auth auth.authenticate_user() print('Authenticated') Set your PROJECT ID
PROJECT_ID = "<YOUR PROJECT>" ! gcloud config set project $PROJECT_ID %env GCLOUD_PROJECT=$PROJECT_ID Import Python libraries, define constants
from __future__ import absolute_import, division, print_function, unicode_literals import os from six.moves import urllib import tempfile import numpy as np import pandas as pd import tensorflow as tf from google.cloud import bigquery from google.api_core.exceptions import GoogleAPIError LOCATION = 'us' # Storage directory DATA_DIR = os.path.join(tempfile.gettempdir(), 'census_data') # Download options. DATA_URL = 'https://storage.googleapis.com/cloud-samples-data/ml-engine/census/data' TRAINING_FILE = 'adult.data.csv' EVAL_FILE = 'adult.test.csv' TRAINING_URL = '%s/%s' % (DATA_URL, TRAINING_FILE) EVAL_URL = '%s/%s' % (DATA_URL, EVAL_FILE) DATASET_ID = 'census_dataset' TRAINING_TABLE_ID = 'census_training_table' EVAL_TABLE_ID = 'census_eval_table' CSV_SCHEMA = [ bigquery.SchemaField("age", "FLOAT64"), bigquery.SchemaField("workclass", "STRING"), bigquery.SchemaField("fnlwgt", "FLOAT64"), bigquery.SchemaField("education", "STRING"), bigquery.SchemaField("education_num", "FLOAT64"), bigquery.SchemaField("marital_status", "STRING"), bigquery.SchemaField("occupation", "STRING"), bigquery.SchemaField("relationship", "STRING"), bigquery.SchemaField("race", "STRING"), bigquery.SchemaField("gender", "STRING"), bigquery.SchemaField("capital_gain", "FLOAT64"), bigquery.SchemaField("capital_loss", "FLOAT64"), bigquery.SchemaField("hours_per_week", "FLOAT64"), bigquery.SchemaField("native_country", "STRING"), bigquery.SchemaField("income_bracket", "STRING"), ] UNUSED_COLUMNS = ["fnlwgt", "education_num"] Import census data into BigQuery
Define helper methods to load data into BigQuery
def create_bigquery_dataset_if_necessary(dataset_id):
    """Create the BigQuery dataset `dataset_id` in PROJECT_ID if it is missing.

    Args:
        dataset_id: ID of the dataset to create inside PROJECT_ID.

    Returns:
        True if this call created the dataset, False if the API answered with
        a 409 conflict (the dataset already exists).

    Raises:
        GoogleAPIError: any API failure other than the 409 conflict.
    """
    client = bigquery.Client(project=PROJECT_ID)
    # Construct a full Dataset object to send to the API.
    dataset = bigquery.Dataset(
        bigquery.dataset.DatasetReference(PROJECT_ID, dataset_id))
    dataset.location = LOCATION
    try:
        client.create_dataset(dataset)  # API request
    except GoogleAPIError as err:
        # GoogleAPIError is only the base class; read `code` defensively so an
        # error without that attribute is re-raised instead of being masked
        # by an AttributeError.
        if getattr(err, 'code', None) != 409:  # http_client.CONFLICT
            raise
        return False
    return True


def load_data_into_bigquery(url, table_id):
    """Load the CSV at `url` into table `table_id` of DATASET_ID.

    The dataset is created first when necessary. The load job uses CSV_SCHEMA
    and WRITE_TRUNCATE, so existing rows in the table are replaced. Progress
    and the final row count are printed.

    Args:
        url: URI of the CSV file handed to `load_table_from_uri`.
        table_id: ID of the destination table inside DATASET_ID.
    """
    create_bigquery_dataset_if_necessary(DATASET_ID)

    client = bigquery.Client(project=PROJECT_ID)
    # Build the reference directly (same form as in the dataset helper)
    # instead of the deprecated client.dataset() shortcut.
    dataset_ref = bigquery.dataset.DatasetReference(PROJECT_ID, DATASET_ID)
    table_ref = dataset_ref.table(table_id)

    job_config = bigquery.LoadJobConfig()
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.schema = CSV_SCHEMA

    load_job = client.load_table_from_uri(
        url, table_ref, job_config=job_config
    )
    print("Starting job {}".format(load_job.job_id))

    load_job.result()  # Waits for table load to complete.
    print("Job finished.")

    destination_table = client.get_table(table_ref)
    print("Loaded {} rows.".format(destination_table.num_rows))

# Load Census data in BigQuery.
load_data_into_bigquery(TRAINING_URL, TRAINING_TABLE_ID) load_data_into_bigquery(EVAL_URL, EVAL_TABLE_ID) Starting job 2ceffef8-e6e4-44bb-9e86-3d97b0501187 Job finished. Loaded 32561 rows. Starting job bf66f1b3-2506-408b-9009-c19f4ae9f58a Job finished. Loaded 16278 rows.
Confirm that data was imported
TODO: replace <YOUR PROJECT> with your PROJECT_ID
%%bigquery --use_bqstorage_api SELECT * FROM `<YOUR PROJECT>.census_dataset.census_training_table` LIMIT 5 Load census data in TensorFlow DataSet using BigQuery reader
Read and transform census data from BigQuery into a TensorFlow Dataset
from tensorflow.python.framework import ops
from tensorflow.python.framework import dtypes
from tensorflow_io.bigquery import BigQueryClient
from tensorflow_io.bigquery import BigQueryReadSession


def transform_row(row_dict):
    """Turn one row dict into a `(features, label)` pair.

    Args:
        row_dict: mapping of column name to tensor for a single row; it must
            contain the 'income_bracket' column.

    Returns:
        A tuple `(features, label)`: `features` is the row with every string
        tensor stripped of surrounding whitespace and 'income_bracket'
        removed; `label` is a float32 tensor, 1.0 when the stripped
        'income_bracket' equals '>50K' and 0.0 otherwise.
    """
    # Trim all string tensors
    trimmed_dict = {
        column: (tf.strings.strip(tensor) if tensor.dtype == tf.string
                 else tensor)
        for column, tensor in row_dict.items()
    }
    # Extract the label column (already stripped above, no second strip needed)
    income_bracket = trimmed_dict.pop('income_bracket')
    # Convert the label to 0.0/1.0; casting the comparison replaces a tf.cond
    # with two constant branches.
    income_bracket_float = tf.cast(
        tf.equal(income_bracket, '>50K'), tf.float32)
    return (trimmed_dict, income_bracket_float)


def read_bigquery(table_name):
    """Build a dataset of `(features, label)` pairs from a BigQuery table.

    Opens a read session on `table_name` in DATASET_ID / PROJECT_ID for every
    CSV_SCHEMA column not listed in UNUSED_COLUMNS, requesting two streams,
    and maps `transform_row` over the rows.

    Args:
        table_name: ID of the table inside DATASET_ID to read.

    Returns:
        The dataset returned by `parallel_read_rows()` with `transform_row`
        applied to each element.
    """
    # Filter the schema once so names and dtypes are guaranteed to line up.
    selected_fields = [
        field for field in CSV_SCHEMA if field.name not in UNUSED_COLUMNS
    ]
    selected_names = [field.name for field in selected_fields]
    # FLOAT64 columns are read as doubles, everything else as strings.
    selected_dtypes = [
        dtypes.double if field.field_type == 'FLOAT64' else dtypes.string
        for field in selected_fields
    ]

    tensorflow_io_bigquery_client = BigQueryClient()
    read_session = tensorflow_io_bigquery_client.read_session(
        "projects/" + PROJECT_ID,
        PROJECT_ID, table_name, DATASET_ID,
        selected_names,
        selected_dtypes,
        requested_streams=2)

    dataset = read_session.parallel_read_rows()
    transformed_ds = dataset.map(transform_row)
    return transformed_ds


BATCH_SIZE = 32

training_ds = read_bigquery(TRAINING_TABLE_ID).shuffle(10000).batch(BATCH_SIZE)
eval_ds = read_bigquery(EVAL_TABLE_ID).batch(BATCH_SIZE)

# Define feature columns
def get_categorical_feature_values(column):
    """Return the distinct, trimmed values of `column` in the training table.

    Args:
        column: name of a column of TRAINING_TABLE_ID. It is interpolated
            into the SQL text as-is, so pass only trusted column names.

    Returns:
        The first column of the query result's `.values` array, i.e. one
        entry per distinct `TRIM(column)` value.
    """
    query = 'SELECT DISTINCT TRIM({}) FROM `{}`.{}.{}'.format(
        column, PROJECT_ID, DATASET_ID, TRAINING_TABLE_ID)
    client = bigquery.Client(project=PROJECT_ID)
    query_job = client.query(query)
    result = query_job.to_dataframe()
    return result.values[:, 0]


from tensorflow import feature_column

feature_columns = []

# numeric cols
for header in ['capital_gain', 'capital_loss', 'hours_per_week']:
    feature_columns.append(feature_column.numeric_column(header))

# categorical cols: the vocabulary of each column is queried from BigQuery
for header in ['workclass', 'marital_status', 'occupation', 'relationship',
               'race', 'native_country', 'education']:
    categorical_feature = feature_column.categorical_column_with_vocabulary_list(
        header, get_categorical_feature_values(header))
    categorical_feature_one_hot = feature_column.indicator_column(
        categorical_feature)
    feature_columns.append(categorical_feature_one_hot)

# bucketized cols
age = feature_column.numeric_column('age')
age_buckets = feature_column.bucketized_column(
    age, boundaries=[18, 25, 30, 35, 40, 45, 50, 55, 60, 65])
feature_columns.append(age_buckets)

feature_layer = tf.keras.layers.DenseFeatures(feature_columns)

# Build and train model
Build model
Dense = tf.keras.layers.Dense

# Layer stack: the feature layer, four ReLU hidden layers (100, 75, 50 and 25
# units; only the first uses the 'uniform' initializer) and a single sigmoid
# output unit.
network_layers = [
    feature_layer,
    Dense(100, activation=tf.nn.relu, kernel_initializer='uniform'),
]
network_layers += [
    Dense(units, activation=tf.nn.relu) for units in (75, 50, 25)
]
network_layers.append(Dense(1, activation=tf.nn.sigmoid))

model = tf.keras.Sequential(network_layers)

# Compile Keras model
model.compile(
    loss='binary_crossentropy',
    metrics=['accuracy'])

# Train model
model.fit(training_ds, epochs=5) WARNING:tensorflow:Layer sequential is casting an input tensor from dtype float64 to the layer's dtype of float32, which is new behavior in TensorFlow 2. The layer has dtype float32 because it's dtype defaults to floatx. If you intended to run this layer in float32, you can safely ignore this warning. If in doubt, this warning is likely only an issue if you are porting a TensorFlow 1.X model to TensorFlow 2. To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor. WARNING:tensorflow:From /usr/local/lib/python3.6/dist-packages/tensorflow_core/python/feature_column/feature_column_v2.py:4276: IndicatorColumn._variable_shape (from tensorflow.python.feature_column.feature_column_v2) is deprecated and will be removed in a future version. Instructions for updating: The old _FeatureColumn APIs are being deprecated. Please use the new FeatureColumn APIs instead. WARNING:tensorflow:From /usr/local/lib/python3.6/dist-packages/tensorflow_core/python/feature_column/feature_column_v2.py:4331: VocabularyListCategoricalColumn._num_buckets (from tensorflow.python.feature_column.feature_column_v2) is deprecated and will be removed in a future version. Instructions for updating: The old _FeatureColumn APIs are being deprecated. Please use the new FeatureColumn APIs instead. 
Epoch 1/5 1018/1018 [==============================] - 17s 17ms/step - loss: 0.5985 - accuracy: 0.8105 Epoch 2/5 1018/1018 [==============================] - 10s 10ms/step - loss: 0.3670 - accuracy: 0.8324 Epoch 3/5 1018/1018 [==============================] - 11s 10ms/step - loss: 0.3487 - accuracy: 0.8393 Epoch 4/5 1018/1018 [==============================] - 11s 10ms/step - loss: 0.3398 - accuracy: 0.8435 Epoch 5/5 1018/1018 [==============================] - 11s 11ms/step - loss: 0.3377 - accuracy: 0.8455 <tensorflow.python.keras.callbacks.History at 0x7f978f5b91d0> Evaluate model
Evaluate model
loss, accuracy = model.evaluate(eval_ds) print("Accuracy", accuracy) 509/509 [==============================] - 8s 15ms/step - loss: 0.3338 - accuracy: 0.8398 Accuracy 0.8398452
Evaluate a couple of random samples
sample_x = { 'age' : np.array([56, 36]), 'workclass': np.array(['Local-gov', 'Private']), 'education': np.array(['Bachelors', 'Bachelors']), 'marital_status': np.array(['Married-civ-spouse', 'Married-civ-spouse']), 'occupation': np.array(['Tech-support', 'Other-service']), 'relationship': np.array(['Husband', 'Husband']), 'race': np.array(['White', 'Black']), 'gender': np.array(['Male', 'Male']), 'capital_gain': np.array([0, 7298]), 'capital_loss': np.array([0, 0]), 'hours_per_week': np.array([40, 36]), 'native_country': np.array(['United-States', 'United-States']) } model.predict(sample_x) array([[0.5541261], [0.6209938]], dtype=float32)
View on TensorFlow.org
Run in Google Colab
View source on GitHub
Download notebook