I want to use the Pandas library to read BigQuery data. How do I allow large results?
For non-Pandas BigQuery interactions, this can be achieved like this.
Current code with Pandas:
from pandas.io import gbq

# Project the query is run in.
sProjectID = "project-id"

# Legacy-SQL query (square-bracket table reference).
sQuery = '''
    SELECT
        column1, column2
    FROM [dataset_name.tablename]
'''

# Let pandas run the query and load the result into a DataFrame.
df = gbq.read_gbq(sQuery, sProjectID)
3 个解决方案
EDIT: I've posted the proper way to do this in my other answer: drop the data off in Google Storage first. That way you'll never have data that is too large.
Ok, I didn't find a direct way to do it with pandas, so I had to write a little extra with the normal API. Here is my fix (also most of the work to do it natively without Pandas):
# Example usage; create_dataframe and its helpers must be defined before
# this runs.
sProjectID = "project-id"

# Legacy-SQL query (square-bracket table reference).
sQuery = '''
    SELECT
        column1, column2
    FROM [dataset_name.tablename]
'''

df = create_dataframe(sQuery, sProjectID, bLargeResults=True)
#*******Functions to make above work*********
def create_dataframe(sQuery, sProjectID, bLargeResults=False):
    """Run a BigQuery SQL query and return the result as a pandas DataFrame.

    :param sQuery: SQL text to execute.
    :param sProjectID: Google project ID the query runs in.
    :param bLargeResults: When true, run the query through the BigQuery API
        into a destination table (``run_query``) and read that table;
        otherwise hand the query straight to pandas.
    :return: DataFrame holding the query result.
    """
    if bLargeResults:
        oService = create_service()
        dDestinationTable = run_query(sQuery, oService, sProjectID)
        return pandas_get_table(dDestinationTable)

    # Small results: let pandas run the query directly.
    return pandas_query(sQuery, sProjectID)
def pandas_query(sQuery, sProjectID):
    """Run a SQL query in BigQuery through pandas and return a DataFrame.

    :param sQuery: SQL text to execute.
    :param sProjectID: Google project ID the query runs in.
    :return: DataFrame holding the query result.
    """
    from pandas.io import gbq

    return gbq.read_gbq(sQuery, sProjectID)
def pandas_get_table(dTable):
    """Fetch a whole BigQuery table and return it as a pandas DataFrame.

    :param dTable: Table reference dict with the keys ``projectId``,
        ``datasetId`` and ``tableId``.
    :return: DataFrame produced by ``SELECT *`` on that table.
    """
    from pandas.io import gbq

    # Legacy-SQL table reference: [dataset.table].
    # NOTE(review): this is itself a query, so a very large table may still
    # exceed the response limit -- confirm before relying on it.
    sQuery = "SELECT * FROM [{}.{}]".format(
        dTable['datasetId'], dTable['tableId']
    )
    return gbq.read_gbq(sQuery, dTable['projectId'])
def create_service():
    """Build a BigQuery v2 API service from application-default credentials.

    :return: Service object returned by ``apiclient.discovery.build``.
    """
    from oauth2client.client import GoogleCredentials
    from apiclient.discovery import build

    credentials = GoogleCredentials.get_application_default()
    return build('bigquery', 'v2', credentials=credentials)
def run_query(sQuery, oService, sProjectID, sDatasetID='sandbox',
              sTableID='api_large_result_dropoff', fPollSeconds=1.0):
    """Run a large-result query into a destination table and wait for it.

    :param sQuery: SQL text to execute.
    :param oService: BigQuery v2 service object (see ``create_service``).
    :param sProjectID: Google project ID the job runs in and writes to.
    :param sDatasetID: Dataset that receives the result table.
    :param sTableID: Table that receives the result (overwritten each run).
    :param fPollSeconds: Seconds to wait between job status checks.
    :return: The job's destination table reference dict
        (``projectId`` / ``datasetId`` / ``tableId``).
    :raises RuntimeError: If the finished job reports an ``errorResult``.
    """
    import time

    dQuery = {
        'configuration': {
            'query': {
                # WRITE_TRUNCATE replaces any previous drop-off table.
                'writeDisposition': 'WRITE_TRUNCATE',
                'useQueryCache': False,
                'allowLargeResults': True,
                'query': sQuery,
                'destinationTable': {
                    'projectId': sProjectID,
                    'datasetId': sDatasetID,
                    'tableId': sTableID,
                },
            },
        },
    }

    oJobs = oService.jobs()
    job = oJobs.insert(projectId=sProjectID, body=dQuery).execute()

    # The insert call only submits the job; poll until it has finished so
    # the caller never reads a half-written or stale destination table.
    while job.get('status', {}).get('state') != 'DONE':
        time.sleep(fPollSeconds)
        job = oJobs.get(
            projectId=sProjectID, jobId=job['jobReference']['jobId']
        ).execute()

    dError = job['status'].get('errorResult')
    if dError:
        raise RuntimeError("BigQuery job failed: {}".format(dError))

    return job['configuration']['query']['destinationTable']
You can do it by changing the default dialect from legacy to standard in pd.read_gbq
pd.read_gbq(query, 'my-super-project', dialect='standard')
Indeed, you can read in Big Query documentation for the parameter AllowLargeResults:
AllowLargeResults: For standard SQL queries, this flag is ignored and large results are always allowed.
Decided to post the proper way to do this via the
google.cloud API. Looking at my previous answer I see that it would fail like yosemite_k said.
Large results really need to follow BigQuery -> Storage -> local -> dataframe pattern.
大的结果确实需要遵循BigQuery ->存储->本地-> dataframe模式。
BigQuery resources:
- https://cloud.google.com/bigquery/docs/reference/libraries
- https://cloud.google.com/bigquery/docs/reference/libraries
- https://googlecloudplatform.github.io/google-cloud-python/stable/bigquery-client.html
- https://googlecloudplatform.github.io/google-cloud-python/stable/bigquery-client.html
- http://google-cloud-python.readthedocs.io/en/latest/bigquery-usage.html
- http://google-cloud-python.readthedocs.io/en/latest/bigquery-usage.html
Storage resources:
- https://googlecloudplatform.github.io/google-cloud-python/stable/storage-client.html
- https://googlecloudplatform.github.io/google-cloud-python/stable/storage-client.html
- https://googlecloudplatform.github.io/google-cloud-python/stable/storage-blobs.html
- https://googlecloudplatform.github.io/google-cloud-python/stable/storage-blobs.html
Pandas Resources:
- http://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_csv.html
- http://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_csv.html
pip install pandas
pip install google-cloud-storage
pip install google-cloud-bigquery
Full implementation (bigquery_to_dataframe.py):
We require python 3 for the google cloud python API
mkvirtualenv --python `which python3` env3
And our dependencies:
pip install pandas
pip install google-cloud-bigquery
pip install google-cloud-storage
import os
import time
import uuid
from google.cloud import bigquery
from google.cloud import storage
import pandas as pd
def bq_to_df(project_id, dataset_id, table_id, storage_uri, local_data_path):
    """Pipeline to get data from BigQuery into a local pandas dataframe.

    :param project_id: Google project ID we are working in.
    :type project_id: str
    :param dataset_id: BigQuery dataset id.
    :type dataset_id: str
    :param table_id: BigQuery table id.
    :type table_id: str
    :param storage_uri: Google Storage uri where data gets dropped off.
    :type storage_uri: str
    :param local_data_path: Path where data should end up.
    :type local_data_path: str
    :return: Pandas dataframe from BigQuery table.
    :rtype: pd.DataFrame
    """
    bq_to_storage(project_id, dataset_id, table_id, storage_uri)
    storage_to_local(project_id, storage_uri, local_data_path)

    # storage_to_local saves each blob under local_data_path using the
    # blob's full object name, so the data lives at the uri's object path
    # (minus a trailing "*" wildcard) rather than in a fixed directory.
    object_path = storage_uri.split("gs://")[1].partition("/")[2]
    parts = object_path.split("/")
    if parts[-1] == "*":
        object_path = "/".join(parts[:-1])

    data_path = os.path.join(local_data_path, object_path)
    return local_to_df(data_path)
def bq_to_storage(project_id, dataset_id, table_id, target_uri):
    """Export a BigQuery table to Google Storage.

    :param project_id: Google project ID we are working in.
    :type project_id: str
    :param dataset_id: BigQuery dataset name where source data resides.
    :type dataset_id: str
    :param table_id: BigQuery table name where source data resides.
    :type table_id: str
    :param target_uri: Google Storage location where table gets saved.
    :type target_uri: str
    :return: The random ID generated to identify the job.
    :rtype: str
    :raises RuntimeError: If the export job finishes with errors.
    """
    # NOTE(review): extract_table_to_storage / job.begin / job.name look
    # like the older google-cloud-bigquery job API -- confirm against the
    # installed library version.
    client = bigquery.Client(project=project_id)

    dataset = client.dataset(dataset_name=dataset_id)
    table = dataset.table(name=table_id)

    job = client.extract_table_to_storage(
        str(uuid.uuid4()),  # id we assign to be the job name
        table,
        target_uri,
    )
    job.destination_format = 'CSV'
    job.write_disposition = 'WRITE_TRUNCATE'

    job.begin()  # async execution

    # Poll until the export finishes; reload() refreshes job.state.
    while job.state != 'DONE':
        print("exporting '{}.{}' to '{}': {}".format(
            dataset_id, table_id, target_uri, job.state
        ))
        time.sleep(5)
        job.reload()

    # A failed export leaves nothing valid to download, so stop here.
    if job.errors:
        raise RuntimeError(
            "export of '{}.{}' failed: {}".format(
                dataset_id, table_id, job.errors
            )
        )

    return job.name
def storage_to_local(project_id, source_uri, target_dir):
    """Save a file or folder from google storage to a local directory.

    A uri whose last path component is ``*`` downloads every blob in that
    folder; any other uri downloads the blobs in the folder whose names end
    with the last component. Each blob is written below ``target_dir``
    under its full object name.

    :param project_id: Google project ID we are working in.
    :type project_id: str
    :param source_uri: Google Storage location where file comes from.
    :type source_uri: str
    :param target_dir: Local file location where files are to be stored.
    :type target_dir: str
    :return: None
    :rtype: None
    :raises ValueError: If ``source_uri`` is not a ``gs://`` uri or the
        bucket does not exist.
    """
    if not source_uri.startswith("gs://"):
        raise ValueError("Expected a 'gs://' uri, got '{}'".format(source_uri))

    client = storage.Client(project=project_id)

    bucket_name, _, file_path = source_uri[len("gs://"):].partition("/")
    bucket = client.lookup_bucket(bucket_name)
    if bucket is None:  # lookup_bucket returns None for a missing bucket
        raise ValueError("Bucket '{}' does not exist.".format(bucket_name))

    # Objects at the bucket root have no folder: use an empty prefix, not
    # "/", which no blob name starts with.
    folder_path, _, blob_name = file_path.rpartition("/")
    folder_name = folder_path + "/" if folder_path else ""
    blobs = list(bucket.list_blobs(prefix=folder_name or None))

    # get files if we wanted just files
    if blob_name != "*":
        print("Getting just the file '{}'".format(file_path))
        our_blobs = [o for o in blobs if o.name.endswith(blob_name)]
    else:
        print("Getting all files in '{}'".format(folder_name))
        our_blobs = blobs

    print([o.name for o in our_blobs])

    for blob in our_blobs:
        # Names ending in "/" are folder placeholders, not files.
        if blob.name.endswith("/"):
            continue

        filename = os.path.join(target_dir, blob.name)

        # create a complex folder structure if necessary
        directory = os.path.dirname(filename)
        if directory and not os.path.isdir(directory):
            os.makedirs(directory)

        with open(filename, 'wb') as f:
            blob.download_to_file(f)
def local_to_df(data_path):
    """Import local data files into a single pandas dataframe.

    :param data_path: File or folder path where csv data are located.
    :type data_path: str
    :return: Pandas dataframe containing data from data_path.
    :rtype: pd.DataFrame
    :raises ValueError: If ``data_path`` does not exist or is a folder
        without any files.
    """
    # if data_path is a file, then just load it into pandas
    if os.path.isfile(data_path):
        print("Loading '{}' into a dataframe".format(data_path))
        # First row is the header, same as in the folder branch below.
        return pd.read_csv(data_path)

    if os.path.isdir(data_path):
        # Sort for a deterministic row order; skip nested directories.
        candidates = sorted(
            os.path.join(data_path, fi) for fi in os.listdir(data_path)
        )
        files = [fi for fi in candidates if os.path.isfile(fi)]
        if not files:
            raise ValueError(
                "No data files found in {}.".format(data_path)
            )
        print("Loading {} into a single dataframe".format(files))
        return pd.concat(pd.read_csv(s) for s in files)

    raise ValueError(
        "Please enter a valid path. {} does not exist.".format(data_path)
    )
if __name__ == '__main__':
    # Example configuration; replace with real project values.
    PROJECT_ID = "my-project"
    DATASET_ID = "bq_dataset"
    TABLE_ID = "bq_table"
    STORAGE_URI = "gs://my-bucket/path/for/dropoff/*"
    LOCAL_DATA_PATH = "/path/to/save/"

    # Run the BigQuery -> Storage -> local -> dataframe pipeline.
    df = bq_to_df(
        PROJECT_ID, DATASET_ID, TABLE_ID, STORAGE_URI, LOCAL_DATA_PATH
    )
EDIT: I've posted the proper way to do this in my other answer: drop the data off in Google Storage first. That way you'll never have data that is too large.
Ok, I didn't find a direct way to do it with pandas, so I had to write a little extra with the normal API. Here is my fix (also most of the work to do it natively without Pandas):
# Example usage; create_dataframe and its helpers must be defined before
# this runs.
sProjectID = "project-id"

# Legacy-SQL query (square-bracket table reference).
sQuery = '''
    SELECT
        column1, column2
    FROM [dataset_name.tablename]
'''

df = create_dataframe(sQuery, sProjectID, bLargeResults=True)
#*******Functions to make above work*********
def create_dataframe(sQuery, sProjectID, bLargeResults=False):
    """Run a BigQuery SQL query and return the result as a pandas DataFrame.

    :param sQuery: SQL text to execute.
    :param sProjectID: Google project ID the query runs in.
    :param bLargeResults: When true, run the query through the BigQuery API
        into a destination table (``run_query``) and read that table;
        otherwise hand the query straight to pandas.
    :return: DataFrame holding the query result.
    """
    if bLargeResults:
        oService = create_service()
        dDestinationTable = run_query(sQuery, oService, sProjectID)
        return pandas_get_table(dDestinationTable)

    # Small results: let pandas run the query directly.
    return pandas_query(sQuery, sProjectID)
def pandas_query(sQuery, sProjectID):
    """Run a SQL query in BigQuery through pandas and return a DataFrame.

    :param sQuery: SQL text to execute.
    :param sProjectID: Google project ID the query runs in.
    :return: DataFrame holding the query result.
    """
    from pandas.io import gbq

    return gbq.read_gbq(sQuery, sProjectID)
def pandas_get_table(dTable):
    """Fetch a whole BigQuery table and return it as a pandas DataFrame.

    :param dTable: Table reference dict with the keys ``projectId``,
        ``datasetId`` and ``tableId``.
    :return: DataFrame produced by ``SELECT *`` on that table.
    """
    from pandas.io import gbq

    # Legacy-SQL table reference: [dataset.table].
    # NOTE(review): this is itself a query, so a very large table may still
    # exceed the response limit -- confirm before relying on it.
    sQuery = "SELECT * FROM [{}.{}]".format(
        dTable['datasetId'], dTable['tableId']
    )
    return gbq.read_gbq(sQuery, dTable['projectId'])
def create_service():
    """Build a BigQuery v2 API service from application-default credentials.

    :return: Service object returned by ``apiclient.discovery.build``.
    """
    from oauth2client.client import GoogleCredentials
    from apiclient.discovery import build

    credentials = GoogleCredentials.get_application_default()
    return build('bigquery', 'v2', credentials=credentials)
def run_query(sQuery, oService, sProjectID, sDatasetID='sandbox',
              sTableID='api_large_result_dropoff', fPollSeconds=1.0):
    """Run a large-result query into a destination table and wait for it.

    :param sQuery: SQL text to execute.
    :param oService: BigQuery v2 service object (see ``create_service``).
    :param sProjectID: Google project ID the job runs in and writes to.
    :param sDatasetID: Dataset that receives the result table.
    :param sTableID: Table that receives the result (overwritten each run).
    :param fPollSeconds: Seconds to wait between job status checks.
    :return: The job's destination table reference dict
        (``projectId`` / ``datasetId`` / ``tableId``).
    :raises RuntimeError: If the finished job reports an ``errorResult``.
    """
    import time

    dQuery = {
        'configuration': {
            'query': {
                # WRITE_TRUNCATE replaces any previous drop-off table.
                'writeDisposition': 'WRITE_TRUNCATE',
                'useQueryCache': False,
                'allowLargeResults': True,
                'query': sQuery,
                'destinationTable': {
                    'projectId': sProjectID,
                    'datasetId': sDatasetID,
                    'tableId': sTableID,
                },
            },
        },
    }

    oJobs = oService.jobs()
    job = oJobs.insert(projectId=sProjectID, body=dQuery).execute()

    # The insert call only submits the job; poll until it has finished so
    # the caller never reads a half-written or stale destination table.
    while job.get('status', {}).get('state') != 'DONE':
        time.sleep(fPollSeconds)
        job = oJobs.get(
            projectId=sProjectID, jobId=job['jobReference']['jobId']
        ).execute()

    dError = job['status'].get('errorResult')
    if dError:
        raise RuntimeError("BigQuery job failed: {}".format(dError))

    return job['configuration']['query']['destinationTable']
You can do it by changing the default dialect from legacy to standard in pd.read_gbq
pd.read_gbq(query, 'my-super-project', dialect='standard')
Indeed, you can read in Big Query documentation for the parameter AllowLargeResults:
AllowLargeResults: For standard SQL queries, this flag is ignored and large results are always allowed.
Decided to post the proper way to do this via the
google.cloud API. Looking at my previous answer I see that it would fail like yosemite_k said.
Large results really need to follow BigQuery -> Storage -> local -> dataframe pattern.
大的结果确实需要遵循BigQuery ->存储->本地-> dataframe模式。
BigQuery resources:
- https://cloud.google.com/bigquery/docs/reference/libraries
- https://cloud.google.com/bigquery/docs/reference/libraries
- https://googlecloudplatform.github.io/google-cloud-python/stable/bigquery-client.html
- https://googlecloudplatform.github.io/google-cloud-python/stable/bigquery-client.html
- http://google-cloud-python.readthedocs.io/en/latest/bigquery-usage.html
- http://google-cloud-python.readthedocs.io/en/latest/bigquery-usage.html
Storage resources:
- https://googlecloudplatform.github.io/google-cloud-python/stable/storage-client.html
- https://googlecloudplatform.github.io/google-cloud-python/stable/storage-client.html
- https://googlecloudplatform.github.io/google-cloud-python/stable/storage-blobs.html
- https://googlecloudplatform.github.io/google-cloud-python/stable/storage-blobs.html
Pandas Resources:
- http://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_csv.html
- http://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_csv.html
pip install pandas
pip install google-cloud-storage
pip install google-cloud-bigquery
Full implementation (bigquery_to_dataframe.py):
We require python 3 for the google cloud python API
mkvirtualenv --python `which python3` env3
And our dependencies:
pip install pandas
pip install google-cloud-bigquery
pip install google-cloud-storage
import os
import time
import uuid
from google.cloud import bigquery
from google.cloud import storage
import pandas as pd
def bq_to_df(project_id, dataset_id, table_id, storage_uri, local_data_path):
    """Pipeline to get data from BigQuery into a local pandas dataframe.

    :param project_id: Google project ID we are working in.
    :type project_id: str
    :param dataset_id: BigQuery dataset id.
    :type dataset_id: str
    :param table_id: BigQuery table id.
    :type table_id: str
    :param storage_uri: Google Storage uri where data gets dropped off.
    :type storage_uri: str
    :param local_data_path: Path where data should end up.
    :type local_data_path: str
    :return: Pandas dataframe from BigQuery table.
    :rtype: pd.DataFrame
    """
    bq_to_storage(project_id, dataset_id, table_id, storage_uri)
    storage_to_local(project_id, storage_uri, local_data_path)

    # storage_to_local saves each blob under local_data_path using the
    # blob's full object name, so the data lives at the uri's object path
    # (minus a trailing "*" wildcard) rather than in a fixed directory.
    object_path = storage_uri.split("gs://")[1].partition("/")[2]
    parts = object_path.split("/")
    if parts[-1] == "*":
        object_path = "/".join(parts[:-1])

    data_path = os.path.join(local_data_path, object_path)
    return local_to_df(data_path)
def bq_to_storage(project_id, dataset_id, table_id, target_uri):
    """Export a BigQuery table to Google Storage.

    :param project_id: Google project ID we are working in.
    :type project_id: str
    :param dataset_id: BigQuery dataset name where source data resides.
    :type dataset_id: str
    :param table_id: BigQuery table name where source data resides.
    :type table_id: str
    :param target_uri: Google Storage location where table gets saved.
    :type target_uri: str
    :return: The random ID generated to identify the job.
    :rtype: str
    :raises RuntimeError: If the export job finishes with errors.
    """
    # NOTE(review): extract_table_to_storage / job.begin / job.name look
    # like the older google-cloud-bigquery job API -- confirm against the
    # installed library version.
    client = bigquery.Client(project=project_id)

    dataset = client.dataset(dataset_name=dataset_id)
    table = dataset.table(name=table_id)

    job = client.extract_table_to_storage(
        str(uuid.uuid4()),  # id we assign to be the job name
        table,
        target_uri,
    )
    job.destination_format = 'CSV'
    job.write_disposition = 'WRITE_TRUNCATE'

    job.begin()  # async execution

    # Poll until the export finishes; reload() refreshes job.state.
    while job.state != 'DONE':
        print("exporting '{}.{}' to '{}': {}".format(
            dataset_id, table_id, target_uri, job.state
        ))
        time.sleep(5)
        job.reload()

    # A failed export leaves nothing valid to download, so stop here.
    if job.errors:
        raise RuntimeError(
            "export of '{}.{}' failed: {}".format(
                dataset_id, table_id, job.errors
            )
        )

    return job.name
def storage_to_local(project_id, source_uri, target_dir):
    """Save a file or folder from google storage to a local directory.

    A uri whose last path component is ``*`` downloads every blob in that
    folder; any other uri downloads the blobs in the folder whose names end
    with the last component. Each blob is written below ``target_dir``
    under its full object name.

    :param project_id: Google project ID we are working in.
    :type project_id: str
    :param source_uri: Google Storage location where file comes from.
    :type source_uri: str
    :param target_dir: Local file location where files are to be stored.
    :type target_dir: str
    :return: None
    :rtype: None
    :raises ValueError: If ``source_uri`` is not a ``gs://`` uri or the
        bucket does not exist.
    """
    if not source_uri.startswith("gs://"):
        raise ValueError("Expected a 'gs://' uri, got '{}'".format(source_uri))

    client = storage.Client(project=project_id)

    bucket_name, _, file_path = source_uri[len("gs://"):].partition("/")
    bucket = client.lookup_bucket(bucket_name)
    if bucket is None:  # lookup_bucket returns None for a missing bucket
        raise ValueError("Bucket '{}' does not exist.".format(bucket_name))

    # Objects at the bucket root have no folder: use an empty prefix, not
    # "/", which no blob name starts with.
    folder_path, _, blob_name = file_path.rpartition("/")
    folder_name = folder_path + "/" if folder_path else ""
    blobs = list(bucket.list_blobs(prefix=folder_name or None))

    # get files if we wanted just files
    if blob_name != "*":
        print("Getting just the file '{}'".format(file_path))
        our_blobs = [o for o in blobs if o.name.endswith(blob_name)]
    else:
        print("Getting all files in '{}'".format(folder_name))
        our_blobs = blobs

    print([o.name for o in our_blobs])

    for blob in our_blobs:
        # Names ending in "/" are folder placeholders, not files.
        if blob.name.endswith("/"):
            continue

        filename = os.path.join(target_dir, blob.name)

        # create a complex folder structure if necessary
        directory = os.path.dirname(filename)
        if directory and not os.path.isdir(directory):
            os.makedirs(directory)

        with open(filename, 'wb') as f:
            blob.download_to_file(f)
def local_to_df(data_path):
    """Import local data files into a single pandas dataframe.

    :param data_path: File or folder path where csv data are located.
    :type data_path: str
    :return: Pandas dataframe containing data from data_path.
    :rtype: pd.DataFrame
    :raises ValueError: If ``data_path`` does not exist or is a folder
        without any files.
    """
    # if data_path is a file, then just load it into pandas
    if os.path.isfile(data_path):
        print("Loading '{}' into a dataframe".format(data_path))
        # First row is the header, same as in the folder branch below.
        return pd.read_csv(data_path)

    if os.path.isdir(data_path):
        # Sort for a deterministic row order; skip nested directories.
        candidates = sorted(
            os.path.join(data_path, fi) for fi in os.listdir(data_path)
        )
        files = [fi for fi in candidates if os.path.isfile(fi)]
        if not files:
            raise ValueError(
                "No data files found in {}.".format(data_path)
            )
        print("Loading {} into a single dataframe".format(files))
        return pd.concat(pd.read_csv(s) for s in files)

    raise ValueError(
        "Please enter a valid path. {} does not exist.".format(data_path)
    )
if __name__ == '__main__':
PROJECT_ID = "my-project"
DATASET_ID = "bq_dataset"
TABLE_ID = "bq_table"
STORAGE_URI = "gs://my-bucket/path/for/dropoff/*"
LOCAL_DATA_PATH = "/path/to/save/"