Original title: How to process logically related rows after ItemReader in SpringBatch?



  1. The first 10 rows belong to student A

  2. The next 5 rows belong to student B.

  3. The remaining 10 rows belong to student C.

I want to aggregate them logically, say by studentId, and flatten them so that I end up with one row per student.



  1. Send 5 rows to the Processor (which will aggregate them or do any business logic I tell it to).
  2. After processing, it will write 5 rows.
  3. Then it will do it again for the next 5 rows and so on.


I personally do not like that.

  1. What is the best practice to handle a situation like this in Spring Batch?


Sometimes I feel that it would be much easier to write a regular Spring JDBC main program, where I have full control over what I want to do. However, I wanted to take advantage of the job repository's state monitoring of the job, the ability to restart and skip, job and step listeners....

My Spring Batch Code

My module-context.xml

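<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">

    <description>Example job to get you started. It provides a skeleton for a typical batch application.</description>

    <batch:job id="job1">
        <batch:step id="step1">
            <batch:tasklet transaction-manager="transactionManager" start-limit="100">
                <!-- processor and writer refer to the beans defined below;
                     commit-interval="5" is an assumption based on the 5-row chunks described above -->
                <batch:chunk reader="attendanceItemReader"
                             processor="attendanceProcessor"
                             writer="attendanceItemWriter"
                             commit-interval="5"/>
            </batch:tasklet>
        </batch:step>
    </batch:job>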


    <bean id="attendanceItemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader">
        <property name="dataSource">
            <ref bean="sourceDataSource"/>
        </property>
        <property name="sql"
                  value="select s.student_name ,s.student_id ,fas.attendance_days ,fas.attendance_value from K12INTEL_DW.ftbl_attendance_stumonabssum fas inner join k12intel_dw.dtbl_students s on fas.student_key = s.student_key inner join K12INTEL_DW.dtbl_schools ds on fas.school_key = ds.school_key inner join k12intel_dw.dtbl_school_dates dsd on fas.school_dates_key = dsd.school_dates_key where dsd.rolling_local_school_yr_number = 0 and ds.school_code = ? and s.student_activity_indicator = 'Active' and fas.LOCAL_GRADING_PERIOD = 'G1' and s.student_current_grade_level = 'Gr 9' order by s.student_id"/>
        <property name="preparedStatementSetter" ref="attendanceStatementSetter"/>
        <property name="rowMapper" ref="attendanceRowMapper"/>
    </bean>

    <bean id="attendanceStatementSetter" class="edu.kdc.visioncards.preparedstatements.AttendanceStatementSetter"/>

    <bean id="attendanceRowMapper" class="edu.kdc.visioncards.rowmapper.AttendanceRowMapper"/>

    <bean id="attendanceProcessor" class="edu.kdc.visioncards.AttendanceProcessor" />  

    <bean id="attendanceItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
        <property name="resource" value="file:target/outputs/passthrough.txt"/>
        <property name="lineAggregator">
            <bean class="org.springframework.batch.item.file.transform.PassThroughLineAggregator"/>
        </property>
    </bean>

</beans>


My supporting classes for the Reader.

A PreparedStatementSetter

package edu.kdc.visioncards.preparedstatements;

import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.springframework.jdbc.core.PreparedStatementSetter;

public class AttendanceStatementSetter implements PreparedStatementSetter {

    public void setValues(PreparedStatement ps) throws SQLException {

        // Bind the single "?" placeholder (ds.school_code) in the reader's SQL
        ps.setInt(1, 7);
    }
}



package edu.kdc.visioncards.rowmapper;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

import edu.kdc.visioncards.dto.AttendanceDTO;

public class AttendanceRowMapper<T> implements RowMapper<AttendanceDTO> {

    public static final String STUDENT_NAME = "STUDENT_NAME";
    public static final String STUDENT_ID = "STUDENT_ID";
    public static final String ATTENDANCE_DAYS = "ATTENDANCE_DAYS";
    public static final String ATTENDANCE_VALUE = "ATTENDANCE_VALUE";

    public AttendanceDTO mapRow(ResultSet rs, int rowNum) throws SQLException {

        AttendanceDTO dto = new AttendanceDTO();

        return dto;
    }
}


package edu.kdc.visioncards;

import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.item.ItemProcessor;

import edu.kdc.visioncards.dto.AttendanceDTO;

public class AttendanceProcessor implements ItemProcessor<AttendanceDTO, Map<Integer, AttendanceDTO>> {

    private Map<Integer, AttendanceDTO> map = new HashMap<Integer, AttendanceDTO>();

    public Map<Integer, AttendanceDTO> process(AttendanceDTO dto) throws Exception {

        Integer studentId = Integer.valueOf(dto.getStudentId());

        if (map.containsKey(studentId)) {
            // Student already seen: add this row's values to the stored totals
            AttendanceDTO attDto = map.get(studentId);
            attDto.setAttDays(attDto.getAttDays() + dto.getAttDays());
            attDto.setAttValue(attDto.getAttValue() + dto.getAttValue());
        } else {
            // First row for this student: store it as the starting total
            map.put(studentId, dto);
        }
        return map;
    }
}


My concerns from code above

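In the processor I set up a HashMap, and as I process each row I check whether I already have that student in the Map. If I do not, I add it. If it is already there, I grab it, take the values I care about, and add them to the values of the row I am currently processing.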

After that, Spring Batch Framework writes to a File according to my configuration

My question is as follows:

  1. I do not want it to go to the writer yet. I want to process all the remaining rows first. How do I keep the Map that I have created in memory for the next set of rows that need to go through this same Processor? Every time a row is processed through AttendanceProcessor, the Map is initialized. Should I put the Map initialization in a static block?

In my application I created a CollectingJdbcCursorItemReader that extends the standard JdbcCursorItemReader and performs exactly what you need. Internally it uses my CollectingRowMapper: an extension of the standard RowMapper that maps multiple related rows to one object.


import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.batch.item.ReaderNotOpenException;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.jdbc.core.RowMapper;

/**
 * A JdbcCursorItemReader that uses a {@link CollectingRowMapper}.
 * Like the superclass this reader is not thread-safe.
 *
 * @author Pino Navato
 */
public class CollectingJdbcCursorItemReader<T> extends JdbcCursorItemReader<T> {

    private CollectingRowMapper<T> rowMapper;
    private boolean firstRead = true;

    /**
     * Accepts a {@link CollectingRowMapper} only.
     */
    @Override
    public void setRowMapper(RowMapper<T> rowMapper) {
        this.rowMapper = (CollectingRowMapper<T>)rowMapper;
        // The superclass also needs the mapper: super.readCursor() delegates to it
        super.setRowMapper(rowMapper);
    }

    /**
     * Read next row and map it to item.
     */
    @Override
    protected T doRead() throws Exception {
        if (rs == null) {
            throw new ReaderNotOpenException("Reader must be open before it can be read.");
        }

        try {
            if (firstRead) {
                if (!rs.next()) {  //Subsequent calls to next() will be executed by rowMapper
                    return null;
                }
                firstRead = false;
            } else if (!rowMapper.hasNext()) {
                return null;
            }
            T item = readCursor(rs, getCurrentItemCount());
            return item;
        }
        catch (SQLException se) {
            throw getExceptionTranslator().translate("Attempt to process next row failed", getSql(), se);
        }
    }

    @Override
    protected T readCursor(ResultSet rs, int currentRow) throws SQLException {
        T result = super.readCursor(rs, currentRow);
        return result;
    }
}

You can use it just like the classic JdbcCursorItemReader: the only requirement is that you provide it a CollectingRowMapper instead of the classic RowMapper.
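The CollectingRowMapper itself is not shown here, so the following is only a rough plain-Java sketch of the idea such a mapper relies on: keep consuming rows while the key stays the same, and hold on to the first row of the next group as a lookahead. It assumes the rows arrive sorted by student id (as the `order by s.student_id` in the question guarantees). `Row`, `GroupingReader` and the `Iterator` standing in for the `ResultSet` are hypothetical names for illustration, not part of Spring Batch or of my actual classes.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class GroupingReaderSketch {

    /** Hypothetical stand-in for one attendance row. */
    static final class Row {
        final int studentId;
        final int days;

        Row(int studentId, int days) {
            this.studentId = studentId;
            this.days = days;
        }
    }

    /** Returns one aggregated Row per run of equal studentId, or null when the input is exhausted. */
    static final class GroupingReader {
        private final Iterator<Row> cursor; // stands in for the JDBC cursor
        private Row lookahead;              // first row of the next group, already taken from the cursor

        GroupingReader(Iterator<Row> cursor) {
            this.cursor = cursor;
            this.lookahead = cursor.hasNext() ? cursor.next() : null;
        }

        Row read() {
            if (lookahead == null) {
                return null; // nothing left to read
            }
            int id = lookahead.studentId;
            int total = lookahead.days;
            lookahead = null;
            while (cursor.hasNext()) {
                Row next = cursor.next();
                if (next.studentId != id) {
                    lookahead = next; // belongs to the next group, keep it for the next call
                    break;
                }
                total += next.days;
            }
            return new Row(id, total);
        }
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row(1, 2), new Row(1, 3), new Row(2, 4), new Row(3, 1), new Row(3, 1));
        GroupingReader reader = new GroupingReader(rows.iterator());
        for (Row r = reader.read(); r != null; r = reader.read()) {
            System.out.println(r.studentId + " -> " + r.days);
        }
    }
}
```

Because the grouping happens while reading, the processor and writer only ever see one item per student, and the commit interval counts students rather than raw rows.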

I always follow this pattern:

  1. I give my reader "step" scope, and in @PostConstruct I fetch the results and put them in a Map
  2. In the processor, I convert the associatedCollection into a writable list, and pass that list on
  3. In the ItemWriter, I persist the writable item(s), depending on the case
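A minimal plain-Java sketch of that pattern, under the assumption that the whole result set fits in memory. `Row`, `PreloadingReader` and `process` are hypothetical names used only for illustration; in a real job the reader would implement Spring Batch's ItemReader, whose `read()` likewise returns null once the input is exhausted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PreloadingReaderSketch {

    /** Hypothetical stand-in for one attendance row. */
    static final class Row {
        final int studentId;
        final int days;

        Row(int studentId, int days) {
            this.studentId = studentId;
            this.days = days;
        }
    }

    /** Step 1: load everything up front, grouped by student; hand out one group per read(). */
    static final class PreloadingReader {
        private final Iterator<List<Row>> groups;

        PreloadingReader(List<Row> allRows) {
            // LinkedHashMap keeps students in first-seen order; the input need not be sorted
            Map<Integer, List<Row>> byStudent = new LinkedHashMap<>();
            for (Row row : allRows) {
                byStudent.computeIfAbsent(row.studentId, id -> new ArrayList<>()).add(row);
            }
            this.groups = byStudent.values().iterator();
        }

        List<Row> read() {
            return groups.hasNext() ? groups.next() : null;
        }
    }

    /** Step 2: flatten one student's rows into a single writable row. */
    static Row process(List<Row> group) {
        int total = 0;
        for (Row row : group) {
            total += row.days;
        }
        return new Row(group.get(0).studentId, total);
    }

    public static void main(String[] args) {
        PreloadingReader reader = new PreloadingReader(Arrays.asList(
                new Row(1, 2), new Row(2, 4), new Row(1, 3)));
        // Step 3 (the writer) is reduced to a println here
        for (List<Row> group = reader.read(); group != null; group = reader.read()) {
            Row flat = process(group);
            System.out.println(flat.studentId + " -> " + flat.days);
        }
    }
}
```

The trade-off is memory for simplicity: nothing has to detect group boundaries or reader exhaustion, but the whole input is held in the Map for the duration of the step.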


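If the students are ordered, there is no need for a list/map; you can work with a single student object. The processor keeps the "current" item and aggregates into it until a new "current" item appears (read: id change).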



  • the processor needs to know when the reader is exhausted
  • it's hard to get this working with an arbitrary commit rate and an "id" concept: if you aggregate items that are somehow identical, the processor just can't know whether the currently processed item is the last one
  • basically the use case is either solved completely at reader level or at writer level (see other answer)
private SimpleItem currentItem;
private StepExecution stepExecution;

public SimpleItem process(SimpleItem newItem) throws Exception {
    // returning null filters the item, so nothing is written until a group is complete
    SimpleItem returnItem = null;

    if (currentItem == null) {
        currentItem = new SimpleItem(newItem.getId(), newItem.getValue());
    } else if (currentItem.getId() == newItem.getId()) {
        // aggregate somehow
        String value = currentItem.getValue() + newItem.getValue();
        currentItem = new SimpleItem(currentItem.getId(), value);
    } else {
        // "clone"/copy currentItem
        returnItem = new SimpleItem(currentItem.getId(), currentItem.getValue());
        // replace currentItem
        currentItem = newItem;
    }

    // reader exhausted?
    if (stepExecution.getExecutionContext().containsKey("readerExhausted")
            && (Boolean)stepExecution.getExecutionContext().get("readerExhausted")
            && currentItem.getId() == stepExecution.getExecutionContext().getInt("lastItemId")) {
        returnItem = new SimpleItem(currentItem.getId(), currentItem.getValue());
    }

    return returnItem;
}



  • an ItemWriter which checks the list of items for an id change
  • before the change, the items are stored in a temporary datastore(2) (List, Map, whatever) and are not written out
  • when the id changes, the aggregating/flattening business code runs on the items in the datastore and one item is written; the datastore can then be reused for the next items with the next id
  • this concept needs a reader which tells the step "I'm exhausted", so that the temporary datastore is properly flushed at the end of the items (file/database)


public void write(List<? extends SimpleItem> items) throws Exception {

    // setup with first sharedId at startup
    if (currentId == null){
        currentId = items.get(0).getSharedId();
    }

    // check for change of sharedId in input
    // keep items in temporary dataStore until id change of input
    // call delegate if there is an id change or if the reader is exhausted
    for (SimpleItem item : items) {
        // already known sharedId, add to tempData
        // (Objects.equals instead of ==, since currentId is a nullable object)
        if (java.util.Objects.equals(item.getSharedId(), currentId)) {
            tempData.add(item);
        } else {
            // or new sharedId, write tempData, empty it, keep new id
            // the delegate does the flattening/aggregating
            delegate.write(tempData);
            tempData.clear();
            currentId = item.getSharedId();
            tempData.add(item);
        }
    }

    // check if reader is exhausted, flush tempData
    // (Boolean.TRUE.equals avoids a NullPointerException while the flag is not set yet)
    if (Boolean.TRUE.equals(stepExecution.getExecutionContext().get("readerExhausted"))
            && tempData.size() > 0) {
        delegate.write(tempData);
        // optional delegate.clear(); 
    }
}

(1) assuming the items are ordered by id (the id can be composite too)

(2) a HashMap Spring bean, for thread safety
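Both the processor and the writer variant depend on a reader that announces when it has handed out its last item. The reader itself is not shown, so here is only a hedged plain-Java sketch of how such a flag could be set. The keys "readerExhausted" and "lastItemId" are the ones used in the code above; `FlaggingReader`, the `Iterator` of ids and the plain `Map` standing in for the step's ExecutionContext are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

class ExhaustionFlagSketch {

    /** Hands out ids one by one and records in the context when the last one has been returned. */
    static final class FlaggingReader {
        private final Iterator<Integer> ids;        // stands in for the real item source
        private final Map<String, Object> context;  // stands in for the step's ExecutionContext

        FlaggingReader(Iterator<Integer> ids, Map<String, Object> context) {
            this.ids = ids;
            this.context = context;
        }

        Integer read() {
            if (!ids.hasNext()) {
                return null; // nothing left to read
            }
            Integer id = ids.next();
            if (!ids.hasNext()) {
                // This was the last item: publish the flag together with its id
                context.put("readerExhausted", Boolean.TRUE);
                context.put("lastItemId", id);
            }
            return id;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> context = new HashMap<>();
        FlaggingReader reader = new FlaggingReader(Arrays.asList(1, 1, 2).iterator(), context);
        for (Integer id = reader.read(); id != null; id = reader.read()) {
            System.out.println(id + " exhausted=" + context.containsKey("readerExhausted"));
        }
    }
}
```

In a chunk-oriented step the whole chunk is read before it is processed, so the flag can already be set while earlier items of the final chunk are still going through the processor. That is why the processor above also compares against "lastItemId" instead of trusting the flag alone.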


