Динамическое планирование передачи Flowfile с использованием Quartz в процессорах NiFiJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Динамическое планирование передачи Flowfile с использованием Quartz в процессорах NiFi

Сообщение Anonymous »


I have been assigned a requirement in my project i.e. is to implement a Quartz scheduler with in a custom NiFi processor.

At the initial stage of work, the functionality I need to develop is that to schedule the transfer of the flowfile from its preceding processor to its defined processor at a scheduled time.

I will the receive the scheduling instructions as a dynamic parameter and need to schedule it accordingly using quartz library.

So at the base level of development I am hardcoding the scheduling expressions (CRON expression).

-> What I have tried
  • I have created a custom processor and have added the quartz library.
  • Then created a class (ScheduleFlowFile) to implement the org.quartz.Job and have implemented the abstract method execute which is used to execute the logic at the scheduled time. Within this class I am getting the reference to the ProcessSession, ProcessContext variables of the custom NiFi processor that I have created using JobDataMap while defining a JobDetail which is to be scheduled.

package com.chellyvishal.scheduleQuartz; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.quartz.Job; import org.quartz.JobDataMap; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; public class ScheduleFlowFile implements Job { @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap(); ProcessSession session = (ProcessSession) dataMap.get("processSession"); ProcessContext context = (ProcessContext) dataMap.get("processContext"); Relationship success = (Relationship) dataMap.get("successRelationship"); FlowFile getFlowFile = session.get(); session.transfer(getFlowFile, success); } }
  • Inside MyProcessors.class with in the onTrigger method I have created the JobDetail and the trigger to run them at the instructed schedule
public class MyProcessor extends AbstractProcessor { public static final Relationship SUCCESS = new Relationship.Builder() .name("success") .description("FlowFiles that have been successfully processed") .build(); public static final Relationship FAILURE = new Relationship.Builder() .name("failure") .description("FlowFiles that have failed processing") .build(); . . . . . . @Override public void onTrigger(final ProcessContext context, final ProcessSession session) { FlowFile flowFile = session.get(); if ( flowFile == null ) { return; } // TODO implement try { scheduler = schedulerFactory.getScheduler(); } catch (SchedulerException e) { throw new RuntimeException(e); } long currentTime = System.currentTimeMillis(); String uniqueJobIdentifier = "sampleJob_" + currentTime; String cronExpression = "0/10 0/2 * ? * * *"; JobDetail jobDetail = newJob(ScheduleFlowFile.class) .withIdentity(uniqueJobIdentifier, "newGroupFirst") .build(); jobDetail.getJobDataMap().put("processSession", session); jobDetail.getJobDataMap().put("processContext", context); jobDetail.getJobDataMap().put("successRelationship", SUCCESS); //A simple trigger which runs once after 5 seconds from the moment of getting initiated Trigger trigger = newTrigger() .withIdentity("newTrigger"+currentTime, "newGroupFirst") .startNow() .withSchedule(simpleSchedule() .withIntervalInSeconds(5) .withRepeatCount(0)) .build(); //A CRON trigger which schedules the job according the CRON expression Trigger cronTrigger = TriggerBuilder.newTrigger() .withIdentity("newCRONTrigger"+currentTime", "newGroupFirst") .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)) .build(); try { scheduler.scheduleJob(jobDetail, trigger); } catch (SchedulerException e) { throw new RuntimeException(e); } }
Изображение


And I am stuck here and not getting an idea how to deal with it


Источник: https://stackoverflow.com/questions/780 ... processors
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»