Любая помощь приветствуется!
Вот мои ParDos и часть определения моего конвейера:
Код: Выделить всё
class FetchDBCredentials(beam.DoFn):
def process(self, element):
# get credentials here
cred = {
'username': 'username'
'password': 'password'
}
yield cred
with Pipeline(DataflowRunner(), options=pipeline_options) as p:
credentials = (
p
| 'Create' >> beam.Create([None])
| 'FetchDBCredentials' >> beam.ParDo(FetchDBCredentials())
)
username = beam.pvalue.AsSingleton(credentials | 'ExtractUsername' >> beam.Map(lambda cred: cred['username']))
password = beam.pvalue.AsSingleton(credentials | 'ExtractPassword' >> beam.Map(lambda cred: cred['password']))
rows = (
p
| 'ReadFromJdbc' >> ReadFromJdbc(
table_name='test',
driver_class_name='org.postgresql.Driver',
jdbc_url=jdbc_url,
username=username,
password=password,
query='''
SELECT
employee_id,
user_id
FROM db.test
'''
)
)
Подробнее здесь: https://stackoverflow.com/questions/793 ... -connector
Мобильная версия