
python - GCP dataflow, argparse.ArgumentError using DataflowRunner but not DirectRunner

A Dataflow pipeline with runtime arguments runs fine using DirectRunner, but raises an argparse.ArgumentError when switched to DataflowRunner:

  File "/home/user/miniconda3/lib/python3.8/site-packages/apache_beam/options/pipeline_options.py", line 124, in add_value_provider_argument
    self.add_argument(*args, **kwargs)
  File "/home/user/miniconda3/lib/python3.8/argparse.py", line 1386, in add_argument
    return self._add_action(action)
  File "/home/user/miniconda3/lib/python3.8/argparse.py", line 1749, in _add_action
    self._optionals._add_action(action)
  File "/home/user/miniconda3/lib/python3.8/argparse.py", line 1590, in _add_action
    action = super(_ArgumentGroup, self)._add_action(action)
  File "/home/user/miniconda3/lib/python3.8/argparse.py", line 1400, in _add_action
    self._check_conflict(action)
  File "/home/user/miniconda3/lib/python3.8/argparse.py", line 1539, in _check_conflict
    conflict_handler(action, confl_optionals)
  File "/home/user/miniconda3/lib/python3.8/argparse.py", line 1548, in _handle_conflict_error
    raise ArgumentError(action, message % conflict_string)
argparse.ArgumentError: argument --bucket_input: conflicting option string: --bucket_input

Here is how the argument is defined and used:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# CreateGcsPCol is a custom DoFn imported from a local sub-module.

class CustomPipelineOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--bucket_input',
            default="device-file-dev",
            help='Raw device file bucket')

pipeline_options = PipelineOptions()  # parsed from the command line
pipeline = beam.Pipeline(options=pipeline_options)

custom_options = pipeline_options.view_as(CustomPipelineOptions)

_ = (
    pipeline
    | 'Initiate dataflow' >> beam.Create(["Start"])
    | 'Create P collection with file paths' >> beam.ParDo(
        CreateGcsPCol(input_bucket=custom_options.bucket_input))
)
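
For reference, the pipeline is launched roughly like this (a sketch; the script name, project, and bucket are placeholders):

# Works:
python main.py --runner DirectRunner --bucket_input device-file-dev

# Fails with the ArgumentError above:
python main.py \
    --runner DataflowRunner \
    --project my-gcp-project \
    --region us-central1 \
    --temp_location gs://my-bucket/tmp \
    --bucket_input device-file-dev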

Note that this only happens with DataflowRunner. Does anyone know how to solve it? Thanks a lot.

Question from: https://stackoverflow.com/questions/65945352/gcp-dataflow-argparse-argumenterror-using-dataflowrunner-but-not-directrunner


1 Answer


Copying the answer from the comment here:

The error is caused by importing a local Python sub-module via a relative path. With the DirectRunner the relative import works because everything runs on the local machine. The DataflowRunner, however, executes on different machines (GCE instances), where the relative path does not resolve the same way; the likely effect is that the module defining CustomPipelineOptions is loaded twice under different names, so --bucket_input gets registered with argparse twice, producing the conflict. The problem was solved by installing both the Dataflow pipeline module and the sub-module as packages, and then importing from the installed sub-module instead of using the relative path.
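
One common way to achieve this (a minimal sketch, assuming a standard Beam project layout; the package and sub-module names below are hypothetical) is to add a setup.py at the pipeline root and point Beam at it with the --setup_file pipeline option, so the DataflowRunner builds and installs the package on each worker, and then switch to absolute imports:

# setup.py -- packages the pipeline and its local sub-modules for the workers.
import setuptools

setuptools.setup(
    name='my_dataflow_pipeline',          # hypothetical package name
    version='0.0.1',
    packages=setuptools.find_packages(),  # picks up the local sub-module
)

# In the pipeline code, replace the relative import, e.g.
#     from .gcs_utils import CreateGcsPCol      # breaks on Dataflow workers
# with an import from the installed package:
#     from my_dataflow_pipeline.gcs_utils import CreateGcsPCol

The pipeline is then launched with --setup_file ./setup.py added to the usual Dataflow arguments.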

