16 May 2017

Introduction

A typical batch program runs over a large number of records from a database, file or queue. A typical batch job has a processor that takes a large dataset as input, processes it and generates the desired output. In this example I show how this can be achieved using Spring Batch. Before jumping into the code, let's understand the Spring Batch architecture at a high level. The high-level architecture below is taken from the official Spring documentation.

This layered architecture has three high-level components: Application, Core and Infrastructure.

  • Application: Contains all batch jobs and custom code.
  • Core: Controls a batch job. It contains the JobLauncher, Job and Step implementations.
  • Infrastructure: Both Core and Application are built on top of the batch infrastructure. Readers, writers and services are typical examples of infrastructure.

The domain language of batch below is taken from the official Spring documentation. A job consists of multiple steps. Each step has an ItemReader, an ItemProcessor and an ItemWriter. There might be interdependence between steps: suppose a job has a total of four steps and Step4 runs only if Step3 fails. JobRepository provides CRUD operations for the JobLauncher, Job and Step implementations. When a Job is launched, a JobExecution is obtained from the repository, and during the course of execution the StepExecution and JobExecution implementations are persisted by passing them to the repository.
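
To make the reader, processor and writer roles concrete, here is a minimal plain-Java sketch of how one chunk-oriented step could move data. It does not use the Spring Batch API: the Reader, Processor and Writer interfaces and the runStep method are hypothetical stand-ins for illustration, and the real framework adds transactions, execution metadata and error handling around a loop like this.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class ChunkStepSketch {

    // Hypothetical stand-ins for ItemReader, ItemProcessor and ItemWriter.
    interface Reader<T> { T read(); }                 // returns null when input is exhausted
    interface Processor<I, O> { O process(I item); }
    interface Writer<O> { void write(List<O> chunk); }

    // Runs one step: items are read and processed one at a time,
    // then written one chunk at a time. Returns the number of chunks written.
    static <I, O> int runStep(Reader<I> reader, Processor<I, O> processor,
                              Writer<O> writer, int chunkSize) {
        int chunksWritten = 0;
        List<O> chunk = new ArrayList<O>();
        I item;
        while ((item = reader.read()) != null) {
            chunk.add(processor.process(item));
            if (chunk.size() == chunkSize) {
                writer.write(new ArrayList<O>(chunk));
                chunk.clear();
                chunksWritten++;
            }
        }
        if (!chunk.isEmpty()) {                       // flush the last, partially filled chunk
            writer.write(new ArrayList<O>(chunk));
            chunksWritten++;
        }
        return chunksWritten;
    }

    // Reads "a", "b", "c", upper-cases them and records each written chunk as text.
    static List<String> demo() {
        final Iterator<String> source = Arrays.asList("a", "b", "c").iterator();
        final List<String> output = new ArrayList<String>();
        Reader<String> reader = new Reader<String>() {
            public String read() { return source.hasNext() ? source.next() : null; }
        };
        Processor<String, String> processor = new Processor<String, String>() {
            public String process(String item) { return item.toUpperCase(); }
        };
        Writer<String> writer = new Writer<String>() {
            public void write(List<String> chunk) { output.add(chunk.toString()); }
        };
        runStep(reader, processor, writer, 2);
        return output;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[A, B], [C]]
    }
}
```

With a chunk size of 2 and three input items, the writer is called twice: once with a full chunk and once with the remaining item.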

One very important aspect of working with Spring Batch is scheduling a job. Scheduling can be:

  • Simple: A set of batch programs runs independently.
  • Medium: Batch programs have some sort of relation among them. For example, one job must follow another (serial), one job must run at the same time as another (parallel), or one job must run only if the previous job is successful (conditional).
  • Complex: Nested conditions and dependent conditions.
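
The serial, parallel and conditional relations can be illustrated with a small plain-Java sketch. Everything in it is an assumption made for illustration: BatchJob, job, serial, conditional and parallel are hypothetical names, and a real scheduler or Spring Batch flow would express these relations through its own configuration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class JobRelationsSketch {

    // Hypothetical job abstraction: run() returns true on success.
    interface BatchJob { boolean run(); }

    // Creates a job that records its name in the log and reports the given outcome.
    static BatchJob job(final String name, final boolean succeeds, final List<String> log) {
        return new BatchJob() {
            public boolean run() { log.add(name); return succeeds; }
        };
    }

    // Serial: the second job starts only after the first has finished.
    static void serial(BatchJob first, BatchJob second) {
        first.run();
        second.run();
    }

    // Conditional: the second job runs only if the first one succeeded.
    static void conditional(BatchJob first, BatchJob second) {
        if (first.run()) {
            second.run();
        }
    }

    // Parallel: both jobs are handed to a thread pool and may run at the same time.
    static void parallel(final BatchJob a, final BatchJob b) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<?> fa = pool.submit(new Runnable() { public void run() { a.run(); } });
            Future<?> fb = pool.submit(new Runnable() { public void run() { b.run(); } });
            fa.get(); // wait for both jobs to finish
            fb.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("parallel jobs did not finish", e);
        } finally {
            pool.shutdown();
        }
    }

    // "publish" is skipped because "validate" reports failure.
    static List<String> demo() {
        List<String> log = Collections.synchronizedList(new ArrayList<String>());
        serial(job("load", true, log), job("report", true, log));
        conditional(job("validate", false, log), job("publish", true, log));
        return new ArrayList<String>(log);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [load, report, validate]
    }
}
```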

Required Software

  • Eclipse for J2EE
  • JDK 1.7
  • Maven 2.2.x or above

High-level description of the code

This application is written in Spring Boot, with Spring Batch integrated on top of it. A job is scheduled every 5 seconds. The item reader reads some constant values, the item processor converts them to upper case, and the item writer writes them to the console. Once the job has completed, the listener prints a completion message. The whole process runs again after 5 seconds.
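
The fixed-rate idea can be tried outside Spring with the JDK's ScheduledExecutorService. The sketch below is only an illustration under stated assumptions, not how Spring's scheduler is implemented: FixedRateSketch and runAtFixedRate are hypothetical names, and the run count exists only so that the example terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedRateSketch {

    // Runs the task at a fixed rate until it has run `runs` times,
    // then stops the scheduler and returns the number of completed runs.
    static int runAtFixedRate(final Runnable task, long periodMillis, int runs) {
        final CountDownLatch remaining = new CountDownLatch(runs);
        final AtomicInteger executed = new AtomicInteger();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            scheduler.scheduleAtFixedRate(new Runnable() {
                public void run() {
                    if (remaining.getCount() > 0) {   // ignore ticks after the last wanted run
                        task.run();
                        executed.incrementAndGet();
                        remaining.countDown();
                    }
                }
            }, 0, periodMillis, TimeUnit.MILLISECONDS);
            remaining.await();                        // block until all runs are done
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return executed.get();
    }

    public static void main(String[] args) {
        // A 100 ms period keeps the demo short; the article's job uses 5000 ms.
        int count = runAtFixedRate(new Runnable() {
            public void run() { System.out.println("job run"); }
        }, 100, 3);
        System.out.println("runs: " + count);
    }
}
```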

Steps to write code

The project structure and important file details are given below

SL No | File Name | Description
1 | com.ashish.spring.batch.config.WebConfig | Configures the Spring Boot application and enables batch processing and scheduling.
2 | com.ashish.spring.batch.config.BatchConfig | Configures the JobLauncher, Job and Steps.
3 | com.ashish.spring.batch.listener.BatchJobCompletionListener | Listens for job execution events and prints a message once the job has completed.
4 | com.ashish.spring.batch.step.BatchItemReader | The item reader reads input for a given batch step.
5 | com.ashish.spring.batch.step.BatchItemProcessor | The item processor processes the data read by the item reader.
6 | com.ashish.spring.batch.step.BatchItemWriter | The item writer writes the processed data.
7 | src/main/resources/application.properties | The database URL, server port and log level properties are defined in this file.

pom.xml


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.ashish.spring.batch</groupId>
	<artifactId>spring-boot-batch</artifactId>
	<version>1.0-SNAPSHOT</version>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.3.3.RELEASE</version>
	</parent>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
		</dependency>

		<!-- Below dependency has provided scope defined because during deployment 
			in tomcat this jar is not required. This jar is required to execute the stand 
			alone application with embedded tomcat -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-tomcat</artifactId>
			<scope>provided</scope>
		</dependency>

		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-aop</artifactId>
		</dependency>
		<dependency>
			<groupId>org.aspectj</groupId>
			<artifactId>aspectjweaver</artifactId>
		</dependency>

		<!-- Spring batch dependency START -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-batch</artifactId>
		</dependency>
		<!-- Spring batch dependency END -->
		
		<!-- h2db dependency START -->
		<dependency>
			<groupId>com.h2database</groupId>
			<artifactId>h2</artifactId>
		</dependency>
		
		<!-- The dependency below (devtools) enables the H2 web console, which acts as the H2 DB client -->
		<!-- URL to access the web client: http://localhost:9000/h2-console/login.do -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<optional>true</optional>
		</dependency>
		<!-- h2db dependency END -->
	</dependencies>
</project>

WebConfig.java:

The Spring Boot application configuration and batch configuration are present in this file.


package com.ashish.spring.batch.config;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication(scanBasePackages="com.ashish")
@EnableBatchProcessing
@EnableScheduling // Enables scheduling
public class WebConfig {


    /**
     * Run the spring boot application in embedded tomcat 
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        SpringApplication.run(WebConfig.class, args);
    }

}

BatchConfig.java:

This file has configuration for JobLauncher, Job and Steps.


package com.ashish.spring.batch.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;

import com.ashish.spring.batch.listener.BatchJobCompletionListener;
import com.ashish.spring.batch.step.BatchItemProcessor;
import com.ashish.spring.batch.step.BatchItemReader;
import com.ashish.spring.batch.step.BatchItemWriter;

@Configuration
public class BatchConfig {

	@Autowired
	private JobBuilderFactory jobBuilderFactory;

	@Autowired
	private StepBuilderFactory stepBuilderFactory;

	@Autowired
	private JobLauncher jobLauncher;

	@Autowired
	private Job processJob;

	// This job runs every 5 seconds
	@Scheduled(fixedRate = 5000)
	public void printMessage() {
		try {
			JobParameters jobParameters = new JobParametersBuilder().addLong(
					"time", System.currentTimeMillis()).toJobParameters();
			jobLauncher.run(processJob, jobParameters);
			System.out.println("I have been scheduled with Spring scheduler");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Bean
	public Job processJob() {
		return jobBuilderFactory.get("processJob")
				.incrementer(new RunIdIncrementer()).listener(listener())
				.flow(orderStep1()).end().build();
	}

	/**
	 * Creates the step by passing the reader, processor and writer in order.
	 * 
	 * @return the configured step
	 */
	@Bean
	public Step orderStep1() {
		return stepBuilderFactory.get("orderStep1").<String, String> chunk(1)
				.reader(new BatchItemReader())
				.processor(new BatchItemProcessor())
				.writer(new BatchItemWriter()).build();
	}

	@Bean
	public JobExecutionListener listener() {
		return new BatchJobCompletionListener();
	}

	@Bean
	public ResourcelessTransactionManager transactionManager() {
		return new ResourcelessTransactionManager();
	}

}
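
The scheduled method above adds the current time as a job parameter. In Spring Batch a job instance is identified by the job name together with its identifying parameters, and relaunching an instance that has already completed is rejected, so a changing parameter lets each 5-second run start a fresh instance. The plain-Java sketch below models only that idea; JobInstanceSketch and its launch method are hypothetical and not part of Spring Batch.

```java
import java.util.HashSet;
import java.util.Set;

class JobInstanceSketch {

    // Keys of job instances that have already run to completion.
    private final Set<String> completed = new HashSet<String>();

    // Returns true if the instance was launched (and is now recorded as completed),
    // false if an instance with the same name and parameters had already completed.
    boolean launch(String jobName, String parameters) {
        String instanceKey = jobName + "?" + parameters;
        return completed.add(instanceKey);
    }

    public static void main(String[] args) {
        JobInstanceSketch repository = new JobInstanceSketch();
        System.out.println(repository.launch("processJob", "time=1000")); // prints true
        System.out.println(repository.launch("processJob", "time=1000")); // prints false
        System.out.println(repository.launch("processJob", "time=6000")); // prints true
    }
}
```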

SpringBootAppWS.java

The REST endpoint in this file is commented out for now. However, it can be enabled to add a RESTful service if required.


package com.ashish.spring.batch.app;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/rest")
public class SpringBootAppWS {

//	@Autowired
//	JobLauncher jobLauncher;
//
//	@Autowired
//	Job processJob;
//
//	@RequestMapping(path = "/startJob", method = RequestMethod.GET, headers = "Accept=application/json", produces = "application/json")
//	public String startJob() throws Exception {
//
//		JobParameters jobParameters = new JobParametersBuilder().addLong(
//				"time", System.currentTimeMillis()).toJobParameters();
//		jobLauncher.run(processJob, jobParameters);
//
//		return "Batch job has been invoked";
//	}

}

BatchItemReader.java

Item Reader reads input for a given batch step

package com.ashish.spring.batch.step;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

public class BatchItemReader implements ItemReader<String> {

	private String[] messages = { "Hello World",
			"Welcome to Spring Batch using Spring boot Example",
			"H2 Database has been used in this example" };

	private int count = 0;

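	@Override
	public String read() throws Exception, UnexpectedInputException,
			ParseException, NonTransientResourceException {

		// Return the next message while any remain
		if (count < messages.length) {
			return messages[count++];
		} else {
			// Rewind so the next scheduled run reads the messages again
			count = 0;
		}
		// null tells Spring Batch that the input is exhausted
		return null;
	}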

}
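
The reader above signals the end of input by returning null, and it resets its counter at that point so that the same reader object can be read again on the next scheduled run. A stripped-down sketch of that contract, with hypothetical class and method names and no Spring types, might look like this:

```java
import java.util.ArrayList;
import java.util.List;

class ResettableReaderSketch {

    private final String[] messages = { "one", "two" };
    private int count = 0;

    // Returns the next message, or null once all messages have been returned.
    String read() {
        if (count < messages.length) {
            return messages[count++];
        }
        count = 0;   // rewind so a later run starts from the first message
        return null; // null tells the caller there is no more input
    }

    // Reads until null is returned, the way a step would consume a reader.
    static List<String> drain(ResettableReaderSketch reader) {
        List<String> items = new ArrayList<String>();
        String item;
        while ((item = reader.read()) != null) {
            items.add(item);
        }
        return items;
    }

    public static void main(String[] args) {
        ResettableReaderSketch reader = new ResettableReaderSketch();
        System.out.println(drain(reader)); // prints [one, two]
        System.out.println(drain(reader)); // prints [one, two] again, thanks to the reset
    }
}
```

Without the reset, the second call to drain would return an empty list, which is what would happen to every run after the first when a single reader object is shared across runs.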

BatchItemProcessor.java

Item Processor processes the data read by the item reader

package com.ashish.spring.batch.step;

import org.springframework.batch.item.ItemProcessor;

public class BatchItemProcessor implements ItemProcessor<String, String> {

	@Override
	public String process(String data) throws Exception {
		return data.toUpperCase();
	}

}

BatchItemWriter.java

Item Writer writes the processed data

package com.ashish.spring.batch.step;

import java.util.List;

import org.springframework.batch.item.ItemWriter;

public class BatchItemWriter implements ItemWriter<String> {

	@Override
	public void write(List<? extends String> messages) throws Exception {
		for (String msg : messages) {
			System.out.println("Writing the data using batch writer: " + msg);
		}
	}

}

BatchJobCompletionListener.java

This class extends JobExecutionListenerSupport. Spring Batch calls its afterJob method when a job execution finishes, and the class prints a message if the job completed successfully.


package com.ashish.spring.batch.listener;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;

/**
 * This class extends JobExecutionListenerSupport. Spring Batch calls afterJob when a job
 * execution finishes, and a message is printed if the job completed successfully.
 * @author Ashish Mondal
 *
 */
public class BatchJobCompletionListener extends JobExecutionListenerSupport {

	@Override
	public void afterJob(JobExecution jobExecution) {
		if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
			System.out.println("BATCH JOB COMPLETED SUCCESSFULLY");
		}
	}

}
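
The status check in afterJob matters because the callback is invoked whether the job succeeded or failed. The following plain-Java sketch shows that callback pattern in isolation. It is an assumption-based illustration: Status, AfterJobListener and runJob are hypothetical names, not Spring Batch types.

```java
import java.util.ArrayList;
import java.util.List;

class CompletionListenerSketch {

    // Hypothetical, simplified stand-in for a batch status.
    enum Status { COMPLETED, FAILED }

    // Hypothetical callback, loosely modelled on an after-job hook.
    interface AfterJobListener { void afterJob(Status status); }

    // Runs the job body, derives a status and always notifies the listener.
    static Status runJob(Runnable jobBody, AfterJobListener listener) {
        Status status;
        try {
            jobBody.run();
            status = Status.COMPLETED;
        } catch (RuntimeException e) {
            status = Status.FAILED;
        }
        listener.afterJob(status);
        return status;
    }

    // Runs one successful and one failing job; only the first produces a message.
    static List<String> demo() {
        final List<String> messages = new ArrayList<String>();
        AfterJobListener listener = new AfterJobListener() {
            public void afterJob(Status status) {
                if (status == Status.COMPLETED) {
                    messages.add("BATCH JOB COMPLETED SUCCESSFULLY");
                }
            }
        };
        runJob(new Runnable() { public void run() { } }, listener);
        runJob(new Runnable() {
            public void run() { throw new IllegalStateException("simulated failure"); }
        }, listener);
        return messages;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [BATCH JOB COMPLETED SUCCESSFULLY]
    }
}
```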

