I have been assigned a requirement in my project: implement a Quartz scheduler within a custom NiFi processor.
As a first step, I need to schedule the transfer of a flowfile from the preceding processor to its destination processor at a specified time.
I will receive the scheduling instructions as a dynamic parameter and need to schedule the transfer accordingly using the Quartz library.
For now, at this early stage of development, I am hardcoding the scheduling expression (a CRON expression).
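For context, a Quartz CRON expression has six mandatory fields (seconds, minutes, hours, day of month, month, day of week) plus an optional seventh field for the year. Below is a minimal sketch that only splits a hardcoded expression into its named fields; it does not use Quartz or NiFi. The class name `QuartzCronFields`, the method `describe`, and the sample expression are placeholders made up for illustration and are not part of my processor.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative helper only: labels the fields of a Quartz-style CRON expression.
final class QuartzCronFields {

    // Field names in the order Quartz expects them; the seventh (year) is optional.
    private static final String[] FIELD_NAMES = {
        "seconds", "minutes", "hours", "dayOfMonth", "month", "dayOfWeek", "year"
    };

    // Hypothetical hardcoded expression: second 0 of every 5th minute.
    static final String EVERY_FIVE_MINUTES = "0 0/5 * * * ?";

    // Splits the expression on whitespace and maps each part to its field name.
    static Map<String, String> describe(String expression) {
        String[] parts = expression.trim().split("\\s+");
        if (parts.length < 6 || parts.length > 7) {
            throw new IllegalArgumentException(
                "Expected 6 or 7 fields but found " + parts.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < parts.length; i++) {
            fields.put(FIELD_NAMES[i], parts[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        // Print each field name next to its value from the sample expression.
        describe(EVERY_FIVE_MINUTES)
            .forEach((name, value) -> System.out.println(name + " = " + value));
    }
}
```

This check only counts fields; it does not validate the values the way Quartz itself would when the trigger is built.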
-> What I have tried
- I have created a custom processor and added the Quartz library to it.
- Then I created a class (ScheduleFlowFile) that implements org.quartz.Job and its abstract method execute, which runs the logic at the scheduled time. Inside this class I obtain references to the ProcessSession and ProcessContext of my custom NiFi processor through the JobDataMap that I populate when defining the JobDetail to be scheduled.
    package com.chellyvishal.scheduleQuartz;

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.Relationship;
    import org.quartz.Job;
    import org.quartz.JobDataMap;
    import org.quartz.JobExecutionContext;
    import org.quartz.JobExecutionException;

    public class ScheduleFlowFile implements Job {

        @Override
        public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
            JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();
            ProcessSession session = (ProcessSession) dataMap.get("processSession");
            ProcessContext context = (ProcessContext) dataMap.get("processContext");
            Relationship success = (Relationship) dataMap.get("successRelationship");
            FlowFile getFlowFile = session.get();
            session.transfer(getFlowFile, success);
        }
    }
- Inside MyProcessors.class, within the onTrigger method, I have created the JobDetail and the trigger so that the job runs on the given schedule.

This is where I am stuck, and I do not know how to proceed.
Source: https://stackoverflow.com/questions/780 ... processors