Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
383 views
in Technique[技术] by (71.8m points)

python - Apache Beam Pipeline to read from REST API runs locally but not on Dataflow

I've been trying to get my pipeline to run with a classic template on Dataflow.

The pipeline is supposed to read runtime parameters from_date and to_dateand pass those on to a REST API. The answer coming back from the API should then be written into a bigquery table.

It runs without any errors on Dataflow but my data does simply not appear in the gbq table that is the data sink. When I execute it locally, it works like a charm: no errors and I can write to gbq using a service account and local files.

I suspect that I misunderstand what is available to the pipeline steps in the different environments and that no data is actually passed along the pipeline.

The requests package might not be available on the Dataflow runner, but I would expect an error message...

When I tried to run it on dataflow but wrote to text (commented line below), a folder was created on cloud storage but no file appeared inside.

Also I suspect that this is why I cannot get any of my debug messages to show up in the monitoring UI.

Help much appreciated - here is my pipeline code:

#!/usr/bin/env python
# coding: utf-8

import logging
import argparse

# Beam/Dataflow related imports
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, GoogleCloudOptions
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options.value_provider import RuntimeValueProvider

# Handling of API calls
import requests
import json


class get_api_data(beam.DoFn):
    def __init__(self):
        logging.debug("fetching api data")

    def process(self, dates):

        bearer_token = "api_secret"

        from_date = str(dates[0])
        to_date = str(dates[1])

        logging.debug("Now fetching from ", from_date, " to ", to_date)

        payload = {'stuff': 'stuff',
                   'from': from_date,
                   'to': to_date,
                   'other_stuff': 'other_stuff'
                   }

        payload = json.dumps(payload)

        headers = {
                  'Content-Type': 'application/json',
                  'Authorization': 'Bearer ' + bearer_token,
                  'Accept': 'application/json',
                  'Content-Type': 'application/json'
                  }

        r = requests.post("api_url", data= payload, headers=headers)

        return [line.decode("utf-8") for line in r.iter_lines()][1:]


class Split(beam.DoFn):
    def process(self, element):

        try:
            pid, date, another_kpi, yet_another_kpi = element.split(",")
            logging.debug(" | ".join(element.split(",")) )
        except ValueError:
            logging.error(" | ".join(element.split(",")) )

        return [{
            'pid':str(pid),
            'date':str(date),
            'another_kpi':int(another_kpi),
            'yet_another_kpi':float(yet_another_kpi)
        }]


class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        
        parser.add_value_provider_argument('--to_date', dest='to_date', type=str) 
        parser.add_value_provider_argument('--from_date', dest='from_date', type=str)


def run(argv=None):
  
    parser = argparse.ArgumentParser()
    path_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    print("Google Cloud Options: ", pipeline_options.view_as(GoogleCloudOptions))

    from_date = pipeline_options.view_as(UserOptions).from_date
    to_date = pipeline_options.view_as(UserOptions).to_date

    logging.debug("Data from ", from_date, " to ", to_date)

    table_spec = bigquery.TableReference(
        projectId='my_project',
        datasetId='my_dataset',
        tableId='my_table')

    table_schema = 'pid:STRING, date:STRING, another_kpi:INT64, yet_another_kpi:FLOAT64'

    p1 = beam.Pipeline(options=pipeline_options)

    ingest_data = (
        p1
        | 'pass dates' >> beam.Create([[from_date, to_date]])
        | 'fetch API data' >> beam.ParDo(get_api_data()) 
        | 'split records' >> beam.ParDo(Split())
        | 'write into gbq' >> beam.io.gcp.bigquery.WriteToBigQuery(table = table_spec, schema=table_schema, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE ,create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
        #| 'write to text' >> beam.io.WriteToText("./test_v2.csv")
    )

    result = p1.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.DEBUG)
    run()

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

Using the ValueProvider in combination with Create is apparently forbidden, although I did not get an error message.

I solved it by using:

class OutputValueProviderFn(beam.DoFn):
    def __init__(self, vp1, vp2):
       self.vp1 = vp1
       self.vp2 = vp2

    def process(self, unused_elm):
        logging.info("Providing dates: ", self.vp1.get(), self.vp2.get() )
        yield [self.vp1.get(), self.vp2.get()]
...

from_date = pipeline_options.view_as(UserOptions).from_date
to_date = pipeline_options.view_as(UserOptions).to_date

pipel = (
        p1
        | 'Start Pipeline' >> beam.Create([None])
        | 'Read from and to date' >> beam.ParDo(OutputValueProviderFn(from_date, to_date))
        | 'fetch API data' >> beam.ParDo(get_api_data())
        ...
    )

Inspiration here


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...