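A Simple Demo of Integrating Elastic Job with Spring Boot

Introduction

Elastic-Job is a distributed scheduling solution made up of two independent subprojects, Elastic-Job-Lite and Elastic-Job-Cloud. Elastic-Job-Lite is positioned as a lightweight, decentralized solution: it ships as a jar that provides coordination for distributed jobs, and its only external dependency is Zookeeper.

You can find more details on the official documentation page, so I won't repeat them here.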

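About Zookeeper

Elastic Job relies on Zookeeper as its registry center to coordinate job sharding and scheduling across instances, so the first step is to set up a Zookeeper environment. Since that is not the focus of this article, I will keep it brief. To get high availability and good throughput, a cluster of three Zookeeper nodes is generally recommended. This is only a proof-of-concept demo, so I run a single-node Zookeeper registry on my local machine. The installation steps on Linux are: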
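As a side note on why three nodes is the usual starting point: a Zookeeper ensemble keeps serving requests only while a majority of its servers is alive. The sketch below is plain arithmetic, not Zookeeper code, and the class and method names are made up for illustration.

```java
// Hypothetical helper (not part of Zookeeper): majority-quorum arithmetic.
final class QuorumMath {

    // Smallest number of servers that forms a majority of the ensemble.
    static int quorumSize(int ensembleSize) {
        return ensembleSize / 2 + 1;
    }

    // Number of servers that can fail while a majority still remains.
    static int toleratedFailures(int ensembleSize) {
        return ensembleSize - quorumSize(ensembleSize);
    }

    public static void main(String[] args) {
        for (int n : new int[] {1, 2, 3, 5}) {
            System.out.println(n + " node(s): quorum=" + quorumSize(n)
                    + ", tolerated failures=" + toleratedFailures(n));
        }
    }
}
```

Under this model a single node or a pair of nodes tolerates no failure at all, while three nodes survive the loss of one, which is why the single-node setup used here is only suitable for a demo.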

Download the Zookeeper package and extract it to /opt.

The version used here is 3.4.10.

tar zxvf zookeeper-3.4.10.tar.gz -C /opt

In /opt, create two directories, zk_data and zk_logs, and give them 777 permissions:

cd /opt
mkdir zk_data zk_logs
chmod 777 zk_data zk_logs

In the conf directory under the extracted Zookeeper directory, create a zoo.cfg file:

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/zk_data
dataLogDir=/opt/zk_logs
# the port at which the clients will connect
clientPort=2181
server.1=127.0.0.1:2888:3888
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

Start the Zookeeper service by running the following in /opt/zookeeper-3.4.10/bin/:

./zkServer.sh start

Check its status:

./zkServer.sh status

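Optional: make the commands available globally

Edit .bashrc (or .zshrc if you use zsh) and add the following so the Zookeeper scripts can be run from any directory. Both files normally live in your home directory.

export ZOOKEEPER_HOME=/opt/zookeeper-3.4.10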
export PATH=$ZOOKEEPER_HOME/bin:$PATH

That completes the single-node Zookeeper setup.

Configuring Zookeeper

Add the Maven dependencies

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>1.4.2.RELEASE</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- elastic-job-lite core module -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.0.5</version>
</dependency>

<!-- needed when using the Spring custom namespace -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.0.5</version>
</dependency>

Edit the configuration file application.properties

#elastic job
zookeeper.serviceLists=127.0.0.1:2181
zookeeper.namespace=anduin-elastic-job-test
zookeeper.baseSleepTimeMilliseconds=5000
zookeeper.maxSleepTimeMilliseconds=5000
zookeeper.maxRetries=3


# scheduled jobs
simpleJob.mySimpleJob.name=mySimpleJob
# Seconds Minutes Hours Day-of-Month Month Day-of-Week Year (optional field)
simpleJob.mySimpleJob.cron=*/10 * * * * ?

simpleJob.mySimpleJob.shardingTotalCount=1

demo.demoJob.name=demoJob
demo.demoJob.cron=*/5 * * * * ?
demo.demoJob.shardingTotalCount=1
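To make the two cron expressions above concrete: the first field is the seconds field, and `*/10` there means "every 10th second starting at 0". The following is a small illustration of that one rule only. It is not Quartz code, and the class and method names are invented for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: expands a "*/N" seconds field into the seconds it matches.
final class CronSecondsSketch {

    static List<Integer> expandStep(String field) {
        if (!field.startsWith("*/")) {
            throw new IllegalArgumentException("this sketch only handles */N, got: " + field);
        }
        int step = Integer.parseInt(field.substring(2));
        if (step <= 0) {
            throw new IllegalArgumentException("step must be positive: " + step);
        }
        List<Integer> seconds = new ArrayList<>();
        for (int second = 0; second < 60; second += step) {
            seconds.add(second);
        }
        return seconds;
    }

    public static void main(String[] args) {
        System.out.println("mySimpleJob fires at seconds " + expandStep("*/10"));
        System.out.println("demoJob fires at seconds " + expandStep("*/5"));
    }
}
```

So mySimpleJob is triggered six times per minute and demoJob twelve times per minute, with the remaining fields (`* * * * ?`) matching every minute, hour, and day.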

Create the ZookeeperConfig configuration class

Zookeeper is configured here with Spring Boot's annotation-based Java config.

package com.zobgo.job.javaconfig;

import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* Created by zongbo.zhang on 8/15/18.
*/

@Configuration
public class ZookeeperConfig {
@Value("${zookeeper.serviceLists}")
private String serviceLists;

@Value("${zookeeper.namespace}")
private String nameSpace;

@Value("${zookeeper.baseSleepTimeMilliseconds}")
private int baseSleepTimeMilliseconds;

@Value("${zookeeper.maxSleepTimeMilliseconds}")
private int maxSleepTimeMilliseconds;

@Value("${zookeeper.maxRetries}")
private int maxRetries;

/**
* Zookeeper registry center configuration.
* @return the registry center bean; Spring calls its init() method on startup
*/
@Bean(initMethod = "init")
public ZookeeperRegistryCenter zookeeperRegistryCenter(){
ZookeeperConfiguration configuration = new ZookeeperConfiguration(serviceLists,nameSpace);
configuration.setBaseSleepTimeMilliseconds(baseSleepTimeMilliseconds);
configuration.setMaxSleepTimeMilliseconds(maxSleepTimeMilliseconds);
configuration.setMaxRetries(maxRetries);
return new ZookeeperRegistryCenter(configuration);
}
}
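The three numeric settings (baseSleepTimeMilliseconds, maxSleepTimeMilliseconds, maxRetries) control how the registry center retries a failed Zookeeper connection. To my understanding they are handed to an exponential backoff policy in the underlying Curator client, where the wait before each retry is randomized, grows with the attempt number, and is capped at the maximum. The sketch below is a simplified, deterministic model of that idea (it computes only the upper bound of each wait). It is an assumption-based illustration, not Curator's or Elastic-Job's code, and all names in it are hypothetical.

```java
// Simplified model of a capped exponential backoff; not library code.
final class BackoffSketch {

    // Upper bound, in milliseconds, of the wait before retry number retryCount (0-based).
    static long maxSleepBeforeRetry(long baseSleepMs, long maxSleepMs, int retryCount) {
        long multiplier = (1L << (retryCount + 1)) - 1; // 1, 3, 7, ...
        return Math.min(maxSleepMs, baseSleepMs * multiplier);
    }

    public static void main(String[] args) {
        // Values taken from application.properties above.
        long base = 5000;
        long max = 5000;
        int maxRetries = 3;
        for (int retry = 0; retry < maxRetries; retry++) {
            System.out.println("retry " + retry + ": wait at most "
                    + maxSleepBeforeRetry(base, max, retry) + " ms");
        }
    }
}
```

With the base and the maximum both set to 5000, the cap is reached on the very first retry, so under this model the demo behaves like a fixed 5-second retry interval. A smaller base (for example 1000) would let the wait actually grow.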

Create a DemoJob class

This is an implementation of SimpleJob, where the actual job logic lives. All this DemoJob does is print a few log lines.

package com.zobgo.job.demo;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;

import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

/**
* Created by zongbo.zhang on 8/23/18.
*/

@Slf4j
@Component
public class DemoJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
log.info("<====Demo Job Begin====>");
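log.info(String.format("------Thread ID: %s, total shards: %s, current sharding item: %s",
Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem()));
try {
// Simulate three seconds of work.
Thread.sleep(3000);
} catch (InterruptedException e) {
// Restore the interrupt flag so the caller can see that the thread was interrupted.
Thread.currentThread().interrupt();
}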
log.info("<====Demo Job End====>");
}
}

Create the DemoConfig configuration class

DemoConfig holds the configuration for DemoJob: the registry center, the cron expression, the sharding count, the job listener, and so on.

package com.zobgo.job.javaconfig;

import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.zobgo.job.demo.DemoJob;
import com.zobgo.job.listener.MyElasticJobListener;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

/**
* Created by zongbo.zhang on 8/23/18.
*/

@Configuration
public class DemoConfig {
@Resource
private ZookeeperRegistryCenter registryCenter;

@Value("${demo.demoJob.name}")
private String demoJobName;

@Value("${demo.demoJob.cron}")
private String demoJobCron;

@Value("${demo.demoJob.shardingTotalCount}")
private int demoJobShardingTotalCount;

@Bean
public DemoJob demoJob(){
return new DemoJob();
}

@Bean(initMethod = "init")
public JobScheduler demoJobScheduler(final DemoJob demoJob){
MyElasticJobListener elasticJobListener = new MyElasticJobListener();
return new SpringJobScheduler(demoJob,registryCenter,liteJobConfiguration(),elasticJobListener);
}

private LiteJobConfiguration liteJobConfiguration(){
return LiteJobConfiguration.newBuilder(
new SimpleJobConfiguration(
JobCoreConfiguration.newBuilder(demoJobName,demoJobCron,demoJobShardingTotalCount).build() ,DemoJob.class.getCanonicalName()
)).overwrite(true).build();
}

}
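The sharding count passed to JobCoreConfiguration decides how many sharding items exist, and the running instances divide those items among themselves. The sketch below models an even ("average") split of sharding items across instances to show what that means in practice. It is my own simplified model under that assumption, not the library's strategy class, and the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of splitting sharding items evenly across job instances.
final class AverageShardingSketch {

    // Returns one list of sharding items per instance, in instance order.
    static List<List<Integer>> allocate(int shardingTotalCount, int instanceCount) {
        if (instanceCount <= 0 || shardingTotalCount < 0) {
            throw new IllegalArgumentException("need at least one instance and a non-negative count");
        }
        List<List<Integer>> result = new ArrayList<>();
        int perInstance = shardingTotalCount / instanceCount;
        // Each instance first gets a contiguous block of equal size.
        for (int i = 0; i < instanceCount; i++) {
            List<Integer> items = new ArrayList<>();
            for (int item = i * perInstance; item < (i + 1) * perInstance; item++) {
                items.add(item);
            }
            result.add(items);
        }
        // Leftover items are handed out one by one, starting with the first instance.
        int firstLeftover = perInstance * instanceCount;
        for (int item = firstLeftover; item < shardingTotalCount; item++) {
            result.get(item - firstLeftover).add(item);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("1 shard, 2 instances: " + allocate(1, 2));
        System.out.println("8 shards, 3 instances: " + allocate(8, 3));
    }
}
```

With shardingTotalCount=1, as in this demo, only one instance receives sharding item 0 no matter how many instances are started; the others stand by and can take over if that instance goes away.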

Create MyElasticJobListener

A job listener lets you run some logic when a job starts and when it finishes, for example sending an email when the job ends. The listener here measures how long each job run takes.

package com.zobgo.job.listener;

import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import com.zogbo.common.utils.TimeUtil;
import java.util.Date;

import lombok.extern.slf4j.Slf4j;

/**
* Created by zongbo.zhang on 8/17/18.
*
* Listener shared by all jobs.
*/

@Slf4j
public class MyElasticJobListener implements ElasticJobListener{

private long beginTime = 0;


@Override
public void beforeJobExecuted(ShardingContexts shardingContexts) {
beginTime = System.currentTimeMillis();

log.info("===>{} JOB BEGIN TIME: {} <===",shardingContexts.getJobName(), TimeUtil.mill2Time(beginTime));
}

@Override
public void afterJobExecuted(ShardingContexts shardingContexts) {
long endTime = System.currentTimeMillis();
log.info("===>{} JOB END TIME: {},TOTAL COST: {} ms <===",shardingContexts.getJobName(), TimeUtil.mill2Time(endTime), endTime - beginTime);
}


public static void main(String[] args) {
System.out.println(TimeUtil.mill2Time(System.currentTimeMillis()));
}
}
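The listener measures elapsed time by subtracting two System.currentTimeMillis() readings, which is fine for a demo. If the system clock is adjusted while a job is running, that difference can be off, so one possible variation is to time the run with the monotonic System.nanoTime() and keep currentTimeMillis() only for the printed timestamps. A minimal sketch of that idea, using a hypothetical class name and no Elastic Job types:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stopwatch based on the monotonic clock.
final class ElapsedTimeSketch {

    private long beginNanos;

    // Call when the job starts.
    void begin() {
        beginNanos = System.nanoTime();
    }

    // Call when the job ends; returns the duration since begin() in milliseconds.
    long elapsedMillis() {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beginNanos);
    }

    public static void main(String[] args) throws InterruptedException {
        ElapsedTimeSketch stopwatch = new ElapsedTimeSketch();
        stopwatch.begin();
        Thread.sleep(50); // stand-in for the job's work
        System.out.println("job took about " + stopwatch.elapsedMillis() + " ms");
    }
}
```

In the listener, begin() would go in beforeJobExecuted and elapsedMillis() in afterJobExecuted.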

By the way, the listener uses TimeUtil.mill2Time, a small helper method in my utility class that formats a Unix timestamp in milliseconds as a date and time string:

public static String mill2Time(long mill){
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");
return sdf.format(mill);
}
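SimpleDateFormat is not thread-safe, which is why the helper creates a new instance on every call. On Java 8 and later, an alternative is java.time's DateTimeFormatter, which is immutable and can be shared. The sketch below is one possible variant, not the author's TimeUtil: the class name is hypothetical and the explicit ZoneId parameter is an addition made so the result does not depend on the machine's default time zone.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical java.time-based variant of the mill2Time helper.
final class TimeFormatSketch {

    // Same pattern as the original helper; the formatter is immutable and safe to share.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss SSS");

    static String mill2Time(long mill, ZoneId zone) {
        return FORMAT.format(Instant.ofEpochMilli(mill).atZone(zone));
    }

    public static void main(String[] args) {
        System.out.println(mill2Time(System.currentTimeMillis(), ZoneId.systemDefault()));
    }
}
```

Passing ZoneId.systemDefault() gives the same local-time behaviour as the SimpleDateFormat version.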

Create the startup class AnduinJobApplication

AnduinJobApplication is the Spring Boot entry point.

package com.zobgo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

import lombok.extern.slf4j.Slf4j;

/**
* Created by zongbo.zhang on 8/16/18.
*/

@SpringBootApplication
@Slf4j
@ComponentScan(basePackages = {"com.zobgo"})
public class AnduinJobApplication {
public static void main(String[] args) {
SpringApplication.run(AnduinJobApplication.class,args);
}

}

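Project directory structure:

The project uses @Slf4j. If the logging setup prevents it from running, replace the log.info calls with System.out.println().

After a successful run, the output looks like this:

(Screenshot of the output)

A simple practical example

Consider the following scenario:

There is a user comment table, comment, with a comment_status column that marks whether a comment has been deleted: 0 = deleted, 1 = normal. For some reason, comments continually need to be restored from deleted to normal, 0 -> 1 (assume such an odd requirement exists). That calls for a scheduled job that scans the table at a fixed interval, finds every comment that needs to change, and sets its status to 1. Elastic Job is one way to implement this. The comment table looks like this: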

MySimpleJob, the scheduled job that polls and updates the database

package com.zobgo.job.simple;

import com.zobgo.business.comment.model.Comment;
import com.zobgo.business.comment.service.api.CommentService;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;

/**
* Created by zongbo.zhang on 8/15/18.
*/

@Slf4j
@Component
public class MySimpleJob implements SimpleJob {

@Autowired
@Lazy
CommentService commentService;

@Override
public void execute(ShardingContext shardingContext) {
List<Comment> comments = commentService.getCommentByStatus(0);
log.info("Rows to update: {}",comments.size());
if (comments.isEmpty()) {
// Nothing to restore; skip the update so an empty ID list is never sent to the database.
return;
}
List<Long> commentIds = new ArrayList<>();
comments.forEach(comment -> commentIds.add(comment.getCommentId()));

commentService.updateCommentByCommentIds(Byte.valueOf("1"),commentIds);

log.info("Updated {} rows", comments.size());
}
}
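With shardingTotalCount=1, every run of MySimpleJob handles all matching rows. If the job were later configured with several shards, each instance would need to restrict itself to its own slice of the data, using the sharding item and total count that ShardingContext exposes. One common way to do that is to take the comment ID modulo the total count. The sketch below shows only that filtering step on plain lists. It is an illustration with hypothetical names, not code from the project, and in practice the filter would more likely be pushed into the SQL query.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: picks the comment IDs that belong to one sharding item.
final class ShardFilterSketch {

    static List<Long> idsForShard(List<Long> commentIds, int shardingItem, int shardingTotalCount) {
        List<Long> mine = new ArrayList<>();
        for (long id : commentIds) {
            // floorMod keeps the result non-negative even for negative IDs.
            if (Math.floorMod(id, (long) shardingTotalCount) == shardingItem) {
                mine.add(id);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<Long> ids = Arrays.asList(1L, 2L, 3L, 4L, 5L);
        System.out.println("shard 0 of 2: " + idsForShard(ids, 0, 2));
        System.out.println("shard 1 of 2: " + idsForShard(ids, 1, 2));
        System.out.println("shard 0 of 1: " + idsForShard(ids, 0, 1));
    }
}
```

Every ID lands in exactly one shard, so two instances never update the same row, and with a total count of 1 the filter keeps everything, which matches the behaviour of the job as configured in this article.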

Of course, just like the demo above, this job also needs a configuration class.

MySimpleConfig, the configuration class for the polling job

package com.zobgo.job.javaconfig;

import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.zobgo.job.listener.MyElasticJobListener;
import com.zobgo.job.simple.MySimpleJob;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

/**
* Created by zongbo.zhang on 8/16/18.
*/

@Configuration
public class MySimpleConfig {

@Resource
private ZookeeperRegistryCenter registryCenter;

@Value("${simpleJob.mySimpleJob.name}")
private String mySimpleJobName;

@Value("${simpleJob.mySimpleJob.cron}")
private String mySimpleJobCron;

@Value("${simpleJob.mySimpleJob.shardingTotalCount}")
private int mySimpleJobShardingTotalCount;

@Bean
public MySimpleJob mySimpleJob(){
return new MySimpleJob();
}

@Bean(initMethod = "init")
public JobScheduler mySimpleJobScheduler(final MySimpleJob mySimpleJob){
MyElasticJobListener elasticJobListener = new MyElasticJobListener();
return new SpringJobScheduler(mySimpleJob,registryCenter, liteJobConfiguration(), elasticJobListener);
}

private LiteJobConfiguration liteJobConfiguration(){
// Define the root Lite job configuration
return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(
JobCoreConfiguration.newBuilder(mySimpleJobName,mySimpleJobCron,mySimpleJobShardingTotalCount).build(),
MySimpleJob.class.getCanonicalName()
)).overwrite(true).build();
}
}

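As for the Mybatis DAO code and the filtering and update logic inside commentService, they are not the focus of this article, so I won't paste them here. The full project source has been uploaded to Github if you want to take a look.

Results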
