Deploy tasks in Spring Cloud Data Flow

Dhruv Saksena
2 min read · May 13, 2019


Spring Cloud Data Flow, in its current offering, hosts a plethora of services. It lets you deploy one-time tasks for any job execution, and it lets you create data streams from any source to any sink.

The intention of Spring Cloud Data Flow is for these tasks themselves to be deployed as microservices. Of course, one can bundle multiple tasks within a single microservice, and Spring Cloud Data Flow lets you execute a specific task by parameterizing the task execution.

By microservice execution, I mean that once you upload a task app to Spring Cloud Data Flow and execute it on SCDF, it boots the app and executes the task. (Somehow I find this an overkill of microservices, though :) )

Following are the code/config changes:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.task.configuration.EnableTask;
import org.springframework.context.annotation.ComponentScan;

@EnableTask
@SpringBootApplication
@ComponentScan("com.sampleapp.appdev")
public class TaskApplication
{
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskApplication.class);

    public static void main(String[] args)
    {
        SpringApplication.run(TaskApplication.class, args);
    }
}

Here, the @EnableTask annotation marks this as a simple Task application. SCDF recognizes it as a Task application and, on launch, executes all the Job beans defined in it. (Note that @SpringBootApplication already includes @EnableAutoConfiguration, so it need not be declared separately.)
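For context, a task does not have to be a Batch job at all; a bare CommandLineRunner bean also counts as a task run. A minimal sketch (the class and bean names here are illustrative, not part of this sample):

import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class HelloTaskConfiguration
{
    // Once a datasource is configured, Spring Cloud Task records the
    // start time, end time, and exit code of this run in its
    // TASK_EXECUTION tables.
    @Bean
    public CommandLineRunner helloRunner()
    {
        return args -> System.out.println("Task executed with args: " + String.join(",", args));
    }
}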

In the Maven pom.xml, the following set of dependencies is needed:

<properties>
    <java.version>1.8</java.version>
    <spring-cloud.version>Greenwich.RELEASE</spring-cloud.version>
    <spring-data-cassandra.version>1.1.2.RELEASE</spring-data-cassandra.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-task-starter</artifactId>
        <version>1.0.3.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
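One thing the snippet above does not show: SCDF boots the app from an executable jar, so the pom also needs the Spring Boot Maven plugin in its build section. A standard addition (assuming the rest of your build is stock Spring Boot):

<build>
    <plugins>
        <!-- Repackages the jar as an executable Spring Boot jar,
             which is what SCDF boots when the task is launched. -->
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>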

Here, we will try to deploy a simple Job composed of a Reader, a Processor, and a Writer.

As with any Spring Batch job, we need to configure a datasource for storing job execution metadata. Here, I have chosen MySQL for that purpose.

@Primary
@Bean(name = "taskBatchDataSource")
public DataSource taskBatchDataSource()
{
    DriverManagerDataSource dataSource = new DriverManagerDataSource();
    dataSource.setDriverClassName("com.mysql.jdbc.Driver");
    dataSource.setUrl("jdbc:mysql://localhost/task_batch");
    dataSource.setUsername("root");
    dataSource.setPassword("root");
    return dataSource;
}

Finally, here is the job configuration for the ETL:

import javax.sql.DataSource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DriverManagerDataSource;

@Configuration
@EnableBatchProcessing
public class SampleETLJobConfiguration
{
    private static final Log logger = LogFactory.getLog(SampleETLJobConfiguration.class);

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    // Reader to read from the source
    @Autowired
    private SampleUserDetailsReader sampleUserDetailsReader;

    // Processor to transform each SourceUser into a TargetUser
    @Autowired
    private SampleUserDetailsProcessor sampleUserDetailsProcessor;

    // Writer to persist the transformed records
    @Autowired
    private SampleUserDetailsWriter sampleUserDetailsWriter;

    @Bean(name = "SampleMigratorJob")
    public Job sampleDataMigrationJob()
    {
        StepBuilder stepBuilder = stepBuilderFactory.get("step-1");
        Step step = stepBuilder.<SourceUser, TargetUser>chunk(1)
                .reader(sampleUserDetailsReader)
                .processor(sampleUserDetailsProcessor)
                .writer(sampleUserDetailsWriter)
                .build();
        return jobBuilderFactory.get("sample-job").start(step).build();
    }
}

import javax.sql.DataSource;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.task.configuration.DefaultTaskConfigurer;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TaskBatchConfigurer extends DefaultTaskConfigurer
{
    public TaskBatchConfigurer(@Qualifier("taskBatchDataSource") DataSource dataSource)
    {
        super(dataSource);
    }
}
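The reader, processor, and writer beans wired into the job above are not shown here; a minimal sketch of what they could look like, with hypothetical SourceUser/TargetUser POJOs standing in for the real types:

import java.util.List;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;

// Hypothetical POJOs standing in for the sample's source/target types.
class SourceUser { String name; public String getName() { return name; } }
class TargetUser { final String name; TargetUser(String name) { this.name = name; } }

@Component
public class SampleUserDetailsReader implements ItemReader<SourceUser>
{
    // Return source records one at a time; null signals end of input.
    @Override
    public SourceUser read()
    {
        return null; // replace with a real source (JDBC, file, REST, ...)
    }
}

@Component
class SampleUserDetailsProcessor implements ItemProcessor<SourceUser, TargetUser>
{
    @Override
    public TargetUser process(SourceUser source)
    {
        // Map the source record onto the target shape.
        return new TargetUser(source.getName());
    }
}

@Component
class SampleUserDetailsWriter implements ItemWriter<TargetUser>
{
    @Override
    public void write(List<? extends TargetUser> items)
    {
        // Persist the chunk; chunk(1) above means one item per call.
        items.forEach(item -> System.out.println("Writing " + item));
    }
}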

And that's it. You may deploy this task on SCDF (Spring Cloud Data Flow) as an app and have it running at the click of a button.
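If you prefer the SCDF shell over the UI, the same deployment looks roughly like this (the app name and Maven coordinates are illustrative, assuming the jar was installed to a Maven repository SCDF can reach):

app register --name sample-etl-task --type task --uri maven://com.sampleapp:task-app:1.0.0
task create sample-etl --definition "sample-etl-task"
task launch sample-etl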
