Contents
- Architecture — Job, Step & Chunk
- Dependencies & Setup
- Building Your First Job
- CSV to Database — FlatFileItemReader
- ItemProcessor — Transformation & Filtering
- JdbcBatchItemWriter — Bulk Inserts
- Skip & Retry Policies
- Step & Job Listeners
- Partitioned Steps — Parallel Processing
- Launching Jobs — On Demand & Scheduled
Spring Batch organises work in a three-level hierarchy:
- Job — the top-level unit of work. A Job consists of one or more Steps, executed in sequence or conditionally.
- Step — a single phase of a Job. A Step can be a chunk-oriented step (read-process-write in chunks) or a tasklet (arbitrary code).
- Chunk — a unit of records. Spring Batch reads N items one at a time, processes each, then writes the whole chunk in a single transaction. If the write fails, only that chunk is rolled back.
Spring Batch stores Job execution metadata (status, parameters, step progress) in a database — the Job Repository. This enables restart: if a Job fails at step 3, re-running it with the same job parameters resumes from step 3, not step 1.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- Spring Batch needs a datasource for its job repository -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
# application.yml
spring:
batch:
job:
enabled: false # don't auto-run on startup — we'll launch manually
jdbc:
initialize-schema: always # create batch metadata tables automatically
import org.springframework.batch.core.*;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.*;
import org.springframework.context.annotation.*;
import org.springframework.transaction.PlatformTransactionManager;
@Configuration
public class HelloBatchConfig {

    /**
     * Tasklet-based step: runs one block of arbitrary code inside a
     * transaction and signals completion via {@code RepeatStatus.FINISHED}.
     */
    @Bean
    public Step helloStep(JobRepository jobRepository,
                          PlatformTransactionManager txManager) {
        Tasklet greeting = (contribution, chunkContext) -> {
            System.out.println("Hello from Spring Batch!");
            return RepeatStatus.FINISHED;
        };
        return new StepBuilder("helloStep", jobRepository)
                .tasklet(greeting, txManager)
                .build();
    }

    /** Minimal single-step job wrapping {@code helloStep}. */
    @Bean
    public Job helloJob(JobRepository jobRepository, Step helloStep) {
        return new JobBuilder("helloJob", jobRepository)
                .start(helloStep)
                .build();
    }
}
FlatFileItemReader reads flat files (CSV, TSV) line by line and maps each line to a Java object using a LineMapper. DelimitedLineTokenizer splits the line; BeanWrapperFieldSetMapper sets properties by name.
import org.springframework.batch.item.file.*;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.core.io.ClassPathResource;
// Domain object mapped from one CSV line; component names must match the
// column names passed to .names(...) on the reader so targetType mapping works.
public record Product(Long id, String name, String category, Double price) {}
// Reads products.csv from the classpath; every data line (header skipped)
// becomes one Product via the delimited tokenizer + targetType mapping.
@Bean
public FlatFileItemReader<Product> csvReader() {
    FlatFileItemReaderBuilder<Product> builder = new FlatFileItemReaderBuilder<>();
    return builder
            .name("productCsvReader")
            .resource(new ClassPathResource("products.csv"))
            .linesToSkip(1)                                // header row
            .delimited()
            .names("id", "name", "category", "price")
            .targetType(Product.class)
            .build();
}
// products.csv format:
// id,name,category,price
// 1,Laptop,Electronics,999.99
// 2,Desk,Furniture,299.99
ItemProcessor<I, O> transforms or filters items. Returning null from a processor filters the item out — it will not be passed to the writer. Chain multiple processors with CompositeItemProcessor.
import org.springframework.batch.item.ItemProcessor;
// Enriches CSV products into persistable entities; filters out invalid rows.
@Component
public class ProductEnrichmentProcessor implements ItemProcessor<Product, ProductEntity> {

    private final CategoryService categoryService;

    public ProductEnrichmentProcessor(CategoryService categoryService) {
        this.categoryService = categoryService;
    }

    /**
     * Transforms a raw {@link Product} into a {@link ProductEntity}.
     *
     * @param product the item read from the CSV; its price may be absent
     * @return the enriched entity, or {@code null} to filter the item out
     *         (a null return means it is never passed to the writer)
     */
    @Override
    public ProductEntity process(Product product) {
        // BUG FIX: price is a boxed Double. A blank/missing CSV price would
        // unbox to an NPE in "price() <= 0" and fail the whole chunk; treat a
        // null price as invalid and filter it like the non-positive case.
        if (product.price() == null || product.price() <= 0) {
            return null; // filter out invalid products
        }
        return ProductEntity.builder()
                .externalId(product.id())
                .name(product.name().trim())
                .categoryId(categoryService.getIdByName(product.category()))
                .priceInCents(Math.round(product.price() * 100))
                .importedAt(java.time.Instant.now())
                .build();
    }
}
// Pipeline processor: runs validation first, then enrichment. Each delegate's
// output feeds the next; per the docs, a null from any delegate filters the item.
@Bean
public CompositeItemProcessor<Product, ProductEntity> compositeProcessor(
        ProductValidationProcessor validator,
        ProductEnrichmentProcessor enricher) {
    CompositeItemProcessor<Product, ProductEntity> pipeline = new CompositeItemProcessor<>();
    pipeline.setDelegates(List.of(validator, enricher));
    return pipeline;
}
JdbcBatchItemWriter writes all items in a chunk with a single JDBC batch statement — orders of magnitude faster than individual inserts. Configure it with a named-parameter SQL and a BeanPropertyItemSqlParameterSourceProvider.
import org.springframework.batch.item.database.*;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
// Batch upsert writer: all items in a chunk go out as one JDBC batch statement.
// NOTE(review): ON CONFLICT ... DO UPDATE is PostgreSQL upsert syntax; plain H2
// (the runtime dependency declared earlier) only accepts it in PostgreSQL
// compatibility mode — confirm which database this actually targets.
@Bean
public JdbcBatchItemWriter<ProductEntity> dbWriter(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<ProductEntity>()
            .dataSource(dataSource)
            .sql("""
                INSERT INTO products (external_id, name, category_id, price_cents, imported_at)
                VALUES (:externalId, :name, :categoryId, :priceInCents, :importedAt)
                ON CONFLICT (external_id) DO UPDATE
                SET name = EXCLUDED.name,
                price_cents = EXCLUDED.price_cents
                """)
            .beanMapped() // maps ProductEntity fields to :namedParams
            .build();
}
// Chunk-oriented step: read CSV -> enrich -> batch insert.
// Each chunk of 500 items is written and committed in one transaction.
@Bean
public Step importProductsStep(JobRepository jobRepository,
                               PlatformTransactionManager txManager,
                               FlatFileItemReader<Product> csvReader,
                               ProductEnrichmentProcessor processor,
                               JdbcBatchItemWriter<ProductEntity> dbWriter) {
    StepBuilder builder = new StepBuilder("importProductsStep", jobRepository);
    return builder
            .<Product, ProductEntity>chunk(500, txManager) // commit interval
            .reader(csvReader)
            .processor(processor)
            .writer(dbWriter)
            .build();
}
The chunk size is a critical tuning parameter. Too small (e.g., 10) means too many transactions and commits. Too large (e.g., 10,000) means large rollback scopes on failure and high memory usage. Start with 100–500 and tune based on profiling.
Real data is messy. Spring Batch lets you skip bad records (up to a configurable limit) and retry transient failures (like deadlocks) without failing the entire job.
// Fault-tolerant variant of the import step: tolerates a bounded number of
// bad records and retries transient DB failures instead of failing the job.
@Bean
public Step robustImportStep(JobRepository jobRepository,
                             PlatformTransactionManager txManager,
                             FlatFileItemReader<Product> reader,
                             ProductEnrichmentProcessor processor,
                             JdbcBatchItemWriter<ProductEntity> writer) {
    return new StepBuilder("robustImportStep", jobRepository)
            .<Product, ProductEntity>chunk(500, txManager)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            // Skip — tolerate up to 100 bad records without failing the step
            .faultTolerant()
            .skip(FlatFileParseException.class) // skip malformed CSV lines
            .skip(ValidationException.class)    // skip invalid domain objects
            .skipLimit(100)                     // fail if more than 100 skipped
            // Retry — retry transient DB errors up to 3 times
            .retry(org.springframework.dao.DeadlockLoserDataAccessException.class)
            .retryLimit(3)
            // Log every skipped item. BUG FIX: the original only overrode
            // onSkipInProcess, but FlatFileParseException is thrown during
            // READ — those skips were silently dropped. Cover all three phases.
            .listener(new SkipListener<Product, ProductEntity>() {
                @Override
                public void onSkipInRead(Throwable t) {
                    System.err.println("Skipped during read — " + t.getMessage());
                }
                @Override
                public void onSkipInProcess(Product item, Throwable t) {
                    System.err.println("Skipped: " + item + " — " + t.getMessage());
                }
                @Override
                public void onSkipInWrite(ProductEntity item, Throwable t) {
                    System.err.println("Skipped during write: " + item + " — " + t.getMessage());
                }
            })
            .build();
}
import org.springframework.batch.core.*;
// Lifecycle listener: logs job start/end; the FAILED branch is the hook
// for wiring in an alerting service.
@Component
public class JobNotificationListener implements JobExecutionListener {

    @Override
    public void beforeJob(JobExecution jobExecution) {
        System.out.println("Job starting: " + jobExecution.getJobInstance().getJobName());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() != BatchStatus.FAILED) {
            long written = jobExecution.getStepExecutions().stream()
                    .mapToLong(StepExecution::getWriteCount)
                    .sum();
            System.out.println("Job completed: " + written + " records written");
        } else {
            System.err.println("Job FAILED: " + jobExecution.getAllFailureExceptions());
            // alertService.sendAlert(...)
        }
    }
}
// Assembles the import job: the chunk step plus lifecycle notifications.
@Bean
public Job importProductsJob(JobRepository jobRepository,
                             Step importProductsStep,
                             JobNotificationListener listener) {
    return new JobBuilder("importProductsJob", jobRepository)
            .start(importProductsStep)
            .listener(listener)
            .build();
}
Partitioning divides the input data into independent ranges (partitions) that are processed concurrently by worker steps on a thread pool. This is the primary mechanism for parallel batch processing within a single JVM, with no external infrastructure; for scaling across machines, Spring Batch also supports remote partitioning over a message broker.
import org.springframework.batch.core.partition.support.*;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
// Partitioner — splits the staging table's ID space into contiguous
// [minId, maxId] ranges, one ExecutionContext per worker partition.
@Component
public class ProductIdRangePartitioner implements Partitioner {

    private final JdbcTemplate jdbc;

    public ProductIdRangePartitioner(JdbcTemplate jdbc) { this.jdbc = jdbc; }

    /**
     * Builds up to {@code gridSize} ID ranges covering [MIN(id), MAX(id)].
     * May return fewer partitions than gridSize (or none) when the ID span
     * is small or the table is empty — gridSize is a hint, not a contract.
     */
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // BUG FIX: queryForObject returns null on an empty table; the original
        // unboxed straight into long and threw NullPointerException.
        Long min = jdbc.queryForObject("SELECT MIN(id) FROM products_staging", Long.class);
        Long max = jdbc.queryForObject("SELECT MAX(id) FROM products_staging", Long.class);
        Map<String, ExecutionContext> partitions = new LinkedHashMap<>();
        if (min == null || max == null) {
            return partitions; // empty staging table — nothing to process
        }
        long minId = min;
        long maxId = max;
        long rangeSize = (maxId - minId) / gridSize + 1;
        for (int i = 0; i < gridSize; i++) {
            long start = minId + i * rangeSize;
            // BUG FIX: the original clipped only the upper bound, so when
            // gridSize exceeded the ID span it emitted inverted (start > end)
            // ranges; stop once the remaining IDs are exhausted.
            if (start > maxId) {
                break;
            }
            ExecutionContext ctx = new ExecutionContext();
            ctx.putLong("minId", start);
            ctx.putLong("maxId", Math.min(start + rangeSize - 1, maxId));
            partitions.put("partition" + i, ctx);
        }
        return partitions;
    }
}
// Fixed-size (core == max == 8) pool that runs the worker partitions.
@Bean
public TaskExecutor partitionTaskExecutor() {
    var executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(8);
    executor.setMaxPoolSize(8);
    executor.setThreadNamePrefix("batch-worker-");
    executor.initialize();
    return executor;
}
// Manager step — asks the partitioner for ID ranges, then executes
// workerStep once per range, concurrently on the task executor.
@Bean
public Step masterStep(JobRepository jobRepository,
                       Step workerStep,
                       ProductIdRangePartitioner partitioner,
                       TaskExecutor partitionTaskExecutor) {
    StepBuilder manager = new StepBuilder("masterStep", jobRepository);
    return manager
            .partitioner("workerStep", partitioner)
            .step(workerStep)
            .gridSize(8)                         // number of parallel partitions
            .taskExecutor(partitionTaskExecutor)
            .build();
}
import org.springframework.batch.core.*;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.*;
// On-demand trigger: POST /batch/import-products launches the import job.
// NOTE(review): JobLauncher.run appears synchronous here — presumably the HTTP
// request blocks until the job ends; confirm if an async launcher is wanted.
@RestController
@RequestMapping("/batch")
public class BatchController {

    private final JobLauncher jobLauncher;
    private final Job importProductsJob;

    public BatchController(JobLauncher jobLauncher, Job importProductsJob) {
        this.jobLauncher = jobLauncher;
        this.importProductsJob = importProductsJob;
    }

    @PostMapping("/import-products")
    public ResponseEntity<String> runImport() throws Exception {
        // A fresh timestamp parameter makes every invocation a new JobInstance.
        JobParameters params = new JobParametersBuilder()
                .addLong("runAt", System.currentTimeMillis())
                .toJobParameters();
        JobExecution execution = jobLauncher.run(importProductsJob, params);
        return ResponseEntity.ok("Job launched: " + execution.getStatus());
    }
}
// Scheduled trigger: launches the import job every day at 02:00.
@Component
public class BatchScheduler {

    private final JobLauncher jobLauncher;
    private final Job importProductsJob;

    public BatchScheduler(JobLauncher jobLauncher, Job importProductsJob) {
        this.jobLauncher = jobLauncher;
        this.importProductsJob = importProductsJob;
    }

    @Scheduled(cron = "0 0 2 * * *") // sec min hour day month weekday
    public void runNightlyImport() throws Exception {
        jobLauncher.run(
                importProductsJob,
                new JobParametersBuilder()
                        .addLong("runAt", System.currentTimeMillis()) // unique per run
                        .toJobParameters());
    }
}
Spring Batch uses job parameters to uniquely identify a run. Reusing identical parameters on a completed job will NOT start a new run — Spring Batch detects it as a duplicate. Always include a unique parameter (timestamp, UUID, or file name) to allow repeated execution of the same job.