How can we share data between the different steps of a Job in Spring Batch?

Digging into Spring Batch, I'd like to know: how can we share data between the different steps of a Job?

Can we use JobRepository for this? If yes, how can we do that?

Is there any other way of doing/achieving the same?

Answers

The job repository is used indirectly for passing data between steps. Jean-Philippe is right that the best way to do that is to put data into the StepExecutionContext and then use the verbosely named ExecutionContextPromotionListener to promote the step execution context keys to the JobExecutionContext.

It's helpful to note that there is a listener for promoting JobParameter keys to a StepExecutionContext as well (the even more verbosely named JobParameterExecutionContextCopyListener); you will find that you use these a lot if your job steps aren't completely independent of one another.

Otherwise you're left passing data between steps using even more elaborate schemes, like JMS queues or (heaven forbid) hard-coded file locations.

As to the size of data that is passed in the context, I would also suggest that you keep it small (but I haven't any specifics on the

From a step, you can put data into the StepExecutionContext. Then, with a listener, you can promote data from StepExecutionContext to JobExecutionContext.

This JobExecutionContext is available in all the following steps.
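
To make the promotion step concrete, here is a minimal plain-Java model of what such a listener does once a step has finished. It is a sketch, not Spring Batch's own code: the PromotionModel class and its promote method are hypothetical, and plain maps stand in for the two execution contexts. The strict flag is an assumption modeled on the listener's setStrict option, where a missing key is treated as an error.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a promotion listener; plain maps play the
// role of the step and job execution contexts.
class PromotionModel {

    // Copies the listed keys from the step context into the job context.
    // With strict == true a missing key is an error; otherwise it is skipped.
    static void promote(Map<String, Object> stepContext,
                        Map<String, Object> jobContext,
                        List<String> keys,
                        boolean strict) {
        for (String key : keys) {
            if (stepContext.containsKey(key)) {
                jobContext.put(key, stepContext.get(key));
            } else if (strict) {
                throw new IllegalArgumentException(
                        "Key [" + key + "] not found in the step context");
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> stepContext = new HashMap<>();
        Map<String, Object> jobContext = new HashMap<>();
        stepContext.put("fileName", "input.csv"); // written by the first step
        stepContext.put("scratch", 42);           // step-local, not promoted

        promote(stepContext, jobContext, List.of("fileName"), true);

        System.out.println(jobContext); // prints {fileName=input.csv}
    }
}
```

Only the keys named explicitly cross over to the job level, which is why step-local scratch values do not leak into later steps.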

Be careful: the data must be short. These contexts are saved in the JobRepository by serialization and the length is limited (2500 chars if I remember well).

So these contexts are good to share strings or simple values, but not for sharing collections or huge amounts of data.
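
One way to apply the "keep it small" rule is to measure a value before putting it into a context. The sketch below is plain Java and makes several assumptions: ContextSizeGuard, serializedSize, and fitsInContext are hypothetical names, the 2500-byte budget merely echoes the figure recalled above rather than a verified limit, and Java serialization is only a rough proxy for however the repository actually serializes the context.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;

// Hypothetical helper: rejects values that are too large to share via a context.
class ContextSizeGuard {

    // Assumed budget; adjust to whatever your repository really allows.
    static final int MAX_BYTES = 2500;

    // Size of the value in bytes when written with Java serialization.
    static int serializedSize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    static boolean fitsInContext(Serializable value) {
        return serializedSize(value) <= MAX_BYTES;
    }

    public static void main(String[] args) {
        ArrayList<String> bigList = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            bigList.add("customer-" + i);
        }
        System.out.println(fitsInContext("input.csv")); // prints true
        System.out.println(fitsInContext(bigList));     // prints false
    }
}
```

A check like this could run just before the value is put into the step context, so an oversized payload fails fast instead of surfacing as a persistence error later.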

Sharing huge amounts of data is not the philosophy of Spring Batch. Spring Batch is a set of distinct actions, not a huge Business processing unit.

I would say you have 3 options:

  1. Use StepContext and promote it to JobContext so that you have access to it from each step; as noted, you must obey the size limit.
  2. Create a @JobScope bean, add data to that bean, inject it with @Autowired where needed, and use it (the drawback is that it is an in-memory structure, so if the job fails the data is lost, which might cause problems with restartability).
  3. We had larger datasets that needed to be processed across steps (read each line in a CSV and write to the DB, then read from the DB, aggregate, and send to an API), so we decided to model the data in a new table in the same DB as the Spring Batch meta tables, keep the ids in the JobContext, access the data when needed, and delete that temporary table when the job finishes successfully.
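
The third option can be sketched in plain Java as follows. Everything here is an illustrative assumption: StagingStore is a hypothetical in-memory stand-in for the staging table (a real job would back it with JDBC), a plain map stands in for the job ExecutionContext, and the key name stagingBatchId is made up.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for a staging table that holds the large data set.
class StagingStore {
    private final Map<String, List<String>> rowsByBatchId = new HashMap<>();

    // Stores the rows and returns the small id that goes into the context.
    String save(List<String> rows) {
        String batchId = UUID.randomUUID().toString();
        rowsByBatchId.put(batchId, new ArrayList<>(rows));
        return batchId;
    }

    // Returns the rows for the id, or an empty list if there are none.
    List<String> load(String batchId) {
        return rowsByBatchId.getOrDefault(batchId, List.of());
    }

    // Meant to be called once the job has finished successfully.
    void delete(String batchId) {
        rowsByBatchId.remove(batchId);
    }
}

class StagingDemo {
    public static void main(String[] args) {
        StagingStore store = new StagingStore();
        Map<String, Object> jobContext = new HashMap<>(); // stand-in for the job context

        // Step 1: persist the large data set and keep only its id in the context.
        jobContext.put("stagingBatchId", store.save(List.of("row-1", "row-2", "row-3")));

        // Step 2: look the data up again through the id.
        String batchId = (String) jobContext.get("stagingBatchId");
        System.out.println(store.load(batchId).size() + " rows loaded"); // prints 3 rows loaded

        // After a successful run: clean up the staged rows.
        store.delete(batchId);
    }
}
```

The context carries only a short string, so the size limit is never an issue, and because the rows live outside the JVM in the real setup, a restarted job can still find them.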

Here is what I did to save an object which is accessible throughout the steps.

  1. Created a listener for setting the object in the job context
@Component("myJobListener")
public class MyJobListener implements JobExecutionListener {

    // SomeService is a placeholder for whatever collaborator supplies the value
    @Autowired
    private SomeService someService;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        String myValue = someService.getValue();
        jobExecution.getExecutionContext().putString("MY_VALUE", myValue);
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        // nothing to do once the job has finished
    }
}
  2. Defined the listener in the job context
<listeners>
         <listener ref="myJobListener"/>
</listeners>
  3. Consumed the value in a step using the BeforeStep annotation
@BeforeStep
public void initializeValues(StepExecution stepExecution) {
    // read the value that the job listener stored before the job started
    String value = stepExecution.getJobExecution().getExecutionContext().getString("MY_VALUE");
}

You can use a Java Bean Object

  1. Execute one step
  2. Store the result in the Java object
  3. The next step refers to the same Java object to get the result stored by step 1

In this way you can store a huge collection of data if you want.
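
A minimal sketch of such a shared object is shown below. StepResultHolder is a hypothetical class; in a Spring application it would presumably be declared as a bean and injected into the components of both steps. The list is synchronized on the assumption that a step might write from several threads, and the data lives only in memory, so it does not survive a failed or restarted job.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical in-memory holder shared between two steps of the same job.
class StepResultHolder<T> {
    private final List<T> results = Collections.synchronizedList(new ArrayList<>());

    // Called by the first step, for example from its writer.
    void add(T item) {
        results.add(item);
    }

    void addAll(List<? extends T> items) {
        results.addAll(items);
    }

    // Called by the next step; returns a copy so readers never see later writes.
    List<T> snapshot() {
        synchronized (results) {
            return new ArrayList<>(results);
        }
    }

    // Should be called when the job ends so the data does not leak into the next run.
    void clear() {
        results.clear();
    }
}

class StepResultHolderDemo {
    public static void main(String[] args) {
        StepResultHolder<String> holder = new StepResultHolder<>();

        // Step 1 stores its results.
        holder.addAll(List.of("order-1", "order-2"));

        // Step 2 reads what step 1 stored.
        System.out.println(holder.snapshot()); // prints [order-1, order-2]

        holder.clear();
    }
}
```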

You can store data in a simple object, like this:

AnyObject yourObject = new AnyObject();

public Job build(Step step1, Step step2) {
    return jobBuilderFactory.get("jobName")
            .incrementer(new RunIdIncrementer())
            .start(step1)
            .next(step2)
            .build();
}

public Step step1() {
    return stepBuilderFactory.get("step1Name")
            .<Some, Any> chunk(someInteger1)
            .reader(itemReader1())
            .processor(itemProcessor1())
            .writer(itemWriter1(yourObject))
            .build();
}

public Step step2() {
    return stepBuilderFactory.get("step2Name")
            .<Some, Any> chunk(someInteger2)
            .reader(itemReader2())
            .processor(itemProcessor2(yourObject))
            .writer(itemWriter2())
            .build();
}

Just add data to the object in the writer (or any other method) and read it at any stage of the next step.

Another very simple approach, leaving it here for future reference:

class MyTasklet implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) {
        // store the value in the job-level execution context
        getExecutionContext(chunkContext).put("foo", "bar");
        return RepeatStatus.FINISHED;
    }
}

and

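class MyOtherTasklet implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) {
        // read back the value stored by MyTasklet
        Object foo = getExecutionContext(chunkContext).get("foo");
        return RepeatStatus.FINISHED;
    }
}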

getExecutionContext here is:

ExecutionContext getExecutionContext(ChunkContext chunkContext) {
    return chunkContext.getStepContext()
                       .getStepExecution()
                       .getJobExecution()
                       .getExecutionContext();
}     

Put it in a super class, in an interface as a default method, or simply paste in your Tasklets.

Use ExecutionContextPromotionListener:

public class YourItemWriter implements ItemWriter<Object> {
    private StepExecution stepExecution;

    public void write(List<? extends Object> items) throws Exception {
        // Some Business Logic

        // put your data into the step execution context
        // (someObject stands for whatever value this step wants to share)
        ExecutionContext stepContext = this.stepExecution.getExecutionContext();
        stepContext.put("someKey", someObject);
    }

    @BeforeStep
    public void saveStepExecution(final StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }
}

Now you need to add the promotionListener to your step

@Bean
public Step step1() {
    return stepBuilder
            .get("step1")
            .<Company, Company>chunk(10)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .listener(promotionListener())
            .build();
}

@Bean
public ExecutionContextPromotionListener promotionListener() {
    ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
    listener.setKeys(new String[] {"someKey"});
    listener.setStrict(true);
    return listener;
}

Now, in step2, get your data from the job ExecutionContext

public class RetrievingItemWriter implements ItemWriter<Object> {
    private Object someObject;
    public void write(List<? extends Object> items) throws Exception {
        // ...
    }
    @BeforeStep
    public void retrieveInterstepData(StepExecution stepExecution) {
        JobExecution jobExecution = stepExecution.getJobExecution();
        ExecutionContext jobContext = jobExecution.getExecutionContext();
        this.someObject = jobContext.get("someKey");
    }
}

If you are working with tasklets, use the following to read from the job ExecutionContext (the map returned by getJobExecutionContext() is read-only; to put values, go through getStepExecution().getJobExecution().getExecutionContext() instead)

List<YourObject> yourObjects = (List<YourObject>) chunkContext.getStepContext().getJobExecutionContext().get("someKey");

I was given a task to invoke batch jobs one by one. Each job depends on the previous one: the result of the first job is needed to run the next job. I was searching for how to pass the data along after job execution, and I found that ExecutionContextPromotionListener comes in handy.

1) I added a bean for ExecutionContextPromotionListener, like below

@Bean
public ExecutionContextPromotionListener promotionListener()
{
    ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
    listener.setKeys( new String[] { "entityRef" } );
    return listener;
}

2) Then I attached the listener to my step

Step step = builder.faultTolerant()
            .skipPolicy( policy )
            .listener( writer )
            .listener( promotionListener() )
            .listener( skiplistener )
            .stream( skiplistener )
            .build();

3) I added a stepExecution field to my writer implementation and populated it in a @BeforeStep method

@BeforeStep
public void saveStepExecution( StepExecution stepExecution )
{
    this.stepExecution = stepExecution;
}   

4) At the end of my writer, I put the values into the step execution context under the promoted key, like below

lStepContext.put( "entityRef", lMap );

5) After the job execution, I retrieved the values from lExecution.getExecutionContext() and populated the job response with them.

6) From the job response object, I get the values and pass the required ones to the rest of the jobs.

The above code promotes data from a step to the job ExecutionContext using ExecutionContextPromotionListener. It can be done in any step.

Spring Batch creates metadata tables for itself (like batch_job_execution, batch_job_execution_context, batch_step_execution, etc).

And I have tested (using a Postgres DB) that you can have at least 51,428 chars worth of data in one column (batch_job_execution_context.serialized_context). It could be more; that is just how much I tested.

When you are using Tasklets for your step (like class MyTasklet implements Tasklet) and override the execute method in there, you have immediate access to the ChunkContext.

class MyTasklet implements Tasklet {

    @Override
    public RepeatStatus execute(@NonNull StepContribution contribution,
                                @NonNull ChunkContext chunkContext) {
        List<MyObject> myObjects = getObjectsFromSomewhereAndUseThemInNextStep();
        // store the objects in the job-level execution context
        chunkContext.getStepContext().getStepExecution()
                .getJobExecution()
                .getExecutionContext()
                .put("mydatakey", myObjects);
        return RepeatStatus.FINISHED;
    }
}

And now you have another step with a different Tasklet where you can access those objects

class MyOtherTasklet implements Tasklet {

    @Override
    public RepeatStatus execute(@NonNull StepContribution contribution,
                                @NonNull ChunkContext chunkContext) {
        // read back the objects stored by MyTasklet
        List<MyObject> myObjects = (List<MyObject>)
                chunkContext.getStepContext().getStepExecution()
                        .getJobExecution()
                        .getExecutionContext()
                        .get("mydatakey");
        return RepeatStatus.FINISHED;
    }
}

Or, if you don't have a Tasklet and instead have a Reader/Writer/Processor, then

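// The reader bean must be step-scoped (@StepScope) for the
// late-binding expression below to be resolved.
class MyReader implements ItemReader<MyObject> {

    @Value("#{jobExecutionContext['mydatakey']}")
    List<MyObject> myObjects;
    // And now myObjects are available in here

    @Override
    public MyObject read() throws Exception {
        // ... use myObjects here; returning null signals the end of the input
        return null;
    }
}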

Simple solution using Tasklets. No need to access the execution context. I used a map as the data element to move around. (Kotlin code.)

Tasklet

class MyTasklet : Tasklet {

    lateinit var myMap: MutableMap<String, String>

    override fun execute(contribution: StepContribution, chunkContext: ChunkContext): RepeatStatus? {
        myMap.put("key", "some value")
        return RepeatStatus.FINISHED
    }

}

Batch configuration

@Configuration
@EnableBatchProcessing
class BatchConfiguration {

    @Autowired
    lateinit var jobBuilderFactory: JobBuilderFactory

    @Autowired
    lateinit var stepBuilderFactory: StepBuilderFactory

    var myMap: MutableMap<String, String> = mutableMapOf()

    @Bean
    fun jobSincAdUsuario(): Job {
        return jobBuilderFactory
                .get("my-SO-job")
                .incrementer(RunIdIncrementer())
                .start(stepMyStep())    
                .next(stepMyOtherStep())        
                .build()
    }

    @Bean
    fun stepMyStep() = stepBuilderFactory.get("MyTaskletStep")        
        .tasklet(myTaskletAsBean())
        .build()

    @Bean
    fun myTaskletAsBean(): MyTasklet {
        val tasklet = MyTasklet()
        tasklet.myMap = myMap      // collection gets visible in the tasklet
        return tasklet
    }
}

Then in MyOtherStep you can replicate the same idiom seen in MyStep. This other Tasklet will see the data created in MyStep.

Important:

  • tasklets are created via a @Bean fun so that they can use @Autowired.
  • for a more robust implementation, the tasklet should implement InitializingBean with
    override fun afterPropertiesSet() {
        Assert.notNull(myMap, "myMap must be set before calling the tasklet")
    }

As Nenad Bozic said in his 3rd option, use temp tables to share the data between steps. Sharing through the context does much the same thing (it writes to a table and loads the data back in the next step), but if you write into temp tables yourself, you can clean them up at the end of the job.




