I want to use the Pandas library to read BigQuery data. How do I allow large results?
For non-Pandas BigQuery interactions, this can be achieved like this.
Current code with Pandas:
sProjectID = "project-id"
sQuery = '''
SELECT
column1, column2
FROM [dataset_name.tablename]
'''
from pandas.io import gbq
df = gbq.read_gbq(sQuery, sProjectID)
3 solutions
#1
5
EDIT: I've posted the proper way to do this in my other answer: drop the data off in Google Storage first. That way the data is never too large to fetch.
OK, I didn't find a direct way to do it with pandas, so I had to write a little extra with the normal API. Here is my fix (it is also most of the work needed to do it natively without Pandas):
sProjectID = "project-id"
sQuery = '''
SELECT
column1, column2
FROM [dataset_name.tablename]
'''
df = create_dataframe(sQuery, sProjectID, bLargeResults=True)
#*******Functions to make above work*********
def create_dataframe(sQuery, sProjectID, bLargeResults=False):
    "takes a BigQuery sql query and returns a Pandas dataframe"
    if bLargeResults:
        oService = create_service()
        dDestinationTable = run_query(sQuery, oService, sProjectID)
        df = pandas_get_table(dDestinationTable)
    else:
        df = pandas_query(sQuery, sProjectID)
    return df

def pandas_query(sQuery, sProjectID):
    "go into bigquery and get the table with sql query and return dataframe"
    from pandas.io import gbq
    df = gbq.read_gbq(sQuery, sProjectID)
    return df

def pandas_get_table(dTable):
    "fetch a table and return dataframe"
    from pandas.io import gbq
    sProjectID = dTable['projectId']
    sDatasetID = dTable['datasetId']
    sTableID = dTable['tableId']
    sQuery = "SELECT * FROM [{}.{}]".format(sDatasetID, sTableID)
    df = gbq.read_gbq(sQuery, sProjectID)
    return df

def create_service():
    "create google service"
    from oauth2client.client import GoogleCredentials
    from apiclient.discovery import build
    credentials = GoogleCredentials.get_application_default()
    oService = build('bigquery', 'v2', credentials=credentials)
    return oService
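
def run_query(sQuery, oService, sProjectID):
    "runs the bigquery query and waits for it to finish"
    import time
    dQuery = {
        'configuration': {
            'query': {
                # valid values are WRITE_TRUNCATE, WRITE_APPEND and WRITE_EMPTY
                'writeDisposition': 'WRITE_TRUNCATE',
                'useQueryCache': False,
                'allowLargeResults': True,
                'query': sQuery,
                'destinationTable': {
                    'projectId': sProjectID,
                    'datasetId': 'sandbox',
                    'tableId': 'api_large_result_dropoff',
                },
            }
        }
    }
    job = oService.jobs().insert(projectId=sProjectID, body=dQuery).execute()
    # jobs().insert() returns immediately, so poll until the job is done;
    # otherwise the destination table may be read before it is written
    sJobID = job['jobReference']['jobId']
    while job['status']['state'] != 'DONE':
        time.sleep(5)
        job = oService.jobs().get(projectId=sProjectID, jobId=sJobID).execute()
    if 'errorResult' in job['status']:
        raise RuntimeError(job['status']['errorResult'])
    return job['configuration']['query']['destinationTable']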
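A job submitted with jobs().insert() runs asynchronously, so the destination table is only safe to read once the job reports DONE. The helper below is a minimal sketch of such a wait loop with a timeout. The name wait_for_state and all of its parameters are hypothetical (they are not part of any Google library), and the state getter is passed in so the loop can be tried without a real job.

```python
import time


def wait_for_state(get_state, done_state="DONE", poll_seconds=5.0,
                   timeout_seconds=600.0, sleep=time.sleep,
                   clock=time.monotonic):
    """Call get_state() until it returns done_state or the timeout expires."""
    deadline = clock() + timeout_seconds
    while True:
        state = get_state()
        if state == done_state:
            return state
        if clock() >= deadline:
            raise TimeoutError(
                "job still {!r} after {} s".format(state, timeout_seconds)
            )
        sleep(poll_seconds)


# Stand-in for a real job: reports PENDING, then RUNNING, then DONE.
fake_states = iter(["PENDING", "RUNNING", "DONE"])
print(wait_for_state(lambda: next(fake_states), sleep=lambda seconds: None))
```

With the service object from create_service(), the getter could be something like lambda: oService.jobs().get(projectId=sProjectID, jobId=sJobID).execute()['status']['state'], where sJobID would come from the jobReference field of the insert response.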
#2
2
You can do it by changing the dialect from legacy (the default) to standard in the pd.read_gbq function.
pd.read_gbq(query, 'my-super-project', dialect='standard')
Indeed, the BigQuery documentation says this about the AllowLargeResults parameter:
AllowLargeResults: For standard SQL queries, this flag is ignored and large results are always allowed.
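One practical catch when switching dialects: the question's query uses the legacy table syntax [dataset_name.tablename], which standard SQL does not accept; standard SQL expects backtick-quoted names instead. The following is a minimal sketch of a converter for simple table references only. legacy_to_standard is a hypothetical helper, not part of pandas or BigQuery, and it does not try to handle every legacy construct (for example domain-scoped project IDs).

```python
import re

# Matches simple legacy references: [dataset.table] or [project:dataset.table].
_LEGACY_TABLE = re.compile(r"\[(?:([\w-]+):)?(\w+)\.(\w+)\]")


def legacy_to_standard(query):
    """Rewrite legacy [project:dataset.table] references as `project.dataset.table`."""
    def _rewrite(match):
        # The project part is optional, so drop it when it is missing.
        parts = [part for part in match.groups() if part]
        return "`" + ".".join(parts) + "`"
    return _LEGACY_TABLE.sub(_rewrite, query)


query = legacy_to_standard(
    "SELECT column1, column2 FROM [dataset_name.tablename]"
)
print(query)  # SELECT column1, column2 FROM `dataset_name.tablename`
```

The converted string can then be passed to pd.read_gbq(query, 'my-super-project', dialect='standard') as in the answer above.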
#3
2
Decided to post the proper way to do this via the python3 google.cloud API. Looking at my previous answer, I see that it would fail like yosemite_k said.
Large results really need to follow the BigQuery -> Storage -> local -> dataframe pattern.
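As a compact illustration of the first three stages of that pattern, here is a sketch that assumes a recent google-cloud-bigquery and google-cloud-storage, where Client.extract_table, ExtractJob.result, Client.list_blobs and Blob.download_to_filename are available. The function name export_table_to_local, its parameters and the part-*.csv naming are illustrative choices, not taken from either library. The clients are passed in as arguments so the function itself has no hard dependency on credentials.

```python
import os


def export_table_to_local(bq_client, gcs_client, table_ref,
                          bucket_name, prefix, local_dir):
    """Export a BigQuery table to CSV shards in Cloud Storage and download them.

    bq_client and gcs_client are expected to behave like
    google.cloud.bigquery.Client and google.cloud.storage.Client.
    table_ref is a "project.dataset.table" string.
    Returns the sorted list of local file paths.
    """
    # The wildcard lets BigQuery split a large table across several files.
    destination_uri = "gs://{}/{}/part-*.csv".format(bucket_name, prefix)
    job = bq_client.extract_table(table_ref, destination_uri)
    job.result()  # blocks until the extract job finishes, raises on failure

    os.makedirs(local_dir, exist_ok=True)
    local_paths = []
    for blob in gcs_client.list_blobs(bucket_name, prefix=prefix + "/"):
        if blob.name.endswith("/"):
            continue  # skip "folder" placeholder objects
        local_path = os.path.join(local_dir, os.path.basename(blob.name))
        blob.download_to_filename(local_path)
        local_paths.append(local_path)
    return sorted(local_paths)
```

In real use the clients would be created with bigquery.Client(project=...) and storage.Client(project=...), and the returned paths could be loaded with pd.concat(pd.read_csv(p) for p in paths).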
BigQuery resources:
- https://cloud.google.com/bigquery/docs/reference/libraries
- https://googlecloudplatform.github.io/google-cloud-python/stable/bigquery-client.html
- http://google-cloud-python.readthedocs.io/en/latest/bigquery-usage.html
Storage resources:
- https://googlecloudplatform.github.io/google-cloud-python/stable/storage-client.html
- https://googlecloudplatform.github.io/google-cloud-python/stable/storage-blobs.html
Pandas resources:
- http://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_csv.html
Installation:
pip install pandas
pip install google-cloud-storage
pip install google-cloud-bigquery
Full implementation (bigquery_to_dataframe.py):
"""
We require python 3 for the google cloud python API
mkvirtualenv --python `which python3` env3
And our dependencies:
pip install pandas
pip install google-cloud-bigquery
pip install google-cloud-storage
"""
import os
import time
import uuid
from google.cloud import bigquery
from google.cloud import storage
import pandas as pd

def bq_to_df(project_id, dataset_id, table_id, storage_uri, local_data_path):
    """Pipeline to get data from BigQuery into a local pandas dataframe.

    :param project_id: Google project ID we are working in.
    :type project_id: str
    :param dataset_id: BigQuery dataset id.
    :type dataset_id: str
    :param table_id: BigQuery table id.
    :type table_id: str
    :param storage_uri: Google Storage uri where data gets dropped off.
    :type storage_uri: str
    :param local_data_path: Path where data should end up.
    :type local_data_path: str
    :return: Pandas dataframe from BigQuery table.
    :rtype: pd.DataFrame
    """
    bq_to_storage(project_id, dataset_id, table_id, storage_uri)
    storage_to_local(project_id, storage_uri, local_data_path)
    # storage_to_local mirrors the blob path under local_data_path,
    # so read from that location rather than a hard-coded folder
    blob_path = storage_uri.split("gs://")[1].split("/", 1)[1]
    if blob_path.endswith("*"):
        blob_path = os.path.dirname(blob_path)
    data_dir = os.path.join(local_data_path, blob_path)
    df = local_to_df(data_dir)
    return df

def bq_to_storage(project_id, dataset_id, table_id, target_uri):
    """Export a BigQuery table to Google Storage.

    :param project_id: Google project ID we are working in.
    :type project_id: str
    :param dataset_id: BigQuery dataset name where source data resides.
    :type dataset_id: str
    :param table_id: BigQuery table name where source data resides.
    :type table_id: str
    :param target_uri: Google Storage location where table gets saved.
    :type target_uri: str
    :return: The random ID generated to identify the job.
    :rtype: str
    """
    client = bigquery.Client(project=project_id)
    dataset = client.dataset(dataset_name=dataset_id)
    table = dataset.table(name=table_id)
    job = client.extract_table_to_storage(
        str(uuid.uuid4()),  # id we assign to be the job name
        table,
        target_uri
    )
    job.destination_format = 'CSV'
    job.write_disposition = 'WRITE_TRUNCATE'
    job.begin()  # async execution
    if job.errors:
        print(job.errors)
    while job.state != 'DONE':
        time.sleep(5)
        print("exporting '{}.{}' to '{}': {}".format(
            dataset_id, table_id, target_uri, job.state
        ))
        job.reload()
    print(job.state)
    return job.name

def storage_to_local(project_id, source_uri, target_dir):
    """Save a file or folder from google storage to a local directory.

    :param project_id: Google project ID we are working in.
    :type project_id: str
    :param source_uri: Google Storage location where file comes from.
    :type source_uri: str
    :param target_dir: Local file location where files are to be stored.
    :type target_dir: str
    :return: None
    :rtype: None
    """
    client = storage.Client(project=project_id)
    bucket_name = source_uri.split("gs://")[1].split("/")[0]
    file_path = "/".join(source_uri.split("gs://")[1].split("/")[1::])
    bucket = client.lookup_bucket(bucket_name)
    folder_name = "/".join(file_path.split("/")[0:-1]) + "/"
    blobs = [o for o in bucket.list_blobs() if o.name.startswith(folder_name)]
    # get files if we wanted just files
    blob_name = file_path.split("/")[-1]
    if blob_name != "*":
        print("Getting just the file '{}'".format(file_path))
        our_blobs = [o for o in blobs if o.name.endswith(blob_name)]
    else:
        print("Getting all files in '{}'".format(folder_name))
        our_blobs = blobs
    print([o.name for o in our_blobs])
    for blob in our_blobs:
        filename = os.path.join(target_dir, blob.name)
        # create a complex folder structure if necessary
        if not os.path.isdir(os.path.dirname(filename)):
            os.makedirs(os.path.dirname(filename))
        with open(filename, 'wb') as f:
            blob.download_to_file(f)

def local_to_df(data_path):
    """Import local data files into a single pandas dataframe.

    :param data_path: File or folder path where csv data are located.
    :type data_path: str
    :return: Pandas dataframe containing data from data_path.
    :rtype: pd.DataFrame
    """
    # if data_path is a file, then just load it into pandas
    if os.path.isfile(data_path):
        print("Loading '{}' into a dataframe".format(data_path))
        # the header is the first row of the export, so keep the default
        # (header=1 would silently drop the first data row)
        df = pd.read_csv(data_path)
    elif os.path.isdir(data_path):
        files = [os.path.join(data_path, fi) for fi in os.listdir(data_path)]
        print("Loading {} into a single dataframe".format(files))
        df = pd.concat((pd.read_csv(s) for s in files))
    else:
        raise ValueError(
            "Please enter a valid path. {} does not exist.".format(data_path)
        )
    return df

if __name__ == '__main__':
    PROJECT_ID = "my-project"
    DATASET_ID = "bq_dataset"
    TABLE_ID = "bq_table"
    STORAGE_URI = "gs://my-bucket/path/for/dropoff/*"
    LOCAL_DATA_PATH = "/path/to/save/"
    bq_to_df(PROJECT_ID, DATASET_ID, TABLE_ID, STORAGE_URI, LOCAL_DATA_PATH)
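
The chained split() calls in storage_to_local work, but they are easy to get wrong. As an alternative, here is a small sketch that pulls the bucket, folder and file name out of a gs:// URI with the standard library only. split_gcs_uri is a hypothetical helper name, and the sketch assumes object names contain no "?" or "#" characters, which urlparse would treat as query or fragment markers.

```python
import posixpath
from urllib.parse import urlparse


def split_gcs_uri(uri):
    """Split 'gs://bucket/folder/name' into (bucket, folder, name)."""
    parsed = urlparse(uri)
    if parsed.scheme != "gs" or not parsed.netloc:
        raise ValueError("not a gs:// URI: {!r}".format(uri))
    # Object names always use forward slashes, so use posixpath, not os.path.
    folder, name = posixpath.split(parsed.path.lstrip("/"))
    return parsed.netloc, folder, name


print(split_gcs_uri("gs://my-bucket/path/for/dropoff/*"))
# ('my-bucket', 'path/for/dropoff', '*')
```

A name of "*" then signals "every file in the folder", matching the wildcard check in storage_to_local.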