Пользовательское приложение потока данных golang завислоApache

Ответить
Anonymous
 Пользовательское приложение потока данных golang зависло

Сообщение Anonymous »

Я создаю простое приложение, которое загружает некоторые конфигурации из MongoDB и запускает конвейер Apache с помощью средства запуска потока данных, например:

Код: Выделить всё

func main(){
client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://mongodb:27017"))
if err != nil {
panic(err)
}

beam.Init()
beamPipeline := beam.NewPipeline()
scope := beamPipeline.Root()

collection := textio.Read(scope, "gs://mybucket/input.txt")

ctx := context.Background()
var transResult beam.PCollection
// run ParDo functions
textio.Write(scope, "gs://mybucket/output.txt", collection)

if err := beamx.Run(ctx, beamPipeline); err != nil {
panic(errMsg)
}
}
Однако, когда я запускаю этот лучевой конвейер Apache с помощью средства управления потоками данных, рабочий процесс не запускается, потому что рабочий пытается запустить код базы данных mongo:
Изображение

PS: этот код работает с использованием прямого бегуна.
Мои вопросы:
  • Почему средство выполнения потока данных пытается запустить код за пределами конвейера?
  • Если работник потока данных запускает весь код внутри основной функции, как создать собственное приложение для запуска заданий потока данных? например, запуск задания, когда сообщение приходит из pub/sub, или загрузка пользовательских параметров преобразования из mongodb... как в этом примере


Подробнее здесь: https://stackoverflow.com/questions/793 ... on-stucked
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Apache»