make ResultSet more general #97

Open: wants to merge 4 commits into base `main`
56 changes: 50 additions & 6 deletions CHANGELOG.md
@@ -2,6 +2,50 @@

All notable changes to this project will be documented in this file.

## [0.24.0] - 2024-08-28

### Changed

- Added a new method `ResultSet::new_from_get_query_results_response` which creates a `ResultSet` from a `GetQueryResultsResponse`.

- Breaking changes:

- Return type of `JobApi::query` changed from `Result<ResultSet, BQError>` to `Result<QueryResponse, BQError>`.
- `ResultSet::new` renamed to `ResultSet::new_from_query_response`.

- Rationale for the breaking changes:

`JobApi::query` now returns `Result<QueryResponse, BQError>` instead of `Result<ResultSet, BQError>`. A `ResultSet` wraps a `QueryResponse`, but callers had no access to that internal object. To give callers access to it, `JobApi::query` now returns the internal object itself. This means older code that expected a `ResultSet` will break.

- Upgrading to the new version:

To fix broken code, call the `ResultSet::new_from_query_response` function. For example, if your code looked like this:

```rust
let mut result_set = client
.job()
.query(
project_id,
query_request,
)
.await?;
```

It should be updated to:

```rust
let query_response = client
.job()
.query(
project_id,
query_request,
)
.await?;
let mut result_set = ResultSet::new_from_query_response(query_response);
```

Another reason for the change was consistency with `JobApi::get_query_results`, which already returned an unwrapped object that callers had to wrap in a `ResultSet` manually by calling the `ResultSet::new` method.
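Both constructors now follow the same pattern: the API call returns the raw response, and the caller decides when to wrap it in a `ResultSet`. The sketch below models that pattern with simplified stand-in types — the crate's real `QueryResponse`, `GetQueryResultsResponse`, and `ResultSet` carry more fields (schema, job reference, errors) than shown here:

```rust
// Simplified stand-ins for the crate's response types.
struct QueryResponse {
    job_complete: Option<bool>,
    rows: Option<Vec<String>>,
}

struct GetQueryResultsResponse {
    job_complete: Option<bool>,
    rows: Option<Vec<String>>,
}

struct ResultSet {
    cursor: i64,
    row_count: i64,
    rows: Vec<String>,
}

impl ResultSet {
    // Shared construction logic: rows are only present once the job completed.
    fn from_parts(job_complete: Option<bool>, rows: Option<Vec<String>>) -> Self {
        if job_complete.unwrap_or(false) {
            let rows = rows.unwrap_or_default();
            Self {
                cursor: -1,
                row_count: rows.len() as i64,
                rows,
            }
        } else {
            // An incomplete job yields an empty result set rather than an error.
            Self {
                cursor: -1,
                row_count: 0,
                rows: vec![],
            }
        }
    }

    fn new_from_query_response(r: QueryResponse) -> Self {
        Self::from_parts(r.job_complete, r.rows)
    }

    fn new_from_get_query_results_response(r: GetQueryResultsResponse) -> Self {
        Self::from_parts(r.job_complete, r.rows)
    }

    // The cursor starts before the first row and advances on each call.
    fn next_row(&mut self) -> bool {
        if self.cursor + 1 < self.row_count {
            self.cursor += 1;
            true
        } else {
            false
        }
    }
}

fn main() {
    let resp = QueryResponse {
        job_complete: Some(true),
        rows: Some(vec!["a".to_string(), "b".to_string()]),
    };
    // The caller keeps the raw response as long as needed, then wraps it.
    let mut rs = ResultSet::new_from_query_response(resp);
    let mut seen = 0;
    while rs.next_row() {
        seen += 1;
    }
    println!("rows seen: {seen}");
}
```

The point for migration is only that construction is now an explicit step separate from the API call, so fields like `job_reference` can be read from the raw response before wrapping it.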

## [0.23.0] - 2024-08-10

### Fix
@@ -17,15 +61,16 @@ All notable changes to this project will be documented in this file.

### Added

- Add partial support for BigQuery Storage Write API (by @imor).
  - append_rows
  - get_write_stream
- Add GZIP support for `insert_all` (by @Deniskore). The `gzip` feature is included by default.
See https://github.com/lquerel/gcp-bigquery-client/issues/74 for more information.

Breaking changes:

- Client::from_authenticator is now async.
- ClientBuilder::build_from_authenticator is now async.

### Maintenance

@@ -59,7 +104,7 @@ Breaking changes:
- Add support to bigquery-emulator (Thanks to @henriiik)
- Add support to use the ClientBuilder to build a Client with an Authenticator (Thanks to @henriiik)

### Fix

- Fix build issue with hyper-rustls (Thanks to @OmriSteiner and @nate-kelley-buster)

@@ -220,11 +265,10 @@ Breaking changes:

## [0.9.3] - 2021-08-31

### Fix

- Fix ResultSet.get_i64 not working with some valid integer notation (e.g. 123.45E4) (Thanks to @komi1230).

## [0.9.2] - 2021-08-30

### Fix
3 changes: 2 additions & 1 deletion README.md
@@ -188,7 +188,7 @@ rows are based on a regular Rust struct implementing the trait Serialize.
.await?;

// Query
let mut query_response = client
.job()
.query(
project_id,
@@ -198,6 +198,7 @@ rows are based on a regular Rust struct implementing the trait Serialize.
)),
)
.await?;
let mut rs = ResultSet::new_from_query_response(query_response);
while rs.next_row() {
println!("Number of rows inserted: {}", rs.get_i64_by_name("c")?.unwrap());
}
5 changes: 3 additions & 2 deletions examples/client.rs
@@ -1,6 +1,5 @@
use serde::Serialize;

use gcp_bigquery_client::error::BQError;
use gcp_bigquery_client::model::dataset::Dataset;
use gcp_bigquery_client::model::query_request::QueryRequest;
@@ -9,6 +8,7 @@ use gcp_bigquery_client::model::table_data_insert_all_request::TableDataInsertAl
use gcp_bigquery_client::model::table_field_schema::TableFieldSchema;
use gcp_bigquery_client::model::table_schema::TableSchema;
use gcp_bigquery_client::model::time_partitioning::TimePartitioning;
use gcp_bigquery_client::{env_vars, model::query_response::ResultSet};
use std::time::{Duration, SystemTime};
use time::OffsetDateTime;

@@ -185,7 +185,7 @@ async fn main() -> Result<(), BQError> {
.await?;

// Query
let query_response = client
.job()
.query(
project_id,
@@ -194,6 +194,7 @@ async fn main() -> Result<(), BQError> {
)),
)
.await?;
let mut rs = ResultSet::new_from_query_response(query_response);
while rs.next_row() {
println!("Number of rows inserted: {}", rs.get_i64_by_name("c")?.unwrap());
}
5 changes: 3 additions & 2 deletions examples/local.rs
@@ -92,7 +92,7 @@ mod bq {
use fake::{Fake, StringFaker};
use gcp_bigquery_client::{
model::{
dataset::Dataset, query_request::QueryRequest, query_response::ResultSet, table::Table,
table_data_insert_all_request::TableDataInsertAllRequest, table_field_schema::TableFieldSchema,
table_schema::TableSchema,
},
@@ -179,7 +179,7 @@
}

pub async fn get_rows(&self) -> Vec<String> {
let query_response = self
.client
.job()
.query(
@@ -192,6 +192,7 @@
.await
.unwrap();

let mut rs = ResultSet::new_from_query_response(query_response);
let mut rows: Vec<String> = vec![];
while rs.next_row() {
let name = rs.get_string_by_name(NAME_COLUMN).unwrap().unwrap();
2 changes: 1 addition & 1 deletion src/dataset.rs
@@ -340,7 +340,7 @@ impl DatasetApi {
let resp = self.client.execute(request).await?;

let query_response: QueryResponse = process_response(resp).await?;
let mut rs = ResultSet::new_from_query_response(query_response);
let mut result = vec![];
let catalog_name_pos = *rs
.column_index("catalog_name")
22 changes: 12 additions & 10 deletions src/job.rs
@@ -17,7 +17,7 @@ use crate::model::job_list::JobList;
use crate::model::job_list_parameters::JobListParameters;
use crate::model::job_reference::JobReference;
use crate::model::query_request::QueryRequest;
use crate::model::query_response::QueryResponse;
use crate::model::table_row::TableRow;
use crate::{process_response, urlencode, BIG_QUERY_V2_URL};

@@ -48,7 +48,7 @@ impl JobApi {
/// # Arguments
/// * `project_id` - Project ID of the query request.
/// * `query_request` - The request body contains an instance of QueryRequest.
pub async fn query(&self, project_id: &str, query_request: QueryRequest) -> Result<QueryResponse, BQError> {
let req_url = format!(
"{base_url}/projects/{project_id}/queries",
base_url = self.base_url,
@@ -67,7 +67,7 @@ impl JobApi {
let resp = self.client.execute(request).await?;

let query_response: QueryResponse = process_response(resp).await?;
Ok(query_response)
}

/// Runs a BigQuery SQL query, paginating through all the results synchronously.
@@ -626,7 +626,7 @@ mod test {
assert!(result.insert_errors.is_none(), "{:?}", result);

// Query
let query_response = client
.job()
.query(
project_id,
@@ -635,20 +635,22 @@
)),
)
.await?;

// Get job id
let job_id = query_response
.job_reference
.as_ref()
.expect("expected job_reference")
.job_id
.clone()
.expect("expected job_id");

let mut rs = ResultSet::new_from_query_response(query_response);

while rs.next_row() {
assert!(rs.get_i64_by_name("c")?.is_some());
}

let job = client.job_api.get_job(project_id, &job_id, None).await?;
assert_eq!(job.status.unwrap().state.unwrap(), "DONE");

@@ -657,7 +659,7 @@
.job()
.get_query_results(project_id, &job_id, Default::default())
.await?;
let mut query_results_rs = ResultSet::new_from_query_response(QueryResponse::from(query_results));
assert_eq!(query_results_rs.row_count(), rs.row_count());
while query_results_rs.next_row() {
assert!(rs.get_i64_by_name("c")?.is_some());
44 changes: 35 additions & 9 deletions src/model/query_response.rs
@@ -67,12 +67,12 @@ impl From<GetQueryResultsResponse> for QueryResponse {
pub struct ResultSet {
cursor: i64,
row_count: i64,
rows: Vec<TableRow>,
fields: HashMap<String, usize>,
}

impl ResultSet {
pub fn new_from_query_response(query_response: QueryResponse) -> Self {
if query_response.job_complete.unwrap_or(false) && query_response.schema.is_some() {
// rows and table schema are only present for successfully completed jobs.
let row_count = query_response.rows.as_ref().map_or(0, Vec::len) as i64;
@@ -86,24 +86,52 @@ impl ResultSet {
.enumerate()
.map(|(pos, field)| (field.name.clone(), pos))
.collect();
let rows = query_response.rows.unwrap_or_default();
Self {
cursor: -1,
row_count,
rows,
fields,
}
} else {
Self {
cursor: -1,
row_count: 0,
rows: vec![],
fields: HashMap::new(),
}
}
}

pub fn new_from_get_query_results_response(get_query_results_response: GetQueryResultsResponse) -> Self {
if get_query_results_response.job_complete.unwrap_or(false) && get_query_results_response.schema.is_some() {
// rows and table schema are only present for successfully completed jobs.
let row_count = get_query_results_response.rows.as_ref().map_or(0, Vec::len) as i64;
let table_schema = get_query_results_response.schema.as_ref().expect("Expecting a schema");
let table_fields = table_schema
.fields
.as_ref()
.expect("Expecting a non empty list of fields");
let fields: HashMap<String, usize> = table_fields
.iter()
.enumerate()
.map(|(pos, field)| (field.name.clone(), pos))
.collect();
let rows = get_query_results_response.rows.unwrap_or_default();
Self {
cursor: -1,
row_count,
rows,
fields,
}
} else {
Self {
cursor: -1,
row_count: 0,
rows: vec![],
fields: HashMap::new(),
}
}
}

/// Moves the cursor forward one row from its current position.
@@ -306,10 +334,8 @@
}

Ok(self
.rows
.get(self.cursor as usize)
.and_then(|row| row.columns.as_ref())
.and_then(|cols| cols.get(col_index))
.and_then(|col| col.value.clone()))
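With rows owned directly by the struct, value lookup no longer reaches through the wrapped response's `Option<Vec<_>>`. A self-contained sketch of that lookup path, using a simplified `Row` in place of the crate's `TableRow`/`TableCell` (names here are illustrative, not the crate's real definitions):

```rust
use std::collections::HashMap;

// Simplified stand-in for the crate's TableRow/TableCell pair.
struct Row {
    columns: Vec<Option<String>>,
}

struct ResultSet {
    cursor: i64,
    rows: Vec<Row>,                 // owned directly; no Option to unwrap
    fields: HashMap<String, usize>, // column name -> position in the row
}

impl ResultSet {
    // Resolve a column by name, then read the value at the current cursor.
    fn get_by_name(&self, name: &str) -> Option<String> {
        let col_index = *self.fields.get(name)?;
        self.rows
            .get(self.cursor as usize)
            .and_then(|row| row.columns.get(col_index))
            .and_then(|col| col.clone())
    }
}

fn main() {
    let rs = ResultSet {
        cursor: 0,
        rows: vec![Row { columns: vec![Some("42".to_string())] }],
        fields: HashMap::from([("c".to_string(), 0)]),
    };
    println!("{:?}", rs.get_by_name("c")); // Some("42")
}
```

Note that a fresh cursor of `-1` cast to `usize` falls outside any `Vec` bounds, so `get` simply returns `None` until `next_row` has advanced past the start.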