Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
189 views
in Technique[技术] by (71.8m points)

how to batch insert data into Google BigQuery from a Java service?

I've read through a few similar questions on SO and GCP docs - but did not get a definitive answer...

Is there a way to batch insert data from my Java service into BigQuery directly, without using intermediary files, PubSub, or other Google services?

The key here is the "batch" mode: I do not want to use streaming API as it costs a lot. I know there are other ways to do batch inserts using Dataflow, Google Cloud Storage, etc. - I am not interested in those, I need to do batch inserts programmatically for my use case.

I was hoping to use the REST batch API but it looks like it is deprecated now: https://cloud.google.com/bigquery/batch

Alternatives that are pointed to by the docs are:

Create an BatchRequest object from this Google API client instance.

Sample usage:

 client.batch(httpRequestInitializer)
 .queue(...)
 .queue(...)
 .execute();

Is this API using the batch mode, not streaming one, and is the right way to go ?

thank you!

question from:https://stackoverflow.com/questions/66056242/how-to-batch-insert-data-into-google-bigquery-from-a-java-service

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

The "batch" version of writing data is called a "load job" in the Java client library. The bigquery.writer method creates an object which can be used to write data bytes as a batch load job. Set the format options based on the type of file you'd like to serialize to.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

public class LoadLocalFile {

  public static void main(String[] args) throws IOException, InterruptedException {
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    Path csvPath = FileSystems.getDefault().getPath(".", "my-data.csv");
    loadLocalFile(datasetName, tableName, csvPath, FormatOptions.csv());
  }

  public static void loadLocalFile(
      String datasetName, String tableName, Path csvPath, FormatOptions formatOptions)
      throws IOException, InterruptedException {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
      TableId tableId = TableId.of(datasetName, tableName);

      WriteChannelConfiguration writeChannelConfiguration =
          WriteChannelConfiguration.newBuilder(tableId).setFormatOptions(formatOptions).build();

      // The location and JobName must be specified; other fields can be auto-detected.
      String jobName = "jobId_" + UUID.randomUUID().toString();
      JobId jobId = JobId.newBuilder().setLocation("us").setJob(jobName).build();

      // Imports a local file into a table.
      try (TableDataWriteChannel writer = bigquery.writer(jobId, writeChannelConfiguration);
          OutputStream stream = Channels.newOutputStream(writer)) {

        // This example writes CSV data from a local file,
        // but bytes can also be written in batch from memory.
        // In addition to CSV, other formats such as
        // Newline-Delimited JSON (https://jsonlines.org/) are
        // supported.
        Files.copy(csvPath, stream);

      }

      // Get the Job created by the TableDataWriteChannel and wait for it to complete.
      Job job = bigquery.getJob(jobId);
      Job completedJob = job.waitFor();
      if (completedJob == null) {
        System.out.println("Job not executed since it no longer exists.");
        return;
      } else if (completedJob.getStatus().getError() != null) {
        System.out.println(
            "BigQuery was unable to load local file to the table due to an error: 
"
                + job.getStatus().getError());
        return;
      }

      // Get output status
      LoadStatistics stats = job.getStatistics();
      System.out.printf("Successfully loaded %d rows. 
", stats.getOutputRows());
    } catch (BigQueryException e) {
      System.out.println("Local file not loaded. 
" + e.toString());
    }
  }
}

Resources:


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...